Incallbacks should not return anything anymore. From inheritance to composition for InDataHandler code. StreamMgr singleton only weak ptr stored, this makes sure destruction from Python is more often done. UlDAQ code back to working.
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Anne de Jong 2023-06-09 10:43:04 +02:00
parent 028bed9229
commit 21df1bc6cf
25 changed files with 322 additions and 191 deletions

View File

@ -15,12 +15,12 @@
* @brief Callback of DAQ for input data. Callback should return
* false for a stop request.
*/
using InDaqCallback = std::function<bool(const DaqData &)>;
using InDaqCallback = std::function<void(const DaqData &)>;
/**
* @brief
*/
using OutDaqCallback = std::function<bool(DaqData &)>;
using OutDaqCallback = std::function<void(DaqData &)>;
/**
* @brief Base cass for all DAQ (Data Acquisition) interfaces. A DAQ can be a

View File

@ -4,43 +4,56 @@
#include "lasp_streammgr.h"
#include <thread>
InDataHandler::InDataHandler(SmgrHandle mgr)
: _mgr(mgr)
InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb,
const InResetType resetfcn)
: _mgr(mgr), inCallback(cb), reset(resetfcn)
#if LASP_DEBUG == 1
,
_main_thread_id(std::this_thread::get_id())
main_thread_id(std::this_thread::get_id())
#endif
{
DEBUGTRACE_ENTER;
#if LASP_DEBUG == 1
assert(mgr->main_thread_id == main_thread_id);
#endif
}
void InDataHandler::start() {
DEBUGTRACE_ENTER;
checkRightThread();
if (SmgrHandle handle = _mgr.lock()) {
handle->addInDataHandler(this);
#if LASP_DEBUG == 1
assert(handle->_main_thread_id == _main_thread_id);
assert(handle->main_thread_id == main_thread_id);
#endif
}
}
void InDataHandler::stop() {
DEBUGTRACE_ENTER;
checkRightThread();
#if LASP_DEBUG == 1
stopCalled = true;
#endif
if (SmgrHandle handle = _mgr.lock()) {
/* handle->removeInDataHandler(*this); */
handle->removeInDataHandler(*this);
}
}
InDataHandler::~InDataHandler() {
DEBUGTRACE_ENTER;
checkRightThread();
#if LASP_DEBUG == 1
if (!stopCalled) {
std::cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "
"InDataHandler::stop() from the derived class' destructor."
"InDataHandler::stop()."
<< std::endl;
abort();
}
#endif
}
#if LASP_DEBUG == 1
void InDataHandler::checkRightThread() const {
assert(std::this_thread::get_id() == main_thread_id);
}
#endif

View File

@ -1,17 +1,29 @@
#pragma once
#include "lasp_types.h"
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
#include "lasp_types.h"
class StreamMgr;
using SmgrHandle = std::shared_ptr<StreamMgr>;
class DaqData;
class Daq;
/** \addtogroup device
* @{
*/
class DaqData;
class Daq;
/**
* @brief The function definition of callbacks with incoming DAQ data
*/
using InCallbackType = std::function<void(const DaqData &)>;
/**
* @brief Function definition for the reset callback.
*/
using InResetType = std::function<void(const Daq *)>;
class InDataHandler {
protected:
@ -24,53 +36,41 @@ protected:
#endif
public:
virtual ~InDataHandler();
~InDataHandler();
const InCallbackType inCallback;
const InResetType reset;
/**
* @brief When constructed, the handler is added to the stream manager, which
* will call the handlers's inCallback() until stop() is called.
*
* @param mgr Stream manager.
* @param cb The callback that is stored, and called on new DAQ data
* @param resetfcn The callback that is stored, and called when the DAQ
* changes state.
*/
InDataHandler(SmgrHandle mgr);
InDataHandler(SmgrHandle mgr, InCallbackType cb,
InResetType resetfcn);
/**
* @brief This function is called when input data from a DAQ is available.
*
* @param daqdata Input data from DAQ
*
* @return true if no error. False to stop the stream from running.
*/
virtual bool inCallback(const DaqData &daqdata) = 0;
/**
* @brief Reset in-data handler.
*
* @param daq New DAQ configuration of inCallback(). If nullptr is given,
* it means that the stream is stopped.
*/
virtual void reset(const Daq *daq = nullptr) = 0;
/**
* @brief This function should be called from the constructor of the
* implementation of InDataHandler, that one than also implements
* `inCallback`. It will start the stream's calling of inCallback().
* @brief Adds the current InDataHandler to the list of handlers in the
* StreamMgr. After this happens, the reset() method stored in this
* object is called back. When the stream is running, right after this,
* inCallback() is called with DaqData.
*/
void start();
/**
* @brief This function should be called from the destructor of derived
* classes, to disable the calls to inCallback(), such that proper
* destruction of the object is allowed and no other threads call methods
* from the object. It removes the inCallback() from the callback list of the
* StreamMgr(). **Failing to call this function results in deadlocks, errors
* like "pure virtual function called", or other**.
* @brief Removes the currend InDataHandler from the list of handlers in the
* StreamMgr. From that point on, the object can be safely destroyed. Not
* calling stop() before destruction of this object is considered a BUG. I.e.
* a class which *uses* an InDataHandler should always call stop() in its
* destructor.
*/
void stop();
private:
#if LASP_DEBUG == 1
const std::thread::id _main_thread_id;
const std::thread::id main_thread_id;
void checkRightThread() const;
#else
void checkRightThread() const {}

View File

@ -368,11 +368,7 @@ public:
DaqData d{nFramesPerBlock, neninchannels, dtype};
d.copyInFromRaw(ptrs);
bool ret = _incallback(d);
if (!ret) {
stopWithError(se::noError);
return 1;
}
_incallback(d);
}
if (outputBuffer) {
@ -395,11 +391,8 @@ public:
}
DaqData d{nFramesPerBlock, nenoutchannels, dtype};
bool ret = _outcallback(d);
if (!ret) {
stopWithError(se::noError);
return 1;
}
_outcallback(d);
// Copy over the buffer
us j = 0;
for (auto ptr : ptrs) {
d.copyToRaw(j, ptr);

View File

@ -2,6 +2,25 @@
#include "lasp_daq.h"
#include <memory>
/** \addtogroup device
* @{
* \defgroup rtaudio RtAudio backend
* This code is used to interface with the RtAudio cross-platform audio
* interface.
*
* \addtogroup rtaudio
* @{
*/
/**
* @brief Method called from Daq::createDaq.
*
* @param devinfo Device info
* @param config DAQ Configuration settings
*
* @return Pointer to Daq instance. Throws Runtime errors on error.
*/
std::unique_ptr<Daq> createRtAudioDevice(const DeviceInfo& devinfo,
const DaqConfiguration& config);
@ -12,3 +31,5 @@ std::unique_ptr<Daq> createRtAudioDevice(const DeviceInfo& devinfo,
*/
void fillRtAudioDeviceInfo(DeviceInfoList &devinfolist);
/** @} */
/** @} */

View File

@ -15,34 +15,55 @@ using std::endl;
using rte = std::runtime_error;
/**
* @brief The main global handle to a stream, stored in a shared pointer.
* @brief The main global handle to a stream, stored in a weak pointer, if it
* does not yet exist, via StreamMgr::getInstance, a new stream mgr is created.
* It also makes sure that the stream manager is deleted once the latest handle
* to it has been destroyed (no global stuff left).
*/
std::shared_ptr<StreamMgr> _mgr;
std::weak_ptr<StreamMgr> _mgr;
std::shared_ptr<StreamMgr> StreamMgr::getInstance() {
/**
* @brief The only way to obtain a stream manager, can only be called from the
* thread that does it the first time.
*
* @return Stream manager handle
*/
SmgrHandle StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
if (!_mgr) {
_mgr = std::shared_ptr<StreamMgr>(new StreamMgr());
if (!_mgr) {
auto mgr = _mgr.lock();
if (!mgr) {
mgr = SmgrHandle(new StreamMgr());
if (!mgr) {
throw rte("Fatal: could not allocate stream manager!");
}
// Update global weak pointer
_mgr = mgr;
return mgr;
}
#if LASP_DEBUG == 1
// Make sure we never ask for a new SmgrHandle from a different thread.
assert(std::this_thread::get_id() == mgr->main_thread_id);
#endif
return _mgr;
return mgr;
}
StreamMgr::StreamMgr() {
DEBUGTRACE_ENTER;
StreamMgr::StreamMgr()
#if LASP_DEBUG == 1
_main_thread_id = std::this_thread::get_id();
: main_thread_id(std::this_thread::get_id())
#endif
{
DEBUGTRACE_ENTER;
// Trigger a scan for the available devices, in the background.
rescanDAQDevices(true);
}
#if LASP_DEBUG == 1
void StreamMgr::checkRightThread() const {
assert(std::this_thread::get_id() == _main_thread_id);
assert(std::this_thread::get_id() == main_thread_id);
}
#endif
@ -65,6 +86,7 @@ void StreamMgr::rescanDAQDevices(bool background,
if (!background) {
rescanDAQDevices_impl(callback);
} else {
DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
}
}
@ -76,7 +98,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
callback();
}
}
bool StreamMgr::inCallback(const DaqData &data) {
void StreamMgr::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
@ -107,23 +129,15 @@ bool StreamMgr::inCallback(const DaqData &data) {
}
for (auto &handler : _inDataHandlers) {
bool res = handler->inCallback(input_filtered);
if (!res) {
return false;
}
handler->inCallback(input_filtered);
}
} else {
/// No input filters
for (auto &handler : _inDataHandlers) {
bool res = handler->inCallback(data);
if (!res) {
return false;
}
handler->inCallback(data);
}
}
return true;
}
void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
@ -187,7 +201,7 @@ template <typename T> bool fillData(DaqData &data, const vd &signal) {
return true;
}
bool StreamMgr::outCallback(DaqData &data) {
void StreamMgr::outCallback(DaqData &data) {
/* DEBUGTRACE_ENTER; */
@ -216,7 +230,6 @@ bool StreamMgr::outCallback(DaqData &data) {
// Set all values to 0.
std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0);
}
return true;
}
StreamMgr::~StreamMgr() {

View File

@ -183,17 +183,17 @@ class StreamMgr {
/**
* @brief Set active signal generator for output streams. Only one `Siggen'
* is active at the same time. Siggen controls its own data race protection
* using a mutex.
* using a mutex. If no Siggen is there, and an output stream is running, it
* will send a default signal of 0.
*
* @param s New Siggen pointer
*/
void setSiggen(std::shared_ptr<Siggen> s);
private:
bool inCallback(const DaqData &data);
bool outCallback(DaqData &data);
void inCallback(const DaqData &data);
void outCallback(DaqData &data);
void removeInDataHandler(InDataHandler &handler);
/**
* @brief Add an input data handler. The handler's inCallback() function is
@ -205,6 +205,12 @@ private:
*/
void addInDataHandler(InDataHandler *handler);
/**
* @brief Remove InDataHandler from the list.
*
* @param handler
*/
void removeInDataHandler(InDataHandler &handler);
/**
* @brief Do the actual rescanning.
*
@ -213,7 +219,7 @@ private:
void rescanDAQDevices_impl(std::function<void()> callback);
#if LASP_DEBUG == 1
std::thread::id _main_thread_id;
const std::thread::id main_thread_id;
void checkRightThread() const;
#else
void checkRightThread() const {}

View File

@ -7,6 +7,12 @@
#include "lasp_uldaq_impl.h"
#include <uldaq.h>
/**
* @brief The maximum number of devices that can be enumerated when calling
* ulGetDaqDeviceInventory()
*/
const us MAX_ULDAQ_DEV_COUNT_PER_API = 100;
void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
DEBUGTRACE_ENTER;

View File

@ -1,18 +1,24 @@
#pragma once
#include "lasp_daq.h"
/**
* @brief The maximum number of devices that can be enumerated when calling
* ulGetDaqDeviceInventory()
/** \addtogroup device
* \defgroup uldaq UlDAQ specific code
* This code is used to interface with UlDAQ compatible devices. It is only
* tested on Linux.
* @{
* \addtogroup uldaq
* @{
*/
const us MAX_ULDAQ_DEV_COUNT_PER_API = 100;
std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config);
/**
* @brief Fill device info list with UlDaq specific devices, if any.
* @brief Append device info list with UlDaq specific devices, if any.
*
* @param devinfolist Info list to append to.
*/
void fillUlDaqDeviceInfo(DeviceInfoList& devinfolist);
/** @} */
/** @} */

View File

@ -134,13 +134,13 @@ bool InBufHandler::operator()() {
if (transferStatus.currentIndex < (long long)buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
ret = runCallback(nchannels * nFramesPerBlock);
runCallback(nchannels * nFramesPerBlock);
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
ret = runCallback(0);
runCallback(0);
topenqueued = true;
}
}
@ -214,8 +214,8 @@ bool OutBufHandler::operator()() {
if (!botenqueued) {
DaqData d(nFramesPerBlock, 1,// Only one output channel
dtype_descr.dtype);
// Receive data
res = cb(d);
// Receive data, run callback
cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[buffer_mid_idx])));
botenqueued = true;
@ -226,7 +226,7 @@ bool OutBufHandler::operator()() {
DaqData d(nFramesPerBlock, 1,// Only one output channel
dtype_descr.dtype);
// Receive
res = cb(d);
cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[0])));
topenqueued = true;

View File

@ -5,11 +5,16 @@
#include "lasp_uldaq_common.h"
class DT9837A;
/** \addtogroup device
* @{
* \addtogroup uldaq
*/
/**
* @brief Helper class for managing input and output samples of the DAQ device.
*/
class DT9837A;
class BufHandler {
protected:
/**
@ -28,6 +33,9 @@ protected:
* @brief Sampling frequency in Hz
*/
double samplerate;
/**
* @brief Storage capacity for the DAQ I/O.
*/
std::vector<double> buf;
/**
* @brief Whether the top part of the buffer is enqueued
@ -97,3 +105,5 @@ public:
~OutBufHandler();
};
/** @} */
/** @} */

View File

@ -3,6 +3,10 @@
#include <string>
#include "lasp_deviceinfo.h"
/** \addtogroup device
* @{
* \addtogroup uldaq
*/
/**
* @brief Throws an appropriate stream exception based on the UlError number.
* The mapping is based on the error numbers as given in uldaq.h. There are a
@ -58,3 +62,5 @@ const std::vector<d> ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000,
47250, 48000, 50000, 50400, 51000};
/** @} */
/** @} */

View File

@ -91,15 +91,15 @@ void DT9837A::stop() {
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
/* if (!isRunning()) { */
/* throw rte("No data acquisition running"); */
/* } */
if (!isRunning()) {
throw rte("No data acquisition running");
}
// Stop the thread and join it
/* _stopThread = true; */
/* assert(_thread.joinable()); */
/* _thread.join(); */
/* _stopThread = false; */
_stopThread = true;
assert(_thread.joinable());
_thread.join();
_stopThread = false;
// Update stream status
status.isRunning = false;
@ -120,8 +120,8 @@ void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
throw rte("DAQ requires a callback for output data");
}
assert(neninchannels() + nenoutchannels() > 0);
/* _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
*/
_thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
}
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {

View File

@ -18,7 +18,14 @@ using rte = std::runtime_error;
class InBufHandler;
class OutBufHandler;
/** \addtogroup device
* @{
* \addtogroup uldaq
*/
/**
* @brief Data translation DT9837A Daq device.
*/
class DT9837A : public Daq {
DaqDeviceHandle _handle = 0;
@ -71,6 +78,11 @@ public:
virtual ~DT9837A();
/**
* @brief Returns true when the stream is running
*
* @return as above stated
*/
bool isRunning() const;
/**
@ -94,3 +106,5 @@ public:
}
};
/** @} */
/** @} */

View File

@ -18,7 +18,7 @@ ClipHandler::ClipHandler(SmgrHandle mgr)
startThread();
}
bool ClipHandler::inCallback_threaded(const DaqData &d) {
void ClipHandler::inCallback(const DaqData &d) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
@ -52,7 +52,6 @@ bool ClipHandler::inCallback_threaded(const DaqData &d) {
_clip_time(i) += _dt;
}
}
return true;
}
arma::uvec ClipHandler::getCurrentValue() const {

View File

@ -21,7 +21,7 @@
/**
* @brief Clipping detector (Clip). Detects when a signal overdrives the input
* */
class ClipHandler: public ThreadedInDataHandler {
class ClipHandler: public ThreadedInDataHandler<ClipHandler> {
/**
* @brief Assuming full scale of a signal is +/- 1.0. If a value is found
@ -68,8 +68,8 @@ class ClipHandler: public ThreadedInDataHandler {
*/
arma::uvec getCurrentValue() const;
bool inCallback_threaded(const DaqData& ) override final;
void reset(const Daq*) override final;
void inCallback(const DaqData& );
void reset(const Daq*);
};

View File

@ -12,13 +12,13 @@ using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps)
: ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) {
: ThreadedInDataHandler<PPMHandler>(mgr), _decay_dBps(decay_dBps) {
DEBUGTRACE_ENTER;
startThread();
}
bool PPMHandler::inCallback_threaded(const DaqData &d) {
void PPMHandler::inCallback(const DaqData &d) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
@ -64,12 +64,11 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) {
_cur_max(i) *= _alpha;
}
}
return true;
}
std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const {
DEBUGTRACE_ENTER;
/* DEBUGTRACE_ENTER; */
Lck lck(_mtx);
arma::uvec clips(_clip_time.size(), arma::fill::zeros);
@ -85,9 +84,11 @@ void PPMHandler::reset(const Daq *daq) {
if (daq) {
DEBUGTRACE_PRINT("New daq found");
_cur_max.fill(1e-80);
const us nchannels = daq->neninchannels();
DEBUGTRACE_PRINT(nchannels);
_max_range.resize(nchannels);
dvec ranges = daq->inputRangeForEnabledChannels();

View File

@ -4,7 +4,6 @@
//
// Description: Peak Programme Meter
#pragma once
#include <memory>
#include "lasp_filter.h"
#include "lasp_mathtypes.h"
#include "lasp_threadedindatahandler.h"
@ -23,7 +22,7 @@
* with a certain amount of dB/s. If a new peak is found, it goes up again.
* Also detects clipping.
* */
class PPMHandler: public ThreadedInDataHandler {
class PPMHandler : public ThreadedInDataHandler<PPMHandler> {
/**
* @brief Assuming full scale of a signal is +/- 1.0. If a value is found
@ -69,7 +68,7 @@ class PPMHandler: public ThreadedInDataHandler {
/**
* @brief Constructs Peak Programme Meter
*
* @param mgr Stream Mgr to operate on
* @param mgr Stream Mgr to install callbacks for
* @param decay_dBps The level decay in units dB/s, after a peak has been
* hit.
*/
@ -91,8 +90,8 @@ class PPMHandler: public ThreadedInDataHandler {
*
* @return true when stream should continue.
*/
bool inCallback_threaded(const DaqData& d) override final;
void reset(const Daq*) override final;
void inCallback(const DaqData& d);
void reset(const Daq*);
};

View File

@ -25,7 +25,7 @@ RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter,
RtAps::~RtAps() {
stopThread();
}
bool RtAps::inCallback_threaded(const DaqData &data) {
void RtAps::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
@ -35,7 +35,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) {
const us nchannels = fltdata.n_cols;
if(nchannels != _sens.size()) {
cerr << "**** Error: sensitivity size does not match! *****" << endl;
return false;
return;
}
fltdata.each_row() %= _sens.as_row();
@ -63,7 +63,6 @@ bool RtAps::inCallback_threaded(const DaqData &data) {
_ps.compute(fltdata);
return true;
}
void RtAps::reset(const Daq *daq) {

View File

@ -23,7 +23,7 @@
* @brief Real time spectral estimator using Welch method of spectral
* estimation.
*/
class RtAps : public ThreadedInDataHandler {
class RtAps : public ThreadedInDataHandler<RtAps> {
std::unique_ptr<Filter> _filterPrototype;
std::vector<std::unique_ptr<Filter>> _freqWeightingFilters;
@ -69,8 +69,8 @@ public:
*
* @return true if stream should continue.
*/
bool inCallback_threaded(const DaqData & d) override final;
void reset(const Daq *) override final;
void inCallback(const DaqData & d);
void reset(const Daq *);
};
/** @} */

View File

@ -27,7 +27,7 @@ RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist,
startThread();
}
bool RtSignalViewer::inCallback_threaded(const DaqData &data) {
void RtSignalViewer::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
@ -52,8 +52,6 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) {
_dat(_resolution-1, 1) = newmin;
_dat(_resolution-1, 2) = newmax;
}
return true;
}
RtSignalViewer::~RtSignalViewer() {

View File

@ -24,7 +24,7 @@
* @brief Real time signal viewer. Shows envelope of the signal based on amount
* of history shown.
*/
class RtSignalViewer : public ThreadedInDataHandler {
class RtSignalViewer : public ThreadedInDataHandler<RtSignalViewer> {
/**
* @brief Storage for sensitivity values
@ -85,8 +85,8 @@ public:
*/
dmat getCurrentValue() const;
bool inCallback_threaded(const DaqData &) override final;
void reset(const Daq *) override final;
void inCallback(const DaqData &);
void reset(const Daq *);
};
/** @} */

View File

@ -13,6 +13,7 @@ using lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
using std::cerr;
using std::endl;
using std::placeholders::_1;
class SafeQueue {
std::queue<DaqData> _queue;
@ -49,47 +50,50 @@ public:
bool empty() const { return _contents == 0; }
};
ThreadedInDataHandler::ThreadedInDataHandler(SmgrHandle mgr)
: InDataHandler(mgr), _queue(std::make_unique<SafeQueue>()) {
ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr,
InCallbackType cb,
InResetType reset)
: _indatahandler(
mgr,
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
_1),
reset),
_queue(std::make_unique<SafeQueue>()), inCallback(cb) {
DEBUGTRACE_ENTER;
// Initialize thread pool, if not already done
getPool();
}
void ThreadedInDataHandler::startThread() {
void ThreadedInDataHandlerBase::startThread() {
DEBUGTRACE_ENTER;
_thread_can_safely_run = true;
start();
_indatahandler.start();
}
bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) {
void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
const DaqData &daqdata) {
DEBUGTRACE_ENTER;
std::scoped_lock lck(_mtx);
// Early return in case object is under DESTRUCTION
if (!_thread_can_safely_run)
return true;
if (!_lastCallbackResult) {
return false;
}
return;
_queue->push(daqdata);
if (!_thread_running && _lastCallbackResult) {
if (!_thread_running) {
auto &pool = getPool();
DEBUGTRACE_PRINT("Pushing new thread in pool");
_thread_running = true;
pool.push_task(&ThreadedInDataHandler::threadFcn, this);
pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this);
}
return _lastCallbackResult;
}
void ThreadedInDataHandler::stopThread() {
void ThreadedInDataHandlerBase::stopThread() {
DEBUGTRACE_ENTER;
// Make sure inCallback is no longer called
_thread_can_safely_run = false;
stop();
_indatahandler.stop();
std::scoped_lock lck(_mtx);
@ -99,7 +103,7 @@ void ThreadedInDataHandler::stopThread() {
}
}
ThreadedInDataHandler::~ThreadedInDataHandler() {
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
DEBUGTRACE_ENTER;
if (_thread_can_safely_run) {
@ -111,18 +115,14 @@ ThreadedInDataHandler::~ThreadedInDataHandler() {
}
}
void ThreadedInDataHandler::threadFcn() {
void ThreadedInDataHandlerBase::threadFcn() {
DEBUGTRACE_ENTER;
while (!_queue->empty() && _thread_can_safely_run) {
// Call inCallback_threaded
if (!inCallback_threaded(_queue->pop())) {
cerr << "*********** Callback result returned false! *************"
<< endl;
_lastCallbackResult = false;
}
inCallback(_queue->pop());
}
_thread_running = false;
}

View File

@ -1,9 +1,11 @@
#pragma once
#include "debugtrace.hpp"
#include "lasp_indatahandler.h"
#include <atomic>
#include <memory>
#include <mutex>
using std::placeholders::_1;
const us RINGBUFFER_SIZE = 1024;
/**
@ -16,23 +18,45 @@ const us RINGBUFFER_SIZE = 1024;
class SafeQueue;
/**
* @brief Threaded in data handler. Buffers inCallback data and calls a
* callback with the same signature on a different thread.
* @brief Threaded in data handler base. Buffers inCallback data and calls a
* callback with the same signature on a different thread. The main function of
* this is to offload the thread that handles the stream, such that expensive
* computations do not result in stream buffer xruns.
*/
class ThreadedInDataHandler : protected InDataHandler {
class ThreadedInDataHandlerBase {
/**
* @brief The queue used to push elements to the handling thread.
*/
InDataHandler _indatahandler;
std::unique_ptr<SafeQueue> _queue;
mutable std::recursive_mutex _mtx;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _lastCallbackResult{true};
std::atomic<bool> _thread_can_safely_run{false};
/**
* @brief Function pointer that is called when new DaqData arrives.
*/
const InCallbackType inCallback;
void threadFcn();
protected:
/**
* @brief Pushes a copy of the daqdata to the thread queue and returns.
* Adds a thread to handle the queue, whihc will call inCallback();
*
* @param daqdata the daq info to push
*
* @return true, to continue with sampling.
*/
void _inCallbackFromInDataHandler(const DaqData &daqdata);
public:
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset);
~ThreadedInDataHandlerBase();
/**
* @brief This method should be called from the derived class' constructor,
* to start the thread and data is incoming.
@ -46,33 +70,44 @@ protected:
*/
void stopThread();
public:
/**
* @brief Initialize a ThreadedInDataHandler
*
* @param mgr StreamMgr singleton reference
*/
ThreadedInDataHandler(SmgrHandle mgr);
virtual ~ThreadedInDataHandler();
};
/**
* @brief Pushes a copy of the daqdata to the thread queue and returns
*
* @param daqdata the daq info to push
*
* @return true, to continue with sampling.
*/
virtual bool inCallback(const DaqData &daqdata) override final;
/**
* @brief A bit of curiously recurring template pattern, to connect the
* specific handlers and connect the proper callbacks in a type-agnostic way.
* Using this class, each threaded handler should just implement its reset()
* and inCallback() method. Ellides the virtual method calls.
*
* Usage: class XHandler: public ThreadedInDataHandler<XHandler> {
* public:
* XHandler(streammgr) : ThreadedInDataHandler(streammgr) {}
* void inCallback(const DaqData& d) { ... do something with d }
* void reset(const Daq* daq) { ... do something with daq }
* };
*
* For examples, see PPMHandler, etc.
*
* @tparam Derived The
*/
template <typename Derived>
class ThreadedInDataHandler : public ThreadedInDataHandlerBase {
public:
ThreadedInDataHandler(SmgrHandle mgr):
ThreadedInDataHandlerBase(mgr,
std::bind(&ThreadedInDataHandler::_inCallback, this, _1),
std::bind(&ThreadedInDataHandler::_reset, this, _1))
{
/**
* @brief This function should be overridden with an actual implementation,
* of what should happen on a different thread.
*
* @param d Input daq data
*
* @return true on succes. False when an error occured.
*/
virtual bool inCallback_threaded(const DaqData &d) = 0;
}
void _reset(const Daq* daq) {
DEBUGTRACE_ENTER;
return static_cast<Derived*>(this)->reset(daq);
}
void _inCallback(const DaqData& data) {
DEBUGTRACE_ENTER;
return static_cast<Derived*>(this)->inCallback(data);
}
};
/** @} */

View File

@ -96,17 +96,26 @@ py::array_t<d> dmat_to_ndarray(const DaqData &d) {
}
/**
* @brief Wraps the InDataHandler such that it calls a Python callback with a
* buffer of sample data. The Python callback is called from a different
* thread, using a Numpy array as argument.
* @brief Wraps the ThreadedInDataHandler such that it calls a Python callback with a
* buffer of sample data. Converts DaqData objects to Numpy arrays and calls
* Python given as argument to the constructor
*/
class PyIndataHandler : public ThreadedInDataHandler {
class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
/**
* @brief The callback functions that is called.
*/
py::function cb, reset_callback;
public:
/**
* @brief Initialize PyIndataHandler
*
* @param mgr StreamMgr handle
* @param cb Python callback that is called with Numpy input data from device
* @param reset_callback Python callback that is called with a Daq pointer.
* Careful: do not store this handle, as it is only valid as long as reset()
* is called, when a stream stops, this pointer / handle will dangle.
*/
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) {
@ -119,7 +128,12 @@ public:
DEBUGTRACE_ENTER;
stopThread();
}
void reset(const Daq *daq) override final {
/**
* @brief Calls the reset callback in Python.
*
* @param daq Daq device, or nullptr in case no input stream is running.
*/
void reset(const Daq *daq) {
DEBUGTRACE_ENTER;
py::gil_scoped_acquire acquire;
try {
@ -140,9 +154,10 @@ public:
}
/**
* @brief Reads from the buffer
* @brief Calls the Python callback method / function with a Numpy array of
* stream data.
*/
bool inCallback_threaded(const DaqData &d) override final {
void inCallback(const DaqData &d) {
/* DEBUGTRACE_ENTER; */
@ -172,18 +187,15 @@ public:
} // End of switch
bool res = bool_val.cast<bool>();
if (!res)
return false;
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
return false;
abort();
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." << endl;
return false;
abort();
}
return true;
}
};