diff --git a/src/lasp/device/CMakeLists.txt b/src/lasp/device/CMakeLists.txt index e576d23..133c788 100644 --- a/src/lasp/device/CMakeLists.txt +++ b/src/lasp/device/CMakeLists.txt @@ -1,4 +1,5 @@ # src/lasp/device/CMakeLists.txt +include_directories(uldaq) add_library(lasp_device_lib OBJECT lasp_daq.cpp @@ -7,8 +8,11 @@ add_library(lasp_device_lib OBJECT lasp_deviceinfo.cpp lasp_rtaudiodaq.cpp lasp_streammgr.cpp + lasp_indatahandler.cpp lasp_uldaq.cpp - lasp_uldaq_impl.cpp + uldaq/lasp_uldaq_impl.cpp + uldaq/lasp_uldaq_bufhandler.cpp + uldaq/lasp_uldaq_common.cpp ) # Callback requires certain arguments that are not used by code. This disables diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index dbc7392..e1f9d3d 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -5,6 +5,8 @@ #include "lasp_types.h" #include #include +#include +#include /** * \defgroup device Device interfacing @@ -13,12 +15,12 @@ * @brief Callback of DAQ for input data. Callback should return * false for a stop request. */ -using InDaqCallback = std::function; +using InDaqCallback = std::function; /** * @brief */ -using OutDaqCallback = std::function; +using OutDaqCallback = std::function; /** * @brief Base cass for all DAQ (Data Acquisition) interfaces. A DAQ can be a diff --git a/src/lasp/device/lasp_deviceinfo.h b/src/lasp/device/lasp_deviceinfo.h index 81772ed..7d3d8ad 100644 --- a/src/lasp/device/lasp_deviceinfo.h +++ b/src/lasp/device/lasp_deviceinfo.h @@ -19,6 +19,7 @@ public: * @brief Virtual desctructor. Can be derived class. */ virtual ~DeviceInfo() {} + DeviceInfo& operator=(const DeviceInfo&) = delete; /** * @brief Clone a device info. diff --git a/src/lasp/device/lasp_indatahandler.cpp b/src/lasp/device/lasp_indatahandler.cpp new file mode 100644 index 0000000..8bdeb17 --- /dev/null +++ b/src/lasp/device/lasp_indatahandler.cpp @@ -0,0 +1,59 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "lasp_indatahandler.h" +#include "debugtrace.hpp" +#include "lasp_streammgr.h" +#include + +InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb, + const InResetType resetfcn) + : _mgr(mgr), inCallback(cb), reset(resetfcn) +#if LASP_DEBUG == 1 + , + main_thread_id(std::this_thread::get_id()) +#endif +{ + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + assert(mgr->main_thread_id == main_thread_id); +#endif +} +void InDataHandler::start() { + DEBUGTRACE_ENTER; + checkRightThread(); + if (SmgrHandle handle = _mgr.lock()) { + handle->addInDataHandler(this); +#if LASP_DEBUG == 1 + assert(handle->main_thread_id == main_thread_id); +#endif + } +} +void InDataHandler::stop() { + DEBUGTRACE_ENTER; + checkRightThread(); +#if LASP_DEBUG == 1 + stopCalled = true; +#endif + if (SmgrHandle handle = _mgr.lock()) { + handle->removeInDataHandler(*this); + } +} + +InDataHandler::~InDataHandler() { + + DEBUGTRACE_ENTER; + checkRightThread(); +#if LASP_DEBUG == 1 + if (!stopCalled) { + std::cerr << "************ BUG: Stop function not called while arriving at " + "InDataHandler's destructor. Fix this by calling " + "InDataHandler::stop()." + << std::endl; + abort(); + } +#endif +} +#if LASP_DEBUG == 1 +void InDataHandler::checkRightThread() const { + assert(std::this_thread::get_id() == main_thread_id); +} +#endif diff --git a/src/lasp/device/lasp_indatahandler.h b/src/lasp/device/lasp_indatahandler.h new file mode 100644 index 0000000..2ff47b1 --- /dev/null +++ b/src/lasp/device/lasp_indatahandler.h @@ -0,0 +1,79 @@ +#pragma once +#include "lasp_types.h" +#include +#include +#include +#include + +class StreamMgr; +using SmgrHandle = std::shared_ptr; + +class DaqData; +class Daq; + +/** \addtogroup device + * @{ + */ + +/** + * @brief The function definition of callbacks with incoming DAQ data + */ +using InCallbackType = std::function; +/** + * @brief Function definition for the reset callback. + */ +using InResetType = std::function; + +class InDataHandler { + +protected: + std::weak_ptr _mgr; +#if LASP_DEBUG == 1 + // This is a flag to indicate whether the method stop() is called for the + // current handler. It should call the method stop() from the derived class's + // destructor. + std::atomic stopCalled{false}; +#endif + +public: + ~InDataHandler(); + const InCallbackType inCallback; + const InResetType reset; + + /** + * @brief When constructed, the handler is added to the stream manager, which + * will call the handlers's inCallback() until stop() is called. + * + * @param mgr Stream manager. + * @param cb The callback that is stored, and called on new DAQ data + * @param resetfcn The callback that is stored, and called when the DAQ + * changes state. + */ + InDataHandler(SmgrHandle mgr, InCallbackType cb, + InResetType resetfcn); + + /** + * @brief Adds the current InDataHandler to the list of handlers in the + * StreamMgr. After this happens, the reset() method stored in this + * object is called back. When the stream is running, right after this, + * inCallback() is called with DaqData. + */ + void start(); + + /** + * @brief Removes the currend InDataHandler from the list of handlers in the + * StreamMgr. From that point on, the object can be safely destroyed. Not + * calling stop() before destruction of this object is considered a BUG. I.e. + * a class which *uses* an InDataHandler should always call stop() in its + * destructor. + */ + void stop(); + +#if LASP_DEBUG == 1 + const std::thread::id main_thread_id; + void checkRightThread() const; +#else + void checkRightThread() const {} +#endif +}; +/** @} */ diff --git a/src/lasp/device/lasp_rtaudiodaq.cpp b/src/lasp/device/lasp_rtaudiodaq.cpp index 2728888..6099ae2 100644 --- a/src/lasp/device/lasp_rtaudiodaq.cpp +++ b/src/lasp/device/lasp_rtaudiodaq.cpp @@ -368,11 +368,7 @@ public: DaqData d{nFramesPerBlock, neninchannels, dtype}; d.copyInFromRaw(ptrs); - bool ret = _incallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _incallback(d); } if (outputBuffer) { @@ -395,11 +391,8 @@ public: } DaqData d{nFramesPerBlock, nenoutchannels, dtype}; - bool ret = _outcallback(d); - if (!ret) { - stopWithError(se::noError); - return 1; - } + _outcallback(d); + // Copy over the buffer us j = 0; for (auto ptr : ptrs) { d.copyToRaw(j, ptr); diff --git a/src/lasp/device/lasp_rtaudiodaq.h b/src/lasp/device/lasp_rtaudiodaq.h index 74b6d3e..288431a 100644 --- a/src/lasp/device/lasp_rtaudiodaq.h +++ b/src/lasp/device/lasp_rtaudiodaq.h @@ -2,6 +2,25 @@ #include "lasp_daq.h" #include +/** \addtogroup device + * @{ + * \defgroup rtaudio RtAudio backend + * This code is used to interface with the RtAudio cross-platform audio + * interface. + * + * \addtogroup rtaudio + * @{ + */ + + +/** + * @brief Method called from Daq::createDaq. + * + * @param devinfo Device info + * @param config DAQ Configuration settings + * + * @return Pointer to Daq instance. Throws Runtime errors on error. + */ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, const DaqConfiguration& config); @@ -12,3 +31,5 @@ std::unique_ptr createRtAudioDevice(const DeviceInfo& devinfo, */ void fillRtAudioDeviceInfo(DeviceInfoList &devinfolist); +/** @} */ +/** @} */ diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index d1f70b7..89054e1 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -2,58 +2,78 @@ #include "lasp_streammgr.h" #include "debugtrace.hpp" #include "lasp_biquadbank.h" +#include "lasp_indatahandler.h" #include "lasp_thread.h" #include #include #include #include +#include +#include using std::cerr; using std::endl; using rte = std::runtime_error; -InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) { DEBUGTRACE_ENTER; } -void InDataHandler::start() { - DEBUGTRACE_ENTER; - _mgr.addInDataHandler(*this); -} -void InDataHandler::stop() { -#if LASP_DEBUG == 1 - stopCalled = true; -#endif - _mgr.removeInDataHandler(*this); -} -InDataHandler::~InDataHandler() { +/** + * @brief The main global handle to a stream, stored in a weak pointer, if it + * does not yet exist, via StreamMgr::getInstance, a new stream mgr is created. + * It also makes sure that the stream manager is deleted once the latest handle + * to it has been destroyed (no global stuff left). + */ +std::weak_ptr _mgr; +std::mutex _mgr_mutex; +using Lck = std::scoped_lock; + +/** + * @brief The only way to obtain a stream manager, can only be called from the + * thread that does it the first time. + * + * @return Stream manager handle + */ +SmgrHandle StreamMgr::getInstance() { DEBUGTRACE_ENTER; -#if LASP_DEBUG == 1 - if (!stopCalled) { - cerr << "************ BUG: Stop function not called while arriving at " - "InDataHandler's destructor. Fix this by calling " - "InDataHandler::stop() from the derived class' destructor." - << endl; - abort(); + + auto mgr = _mgr.lock(); + if (!mgr) { + // Double Check Locking Pattern, if two threads would simultaneously + // instantiate the singleton instance. + Lck lck(_mgr_mutex); + + auto mgr = _mgr.lock(); + if (mgr) { + return mgr; + } + + mgr = SmgrHandle(new StreamMgr()); + if (!mgr) { + throw rte("Fatal: could not allocate stream manager!"); + } + // Update global weak pointer + _mgr = mgr; + return mgr; } +#if LASP_DEBUG == 1 + // Make sure we never ask for a new SmgrHandle from a different thread. + assert(std::this_thread::get_id() == mgr->main_thread_id); #endif -} -StreamMgr &StreamMgr::getInstance() { - - DEBUGTRACE_ENTER; - static StreamMgr mgr; return mgr; } -StreamMgr::StreamMgr() { - DEBUGTRACE_ENTER; + +StreamMgr::StreamMgr() #if LASP_DEBUG == 1 - _main_thread_id = std::this_thread::get_id(); + : main_thread_id(std::this_thread::get_id()) #endif +{ + DEBUGTRACE_ENTER; // Trigger a scan for the available devices, in the background. rescanDAQDevices(true); } #if LASP_DEBUG == 1 void StreamMgr::checkRightThread() const { - assert(std::this_thread::get_id() == _main_thread_id); + assert(std::this_thread::get_id() == main_thread_id); } #endif @@ -61,22 +81,23 @@ void StreamMgr::rescanDAQDevices(bool background, std::function callback) { DEBUGTRACE_ENTER; DEBUGTRACE_PRINT(background); - auto &pool = getPool(); + checkRightThread(); + if (_inputStream || _outputStream) { + throw rte("Rescanning DAQ devices only possible when no stream is running"); + } if (!_devices_mtx.try_lock()) { throw rte("A background DAQ device scan is probably already running"); } _devices_mtx.unlock(); - if (_inputStream || _outputStream) { - throw rte("Rescanning DAQ devices only possible when no stream is running"); - } + std::scoped_lock lck(_devices_mtx); _devices.clear(); - /* auto &pool = getPool(); */ if (!background) { rescanDAQDevices_impl(callback); } else { - pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); + DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread..."); + _pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); } } void StreamMgr::rescanDAQDevices_impl(std::function callback) { @@ -87,7 +108,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function callback) { callback(); } } -bool StreamMgr::inCallback(const DaqData &data) { +void StreamMgr::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -118,23 +139,15 @@ bool StreamMgr::inCallback(const DaqData &data) { } for (auto &handler : _inDataHandlers) { - bool res = handler->inCallback(input_filtered); - if (!res) { - return false; - } + handler->inCallback(input_filtered); } } else { /// No input filters for (auto &handler : _inDataHandlers) { - - bool res = handler->inCallback(data); - if (!res) { - return false; - } + handler->inCallback(data); } } - return true; } void StreamMgr::setSiggen(std::shared_ptr siggen) { @@ -198,7 +211,7 @@ template bool fillData(DaqData &data, const vd &signal) { return true; } -bool StreamMgr::outCallback(DaqData &data) { +void StreamMgr::outCallback(DaqData &data) { /* DEBUGTRACE_ENTER; */ @@ -227,19 +240,17 @@ bool StreamMgr::outCallback(DaqData &data) { // Set all values to 0. std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0); } - return true; } StreamMgr::~StreamMgr() { DEBUGTRACE_ENTER; checkRightThread(); - stopAllStreams(); - if (!_inDataHandlers.empty()) { - cerr << "*** WARNING: InDataHandlers have not been all stopped, while " - "StreamMgr destructor is called. This is a misuse BUG" - << endl; - abort(); - } + // Stream manager now handled by shared pointer. Each indata handler gets a + // shared pointer to the stream manager, and stores a weak pointer to it. + // Hence, we do not have to do any cleanup here. It also makes sure that the + // order in which destructors are called does not matter anymore. As soon as + // the stream manager is destructed, the weak pointers loose there ref, and do + // not have to removeInDataHandler() anymore. } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; @@ -264,15 +275,15 @@ void StreamMgr::startStream(const DaqConfiguration &config) { std::scoped_lock lck(_devices_mtx); DeviceInfo *devinfo = nullptr; - bool found = false; + // Match configuration to a device in the list of devices for (auto &devinfoi : _devices) { if (config.match(*devinfoi)) { devinfo = devinfoi.get(); break; } } - if (!devinfo) { + if (devinfo == nullptr) { throw rte("Could not find a device with name " + config.device_name + " in list of devices."); } @@ -408,23 +419,20 @@ void StreamMgr::stopStream(const StreamType t) { } } -void StreamMgr::addInDataHandler(InDataHandler &handler) { +void StreamMgr::addInDataHandler(InDataHandler *handler) { DEBUGTRACE_ENTER; checkRightThread(); + assert(handler); std::scoped_lock lck(_inDataHandler_mtx); - if (_inputStream) { - handler.reset(_inputStream.get()); - } else { - handler.reset(nullptr); - } - if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), &handler) != + handler->reset(_inputStream.get()); + + if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) != _inDataHandlers.cend()) { throw std::runtime_error("Error: handler already added. Probably start() " "is called more than once on a handler object"); } - _inDataHandlers.push_back(&handler); + _inDataHandlers.push_back(handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); - } void StreamMgr::removeInDataHandler(InDataHandler &handler) { @@ -434,7 +442,6 @@ void StreamMgr::removeInDataHandler(InDataHandler &handler) { _inDataHandlers.remove(&handler); DEBUGTRACE_PRINT(_inDataHandlers.size()); - } Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index fc69222..8955cc6 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -1,6 +1,7 @@ #pragma once #include "lasp_daq.h" #include "lasp_siggen.h" +#include "lasp_thread.h" #include #include #include @@ -10,60 +11,8 @@ * @{ */ class StreamMgr; +class InDataHandler; -class InDataHandler { - -protected: - StreamMgr &_mgr; -#if LASP_DEBUG == 1 - std::atomic stopCalled{false}; -#endif - -public: - virtual ~InDataHandler(); - - /** - * @brief When constructed, the handler is added to the stream manager, which - * will call the handlers's inCallback() until stop() is called. - * - * @param mgr Stream manager. - */ - InDataHandler(StreamMgr &mgr); - - /** - * @brief This function is called when input data from a DAQ is available. - * - * @param daqdata Input data from DAQ - * - * @return true if no error. False to stop the stream from running. - */ - virtual bool inCallback(const DaqData &daqdata) = 0; - - /** - * @brief Reset in-data handler. - * - * @param daq New DAQ configuration of inCallback(). If nullptr is given, - * it means that the stream is stopped. - */ - virtual void reset(const Daq *daq = nullptr) = 0; - - /** - * @brief This function should be called from the constructor of the - * implementation of InDataHandler. It will start the stream's calling of - * inCallback(). - */ - void start(); - - /** - * @brief This function should be called from the destructor of derived - * classes, to disable the calls to inCallback(), such that proper - * destruction of the object is allowed and no other threads call methods - * from the object. It removes the inCallback() from the callback list of the - * StreamMgr(). **Failing to call this function results in deadlocks, errors - * like "pure virtual function called", or other**. - */ - void stop(); -}; class SeriesBiquad; @@ -76,48 +25,53 @@ class SeriesBiquad; * fact is asserted. */ class StreamMgr { -#if LASP_DEBUG == 1 - std::thread::id _main_thread_id; -#endif /** * @brief Storage for streams. */ std::unique_ptr _inputStream, _outputStream; + ThreadSafeThreadPool _pool; + /** * @brief All indata handlers are called when input data is available. Note * that they can be called from different threads and should take care of * thread-safety. */ std::list _inDataHandlers; - std::mutex _inDataHandler_mtx; + mutable std::mutex _inDataHandler_mtx; /** * @brief Signal generator in use to generate output data. Currently * implemented as to generate the same data for all output channels. */ std::shared_ptr _siggen; + std::mutex _siggen_mtx; /** * @brief Filters on input stream. For example, a digital high pass filter. */ std::vector> _inputFilters; - std::mutex _siggen_mtx; - std::mutex _devices_mtx; + mutable std::recursive_mutex _devices_mtx; + /** + * @brief Current storage for the device list + */ DeviceInfoList _devices; + // Singleton, no public constructor. Can only be obtained using + // getInstance(); StreamMgr(); friend class InDataHandler; friend class Siggen; - // Singleton, no public destructor - ~StreamMgr(); public: + + ~StreamMgr(); + enum class StreamType : us { /** * @brief Input stream @@ -137,7 +91,7 @@ class StreamMgr { * * @return Reference to stream manager. */ - static StreamMgr &getInstance(); + static std::shared_ptr getInstance(); /** * @brief Obtain a list of devices currently available. When the StreamMgr is @@ -146,7 +100,7 @@ class StreamMgr { * @return A copy of the internal stored list of devices */ DeviceInfoList getDeviceInfo() const { - std::scoped_lock lck(const_cast(_devices_mtx)); + std::scoped_lock lck(_devices_mtx); DeviceInfoList d2; for(const auto& dev: _devices) { d2.push_back(dev->clone()); @@ -157,7 +111,8 @@ class StreamMgr { /** * @brief Triggers a background scan of the DAQ devices, which updates the * internally stored list of devices. Throws a runtime error when a - * background thread is already scanning for devices. + * background thread is already scanning for devices, or if a stream is + * running. * * @param background Perform searching for DAQ devices in the background. If * set to true, the function returns immediately. @@ -231,17 +186,17 @@ class StreamMgr { /** * @brief Set active signal generator for output streams. Only one `Siggen' * is active at the same time. Siggen controls its own data race protection - * using a mutex. + * using a mutex. If no Siggen is there, and an output stream is running, it + * will send a default signal of 0. * * @param s New Siggen pointer */ void setSiggen(std::shared_ptr s); private: - bool inCallback(const DaqData &data); - bool outCallback(DaqData &data); + void inCallback(const DaqData &data); + void outCallback(DaqData &data); - void removeInDataHandler(InDataHandler &handler); /** * @brief Add an input data handler. The handler's inCallback() function is @@ -251,8 +206,14 @@ private: * * @param handler The handler to add. */ - void addInDataHandler(InDataHandler &handler); + void addInDataHandler(InDataHandler *handler); + /** + * @brief Remove InDataHandler from the list. + * + * @param handler + */ + void removeInDataHandler(InDataHandler &handler); /** * @brief Do the actual rescanning. * @@ -261,6 +222,7 @@ private: void rescanDAQDevices_impl(std::function callback); #if LASP_DEBUG == 1 + const std::thread::id main_thread_id; void checkRightThread() const; #else void checkRightThread() const {} diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index 34e1f8e..04c21e5 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -1,18 +1,22 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_config.h" + #if LASP_HAS_ULDAQ == 1 #include "lasp_uldaq.h" #include "lasp_uldaq_impl.h" #include - +/** + * @brief The maximum number of devices that can be enumerated when calling + * ulGetDaqDeviceInventory() + */ +const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DEBUGTRACE_ENTER; - UlError err; unsigned int numdevs = MAX_ULDAQ_DEV_COUNT_PER_API; @@ -20,13 +24,13 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { DaqDeviceDescriptor descriptor; DaqDeviceInterface interfaceType = ANY_IFC; - err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, - static_cast(&numdevs)); + err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, &numdevs); if (err != ERR_NO_ERROR) { throw rte("UlDaq device inventarization failed"); } + DEBUGTRACE_PRINT(string("Number of devices: ") + std::to_string(numdevs)); for (unsigned i = 0; i < numdevs; i++) { descriptor = devdescriptors[i]; @@ -35,32 +39,34 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { devinfo._uldaqDescriptor = descriptor; devinfo.api = uldaqapi; - string name, interface; - string productname = descriptor.productName; - if (productname != "DT9837A") { - throw rte("Unknown UlDAQ type: " + productname); + { + string name; + string productname = descriptor.productName; + if (productname != "DT9837A") { + throw rte("Unknown UlDAQ type: " + productname); + } + + switch (descriptor.devInterface) { + case USB_IFC: + name = "USB - "; + break; + case BLUETOOTH_IFC: + /* devinfo. */ + name = "Bluetooth - "; + break; + + case ETHERNET_IFC: + /* devinfo. */ + name = "Ethernet - "; + break; + default: + name = "Uknown interface = "; + } + + name += productname + " " + string(descriptor.uniqueId); + devinfo.device_name = name; } - switch (descriptor.devInterface) { - case USB_IFC: - name = "USB - "; - break; - case BLUETOOTH_IFC: - /* devinfo. */ - name = "Bluetooth - "; - break; - - case ETHERNET_IFC: - /* devinfo. */ - name = "Ethernet - "; - break; - default: - name = "Uknown interface = "; - } - - name += string(descriptor.productName) + " " + string(descriptor.uniqueId); - devinfo.device_name = std::move(name); - devinfo.physicalOutputQty = DaqChannel::Qty::Voltage; devinfo.availableDataTypes.push_back( @@ -93,7 +99,12 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) { std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, const DaqConfiguration &config) { - return std::make_unique(devinfo, config); + const UlDaqDeviceInfo *_info = + dynamic_cast(&devinfo); + if (_info == nullptr) { + throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo"); + } + return std::make_unique(*_info, config); } #endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/lasp_uldaq.h b/src/lasp/device/lasp_uldaq.h index 26e2305..0b54961 100644 --- a/src/lasp/device/lasp_uldaq.h +++ b/src/lasp/device/lasp_uldaq.h @@ -1,18 +1,24 @@ #pragma once #include "lasp_daq.h" -/** - * @brief The maximum number of devices that can be enumerated when calling - * ulGetDaqDeviceInventory() +/** \addtogroup device + * \defgroup uldaq UlDAQ specific code + * This code is used to interface with UlDAQ compatible devices. It is only + * tested on Linux. + * @{ + * \addtogroup uldaq + * @{ */ -const us MAX_ULDAQ_DEV_COUNT_PER_API = 100; std::unique_ptr createUlDaqDevice(const DeviceInfo &devinfo, const DaqConfiguration &config); /** - * @brief Fill device info list with UlDaq specific devices, if any. + * @brief Append device info list with UlDaq specific devices, if any. * * @param devinfolist Info list to append to. */ void fillUlDaqDeviceInfo(DeviceInfoList& devinfolist); + +/** @} */ +/** @} */ diff --git a/src/lasp/device/lasp_uldaq_impl.cpp b/src/lasp/device/lasp_uldaq_impl.cpp deleted file mode 100644 index 6177b89..0000000 --- a/src/lasp/device/lasp_uldaq_impl.cpp +++ /dev/null @@ -1,484 +0,0 @@ -/* #define DEBUGTRACE_ENABLED */ -#include "debugtrace.hpp" -#include "lasp_config.h" - -#if LASP_HAS_ULDAQ == 1 -#include "lasp_daqconfig.h" -#include "lasp_uldaq.h" -#include "lasp_uldaq_impl.h" - -using namespace std::literals::chrono_literals; - -/** - * @brief Reserve some space for an error message from UlDaq - */ -const us UL_ERR_MSG_LEN = 512; - -/** - * @brief Return a string corresponding to the UlDaq API error - * - * @param err error code - * - * @return Error string - */ -string getErrMsg(UlError err) { - string errstr; - errstr.reserve(UL_ERR_MSG_LEN); - char errmsg[UL_ERR_MSG_LEN]; - errstr = "UlDaq API Error: "; - ulGetErrMsg(err, errmsg); - errstr += errmsg; - return errstr; -} -inline void showErr(string errstr) { - std::cerr << "\b\n**************** UlDAQ backend error **********\n"; - std::cerr << errstr << std::endl; - std::cerr << "***********************************************\n\n"; -} -inline void showErr(UlError err) { - if (err != ERR_NO_ERROR) - showErr(getErrMsg(err)); -} - -DT9837A::~DT9837A() { - UlError err; - if (isRunning()) { - stop(); - } - - if (_handle) { - err = ulDisconnectDaqDevice(_handle); - showErr(err); - err = ulReleaseDaqDevice(_handle); - showErr(err); - } -} -DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config) - : Daq(devinfo, config), - _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { - - // Some sanity checks - if (inchannel_config.size() != 4) { - throw rte("Invalid length of enabled inChannels vector"); - } - - if (outchannel_config.size() != 1) { - throw rte("Invalid length of enabled outChannels vector"); - } - - if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { - throw rte("Unsensible number of samples per block chosen"); - } - - if (samplerate() < ULDAQ_SAMPLERATES.at(0) || samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size()-1)) { - throw rte("Invalid sample rate"); - } - - const UlDaqDeviceInfo *_info = - dynamic_cast(&devinfo); - if (_info == nullptr) { - throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo"); - } - - // get a handle to the DAQ device associated with the first descriptor - _handle = ulCreateDaqDevice(_info->_uldaqDescriptor); - - if (_handle == 0) { - throw rte("Unable to create a handle to the specified DAQ " - "device. Is the device currently in use? Please make sure to set " - "the DAQ configuration in duplex mode if simultaneous input and " - "output is required."); - } - - UlError err = ulConnectDaqDevice(_handle); - - if (err != ERR_NO_ERROR) { - ulReleaseDaqDevice(_handle); - _handle = 0; - throw rte("Unable to connect to device: " + getErrMsg(err)); - } - - /// Loop over input channels, set parameters - for (us ch = 0; ch < 4; ch++) { - - err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); - showErr(err); - if (err != ERR_NO_ERROR) { - throw rte("Fatal: could normalize channel sensitivity"); - } - - CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; - err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set AC/DC coupling mode"); - } - - IepeMode iepe = - inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; - err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Fatal: could not set IEPE mode"); - } - } -} - -bool DT9837A::isRunning() const { - DEBUGTRACE_ENTER; - return _thread.joinable(); -} -void DT9837A::stop() { - DEBUGTRACE_ENTER; - StreamStatus status = _streamStatus; - if (!isRunning()) { - throw rte("No data acquisition running"); - } - - _stopThread = true; - if (_thread.joinable()) { - _thread.join(); - } - _stopThread = false; - status.isRunning = false; - _streamStatus = status; -} - -/** - * @brief Throws an appropriate stream exception based on the UlError number. - * The mapping is based on the error numbers as given in uldaq.h. There are a - * log of errors definded here (109 in total). Except for some, we will map - * most of them to a driver error. - * - * @param e - */ -inline void throwUlException(UlError err) { - if (err == ERR_NO_ERROR) { - return; - } - string errstr = getErrMsg(err); - showErr(errstr); - Daq::StreamStatus::StreamError serr; - if ((int)err == 18) { - serr = Daq::StreamStatus::StreamError::inputXRun; - } else if ((int)err == 19) { - serr = Daq::StreamStatus::StreamError::outputXRun; - } else { - serr = Daq::StreamStatus::StreamError::driverError; - } - - throw Daq::StreamException(serr, errstr); -} - -void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { - DEBUGTRACE_ENTER; - if (isRunning()) { - throw rte("DAQ is already running"); - } - if (neninchannels() > 0) { - if (!inCallback) - throw rte("DAQ requires a callback for input data"); - } - if (nenoutchannels() > 0) { - if (!outCallback) - throw rte("DAQ requires a callback for output data"); - } - assert(neninchannels() + nenoutchannels() > 0); - _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); -} - -InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) - : BufHandler(daq, daq.neninchannels()), cb(cb) - -{ - DEBUGTRACE_ENTER; - assert(daq.getHandle() != 0); - - monitorOutput = daq.monitorOutput; - - DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT; - ScanOption scanoptions = SO_CONTINUOUS; - UlError err = ERR_NO_ERROR; - - std::vector indescs; - boolvec eninchannels_without_mon = daq.eninchannels(false); - - // Set ranges for each input. Below asks only channels that are not a - // monitor channel (hence the false flag). - dvec ranges = daq.inputRangeForEnabledChannels(false); - - us enabled_ch_count = 0; - for (us chin = 0; chin < 4; chin++) { - if (eninchannels_without_mon[chin] == true) { - DaqInChanDescriptor indesc; - indesc.type = DAQI_ANALOG_SE; - indesc.channel = chin; - - double rangeval = ranges.at(enabled_ch_count); - Range rangenum; - if (fabs(rangeval - 1.0) < 1e-8) { - rangenum = BIP1VOLTS; - } else if (fabs(rangeval - 10.0) < 1e-8) { - rangenum = BIP10VOLTS; - } else { - throw Daq::StreamException(Daq::StreamStatus::StreamError::logicError); - std::cerr << "Fatal: input range value is invalid" << endl; - return; - } - indesc.range = rangenum; - indescs.push_back(indesc); - enabled_ch_count++; - } - } - - // Add possibly last channel as monitor - if (monitorOutput) { - DaqInChanDescriptor indesc; - indesc.type = DAQI_DAC; - indesc.channel = 0; - /// The output only has a range of 10V, therefore the monitor of the - /// output also has to be set to this value. - indesc.range = BIP10VOLTS; - indescs.push_back(indesc); - } - assert(indescs.size() == nchannels); - - DEBUGTRACE_MESSAGE("Starting input scan"); - - err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels, - 2 * nFramesPerBlock, // Watch the 2 here! - &samplerate, scanoptions, inscanflags, buf.data()); - throwUlException(err); -} -void InBufHandler::start() { - - DEBUGTRACE_ENTER; - ScanStatus status; - TransferStatus transferStatus; - UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); - throwUlException(err); - - totalFramesCount = transferStatus.currentTotalCount; - topenqueued = true; - botenqueued = true; -} - -bool InBufHandler::operator()() { - - /* DEBUGTRACE_ENTER; */ - - bool ret = true; - - auto runCallback = ([&](us totalOffset) { - /* DEBUGTRACE_ENTER; */ - - DaqData data(nFramesPerBlock, nchannels, dtype_descr.dtype); - - us monitorOffset = monitorOutput ? 1 : 0; - /* /// Put the output monitor in front */ - if (monitorOutput) { - for (us frame = 0; frame < nFramesPerBlock; frame++) { - data.value(frame, 0) = - buf[totalOffset + (frame * nchannels) + (nchannels - 1)]; - } - } - - for (us channel = 0; channel < nchannels - monitorOffset; channel++) { - /* DEBUGTRACE_PRINT(channel); */ - for (us frame = 0; frame < nFramesPerBlock; frame++) { - data.value(frame, channel + monitorOffset) = - buf[totalOffset + (frame * nchannels) + channel]; - } - } - return cb(data); - }); - - ScanStatus status; - TransferStatus transferStatus; - - UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); - throwUlException(err); - - us increment = transferStatus.currentTotalCount - totalFramesCount; - totalFramesCount += increment; - - if (increment > nFramesPerBlock) { - throw Daq::StreamException(Daq::StreamStatus::StreamError::inputXRun); - } - assert(status == SS_RUNNING); - - if (transferStatus.currentIndex < (long long)buffer_mid_idx) { - topenqueued = false; - if (!botenqueued) { - ret = runCallback(nchannels * nFramesPerBlock); - botenqueued = true; - } - } else { - botenqueued = false; - if (!topenqueued) { - ret = runCallback(0); - topenqueued = true; - } - } - return ret; -} -InBufHandler::~InBufHandler() { - // At exit of the function, stop scanning. - DEBUGTRACE_ENTER; - UlError err = ulDaqInScanStop(daq.getHandle()); - if (err != ERR_NO_ERROR) { - showErr(err); - } -} - -OutBufHandler::OutBufHandler(DT9837A &daq, OutDaqCallback cb) - : BufHandler(daq, daq.nenoutchannels()), cb(cb) { - - DEBUGTRACE_MESSAGE("Starting output scan"); - DEBUGTRACE_PRINT(nchannels); - AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT; - ScanOption scanoptions = SO_CONTINUOUS; - UlError err = ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS, - 2 * nFramesPerBlock, // Watch the 2 here! - &samplerate, scanoptions, outscanflags, buf.data()); - - throwUlException(err); -} -void OutBufHandler::start() { - - ScanStatus status; - TransferStatus transferStatus; - - UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); - if (err != ERR_NO_ERROR) { - showErr(err); - throw rte("Unable to start output on DAQ"); - } - if (status != SS_RUNNING) { - throw rte("Unable to start output on DAQ"); - } - totalFramesCount = transferStatus.currentTotalCount; - topenqueued = true; - botenqueued = true; -} -bool OutBufHandler::operator()() { - - /* DEBUGTRACE_ENTER; */ - bool res = true; - assert(daq.getHandle() != 0); - - UlError err = ERR_NO_ERROR; - - ScanStatus status; - TransferStatus transferStatus; - - err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); - throwUlException(err); - if (status != SS_RUNNING) { - return false; - } - us increment = transferStatus.currentTotalCount - totalFramesCount; - totalFramesCount += increment; - - if (increment > nFramesPerBlock) { - throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); - } - - if (transferStatus.currentIndex < buffer_mid_idx) { - topenqueued = false; - if (!botenqueued) { - DaqData d(nFramesPerBlock, 1, dtype_descr.dtype); - res = cb(d); - d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); - - botenqueued = true; - } - } else { - botenqueued = false; - if (!topenqueued) { - DaqData d(nFramesPerBlock, 1, dtype_descr.dtype); - res = cb(d); - d.copyToRaw(0, reinterpret_cast(&(buf[0]))); - - topenqueued = true; - } - } - return res; -} - -OutBufHandler::~OutBufHandler() { - DEBUGTRACE_ENTER; - UlError err = ulAOutScanStop(daq.getHandle()); - if (err != ERR_NO_ERROR) { - showErr(err); - } -} - -void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { - - DEBUGTRACE_ENTER; - - try { - - std::unique_ptr obh; - std::unique_ptr ibh; - - StreamStatus status = _streamStatus; - status.isRunning = true; - _streamStatus = status; - - if (nenoutchannels() > 0) { - assert(outCallback); - obh = std::make_unique(*this, outCallback); - } - if (neninchannels() > 0) { - assert(inCallback); - ibh = std::make_unique(*this, inCallback); - } - if (obh) - obh->start(); - if (ibh) - ibh->start(); - - const double sleeptime_s = - static_cast(_nFramesPerBlock) / (16 * samplerate()); - const us sleeptime_us = static_cast(sleeptime_s * 1e6); - - while (!_stopThread) { - if (ibh) { - if (!(*ibh)()) { - _stopThread = true; - break; - } - } - if (obh) { - if (!(*obh)()) { - _stopThread = true; - break; - } - } - - std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); - } - - /// Update stream status that we are not running anymore - status.isRunning = false; - _streamStatus = status; - _stopThread = false; - - } catch (StreamException &e) { - - StreamStatus status = _streamStatus; - // Copy over error type - status.errorType = e.e; - _streamStatus = status; - - /* - cerr << "\n******************\n"; - cerr << "Catched error in UlDAQ thread: " << e.what() << endl; - cerr << "\n******************\n"; - */ - } -} - -#endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp new file mode 100644 index 0000000..1eb9389 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.cpp @@ -0,0 +1,245 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_uldaq_bufhandler.h" +#include "lasp_daq.h" + +InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb) + : BufHandler(daq, daq.neninchannels()), cb(cb) + +{ + DEBUGTRACE_ENTER; + assert(daq.getHandle() != 0); + + monitorOutput = daq.monitorOutput; + + DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT; + ScanOption scanoptions = SO_CONTINUOUS; + UlError err = ERR_NO_ERROR; + + std::vector indescs; + boolvec eninchannels_without_mon = daq.eninchannels(false); + + // Set ranges for each input. Below asks only channels that are not a + // monitor channel (hence the false flag). + dvec ranges = daq.inputRangeForEnabledChannels(false); + + us enabled_ch_counter = 0; + for (us chin = 0; chin < 4; chin++) { + if (eninchannels_without_mon[chin] == true) { + DaqInChanDescriptor indesc; + indesc.type = DAQI_ANALOG_SE; + indesc.channel = chin; + + double rangeval = ranges.at(enabled_ch_counter); + Range rangenum; + if (fabs(rangeval - 1.0) < 1e-8) { + rangenum = BIP1VOLTS; + } else if (fabs(rangeval - 10.0) < 1e-8) { + rangenum = BIP10VOLTS; + } else { + throw Daq::StreamException(Daq::StreamStatus::StreamError::logicError); + std::cerr << "Fatal: input range value is invalid" << endl; + return; + } + indesc.range = rangenum; + indescs.push_back(indesc); + enabled_ch_counter++; + } + } + + // Add possibly last channel as monitor + if (monitorOutput) { + DaqInChanDescriptor indesc; + indesc.type = DAQI_DAC; + indesc.channel = 0; + /// The output only has a range of 10V, therefore the monitor of the + /// output also has to be set to this value. + indesc.range = BIP10VOLTS; + indescs.push_back(indesc); + } + assert(indescs.size() == nchannels); + + DEBUGTRACE_MESSAGE("Starting input scan"); + + err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels, + 2 * nFramesPerBlock, // Watch the 2 here! + &samplerate, scanoptions, inscanflags, buf.data()); + throwOnPossibleUlException(err); +} +void InBufHandler::start() { + + DEBUGTRACE_ENTER; + ScanStatus status; + TransferStatus transferStatus; + UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); + throwOnPossibleUlException(err); + + totalFramesCount = transferStatus.currentTotalCount; + topenqueued = true; + botenqueued = true; +} + +bool InBufHandler::operator()() { + + /* DEBUGTRACE_ENTER; */ + + bool ret = true; + + auto runCallback = ([&](us totalOffset) { + /* DEBUGTRACE_ENTER; */ + + DaqData data(nFramesPerBlock, nchannels, dtype_descr.dtype); + + us monitorOffset = monitorOutput ? 1 : 0; + /* /// Put the output monitor in front */ + if (monitorOutput) { + for (us frame = 0; frame < nFramesPerBlock; frame++) { + data.value(frame, 0) = + buf[totalOffset // Offset to lowest part of the buffer, or not + + (frame * nchannels) // Data is interleaved, so skip each + + (nchannels - 1)] // Monitor comes as last in the channel list, + // but we want it first in the output data. + ; + } + } + + // Now, all normal channels + for (us channel = 0; channel < nchannels - monitorOffset; channel++) { + /* DEBUGTRACE_PRINT(channel); */ + for (us frame = 0; frame < nFramesPerBlock; frame++) { + data.value(frame, channel + monitorOffset) = + buf[totalOffset + (frame * nchannels) + channel]; + } + } + return cb(data); + }); + + ScanStatus status; + TransferStatus transferStatus; + + UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); + throwOnPossibleUlException(err); + + us increment = transferStatus.currentTotalCount - totalFramesCount; + totalFramesCount += increment; + + if (increment > nFramesPerBlock) { + throw Daq::StreamException(Daq::StreamStatus::StreamError::inputXRun); + } + assert(status == SS_RUNNING); + + if (transferStatus.currentIndex < (long long)buffer_mid_idx) { + topenqueued = false; + if (!botenqueued) { + runCallback(nchannels * nFramesPerBlock); + botenqueued = true; + } + } else { + botenqueued = false; + if (!topenqueued) { + runCallback(0); + topenqueued = true; + } + } + return ret; +} +InBufHandler::~InBufHandler() { + // At exit of the function, stop scanning. + DEBUGTRACE_ENTER; + UlError err = ulDaqInScanStop(daq.getHandle()); + if (err != ERR_NO_ERROR) { + showErr(err); + } +} + +OutBufHandler::OutBufHandler(DT9837A &daq, OutDaqCallback cb) + : BufHandler(daq, daq.nenoutchannels()), cb(cb) { + + DEBUGTRACE_MESSAGE("Starting output scan"); + DEBUGTRACE_PRINT(nchannels); + AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT; + ScanOption scanoptions = SO_CONTINUOUS; + UlError err = ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS, + 2 * nFramesPerBlock, // Watch the 2 here! + &samplerate, scanoptions, outscanflags, buf.data()); + + throwOnPossibleUlException(err); +} +void OutBufHandler::start() { + + ScanStatus status; + TransferStatus transferStatus; + + UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Unable to start output on DAQ"); + } + if (status != SS_RUNNING) { + throw rte("Unable to start output on DAQ"); + } + totalFramesCount = transferStatus.currentTotalCount; + topenqueued = true; + botenqueued = true; +} +bool OutBufHandler::operator()() { + + DEBUGTRACE_ENTER; + bool res = true; + assert(daq.getHandle() != 0); + + UlError err = ERR_NO_ERROR; + + ScanStatus status; + TransferStatus transferStatus; + + err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); + throwOnPossibleUlException(err); + if (status != SS_RUNNING) { + return false; + } + us increment = transferStatus.currentTotalCount - totalFramesCount; + totalFramesCount += increment; + + if (increment > nFramesPerBlock) { + cerr << "totalFramesCount: " << totalFramesCount << ". Detected output underrun" << endl; + /* throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); */ + } + + if (transferStatus.currentIndex < buffer_mid_idx) { + topenqueued = false; + if (!botenqueued) { + DaqData d(nFramesPerBlock, 1,// Only one output channel + dtype_descr.dtype); + // Receive data, run callback + cb(d); + d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); + + botenqueued = true; + } + } else { + botenqueued = false; + if (!topenqueued) { + DaqData d(nFramesPerBlock, 1,// Only one output channel + dtype_descr.dtype); + // Receive + cb(d); + d.copyToRaw(0, reinterpret_cast(&(buf[0]))); + + topenqueued = true; + } + } + return res; +} + +OutBufHandler::~OutBufHandler() { + DEBUGTRACE_ENTER; + UlError err = ulAOutScanStop(daq.getHandle()); + if (err != ERR_NO_ERROR) { + showErr(err); + } +} +#endif diff --git a/src/lasp/device/lasp_uldaq_impl.h b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h similarity index 50% rename from src/lasp/device/lasp_uldaq_impl.h rename to src/lasp/device/uldaq/lasp_uldaq_bufhandler.h index 90c308a..f1ea849 100644 --- a/src/lasp/device/lasp_uldaq_impl.h +++ b/src/lasp/device/uldaq/lasp_uldaq_bufhandler.h @@ -1,83 +1,20 @@ #pragma once -#include "lasp_daq.h" -#include -#include -#include -#include -#include -#include #include -#include +#include "lasp_types.h" +#include "lasp_uldaq_impl.h" +#include "lasp_uldaq_common.h" -using std::atomic; -using std::cerr; -using std::endl; -using rte = std::runtime_error; -/** - * @brief List of available sampling frequencies for DT9837A + +/** \addtogroup device + * @{ + * \addtogroup uldaq */ -const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, - 22050, 24000, 32000, 44056, 44100, - 47250, 48000, 50000, 50400, 51000}; - - -/** - * @brief UlDaq-specific device information. Adds a copy of the underlying - * DaqDeDaqDeviceDescriptor. - */ -class UlDaqDeviceInfo : public DeviceInfo { - -public: - DaqDeviceDescriptor _uldaqDescriptor; - virtual std::unique_ptr clone() const { - return std::make_unique(*this); - } -}; - -class DT9837A : public Daq { - - DaqDeviceHandle _handle = 0; - std::mutex _daqmutex; - - std::thread _thread; - atomic _stopThread{false}; - atomic _streamStatus; - - const us _nFramesPerBlock; - - void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); - -public: - DaqDeviceHandle getHandle() const { return _handle; } - /** - * @brief Create a DT9837A instance. - * - * @param devinfo DeviceInfo to connect to - * @param config DaqConfiguration settings - */ - DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config); - - virtual ~DT9837A(); - - bool isRunning() const; - - void stop() override final; - - friend class InBufHandler; - friend class OutBufHandler; - - virtual void start(InDaqCallback inCallback, - OutDaqCallback outCallback) override final; - - virtual StreamStatus getStreamStatus() const override { - return _streamStatus; - } -}; - /** * @brief Helper class for managing input and output samples of the DAQ device. */ + +class DT9837A; class BufHandler { protected: /** @@ -96,12 +33,19 @@ protected: * @brief Sampling frequency in Hz */ double samplerate; + /** + * @brief Storage capacity for the DAQ I/O. + */ std::vector buf; /** - * @brief Whether the top / bottom part of the buffer are ready to be + * @brief Whether the top part of the buffer is enqueued + */ + bool topenqueued = false; + /** + * @brief Whether the bottom part of the buffer is enqueued * enqueued */ - bool topenqueued, botenqueued; + bool botenqueued = false; /** * @brief Counter for the total number of frames acquired / sent since the @@ -125,6 +69,7 @@ public: 0), buffer_mid_idx(nchannels * nFramesPerBlock) { assert(nchannels > 0); + assert(nFramesPerBlock > 0); } }; /** @@ -160,3 +105,5 @@ public: ~OutBufHandler(); }; +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.cpp b/src/lasp/device/uldaq/lasp_uldaq_common.cpp new file mode 100644 index 0000000..d05c629 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_common.cpp @@ -0,0 +1,45 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_uldaq_common.h" +#include "lasp_daq.h" + +string getErrMsg(UlError err) { + string errstr; + errstr.reserve(ERR_MSG_LEN); + char errmsg[ERR_MSG_LEN]; + errstr = "UlDaq API Error: "; + ulGetErrMsg(err, errmsg); + errstr += errmsg; + return errstr; +} +void showErr(string errstr) { + std::cerr << "\b\n**************** UlDAQ backend error **********\n"; + std::cerr << errstr << std::endl; + std::cerr << "***********************************************\n\n"; +} +void showErr(UlError err) { + if (err != ERR_NO_ERROR) + showErr(getErrMsg(err)); +} +#endif + +void throwOnPossibleUlException(UlError err) { + if (err == ERR_NO_ERROR) { + return; + } + string errstr = getErrMsg(err); + showErr(errstr); + Daq::StreamStatus::StreamError serr; + if ((int)err == 18) { + serr = Daq::StreamStatus::StreamError::inputXRun; + } else if ((int)err == 19) { + serr = Daq::StreamStatus::StreamError::outputXRun; + } else { + serr = Daq::StreamStatus::StreamError::driverError; + } + + throw Daq::StreamException(serr, errstr); +} diff --git a/src/lasp/device/uldaq/lasp_uldaq_common.h b/src/lasp/device/uldaq/lasp_uldaq_common.h new file mode 100644 index 0000000..f9ac26a --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_common.h @@ -0,0 +1,66 @@ +#pragma once +#include +#include +#include "lasp_deviceinfo.h" + +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ +/** + * @brief Throws an appropriate stream exception based on the UlError number. + * The mapping is based on the error numbers as given in uldaq.h. There are a + * log of errors definded here (109 in total). Except for some, we will map + * most of them to a driver error. + * + * @param e The backend error code. + */ +void throwOnPossibleUlException(UlError err); + +/** + * @brief Return a string corresponding to the UlDaq API error + * + * @param err error code + * + * @return Error string + */ +string getErrMsg(UlError err); + +/** + * @brief Print error message to stderr + * + * @param errstr The string to print + */ +void showErr(UlError err); + +/** + * @brief Get a string representation of the error + * + * @param errstr + */ +void showErr(std::string errstr); + +/** + * @brief UlDaq-specific device information. Adds a copy of the underlying + * DaqDeDaqDeviceDescriptor. + */ +class UlDaqDeviceInfo : public DeviceInfo { + +public: + DaqDeviceDescriptor _uldaqDescriptor; + virtual std::unique_ptr clone() const override { + DEBUGTRACE_ENTER; + return std::make_unique(*this); + } +}; + +/** + * @brief List of available sampling frequencies for DT9837A + */ +const std::vector ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000, + 22050, 24000, 32000, 44056, 44100, + 47250, 48000, 50000, 50400, 51000}; + + +/** @} */ +/** @} */ diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.cpp b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp new file mode 100644 index 0000000..86b00f5 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.cpp @@ -0,0 +1,212 @@ +/* #define DEBUGTRACE_ENABLED */ +#include "debugtrace.hpp" +#include "lasp_config.h" + +#if LASP_HAS_ULDAQ == 1 +#include "lasp_daqconfig.h" +#include "lasp_uldaq.h" +#include "lasp_uldaq_bufhandler.h" +#include "lasp_uldaq_impl.h" + +using namespace std::literals::chrono_literals; + +DT9837A::~DT9837A() { + DEBUGTRACE_ENTER; + UlError err; + if (isRunning()) { + DEBUGTRACE_PRINT("Stop UlDAQ from destructor"); + stop(); + } + + if (_handle) { + DEBUGTRACE_PRINT("Disconnecting and releasing DaqDevice"); + /* err = ulDisconnectDaqDevice(_handle); */ + /* showErr(err); */ + err = ulReleaseDaqDevice(_handle); + showErr(err); + } +} +DT9837A::DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config) + : Daq(devinfo, config), + _nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) { + + const DaqDeviceDescriptor &descriptor = devinfo._uldaqDescriptor; + DEBUGTRACE_PRINT(string("Device: ") + descriptor.productName); + DEBUGTRACE_PRINT(string("Product id: ") + to_string(descriptor.productId)); + DEBUGTRACE_PRINT(string("Dev string: ") + descriptor.devString); + DEBUGTRACE_PRINT(string("Unique id: ") + descriptor.uniqueId); + + // get a handle to the DAQ device associated with the first descriptor + _handle = ulCreateDaqDevice(descriptor); + + if (_handle == 0) { + throw rte("Unable to create a handle to the specified DAQ " + "device. Is the device currently in use? Please make sure to set " + "the DAQ configuration in duplex mode if simultaneous input and " + "output is required."); + } + + UlError err = ulConnectDaqDevice(_handle); + + if (err != ERR_NO_ERROR) { + ulReleaseDaqDevice(_handle); + _handle = 0; + throw rte("Unable to connect to device: " + getErrMsg(err)); + } + + /// Loop over input channels, set parameters + for (us ch = 0; ch < 4; ch++) { + + err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0); + showErr(err); + if (err != ERR_NO_ERROR) { + throw rte("Fatal: could normalize channel sensitivity"); + } + + CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC; + err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set AC/DC coupling mode"); + } + + IepeMode iepe = + inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; + err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); + if (err != ERR_NO_ERROR) { + showErr(err); + throw rte("Fatal: could not set IEPE mode"); + } + } +} + +bool DT9837A::isRunning() const { + DEBUGTRACE_ENTER; + /* return _thread.joinable(); */ + StreamStatus status = _streamStatus; + return status.isRunning; +} +void DT9837A::stop() { + DEBUGTRACE_ENTER; + StreamStatus status = _streamStatus; + status.isRunning = true; + _streamStatus = status; + if (!isRunning()) { + throw rte("No data acquisition running"); + } + + // Stop the thread and join it + _stopThread = true; + assert(_thread.joinable()); + _thread.join(); + _stopThread = false; + + // Update stream status + status.isRunning = false; + _streamStatus = status; +} + +void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { + DEBUGTRACE_ENTER; + if (isRunning()) { + throw rte("DAQ is already running"); + } + if (neninchannels() > 0) { + if (!inCallback) + throw rte("DAQ requires a callback for input data"); + } + if (nenoutchannels() > 0) { + if (!outCallback) + throw rte("DAQ requires a callback for output data"); + } + assert(neninchannels() + nenoutchannels() > 0); + _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback); + +} + +void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { + + DEBUGTRACE_ENTER; + + try { + + std::unique_ptr obh; + std::unique_ptr ibh; + + StreamStatus status = _streamStatus; + status.isRunning = true; + _streamStatus = status; + + if (nenoutchannels() > 0) { + assert(outCallback); + obh = std::make_unique(*this, outCallback); + } + if (neninchannels() > 0) { + assert(inCallback); + ibh = std::make_unique(*this, inCallback); + } + if (obh) + obh->start(); + if (ibh) + ibh->start(); + + const double sleeptime_s = + static_cast(_nFramesPerBlock) / (16 * samplerate()); + const us sleeptime_us = static_cast(sleeptime_s * 1e6); + + while (!_stopThread) { + if (ibh) { + if (!(*ibh)()) { + _stopThread = true; + break; + } + } + if (obh) { + if (!(*obh)()) { + _stopThread = true; + break; + } + } + + std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); + } + + /// Update stream status that we are not running anymore + status.isRunning = false; + _streamStatus = status; + _stopThread = false; + + } catch (StreamException &e) { + + StreamStatus status = _streamStatus; + // Copy over error type + status.errorType = e.e; + _streamStatus = status; + + cerr << "\n******************\n"; + cerr << "Catched error in UlDAQ thread: " << e.what() << endl; + cerr << "\n******************\n"; + } +} + +void DT9837A::sanityChecks() const { + // Some sanity checks + if (inchannel_config.size() != 4) { + throw rte("Invalid length of enabled inChannels vector"); + } + + if (outchannel_config.size() != 1) { + throw rte("Invalid length of enabled outChannels vector"); + } + + if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) { + throw rte("Unsensible number of samples per block chosen"); + } + + if (samplerate() < ULDAQ_SAMPLERATES.at(0) || + samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) { + throw rte("Invalid sample rate"); + } +} + +#endif // LASP_HAS_ULDAQ diff --git a/src/lasp/device/uldaq/lasp_uldaq_impl.h b/src/lasp/device/uldaq/lasp_uldaq_impl.h new file mode 100644 index 0000000..648e121 --- /dev/null +++ b/src/lasp/device/uldaq/lasp_uldaq_impl.h @@ -0,0 +1,110 @@ +#pragma once +#include "debugtrace.hpp" +#include "lasp_uldaq_common.h" +#include +#include +#include +#include +#include +#include +#include +#include "lasp_daq.h" + +using std::atomic; +using std::cerr; +using std::endl; +using rte = std::runtime_error; + +class InBufHandler; +class OutBufHandler; + +/** \addtogroup device + * @{ + * \addtogroup uldaq + */ + +/** + * @brief Data translation DT9837A Daq device. + */ +class DT9837A : public Daq { + + DaqDeviceHandle _handle = 0; + std::mutex _daqmutex; + + /** + * @brief The thread that is doing I/O with UlDaq + */ + std::thread _thread; + + + /** + * @brief Flag indicating the thread to stop processing. + */ + atomic _stopThread{false}; + /** + * @brief Storage for exchanging information on the stream + */ + atomic _streamStatus; + + const us _nFramesPerBlock; + + /** + * @brief The function that is running in a thread + * + * @param inCallback + * @param outcallback + */ + void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); + + /** + * @brief Obtain a handle to the underlying device + * + * @return Handle + */ + DaqDeviceHandle getHandle() const { return _handle; } + /** + * @brief Perform several sanity checks + */ + void sanityChecks() const; +public: + + /** + * @brief Create a DT9837A instance. + * + * @param devinfo DeviceInfo to connect to + * @param config DaqConfiguration settings + */ + DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config); + + virtual ~DT9837A(); + + /** + * @brief Returns true when the stream is running + * + * @return as above stated + */ + bool isRunning() const; + + /** + * @brief Stop the data-acquisition + */ + void stop() override final; + + friend class InBufHandler; + friend class OutBufHandler; + + virtual void start(InDaqCallback inCallback, + OutDaqCallback outCallback) override final; + + /** + * @brief Obtain copy of stream status (thread-safe function) + * + * @return StreamStatus object + */ + virtual StreamStatus getStreamStatus() const override { + return _streamStatus; + } +}; + +/** @} */ +/** @} */ diff --git a/src/lasp/dsp/lasp_biquadbank.cpp b/src/lasp/dsp/lasp_biquadbank.cpp index 09ec834..d31744e 100644 --- a/src/lasp/dsp/lasp_biquadbank.cpp +++ b/src/lasp/dsp/lasp_biquadbank.cpp @@ -44,19 +44,21 @@ SeriesBiquad::SeriesBiquad(const vd &filter_coefs) { SeriesBiquad SeriesBiquad::firstOrderHighPass(const d fs, const d cuton_Hz) { - if(fs <= 0) { + if (fs <= 0) { throw rte("Invalid sampling frequency: " + std::to_string(fs) + " [Hz]"); } - if(cuton_Hz <= 0) { + if (cuton_Hz <= 0) { throw rte("Invalid cuton frequency: " + std::to_string(cuton_Hz) + " [Hz]"); } - if(cuton_Hz >= 0.98*fs/2) { - throw rte("Invalid cuton frequency. We limit this to 0.98* fs / 2. Given value" + std::to_string(cuton_Hz) + " [Hz]"); + if (cuton_Hz >= 0.98 * fs / 2) { + throw rte( + "Invalid cuton frequency. We limit this to 0.98* fs / 2. Given value" + + std::to_string(cuton_Hz) + " [Hz]"); } - const d tau = 1/(2*arma::datum::pi*cuton_Hz); - const d facnum = 2*fs*tau/(1+2*fs*tau); - const d facden = (1-2*fs*tau)/(1+2*fs*tau); + const d tau = 1 / (2 * arma::datum::pi * cuton_Hz); + const d facnum = 2 * fs * tau / (1 + 2 * fs * tau); + const d facden = (1 - 2 * fs * tau) / (1 + 2 * fs * tau); vd coefs(6); // b0 @@ -76,10 +78,8 @@ SeriesBiquad SeriesBiquad::firstOrderHighPass(const d fs, const d cuton_Hz) { coefs(5) = 0; return SeriesBiquad(coefs); - } - std::unique_ptr SeriesBiquad::clone() const { // sos.as_col() concatenates all columns, exactly what we want. return std::make_unique(sos.as_col()); @@ -124,7 +124,6 @@ BiquadBank::BiquadBank(const dmat &filters, const vd *gains) { * for use. */ lock lck(_mtx); - getPool(); for (us i = 0; i < filters.n_cols; i++) { _filters.emplace_back(filters.col(i)); @@ -153,16 +152,15 @@ void BiquadBank::filter(vd &inout) { std::vector> futs; #if 1 - auto &pool = getPool(); vd inout_cpy = inout; for (us i = 0; i < _filters.size(); i++) { - futs.emplace_back(pool.submit( - [&](vd inout, us i) { + futs.emplace_back(_pool.submit( + [&](vd inout, us i) { _filters[i].filter(inout); return inout; - }, // Launch a task to filter. - inout_cpy, i // Column i as argument to the lambda function above. - )); + }, // Launch a task to filter. + inout_cpy, i // Column i as argument to the lambda function above. + )); } // Zero-out in-out and sum-up the filtered values diff --git a/src/lasp/dsp/lasp_biquadbank.h b/src/lasp/dsp/lasp_biquadbank.h index 1426cb5..0328cea 100644 --- a/src/lasp/dsp/lasp_biquadbank.h +++ b/src/lasp/dsp/lasp_biquadbank.h @@ -1,5 +1,6 @@ #pragma once #include "lasp_filter.h" +#include "lasp_thread.h" /** * \addtogroup dsp @@ -60,6 +61,7 @@ public: class BiquadBank : public Filter { std::vector _filters; vd _gains; + ThreadSafeThreadPool _pool; mutable std::mutex _mtx; public: diff --git a/src/lasp/dsp/lasp_clip.cpp b/src/lasp/dsp/lasp_clip.cpp index a3b8c8f..0be004c 100644 --- a/src/lasp/dsp/lasp_clip.cpp +++ b/src/lasp/dsp/lasp_clip.cpp @@ -1,6 +1,8 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_clip.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include using std::cerr; @@ -9,13 +11,14 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -ClipHandler::ClipHandler(StreamMgr &mgr) +ClipHandler::ClipHandler(SmgrHandle mgr) : ThreadedInDataHandler(mgr){ DEBUGTRACE_ENTER; + startThread(); } -bool ClipHandler::inCallback_threaded(const DaqData &d) { +void ClipHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -49,7 +52,6 @@ bool ClipHandler::inCallback_threaded(const DaqData &d) { _clip_time(i) += _dt; } } - return true; } arma::uvec ClipHandler::getCurrentValue() const { @@ -89,6 +91,5 @@ void ClipHandler::reset(const Daq *daq) { ClipHandler::~ClipHandler() { DEBUGTRACE_ENTER; - Lck lck(_mtx); - stop(); + stopThread(); } diff --git a/src/lasp/dsp/lasp_clip.h b/src/lasp/dsp/lasp_clip.h index e5f76e5..2294558 100644 --- a/src/lasp/dsp/lasp_clip.h +++ b/src/lasp/dsp/lasp_clip.h @@ -21,7 +21,7 @@ /** * @brief Clipping detector (Clip). Detects when a signal overdrives the input * */ -class ClipHandler: public ThreadedInDataHandler { +class ClipHandler: public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -58,7 +58,7 @@ class ClipHandler: public ThreadedInDataHandler { * * @param mgr Stream Mgr to operate on */ - ClipHandler(StreamMgr& mgr); + ClipHandler(SmgrHandle mgr); ~ClipHandler(); /** @@ -68,8 +68,8 @@ class ClipHandler: public ThreadedInDataHandler { */ arma::uvec getCurrentValue() const; - bool inCallback_threaded(const DaqData& ) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& ); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 4e9c640..aa92f6b 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -1,6 +1,8 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" #include "lasp_ppm.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include using std::cerr; @@ -9,13 +11,14 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) - : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { +PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps) + : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { DEBUGTRACE_ENTER; + startThread(); } -bool PPMHandler::inCallback_threaded(const DaqData &d) { +void PPMHandler::inCallback(const DaqData &d) { DEBUGTRACE_ENTER; Lck lck(_mtx); @@ -61,12 +64,11 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { _cur_max(i) *= _alpha; } } - return true; } std::tuple PPMHandler::getCurrentValue() const { - DEBUGTRACE_ENTER; + /* DEBUGTRACE_ENTER; */ Lck lck(_mtx); arma::uvec clips(_clip_time.size(), arma::fill::zeros); @@ -82,9 +84,11 @@ void PPMHandler::reset(const Daq *daq) { if (daq) { + DEBUGTRACE_PRINT("New daq found"); _cur_max.fill(1e-80); const us nchannels = daq->neninchannels(); + DEBUGTRACE_PRINT(nchannels); _max_range.resize(nchannels); dvec ranges = daq->inputRangeForEnabledChannels(); @@ -106,6 +110,5 @@ void PPMHandler::reset(const Daq *daq) { PPMHandler::~PPMHandler() { DEBUGTRACE_ENTER; - Lck lck(_mtx); - stop(); + stopThread(); } diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index 51b0872..b30ecc6 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -4,7 +4,6 @@ // // Description: Peak Programme Meter #pragma once -#include #include "lasp_filter.h" #include "lasp_mathtypes.h" #include "lasp_threadedindatahandler.h" @@ -23,7 +22,7 @@ * with a certain amount of dB/s. If a new peak is found, it goes up again. * Also detects clipping. * */ -class PPMHandler: public ThreadedInDataHandler { +class PPMHandler : public ThreadedInDataHandler { /** * @brief Assuming full scale of a signal is +/- 1.0. If a value is found @@ -69,11 +68,11 @@ class PPMHandler: public ThreadedInDataHandler { /** * @brief Constructs Peak Programme Meter * - * @param mgr Stream Mgr to operate on + * @param mgr Stream Mgr to install callbacks for * @param decay_dBps The level decay in units dB/s, after a peak has been * hit. */ - PPMHandler(StreamMgr& mgr,const d decay_dBps = 20.0); + PPMHandler(SmgrHandle mgr,const d decay_dBps = 20.0); ~PPMHandler(); /** @@ -91,8 +90,8 @@ class PPMHandler: public ThreadedInDataHandler { * * @return true when stream should continue. */ - bool inCallback_threaded(const DaqData& d) override final; - void reset(const Daq*) override final; + void inCallback(const DaqData& d); + void reset(const Daq*); }; diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index 1e7d2cb..96d9802 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -1,5 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_rtaps.h" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include "debugtrace.hpp" #include @@ -7,7 +9,7 @@ using std::cerr; using std::endl; using Lck = std::scoped_lock; -RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, +RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft, const Window::WindowType w, const d overlap_percentage, const d time_constant) @@ -18,12 +20,12 @@ RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, _filterPrototype = freqWeightingFilter->clone(); } + startThread(); } RtAps::~RtAps() { - Lck lck(_ps_mtx); - stop(); + stopThread(); } -bool RtAps::inCallback_threaded(const DaqData &data) { +void RtAps::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -33,7 +35,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) { const us nchannels = fltdata.n_cols; if(nchannels != _sens.size()) { cerr << "**** Error: sensitivity size does not match! *****" << endl; - return false; + return; } fltdata.each_row() %= _sens.as_row(); @@ -61,7 +63,6 @@ bool RtAps::inCallback_threaded(const DaqData &data) { _ps.compute(fltdata); - return true; } void RtAps::reset(const Daq *daq) { diff --git a/src/lasp/dsp/lasp_rtaps.h b/src/lasp/dsp/lasp_rtaps.h index 5c86bb0..b55240f 100644 --- a/src/lasp/dsp/lasp_rtaps.h +++ b/src/lasp/dsp/lasp_rtaps.h @@ -23,7 +23,7 @@ * @brief Real time spectral estimator using Welch method of spectral * estimation. */ -class RtAps : public ThreadedInDataHandler { +class RtAps : public ThreadedInDataHandler { std::unique_ptr _filterPrototype; std::vector> _freqWeightingFilters; @@ -49,7 +49,7 @@ public: * * For all other arguments, see constructor of AvPowerSpectra */ - RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft = 2048, + RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft = 2048, const Window::WindowType w = Window::WindowType::Hann, const d overlap_percentage = 50., const d time_constant = -1); ~RtAps(); @@ -69,8 +69,8 @@ public: * * @return true if stream should continue. */ - bool inCallback_threaded(const DaqData & d) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData & d); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_rtsignalviewer.cpp b/src/lasp/dsp/lasp_rtsignalviewer.cpp index fa16ca6..bbef3ad 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.cpp +++ b/src/lasp/dsp/lasp_rtsignalviewer.cpp @@ -1,5 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "debugtrace.hpp" +#include "lasp_daqdata.h" +#include "lasp_daq.h" #include "lasp_rtsignalviewer.h" #include #include @@ -9,7 +11,7 @@ using std::endl; using Lck = std::scoped_lock; using rte = std::runtime_error; -RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, +RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution, const us channel) : ThreadedInDataHandler(mgr), _approx_time_hist(approx_time_hist), _resolution(resolution), _channel(channel) { @@ -22,9 +24,10 @@ RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, if (resolution <= 1) { throw rte("Invalid resolution. Should be > 1"); } + startThread(); } -bool RtSignalViewer::inCallback_threaded(const DaqData &data) { +void RtSignalViewer::inCallback(const DaqData &data) { DEBUGTRACE_ENTER; @@ -49,13 +52,10 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) { _dat(_resolution-1, 1) = newmin; _dat(_resolution-1, 2) = newmax; } - - return true; } RtSignalViewer::~RtSignalViewer() { - Lck lck(_sv_mtx); - stop(); + stopThread(); } void RtSignalViewer::reset(const Daq *daq) { diff --git a/src/lasp/dsp/lasp_rtsignalviewer.h b/src/lasp/dsp/lasp_rtsignalviewer.h index 85fefd1..253a934 100644 --- a/src/lasp/dsp/lasp_rtsignalviewer.h +++ b/src/lasp/dsp/lasp_rtsignalviewer.h @@ -24,7 +24,7 @@ * @brief Real time signal viewer. Shows envelope of the signal based on amount * of history shown. */ -class RtSignalViewer : public ThreadedInDataHandler { +class RtSignalViewer : public ThreadedInDataHandler { /** * @brief Storage for sensitivity values @@ -71,7 +71,7 @@ public: * @param resolution Number of time points * @param channel The channel number */ - RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, const us resolution, + RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution, const us channel); ~RtSignalViewer(); @@ -85,8 +85,8 @@ public: */ dmat getCurrentValue() const; - bool inCallback_threaded(const DaqData &) override final; - void reset(const Daq *) override final; + void inCallback(const DaqData &); + void reset(const Daq *); }; /** @} */ diff --git a/src/lasp/dsp/lasp_slm.cpp b/src/lasp/dsp/lasp_slm.cpp index e934800..74f9b28 100644 --- a/src/lasp/dsp/lasp_slm.cpp +++ b/src/lasp/dsp/lasp_slm.cpp @@ -37,9 +37,6 @@ SLM::SLM(const d fs, const d Lref, const us downsampling_fac, const d tau, DEBUGTRACE_ENTER; DEBUGTRACE_PRINT(_alpha); - // Make sure thread pool is running - getPool(); - if (Lref <= 0) { throw rte("Invalid reference level"); } diff --git a/src/lasp/dsp/lasp_slm.h b/src/lasp/dsp/lasp_slm.h index f4afc0e..1c2d871 100644 --- a/src/lasp/dsp/lasp_slm.h +++ b/src/lasp/dsp/lasp_slm.h @@ -1,6 +1,7 @@ #pragma once #include "lasp_biquadbank.h" #include "lasp_filter.h" +#include "lasp_thread.h" #include #include @@ -14,6 +15,7 @@ * channel. A channel is the result of a filtered signal */ class SLM { + ThreadSafeThreadPool _pool; /** * @brief A, C or Z weighting, depending on the pre-filter installed. */ diff --git a/src/lasp/dsp/lasp_thread.cpp b/src/lasp/dsp/lasp_thread.cpp index 76bc400..a04e051 100644 --- a/src/lasp/dsp/lasp_thread.cpp +++ b/src/lasp/dsp/lasp_thread.cpp @@ -5,21 +5,31 @@ #include /** - * @brief It seems to work much better in cooperation with Pybind11 when this - * singleton is implemented with a unique_ptr. + * @brief Store a global weak_ptr, that is used to create new shared pointers + * if any other shared pointers are still alive. If not, we create a new + * instance. */ -std::unique_ptr _static_storage_threadpool; +std::weak_ptr _global_weak_pool; -void destroyThreadPool() { +/** + * @brief Static storage for the mutex. + */ +std::mutex ThreadSafeThreadPool::_mtx; + +using Lck = std::scoped_lock; +using rte = std::runtime_error; + +ThreadSafeThreadPool::ThreadSafeThreadPool() { DEBUGTRACE_ENTER; - _static_storage_threadpool = nullptr; -} - -BS::thread_pool &getPool() { - /* DEBUGTRACE_ENTER; */ - if (!_static_storage_threadpool) { - DEBUGTRACE_PRINT("Creating new thread pool"); - _static_storage_threadpool = std::make_unique(); + Lck lck(_mtx); + /// See if we can get it from the global ptr. If not, time to allocate it. + _pool = _global_weak_pool.lock(); + if (!_pool) { + _pool = std::make_shared(); + if (!_pool) { + throw rte("Fatal: could not allocate thread pool!"); + } + // Update global weak pointer + _global_weak_pool = _pool; } - return *_static_storage_threadpool; } diff --git a/src/lasp/dsp/lasp_thread.h b/src/lasp/dsp/lasp_thread.h index 8a6c0ca..c28805e 100644 --- a/src/lasp/dsp/lasp_thread.h +++ b/src/lasp/dsp/lasp_thread.h @@ -2,18 +2,54 @@ #include "BS_thread_pool.hpp" /** - * @brief Return reference to global (singleton) thread pool. The threadpool is - * created using the default argument, which results in exactly - * hardware_concurrency() amount of threads. - * - * @return Thread pool ref. + * @brief Simple wrapper around BS::thread_pool that makes a BS::thread_pool a + * singleton, such that a thread pool can be used around in the code, and + * safely spawn threads also from other threads. Only wraps a submit() and + * push_task for now. */ -BS::thread_pool& getPool(); +class ThreadSafeThreadPool { + /** + * @brief Shared access to the thread pool. + */ + std::shared_ptr _pool; + /** + * @brief Global mutex, used to restrict pool access to a single thread at + * once. + */ + static std::mutex _mtx; + + using Lck = std::scoped_lock; + ThreadSafeThreadPool(const ThreadSafeThreadPool&) = delete; + ThreadSafeThreadPool & + operator=(const ThreadSafeThreadPool&) = delete; + +public: + /** + * @brief Instantiate handle to the thread pool. + */ + ThreadSafeThreadPool(); + + + /** + * @brief Wrapper around BS::thread_pool::submit(...) + */ + template < + typename F, typename... A, + typename R = std::invoke_result_t, std::decay_t...>> + [[nodiscard]] std::future submit(F &&task, A &&...args) { + /// Lock access to pool + Lck lck(_mtx); + + return _pool->submit(task, args...); + } + /** + * @brief Wrapper around BS::thread_pool::push_task(...) + */ + template void push_task(F &&task, A &&...args) { + /// Lock access to pool + Lck lck(_mtx); + _pool->push_task(task, args...); + } +}; -/** - * @brief The global thread pool is stored in a unique_ptr, so in normal C++ - * code the thread pool is deleted at the end of main(). However this does not - * hold when LASP code is run - */ -void destroyThreadPool(); diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 9223720..340dc1a 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -1,24 +1,27 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_threadedindatahandler.h" #include "debugtrace.hpp" +#include "lasp_daqdata.h" #include "lasp_thread.h" #include -#include -#include #include +#include +#include using namespace std::literals::chrono_literals; using lck = std::scoped_lock; using rte = std::runtime_error; using std::cerr; using std::endl; +using std::placeholders::_1; class SafeQueue { std::queue _queue; std::mutex _mtx; - std::atomic_int32_t _contents {0}; - public: - void push(const DaqData& d) { + std::atomic_int32_t _contents{0}; + +public: + void push(const DaqData &d) { DEBUGTRACE_ENTER; lck lock(_mtx); _queue.push(d); @@ -47,38 +50,49 @@ class SafeQueue { bool empty() const { return _contents == 0; } }; -ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) - : InDataHandler(mgr), _queue(std::make_unique()) { +ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr, + InCallbackType cb, + InResetType reset) + : _indatahandler( + mgr, + std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this, + _1), + reset), + _queue(std::make_unique()), inCallback(cb) { - DEBUGTRACE_ENTER; - - // Initialize thread pool, if not already done - getPool(); - } - -bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { DEBUGTRACE_ENTER; - if (!_lastCallbackResult) { - return false; - } - - _queue->push(daqdata); - - if (!_thread_running && (!_stopThread) && _lastCallbackResult) { - auto &pool = getPool(); - DEBUGTRACE_PRINT("Pushing new thread in pool"); - _thread_running = true; - pool.push_task(&ThreadedInDataHandler::threadFcn, this); - } - - return _lastCallbackResult; +} +void ThreadedInDataHandlerBase::startThread() { + DEBUGTRACE_ENTER; + _thread_can_safely_run = true; + _indatahandler.start(); } -ThreadedInDataHandler::~ThreadedInDataHandler() { - +void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( + const DaqData &daqdata) { DEBUGTRACE_ENTER; - _stopThread = true; + std::scoped_lock lck(_mtx); + + // Early return in case object is under DESTRUCTION + if (!_thread_can_safely_run) + return; + + _queue->push(daqdata); + if (!_thread_running) { + DEBUGTRACE_PRINT("Pushing new thread in pool"); + _thread_running = true; + _pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); + } +} + +void ThreadedInDataHandlerBase::stopThread() { + DEBUGTRACE_ENTER; + // Make sure inCallback is no longer called + _thread_can_safely_run = false; + _indatahandler.stop(); + + std::scoped_lock lck(_mtx); // Then wait in steps for the thread to stop running. while (_thread_running) { @@ -86,18 +100,26 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { } } -void ThreadedInDataHandler::threadFcn() { +ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { + + DEBUGTRACE_ENTER; + if (_thread_can_safely_run) { + stopThread(); + cerr << "*** BUG: InDataHandlers have not been all stopped, while " + "StreamMgr destructor is called. This is a misuse BUG." + << endl; + abort(); + } +} + +void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; - while(!_queue->empty() && !_stopThread) { + while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded - if (!inCallback_threaded(_queue->pop())) { - cerr << "*********** Callback result returned false! *************" - << endl; - _lastCallbackResult = false; - } + inCallback(_queue->pop()); } _thread_running = false; } diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 20f5df2..b769ad1 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -1,9 +1,14 @@ #pragma once -#include "lasp_streammgr.h" +#include "debugtrace.hpp" +#include "lasp_indatahandler.h" +#include "lasp_thread.h" +#include +#include +#include +using std::placeholders::_1; const us RINGBUFFER_SIZE = 1024; - /** * \addtogroup dsp * @{ @@ -14,51 +19,99 @@ const us RINGBUFFER_SIZE = 1024; class SafeQueue; /** - * @brief Threaded in data handler. Buffers inCallback data and calls a - * callback with the same signature on a different thread. + * @brief Threaded in data handler base. Buffers inCallback data and calls a + * callback with the same signature on a different thread. The main function of + * this is to offload the thread that handles the stream, such that expensive + * computations do not result in stream buffer xruns. */ -class ThreadedInDataHandler: public InDataHandler { +class ThreadedInDataHandlerBase { /** * @brief The queue used to push elements to the handling thread. */ + + InDataHandler _indatahandler; std::unique_ptr _queue; + mutable std::recursive_mutex _mtx; + std::atomic _thread_running{false}; - std::atomic _stopThread{false}; - std::atomic _lastCallbackResult{true}; + std::atomic _thread_can_safely_run{false}; + + ThreadSafeThreadPool _pool; + + /** + * @brief Function pointer that is called when new DaqData arrives. + */ + const InCallbackType inCallback; void threadFcn(); - public: - /** - * @brief Initialize a ThreadedInDataHandler - * - * @param mgr StreamMgr singleton reference - */ - ThreadedInDataHandler(StreamMgr& mgr); - ~ThreadedInDataHandler(); /** - * @brief Pushes a copy of the daqdata to the thread queue and returns + * @brief Pushes a copy of the daqdata to the thread queue and returns. + * Adds a thread to handle the queue, whihc will call inCallback(); * * @param daqdata the daq info to push * * @return true, to continue with sampling. */ - virtual bool inCallback(const DaqData &daqdata) override final; + void _inCallbackFromInDataHandler(const DaqData &daqdata); + public: + ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset); + ~ThreadedInDataHandlerBase(); /** - * @brief This function should be overridden with an actual implementation, - * of what should happen on a different thread. - * - * @param d Input daq data - * - * @return true on succes. False when an error occured. + * @brief This method should be called from the derived class' constructor, + * to start the thread and data is incoming. */ - virtual bool inCallback_threaded(const DaqData& d) = 0; + void startThread(); + /** + * @brief This method SHOULD be called from all classes that derive on + * ThreadedInDataHandler. It is to make sure the inCallback_threaded() + * function is no longer called when the destructor of the derived class is + * called. Not calling this function is regarded as a BUG. + */ + void stopThread(); }; +/** + * @brief A bit of curiously recurring template pattern, to connect the + * specific handlers and connect the proper callbacks in a type-agnostic way. + * Using this class, each threaded handler should just implement its reset() + * and inCallback() method. Ellides the virtual method calls. + * + * Usage: class XHandler: public ThreadedInDataHandler { + * public: + * XHandler(streammgr) : ThreadedInDataHandler(streammgr) {} + * void inCallback(const DaqData& d) { ... do something with d } + * void reset(const Daq* daq) { ... do something with daq } + * }; + * + * For examples, see PPMHandler, etc. + * + * @tparam Derived The + */ +template +class ThreadedInDataHandler : public ThreadedInDataHandlerBase { + public: + ThreadedInDataHandler(SmgrHandle mgr): + ThreadedInDataHandlerBase(mgr, + std::bind(&ThreadedInDataHandler::_inCallback, this, _1), + std::bind(&ThreadedInDataHandler::_reset, this, _1)) + { + + } + + void _reset(const Daq* daq) { + DEBUGTRACE_ENTER; + return static_cast(this)->reset(daq); + } + void _inCallback(const DaqData& data) { + DEBUGTRACE_ENTER; + return static_cast(this)->inCallback(data); + } +}; /** @} */ /** @} */ diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 627c1ef..307f2cb 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -19,7 +19,9 @@ class RecordStatus: class Recording: """ - Class used to perform a recording. + Class used to perform a recording. Recording data can come in from a + different thread, that is supposed to call the `inCallback` method, with + audio data as an argument. """ def __init__( @@ -99,7 +101,6 @@ class Recording: logging.debug("Starting record....") self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) - self.indh.start() if wait: logging.debug("Stop recording with CTRL-C") @@ -218,6 +219,10 @@ class Recording: # from StreamMgr. self.indh = None + # Remove handle to dataset otherwise the h5 file is not closed + # properly. + self.ad = None + try: # Close the recording file self.f.close() diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index d8639b3..cf4406b 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -1,8 +1,10 @@ /* #define DEBUGTRACE_ENABLED */ #include "arma_npy.h" #include "debugtrace.hpp" -#include "lasp_ppm.h" #include "lasp_clip.h" +#include "lasp_daq.h" +#include "lasp_daqdata.h" +#include "lasp_ppm.h" #include "lasp_rtaps.h" #include "lasp_rtsignalviewer.h" #include "lasp_streammgr.h" @@ -94,35 +96,68 @@ py::array_t dmat_to_ndarray(const DaqData &d) { } /** - * @brief Wraps the InDataHandler such that it calls a Python callback with a - * buffer of sample data. The Python callback is called from a different - * thread, using a Numpy array as argument. + * @brief Wraps the ThreadedInDataHandler such that it calls a Python callback with a + * buffer of sample data. Converts DaqData objects to Numpy arrays and calls + * Python given as argument to the constructor */ -class PyIndataHandler : public ThreadedInDataHandler { +class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ py::function cb, reset_callback; public: - PyIndataHandler(StreamMgr &mgr, py::function cb, py::function reset_callback) + /** + * @brief Initialize PyIndataHandler + * + * @param mgr StreamMgr handle + * @param cb Python callback that is called with Numpy input data from device + * @param reset_callback Python callback that is called with a Daq pointer. + * Careful: do not store this handle, as it is only valid as long as reset() + * is called, when a stream stops, this pointer / handle will dangle. + */ + PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { DEBUGTRACE_ENTER; /// Start should be called externally, as at constructor time no virtual /// functions should be called. - /* start(); */ + startThread(); } ~PyIndataHandler() { DEBUGTRACE_ENTER; - stop(); + stopThread(); + } + /** + * @brief Calls the reset callback in Python. + * + * @param daq Daq device, or nullptr in case no input stream is running. + */ + void reset(const Daq *daq) { + DEBUGTRACE_ENTER; + py::gil_scoped_acquire acquire; + try { + if (daq) { + reset_callback(daq); + } else { + reset_callback(py::none()); + } + } catch (py::error_already_set &e) { + cerr << "*************** Error calling reset callback!\n"; + cerr << e.what() << endl; + cerr << "*************** \n"; + /// Throwing a runtime error here does not work out one way or another. + /// Therefore, it is better to dive out and prevent undefined behaviour + abort(); + /* throw std::runtime_error(e.what()); */ + } } - void reset(const Daq *daq) override final { reset_callback(daq); } /** - * @brief Reads from the buffer + * @brief Calls the Python callback method / function with a Numpy array of + * stream data. */ - bool inCallback_threaded(const DaqData &d) override final { + void inCallback(const DaqData &d) { /* DEBUGTRACE_ENTER; */ @@ -152,39 +187,29 @@ public: } // End of switch bool res = bool_val.cast(); - if (!res) - return false; } catch (py::error_already_set &e) { cerr << "ERROR: Python raised exception from callback function: "; cerr << e.what() << endl; - return false; + abort(); } catch (py::cast_error &e) { cerr << e.what() << endl; cerr << "ERROR: Python callback does not return boolean value." << endl; - return false; + abort(); } - return true; } }; void init_datahandler(py::module &m) { - py::class_ idh(m, "InDataHandler_base"); - idh.def("start", &InDataHandler::start); - idh.def("stop", &InDataHandler::stop); - - py::class_ tidh( - m, "ThreadedInDataHandler"); - /// The C++ class is PyIndataHandler, but for Python, it is called /// InDataHandler - py::class_ pyidh(m, "InDataHandler"); - pyidh.def(py::init()); + py::class_ pyidh(m, "InDataHandler"); + pyidh.def(py::init()); /// Peak Programme Meter - py::class_ ppm(m, "PPMHandler"); - ppm.def(py::init()); - ppm.def(py::init()); + py::class_ ppm(m, "PPMHandler"); + ppm.def(py::init()); + ppm.def(py::init()); ppm.def("getCurrentValue", [](const PPMHandler &ppm) { std::tuple tp = ppm.getCurrentValue(); @@ -194,11 +219,10 @@ void init_datahandler(py::module &m) { }); /// Clip Detector - py::class_ clip(m, "ClipHandler"); - clip.def(py::init()); + py::class_ clip(m, "ClipHandler"); + clip.def(py::init()); clip.def("getCurrentValue", [](const ClipHandler &clip) { - arma::uvec cval = clip.getCurrentValue(); return ColToNpy(cval); // something goes wrong here @@ -206,8 +230,8 @@ void init_datahandler(py::module &m) { /// Real time Aps /// - py::class_ rtaps(m, "RtAps"); - rtaps.def(py::init rtaps(m, "RtAps"); + rtaps.def(py::init rtsv(m, "RtSignalViewer"); - rtsv.def(py::init rtsv(m, "RtSignalViewer"); + rtsv.def(py::init()); rtsv.def("getCurrentValue", [](RtSignalViewer &rt) { diff --git a/src/lasp/pybind11/lasp_streammgr.cpp b/src/lasp/pybind11/lasp_streammgr.cpp index 6c62fe5..101da9d 100644 --- a/src/lasp/pybind11/lasp_streammgr.cpp +++ b/src/lasp/pybind11/lasp_streammgr.cpp @@ -1,4 +1,5 @@ #include "lasp_streammgr.h" +#include "lasp_indatahandler.h" #include #include #include @@ -12,7 +13,7 @@ void init_streammgr(py::module &m) { /// The stream manager is a singleton, and the lifetime is managed elsewhere. // It should not be deleted. - py::class_> smgr( + py::class_> smgr( m, "StreamMgr"); py::enum_(smgr, "StreamType") @@ -22,8 +23,8 @@ void init_streammgr(py::module &m) { smgr.def("startStream", &StreamMgr::startStream); smgr.def("stopStream", &StreamMgr::stopStream); smgr.def_static("getInstance", []() { - return std::unique_ptr(&StreamMgr::getInstance()); - }, py::return_value_policy::reference_internal); + return StreamMgr::getInstance(); + }); smgr.def("stopAllStreams", &StreamMgr::stopAllStreams); smgr.def("setSiggen", &StreamMgr::setSiggen); diff --git a/third_party/gsl-lite b/third_party/gsl-lite index 4720a29..a8c7e5b 160000 --- a/third_party/gsl-lite +++ b/third_party/gsl-lite @@ -1 +1 @@ -Subproject commit 4720a2980a30da085b4ddb4a0ea2a71af7351a48 +Subproject commit a8c7e5bbbd08841836f9b92d72747fb8769dbec4