From ae3f8043e0465600c41016583992273e5abb8083 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Fri, 2 Jun 2023 14:25:17 +0200 Subject: [PATCH 1/7] Bugfix: not cleanup done of h5 dataset in recording. That might be problematic --- src/lasp/lasp_record.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 627c1ef..243a687 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -218,6 +218,10 @@ class Recording: # from StreamMgr. self.indh = None + # Remove handle to dataset otherwise the h5 file is not closed + # properly. + self.ad = None + try: # Close the recording file self.f.close() From dd2bbb5973967f147c7a0182cb8e743ed828e5e9 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Tue, 6 Jun 2023 15:57:20 +0200 Subject: [PATCH 2/7] Some improvements in the clearyness of meaning in uldaq code. No bugs found. --- src/lasp/device/lasp_uldaq.cpp | 54 ++++++++++++++--------------- src/lasp/device/lasp_uldaq_impl.cpp | 48 +++++++++++++++---------- src/lasp/device/lasp_uldaq_impl.h | 4 ++- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index 34e1f8e..a7252c3 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -6,13 +6,10 @@ #include "lasp_uldaq_impl.h" #include - - void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DEBUGTRACE_ENTER; - UlError err; unsigned int numdevs = MAX_ULDAQ_DEV_COUNT_PER_API; @@ -35,32 +32,35 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { devinfo._uldaqDescriptor = descriptor; devinfo.api = uldaqapi; - string name, interface; - string productname = descriptor.productName; - if (productname != "DT9837A") { - throw rte("Unknown UlDAQ type: " + productname); + { + string name, interface; + string productname = descriptor.productName; + if (productname != "DT9837A") { + throw rte("Unknown UlDAQ type: " + productname); + } + + switch (descriptor.devInterface) { + case USB_IFC: + name = "USB - "; + break; + case BLUETOOTH_IFC: + /* devinfo. */ + name = "Bluetooth - "; + break; + + case ETHERNET_IFC: + /* devinfo. */ + name = "Ethernet - "; + break; + default: + name = "Uknown interface = "; + } + + name += + string(descriptor.productName) + " " + string(descriptor.uniqueId); + devinfo.device_name = std::move(name); } - switch (descriptor.devInterface) { - case USB_IFC: - name = "USB - "; - break; - case BLUETOOTH_IFC: - /* devinfo. */ - name = "Bluetooth - "; - break; - - case ETHERNET_IFC: - /* devinfo. */ - name = "Ethernet - "; - break; - default: - name = "Uknown interface = "; - } - - name += string(descriptor.productName) + " " + string(descriptor.uniqueId); - devinfo.device_name = std::move(name); - devinfo.physicalOutputQty = DaqChannel::Qty::Voltage; devinfo.availableDataTypes.push_back( diff --git a/src/lasp/device/lasp_uldaq_impl.cpp b/src/lasp/device/lasp_uldaq_impl.cpp index 6177b89..e91f63e 100644 --- a/src/lasp/device/lasp_uldaq_impl.cpp +++ b/src/lasp/device/lasp_uldaq_impl.cpp @@ -70,7 +70,8 @@ DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config) throw rte("Unsensible number of samples per block chosen"); } - if (samplerate() < ULDAQ_SAMPLERATES.at(0) || samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size()-1)) { + if (samplerate() < ULDAQ_SAMPLERATES.at(0) || + samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) { throw rte("Invalid sample rate"); } @@ -135,11 +136,13 @@ void DT9837A::stop() { throw rte("No data acquisition running"); } + // Stop the thread and join it _stopThread = true; - if (_thread.joinable()) { - _thread.join(); - } + assert(_thread.joinable()); + _thread.join(); _stopThread = false; + + // Update stream status status.isRunning = false; _streamStatus = status; } @@ -150,9 +153,9 @@ void DT9837A::stop() { * log of errors definded here (109 in total). Except for some, we will map * most of them to a driver error. * - * @param e + * @param e The backend error code. */ -inline void throwUlException(UlError err) { +inline void throwOnPossibleUlException(UlError err) { if (err == ERR_NO_ERROR) { return; } @@ -207,14 +210,14 @@ InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) // monitor channel (hence the false flag). dvec ranges = daq.inputRangeForEnabledChannels(false); - us enabled_ch_count = 0; + us enabled_ch_counter = 0; for (us chin = 0; chin < 4; chin++) { if (eninchannels_without_mon[chin] == true) { DaqInChanDescriptor indesc; indesc.type = DAQI_ANALOG_SE; indesc.channel = chin; - double rangeval = ranges.at(enabled_ch_count); + double rangeval = ranges.at(enabled_ch_counter); Range rangenum; if (fabs(rangeval - 1.0) < 1e-8) { rangenum = BIP1VOLTS; @@ -227,7 +230,7 @@ InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) } indesc.range = rangenum; indescs.push_back(indesc); - enabled_ch_count++; + enabled_ch_counter++; } } @@ -248,7 +251,7 @@ InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels, 2 * nFramesPerBlock, // Watch the 2 here! &samplerate, scanoptions, inscanflags, buf.data()); - throwUlException(err); + throwOnPossibleUlException(err); } void InBufHandler::start() { @@ -256,7 +259,7 @@ void InBufHandler::start() { ScanStatus status; TransferStatus transferStatus; UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); - throwUlException(err); + throwOnPossibleUlException(err); totalFramesCount = transferStatus.currentTotalCount; topenqueued = true; @@ -279,10 +282,15 @@ bool InBufHandler::operator()() { if (monitorOutput) { for (us frame = 0; frame < nFramesPerBlock; frame++) { data.value(frame, 0) = - buf[totalOffset + (frame * nchannels) + (nchannels - 1)]; + buf[totalOffset // Offset to lowest part of the buffer, or not + + (frame * nchannels) // Data is interleaved, so skip each + + (nchannels - 1)] // Monitor comes as last in the channel list, + // but we want it first in the output data. + ; } } + // Now, all normal channels for (us channel = 0; channel < nchannels - monitorOffset; channel++) { /* DEBUGTRACE_PRINT(channel); */ for (us frame = 0; frame < nFramesPerBlock; frame++) { @@ -297,7 +305,7 @@ bool InBufHandler::operator()() { TransferStatus transferStatus; UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); - throwUlException(err); + throwOnPossibleUlException(err); us increment = transferStatus.currentTotalCount - totalFramesCount; totalFramesCount += increment; @@ -342,7 +350,7 @@ OutBufHandler::OutBufHandler(DT9837A &daq, OutDaqCallback cb) 2 * nFramesPerBlock, // Watch the 2 here! &samplerate, scanoptions, outscanflags, buf.data()); - throwUlException(err); + throwOnPossibleUlException(err); } void OutBufHandler::start() { @@ -373,7 +381,7 @@ bool OutBufHandler::operator()() { TransferStatus transferStatus; err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); - throwUlException(err); + throwOnPossibleUlException(err); if (status != SS_RUNNING) { return false; } @@ -387,7 +395,9 @@ bool OutBufHandler::operator()() { if (transferStatus.currentIndex < buffer_mid_idx) { topenqueued = false; if (!botenqueued) { - DaqData d(nFramesPerBlock, 1, dtype_descr.dtype); + DaqData d(nFramesPerBlock, 1,// Only one output channel + dtype_descr.dtype); + // Receive data res = cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); @@ -396,7 +406,9 @@ bool OutBufHandler::operator()() { } else { botenqueued = false; if (!topenqueued) { - DaqData d(nFramesPerBlock, 1, dtype_descr.dtype); + DaqData d(nFramesPerBlock, 1,// Only one output channel + dtype_descr.dtype); + // Receive res = cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[0]))); @@ -473,11 +485,9 @@ void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { status.errorType = e.e; _streamStatus = status; - /* cerr << "\n******************\n"; cerr << "Catched error in UlDAQ thread: " << e.what() << endl; cerr << "\n******************\n"; - */ } } diff --git a/src/lasp/device/lasp_uldaq_impl.h b/src/lasp/device/lasp_uldaq_impl.h index 90c308a..cc2cade 100644 --- a/src/lasp/device/lasp_uldaq_impl.h +++ b/src/lasp/device/lasp_uldaq_impl.h @@ -1,4 +1,5 @@ #pragma once +#include "debugtrace.hpp" #include "lasp_daq.h" #include #include @@ -30,7 +31,8 @@ class UlDaqDeviceInfo : public DeviceInfo { public: DaqDeviceDescriptor _uldaqDescriptor; - virtual std::unique_ptr clone() const { + virtual std::unique_ptr clone() const override { + DEBUGTRACE_ENTER; return std::make_unique(*this); } }; From 6fc1bd90b13d7373332a881fd1406f19e4b5f736 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Tue, 6 Jun 2023 16:05:24 +0200 Subject: [PATCH 3/7] Moved IndataHandler to its own implementation file. Refactored some code. Fixed race conditions when starting and stopping indatahandlers. It appears that this does not solve the segfault, but is at least mitigates some race conditions when constructors are not ready on an object, and avoiding the call of virtual functions of an object which destructor has already been called. Added some extra assert check that a function is called from the right thread. Put explicit start and stop methods in constructor / destructor of PyInDataHandler. WARNING: this means all .start() and .stop() methods should be removed. THIS IS AN API break! --- src/lasp/device/CMakeLists.txt | 1 + src/lasp/device/lasp_daq.h | 2 + src/lasp/device/lasp_deviceinfo.h | 1 + src/lasp/device/lasp_indatahandler.cpp | 39 +++++++++++ src/lasp/device/lasp_indatahandler.h | 76 +++++++++++++++++++++ src/lasp/device/lasp_streammgr.cpp | 36 ++-------- src/lasp/device/lasp_streammgr.h | 72 +++---------------- src/lasp/dsp/lasp_clip.cpp | 6 +- src/lasp/dsp/lasp_ppm.cpp | 6 +- src/lasp/dsp/lasp_rtaps.cpp | 6 +- src/lasp/dsp/lasp_rtsignalviewer.cpp | 6 +- src/lasp/dsp/lasp_threadedindatahandler.cpp | 58 +++++++++++----- src/lasp/dsp/lasp_threadedindatahandler.h | 33 ++++++--- src/lasp/lasp_record.py | 5 +- src/lasp/pybind11/lasp_pyindatahandler.cpp | 25 +++---- src/lasp/pybind11/lasp_streammgr.cpp | 1 + 16 files changed, 232 insertions(+), 141 deletions(-) create mode 100644 src/lasp/device/lasp_indatahandler.cpp create mode 100644 src/lasp/device/lasp_indatahandler.h diff --git a/src/lasp/device/CMakeLists.txt b/src/lasp/device/CMakeLists.txt index e576d23..0792341 100644 --- a/src/lasp/device/CMakeLists.txt +++ b/src/lasp/device/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(lasp_device_lib OBJECT lasp_deviceinfo.cpp lasp_rtaudiodaq.cpp lasp_streammgr.cpp + lasp_indatahandler.cpp lasp_uldaq.cpp lasp_uldaq_impl.cpp ) diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index dbc7392..3fa35ff 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -5,6 +5,8 @@ #include "lasp_types.h" #include #include +#include +#include /** * \defgroup device Device interfacing diff --git a/src/lasp/device/lasp_deviceinfo.h b/src/lasp/device/lasp_deviceinfo.h index 81772ed..7d3d8ad 100644 --- a/src/lasp/device/lasp_deviceinfo.h +++ b/src/lasp/device/lasp_deviceinfo.h @@ -19,6 +19,7 @@ public: * @brief Virtual desctructor. Can be derived class. */ virtual ~DeviceInfo() {} + DeviceInfo& operator=(const DeviceInfo&) = delete; /** * @brief Clone a device info. diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp new file mode 100644 index 0000000..6afce6c --- /dev/null +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -0,0 +1,39 @@ +/* #define DEBUGTRACE_ENABLED */ +#include +#include "debugtrace.hpp" +#include "lasp_indatahandler.h" +#include "lasp_streammgr.h" + +InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) +#if LASP_DEBUG==1 + , _main_thread_id(std::this_thread::get_id()) +#endif +{ DEBUGTRACE_ENTER; } +void InDataHandler::start() { + DEBUGTRACE_ENTER; + _mgr.addInDataHandler(*this); + +#if LASP_DEBUG == 1 + assert(_mgr._main_thread_id == _main_thread_id); +#endif +} +void InDataHandler::stop() { +#if LASP_DEBUG == 1 + stopCalled = true; +#endif + _mgr.removeInDataHandler(*this); +} + +InDataHandler::~InDataHandler() { + + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + if (!stopCalled) { + std::cerr << "************ BUG: Stop function not called while arriving at " + "InDataHandler's destructor. Fix this by calling " + "InDataHandler::stop() from the derived class' destructor." + << std::endl; + abort(); + } +#endif +} diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h new file mode 100644 index 0000000..5008b68 --- /dev/null +++ b/src/lasp/device/lasp_indatahandler.h @@ -0,0 +1,76 @@ +#pragma once +#include +#include +#include "lasp_types.h" + +/** \addtogroup device + * @{ + */ +class StreamMgr; +class DaqData; +class Daq; +class InDataHandler { + +protected: + StreamMgr &_mgr; +#if LASP_DEBUG == 1 + // This is a flag to indicate whether the method stop() is called for the + // current handler. It should call the method stop() from the derived class's + // destructor. + std::atomic stopCalled{false}; +#endif + +public: + virtual ~InDataHandler(); + + /** + * @brief When constructed, the handler is added to the stream manager, which + * will call the handlers's inCallback() until stop() is called. + * + * @param mgr Stream manager. + */ + InDataHandler(StreamMgr &mgr); + + /** + * @brief This function is called when input data from a DAQ is available. + * + * @param daqdata Input data from DAQ + * + * @return true if no error. False to stop the stream from running. + */ + virtual bool inCallback(const DaqData &daqdata) = 0; + + /** + * @brief Reset in-data handler. + * + * @param daq New DAQ configuration of inCallback(). If nullptr is given, + * it means that the stream is stopped. + */ + virtual void reset(const Daq *daq = nullptr) = 0; + + /** + * @brief This function should be called from the constructor of the + * implementation of InDataHandler, that one than also implements + * `inCallback`. It will start the stream's calling of inCallback(). + */ + void start(); + + /** + * @brief This function should be called from the destructor of derived + * classes, to disable the calls to inCallback(), such that proper + * destruction of the object is allowed and no other threads call methods + * from the object. It removes the inCallback() from the callback list of the + * StreamMgr(). **Failing to call this function results in deadlocks, errors + * like "pure virtual function called", or other**. + */ + void stop(); + +private: +#if LASP_DEBUG == 1 + const std::thread::id _main_thread_id; + void checkRightThread() const; +#else + void checkRightThread() const {} +#endif +}; +/** @} */ diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index 42adf4c..19af4bc 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -1,8 +1,9 @@ /* #define DEBUGTRACE_ENABLED */ -#include "lasp_streammgr.h" #include "debugtrace.hpp" +#include "lasp_streammgr.h" #include "lasp_biquadbank.h" #include "lasp_thread.h" +#include "lasp_indatahandler.h" #include #include #include @@ -12,30 +13,6 @@ using std::cerr; using std::endl; using rte = std::runtime_error; -InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) { DEBUGTRACE_ENTER; } -void InDataHandler::start() { - DEBUGTRACE_ENTER; - _mgr.addInDataHandler(*this); -} -void InDataHandler::stop() { -#if LASP_DEBUG == 1 - stopCalled = true; -#endif - _mgr.removeInDataHandler(*this); -} -InDataHandler::~InDataHandler() { - - DEBUGTRACE_ENTER; -#if LASP_DEBUG == 1 - if (!stopCalled) { - cerr << "************ BUG: Stop function not called while arriving at " - "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." - << endl; - abort(); - } -#endif -} StreamMgr &StreamMgr::getInstance() { @@ -43,6 +20,7 @@ StreamMgr &StreamMgr::getInstance() { static StreamMgr mgr; return mgr; } + StreamMgr::StreamMgr() { DEBUGTRACE_ENTER; #if LASP_DEBUG == 1 @@ -63,16 +41,16 @@ void StreamMgr::rescanDAQDevices(bool background, auto &pool = getPool(); checkRightThread(); + if (_inputStream || _outputStream) { + throw rte("Rescanning DAQ devices only possible when no stream is running"); + } if (!_devices_mtx.try_lock()) { throw rte("A background DAQ device scan is probably already running"); } _devices_mtx.unlock(); - if (_inputStream || _outputStream) { - throw rte("Rescanning DAQ devices only possible when no stream is running"); - } + std::scoped_lock lck(_devices_mtx); _devices.clear(); - /* auto &pool = getPool(); */ if (!background) { rescanDAQDevices_impl(callback); } else { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index fc69222..665ea3a 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -10,60 +10,8 @@ * @{ */ class StreamMgr; +class InDataHandler; -class InDataHandler { - -protected: - StreamMgr &_mgr; -#if LASP_DEBUG == 1 - std::atomic stopCalled{false}; -#endif - -public: - virtual ~InDataHandler(); - - /** - * @brief When constructed, the handler is added to the stream manager, which - * will call the handlers's inCallback() until stop() is called. - * - * @param mgr Stream manager. - */ - InDataHandler(StreamMgr &mgr); - - /** - * @brief This function is called when input data from a DAQ is available. - * - * @param daqdata Input data from DAQ - * - * @return true if no error. False to stop the stream from running. - */ - virtual bool inCallback(const DaqData &daqdata) = 0; - - /** - * @brief Reset in-data handler. - * - * @param daq New DAQ configuration of inCallback(). If nullptr is given, - * it means that the stream is stopped. - */ - virtual void reset(const Daq *daq = nullptr) = 0; - - /** - * @brief This function should be called from the constructor of the - * implementation of InDataHandler. It will start the stream's calling of - * inCallback(). - */ - void start(); - - /** - * @brief This function should be called from the destructor of derived - * classes, to disable the calls to inCallback(), such that proper - * destruction of the object is allowed and no other threads call methods - * from the object. It removes the inCallback() from the callback list of the - * StreamMgr(). **Failing to call this function results in deadlocks, errors - * like "pure virtual function called", or other**. - */ - void stop(); -}; class SeriesBiquad; @@ -76,9 +24,6 @@ class SeriesBiquad; * fact is asserted. */ class StreamMgr { -#if LASP_DEBUG == 1 - std::thread::id _main_thread_id; -#endif /** * @brief Storage for streams. @@ -91,22 +36,25 @@ class StreamMgr { * thread-safety. */ std::list _inDataHandlers; - std::mutex _inDataHandler_mtx; + mutable std::mutex _inDataHandler_mtx; /** * @brief Signal generator in use to generate output data. Currently * implemented as to generate the same data for all output channels. */ std::shared_ptr _siggen; + std::mutex _siggen_mtx; /** * @brief Filters on input stream. For example, a digital high pass filter. */ std::vector> _inputFilters; - std::mutex _siggen_mtx; - std::mutex _devices_mtx; + mutable std::recursive_mutex _devices_mtx; + /** + * @brief Current storage for the device list + */ DeviceInfoList _devices; StreamMgr(); @@ -146,7 +94,7 @@ class StreamMgr { * @return A copy of the internal stored list of devices */ DeviceInfoList getDeviceInfo() const { - std::scoped_lock lck(const_cast(_devices_mtx)); + std::scoped_lock lck(_devices_mtx); DeviceInfoList d2; for(const auto& dev: _devices) { d2.push_back(dev->clone()); @@ -157,7 +105,8 @@ class StreamMgr { /** * @brief Triggers a background scan of the DAQ devices, which updates the * internally stored list of devices. Throws a runtime error when a - * background thread is already scanning for devices. + * background thread is already scanning for devices, or if a stream is + * running. * * @param background Perform searching for DAQ devices in the background. If * set to true, the function returns immediately. @@ -261,6 +210,7 @@ private: void rescanDAQDevices_impl(std::function callback); #if LASP_DEBUG == 1 + std::thread::id _main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index a3b8c8f..fcf14e3 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -1,6 +1,8 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_clip.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include using std::cerr; @@ -13,6 +15,7 @@ ClipHandler::ClipHandler(StreamMgr &mgr) : ThreadedInDataHandler(mgr){ DEBUGTRACE_ENTER; + startThread(); } bool ClipHandler::inCallback_threaded(const DaqData &d) { @@ -89,6 +92,5 @@ void ClipHandler::reset(const Daq *daq) { ClipHandler::~ClipHandler() { DEBUGTRACE_ENTER; - Lck lck(_mtx); - stop(); + stopThread(); } diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 4e9c640..0afb46b 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -1,6 +1,8 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_ppm.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include using std::cerr; @@ -13,6 +15,7 @@ PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; + startThread(); } bool PPMHandler::inCallback_threaded(const DaqData &d) { @@ -106,6 +109,5 @@ void PPMHandler::reset(const Daq *daq) { PPMHandler::~PPMHandler() { DEBUGTRACE_ENTER; - Lck lck(_mtx); - stop(); + stopThread(); } diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index 1e7d2cb..f39e03e 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -1,5 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_rtaps.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include "debugtrace.hpp" #include @@ -18,10 +20,10 @@ RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, _filterPrototype = freqWeightingFilter->clone(); } + startThread(); } RtAps::~RtAps() { - Lck lck(_ps_mtx); - stop(); + stopThread(); } bool RtAps::inCallback_threaded(const DaqData &data) { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index fa16ca6..69af040 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -1,5 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include "lasp_rtsignalviewer.h" #include #include @@ -22,6 +24,7 @@ RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, if (resolution <= 1) { throw rte("Invalid resolution. Should be > 1"); } + startThread(); } bool RtSignalViewer::inCallback_threaded(const DaqData &data) { @@ -54,8 +57,7 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) { } RtSignalViewer::~RtSignalViewer() { - Lck lck(_sv_mtx); - stop(); + stopThread(); } void RtSignalViewer::reset(const Daq *daq) { diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 9223720..84ee117 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -1,11 +1,12 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_threadedindatahandler.h" #include "debugtrace.hpp" +#include "lasp_daqdata.h" #include "lasp_thread.h" #include -#include -#include #include +#include +#include using namespace std::literals::chrono_literals; using lck = std::scoped_lock; @@ -16,9 +17,10 @@ using std::endl; class SafeQueue { std::queue _queue; std::mutex _mtx; - std::atomic_int32_t _contents {0}; - public: - void push(const DaqData& d) { + std::atomic_int32_t _contents{0}; + +public: + void push(const DaqData &d) { DEBUGTRACE_ENTER; lck lock(_mtx); _queue.push(d); @@ -48,24 +50,32 @@ class SafeQueue { }; ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) - : InDataHandler(mgr), _queue(std::make_unique()) { + : InDataHandler(mgr), _queue(std::make_unique()) { - DEBUGTRACE_ENTER; + DEBUGTRACE_ENTER; - // Initialize thread pool, if not already done - getPool(); - } + // Initialize thread pool, if not already done + getPool(); +} +void ThreadedInDataHandler::startThread() { + _thread_can_safely_run = true; + start(); +} bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { DEBUGTRACE_ENTER; + std::scoped_lock lck(_mtx); + + // Early return in case object is under DESTRUCTION + if (!_thread_can_safely_run) + return true; if (!_lastCallbackResult) { return false; } _queue->push(daqdata); - - if (!_thread_running && (!_stopThread) && _lastCallbackResult) { + if (!_thread_running && _lastCallbackResult) { auto &pool = getPool(); DEBUGTRACE_PRINT("Pushing new thread in pool"); _thread_running = true; @@ -74,11 +84,13 @@ bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { return _lastCallbackResult; } +void ThreadedInDataHandler::stopThread() { -ThreadedInDataHandler::~ThreadedInDataHandler() { + // Make sure inCallback is no longer called + _thread_can_safely_run = false; + stop(); - DEBUGTRACE_ENTER; - _stopThread = true; + std::scoped_lock lck(_mtx); // Then wait in steps for the thread to stop running. while (_thread_running) { @@ -86,16 +98,28 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { } } +ThreadedInDataHandler::~ThreadedInDataHandler() { + + DEBUGTRACE_ENTER; + if (_thread_can_safely_run) { + stopThread(); + cerr << "*** BUG: InDataHandlers have not been all stopped, while " + "StreamMgr destructor is called. This is a misuse BUG." + << endl; + abort(); + } +} + void ThreadedInDataHandler::threadFcn() { DEBUGTRACE_ENTER; - while(!_queue->empty() && !_stopThread) { + while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded if (!inCallback_threaded(_queue->pop())) { cerr << "*********** Callback result returned false! *************" - << endl; + << endl; _lastCallbackResult = false; } } diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 20f5df2..c447ce0 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -1,9 +1,11 @@ #pragma once -#include "lasp_streammgr.h" +#include "lasp_indatahandler.h" +#include +#include +#include const us RINGBUFFER_SIZE = 1024; - /** * \addtogroup dsp * @{ @@ -17,25 +19,40 @@ class SafeQueue; * @brief Threaded in data handler. Buffers inCallback data and calls a * callback with the same signature on a different thread. */ -class ThreadedInDataHandler: public InDataHandler { +class ThreadedInDataHandler : protected InDataHandler { /** * @brief The queue used to push elements to the handling thread. */ std::unique_ptr _queue; + mutable std::recursive_mutex _mtx; std::atomic _thread_running{false}; - std::atomic _stopThread{false}; std::atomic _lastCallbackResult{true}; + std::atomic _thread_can_safely_run{false}; void threadFcn(); - public: +protected: + /** + * @brief This method should be called from the derived class' constructor, + * to start the thread and data is incoming. + */ + void startThread(); + /** + * @brief This method SHOULD be called from all classes that derive on + * ThreadedInDataHandler. It is to make sure the inCallback_threaded() + * function is no longer called when the destructor of the derived class is + * called. Not calling this function is regarded as a BUG. + */ + void stopThread(); + +public: /** * @brief Initialize a ThreadedInDataHandler * * @param mgr StreamMgr singleton reference */ - ThreadedInDataHandler(StreamMgr& mgr); + ThreadedInDataHandler(StreamMgr &mgr); ~ThreadedInDataHandler(); /** @@ -55,10 +72,8 @@ class ThreadedInDataHandler: public InDataHandler { * * @return true on succes. False when an error occured. */ - virtual bool inCallback_threaded(const DaqData& d) = 0; - + virtual bool inCallback_threaded(const DaqData &d) = 0; }; - /** @} */ /** @} */ diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 243a687..307f2cb 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -19,7 +19,9 @@ class RecordStatus: class Recording: """ - Class used to perform a recording. + Class used to perform a recording. Recording data can come in from a + different thread, that is supposed to call the `inCallback` method, with + audio data as an argument. """ def __init__( @@ -99,7 +101,6 @@ class Recording: logging.debug("Starting record....") self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) - self.indh.start() if wait: logging.debug("Stop recording with CTRL-C") diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index d8639b3..be6d33f 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -5,8 +5,10 @@ #include "lasp_clip.h" #include "lasp_rtaps.h" #include "lasp_rtsignalviewer.h" -#include "lasp_streammgr.h" #include "lasp_threadedindatahandler.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" +#include "lasp_streammgr.h" #include #include #include @@ -111,11 +113,11 @@ public: DEBUGTRACE_ENTER; /// Start should be called externally, as at constructor time no virtual /// functions should be called. - /* start(); */ + startThread(); } ~PyIndataHandler() { DEBUGTRACE_ENTER; - stop(); + stopThread(); } void reset(const Daq *daq) override final { reset_callback(daq); } @@ -169,20 +171,13 @@ public: void init_datahandler(py::module &m) { - py::class_ idh(m, "InDataHandler_base"); - idh.def("start", &InDataHandler::start); - idh.def("stop", &InDataHandler::stop); - - py::class_ tidh( - m, "ThreadedInDataHandler"); - /// The C++ class is PyIndataHandler, but for Python, it is called /// InDataHandler - py::class_ pyidh(m, "InDataHandler"); + py::class_ pyidh(m, "InDataHandler"); pyidh.def(py::init()); /// Peak Programme Meter - py::class_ ppm(m, "PPMHandler"); + py::class_ ppm(m, "PPMHandler"); ppm.def(py::init()); ppm.def(py::init()); @@ -194,7 +189,7 @@ void init_datahandler(py::module &m) { }); /// Clip Detector - py::class_ clip(m, "ClipHandler"); + py::class_ clip(m, "ClipHandler"); clip.def(py::init()); clip.def("getCurrentValue", [](const ClipHandler &clip) { @@ -206,7 +201,7 @@ void init_datahandler(py::module &m) { /// Real time Aps /// - py::class_ rtaps(m, "RtAps"); + py::class_ rtaps(m, "RtAps"); rtaps.def(py::init rtsv(m, "RtSignalViewer"); + py::class_ rtsv(m, "RtSignalViewer"); rtsv.def(py::init #include #include From c87a5cec259c55efe843dfa83a10baa803dbabaf Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Wed, 7 Jun 2023 21:49:07 +0200 Subject: [PATCH 4/7] StreamMgr handle now via shared pointers. InDataHandler stores weak pointers. Reset callback in PyInDataHandler could be problematic. Refactored the UlDaq code and moved to a subfolder. --- src/lasp/device/CMakeLists.txt | 5 +- src/lasp/device/lasp_indatahandler.cpp | 33 ++- src/lasp/device/lasp_indatahandler.h | 9 +- src/lasp/device/lasp_streammgr.cpp | 58 ++-- src/lasp/device/lasp_streammgr.h | 11 +- src/lasp/device/lasp_uldaq.cpp | 19 +- src/lasp/device/lasp_uldaq_impl.h | 164 ----------- .../lasp_uldaq_bufhandler.cpp} | 261 +----------------- src/lasp/device/uldaq/lasp_uldaq_bufhandler.h | 99 +++++++ src/lasp/device/uldaq/lasp_uldaq_common.cpp | 45 +++ src/lasp/device/uldaq/lasp_uldaq_common.h | 60 ++++ src/lasp/device/uldaq/lasp_uldaq_impl.cpp | 212 ++++++++++++++ src/lasp/device/uldaq/lasp_uldaq_impl.h | 96 +++++++ src/lasp/dsp/lasp_clip.cpp | 2 +- src/lasp/dsp/lasp_clip.h | 2 +- src/lasp/dsp/lasp_ppm.cpp | 2 +- src/lasp/dsp/lasp_ppm.h | 2 +- src/lasp/dsp/lasp_rtaps.cpp | 2 +- src/lasp/dsp/lasp_rtaps.h | 2 +- src/lasp/dsp/lasp_rtsignalviewer.cpp | 2 +- src/lasp/dsp/lasp_rtsignalviewer.h | 2 +- src/lasp/dsp/lasp_threadedindatahandler.cpp | 5 +- src/lasp/dsp/lasp_threadedindatahandler.h | 4 +- src/lasp/pybind11/lasp_pyindatahandler.cpp | 49 ++-- src/lasp/pybind11/lasp_streammgr.cpp | 4 +- 25 files changed, 647 insertions(+), 503 deletions(-) delete mode 100644 src/lasp/device/lasp_uldaq_impl.h rename src/lasp/device/{lasp_uldaq_impl.cpp => uldaq/lasp_uldaq_bufhandler.cpp} (50%) create mode 100644 src/lasp/device/uldaq/lasp_uldaq_bufhandler.h create mode 100644 src/lasp/device/uldaq/lasp_uldaq_common.cpp create mode 100644 src/lasp/device/uldaq/lasp_uldaq_common.h create mode 100644 src/lasp/device/uldaq/lasp_uldaq_impl.cpp create mode 100644 src/lasp/device/uldaq/lasp_uldaq_impl.h diff --git a/src/lasp/device/CMakeLists.txt b/src/lasp/device/CMakeLists.txt index 0792341..133c788 100644 --- a/src/lasp/device/CMakeLists.txt +++ b/src/lasp/device/CMakeLists.txt @@ -1,4 +1,5 @@ # src/lasp/device/CMakeLists.txt +include_directories(uldaq) add_library(lasp_device_lib OBJECT lasp_daq.cpp @@ -9,7 +10,9 @@ add_library(lasp_device_lib OBJECT lasp_streammgr.cpp lasp_indatahandler.cpp lasp_uldaq.cpp - lasp_uldaq_impl.cpp + uldaq/lasp_uldaq_impl.cpp + uldaq/lasp_uldaq_bufhandler.cpp + uldaq/lasp_uldaq_common.cpp ) # Callback requires certain arguments that are not used by code. This disables diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp index 6afce6c..1598e4c 100644 --- a/src/lasp/device/lasp_indatahandler.cpp +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -1,27 +1,34 @@ /* #define DEBUGTRACE_ENABLED */ -#include -#include "debugtrace.hpp" #include "lasp_indatahandler.h" +#include "debugtrace.hpp" #include "lasp_streammgr.h" +#include -InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) -#if LASP_DEBUG==1 - , _main_thread_id(std::this_thread::get_id()) +InDataHandler::InDataHandler(SmgrHandle mgr) + : _mgr(mgr) +#if LASP_DEBUG == 1 + , + _main_thread_id(std::this_thread::get_id()) #endif -{ DEBUGTRACE_ENTER; } +{ + DEBUGTRACE_ENTER; +} void InDataHandler::start() { DEBUGTRACE_ENTER; - _mgr.addInDataHandler(*this); - + if (SmgrHandle handle = _mgr.lock()) { + handle->addInDataHandler(this); #if LASP_DEBUG == 1 - assert(_mgr._main_thread_id == _main_thread_id); + assert(handle->_main_thread_id == _main_thread_id); #endif + } } void InDataHandler::stop() { #if LASP_DEBUG == 1 stopCalled = true; #endif - _mgr.removeInDataHandler(*this); + if (SmgrHandle handle = _mgr.lock()) { + /* handle->removeInDataHandler(*this); */ + } } InDataHandler::~InDataHandler() { @@ -30,9 +37,9 @@ InDataHandler::~InDataHandler() { #if LASP_DEBUG == 1 if (!stopCalled) { std::cerr << "************ BUG: Stop function not called while arriving at " - "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." - << std::endl; + "InDataHandler's destructor. Fix this by calling " + "InDataHandler::stop() from the derived class' destructor." + << std::endl; abort(); } #endif diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h index 5008b68..e42824f 100644 --- a/src/lasp/device/lasp_indatahandler.h +++ b/src/lasp/device/lasp_indatahandler.h @@ -1,18 +1,21 @@ #pragma once #include +#include #include #include "lasp_types.h" +class StreamMgr; +using SmgrHandle = std::shared_ptr; + /** \addtogroup device * @{ */ -class StreamMgr; class DaqData; class Daq; class InDataHandler { protected: - StreamMgr &_mgr; + std::weak_ptr _mgr; #if LASP_DEBUG == 1 // This is a flag to indicate whether the method stop() is called for the // current handler. It should call the method stop() from the derived class's @@ -29,7 +32,7 @@ public: * * @param mgr Stream manager. */ - InDataHandler(StreamMgr &mgr); + InDataHandler(SmgrHandle mgr); /** * @brief This function is called when input data from a DAQ is available. diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index 19af4bc..d663222 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -1,24 +1,35 @@ -/* #define DEBUGTRACE_ENABLED */ -#include "debugtrace.hpp" +#define DEBUGTRACE_ENABLED #include "lasp_streammgr.h" +#include "debugtrace.hpp" #include "lasp_biquadbank.h" -#include "lasp_thread.h" #include "lasp_indatahandler.h" +#include "lasp_thread.h" #include #include #include #include +#include using std::cerr; using std::endl; using rte = std::runtime_error; +/** + * @brief The main global handle to a stream, stored in a shared pointer. + */ +std::shared_ptr _mgr; -StreamMgr &StreamMgr::getInstance() { - +std::shared_ptr StreamMgr::getInstance() { DEBUGTRACE_ENTER; - static StreamMgr mgr; - return mgr; + + if (!_mgr) { + _mgr = std::shared_ptr(new StreamMgr()); + if (!_mgr) { + throw rte("Fatal: could not allocate stream manager!"); + } + } + + return _mgr; } StreamMgr::StreamMgr() { @@ -211,13 +222,12 @@ bool StreamMgr::outCallback(DaqData &data) { StreamMgr::~StreamMgr() { DEBUGTRACE_ENTER; checkRightThread(); - stopAllStreams(); - if (!_inDataHandlers.empty()) { - cerr << "*** WARNING: InDataHandlers have not been all stopped, while " - "StreamMgr destructor is called. This is a misuse BUG" - << endl; - abort(); - } + // Stream manager now handled by shared pointer. Each indata handler gets a + // shared pointer to the stream manager, and stores a weak pointer to it. + // Hence, we do not have to do any cleanup here. It also makes sure that the + // order in which destructors are called does not matter anymore. As soon as + // the stream manager is destructed, the weak pointers loose there ref, and do + // not have to removeInDataHandler() anymore. } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; @@ -242,15 +252,15 @@ void StreamMgr::startStream(const DaqConfiguration &config) { std::scoped_lock lck(_devices_mtx); DeviceInfo *devinfo = nullptr; - bool found = false; + // Match configuration to a device in the list of devices for (auto &devinfoi : _devices) { if (config.match(*devinfoi)) { devinfo = devinfoi.get(); break; } } - if (!devinfo) { + if (devinfo == nullptr) { throw rte("Could not find a device with name " + config.device_name + " in list of devices."); } @@ -383,23 +393,20 @@ void StreamMgr::stopStream(const StreamType t) { } } -void StreamMgr::addInDataHandler(InDataHandler &handler) { +void StreamMgr::addInDataHandler(InDataHandler *handler) { DEBUGTRACE_ENTER; checkRightThread(); + assert(handler); std::scoped_lock lck(_inDataHandler_mtx); - if (_inputStream) { - handler.reset(_inputStream.get()); - } else { - handler.reset(nullptr); - } - if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), &handler) != + handler->reset(_inputStream.get()); + + if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) != _inDataHandlers.cend()) { throw std::runtime_error("Error: handler already added. Probably start() " "is called more than once on a handler object"); } - _inDataHandlers.push_back(&handler); + _inDataHandlers.push_back(handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); - } void StreamMgr::removeInDataHandler(InDataHandler &handler) { @@ -409,7 +416,6 @@ void StreamMgr::removeInDataHandler(InDataHandler &handler) { _inDataHandlers.remove(&handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); - } Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index 665ea3a..bb60cd0 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -57,15 +57,18 @@ class StreamMgr { */ DeviceInfoList _devices; + // Singleton, no public constructor. Can only be obtained using + // getInstance(); StreamMgr(); friend class InDataHandler; friend class Siggen; - // Singleton, no public destructor - ~StreamMgr(); public: + + ~StreamMgr(); + enum class StreamType : us { /** * @brief Input stream @@ -85,7 +88,7 @@ class StreamMgr { * * @return Reference to stream manager. */ - static StreamMgr &getInstance(); + static std::shared_ptr getInstance(); /** * @brief Obtain a list of devices currently available. When the StreamMgr is @@ -200,7 +203,7 @@ private: * * @param handler The handler to add. */ - void addInDataHandler(InDataHandler &handler); + void addInDataHandler(InDataHandler *handler); /** * @brief Do the actual rescanning. diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index a7252c3..0524cc4 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -1,6 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_config.h" + #if LASP_HAS_ULDAQ == 1 #include "lasp_uldaq.h" #include "lasp_uldaq_impl.h" @@ -17,13 +18,13 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DaqDeviceDescriptor descriptor; DaqDeviceInterface interfaceType = ANY_IFC; - err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, - static_cast(&numdevs)); + err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, &numdevs); if (err != ERR_NO_ERROR) { throw rte("UlDaq device inventarization failed"); } + DEBUGTRACE_PRINT(string("Number of devices: ") + std::to_string(numdevs)); for (unsigned i = 0; i < numdevs; i++) { descriptor = devdescriptors[i]; @@ -33,7 +34,7 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { devinfo.api = uldaqapi; { - string name, interface; + string name; string productname = descriptor.productName; if (productname != "DT9837A") { throw rte("Unknown UlDAQ type: " + productname); @@ -56,9 +57,8 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { name = "Uknown interface = "; } - name += - string(descriptor.productName) + " " + string(descriptor.uniqueId); - devinfo.device_name = std::move(name); + name += productname + " " + string(descriptor.uniqueId); + devinfo.device_name = name; } devinfo.physicalOutputQty = DaqChannel::Qty::Voltage; @@ -93,7 +93,12 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, const DaqConfiguration &config) { - return std::make_unique(devinfo, config); + const UlDaqDeviceInfo *_info = + dynamic_cast(&devinfo); + if (_info == nullptr) { + throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo"); + } + return std::make_unique(*_info, config); } #endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/lasp_uldaq_impl.h b/src/lasp/device/lasp_uldaq_impl.h deleted file mode 100644 index cc2cade..0000000 --- a/src/lasp/device/lasp_uldaq_impl.h +++ /dev/null @@ -1,164 +0,0 @@ -#pragma once -#include "debugtrace.hpp" -#include "lasp_daq.h" -#include -#include -#include -#include -#include -#include -#include -#include - -using std::atomic; -using std::cerr; -using std::endl; -using rte = std::runtime_error; - -/** - * @brief List of available sampling frequencies for DT9837A - */ -const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, - 22050, 24000, 32000, 44056, 44100, - 47250, 48000, 50000, 50400, 51000}; - - -/** - * @brief UlDaq-specific device information. Adds a copy of the underlying - * DaqDeDaqDeviceDescriptor. - */ -class UlDaqDeviceInfo : public DeviceInfo { - -public: - DaqDeviceDescriptor _uldaqDescriptor; - virtual std::unique_ptr clone() const override { - DEBUGTRACE_ENTER; - return std::make_unique(*this); - } -}; - -class DT9837A : public Daq { - - DaqDeviceHandle _handle = 0; - std::mutex _daqmutex; - - std::thread _thread; - atomic _stopThread{false}; - atomic _streamStatus; - - const us _nFramesPerBlock; - - void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); - -public: - DaqDeviceHandle getHandle() const { return _handle; } - /** - * @brief Create a DT9837A instance. - * - * @param devinfo DeviceInfo to connect to - * @param config DaqConfiguration settings - */ - DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config); - - virtual ~DT9837A(); - - bool isRunning() const; - - void stop() override final; - - friend class InBufHandler; - friend class OutBufHandler; - - virtual void start(InDaqCallback inCallback, - OutDaqCallback outCallback) override final; - - virtual StreamStatus getStreamStatus() const override { - return _streamStatus; - } -}; - -/** - * @brief Helper class for managing input and output samples of the DAQ device. - */ -class BufHandler { -protected: - /** - * @brief Reference to underlying Daq - */ - DT9837A &daq; - /** - * @brief The type of data, in this case always double precision floats - */ - const DataTypeDescriptor dtype_descr = dtype_desc_fl64; - /** - * @brief The number of channels, number of frames per callback (block). - */ - us nchannels, nFramesPerBlock; - /** - * @brief Sampling frequency in Hz - */ - double samplerate; - std::vector buf; - /** - * @brief Whether the top / bottom part of the buffer are ready to be - * enqueued - */ - bool topenqueued, botenqueued; - - /** - * @brief Counter for the total number of frames acquired / sent since the - * start of the stream. - */ - us totalFramesCount = 0; - long long buffer_mid_idx; - -public: - /** - * @brief Initialize bufhandler - * - * @param daq - * @param nchannels - */ - BufHandler(DT9837A &daq, const us nchannels) - : daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels), - nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()), - buf(2 * nchannels * - nFramesPerBlock, // Watch the two here, the top and the bottom! - 0), - buffer_mid_idx(nchannels * nFramesPerBlock) { - assert(nchannels > 0); - } -}; -/** - * @brief Specific helper for the input buffer - */ -class InBufHandler : public BufHandler { - bool monitorOutput; - InDaqCallback cb; - -public: - InBufHandler(DT9837A &daq, InDaqCallback cb); - void start(); - /** - * @brief InBufHandler::operator()() - * - * @return true on success - */ - bool operator()(); - ~InBufHandler(); -}; -class OutBufHandler : public BufHandler { - OutDaqCallback cb; - -public: - OutBufHandler(DT9837A &daq, OutDaqCallback cb); - void start(); - /** - * @brief OutBufHandler::operator() - * - * @return true on success - */ - bool operator()(); - - ~OutBufHandler(); -}; diff --git a/src/lasp/device/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp similarity index 50% rename from src/lasp/device/lasp_uldaq_impl.cpp rename to src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp index e91f63e..7cc41c9 100644 --- a/src/lasp/device/lasp_uldaq_impl.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp @@ -3,192 +3,8 @@ #include "lasp_config.h" #if LASP_HAS_ULDAQ == 1 -#include "lasp_daqconfig.h" -#include "lasp_uldaq.h" -#include "lasp_uldaq_impl.h" - -using namespace std::literals::chrono_literals; - -/** - * @brief Reserve some space for an error message from UlDaq - */ -const us UL_ERR_MSG_LEN = 512; - -/** - * @brief Return a string corresponding to the UlDaq API error - * - * @param err error code - * - * @return Error string - */ -string getErrMsg(UlError err) { - string errstr; - errstr.reserve(UL_ERR_MSG_LEN); - char errmsg[UL_ERR_MSG_LEN]; - errstr = "UlDaq API Error: "; - ulGetErrMsg(err, errmsg); - errstr += errmsg; - return errstr; -} -inline void showErr(string errstr) { - std::cerr << "\b\n**************** UlDAQ backend error **********\n"; - std::cerr << errstr << std::endl; - std::cerr << "***********************************************\n\n"; -} -inline void showErr(UlError err) { - if (err != ERR_NO_ERROR) - showErr(getErrMsg(err)); -} - -DT9837A::~DT9837A() { - UlError err; - if (isRunning()) { - stop(); - } - - if (_handle) { - err = ulDisconnectDaqDevice(_handle); - showErr(err); - err = ulReleaseDaqDevice(_handle); - showErr(err); - } -} -DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config) - : Daq(devinfo, config), - _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { - - // Some sanity checks - if (inchannel_config.size() != 4) { - throw rte("Invalid length of enabled inChannels vector"); - } - - if (outchannel_config.size() != 1) { - throw rte("Invalid length of enabled outChannels vector"); - } - - if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { - throw rte("Unsensible number of samples per block chosen"); - } - - if (samplerate() < ULDAQ_SAMPLERATES.at(0) || - samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) { - throw rte("Invalid sample rate"); - } - - const UlDaqDeviceInfo *_info = - dynamic_cast(&devinfo); - if (_info == nullptr) { - throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo"); - } - - // get a handle to the DAQ device associated with the first descriptor - _handle = ulCreateDaqDevice(_info->_uldaqDescriptor); - - if (_handle == 0) { - throw rte("Unable to create a handle to the specified DAQ " - "device. Is the device currently in use? Please make sure to set " - "the DAQ configuration in duplex mode if simultaneous input and " - "output is required."); - } - - UlError err = ulConnectDaqDevice(_handle); - - if (err != ERR_NO_ERROR) { - ulReleaseDaqDevice(_handle); - _handle = 0; - throw rte("Unable to connect to device: " + getErrMsg(err)); - } - - /// Loop over input channels, set parameters - for (us ch = 0; ch < 4; ch++) { - - err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); - showErr(err); - if (err != ERR_NO_ERROR) { - throw rte("Fatal: could normalize channel sensitivity"); - } - - CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; - err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set AC/DC coupling mode"); - } - - IepeMode iepe = - inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; - err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set IEPE mode"); - } - } -} - -bool DT9837A::isRunning() const { - DEBUGTRACE_ENTER; - return _thread.joinable(); -} -void DT9837A::stop() { - DEBUGTRACE_ENTER; - StreamStatus status = _streamStatus; - if (!isRunning()) { - throw rte("No data acquisition running"); - } - - // Stop the thread and join it - _stopThread = true; - assert(_thread.joinable()); - _thread.join(); - _stopThread = false; - - // Update stream status - status.isRunning = false; - _streamStatus = status; -} - -/** - * @brief Throws an appropriate stream exception based on the UlError number. - * The mapping is based on the error numbers as given in uldaq.h. There are a - * log of errors definded here (109 in total). Except for some, we will map - * most of them to a driver error. - * - * @param e The backend error code. - */ -inline void throwOnPossibleUlException(UlError err) { - if (err == ERR_NO_ERROR) { - return; - } - string errstr = getErrMsg(err); - showErr(errstr); - Daq::StreamStatus::StreamError serr; - if ((int)err == 18) { - serr = Daq::StreamStatus::StreamError::inputXRun; - } else if ((int)err == 19) { - serr = Daq::StreamStatus::StreamError::outputXRun; - } else { - serr = Daq::StreamStatus::StreamError::driverError; - } - - throw Daq::StreamException(serr, errstr); -} - -void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { - DEBUGTRACE_ENTER; - if (isRunning()) { - throw rte("DAQ is already running"); - } - if (neninchannels() > 0) { - if (!inCallback) - throw rte("DAQ requires a callback for input data"); - } - if (nenoutchannels() > 0) { - if (!outCallback) - throw rte("DAQ requires a callback for output data"); - } - assert(neninchannels() + nenoutchannels() > 0); - _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); -} +#include "lasp_uldaq_bufhandler.h" +#include "lasp_daq.h" InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) : BufHandler(daq, daq.neninchannels()), cb(cb) @@ -371,7 +187,7 @@ void OutBufHandler::start() { } bool OutBufHandler::operator()() { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; bool res = true; assert(daq.getHandle() != 0); @@ -389,7 +205,8 @@ bool OutBufHandler::operator()() { totalFramesCount += increment; if (increment > nFramesPerBlock) { - throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); + cerr << "totalFramesCount: " << totalFramesCount << ". Detected output underrun" << endl; + /* throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); */ } if (transferStatus.currentIndex < buffer_mid_idx) { @@ -425,70 +242,4 @@ OutBufHandler::~OutBufHandler() { showErr(err); } } - -void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { - - DEBUGTRACE_ENTER; - - try { - - std::unique_ptr obh; - std::unique_ptr ibh; - - StreamStatus status = _streamStatus; - status.isRunning = true; - _streamStatus = status; - - if (nenoutchannels() > 0) { - assert(outCallback); - obh = std::make_unique(*this, outCallback); - } - if (neninchannels() > 0) { - assert(inCallback); - ibh = std::make_unique(*this, inCallback); - } - if (obh) - obh->start(); - if (ibh) - ibh->start(); - - const double sleeptime_s = - static_cast(_nFramesPerBlock) / (16 * samplerate()); - const us sleeptime_us = static_cast(sleeptime_s * 1e6); - - while (!_stopThread) { - if (ibh) { - if (!(*ibh)()) { - _stopThread = true; - break; - } - } - if (obh) { - if (!(*obh)()) { - _stopThread = true; - break; - } - } - - std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); - } - - /// Update stream status that we are not running anymore - status.isRunning = false; - _streamStatus = status; - _stopThread = false; - - } catch (StreamException &e) { - - StreamStatus status = _streamStatus; - // Copy over error type - status.errorType = e.e; - _streamStatus = status; - - cerr << "\n******************\n"; - cerr << "Catched error in UlDAQ thread: " << e.what() << endl; - cerr << "\n******************\n"; - } -} - -#endif // LASP_HAS_ULDAQ +#endif diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h new file mode 100644 index 0000000..05ca6ae --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h @@ -0,0 +1,99 @@ +#pragma once +#include +#include "lasp_types.h" +#include "lasp_uldaq_impl.h" +#include "lasp_uldaq_common.h" + + +class DT9837A; + +/** + * @brief Helper class for managing input and output samples of the DAQ device. + */ +class BufHandler { +protected: + /** + * @brief Reference to underlying Daq + */ + DT9837A &daq; + /** + * @brief The type of data, in this case always double precision floats + */ + const DataTypeDescriptor dtype_descr = dtype_desc_fl64; + /** + * @brief The number of channels, number of frames per callback (block). + */ + us nchannels, nFramesPerBlock; + /** + * @brief Sampling frequency in Hz + */ + double samplerate; + std::vector buf; + /** + * @brief Whether the top part of the buffer is enqueued + */ + bool topenqueued = false; + /** + * @brief Whether the bottom part of the buffer is enqueued + * enqueued + */ + bool botenqueued = false; + + /** + * @brief Counter for the total number of frames acquired / sent since the + * start of the stream. + */ + us totalFramesCount = 0; + long long buffer_mid_idx; + +public: + /** + * @brief Initialize bufhandler + * + * @param daq + * @param nchannels + */ + BufHandler(DT9837A &daq, const us nchannels) + : daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels), + nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()), + buf(2 * nchannels * + nFramesPerBlock, // Watch the two here, the top and the bottom! + 0), + buffer_mid_idx(nchannels * nFramesPerBlock) { + assert(nchannels > 0); + assert(nFramesPerBlock > 0); + } +}; +/** + * @brief Specific helper for the input buffer + */ +class InBufHandler : public BufHandler { + bool monitorOutput; + InDaqCallback cb; + +public: + InBufHandler(DT9837A &daq, InDaqCallback cb); + void start(); + /** + * @brief InBufHandler::operator()() + * + * @return true on success + */ + bool operator()(); + ~InBufHandler(); +}; +class OutBufHandler : public BufHandler { + OutDaqCallback cb; + +public: + OutBufHandler(DT9837A &daq, OutDaqCallback cb); + void start(); + /** + * @brief OutBufHandler::operator() + * + * @return true on success + */ + bool operator()(); + + ~OutBufHandler(); +}; diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.cpp b/src/lasp/device/uldaq/lasp_uldaq_common.cpp new file mode 100644 index 0000000..d05c629 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_common.cpp @@ -0,0 +1,45 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_uldaq_common.h" +#include "lasp_daq.h" + +string getErrMsg(UlError err) { + string errstr; + errstr.reserve(ERR_MSG_LEN); + char errmsg[ERR_MSG_LEN]; + errstr = "UlDaq API Error: "; + ulGetErrMsg(err, errmsg); + errstr += errmsg; + return errstr; +} +void showErr(string errstr) { + std::cerr << "\b\n**************** UlDAQ backend error **********\n"; + std::cerr << errstr << std::endl; + std::cerr << "***********************************************\n\n"; +} +void showErr(UlError err) { + if (err != ERR_NO_ERROR) + showErr(getErrMsg(err)); +} +#endif + +void throwOnPossibleUlException(UlError err) { + if (err == ERR_NO_ERROR) { + return; + } + string errstr = getErrMsg(err); + showErr(errstr); + Daq::StreamStatus::StreamError serr; + if ((int)err == 18) { + serr = Daq::StreamStatus::StreamError::inputXRun; + } else if ((int)err == 19) { + serr = Daq::StreamStatus::StreamError::outputXRun; + } else { + serr = Daq::StreamStatus::StreamError::driverError; + } + + throw Daq::StreamException(serr, errstr); +} diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.h b/src/lasp/device/uldaq/lasp_uldaq_common.h new file mode 100644 index 0000000..371974c --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_common.h @@ -0,0 +1,60 @@ +#pragma once +#include +#include +#include "lasp_deviceinfo.h" + +/** + * @brief Throws an appropriate stream exception based on the UlError number. + * The mapping is based on the error numbers as given in uldaq.h. There are a + * log of errors definded here (109 in total). Except for some, we will map + * most of them to a driver error. + * + * @param e The backend error code. + */ +void throwOnPossibleUlException(UlError err); + +/** + * @brief Return a string corresponding to the UlDaq API error + * + * @param err error code + * + * @return Error string + */ +string getErrMsg(UlError err); + +/** + * @brief Print error message to stderr + * + * @param errstr The string to print + */ +void showErr(UlError err); + +/** + * @brief Get a string representation of the error + * + * @param errstr + */ +void showErr(std::string errstr); + +/** + * @brief UlDaq-specific device information. Adds a copy of the underlying + * DaqDeDaqDeviceDescriptor. + */ +class UlDaqDeviceInfo : public DeviceInfo { + +public: + DaqDeviceDescriptor _uldaqDescriptor; + virtual std::unique_ptr clone() const override { + DEBUGTRACE_ENTER; + return std::make_unique(*this); + } +}; + +/** + * @brief List of available sampling frequencies for DT9837A + */ +const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, + 22050, 24000, 32000, 44056, 44100, + 47250, 48000, 50000, 50400, 51000}; + + diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp new file mode 100644 index 0000000..32419dc --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp @@ -0,0 +1,212 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_daqconfig.h" +#include "lasp_uldaq.h" +#include "lasp_uldaq_bufhandler.h" +#include "lasp_uldaq_impl.h" + +using namespace std::literals::chrono_literals; + +DT9837A::~DT9837A() { + DEBUGTRACE_ENTER; + UlError err; + if (isRunning()) { + DEBUGTRACE_PRINT("Stop UlDAQ from destructor"); + stop(); + } + + if (_handle) { + DEBUGTRACE_PRINT("Disconnecting and releasing DaqDevice"); + /* err = ulDisconnectDaqDevice(_handle); */ + /* showErr(err); */ + err = ulReleaseDaqDevice(_handle); + showErr(err); + } +} +DT9837A::DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config) + : Daq(devinfo, config), + _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { + + const DaqDeviceDescriptor &descriptor = devinfo._uldaqDescriptor; + DEBUGTRACE_PRINT(string("Device: ") + descriptor.productName); + DEBUGTRACE_PRINT(string("Product id: ") + to_string(descriptor.productId)); + DEBUGTRACE_PRINT(string("Dev string: ") + descriptor.devString); + DEBUGTRACE_PRINT(string("Unique id: ") + descriptor.uniqueId); + + // get a handle to the DAQ device associated with the first descriptor + _handle = ulCreateDaqDevice(descriptor); + + if (_handle == 0) { + throw rte("Unable to create a handle to the specified DAQ " + "device. Is the device currently in use? Please make sure to set " + "the DAQ configuration in duplex mode if simultaneous input and " + "output is required."); + } + + UlError err = ulConnectDaqDevice(_handle); + + if (err != ERR_NO_ERROR) { + ulReleaseDaqDevice(_handle); + _handle = 0; + throw rte("Unable to connect to device: " + getErrMsg(err)); + } + + /// Loop over input channels, set parameters + for (us ch = 0; ch < 4; ch++) { + + err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); + showErr(err); + if (err != ERR_NO_ERROR) { + throw rte("Fatal: could normalize channel sensitivity"); + } + + CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; + err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set AC/DC coupling mode"); + } + + IepeMode iepe = + inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; + err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set IEPE mode"); + } + } +} + +bool DT9837A::isRunning() const { + DEBUGTRACE_ENTER; + /* return _thread.joinable(); */ + StreamStatus status = _streamStatus; + return status.isRunning; +} +void DT9837A::stop() { + DEBUGTRACE_ENTER; + StreamStatus status = _streamStatus; + status.isRunning = true; + _streamStatus = status; + /* if (!isRunning()) { */ + /* throw rte("No data acquisition running"); */ + /* } */ + + // Stop the thread and join it + /* _stopThread = true; */ + /* assert(_thread.joinable()); */ + /* _thread.join(); */ + /* _stopThread = false; */ + + // Update stream status + status.isRunning = false; + _streamStatus = status; +} + +void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { + DEBUGTRACE_ENTER; + if (isRunning()) { + throw rte("DAQ is already running"); + } + if (neninchannels() > 0) { + if (!inCallback) + throw rte("DAQ requires a callback for input data"); + } + if (nenoutchannels() > 0) { + if (!outCallback) + throw rte("DAQ requires a callback for output data"); + } + assert(neninchannels() + nenoutchannels() > 0); + /* _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); + */ +} + +void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { + + DEBUGTRACE_ENTER; + + try { + + std::unique_ptr obh; + std::unique_ptr ibh; + + StreamStatus status = _streamStatus; + status.isRunning = true; + _streamStatus = status; + + if (nenoutchannels() > 0) { + assert(outCallback); + obh = std::make_unique(*this, outCallback); + } + if (neninchannels() > 0) { + assert(inCallback); + ibh = std::make_unique(*this, inCallback); + } + if (obh) + obh->start(); + if (ibh) + ibh->start(); + + const double sleeptime_s = + static_cast(_nFramesPerBlock) / (16 * samplerate()); + const us sleeptime_us = static_cast(sleeptime_s * 1e6); + + while (!_stopThread) { + if (ibh) { + if (!(*ibh)()) { + _stopThread = true; + break; + } + } + if (obh) { + if (!(*obh)()) { + _stopThread = true; + break; + } + } + + std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); + } + + /// Update stream status that we are not running anymore + status.isRunning = false; + _streamStatus = status; + _stopThread = false; + + } catch (StreamException &e) { + + StreamStatus status = _streamStatus; + // Copy over error type + status.errorType = e.e; + _streamStatus = status; + + cerr << "\n******************\n"; + cerr << "Catched error in UlDAQ thread: " << e.what() << endl; + cerr << "\n******************\n"; + } +} + +void DT9837A::sanityChecks() const { + // Some sanity checks + if (inchannel_config.size() != 4) { + throw rte("Invalid length of enabled inChannels vector"); + } + + if (outchannel_config.size() != 1) { + throw rte("Invalid length of enabled outChannels vector"); + } + + if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { + throw rte("Unsensible number of samples per block chosen"); + } + + if (samplerate() < ULDAQ_SAMPLERATES.at(0) || + samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) { + throw rte("Invalid sample rate"); + } +} + +#endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.h b/src/lasp/device/uldaq/lasp_uldaq_impl.h new file mode 100644 index 0000000..aa170d0 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.h @@ -0,0 +1,96 @@ +#pragma once +#include "debugtrace.hpp" +#include "lasp_uldaq_common.h" +#include +#include +#include +#include +#include +#include +#include +#include "lasp_daq.h" + +using std::atomic; +using std::cerr; +using std::endl; +using rte = std::runtime_error; + +class InBufHandler; +class OutBufHandler; + + +class DT9837A : public Daq { + + DaqDeviceHandle _handle = 0; + std::mutex _daqmutex; + + /** + * @brief The thread that is doing I/O with UlDaq + */ + std::thread _thread; + + + /** + * @brief Flag indicating the thread to stop processing. + */ + atomic _stopThread{false}; + /** + * @brief Storage for exchanging information on the stream + */ + atomic _streamStatus; + + const us _nFramesPerBlock; + + /** + * @brief The function that is running in a thread + * + * @param inCallback + * @param outcallback + */ + void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); + + /** + * @brief Obtain a handle to the underlying device + * + * @return Handle + */ + DaqDeviceHandle getHandle() const { return _handle; } + /** + * @brief Perform several sanity checks + */ + void sanityChecks() const; +public: + + /** + * @brief Create a DT9837A instance. + * + * @param devinfo DeviceInfo to connect to + * @param config DaqConfiguration settings + */ + DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config); + + virtual ~DT9837A(); + + bool isRunning() const; + + /** + * @brief Stop the data-acquisition + */ + void stop() override final; + + friend class InBufHandler; + friend class OutBufHandler; + + virtual void start(InDaqCallback inCallback, + OutDaqCallback outCallback) override final; + + /** + * @brief Obtain copy of stream status (thread-safe function) + * + * @return StreamStatus object + */ + virtual StreamStatus getStreamStatus() const override { + return _streamStatus; + } +}; + diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index fcf14e3..fe37945 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -11,7 +11,7 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -ClipHandler::ClipHandler(StreamMgr &mgr) +ClipHandler::ClipHandler(SmgrHandle mgr) : ThreadedInDataHandler(mgr){ DEBUGTRACE_ENTER; diff --git a/src/lasp/dsp/lasp_clip.h b/src/lasp/dsp/lasp_clip.h index e5f76e5..ec2dfd5 100644 --- a/src/lasp/dsp/lasp_clip.h +++ b/src/lasp/dsp/lasp_clip.h @@ -58,7 +58,7 @@ class ClipHandler: public ThreadedInDataHandler { * * @param mgr Stream Mgr to operate on */ - ClipHandler(StreamMgr& mgr); + ClipHandler(SmgrHandle mgr); ~ClipHandler(); /** diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 0afb46b..4cb185d 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -11,7 +11,7 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) +PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps) : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index 51b0872..aecf02d 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -73,7 +73,7 @@ class PPMHandler: public ThreadedInDataHandler { * @param decay_dBps The level decay in units dB/s, after a peak has been * hit. */ - PPMHandler(StreamMgr& mgr,const d decay_dBps = 20.0); + PPMHandler(SmgrHandle mgr,const d decay_dBps = 20.0); ~PPMHandler(); /** diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index f39e03e..9e66528 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -9,7 +9,7 @@ using std::cerr; using std::endl; using Lck = std::scoped_lock; -RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, +RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft, const Window::WindowType w, const d overlap_percentage, const d time_constant) diff --git a/src/lasp/dsp/lasp_rtaps.h b/src/lasp/dsp/lasp_rtaps.h index 5c86bb0..006ce1d 100644 --- a/src/lasp/dsp/lasp_rtaps.h +++ b/src/lasp/dsp/lasp_rtaps.h @@ -49,7 +49,7 @@ public: * * For all other arguments, see constructor of AvPowerSpectra */ - RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft = 2048, + RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft = 2048, const Window::WindowType w = Window::WindowType::Hann, const d overlap_percentage = 50., const d time_constant = -1); ~RtAps(); diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index 69af040..d6d06d5 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -11,7 +11,7 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, +RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution, const us channel) : ThreadedInDataHandler(mgr), _approx_time_hist(approx_time_hist), _resolution(resolution), _channel(channel) { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.h b/src/lasp/dsp/lasp_rtsignalviewer.h index 85fefd1..0a7676d 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.h +++ b/src/lasp/dsp/lasp_rtsignalviewer.h @@ -71,7 +71,7 @@ public: * @param resolution Number of time points * @param channel The channel number */ - RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, const us resolution, + RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution, const us channel); ~RtSignalViewer(); diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 84ee117..3240787 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -49,7 +49,7 @@ public: bool empty() const { return _contents == 0; } }; -ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) +ThreadedInDataHandler::ThreadedInDataHandler(SmgrHandle mgr) : InDataHandler(mgr), _queue(std::make_unique()) { DEBUGTRACE_ENTER; @@ -58,6 +58,7 @@ ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) getPool(); } void ThreadedInDataHandler::startThread() { + DEBUGTRACE_ENTER; _thread_can_safely_run = true; start(); } @@ -85,7 +86,7 @@ bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { return _lastCallbackResult; } void ThreadedInDataHandler::stopThread() { - + DEBUGTRACE_ENTER; // Make sure inCallback is no longer called _thread_can_safely_run = false; stop(); diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index c447ce0..12e162b 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -52,8 +52,8 @@ public: * * @param mgr StreamMgr singleton reference */ - ThreadedInDataHandler(StreamMgr &mgr); - ~ThreadedInDataHandler(); + ThreadedInDataHandler(SmgrHandle mgr); + virtual ~ThreadedInDataHandler(); /** * @brief Pushes a copy of the daqdata to the thread queue and returns diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index be6d33f..b13f0fd 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -1,14 +1,14 @@ /* #define DEBUGTRACE_ENABLED */ #include "arma_npy.h" #include "debugtrace.hpp" -#include "lasp_ppm.h" #include "lasp_clip.h" +#include "lasp_daq.h" +#include "lasp_daqdata.h" +#include "lasp_ppm.h" #include "lasp_rtaps.h" #include "lasp_rtsignalviewer.h" -#include "lasp_threadedindatahandler.h" -#include "lasp_daqdata.h" -#include "lasp_daq.h" #include "lasp_streammgr.h" +#include "lasp_threadedindatahandler.h" #include #include #include @@ -107,7 +107,7 @@ class PyIndataHandler : public ThreadedInDataHandler { py::function cb, reset_callback; public: - PyIndataHandler(StreamMgr &mgr, py::function cb, py::function reset_callback) + PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { DEBUGTRACE_ENTER; @@ -119,7 +119,25 @@ public: DEBUGTRACE_ENTER; stopThread(); } - void reset(const Daq *daq) override final { reset_callback(daq); } + void reset(const Daq *daq) override final { + DEBUGTRACE_ENTER; + py::gil_scoped_acquire acquire; + try { + if (daq) { + reset_callback(daq); + } else { + reset_callback(py::none()); + } + } catch (py::error_already_set &e) { + cerr << "*************** Error calling reset callback!\n"; + cerr << e.what() << endl; + cerr << "*************** \n"; + /// Throwing a runtime error here does not work out one way or another. + /// Therefore, it is better to dive out and prevent undefined behaviour + abort(); + /* throw std::runtime_error(e.what()); */ + } + } /** * @brief Reads from the buffer @@ -174,12 +192,12 @@ void init_datahandler(py::module &m) { /// The C++ class is PyIndataHandler, but for Python, it is called /// InDataHandler py::class_ pyidh(m, "InDataHandler"); - pyidh.def(py::init()); + pyidh.def(py::init()); /// Peak Programme Meter py::class_ ppm(m, "PPMHandler"); - ppm.def(py::init()); - ppm.def(py::init()); + ppm.def(py::init()); + ppm.def(py::init()); ppm.def("getCurrentValue", [](const PPMHandler &ppm) { std::tuple tp = ppm.getCurrentValue(); @@ -190,10 +208,9 @@ void init_datahandler(py::module &m) { /// Clip Detector py::class_ clip(m, "ClipHandler"); - clip.def(py::init()); + clip.def(py::init()); clip.def("getCurrentValue", [](const ClipHandler &clip) { - arma::uvec cval = clip.getCurrentValue(); return ColToNpy(cval); // something goes wrong here @@ -202,7 +219,7 @@ void init_datahandler(py::module &m) { /// Real time Aps /// py::class_ rtaps(m, "RtAps"); - rtaps.def(py::init rtsv(m, "RtSignalViewer"); - rtsv.def(py::init()); rtsv.def("getCurrentValue", [](RtSignalViewer &rt) { diff --git a/src/lasp/pybind11/lasp_streammgr.cpp b/src/lasp/pybind11/lasp_streammgr.cpp index fa98657..101da9d 100644 --- a/src/lasp/pybind11/lasp_streammgr.cpp +++ b/src/lasp/pybind11/lasp_streammgr.cpp @@ -13,7 +13,7 @@ void init_streammgr(py::module &m) { /// The stream manager is a singleton, and the lifetime is managed elsewhere. // It should not be deleted. - py::class_> smgr( + py::class_> smgr( m, "StreamMgr"); py::enum_(smgr, "StreamType") @@ -23,7 +23,7 @@ void init_streammgr(py::module &m) { smgr.def("startStream", &StreamMgr::startStream); smgr.def("stopStream", &StreamMgr::stopStream); smgr.def_static("getInstance", []() { - return std::unique_ptr(&StreamMgr::getInstance()); + return StreamMgr::getInstance(); }); smgr.def("stopAllStreams", &StreamMgr::stopAllStreams); From 028bed9229af6e30d70a63a1903cc4c4f3c19683 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Wed, 7 Jun 2023 21:51:03 +0200 Subject: [PATCH 5/7] One forgotten debugtrace back to disabled --- src/lasp/device/lasp_streammgr.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index d663222..43e79c1 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -1,4 +1,4 @@ -#define DEBUGTRACE_ENABLED +/* #define DEBUGTRACE_ENABLED */ #include "lasp_streammgr.h" #include "debugtrace.hpp" #include "lasp_biquadbank.h" From 21df1bc6cf71479f28ac7ac177ac46286ba70dee Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Fri, 9 Jun 2023 10:43:04 +0200 Subject: [PATCH 6/7] Incallbacks should not return anything anymore. From inheritance to composition for InDataHandler code. StreamMgr singleton only weak ptr stored, this makes sure destruction from Python is more often done. UlDAQ code back to working. --- src/lasp/device/lasp_daq.h | 4 +- src/lasp/device/lasp_indatahandler.cpp | 25 +++-- src/lasp/device/lasp_indatahandler.h | 66 ++++++------- src/lasp/device/lasp_rtaudiodaq.cpp | 13 +-- src/lasp/device/lasp_rtaudiodaq.h | 21 ++++ src/lasp/device/lasp_streammgr.cpp | 61 +++++++----- src/lasp/device/lasp_streammgr.h | 16 +++- src/lasp/device/lasp_uldaq.cpp | 6 ++ src/lasp/device/lasp_uldaq.h | 16 +++- .../device/uldaq/lasp_uldaq_bufhandler.cpp | 10 +- src/lasp/device/uldaq/lasp_uldaq_bufhandler.h | 12 ++- src/lasp/device/uldaq/lasp_uldaq_common.h | 6 ++ src/lasp/device/uldaq/lasp_uldaq_impl.cpp | 18 ++-- src/lasp/device/uldaq/lasp_uldaq_impl.h | 14 +++ src/lasp/dsp/lasp_clip.cpp | 3 +- src/lasp/dsp/lasp_clip.h | 6 +- src/lasp/dsp/lasp_ppm.cpp | 9 +- src/lasp/dsp/lasp_ppm.h | 9 +- src/lasp/dsp/lasp_rtaps.cpp | 5 +- src/lasp/dsp/lasp_rtaps.h | 6 +- src/lasp/dsp/lasp_rtsignalviewer.cpp | 4 +- src/lasp/dsp/lasp_rtsignalviewer.h | 6 +- src/lasp/dsp/lasp_threadedindatahandler.cpp | 46 ++++----- src/lasp/dsp/lasp_threadedindatahandler.h | 95 +++++++++++++------ src/lasp/pybind11/lasp_pyindatahandler.cpp | 36 ++++--- 25 files changed, 322 insertions(+), 191 deletions(-) diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index 3fa35ff..e1f9d3d 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -15,12 +15,12 @@ * @brief Callback of DAQ for input data. Callback should return * false for a stop request. */ -using InDaqCallback = std::function; +using InDaqCallback = std::function; /** * @brief */ -using OutDaqCallback = std::function; +using OutDaqCallback = std::function; /** * @brief Base cass for all DAQ (Data Acquisition) interfaces. A DAQ can be a diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp index 1598e4c..8bdeb17 100644 --- a/src/lasp/device/lasp_indatahandler.cpp +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -4,43 +4,56 @@ #include "lasp_streammgr.h" #include -InDataHandler::InDataHandler(SmgrHandle mgr) - : _mgr(mgr) +InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb, + const InResetType resetfcn) + : _mgr(mgr), inCallback(cb), reset(resetfcn) #if LASP_DEBUG == 1 , - _main_thread_id(std::this_thread::get_id()) + main_thread_id(std::this_thread::get_id()) #endif { DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + assert(mgr->main_thread_id == main_thread_id); +#endif } void InDataHandler::start() { DEBUGTRACE_ENTER; + checkRightThread(); if (SmgrHandle handle = _mgr.lock()) { handle->addInDataHandler(this); #if LASP_DEBUG == 1 - assert(handle->_main_thread_id == _main_thread_id); + assert(handle->main_thread_id == main_thread_id); #endif } } void InDataHandler::stop() { + DEBUGTRACE_ENTER; + checkRightThread(); #if LASP_DEBUG == 1 stopCalled = true; #endif if (SmgrHandle handle = _mgr.lock()) { - /* handle->removeInDataHandler(*this); */ + handle->removeInDataHandler(*this); } } InDataHandler::~InDataHandler() { DEBUGTRACE_ENTER; + checkRightThread(); #if LASP_DEBUG == 1 if (!stopCalled) { std::cerr << "************ BUG: Stop function not called while arriving at " "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." + "InDataHandler::stop()." << std::endl; abort(); } #endif } +#if LASP_DEBUG == 1 +void InDataHandler::checkRightThread() const { + assert(std::this_thread::get_id() == main_thread_id); +} +#endif diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h index e42824f..2ff47b1 100644 --- a/src/lasp/device/lasp_indatahandler.h +++ b/src/lasp/device/lasp_indatahandler.h @@ -1,17 +1,29 @@ #pragma once +#include "lasp_types.h" #include +#include #include #include -#include "lasp_types.h" class StreamMgr; using SmgrHandle = std::shared_ptr; +class DaqData; +class Daq; + /** \addtogroup device * @{ */ -class DaqData; -class Daq; + +/** + * @brief The function definition of callbacks with incoming DAQ data + */ +using InCallbackType = std::function; +/** + * @brief Function definition for the reset callback. + */ +using InResetType = std::function; + class InDataHandler { protected: @@ -24,53 +36,41 @@ protected: #endif public: - virtual ~InDataHandler(); + ~InDataHandler(); + const InCallbackType inCallback; + const InResetType reset; /** * @brief When constructed, the handler is added to the stream manager, which * will call the handlers's inCallback() until stop() is called. * * @param mgr Stream manager. + * @param cb The callback that is stored, and called on new DAQ data + * @param resetfcn The callback that is stored, and called when the DAQ + * changes state. */ - InDataHandler(SmgrHandle mgr); + InDataHandler(SmgrHandle mgr, InCallbackType cb, + InResetType resetfcn); /** - * @brief This function is called when input data from a DAQ is available. - * - * @param daqdata Input data from DAQ - * - * @return true if no error. False to stop the stream from running. - */ - virtual bool inCallback(const DaqData &daqdata) = 0; - - /** - * @brief Reset in-data handler. - * - * @param daq New DAQ configuration of inCallback(). If nullptr is given, - * it means that the stream is stopped. - */ - virtual void reset(const Daq *daq = nullptr) = 0; - - /** - * @brief This function should be called from the constructor of the - * implementation of InDataHandler, that one than also implements - * `inCallback`. It will start the stream's calling of inCallback(). + * @brief Adds the current InDataHandler to the list of handlers in the + * StreamMgr. After this happens, the reset() method stored in this + * object is called back. When the stream is running, right after this, + * inCallback() is called with DaqData. */ void start(); /** - * @brief This function should be called from the destructor of derived - * classes, to disable the calls to inCallback(), such that proper - * destruction of the object is allowed and no other threads call methods - * from the object. It removes the inCallback() from the callback list of the - * StreamMgr(). **Failing to call this function results in deadlocks, errors - * like "pure virtual function called", or other**. + * @brief Removes the currend InDataHandler from the list of handlers in the + * StreamMgr. From that point on, the object can be safely destroyed. Not + * calling stop() before destruction of this object is considered a BUG. I.e. + * a class which *uses* an InDataHandler should always call stop() in its + * destructor. */ void stop(); -private: #if LASP_DEBUG == 1 - const std::thread::id _main_thread_id; + const std::thread::id main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/device/lasp_rtaudiodaq.cpp b/src/lasp/device/lasp_rtaudiodaq.cpp index f2fef80..b10d54c 100644 --- a/src/lasp/device/lasp_rtaudiodaq.cpp +++ b/src/lasp/device/lasp_rtaudiodaq.cpp @@ -368,11 +368,7 @@ public: DaqData d{nFramesPerBlock, neninchannels, dtype}; d.copyInFromRaw(ptrs); - bool ret = _incallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _incallback(d); } if (outputBuffer) { @@ -395,11 +391,8 @@ public: } DaqData d{nFramesPerBlock, nenoutchannels, dtype}; - bool ret = _outcallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _outcallback(d); + // Copy over the buffer us j = 0; for (auto ptr : ptrs) { d.copyToRaw(j, ptr); diff --git a/src/lasp/device/lasp_rtaudiodaq.h b/src/lasp/device/lasp_rtaudiodaq.h index 74b6d3e..288431a 100644 --- a/src/lasp/device/lasp_rtaudiodaq.h +++ b/src/lasp/device/lasp_rtaudiodaq.h @@ -2,6 +2,25 @@ #include "lasp_daq.h" #include +/** \addtogroup device + * @{ + * \defgroup rtaudio RtAudio backend + * This code is used to interface with the RtAudio cross-platform audio + * interface. + * + * \addtogroup rtaudio + * @{ + */ + + +/** + * @brief Method called from Daq::createDaq. + * + * @param devinfo Device info + * @param config DAQ Configuration settings + * + * @return Pointer to Daq instance. Throws Runtime errors on error. + */ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, const DaqConfiguration& config); @@ -12,3 +31,5 @@ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, */ void fillRtAudioDeviceInfo(DeviceInfoList &devinfolist); +/** @} */ +/** @} */ diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index 43e79c1..e744d6b 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -15,34 +15,55 @@ using std::endl; using rte = std::runtime_error; /** - * @brief The main global handle to a stream, stored in a shared pointer. + * @brief The main global handle to a stream, stored in a weak pointer, if it + * does not yet exist, via StreamMgr::getInstance, a new stream mgr is created. + * It also makes sure that the stream manager is deleted once the latest handle + * to it has been destroyed (no global stuff left). */ -std::shared_ptr _mgr; +std::weak_ptr _mgr; -std::shared_ptr StreamMgr::getInstance() { + + +/** + * @brief The only way to obtain a stream manager, can only be called from the + * thread that does it the first time. + * + * @return Stream manager handle + */ +SmgrHandle StreamMgr::getInstance() { DEBUGTRACE_ENTER; - if (!_mgr) { - _mgr = std::shared_ptr(new StreamMgr()); - if (!_mgr) { + auto mgr = _mgr.lock(); + if (!mgr) { + + mgr = SmgrHandle(new StreamMgr()); + if (!mgr) { throw rte("Fatal: could not allocate stream manager!"); } + // Update global weak pointer + _mgr = mgr; + return mgr; } +#if LASP_DEBUG == 1 + // Make sure we never ask for a new SmgrHandle from a different thread. + assert(std::this_thread::get_id() == mgr->main_thread_id); +#endif - return _mgr; + return mgr; } -StreamMgr::StreamMgr() { - DEBUGTRACE_ENTER; +StreamMgr::StreamMgr() #if LASP_DEBUG == 1 - _main_thread_id = std::this_thread::get_id(); + : main_thread_id(std::this_thread::get_id()) #endif +{ + DEBUGTRACE_ENTER; // Trigger a scan for the available devices, in the background. rescanDAQDevices(true); } #if LASP_DEBUG == 1 void StreamMgr::checkRightThread() const { - assert(std::this_thread::get_id() == _main_thread_id); + assert(std::this_thread::get_id() == main_thread_id); } #endif @@ -65,6 +86,7 @@ void StreamMgr::rescanDAQDevices(bool background, if (!background) { rescanDAQDevices_impl(callback); } else { + DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread..."); pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); } } @@ -76,7 +98,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function callback) { callback(); } } -bool StreamMgr::inCallback(const DaqData &data) { +void StreamMgr::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -107,23 +129,15 @@ bool StreamMgr::inCallback(const DaqData &data) { } for (auto &handler : _inDataHandlers) { - bool res = handler->inCallback(input_filtered); - if (!res) { - return false; - } + handler->inCallback(input_filtered); } } else { /// No input filters for (auto &handler : _inDataHandlers) { - - bool res = handler->inCallback(data); - if (!res) { - return false; - } + handler->inCallback(data); } } - return true; } void StreamMgr::setSiggen(std::shared_ptr siggen) { @@ -187,7 +201,7 @@ template bool fillData(DaqData &data, const vd &signal) { return true; } -bool StreamMgr::outCallback(DaqData &data) { +void StreamMgr::outCallback(DaqData &data) { /* DEBUGTRACE_ENTER; */ @@ -216,7 +230,6 @@ bool StreamMgr::outCallback(DaqData &data) { // Set all values to 0. std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0); } - return true; } StreamMgr::~StreamMgr() { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index bb60cd0..80494b0 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -183,17 +183,17 @@ class StreamMgr { /** * @brief Set active signal generator for output streams. Only one `Siggen' * is active at the same time. Siggen controls its own data race protection - * using a mutex. + * using a mutex. If no Siggen is there, and an output stream is running, it + * will send a default signal of 0. * * @param s New Siggen pointer */ void setSiggen(std::shared_ptr s); private: - bool inCallback(const DaqData &data); - bool outCallback(DaqData &data); + void inCallback(const DaqData &data); + void outCallback(DaqData &data); - void removeInDataHandler(InDataHandler &handler); /** * @brief Add an input data handler. The handler's inCallback() function is @@ -205,6 +205,12 @@ private: */ void addInDataHandler(InDataHandler *handler); + /** + * @brief Remove InDataHandler from the list. + * + * @param handler + */ + void removeInDataHandler(InDataHandler &handler); /** * @brief Do the actual rescanning. * @@ -213,7 +219,7 @@ private: void rescanDAQDevices_impl(std::function callback); #if LASP_DEBUG == 1 - std::thread::id _main_thread_id; + const std::thread::id main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index 0524cc4..04c21e5 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -7,6 +7,12 @@ #include "lasp_uldaq_impl.h" #include +/** + * @brief The maximum number of devices that can be enumerated when calling + * ulGetDaqDeviceInventory() + */ +const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; + void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DEBUGTRACE_ENTER; diff --git a/src/lasp/device/lasp_uldaq.h b/src/lasp/device/lasp_uldaq.h index 26e2305..0b54961 100644 --- a/src/lasp/device/lasp_uldaq.h +++ b/src/lasp/device/lasp_uldaq.h @@ -1,18 +1,24 @@ #pragma once #include "lasp_daq.h" -/** - * @brief The maximum number of devices that can be enumerated when calling - * ulGetDaqDeviceInventory() +/** \addtogroup device + * \defgroup uldaq UlDAQ specific code + * This code is used to interface with UlDAQ compatible devices. It is only + * tested on Linux. + * @{ + * \addtogroup uldaq + * @{ */ -const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, const DaqConfiguration &config); /** - * @brief Fill device info list with UlDaq specific devices, if any. + * @brief Append device info list with UlDaq specific devices, if any. * * @param devinfolist Info list to append to. */ void fillUlDaqDeviceInfo(DeviceInfoList& devinfolist); + +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp index 7cc41c9..1eb9389 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp @@ -134,13 +134,13 @@ bool InBufHandler::operator()() { if (transferStatus.currentIndex < (long long)buffer_mid_idx) { topenqueued = false; if (!botenqueued) { - ret = runCallback(nchannels * nFramesPerBlock); + runCallback(nchannels * nFramesPerBlock); botenqueued = true; } } else { botenqueued = false; if (!topenqueued) { - ret = runCallback(0); + runCallback(0); topenqueued = true; } } @@ -214,8 +214,8 @@ bool OutBufHandler::operator()() { if (!botenqueued) { DaqData d(nFramesPerBlock, 1,// Only one output channel dtype_descr.dtype); - // Receive data - res = cb(d); + // Receive data, run callback + cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); botenqueued = true; @@ -226,7 +226,7 @@ bool OutBufHandler::operator()() { DaqData d(nFramesPerBlock, 1,// Only one output channel dtype_descr.dtype); // Receive - res = cb(d); + cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[0]))); topenqueued = true; diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h index 05ca6ae..f1ea849 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h @@ -5,11 +5,16 @@ #include "lasp_uldaq_common.h" -class DT9837A; +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ /** * @brief Helper class for managing input and output samples of the DAQ device. */ + +class DT9837A; class BufHandler { protected: /** @@ -28,6 +33,9 @@ protected: * @brief Sampling frequency in Hz */ double samplerate; + /** + * @brief Storage capacity for the DAQ I/O. + */ std::vector buf; /** * @brief Whether the top part of the buffer is enqueued @@ -97,3 +105,5 @@ public: ~OutBufHandler(); }; +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.h b/src/lasp/device/uldaq/lasp_uldaq_common.h index 371974c..f9ac26a 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_common.h +++ b/src/lasp/device/uldaq/lasp_uldaq_common.h @@ -3,6 +3,10 @@ #include #include "lasp_deviceinfo.h" +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ /** * @brief Throws an appropriate stream exception based on the UlError number. * The mapping is based on the error numbers as given in uldaq.h. There are a @@ -58,3 +62,5 @@ const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, 47250, 48000, 50000, 50400, 51000}; +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp index 32419dc..86b00f5 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp @@ -91,15 +91,15 @@ void DT9837A::stop() { StreamStatus status = _streamStatus; status.isRunning = true; _streamStatus = status; - /* if (!isRunning()) { */ - /* throw rte("No data acquisition running"); */ - /* } */ + if (!isRunning()) { + throw rte("No data acquisition running"); + } // Stop the thread and join it - /* _stopThread = true; */ - /* assert(_thread.joinable()); */ - /* _thread.join(); */ - /* _stopThread = false; */ + _stopThread = true; + assert(_thread.joinable()); + _thread.join(); + _stopThread = false; // Update stream status status.isRunning = false; @@ -120,8 +120,8 @@ void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { throw rte("DAQ requires a callback for output data"); } assert(neninchannels() + nenoutchannels() > 0); - /* _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); - */ + _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); + } void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.h b/src/lasp/device/uldaq/lasp_uldaq_impl.h index aa170d0..648e121 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_impl.h +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.h @@ -18,7 +18,14 @@ using rte = std::runtime_error; class InBufHandler; class OutBufHandler; +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ +/** + * @brief Data translation DT9837A Daq device. + */ class DT9837A : public Daq { DaqDeviceHandle _handle = 0; @@ -71,6 +78,11 @@ public: virtual ~DT9837A(); + /** + * @brief Returns true when the stream is running + * + * @return as above stated + */ bool isRunning() const; /** @@ -94,3 +106,5 @@ public: } }; +/** @} */ +/** @} */ diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index fe37945..0be004c 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -18,7 +18,7 @@ ClipHandler::ClipHandler(SmgrHandle mgr) startThread(); } -bool ClipHandler::inCallback_threaded(const DaqData &d) { +void ClipHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -52,7 +52,6 @@ bool ClipHandler::inCallback_threaded(const DaqData &d) { _clip_time(i) += _dt; } } - return true; } arma::uvec ClipHandler::getCurrentValue() const { diff --git a/src/lasp/dsp/lasp_clip.h b/src/lasp/dsp/lasp_clip.h index ec2dfd5..2294558 100644 --- a/src/lasp/dsp/lasp_clip.h +++ b/src/lasp/dsp/lasp_clip.h @@ -21,7 +21,7 @@ /** * @brief Clipping detector (Clip). Detects when a signal overdrives the input * */ -class ClipHandler: public ThreadedInDataHandler { +class ClipHandler: public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -68,8 +68,8 @@ class ClipHandler: public ThreadedInDataHandler { */ arma::uvec getCurrentValue() const; - bool inCallback_threaded(const DaqData& ) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& ); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 4cb185d..aa92f6b 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -12,13 +12,13 @@ using Lck = std::scoped_lock; using rte = std::runtime_error; PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps) - : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { + : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; startThread(); } -bool PPMHandler::inCallback_threaded(const DaqData &d) { +void PPMHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -64,12 +64,11 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { _cur_max(i) *= _alpha; } } - return true; } std::tuple PPMHandler::getCurrentValue() const { - DEBUGTRACE_ENTER; + /* DEBUGTRACE_ENTER; */ Lck lck(_mtx); arma::uvec clips(_clip_time.size(), arma::fill::zeros); @@ -85,9 +84,11 @@ void PPMHandler::reset(const Daq *daq) { if (daq) { + DEBUGTRACE_PRINT("New daq found"); _cur_max.fill(1e-80); const us nchannels = daq->neninchannels(); + DEBUGTRACE_PRINT(nchannels); _max_range.resize(nchannels); dvec ranges = daq->inputRangeForEnabledChannels(); diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index aecf02d..b30ecc6 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -4,7 +4,6 @@ // // Description: Peak Programme Meter #pragma once -#include #include "lasp_filter.h" #include "lasp_mathtypes.h" #include "lasp_threadedindatahandler.h" @@ -23,7 +22,7 @@ * with a certain amount of dB/s. If a new peak is found, it goes up again. * Also detects clipping. * */ -class PPMHandler: public ThreadedInDataHandler { +class PPMHandler : public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -69,7 +68,7 @@ class PPMHandler: public ThreadedInDataHandler { /** * @brief Constructs Peak Programme Meter * - * @param mgr Stream Mgr to operate on + * @param mgr Stream Mgr to install callbacks for * @param decay_dBps The level decay in units dB/s, after a peak has been * hit. */ @@ -91,8 +90,8 @@ class PPMHandler: public ThreadedInDataHandler { * * @return true when stream should continue. */ - bool inCallback_threaded(const DaqData& d) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& d); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index 9e66528..96d9802 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -25,7 +25,7 @@ RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, RtAps::~RtAps() { stopThread(); } -bool RtAps::inCallback_threaded(const DaqData &data) { +void RtAps::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -35,7 +35,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) { const us nchannels = fltdata.n_cols; if(nchannels != _sens.size()) { cerr << "**** Error: sensitivity size does not match! *****" << endl; - return false; + return; } fltdata.each_row() %= _sens.as_row(); @@ -63,7 +63,6 @@ bool RtAps::inCallback_threaded(const DaqData &data) { _ps.compute(fltdata); - return true; } void RtAps::reset(const Daq *daq) { diff --git a/src/lasp/dsp/lasp_rtaps.h b/src/lasp/dsp/lasp_rtaps.h index 006ce1d..b55240f 100644 --- a/src/lasp/dsp/lasp_rtaps.h +++ b/src/lasp/dsp/lasp_rtaps.h @@ -23,7 +23,7 @@ * @brief Real time spectral estimator using Welch method of spectral * estimation. */ -class RtAps : public ThreadedInDataHandler { +class RtAps : public ThreadedInDataHandler { std::unique_ptr _filterPrototype; std::vector> _freqWeightingFilters; @@ -69,8 +69,8 @@ public: * * @return true if stream should continue. */ - bool inCallback_threaded(const DaqData & d) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData & d); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index d6d06d5..bbef3ad 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -27,7 +27,7 @@ RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, startThread(); } -bool RtSignalViewer::inCallback_threaded(const DaqData &data) { +void RtSignalViewer::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -52,8 +52,6 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) { _dat(_resolution-1, 1) = newmin; _dat(_resolution-1, 2) = newmax; } - - return true; } RtSignalViewer::~RtSignalViewer() { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.h b/src/lasp/dsp/lasp_rtsignalviewer.h index 0a7676d..253a934 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.h +++ b/src/lasp/dsp/lasp_rtsignalviewer.h @@ -24,7 +24,7 @@ * @brief Real time signal viewer. Shows envelope of the signal based on amount * of history shown. */ -class RtSignalViewer : public ThreadedInDataHandler { +class RtSignalViewer : public ThreadedInDataHandler { /** * @brief Storage for sensitivity values @@ -85,8 +85,8 @@ public: */ dmat getCurrentValue() const; - bool inCallback_threaded(const DaqData &) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData &); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 3240787..bc60b86 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -13,6 +13,7 @@ using lck = std::scoped_lock; using rte = std::runtime_error; using std::cerr; using std::endl; +using std::placeholders::_1; class SafeQueue { std::queue _queue; @@ -49,47 +50,50 @@ public: bool empty() const { return _contents == 0; } }; -ThreadedInDataHandler::ThreadedInDataHandler(SmgrHandle mgr) - : InDataHandler(mgr), _queue(std::make_unique()) { +ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, + InCallbackType cb, + InResetType reset) + : _indatahandler( + mgr, + std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, + _1), + reset), + _queue(std::make_unique()), inCallback(cb) { DEBUGTRACE_ENTER; // Initialize thread pool, if not already done getPool(); } -void ThreadedInDataHandler::startThread() { +void ThreadedInDataHandlerBase::startThread() { DEBUGTRACE_ENTER; _thread_can_safely_run = true; - start(); + _indatahandler.start(); } -bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { +void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( + const DaqData &daqdata) { DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); // Early return in case object is under DESTRUCTION if (!_thread_can_safely_run) - return true; - - if (!_lastCallbackResult) { - return false; - } + return; _queue->push(daqdata); - if (!_thread_running && _lastCallbackResult) { + if (!_thread_running) { auto &pool = getPool(); DEBUGTRACE_PRINT("Pushing new thread in pool"); _thread_running = true; - pool.push_task(&ThreadedInDataHandler::threadFcn, this); + pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); } - - return _lastCallbackResult; } -void ThreadedInDataHandler::stopThread() { + +void ThreadedInDataHandlerBase::stopThread() { DEBUGTRACE_ENTER; // Make sure inCallback is no longer called _thread_can_safely_run = false; - stop(); + _indatahandler.stop(); std::scoped_lock lck(_mtx); @@ -99,7 +103,7 @@ void ThreadedInDataHandler::stopThread() { } } -ThreadedInDataHandler::~ThreadedInDataHandler() { +ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { DEBUGTRACE_ENTER; if (_thread_can_safely_run) { @@ -111,18 +115,14 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { } } -void ThreadedInDataHandler::threadFcn() { +void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded - if (!inCallback_threaded(_queue->pop())) { - cerr << "*********** Callback result returned false! *************" - << endl; - _lastCallbackResult = false; - } + inCallback(_queue->pop()); } _thread_running = false; } diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 12e162b..0569205 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -1,9 +1,11 @@ #pragma once +#include "debugtrace.hpp" #include "lasp_indatahandler.h" #include #include #include +using std::placeholders::_1; const us RINGBUFFER_SIZE = 1024; /** @@ -16,23 +18,45 @@ const us RINGBUFFER_SIZE = 1024; class SafeQueue; /** - * @brief Threaded in data handler. Buffers inCallback data and calls a - * callback with the same signature on a different thread. + * @brief Threaded in data handler base. Buffers inCallback data and calls a + * callback with the same signature on a different thread. The main function of + * this is to offload the thread that handles the stream, such that expensive + * computations do not result in stream buffer xruns. */ -class ThreadedInDataHandler : protected InDataHandler { +class ThreadedInDataHandlerBase { /** * @brief The queue used to push elements to the handling thread. */ + + InDataHandler _indatahandler; std::unique_ptr _queue; + mutable std::recursive_mutex _mtx; std::atomic _thread_running{false}; - std::atomic _lastCallbackResult{true}; std::atomic _thread_can_safely_run{false}; + /** + * @brief Function pointer that is called when new DaqData arrives. + */ + const InCallbackType inCallback; + void threadFcn(); -protected: + + /** + * @brief Pushes a copy of the daqdata to the thread queue and returns. + * Adds a thread to handle the queue, whihc will call inCallback(); + * + * @param daqdata the daq info to push + * + * @return true, to continue with sampling. + */ + void _inCallbackFromInDataHandler(const DaqData &daqdata); + + public: + ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset); + ~ThreadedInDataHandlerBase(); /** * @brief This method should be called from the derived class' constructor, * to start the thread and data is incoming. @@ -46,33 +70,44 @@ protected: */ void stopThread(); -public: - /** - * @brief Initialize a ThreadedInDataHandler - * - * @param mgr StreamMgr singleton reference - */ - ThreadedInDataHandler(SmgrHandle mgr); - virtual ~ThreadedInDataHandler(); +}; - /** - * @brief Pushes a copy of the daqdata to the thread queue and returns - * - * @param daqdata the daq info to push - * - * @return true, to continue with sampling. - */ - virtual bool inCallback(const DaqData &daqdata) override final; +/** + * @brief A bit of curiously recurring template pattern, to connect the + * specific handlers and connect the proper callbacks in a type-agnostic way. + * Using this class, each threaded handler should just implement its reset() + * and inCallback() method. Ellides the virtual method calls. + * + * Usage: class XHandler: public ThreadedInDataHandler { + * public: + * XHandler(streammgr) : ThreadedInDataHandler(streammgr) {} + * void inCallback(const DaqData& d) { ... do something with d } + * void reset(const Daq* daq) { ... do something with daq } + * }; + * + * For examples, see PPMHandler, etc. + * + * @tparam Derived The + */ +template +class ThreadedInDataHandler : public ThreadedInDataHandlerBase { + public: + ThreadedInDataHandler(SmgrHandle mgr): + ThreadedInDataHandlerBase(mgr, + std::bind(&ThreadedInDataHandler::_inCallback, this, _1), + std::bind(&ThreadedInDataHandler::_reset, this, _1)) + { - /** - * @brief This function should be overridden with an actual implementation, - * of what should happen on a different thread. - * - * @param d Input daq data - * - * @return true on succes. False when an error occured. - */ - virtual bool inCallback_threaded(const DaqData &d) = 0; + } + + void _reset(const Daq* daq) { + DEBUGTRACE_ENTER; + return static_cast(this)->reset(daq); + } + void _inCallback(const DaqData& data) { + DEBUGTRACE_ENTER; + return static_cast(this)->inCallback(data); + } }; /** @} */ diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index b13f0fd..cf4406b 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -96,17 +96,26 @@ py::array_t dmat_to_ndarray(const DaqData &d) { } /** - * @brief Wraps the InDataHandler such that it calls a Python callback with a - * buffer of sample data. The Python callback is called from a different - * thread, using a Numpy array as argument. + * @brief Wraps the ThreadedInDataHandler such that it calls a Python callback with a + * buffer of sample data. Converts DaqData objects to Numpy arrays and calls + * Python given as argument to the constructor */ -class PyIndataHandler : public ThreadedInDataHandler { +class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ py::function cb, reset_callback; public: + /** + * @brief Initialize PyIndataHandler + * + * @param mgr StreamMgr handle + * @param cb Python callback that is called with Numpy input data from device + * @param reset_callback Python callback that is called with a Daq pointer. + * Careful: do not store this handle, as it is only valid as long as reset() + * is called, when a stream stops, this pointer / handle will dangle. + */ PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { @@ -119,7 +128,12 @@ public: DEBUGTRACE_ENTER; stopThread(); } - void reset(const Daq *daq) override final { + /** + * @brief Calls the reset callback in Python. + * + * @param daq Daq device, or nullptr in case no input stream is running. + */ + void reset(const Daq *daq) { DEBUGTRACE_ENTER; py::gil_scoped_acquire acquire; try { @@ -140,9 +154,10 @@ public: } /** - * @brief Reads from the buffer + * @brief Calls the Python callback method / function with a Numpy array of + * stream data. */ - bool inCallback_threaded(const DaqData &d) override final { + void inCallback(const DaqData &d) { /* DEBUGTRACE_ENTER; */ @@ -172,18 +187,15 @@ public: } // End of switch bool res = bool_val.cast(); - if (!res) - return false; } catch (py::error_already_set &e) { cerr << "ERROR: Python raised exception from callback function: "; cerr << e.what() << endl; - return false; + abort(); } catch (py::cast_error &e) { cerr << e.what() << endl; cerr << "ERROR: Python callback does not return boolean value." << endl; - return false; + abort(); } - return true; } }; From 9b724ab9d545a9254f1dceaa5e957e0821c51b52 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Sat, 10 Jun 2023 15:47:52 +0200 Subject: [PATCH 7/7] Made thread pool itself thread safe. Besides, added some extra safety for StreamMgr singleton instance allocation. --- src/lasp/device/lasp_streammgr.cpp | 17 ++++-- src/lasp/device/lasp_streammgr.h | 3 ++ src/lasp/dsp/lasp_biquadbank.cpp | 30 +++++------ src/lasp/dsp/lasp_biquadbank.h | 2 + src/lasp/dsp/lasp_slm.cpp | 3 -- src/lasp/dsp/lasp_slm.h | 2 + src/lasp/dsp/lasp_thread.cpp | 36 ++++++++----- src/lasp/dsp/lasp_thread.h | 60 ++++++++++++++++----- src/lasp/dsp/lasp_threadedindatahandler.cpp | 5 +- src/lasp/dsp/lasp_threadedindatahandler.h | 3 ++ third_party/gsl-lite | 2 +- 11 files changed, 110 insertions(+), 53 deletions(-) diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index e744d6b..58c7534 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -9,6 +9,7 @@ #include #include #include +#include using std::cerr; using std::endl; @@ -21,8 +22,9 @@ using rte = std::runtime_error; * to it has been destroyed (no global stuff left). */ std::weak_ptr _mgr; +std::mutex _mgr_mutex; - +using Lck = std::scoped_lock; /** * @brief The only way to obtain a stream manager, can only be called from the @@ -35,6 +37,14 @@ SmgrHandle StreamMgr::getInstance() { auto mgr = _mgr.lock(); if (!mgr) { + // Double Check Locking Pattern, if two threads would simultaneously + // instantiate the singleton instance. + Lck lck(_mgr_mutex); + + auto mgr = _mgr.lock(); + if (mgr) { + return mgr; + } mgr = SmgrHandle(new StreamMgr()); if (!mgr) { @@ -54,7 +64,7 @@ SmgrHandle StreamMgr::getInstance() { StreamMgr::StreamMgr() #if LASP_DEBUG == 1 - : main_thread_id(std::this_thread::get_id()) + : main_thread_id(std::this_thread::get_id()) #endif { DEBUGTRACE_ENTER; @@ -70,7 +80,6 @@ void StreamMgr::checkRightThread() const { void StreamMgr::rescanDAQDevices(bool background, std::function callback) { DEBUGTRACE_ENTER; - auto &pool = getPool(); checkRightThread(); if (_inputStream || _outputStream) { @@ -87,7 +96,7 @@ void StreamMgr::rescanDAQDevices(bool background, rescanDAQDevices_impl(callback); } else { DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread..."); - pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); + _pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); } } void StreamMgr::rescanDAQDevices_impl(std::function callback) { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index 80494b0..8955cc6 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -1,6 +1,7 @@ #pragma once #include "lasp_daq.h" #include "lasp_siggen.h" +#include "lasp_thread.h" #include #include #include @@ -30,6 +31,8 @@ class StreamMgr { */ std::unique_ptr _inputStream, _outputStream; + ThreadSafeThreadPool _pool; + /** * @brief All indata handlers are called when input data is available. Note * that they can be called from different threads and should take care of diff --git a/src/lasp/dsp/lasp_biquadbank.cpp b/src/lasp/dsp/lasp_biquadbank.cpp index 09ec834..d31744e 100644 --- a/src/lasp/dsp/lasp_biquadbank.cpp +++ b/src/lasp/dsp/lasp_biquadbank.cpp @@ -44,19 +44,21 @@ SeriesBiquad::SeriesBiquad(const vd &filter_coefs) { SeriesBiquad SeriesBiquad::firstOrderHighPass(const d fs, const d cuton_Hz) { - if(fs <= 0) { + if (fs <= 0) { throw rte("Invalid sampling frequency: " + std::to_string(fs) + " [Hz]"); } - if(cuton_Hz <= 0) { + if (cuton_Hz <= 0) { throw rte("Invalid cuton frequency: " + std::to_string(cuton_Hz) + " [Hz]"); } - if(cuton_Hz >= 0.98*fs/2) { - throw rte("Invalid cuton frequency. We limit this to 0.98* fs / 2. Given value" + std::to_string(cuton_Hz) + " [Hz]"); + if (cuton_Hz >= 0.98 * fs / 2) { + throw rte( + "Invalid cuton frequency. We limit this to 0.98* fs / 2. Given value" + + std::to_string(cuton_Hz) + " [Hz]"); } - const d tau = 1/(2*arma::datum::pi*cuton_Hz); - const d facnum = 2*fs*tau/(1+2*fs*tau); - const d facden = (1-2*fs*tau)/(1+2*fs*tau); + const d tau = 1 / (2 * arma::datum::pi * cuton_Hz); + const d facnum = 2 * fs * tau / (1 + 2 * fs * tau); + const d facden = (1 - 2 * fs * tau) / (1 + 2 * fs * tau); vd coefs(6); // b0 @@ -76,10 +78,8 @@ SeriesBiquad SeriesBiquad::firstOrderHighPass(const d fs, const d cuton_Hz) { coefs(5) = 0; return SeriesBiquad(coefs); - } - std::unique_ptr SeriesBiquad::clone() const { // sos.as_col() concatenates all columns, exactly what we want. return std::make_unique(sos.as_col()); @@ -124,7 +124,6 @@ BiquadBank::BiquadBank(const dmat &filters, const vd *gains) { * for use. */ lock lck(_mtx); - getPool(); for (us i = 0; i < filters.n_cols; i++) { _filters.emplace_back(filters.col(i)); @@ -153,16 +152,15 @@ void BiquadBank::filter(vd &inout) { std::vector> futs; #if 1 - auto &pool = getPool(); vd inout_cpy = inout; for (us i = 0; i < _filters.size(); i++) { - futs.emplace_back(pool.submit( - [&](vd inout, us i) { + futs.emplace_back(_pool.submit( + [&](vd inout, us i) { _filters[i].filter(inout); return inout; - }, // Launch a task to filter. - inout_cpy, i // Column i as argument to the lambda function above. - )); + }, // Launch a task to filter. + inout_cpy, i // Column i as argument to the lambda function above. + )); } // Zero-out in-out and sum-up the filtered values diff --git a/src/lasp/dsp/lasp_biquadbank.h b/src/lasp/dsp/lasp_biquadbank.h index 1426cb5..0328cea 100644 --- a/src/lasp/dsp/lasp_biquadbank.h +++ b/src/lasp/dsp/lasp_biquadbank.h @@ -1,5 +1,6 @@ #pragma once #include "lasp_filter.h" +#include "lasp_thread.h" /** * \addtogroup dsp @@ -60,6 +61,7 @@ public: class BiquadBank : public Filter { std::vector _filters; vd _gains; + ThreadSafeThreadPool _pool; mutable std::mutex _mtx; public: diff --git a/src/lasp/dsp/lasp_slm.cpp b/src/lasp/dsp/lasp_slm.cpp index e934800..74f9b28 100644 --- a/src/lasp/dsp/lasp_slm.cpp +++ b/src/lasp/dsp/lasp_slm.cpp @@ -37,9 +37,6 @@ SLM::SLM(const d fs, const d Lref, const us downsampling_fac, const d tau, DEBUGTRACE_ENTER; DEBUGTRACE_PRINT(_alpha); - // Make sure thread pool is running - getPool(); - if (Lref <= 0) { throw rte("Invalid reference level"); } diff --git a/src/lasp/dsp/lasp_slm.h b/src/lasp/dsp/lasp_slm.h index f4afc0e..1c2d871 100644 --- a/src/lasp/dsp/lasp_slm.h +++ b/src/lasp/dsp/lasp_slm.h @@ -1,6 +1,7 @@ #pragma once #include "lasp_biquadbank.h" #include "lasp_filter.h" +#include "lasp_thread.h" #include #include @@ -14,6 +15,7 @@ * channel. A channel is the result of a filtered signal */ class SLM { + ThreadSafeThreadPool _pool; /** * @brief A, C or Z weighting, depending on the pre-filter installed. */ diff --git a/src/lasp/dsp/lasp_thread.cpp b/src/lasp/dsp/lasp_thread.cpp index 76bc400..a04e051 100644 --- a/src/lasp/dsp/lasp_thread.cpp +++ b/src/lasp/dsp/lasp_thread.cpp @@ -5,21 +5,31 @@ #include /** - * @brief It seems to work much better in cooperation with Pybind11 when this - * singleton is implemented with a unique_ptr. + * @brief Store a global weak_ptr, that is used to create new shared pointers + * if any other shared pointers are still alive. If not, we create a new + * instance. */ -std::unique_ptr _static_storage_threadpool; +std::weak_ptr _global_weak_pool; -void destroyThreadPool() { +/** + * @brief Static storage for the mutex. + */ +std::mutex ThreadSafeThreadPool::_mtx; + +using Lck = std::scoped_lock; +using rte = std::runtime_error; + +ThreadSafeThreadPool::ThreadSafeThreadPool() { DEBUGTRACE_ENTER; - _static_storage_threadpool = nullptr; -} - -BS::thread_pool &getPool() { - /* DEBUGTRACE_ENTER; */ - if (!_static_storage_threadpool) { - DEBUGTRACE_PRINT("Creating new thread pool"); - _static_storage_threadpool = std::make_unique(); + Lck lck(_mtx); + /// See if we can get it from the global ptr. If not, time to allocate it. + _pool = _global_weak_pool.lock(); + if (!_pool) { + _pool = std::make_shared(); + if (!_pool) { + throw rte("Fatal: could not allocate thread pool!"); + } + // Update global weak pointer + _global_weak_pool = _pool; } - return *_static_storage_threadpool; } diff --git a/src/lasp/dsp/lasp_thread.h b/src/lasp/dsp/lasp_thread.h index 8a6c0ca..c28805e 100644 --- a/src/lasp/dsp/lasp_thread.h +++ b/src/lasp/dsp/lasp_thread.h @@ -2,18 +2,54 @@ #include "BS_thread_pool.hpp" /** - * @brief Return reference to global (singleton) thread pool. The threadpool is - * created using the default argument, which results in exactly - * hardware_concurrency() amount of threads. - * - * @return Thread pool ref. + * @brief Simple wrapper around BS::thread_pool that makes a BS::thread_pool a + * singleton, such that a thread pool can be used around in the code, and + * safely spawn threads also from other threads. Only wraps a submit() and + * push_task for now. */ -BS::thread_pool& getPool(); +class ThreadSafeThreadPool { + /** + * @brief Shared access to the thread pool. + */ + std::shared_ptr _pool; + /** + * @brief Global mutex, used to restrict pool access to a single thread at + * once. + */ + static std::mutex _mtx; + + using Lck = std::scoped_lock; + ThreadSafeThreadPool(const ThreadSafeThreadPool&) = delete; + ThreadSafeThreadPool & + operator=(const ThreadSafeThreadPool&) = delete; + +public: + /** + * @brief Instantiate handle to the thread pool. + */ + ThreadSafeThreadPool(); + + + /** + * @brief Wrapper around BS::thread_pool::submit(...) + */ + template < + typename F, typename... A, + typename R = std::invoke_result_t, std::decay_t...>> + [[nodiscard]] std::future submit(F &&task, A &&...args) { + /// Lock access to pool + Lck lck(_mtx); + + return _pool->submit(task, args...); + } + /** + * @brief Wrapper around BS::thread_pool::push_task(...) + */ + template void push_task(F &&task, A &&...args) { + /// Lock access to pool + Lck lck(_mtx); + _pool->push_task(task, args...); + } +}; -/** - * @brief The global thread pool is stored in a unique_ptr, so in normal C++ - * code the thread pool is deleted at the end of main(). However this does not - * hold when LASP code is run - */ -void destroyThreadPool(); diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index bc60b86..340dc1a 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -62,8 +62,6 @@ ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, DEBUGTRACE_ENTER; - // Initialize thread pool, if not already done - getPool(); } void ThreadedInDataHandlerBase::startThread() { DEBUGTRACE_ENTER; @@ -82,10 +80,9 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( _queue->push(daqdata); if (!_thread_running) { - auto &pool = getPool(); DEBUGTRACE_PRINT("Pushing new thread in pool"); _thread_running = true; - pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); + _pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); } } diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 0569205..b769ad1 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -1,6 +1,7 @@ #pragma once #include "debugtrace.hpp" #include "lasp_indatahandler.h" +#include "lasp_thread.h" #include #include #include @@ -36,6 +37,8 @@ class ThreadedInDataHandlerBase { std::atomic _thread_running{false}; std::atomic _thread_can_safely_run{false}; + ThreadSafeThreadPool _pool; + /** * @brief Function pointer that is called when new DaqData arrives. */ diff --git a/third_party/gsl-lite b/third_party/gsl-lite index 4720a29..a8c7e5b 160000 --- a/third_party/gsl-lite +++ b/third_party/gsl-lite @@ -1 +1 @@ -Subproject commit 4720a2980a30da085b4ddb4a0ea2a71af7351a48 +Subproject commit a8c7e5bbbd08841836f9b92d72747fb8769dbec4