From 83c7aa6adeebeeeaf1d699f9c8de715267db7039 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Thu, 14 Mar 2024 08:25:47 +0100 Subject: [PATCH] More subtle locking and unlocking of mutexes in stopstream --- cpp_src/device/lasp_streammgr.cpp | 63 ++++++++++++++++++------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/cpp_src/device/lasp_streammgr.cpp b/cpp_src/device/lasp_streammgr.cpp index e8db813..b72f575 100644 --- a/cpp_src/device/lasp_streammgr.cpp +++ b/cpp_src/device/lasp_streammgr.cpp @@ -2,13 +2,13 @@ #include "lasp_streammgr.h" #include -#include #include #include #include #include #include +#include #include "debugtrace.hpp" #include "lasp_biquadbank.h" @@ -87,7 +87,7 @@ void StreamMgr::rescanDAQDevices(bool background, std::function callback) { DEBUGTRACE_ENTER; DEBUGTRACE_PRINT(background); - if(_scanningDevices) { + if (_scanningDevices) { throw rte("A background device scan is already busy"); } @@ -148,7 +148,6 @@ void StreamMgr::inCallback(const DaqData &data) { for (auto &handler : _inDataHandlers) { handler->inCallback(input_filtered); } - } else { /// No input filters DEBUGTRACE_PRINT("Calling incallback for handlers..."); @@ -253,10 +252,10 @@ StreamMgr::~StreamMgr() { while (_scanningDevices) { std::this_thread::sleep_for(10us); } - + #if LASP_DEBUG == 1 { // Careful, this lock needs to be released to make sure the streams can - // obtain a lock to the stream manager. + // obtain a lock to the stream manager. Lck lck(_mtx); checkRightThread(); } @@ -276,16 +275,18 @@ StreamMgr::~StreamMgr() { } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; + { + Lck lck(_mtx); + checkRightThread(); + } // No lock here! - // Lck lck(_mtx); - checkRightThread(); _inputStream.reset(); _outputStream.reset(); } void StreamMgr::startStream(const DaqConfiguration &config) { DEBUGTRACE_ENTER; - if(_scanningDevices) { + if (_scanningDevices) { throw rte("DAQ device scan is busy. Cannot start stream."); } Lck lck(_mtx); @@ -424,31 +425,41 @@ void StreamMgr::startStream(const DaqConfiguration &config) { void StreamMgr::stopStream(const StreamType t) { DEBUGTRACE_ENTER; checkRightThread(); + bool resetHandlers = false; + std::unique_ptr *streamToStop = nullptr; - if (t == StreamType::input) { - if (!_inputStream) { - throw rte("Input stream is not running"); + { // Mutex locked in this scope + Lck lck(_mtx); + if (t == StreamType::input) { + if (!_inputStream) { + throw rte("Input stream is not running"); + } + streamToStop = std::addressof(_inputStream); + resetHandlers = true; + } else { + /// t == output + /// Kill input stream in case that one is a duplex stream + if (_inputStream && _inputStream->duplexMode()) { + streamToStop = std::addressof(_inputStream); + } else { + if (!_outputStream) { + throw rte("Output stream is not running"); + } + streamToStop = std::addressof(_outputStream); + } // end else } - /// Kills input stream - _inputStream.reset(); + } // End of mutex lock. When stopping stream, mutex should be unlocked. - /// Send reset to all in data handlers + // If we arrive here, we should have a stream to stop. + assert(streamToStop != nullptr); + streamToStop->reset(); + + /// Send reset to all in data handlers + if (resetHandlers) { Lck lck(_mtx); for (auto &handler : _inDataHandlers) { handler->reset(nullptr); } - } else { - /// t == output - - /// Kill input stream in case that one is a duplex stream - if (_inputStream && _inputStream->duplexMode()) { - _inputStream.reset(); - } else { - if (!_outputStream) { - throw rte("Output stream is not running"); - } - _outputStream.reset(); - } // end else } }