lasp/src/lasp/dsp/lasp_threadedindatahandler.cpp

103 lines
2.0 KiB
C++

/* #define DEBUGTRACE_ENABLED */
#include "lasp_threadedindatahandler.h"
#include "debugtrace.hpp"
#include "lasp_thread.h"
#include <future>
#include <thread>
#include <queue>
#include <optional>
using namespace std::literals::chrono_literals;
using lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
class SafeQueue {
std::queue<DaqData> _queue;
std::mutex _mtx;
std::atomic_int32_t _contents {0};
public:
void push(const DaqData& d) {
DEBUGTRACE_ENTER;
lck lock(_mtx);
_queue.push(d);
_contents++;
}
DaqData pop() {
DEBUGTRACE_ENTER;
if(_contents ==0) {
throw rte("BUG: Pop on empty queue");
}
lck lock(_mtx);
/* DaqData d(std::move(_queue.front())); */
DaqData d(_queue.front());
_queue.pop();
_contents--;
return d;
}
bool empty() const {
return _contents == 0;
}
};
ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr)
: InDataHandler(mgr), _queue(std::make_unique<SafeQueue>()) {
DEBUGTRACE_ENTER;
// Initialize thread pool, if not already done
getPool();
}
bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) {
DEBUGTRACE_ENTER;
if (!_lastCallbackResult) {
return false;
}
_queue->push(daqdata);
auto &pool = getPool();
if (!_thread_running && (!_stopThread) && _lastCallbackResult) {
pool.push_task(&ThreadedInDataHandler::threadFcn, this);
}
return _lastCallbackResult;
}
ThreadedInDataHandler::~ThreadedInDataHandler() {
DEBUGTRACE_ENTER;
_stopThread = true;
// Then wait in steps for the thread to stop running.
while (_thread_running) {
std::this_thread::sleep_for(10us);
}
}
void ThreadedInDataHandler::threadFcn() {
DEBUGTRACE_ENTER;
_thread_running = true;
if (!_queue->empty() && !_stopThread) {
DaqData d(_queue->pop());
// Call inCallback_threaded
if (inCallback_threaded(d) == false) {
_lastCallbackResult = false;
_thread_running = false;
return;
}
}
_lastCallbackResult = true;
_thread_running = false;
}