Compare commits

...

5 Commits

Author SHA1 Message Date
Anne de Jong 21df1bc6cf Incallbacks should not return anything anymore. From inheritance to composition for InDataHandler code. StreamMgr singleton only weak ptr stored, this makes sure destruction from Python is more often done. UlDAQ code back to working.
continuous-integration/drone/push Build is passing Details
2023-06-09 10:43:04 +02:00
Anne de Jong 028bed9229 One forgotten debugtrace back to disabled 2023-06-07 21:51:03 +02:00
Anne de Jong c87a5cec25 StreamMgr handle now via shared pointers. InDataHandler stores weak pointers. Reset callback in PyInDataHandler could be problematic. Refactored the UlDaq code and moved to a subfolder. 2023-06-07 21:49:07 +02:00
Anne de Jong 6fc1bd90b1 Moved IndataHandler to its own implementation file. Refactored some code. Fixed race conditions when starting and stopping indatahandlers. It appears that this does not solve the segfault, but is at least mitigates some race conditions when constructors are not ready on an object, and avoiding the call of virtual functions of an object which destructor has already been called. Added some extra assert check that a function is called from the right thread. Put explicit start and stop methods in constructor / destructor of PyInDataHandler. WARNING: this means all .start() and .stop() methods should be removed. THIS IS AN API break!
continuous-integration/drone/push Build is passing Details
2023-06-06 16:05:24 +02:00
Anne de Jong dd2bbb5973 Some improvements in the clearyness of meaning in uldaq code. No bugs found. 2023-06-06 15:57:20 +02:00
31 changed files with 1256 additions and 878 deletions

View File

@ -1,4 +1,5 @@
# src/lasp/device/CMakeLists.txt
include_directories(uldaq)
add_library(lasp_device_lib OBJECT
lasp_daq.cpp
@ -7,8 +8,11 @@ add_library(lasp_device_lib OBJECT
lasp_deviceinfo.cpp
lasp_rtaudiodaq.cpp
lasp_streammgr.cpp
lasp_indatahandler.cpp
lasp_uldaq.cpp
lasp_uldaq_impl.cpp
uldaq/lasp_uldaq_impl.cpp
uldaq/lasp_uldaq_bufhandler.cpp
uldaq/lasp_uldaq_common.cpp
)
# Callback requires certain arguments that are not used by code. This disables

View File

@ -5,6 +5,8 @@
#include "lasp_types.h"
#include <functional>
#include <memory>
#include <mutex>
#include <atomic>
/**
* \defgroup device Device interfacing
@ -13,12 +15,12 @@
* @brief Callback of DAQ for input data. Callback should return
* false for a stop request.
*/
using InDaqCallback = std::function<bool(const DaqData &)>;
using InDaqCallback = std::function<void(const DaqData &)>;
/**
* @brief
*/
using OutDaqCallback = std::function<bool(DaqData &)>;
using OutDaqCallback = std::function<void(DaqData &)>;
/**
* @brief Base cass for all DAQ (Data Acquisition) interfaces. A DAQ can be a

View File

@ -19,6 +19,7 @@ public:
* @brief Virtual desctructor. Can be derived class.
*/
virtual ~DeviceInfo() {}
DeviceInfo& operator=(const DeviceInfo&) = delete;
/**
* @brief Clone a device info.

View File

@ -0,0 +1,59 @@
/* #define DEBUGTRACE_ENABLED */
#include "lasp_indatahandler.h"
#include "debugtrace.hpp"
#include "lasp_streammgr.h"
#include <thread>
InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb,
const InResetType resetfcn)
: _mgr(mgr), inCallback(cb), reset(resetfcn)
#if LASP_DEBUG == 1
,
main_thread_id(std::this_thread::get_id())
#endif
{
DEBUGTRACE_ENTER;
#if LASP_DEBUG == 1
assert(mgr->main_thread_id == main_thread_id);
#endif
}
void InDataHandler::start() {
DEBUGTRACE_ENTER;
checkRightThread();
if (SmgrHandle handle = _mgr.lock()) {
handle->addInDataHandler(this);
#if LASP_DEBUG == 1
assert(handle->main_thread_id == main_thread_id);
#endif
}
}
void InDataHandler::stop() {
DEBUGTRACE_ENTER;
checkRightThread();
#if LASP_DEBUG == 1
stopCalled = true;
#endif
if (SmgrHandle handle = _mgr.lock()) {
handle->removeInDataHandler(*this);
}
}
InDataHandler::~InDataHandler() {
DEBUGTRACE_ENTER;
checkRightThread();
#if LASP_DEBUG == 1
if (!stopCalled) {
std::cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "
"InDataHandler::stop()."
<< std::endl;
abort();
}
#endif
}
#if LASP_DEBUG == 1
void InDataHandler::checkRightThread() const {
assert(std::this_thread::get_id() == main_thread_id);
}
#endif

View File

@ -0,0 +1,79 @@
#pragma once
#include "lasp_types.h"
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
class StreamMgr;
using SmgrHandle = std::shared_ptr<StreamMgr>;
class DaqData;
class Daq;
/** \addtogroup device
* @{
*/
/**
* @brief The function definition of callbacks with incoming DAQ data
*/
using InCallbackType = std::function<void(const DaqData &)>;
/**
* @brief Function definition for the reset callback.
*/
using InResetType = std::function<void(const Daq *)>;
class InDataHandler {
protected:
std::weak_ptr<StreamMgr> _mgr;
#if LASP_DEBUG == 1
// This is a flag to indicate whether the method stop() is called for the
// current handler. It should call the method stop() from the derived class's
// destructor.
std::atomic<bool> stopCalled{false};
#endif
public:
~InDataHandler();
const InCallbackType inCallback;
const InResetType reset;
/**
* @brief When constructed, the handler is added to the stream manager, which
* will call the handlers's inCallback() until stop() is called.
*
* @param mgr Stream manager.
* @param cb The callback that is stored, and called on new DAQ data
* @param resetfcn The callback that is stored, and called when the DAQ
* changes state.
*/
InDataHandler(SmgrHandle mgr, InCallbackType cb,
InResetType resetfcn);
/**
* @brief Adds the current InDataHandler to the list of handlers in the
* StreamMgr. After this happens, the reset() method stored in this
* object is called back. When the stream is running, right after this,
* inCallback() is called with DaqData.
*/
void start();
/**
* @brief Removes the currend InDataHandler from the list of handlers in the
* StreamMgr. From that point on, the object can be safely destroyed. Not
* calling stop() before destruction of this object is considered a BUG. I.e.
* a class which *uses* an InDataHandler should always call stop() in its
* destructor.
*/
void stop();
#if LASP_DEBUG == 1
const std::thread::id main_thread_id;
void checkRightThread() const;
#else
void checkRightThread() const {}
#endif
};
/** @} */

View File

@ -368,11 +368,7 @@ public:
DaqData d{nFramesPerBlock, neninchannels, dtype};
d.copyInFromRaw(ptrs);
bool ret = _incallback(d);
if (!ret) {
stopWithError(se::noError);
return 1;
}
_incallback(d);
}
if (outputBuffer) {
@ -395,11 +391,8 @@ public:
}
DaqData d{nFramesPerBlock, nenoutchannels, dtype};
bool ret = _outcallback(d);
if (!ret) {
stopWithError(se::noError);
return 1;
}
_outcallback(d);
// Copy over the buffer
us j = 0;
for (auto ptr : ptrs) {
d.copyToRaw(j, ptr);

View File

@ -2,6 +2,25 @@
#include "lasp_daq.h"
#include <memory>
/** \addtogroup device
* @{
* \defgroup rtaudio RtAudio backend
* This code is used to interface with the RtAudio cross-platform audio
* interface.
*
* \addtogroup rtaudio
* @{
*/
/**
* @brief Method called from Daq::createDaq.
*
* @param devinfo Device info
* @param config DAQ Configuration settings
*
* @return Pointer to Daq instance. Throws Runtime errors on error.
*/
std::unique_ptr<Daq> createRtAudioDevice(const DeviceInfo& devinfo,
const DaqConfiguration& config);
@ -12,3 +31,5 @@ std::unique_ptr<Daq> createRtAudioDevice(const DeviceInfo& devinfo,
*/
void fillRtAudioDeviceInfo(DeviceInfoList &devinfolist);
/** @} */
/** @} */

View File

@ -2,58 +2,68 @@
#include "lasp_streammgr.h"
#include "debugtrace.hpp"
#include "lasp_biquadbank.h"
#include "lasp_indatahandler.h"
#include "lasp_thread.h"
#include <algorithm>
#include <assert.h>
#include <functional>
#include <iostream>
#include <memory>
using std::cerr;
using std::endl;
using rte = std::runtime_error;
InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) { DEBUGTRACE_ENTER; }
void InDataHandler::start() {
DEBUGTRACE_ENTER;
_mgr.addInDataHandler(*this);
}
void InDataHandler::stop() {
#if LASP_DEBUG == 1
stopCalled = true;
#endif
_mgr.removeInDataHandler(*this);
}
InDataHandler::~InDataHandler() {
/**
* @brief The main global handle to a stream, stored in a weak pointer, if it
* does not yet exist, via StreamMgr::getInstance, a new stream mgr is created.
* It also makes sure that the stream manager is deleted once the latest handle
* to it has been destroyed (no global stuff left).
*/
std::weak_ptr<StreamMgr> _mgr;
/**
* @brief The only way to obtain a stream manager, can only be called from the
* thread that does it the first time.
*
* @return Stream manager handle
*/
SmgrHandle StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
#if LASP_DEBUG == 1
if (!stopCalled) {
cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "
"InDataHandler::stop() from the derived class' destructor."
<< endl;
abort();
auto mgr = _mgr.lock();
if (!mgr) {
mgr = SmgrHandle(new StreamMgr());
if (!mgr) {
throw rte("Fatal: could not allocate stream manager!");
}
// Update global weak pointer
_mgr = mgr;
return mgr;
}
#if LASP_DEBUG == 1
// Make sure we never ask for a new SmgrHandle from a different thread.
assert(std::this_thread::get_id() == mgr->main_thread_id);
#endif
}
StreamMgr &StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
static StreamMgr mgr;
return mgr;
}
StreamMgr::StreamMgr() {
DEBUGTRACE_ENTER;
StreamMgr::StreamMgr()
#if LASP_DEBUG == 1
_main_thread_id = std::this_thread::get_id();
: main_thread_id(std::this_thread::get_id())
#endif
{
DEBUGTRACE_ENTER;
// Trigger a scan for the available devices, in the background.
rescanDAQDevices(true);
}
#if LASP_DEBUG == 1
void StreamMgr::checkRightThread() const {
assert(std::this_thread::get_id() == _main_thread_id);
assert(std::this_thread::get_id() == main_thread_id);
}
#endif
@ -63,19 +73,20 @@ void StreamMgr::rescanDAQDevices(bool background,
auto &pool = getPool();
checkRightThread();
if (_inputStream || _outputStream) {
throw rte("Rescanning DAQ devices only possible when no stream is running");
}
if (!_devices_mtx.try_lock()) {
throw rte("A background DAQ device scan is probably already running");
}
_devices_mtx.unlock();
if (_inputStream || _outputStream) {
throw rte("Rescanning DAQ devices only possible when no stream is running");
}
std::scoped_lock lck(_devices_mtx);
_devices.clear();
/* auto &pool = getPool(); */
if (!background) {
rescanDAQDevices_impl(callback);
} else {
DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
}
}
@ -87,7 +98,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
callback();
}
}
bool StreamMgr::inCallback(const DaqData &data) {
void StreamMgr::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
@ -118,23 +129,15 @@ bool StreamMgr::inCallback(const DaqData &data) {
}
for (auto &handler : _inDataHandlers) {
bool res = handler->inCallback(input_filtered);
if (!res) {
return false;
}
handler->inCallback(input_filtered);
}
} else {
/// No input filters
for (auto &handler : _inDataHandlers) {
bool res = handler->inCallback(data);
if (!res) {
return false;
}
handler->inCallback(data);
}
}
return true;
}
void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
@ -198,7 +201,7 @@ template <typename T> bool fillData(DaqData &data, const vd &signal) {
return true;
}
bool StreamMgr::outCallback(DaqData &data) {
void StreamMgr::outCallback(DaqData &data) {
/* DEBUGTRACE_ENTER; */
@ -227,19 +230,17 @@ bool StreamMgr::outCallback(DaqData &data) {
// Set all values to 0.
std::fill(data.raw_ptr(), data.raw_ptr() + data.size_bytes(), 0);
}
return true;
}
StreamMgr::~StreamMgr() {
DEBUGTRACE_ENTER;
checkRightThread();
stopAllStreams();
if (!_inDataHandlers.empty()) {
cerr << "*** WARNING: InDataHandlers have not been all stopped, while "
"StreamMgr destructor is called. This is a misuse BUG"
<< endl;
abort();
}
// Stream manager now handled by shared pointer. Each indata handler gets a
// shared pointer to the stream manager, and stores a weak pointer to it.
// Hence, we do not have to do any cleanup here. It also makes sure that the
// order in which destructors are called does not matter anymore. As soon as
// the stream manager is destructed, the weak pointers loose there ref, and do
// not have to removeInDataHandler() anymore.
}
void StreamMgr::stopAllStreams() {
DEBUGTRACE_ENTER;
@ -264,15 +265,15 @@ void StreamMgr::startStream(const DaqConfiguration &config) {
std::scoped_lock lck(_devices_mtx);
DeviceInfo *devinfo = nullptr;
bool found = false;
// Match configuration to a device in the list of devices
for (auto &devinfoi : _devices) {
if (config.match(*devinfoi)) {
devinfo = devinfoi.get();
break;
}
}
if (!devinfo) {
if (devinfo == nullptr) {
throw rte("Could not find a device with name " + config.device_name +
" in list of devices.");
}
@ -405,23 +406,20 @@ void StreamMgr::stopStream(const StreamType t) {
}
}
void StreamMgr::addInDataHandler(InDataHandler &handler) {
void StreamMgr::addInDataHandler(InDataHandler *handler) {
DEBUGTRACE_ENTER;
checkRightThread();
assert(handler);
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
if (_inputStream) {
handler.reset(_inputStream.get());
} else {
handler.reset(nullptr);
}
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), &handler) !=
handler->reset(_inputStream.get());
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
_inDataHandlers.cend()) {
throw std::runtime_error("Error: handler already added. Probably start() "
"is called more than once on a handler object");
}
_inDataHandlers.push_back(&handler);
_inDataHandlers.push_back(handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
@ -431,7 +429,6 @@ void StreamMgr::removeInDataHandler(InDataHandler &handler) {
_inDataHandlers.remove(&handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {

View File

@ -10,60 +10,8 @@
* @{
*/
class StreamMgr;
class InDataHandler;
class InDataHandler {
protected:
StreamMgr &_mgr;
#if LASP_DEBUG == 1
std::atomic<bool> stopCalled{false};
#endif
public:
virtual ~InDataHandler();
/**
* @brief When constructed, the handler is added to the stream manager, which
* will call the handlers's inCallback() until stop() is called.
*
* @param mgr Stream manager.
*/
InDataHandler(StreamMgr &mgr);
/**
* @brief This function is called when input data from a DAQ is available.
*
* @param daqdata Input data from DAQ
*
* @return true if no error. False to stop the stream from running.
*/
virtual bool inCallback(const DaqData &daqdata) = 0;
/**
* @brief Reset in-data handler.
*
* @param daq New DAQ configuration of inCallback(). If nullptr is given,
* it means that the stream is stopped.
*/
virtual void reset(const Daq *daq = nullptr) = 0;
/**
* @brief This function should be called from the constructor of the
* implementation of InDataHandler. It will start the stream's calling of
* inCallback().
*/
void start();
/**
* @brief This function should be called from the destructor of derived
* classes, to disable the calls to inCallback(), such that proper
* destruction of the object is allowed and no other threads call methods
* from the object. It removes the inCallback() from the callback list of the
* StreamMgr(). **Failing to call this function results in deadlocks, errors
* like "pure virtual function called", or other**.
*/
void stop();
};
class SeriesBiquad;
@ -76,9 +24,6 @@ class SeriesBiquad;
* fact is asserted.
*/
class StreamMgr {
#if LASP_DEBUG == 1
std::thread::id _main_thread_id;
#endif
/**
* @brief Storage for streams.
@ -91,33 +36,39 @@ class StreamMgr {
* thread-safety.
*/
std::list<InDataHandler *> _inDataHandlers;
std::mutex _inDataHandler_mtx;
mutable std::mutex _inDataHandler_mtx;
/**
* @brief Signal generator in use to generate output data. Currently
* implemented as to generate the same data for all output channels.
*/
std::shared_ptr<Siggen> _siggen;
std::mutex _siggen_mtx;
/**
* @brief Filters on input stream. For example, a digital high pass filter.
*/
std::vector<std::unique_ptr<SeriesBiquad>> _inputFilters;
std::mutex _siggen_mtx;
std::mutex _devices_mtx;
mutable std::recursive_mutex _devices_mtx;
/**
* @brief Current storage for the device list
*/
DeviceInfoList _devices;
// Singleton, no public constructor. Can only be obtained using
// getInstance();
StreamMgr();
friend class InDataHandler;
friend class Siggen;
// Singleton, no public destructor
~StreamMgr();
public:
~StreamMgr();
enum class StreamType : us {
/**
* @brief Input stream
@ -137,7 +88,7 @@ class StreamMgr {
*
* @return Reference to stream manager.
*/
static StreamMgr &getInstance();
static std::shared_ptr<StreamMgr> getInstance();
/**
* @brief Obtain a list of devices currently available. When the StreamMgr is
@ -146,7 +97,7 @@ class StreamMgr {
* @return A copy of the internal stored list of devices
*/
DeviceInfoList getDeviceInfo() const {
std::scoped_lock lck(const_cast<std::mutex &>(_devices_mtx));
std::scoped_lock lck(_devices_mtx);
DeviceInfoList d2;
for(const auto& dev: _devices) {
d2.push_back(dev->clone());
@ -157,7 +108,8 @@ class StreamMgr {
/**
* @brief Triggers a background scan of the DAQ devices, which updates the
* internally stored list of devices. Throws a runtime error when a
* background thread is already scanning for devices.
* background thread is already scanning for devices, or if a stream is
* running.
*
* @param background Perform searching for DAQ devices in the background. If
* set to true, the function returns immediately.
@ -231,17 +183,17 @@ class StreamMgr {
/**
* @brief Set active signal generator for output streams. Only one `Siggen'
* is active at the same time. Siggen controls its own data race protection
* using a mutex.
* using a mutex. If no Siggen is there, and an output stream is running, it
* will send a default signal of 0.
*
* @param s New Siggen pointer
*/
void setSiggen(std::shared_ptr<Siggen> s);
private:
bool inCallback(const DaqData &data);
bool outCallback(DaqData &data);
void inCallback(const DaqData &data);
void outCallback(DaqData &data);
void removeInDataHandler(InDataHandler &handler);
/**
* @brief Add an input data handler. The handler's inCallback() function is
@ -251,8 +203,14 @@ private:
*
* @param handler The handler to add.
*/
void addInDataHandler(InDataHandler &handler);
void addInDataHandler(InDataHandler *handler);
/**
* @brief Remove InDataHandler from the list.
*
* @param handler
*/
void removeInDataHandler(InDataHandler &handler);
/**
* @brief Do the actual rescanning.
*
@ -261,6 +219,7 @@ private:
void rescanDAQDevices_impl(std::function<void()> callback);
#if LASP_DEBUG == 1
const std::thread::id main_thread_id;
void checkRightThread() const;
#else
void checkRightThread() const {}

View File

@ -1,18 +1,22 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_uldaq.h"
#include "lasp_uldaq_impl.h"
#include <uldaq.h>
/**
* @brief The maximum number of devices that can be enumerated when calling
* ulGetDaqDeviceInventory()
*/
const us MAX_ULDAQ_DEV_COUNT_PER_API = 100;
void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
DEBUGTRACE_ENTER;
UlError err;
unsigned int numdevs = MAX_ULDAQ_DEV_COUNT_PER_API;
@ -20,13 +24,13 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
DaqDeviceDescriptor descriptor;
DaqDeviceInterface interfaceType = ANY_IFC;
err = ulGetDaqDeviceInventory(interfaceType, devdescriptors,
static_cast<unsigned *>(&numdevs));
err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, &numdevs);
if (err != ERR_NO_ERROR) {
throw rte("UlDaq device inventarization failed");
}
DEBUGTRACE_PRINT(string("Number of devices: ") + std::to_string(numdevs));
for (unsigned i = 0; i < numdevs; i++) {
descriptor = devdescriptors[i];
@ -35,32 +39,34 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
devinfo._uldaqDescriptor = descriptor;
devinfo.api = uldaqapi;
string name, interface;
string productname = descriptor.productName;
if (productname != "DT9837A") {
throw rte("Unknown UlDAQ type: " + productname);
{
string name;
string productname = descriptor.productName;
if (productname != "DT9837A") {
throw rte("Unknown UlDAQ type: " + productname);
}
switch (descriptor.devInterface) {
case USB_IFC:
name = "USB - ";
break;
case BLUETOOTH_IFC:
/* devinfo. */
name = "Bluetooth - ";
break;
case ETHERNET_IFC:
/* devinfo. */
name = "Ethernet - ";
break;
default:
name = "Uknown interface = ";
}
name += productname + " " + string(descriptor.uniqueId);
devinfo.device_name = name;
}
switch (descriptor.devInterface) {
case USB_IFC:
name = "USB - ";
break;
case BLUETOOTH_IFC:
/* devinfo. */
name = "Bluetooth - ";
break;
case ETHERNET_IFC:
/* devinfo. */
name = "Ethernet - ";
break;
default:
name = "Uknown interface = ";
}
name += string(descriptor.productName) + " " + string(descriptor.uniqueId);
devinfo.device_name = std::move(name);
devinfo.physicalOutputQty = DaqChannel::Qty::Voltage;
devinfo.availableDataTypes.push_back(
@ -93,7 +99,12 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
return std::make_unique<DT9837A>(devinfo, config);
const UlDaqDeviceInfo *_info =
dynamic_cast<const UlDaqDeviceInfo *>(&devinfo);
if (_info == nullptr) {
throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo");
}
return std::make_unique<DT9837A>(*_info, config);
}
#endif // LASP_HAS_ULDAQ

View File

@ -1,18 +1,24 @@
#pragma once
#include "lasp_daq.h"
/**
* @brief The maximum number of devices that can be enumerated when calling
* ulGetDaqDeviceInventory()
/** \addtogroup device
* \defgroup uldaq UlDAQ specific code
* This code is used to interface with UlDAQ compatible devices. It is only
* tested on Linux.
* @{
* \addtogroup uldaq
* @{
*/
const us MAX_ULDAQ_DEV_COUNT_PER_API = 100;
std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config);
/**
* @brief Fill device info list with UlDaq specific devices, if any.
* @brief Append device info list with UlDaq specific devices, if any.
*
* @param devinfolist Info list to append to.
*/
void fillUlDaqDeviceInfo(DeviceInfoList& devinfolist);
/** @} */
/** @} */

View File

@ -1,484 +0,0 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_daqconfig.h"
#include "lasp_uldaq.h"
#include "lasp_uldaq_impl.h"
using namespace std::literals::chrono_literals;
/**
* @brief Reserve some space for an error message from UlDaq
*/
const us UL_ERR_MSG_LEN = 512;
/**
* @brief Return a string corresponding to the UlDaq API error
*
* @param err error code
*
* @return Error string
*/
string getErrMsg(UlError err) {
string errstr;
errstr.reserve(UL_ERR_MSG_LEN);
char errmsg[UL_ERR_MSG_LEN];
errstr = "UlDaq API Error: ";
ulGetErrMsg(err, errmsg);
errstr += errmsg;
return errstr;
}
inline void showErr(string errstr) {
std::cerr << "\b\n**************** UlDAQ backend error **********\n";
std::cerr << errstr << std::endl;
std::cerr << "***********************************************\n\n";
}
inline void showErr(UlError err) {
if (err != ERR_NO_ERROR)
showErr(getErrMsg(err));
}
DT9837A::~DT9837A() {
UlError err;
if (isRunning()) {
stop();
}
if (_handle) {
err = ulDisconnectDaqDevice(_handle);
showErr(err);
err = ulReleaseDaqDevice(_handle);
showErr(err);
}
}
DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config),
_nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) {
// Some sanity checks
if (inchannel_config.size() != 4) {
throw rte("Invalid length of enabled inChannels vector");
}
if (outchannel_config.size() != 1) {
throw rte("Invalid length of enabled outChannels vector");
}
if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) {
throw rte("Unsensible number of samples per block chosen");
}
if (samplerate() < ULDAQ_SAMPLERATES.at(0) || samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size()-1)) {
throw rte("Invalid sample rate");
}
const UlDaqDeviceInfo *_info =
dynamic_cast<const UlDaqDeviceInfo *>(&devinfo);
if (_info == nullptr) {
throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo");
}
// get a handle to the DAQ device associated with the first descriptor
_handle = ulCreateDaqDevice(_info->_uldaqDescriptor);
if (_handle == 0) {
throw rte("Unable to create a handle to the specified DAQ "
"device. Is the device currently in use? Please make sure to set "
"the DAQ configuration in duplex mode if simultaneous input and "
"output is required.");
}
UlError err = ulConnectDaqDevice(_handle);
if (err != ERR_NO_ERROR) {
ulReleaseDaqDevice(_handle);
_handle = 0;
throw rte("Unable to connect to device: " + getErrMsg(err));
}
/// Loop over input channels, set parameters
for (us ch = 0; ch < 4; ch++) {
err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0);
showErr(err);
if (err != ERR_NO_ERROR) {
throw rte("Fatal: could normalize channel sensitivity");
}
CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC;
err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe =
inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set IEPE mode");
}
}
}
bool DT9837A::isRunning() const {
DEBUGTRACE_ENTER;
return _thread.joinable();
}
void DT9837A::stop() {
DEBUGTRACE_ENTER;
StreamStatus status = _streamStatus;
if (!isRunning()) {
throw rte("No data acquisition running");
}
_stopThread = true;
if (_thread.joinable()) {
_thread.join();
}
_stopThread = false;
status.isRunning = false;
_streamStatus = status;
}
/**
* @brief Throws an appropriate stream exception based on the UlError number.
* The mapping is based on the error numbers as given in uldaq.h. There are a
* log of errors definded here (109 in total). Except for some, we will map
* most of them to a driver error.
*
* @param e
*/
inline void throwUlException(UlError err) {
if (err == ERR_NO_ERROR) {
return;
}
string errstr = getErrMsg(err);
showErr(errstr);
Daq::StreamStatus::StreamError serr;
if ((int)err == 18) {
serr = Daq::StreamStatus::StreamError::inputXRun;
} else if ((int)err == 19) {
serr = Daq::StreamStatus::StreamError::outputXRun;
} else {
serr = Daq::StreamStatus::StreamError::driverError;
}
throw Daq::StreamException(serr, errstr);
}
void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw rte("DAQ is already running");
}
if (neninchannels() > 0) {
if (!inCallback)
throw rte("DAQ requires a callback for input data");
}
if (nenoutchannels() > 0) {
if (!outCallback)
throw rte("DAQ requires a callback for output data");
}
assert(neninchannels() + nenoutchannels() > 0);
_thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
}
InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb)
: BufHandler(daq, daq.neninchannels()), cb(cb)
{
DEBUGTRACE_ENTER;
assert(daq.getHandle() != 0);
monitorOutput = daq.monitorOutput;
DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ERR_NO_ERROR;
std::vector<DaqInChanDescriptor> indescs;
boolvec eninchannels_without_mon = daq.eninchannels(false);
// Set ranges for each input. Below asks only channels that are not a
// monitor channel (hence the false flag).
dvec ranges = daq.inputRangeForEnabledChannels(false);
us enabled_ch_count = 0;
for (us chin = 0; chin < 4; chin++) {
if (eninchannels_without_mon[chin] == true) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_ANALOG_SE;
indesc.channel = chin;
double rangeval = ranges.at(enabled_ch_count);
Range rangenum;
if (fabs(rangeval - 1.0) < 1e-8) {
rangenum = BIP1VOLTS;
} else if (fabs(rangeval - 10.0) < 1e-8) {
rangenum = BIP10VOLTS;
} else {
throw Daq::StreamException(Daq::StreamStatus::StreamError::logicError);
std::cerr << "Fatal: input range value is invalid" << endl;
return;
}
indesc.range = rangenum;
indescs.push_back(indesc);
enabled_ch_count++;
}
}
// Add possibly last channel as monitor
if (monitorOutput) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_DAC;
indesc.channel = 0;
/// The output only has a range of 10V, therefore the monitor of the
/// output also has to be set to this value.
indesc.range = BIP10VOLTS;
indescs.push_back(indesc);
}
assert(indescs.size() == nchannels);
DEBUGTRACE_MESSAGE("Starting input scan");
err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, inscanflags, buf.data());
throwUlException(err);
}
void InBufHandler::start() {
DEBUGTRACE_ENTER;
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
throwUlException(err);
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
bool InBufHandler::operator()() {
/* DEBUGTRACE_ENTER; */
bool ret = true;
auto runCallback = ([&](us totalOffset) {
/* DEBUGTRACE_ENTER; */
DaqData data(nFramesPerBlock, nchannels, dtype_descr.dtype);
us monitorOffset = monitorOutput ? 1 : 0;
/* /// Put the output monitor in front */
if (monitorOutput) {
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, 0) =
buf[totalOffset + (frame * nchannels) + (nchannels - 1)];
}
}
for (us channel = 0; channel < nchannels - monitorOffset; channel++) {
/* DEBUGTRACE_PRINT(channel); */
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, channel + monitorOffset) =
buf[totalOffset + (frame * nchannels) + channel];
}
}
return cb(data);
});
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
throwUlException(err);
us increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
throw Daq::StreamException(Daq::StreamStatus::StreamError::inputXRun);
}
assert(status == SS_RUNNING);
if (transferStatus.currentIndex < (long long)buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
ret = runCallback(nchannels * nFramesPerBlock);
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
ret = runCallback(0);
topenqueued = true;
}
}
return ret;
}
InBufHandler::~InBufHandler() {
// At exit of the function, stop scanning.
DEBUGTRACE_ENTER;
UlError err = ulDaqInScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
OutBufHandler::OutBufHandler(DT9837A &daq, OutDaqCallback cb)
: BufHandler(daq, daq.nenoutchannels()), cb(cb) {
DEBUGTRACE_MESSAGE("Starting output scan");
DEBUGTRACE_PRINT(nchannels);
AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, outscanflags, buf.data());
throwUlException(err);
}
void OutBufHandler::start() {
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Unable to start output on DAQ");
}
if (status != SS_RUNNING) {
throw rte("Unable to start output on DAQ");
}
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
bool OutBufHandler::operator()() {
/* DEBUGTRACE_ENTER; */
bool res = true;
assert(daq.getHandle() != 0);
UlError err = ERR_NO_ERROR;
ScanStatus status;
TransferStatus transferStatus;
err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
throwUlException(err);
if (status != SS_RUNNING) {
return false;
}
us increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun);
}
if (transferStatus.currentIndex < buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
DaqData d(nFramesPerBlock, 1, dtype_descr.dtype);
res = cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[buffer_mid_idx])));
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
DaqData d(nFramesPerBlock, 1, dtype_descr.dtype);
res = cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[0])));
topenqueued = true;
}
}
return res;
}
OutBufHandler::~OutBufHandler() {
DEBUGTRACE_ENTER;
UlError err = ulAOutScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
try {
std::unique_ptr<OutBufHandler> obh;
std::unique_ptr<InBufHandler> ibh;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
if (nenoutchannels() > 0) {
assert(outCallback);
obh = std::make_unique<OutBufHandler>(*this, outCallback);
}
if (neninchannels() > 0) {
assert(inCallback);
ibh = std::make_unique<InBufHandler>(*this, inCallback);
}
if (obh)
obh->start();
if (ibh)
ibh->start();
const double sleeptime_s =
static_cast<double>(_nFramesPerBlock) / (16 * samplerate());
const us sleeptime_us = static_cast<us>(sleeptime_s * 1e6);
while (!_stopThread) {
if (ibh) {
if (!(*ibh)()) {
_stopThread = true;
break;
}
}
if (obh) {
if (!(*obh)()) {
_stopThread = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
}
/// Update stream status that we are not running anymore
status.isRunning = false;
_streamStatus = status;
_stopThread = false;
} catch (StreamException &e) {
StreamStatus status = _streamStatus;
// Copy over error type
status.errorType = e.e;
_streamStatus = status;
/*
cerr << "\n******************\n";
cerr << "Catched error in UlDAQ thread: " << e.what() << endl;
cerr << "\n******************\n";
*/
}
}
#endif // LASP_HAS_ULDAQ

View File

@ -0,0 +1,245 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_uldaq_bufhandler.h"
#include "lasp_daq.h"
InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb)
: BufHandler(daq, daq.neninchannels()), cb(cb)
{
DEBUGTRACE_ENTER;
assert(daq.getHandle() != 0);
monitorOutput = daq.monitorOutput;
DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ERR_NO_ERROR;
std::vector<DaqInChanDescriptor> indescs;
boolvec eninchannels_without_mon = daq.eninchannels(false);
// Set ranges for each input. Below asks only channels that are not a
// monitor channel (hence the false flag).
dvec ranges = daq.inputRangeForEnabledChannels(false);
us enabled_ch_counter = 0;
for (us chin = 0; chin < 4; chin++) {
if (eninchannels_without_mon[chin] == true) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_ANALOG_SE;
indesc.channel = chin;
double rangeval = ranges.at(enabled_ch_counter);
Range rangenum;
if (fabs(rangeval - 1.0) < 1e-8) {
rangenum = BIP1VOLTS;
} else if (fabs(rangeval - 10.0) < 1e-8) {
rangenum = BIP10VOLTS;
} else {
throw Daq::StreamException(Daq::StreamStatus::StreamError::logicError);
std::cerr << "Fatal: input range value is invalid" << endl;
return;
}
indesc.range = rangenum;
indescs.push_back(indesc);
enabled_ch_counter++;
}
}
// Add possibly last channel as monitor
if (monitorOutput) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_DAC;
indesc.channel = 0;
/// The output only has a range of 10V, therefore the monitor of the
/// output also has to be set to this value.
indesc.range = BIP10VOLTS;
indescs.push_back(indesc);
}
assert(indescs.size() == nchannels);
DEBUGTRACE_MESSAGE("Starting input scan");
err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, inscanflags, buf.data());
throwOnPossibleUlException(err);
}
void InBufHandler::start() {
DEBUGTRACE_ENTER;
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
throwOnPossibleUlException(err);
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
bool InBufHandler::operator()() {
/* DEBUGTRACE_ENTER; */
bool ret = true;
auto runCallback = ([&](us totalOffset) {
/* DEBUGTRACE_ENTER; */
DaqData data(nFramesPerBlock, nchannels, dtype_descr.dtype);
us monitorOffset = monitorOutput ? 1 : 0;
/* /// Put the output monitor in front */
if (monitorOutput) {
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, 0) =
buf[totalOffset // Offset to lowest part of the buffer, or not
+ (frame * nchannels) // Data is interleaved, so skip each
+ (nchannels - 1)] // Monitor comes as last in the channel list,
// but we want it first in the output data.
;
}
}
// Now, all normal channels
for (us channel = 0; channel < nchannels - monitorOffset; channel++) {
/* DEBUGTRACE_PRINT(channel); */
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, channel + monitorOffset) =
buf[totalOffset + (frame * nchannels) + channel];
}
}
return cb(data);
});
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
throwOnPossibleUlException(err);
us increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
throw Daq::StreamException(Daq::StreamStatus::StreamError::inputXRun);
}
assert(status == SS_RUNNING);
if (transferStatus.currentIndex < (long long)buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
runCallback(nchannels * nFramesPerBlock);
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
runCallback(0);
topenqueued = true;
}
}
return ret;
}
InBufHandler::~InBufHandler() {
// At exit of the function, stop scanning.
DEBUGTRACE_ENTER;
UlError err = ulDaqInScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
OutBufHandler::OutBufHandler(DT9837A &daq, OutDaqCallback cb)
: BufHandler(daq, daq.nenoutchannels()), cb(cb) {
DEBUGTRACE_MESSAGE("Starting output scan");
DEBUGTRACE_PRINT(nchannels);
AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, outscanflags, buf.data());
throwOnPossibleUlException(err);
}
void OutBufHandler::start() {
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Unable to start output on DAQ");
}
if (status != SS_RUNNING) {
throw rte("Unable to start output on DAQ");
}
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
bool OutBufHandler::operator()() {
DEBUGTRACE_ENTER;
bool res = true;
assert(daq.getHandle() != 0);
UlError err = ERR_NO_ERROR;
ScanStatus status;
TransferStatus transferStatus;
err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
throwOnPossibleUlException(err);
if (status != SS_RUNNING) {
return false;
}
us increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
cerr << "totalFramesCount: " << totalFramesCount << ". Detected output underrun" << endl;
/* throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); */
}
if (transferStatus.currentIndex < buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
DaqData d(nFramesPerBlock, 1,// Only one output channel
dtype_descr.dtype);
// Receive data, run callback
cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[buffer_mid_idx])));
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
DaqData d(nFramesPerBlock, 1,// Only one output channel
dtype_descr.dtype);
// Receive
cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[0])));
topenqueued = true;
}
}
return res;
}
OutBufHandler::~OutBufHandler() {
DEBUGTRACE_ENTER;
UlError err = ulAOutScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
#endif

View File

@ -1,83 +1,20 @@
#pragma once
#include "lasp_daq.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <uldaq.h>
#include <vector>
#include "lasp_types.h"
#include "lasp_uldaq_impl.h"
#include "lasp_uldaq_common.h"
using std::atomic;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
/**
* @brief List of available sampling frequencies for DT9837A
/** \addtogroup device
* @{
* \addtogroup uldaq
*/
const std::vector<d> ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000,
22050, 24000, 32000, 44056, 44100,
47250, 48000, 50000, 50400, 51000};
/**
* @brief UlDaq-specific device information. Adds a copy of the underlying
* DaqDeDaqDeviceDescriptor.
*/
class UlDaqDeviceInfo : public DeviceInfo {
public:
DaqDeviceDescriptor _uldaqDescriptor;
virtual std::unique_ptr<DeviceInfo> clone() const {
return std::make_unique<UlDaqDeviceInfo>(*this);
}
};
class DT9837A : public Daq {
DaqDeviceHandle _handle = 0;
std::mutex _daqmutex;
std::thread _thread;
atomic<bool> _stopThread{false};
atomic<StreamStatus> _streamStatus;
const us _nFramesPerBlock;
void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback);
public:
DaqDeviceHandle getHandle() const { return _handle; }
/**
* @brief Create a DT9837A instance.
*
* @param devinfo DeviceInfo to connect to
* @param config DaqConfiguration settings
*/
DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config);
virtual ~DT9837A();
bool isRunning() const;
void stop() override final;
friend class InBufHandler;
friend class OutBufHandler;
virtual void start(InDaqCallback inCallback,
OutDaqCallback outCallback) override final;
virtual StreamStatus getStreamStatus() const override {
return _streamStatus;
}
};
/**
* @brief Helper class for managing input and output samples of the DAQ device.
*/
class DT9837A;
class BufHandler {
protected:
/**
@ -96,12 +33,19 @@ protected:
* @brief Sampling frequency in Hz
*/
double samplerate;
/**
* @brief Storage capacity for the DAQ I/O.
*/
std::vector<double> buf;
/**
* @brief Whether the top / bottom part of the buffer are ready to be
* @brief Whether the top part of the buffer is enqueued
*/
bool topenqueued = false;
/**
* @brief Whether the bottom part of the buffer is enqueued
* enqueued
*/
bool topenqueued, botenqueued;
bool botenqueued = false;
/**
* @brief Counter for the total number of frames acquired / sent since the
@ -125,6 +69,7 @@ public:
0),
buffer_mid_idx(nchannels * nFramesPerBlock) {
assert(nchannels > 0);
assert(nFramesPerBlock > 0);
}
};
/**
@ -160,3 +105,5 @@ public:
~OutBufHandler();
};
/** @} */
/** @} */

View File

@ -0,0 +1,45 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_uldaq_common.h"
#include "lasp_daq.h"
string getErrMsg(UlError err) {
string errstr;
errstr.reserve(ERR_MSG_LEN);
char errmsg[ERR_MSG_LEN];
errstr = "UlDaq API Error: ";
ulGetErrMsg(err, errmsg);
errstr += errmsg;
return errstr;
}
void showErr(string errstr) {
std::cerr << "\b\n**************** UlDAQ backend error **********\n";
std::cerr << errstr << std::endl;
std::cerr << "***********************************************\n\n";
}
void showErr(UlError err) {
if (err != ERR_NO_ERROR)
showErr(getErrMsg(err));
}
#endif
void throwOnPossibleUlException(UlError err) {
if (err == ERR_NO_ERROR) {
return;
}
string errstr = getErrMsg(err);
showErr(errstr);
Daq::StreamStatus::StreamError serr;
if ((int)err == 18) {
serr = Daq::StreamStatus::StreamError::inputXRun;
} else if ((int)err == 19) {
serr = Daq::StreamStatus::StreamError::outputXRun;
} else {
serr = Daq::StreamStatus::StreamError::driverError;
}
throw Daq::StreamException(serr, errstr);
}

View File

@ -0,0 +1,66 @@
#pragma once
#include <uldaq.h>
#include <string>
#include "lasp_deviceinfo.h"
/** \addtogroup device
* @{
* \addtogroup uldaq
*/
/**
* @brief Throws an appropriate stream exception based on the UlError number.
* The mapping is based on the error numbers as given in uldaq.h. There are a
* log of errors definded here (109 in total). Except for some, we will map
* most of them to a driver error.
*
* @param e The backend error code.
*/
void throwOnPossibleUlException(UlError err);
/**
* @brief Return a string corresponding to the UlDaq API error
*
* @param err error code
*
* @return Error string
*/
string getErrMsg(UlError err);
/**
* @brief Print error message to stderr
*
* @param errstr The string to print
*/
void showErr(UlError err);
/**
* @brief Get a string representation of the error
*
* @param errstr
*/
void showErr(std::string errstr);
/**
* @brief UlDaq-specific device information. Adds a copy of the underlying
* DaqDeDaqDeviceDescriptor.
*/
class UlDaqDeviceInfo : public DeviceInfo {
public:
DaqDeviceDescriptor _uldaqDescriptor;
virtual std::unique_ptr<DeviceInfo> clone() const override {
DEBUGTRACE_ENTER;
return std::make_unique<UlDaqDeviceInfo>(*this);
}
};
/**
* @brief List of available sampling frequencies for DT9837A
*/
const std::vector<d> ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000,
22050, 24000, 32000, 44056, 44100,
47250, 48000, 50000, 50400, 51000};
/** @} */
/** @} */

View File

@ -0,0 +1,212 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_daqconfig.h"
#include "lasp_uldaq.h"
#include "lasp_uldaq_bufhandler.h"
#include "lasp_uldaq_impl.h"
using namespace std::literals::chrono_literals;
DT9837A::~DT9837A() {
DEBUGTRACE_ENTER;
UlError err;
if (isRunning()) {
DEBUGTRACE_PRINT("Stop UlDAQ from destructor");
stop();
}
if (_handle) {
DEBUGTRACE_PRINT("Disconnecting and releasing DaqDevice");
/* err = ulDisconnectDaqDevice(_handle); */
/* showErr(err); */
err = ulReleaseDaqDevice(_handle);
showErr(err);
}
}
DT9837A::DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config),
_nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) {
const DaqDeviceDescriptor &descriptor = devinfo._uldaqDescriptor;
DEBUGTRACE_PRINT(string("Device: ") + descriptor.productName);
DEBUGTRACE_PRINT(string("Product id: ") + to_string(descriptor.productId));
DEBUGTRACE_PRINT(string("Dev string: ") + descriptor.devString);
DEBUGTRACE_PRINT(string("Unique id: ") + descriptor.uniqueId);
// get a handle to the DAQ device associated with the first descriptor
_handle = ulCreateDaqDevice(descriptor);
if (_handle == 0) {
throw rte("Unable to create a handle to the specified DAQ "
"device. Is the device currently in use? Please make sure to set "
"the DAQ configuration in duplex mode if simultaneous input and "
"output is required.");
}
UlError err = ulConnectDaqDevice(_handle);
if (err != ERR_NO_ERROR) {
ulReleaseDaqDevice(_handle);
_handle = 0;
throw rte("Unable to connect to device: " + getErrMsg(err));
}
/// Loop over input channels, set parameters
for (us ch = 0; ch < 4; ch++) {
err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0);
showErr(err);
if (err != ERR_NO_ERROR) {
throw rte("Fatal: could normalize channel sensitivity");
}
CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC;
err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe =
inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set IEPE mode");
}
}
}
bool DT9837A::isRunning() const {
DEBUGTRACE_ENTER;
/* return _thread.joinable(); */
StreamStatus status = _streamStatus;
return status.isRunning;
}
void DT9837A::stop() {
DEBUGTRACE_ENTER;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
if (!isRunning()) {
throw rte("No data acquisition running");
}
// Stop the thread and join it
_stopThread = true;
assert(_thread.joinable());
_thread.join();
_stopThread = false;
// Update stream status
status.isRunning = false;
_streamStatus = status;
}
void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw rte("DAQ is already running");
}
if (neninchannels() > 0) {
if (!inCallback)
throw rte("DAQ requires a callback for input data");
}
if (nenoutchannels() > 0) {
if (!outCallback)
throw rte("DAQ requires a callback for output data");
}
assert(neninchannels() + nenoutchannels() > 0);
_thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
}
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
try {
std::unique_ptr<OutBufHandler> obh;
std::unique_ptr<InBufHandler> ibh;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
if (nenoutchannels() > 0) {
assert(outCallback);
obh = std::make_unique<OutBufHandler>(*this, outCallback);
}
if (neninchannels() > 0) {
assert(inCallback);
ibh = std::make_unique<InBufHandler>(*this, inCallback);
}
if (obh)
obh->start();
if (ibh)
ibh->start();
const double sleeptime_s =
static_cast<double>(_nFramesPerBlock) / (16 * samplerate());
const us sleeptime_us = static_cast<us>(sleeptime_s * 1e6);
while (!_stopThread) {
if (ibh) {
if (!(*ibh)()) {
_stopThread = true;
break;
}
}
if (obh) {
if (!(*obh)()) {
_stopThread = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
}
/// Update stream status that we are not running anymore
status.isRunning = false;
_streamStatus = status;
_stopThread = false;
} catch (StreamException &e) {
StreamStatus status = _streamStatus;
// Copy over error type
status.errorType = e.e;
_streamStatus = status;
cerr << "\n******************\n";
cerr << "Catched error in UlDAQ thread: " << e.what() << endl;
cerr << "\n******************\n";
}
}
void DT9837A::sanityChecks() const {
// Some sanity checks
if (inchannel_config.size() != 4) {
throw rte("Invalid length of enabled inChannels vector");
}
if (outchannel_config.size() != 1) {
throw rte("Invalid length of enabled outChannels vector");
}
if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) {
throw rte("Unsensible number of samples per block chosen");
}
if (samplerate() < ULDAQ_SAMPLERATES.at(0) ||
samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) {
throw rte("Invalid sample rate");
}
}
#endif // LASP_HAS_ULDAQ

View File

@ -0,0 +1,110 @@
#pragma once
#include "debugtrace.hpp"
#include "lasp_uldaq_common.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>
#include "lasp_daq.h"
using std::atomic;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
class InBufHandler;
class OutBufHandler;
/** \addtogroup device
* @{
* \addtogroup uldaq
*/
/**
* @brief Data translation DT9837A Daq device.
*/
class DT9837A : public Daq {
DaqDeviceHandle _handle = 0;
std::mutex _daqmutex;
/**
* @brief The thread that is doing I/O with UlDaq
*/
std::thread _thread;
/**
* @brief Flag indicating the thread to stop processing.
*/
atomic<bool> _stopThread{false};
/**
* @brief Storage for exchanging information on the stream
*/
atomic<StreamStatus> _streamStatus;
const us _nFramesPerBlock;
/**
* @brief The function that is running in a thread
*
* @param inCallback
* @param outcallback
*/
void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback);
/**
* @brief Obtain a handle to the underlying device
*
* @return Handle
*/
DaqDeviceHandle getHandle() const { return _handle; }
/**
* @brief Perform several sanity checks
*/
void sanityChecks() const;
public:
/**
* @brief Create a DT9837A instance.
*
* @param devinfo DeviceInfo to connect to
* @param config DaqConfiguration settings
*/
DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config);
virtual ~DT9837A();
/**
* @brief Returns true when the stream is running
*
* @return as above stated
*/
bool isRunning() const;
/**
* @brief Stop the data-acquisition
*/
void stop() override final;
friend class InBufHandler;
friend class OutBufHandler;
virtual void start(InDaqCallback inCallback,
OutDaqCallback outCallback) override final;
/**
* @brief Obtain copy of stream status (thread-safe function)
*
* @return StreamStatus object
*/
virtual StreamStatus getStreamStatus() const override {
return _streamStatus;
}
};
/** @} */
/** @} */

View File

@ -1,6 +1,8 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_clip.h"
#include "lasp_daqdata.h"
#include "lasp_daq.h"
#include <mutex>
using std::cerr;
@ -9,13 +11,14 @@ using std::endl;
using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
ClipHandler::ClipHandler(StreamMgr &mgr)
ClipHandler::ClipHandler(SmgrHandle mgr)
: ThreadedInDataHandler(mgr){
DEBUGTRACE_ENTER;
startThread();
}
bool ClipHandler::inCallback_threaded(const DaqData &d) {
void ClipHandler::inCallback(const DaqData &d) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
@ -49,7 +52,6 @@ bool ClipHandler::inCallback_threaded(const DaqData &d) {
_clip_time(i) += _dt;
}
}
return true;
}
arma::uvec ClipHandler::getCurrentValue() const {
@ -89,6 +91,5 @@ void ClipHandler::reset(const Daq *daq) {
ClipHandler::~ClipHandler() {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
stop();
stopThread();
}

View File

@ -21,7 +21,7 @@
/**
* @brief Clipping detector (Clip). Detects when a signal overdrives the input
* */
class ClipHandler: public ThreadedInDataHandler {
class ClipHandler: public ThreadedInDataHandler<ClipHandler> {
/**
* @brief Assuming full scale of a signal is +/- 1.0. If a value is found
@ -58,7 +58,7 @@ class ClipHandler: public ThreadedInDataHandler {
*
* @param mgr Stream Mgr to operate on
*/
ClipHandler(StreamMgr& mgr);
ClipHandler(SmgrHandle mgr);
~ClipHandler();
/**
@ -68,8 +68,8 @@ class ClipHandler: public ThreadedInDataHandler {
*/
arma::uvec getCurrentValue() const;
bool inCallback_threaded(const DaqData& ) override final;
void reset(const Daq*) override final;
void inCallback(const DaqData& );
void reset(const Daq*);
};

View File

@ -1,6 +1,8 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_ppm.h"
#include "lasp_daqdata.h"
#include "lasp_daq.h"
#include <mutex>
using std::cerr;
@ -9,13 +11,14 @@ using std::endl;
using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps)
: ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) {
PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps)
: ThreadedInDataHandler<PPMHandler>(mgr), _decay_dBps(decay_dBps) {
DEBUGTRACE_ENTER;
startThread();
}
bool PPMHandler::inCallback_threaded(const DaqData &d) {
void PPMHandler::inCallback(const DaqData &d) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
@ -61,12 +64,11 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) {
_cur_max(i) *= _alpha;
}
}
return true;
}
std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const {
DEBUGTRACE_ENTER;
/* DEBUGTRACE_ENTER; */
Lck lck(_mtx);
arma::uvec clips(_clip_time.size(), arma::fill::zeros);
@ -82,9 +84,11 @@ void PPMHandler::reset(const Daq *daq) {
if (daq) {
DEBUGTRACE_PRINT("New daq found");
_cur_max.fill(1e-80);
const us nchannels = daq->neninchannels();
DEBUGTRACE_PRINT(nchannels);
_max_range.resize(nchannels);
dvec ranges = daq->inputRangeForEnabledChannels();
@ -106,6 +110,5 @@ void PPMHandler::reset(const Daq *daq) {
PPMHandler::~PPMHandler() {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
stop();
stopThread();
}

View File

@ -4,7 +4,6 @@
//
// Description: Peak Programme Meter
#pragma once
#include <memory>
#include "lasp_filter.h"
#include "lasp_mathtypes.h"
#include "lasp_threadedindatahandler.h"
@ -23,7 +22,7 @@
* with a certain amount of dB/s. If a new peak is found, it goes up again.
* Also detects clipping.
* */
class PPMHandler: public ThreadedInDataHandler {
class PPMHandler : public ThreadedInDataHandler<PPMHandler> {
/**
* @brief Assuming full scale of a signal is +/- 1.0. If a value is found
@ -69,11 +68,11 @@ class PPMHandler: public ThreadedInDataHandler {
/**
* @brief Constructs Peak Programme Meter
*
* @param mgr Stream Mgr to operate on
* @param mgr Stream Mgr to install callbacks for
* @param decay_dBps The level decay in units dB/s, after a peak has been
* hit.
*/
PPMHandler(StreamMgr& mgr,const d decay_dBps = 20.0);
PPMHandler(SmgrHandle mgr,const d decay_dBps = 20.0);
~PPMHandler();
/**
@ -91,8 +90,8 @@ class PPMHandler: public ThreadedInDataHandler {
*
* @return true when stream should continue.
*/
bool inCallback_threaded(const DaqData& d) override final;
void reset(const Daq*) override final;
void inCallback(const DaqData& d);
void reset(const Daq*);
};

View File

@ -1,5 +1,7 @@
/* #define DEBUGTRACE_ENABLED */
#include "lasp_rtaps.h"
#include "lasp_daqdata.h"
#include "lasp_daq.h"
#include "debugtrace.hpp"
#include <mutex>
@ -7,7 +9,7 @@ using std::cerr;
using std::endl;
using Lck = std::scoped_lock<std::mutex>;
RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter,
RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter,
const us nfft,
const Window::WindowType w,
const d overlap_percentage, const d time_constant)
@ -18,12 +20,12 @@ RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter,
_filterPrototype = freqWeightingFilter->clone();
}
startThread();
}
RtAps::~RtAps() {
Lck lck(_ps_mtx);
stop();
stopThread();
}
bool RtAps::inCallback_threaded(const DaqData &data) {
void RtAps::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
@ -33,7 +35,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) {
const us nchannels = fltdata.n_cols;
if(nchannels != _sens.size()) {
cerr << "**** Error: sensitivity size does not match! *****" << endl;
return false;
return;
}
fltdata.each_row() %= _sens.as_row();
@ -61,7 +63,6 @@ bool RtAps::inCallback_threaded(const DaqData &data) {
_ps.compute(fltdata);
return true;
}
void RtAps::reset(const Daq *daq) {

View File

@ -23,7 +23,7 @@
* @brief Real time spectral estimator using Welch method of spectral
* estimation.
*/
class RtAps : public ThreadedInDataHandler {
class RtAps : public ThreadedInDataHandler<RtAps> {
std::unique_ptr<Filter> _filterPrototype;
std::vector<std::unique_ptr<Filter>> _freqWeightingFilters;
@ -49,7 +49,7 @@ public:
*
* For all other arguments, see constructor of AvPowerSpectra
*/
RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft = 2048,
RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft = 2048,
const Window::WindowType w = Window::WindowType::Hann,
const d overlap_percentage = 50., const d time_constant = -1);
~RtAps();
@ -69,8 +69,8 @@ public:
*
* @return true if stream should continue.
*/
bool inCallback_threaded(const DaqData & d) override final;
void reset(const Daq *) override final;
void inCallback(const DaqData & d);
void reset(const Daq *);
};
/** @} */

View File

@ -1,5 +1,7 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_daq.h"
#include "lasp_rtsignalviewer.h"
#include <algorithm>
#include <mutex>
@ -9,7 +11,7 @@ using std::endl;
using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist,
RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist,
const us resolution, const us channel)
: ThreadedInDataHandler(mgr), _approx_time_hist(approx_time_hist),
_resolution(resolution), _channel(channel) {
@ -22,9 +24,10 @@ RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist,
if (resolution <= 1) {
throw rte("Invalid resolution. Should be > 1");
}
startThread();
}
bool RtSignalViewer::inCallback_threaded(const DaqData &data) {
void RtSignalViewer::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
@ -49,13 +52,10 @@ bool RtSignalViewer::inCallback_threaded(const DaqData &data) {
_dat(_resolution-1, 1) = newmin;
_dat(_resolution-1, 2) = newmax;
}
return true;
}
RtSignalViewer::~RtSignalViewer() {
Lck lck(_sv_mtx);
stop();
stopThread();
}
void RtSignalViewer::reset(const Daq *daq) {

View File

@ -24,7 +24,7 @@
* @brief Real time signal viewer. Shows envelope of the signal based on amount
* of history shown.
*/
class RtSignalViewer : public ThreadedInDataHandler {
class RtSignalViewer : public ThreadedInDataHandler<RtSignalViewer> {
/**
* @brief Storage for sensitivity values
@ -71,7 +71,7 @@ public:
* @param resolution Number of time points
* @param channel The channel number
*/
RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, const us resolution,
RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution,
const us channel);
~RtSignalViewer();
@ -85,8 +85,8 @@ public:
*/
dmat getCurrentValue() const;
bool inCallback_threaded(const DaqData &) override final;
void reset(const Daq *) override final;
void inCallback(const DaqData &);
void reset(const Daq *);
};
/** @} */

View File

@ -1,24 +1,27 @@
/* #define DEBUGTRACE_ENABLED */
#include "lasp_threadedindatahandler.h"
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_thread.h"
#include <future>
#include <thread>
#include <queue>
#include <optional>
#include <queue>
#include <thread>
using namespace std::literals::chrono_literals;
using lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
using std::cerr;
using std::endl;
using std::placeholders::_1;
class SafeQueue {
std::queue<DaqData> _queue;
std::mutex _mtx;
std::atomic_int32_t _contents {0};
public:
void push(const DaqData& d) {
std::atomic_int32_t _contents{0};
public:
void push(const DaqData &d) {
DEBUGTRACE_ENTER;
lck lock(_mtx);
_queue.push(d);
@ -47,38 +50,52 @@ class SafeQueue {
bool empty() const { return _contents == 0; }
};
ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr)
: InDataHandler(mgr), _queue(std::make_unique<SafeQueue>()) {
ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr,
InCallbackType cb,
InResetType reset)
: _indatahandler(
mgr,
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
_1),
reset),
_queue(std::make_unique<SafeQueue>()), inCallback(cb) {
DEBUGTRACE_ENTER;
// Initialize thread pool, if not already done
getPool();
}
bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) {
DEBUGTRACE_ENTER;
if (!_lastCallbackResult) {
return false;
}
// Initialize thread pool, if not already done
getPool();
}
void ThreadedInDataHandlerBase::startThread() {
DEBUGTRACE_ENTER;
_thread_can_safely_run = true;
_indatahandler.start();
}
void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
const DaqData &daqdata) {
DEBUGTRACE_ENTER;
std::scoped_lock lck(_mtx);
// Early return in case object is under DESTRUCTION
if (!_thread_can_safely_run)
return;
_queue->push(daqdata);
if (!_thread_running && (!_stopThread) && _lastCallbackResult) {
if (!_thread_running) {
auto &pool = getPool();
DEBUGTRACE_PRINT("Pushing new thread in pool");
_thread_running = true;
pool.push_task(&ThreadedInDataHandler::threadFcn, this);
pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this);
}
return _lastCallbackResult;
}
ThreadedInDataHandler::~ThreadedInDataHandler() {
void ThreadedInDataHandlerBase::stopThread() {
DEBUGTRACE_ENTER;
_stopThread = true;
// Make sure inCallback is no longer called
_thread_can_safely_run = false;
_indatahandler.stop();
std::scoped_lock lck(_mtx);
// Then wait in steps for the thread to stop running.
while (_thread_running) {
@ -86,18 +103,26 @@ ThreadedInDataHandler::~ThreadedInDataHandler() {
}
}
void ThreadedInDataHandler::threadFcn() {
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
DEBUGTRACE_ENTER;
if (_thread_can_safely_run) {
stopThread();
cerr << "*** BUG: InDataHandlers have not been all stopped, while "
"StreamMgr destructor is called. This is a misuse BUG."
<< endl;
abort();
}
}
void ThreadedInDataHandlerBase::threadFcn() {
DEBUGTRACE_ENTER;
while(!_queue->empty() && !_stopThread) {
while (!_queue->empty() && _thread_can_safely_run) {
// Call inCallback_threaded
if (!inCallback_threaded(_queue->pop())) {
cerr << "*********** Callback result returned false! *************"
<< endl;
_lastCallbackResult = false;
}
inCallback(_queue->pop());
}
_thread_running = false;
}

View File

@ -1,9 +1,13 @@
#pragma once
#include "lasp_streammgr.h"
#include "debugtrace.hpp"
#include "lasp_indatahandler.h"
#include <atomic>
#include <memory>
#include <mutex>
using std::placeholders::_1;
const us RINGBUFFER_SIZE = 1024;
/**
* \addtogroup dsp
* @{
@ -14,51 +18,97 @@ const us RINGBUFFER_SIZE = 1024;
class SafeQueue;
/**
* @brief Threaded in data handler. Buffers inCallback data and calls a
* callback with the same signature on a different thread.
* @brief Threaded in data handler base. Buffers inCallback data and calls a
* callback with the same signature on a different thread. The main function of
* this is to offload the thread that handles the stream, such that expensive
* computations do not result in stream buffer xruns.
*/
class ThreadedInDataHandler: public InDataHandler {
class ThreadedInDataHandlerBase {
/**
* @brief The queue used to push elements to the handling thread.
*/
InDataHandler _indatahandler;
std::unique_ptr<SafeQueue> _queue;
mutable std::recursive_mutex _mtx;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _stopThread{false};
std::atomic<bool> _lastCallbackResult{true};
std::atomic<bool> _thread_can_safely_run{false};
/**
* @brief Function pointer that is called when new DaqData arrives.
*/
const InCallbackType inCallback;
void threadFcn();
public:
/**
* @brief Initialize a ThreadedInDataHandler
*
* @param mgr StreamMgr singleton reference
*/
ThreadedInDataHandler(StreamMgr& mgr);
~ThreadedInDataHandler();
/**
* @brief Pushes a copy of the daqdata to the thread queue and returns
* @brief Pushes a copy of the daqdata to the thread queue and returns.
* Adds a thread to handle the queue, whihc will call inCallback();
*
* @param daqdata the daq info to push
*
* @return true, to continue with sampling.
*/
virtual bool inCallback(const DaqData &daqdata) override final;
void _inCallbackFromInDataHandler(const DaqData &daqdata);
public:
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset);
~ThreadedInDataHandlerBase();
/**
* @brief This function should be overridden with an actual implementation,
* of what should happen on a different thread.
*
* @param d Input daq data
*
* @return true on succes. False when an error occured.
* @brief This method should be called from the derived class' constructor,
* to start the thread and data is incoming.
*/
virtual bool inCallback_threaded(const DaqData& d) = 0;
void startThread();
/**
* @brief This method SHOULD be called from all classes that derive on
* ThreadedInDataHandler. It is to make sure the inCallback_threaded()
* function is no longer called when the destructor of the derived class is
* called. Not calling this function is regarded as a BUG.
*/
void stopThread();
};
/**
* @brief A bit of curiously recurring template pattern, to connect the
* specific handlers and connect the proper callbacks in a type-agnostic way.
* Using this class, each threaded handler should just implement its reset()
* and inCallback() method. Ellides the virtual method calls.
*
* Usage: class XHandler: public ThreadedInDataHandler<XHandler> {
* public:
* XHandler(streammgr) : ThreadedInDataHandler(streammgr) {}
* void inCallback(const DaqData& d) { ... do something with d }
* void reset(const Daq* daq) { ... do something with daq }
* };
*
* For examples, see PPMHandler, etc.
*
* @tparam Derived The
*/
template <typename Derived>
class ThreadedInDataHandler : public ThreadedInDataHandlerBase {
public:
ThreadedInDataHandler(SmgrHandle mgr):
ThreadedInDataHandlerBase(mgr,
std::bind(&ThreadedInDataHandler::_inCallback, this, _1),
std::bind(&ThreadedInDataHandler::_reset, this, _1))
{
}
void _reset(const Daq* daq) {
DEBUGTRACE_ENTER;
return static_cast<Derived*>(this)->reset(daq);
}
void _inCallback(const DaqData& data) {
DEBUGTRACE_ENTER;
return static_cast<Derived*>(this)->inCallback(data);
}
};
/** @} */
/** @} */

View File

@ -19,7 +19,9 @@ class RecordStatus:
class Recording:
"""
Class used to perform a recording.
Class used to perform a recording. Recording data can come in from a
different thread, that is supposed to call the `inCallback` method, with
audio data as an argument.
"""
def __init__(
@ -99,7 +101,6 @@ class Recording:
logging.debug("Starting record....")
self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback)
self.indh.start()
if wait:
logging.debug("Stop recording with CTRL-C")

View File

@ -1,8 +1,10 @@
/* #define DEBUGTRACE_ENABLED */
#include "arma_npy.h"
#include "debugtrace.hpp"
#include "lasp_ppm.h"
#include "lasp_clip.h"
#include "lasp_daq.h"
#include "lasp_daqdata.h"
#include "lasp_ppm.h"
#include "lasp_rtaps.h"
#include "lasp_rtsignalviewer.h"
#include "lasp_streammgr.h"
@ -94,35 +96,68 @@ py::array_t<d> dmat_to_ndarray(const DaqData &d) {
}
/**
* @brief Wraps the InDataHandler such that it calls a Python callback with a
* buffer of sample data. The Python callback is called from a different
* thread, using a Numpy array as argument.
* @brief Wraps the ThreadedInDataHandler such that it calls a Python callback with a
* buffer of sample data. Converts DaqData objects to Numpy arrays and calls
* Python given as argument to the constructor
*/
class PyIndataHandler : public ThreadedInDataHandler {
class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
/**
* @brief The callback functions that is called.
*/
py::function cb, reset_callback;
public:
PyIndataHandler(StreamMgr &mgr, py::function cb, py::function reset_callback)
/**
* @brief Initialize PyIndataHandler
*
* @param mgr StreamMgr handle
* @param cb Python callback that is called with Numpy input data from device
* @param reset_callback Python callback that is called with a Daq pointer.
* Careful: do not store this handle, as it is only valid as long as reset()
* is called, when a stream stops, this pointer / handle will dangle.
*/
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) {
DEBUGTRACE_ENTER;
/// Start should be called externally, as at constructor time no virtual
/// functions should be called.
/* start(); */
startThread();
}
~PyIndataHandler() {
DEBUGTRACE_ENTER;
stop();
stopThread();
}
/**
* @brief Calls the reset callback in Python.
*
* @param daq Daq device, or nullptr in case no input stream is running.
*/
void reset(const Daq *daq) {
DEBUGTRACE_ENTER;
py::gil_scoped_acquire acquire;
try {
if (daq) {
reset_callback(daq);
} else {
reset_callback(py::none());
}
} catch (py::error_already_set &e) {
cerr << "*************** Error calling reset callback!\n";
cerr << e.what() << endl;
cerr << "*************** \n";
/// Throwing a runtime error here does not work out one way or another.
/// Therefore, it is better to dive out and prevent undefined behaviour
abort();
/* throw std::runtime_error(e.what()); */
}
}
void reset(const Daq *daq) override final { reset_callback(daq); }
/**
* @brief Reads from the buffer
* @brief Calls the Python callback method / function with a Numpy array of
* stream data.
*/
bool inCallback_threaded(const DaqData &d) override final {
void inCallback(const DaqData &d) {
/* DEBUGTRACE_ENTER; */
@ -152,39 +187,29 @@ public:
} // End of switch
bool res = bool_val.cast<bool>();
if (!res)
return false;
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
return false;
abort();
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." << endl;
return false;
abort();
}
return true;
}
};
void init_datahandler(py::module &m) {
py::class_<InDataHandler> idh(m, "InDataHandler_base");
idh.def("start", &InDataHandler::start);
idh.def("stop", &InDataHandler::stop);
py::class_<ThreadedInDataHandler, InDataHandler> tidh(
m, "ThreadedInDataHandler");
/// The C++ class is PyIndataHandler, but for Python, it is called
/// InDataHandler
py::class_<PyIndataHandler, ThreadedInDataHandler> pyidh(m, "InDataHandler");
pyidh.def(py::init<StreamMgr &, py::function, py::function>());
py::class_<PyIndataHandler> pyidh(m, "InDataHandler");
pyidh.def(py::init<SmgrHandle, py::function, py::function>());
/// Peak Programme Meter
py::class_<PPMHandler, ThreadedInDataHandler> ppm(m, "PPMHandler");
ppm.def(py::init<StreamMgr &, const d>());
ppm.def(py::init<StreamMgr &>());
py::class_<PPMHandler> ppm(m, "PPMHandler");
ppm.def(py::init<SmgrHandle, const d>());
ppm.def(py::init<SmgrHandle>());
ppm.def("getCurrentValue", [](const PPMHandler &ppm) {
std::tuple<vd, arma::uvec> tp = ppm.getCurrentValue();
@ -194,11 +219,10 @@ void init_datahandler(py::module &m) {
});
/// Clip Detector
py::class_<ClipHandler, ThreadedInDataHandler> clip(m, "ClipHandler");
clip.def(py::init<StreamMgr &>());
py::class_<ClipHandler> clip(m, "ClipHandler");
clip.def(py::init<SmgrHandle>());
clip.def("getCurrentValue", [](const ClipHandler &clip) {
arma::uvec cval = clip.getCurrentValue();
return ColToNpy<arma::uword>(cval); // something goes wrong here
@ -206,8 +230,8 @@ void init_datahandler(py::module &m) {
/// Real time Aps
///
py::class_<RtAps, ThreadedInDataHandler> rtaps(m, "RtAps");
rtaps.def(py::init<StreamMgr &, // StreamMgr
py::class_<RtAps> rtaps(m, "RtAps");
rtaps.def(py::init<SmgrHandle, // StreamMgr
Filter *const, // FreqWeighting filter
const us, // Nfft
const Window::WindowType, // Window
@ -233,11 +257,11 @@ void init_datahandler(py::module &m) {
/// Real time Signal Viewer
///
py::class_<RtSignalViewer, ThreadedInDataHandler> rtsv(m, "RtSignalViewer");
rtsv.def(py::init<StreamMgr &, // StreamMgr
const d, // Time history
const us, // Resolution
const us // Channel number
py::class_<RtSignalViewer> rtsv(m, "RtSignalViewer");
rtsv.def(py::init<SmgrHandle, // StreamMgr
const d, // Time history
const us, // Resolution
const us // Channel number
>());
rtsv.def("getCurrentValue", [](RtSignalViewer &rt) {

View File

@ -1,4 +1,5 @@
#include "lasp_streammgr.h"
#include "lasp_indatahandler.h"
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
@ -12,7 +13,7 @@ void init_streammgr(py::module &m) {
/// The stream manager is a singleton, and the lifetime is managed elsewhere.
// It should not be deleted.
py::class_<StreamMgr, std::unique_ptr<StreamMgr, py::nodelete>> smgr(
py::class_<StreamMgr, std::shared_ptr<StreamMgr>> smgr(
m, "StreamMgr");
py::enum_<StreamMgr::StreamType>(smgr, "StreamType")
@ -22,7 +23,7 @@ void init_streammgr(py::module &m) {
smgr.def("startStream", &StreamMgr::startStream);
smgr.def("stopStream", &StreamMgr::stopStream);
smgr.def_static("getInstance", []() {
return std::unique_ptr<StreamMgr, py::nodelete>(&StreamMgr::getInstance());
return StreamMgr::getInstance();
});
smgr.def("stopAllStreams", &StreamMgr::stopAllStreams);