124 lines
3.3 KiB
C++
124 lines
3.3 KiB
C++
#pragma once
|
|
#include "debugtrace.hpp"
|
|
#include "lasp_indatahandler.h"
|
|
#include "lasp_thread.h"
|
|
#include <atomic>
|
|
#include <memory>
|
|
#include <mutex>
|
|
|
|
using std::placeholders::_1;
|
|
const us RINGBUFFER_SIZE = 1024;
|
|
|
|
/**
|
|
* \addtogroup dsp
|
|
* @{
|
|
*
|
|
* \defgroup rt Real time signal handlers
|
|
* @{
|
|
*/
|
|
|
|
class SafeQueue;
|
|
/**
|
|
* @brief Threaded in data handler base. Buffers inCallback data and calls a
|
|
* callback with the same signature on a different thread. The main function of
|
|
* this is to offload the thread that handles the stream, such that expensive
|
|
* computations do not result in stream buffer xruns.
|
|
*/
|
|
class ThreadedInDataHandlerBase {
|
|
/**
|
|
* @brief The queue used to push elements to the handling thread.
|
|
*/
|
|
|
|
std::unique_ptr<SafeQueue> _queue;
|
|
|
|
/**
|
|
* @brief Function pointer that is called when new DaqData arrives.
|
|
*/
|
|
const InCallbackType inCallback;
|
|
|
|
/**
|
|
* @brief Function pointer that is called when reset() is called.
|
|
*/
|
|
const ResetCallbackType resetCallback;
|
|
|
|
std::weak_ptr<StreamMgr> _smgr;
|
|
|
|
std::unique_ptr<InDataHandler> _indatahandler;
|
|
|
|
std::atomic<bool> _thread_running{false};
|
|
std::atomic<bool> _thread_can_safely_run{false};
|
|
|
|
GlobalThreadPool _pool;
|
|
|
|
void threadFcn();
|
|
|
|
|
|
/**
|
|
* @brief Pushes a copy of the daqdata to the thread queue and returns.
|
|
* Adds a thread to handle the queue, whihc will call inCallback();
|
|
*
|
|
* @param daqdata the daq info to push
|
|
*
|
|
* @return true, to continue with sampling.
|
|
*/
|
|
void _inCallbackFromInDataHandler(const DaqData &daqdata);
|
|
|
|
public:
|
|
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, ResetCallbackType reset);
|
|
~ThreadedInDataHandlerBase();
|
|
/**
|
|
* @brief This method should be called from the derived class' constructor,
|
|
* to start the thread and data is incoming.
|
|
*/
|
|
void startThread();
|
|
/**
|
|
* @brief This method SHOULD be called from all classes that derive on
|
|
* ThreadedInDataHandler. It is to make sure the inCallback_threaded()
|
|
* function is no longer called when the destructor of the derived class is
|
|
* called. Not calling this function is regarded as a BUG.
|
|
*/
|
|
void stopThread();
|
|
|
|
};
|
|
|
|
/**
|
|
* @brief A bit of curiously recurring template pattern, to connect the
|
|
* specific handlers and connect the proper callbacks in a type-agnostic way.
|
|
* Using this class, each threaded handler should just implement its reset()
|
|
* and inCallback() method. Ellides the virtual method calls.
|
|
*
|
|
* Usage: class XHandler: public ThreadedInDataHandler<XHandler> {
|
|
* public:
|
|
* XHandler(streammgr) : ThreadedInDataHandler(streammgr) {}
|
|
* void inCallback(const DaqData& d) { ... do something with d }
|
|
* void reset(const Daq* daq) { ... do something with daq }
|
|
* };
|
|
*
|
|
* For examples, see PPMHandler, etc.
|
|
*
|
|
* @tparam Derived The
|
|
*/
|
|
template <typename Derived>
|
|
class ThreadedInDataHandler : public ThreadedInDataHandlerBase {
|
|
public:
|
|
ThreadedInDataHandler(SmgrHandle mgr):
|
|
ThreadedInDataHandlerBase(mgr,
|
|
std::bind(&ThreadedInDataHandler::_inCallback, this, _1),
|
|
std::bind(&ThreadedInDataHandler::_reset, this, _1))
|
|
{
|
|
|
|
}
|
|
|
|
void _reset(const Daq* daq) {
|
|
DEBUGTRACE_ENTER;
|
|
return static_cast<Derived*>(this)->reset(daq);
|
|
}
|
|
void _inCallback(const DaqData& data) {
|
|
DEBUGTRACE_ENTER;
|
|
return static_cast<Derived*>(this)->inCallback(data);
|
|
}
|
|
};
|
|
|
|
/** @} */
|
|
/** @} */
|