/* #define DEBUGTRACE_ENABLED */ #include "lasp_threadedindatahandler.h" #include "debugtrace.hpp" #include "lasp_daqdata.h" #include "lasp_thread.h" #include #include #include #include using namespace std::literals::chrono_literals; using lck = std::scoped_lock; using rte = std::runtime_error; using std::cerr; using std::endl; using std::placeholders::_1; class SafeQueue { std::queue _queue; std::mutex _mtx; std::atomic _contents{0}; public: void push(const DaqData &d) { DEBUGTRACE_ENTER; lck lock(_mtx); _queue.push(d); _contents++; assert(_contents == _queue.size()); } DaqData pop() { DEBUGTRACE_ENTER; if (empty()) { throw rte("BUG: Pop on empty queue"); } lck lock(_mtx); /* DaqData d(std::move(_queue.front())); */ DaqData d(_queue.front()); _queue.pop(); _contents--; assert(_contents == _queue.size()); return d; } /** * @brief Empty implemented using atomic var, safes some mutex lock/unlock * cycles. * * @return true if queue is empty */ bool empty() const { return _contents == 0; } }; ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset) : _indatahandler( mgr, std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, _1), reset), _queue(std::make_unique()), inCallback(cb) { DEBUGTRACE_ENTER; } void ThreadedInDataHandlerBase::startThread() { DEBUGTRACE_ENTER; _thread_can_safely_run = true; _indatahandler.start(); } void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( const DaqData &daqdata) { DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); // Early return in case object is under DESTRUCTION if (!_thread_can_safely_run) return; _queue->push(daqdata); if (!_thread_running) { DEBUGTRACE_PRINT("Pushing new thread in pool"); _pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); } } void ThreadedInDataHandlerBase::stopThread() { DEBUGTRACE_ENTER; // Make sure inCallback is no longer called _thread_can_safely_run = false; _indatahandler.stop(); std::scoped_lock lck(_mtx); // Then wait in steps for the thread to stop running. while (_thread_running) { std::this_thread::sleep_for(10us); } } ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { DEBUGTRACE_ENTER; if (_thread_can_safely_run) { stopThread(); cerr << "*** BUG: InDataHandlers have not been all stopped, while " "StreamMgr destructor is called. This is a misuse BUG." << endl; abort(); } } void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; _thread_running = true; while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded inCallback(_queue->pop()); } _thread_running = false; }