// #define DEBUGTRACE_ENABLED #include "lasp_streammgr.h" #include #include #include #include #include #include #include #include "debugtrace.hpp" #include "lasp_biquadbank.h" #include "lasp_indatahandler.h" #include "lasp_thread.h" using namespace std::literals::chrono_literals; using std::cerr; using std::endl; using rte = std::runtime_error; /** * @brief The main global handle to a stream, stored in a weak pointer, if it * does not yet exist, via StreamMgr::getInstance, a new stream mgr is created. * It also makes sure that the stream manager is deleted once the latest handle * to it has been destroyed (no global stuff left). */ std::weak_ptr _mgr; std::mutex _mgr_mutex; using Lck = std::scoped_lock; /** * @brief The only way to obtain a stream manager, can only be called from the * thread that does it the first time. * * @return Stream manager handle */ SmgrHandle StreamMgr::getInstance() { DEBUGTRACE_ENTER; std::scoped_lock lck(_mgr_mutex); auto mgr = _mgr.lock(); if (!mgr) { // Double Check Locking Pattern, if two threads would simultaneously // instantiate the singleton instance. auto mgr = _mgr.lock(); if (mgr) { return mgr; } mgr = SmgrHandle(new StreamMgr()); if (!mgr) { throw rte("Fatal: could not allocate stream manager!"); } // Update global weak pointer _mgr = mgr; return mgr; } #if LASP_DEBUG == 1 // Make sure we never ask for a new SmgrHandle from a different thread. assert(std::this_thread::get_id() == mgr->main_thread_id); #endif return mgr; } StreamMgr::StreamMgr() #if LASP_DEBUG == 1 : main_thread_id(std::this_thread::get_id()) #endif { DEBUGTRACE_ENTER; // Trigger a scan for the available devices, in the background. rescanDAQDevices(false); } #if LASP_DEBUG == 1 void StreamMgr::checkRightThread() const { assert(std::this_thread::get_id() == main_thread_id); } #endif void StreamMgr::rescanDAQDevices(bool background, std::function callback) { DEBUGTRACE_ENTER; DEBUGTRACE_PRINT(background); if (_scanningDevices) { throw rte("A background device scan is already busy"); } Lck lck(_mtx); checkRightThread(); if (_inputStream || _outputStream) { throw rte("Rescanning DAQ devices only possible when no stream is running"); } _devices.clear(); if (!background) { _scanningDevices = true; rescanDAQDevices_impl(callback); } else { DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread..."); _scanningDevices = true; _pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); } } void StreamMgr::rescanDAQDevices_impl(std::function callback) { DEBUGTRACE_ENTER; Lck lck(_mtx); _devices = DeviceInfo::getDeviceInfo(); if (callback) { callback(); } _scanningDevices = false; } void StreamMgr::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; Lck lck(_mtx); assert(_inputFilters.size() == data.nchannels); if (std::count_if(_inputFilters.cbegin(), _inputFilters.cend(), [](const auto &a) { return bool(a); }) > 0) { /// Found a filter in vector of input filters. So we have to apply the /// filters to each channel. DaqData input_filtered(data.nframes, data.nchannels, data.dtype); for (us ch = 0; ch < data.nchannels; ch++) { if (_inputFilters[ch]) { DEBUGTRACE_PRINT("Filter ch:"); DEBUGTRACE_PRINT(ch); vd inout = data.toFloat(ch); _inputFilters[ch]->filter(inout); input_filtered.fromFloat(ch, inout); } else { DEBUGTRACE_PRINT("No filter ch:"); DEBUGTRACE_PRINT(ch); input_filtered.copyInFromRaw(ch, data.raw_ptr(0, ch)); } } DEBUGTRACE_PRINT("Calling incallback for handlers (filtered)..."); for (auto &handler : _inDataHandlers) { handler->inCallback(input_filtered); } } else { /// No input filters DEBUGTRACE_PRINT("Calling incallback for handlers..."); for (auto &handler : _inDataHandlers) { handler->inCallback(data); } } } void StreamMgr::setSiggen(std::shared_ptr siggen) { DEBUGTRACE_ENTER; checkRightThread(); Lck lck(_mtx); // If not set to nullptr, and a stream is running, we update the signal // generator by resetting it. if (isStreamRunningOK(StreamType::output) && siggen) { const Daq *daq = getDaq(StreamType::output); assert(daq != nullptr); // Reset the signal generator. siggen->reset(daq->samplerate()); } _siggen = siggen; } #define DEBUG_FILLDATA 0 /** * @brief Converts from double precision floating point to output signal in * non-interleaving format. * * @tparam T * @param data * @param signal * * @return */ template bool fillData(DaqData &data, const vd &signal) { /* DEBUGTRACE_ENTER; */ assert(data.nframes == signal.size()); T *res = reinterpret_cast(data.raw_ptr()); if (std::is_floating_point()) { for (us ch = 0; ch < data.nchannels; ch++) { for (us frame = 0; frame < data.nframes; frame++) { #if DEBUG_FILLDATA == 1 DEBUGTRACE_PRINT("SLOW flt"); data.setSlow(frame, ch, reinterpret_cast(&signal[frame])); #else res[ch * data.nframes + frame] = signal[frame]; #endif } } } else { for (us ch = 0; ch < data.nchannels; ch++) { for (us frame = 0; frame < data.nframes; frame++) { const T val = (signal[frame] * std::numeric_limits::max()); #if DEBUG_FILLDATA == 1 data.setSlow(frame, ch, reinterpret_cast(&val)); #else res[ch * data.nframes + frame] = val; #endif } } } return true; } void StreamMgr::outCallback(DaqData &data) { DEBUGTRACE_ENTER; Lck lck(_mtx); if (_siggen) { vd signal = _siggen->genSignal(data.nframes); switch (data.dtype) { case (DataTypeDescriptor::DataType::dtype_fl32): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_fl64): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_int8): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_int16): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_int32): fillData(data, signal); break; } } else { // Set all values to 0. std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0); } } StreamMgr::~StreamMgr() { DEBUGTRACE_ENTER; while (_scanningDevices) { std::this_thread::sleep_for(10us); } #if LASP_DEBUG == 1 { // Careful, this lock needs to be released to make sure the streams can // obtain a lock to the stream manager. Lck lck(_mtx); checkRightThread(); } #endif // Stream manager now handled by shared pointer. Each indata handler gets a // shared pointer to the stream manager, and stores a weak pointer to it. // Hence, we do not have to do any cleanup here. It also makes sure that the // order in which destructors are called does not matter anymore. As soon as // the stream manager is destructed, the weak pointers loose there ref, and do // not have to removeInDataHandler() anymore. // Stop the streams in this phase, otherwise it might happen during the // destruction of the Siggen, in which case we might get calls to pure // virtual methods. This was really a bug. _inputStream.reset(); _outputStream.reset(); } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; { Lck lck(_mtx); checkRightThread(); } // No lock here! _inputStream.reset(); _outputStream.reset(); } void StreamMgr::startStream(const DaqConfiguration &config) { DEBUGTRACE_ENTER; if (_scanningDevices) { throw rte("DAQ device scan is busy. Cannot start stream."); } Lck lck(_mtx); checkRightThread(); bool isInput = std::count_if(config.inchannel_config.cbegin(), config.inchannel_config.cend(), [](auto &i) { return i.enabled; }) > 0; bool isOutput = std::count_if(config.outchannel_config.cbegin(), config.outchannel_config.cend(), [](auto &i) { return i.enabled; }) > 0; // Find the first device that matches with the configuration DeviceInfo *devinfo = nullptr; // Match configuration to a device in the list of devices for (auto &devinfoi : _devices) { if (config.match(*devinfoi)) { devinfo = devinfoi.get(); break; } } if (devinfo == nullptr) { throw rte("Could not find a device with name " + config.device_name + " in list of devices."); } isInput |= (config.monitorOutput && devinfo->hasInternalOutputMonitor); DEBUGTRACE_PRINT(isInput); bool isDuplex = isInput && isOutput; if (!isInput && !isOutput) { throw rte( "Attempted stream start failed, stream does not have any enabled " "channels. Please first enable channels in the channel configuration."); } if (isInput && _inputStream) { throw rte( "Error: an input stream is already running. Please " "first stop existing stream"); } else if (isOutput && _outputStream) { throw rte( "Error: output stream is already running. Please " "first stop existing stream"); } else if (_inputStream) { if (_inputStream->duplexMode() && isOutput) { throw rte( "Error: output stream is already running (in duplex mode). " "Please " "first stop existing stream"); } } if (_outputStream && isInput && _outputStream->duplexModeForced && config.match(*_outputStream)) { throw rte( "This device is already opened for output. If input is also " "required, please enable duplex mode for this device"); } if (_inputStream && isOutput && _inputStream->duplexModeForced && config.match(*_inputStream)) { throw rte( "This device is already opened for input. If output is also " "required, please enable duplex mode for this device"); } InDaqCallback inCallback; OutDaqCallback outCallback; using namespace std::placeholders; std::unique_ptr daq = Daq::createDaq(*devinfo, config); assert(daq); if (isInput) { /// Give incallback as parameter to stream inCallback = std::bind(&StreamMgr::inCallback, this, _1); /// Reset handlers in case of an input stream for (auto &handler : _inDataHandlers) { handler->reset(daq.get()); } d fs = daq->samplerate(); /// Create input filters _inputFilters.clear(); /// No input filter for monitor channel, which comes as the first input /// channel In the list if (config.monitorOutput && devinfo->hasInternalOutputMonitor) { _inputFilters.push_back(nullptr); } for (auto &ch : daq->inchannel_config) { if (ch.enabled) { if (ch.digitalHighPassCutOn < 0) { _inputFilters.push_back(nullptr); } else if (ch.digitalHighPassCutOn == 0) { throw rte("Digital highpass cuton should be > 0 if activated"); } else { // Put in a digital high-pass filter. _inputFilters.emplace_back(std::make_unique( SeriesBiquad::firstOrderHighPass(fs, ch.digitalHighPassCutOn))); } } } // End of input filter creation } if (isOutput) { /// Give outcallback as parameter to stream outCallback = std::bind(&StreamMgr::outCallback, this, _1); /// Reset signal generator in case of an output stream if (_siggen) { DEBUGTRACE_PRINT("Resetting _siggen with new samplerate of "); DEBUGTRACE_PRINT(daq->samplerate()); _siggen->reset(daq->samplerate()); } } /// Start the DAQ. If it fails, everything is still nicely cleaned up and /// the daq unique_ptr cleans up resources nicely. daq->start(inCallback, outCallback); // Move daq ptr to right place if (isInput) { _inputStream = std::move(daq); } else { _outputStream = std::move(daq); } } void StreamMgr::stopStream(const StreamType t) { DEBUGTRACE_ENTER; checkRightThread(); bool resetHandlers = false; std::unique_ptr *streamToStop = nullptr; { // Mutex locked in this scope Lck lck(_mtx); if (t == StreamType::input) { if (!_inputStream) { throw rte("Input stream is not running"); } streamToStop = std::addressof(_inputStream); resetHandlers = true; } else { /// t == output /// Kill input stream in case that one is a duplex stream if (_inputStream && _inputStream->duplexMode()) { streamToStop = std::addressof(_inputStream); } else { if (!_outputStream) { throw rte("Output stream is not running"); } streamToStop = std::addressof(_outputStream); } // end else } } // End of mutex lock. When stopping stream, mutex should be unlocked. // If we arrive here, we should have a stream to stop. assert(streamToStop != nullptr); streamToStop->reset(); /// Send reset to all in data handlers if (resetHandlers) { Lck lck(_mtx); for (auto &handler : _inDataHandlers) { handler->reset(nullptr); } } } void StreamMgr::addInDataHandler(InDataHandler *handler) { DEBUGTRACE_ENTER; Lck lck(_mtx); checkRightThread(); assert(handler); handler->reset(_inputStream.get()); if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) != _inDataHandlers.cend()) { throw std::runtime_error( "Error: handler already added. Probably start() " "is called more than once on a handler object"); } _inDataHandlers.push_back(handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); } void StreamMgr::removeInDataHandler(InDataHandler &handler) { DEBUGTRACE_ENTER; Lck lck(_mtx); // checkRightThread(); _inDataHandlers.remove(&handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); } Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { DEBUGTRACE_ENTER; Lck lck(_mtx); checkRightThread(); // Default constructor, says stream is not running, but also no errors const Daq *daq = getDaq(type); if (daq) { return daq->getStreamStatus(); } else { return Daq::StreamStatus(); } } const Daq *StreamMgr::getDaq(StreamType type) const { Lck lck(_mtx); checkRightThread(); if (type == StreamType::input) { return _inputStream.get(); } else { // Output stream. If input runs in duplex mode, this is also the output // stream. In that case, we return the input stream if (_inputStream && _inputStream->duplexMode()) { return _inputStream.get(); } else { return _outputStream.get(); } } }