diff --git a/src/lasp/device/CMakeLists.txt b/src/lasp/device/CMakeLists.txt index e576d23..0792341 100644 --- a/src/lasp/device/CMakeLists.txt +++ b/src/lasp/device/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(lasp_device_lib OBJECT lasp_deviceinfo.cpp lasp_rtaudiodaq.cpp lasp_streammgr.cpp + lasp_indatahandler.cpp lasp_uldaq.cpp lasp_uldaq_impl.cpp ) diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index dbc7392..3fa35ff 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -5,6 +5,8 @@ #include "lasp_types.h" #include #include +#include +#include /** * \defgroup device Device interfacing diff --git a/src/lasp/device/lasp_deviceinfo.h b/src/lasp/device/lasp_deviceinfo.h index 81772ed..7d3d8ad 100644 --- a/src/lasp/device/lasp_deviceinfo.h +++ b/src/lasp/device/lasp_deviceinfo.h @@ -19,6 +19,7 @@ public: * @brief Virtual desctructor. Can be derived class. */ virtual ~DeviceInfo() {} + DeviceInfo& operator=(const DeviceInfo&) = delete; /** * @brief Clone a device info. diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp new file mode 100644 index 0000000..6afce6c --- /dev/null +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -0,0 +1,39 @@ +/* #define DEBUGTRACE_ENABLED */ +#include +#include "debugtrace.hpp" +#include "lasp_indatahandler.h" +#include "lasp_streammgr.h" + +InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) +#if LASP_DEBUG==1 + , _main_thread_id(std::this_thread::get_id()) +#endif +{ DEBUGTRACE_ENTER; } +void InDataHandler::start() { + DEBUGTRACE_ENTER; + _mgr.addInDataHandler(*this); + +#if LASP_DEBUG == 1 + assert(_mgr._main_thread_id == _main_thread_id); +#endif +} +void InDataHandler::stop() { +#if LASP_DEBUG == 1 + stopCalled = true; +#endif + _mgr.removeInDataHandler(*this); +} + +InDataHandler::~InDataHandler() { + + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + if (!stopCalled) { + std::cerr << "************ BUG: Stop function not called while arriving at " + "InDataHandler's destructor. Fix this by calling " + "InDataHandler::stop() from the derived class' destructor." + << std::endl; + abort(); + } +#endif +} diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h new file mode 100644 index 0000000..5008b68 --- /dev/null +++ b/src/lasp/device/lasp_indatahandler.h @@ -0,0 +1,76 @@ +#pragma once +#include +#include +#include "lasp_types.h" + +/** \addtogroup device + * @{ + */ +class StreamMgr; +class DaqData; +class Daq; +class InDataHandler { + +protected: + StreamMgr &_mgr; +#if LASP_DEBUG == 1 + // This is a flag to indicate whether the method stop() is called for the + // current handler. It should call the method stop() from the derived class's + // destructor. + std::atomic stopCalled{false}; +#endif + +public: + virtual ~InDataHandler(); + + /** + * @brief When constructed, the handler is added to the stream manager, which + * will call the handlers's inCallback() until stop() is called. + * + * @param mgr Stream manager. + */ + InDataHandler(StreamMgr &mgr); + + /** + * @brief This function is called when input data from a DAQ is available. + * + * @param daqdata Input data from DAQ + * + * @return true if no error. False to stop the stream from running. + */ + virtual bool inCallback(const DaqData &daqdata) = 0; + + /** + * @brief Reset in-data handler. + * + * @param daq New DAQ configuration of inCallback(). If nullptr is given, + * it means that the stream is stopped. + */ + virtual void reset(const Daq *daq = nullptr) = 0; + + /** + * @brief This function should be called from the constructor of the + * implementation of InDataHandler, that one than also implements + * `inCallback`. It will start the stream's calling of inCallback(). + */ + void start(); + + /** + * @brief This function should be called from the destructor of derived + * classes, to disable the calls to inCallback(), such that proper + * destruction of the object is allowed and no other threads call methods + * from the object. It removes the inCallback() from the callback list of the + * StreamMgr(). **Failing to call this function results in deadlocks, errors + * like "pure virtual function called", or other**. + */ + void stop(); + +private: +#if LASP_DEBUG == 1 + const std::thread::id _main_thread_id; + void checkRightThread() const; +#else + void checkRightThread() const {} +#endif +}; +/** @} */ diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index 42adf4c..19af4bc 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -1,8 +1,9 @@ /* #define DEBUGTRACE_ENABLED */ -#include "lasp_streammgr.h" #include "debugtrace.hpp" +#include "lasp_streammgr.h" #include "lasp_biquadbank.h" #include "lasp_thread.h" +#include "lasp_indatahandler.h" #include #include #include @@ -12,30 +13,6 @@ using std::cerr; using std::endl; using rte = std::runtime_error; -InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) { DEBUGTRACE_ENTER; } -void InDataHandler::start() { - DEBUGTRACE_ENTER; - _mgr.addInDataHandler(*this); -} -void InDataHandler::stop() { -#if LASP_DEBUG == 1 - stopCalled = true; -#endif - _mgr.removeInDataHandler(*this); -} -InDataHandler::~InDataHandler() { - - DEBUGTRACE_ENTER; -#if LASP_DEBUG == 1 - if (!stopCalled) { - cerr << "************ BUG: Stop function not called while arriving at " - "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." - << endl; - abort(); - } -#endif -} StreamMgr &StreamMgr::getInstance() { @@ -43,6 +20,7 @@ StreamMgr &StreamMgr::getInstance() { static StreamMgr mgr; return mgr; } + StreamMgr::StreamMgr() { DEBUGTRACE_ENTER; #if LASP_DEBUG == 1 @@ -63,16 +41,16 @@ void StreamMgr::rescanDAQDevices(bool background, auto &pool = getPool(); checkRightThread(); + if (_inputStream || _outputStream) { + throw rte("Rescanning DAQ devices only possible when no stream is running"); + } if (!_devices_mtx.try_lock()) { throw rte("A background DAQ device scan is probably already running"); } _devices_mtx.unlock(); - if (_inputStream || _outputStream) { - throw rte("Rescanning DAQ devices only possible when no stream is running"); - } + std::scoped_lock lck(_devices_mtx); _devices.clear(); - /* auto &pool = getPool(); */ if (!background) { rescanDAQDevices_impl(callback); } else { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index fc69222..665ea3a 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -10,60 +10,8 @@ * @{ */ class StreamMgr; +class InDataHandler; -class InDataHandler { - -protected: - StreamMgr &_mgr; -#if LASP_DEBUG == 1 - std::atomic stopCalled{false}; -#endif - -public: - virtual ~InDataHandler(); - - /** - * @brief When constructed, the handler is added to the stream manager, which - * will call the handlers's inCallback() until stop() is called. - * - * @param mgr Stream manager. - */ - InDataHandler(StreamMgr &mgr); - - /** - * @brief This function is called when input data from a DAQ is available. - * - * @param daqdata Input data from DAQ - * - * @return true if no error. False to stop the stream from running. - */ - virtual bool inCallback(const DaqData &daqdata) = 0; - - /** - * @brief Reset in-data handler. - * - * @param daq New DAQ configuration of inCallback(). If nullptr is given, - * it means that the stream is stopped. - */ - virtual void reset(const Daq *daq = nullptr) = 0; - - /** - * @brief This function should be called from the constructor of the - * implementation of InDataHandler. It will start the stream's calling of - * inCallback(). - */ - void start(); - - /** - * @brief This function should be called from the destructor of derived - * classes, to disable the calls to inCallback(), such that proper - * destruction of the object is allowed and no other threads call methods - * from the object. It removes the inCallback() from the callback list of the - * StreamMgr(). **Failing to call this function results in deadlocks, errors - * like "pure virtual function called", or other**. - */ - void stop(); -}; class SeriesBiquad; @@ -76,9 +24,6 @@ class SeriesBiquad; * fact is asserted. */ class StreamMgr { -#if LASP_DEBUG == 1 - std::thread::id _main_thread_id; -#endif /** * @brief Storage for streams. @@ -91,22 +36,25 @@ class StreamMgr { * thread-safety. */ std::list _inDataHandlers; - std::mutex _inDataHandler_mtx; + mutable std::mutex _inDataHandler_mtx; /** * @brief Signal generator in use to generate output data. Currently * implemented as to generate the same data for all output channels. */ std::shared_ptr _siggen; + std::mutex _siggen_mtx; /** * @brief Filters on input stream. For example, a digital high pass filter. */ std::vector> _inputFilters; - std::mutex _siggen_mtx; - std::mutex _devices_mtx; + mutable std::recursive_mutex _devices_mtx; + /** + * @brief Current storage for the device list + */ DeviceInfoList _devices; StreamMgr(); @@ -146,7 +94,7 @@ class StreamMgr { * @return A copy of the internal stored list of devices */ DeviceInfoList getDeviceInfo() const { - std::scoped_lock lck(const_cast(_devices_mtx)); + std::scoped_lock lck(_devices_mtx); DeviceInfoList d2; for(const auto& dev: _devices) { d2.push_back(dev->clone()); @@ -157,7 +105,8 @@ class StreamMgr { /** * @brief Triggers a background scan of the DAQ devices, which updates the * internally stored list of devices. Throws a runtime error when a - * background thread is already scanning for devices. + * background thread is already scanning for devices, or if a stream is + * running. * * @param background Perform searching for DAQ devices in the background. If * set to true, the function returns immediately. @@ -261,6 +210,7 @@ private: void rescanDAQDevices_impl(std::function callback); #if LASP_DEBUG == 1 + std::thread::id _main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index a3b8c8f..fcf14e3 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -1,6 +1,8 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_clip.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include using std::cerr; @@ -13,6 +15,7 @@ ClipHandler::ClipHandler(StreamMgr &mgr) : ThreadedInDataHandler(mgr){ DEBUGTRACE_ENTER; + startThread(); } bool ClipHandler::inCallback_threaded(const DaqData &d) { @@ -89,6 +92,5 @@ void ClipHandler::reset(const Daq *daq) { ClipHandler::~ClipHandler() { DEBUGTRACE_ENTER; - Lck lck(_mtx); - stop(); + stopThread(); } diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 4e9c640..0afb46b 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -1,6 +1,8 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_ppm.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include using std::cerr; @@ -13,6 +15,7 @@ PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; + startThread(); } bool PPMHandler::inCallback_threaded(const DaqData &d) { @@ -106,6 +109,5 @@ void PPMHandler::reset(const Daq *daq) { PPMHandler::~PPMHandler() { DEBUGTRACE_ENTER; - Lck lck(_mtx); - stop(); + stopThread(); } diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index 1e7d2cb..f39e03e 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -1,5 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_rtaps.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include "debugtrace.hpp" #include @@ -18,10 +20,10 @@ RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, _filterPrototype = freqWeightingFilter->clone(); } + startThread(); } RtAps::~RtAps() { - Lck lck(_ps_mtx); - stop(); + stopThread(); } bool RtAps::inCallback_threaded(const DaqData &data) { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index fa16ca6..69af040 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -1,5 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include "lasp_rtsignalviewer.h" #include #include @@ -22,6 +24,7 @@ RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, if (resolution <= 1) { throw rte("Invalid resolution. Should be > 1"); } + startThread(); } bool RtSignalViewer::inCallback_threaded(const DaqData &data) { @@ -54,8 +57,7 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) { } RtSignalViewer::~RtSignalViewer() { - Lck lck(_sv_mtx); - stop(); + stopThread(); } void RtSignalViewer::reset(const Daq *daq) { diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 9223720..84ee117 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -1,11 +1,12 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_threadedindatahandler.h" #include "debugtrace.hpp" +#include "lasp_daqdata.h" #include "lasp_thread.h" #include -#include -#include #include +#include +#include using namespace std::literals::chrono_literals; using lck = std::scoped_lock; @@ -16,9 +17,10 @@ using std::endl; class SafeQueue { std::queue _queue; std::mutex _mtx; - std::atomic_int32_t _contents {0}; - public: - void push(const DaqData& d) { + std::atomic_int32_t _contents{0}; + +public: + void push(const DaqData &d) { DEBUGTRACE_ENTER; lck lock(_mtx); _queue.push(d); @@ -48,24 +50,32 @@ class SafeQueue { }; ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) - : InDataHandler(mgr), _queue(std::make_unique()) { + : InDataHandler(mgr), _queue(std::make_unique()) { - DEBUGTRACE_ENTER; + DEBUGTRACE_ENTER; - // Initialize thread pool, if not already done - getPool(); - } + // Initialize thread pool, if not already done + getPool(); +} +void ThreadedInDataHandler::startThread() { + _thread_can_safely_run = true; + start(); +} bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { DEBUGTRACE_ENTER; + std::scoped_lock lck(_mtx); + + // Early return in case object is under DESTRUCTION + if (!_thread_can_safely_run) + return true; if (!_lastCallbackResult) { return false; } _queue->push(daqdata); - - if (!_thread_running && (!_stopThread) && _lastCallbackResult) { + if (!_thread_running && _lastCallbackResult) { auto &pool = getPool(); DEBUGTRACE_PRINT("Pushing new thread in pool"); _thread_running = true; @@ -74,11 +84,13 @@ bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { return _lastCallbackResult; } +void ThreadedInDataHandler::stopThread() { -ThreadedInDataHandler::~ThreadedInDataHandler() { + // Make sure inCallback is no longer called + _thread_can_safely_run = false; + stop(); - DEBUGTRACE_ENTER; - _stopThread = true; + std::scoped_lock lck(_mtx); // Then wait in steps for the thread to stop running. while (_thread_running) { @@ -86,16 +98,28 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { } } +ThreadedInDataHandler::~ThreadedInDataHandler() { + + DEBUGTRACE_ENTER; + if (_thread_can_safely_run) { + stopThread(); + cerr << "*** BUG: InDataHandlers have not been all stopped, while " + "StreamMgr destructor is called. This is a misuse BUG." + << endl; + abort(); + } +} + void ThreadedInDataHandler::threadFcn() { DEBUGTRACE_ENTER; - while(!_queue->empty() && !_stopThread) { + while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded if (!inCallback_threaded(_queue->pop())) { cerr << "*********** Callback result returned false! *************" - << endl; + << endl; _lastCallbackResult = false; } } diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 20f5df2..c447ce0 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -1,9 +1,11 @@ #pragma once -#include "lasp_streammgr.h" +#include "lasp_indatahandler.h" +#include +#include +#include const us RINGBUFFER_SIZE = 1024; - /** * \addtogroup dsp * @{ @@ -17,25 +19,40 @@ class SafeQueue; * @brief Threaded in data handler. Buffers inCallback data and calls a * callback with the same signature on a different thread. */ -class ThreadedInDataHandler: public InDataHandler { +class ThreadedInDataHandler : protected InDataHandler { /** * @brief The queue used to push elements to the handling thread. */ std::unique_ptr _queue; + mutable std::recursive_mutex _mtx; std::atomic _thread_running{false}; - std::atomic _stopThread{false}; std::atomic _lastCallbackResult{true}; + std::atomic _thread_can_safely_run{false}; void threadFcn(); - public: +protected: + /** + * @brief This method should be called from the derived class' constructor, + * to start the thread and data is incoming. + */ + void startThread(); + /** + * @brief This method SHOULD be called from all classes that derive on + * ThreadedInDataHandler. It is to make sure the inCallback_threaded() + * function is no longer called when the destructor of the derived class is + * called. Not calling this function is regarded as a BUG. + */ + void stopThread(); + +public: /** * @brief Initialize a ThreadedInDataHandler * * @param mgr StreamMgr singleton reference */ - ThreadedInDataHandler(StreamMgr& mgr); + ThreadedInDataHandler(StreamMgr &mgr); ~ThreadedInDataHandler(); /** @@ -55,10 +72,8 @@ class ThreadedInDataHandler: public InDataHandler { * * @return true on succes. False when an error occured. */ - virtual bool inCallback_threaded(const DaqData& d) = 0; - + virtual bool inCallback_threaded(const DaqData &d) = 0; }; - /** @} */ /** @} */ diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 243a687..307f2cb 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -19,7 +19,9 @@ class RecordStatus: class Recording: """ - Class used to perform a recording. + Class used to perform a recording. Recording data can come in from a + different thread, that is supposed to call the `inCallback` method, with + audio data as an argument. """ def __init__( @@ -99,7 +101,6 @@ class Recording: logging.debug("Starting record....") self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) - self.indh.start() if wait: logging.debug("Stop recording with CTRL-C") diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index d8639b3..be6d33f 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -5,8 +5,10 @@ #include "lasp_clip.h" #include "lasp_rtaps.h" #include "lasp_rtsignalviewer.h" -#include "lasp_streammgr.h" #include "lasp_threadedindatahandler.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" +#include "lasp_streammgr.h" #include #include #include @@ -111,11 +113,11 @@ public: DEBUGTRACE_ENTER; /// Start should be called externally, as at constructor time no virtual /// functions should be called. - /* start(); */ + startThread(); } ~PyIndataHandler() { DEBUGTRACE_ENTER; - stop(); + stopThread(); } void reset(const Daq *daq) override final { reset_callback(daq); } @@ -169,20 +171,13 @@ public: void init_datahandler(py::module &m) { - py::class_ idh(m, "InDataHandler_base"); - idh.def("start", &InDataHandler::start); - idh.def("stop", &InDataHandler::stop); - - py::class_ tidh( - m, "ThreadedInDataHandler"); - /// The C++ class is PyIndataHandler, but for Python, it is called /// InDataHandler - py::class_ pyidh(m, "InDataHandler"); + py::class_ pyidh(m, "InDataHandler"); pyidh.def(py::init()); /// Peak Programme Meter - py::class_ ppm(m, "PPMHandler"); + py::class_ ppm(m, "PPMHandler"); ppm.def(py::init()); ppm.def(py::init()); @@ -194,7 +189,7 @@ void init_datahandler(py::module &m) { }); /// Clip Detector - py::class_ clip(m, "ClipHandler"); + py::class_ clip(m, "ClipHandler"); clip.def(py::init()); clip.def("getCurrentValue", [](const ClipHandler &clip) { @@ -206,7 +201,7 @@ void init_datahandler(py::module &m) { /// Real time Aps /// - py::class_ rtaps(m, "RtAps"); + py::class_ rtaps(m, "RtAps"); rtaps.def(py::init rtsv(m, "RtSignalViewer"); + py::class_ rtsv(m, "RtSignalViewer"); rtsv.def(py::init #include #include