From 0841dbd73baf1d8a08980e5c022fdde167352280 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Mon, 4 Mar 2024 14:44:00 +0100 Subject: [PATCH] Create InDataHandler only from the moment startThread() is called. This is safer, and might fix a segfault --- cpp_src/device/lasp_indatahandler.cpp | 2 +- cpp_src/device/lasp_indatahandler.h | 6 +- cpp_src/dsp/lasp_threadedindatahandler.cpp | 69 +++++++++++++--------- cpp_src/dsp/lasp_threadedindatahandler.h | 24 +++++--- 4 files changed, 60 insertions(+), 41 deletions(-) diff --git a/cpp_src/device/lasp_indatahandler.cpp b/cpp_src/device/lasp_indatahandler.cpp index 8bdeb17..c9a4d15 100644 --- a/cpp_src/device/lasp_indatahandler.cpp +++ b/cpp_src/device/lasp_indatahandler.cpp @@ -5,7 +5,7 @@ #include InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb, - const InResetType resetfcn) + const ResetCallbackType resetfcn) : _mgr(mgr), inCallback(cb), reset(resetfcn) #if LASP_DEBUG == 1 , diff --git a/cpp_src/device/lasp_indatahandler.h b/cpp_src/device/lasp_indatahandler.h index 2ff47b1..85daf56 100644 --- a/cpp_src/device/lasp_indatahandler.h +++ b/cpp_src/device/lasp_indatahandler.h @@ -22,7 +22,7 @@ using InCallbackType = std::function; /** * @brief Function definition for the reset callback. */ -using InResetType = std::function; +using ResetCallbackType = std::function; class InDataHandler { @@ -38,7 +38,7 @@ protected: public: ~InDataHandler(); const InCallbackType inCallback; - const InResetType reset; + const ResetCallbackType reset; /** * @brief When constructed, the handler is added to the stream manager, which @@ -50,7 +50,7 @@ public: * changes state. */ InDataHandler(SmgrHandle mgr, InCallbackType cb, - InResetType resetfcn); + ResetCallbackType resetfcn); /** * @brief Adds the current InDataHandler to the list of handlers in the diff --git a/cpp_src/dsp/lasp_threadedindatahandler.cpp b/cpp_src/dsp/lasp_threadedindatahandler.cpp index a304fe3..cd60750 100644 --- a/cpp_src/dsp/lasp_threadedindatahandler.cpp +++ b/cpp_src/dsp/lasp_threadedindatahandler.cpp @@ -1,13 +1,15 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_threadedindatahandler.h" -#include "debugtrace.hpp" -#include "lasp_daqdata.h" -#include "lasp_thread.h" + #include #include #include #include +#include "debugtrace.hpp" +#include "lasp_daqdata.h" +#include "lasp_thread.h" + using namespace std::literals::chrono_literals; using lck = std::scoped_lock; using rte = std::runtime_error; @@ -20,26 +22,26 @@ class SafeQueue { std::mutex _mtx; std::atomic _contents{0}; -public: + public: void push(const DaqData &d) { DEBUGTRACE_ENTER; lck lock(_mtx); _queue.push(d); _contents++; - assert(_contents == _queue.size()); + assert(_contents == _queue.size()); } DaqData pop() { DEBUGTRACE_ENTER; if (empty()) { throw rte("BUG: Pop on empty queue"); - } + } lck lock(_mtx); /* DaqData d(std::move(_queue.front())); */ DaqData d(_queue.front()); _queue.pop(); _contents--; - assert(_contents == _queue.size()); + assert(_contents == _queue.size()); return d; } /** @@ -52,32 +54,40 @@ public: }; ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, - InCallbackType cb, - InResetType reset) - : _indatahandler( - mgr, - std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, - _1), - reset), - _queue(std::make_unique()), inCallback(cb) { - + InCallbackType cb, + ResetCallbackType reset) + : _queue(std::make_unique()), + inCallback(cb), + resetCallback(reset), + _smgr(mgr) { DEBUGTRACE_ENTER; - } void ThreadedInDataHandlerBase::startThread() { DEBUGTRACE_ENTER; + if (_indatahandler) { + throw rte("BUG: ThreadedIndataHandler already started"); + } + SmgrHandle smgr = _smgr.lock(); + if (!smgr) { + cerr << "Stream manager destructed" << endl; + return; + } + _indatahandler = std::make_unique( + smgr, + std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, + _1), + resetCallback); + + _indatahandler->start(); _thread_can_safely_run = true; - _indatahandler.start(); } void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( const DaqData &daqdata) { DEBUGTRACE_ENTER; - std::scoped_lock lck(_mtx); // Early return in case object is under DESTRUCTION - if (!_thread_can_safely_run) - return; + if (!_thread_can_safely_run) return; _queue->push(daqdata); if (!_thread_running) { @@ -88,20 +98,25 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( void ThreadedInDataHandlerBase::stopThread() { DEBUGTRACE_ENTER; - // Make sure inCallback is no longer called - _thread_can_safely_run = false; - _indatahandler.stop(); + if(!_indatahandler) { + throw rte("BUG: ThreadedIndataHandler not running"); + } - std::scoped_lock lck(_mtx); + // Make sure no new data arrives + _indatahandler->stop(); + + // Stop the existing thread + _thread_can_safely_run = false; // Then wait in steps for the thread to stop running. while (_thread_running) { std::this_thread::sleep_for(10us); } + // Kill the handler + _indatahandler.reset(); } ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { - DEBUGTRACE_ENTER; if (_thread_can_safely_run) { stopThread(); @@ -113,12 +128,10 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { } void ThreadedInDataHandlerBase::threadFcn() { - DEBUGTRACE_ENTER; _thread_running = true; while (!_queue->empty() && _thread_can_safely_run) { - // Call inCallback_threaded inCallback(_queue->pop()); } diff --git a/cpp_src/dsp/lasp_threadedindatahandler.h b/cpp_src/dsp/lasp_threadedindatahandler.h index 0d7f030..9b1c7c3 100644 --- a/cpp_src/dsp/lasp_threadedindatahandler.h +++ b/cpp_src/dsp/lasp_threadedindatahandler.h @@ -29,21 +29,27 @@ class ThreadedInDataHandlerBase { * @brief The queue used to push elements to the handling thread. */ - InDataHandler _indatahandler; std::unique_ptr _queue; - mutable std::recursive_mutex _mtx; - - std::atomic _thread_running{false}; - std::atomic _thread_can_safely_run{false}; - - GlobalThreadPool _pool; - /** * @brief Function pointer that is called when new DaqData arrives. */ const InCallbackType inCallback; + /** + * @brief Function pointer that is called when reset() is called. + */ + const ResetCallbackType resetCallback; + + std::weak_ptr _smgr; + + std::unique_ptr _indatahandler; + + std::atomic _thread_running{false}; + std::atomic _thread_can_safely_run{false}; + + GlobalThreadPool _pool; + void threadFcn(); @@ -58,7 +64,7 @@ class ThreadedInDataHandlerBase { void _inCallbackFromInDataHandler(const DaqData &daqdata); public: - ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset); + ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, ResetCallbackType reset); ~ThreadedInDataHandlerBase(); /** * @brief This method should be called from the derived class' constructor,