diff --git a/cpp_src/device/lasp_daq.cpp b/cpp_src/device/lasp_daq.cpp index bb161f3..8b65379 100644 --- a/cpp_src/device/lasp_daq.cpp +++ b/cpp_src/device/lasp_daq.cpp @@ -1,4 +1,4 @@ -/* #define DEBUGTRACE_ENABLED */ +// #define DEBUGTRACE_ENABLED #include "debugtrace.hpp" #include "lasp_daqconfig.h" diff --git a/cpp_src/device/lasp_indatahandler.cpp b/cpp_src/device/lasp_indatahandler.cpp index a6a0946..e184293 100644 --- a/cpp_src/device/lasp_indatahandler.cpp +++ b/cpp_src/device/lasp_indatahandler.cpp @@ -1,4 +1,4 @@ -/* #define DEBUGTRACE_ENABLED */ +// #define DEBUGTRACE_ENABLED #include "lasp_indatahandler.h" #include "debugtrace.hpp" #include "lasp_streammgr.h" @@ -29,12 +29,14 @@ void InDataHandler::start() { } void InDataHandler::stop() { DEBUGTRACE_ENTER; - checkRightThread(); + // checkRightThread(); #if LASP_DEBUG == 1 stopCalled = true; #endif - if (SmgrHandle handle = _mgr.lock()) { - handle->removeInDataHandler(*this); + if (SmgrHandle smgr = _mgr.lock()) { + smgr->removeInDataHandler(*this); + } else { + DEBUGTRACE_PRINT("No stream manager alive anymore!"); } } @@ -42,7 +44,7 @@ InDataHandler::~InDataHandler() { DEBUGTRACE_ENTER; #if LASP_DEBUG == 1 - checkRightThread(); + // checkRightThread(); if (!stopCalled) { std::cerr << "************ BUG: Stop function not called while arriving at " "InDataHandler's destructor. Fix this by calling " diff --git a/cpp_src/device/lasp_streammgr.cpp b/cpp_src/device/lasp_streammgr.cpp index af5f56b..b92f21b 100644 --- a/cpp_src/device/lasp_streammgr.cpp +++ b/cpp_src/device/lasp_streammgr.cpp @@ -1,7 +1,8 @@ -/* #define DEBUGTRACE_ENABLED */ +// #define DEBUGTRACE_ENABLED #include "lasp_streammgr.h" #include +#include #include #include @@ -14,6 +15,8 @@ #include "lasp_indatahandler.h" #include "lasp_thread.h" +using namespace std::literals::chrono_literals; + using std::cerr; using std::endl; using rte = std::runtime_error; @@ -27,7 +30,7 @@ using rte = std::runtime_error; std::weak_ptr _mgr; std::mutex _mgr_mutex; -using Lck = std::scoped_lock; +using Lck = std::scoped_lock; /** * @brief The only way to obtain a stream manager, can only be called from the @@ -38,11 +41,11 @@ using Lck = std::scoped_lock; SmgrHandle StreamMgr::getInstance() { DEBUGTRACE_ENTER; + std::scoped_lock lck(_mgr_mutex); auto mgr = _mgr.lock(); if (!mgr) { // Double Check Locking Pattern, if two threads would simultaneously // instantiate the singleton instance. - Lck lck(_mgr_mutex); auto mgr = _mgr.lock(); if (mgr) { @@ -72,7 +75,7 @@ StreamMgr::StreamMgr() { DEBUGTRACE_ENTER; // Trigger a scan for the available devices, in the background. - rescanDAQDevices(true); + rescanDAQDevices(false); } #if LASP_DEBUG == 1 void StreamMgr::checkRightThread() const { @@ -84,37 +87,39 @@ void StreamMgr::rescanDAQDevices(bool background, std::function callback) { DEBUGTRACE_ENTER; DEBUGTRACE_PRINT(background); + if(_scanningDevices) { + throw rte("A background device scan is already busy"); + } + Lck lck(_mtx); checkRightThread(); if (_inputStream || _outputStream) { throw rte("Rescanning DAQ devices only possible when no stream is running"); } - if (!_devices_mtx.try_lock()) { - throw rte("A background DAQ device scan is probably already running"); - } - _devices_mtx.unlock(); - std::scoped_lock lck(_devices_mtx); _devices.clear(); if (!background) { + _scanningDevices = true; rescanDAQDevices_impl(callback); } else { DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread..."); + _scanningDevices = true; _pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); } } void StreamMgr::rescanDAQDevices_impl(std::function callback) { DEBUGTRACE_ENTER; - std::scoped_lock lck(_devices_mtx); + Lck lck(_mtx); _devices = DeviceInfo::getDeviceInfo(); if (callback) { callback(); } + _scanningDevices = false; } void StreamMgr::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; - std::scoped_lock lck(_inDataHandler_mtx); + Lck lck(_mtx); assert(_inputFilters.size() == data.nchannels); @@ -139,12 +144,14 @@ void StreamMgr::inCallback(const DaqData &data) { } } + DEBUGTRACE_PRINT("Calling incallback for handlers (filtered)..."); for (auto &handler : _inDataHandlers) { handler->inCallback(input_filtered); } } else { /// No input filters + DEBUGTRACE_PRINT("Calling incallback for handlers..."); for (auto &handler : _inDataHandlers) { handler->inCallback(data); } @@ -155,8 +162,7 @@ void StreamMgr::setSiggen(std::shared_ptr siggen) { DEBUGTRACE_ENTER; checkRightThread(); - std::scoped_lock lck(_siggen_mtx); - + Lck lck(_mtx); // If not set to nullptr, and a stream is running, we update the signal // generator by resetting it. if (isStreamRunningOK(StreamType::output) && siggen) { @@ -213,9 +219,9 @@ bool fillData(DaqData &data, const vd &signal) { return true; } void StreamMgr::outCallback(DaqData &data) { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; - std::scoped_lock lck(_siggen_mtx); + Lck lck(_mtx); if (_siggen) { vd signal = _siggen->genSignal(data.nframes); @@ -244,7 +250,17 @@ void StreamMgr::outCallback(DaqData &data) { StreamMgr::~StreamMgr() { DEBUGTRACE_ENTER; - checkRightThread(); + while (_scanningDevices) { + std::this_thread::sleep_for(10us); + } + +#if LASP_DEBUG == 1 + { // Careful, this lock needs to be released to make sure the streams can + // obtain a lock to the stream manager. + Lck lck(_mtx); + checkRightThread(); + } +#endif // Stream manager now handled by shared pointer. Each indata handler gets a // shared pointer to the stream manager, and stores a weak pointer to it. // Hence, we do not have to do any cleanup here. It also makes sure that the @@ -260,6 +276,8 @@ StreamMgr::~StreamMgr() { } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; + // No lock here! + // Lck lck(_mtx); checkRightThread(); _inputStream.reset(); _outputStream.reset(); @@ -267,6 +285,10 @@ void StreamMgr::stopAllStreams() { void StreamMgr::startStream(const DaqConfiguration &config) { DEBUGTRACE_ENTER; + if(_scanningDevices) { + throw rte("DAQ device scan is busy. Cannot start stream."); + } + Lck lck(_mtx); checkRightThread(); bool isInput = std::count_if(config.inchannel_config.cbegin(), @@ -278,8 +300,6 @@ void StreamMgr::startStream(const DaqConfiguration &config) { [](auto &i) { return i.enabled; }) > 0; // Find the first device that matches with the configuration - std::scoped_lock lck(_devices_mtx); - DeviceInfo *devinfo = nullptr; // Match configuration to a device in the list of devices @@ -411,7 +431,9 @@ void StreamMgr::stopStream(const StreamType t) { } /// Kills input stream _inputStream.reset(); + /// Send reset to all in data handlers + Lck lck(_mtx); for (auto &handler : _inDataHandlers) { handler->reset(nullptr); } @@ -425,6 +447,7 @@ void StreamMgr::stopStream(const StreamType t) { if (!_outputStream) { throw rte("Output stream is not running"); } + Lck lck(_mtx); _outputStream.reset(); } // end else } @@ -432,9 +455,9 @@ void StreamMgr::stopStream(const StreamType t) { void StreamMgr::addInDataHandler(InDataHandler *handler) { DEBUGTRACE_ENTER; + Lck lck(_mtx); checkRightThread(); assert(handler); - std::scoped_lock lck(_inDataHandler_mtx); handler->reset(_inputStream.get()); if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) != @@ -449,16 +472,17 @@ void StreamMgr::addInDataHandler(InDataHandler *handler) { void StreamMgr::removeInDataHandler(InDataHandler &handler) { DEBUGTRACE_ENTER; - checkRightThread(); - std::scoped_lock lck(_inDataHandler_mtx); + Lck lck(_mtx); + // checkRightThread(); _inDataHandlers.remove(&handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); } Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; + Lck lck(_mtx); checkRightThread(); // Default constructor, says stream is not running, but also no errors @@ -471,6 +495,7 @@ Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { } const Daq *StreamMgr::getDaq(StreamType type) const { + Lck lck(_mtx); checkRightThread(); if (type == StreamType::input) { diff --git a/cpp_src/device/lasp_streammgr.h b/cpp_src/device/lasp_streammgr.h index d5e1e4c..9aff9fb 100644 --- a/cpp_src/device/lasp_streammgr.h +++ b/cpp_src/device/lasp_streammgr.h @@ -1,19 +1,19 @@ #pragma once -#include "lasp_daq.h" -#include "lasp_siggen.h" -#include "lasp_thread.h" #include #include #include #include +#include "lasp_daq.h" +#include "lasp_siggen.h" +#include "lasp_thread.h" + /** \addtogroup device * @{ */ class StreamMgr; class InDataHandler; - class SeriesBiquad; /** @@ -25,12 +25,15 @@ class SeriesBiquad; * fact is asserted. */ class StreamMgr { + mutable std::recursive_mutex _mtx; /** * @brief Storage for streams. */ std::unique_ptr _inputStream, _outputStream; + std::atomic _scanningDevices{false}; + GlobalThreadPool _pool; /** @@ -39,22 +42,18 @@ class StreamMgr { * thread-safety. */ std::list _inDataHandlers; - mutable std::mutex _inDataHandler_mtx; /** * @brief Signal generator in use to generate output data. Currently * implemented as to generate the same data for all output channels. */ std::shared_ptr _siggen; - std::mutex _siggen_mtx; /** * @brief Filters on input stream. For example, a digital high pass filter. */ std::vector> _inputFilters; - - mutable std::recursive_mutex _devices_mtx; /** * @brief Current storage for the device list */ @@ -67,9 +66,7 @@ class StreamMgr { friend class InDataHandler; friend class Siggen; - - public: - + public: ~StreamMgr(); enum class StreamType : us { @@ -100,9 +97,10 @@ class StreamMgr { * @return A copy of the internal stored list of devices */ DeviceInfoList getDeviceInfo() const { - std::scoped_lock lck(_devices_mtx); + std::scoped_lock lck(_mtx); DeviceInfoList d2; - for(const auto& dev: _devices) { + for (const auto &dev : _devices) { + assert(dev != nullptr); d2.push_back(dev->clone()); } return d2; @@ -118,9 +116,9 @@ class StreamMgr { * set to true, the function returns immediately. * @param callback Function to call when complete. */ - void - rescanDAQDevices(bool background = false, - std::function callback = std::function()); + void rescanDAQDevices( + bool background = false, + std::function callback = std::function()); /** * @brief Start a stream based on given configuration. @@ -141,12 +139,12 @@ class StreamMgr { } bool isStreamRunning(const StreamType type) const { switch (type) { - case (StreamType::input): - return bool(_inputStream); - break; - case (StreamType::output): - return bool(_outputStream); - break; + case (StreamType::input): + return bool(_inputStream); + break; + case (StreamType::output): + return bool(_outputStream); + break; } return false; } @@ -193,11 +191,10 @@ class StreamMgr { */ void setSiggen(std::shared_ptr s); -private: + private: void inCallback(const DaqData &data); void outCallback(DaqData &data); - /** * @brief Add an input data handler. The handler's inCallback() function is * called with data when available. This function should *NOT* be called by diff --git a/cpp_src/device/portaudio/lasp_portaudiodaq.cpp b/cpp_src/device/portaudio/lasp_portaudiodaq.cpp index 853bb0f..a724b91 100644 --- a/cpp_src/device/portaudio/lasp_portaudiodaq.cpp +++ b/cpp_src/device/portaudio/lasp_portaudiodaq.cpp @@ -1,4 +1,4 @@ -/* #define DEBUGTRACE_ENABLED */ +// #define DEBUGTRACE_ENABLED #include "debugtrace.hpp" #include "lasp_config.h" @@ -422,6 +422,7 @@ Daq::StreamStatus PortAudioDaq::getStreamStatus() const { } PortAudioDaq::~PortAudioDaq() { + DEBUGTRACE_ENTER; PaError err; assert(_stream); if (Pa_IsStreamActive(_stream)) { @@ -445,7 +446,7 @@ int PortAudioDaq::memberPaCallback(const void *inputBuffer, void *outputBuffer, unsigned long framesPerBuffer, const PaStreamCallbackTimeInfo *timeInfo, PaStreamCallbackFlags statusFlags) { - // DEBUGTRACE_ENTER; + DEBUGTRACE_ENTER; typedef Daq::StreamStatus::StreamError se; if (statusFlags & paPrimingOutput) { // Initial output buffers generated. So nothing with input yet diff --git a/cpp_src/dsp/lasp_threadedindatahandler.cpp b/cpp_src/dsp/lasp_threadedindatahandler.cpp index bf47d4b..4614e96 100644 --- a/cpp_src/dsp/lasp_threadedindatahandler.cpp +++ b/cpp_src/dsp/lasp_threadedindatahandler.cpp @@ -1,4 +1,4 @@ -/* #define DEBUGTRACE_ENABLED */ +// #define DEBUGTRACE_ENABLED #include "lasp_threadedindatahandler.h" #include @@ -78,7 +78,7 @@ void ThreadedInDataHandlerBase::startThread() { _1), resetCallback); - _thread_can_safely_run = true; + _thread_allowed_to_run = true; _indatahandler->start(); } @@ -87,7 +87,7 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( DEBUGTRACE_ENTER; // Early return in case object is under DESTRUCTION - if (!_thread_can_safely_run) return; + if (!_thread_allowed_to_run) return; _queue->push(daqdata); if (!_thread_running) { @@ -103,23 +103,26 @@ void ThreadedInDataHandlerBase::stopThread() { throw rte("BUG: ThreadedIndataHandler not running"); } + // Stop the existing thread + _thread_allowed_to_run = false; + // Make sure no new data arrives _indatahandler->stop(); + _indatahandler.reset(); - // Stop the existing thread - _thread_can_safely_run = false; - + DEBUGTRACE_PRINT("Indatahandler stopped. Waiting for thread to finish..."); // Then wait in steps for the thread to stop running. while (_thread_running) { std::this_thread::sleep_for(10us); } + DEBUGTRACE_PRINT("Thread stopped"); // Kill the handler - _indatahandler.reset(); + DEBUGTRACE_PRINT("Handler resetted"); } ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { DEBUGTRACE_ENTER; - if (_thread_can_safely_run) { + if (_thread_allowed_to_run) { stopThread(); cerr << "*** BUG: InDataHandlers have not been all stopped, while " "StreamMgr destructor is called. This is a misuse BUG." @@ -131,7 +134,7 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; - while (!_queue->empty() && _thread_can_safely_run) { + while (!_queue->empty() && _thread_allowed_to_run) { // Call inCallback_threaded inCallback(_queue->pop()); } diff --git a/cpp_src/dsp/lasp_threadedindatahandler.h b/cpp_src/dsp/lasp_threadedindatahandler.h index 9b1c7c3..3f241bc 100644 --- a/cpp_src/dsp/lasp_threadedindatahandler.h +++ b/cpp_src/dsp/lasp_threadedindatahandler.h @@ -46,7 +46,7 @@ class ThreadedInDataHandlerBase { std::unique_ptr _indatahandler; std::atomic _thread_running{false}; - std::atomic _thread_can_safely_run{false}; + std::atomic _thread_allowed_to_run{false}; GlobalThreadPool _pool; diff --git a/cpp_src/pybind11/lasp_pyindatahandler.cpp b/cpp_src/pybind11/lasp_pyindatahandler.cpp index 76e072e..1219359 100644 --- a/cpp_src/pybind11/lasp_pyindatahandler.cpp +++ b/cpp_src/pybind11/lasp_pyindatahandler.cpp @@ -19,6 +19,8 @@ using namespace std::literals::chrono_literals; using std::cerr; using std::endl; +using rte = std::runtime_error; +using Lck = std::scoped_lock; namespace py = pybind11; @@ -106,8 +108,9 @@ class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ - py::weakref _cb, _reset_callback; + py::object _cb, _reset_callback; std::atomic _done{false}; + std::recursive_mutex _mtx; public: /** @@ -120,18 +123,26 @@ class PyIndataHandler : public ThreadedInDataHandler { * is called, when a stream stops, this pointer / handle will dangle. */ PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) - : ThreadedInDataHandler(mgr), _cb(cb), _reset_callback(reset_callback) { + : ThreadedInDataHandler(mgr), + _cb(py::weakref(cb)), + _reset_callback(py::weakref(reset_callback)) { DEBUGTRACE_ENTER; + // cerr << "Thread ID: " << std::this_thread::get_id() << endl; /// Start should be called externally, as at constructor time no virtual /// functions should be called. - py::gil_scoped_release release; + if (_cb().is_none() || _reset_callback().is_none()) { + throw rte("cb or reset_callback is none!"); + } startThread(); } ~PyIndataHandler() { DEBUGTRACE_ENTER; + // cerr << "Thread ID: " << std::this_thread::get_id() << endl; /// Callback cannot be called, which results in a deadlock on the GIL /// without this release. py::gil_scoped_release release; + DEBUGTRACE_PRINT("Gil released"); + _done = true; stopThread(); } /** @@ -141,13 +152,13 @@ class PyIndataHandler : public ThreadedInDataHandler { */ void reset(const Daq *daqi) { DEBUGTRACE_ENTER; + // cerr << "Thread ID: " << std::this_thread::get_id() << endl; if (_done) return; { - py::gil_scoped_acquire acquire; try { py::object reset_callback = _reset_callback(); if (reset_callback.is_none()) { - DEBUGTRACE_PRINT("cb is none, weakref killed"); + DEBUGTRACE_PRINT("reset_callback is none, weakref killed"); _done = true; return; } @@ -171,16 +182,16 @@ class PyIndataHandler : public ThreadedInDataHandler { << endl; abort(); } - } - } + } // end of GIL scope + } // end of function reset() /** * @brief Calls the Python callback method / function with a Numpy array of * stream data. */ void inCallback(const DaqData &d) { - // DEBUGTRACE_ENTER; - // cerr << "Thread ID: " << std::this_thread::get_id() << endl; + DEBUGTRACE_ENTER; + // cerr << "=== Enter incallback for thread ID: " << std::this_thread::get_id() << endl; using DataType = DataTypeDescriptor::DataType; if (_done) { @@ -188,6 +199,7 @@ class PyIndataHandler : public ThreadedInDataHandler { return; } { + DEBUGTRACE_PRINT("================ TRYING TO OBTAIN GIL in inCallback..."); py::gil_scoped_acquire acquire; try { py::object py_bool; @@ -234,12 +246,13 @@ class PyIndataHandler : public ThreadedInDataHandler { // reset_callback.reset(); } } catch (py::error_already_set &e) { - cerr << "ERROR: Python raised exception from callback function: "; + cerr << "ERROR (BUG): Python raised exception from callback function: "; cerr << e.what() << endl; abort(); } catch (py::cast_error &e) { cerr << e.what() << endl; - cerr << "ERROR: Python callback does not return boolean value." << endl; + cerr << "ERROR (BUG): Python callback does not return boolean value." + << endl; abort(); } catch (std::exception &e) { cerr << "Caught unknown exception in Python callback:" << e.what() @@ -248,7 +261,9 @@ class PyIndataHandler : public ThreadedInDataHandler { } } // End of scope in which the GIL is acquired - } // End of function + // cerr << "=== LEAVE incallback for thread ID: " << std::this_thread::get_id() << endl; + + } // End of function inCallback() }; void init_datahandler(py::module &m) { diff --git a/python_src/lasp/lasp_record.py b/python_src/lasp/lasp_record.py index 8377a0c..2cc6ae6 100644 --- a/python_src/lasp/lasp_record.py +++ b/python_src/lasp/lasp_record.py @@ -126,8 +126,15 @@ class Recording: logger.debug("Starting record....") + # In the PyInDataHandler, a weak reference is stored to the python + # methods reset and incallback. One way or another, the weak ref is gone + # on the callback thread. If we store an "extra" ref to this method over + # here, the weak ref stays alive. We do not know whether this is a bug + # or a feature, but in any case storing this extra ref to inCallback + # solves the problem. + self._incalback_cpy = self.inCallback self._indataHandler = InDataHandler( - streammgr, self.inCallback, self.resetCallback + streammgr, self._incalback_cpy, self.resetCallback ) if wait: