#pragma once #include "debugtrace.hpp" #include "lasp_indatahandler.h" #include "lasp_thread.h" #include #include #include using std::placeholders::_1; const us RINGBUFFER_SIZE = 1024; /** * \addtogroup dsp * @{ * * \defgroup rt Real time signal handlers * @{ */ class SafeQueue; /** * @brief Threaded in data handler base. Buffers inCallback data and calls a * callback with the same signature on a different thread. The main function of * this is to offload the thread that handles the stream, such that expensive * computations do not result in stream buffer xruns. */ class ThreadedInDataHandlerBase { /** * @brief The queue used to push elements to the handling thread. */ InDataHandler _indatahandler; std::unique_ptr _queue; mutable std::recursive_mutex _mtx; std::atomic _thread_running{false}; std::atomic _thread_can_safely_run{false}; GlobalThreadPool _pool; /** * @brief Function pointer that is called when new DaqData arrives. */ const InCallbackType inCallback; void threadFcn(); /** * @brief Pushes a copy of the daqdata to the thread queue and returns. * Adds a thread to handle the queue, whihc will call inCallback(); * * @param daqdata the daq info to push * * @return true, to continue with sampling. */ void _inCallbackFromInDataHandler(const DaqData &daqdata); public: ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset); ~ThreadedInDataHandlerBase(); /** * @brief This method should be called from the derived class' constructor, * to start the thread and data is incoming. */ void startThread(); /** * @brief This method SHOULD be called from all classes that derive on * ThreadedInDataHandler. It is to make sure the inCallback_threaded() * function is no longer called when the destructor of the derived class is * called. Not calling this function is regarded as a BUG. */ void stopThread(); }; /** * @brief A bit of curiously recurring template pattern, to connect the * specific handlers and connect the proper callbacks in a type-agnostic way. * Using this class, each threaded handler should just implement its reset() * and inCallback() method. Ellides the virtual method calls. * * Usage: class XHandler: public ThreadedInDataHandler { * public: * XHandler(streammgr) : ThreadedInDataHandler(streammgr) {} * void inCallback(const DaqData& d) { ... do something with d } * void reset(const Daq* daq) { ... do something with daq } * }; * * For examples, see PPMHandler, etc. * * @tparam Derived The */ template class ThreadedInDataHandler : public ThreadedInDataHandlerBase { public: ThreadedInDataHandler(SmgrHandle mgr): ThreadedInDataHandlerBase(mgr, std::bind(&ThreadedInDataHandler::_inCallback, this, _1), std::bind(&ThreadedInDataHandler::_reset, this, _1)) { } void _reset(const Daq* daq) { DEBUGTRACE_ENTER; return static_cast(this)->reset(daq); } void _inCallback(const DaqData& data) { DEBUGTRACE_ENTER; return static_cast(this)->inCallback(data); } }; /** @} */ /** @} */