From c87a5cec259c55efe843dfa83a10baa803dbabaf Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Wed, 7 Jun 2023 21:49:07 +0200 Subject: [PATCH] StreamMgr handle now via shared pointers. InDataHandler stores weak pointers. Reset callback in PyInDataHandler could be problematic. Refactored the UlDaq code and moved to a subfolder. --- src/lasp/device/CMakeLists.txt | 5 +- src/lasp/device/lasp_indatahandler.cpp | 33 ++- src/lasp/device/lasp_indatahandler.h | 9 +- src/lasp/device/lasp_streammgr.cpp | 58 ++-- src/lasp/device/lasp_streammgr.h | 11 +- src/lasp/device/lasp_uldaq.cpp | 19 +- src/lasp/device/lasp_uldaq_impl.h | 164 ----------- .../lasp_uldaq_bufhandler.cpp} | 261 +----------------- src/lasp/device/uldaq/lasp_uldaq_bufhandler.h | 99 +++++++ src/lasp/device/uldaq/lasp_uldaq_common.cpp | 45 +++ src/lasp/device/uldaq/lasp_uldaq_common.h | 60 ++++ src/lasp/device/uldaq/lasp_uldaq_impl.cpp | 212 ++++++++++++++ src/lasp/device/uldaq/lasp_uldaq_impl.h | 96 +++++++ src/lasp/dsp/lasp_clip.cpp | 2 +- src/lasp/dsp/lasp_clip.h | 2 +- src/lasp/dsp/lasp_ppm.cpp | 2 +- src/lasp/dsp/lasp_ppm.h | 2 +- src/lasp/dsp/lasp_rtaps.cpp | 2 +- src/lasp/dsp/lasp_rtaps.h | 2 +- src/lasp/dsp/lasp_rtsignalviewer.cpp | 2 +- src/lasp/dsp/lasp_rtsignalviewer.h | 2 +- src/lasp/dsp/lasp_threadedindatahandler.cpp | 5 +- src/lasp/dsp/lasp_threadedindatahandler.h | 4 +- src/lasp/pybind11/lasp_pyindatahandler.cpp | 49 ++-- src/lasp/pybind11/lasp_streammgr.cpp | 4 +- 25 files changed, 647 insertions(+), 503 deletions(-) delete mode 100644 src/lasp/device/lasp_uldaq_impl.h rename src/lasp/device/{lasp_uldaq_impl.cpp => uldaq/lasp_uldaq_bufhandler.cpp} (50%) create mode 100644 src/lasp/device/uldaq/lasp_uldaq_bufhandler.h create mode 100644 src/lasp/device/uldaq/lasp_uldaq_common.cpp create mode 100644 src/lasp/device/uldaq/lasp_uldaq_common.h create mode 100644 src/lasp/device/uldaq/lasp_uldaq_impl.cpp create mode 100644 src/lasp/device/uldaq/lasp_uldaq_impl.h diff --git a/src/lasp/device/CMakeLists.txt b/src/lasp/device/CMakeLists.txt index 0792341..133c788 100644 --- a/src/lasp/device/CMakeLists.txt +++ b/src/lasp/device/CMakeLists.txt @@ -1,4 +1,5 @@ # src/lasp/device/CMakeLists.txt +include_directories(uldaq) add_library(lasp_device_lib OBJECT lasp_daq.cpp @@ -9,7 +10,9 @@ add_library(lasp_device_lib OBJECT lasp_streammgr.cpp lasp_indatahandler.cpp lasp_uldaq.cpp - lasp_uldaq_impl.cpp + uldaq/lasp_uldaq_impl.cpp + uldaq/lasp_uldaq_bufhandler.cpp + uldaq/lasp_uldaq_common.cpp ) # Callback requires certain arguments that are not used by code. This disables diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp index 6afce6c..1598e4c 100644 --- a/src/lasp/device/lasp_indatahandler.cpp +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -1,27 +1,34 @@ /* #define DEBUGTRACE_ENABLED */ -#include -#include "debugtrace.hpp" #include "lasp_indatahandler.h" +#include "debugtrace.hpp" #include "lasp_streammgr.h" +#include -InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) -#if LASP_DEBUG==1 - , _main_thread_id(std::this_thread::get_id()) +InDataHandler::InDataHandler(SmgrHandle mgr) + : _mgr(mgr) +#if LASP_DEBUG == 1 + , + _main_thread_id(std::this_thread::get_id()) #endif -{ DEBUGTRACE_ENTER; } +{ + DEBUGTRACE_ENTER; +} void InDataHandler::start() { DEBUGTRACE_ENTER; - _mgr.addInDataHandler(*this); - + if (SmgrHandle handle = _mgr.lock()) { + handle->addInDataHandler(this); #if LASP_DEBUG == 1 - assert(_mgr._main_thread_id == _main_thread_id); + assert(handle->_main_thread_id == _main_thread_id); #endif + } } void InDataHandler::stop() { #if LASP_DEBUG == 1 stopCalled = true; #endif - _mgr.removeInDataHandler(*this); + if (SmgrHandle handle = _mgr.lock()) { + /* handle->removeInDataHandler(*this); */ + } } InDataHandler::~InDataHandler() { @@ -30,9 +37,9 @@ InDataHandler::~InDataHandler() { #if LASP_DEBUG == 1 if (!stopCalled) { std::cerr << "************ BUG: Stop function not called while arriving at " - "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." - << std::endl; + "InDataHandler's destructor. Fix this by calling " + "InDataHandler::stop() from the derived class' destructor." + << std::endl; abort(); } #endif diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h index 5008b68..e42824f 100644 --- a/src/lasp/device/lasp_indatahandler.h +++ b/src/lasp/device/lasp_indatahandler.h @@ -1,18 +1,21 @@ #pragma once #include +#include #include #include "lasp_types.h" +class StreamMgr; +using SmgrHandle = std::shared_ptr; + /** \addtogroup device * @{ */ -class StreamMgr; class DaqData; class Daq; class InDataHandler { protected: - StreamMgr &_mgr; + std::weak_ptr _mgr; #if LASP_DEBUG == 1 // This is a flag to indicate whether the method stop() is called for the // current handler. It should call the method stop() from the derived class's @@ -29,7 +32,7 @@ public: * * @param mgr Stream manager. */ - InDataHandler(StreamMgr &mgr); + InDataHandler(SmgrHandle mgr); /** * @brief This function is called when input data from a DAQ is available. diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index 19af4bc..d663222 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -1,24 +1,35 @@ -/* #define DEBUGTRACE_ENABLED */ -#include "debugtrace.hpp" +#define DEBUGTRACE_ENABLED #include "lasp_streammgr.h" +#include "debugtrace.hpp" #include "lasp_biquadbank.h" -#include "lasp_thread.h" #include "lasp_indatahandler.h" +#include "lasp_thread.h" #include #include #include #include +#include using std::cerr; using std::endl; using rte = std::runtime_error; +/** + * @brief The main global handle to a stream, stored in a shared pointer. + */ +std::shared_ptr _mgr; -StreamMgr &StreamMgr::getInstance() { - +std::shared_ptr StreamMgr::getInstance() { DEBUGTRACE_ENTER; - static StreamMgr mgr; - return mgr; + + if (!_mgr) { + _mgr = std::shared_ptr(new StreamMgr()); + if (!_mgr) { + throw rte("Fatal: could not allocate stream manager!"); + } + } + + return _mgr; } StreamMgr::StreamMgr() { @@ -211,13 +222,12 @@ bool StreamMgr::outCallback(DaqData &data) { StreamMgr::~StreamMgr() { DEBUGTRACE_ENTER; checkRightThread(); - stopAllStreams(); - if (!_inDataHandlers.empty()) { - cerr << "*** WARNING: InDataHandlers have not been all stopped, while " - "StreamMgr destructor is called. This is a misuse BUG" - << endl; - abort(); - } + // Stream manager now handled by shared pointer. Each indata handler gets a + // shared pointer to the stream manager, and stores a weak pointer to it. + // Hence, we do not have to do any cleanup here. It also makes sure that the + // order in which destructors are called does not matter anymore. As soon as + // the stream manager is destructed, the weak pointers loose there ref, and do + // not have to removeInDataHandler() anymore. } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; @@ -242,15 +252,15 @@ void StreamMgr::startStream(const DaqConfiguration &config) { std::scoped_lock lck(_devices_mtx); DeviceInfo *devinfo = nullptr; - bool found = false; + // Match configuration to a device in the list of devices for (auto &devinfoi : _devices) { if (config.match(*devinfoi)) { devinfo = devinfoi.get(); break; } } - if (!devinfo) { + if (devinfo == nullptr) { throw rte("Could not find a device with name " + config.device_name + " in list of devices."); } @@ -383,23 +393,20 @@ void StreamMgr::stopStream(const StreamType t) { } } -void StreamMgr::addInDataHandler(InDataHandler &handler) { +void StreamMgr::addInDataHandler(InDataHandler *handler) { DEBUGTRACE_ENTER; checkRightThread(); + assert(handler); std::scoped_lock lck(_inDataHandler_mtx); - if (_inputStream) { - handler.reset(_inputStream.get()); - } else { - handler.reset(nullptr); - } - if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), &handler) != + handler->reset(_inputStream.get()); + + if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) != _inDataHandlers.cend()) { throw std::runtime_error("Error: handler already added. Probably start() " "is called more than once on a handler object"); } - _inDataHandlers.push_back(&handler); + _inDataHandlers.push_back(handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); - } void StreamMgr::removeInDataHandler(InDataHandler &handler) { @@ -409,7 +416,6 @@ void StreamMgr::removeInDataHandler(InDataHandler &handler) { _inDataHandlers.remove(&handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); - } Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index 665ea3a..bb60cd0 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -57,15 +57,18 @@ class StreamMgr { */ DeviceInfoList _devices; + // Singleton, no public constructor. Can only be obtained using + // getInstance(); StreamMgr(); friend class InDataHandler; friend class Siggen; - // Singleton, no public destructor - ~StreamMgr(); public: + + ~StreamMgr(); + enum class StreamType : us { /** * @brief Input stream @@ -85,7 +88,7 @@ class StreamMgr { * * @return Reference to stream manager. */ - static StreamMgr &getInstance(); + static std::shared_ptr getInstance(); /** * @brief Obtain a list of devices currently available. When the StreamMgr is @@ -200,7 +203,7 @@ private: * * @param handler The handler to add. */ - void addInDataHandler(InDataHandler &handler); + void addInDataHandler(InDataHandler *handler); /** * @brief Do the actual rescanning. diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index a7252c3..0524cc4 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -1,6 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_config.h" + #if LASP_HAS_ULDAQ == 1 #include "lasp_uldaq.h" #include "lasp_uldaq_impl.h" @@ -17,13 +18,13 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DaqDeviceDescriptor descriptor; DaqDeviceInterface interfaceType = ANY_IFC; - err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, - static_cast(&numdevs)); + err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, &numdevs); if (err != ERR_NO_ERROR) { throw rte("UlDaq device inventarization failed"); } + DEBUGTRACE_PRINT(string("Number of devices: ") + std::to_string(numdevs)); for (unsigned i = 0; i < numdevs; i++) { descriptor = devdescriptors[i]; @@ -33,7 +34,7 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { devinfo.api = uldaqapi; { - string name, interface; + string name; string productname = descriptor.productName; if (productname != "DT9837A") { throw rte("Unknown UlDAQ type: " + productname); @@ -56,9 +57,8 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { name = "Uknown interface = "; } - name += - string(descriptor.productName) + " " + string(descriptor.uniqueId); - devinfo.device_name = std::move(name); + name += productname + " " + string(descriptor.uniqueId); + devinfo.device_name = name; } devinfo.physicalOutputQty = DaqChannel::Qty::Voltage; @@ -93,7 +93,12 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, const DaqConfiguration &config) { - return std::make_unique(devinfo, config); + const UlDaqDeviceInfo *_info = + dynamic_cast(&devinfo); + if (_info == nullptr) { + throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo"); + } + return std::make_unique(*_info, config); } #endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/lasp_uldaq_impl.h b/src/lasp/device/lasp_uldaq_impl.h deleted file mode 100644 index cc2cade..0000000 --- a/src/lasp/device/lasp_uldaq_impl.h +++ /dev/null @@ -1,164 +0,0 @@ -#pragma once -#include "debugtrace.hpp" -#include "lasp_daq.h" -#include -#include -#include -#include -#include -#include -#include -#include - -using std::atomic; -using std::cerr; -using std::endl; -using rte = std::runtime_error; - -/** - * @brief List of available sampling frequencies for DT9837A - */ -const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, - 22050, 24000, 32000, 44056, 44100, - 47250, 48000, 50000, 50400, 51000}; - - -/** - * @brief UlDaq-specific device information. Adds a copy of the underlying - * DaqDeDaqDeviceDescriptor. - */ -class UlDaqDeviceInfo : public DeviceInfo { - -public: - DaqDeviceDescriptor _uldaqDescriptor; - virtual std::unique_ptr clone() const override { - DEBUGTRACE_ENTER; - return std::make_unique(*this); - } -}; - -class DT9837A : public Daq { - - DaqDeviceHandle _handle = 0; - std::mutex _daqmutex; - - std::thread _thread; - atomic _stopThread{false}; - atomic _streamStatus; - - const us _nFramesPerBlock; - - void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); - -public: - DaqDeviceHandle getHandle() const { return _handle; } - /** - * @brief Create a DT9837A instance. - * - * @param devinfo DeviceInfo to connect to - * @param config DaqConfiguration settings - */ - DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config); - - virtual ~DT9837A(); - - bool isRunning() const; - - void stop() override final; - - friend class InBufHandler; - friend class OutBufHandler; - - virtual void start(InDaqCallback inCallback, - OutDaqCallback outCallback) override final; - - virtual StreamStatus getStreamStatus() const override { - return _streamStatus; - } -}; - -/** - * @brief Helper class for managing input and output samples of the DAQ device. - */ -class BufHandler { -protected: - /** - * @brief Reference to underlying Daq - */ - DT9837A &daq; - /** - * @brief The type of data, in this case always double precision floats - */ - const DataTypeDescriptor dtype_descr = dtype_desc_fl64; - /** - * @brief The number of channels, number of frames per callback (block). - */ - us nchannels, nFramesPerBlock; - /** - * @brief Sampling frequency in Hz - */ - double samplerate; - std::vector buf; - /** - * @brief Whether the top / bottom part of the buffer are ready to be - * enqueued - */ - bool topenqueued, botenqueued; - - /** - * @brief Counter for the total number of frames acquired / sent since the - * start of the stream. - */ - us totalFramesCount = 0; - long long buffer_mid_idx; - -public: - /** - * @brief Initialize bufhandler - * - * @param daq - * @param nchannels - */ - BufHandler(DT9837A &daq, const us nchannels) - : daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels), - nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()), - buf(2 * nchannels * - nFramesPerBlock, // Watch the two here, the top and the bottom! - 0), - buffer_mid_idx(nchannels * nFramesPerBlock) { - assert(nchannels > 0); - } -}; -/** - * @brief Specific helper for the input buffer - */ -class InBufHandler : public BufHandler { - bool monitorOutput; - InDaqCallback cb; - -public: - InBufHandler(DT9837A &daq, InDaqCallback cb); - void start(); - /** - * @brief InBufHandler::operator()() - * - * @return true on success - */ - bool operator()(); - ~InBufHandler(); -}; -class OutBufHandler : public BufHandler { - OutDaqCallback cb; - -public: - OutBufHandler(DT9837A &daq, OutDaqCallback cb); - void start(); - /** - * @brief OutBufHandler::operator() - * - * @return true on success - */ - bool operator()(); - - ~OutBufHandler(); -}; diff --git a/src/lasp/device/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp similarity index 50% rename from src/lasp/device/lasp_uldaq_impl.cpp rename to src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp index e91f63e..7cc41c9 100644 --- a/src/lasp/device/lasp_uldaq_impl.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp @@ -3,192 +3,8 @@ #include "lasp_config.h" #if LASP_HAS_ULDAQ == 1 -#include "lasp_daqconfig.h" -#include "lasp_uldaq.h" -#include "lasp_uldaq_impl.h" - -using namespace std::literals::chrono_literals; - -/** - * @brief Reserve some space for an error message from UlDaq - */ -const us UL_ERR_MSG_LEN = 512; - -/** - * @brief Return a string corresponding to the UlDaq API error - * - * @param err error code - * - * @return Error string - */ -string getErrMsg(UlError err) { - string errstr; - errstr.reserve(UL_ERR_MSG_LEN); - char errmsg[UL_ERR_MSG_LEN]; - errstr = "UlDaq API Error: "; - ulGetErrMsg(err, errmsg); - errstr += errmsg; - return errstr; -} -inline void showErr(string errstr) { - std::cerr << "\b\n**************** UlDAQ backend error **********\n"; - std::cerr << errstr << std::endl; - std::cerr << "***********************************************\n\n"; -} -inline void showErr(UlError err) { - if (err != ERR_NO_ERROR) - showErr(getErrMsg(err)); -} - -DT9837A::~DT9837A() { - UlError err; - if (isRunning()) { - stop(); - } - - if (_handle) { - err = ulDisconnectDaqDevice(_handle); - showErr(err); - err = ulReleaseDaqDevice(_handle); - showErr(err); - } -} -DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config) - : Daq(devinfo, config), - _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { - - // Some sanity checks - if (inchannel_config.size() != 4) { - throw rte("Invalid length of enabled inChannels vector"); - } - - if (outchannel_config.size() != 1) { - throw rte("Invalid length of enabled outChannels vector"); - } - - if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { - throw rte("Unsensible number of samples per block chosen"); - } - - if (samplerate() < ULDAQ_SAMPLERATES.at(0) || - samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) { - throw rte("Invalid sample rate"); - } - - const UlDaqDeviceInfo *_info = - dynamic_cast(&devinfo); - if (_info == nullptr) { - throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo"); - } - - // get a handle to the DAQ device associated with the first descriptor - _handle = ulCreateDaqDevice(_info->_uldaqDescriptor); - - if (_handle == 0) { - throw rte("Unable to create a handle to the specified DAQ " - "device. Is the device currently in use? Please make sure to set " - "the DAQ configuration in duplex mode if simultaneous input and " - "output is required."); - } - - UlError err = ulConnectDaqDevice(_handle); - - if (err != ERR_NO_ERROR) { - ulReleaseDaqDevice(_handle); - _handle = 0; - throw rte("Unable to connect to device: " + getErrMsg(err)); - } - - /// Loop over input channels, set parameters - for (us ch = 0; ch < 4; ch++) { - - err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); - showErr(err); - if (err != ERR_NO_ERROR) { - throw rte("Fatal: could normalize channel sensitivity"); - } - - CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; - err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set AC/DC coupling mode"); - } - - IepeMode iepe = - inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; - err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set IEPE mode"); - } - } -} - -bool DT9837A::isRunning() const { - DEBUGTRACE_ENTER; - return _thread.joinable(); -} -void DT9837A::stop() { - DEBUGTRACE_ENTER; - StreamStatus status = _streamStatus; - if (!isRunning()) { - throw rte("No data acquisition running"); - } - - // Stop the thread and join it - _stopThread = true; - assert(_thread.joinable()); - _thread.join(); - _stopThread = false; - - // Update stream status - status.isRunning = false; - _streamStatus = status; -} - -/** - * @brief Throws an appropriate stream exception based on the UlError number. - * The mapping is based on the error numbers as given in uldaq.h. There are a - * log of errors definded here (109 in total). Except for some, we will map - * most of them to a driver error. - * - * @param e The backend error code. - */ -inline void throwOnPossibleUlException(UlError err) { - if (err == ERR_NO_ERROR) { - return; - } - string errstr = getErrMsg(err); - showErr(errstr); - Daq::StreamStatus::StreamError serr; - if ((int)err == 18) { - serr = Daq::StreamStatus::StreamError::inputXRun; - } else if ((int)err == 19) { - serr = Daq::StreamStatus::StreamError::outputXRun; - } else { - serr = Daq::StreamStatus::StreamError::driverError; - } - - throw Daq::StreamException(serr, errstr); -} - -void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { - DEBUGTRACE_ENTER; - if (isRunning()) { - throw rte("DAQ is already running"); - } - if (neninchannels() > 0) { - if (!inCallback) - throw rte("DAQ requires a callback for input data"); - } - if (nenoutchannels() > 0) { - if (!outCallback) - throw rte("DAQ requires a callback for output data"); - } - assert(neninchannels() + nenoutchannels() > 0); - _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); -} +#include "lasp_uldaq_bufhandler.h" +#include "lasp_daq.h" InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) : BufHandler(daq, daq.neninchannels()), cb(cb) @@ -371,7 +187,7 @@ void OutBufHandler::start() { } bool OutBufHandler::operator()() { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; bool res = true; assert(daq.getHandle() != 0); @@ -389,7 +205,8 @@ bool OutBufHandler::operator()() { totalFramesCount += increment; if (increment > nFramesPerBlock) { - throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); + cerr << "totalFramesCount: " << totalFramesCount << ". Detected output underrun" << endl; + /* throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); */ } if (transferStatus.currentIndex < buffer_mid_idx) { @@ -425,70 +242,4 @@ OutBufHandler::~OutBufHandler() { showErr(err); } } - -void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { - - DEBUGTRACE_ENTER; - - try { - - std::unique_ptr obh; - std::unique_ptr ibh; - - StreamStatus status = _streamStatus; - status.isRunning = true; - _streamStatus = status; - - if (nenoutchannels() > 0) { - assert(outCallback); - obh = std::make_unique(*this, outCallback); - } - if (neninchannels() > 0) { - assert(inCallback); - ibh = std::make_unique(*this, inCallback); - } - if (obh) - obh->start(); - if (ibh) - ibh->start(); - - const double sleeptime_s = - static_cast(_nFramesPerBlock) / (16 * samplerate()); - const us sleeptime_us = static_cast(sleeptime_s * 1e6); - - while (!_stopThread) { - if (ibh) { - if (!(*ibh)()) { - _stopThread = true; - break; - } - } - if (obh) { - if (!(*obh)()) { - _stopThread = true; - break; - } - } - - std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); - } - - /// Update stream status that we are not running anymore - status.isRunning = false; - _streamStatus = status; - _stopThread = false; - - } catch (StreamException &e) { - - StreamStatus status = _streamStatus; - // Copy over error type - status.errorType = e.e; - _streamStatus = status; - - cerr << "\n******************\n"; - cerr << "Catched error in UlDAQ thread: " << e.what() << endl; - cerr << "\n******************\n"; - } -} - -#endif // LASP_HAS_ULDAQ +#endif diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h new file mode 100644 index 0000000..05ca6ae --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h @@ -0,0 +1,99 @@ +#pragma once +#include +#include "lasp_types.h" +#include "lasp_uldaq_impl.h" +#include "lasp_uldaq_common.h" + + +class DT9837A; + +/** + * @brief Helper class for managing input and output samples of the DAQ device. + */ +class BufHandler { +protected: + /** + * @brief Reference to underlying Daq + */ + DT9837A &daq; + /** + * @brief The type of data, in this case always double precision floats + */ + const DataTypeDescriptor dtype_descr = dtype_desc_fl64; + /** + * @brief The number of channels, number of frames per callback (block). + */ + us nchannels, nFramesPerBlock; + /** + * @brief Sampling frequency in Hz + */ + double samplerate; + std::vector buf; + /** + * @brief Whether the top part of the buffer is enqueued + */ + bool topenqueued = false; + /** + * @brief Whether the bottom part of the buffer is enqueued + * enqueued + */ + bool botenqueued = false; + + /** + * @brief Counter for the total number of frames acquired / sent since the + * start of the stream. + */ + us totalFramesCount = 0; + long long buffer_mid_idx; + +public: + /** + * @brief Initialize bufhandler + * + * @param daq + * @param nchannels + */ + BufHandler(DT9837A &daq, const us nchannels) + : daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels), + nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()), + buf(2 * nchannels * + nFramesPerBlock, // Watch the two here, the top and the bottom! + 0), + buffer_mid_idx(nchannels * nFramesPerBlock) { + assert(nchannels > 0); + assert(nFramesPerBlock > 0); + } +}; +/** + * @brief Specific helper for the input buffer + */ +class InBufHandler : public BufHandler { + bool monitorOutput; + InDaqCallback cb; + +public: + InBufHandler(DT9837A &daq, InDaqCallback cb); + void start(); + /** + * @brief InBufHandler::operator()() + * + * @return true on success + */ + bool operator()(); + ~InBufHandler(); +}; +class OutBufHandler : public BufHandler { + OutDaqCallback cb; + +public: + OutBufHandler(DT9837A &daq, OutDaqCallback cb); + void start(); + /** + * @brief OutBufHandler::operator() + * + * @return true on success + */ + bool operator()(); + + ~OutBufHandler(); +}; diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.cpp b/src/lasp/device/uldaq/lasp_uldaq_common.cpp new file mode 100644 index 0000000..d05c629 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_common.cpp @@ -0,0 +1,45 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_uldaq_common.h" +#include "lasp_daq.h" + +string getErrMsg(UlError err) { + string errstr; + errstr.reserve(ERR_MSG_LEN); + char errmsg[ERR_MSG_LEN]; + errstr = "UlDaq API Error: "; + ulGetErrMsg(err, errmsg); + errstr += errmsg; + return errstr; +} +void showErr(string errstr) { + std::cerr << "\b\n**************** UlDAQ backend error **********\n"; + std::cerr << errstr << std::endl; + std::cerr << "***********************************************\n\n"; +} +void showErr(UlError err) { + if (err != ERR_NO_ERROR) + showErr(getErrMsg(err)); +} +#endif + +void throwOnPossibleUlException(UlError err) { + if (err == ERR_NO_ERROR) { + return; + } + string errstr = getErrMsg(err); + showErr(errstr); + Daq::StreamStatus::StreamError serr; + if ((int)err == 18) { + serr = Daq::StreamStatus::StreamError::inputXRun; + } else if ((int)err == 19) { + serr = Daq::StreamStatus::StreamError::outputXRun; + } else { + serr = Daq::StreamStatus::StreamError::driverError; + } + + throw Daq::StreamException(serr, errstr); +} diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.h b/src/lasp/device/uldaq/lasp_uldaq_common.h new file mode 100644 index 0000000..371974c --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_common.h @@ -0,0 +1,60 @@ +#pragma once +#include +#include +#include "lasp_deviceinfo.h" + +/** + * @brief Throws an appropriate stream exception based on the UlError number. + * The mapping is based on the error numbers as given in uldaq.h. There are a + * log of errors definded here (109 in total). Except for some, we will map + * most of them to a driver error. + * + * @param e The backend error code. + */ +void throwOnPossibleUlException(UlError err); + +/** + * @brief Return a string corresponding to the UlDaq API error + * + * @param err error code + * + * @return Error string + */ +string getErrMsg(UlError err); + +/** + * @brief Print error message to stderr + * + * @param errstr The string to print + */ +void showErr(UlError err); + +/** + * @brief Get a string representation of the error + * + * @param errstr + */ +void showErr(std::string errstr); + +/** + * @brief UlDaq-specific device information. Adds a copy of the underlying + * DaqDeDaqDeviceDescriptor. + */ +class UlDaqDeviceInfo : public DeviceInfo { + +public: + DaqDeviceDescriptor _uldaqDescriptor; + virtual std::unique_ptr clone() const override { + DEBUGTRACE_ENTER; + return std::make_unique(*this); + } +}; + +/** + * @brief List of available sampling frequencies for DT9837A + */ +const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, + 22050, 24000, 32000, 44056, 44100, + 47250, 48000, 50000, 50400, 51000}; + + diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp new file mode 100644 index 0000000..32419dc --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp @@ -0,0 +1,212 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_daqconfig.h" +#include "lasp_uldaq.h" +#include "lasp_uldaq_bufhandler.h" +#include "lasp_uldaq_impl.h" + +using namespace std::literals::chrono_literals; + +DT9837A::~DT9837A() { + DEBUGTRACE_ENTER; + UlError err; + if (isRunning()) { + DEBUGTRACE_PRINT("Stop UlDAQ from destructor"); + stop(); + } + + if (_handle) { + DEBUGTRACE_PRINT("Disconnecting and releasing DaqDevice"); + /* err = ulDisconnectDaqDevice(_handle); */ + /* showErr(err); */ + err = ulReleaseDaqDevice(_handle); + showErr(err); + } +} +DT9837A::DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config) + : Daq(devinfo, config), + _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { + + const DaqDeviceDescriptor &descriptor = devinfo._uldaqDescriptor; + DEBUGTRACE_PRINT(string("Device: ") + descriptor.productName); + DEBUGTRACE_PRINT(string("Product id: ") + to_string(descriptor.productId)); + DEBUGTRACE_PRINT(string("Dev string: ") + descriptor.devString); + DEBUGTRACE_PRINT(string("Unique id: ") + descriptor.uniqueId); + + // get a handle to the DAQ device associated with the first descriptor + _handle = ulCreateDaqDevice(descriptor); + + if (_handle == 0) { + throw rte("Unable to create a handle to the specified DAQ " + "device. Is the device currently in use? Please make sure to set " + "the DAQ configuration in duplex mode if simultaneous input and " + "output is required."); + } + + UlError err = ulConnectDaqDevice(_handle); + + if (err != ERR_NO_ERROR) { + ulReleaseDaqDevice(_handle); + _handle = 0; + throw rte("Unable to connect to device: " + getErrMsg(err)); + } + + /// Loop over input channels, set parameters + for (us ch = 0; ch < 4; ch++) { + + err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); + showErr(err); + if (err != ERR_NO_ERROR) { + throw rte("Fatal: could normalize channel sensitivity"); + } + + CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; + err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set AC/DC coupling mode"); + } + + IepeMode iepe = + inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; + err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set IEPE mode"); + } + } +} + +bool DT9837A::isRunning() const { + DEBUGTRACE_ENTER; + /* return _thread.joinable(); */ + StreamStatus status = _streamStatus; + return status.isRunning; +} +void DT9837A::stop() { + DEBUGTRACE_ENTER; + StreamStatus status = _streamStatus; + status.isRunning = true; + _streamStatus = status; + /* if (!isRunning()) { */ + /* throw rte("No data acquisition running"); */ + /* } */ + + // Stop the thread and join it + /* _stopThread = true; */ + /* assert(_thread.joinable()); */ + /* _thread.join(); */ + /* _stopThread = false; */ + + // Update stream status + status.isRunning = false; + _streamStatus = status; +} + +void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { + DEBUGTRACE_ENTER; + if (isRunning()) { + throw rte("DAQ is already running"); + } + if (neninchannels() > 0) { + if (!inCallback) + throw rte("DAQ requires a callback for input data"); + } + if (nenoutchannels() > 0) { + if (!outCallback) + throw rte("DAQ requires a callback for output data"); + } + assert(neninchannels() + nenoutchannels() > 0); + /* _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); + */ +} + +void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { + + DEBUGTRACE_ENTER; + + try { + + std::unique_ptr obh; + std::unique_ptr ibh; + + StreamStatus status = _streamStatus; + status.isRunning = true; + _streamStatus = status; + + if (nenoutchannels() > 0) { + assert(outCallback); + obh = std::make_unique(*this, outCallback); + } + if (neninchannels() > 0) { + assert(inCallback); + ibh = std::make_unique(*this, inCallback); + } + if (obh) + obh->start(); + if (ibh) + ibh->start(); + + const double sleeptime_s = + static_cast(_nFramesPerBlock) / (16 * samplerate()); + const us sleeptime_us = static_cast(sleeptime_s * 1e6); + + while (!_stopThread) { + if (ibh) { + if (!(*ibh)()) { + _stopThread = true; + break; + } + } + if (obh) { + if (!(*obh)()) { + _stopThread = true; + break; + } + } + + std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); + } + + /// Update stream status that we are not running anymore + status.isRunning = false; + _streamStatus = status; + _stopThread = false; + + } catch (StreamException &e) { + + StreamStatus status = _streamStatus; + // Copy over error type + status.errorType = e.e; + _streamStatus = status; + + cerr << "\n******************\n"; + cerr << "Catched error in UlDAQ thread: " << e.what() << endl; + cerr << "\n******************\n"; + } +} + +void DT9837A::sanityChecks() const { + // Some sanity checks + if (inchannel_config.size() != 4) { + throw rte("Invalid length of enabled inChannels vector"); + } + + if (outchannel_config.size() != 1) { + throw rte("Invalid length of enabled outChannels vector"); + } + + if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { + throw rte("Unsensible number of samples per block chosen"); + } + + if (samplerate() < ULDAQ_SAMPLERATES.at(0) || + samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) { + throw rte("Invalid sample rate"); + } +} + +#endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.h b/src/lasp/device/uldaq/lasp_uldaq_impl.h new file mode 100644 index 0000000..aa170d0 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.h @@ -0,0 +1,96 @@ +#pragma once +#include "debugtrace.hpp" +#include "lasp_uldaq_common.h" +#include +#include +#include +#include +#include +#include +#include +#include "lasp_daq.h" + +using std::atomic; +using std::cerr; +using std::endl; +using rte = std::runtime_error; + +class InBufHandler; +class OutBufHandler; + + +class DT9837A : public Daq { + + DaqDeviceHandle _handle = 0; + std::mutex _daqmutex; + + /** + * @brief The thread that is doing I/O with UlDaq + */ + std::thread _thread; + + + /** + * @brief Flag indicating the thread to stop processing. + */ + atomic _stopThread{false}; + /** + * @brief Storage for exchanging information on the stream + */ + atomic _streamStatus; + + const us _nFramesPerBlock; + + /** + * @brief The function that is running in a thread + * + * @param inCallback + * @param outcallback + */ + void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); + + /** + * @brief Obtain a handle to the underlying device + * + * @return Handle + */ + DaqDeviceHandle getHandle() const { return _handle; } + /** + * @brief Perform several sanity checks + */ + void sanityChecks() const; +public: + + /** + * @brief Create a DT9837A instance. + * + * @param devinfo DeviceInfo to connect to + * @param config DaqConfiguration settings + */ + DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config); + + virtual ~DT9837A(); + + bool isRunning() const; + + /** + * @brief Stop the data-acquisition + */ + void stop() override final; + + friend class InBufHandler; + friend class OutBufHandler; + + virtual void start(InDaqCallback inCallback, + OutDaqCallback outCallback) override final; + + /** + * @brief Obtain copy of stream status (thread-safe function) + * + * @return StreamStatus object + */ + virtual StreamStatus getStreamStatus() const override { + return _streamStatus; + } +}; + diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index fcf14e3..fe37945 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -11,7 +11,7 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -ClipHandler::ClipHandler(StreamMgr &mgr) +ClipHandler::ClipHandler(SmgrHandle mgr) : ThreadedInDataHandler(mgr){ DEBUGTRACE_ENTER; diff --git a/src/lasp/dsp/lasp_clip.h b/src/lasp/dsp/lasp_clip.h index e5f76e5..ec2dfd5 100644 --- a/src/lasp/dsp/lasp_clip.h +++ b/src/lasp/dsp/lasp_clip.h @@ -58,7 +58,7 @@ class ClipHandler: public ThreadedInDataHandler { * * @param mgr Stream Mgr to operate on */ - ClipHandler(StreamMgr& mgr); + ClipHandler(SmgrHandle mgr); ~ClipHandler(); /** diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 0afb46b..4cb185d 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -11,7 +11,7 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) +PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps) : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index 51b0872..aecf02d 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -73,7 +73,7 @@ class PPMHandler: public ThreadedInDataHandler { * @param decay_dBps The level decay in units dB/s, after a peak has been * hit. */ - PPMHandler(StreamMgr& mgr,const d decay_dBps = 20.0); + PPMHandler(SmgrHandle mgr,const d decay_dBps = 20.0); ~PPMHandler(); /** diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index f39e03e..9e66528 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -9,7 +9,7 @@ using std::cerr; using std::endl; using Lck = std::scoped_lock; -RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, +RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft, const Window::WindowType w, const d overlap_percentage, const d time_constant) diff --git a/src/lasp/dsp/lasp_rtaps.h b/src/lasp/dsp/lasp_rtaps.h index 5c86bb0..006ce1d 100644 --- a/src/lasp/dsp/lasp_rtaps.h +++ b/src/lasp/dsp/lasp_rtaps.h @@ -49,7 +49,7 @@ public: * * For all other arguments, see constructor of AvPowerSpectra */ - RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft = 2048, + RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft = 2048, const Window::WindowType w = Window::WindowType::Hann, const d overlap_percentage = 50., const d time_constant = -1); ~RtAps(); diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index 69af040..d6d06d5 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -11,7 +11,7 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, +RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution, const us channel) : ThreadedInDataHandler(mgr), _approx_time_hist(approx_time_hist), _resolution(resolution), _channel(channel) { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.h b/src/lasp/dsp/lasp_rtsignalviewer.h index 85fefd1..0a7676d 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.h +++ b/src/lasp/dsp/lasp_rtsignalviewer.h @@ -71,7 +71,7 @@ public: * @param resolution Number of time points * @param channel The channel number */ - RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, const us resolution, + RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution, const us channel); ~RtSignalViewer(); diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 84ee117..3240787 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -49,7 +49,7 @@ public: bool empty() const { return _contents == 0; } }; -ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) +ThreadedInDataHandler::ThreadedInDataHandler(SmgrHandle mgr) : InDataHandler(mgr), _queue(std::make_unique()) { DEBUGTRACE_ENTER; @@ -58,6 +58,7 @@ ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) getPool(); } void ThreadedInDataHandler::startThread() { + DEBUGTRACE_ENTER; _thread_can_safely_run = true; start(); } @@ -85,7 +86,7 @@ bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { return _lastCallbackResult; } void ThreadedInDataHandler::stopThread() { - + DEBUGTRACE_ENTER; // Make sure inCallback is no longer called _thread_can_safely_run = false; stop(); diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index c447ce0..12e162b 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -52,8 +52,8 @@ public: * * @param mgr StreamMgr singleton reference */ - ThreadedInDataHandler(StreamMgr &mgr); - ~ThreadedInDataHandler(); + ThreadedInDataHandler(SmgrHandle mgr); + virtual ~ThreadedInDataHandler(); /** * @brief Pushes a copy of the daqdata to the thread queue and returns diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index be6d33f..b13f0fd 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -1,14 +1,14 @@ /* #define DEBUGTRACE_ENABLED */ #include "arma_npy.h" #include "debugtrace.hpp" -#include "lasp_ppm.h" #include "lasp_clip.h" +#include "lasp_daq.h" +#include "lasp_daqdata.h" +#include "lasp_ppm.h" #include "lasp_rtaps.h" #include "lasp_rtsignalviewer.h" -#include "lasp_threadedindatahandler.h" -#include "lasp_daqdata.h" -#include "lasp_daq.h" #include "lasp_streammgr.h" +#include "lasp_threadedindatahandler.h" #include #include #include @@ -107,7 +107,7 @@ class PyIndataHandler : public ThreadedInDataHandler { py::function cb, reset_callback; public: - PyIndataHandler(StreamMgr &mgr, py::function cb, py::function reset_callback) + PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { DEBUGTRACE_ENTER; @@ -119,7 +119,25 @@ public: DEBUGTRACE_ENTER; stopThread(); } - void reset(const Daq *daq) override final { reset_callback(daq); } + void reset(const Daq *daq) override final { + DEBUGTRACE_ENTER; + py::gil_scoped_acquire acquire; + try { + if (daq) { + reset_callback(daq); + } else { + reset_callback(py::none()); + } + } catch (py::error_already_set &e) { + cerr << "*************** Error calling reset callback!\n"; + cerr << e.what() << endl; + cerr << "*************** \n"; + /// Throwing a runtime error here does not work out one way or another. + /// Therefore, it is better to dive out and prevent undefined behaviour + abort(); + /* throw std::runtime_error(e.what()); */ + } + } /** * @brief Reads from the buffer @@ -174,12 +192,12 @@ void init_datahandler(py::module &m) { /// The C++ class is PyIndataHandler, but for Python, it is called /// InDataHandler py::class_ pyidh(m, "InDataHandler"); - pyidh.def(py::init()); + pyidh.def(py::init()); /// Peak Programme Meter py::class_ ppm(m, "PPMHandler"); - ppm.def(py::init()); - ppm.def(py::init()); + ppm.def(py::init()); + ppm.def(py::init()); ppm.def("getCurrentValue", [](const PPMHandler &ppm) { std::tuple tp = ppm.getCurrentValue(); @@ -190,10 +208,9 @@ void init_datahandler(py::module &m) { /// Clip Detector py::class_ clip(m, "ClipHandler"); - clip.def(py::init()); + clip.def(py::init()); clip.def("getCurrentValue", [](const ClipHandler &clip) { - arma::uvec cval = clip.getCurrentValue(); return ColToNpy(cval); // something goes wrong here @@ -202,7 +219,7 @@ void init_datahandler(py::module &m) { /// Real time Aps /// py::class_ rtaps(m, "RtAps"); - rtaps.def(py::init rtsv(m, "RtSignalViewer"); - rtsv.def(py::init()); rtsv.def("getCurrentValue", [](RtSignalViewer &rt) { diff --git a/src/lasp/pybind11/lasp_streammgr.cpp b/src/lasp/pybind11/lasp_streammgr.cpp index fa98657..101da9d 100644 --- a/src/lasp/pybind11/lasp_streammgr.cpp +++ b/src/lasp/pybind11/lasp_streammgr.cpp @@ -13,7 +13,7 @@ void init_streammgr(py::module &m) { /// The stream manager is a singleton, and the lifetime is managed elsewhere. // It should not be deleted. - py::class_> smgr( + py::class_> smgr( m, "StreamMgr"); py::enum_(smgr, "StreamType") @@ -23,7 +23,7 @@ void init_streammgr(py::module &m) { smgr.def("startStream", &StreamMgr::startStream); smgr.def("stopStream", &StreamMgr::stopStream); smgr.def_static("getInstance", []() { - return std::unique_ptr(&StreamMgr::getInstance()); + return StreamMgr::getInstance(); }); smgr.def("stopAllStreams", &StreamMgr::stopAllStreams);