From 21df1bc6cf71479f28ac7ac177ac46286ba70dee Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Fri, 9 Jun 2023 10:43:04 +0200 Subject: [PATCH] Incallbacks should not return anything anymore. From inheritance to composition for InDataHandler code. StreamMgr singleton only weak ptr stored, this makes sure destruction from Python is more often done. UlDAQ code back to working. --- src/lasp/device/lasp_daq.h | 4 +- src/lasp/device/lasp_indatahandler.cpp | 25 +++-- src/lasp/device/lasp_indatahandler.h | 66 ++++++------- src/lasp/device/lasp_rtaudiodaq.cpp | 13 +-- src/lasp/device/lasp_rtaudiodaq.h | 21 ++++ src/lasp/device/lasp_streammgr.cpp | 61 +++++++----- src/lasp/device/lasp_streammgr.h | 16 +++- src/lasp/device/lasp_uldaq.cpp | 6 ++ src/lasp/device/lasp_uldaq.h | 16 +++- .../device/uldaq/lasp_uldaq_bufhandler.cpp | 10 +- src/lasp/device/uldaq/lasp_uldaq_bufhandler.h | 12 ++- src/lasp/device/uldaq/lasp_uldaq_common.h | 6 ++ src/lasp/device/uldaq/lasp_uldaq_impl.cpp | 18 ++-- src/lasp/device/uldaq/lasp_uldaq_impl.h | 14 +++ src/lasp/dsp/lasp_clip.cpp | 3 +- src/lasp/dsp/lasp_clip.h | 6 +- src/lasp/dsp/lasp_ppm.cpp | 9 +- src/lasp/dsp/lasp_ppm.h | 9 +- src/lasp/dsp/lasp_rtaps.cpp | 5 +- src/lasp/dsp/lasp_rtaps.h | 6 +- src/lasp/dsp/lasp_rtsignalviewer.cpp | 4 +- src/lasp/dsp/lasp_rtsignalviewer.h | 6 +- src/lasp/dsp/lasp_threadedindatahandler.cpp | 46 ++++----- src/lasp/dsp/lasp_threadedindatahandler.h | 95 +++++++++++++------ src/lasp/pybind11/lasp_pyindatahandler.cpp | 36 ++++--- 25 files changed, 322 insertions(+), 191 deletions(-) diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index 3fa35ff..e1f9d3d 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -15,12 +15,12 @@ * @brief Callback of DAQ for input data. Callback should return * false for a stop request. */ -using InDaqCallback = std::function; +using InDaqCallback = std::function; /** * @brief */ -using OutDaqCallback = std::function; +using OutDaqCallback = std::function; /** * @brief Base cass for all DAQ (Data Acquisition) interfaces. A DAQ can be a diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp index 1598e4c..8bdeb17 100644 --- a/src/lasp/device/lasp_indatahandler.cpp +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -4,43 +4,56 @@ #include "lasp_streammgr.h" #include -InDataHandler::InDataHandler(SmgrHandle mgr) - : _mgr(mgr) +InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb, + const InResetType resetfcn) + : _mgr(mgr), inCallback(cb), reset(resetfcn) #if LASP_DEBUG == 1 , - _main_thread_id(std::this_thread::get_id()) + main_thread_id(std::this_thread::get_id()) #endif { DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + assert(mgr->main_thread_id == main_thread_id); +#endif } void InDataHandler::start() { DEBUGTRACE_ENTER; + checkRightThread(); if (SmgrHandle handle = _mgr.lock()) { handle->addInDataHandler(this); #if LASP_DEBUG == 1 - assert(handle->_main_thread_id == _main_thread_id); + assert(handle->main_thread_id == main_thread_id); #endif } } void InDataHandler::stop() { + DEBUGTRACE_ENTER; + checkRightThread(); #if LASP_DEBUG == 1 stopCalled = true; #endif if (SmgrHandle handle = _mgr.lock()) { - /* handle->removeInDataHandler(*this); */ + handle->removeInDataHandler(*this); } } InDataHandler::~InDataHandler() { DEBUGTRACE_ENTER; + checkRightThread(); #if LASP_DEBUG == 1 if (!stopCalled) { std::cerr << "************ BUG: Stop function not called while arriving at " "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." + "InDataHandler::stop()." << std::endl; abort(); } #endif } +#if LASP_DEBUG == 1 +void InDataHandler::checkRightThread() const { + assert(std::this_thread::get_id() == main_thread_id); +} +#endif diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h index e42824f..2ff47b1 100644 --- a/src/lasp/device/lasp_indatahandler.h +++ b/src/lasp/device/lasp_indatahandler.h @@ -1,17 +1,29 @@ #pragma once +#include "lasp_types.h" #include +#include #include #include -#include "lasp_types.h" class StreamMgr; using SmgrHandle = std::shared_ptr; +class DaqData; +class Daq; + /** \addtogroup device * @{ */ -class DaqData; -class Daq; + +/** + * @brief The function definition of callbacks with incoming DAQ data + */ +using InCallbackType = std::function; +/** + * @brief Function definition for the reset callback. + */ +using InResetType = std::function; + class InDataHandler { protected: @@ -24,53 +36,41 @@ protected: #endif public: - virtual ~InDataHandler(); + ~InDataHandler(); + const InCallbackType inCallback; + const InResetType reset; /** * @brief When constructed, the handler is added to the stream manager, which * will call the handlers's inCallback() until stop() is called. * * @param mgr Stream manager. + * @param cb The callback that is stored, and called on new DAQ data + * @param resetfcn The callback that is stored, and called when the DAQ + * changes state. */ - InDataHandler(SmgrHandle mgr); + InDataHandler(SmgrHandle mgr, InCallbackType cb, + InResetType resetfcn); /** - * @brief This function is called when input data from a DAQ is available. - * - * @param daqdata Input data from DAQ - * - * @return true if no error. False to stop the stream from running. - */ - virtual bool inCallback(const DaqData &daqdata) = 0; - - /** - * @brief Reset in-data handler. - * - * @param daq New DAQ configuration of inCallback(). If nullptr is given, - * it means that the stream is stopped. - */ - virtual void reset(const Daq *daq = nullptr) = 0; - - /** - * @brief This function should be called from the constructor of the - * implementation of InDataHandler, that one than also implements - * `inCallback`. It will start the stream's calling of inCallback(). + * @brief Adds the current InDataHandler to the list of handlers in the + * StreamMgr. After this happens, the reset() method stored in this + * object is called back. When the stream is running, right after this, + * inCallback() is called with DaqData. */ void start(); /** - * @brief This function should be called from the destructor of derived - * classes, to disable the calls to inCallback(), such that proper - * destruction of the object is allowed and no other threads call methods - * from the object. It removes the inCallback() from the callback list of the - * StreamMgr(). **Failing to call this function results in deadlocks, errors - * like "pure virtual function called", or other**. + * @brief Removes the currend InDataHandler from the list of handlers in the + * StreamMgr. From that point on, the object can be safely destroyed. Not + * calling stop() before destruction of this object is considered a BUG. I.e. + * a class which *uses* an InDataHandler should always call stop() in its + * destructor. */ void stop(); -private: #if LASP_DEBUG == 1 - const std::thread::id _main_thread_id; + const std::thread::id main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/device/lasp_rtaudiodaq.cpp b/src/lasp/device/lasp_rtaudiodaq.cpp index f2fef80..b10d54c 100644 --- a/src/lasp/device/lasp_rtaudiodaq.cpp +++ b/src/lasp/device/lasp_rtaudiodaq.cpp @@ -368,11 +368,7 @@ public: DaqData d{nFramesPerBlock, neninchannels, dtype}; d.copyInFromRaw(ptrs); - bool ret = _incallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _incallback(d); } if (outputBuffer) { @@ -395,11 +391,8 @@ public: } DaqData d{nFramesPerBlock, nenoutchannels, dtype}; - bool ret = _outcallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _outcallback(d); + // Copy over the buffer us j = 0; for (auto ptr : ptrs) { d.copyToRaw(j, ptr); diff --git a/src/lasp/device/lasp_rtaudiodaq.h b/src/lasp/device/lasp_rtaudiodaq.h index 74b6d3e..288431a 100644 --- a/src/lasp/device/lasp_rtaudiodaq.h +++ b/src/lasp/device/lasp_rtaudiodaq.h @@ -2,6 +2,25 @@ #include "lasp_daq.h" #include +/** \addtogroup device + * @{ + * \defgroup rtaudio RtAudio backend + * This code is used to interface with the RtAudio cross-platform audio + * interface. + * + * \addtogroup rtaudio + * @{ + */ + + +/** + * @brief Method called from Daq::createDaq. + * + * @param devinfo Device info + * @param config DAQ Configuration settings + * + * @return Pointer to Daq instance. Throws Runtime errors on error. + */ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, const DaqConfiguration& config); @@ -12,3 +31,5 @@ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, */ void fillRtAudioDeviceInfo(DeviceInfoList &devinfolist); +/** @} */ +/** @} */ diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index 43e79c1..e744d6b 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -15,34 +15,55 @@ using std::endl; using rte = std::runtime_error; /** - * @brief The main global handle to a stream, stored in a shared pointer. + * @brief The main global handle to a stream, stored in a weak pointer, if it + * does not yet exist, via StreamMgr::getInstance, a new stream mgr is created. + * It also makes sure that the stream manager is deleted once the latest handle + * to it has been destroyed (no global stuff left). */ -std::shared_ptr _mgr; +std::weak_ptr _mgr; -std::shared_ptr StreamMgr::getInstance() { + + +/** + * @brief The only way to obtain a stream manager, can only be called from the + * thread that does it the first time. + * + * @return Stream manager handle + */ +SmgrHandle StreamMgr::getInstance() { DEBUGTRACE_ENTER; - if (!_mgr) { - _mgr = std::shared_ptr(new StreamMgr()); - if (!_mgr) { + auto mgr = _mgr.lock(); + if (!mgr) { + + mgr = SmgrHandle(new StreamMgr()); + if (!mgr) { throw rte("Fatal: could not allocate stream manager!"); } + // Update global weak pointer + _mgr = mgr; + return mgr; } +#if LASP_DEBUG == 1 + // Make sure we never ask for a new SmgrHandle from a different thread. + assert(std::this_thread::get_id() == mgr->main_thread_id); +#endif - return _mgr; + return mgr; } -StreamMgr::StreamMgr() { - DEBUGTRACE_ENTER; +StreamMgr::StreamMgr() #if LASP_DEBUG == 1 - _main_thread_id = std::this_thread::get_id(); + : main_thread_id(std::this_thread::get_id()) #endif +{ + DEBUGTRACE_ENTER; // Trigger a scan for the available devices, in the background. rescanDAQDevices(true); } #if LASP_DEBUG == 1 void StreamMgr::checkRightThread() const { - assert(std::this_thread::get_id() == _main_thread_id); + assert(std::this_thread::get_id() == main_thread_id); } #endif @@ -65,6 +86,7 @@ void StreamMgr::rescanDAQDevices(bool background, if (!background) { rescanDAQDevices_impl(callback); } else { + DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread..."); pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); } } @@ -76,7 +98,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function callback) { callback(); } } -bool StreamMgr::inCallback(const DaqData &data) { +void StreamMgr::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -107,23 +129,15 @@ bool StreamMgr::inCallback(const DaqData &data) { } for (auto &handler : _inDataHandlers) { - bool res = handler->inCallback(input_filtered); - if (!res) { - return false; - } + handler->inCallback(input_filtered); } } else { /// No input filters for (auto &handler : _inDataHandlers) { - - bool res = handler->inCallback(data); - if (!res) { - return false; - } + handler->inCallback(data); } } - return true; } void StreamMgr::setSiggen(std::shared_ptr siggen) { @@ -187,7 +201,7 @@ template bool fillData(DaqData &data, const vd &signal) { return true; } -bool StreamMgr::outCallback(DaqData &data) { +void StreamMgr::outCallback(DaqData &data) { /* DEBUGTRACE_ENTER; */ @@ -216,7 +230,6 @@ bool StreamMgr::outCallback(DaqData &data) { // Set all values to 0. std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0); } - return true; } StreamMgr::~StreamMgr() { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index bb60cd0..80494b0 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -183,17 +183,17 @@ class StreamMgr { /** * @brief Set active signal generator for output streams. Only one `Siggen' * is active at the same time. Siggen controls its own data race protection - * using a mutex. + * using a mutex. If no Siggen is there, and an output stream is running, it + * will send a default signal of 0. * * @param s New Siggen pointer */ void setSiggen(std::shared_ptr s); private: - bool inCallback(const DaqData &data); - bool outCallback(DaqData &data); + void inCallback(const DaqData &data); + void outCallback(DaqData &data); - void removeInDataHandler(InDataHandler &handler); /** * @brief Add an input data handler. The handler's inCallback() function is @@ -205,6 +205,12 @@ private: */ void addInDataHandler(InDataHandler *handler); + /** + * @brief Remove InDataHandler from the list. + * + * @param handler + */ + void removeInDataHandler(InDataHandler &handler); /** * @brief Do the actual rescanning. * @@ -213,7 +219,7 @@ private: void rescanDAQDevices_impl(std::function callback); #if LASP_DEBUG == 1 - std::thread::id _main_thread_id; + const std::thread::id main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index 0524cc4..04c21e5 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -7,6 +7,12 @@ #include "lasp_uldaq_impl.h" #include +/** + * @brief The maximum number of devices that can be enumerated when calling + * ulGetDaqDeviceInventory() + */ +const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; + void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DEBUGTRACE_ENTER; diff --git a/src/lasp/device/lasp_uldaq.h b/src/lasp/device/lasp_uldaq.h index 26e2305..0b54961 100644 --- a/src/lasp/device/lasp_uldaq.h +++ b/src/lasp/device/lasp_uldaq.h @@ -1,18 +1,24 @@ #pragma once #include "lasp_daq.h" -/** - * @brief The maximum number of devices that can be enumerated when calling - * ulGetDaqDeviceInventory() +/** \addtogroup device + * \defgroup uldaq UlDAQ specific code + * This code is used to interface with UlDAQ compatible devices. It is only + * tested on Linux. + * @{ + * \addtogroup uldaq + * @{ */ -const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, const DaqConfiguration &config); /** - * @brief Fill device info list with UlDaq specific devices, if any. + * @brief Append device info list with UlDaq specific devices, if any. * * @param devinfolist Info list to append to. */ void fillUlDaqDeviceInfo(DeviceInfoList& devinfolist); + +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp index 7cc41c9..1eb9389 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp @@ -134,13 +134,13 @@ bool InBufHandler::operator()() { if (transferStatus.currentIndex < (long long)buffer_mid_idx) { topenqueued = false; if (!botenqueued) { - ret = runCallback(nchannels * nFramesPerBlock); + runCallback(nchannels * nFramesPerBlock); botenqueued = true; } } else { botenqueued = false; if (!topenqueued) { - ret = runCallback(0); + runCallback(0); topenqueued = true; } } @@ -214,8 +214,8 @@ bool OutBufHandler::operator()() { if (!botenqueued) { DaqData d(nFramesPerBlock, 1,// Only one output channel dtype_descr.dtype); - // Receive data - res = cb(d); + // Receive data, run callback + cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); botenqueued = true; @@ -226,7 +226,7 @@ bool OutBufHandler::operator()() { DaqData d(nFramesPerBlock, 1,// Only one output channel dtype_descr.dtype); // Receive - res = cb(d); + cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[0]))); topenqueued = true; diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h index 05ca6ae..f1ea849 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h @@ -5,11 +5,16 @@ #include "lasp_uldaq_common.h" -class DT9837A; +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ /** * @brief Helper class for managing input and output samples of the DAQ device. */ + +class DT9837A; class BufHandler { protected: /** @@ -28,6 +33,9 @@ protected: * @brief Sampling frequency in Hz */ double samplerate; + /** + * @brief Storage capacity for the DAQ I/O. + */ std::vector buf; /** * @brief Whether the top part of the buffer is enqueued @@ -97,3 +105,5 @@ public: ~OutBufHandler(); }; +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.h b/src/lasp/device/uldaq/lasp_uldaq_common.h index 371974c..f9ac26a 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_common.h +++ b/src/lasp/device/uldaq/lasp_uldaq_common.h @@ -3,6 +3,10 @@ #include #include "lasp_deviceinfo.h" +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ /** * @brief Throws an appropriate stream exception based on the UlError number. * The mapping is based on the error numbers as given in uldaq.h. There are a @@ -58,3 +62,5 @@ const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, 47250, 48000, 50000, 50400, 51000}; +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp index 32419dc..86b00f5 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp @@ -91,15 +91,15 @@ void DT9837A::stop() { StreamStatus status = _streamStatus; status.isRunning = true; _streamStatus = status; - /* if (!isRunning()) { */ - /* throw rte("No data acquisition running"); */ - /* } */ + if (!isRunning()) { + throw rte("No data acquisition running"); + } // Stop the thread and join it - /* _stopThread = true; */ - /* assert(_thread.joinable()); */ - /* _thread.join(); */ - /* _stopThread = false; */ + _stopThread = true; + assert(_thread.joinable()); + _thread.join(); + _stopThread = false; // Update stream status status.isRunning = false; @@ -120,8 +120,8 @@ void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { throw rte("DAQ requires a callback for output data"); } assert(neninchannels() + nenoutchannels() > 0); - /* _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); - */ + _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); + } void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.h b/src/lasp/device/uldaq/lasp_uldaq_impl.h index aa170d0..648e121 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_impl.h +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.h @@ -18,7 +18,14 @@ using rte = std::runtime_error; class InBufHandler; class OutBufHandler; +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ +/** + * @brief Data translation DT9837A Daq device. + */ class DT9837A : public Daq { DaqDeviceHandle _handle = 0; @@ -71,6 +78,11 @@ public: virtual ~DT9837A(); + /** + * @brief Returns true when the stream is running + * + * @return as above stated + */ bool isRunning() const; /** @@ -94,3 +106,5 @@ public: } }; +/** @} */ +/** @} */ diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index fe37945..0be004c 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -18,7 +18,7 @@ ClipHandler::ClipHandler(SmgrHandle mgr) startThread(); } -bool ClipHandler::inCallback_threaded(const DaqData &d) { +void ClipHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -52,7 +52,6 @@ bool ClipHandler::inCallback_threaded(const DaqData &d) { _clip_time(i) += _dt; } } - return true; } arma::uvec ClipHandler::getCurrentValue() const { diff --git a/src/lasp/dsp/lasp_clip.h b/src/lasp/dsp/lasp_clip.h index ec2dfd5..2294558 100644 --- a/src/lasp/dsp/lasp_clip.h +++ b/src/lasp/dsp/lasp_clip.h @@ -21,7 +21,7 @@ /** * @brief Clipping detector (Clip). Detects when a signal overdrives the input * */ -class ClipHandler: public ThreadedInDataHandler { +class ClipHandler: public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -68,8 +68,8 @@ class ClipHandler: public ThreadedInDataHandler { */ arma::uvec getCurrentValue() const; - bool inCallback_threaded(const DaqData& ) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& ); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 4cb185d..aa92f6b 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -12,13 +12,13 @@ using Lck = std::scoped_lock; using rte = std::runtime_error; PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps) - : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { + : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; startThread(); } -bool PPMHandler::inCallback_threaded(const DaqData &d) { +void PPMHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -64,12 +64,11 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { _cur_max(i) *= _alpha; } } - return true; } std::tuple PPMHandler::getCurrentValue() const { - DEBUGTRACE_ENTER; + /* DEBUGTRACE_ENTER; */ Lck lck(_mtx); arma::uvec clips(_clip_time.size(), arma::fill::zeros); @@ -85,9 +84,11 @@ void PPMHandler::reset(const Daq *daq) { if (daq) { + DEBUGTRACE_PRINT("New daq found"); _cur_max.fill(1e-80); const us nchannels = daq->neninchannels(); + DEBUGTRACE_PRINT(nchannels); _max_range.resize(nchannels); dvec ranges = daq->inputRangeForEnabledChannels(); diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index aecf02d..b30ecc6 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -4,7 +4,6 @@ // // Description: Peak Programme Meter #pragma once -#include #include "lasp_filter.h" #include "lasp_mathtypes.h" #include "lasp_threadedindatahandler.h" @@ -23,7 +22,7 @@ * with a certain amount of dB/s. If a new peak is found, it goes up again. * Also detects clipping. * */ -class PPMHandler: public ThreadedInDataHandler { +class PPMHandler : public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -69,7 +68,7 @@ class PPMHandler: public ThreadedInDataHandler { /** * @brief Constructs Peak Programme Meter * - * @param mgr Stream Mgr to operate on + * @param mgr Stream Mgr to install callbacks for * @param decay_dBps The level decay in units dB/s, after a peak has been * hit. */ @@ -91,8 +90,8 @@ class PPMHandler: public ThreadedInDataHandler { * * @return true when stream should continue. */ - bool inCallback_threaded(const DaqData& d) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& d); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index 9e66528..96d9802 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -25,7 +25,7 @@ RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, RtAps::~RtAps() { stopThread(); } -bool RtAps::inCallback_threaded(const DaqData &data) { +void RtAps::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -35,7 +35,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) { const us nchannels = fltdata.n_cols; if(nchannels != _sens.size()) { cerr << "**** Error: sensitivity size does not match! *****" << endl; - return false; + return; } fltdata.each_row() %= _sens.as_row(); @@ -63,7 +63,6 @@ bool RtAps::inCallback_threaded(const DaqData &data) { _ps.compute(fltdata); - return true; } void RtAps::reset(const Daq *daq) { diff --git a/src/lasp/dsp/lasp_rtaps.h b/src/lasp/dsp/lasp_rtaps.h index 006ce1d..b55240f 100644 --- a/src/lasp/dsp/lasp_rtaps.h +++ b/src/lasp/dsp/lasp_rtaps.h @@ -23,7 +23,7 @@ * @brief Real time spectral estimator using Welch method of spectral * estimation. */ -class RtAps : public ThreadedInDataHandler { +class RtAps : public ThreadedInDataHandler { std::unique_ptr _filterPrototype; std::vector> _freqWeightingFilters; @@ -69,8 +69,8 @@ public: * * @return true if stream should continue. */ - bool inCallback_threaded(const DaqData & d) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData & d); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index d6d06d5..bbef3ad 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -27,7 +27,7 @@ RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, startThread(); } -bool RtSignalViewer::inCallback_threaded(const DaqData &data) { +void RtSignalViewer::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -52,8 +52,6 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) { _dat(_resolution-1, 1) = newmin; _dat(_resolution-1, 2) = newmax; } - - return true; } RtSignalViewer::~RtSignalViewer() { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.h b/src/lasp/dsp/lasp_rtsignalviewer.h index 0a7676d..253a934 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.h +++ b/src/lasp/dsp/lasp_rtsignalviewer.h @@ -24,7 +24,7 @@ * @brief Real time signal viewer. Shows envelope of the signal based on amount * of history shown. */ -class RtSignalViewer : public ThreadedInDataHandler { +class RtSignalViewer : public ThreadedInDataHandler { /** * @brief Storage for sensitivity values @@ -85,8 +85,8 @@ public: */ dmat getCurrentValue() const; - bool inCallback_threaded(const DaqData &) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData &); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 3240787..bc60b86 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -13,6 +13,7 @@ using lck = std::scoped_lock; using rte = std::runtime_error; using std::cerr; using std::endl; +using std::placeholders::_1; class SafeQueue { std::queue _queue; @@ -49,47 +50,50 @@ public: bool empty() const { return _contents == 0; } }; -ThreadedInDataHandler::ThreadedInDataHandler(SmgrHandle mgr) - : InDataHandler(mgr), _queue(std::make_unique()) { +ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, + InCallbackType cb, + InResetType reset) + : _indatahandler( + mgr, + std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, + _1), + reset), + _queue(std::make_unique()), inCallback(cb) { DEBUGTRACE_ENTER; // Initialize thread pool, if not already done getPool(); } -void ThreadedInDataHandler::startThread() { +void ThreadedInDataHandlerBase::startThread() { DEBUGTRACE_ENTER; _thread_can_safely_run = true; - start(); + _indatahandler.start(); } -bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { +void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( + const DaqData &daqdata) { DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); // Early return in case object is under DESTRUCTION if (!_thread_can_safely_run) - return true; - - if (!_lastCallbackResult) { - return false; - } + return; _queue->push(daqdata); - if (!_thread_running && _lastCallbackResult) { + if (!_thread_running) { auto &pool = getPool(); DEBUGTRACE_PRINT("Pushing new thread in pool"); _thread_running = true; - pool.push_task(&ThreadedInDataHandler::threadFcn, this); + pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); } - - return _lastCallbackResult; } -void ThreadedInDataHandler::stopThread() { + +void ThreadedInDataHandlerBase::stopThread() { DEBUGTRACE_ENTER; // Make sure inCallback is no longer called _thread_can_safely_run = false; - stop(); + _indatahandler.stop(); std::scoped_lock lck(_mtx); @@ -99,7 +103,7 @@ void ThreadedInDataHandler::stopThread() { } } -ThreadedInDataHandler::~ThreadedInDataHandler() { +ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { DEBUGTRACE_ENTER; if (_thread_can_safely_run) { @@ -111,18 +115,14 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { } } -void ThreadedInDataHandler::threadFcn() { +void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded - if (!inCallback_threaded(_queue->pop())) { - cerr << "*********** Callback result returned false! *************" - << endl; - _lastCallbackResult = false; - } + inCallback(_queue->pop()); } _thread_running = false; } diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 12e162b..0569205 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -1,9 +1,11 @@ #pragma once +#include "debugtrace.hpp" #include "lasp_indatahandler.h" #include #include #include +using std::placeholders::_1; const us RINGBUFFER_SIZE = 1024; /** @@ -16,23 +18,45 @@ const us RINGBUFFER_SIZE = 1024; class SafeQueue; /** - * @brief Threaded in data handler. Buffers inCallback data and calls a - * callback with the same signature on a different thread. + * @brief Threaded in data handler base. Buffers inCallback data and calls a + * callback with the same signature on a different thread. The main function of + * this is to offload the thread that handles the stream, such that expensive + * computations do not result in stream buffer xruns. */ -class ThreadedInDataHandler : protected InDataHandler { +class ThreadedInDataHandlerBase { /** * @brief The queue used to push elements to the handling thread. */ + + InDataHandler _indatahandler; std::unique_ptr _queue; + mutable std::recursive_mutex _mtx; std::atomic _thread_running{false}; - std::atomic _lastCallbackResult{true}; std::atomic _thread_can_safely_run{false}; + /** + * @brief Function pointer that is called when new DaqData arrives. + */ + const InCallbackType inCallback; + void threadFcn(); -protected: + + /** + * @brief Pushes a copy of the daqdata to the thread queue and returns. + * Adds a thread to handle the queue, whihc will call inCallback(); + * + * @param daqdata the daq info to push + * + * @return true, to continue with sampling. + */ + void _inCallbackFromInDataHandler(const DaqData &daqdata); + + public: + ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset); + ~ThreadedInDataHandlerBase(); /** * @brief This method should be called from the derived class' constructor, * to start the thread and data is incoming. @@ -46,33 +70,44 @@ protected: */ void stopThread(); -public: - /** - * @brief Initialize a ThreadedInDataHandler - * - * @param mgr StreamMgr singleton reference - */ - ThreadedInDataHandler(SmgrHandle mgr); - virtual ~ThreadedInDataHandler(); +}; - /** - * @brief Pushes a copy of the daqdata to the thread queue and returns - * - * @param daqdata the daq info to push - * - * @return true, to continue with sampling. - */ - virtual bool inCallback(const DaqData &daqdata) override final; +/** + * @brief A bit of curiously recurring template pattern, to connect the + * specific handlers and connect the proper callbacks in a type-agnostic way. + * Using this class, each threaded handler should just implement its reset() + * and inCallback() method. Ellides the virtual method calls. + * + * Usage: class XHandler: public ThreadedInDataHandler { + * public: + * XHandler(streammgr) : ThreadedInDataHandler(streammgr) {} + * void inCallback(const DaqData& d) { ... do something with d } + * void reset(const Daq* daq) { ... do something with daq } + * }; + * + * For examples, see PPMHandler, etc. + * + * @tparam Derived The + */ +template +class ThreadedInDataHandler : public ThreadedInDataHandlerBase { + public: + ThreadedInDataHandler(SmgrHandle mgr): + ThreadedInDataHandlerBase(mgr, + std::bind(&ThreadedInDataHandler::_inCallback, this, _1), + std::bind(&ThreadedInDataHandler::_reset, this, _1)) + { - /** - * @brief This function should be overridden with an actual implementation, - * of what should happen on a different thread. - * - * @param d Input daq data - * - * @return true on succes. False when an error occured. - */ - virtual bool inCallback_threaded(const DaqData &d) = 0; + } + + void _reset(const Daq* daq) { + DEBUGTRACE_ENTER; + return static_cast(this)->reset(daq); + } + void _inCallback(const DaqData& data) { + DEBUGTRACE_ENTER; + return static_cast(this)->inCallback(data); + } }; /** @} */ diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index b13f0fd..cf4406b 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -96,17 +96,26 @@ py::array_t dmat_to_ndarray(const DaqData &d) { } /** - * @brief Wraps the InDataHandler such that it calls a Python callback with a - * buffer of sample data. The Python callback is called from a different - * thread, using a Numpy array as argument. + * @brief Wraps the ThreadedInDataHandler such that it calls a Python callback with a + * buffer of sample data. Converts DaqData objects to Numpy arrays and calls + * Python given as argument to the constructor */ -class PyIndataHandler : public ThreadedInDataHandler { +class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ py::function cb, reset_callback; public: + /** + * @brief Initialize PyIndataHandler + * + * @param mgr StreamMgr handle + * @param cb Python callback that is called with Numpy input data from device + * @param reset_callback Python callback that is called with a Daq pointer. + * Careful: do not store this handle, as it is only valid as long as reset() + * is called, when a stream stops, this pointer / handle will dangle. + */ PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { @@ -119,7 +128,12 @@ public: DEBUGTRACE_ENTER; stopThread(); } - void reset(const Daq *daq) override final { + /** + * @brief Calls the reset callback in Python. + * + * @param daq Daq device, or nullptr in case no input stream is running. + */ + void reset(const Daq *daq) { DEBUGTRACE_ENTER; py::gil_scoped_acquire acquire; try { @@ -140,9 +154,10 @@ public: } /** - * @brief Reads from the buffer + * @brief Calls the Python callback method / function with a Numpy array of + * stream data. */ - bool inCallback_threaded(const DaqData &d) override final { + void inCallback(const DaqData &d) { /* DEBUGTRACE_ENTER; */ @@ -172,18 +187,15 @@ public: } // End of switch bool res = bool_val.cast(); - if (!res) - return false; } catch (py::error_already_set &e) { cerr << "ERROR: Python raised exception from callback function: "; cerr << e.what() << endl; - return false; + abort(); } catch (py::cast_error &e) { cerr << e.what() << endl; cerr << "ERROR: Python callback does not return boolean value." << endl; - return false; + abort(); } - return true; } };