// #define DEBUGTRACE_ENABLED #include "lasp_threadedindatahandler.h" #include #include #include #include #include "debugtrace.hpp" #include "lasp_daqdata.h" #include "lasp_thread.h" using namespace std::literals::chrono_literals; using lck = std::scoped_lock; using rte = std::runtime_error; using std::cerr; using std::endl; using std::placeholders::_1; class SafeQueue { std::queue _queue; std::mutex _mtx; std::atomic _contents{0}; public: void push(const DaqData &d) { DEBUGTRACE_ENTER; lck lock(_mtx); _queue.push(d); _contents++; assert(_contents == _queue.size()); } DaqData pop() { DEBUGTRACE_ENTER; if (empty()) { throw rte("BUG: Pop on empty queue"); } lck lock(_mtx); /* DaqData d(std::move(_queue.front())); */ DaqData d(_queue.front()); _queue.pop(); _contents--; assert(_contents == _queue.size()); return d; } /** * @brief Empty implemented using atomic var, safes some mutex lock/unlock * cycles. * * @return true if queue is empty */ bool empty() const { return _contents == 0; } }; ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, ResetCallbackType reset) : _queue(std::make_unique()), inCallback(cb), resetCallback(reset), _smgr(mgr) { DEBUGTRACE_ENTER; } void ThreadedInDataHandlerBase::startThread() { DEBUGTRACE_ENTER; if (_indatahandler) { throw rte("BUG: ThreadedIndataHandler already started"); } SmgrHandle smgr = _smgr.lock(); if (!smgr) { cerr << "Stream manager destructed" << endl; return; } _indatahandler = std::make_unique( smgr, std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, _1), resetCallback); _thread_allowed_to_run = true; _indatahandler->start(); } void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( const DaqData &daqdata) { DEBUGTRACE_ENTER; // Early return in case object is under DESTRUCTION if (!_thread_allowed_to_run) return; _queue->push(daqdata); if (!_thread_running) { DEBUGTRACE_PRINT("Pushing new thread in pool"); _thread_running = true; _pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); } } void ThreadedInDataHandlerBase::stopThread() { DEBUGTRACE_ENTER; if (!_indatahandler) { throw rte("BUG: ThreadedIndataHandler not running"); } // Stop the existing thread _thread_allowed_to_run = false; // Make sure no new data arrives _indatahandler->stop(); _indatahandler.reset(); DEBUGTRACE_PRINT("Indatahandler stopped. Waiting for thread to finish..."); // Then wait in steps for the thread to stop running. while (_thread_running) { std::this_thread::sleep_for(10us); } DEBUGTRACE_PRINT("Thread stopped"); // Kill the handler DEBUGTRACE_PRINT("Handler resetted"); } ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { DEBUGTRACE_ENTER; if (_thread_allowed_to_run) { stopThread(); cerr << "*** BUG: InDataHandlers have not been all stopped, while " "StreamMgr destructor is called. This is a misuse BUG." << endl; abort(); } } void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; while (!_queue->empty() && _thread_allowed_to_run) { // Call inCallback_threaded inCallback(_queue->pop()); } _thread_running = false; }