lasp/cpp_src/device/lasp_streammgr.cpp
J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F 83c7aa6ade
Some checks failed
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m1s
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped
More subtle locking and unlocking of mutexes in stopstream
2024-03-14 08:25:47 +01:00

523 lines
15 KiB
C++

// #define DEBUGTRACE_ENABLED
#include "lasp_streammgr.h"
#include <assert.h>
#include <algorithm>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include "debugtrace.hpp"
#include "lasp_biquadbank.h"
#include "lasp_indatahandler.h"
#include "lasp_thread.h"
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
/**
* @brief Global, non-owning handle to the stream manager, stored in a weak
* pointer. If no manager exists yet, StreamMgr::getInstance() creates a new
* one. Using a weak pointer makes sure that the stream manager is deleted once
* the last handle to it has been destroyed (no global stuff left).
*/
std::weak_ptr<StreamMgr> _mgr;
std::mutex _mgr_mutex;
using Lck = std::scoped_lock<std::recursive_mutex>;
/**
 * @brief The only way to obtain a stream manager. Returns the existing
 * manager if one is alive, otherwise creates a new one and registers it in
 * the global weak pointer.
 *
 * Access to the global weak pointer is serialized by _mgr_mutex, so at most
 * one manager can be created at a time. In debug builds (LASP_DEBUG == 1) it
 * is asserted that an already existing manager is requested from the thread
 * that created it.
 *
 * @return Stream manager handle
 */
SmgrHandle StreamMgr::getInstance() {
  DEBUGTRACE_ENTER;
  std::scoped_lock<std::mutex> lck(_mgr_mutex);

  auto mgr = _mgr.lock();
  if (mgr) {
#if LASP_DEBUG == 1
    // Make sure we never ask for an existing SmgrHandle from a different
    // thread.
    assert(std::this_thread::get_id() == mgr->main_thread_id);
#endif
    return mgr;
  }

  // No live manager. _mgr_mutex is held for the whole function, so no other
  // thread can be instantiating one concurrently and a second look at _mgr is
  // not needed. A failing `new` throws, hence no null check either.
  mgr = SmgrHandle(new StreamMgr());

  // Update global weak pointer
  _mgr = mgr;
  return mgr;
}
/**
* @brief Constructs the stream manager and performs an initial device scan.
*
* In debug builds (LASP_DEBUG == 1) the id of the constructing thread is
* stored in main_thread_id, which is what checkRightThread() compares against.
*/
StreamMgr::StreamMgr()
#if LASP_DEBUG == 1
: main_thread_id(std::this_thread::get_id())
#endif
{
DEBUGTRACE_ENTER;
// Scan for the available devices. Note: background == false, so this scan
// runs synchronously on the constructing thread, not on the thread pool.
rescanDAQDevices(false);
}
#if LASP_DEBUG == 1
/**
* @brief Debug-only sanity check: asserts that the calling thread is the one
* whose id was stored in main_thread_id. This definition is only compiled when
* LASP_DEBUG == 1.
*/
void StreamMgr::checkRightThread() const {
assert(std::this_thread::get_id() == main_thread_id);
}
#endif
/**
 * @brief Clears the list of known devices and triggers a new device scan.
 *
 * @param background When true, the scan is pushed as a task to the thread
 * pool and this method returns right away. When false, the scan is performed
 * before this method returns.
 * @param callback Forwarded to rescanDAQDevices_impl(), which invokes it
 * (if set) once the device list has been refreshed.
 *
 * @throws std::runtime_error when a scan is already in progress, or when an
 * input or output stream exists.
 */
void StreamMgr::rescanDAQDevices(bool background,
                                 std::function<void()> callback) {
  DEBUGTRACE_ENTER;
  DEBUGTRACE_PRINT(background);

  // Refuse overlapping scans. Checked before taking the lock.
  if (_scanningDevices) {
    throw rte("A background device scan is already busy");
  }

  Lck lck(_mtx);
  checkRightThread();

  const bool streamPresent = _inputStream || _outputStream;
  if (streamPresent) {
    throw rte("Rescanning DAQ devices only possible when no stream is running");
  }

  _devices.clear();

  if (background) {
    DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
    _scanningDevices = true;
    _pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
    return;
  }

  // Foreground scan: do the work right here.
  _scanningDevices = true;
  rescanDAQDevices_impl(callback);
}
/**
 * @brief Worker for rescanDAQDevices(): refreshes the device list and
 * notifies the caller.
 *
 * Holds the manager mutex while the device list is replaced and while the
 * callback runs. The _scanningDevices flag is cleared on every exit path,
 * including when the device query or the callback throws. Otherwise the flag
 * would stay set forever: the destructor waits for it to clear and
 * rescanDAQDevices() / startStream() refuse to run while it is set.
 *
 * @param callback Optional function invoked after the device list has been
 * updated. Not called when empty.
 */
void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
  DEBUGTRACE_ENTER;
  Lck lck(_mtx);
  try {
    _devices = DeviceInfo::getDeviceInfo();
    if (callback) {
      callback();
    }
  } catch (...) {
    // Do not leave the manager stuck in the "scanning" state; the error
    // itself is still propagated unchanged.
    _scanningDevices = false;
    throw;
  }
  _scanningDevices = false;
}
void StreamMgr::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
assert(_inputFilters.size() == data.nchannels);
if (std::count_if(_inputFilters.cbegin(), _inputFilters.cend(),
[](const auto &a) { return bool(a); }) > 0) {
/// Found a filter in vector of input filters. So we have to apply the
/// filters to each channel.
DaqData input_filtered(data.nframes, data.nchannels, data.dtype);
for (us ch = 0; ch < data.nchannels; ch++) {
if (_inputFilters[ch]) {
DEBUGTRACE_PRINT("Filter ch:");
DEBUGTRACE_PRINT(ch);
vd inout = data.toFloat(ch);
_inputFilters[ch]->filter(inout);
input_filtered.fromFloat(ch, inout);
} else {
DEBUGTRACE_PRINT("No filter ch:");
DEBUGTRACE_PRINT(ch);
input_filtered.copyInFromRaw(ch, data.raw_ptr(0, ch));
}
}
DEBUGTRACE_PRINT("Calling incallback for handlers (filtered)...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(input_filtered);
}
} else {
/// No input filters
DEBUGTRACE_PRINT("Calling incallback for handlers...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(data);
}
}
}
void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
DEBUGTRACE_ENTER;
checkRightThread();
Lck lck(_mtx);
// If not set to nullptr, and a stream is running, we update the signal
// generator by resetting it.
if (isStreamRunningOK(StreamType::output) && siggen) {
const Daq *daq = getDaq(StreamType::output);
assert(daq != nullptr);
// Reset the signal generator.
siggen->reset(daq->samplerate());
}
_siggen = siggen;
}
#define DEBUG_FILLDATA 0
/**
 * @brief Converts from double precision floating point to output signal in
 * non-interleaving format. The same signal is written to every channel.
 *
 * For floating point sample types the values are stored as is. For integer
 * sample types the signal is scaled with the maximum value of the integer
 * type. The signal is clamped to [-1, 1] before scaling, since converting a
 * floating point value that does not fit in the integer type is undefined
 * behaviour.
 *
 * @tparam T Sample type of the output buffer
 * @param data Output buffer. Its number of frames has to be equal to the
 * length of the signal (asserted).
 * @param signal The signal to write, one value per frame
 *
 * @return Always true
 */
template <typename T>
bool fillData(DaqData &data, const vd &signal) {
  /* DEBUGTRACE_ENTER; */
  assert(data.nframes == signal.size());

  // Unused when DEBUG_FILLDATA == 1, as the slow setter is used then.
  [[maybe_unused]] T *res = reinterpret_cast<T *>(data.raw_ptr());

  // The sample type is known at compile time, so only the matching branch is
  // instantiated.
  if constexpr (std::is_floating_point_v<T>) {
    for (us ch = 0; ch < data.nchannels; ch++) {
      for (us frame = 0; frame < data.nframes; frame++) {
#if DEBUG_FILLDATA == 1
        DEBUGTRACE_PRINT("SLOW flt");
        data.setSlow(frame, ch,
                     reinterpret_cast<const int8_t *>(&signal[frame]));
#else
        res[ch * data.nframes + frame] = static_cast<T>(signal[frame]);
#endif
      }
    }
  } else {
    // Computed in double precision: exact for all supported integer widths.
    constexpr double fullScale =
        static_cast<double>(std::numeric_limits<T>::max());

    for (us ch = 0; ch < data.nchannels; ch++) {
      for (us frame = 0; frame < data.nframes; frame++) {
        // Saturate instead of overflowing when the signal exceeds full scale.
        const double normalized =
            std::clamp(static_cast<double>(signal[frame]), -1.0, 1.0);
        const T val = static_cast<T>(normalized * fullScale);
#if DEBUG_FILLDATA == 1
        data.setSlow(frame, ch, reinterpret_cast<const int8_t *>(&val));
#else
        res[ch * data.nframes + frame] = val;
#endif
      }
    }
  }
  return true;
}
/**
 * @brief Fills a block of output data. When a signal generator is installed,
 * the generated signal is written to the block in its own sample type.
 * Without a signal generator, the block is zeroed.
 *
 * Runs with the manager mutex held.
 *
 * @param data The output block to fill
 */
void StreamMgr::outCallback(DaqData &data) {
  DEBUGTRACE_ENTER;
  Lck lck(_mtx);

  if (!_siggen) {
    // No generator: set all values to 0.
    std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0);
    return;
  }

  const vd signal = _siggen->genSignal(data.nframes);

  using DataType = DataTypeDescriptor::DataType;
  // Pick the conversion routine that matches the sample type of the block.
  switch (data.dtype) {
    case DataType::dtype_fl32:
      fillData<float>(data, signal);
      break;
    case DataType::dtype_fl64:
      fillData<double>(data, signal);
      break;
    case DataType::dtype_int8:
      fillData<int8_t>(data, signal);
      break;
    case DataType::dtype_int16:
      fillData<int16_t>(data, signal);
      break;
    case DataType::dtype_int32:
      fillData<int32_t>(data, signal);
      break;
  }
}
/**
* @brief Destructor. Waits until a running device scan has finished and then
* destroys the input and output stream.
*/
StreamMgr::~StreamMgr() {
DEBUGTRACE_ENTER;
// Poll until rescanDAQDevices_impl() has cleared the scanning flag.
while (_scanningDevices) {
std::this_thread::sleep_for(10us);
}
#if LASP_DEBUG == 1
{ // Careful: this lock needs to be released again before the streams are
// reset below, to make sure the streams can obtain a lock on the stream
// manager.
Lck lck(_mtx);
checkRightThread();
}
#endif
// The stream manager is handled by a shared pointer. Each indata handler gets
// a shared pointer to the stream manager, and stores a weak pointer to it.
// Hence, we do not have to do any handler cleanup here. It also makes sure
// that the order in which destructors are called does not matter anymore. As
// soon as the stream manager is destructed, the weak pointers lose their
// reference, and the handlers do not have to call removeInDataHandler()
// anymore.
//
// Stop the streams explicitly in this phase. Otherwise this might happen
// during the destruction of the Siggen, in which case we might get calls to
// pure virtual methods. This was really a bug.
_inputStream.reset();
_outputStream.reset();
}
/**
* @brief Stops (destroys) both the input and the output stream, if present.
* Resetting a stream that does not exist is a no-op.
*/
void StreamMgr::stopAllStreams() {
DEBUGTRACE_ENTER;
{
// The lock is only held for the thread check and released right after.
Lck lck(_mtx);
checkRightThread();
}
// No lock here! inCallback() and outCallback() acquire the same mutex, so the
// streams are torn down with the mutex released. NOTE(review): presumably the
// stream teardown waits for its callbacks to finish, which would block if the
// mutex were still held here - confirm against the Daq implementations.
_inputStream.reset();
_outputStream.reset();
}
void StreamMgr::startStream(const DaqConfiguration &config) {
DEBUGTRACE_ENTER;
if (_scanningDevices) {
throw rte("DAQ device scan is busy. Cannot start stream.");
}
Lck lck(_mtx);
checkRightThread();
bool isInput = std::count_if(config.inchannel_config.cbegin(),
config.inchannel_config.cend(),
[](auto &i) { return i.enabled; }) > 0;
bool isOutput = std::count_if(config.outchannel_config.cbegin(),
config.outchannel_config.cend(),
[](auto &i) { return i.enabled; }) > 0;
// Find the first device that matches with the configuration
DeviceInfo *devinfo = nullptr;
// Match configuration to a device in the list of devices
for (auto &devinfoi : _devices) {
if (config.match(*devinfoi)) {
devinfo = devinfoi.get();
break;
}
}
if (devinfo == nullptr) {
throw rte("Could not find a device with name " + config.device_name +
" in list of devices.");
}
isInput |= (config.monitorOutput && devinfo->hasInternalOutputMonitor);
DEBUGTRACE_PRINT(isInput);
bool isDuplex = isInput && isOutput;
if (!isInput && !isOutput) {
throw rte(
"Attempted stream start failed, stream does not have any enabled "
"channels. Please first enable channels in the channel configuration.");
}
if (isInput && _inputStream) {
throw rte(
"Error: an input stream is already running. Please "
"first stop existing stream");
} else if (isOutput && _outputStream) {
throw rte(
"Error: output stream is already running. Please "
"first stop existing stream");
} else if (_inputStream) {
if (_inputStream->duplexMode() && isOutput) {
throw rte(
"Error: output stream is already running (in duplex mode). "
"Please "
"first stop existing stream");
}
}
if (_outputStream && isInput && _outputStream->duplexModeForced &&
config.match(*_outputStream)) {
throw rte(
"This device is already opened for output. If input is also "
"required, please enable duplex mode for this device");
}
if (_inputStream && isOutput && _inputStream->duplexModeForced &&
config.match(*_inputStream)) {
throw rte(
"This device is already opened for input. If output is also "
"required, please enable duplex mode for this device");
}
InDaqCallback inCallback;
OutDaqCallback outCallback;
using namespace std::placeholders;
std::unique_ptr<Daq> daq = Daq::createDaq(*devinfo, config);
assert(daq);
if (isInput) {
/// Give incallback as parameter to stream
inCallback = std::bind(&StreamMgr::inCallback, this, _1);
/// Reset handlers in case of an input stream
for (auto &handler : _inDataHandlers) {
handler->reset(daq.get());
}
d fs = daq->samplerate();
/// Create input filters
_inputFilters.clear();
/// No input filter for monitor channel, which comes as the first input
/// channel In the list
if (config.monitorOutput && devinfo->hasInternalOutputMonitor) {
_inputFilters.push_back(nullptr);
}
for (auto &ch : daq->inchannel_config) {
if (ch.enabled) {
if (ch.digitalHighPassCutOn < 0) {
_inputFilters.push_back(nullptr);
} else if (ch.digitalHighPassCutOn == 0) {
throw rte("Digital highpass cuton should be > 0 if activated");
} else {
// Put in a digital high-pass filter.
_inputFilters.emplace_back(std::make_unique<SeriesBiquad>(
SeriesBiquad::firstOrderHighPass(fs, ch.digitalHighPassCutOn)));
}
}
} // End of input filter creation
}
if (isOutput) {
/// Give outcallback as parameter to stream
outCallback = std::bind(&StreamMgr::outCallback, this, _1);
/// Reset signal generator in case of an output stream
if (_siggen) {
DEBUGTRACE_PRINT("Resetting _siggen with new samplerate of ");
DEBUGTRACE_PRINT(daq->samplerate());
_siggen->reset(daq->samplerate());
}
}
/// Start the DAQ. If it fails, everything is still nicely cleaned up and
/// the daq unique_ptr cleans up resources nicely.
daq->start(inCallback, outCallback);
// Move daq ptr to right place
if (isInput) {
_inputStream = std::move(daq);
} else {
_outputStream = std::move(daq);
}
}
void StreamMgr::stopStream(const StreamType t) {
DEBUGTRACE_ENTER;
checkRightThread();
bool resetHandlers = false;
std::unique_ptr<Daq> *streamToStop = nullptr;
{ // Mutex locked in this scope
Lck lck(_mtx);
if (t == StreamType::input) {
if (!_inputStream) {
throw rte("Input stream is not running");
}
streamToStop = std::addressof(_inputStream);
resetHandlers = true;
} else {
/// t == output
/// Kill input stream in case that one is a duplex stream
if (_inputStream && _inputStream->duplexMode()) {
streamToStop = std::addressof(_inputStream);
} else {
if (!_outputStream) {
throw rte("Output stream is not running");
}
streamToStop = std::addressof(_outputStream);
} // end else
}
} // End of mutex lock. When stopping stream, mutex should be unlocked.
// If we arrive here, we should have a stream to stop.
assert(streamToStop != nullptr);
streamToStop->reset();
/// Send reset to all in data handlers
if (resetHandlers) {
Lck lck(_mtx);
for (auto &handler : _inDataHandlers) {
handler->reset(nullptr);
}
}
}
/**
 * @brief Registers an input data handler, which from then on receives the
 * input data blocks in inCallback().
 *
 * The duplicate check is done before the handler is touched, so a rejected
 * second registration does not reset a handler that is already active. A
 * newly accepted handler is reset with the current input stream (a null
 * pointer when no input stream exists) before it is added to the list.
 *
 * @param handler Pointer to the handler to add. Must not be null (asserted).
 *
 * @throws std::runtime_error when the handler has already been added.
 */
void StreamMgr::addInDataHandler(InDataHandler *handler) {
  DEBUGTRACE_ENTER;
  Lck lck(_mtx);
  checkRightThread();
  assert(handler);

  const bool alreadyAdded =
      std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
      _inDataHandlers.cend();
  if (alreadyAdded) {
    throw std::runtime_error(
        "Error: handler already added. Probably start() "
        "is called more than once on a handler object");
  }

  handler->reset(_inputStream.get());
  _inDataHandlers.push_back(handler);
  DEBUGTRACE_PRINT(_inDataHandlers.size());
}
/**
* @brief Removes the given handler from the list of input data handlers, with
* the manager mutex held.
*
* @param handler The handler to remove; entries are matched by address.
*/
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
// The thread check is intentionally disabled for this method.
// NOTE(review): presumably because handlers may be removed from a thread
// other than the main one - confirm against callers.
// checkRightThread();
_inDataHandlers.remove(&handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
/**
 * @brief Returns the status of the stream of the given type.
 *
 * @param type The type of stream to query
 *
 * @return The status as reported by the DAQ serving this stream type. When
 * there is no such DAQ, a default constructed status is returned, which says
 * the stream is not running, but also has no errors.
 */
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
  DEBUGTRACE_ENTER;
  Lck lck(_mtx);
  checkRightThread();

  if (const Daq *daq = getDaq(type)) {
    return daq->getStreamStatus();
  }
  return Daq::StreamStatus();
}
/**
 * @brief Returns a non-owning pointer to the DAQ that serves the given
 * stream type.
 *
 * @param type The type of stream
 *
 * @return For input: the input stream. For output: the input stream when
 * that one runs in duplex mode (it is then also the output stream),
 * otherwise the output stream. A null pointer when the selected stream does
 * not exist.
 */
const Daq *StreamMgr::getDaq(StreamType type) const {
  Lck lck(_mtx);
  checkRightThread();

  // The input stream is the answer for an input request, and also for an
  // output request when it runs in duplex mode.
  const bool servedByInput =
      (type == StreamType::input) ||
      (_inputStream && _inputStream->duplexMode());

  return servedByInput ? _inputStream.get() : _outputStream.get();
}