// lasp/src/lasp/dsp/lasp_threadedindatahandler.cpp
// (128 lines, 2.8 KiB, C++)

/* #define DEBUGTRACE_ENABLED */
#include "lasp_threadedindatahandler.h"
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_thread.h"
#include <future>
#include <optional>
#include <queue>
#include <thread>
using namespace std::literals::chrono_literals;
using lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
using std::cerr;
using std::endl;
/**
 * @brief Mutex-protected FIFO of DaqData blocks.
 *
 * push() and pop() serialize on an internal mutex. empty() is answered from
 * an atomic element counter, so it can be polled without locking.
 */
class SafeQueue {
  std::queue<DaqData> _queue;
  std::mutex _mtx;
  // Mirrors the number of elements in _queue; only modified while _mtx is
  // held, but readable without it (see empty()).
  std::atomic_int32_t _contents{0};

public:
  /**
   * @brief Append a copy of the given data block to the back of the queue.
   *
   * @param d Data block to store (copied).
   */
  void push(const DaqData &d) {
    DEBUGTRACE_ENTER;
    lck lock(_mtx);
    _queue.push(d);
    _contents++;
  }

  /**
   * @brief Remove the oldest data block from the queue and return it.
   *
   * @return Copy of the element that was at the front of the queue.
   * @throws std::runtime_error when the queue holds no elements.
   */
  DaqData pop() {
    DEBUGTRACE_ENTER;
    lck lock(_mtx);
    // The emptiness test is done on the container itself, under the lock, so
    // that the check and the front()/pop() below form one atomic step.
    // Checking the lock-free counter before locking would leave a window in
    // which another consumer could drain the queue, after which front() on an
    // empty std::queue is undefined behaviour.
    if (_queue.empty()) {
      throw rte("BUG: Pop on empty queue");
    }
    // Deliberately copied out rather than moved: the move variant
    // (DaqData d(std::move(_queue.front()))) was left disabled in the
    // original implementation.
    DaqData d(_queue.front());
    _queue.pop();
    _contents--;
    return d;
  }

  /**
   * @brief Empty implemented using an atomic variable, which saves some mutex
   * lock/unlock cycles.
   *
   * @return true if queue is empty
   */
  bool empty() const { return _contents == 0; }
};
ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr)
: InDataHandler(mgr), _queue(std::make_unique<SafeQueue>()) {
DEBUGTRACE_ENTER;
// Initialize thread pool, if not already done
getPool();
}
/**
 * @brief Mark the handler as allowed to run and then call start().
 *
 * The flag is raised before start() is invoked, because inCallback() returns
 * early (without queueing anything) for as long as _thread_can_safely_run is
 * false.
 */
void ThreadedInDataHandler::startThread() {
// Must happen first: inCallback() and threadFcn() both consult this flag.
_thread_can_safely_run = true;
// start() is defined outside this file; presumably it registers this handler
// so that inCallback() starts being called — TODO confirm.
start();
}
/**
 * @brief Queue an incoming data block and make sure a worker task is
 * scheduled to process the queue.
 *
 * The whole body runs with _mtx held.
 *
 * @param daqdata Incoming data block; a copy is pushed on the queue.
 * @return true without queueing when _thread_can_safely_run is false;
 * false without queueing when _lastCallbackResult is already false;
 * otherwise the value of _lastCallbackResult after the block was queued.
 */
bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) {

  DEBUGTRACE_ENTER;

  std::scoped_lock guard(_mtx);

  // Object is being stopped / destroyed: do not touch the queue any more.
  if (!_thread_can_safely_run) {
    return true;
  }

  // An earlier threaded callback reported failure: refuse further data.
  if (!_lastCallbackResult) {
    return false;
  }

  _queue->push(daqdata);

  // Only one worker task is scheduled at a time. It is (re)started here when
  // none is marked as running and the callback result is still good.
  const bool schedule_worker = !_thread_running && _lastCallbackResult;
  if (schedule_worker) {
    auto &workers = getPool();
    DEBUGTRACE_PRINT("Pushing new thread in pool");
    // Flag is raised before the task is handed over; threadFcn() clears it
    // again when it is done.
    _thread_running = true;
    workers.push_task(&ThreadedInDataHandler::threadFcn, this);
  }

  return _lastCallbackResult;
}
/**
 * @brief Stop accepting data and block until the worker task has finished.
 *
 * Clears _thread_can_safely_run, calls stop(), then acquires _mtx and polls
 * _thread_running until it reads false. The mutex stays locked for the whole
 * polling period.
 */
void ThreadedInDataHandler::stopThread() {

  // Lower the flag first: from here on inCallback() returns early and
  // threadFcn() leaves its processing loop at the next check.
  _thread_can_safely_run = false;

  // stop() is defined outside this file; per the original note its purpose is
  // to make sure inCallback() is no longer called.
  stop();

  // inCallback() holds _mtx for its entire body, so acquiring it here waits
  // for a call that is still in flight.
  std::scoped_lock guard(_mtx);

  // Poll in small steps until the worker task reports that it is done.
  const auto poll_interval = 10us;
  while (_thread_running) {
    std::this_thread::sleep_for(poll_interval);
  }
}
/**
 * @brief Destructor. Expects stopThread() to have been called beforehand.
 *
 * When _thread_can_safely_run is still true at destruction time, the handler
 * is stopped, a diagnostic is written to stderr and the process is aborted.
 */
ThreadedInDataHandler::~ThreadedInDataHandler() {

  DEBUGTRACE_ENTER;

  // Normal case: the handler was stopped already, nothing left to do.
  if (!_thread_can_safely_run) {
    return;
  }

  // Misuse: still marked as running. Stop the worker first so it is not left
  // operating on this object, then report and abort.
  stopThread();
  cerr << "*** BUG: InDataHandlers have not been all stopped, while "
          "StreamMgr destructor is called. This is a misuse BUG."
       << endl;
  abort();
}
/**
 * @brief Worker task: drain the queue by feeding each block to
 * inCallback_threaded().
 *
 * Runs until the queue is empty or _thread_can_safely_run is false. A false
 * result from inCallback_threaded() is logged to stderr and recorded in
 * _lastCallbackResult, after which the loop simply continues with the next
 * block. _thread_running is cleared on exit.
 *
 * NOTE(review): between the final empty() check and clearing _thread_running,
 * inCallback() may enqueue a block without scheduling a new task. That block
 * is then only processed after a later inCallback() schedules one — confirm
 * this is acceptable.
 */
void ThreadedInDataHandler::threadFcn() {

  DEBUGTRACE_ENTER;

  for (;;) {
    // Done when there is nothing left to process, or when the handler is
    // being stopped.
    if (_queue->empty() || !_thread_can_safely_run) {
      break;
    }

    // Hand the oldest queued block to the threaded callback.
    const bool callback_ok = inCallback_threaded(_queue->pop());
    if (callback_ok) {
      continue;
    }

    cerr << "*********** Callback result returned false! *************"
         << endl;
    // Makes inCallback() reject further data.
    _lastCallbackResult = false;
  }

  // Lets inCallback() schedule a new task and releases a waiting
  // stopThread().
  _thread_running = false;
}