lasp/cpp_src/device/lasp_streammgr.cpp
J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F 838a0f7cc1
All checks were successful
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Successful in -59s
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped
Silence portaudio alsa errors when querying device info AND when starting stream. Do not know whether this solves the problem of its verbosity, but at least the code is where it belongs
2024-06-26 12:17:43 +02:00

529 lines
15 KiB
C++

// #define DEBUGTRACE_ENABLED
#include "lasp_streammgr.h"
#include <assert.h>
#include <algorithm>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include "debugtrace.hpp"
#include "lasp_biquadbank.h"
#include "lasp_indatahandler.h"
#include "lasp_thread.h"
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
/**
 * @brief Global, non-owning handle to the stream manager.
 *
 * Only a weak pointer is stored here. StreamMgr::getInstance() locks it and,
 * when no manager is alive, creates a new one and records it in this
 * variable. Because no owning reference is kept globally, the manager is
 * destroyed as soon as the last handle handed out by getInstance() is
 * released.
 */
std::weak_ptr<StreamMgr> _mgr;
/// Guards _mgr; held for the full duration of StreamMgr::getInstance().
std::mutex _mgr_mutex;
/// Scoped lock type used throughout this file on the StreamMgr member _mtx.
using Lck = std::scoped_lock<std::recursive_mutex>;
/**
 * @brief The only way to obtain a stream manager handle.
 *
 * Returns the existing manager when one is alive, otherwise creates a new
 * one and records it in the global weak pointer. The whole lookup/creation
 * sequence runs under _mgr_mutex, so concurrent callers cannot create two
 * instances.
 *
 * In builds with LASP_DEBUG == 1, asking for an already existing manager
 * from a thread other than the one that created it trips an assertion.
 *
 * @return Stream manager handle
 * @throws std::bad_alloc when the manager cannot be allocated.
 */
SmgrHandle StreamMgr::getInstance() {
  DEBUGTRACE_ENTER;
  std::scoped_lock<std::mutex> lck(_mgr_mutex);

  if (auto existing = _mgr.lock()) {
#if LASP_DEBUG == 1
    // Make sure we never ask for a new SmgrHandle from a different thread.
    assert(std::this_thread::get_id() == existing->main_thread_id);
#endif
    return existing;
  }

  // No live manager. The mutex is still held, so nobody else can create one
  // in between; a second look at the weak pointer would be pointless. Plain
  // `new` reports failure by throwing, hence no null check either.
  auto created = SmgrHandle(new StreamMgr());

  // Update global weak pointer
  _mgr = created;
  return created;
}
/**
 * @brief Construct the stream manager and kick off a device scan.
 *
 * In builds with LASP_DEBUG == 1 the id of the constructing thread is stored
 * in main_thread_id, which checkRightThread() and getInstance() assert
 * against.
 */
StreamMgr::StreamMgr()
#if LASP_DEBUG == 1
: main_thread_id(std::this_thread::get_id())
#endif
{
DEBUGTRACE_ENTER;
// Trigger a scan for the available devices, in the background.
rescanDAQDevices(true);
}
#if LASP_DEBUG == 1
/**
 * @brief Debug-only guard: asserts that the caller runs on the thread whose
 * id was recorded in main_thread_id at construction.
 *
 * This definition only exists when LASP_DEBUG == 1.
 * NOTE(review): the non-debug variant is presumably provided by the header —
 * confirm.
 */
void StreamMgr::checkRightThread() const {
assert(std::this_thread::get_id() == main_thread_id);
}
#endif
/**
 * @brief Throw away the current device list and scan for DAQ devices again.
 *
 * @param background When true, the scan is pushed onto the thread pool and
 * this call returns right away. When false, the scan runs synchronously in
 * the calling thread.
 * @param callback Optional function, forwarded to rescanDAQDevices_impl(),
 * which invokes it once the device list has been refreshed.
 *
 * @throws std::runtime_error when a scan is already in progress, or when an
 * input or output stream currently exists.
 */
void StreamMgr::rescanDAQDevices(bool background,
                                 std::function<void()> callback) {
  DEBUGTRACE_ENTER;
  DEBUGTRACE_PRINT(background);

  if (_scanningDevices) {
    throw rte("A background device scan is already busy");
  }

  Lck lck(_mtx);
  checkRightThread();

  const bool streamPresent = _inputStream || _outputStream;
  if (streamPresent) {
    throw rte("Rescanning DAQ devices only possible when no stream is running");
  }

  _devices.clear();

  // Raised here for both modes; rescanDAQDevices_impl() lowers it again.
  _scanningDevices = true;

  if (background) {
    DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
    _pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
    return;
  }

  rescanDAQDevices_impl(callback);
}
/**
 * @brief Worker that performs the actual device scan.
 *
 * Replaces _devices with the result of DeviceInfo::getDeviceInfo(), invokes
 * the callback (when one is given) and finally clears _scanningDevices. The
 * stream manager mutex is held for the whole duration, including the
 * callback.
 *
 * The scanning flag is cleared on every exit path. If the device query or
 * the callback throws, the exception is re-thrown after the flag has been
 * reset. Otherwise the flag would stay set forever: every later rescan and
 * startStream() would report a busy scan, and the destructor, which waits
 * for the flag, would never return.
 *
 * @param callback Optional function called after the device list has been
 * updated.
 */
void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
  DEBUGTRACE_ENTER;
  assert(!_inputStream && !_outputStream);
  Lck lck(_mtx);

  try {
    _devices = DeviceInfo::getDeviceInfo();
    if (callback) {
      callback();
    }
  } catch (...) {
    // Do not leave the manager stuck in "scanning" state.
    _scanningDevices = false;
    throw;
  }

  _scanningDevices = false;
}
void StreamMgr::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
assert(_inputFilters.size() == data.nchannels);
if (std::count_if(_inputFilters.cbegin(), _inputFilters.cend(),
[](const auto &a) { return bool(a); }) > 0) {
/// Found a filter in vector of input filters. So we have to apply the
/// filters to each channel.
DaqData input_filtered(data.nframes, data.nchannels, data.dtype);
for (us ch = 0; ch < data.nchannels; ch++) {
if (_inputFilters[ch]) {
DEBUGTRACE_PRINT("Filter ch:");
DEBUGTRACE_PRINT(ch);
vd inout = data.toFloat(ch);
_inputFilters[ch]->filter(inout);
input_filtered.fromFloat(ch, inout);
} else {
DEBUGTRACE_PRINT("No filter ch:");
DEBUGTRACE_PRINT(ch);
input_filtered.copyInFromRaw(ch, data.raw_ptr(0, ch));
}
}
DEBUGTRACE_PRINT("Calling incallback for handlers (filtered)...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(input_filtered);
}
} else {
/// No input filters
DEBUGTRACE_PRINT("Calling incallback for handlers...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(data);
}
}
}
void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
DEBUGTRACE_ENTER;
checkRightThread();
Lck lck(_mtx);
// If not set to nullptr, and a stream is running, we update the signal
// generator by resetting it.
if (isStreamRunningOK(StreamType::output) && siggen) {
const Daq *daq = getDaq(StreamType::output);
assert(daq != nullptr);
// Reset the signal generator.
siggen->reset(daq->samplerate());
}
_siggen = siggen;
}
/// Set to 1 to write samples through DaqData::setSlow() instead of writing
/// into the raw buffer directly (debugging aid).
#define DEBUG_FILLDATA 0
/**
 * @brief Converts a floating point signal to the sample type of the output
 * buffer and writes it, in non-interleaved layout, to every channel.
 *
 * The same signal is copied to all channels. For floating point sample
 * types the values are stored as they are. For integer sample types the
 * signal is interpreted as full scale at +/-1: it is clipped to [-1, 1] and
 * then scaled with the maximum value of the integer type. Clipping is
 * required because converting an out-of-range floating point value to an
 * integer type is undefined behaviour.
 *
 * @tparam T Sample type of the buffer in `data`.
 * @param data Output buffer; data.nframes must equal signal.size()
 * (asserted).
 * @param signal Signal to write, one value per frame.
 *
 * @return Always true.
 */
template <typename T>
bool fillData(DaqData &data, const vd &signal) {
  /* DEBUGTRACE_ENTER; */
  assert(data.nframes == signal.size());

  T *res = reinterpret_cast<T *>(data.raw_ptr());

  if constexpr (std::is_floating_point_v<T>) {
    for (us ch = 0; ch < data.nchannels; ch++) {
      for (us frame = 0; frame < data.nframes; frame++) {
#if DEBUG_FILLDATA == 1
        DEBUGTRACE_PRINT("SLOW flt");
        data.setSlow(frame, ch,
                     reinterpret_cast<const int8_t *>(&signal[frame]));
#else
        res[ch * data.nframes + frame] = signal[frame];
#endif
      }
    }
  } else {
    for (us ch = 0; ch < data.nchannels; ch++) {
      for (us frame = 0; frame < data.nframes; frame++) {
        // Clip to full scale first, so the scaled value is guaranteed to be
        // representable in T.
        const double clipped = std::clamp<double>(signal[frame], -1.0, 1.0);
        const T val =
            static_cast<T>(clipped * std::numeric_limits<T>::max());
#if DEBUG_FILLDATA == 1
        data.setSlow(frame, ch, reinterpret_cast<const int8_t *>(&val));
#else
        res[ch * data.nframes + frame] = val;
#endif
      }
    }
  }
  return true;
}
/**
 * @brief Fills a block of output data for the DAQ.
 *
 * Without a signal generator the raw buffer is zeroed. With a signal
 * generator, one signal of data.nframes samples is generated and written to
 * the buffer in the sample format given by data.dtype.
 *
 * @param data Output buffer to fill.
 */
void StreamMgr::outCallback(DaqData &data) {
  DEBUGTRACE_ENTER;
  Lck lck(_mtx);

  if (!_siggen) {
    // Set all values to 0.
    auto *const bufBegin = data.raw_ptr();
    std::fill(bufBegin, bufBegin + data.size_bytes(), 0);
    return;
  }

  using DType = DataTypeDescriptor::DataType;

  const vd generated = _siggen->genSignal(data.nframes);

  // Dispatch on the sample format of the buffer.
  switch (data.dtype) {
    case DType::dtype_fl32:
      fillData<float>(data, generated);
      break;
    case DType::dtype_fl64:
      fillData<double>(data, generated);
      break;
    case DType::dtype_int8:
      fillData<int8_t>(data, generated);
      break;
    case DType::dtype_int16:
      fillData<int16_t>(data, generated);
      break;
    case DType::dtype_int32:
      fillData<int32_t>(data, generated);
      break;
  }
}
/**
 * @brief Tears down the stream manager: waits for a pending device scan to
 * finish, then stops both streams.
 */
StreamMgr::~StreamMgr() {
DEBUGTRACE_ENTER;
// Poll (every 10 microseconds) until rescanDAQDevices_impl() has cleared the
// scanning flag. A background scan task was handed `this`, so the object
// must stay alive until that task is done.
while (_scanningDevices) {
std::this_thread::sleep_for(10us);
}
#if LASP_DEBUG == 1
{ // Careful, this lock needs to be released to make sure the streams can
// obtain a lock to the stream manager.
Lck lck(_mtx);
checkRightThread();
}
#endif
// Stream manager now handled by shared pointer. Each indata handler gets a
// shared pointer to the stream manager, and stores a weak pointer to it.
// Hence, we do not have to do any cleanup here. It also makes sure that the
// order in which destructors are called does not matter anymore. As soon as
// the stream manager is destructed, the weak pointers lose their reference,
// and do not have to call removeInDataHandler() anymore.
// Stop the streams explicitly at this point. Otherwise they would only be
// stopped during member destruction, possibly while the Siggen is being
// destroyed, in which case we might get calls to pure virtual methods. This
// was really a bug.
_inputStream.reset();
_outputStream.reset();
}
/**
 * @brief Stops the input and the output stream, if present, by destroying
 * the corresponding Daq objects.
 */
void StreamMgr::stopAllStreams() {
DEBUGTRACE_ENTER;
{
// The lock is only taken for the thread check and released again before
// the streams are touched.
Lck lck(_mtx);
checkRightThread();
}
// No lock here! inCallback() and outCallback() acquire _mtx themselves.
// NOTE(review): presumably destroying a stream waits for its callback thread,
// which would deadlock if the mutex were still held here — confirm.
_inputStream.reset();
_outputStream.reset();
}
void StreamMgr::startStream(const DaqConfiguration &config) {
DEBUGTRACE_ENTER;
if (_scanningDevices) {
throw rte("DAQ device scan is busy. Cannot start stream.");
}
Lck lck(_mtx);
checkRightThread();
bool isInput = std::count_if(config.inchannel_config.cbegin(),
config.inchannel_config.cend(),
[](auto &i) { return i.enabled; }) > 0;
bool isOutput = std::count_if(config.outchannel_config.cbegin(),
config.outchannel_config.cend(),
[](auto &i) { return i.enabled; }) > 0;
// Find the first device that matches with the configuration
DeviceInfo *devinfo = nullptr;
// Match configuration to a device in the list of devices
for (auto &devinfoi : _devices) {
if (config.match(*devinfoi)) {
devinfo = devinfoi.get();
break;
}
}
if (devinfo == nullptr) {
throw rte("Could not find a device with name " + config.device_name +
" in list of devices.");
}
isInput |= (config.monitorOutput && devinfo->hasInternalOutputMonitor);
DEBUGTRACE_PRINT(isInput);
bool isDuplex = isInput && isOutput;
if (!isInput && !isOutput) {
throw rte(
"Attempted stream start failed, stream does not have any enabled "
"channels. Please first enable channels in the channel configuration.");
}
if (isInput && _inputStream) {
throw rte(
"Error: an input stream is already running. Please "
"first stop existing stream");
} else if (isOutput && _outputStream) {
throw rte(
"Error: output stream is already running. Please "
"first stop existing stream");
} else if (_inputStream) {
if (_inputStream->duplexMode() && isOutput) {
throw rte(
"Error: output stream is already running (in duplex mode). "
"Please "
"first stop existing stream");
}
}
if (_outputStream && isInput && _outputStream->duplexModeForced &&
config.match(*_outputStream)) {
throw rte(
"This device is already opened for output. If input is also "
"required, please enable duplex mode for this device");
}
if (_inputStream && isOutput && _inputStream->duplexModeForced &&
config.match(*_inputStream)) {
throw rte(
"This device is already opened for input. If output is also "
"required, please enable duplex mode for this device");
}
InDaqCallback inCallback;
OutDaqCallback outCallback;
using namespace std::placeholders;
std::unique_ptr<Daq> daq = Daq::createDaq(*devinfo, config);
assert(daq);
if (isInput) {
/// Give incallback as parameter to stream
inCallback = std::bind(&StreamMgr::inCallback, this, _1);
/// Reset handlers in case of an input stream
for (auto &handler : _inDataHandlers) {
handler->reset(daq.get());
}
d fs = daq->samplerate();
/// Create input filters
_inputFilters.clear();
/// No input filter for monitor channel, which comes as the first input
/// channel In the list
if (config.monitorOutput && devinfo->hasInternalOutputMonitor) {
_inputFilters.push_back(nullptr);
}
for (auto &ch : daq->inchannel_config) {
if (ch.enabled) {
if (ch.digitalHighPassCutOn < 0) {
_inputFilters.push_back(nullptr);
} else if (ch.digitalHighPassCutOn == 0) {
throw rte("Digital highpass cuton should be > 0 if activated");
} else {
// Put in a digital high-pass filter.
_inputFilters.emplace_back(std::make_unique<SeriesBiquad>(
SeriesBiquad::firstOrderHighPass(fs, ch.digitalHighPassCutOn)));
}
}
} // End of input filter creation
}
if (isOutput) {
/// Give outcallback as parameter to stream
outCallback = std::bind(&StreamMgr::outCallback, this, _1);
/// Reset signal generator in case of an output stream
if (_siggen) {
DEBUGTRACE_PRINT("Resetting _siggen with new samplerate of ");
DEBUGTRACE_PRINT(daq->samplerate());
_siggen->reset(daq->samplerate());
}
}
/// Start the DAQ. If it fails, everything is still nicely cleaned up and
/// the daq unique_ptr cleans up resources nicely.
daq->start(inCallback, outCallback);
// Move daq ptr to right place
if (isInput) {
_inputStream = std::move(daq);
} else {
_outputStream = std::move(daq);
}
}
void StreamMgr::stopStream(const StreamType t) {
DEBUGTRACE_ENTER;
checkRightThread();
bool resetHandlers = false;
std::unique_ptr<Daq> *streamToStop = nullptr;
{ // Mutex locked in this scope
Lck lck(_mtx);
if (t == StreamType::input) {
if (!_inputStream) {
throw rte("Input stream is not running");
}
streamToStop = std::addressof(_inputStream);
resetHandlers = true;
} else {
/// t == output
/// Kill input stream in case that one is a duplex stream
if (_inputStream && _inputStream->duplexMode()) {
streamToStop = std::addressof(_inputStream);
} else {
if (!_outputStream) {
throw rte("Output stream is not running");
}
streamToStop = std::addressof(_outputStream);
} // end else
}
} // End of mutex lock. When stopping stream, mutex should be unlocked.
// If we arrive here, we should have a stream to stop.
assert(streamToStop != nullptr);
streamToStop->reset();
/// Send reset to all in data handlers
if (resetHandlers) {
Lck lck(_mtx);
for (auto &handler : _inDataHandlers) {
handler->reset(nullptr);
}
}
}
/**
 * @brief Register a handler that will receive all incoming data.
 *
 * The handler is reset with the current input DAQ (null when no input stream
 * is running) and then appended to the list of handlers.
 *
 * The duplicate check is done before anything is changed, so a rejected
 * call leaves an already registered handler untouched instead of resetting
 * it.
 *
 * @param handler Handler to add, must not be null (asserted).
 *
 * @throws std::runtime_error when the handler has been added before.
 */
void StreamMgr::addInDataHandler(InDataHandler *handler) {
  DEBUGTRACE_ENTER;
  Lck lck(_mtx);
  checkRightThread();
  assert(handler);

  const bool alreadyRegistered =
      std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
      _inDataHandlers.cend();
  if (alreadyRegistered) {
    throw std::runtime_error(
        "Error: handler already added. Probably start() "
        "is called more than once on a handler object");
  }

  handler->reset(_inputStream.get());
  _inDataHandlers.push_back(handler);
  DEBUGTRACE_PRINT(_inDataHandlers.size());
}
/**
 * @brief Remove a handler from the list of input data handlers.
 *
 * All list entries equal to the address of the given handler are removed.
 * Removing a handler that is not in the list is a no-op.
 *
 * @param handler Handler to remove.
 */
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
// The thread check is disabled here, unlike in addInDataHandler().
// checkRightThread();
_inDataHandlers.remove(&handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
/**
 * @brief Get the status of the stream of the given type.
 *
 * @param type Stream type to query.
 *
 * @return Status reported by the DAQ serving that stream type, or a default
 * constructed status (not running, no errors) when there is no such DAQ.
 */
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
  DEBUGTRACE_ENTER;
  Lck lck(_mtx);
  checkRightThread();

  if (const Daq *activeDaq = getDaq(type)) {
    return activeDaq->getStreamStatus();
  }

  // Default constructor, says stream is not running, but also no errors
  return Daq::StreamStatus();
}
/**
 * @brief Get the DAQ that serves the stream of the given type.
 *
 * @param type Stream type to look up.
 *
 * @return Non-owning pointer to the DAQ, or null when no stream of that type
 * exists. For the output type, the input stream is returned when it runs in
 * duplex mode.
 */
const Daq *StreamMgr::getDaq(StreamType type) const {
  Lck lck(_mtx);
  checkRightThread();

  // The input stream answers input requests, and also output requests when
  // it runs in duplex mode, since it is then the output stream as well.
  const bool servedByInput =
      (type == StreamType::input) ||
      (_inputStream && _inputStream->duplexMode());

  return servedByInput ? _inputStream.get() : _outputStream.get();
}