lasp/cpp_src/dsp/lasp_threadedindatahandler.cpp

140 lines
3.4 KiB
C++

/* #define DEBUGTRACE_ENABLED */
#include "lasp_threadedindatahandler.h"
#include <future>
#include <optional>
#include <queue>
#include <thread>
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_thread.h"
using namespace std::literals::chrono_literals;
using lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
using std::cerr;
using std::endl;
using std::placeholders::_1;
class SafeQueue {
std::queue<DaqData> _queue;
std::mutex _mtx;
std::atomic<uint32_t> _contents{0};
public:
void push(const DaqData &d) {
DEBUGTRACE_ENTER;
lck lock(_mtx);
_queue.push(d);
_contents++;
assert(_contents == _queue.size());
}
DaqData pop() {
DEBUGTRACE_ENTER;
if (empty()) {
throw rte("BUG: Pop on empty queue");
}
lck lock(_mtx);
/* DaqData d(std::move(_queue.front())); */
DaqData d(_queue.front());
_queue.pop();
_contents--;
assert(_contents == _queue.size());
return d;
}
/**
* @brief Empty implemented using atomic var, safes some mutex lock/unlock
* cycles.
*
* @return true if queue is empty
*/
bool empty() const { return _contents == 0; }
};
ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr,
InCallbackType cb,
ResetCallbackType reset)
: _queue(std::make_unique<SafeQueue>()),
inCallback(cb),
resetCallback(reset),
_smgr(mgr) {
DEBUGTRACE_ENTER;
}
void ThreadedInDataHandlerBase::startThread() {
DEBUGTRACE_ENTER;
if (_indatahandler) {
throw rte("BUG: ThreadedIndataHandler already started");
}
SmgrHandle smgr = _smgr.lock();
if (!smgr) {
cerr << "Stream manager destructed" << endl;
return;
}
_indatahandler = std::make_unique<InDataHandler>(
smgr,
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
_1),
resetCallback);
_thread_can_safely_run = true;
_indatahandler->start();
}
void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
const DaqData &daqdata) {
DEBUGTRACE_ENTER;
// Early return in case object is under DESTRUCTION
if (!_thread_can_safely_run) return;
_queue->push(daqdata);
if (!_thread_running) {
DEBUGTRACE_PRINT("Pushing new thread in pool");
_thread_running = true;
_pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this);
}
}
void ThreadedInDataHandlerBase::stopThread() {
DEBUGTRACE_ENTER;
if (!_indatahandler) {
throw rte("BUG: ThreadedIndataHandler not running");
}
// Make sure no new data arrives
_indatahandler->stop();
// Stop the existing thread
_thread_can_safely_run = false;
// Then wait in steps for the thread to stop running.
while (_thread_running) {
std::this_thread::sleep_for(10us);
}
// Kill the handler
_indatahandler.reset();
}
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
DEBUGTRACE_ENTER;
if (_thread_can_safely_run) {
stopThread();
cerr << "*** BUG: InDataHandlers have not been all stopped, while "
"StreamMgr destructor is called. This is a misuse BUG."
<< endl;
abort();
}
}
void ThreadedInDataHandlerBase::threadFcn() {
DEBUGTRACE_ENTER;
while (!_queue->empty() && _thread_can_safely_run) {
// Call inCallback_threaded
inCallback(_queue->pop());
}
_thread_running = false;
}