2022-08-14 19:00:22 +00:00
|
|
|
#define DEBUGTRACE_ENABLED
|
2022-07-29 07:32:26 +00:00
|
|
|
#include "debugtrace.hpp"
|
2022-08-14 19:00:22 +00:00
|
|
|
#include "lasp_thread.h"
|
|
|
|
#include "lasp_streammgr.h"
|
2022-07-20 12:58:48 +00:00
|
|
|
#include <algorithm>
|
|
|
|
#include <assert.h>
|
|
|
|
#include <functional>
|
2022-07-29 07:32:26 +00:00
|
|
|
#include <iostream>
|
|
|
|
|
|
|
|
using std::cerr;
|
|
|
|
using std::endl;
|
2022-06-29 10:25:32 +00:00
|
|
|
|
|
|
|
InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) {
|
2022-07-29 07:32:26 +00:00
|
|
|
DEBUGTRACE_ENTER;
|
|
|
|
}
|
|
|
|
void InDataHandler::start() {
|
|
|
|
DEBUGTRACE_ENTER;
|
|
|
|
_mgr.addInDataHandler(*this);
|
|
|
|
}
|
|
|
|
void InDataHandler::stop() {
|
|
|
|
#if LASP_DEBUG == 1
|
|
|
|
stopCalled = true;
|
|
|
|
#endif
|
|
|
|
_mgr.removeInDataHandler(*this);
|
|
|
|
}
|
|
|
|
InDataHandler::~InDataHandler() {
|
|
|
|
|
|
|
|
DEBUGTRACE_ENTER;
|
|
|
|
#if LASP_DEBUG == 1
|
|
|
|
if (!stopCalled) {
|
|
|
|
cerr << "************ BUG: Stop function not called while arriving at "
|
|
|
|
"InDataHandler's destructor. Fix this by calling "
|
|
|
|
"InDataHandler::stop() from the derived class' destructor."
|
|
|
|
<< endl;
|
|
|
|
}
|
|
|
|
#endif
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
StreamMgr &StreamMgr::getInstance() {
|
2022-07-29 07:32:26 +00:00
|
|
|
|
|
|
|
DEBUGTRACE_ENTER;
|
2022-08-14 19:00:22 +00:00
|
|
|
getPool();
|
2022-06-29 10:25:32 +00:00
|
|
|
static StreamMgr mgr;
|
|
|
|
return mgr;
|
|
|
|
}
|
2022-07-29 07:32:26 +00:00
|
|
|
StreamMgr::StreamMgr() { DEBUGTRACE_ENTER; }
|
2022-06-29 10:25:32 +00:00
|
|
|
|
|
|
|
bool StreamMgr::inCallback(const DaqData &data) {
|
2022-07-29 07:32:26 +00:00
|
|
|
|
|
|
|
/* DEBUGTRACE_ENTER; */
|
2022-06-29 10:25:32 +00:00
|
|
|
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
|
|
|
|
|
|
|
|
for (auto &handler : _inDataHandlers) {
|
|
|
|
bool res = handler->inCallback(data);
|
|
|
|
if (!res)
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @brief Converts from double precision floating point to output signal in
|
|
|
|
* non-interleaving format.
|
|
|
|
*
|
|
|
|
* @tparam T
|
|
|
|
* @param data
|
|
|
|
* @param signal
|
|
|
|
*
|
|
|
|
* @return
|
|
|
|
*/
|
|
|
|
template <typename T> bool fillData(DaqData &data, const vd &signal) {
|
|
|
|
assert(data.nframes == signal.size());
|
|
|
|
|
2022-07-20 12:58:48 +00:00
|
|
|
T *res = reinterpret_cast<T *>(data.raw_ptr());
|
2022-06-29 10:25:32 +00:00
|
|
|
if (std::is_floating_point<T>()) {
|
|
|
|
for (us ch = 0; ch < data.nchannels; ch++) {
|
|
|
|
for (us frame = 0; frame < data.nframes; frame++) {
|
2022-07-20 12:58:48 +00:00
|
|
|
res[ch * data.nframes + frame] = signal[frame];
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
for (us ch = 0; ch < data.nchannels; ch++) {
|
|
|
|
for (us frame = 0; frame < data.nframes; frame++) {
|
|
|
|
res[ch * data.nframes + frame] =
|
2022-07-29 07:32:26 +00:00
|
|
|
(signal[frame] * std::numeric_limits<T>::max());
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
|
|
|
|
|
|
|
|
std::scoped_lock<std::mutex> lck(_siggen_mtx);
|
|
|
|
_siggen = siggen;
|
2022-07-20 12:58:48 +00:00
|
|
|
|
2022-06-29 10:25:32 +00:00
|
|
|
// If not set to nullptr, and a stream is running, we update the signal
|
|
|
|
// generator by resetting it.
|
2022-07-20 12:58:48 +00:00
|
|
|
if (isStreamRunningOK(StreamType::output) && siggen) {
|
|
|
|
const Daq *daq = getDaq(StreamType::output);
|
2022-06-29 10:25:32 +00:00
|
|
|
assert(daq != nullptr);
|
|
|
|
// Reset the signal generator.
|
|
|
|
_siggen->reset(daq->samplerate());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-20 12:58:48 +00:00
|
|
|
bool StreamMgr::outCallback(DaqData &data) {
|
|
|
|
|
|
|
|
/* DEBUGTRACE_ENTER; */
|
|
|
|
|
2022-06-29 10:25:32 +00:00
|
|
|
std::scoped_lock<std::mutex> lck(_siggen_mtx);
|
2022-07-20 12:58:48 +00:00
|
|
|
if (_siggen) {
|
2022-06-29 10:25:32 +00:00
|
|
|
vd signal = _siggen->genSignal(data.nframes);
|
|
|
|
switch (data.dtype) {
|
2022-07-29 07:32:26 +00:00
|
|
|
case (DataTypeDescriptor::DataType::dtype_fl32):
|
|
|
|
fillData<float>(data, signal);
|
|
|
|
break;
|
|
|
|
case (DataTypeDescriptor::DataType::dtype_fl64):
|
|
|
|
fillData<double>(data, signal);
|
|
|
|
break;
|
|
|
|
case (DataTypeDescriptor::DataType::dtype_int8):
|
|
|
|
fillData<int8_t>(data, signal);
|
|
|
|
break;
|
|
|
|
case (DataTypeDescriptor::DataType::dtype_int16):
|
|
|
|
fillData<int16_t>(data, signal);
|
|
|
|
break;
|
|
|
|
case (DataTypeDescriptor::DataType::dtype_int32):
|
|
|
|
fillData<int32_t>(data, signal);
|
|
|
|
break;
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Set all values to 0.
|
2022-07-20 12:58:48 +00:00
|
|
|
std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0);
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2022-07-29 07:32:26 +00:00
|
|
|
StreamMgr::~StreamMgr() {
|
|
|
|
DEBUGTRACE_ENTER;
|
|
|
|
stopAllStreams();
|
|
|
|
}
|
2022-07-20 12:58:48 +00:00
|
|
|
void StreamMgr::stopAllStreams() {
|
|
|
|
_inputStream.reset();
|
|
|
|
_outputStream.reset();
|
|
|
|
}
|
2022-06-29 10:25:32 +00:00
|
|
|
|
|
|
|
void StreamMgr::startStream(const DeviceInfo &devinfo,
|
2022-07-29 07:32:26 +00:00
|
|
|
const DaqConfiguration &config) {
|
2022-06-29 10:25:32 +00:00
|
|
|
|
2022-07-20 12:58:48 +00:00
|
|
|
bool isInput = std::count_if(config.inchannel_config.cbegin(),
|
2022-07-29 07:32:26 +00:00
|
|
|
config.inchannel_config.cend(),
|
|
|
|
[](auto &i) { return i.enabled; });
|
2022-07-20 12:58:48 +00:00
|
|
|
isInput |= config.monitorOutput && devinfo.hasInternalOutputMonitor;
|
|
|
|
|
|
|
|
bool isOutput = std::count_if(config.outchannel_config.cbegin(),
|
2022-07-29 07:32:26 +00:00
|
|
|
config.outchannel_config.cend(),
|
|
|
|
[](auto &i) { return i.enabled; });
|
2022-07-20 12:58:48 +00:00
|
|
|
|
2022-06-29 10:25:32 +00:00
|
|
|
bool isDuplex = isInput && isOutput;
|
|
|
|
|
|
|
|
if (!isInput && !isOutput) {
|
2022-07-29 07:32:26 +00:00
|
|
|
throw std::runtime_error("Neither input, nor output channels enabled for "
|
|
|
|
"stream. Cannotr start.");
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
|
|
|
|
2022-07-20 12:58:48 +00:00
|
|
|
if ((isDuplex || isInput) && _inputStream) {
|
2022-07-29 07:32:26 +00:00
|
|
|
throw std::runtime_error(
|
|
|
|
"Error: an input stream is already running. Please "
|
2022-07-20 12:58:48 +00:00
|
|
|
"first stop existing stream");
|
|
|
|
} else if (isOutput && _outputStream) {
|
|
|
|
throw std::runtime_error("Error: output stream is already running. Please "
|
2022-07-29 07:32:26 +00:00
|
|
|
"first stop existing stream");
|
2022-07-20 12:58:48 +00:00
|
|
|
}
|
2022-06-29 10:25:32 +00:00
|
|
|
|
2022-07-20 12:58:48 +00:00
|
|
|
InDaqCallback inCallback;
|
|
|
|
OutDaqCallback outCallback;
|
|
|
|
|
|
|
|
using namespace std::placeholders;
|
|
|
|
std::unique_ptr<Daq> daq = Daq::createDaq(devinfo, config);
|
|
|
|
|
2022-07-29 07:32:26 +00:00
|
|
|
if (isInput) {
|
|
|
|
inCallback = std::bind(&StreamMgr::inCallback, this, _1);
|
2022-07-20 12:58:48 +00:00
|
|
|
}
|
|
|
|
|
2022-07-29 07:32:26 +00:00
|
|
|
if (isOutput) {
|
|
|
|
if (_siggen) {
|
2022-07-20 12:58:48 +00:00
|
|
|
_siggen->reset(daq->samplerate());
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
2022-07-20 12:58:48 +00:00
|
|
|
outCallback = std::bind(&StreamMgr::outCallback, this, _1);
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
2022-07-20 12:58:48 +00:00
|
|
|
DEBUGTRACE_PRINT(isInput);
|
|
|
|
DEBUGTRACE_PRINT(isOutput);
|
|
|
|
|
|
|
|
daq->start(inCallback, outCallback);
|
|
|
|
|
2022-07-29 07:32:26 +00:00
|
|
|
if (isInput) {
|
2022-07-20 12:58:48 +00:00
|
|
|
_inputStream = std::move(daq);
|
|
|
|
} else {
|
|
|
|
assert(isOutput);
|
|
|
|
_outputStream = std::move(daq);
|
|
|
|
}
|
2022-06-29 10:25:32 +00:00
|
|
|
}
|
2022-08-14 19:00:22 +00:00
|
|
|
void StreamMgr::stopStream(const StreamType t) {
|
|
|
|
switch (t) {
|
|
|
|
case (StreamType::input): {
|
|
|
|
if (!_inputStream) {
|
|
|
|
throw std::runtime_error("Input stream is not running");
|
|
|
|
}
|
|
|
|
_inputStream = nullptr;
|
|
|
|
} break;
|
|
|
|
case (StreamType::output): {
|
|
|
|
if (_inputStream && _inputStream->duplexMode()) {
|
|
|
|
_inputStream = nullptr;
|
|
|
|
} else {
|
|
|
|
if (!_outputStream) {
|
|
|
|
throw std::runtime_error("Output stream is not running");
|
|
|
|
}
|
|
|
|
_outputStream = nullptr;
|
|
|
|
} // end else
|
|
|
|
} break;
|
|
|
|
default:
|
|
|
|
throw std::runtime_error("BUG");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2022-06-29 10:25:32 +00:00
|
|
|
|
|
|
|
void StreamMgr::addInDataHandler(InDataHandler &handler) {
|
|
|
|
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
|
|
|
|
_inDataHandlers.push_back(&handler);
|
|
|
|
}
|
|
|
|
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
|
|
|
|
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
|
|
|
|
_inDataHandlers.remove(&handler);
|
|
|
|
}
|
2022-07-20 12:58:48 +00:00
|
|
|
|
|
|
|
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
|
|
|
|
|
|
|
|
// Default constructor, says stream is not running, but also no errors
|
|
|
|
Daq::StreamStatus s;
|
|
|
|
|
|
|
|
const Daq *daq = getDaq(type);
|
|
|
|
if (daq) {
|
|
|
|
s = daq->getStreamStatus();
|
|
|
|
}
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
const Daq *StreamMgr::getDaq(StreamType type) const {
|
|
|
|
if (type == StreamType::input) {
|
|
|
|
return _inputStream.get();
|
|
|
|
} else {
|
|
|
|
// Output stream. If input runs in duplex mode, this is also the output
|
|
|
|
// stream. In that case, we return the outputstram
|
|
|
|
if (_inputStream && _inputStream->duplexMode()) {
|
|
|
|
return _inputStream.get();
|
|
|
|
}
|
|
|
|
return _outputStream.get();
|
|
|
|
}
|
|
|
|
}
|