From 3b3bd6d83d6620e510b3a46658478026f7b8d859 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Mon, 13 Jun 2022 21:30:02 +0200 Subject: [PATCH] Backend ready for some testing --- lasp/device/lasp_daq.cpp | 11 -- lasp/device/lasp_daq.h | 50 ++----- lasp/device/lasp_daqdata.cpp | 31 +++++ lasp/device/lasp_daqdata.h | 88 +++++++++++++ lasp/device/lasp_rtaudiodaq.cpp | 227 +++++++++++++++----------------- lasp/device/lasp_uldaq.cpp | 118 +++++++++-------- 6 files changed, 297 insertions(+), 228 deletions(-) create mode 100644 lasp/device/lasp_daqdata.cpp create mode 100644 lasp/device/lasp_daqdata.h diff --git a/lasp/device/lasp_daq.cpp b/lasp/device/lasp_daq.cpp index 4acd372..cc2b54c 100644 --- a/lasp/device/lasp_daq.cpp +++ b/lasp/device/lasp_daq.cpp @@ -11,17 +11,6 @@ DEBUGTRACE_VARIABLES; #endif using std::runtime_error; -DaqData::DaqData(const us nchannels, const us nframes, - const DataTypeDescriptor::DataType dtype) - : nchannels(nchannels), nframes(nframes), dtype(dtype), - dtype_descr(dtype_map.at(dtype)) { - static_assert(sizeof(char) == 1, "Invalid char size"); - - const DataTypeDescriptor &desc = dtype_map.at(dtype); - _data.resize(nframes * nchannels * desc.sw); -} - - std::unique_ptr Daq::createDaq(const DeviceInfo &devinfo, const DaqConfiguration &config) { DEBUGTRACE_ENTER; diff --git a/lasp/device/lasp_daq.h b/lasp/device/lasp_daq.h index 8027f85..fd0ee01 100644 --- a/lasp/device/lasp_daq.h +++ b/lasp/device/lasp_daq.h @@ -1,50 +1,22 @@ #pragma once +#include #include "lasp_config.h" -#include "lasp_daqconfig.h" +#include "lasp_daqdata.h" #include "lasp_deviceinfo.h" #include "lasp_types.h" #include #include -#include -#include /** - * @brief Data coming from / going to DAQ. **Non-interleaved format**, which means - * data in buffer is ordered by channel: _ptr[sample+channel*nframes] - */ -class DaqData { -protected: - /** - * @brief Storage for actual data. - */ - std::vector _data; - -public: - const us nchannels; - const us nframes; - const DataTypeDescriptor::DataType dtype; - const DataTypeDescriptor dtype_descr; - - DaqData(const us nchannels, const us nframes, - const DataTypeDescriptor::DataType dtype_descr); - virtual ~DaqData() = default; - - /** - * @brief Return reference to internal vector - * - * @return Reference to vector of data storage. - */ - std::vector &raw_vec() { return _data; } - us size_bytes() const { return _data.size(); } -}; - -/** - * @brief Callback of DAQ. Called with arguments of a vector of data - * spans for each channel, and a datatype descriptor. Callback should return + * @brief Callback of DAQ for input data. Callback should return * false for a stop request. */ -using DaqCallback = - std::function)>; +using InDaqCallback = std::function; + +/** + * @brief + */ +using OutDaqCallback = std::function; /** * @brief Base cass for all DAQ instances @@ -75,8 +47,8 @@ public: * required, the return value of the function should be nullptr. If no input * data is presented, the function is called with a nullptr as argument. */ - virtual void start(std::optional inCallback, - std::optional outCallback); + virtual void start(std::optional inCallback, + std::optional outCallback); /** * @brief Stop the Daq device. Throws an exception if the device is not diff --git a/lasp/device/lasp_daqdata.cpp b/lasp/device/lasp_daqdata.cpp new file mode 100644 index 0000000..04170c3 --- /dev/null +++ b/lasp/device/lasp_daqdata.cpp @@ -0,0 +1,31 @@ +#include "lasp_daqdata.h" +#include "debugtrace.hpp" +#include + +DEBUGTRACE_VARIABLES; + + +DaqData::DaqData(const us nchannels, const us nframes, + const DataTypeDescriptor::DataType dtype) + : nchannels(nchannels), nframes(nframes), dtype(dtype), + dtype_descr(dtype_map.at(dtype)), +sw(dtype_descr.sw) { + static_assert(sizeof(char) == 1, "Invalid char size"); + + const DataTypeDescriptor &desc = dtype_map.at(dtype); + _data.resize(nframes * nchannels * desc.sw); +} + +void DaqData::copyInFromRaw(const std::vector &ptrs) { + us ch = 0; + assert(ptrs.size() == nchannels); + for (auto ptr : ptrs) { + std::copy(ptr, ptr + sw * nframes, &_data[sw * ch * nframes]); + ch++; + } +} + +void DaqData::copyToRaw(const us channel,uint8_t *ptr) { + std::copy(&_data[sw * nframes * channel], + &_data[sw * nframes * (channel + 1)], ptr); +} diff --git a/lasp/device/lasp_daqdata.h b/lasp/device/lasp_daqdata.h new file mode 100644 index 0000000..fbbf94c --- /dev/null +++ b/lasp/device/lasp_daqdata.h @@ -0,0 +1,88 @@ +#pragma once +#include "lasp_daqconfig.h" +#include "lasp_types.h" +#include +#include +#include +#include + +/** + * @brief Data coming from / going to DAQ. **Non-interleaved format**, which + * means data in buffer is ordered by channel: _ptr[sample+channel*nframes] + */ +class DaqData { +protected: + /** + * @brief Storage for the actual data. + */ + std::vector _data; + +public: + /** + * @brief The number of channels + */ + const us nchannels; + + /** + * @brief The number of frames in this block of data. + */ + const us nframes; + + const DataTypeDescriptor::DataType dtype; + const DataTypeDescriptor &dtype_descr; + + /** + * @brief The number of bytes per sample (sample width, sw) + */ + const us sw; + + DaqData(const us nchannels, const us nframes, + const DataTypeDescriptor::DataType dtype); + virtual ~DaqData() = default; + + /** + * @brief Return reference to internal vector + * + * @return Reference to vector of data storage. + */ + int8_t *raw_ptr() { return _data.data(); } + /** + * @brief Return the total number of bytes + * + * @return Number of bytes of data. + */ + us size_bytes() const { return _data.size(); } + + /** + * @brief Copy data from a set of raw pointers of *uninterleaved* data. + * Overwrites any existing available data. + * + * @param ptrs Pointers to data from channels + */ + void copyInFromRaw(const std::vector &ptrs); + + /** + * @brief Copy contents of DaqData for a certain channel to a raw pointer. + * + * @param channel The channel to copy. + * @param ptr The pointer where data is copied to. + */ + void copyToRaw(const us channel, uint8_t *ptr); +}; + +template class TypedDaqData : public DaqData { +public: + TypedDaqData(const us nchannels, const us nframes, + const DataTypeDescriptor::DataType dtype_descr) + : DaqData(nchannels, nframes, dtype_descr) {} + + T &operator[](const us i) { return _data[this->sw * i]; } + + T &operator()(const us sample, const us channel) { + return reinterpret_cast(_data[sw * (sample + channel * nframes)]); + } + + void copyToRaw(const us channel, T *ptr) { + DaqData::copyToRaw(channel, reinterpret_cast(ptr)); + } +}; diff --git a/lasp/device/lasp_rtaudiodaq.cpp b/lasp/device/lasp_rtaudiodaq.cpp index d385c57..e0427f3 100644 --- a/lasp/device/lasp_rtaudiodaq.cpp +++ b/lasp/device/lasp_rtaudiodaq.cpp @@ -116,6 +116,9 @@ class RtAudioDaq : public Daq { RtAudioDaq(const RtAudioDaq &) = delete; RtAudioDaq &operator=(const RtAudioDaq &) = delete; + InDaqCallback _incallback; + OutDaqCallback _outcallback; + public: RtAudioDaq(const DeviceInfo &devinfo, const DaqConfiguration &config) : Daq(devinfo, config), @@ -155,7 +158,7 @@ public: } RtAudio::StreamOptions streamoptions; - streamoptions.flags = RTAUDIO_HOG_DEVICE; + streamoptions.flags = RTAUDIO_HOG_DEVICE | RTAUDIO_NONINTERLEAVED; streamoptions.numberOfBuffers = 2; streamoptions.streamName = "RtAudio stream"; @@ -196,148 +199,130 @@ public: &myerrorcallback); } + virtual void start(std::optional inCallback, + std::optional outCallback) override { + + assert(!monitorOutput); + + if (isRunning()) { + throw runtime_error("Stream already running"); + } + // Logical XOR + if (!inCallback != !outCallback) { + throw runtime_error("Either input or output stream possible for RtAudio. " + "Stream duplex mode not provided."); + } + + if (inCallback) { + _incallback = *inCallback; + if (!neninchannels()) { + throw runtime_error( + "Input callback given, but stream does not provide input data"); + } + } + if (outCallback) { + _outcallback = *outCallback; + if (!nenoutchannels()) { + throw runtime_error( + "Output callback given, but stream does not provide output data"); + } + } + rtaudio.startStream(); + } + + bool isRunning() const override { return (rtaudio.isStreamRunning()); } + + void stop() override { + if (!isRunning()) { + /* cerr << "Stream is already stopped" << endl; */ + } else { + rtaudio.stopStream(); + } + } + int streamCallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames, double streamTime, RtAudioStreamStatus status) { - auto dtype = dataType(); - us neninchannels_inc_mon = neninchannels(); + const auto &dtype_descr = DataTypeDescriptor(); + const auto dtype = dataType(); + us neninchannels = this->neninchannels(); us nenoutchannels = this->nenoutchannels(); - - us bytesperchan = dtype_map.at(dtype).sw * nFrames; - us monitorOffset = ((us)monitorOutput) * bytesperchan; + us sw = dtype_descr.sw; + if (nFrames != nFramesPerBlock) { + cerr << "RtAudio backend error: nFrames does not match block size!" + << endl; + return 1; + } if (inputBuffer) { - - u_int8_t *inbuffercpy = - (u_int8_t *)malloc(bytesperchan * neninchannels_inc_mon); - if (inputBuffer) { - us j = 0; // OUR buffer channel counter - us i = 0; // RtAudio channel counter - for (int ch = getLowestInChannel(); ch <= getHighestInChannel(); ch++) { - if (eninchannels[ch]) { - memcpy(&(inbuffercpy[monitorOffset + j * bytesperchan]), - &(inputBuffer[i * bytesperchan]), bytesperchan); - j++; - } - i++; + std::vector ptrs; + ptrs.reserve(neninchannels); + /* DaqData(neninchannels_inc_mon, nFramesPerBlock, dtype); */ + for (int ch = getLowestInChannel(); ch <= getHighestInChannel(); ch++) { + if (eninchannels[ch]) { + ptrs.push_back(&static_cast( + inputBuffer)[sw * ninchannels * ch * nFramesPerBlock]); } } + DaqData d{neninchannels, nFramesPerBlock, dtype}; + d.copyInFromRaw(ptrs); + bool ret = _incallback(d); + if (!ret) { + return 1; + } } if (outputBuffer) { - if (!outqueue->empty()) { - u_int8_t *outbuffercpy = (u_int8_t *)outqueue->dequeue(); - us j = 0; // OUR buffer channel counter - us i = 0; // RtAudio channel counter - for (us ch = 0; ch <= daq->getHighestOutChannel(); ch++) { - /* cerr << "Copying from queue... " << endl; */ - if (enoutchannels[ch]) { - memcpy(&(outputBuffer[i * bytesperchan]), - &(outbuffercpy[j * bytesperchan]), bytesperchan); - j++; - } else { - /* cerr << "unused output channel in list" << endl; */ - memset(&(outputBuffer[i * bytesperchan]), 0, bytesperchan); - } - i++; + std::vector ptrs; + ptrs.reserve(neninchannels); + DaqData data(nenoutchannels, nFramesPerBlock, dtype); + + /* outCallback */ + for (int ch = 0; ch <= getHighestOutChannel(); ch++) { + if (enoutchannels[ch]) { + ptrs.push_back(&static_cast( + outputBuffer)[sw * nenoutchannels * ch * nFramesPerBlock]); } - if (!monitorOutput) { - free(outbuffercpy); - } else { - assert(outDelayqueue); - outDelayqueue->enqueue((void *)outbuffercpy); - } - } else { - cerr << "RtAudio backend: stream output buffer underflow!" << endl; + } + DaqData d{nenoutchannels, nFramesPerBlock, dtype}; + bool ret = _outcallback(d); + if (!ret) { + return 1; + } + us j = 0; + for (auto ptr : ptrs) { + d.copyToRaw(j, ptr); + j++; } } return 0; } + + ~RtAudioDaq() { + if (isRunning()) { + stop(); + } + if (rtaudio.isStreamOpen()) { + rtaudio.closeStream(); + } + } +}; + +std::unique_ptr createRtAudioDevice(const DeviceInfo &devinfo, + const DaqConfiguration &config) { + return std::make_unique(devinfo, config); } -virtual void -start(std::optional inCallback, - std::optional outCallback) { - - assert(!monitorOutput); - - if (isRunning()) { - throw runtime_error("Stream already running"); - } - - if (neninchannels(false) > 0 && !inqueue) { - throw runtime_error("inqueue argument not given"); - } - if (nenoutchannels() > 0 && !outqueue) { - throw runtime_error("outqueue argument not given"); - } - assert(rtaudio); - rtaudio->startStream(); +void myerrorcallback(RtAudioError::Type, const string &errorText) { + cerr << errorText << endl; } +int mycallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames, + double streamTime, RtAudioStreamStatus status, void *userData) { -void stop() { - if (!isRunning()) { - cerr << "Stream is already stopped" << endl; - } else { - assert(rtaudio); - rtaudio->stopStream(); - } - if (inqueue) { - inqueue = nullptr; - } - if (outqueue) { - outqueue = nullptr; - } - if (outDelayqueue) { - delete outDelayqueue; - outDelayqueue = nullptr; - } + return static_cast(userData)->streamCallback( + outputBuffer, inputBuffer, nFrames, streamTime, status); } -bool isRunning() const { return (rtaudio && rtaudio->isStreamRunning()); } - -~RtAudioDaq() { - assert(rtaudio); - if (isRunning()) { - stop(); - } - if (rtaudio->isStreamOpen()) { - rtaudio->closeStream(); - } -} -} -; - -Daq *createRtAudioDevice(const DeviceInfo &devinfo, - const DaqConfiguration &config) { - - AudioDaq *daq = NULL; - - try { - daq = new AudioDaq(devinfo, config); - - } catch (runtime_error &e) { - if (daq) - delete daq; - throw; - } - return daq; -} - -int mycallback(void *outputBuffervoid, void *inputBuffervoid, - unsigned int nFrames, double streamTime, - RtAudioStreamStatus status, void *userData) { - - void myerrorcallback(RtAudioError::Type, const string &errorText) { - cerr << errorText << endl; - } - int mycallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames, - double streamTime, RtAudioStreamStatus status, - void *userData) { - - return static_cast(userData)->streamCallback( - outputBuffer, inputBuffer, nFrames, streamTime, status); - } #endif // LASP_HAS_RTAUDIO == 1 diff --git a/lasp/device/lasp_uldaq.cpp b/lasp/device/lasp_uldaq.cpp index 12d437a..dbad5bc 100644 --- a/lasp/device/lasp_uldaq.cpp +++ b/lasp/device/lasp_uldaq.cpp @@ -59,8 +59,8 @@ class DT9837A : public Daq { const us _nFramesPerBlock; - void threadFcn(std::optional inCallback, - std::optional outcallback); + void threadFcn(std::optional inCallback, + std::optional outcallback); public: DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config); @@ -80,8 +80,8 @@ public: } bool isRunning() const override final { return _thread.joinable(); } - virtual void start(std::optional inCallback, - std::optional outCallback) override final; + virtual void start(std::optional inCallback, + std::optional outCallback) override final; void stop() override final { DEBUGTRACE_ENTER; @@ -99,8 +99,8 @@ public: friend class OutBufHandler; }; -void DT9837A::start(std::optional inCallback, - std::optional outCallback) { +void DT9837A::start(std::optional inCallback, + std::optional outCallback) { DEBUGTRACE_ENTER; if (isRunning()) { throw runtime_error("DAQ is already running"); @@ -122,7 +122,6 @@ protected: DaqDeviceHandle _handle; const DataTypeDescriptor dtype_descr; us nchannels, nFramesPerBlock; - DaqCallback cb; double samplerate; dvec buf; bool topenqueued, botenqueued; @@ -132,10 +131,10 @@ protected: public: BufHandler(DaqDeviceHandle handle, const DataTypeDescriptor dtype_descr, - const us nchannels, const us nFramesPerBlock, DaqCallback cb, + const us nchannels, const us nFramesPerBlock, const double samplerate) : _handle(handle), dtype_descr(dtype_descr), nchannels(nchannels), - nFramesPerBlock(nFramesPerBlock), cb(cb), samplerate(samplerate), + nFramesPerBlock(nFramesPerBlock), samplerate(samplerate), buf(2 * nchannels * nFramesPerBlock, // Watch the two here, the top and the bottom! 0), @@ -145,11 +144,13 @@ public: }; class InBufHandler : public BufHandler { bool monitorOutput; + InDaqCallback cb; public: - InBufHandler(DT9837A &daq, DaqCallback cb) + InBufHandler(DT9837A &daq, InDaqCallback cb) : BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(), - daq._nFramesPerBlock, cb, daq.samplerate()) + daq._nFramesPerBlock, daq.samplerate()), + cb(cb) { DEBUGTRACE_ENTER; @@ -210,31 +211,29 @@ public: bool operator()() { + bool ret = true; + auto runCallback = ([&](us totalOffset) { us monitoroffset = monitorOutput ? 1 : 0; - DaqData data(nchannels, nFramesPerBlock, - DataTypeDescriptor::DataType::dtype_fl64); - us ch_no = 0; + TypedDaqData data(nchannels, nFramesPerBlock, + DataTypeDescriptor::DataType::dtype_fl64); + if (monitorOutput) { - - reinterpret_cast( - &buf[totalOffset + (nchannels - 1) * nFramesPerBlock]), - nFramesPerBlock * sizeof(double)); + for (us sample = 0; sample < nFramesPerBlock; sample++) { + data(0, sample) = + buf[totalOffset + (sample * nchannels) + (nchannels - 1)]; + } } - /* if(mon */ - /* for (us channel = monitoroffset; channel < (nchannels - monitoroffset); - */ - /* channel++) { */ - /* cv[channel] = */ - /* gsl::span(reinterpret_cast( */ - /* &buf[totalOffset + channel * nFramesPerBlock]), */ - /* nFramesPerBlock * sizeof(double)); */ - /* } */ - - /* cv[0] = gsl::span( */ - /* cb(cv, dtype_descr); */ + for (us channel = monitoroffset; channel < (nchannels - monitoroffset); + channel++) { + for (us sample = 0; sample < nFramesPerBlock; sample++) { + data(channel, sample) = + buf[totalOffset + (sample * nchannels) + channel]; + } + } + return cb(data); }); ScanStatus status; @@ -243,7 +242,7 @@ public: UlError err = ulDaqInScanStatus(_handle, &status, &transferStatus); if (err != ERR_NO_ERROR) { showErr(err); - return; + return false; } increment = transferStatus.currentTotalCount - totalFramesCount; @@ -251,23 +250,24 @@ public: if (increment > nFramesPerBlock) { cerr << "Error: overrun for input of DAQ!" << endl; - return; + return false; } assert(status == SS_RUNNING); if (transferStatus.currentIndex < (long long)buffer_mid_idx) { topenqueued = false; if (!botenqueued) { - runCallback(nchannels * nFramesPerBlock); + ret = runCallback(nchannels * nFramesPerBlock); botenqueued = true; } } else { botenqueued = false; if (!topenqueued) { - runCallback(0); + ret = runCallback(0); topenqueued = true; } } + return ret; } ~InBufHandler() { // At exit of the function, stop scanning. @@ -280,10 +280,13 @@ public: }; class OutBufHandler : public BufHandler { + OutDaqCallback cb; + public: - OutBufHandler(DT9837A &daq, DaqCallback cb) + OutBufHandler(DT9837A &daq, OutDaqCallback cb) : BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(), - daq._nFramesPerBlock, cb, daq.samplerate()) { + daq._nFramesPerBlock, daq.samplerate()), + cb(cb) { DEBUGTRACE_MESSAGE("Starting output scan"); AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT; @@ -298,14 +301,10 @@ public: } botenqueued = false, topenqueued = true; - - // Run callback to first fill top part - ChannelView cv{gsl::span(reinterpret_cast(&buf[0]), - nFramesPerBlock * sizeof(double))}; - cb(cv, dtype_descr); } - void operator()() { + bool operator()() { + bool res = true; assert(_handle != 0); UlError err = ERR_NO_ERROR; @@ -316,40 +315,41 @@ public: err = ulAOutScanStatus(_handle, &status, &transferStatus); if (err != ERR_NO_ERROR) { showErr(err); - return; + return false; } if (status != SS_RUNNING) { - return; + return false; } increment = transferStatus.currentTotalCount - totalFramesCount; totalFramesCount += increment; if (increment > nFramesPerBlock) { cerr << "Error: underrun for output of DAQ!" << endl; - return; + return false; } if (transferStatus.currentIndex < buffer_mid_idx) { topenqueued = false; if (!botenqueued) { + TypedDaqData d(1, nFramesPerBlock, + DataTypeDescriptor::DataType::dtype_fl64); + res = cb(d); + d.copyToRaw(0, &buf[buffer_mid_idx]); - ChannelView cv{ - gsl::span(reinterpret_cast(&buf[buffer_mid_idx]), - nFramesPerBlock * sizeof(double))}; - cb(cv, dtype_descr); botenqueued = true; } } else { botenqueued = false; if (!topenqueued) { - - ChannelView cv{gsl::span(reinterpret_cast(&buf[0]), - nFramesPerBlock * sizeof(double))}; - cb(cv, dtype_descr); + TypedDaqData d(1, nFramesPerBlock, + DataTypeDescriptor::DataType::dtype_fl64); + res = cb(d); + d.copyToRaw(0, buf.data()); topenqueued = true; } } + return res; } ~OutBufHandler() { @@ -361,8 +361,8 @@ public: } }; -void DT9837A::threadFcn(std::optional inCallback, - std::optional outCallback) { +void DT9837A::threadFcn(std::optional inCallback, + std::optional outCallback) { DEBUGTRACE_ENTER; std::unique_ptr ibh; @@ -381,10 +381,14 @@ void DT9837A::threadFcn(std::optional inCallback, while (!_stopThread) { if (ibh) { - (*ibh)(); + if (!(*ibh)()) { + _stopThread = true; + } } if (obh) { - (*obh)(); + if (!(*obh)()) { + _stopThread = true; + } } std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); }