#include "lasp_streammgr.h" #include #include #include #include "debugtrace.hpp" InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) { mgr.addInDataHandler(*this); } InDataHandler::~InDataHandler() { _mgr.removeInDataHandler(*this); } StreamMgr &StreamMgr::getInstance() { static StreamMgr mgr; return mgr; } StreamMgr::StreamMgr() {} bool StreamMgr::inCallback(const DaqData &data) { std::scoped_lock lck(_inDataHandler_mtx); for (auto &handler : _inDataHandlers) { bool res = handler->inCallback(data); if (!res) return false; } return true; } /** * @brief Converts from double precision floating point to output signal in * non-interleaving format. * * @tparam T * @param data * @param signal * * @return */ template bool fillData(DaqData &data, const vd &signal) { assert(data.nframes == signal.size()); T *res = reinterpret_cast(data.raw_ptr()); if (std::is_floating_point()) { for (us ch = 0; ch < data.nchannels; ch++) { for (us frame = 0; frame < data.nframes; frame++) { res[ch * data.nframes + frame] = signal[frame]; } } } else { for (us ch = 0; ch < data.nchannels; ch++) { for (us frame = 0; frame < data.nframes; frame++) { res[ch * data.nframes + frame] = (signal[frame] * std::numeric_limits::max()); } } } return true; } void StreamMgr::setSiggen(std::shared_ptr siggen) { std::scoped_lock lck(_siggen_mtx); _siggen = siggen; // If not set to nullptr, and a stream is running, we update the signal // generator by resetting it. if (isStreamRunningOK(StreamType::output) && siggen) { const Daq *daq = getDaq(StreamType::output); assert(daq != nullptr); // Reset the signal generator. _siggen->reset(daq->samplerate()); } } bool StreamMgr::outCallback(DaqData &data) { /* DEBUGTRACE_ENTER; */ std::scoped_lock lck(_siggen_mtx); if (_siggen) { vd signal = _siggen->genSignal(data.nframes); switch (data.dtype) { case (DataTypeDescriptor::DataType::dtype_fl32): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_fl64): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_int8): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_int16): fillData(data, signal); break; case (DataTypeDescriptor::DataType::dtype_int32): fillData(data, signal); break; } } else { // Set all values to 0. std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0); } return true; } StreamMgr::~StreamMgr() { stopAllStreams(); } void StreamMgr::stopAllStreams() { _inputStream.reset(); _outputStream.reset(); } void StreamMgr::startStream(const DeviceInfo &devinfo, const DaqConfiguration &config) { bool isInput = std::count_if(config.inchannel_config.cbegin(), config.inchannel_config.cend(), [](auto &i) { return i.enabled; }); isInput |= config.monitorOutput && devinfo.hasInternalOutputMonitor; bool isOutput = std::count_if(config.outchannel_config.cbegin(), config.outchannel_config.cend(), [](auto &i) { return i.enabled; }); bool isDuplex = isInput && isOutput; if (!isInput && !isOutput) { throw std::runtime_error( "Neither input, nor output channels enabled for stream. Cannotr start."); } if ((isDuplex || isInput) && _inputStream) { throw std::runtime_error("Error: an input stream is already running. Please " "first stop existing stream"); } else if (isOutput && _outputStream) { throw std::runtime_error("Error: output stream is already running. Please " "first stop existing stream"); } InDaqCallback inCallback; OutDaqCallback outCallback; using namespace std::placeholders; std::unique_ptr daq = Daq::createDaq(devinfo, config); if(isInput) { inCallback = std::bind(&StreamMgr::inCallback, this, _1); } if(isOutput) { if(_siggen) { _siggen->reset(daq->samplerate()); } outCallback = std::bind(&StreamMgr::outCallback, this, _1); } DEBUGTRACE_PRINT(isInput); DEBUGTRACE_PRINT(isOutput); daq->start(inCallback, outCallback); if(isInput) { _inputStream = std::move(daq); } else { assert(isOutput); _outputStream = std::move(daq); } } void StreamMgr::addInDataHandler(InDataHandler &handler) { std::scoped_lock lck(_inDataHandler_mtx); _inDataHandlers.push_back(&handler); } void StreamMgr::removeInDataHandler(InDataHandler &handler) { std::scoped_lock lck(_inDataHandler_mtx); _inDataHandlers.remove(&handler); } Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { // Default constructor, says stream is not running, but also no errors Daq::StreamStatus s; const Daq *daq = getDaq(type); if (daq) { s = daq->getStreamStatus(); } return s; } const Daq *StreamMgr::getDaq(StreamType type) const { if (type == StreamType::input) { return _inputStream.get(); } else { // Output stream. If input runs in duplex mode, this is also the output // stream. In that case, we return the outputstram if (_inputStream && _inputStream->duplexMode()) { return _inputStream.get(); } return _outputStream.get(); } }