2022-10-06 19:13:21 +00:00
|
|
|
/* #define DEBUGTRACE_ENABLED */
|
2022-08-14 19:00:22 +00:00
|
|
|
#include "lasp_threadedindatahandler.h"
|
|
|
|
#include "debugtrace.hpp"
|
2023-06-06 14:05:24 +00:00
|
|
|
#include "lasp_daqdata.h"
|
2022-08-14 19:00:22 +00:00
|
|
|
#include "lasp_thread.h"
|
|
|
|
#include <future>
|
2022-10-10 17:17:38 +00:00
|
|
|
#include <optional>
|
2023-06-06 14:05:24 +00:00
|
|
|
#include <queue>
|
|
|
|
#include <thread>
|
2022-08-14 19:00:22 +00:00
|
|
|
|
|
|
|
using namespace std::literals::chrono_literals;
|
2022-10-10 17:17:38 +00:00
|
|
|
using lck = std::scoped_lock<std::mutex>;
|
|
|
|
using rte = std::runtime_error;
|
2022-10-16 19:30:17 +00:00
|
|
|
using std::cerr;
|
|
|
|
using std::endl;
|
2023-06-09 08:43:04 +00:00
|
|
|
using std::placeholders::_1;
|
2022-10-10 17:17:38 +00:00
|
|
|
|
|
|
|
class SafeQueue {
|
|
|
|
std::queue<DaqData> _queue;
|
|
|
|
std::mutex _mtx;
|
2023-06-17 14:03:14 +00:00
|
|
|
std::atomic<uint32_t> _contents{0};
|
2023-06-06 14:05:24 +00:00
|
|
|
|
|
|
|
public:
|
|
|
|
void push(const DaqData &d) {
|
2022-10-10 17:17:38 +00:00
|
|
|
DEBUGTRACE_ENTER;
|
|
|
|
lck lock(_mtx);
|
|
|
|
_queue.push(d);
|
|
|
|
_contents++;
|
2023-06-17 14:03:14 +00:00
|
|
|
assert(_contents == _queue.size());
|
2022-10-10 17:17:38 +00:00
|
|
|
}
|
|
|
|
DaqData pop() {
|
|
|
|
DEBUGTRACE_ENTER;
|
2022-10-16 19:26:06 +00:00
|
|
|
if (empty()) {
|
2022-10-10 17:17:38 +00:00
|
|
|
throw rte("BUG: Pop on empty queue");
|
2023-06-17 14:03:14 +00:00
|
|
|
}
|
2022-10-10 17:17:38 +00:00
|
|
|
lck lock(_mtx);
|
|
|
|
|
|
|
|
/* DaqData d(std::move(_queue.front())); */
|
|
|
|
DaqData d(_queue.front());
|
|
|
|
_queue.pop();
|
|
|
|
_contents--;
|
2023-06-17 14:03:14 +00:00
|
|
|
assert(_contents == _queue.size());
|
2022-10-10 17:17:38 +00:00
|
|
|
return d;
|
|
|
|
}
|
2022-10-16 19:26:06 +00:00
|
|
|
/**
|
|
|
|
* @brief Empty implemented using atomic var, safes some mutex lock/unlock
|
|
|
|
* cycles.
|
|
|
|
*
|
|
|
|
* @return true if queue is empty
|
|
|
|
*/
|
|
|
|
bool empty() const { return _contents == 0; }
|
2022-10-10 17:17:38 +00:00
|
|
|
};
|
2022-08-14 19:00:22 +00:00
|
|
|
|
2023-06-09 08:43:04 +00:00
|
|
|
ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr,
|
|
|
|
InCallbackType cb,
|
|
|
|
InResetType reset)
|
|
|
|
: _indatahandler(
|
|
|
|
mgr,
|
|
|
|
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
|
|
|
|
_1),
|
|
|
|
reset),
|
|
|
|
_queue(std::make_unique<SafeQueue>()), inCallback(cb) {
|
2022-08-14 19:00:22 +00:00
|
|
|
|
2023-06-06 14:05:24 +00:00
|
|
|
DEBUGTRACE_ENTER;
|
2022-08-14 19:00:22 +00:00
|
|
|
|
2023-06-06 14:05:24 +00:00
|
|
|
}
|
2023-06-09 08:43:04 +00:00
|
|
|
void ThreadedInDataHandlerBase::startThread() {
|
2023-06-07 19:49:07 +00:00
|
|
|
DEBUGTRACE_ENTER;
|
2023-06-06 14:05:24 +00:00
|
|
|
_thread_can_safely_run = true;
|
2023-06-09 08:43:04 +00:00
|
|
|
_indatahandler.start();
|
2023-06-06 14:05:24 +00:00
|
|
|
}
|
2022-08-14 19:00:22 +00:00
|
|
|
|
2023-06-09 08:43:04 +00:00
|
|
|
void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
|
|
|
|
const DaqData &daqdata) {
|
2022-10-10 17:17:38 +00:00
|
|
|
DEBUGTRACE_ENTER;
|
2023-06-06 14:05:24 +00:00
|
|
|
std::scoped_lock lck(_mtx);
|
|
|
|
|
|
|
|
// Early return in case object is under DESTRUCTION
|
|
|
|
if (!_thread_can_safely_run)
|
2023-06-09 08:43:04 +00:00
|
|
|
return;
|
2022-10-10 17:17:38 +00:00
|
|
|
|
|
|
|
_queue->push(daqdata);
|
2023-06-09 08:43:04 +00:00
|
|
|
if (!_thread_running) {
|
2022-10-16 19:26:06 +00:00
|
|
|
DEBUGTRACE_PRINT("Pushing new thread in pool");
|
2023-06-10 13:47:52 +00:00
|
|
|
_pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this);
|
2022-08-14 19:00:22 +00:00
|
|
|
}
|
|
|
|
}
|
2023-06-09 08:43:04 +00:00
|
|
|
|
|
|
|
void ThreadedInDataHandlerBase::stopThread() {
|
2023-06-07 19:49:07 +00:00
|
|
|
DEBUGTRACE_ENTER;
|
2023-06-06 14:05:24 +00:00
|
|
|
// Make sure inCallback is no longer called
|
|
|
|
_thread_can_safely_run = false;
|
2023-06-09 08:43:04 +00:00
|
|
|
_indatahandler.stop();
|
2022-08-14 19:00:22 +00:00
|
|
|
|
2023-06-06 14:05:24 +00:00
|
|
|
std::scoped_lock lck(_mtx);
|
2022-08-14 19:00:22 +00:00
|
|
|
|
|
|
|
// Then wait in steps for the thread to stop running.
|
|
|
|
while (_thread_running) {
|
|
|
|
std::this_thread::sleep_for(10us);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-09 08:43:04 +00:00
|
|
|
/**
 * @brief Destructor. Expects that processing has already been disabled.
 *
 * When _thread_can_safely_run is still set at destruction time, the thread is
 * stopped, a diagnostic is written to stderr and the program is aborted.
 */
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {

  DEBUGTRACE_ENTER;

  // Regular case: processing was disabled before destruction.
  if (!_thread_can_safely_run) {
    return;
  }

  // Misuse: still enabled. Stop, report and terminate.
  stopThread();
  cerr << "*** BUG: InDataHandlers have not been all stopped, while "
          "StreamMgr destructor is called. This is a misuse BUG."
       << endl;
  abort();
}
|
|
|
|
|
2023-06-09 08:43:04 +00:00
|
|
|
void ThreadedInDataHandlerBase::threadFcn() {
|
2022-08-14 19:00:22 +00:00
|
|
|
|
2022-10-10 17:17:38 +00:00
|
|
|
DEBUGTRACE_ENTER;
|
2023-06-17 14:03:14 +00:00
|
|
|
_thread_running = true;
|
2022-08-14 19:00:22 +00:00
|
|
|
|
2023-06-06 14:05:24 +00:00
|
|
|
while (!_queue->empty() && _thread_can_safely_run) {
|
2022-08-14 19:00:22 +00:00
|
|
|
|
|
|
|
// Call inCallback_threaded
|
2023-06-09 08:43:04 +00:00
|
|
|
inCallback(_queue->pop());
|
2022-08-14 19:00:22 +00:00
|
|
|
}
|
|
|
|
_thread_running = false;
|
|
|
|
}
|