diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index 3fa35ff..e1f9d3d 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -15,12 +15,12 @@ * @brief Callback of DAQ for input data. Callback should return * false for a stop request. */ -using InDaqCallback = std::function; +using InDaqCallback = std::function; /** * @brief */ -using OutDaqCallback = std::function; +using OutDaqCallback = std::function; /** * @brief Base cass for all DAQ (Data Acquisition) interfaces. A DAQ can be a diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp index 1598e4c..8bdeb17 100644 --- a/src/lasp/device/lasp_indatahandler.cpp +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -4,43 +4,56 @@ #include "lasp_streammgr.h" #include -InDataHandler::InDataHandler(SmgrHandle mgr) - : _mgr(mgr) +InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb, + const InResetType resetfcn) + : _mgr(mgr), inCallback(cb), reset(resetfcn) #if LASP_DEBUG == 1 , - _main_thread_id(std::this_thread::get_id()) + main_thread_id(std::this_thread::get_id()) #endif { DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + assert(mgr->main_thread_id == main_thread_id); +#endif } void InDataHandler::start() { DEBUGTRACE_ENTER; + checkRightThread(); if (SmgrHandle handle = _mgr.lock()) { handle->addInDataHandler(this); #if LASP_DEBUG == 1 - assert(handle->_main_thread_id == _main_thread_id); + assert(handle->main_thread_id == main_thread_id); #endif } } void InDataHandler::stop() { + DEBUGTRACE_ENTER; + checkRightThread(); #if LASP_DEBUG == 1 stopCalled = true; #endif if (SmgrHandle handle = _mgr.lock()) { - /* handle->removeInDataHandler(*this); */ + handle->removeInDataHandler(*this); } } InDataHandler::~InDataHandler() { DEBUGTRACE_ENTER; + checkRightThread(); #if LASP_DEBUG == 1 if (!stopCalled) { std::cerr << "************ BUG: Stop function not called while arriving at " "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." + "InDataHandler::stop()." << std::endl; abort(); } #endif } +#if LASP_DEBUG == 1 +void InDataHandler::checkRightThread() const { + assert(std::this_thread::get_id() == main_thread_id); +} +#endif diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h index e42824f..2ff47b1 100644 --- a/src/lasp/device/lasp_indatahandler.h +++ b/src/lasp/device/lasp_indatahandler.h @@ -1,17 +1,29 @@ #pragma once +#include "lasp_types.h" #include +#include #include #include -#include "lasp_types.h" class StreamMgr; using SmgrHandle = std::shared_ptr; +class DaqData; +class Daq; + /** \addtogroup device * @{ */ -class DaqData; -class Daq; + +/** + * @brief The function definition of callbacks with incoming DAQ data + */ +using InCallbackType = std::function; +/** + * @brief Function definition for the reset callback. + */ +using InResetType = std::function; + class InDataHandler { protected: @@ -24,53 +36,41 @@ protected: #endif public: - virtual ~InDataHandler(); + ~InDataHandler(); + const InCallbackType inCallback; + const InResetType reset; /** * @brief When constructed, the handler is added to the stream manager, which * will call the handlers's inCallback() until stop() is called. * * @param mgr Stream manager. + * @param cb The callback that is stored, and called on new DAQ data + * @param resetfcn The callback that is stored, and called when the DAQ + * changes state. */ - InDataHandler(SmgrHandle mgr); + InDataHandler(SmgrHandle mgr, InCallbackType cb, + InResetType resetfcn); /** - * @brief This function is called when input data from a DAQ is available. - * - * @param daqdata Input data from DAQ - * - * @return true if no error. False to stop the stream from running. - */ - virtual bool inCallback(const DaqData &daqdata) = 0; - - /** - * @brief Reset in-data handler. - * - * @param daq New DAQ configuration of inCallback(). If nullptr is given, - * it means that the stream is stopped. - */ - virtual void reset(const Daq *daq = nullptr) = 0; - - /** - * @brief This function should be called from the constructor of the - * implementation of InDataHandler, that one than also implements - * `inCallback`. It will start the stream's calling of inCallback(). + * @brief Adds the current InDataHandler to the list of handlers in the + * StreamMgr. After this happens, the reset() method stored in this + * object is called back. When the stream is running, right after this, + * inCallback() is called with DaqData. */ void start(); /** - * @brief This function should be called from the destructor of derived - * classes, to disable the calls to inCallback(), such that proper - * destruction of the object is allowed and no other threads call methods - * from the object. It removes the inCallback() from the callback list of the - * StreamMgr(). **Failing to call this function results in deadlocks, errors - * like "pure virtual function called", or other**. + * @brief Removes the currend InDataHandler from the list of handlers in the + * StreamMgr. From that point on, the object can be safely destroyed. Not + * calling stop() before destruction of this object is considered a BUG. I.e. + * a class which *uses* an InDataHandler should always call stop() in its + * destructor. */ void stop(); -private: #if LASP_DEBUG == 1 - const std::thread::id _main_thread_id; + const std::thread::id main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/device/lasp_rtaudiodaq.cpp b/src/lasp/device/lasp_rtaudiodaq.cpp index f2fef80..b10d54c 100644 --- a/src/lasp/device/lasp_rtaudiodaq.cpp +++ b/src/lasp/device/lasp_rtaudiodaq.cpp @@ -368,11 +368,7 @@ public: DaqData d{nFramesPerBlock, neninchannels, dtype}; d.copyInFromRaw(ptrs); - bool ret = _incallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _incallback(d); } if (outputBuffer) { @@ -395,11 +391,8 @@ public: } DaqData d{nFramesPerBlock, nenoutchannels, dtype}; - bool ret = _outcallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _outcallback(d); + // Copy over the buffer us j = 0; for (auto ptr : ptrs) { d.copyToRaw(j, ptr); diff --git a/src/lasp/device/lasp_rtaudiodaq.h b/src/lasp/device/lasp_rtaudiodaq.h index 74b6d3e..288431a 100644 --- a/src/lasp/device/lasp_rtaudiodaq.h +++ b/src/lasp/device/lasp_rtaudiodaq.h @@ -2,6 +2,25 @@ #include "lasp_daq.h" #include +/** \addtogroup device + * @{ + * \defgroup rtaudio RtAudio backend + * This code is used to interface with the RtAudio cross-platform audio + * interface. + * + * \addtogroup rtaudio + * @{ + */ + + +/** + * @brief Method called from Daq::createDaq. + * + * @param devinfo Device info + * @param config DAQ Configuration settings + * + * @return Pointer to Daq instance. Throws Runtime errors on error. + */ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, const DaqConfiguration& config); @@ -12,3 +31,5 @@ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, */ void fillRtAudioDeviceInfo(DeviceInfoList &devinfolist); +/** @} */ +/** @} */ diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index 43e79c1..e744d6b 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -15,34 +15,55 @@ using std::endl; using rte = std::runtime_error; /** - * @brief The main global handle to a stream, stored in a shared pointer. + * @brief The main global handle to a stream, stored in a weak pointer, if it + * does not yet exist, via StreamMgr::getInstance, a new stream mgr is created. + * It also makes sure that the stream manager is deleted once the latest handle + * to it has been destroyed (no global stuff left). */ -std::shared_ptr _mgr; +std::weak_ptr _mgr; -std::shared_ptr StreamMgr::getInstance() { + + +/** + * @brief The only way to obtain a stream manager, can only be called from the + * thread that does it the first time. + * + * @return Stream manager handle + */ +SmgrHandle StreamMgr::getInstance() { DEBUGTRACE_ENTER; - if (!_mgr) { - _mgr = std::shared_ptr(new StreamMgr()); - if (!_mgr) { + auto mgr = _mgr.lock(); + if (!mgr) { + + mgr = SmgrHandle(new StreamMgr()); + if (!mgr) { throw rte("Fatal: could not allocate stream manager!"); } + // Update global weak pointer + _mgr = mgr; + return mgr; } +#if LASP_DEBUG == 1 + // Make sure we never ask for a new SmgrHandle from a different thread. + assert(std::this_thread::get_id() == mgr->main_thread_id); +#endif - return _mgr; + return mgr; } -StreamMgr::StreamMgr() { - DEBUGTRACE_ENTER; +StreamMgr::StreamMgr() #if LASP_DEBUG == 1 - _main_thread_id = std::this_thread::get_id(); + : main_thread_id(std::this_thread::get_id()) #endif +{ + DEBUGTRACE_ENTER; // Trigger a scan for the available devices, in the background. rescanDAQDevices(true); } #if LASP_DEBUG == 1 void StreamMgr::checkRightThread() const { - assert(std::this_thread::get_id() == _main_thread_id); + assert(std::this_thread::get_id() == main_thread_id); } #endif @@ -65,6 +86,7 @@ void StreamMgr::rescanDAQDevices(bool background, if (!background) { rescanDAQDevices_impl(callback); } else { + DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread..."); pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); } } @@ -76,7 +98,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function callback) { callback(); } } -bool StreamMgr::inCallback(const DaqData &data) { +void StreamMgr::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -107,23 +129,15 @@ bool StreamMgr::inCallback(const DaqData &data) { } for (auto &handler : _inDataHandlers) { - bool res = handler->inCallback(input_filtered); - if (!res) { - return false; - } + handler->inCallback(input_filtered); } } else { /// No input filters for (auto &handler : _inDataHandlers) { - - bool res = handler->inCallback(data); - if (!res) { - return false; - } + handler->inCallback(data); } } - return true; } void StreamMgr::setSiggen(std::shared_ptr siggen) { @@ -187,7 +201,7 @@ template bool fillData(DaqData &data, const vd &signal) { return true; } -bool StreamMgr::outCallback(DaqData &data) { +void StreamMgr::outCallback(DaqData &data) { /* DEBUGTRACE_ENTER; */ @@ -216,7 +230,6 @@ bool StreamMgr::outCallback(DaqData &data) { // Set all values to 0. std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0); } - return true; } StreamMgr::~StreamMgr() { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index bb60cd0..80494b0 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -183,17 +183,17 @@ class StreamMgr { /** * @brief Set active signal generator for output streams. Only one `Siggen' * is active at the same time. Siggen controls its own data race protection - * using a mutex. + * using a mutex. If no Siggen is there, and an output stream is running, it + * will send a default signal of 0. * * @param s New Siggen pointer */ void setSiggen(std::shared_ptr s); private: - bool inCallback(const DaqData &data); - bool outCallback(DaqData &data); + void inCallback(const DaqData &data); + void outCallback(DaqData &data); - void removeInDataHandler(InDataHandler &handler); /** * @brief Add an input data handler. The handler's inCallback() function is @@ -205,6 +205,12 @@ private: */ void addInDataHandler(InDataHandler *handler); + /** + * @brief Remove InDataHandler from the list. + * + * @param handler + */ + void removeInDataHandler(InDataHandler &handler); /** * @brief Do the actual rescanning. * @@ -213,7 +219,7 @@ private: void rescanDAQDevices_impl(std::function callback); #if LASP_DEBUG == 1 - std::thread::id _main_thread_id; + const std::thread::id main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index 0524cc4..04c21e5 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -7,6 +7,12 @@ #include "lasp_uldaq_impl.h" #include +/** + * @brief The maximum number of devices that can be enumerated when calling + * ulGetDaqDeviceInventory() + */ +const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; + void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DEBUGTRACE_ENTER; diff --git a/src/lasp/device/lasp_uldaq.h b/src/lasp/device/lasp_uldaq.h index 26e2305..0b54961 100644 --- a/src/lasp/device/lasp_uldaq.h +++ b/src/lasp/device/lasp_uldaq.h @@ -1,18 +1,24 @@ #pragma once #include "lasp_daq.h" -/** - * @brief The maximum number of devices that can be enumerated when calling - * ulGetDaqDeviceInventory() +/** \addtogroup device + * \defgroup uldaq UlDAQ specific code + * This code is used to interface with UlDAQ compatible devices. It is only + * tested on Linux. + * @{ + * \addtogroup uldaq + * @{ */ -const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, const DaqConfiguration &config); /** - * @brief Fill device info list with UlDaq specific devices, if any. + * @brief Append device info list with UlDaq specific devices, if any. * * @param devinfolist Info list to append to. */ void fillUlDaqDeviceInfo(DeviceInfoList& devinfolist); + +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp index 7cc41c9..1eb9389 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp @@ -134,13 +134,13 @@ bool InBufHandler::operator()() { if (transferStatus.currentIndex < (long long)buffer_mid_idx) { topenqueued = false; if (!botenqueued) { - ret = runCallback(nchannels * nFramesPerBlock); + runCallback(nchannels * nFramesPerBlock); botenqueued = true; } } else { botenqueued = false; if (!topenqueued) { - ret = runCallback(0); + runCallback(0); topenqueued = true; } } @@ -214,8 +214,8 @@ bool OutBufHandler::operator()() { if (!botenqueued) { DaqData d(nFramesPerBlock, 1,// Only one output channel dtype_descr.dtype); - // Receive data - res = cb(d); + // Receive data, run callback + cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); botenqueued = true; @@ -226,7 +226,7 @@ bool OutBufHandler::operator()() { DaqData d(nFramesPerBlock, 1,// Only one output channel dtype_descr.dtype); // Receive - res = cb(d); + cb(d); d.copyToRaw(0, reinterpret_cast(&(buf[0]))); topenqueued = true; diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h index 05ca6ae..f1ea849 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h @@ -5,11 +5,16 @@ #include "lasp_uldaq_common.h" -class DT9837A; +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ /** * @brief Helper class for managing input and output samples of the DAQ device. */ + +class DT9837A; class BufHandler { protected: /** @@ -28,6 +33,9 @@ protected: * @brief Sampling frequency in Hz */ double samplerate; + /** + * @brief Storage capacity for the DAQ I/O. + */ std::vector buf; /** * @brief Whether the top part of the buffer is enqueued @@ -97,3 +105,5 @@ public: ~OutBufHandler(); }; +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.h b/src/lasp/device/uldaq/lasp_uldaq_common.h index 371974c..f9ac26a 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_common.h +++ b/src/lasp/device/uldaq/lasp_uldaq_common.h @@ -3,6 +3,10 @@ #include #include "lasp_deviceinfo.h" +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ /** * @brief Throws an appropriate stream exception based on the UlError number. * The mapping is based on the error numbers as given in uldaq.h. There are a @@ -58,3 +62,5 @@ const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, 47250, 48000, 50000, 50400, 51000}; +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp index 32419dc..86b00f5 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp @@ -91,15 +91,15 @@ void DT9837A::stop() { StreamStatus status = _streamStatus; status.isRunning = true; _streamStatus = status; - /* if (!isRunning()) { */ - /* throw rte("No data acquisition running"); */ - /* } */ + if (!isRunning()) { + throw rte("No data acquisition running"); + } // Stop the thread and join it - /* _stopThread = true; */ - /* assert(_thread.joinable()); */ - /* _thread.join(); */ - /* _stopThread = false; */ + _stopThread = true; + assert(_thread.joinable()); + _thread.join(); + _stopThread = false; // Update stream status status.isRunning = false; @@ -120,8 +120,8 @@ void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { throw rte("DAQ requires a callback for output data"); } assert(neninchannels() + nenoutchannels() > 0); - /* _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); - */ + _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); + } void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.h b/src/lasp/device/uldaq/lasp_uldaq_impl.h index aa170d0..648e121 100644 --- a/src/lasp/device/uldaq/lasp_uldaq_impl.h +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.h @@ -18,7 +18,14 @@ using rte = std::runtime_error; class InBufHandler; class OutBufHandler; +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ +/** + * @brief Data translation DT9837A Daq device. + */ class DT9837A : public Daq { DaqDeviceHandle _handle = 0; @@ -71,6 +78,11 @@ public: virtual ~DT9837A(); + /** + * @brief Returns true when the stream is running + * + * @return as above stated + */ bool isRunning() const; /** @@ -94,3 +106,5 @@ public: } }; +/** @} */ +/** @} */ diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index fe37945..0be004c 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -18,7 +18,7 @@ ClipHandler::ClipHandler(SmgrHandle mgr) startThread(); } -bool ClipHandler::inCallback_threaded(const DaqData &d) { +void ClipHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -52,7 +52,6 @@ bool ClipHandler::inCallback_threaded(const DaqData &d) { _clip_time(i) += _dt; } } - return true; } arma::uvec ClipHandler::getCurrentValue() const { diff --git a/src/lasp/dsp/lasp_clip.h b/src/lasp/dsp/lasp_clip.h index ec2dfd5..2294558 100644 --- a/src/lasp/dsp/lasp_clip.h +++ b/src/lasp/dsp/lasp_clip.h @@ -21,7 +21,7 @@ /** * @brief Clipping detector (Clip). Detects when a signal overdrives the input * */ -class ClipHandler: public ThreadedInDataHandler { +class ClipHandler: public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -68,8 +68,8 @@ class ClipHandler: public ThreadedInDataHandler { */ arma::uvec getCurrentValue() const; - bool inCallback_threaded(const DaqData& ) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& ); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 4cb185d..aa92f6b 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -12,13 +12,13 @@ using Lck = std::scoped_lock; using rte = std::runtime_error; PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps) - : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { + : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; startThread(); } -bool PPMHandler::inCallback_threaded(const DaqData &d) { +void PPMHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -64,12 +64,11 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { _cur_max(i) *= _alpha; } } - return true; } std::tuple PPMHandler::getCurrentValue() const { - DEBUGTRACE_ENTER; + /* DEBUGTRACE_ENTER; */ Lck lck(_mtx); arma::uvec clips(_clip_time.size(), arma::fill::zeros); @@ -85,9 +84,11 @@ void PPMHandler::reset(const Daq *daq) { if (daq) { + DEBUGTRACE_PRINT("New daq found"); _cur_max.fill(1e-80); const us nchannels = daq->neninchannels(); + DEBUGTRACE_PRINT(nchannels); _max_range.resize(nchannels); dvec ranges = daq->inputRangeForEnabledChannels(); diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index aecf02d..b30ecc6 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -4,7 +4,6 @@ // // Description: Peak Programme Meter #pragma once -#include #include "lasp_filter.h" #include "lasp_mathtypes.h" #include "lasp_threadedindatahandler.h" @@ -23,7 +22,7 @@ * with a certain amount of dB/s. If a new peak is found, it goes up again. * Also detects clipping. * */ -class PPMHandler: public ThreadedInDataHandler { +class PPMHandler : public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -69,7 +68,7 @@ class PPMHandler: public ThreadedInDataHandler { /** * @brief Constructs Peak Programme Meter * - * @param mgr Stream Mgr to operate on + * @param mgr Stream Mgr to install callbacks for * @param decay_dBps The level decay in units dB/s, after a peak has been * hit. */ @@ -91,8 +90,8 @@ class PPMHandler: public ThreadedInDataHandler { * * @return true when stream should continue. */ - bool inCallback_threaded(const DaqData& d) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& d); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index 9e66528..96d9802 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -25,7 +25,7 @@ RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, RtAps::~RtAps() { stopThread(); } -bool RtAps::inCallback_threaded(const DaqData &data) { +void RtAps::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -35,7 +35,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) { const us nchannels = fltdata.n_cols; if(nchannels != _sens.size()) { cerr << "**** Error: sensitivity size does not match! *****" << endl; - return false; + return; } fltdata.each_row() %= _sens.as_row(); @@ -63,7 +63,6 @@ bool RtAps::inCallback_threaded(const DaqData &data) { _ps.compute(fltdata); - return true; } void RtAps::reset(const Daq *daq) { diff --git a/src/lasp/dsp/lasp_rtaps.h b/src/lasp/dsp/lasp_rtaps.h index 006ce1d..b55240f 100644 --- a/src/lasp/dsp/lasp_rtaps.h +++ b/src/lasp/dsp/lasp_rtaps.h @@ -23,7 +23,7 @@ * @brief Real time spectral estimator using Welch method of spectral * estimation. */ -class RtAps : public ThreadedInDataHandler { +class RtAps : public ThreadedInDataHandler { std::unique_ptr _filterPrototype; std::vector> _freqWeightingFilters; @@ -69,8 +69,8 @@ public: * * @return true if stream should continue. */ - bool inCallback_threaded(const DaqData & d) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData & d); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index d6d06d5..bbef3ad 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -27,7 +27,7 @@ RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, startThread(); } -bool RtSignalViewer::inCallback_threaded(const DaqData &data) { +void RtSignalViewer::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -52,8 +52,6 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) { _dat(_resolution-1, 1) = newmin; _dat(_resolution-1, 2) = newmax; } - - return true; } RtSignalViewer::~RtSignalViewer() { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.h b/src/lasp/dsp/lasp_rtsignalviewer.h index 0a7676d..253a934 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.h +++ b/src/lasp/dsp/lasp_rtsignalviewer.h @@ -24,7 +24,7 @@ * @brief Real time signal viewer. Shows envelope of the signal based on amount * of history shown. */ -class RtSignalViewer : public ThreadedInDataHandler { +class RtSignalViewer : public ThreadedInDataHandler { /** * @brief Storage for sensitivity values @@ -85,8 +85,8 @@ public: */ dmat getCurrentValue() const; - bool inCallback_threaded(const DaqData &) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData &); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 3240787..bc60b86 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -13,6 +13,7 @@ using lck = std::scoped_lock; using rte = std::runtime_error; using std::cerr; using std::endl; +using std::placeholders::_1; class SafeQueue { std::queue _queue; @@ -49,47 +50,50 @@ public: bool empty() const { return _contents == 0; } }; -ThreadedInDataHandler::ThreadedInDataHandler(SmgrHandle mgr) - : InDataHandler(mgr), _queue(std::make_unique()) { +ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, + InCallbackType cb, + InResetType reset) + : _indatahandler( + mgr, + std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, + _1), + reset), + _queue(std::make_unique()), inCallback(cb) { DEBUGTRACE_ENTER; // Initialize thread pool, if not already done getPool(); } -void ThreadedInDataHandler::startThread() { +void ThreadedInDataHandlerBase::startThread() { DEBUGTRACE_ENTER; _thread_can_safely_run = true; - start(); + _indatahandler.start(); } -bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { +void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( + const DaqData &daqdata) { DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); // Early return in case object is under DESTRUCTION if (!_thread_can_safely_run) - return true; - - if (!_lastCallbackResult) { - return false; - } + return; _queue->push(daqdata); - if (!_thread_running && _lastCallbackResult) { + if (!_thread_running) { auto &pool = getPool(); DEBUGTRACE_PRINT("Pushing new thread in pool"); _thread_running = true; - pool.push_task(&ThreadedInDataHandler::threadFcn, this); + pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); } - - return _lastCallbackResult; } -void ThreadedInDataHandler::stopThread() { + +void ThreadedInDataHandlerBase::stopThread() { DEBUGTRACE_ENTER; // Make sure inCallback is no longer called _thread_can_safely_run = false; - stop(); + _indatahandler.stop(); std::scoped_lock lck(_mtx); @@ -99,7 +103,7 @@ void ThreadedInDataHandler::stopThread() { } } -ThreadedInDataHandler::~ThreadedInDataHandler() { +ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { DEBUGTRACE_ENTER; if (_thread_can_safely_run) { @@ -111,18 +115,14 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { } } -void ThreadedInDataHandler::threadFcn() { +void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded - if (!inCallback_threaded(_queue->pop())) { - cerr << "*********** Callback result returned false! *************" - << endl; - _lastCallbackResult = false; - } + inCallback(_queue->pop()); } _thread_running = false; } diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 12e162b..0569205 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -1,9 +1,11 @@ #pragma once +#include "debugtrace.hpp" #include "lasp_indatahandler.h" #include #include #include +using std::placeholders::_1; const us RINGBUFFER_SIZE = 1024; /** @@ -16,23 +18,45 @@ const us RINGBUFFER_SIZE = 1024; class SafeQueue; /** - * @brief Threaded in data handler. Buffers inCallback data and calls a - * callback with the same signature on a different thread. + * @brief Threaded in data handler base. Buffers inCallback data and calls a + * callback with the same signature on a different thread. The main function of + * this is to offload the thread that handles the stream, such that expensive + * computations do not result in stream buffer xruns. */ -class ThreadedInDataHandler : protected InDataHandler { +class ThreadedInDataHandlerBase { /** * @brief The queue used to push elements to the handling thread. */ + + InDataHandler _indatahandler; std::unique_ptr _queue; + mutable std::recursive_mutex _mtx; std::atomic _thread_running{false}; - std::atomic _lastCallbackResult{true}; std::atomic _thread_can_safely_run{false}; + /** + * @brief Function pointer that is called when new DaqData arrives. + */ + const InCallbackType inCallback; + void threadFcn(); -protected: + + /** + * @brief Pushes a copy of the daqdata to the thread queue and returns. + * Adds a thread to handle the queue, whihc will call inCallback(); + * + * @param daqdata the daq info to push + * + * @return true, to continue with sampling. + */ + void _inCallbackFromInDataHandler(const DaqData &daqdata); + + public: + ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset); + ~ThreadedInDataHandlerBase(); /** * @brief This method should be called from the derived class' constructor, * to start the thread and data is incoming. @@ -46,33 +70,44 @@ protected: */ void stopThread(); -public: - /** - * @brief Initialize a ThreadedInDataHandler - * - * @param mgr StreamMgr singleton reference - */ - ThreadedInDataHandler(SmgrHandle mgr); - virtual ~ThreadedInDataHandler(); +}; - /** - * @brief Pushes a copy of the daqdata to the thread queue and returns - * - * @param daqdata the daq info to push - * - * @return true, to continue with sampling. - */ - virtual bool inCallback(const DaqData &daqdata) override final; +/** + * @brief A bit of curiously recurring template pattern, to connect the + * specific handlers and connect the proper callbacks in a type-agnostic way. + * Using this class, each threaded handler should just implement its reset() + * and inCallback() method. Ellides the virtual method calls. + * + * Usage: class XHandler: public ThreadedInDataHandler { + * public: + * XHandler(streammgr) : ThreadedInDataHandler(streammgr) {} + * void inCallback(const DaqData& d) { ... do something with d } + * void reset(const Daq* daq) { ... do something with daq } + * }; + * + * For examples, see PPMHandler, etc. + * + * @tparam Derived The + */ +template +class ThreadedInDataHandler : public ThreadedInDataHandlerBase { + public: + ThreadedInDataHandler(SmgrHandle mgr): + ThreadedInDataHandlerBase(mgr, + std::bind(&ThreadedInDataHandler::_inCallback, this, _1), + std::bind(&ThreadedInDataHandler::_reset, this, _1)) + { - /** - * @brief This function should be overridden with an actual implementation, - * of what should happen on a different thread. - * - * @param d Input daq data - * - * @return true on succes. False when an error occured. - */ - virtual bool inCallback_threaded(const DaqData &d) = 0; + } + + void _reset(const Daq* daq) { + DEBUGTRACE_ENTER; + return static_cast(this)->reset(daq); + } + void _inCallback(const DaqData& data) { + DEBUGTRACE_ENTER; + return static_cast(this)->inCallback(data); + } }; /** @} */ diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index b13f0fd..cf4406b 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -96,17 +96,26 @@ py::array_t dmat_to_ndarray(const DaqData &d) { } /** - * @brief Wraps the InDataHandler such that it calls a Python callback with a - * buffer of sample data. The Python callback is called from a different - * thread, using a Numpy array as argument. + * @brief Wraps the ThreadedInDataHandler such that it calls a Python callback with a + * buffer of sample data. Converts DaqData objects to Numpy arrays and calls + * Python given as argument to the constructor */ -class PyIndataHandler : public ThreadedInDataHandler { +class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ py::function cb, reset_callback; public: + /** + * @brief Initialize PyIndataHandler + * + * @param mgr StreamMgr handle + * @param cb Python callback that is called with Numpy input data from device + * @param reset_callback Python callback that is called with a Daq pointer. + * Careful: do not store this handle, as it is only valid as long as reset() + * is called, when a stream stops, this pointer / handle will dangle. + */ PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { @@ -119,7 +128,12 @@ public: DEBUGTRACE_ENTER; stopThread(); } - void reset(const Daq *daq) override final { + /** + * @brief Calls the reset callback in Python. + * + * @param daq Daq device, or nullptr in case no input stream is running. + */ + void reset(const Daq *daq) { DEBUGTRACE_ENTER; py::gil_scoped_acquire acquire; try { @@ -140,9 +154,10 @@ public: } /** - * @brief Reads from the buffer + * @brief Calls the Python callback method / function with a Numpy array of + * stream data. */ - bool inCallback_threaded(const DaqData &d) override final { + void inCallback(const DaqData &d) { /* DEBUGTRACE_ENTER; */ @@ -172,18 +187,15 @@ public: } // End of switch bool res = bool_val.cast(); - if (!res) - return false; } catch (py::error_already_set &e) { cerr << "ERROR: Python raised exception from callback function: "; cerr << e.what() << endl; - return false; + abort(); } catch (py::cast_error &e) { cerr << e.what() << endl; cerr << "ERROR: Python callback does not return boolean value." << endl; - return false; + abort(); } - return true; } };