diff --git a/src/lasp/device/CMakeLists.txt b/src/lasp/device/CMakeLists.txt index 052d997..e576d23 100644 --- a/src/lasp/device/CMakeLists.txt +++ b/src/lasp/device/CMakeLists.txt @@ -8,6 +8,7 @@ add_library(lasp_device_lib OBJECT lasp_rtaudiodaq.cpp lasp_streammgr.cpp lasp_uldaq.cpp + lasp_uldaq_impl.cpp ) # Callback requires certain arguments that are not used by code. This disables diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index a921a09..dbc7392 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -44,7 +44,6 @@ public: systemError, threadError, logicError, - apiSpecificError }; /** @@ -59,6 +58,7 @@ public: {StreamError::threadError, "Thread error"}, {StreamError::logicError, "Logic error (probably a bug)"}, }; + bool isRunning = false; /** * @brief Check if stream has error @@ -78,6 +78,25 @@ public: * @return as described above. */ bool runningOK() const { return isRunning && !error(); } + + }; // End of class StreamStatus + + using rte = std::runtime_error; + /** + * @brief Used for internal throwing of exceptions. + */ + class StreamException : public rte { + using StreamError = StreamStatus::StreamError; + + public: + StreamStatus::StreamError e; + StreamException(const StreamStatus::StreamError e) + : rte(StreamStatus::errorMessages.at(e)), e(e) {} + StreamException(const StreamStatus::StreamError e, + const std::string &additional_info) + : rte(StreamStatus::errorMessages.at(e) + ": " + additional_info), + e(e) {} + operator StreamError() { return e; } }; /** @@ -161,7 +180,7 @@ public: * * * @return Maximum offset from 0 before clipping. */ - dvec inputRangeForEnabledChannels(const bool include_monitor=true) const; + dvec inputRangeForEnabledChannels(const bool include_monitor = true) const; /** * @brief Returns datatype (enum) corresponding to the datatype of the @@ -176,7 +195,7 @@ public: * * @return A DataTypeDescriptor */ - const DataTypeDescriptor& dtypeDescr() const; + const DataTypeDescriptor &dtypeDescr() const; /** * @brief The number of frames that is send in a block of DaqData. diff --git a/src/lasp/device/lasp_rtaudiodaq.cpp b/src/lasp/device/lasp_rtaudiodaq.cpp index 59ad889..8c7db53 100644 --- a/src/lasp/device/lasp_rtaudiodaq.cpp +++ b/src/lasp/device/lasp_rtaudiodaq.cpp @@ -17,8 +17,6 @@ using rte = std::runtime_error; using std::vector; using lck = std::scoped_lock; -DEBUGTRACE_VARIABLES; - void fillRtAudioDeviceInfo(vector &devinfolist) { DEBUGTRACE_ENTER; diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index d74fab5..5e63376 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -1,599 +1,22 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_config.h" - #if LASP_HAS_ULDAQ == 1 -#include "lasp_daqconfig.h" -#include "lasp_uldaq.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include +#include "lasp_uldaq.h" +#include "lasp_uldaq_impl.h" -using namespace std::literals::chrono_literals; -using std::atomic; -using std::cerr; -using std::endl; -using rte = std::runtime_error; -#include "debugtrace.hpp" -DEBUGTRACE_VARIABLES; - -const us MAX_DEV_COUNT_PER_API = 100; -/** - * @brief Reserve some space for an error message from UlDaq - */ -const us UL_ERR_MSG_LEN = 512; - -/** - * @brief Show the error to default error stream and return a string - * corresponding to the error - * - * @param err Error string - */ -string showErr(UlError err) { - string errstr; - errstr.reserve(UL_ERR_MSG_LEN); - if (err != ERR_NO_ERROR) { - char errmsg[UL_ERR_MSG_LEN]; - errstr = "UlDaq API Error: "; - ulGetErrMsg(err, errmsg); - errstr += errmsg; - - std::cerr << "\b\n**************** UlDAQ backend error **********\n"; - std::cerr << errstr << std::endl; - std::cerr << "***********************************************\n\n"; - return errstr; - } - return errstr; -} - -class DT9837A : public Daq { - - DaqDeviceHandle _handle = 0; - std::mutex _daqmutex; - - std::thread _thread; - atomic _stopThread{false}; - atomic _streamStatus; - - const us _nFramesPerBlock; - - void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); - -public: - DaqDeviceHandle getHandle() const { return _handle; } - DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config); - - ~DT9837A() { - UlError err; - if (isRunning()) { - stop(); - } - - if (_handle) { - err = ulDisconnectDaqDevice(_handle); - showErr(err); - err = ulReleaseDaqDevice(_handle); - showErr(err); - } - } - - bool isRunning() const { - DEBUGTRACE_ENTER; - return _thread.joinable(); - } - virtual void start(InDaqCallback inCallback, - OutDaqCallback outCallback) override final; - - virtual StreamStatus getStreamStatus() const override { - return _streamStatus; - } - - void stop() override final { - DEBUGTRACE_ENTER; - StreamStatus status = _streamStatus; - if (!isRunning()) { - throw rte("No data acquisition running"); - } - - _stopThread = true; - if (_thread.joinable()) { - _thread.join(); - } - _stopThread = false; - status.isRunning = false; - _streamStatus = status; - } - - friend class InBufHandler; - friend class OutBufHandler; -}; - -void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { - DEBUGTRACE_ENTER; - if (isRunning()) { - throw rte("DAQ is already running"); - } - if (neninchannels() > 0) { - if (!inCallback) - throw rte("DAQ requires a callback for input data"); - } - if (nenoutchannels() > 0) { - if (!outCallback) - throw rte("DAQ requires a callback for output data"); - } - assert(neninchannels() + nenoutchannels() > 0); - _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); -} - -class BufHandler { -protected: - DT9837A &daq; - const DataTypeDescriptor dtype_descr; - us nchannels, nFramesPerBlock; - double samplerate; - std::vector buf; - bool topenqueued, botenqueued; - - us increment = 0; - - us totalFramesCount = 0; - long long buffer_mid_idx; - -public: - BufHandler(DT9837A &daq, const us nchannels) - : daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels), - nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()), - buf(2 * nchannels * - nFramesPerBlock, // Watch the two here, the top and the bottom! - 0), - buffer_mid_idx(nchannels * nFramesPerBlock) { - assert(nchannels > 0); - } -}; -class InBufHandler : public BufHandler { - bool monitorOutput; - InDaqCallback cb; - -public: - InBufHandler(DT9837A &daq, InDaqCallback cb) - : BufHandler(daq, daq.neninchannels()), cb(cb) - - { - DEBUGTRACE_ENTER; - assert(daq.getHandle() != 0); - - monitorOutput = daq.monitorOutput; - - DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT; - ScanOption scanoptions = SO_CONTINUOUS; - UlError err = ERR_NO_ERROR; - - std::vector indescs; - boolvec eninchannels_without_mon = daq.eninchannels(false); - DEBUGTRACE_PRINT(eninchannels_without_mon.size()); - - // Initialize input, if any - dvec ranges = daq.inputRangeForEnabledChannels(false); - - // Update range index only when an enabled channel is found. - us range_idx = 0; - - for (us chin = 0; chin < 4; chin++) { - if (eninchannels_without_mon[chin] == true) { - DaqInChanDescriptor indesc; - indesc.type = DAQI_ANALOG_SE; - indesc.channel = chin; - - double rangeval = ranges.at(range_idx); - range_idx++; - Range rangenum; - if (fabs(rangeval - 1.0) < 1e-8) { - rangenum = BIP1VOLTS; - } else if (fabs(rangeval - 10.0) < 1e-8) { - rangenum = BIP10VOLTS; - } else { - std::cerr << "Fatal: input range value is invalid" << endl; - return; - } - indesc.range = rangenum; - indescs.push_back(indesc); - } - } - - // Add possibly last channel as monitor - if (monitorOutput) { - DaqInChanDescriptor indesc; - indesc.type = DAQI_DAC; - indesc.channel = 0; - /// The output only has a range of 10V, therefore the monitor of the - /// output also has to be set to this value. - indesc.range = BIP10VOLTS; - indescs.push_back(indesc); - } - assert(indescs.size() == nchannels); - - DEBUGTRACE_MESSAGE("Starting input scan"); - - err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels, - 2 * nFramesPerBlock, // Watch the 2 here! - &samplerate, scanoptions, inscanflags, buf.data()); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Could not start input DAQ"); - } - } - void start() { - - ScanStatus status; - TransferStatus transferStatus; - UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Unable to start input on DAQ"); - } - - totalFramesCount = transferStatus.currentTotalCount; - topenqueued = true; - botenqueued = true; - } - - /** - * @brief InBufHandler::operator()() - * - * @return true on success - */ - bool operator()() { - - /* DEBUGTRACE_ENTER; */ - - bool ret = true; - - auto runCallback = ([&](us totalOffset) { - /* DEBUGTRACE_ENTER; */ - - DaqData data(nFramesPerBlock, nchannels, - DataTypeDescriptor::DataType::dtype_fl64); - - us monitorOffset = monitorOutput ? 1 : 0; - /* /// Put the output monitor in front */ - if (monitorOutput) { - for (us frame = 0; frame < nFramesPerBlock; frame++) { - data.value(frame, 0) = - buf[totalOffset + (frame * nchannels) + (nchannels - 1)]; - } - } - - for (us channel = 0; channel < nchannels - monitorOffset; channel++) { - /* DEBUGTRACE_PRINT(channel); */ - for (us frame = 0; frame < nFramesPerBlock; frame++) { - data.value(frame, channel + monitorOffset) = - buf[totalOffset + (frame * nchannels) + channel]; - } - } - return cb(data); - }); - - ScanStatus status; - TransferStatus transferStatus; - - UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); - if (err != ERR_NO_ERROR) { - showErr(err); - return false; - } - - increment = transferStatus.currentTotalCount - totalFramesCount; - totalFramesCount += increment; - - if (increment > nFramesPerBlock) { - cerr << "Error: overrun for input of DAQ!" << endl; - return false; - } - assert(status == SS_RUNNING); - - if (transferStatus.currentIndex < (long long)buffer_mid_idx) { - topenqueued = false; - if (!botenqueued) { - ret = runCallback(nchannels * nFramesPerBlock); - botenqueued = true; - } - } else { - botenqueued = false; - if (!topenqueued) { - ret = runCallback(0); - topenqueued = true; - } - } - return ret; - } - ~InBufHandler() { - // At exit of the function, stop scanning. - DEBUGTRACE_ENTER; - UlError err = ulDaqInScanStop(daq.getHandle()); - if (err != ERR_NO_ERROR) { - showErr(err); - } - } -}; - -class OutBufHandler : public BufHandler { - OutDaqCallback cb; - -public: - OutBufHandler(DT9837A &daq, OutDaqCallback cb) - : BufHandler(daq, daq.nenoutchannels()), cb(cb) { - - DEBUGTRACE_MESSAGE("Starting output scan"); - DEBUGTRACE_PRINT(nchannels); - AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT; - ScanOption scanoptions = SO_CONTINUOUS; - UlError err = - ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS, - 2 * nFramesPerBlock, // Watch the 2 here! - &samplerate, scanoptions, outscanflags, buf.data()); - - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Unable to start output on DAQ"); - } - } - void start() { - - ScanStatus status; - TransferStatus transferStatus; - - UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Unable to start output on DAQ"); - } - if (status != SS_RUNNING) { - throw rte("Unable to start output on DAQ"); - } - totalFramesCount = transferStatus.currentTotalCount; - topenqueued = true; - botenqueued = true; - } - /** - * @brief OutBufHandler::operator() - * - * @return true on success - */ - bool operator()() { - - /* DEBUGTRACE_ENTER; */ - bool res = true; - assert(daq.getHandle() != 0); - - UlError err = ERR_NO_ERROR; - - ScanStatus status; - TransferStatus transferStatus; - - err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); - if (err != ERR_NO_ERROR) { - showErr(err); - return false; - } - if (status != SS_RUNNING) { - return false; - } - increment = transferStatus.currentTotalCount - totalFramesCount; - totalFramesCount += increment; - - if (increment > nFramesPerBlock) { - cerr << "Error: underrun for output of DAQ!" << endl; - return false; - } - - if (transferStatus.currentIndex < buffer_mid_idx) { - topenqueued = false; - if (!botenqueued) { - DaqData d(nFramesPerBlock,1, - DataTypeDescriptor::DataType::dtype_fl64); - res = cb(d); - d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); - - botenqueued = true; - } - } else { - botenqueued = false; - if (!topenqueued) { - DaqData d(nFramesPerBlock,1, - DataTypeDescriptor::DataType::dtype_fl64); - res = cb(d); - d.copyToRaw(0, reinterpret_cast(&(buf[0]))); - - topenqueued = true; - } - } - return res; - } - - ~OutBufHandler() { - DEBUGTRACE_ENTER; - UlError err = ulAOutScanStop(daq.getHandle()); - if (err != ERR_NO_ERROR) { - showErr(err); - } - } -}; - -void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { +void fillUlDaqDeviceInfo(std::vector &devinfolist,void* vDescriptors) { DEBUGTRACE_ENTER; - /* cerr << "******************\n" */ - /* "Todo: the current way of handling timing in this DAQ thread is not " */ - /* "really robust, due " */ - /* "to input / output callbacks that can be too time-consuming. We have " */ - /* "to fix the " */ - /* "sleep_for to properly deal with longer callbacks." */ - /* "\n*****************" */ - /* << endl; */ - try { - - std::unique_ptr obh; - std::unique_ptr ibh; - - StreamStatus status = _streamStatus; - status.isRunning = true; - _streamStatus = status; - - if (nenoutchannels() > 0) { - assert(outCallback); - obh = std::make_unique(*this, outCallback); - } - if (neninchannels() > 0) { - assert(inCallback); - ibh = std::make_unique(*this, inCallback); - } - if (obh) - obh->start(); - if (ibh) - ibh->start(); - - const double sleeptime_s = - static_cast(_nFramesPerBlock) / (16 * samplerate()); - const us sleeptime_us = static_cast(sleeptime_s * 1e6); - - while (!_stopThread) { - if (ibh) { - if (!(*ibh)()) { - _stopThread = true; - } - } - if (obh) { - if (!(*obh)()) { - _stopThread = true; - } - } else { - std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); - } - } - } catch (rte &e) { - - StreamStatus status = _streamStatus; - status.isRunning = false; - status.errorType = StreamStatus::StreamError::systemError; - _streamStatus = status; - - cerr << "\n******************\n"; - cerr << "Catched error in UlDAQ thread: " << e.what() << endl; - cerr << "\n******************\n"; - } - StreamStatus status = _streamStatus; - - status.isRunning = false; - _streamStatus = status; - _stopThread = false; -} - -std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, - const DaqConfiguration &config) { - return std::make_unique(devinfo, config); -} - -DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config) - : Daq(devinfo, config), - _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { - - // Some sanity checks - if (inchannel_config.size() != 4) { - throw rte("Invalid length of enabled inChannels vector"); - } - - if (outchannel_config.size() != 1) { - throw rte("Invalid length of enabled outChannels vector"); - } - - if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { - throw rte("Unsensible number of samples per block chosen"); - } - - if (samplerate() < 10000 || samplerate() > 51000) { - throw rte("Invalid sample rate"); - } - - DaqDeviceDescriptor devdescriptors[MAX_DEV_COUNT_PER_API]; - DaqDeviceDescriptor descriptor; - DaqDeviceInterface interfaceType = ANY_IFC; + DaqDeviceDescriptor* descriptors_copy = static_cast(vDescriptors); UlError err; + unsigned int numdevs = MAX_ULDAQ_DEV_COUNT_PER_API; - us numdevs = MAX_DEV_COUNT_PER_API; - err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, - (unsigned *)&numdevs); - if (err != ERR_NO_ERROR) { - throw rte("Device inventarization failed"); - } - - if ((us)api_specific_devindex >= numdevs) { - throw rte("Device number {deviceno} too high {err}. This could " - "happen when the device is currently not connected"); - } - - descriptor = devdescriptors[api_specific_devindex]; - - // get a handle to the DAQ device associated with the first descriptor - _handle = ulCreateDaqDevice(descriptor); - - if (_handle == 0) { - throw rte("Unable to create a handle to the specified DAQ " - "device. Is the device currently in use? Please make sure to set " - "the DAQ configuration in duplex mode if simultaneous input and " - "output is required."); - } - - err = ulConnectDaqDevice(_handle); - if (err != ERR_NO_ERROR) { - ulReleaseDaqDevice(_handle); - _handle = 0; - throw rte(string("Unable to connect to device: " + showErr(err))); - } - - for (us ch = 0; ch < 4; ch++) { - - err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); - showErr(err); - if (err != ERR_NO_ERROR) { - throw rte("Fatal: could normalize channel sensitivity"); - } - - CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; - err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set AC/DC coupling mode"); - } - - IepeMode iepe = - inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; - err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set IEPE mode"); - } - } -} -void fillUlDaqDeviceInfo(std::vector &devinfolist) { - - DEBUGTRACE_ENTER; - - UlError err; - unsigned int numdevs = MAX_DEV_COUNT_PER_API; - - DaqDeviceDescriptor devdescriptors[MAX_DEV_COUNT_PER_API]; + DaqDeviceDescriptor devdescriptors[MAX_ULDAQ_DEV_COUNT_PER_API]; DaqDeviceDescriptor descriptor; DaqDeviceInterface interfaceType = ANY_IFC; @@ -608,6 +31,11 @@ void fillUlDaqDeviceInfo(std::vector &devinfolist) { descriptor = devdescriptors[i]; + // Copy structure over, if given as not nullptr + if(descriptors_copy) { + descriptors_copy[i] = descriptor; + } + DeviceInfo devinfo; devinfo.api = uldaqapi; string name, interface; @@ -666,4 +94,11 @@ void fillUlDaqDeviceInfo(std::vector &devinfolist) { devinfolist.push_back(devinfo); } } + +std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, + const DaqConfiguration &config) { + return std::make_unique(devinfo, config); +} + + #endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/lasp_uldaq.h b/src/lasp/device/lasp_uldaq.h index 4dd7489..4d9ea19 100644 --- a/src/lasp/device/lasp_uldaq.h +++ b/src/lasp/device/lasp_uldaq.h @@ -1,6 +1,13 @@ #pragma once #include "lasp_daq.h" + +/** + * @brief The maximum number of devices that can be enumerated when calling + * ulGetDaqDeviceInventory() + */ +const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; + std::unique_ptr createUlDaqDevice(const DeviceInfo& devinfo, const DaqConfiguration& config); @@ -8,7 +15,10 @@ std::unique_ptr createUlDaqDevice(const DeviceInfo& devinfo, * @brief Fill device info list with UlDaq specific devices, if any. * * @param devinfolist Info list to append to + * @param descriptors Pointer to array + * DaqDeviceDescriptors[MAX_ULDAQ_DEV_COUNT_PER_API]. If a pointer is given, a + * copy of the device descriptors is set to the memory of this pointer. We use + * a void* pointer here to not expose the implementation of UlDaq. */ -void fillUlDaqDeviceInfo(std::vector &devinfolist); - +void fillUlDaqDeviceInfo(std::vector, void* descriptors=nullptr); diff --git a/src/lasp/device/lasp_uldaq_impl.cpp b/src/lasp/device/lasp_uldaq_impl.cpp new file mode 100644 index 0000000..955d645 --- /dev/null +++ b/src/lasp/device/lasp_uldaq_impl.cpp @@ -0,0 +1,487 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_daqconfig.h" +#include "lasp_uldaq.h" +#include "lasp_uldaq_impl.h" + +using namespace std::literals::chrono_literals; + +/** + * @brief Reserve some space for an error message from UlDaq + */ +const us UL_ERR_MSG_LEN = 512; + +/** + * @brief Return a string corresponding to the UlDaq API error + * + * @param err error code + * + * @return Error string + */ +string getErrMsg(UlError err) { + string errstr; + errstr.reserve(UL_ERR_MSG_LEN); + char errmsg[UL_ERR_MSG_LEN]; + errstr = "UlDaq API Error: "; + ulGetErrMsg(err, errmsg); + errstr += errmsg; + return errstr; +} +inline void showErr(string errstr) { + std::cerr << "\b\n**************** UlDAQ backend error **********\n"; + std::cerr << errstr << std::endl; + std::cerr << "***********************************************\n\n"; +} +inline void showErr(UlError err) { showErr(getErrMsg(err)); } + +DT9837A::~DT9837A() { + UlError err; + if (isRunning()) { + stop(); + } + + if (_handle) { + err = ulDisconnectDaqDevice(_handle); + showErr(err); + err = ulReleaseDaqDevice(_handle); + showErr(err); + } +} +DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config) + : Daq(devinfo, config), + _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { + + // Some sanity checks + if (inchannel_config.size() != 4) { + throw rte("Invalid length of enabled inChannels vector"); + } + + if (outchannel_config.size() != 1) { + throw rte("Invalid length of enabled outChannels vector"); + } + + if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { + throw rte("Unsensible number of samples per block chosen"); + } + + if (samplerate() < 10000 || samplerate() > 51000) { + throw rte("Invalid sample rate"); + } + + std::vector devs; + DaqDeviceDescriptor devdescriptors[MAX_ULDAQ_DEV_COUNT_PER_API]; + fillUlDaqDeviceInfo(devs, static_cast(devdescriptors)); + + if (devs.size() == 0) { + throw rte("Unable to find any UlDaq devices"); + } + + if (devinfo.api_specific_devindex < 0 || + devinfo.api_specific_devindex >= (int) MAX_ULDAQ_DEV_COUNT_PER_API) { + throw rte("Invalid device index"); + } + + // get a handle to the DAQ device associated with the first descriptor + _handle = ulCreateDaqDevice(devdescriptors[devinfo.api_specific_devindex]); + + if (_handle == 0) { + throw rte("Unable to create a handle to the specified DAQ " + "device. Is the device currently in use? Please make sure to set " + "the DAQ configuration in duplex mode if simultaneous input and " + "output is required."); + } + + UlError err = ulConnectDaqDevice(_handle); + + if (err != ERR_NO_ERROR) { + ulReleaseDaqDevice(_handle); + _handle = 0; + throw rte("Unable to connect to device: " + getErrMsg(err)); + } + + /// Loop over input channels, set parameters + for (us ch = 0; ch < 4; ch++) { + + err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); + showErr(err); + if (err != ERR_NO_ERROR) { + throw rte("Fatal: could normalize channel sensitivity"); + } + + CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; + err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set AC/DC coupling mode"); + } + + IepeMode iepe = + inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; + err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set IEPE mode"); + } + } +} + +bool DT9837A::isRunning() const { + DEBUGTRACE_ENTER; + return _thread.joinable(); +} +void DT9837A::stop() { + DEBUGTRACE_ENTER; + StreamStatus status = _streamStatus; + if (!isRunning()) { + throw rte("No data acquisition running"); + } + + _stopThread = true; + if (_thread.joinable()) { + _thread.join(); + } + _stopThread = false; + status.isRunning = false; + _streamStatus = status; +} + +/** + * @brief Throws an exception in case it happens. Does nothing in case of no + * error. + * + * @param e + */ +inline void throwUlException(UlError err) { + if (err == ERR_NO_ERROR) { + return; + } + string errstr = getErrMsg(err); + showErr(errstr); + Daq::StreamStatus::StreamError serr; + if ((int)err < 5) { + serr = Daq::StreamStatus::StreamError::logicError; + } else if ((int)err < 9) { + serr = Daq::StreamStatus::StreamError::systemError; + } else if ((int)err < 18) { + serr = Daq::StreamStatus::StreamError::logicError; + } else if ((int)err == 18) { + serr = Daq::StreamStatus::StreamError::inputXRun; + } else if ((int)err == 19) { + serr = Daq::StreamStatus::StreamError::outputXRun; + } else { + serr = Daq::StreamStatus::StreamError::driverError; + } + + throw Daq::StreamException(serr, errstr); +} + +void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { + DEBUGTRACE_ENTER; + if (isRunning()) { + throw rte("DAQ is already running"); + } + if (neninchannels() > 0) { + if (!inCallback) + throw rte("DAQ requires a callback for input data"); + } + if (nenoutchannels() > 0) { + if (!outCallback) + throw rte("DAQ requires a callback for output data"); + } + assert(neninchannels() + nenoutchannels() > 0); + _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); +} + +InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) + : BufHandler(daq, daq.neninchannels()), cb(cb) + +{ + DEBUGTRACE_ENTER; + assert(daq.getHandle() != 0); + + monitorOutput = daq.monitorOutput; + + DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT; + ScanOption scanoptions = SO_CONTINUOUS; + UlError err = ERR_NO_ERROR; + + std::vector indescs; + boolvec eninchannels_without_mon = daq.eninchannels(false); + + // Set ranges for each input. Below asks only channels that are not a + // monitor channel (hence the false flag). + dvec ranges = daq.inputRangeForEnabledChannels(false); + for (us chin = 0; chin < 4; chin++) { + if (eninchannels_without_mon[chin] == true) { + DaqInChanDescriptor indesc; + indesc.type = DAQI_ANALOG_SE; + indesc.channel = chin; + + double rangeval = ranges.at(chin); + Range rangenum; + if (fabs(rangeval - 1.0) < 1e-8) { + rangenum = BIP1VOLTS; + } else if (fabs(rangeval - 10.0) < 1e-8) { + rangenum = BIP10VOLTS; + } else { + throw Daq::StreamException(Daq::StreamStatus::StreamError::logicError); + std::cerr << "Fatal: input range value is invalid" << endl; + return; + } + indesc.range = rangenum; + indescs.push_back(indesc); + } + } + + // Add possibly last channel as monitor + if (monitorOutput) { + DaqInChanDescriptor indesc; + indesc.type = DAQI_DAC; + indesc.channel = 0; + /// The output only has a range of 10V, therefore the monitor of the + /// output also has to be set to this value. + indesc.range = BIP10VOLTS; + indescs.push_back(indesc); + } + assert(indescs.size() == nchannels); + + DEBUGTRACE_MESSAGE("Starting input scan"); + + err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels, + 2 * nFramesPerBlock, // Watch the 2 here! + &samplerate, scanoptions, inscanflags, buf.data()); + throwUlException(err); +} +void InBufHandler::start() { + + ScanStatus status; + TransferStatus transferStatus; + UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); + throwUlException(err); + + totalFramesCount = transferStatus.currentTotalCount; + topenqueued = true; + botenqueued = true; +} + +bool InBufHandler::operator()() { + + /* DEBUGTRACE_ENTER; */ + + bool ret = true; + + auto runCallback = ([&](us totalOffset) { + /* DEBUGTRACE_ENTER; */ + + DaqData data(nFramesPerBlock, nchannels, + DataTypeDescriptor::DataType::dtype_fl64); + + us monitorOffset = monitorOutput ? 1 : 0; + /* /// Put the output monitor in front */ + if (monitorOutput) { + for (us frame = 0; frame < nFramesPerBlock; frame++) { + data.value(frame, 0) = + buf[totalOffset + (frame * nchannels) + (nchannels - 1)]; + } + } + + for (us channel = 0; channel < nchannels - monitorOffset; channel++) { + /* DEBUGTRACE_PRINT(channel); */ + for (us frame = 0; frame < nFramesPerBlock; frame++) { + data.value(frame, channel + monitorOffset) = + buf[totalOffset + (frame * nchannels) + channel]; + } + } + return cb(data); + }); + + ScanStatus status; + TransferStatus transferStatus; + + UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); + throwUlException(err); + + increment = transferStatus.currentTotalCount - totalFramesCount; + totalFramesCount += increment; + + if (increment > nFramesPerBlock) { + throw Daq::StreamException(Daq::StreamStatus::StreamError::inputXRun); + } + assert(status == SS_RUNNING); + + if (transferStatus.currentIndex < (long long)buffer_mid_idx) { + topenqueued = false; + if (!botenqueued) { + ret = runCallback(nchannels * nFramesPerBlock); + botenqueued = true; + } + } else { + botenqueued = false; + if (!topenqueued) { + ret = runCallback(0); + topenqueued = true; + } + } + return ret; +} +InBufHandler::~InBufHandler() { + // At exit of the function, stop scanning. + DEBUGTRACE_ENTER; + UlError err = ulDaqInScanStop(daq.getHandle()); + if (err != ERR_NO_ERROR) { + showErr(err); + } +} + +OutBufHandler::OutBufHandler(DT9837A &daq, OutDaqCallback cb) + : BufHandler(daq, daq.nenoutchannels()), cb(cb) { + + DEBUGTRACE_MESSAGE("Starting output scan"); + DEBUGTRACE_PRINT(nchannels); + AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT; + ScanOption scanoptions = SO_CONTINUOUS; + UlError err = ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS, + 2 * nFramesPerBlock, // Watch the 2 here! + &samplerate, scanoptions, outscanflags, buf.data()); + + throwUlException(err); +} +void OutBufHandler::start() { + + ScanStatus status; + TransferStatus transferStatus; + + UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Unable to start output on DAQ"); + } + if (status != SS_RUNNING) { + throw rte("Unable to start output on DAQ"); + } + totalFramesCount = transferStatus.currentTotalCount; + topenqueued = true; + botenqueued = true; +} +bool OutBufHandler::operator()() { + + /* DEBUGTRACE_ENTER; */ + bool res = true; + assert(daq.getHandle() != 0); + + UlError err = ERR_NO_ERROR; + + ScanStatus status; + TransferStatus transferStatus; + + err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); + throwUlException(err); + if (status != SS_RUNNING) { + return false; + } + increment = transferStatus.currentTotalCount - totalFramesCount; + totalFramesCount += increment; + + if (increment > nFramesPerBlock) { + throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); + } + + if (transferStatus.currentIndex < buffer_mid_idx) { + topenqueued = false; + if (!botenqueued) { + DaqData d(nFramesPerBlock, 1, DataTypeDescriptor::DataType::dtype_fl64); + res = cb(d); + d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); + + botenqueued = true; + } + } else { + botenqueued = false; + if (!topenqueued) { + DaqData d(nFramesPerBlock, 1, DataTypeDescriptor::DataType::dtype_fl64); + res = cb(d); + d.copyToRaw(0, reinterpret_cast(&(buf[0]))); + + topenqueued = true; + } + } + return res; +} + +OutBufHandler::~OutBufHandler() { + DEBUGTRACE_ENTER; + UlError err = ulAOutScanStop(daq.getHandle()); + if (err != ERR_NO_ERROR) { + showErr(err); + } +} + +void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { + + DEBUGTRACE_ENTER; + + try { + + std::unique_ptr obh; + std::unique_ptr ibh; + + StreamStatus status = _streamStatus; + status.isRunning = true; + _streamStatus = status; + + if (nenoutchannels() > 0) { + assert(outCallback); + obh = std::make_unique(*this, outCallback); + } + if (neninchannels() > 0) { + assert(inCallback); + ibh = std::make_unique(*this, inCallback); + } + if (obh) + obh->start(); + if (ibh) + ibh->start(); + + const double sleeptime_s = + static_cast(_nFramesPerBlock) / (16 * samplerate()); + const us sleeptime_us = static_cast(sleeptime_s * 1e6); + + while (!_stopThread) { + if (ibh) { + if (!(*ibh)()) { + _stopThread = true; + } + } + if (obh) { + if (!(*obh)()) { + _stopThread = true; + } + } else { + std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); + } + } + + /// Update stream status that we are not running anymore + status.isRunning = false; + _streamStatus = status; + _stopThread = false; + + } catch (StreamException &e) { + + StreamStatus status = _streamStatus; + // Copy over error type + status.errorType = e.e; + _streamStatus = status; + + /* + cerr << "\n******************\n"; + cerr << "Catched error in UlDAQ thread: " << e.what() << endl; + cerr << "\n******************\n"; + */ + } +} + +#endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/lasp_uldaq_impl.h b/src/lasp/device/lasp_uldaq_impl.h new file mode 100644 index 0000000..4fea809 --- /dev/null +++ b/src/lasp/device/lasp_uldaq_impl.h @@ -0,0 +1,118 @@ +#pragma once +#include "lasp_daq.h" +#include +#include +#include +#include +#include +#include +#include +#include + +using std::atomic; +using std::cerr; +using std::endl; +using rte = std::runtime_error; + +class DT9837A : public Daq { + + DaqDeviceHandle _handle = 0; + std::mutex _daqmutex; + + std::thread _thread; + atomic _stopThread{false}; + atomic _streamStatus; + + const us _nFramesPerBlock; + + void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); + +public: + DaqDeviceHandle getHandle() const { return _handle; } + /** + * @brief Create a DT9837A instance. + * + * @param devinfo DeviceInfo to connect to + * @param config DaqConfiguration settings + */ + DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config); + + virtual ~DT9837A(); + + bool isRunning() const; + + void stop() override final; + + friend class InBufHandler; + friend class OutBufHandler; + + virtual void start(InDaqCallback inCallback, + OutDaqCallback outCallback) override final; + + virtual StreamStatus getStreamStatus() const override { + return _streamStatus; + } +}; + +/** + * @brief Helper class for managing input and output samples of the DAQ device. + */ +class BufHandler { +protected: + DT9837A &daq; + const DataTypeDescriptor dtype_descr; + us nchannels, nFramesPerBlock; + double samplerate; + std::vector buf; + bool topenqueued, botenqueued; + + us increment = 0; + + us totalFramesCount = 0; + long long buffer_mid_idx; + +public: + BufHandler(DT9837A &daq, const us nchannels) + : daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels), + nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()), + buf(2 * nchannels * + nFramesPerBlock, // Watch the two here, the top and the bottom! + 0), + buffer_mid_idx(nchannels * nFramesPerBlock) { + assert(nchannels > 0); + } +}; +/** + * @brief Specific helper for the input buffer + */ +class InBufHandler : public BufHandler { + bool monitorOutput; + InDaqCallback cb; + +public: + InBufHandler(DT9837A &daq, InDaqCallback cb); + void start(); + /** + * @brief InBufHandler::operator()() + * + * @return true on success + */ + bool operator()(); + ~InBufHandler(); + +}; +class OutBufHandler : public BufHandler { + OutDaqCallback cb; + +public: + OutBufHandler(DT9837A &daq, OutDaqCallback cb); + void start(); + /** + * @brief OutBufHandler::operator() + * + * @return true on success + */ + bool operator()(); + + ~OutBufHandler(); +}; diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 8f83ae5..627c1ef 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -135,7 +135,8 @@ class Recording: f.attrs["channelNames"] = [ch.name for ch in in_ch] # Add the start delay here, as firstFrames() is called right after the - # constructor is called. + # constructor is called. time.time() returns a floating point + # number of seconds after epoch. f.attrs["time"] = time.time() + self.startDelay # In V2, we do not store JSON metadata anymore, but just an enumeration @@ -143,6 +144,7 @@ class Recording: f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch] # Measured physical quantity metadata + # This was how it was in LASP version < 1.0 # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] def firstFrames(self, adata): @@ -174,18 +176,15 @@ class Recording: def inCallback(self, adata): """ - This method should be called to grab data from the input queue, which - is filled by the stream, and put it into a file. It should be called at - a regular interval to prevent overflowing of the queue. It is called - within the start() method of the recording, if block is set to True. - Otherwise, it should be called from its parent at regular intervals. - For example, in Qt this can be done using a QTimer. + This method is called when a block of audio data from the stream is + available. It should return either True or False. + When returning False, it will stop the stream. """ if self.stop(): # Stop flag is raised. We do not add any data anymore. - return + return True with self.file_mtx: diff --git a/src/lasp/pybind11/lasp_daq.cpp b/src/lasp/pybind11/lasp_daq.cpp index fa91280..4192f6a 100644 --- a/src/lasp/pybind11/lasp_daq.cpp +++ b/src/lasp/pybind11/lasp_daq.cpp @@ -25,18 +25,17 @@ void init_daq(py::module &m) { .value("driverError", Daq::StreamStatus::StreamError::driverError) .value("systemError", Daq::StreamStatus::StreamError::systemError) .value("threadError", Daq::StreamStatus::StreamError::threadError) - .value("logicError", Daq::StreamStatus::StreamError::logicError) - .value("apiSpecificError", - Daq::StreamStatus::StreamError::apiSpecificError); + .value("logicError", Daq::StreamStatus::StreamError::logicError); ss.def("errorMsg", &Daq::StreamStatus::errorMsg); /// Daq - daq.def("neninchannels", &Daq::neninchannels, py::arg("include_monitor") = true); + daq.def("neninchannels", &Daq::neninchannels, + py::arg("include_monitor") = true); + daq.def("nenoutchannels", &Daq::nenoutchannels); daq.def("samplerate", &Daq::samplerate); daq.def("dataType", &Daq::dataType); daq.def("framesPerBlock", &Daq::framesPerBlock); daq.def("getStreamStatus", &Daq::getStreamStatus); - }