Create InDataHandler only from the moment startThread() is called. This is safer, and might fix a segfault

This commit is contained in:
Anne de Jong 2024-03-04 14:44:00 +01:00
parent 5e8e40db7a
commit 0841dbd73b
4 changed files with 60 additions and 41 deletions

View File

@ -5,7 +5,7 @@
#include <thread> #include <thread>
InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb, InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb,
const InResetType resetfcn) const ResetCallbackType resetfcn)
: _mgr(mgr), inCallback(cb), reset(resetfcn) : _mgr(mgr), inCallback(cb), reset(resetfcn)
#if LASP_DEBUG == 1 #if LASP_DEBUG == 1
, ,

View File

@ -22,7 +22,7 @@ using InCallbackType = std::function<void(const DaqData &)>;
/** /**
* @brief Function definition for the reset callback. * @brief Function definition for the reset callback.
*/ */
using InResetType = std::function<void(const Daq *)>; using ResetCallbackType = std::function<void(const Daq *)>;
class InDataHandler { class InDataHandler {
@ -38,7 +38,7 @@ protected:
public: public:
~InDataHandler(); ~InDataHandler();
const InCallbackType inCallback; const InCallbackType inCallback;
const InResetType reset; const ResetCallbackType reset;
/** /**
* @brief When constructed, the handler is added to the stream manager, which * @brief When constructed, the handler is added to the stream manager, which
@ -50,7 +50,7 @@ public:
* changes state. * changes state.
*/ */
InDataHandler(SmgrHandle mgr, InCallbackType cb, InDataHandler(SmgrHandle mgr, InCallbackType cb,
InResetType resetfcn); ResetCallbackType resetfcn);
/** /**
* @brief Adds the current InDataHandler to the list of handlers in the * @brief Adds the current InDataHandler to the list of handlers in the

View File

@ -1,13 +1,15 @@
/* #define DEBUGTRACE_ENABLED */ /* #define DEBUGTRACE_ENABLED */
#include "lasp_threadedindatahandler.h" #include "lasp_threadedindatahandler.h"
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_thread.h"
#include <future> #include <future>
#include <optional> #include <optional>
#include <queue> #include <queue>
#include <thread> #include <thread>
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_thread.h"
using namespace std::literals::chrono_literals; using namespace std::literals::chrono_literals;
using lck = std::scoped_lock<std::mutex>; using lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error; using rte = std::runtime_error;
@ -20,26 +22,26 @@ class SafeQueue {
std::mutex _mtx; std::mutex _mtx;
std::atomic<uint32_t> _contents{0}; std::atomic<uint32_t> _contents{0};
public: public:
void push(const DaqData &d) { void push(const DaqData &d) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
lck lock(_mtx); lck lock(_mtx);
_queue.push(d); _queue.push(d);
_contents++; _contents++;
assert(_contents == _queue.size()); assert(_contents == _queue.size());
} }
DaqData pop() { DaqData pop() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
if (empty()) { if (empty()) {
throw rte("BUG: Pop on empty queue"); throw rte("BUG: Pop on empty queue");
} }
lck lock(_mtx); lck lock(_mtx);
/* DaqData d(std::move(_queue.front())); */ /* DaqData d(std::move(_queue.front())); */
DaqData d(_queue.front()); DaqData d(_queue.front());
_queue.pop(); _queue.pop();
_contents--; _contents--;
assert(_contents == _queue.size()); assert(_contents == _queue.size());
return d; return d;
} }
/** /**
@ -52,32 +54,40 @@ public:
}; };
ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr,
InCallbackType cb, InCallbackType cb,
InResetType reset) ResetCallbackType reset)
: _indatahandler( : _queue(std::make_unique<SafeQueue>()),
mgr, inCallback(cb),
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, resetCallback(reset),
_1), _smgr(mgr) {
reset),
_queue(std::make_unique<SafeQueue>()), inCallback(cb) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
} }
void ThreadedInDataHandlerBase::startThread() { void ThreadedInDataHandlerBase::startThread() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
if (_indatahandler) {
throw rte("BUG: ThreadedIndataHandler already started");
}
SmgrHandle smgr = _smgr.lock();
if (!smgr) {
cerr << "Stream manager destructed" << endl;
return;
}
_indatahandler = std::make_unique<InDataHandler>(
smgr,
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
_1),
resetCallback);
_indatahandler->start();
_thread_can_safely_run = true; _thread_can_safely_run = true;
_indatahandler.start();
} }
void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
const DaqData &daqdata) { const DaqData &daqdata) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
std::scoped_lock lck(_mtx);
// Early return in case object is under DESTRUCTION // Early return in case object is under DESTRUCTION
if (!_thread_can_safely_run) if (!_thread_can_safely_run) return;
return;
_queue->push(daqdata); _queue->push(daqdata);
if (!_thread_running) { if (!_thread_running) {
@ -88,20 +98,25 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
void ThreadedInDataHandlerBase::stopThread() { void ThreadedInDataHandlerBase::stopThread() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
// Make sure inCallback is no longer called if(!_indatahandler) {
_thread_can_safely_run = false; throw rte("BUG: ThreadedIndataHandler not running");
_indatahandler.stop(); }
std::scoped_lock lck(_mtx); // Make sure no new data arrives
_indatahandler->stop();
// Stop the existing thread
_thread_can_safely_run = false;
// Then wait in steps for the thread to stop running. // Then wait in steps for the thread to stop running.
while (_thread_running) { while (_thread_running) {
std::this_thread::sleep_for(10us); std::this_thread::sleep_for(10us);
} }
// Kill the handler
_indatahandler.reset();
} }
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
if (_thread_can_safely_run) { if (_thread_can_safely_run) {
stopThread(); stopThread();
@ -113,12 +128,10 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
} }
void ThreadedInDataHandlerBase::threadFcn() { void ThreadedInDataHandlerBase::threadFcn() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
_thread_running = true; _thread_running = true;
while (!_queue->empty() && _thread_can_safely_run) { while (!_queue->empty() && _thread_can_safely_run) {
// Call inCallback_threaded // Call inCallback_threaded
inCallback(_queue->pop()); inCallback(_queue->pop());
} }

View File

@ -29,21 +29,27 @@ class ThreadedInDataHandlerBase {
* @brief The queue used to push elements to the handling thread. * @brief The queue used to push elements to the handling thread.
*/ */
InDataHandler _indatahandler;
std::unique_ptr<SafeQueue> _queue; std::unique_ptr<SafeQueue> _queue;
mutable std::recursive_mutex _mtx;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _thread_can_safely_run{false};
GlobalThreadPool _pool;
/** /**
* @brief Function pointer that is called when new DaqData arrives. * @brief Function pointer that is called when new DaqData arrives.
*/ */
const InCallbackType inCallback; const InCallbackType inCallback;
/**
* @brief Function pointer that is called when reset() is called.
*/
const ResetCallbackType resetCallback;
std::weak_ptr<StreamMgr> _smgr;
std::unique_ptr<InDataHandler> _indatahandler;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _thread_can_safely_run{false};
GlobalThreadPool _pool;
void threadFcn(); void threadFcn();
@ -58,7 +64,7 @@ class ThreadedInDataHandlerBase {
void _inCallbackFromInDataHandler(const DaqData &daqdata); void _inCallbackFromInDataHandler(const DaqData &daqdata);
public: public:
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset); ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, ResetCallbackType reset);
~ThreadedInDataHandlerBase(); ~ThreadedInDataHandlerBase();
/** /**
* @brief This method should be called from the derived class' constructor, * @brief This method should be called from the derived class' constructor,