StreamMgr handle now via shared pointers. InDataHandler stores weak pointers. Reset callback in PyInDataHandler could be problematic. Refactored the UlDaq code and moved to a subfolder.

This commit is contained in:
Anne de Jong 2023-06-07 21:49:07 +02:00
parent 6fc1bd90b1
commit c87a5cec25
25 changed files with 647 additions and 503 deletions

View File

@ -1,4 +1,5 @@
# src/lasp/device/CMakeLists.txt
include_directories(uldaq)
add_library(lasp_device_lib OBJECT
lasp_daq.cpp
@ -9,7 +10,9 @@ add_library(lasp_device_lib OBJECT
lasp_streammgr.cpp
lasp_indatahandler.cpp
lasp_uldaq.cpp
lasp_uldaq_impl.cpp
uldaq/lasp_uldaq_impl.cpp
uldaq/lasp_uldaq_bufhandler.cpp
uldaq/lasp_uldaq_common.cpp
)
# Callback requires certain arguments that are not used by code. This disables

View File

@ -1,27 +1,34 @@
/* #define DEBUGTRACE_ENABLED */
#include <thread>
#include "debugtrace.hpp"
#include "lasp_indatahandler.h"
#include "debugtrace.hpp"
#include "lasp_streammgr.h"
#include <thread>
InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr)
#if LASP_DEBUG==1
, _main_thread_id(std::this_thread::get_id())
InDataHandler::InDataHandler(SmgrHandle mgr)
: _mgr(mgr)
#if LASP_DEBUG == 1
,
_main_thread_id(std::this_thread::get_id())
#endif
{ DEBUGTRACE_ENTER; }
{
DEBUGTRACE_ENTER;
}
void InDataHandler::start() {
DEBUGTRACE_ENTER;
_mgr.addInDataHandler(*this);
if (SmgrHandle handle = _mgr.lock()) {
handle->addInDataHandler(this);
#if LASP_DEBUG == 1
assert(_mgr._main_thread_id == _main_thread_id);
assert(handle->_main_thread_id == _main_thread_id);
#endif
}
}
void InDataHandler::stop() {
#if LASP_DEBUG == 1
stopCalled = true;
#endif
_mgr.removeInDataHandler(*this);
if (SmgrHandle handle = _mgr.lock()) {
/* handle->removeInDataHandler(*this); */
}
}
InDataHandler::~InDataHandler() {
@ -30,9 +37,9 @@ InDataHandler::~InDataHandler() {
#if LASP_DEBUG == 1
if (!stopCalled) {
std::cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "
"InDataHandler::stop() from the derived class' destructor."
<< std::endl;
"InDataHandler's destructor. Fix this by calling "
"InDataHandler::stop() from the derived class' destructor."
<< std::endl;
abort();
}
#endif

View File

@ -1,18 +1,21 @@
#pragma once
#include <atomic>
#include <memory>
#include <thread>
#include "lasp_types.h"
class StreamMgr;
using SmgrHandle = std::shared_ptr<StreamMgr>;
/** \addtogroup device
* @{
*/
class StreamMgr;
class DaqData;
class Daq;
class InDataHandler {
protected:
StreamMgr &_mgr;
std::weak_ptr<StreamMgr> _mgr;
#if LASP_DEBUG == 1
// This is a flag to indicate whether the method stop() is called for the
// current handler. It should call the method stop() from the derived class's
@ -29,7 +32,7 @@ public:
*
* @param mgr Stream manager.
*/
InDataHandler(StreamMgr &mgr);
InDataHandler(SmgrHandle mgr);
/**
* @brief This function is called when input data from a DAQ is available.

View File

@ -1,24 +1,35 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#define DEBUGTRACE_ENABLED
#include "lasp_streammgr.h"
#include "debugtrace.hpp"
#include "lasp_biquadbank.h"
#include "lasp_thread.h"
#include "lasp_indatahandler.h"
#include "lasp_thread.h"
#include <algorithm>
#include <assert.h>
#include <functional>
#include <iostream>
#include <memory>
using std::cerr;
using std::endl;
using rte = std::runtime_error;
/**
* @brief The main global handle to a stream, stored in a shared pointer.
*/
std::shared_ptr<StreamMgr> _mgr;
StreamMgr &StreamMgr::getInstance() {
std::shared_ptr<StreamMgr> StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
static StreamMgr mgr;
return mgr;
if (!_mgr) {
_mgr = std::shared_ptr<StreamMgr>(new StreamMgr());
if (!_mgr) {
throw rte("Fatal: could not allocate stream manager!");
}
}
return _mgr;
}
StreamMgr::StreamMgr() {
@ -211,13 +222,12 @@ bool StreamMgr::outCallback(DaqData &data) {
StreamMgr::~StreamMgr() {
DEBUGTRACE_ENTER;
checkRightThread();
stopAllStreams();
if (!_inDataHandlers.empty()) {
cerr << "*** WARNING: InDataHandlers have not been all stopped, while "
"StreamMgr destructor is called. This is a misuse BUG"
<< endl;
abort();
}
// Stream manager now handled by shared pointer. Each indata handler gets a
// shared pointer to the stream manager, and stores a weak pointer to it.
// Hence, we do not have to do any cleanup here. It also makes sure that the
// order in which destructors are called does not matter anymore. As soon as
// the stream manager is destructed, the weak pointers loose there ref, and do
// not have to removeInDataHandler() anymore.
}
void StreamMgr::stopAllStreams() {
DEBUGTRACE_ENTER;
@ -242,15 +252,15 @@ void StreamMgr::startStream(const DaqConfiguration &config) {
std::scoped_lock lck(_devices_mtx);
DeviceInfo *devinfo = nullptr;
bool found = false;
// Match configuration to a device in the list of devices
for (auto &devinfoi : _devices) {
if (config.match(*devinfoi)) {
devinfo = devinfoi.get();
break;
}
}
if (!devinfo) {
if (devinfo == nullptr) {
throw rte("Could not find a device with name " + config.device_name +
" in list of devices.");
}
@ -383,23 +393,20 @@ void StreamMgr::stopStream(const StreamType t) {
}
}
void StreamMgr::addInDataHandler(InDataHandler &handler) {
void StreamMgr::addInDataHandler(InDataHandler *handler) {
DEBUGTRACE_ENTER;
checkRightThread();
assert(handler);
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
if (_inputStream) {
handler.reset(_inputStream.get());
} else {
handler.reset(nullptr);
}
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), &handler) !=
handler->reset(_inputStream.get());
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
_inDataHandlers.cend()) {
throw std::runtime_error("Error: handler already added. Probably start() "
"is called more than once on a handler object");
}
_inDataHandlers.push_back(&handler);
_inDataHandlers.push_back(handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
@ -409,7 +416,6 @@ void StreamMgr::removeInDataHandler(InDataHandler &handler) {
_inDataHandlers.remove(&handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {

View File

@ -57,15 +57,18 @@ class StreamMgr {
*/
DeviceInfoList _devices;
// Singleton, no public constructor. Can only be obtained using
// getInstance();
StreamMgr();
friend class InDataHandler;
friend class Siggen;
// Singleton, no public destructor
~StreamMgr();
public:
~StreamMgr();
enum class StreamType : us {
/**
* @brief Input stream
@ -85,7 +88,7 @@ class StreamMgr {
*
* @return Reference to stream manager.
*/
static StreamMgr &getInstance();
static std::shared_ptr<StreamMgr> getInstance();
/**
* @brief Obtain a list of devices currently available. When the StreamMgr is
@ -200,7 +203,7 @@ private:
*
* @param handler The handler to add.
*/
void addInDataHandler(InDataHandler &handler);
void addInDataHandler(InDataHandler *handler);
/**
* @brief Do the actual rescanning.

View File

@ -1,6 +1,7 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_uldaq.h"
#include "lasp_uldaq_impl.h"
@ -17,13 +18,13 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
DaqDeviceDescriptor descriptor;
DaqDeviceInterface interfaceType = ANY_IFC;
err = ulGetDaqDeviceInventory(interfaceType, devdescriptors,
static_cast<unsigned *>(&numdevs));
err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, &numdevs);
if (err != ERR_NO_ERROR) {
throw rte("UlDaq device inventarization failed");
}
DEBUGTRACE_PRINT(string("Number of devices: ") + std::to_string(numdevs));
for (unsigned i = 0; i < numdevs; i++) {
descriptor = devdescriptors[i];
@ -33,7 +34,7 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
devinfo.api = uldaqapi;
{
string name, interface;
string name;
string productname = descriptor.productName;
if (productname != "DT9837A") {
throw rte("Unknown UlDAQ type: " + productname);
@ -56,9 +57,8 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
name = "Uknown interface = ";
}
name +=
string(descriptor.productName) + " " + string(descriptor.uniqueId);
devinfo.device_name = std::move(name);
name += productname + " " + string(descriptor.uniqueId);
devinfo.device_name = name;
}
devinfo.physicalOutputQty = DaqChannel::Qty::Voltage;
@ -93,7 +93,12 @@ void fillUlDaqDeviceInfo(DeviceInfoList &devinfolist) {
std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
return std::make_unique<DT9837A>(devinfo, config);
const UlDaqDeviceInfo *_info =
dynamic_cast<const UlDaqDeviceInfo *>(&devinfo);
if (_info == nullptr) {
throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo");
}
return std::make_unique<DT9837A>(*_info, config);
}
#endif // LASP_HAS_ULDAQ

View File

@ -1,164 +0,0 @@
#pragma once
#include "debugtrace.hpp"
#include "lasp_daq.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <uldaq.h>
#include <vector>
using std::atomic;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
/**
* @brief List of available sampling frequencies for DT9837A
*/
const std::vector<d> ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000,
22050, 24000, 32000, 44056, 44100,
47250, 48000, 50000, 50400, 51000};
/**
* @brief UlDaq-specific device information. Adds a copy of the underlying
* DaqDeDaqDeviceDescriptor.
*/
class UlDaqDeviceInfo : public DeviceInfo {
public:
DaqDeviceDescriptor _uldaqDescriptor;
virtual std::unique_ptr<DeviceInfo> clone() const override {
DEBUGTRACE_ENTER;
return std::make_unique<UlDaqDeviceInfo>(*this);
}
};
class DT9837A : public Daq {
DaqDeviceHandle _handle = 0;
std::mutex _daqmutex;
std::thread _thread;
atomic<bool> _stopThread{false};
atomic<StreamStatus> _streamStatus;
const us _nFramesPerBlock;
void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback);
public:
DaqDeviceHandle getHandle() const { return _handle; }
/**
* @brief Create a DT9837A instance.
*
* @param devinfo DeviceInfo to connect to
* @param config DaqConfiguration settings
*/
DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config);
virtual ~DT9837A();
bool isRunning() const;
void stop() override final;
friend class InBufHandler;
friend class OutBufHandler;
virtual void start(InDaqCallback inCallback,
OutDaqCallback outCallback) override final;
virtual StreamStatus getStreamStatus() const override {
return _streamStatus;
}
};
/**
* @brief Helper class for managing input and output samples of the DAQ device.
*/
class BufHandler {
protected:
/**
* @brief Reference to underlying Daq
*/
DT9837A &daq;
/**
* @brief The type of data, in this case always double precision floats
*/
const DataTypeDescriptor dtype_descr = dtype_desc_fl64;
/**
* @brief The number of channels, number of frames per callback (block).
*/
us nchannels, nFramesPerBlock;
/**
* @brief Sampling frequency in Hz
*/
double samplerate;
std::vector<double> buf;
/**
* @brief Whether the top / bottom part of the buffer are ready to be
* enqueued
*/
bool topenqueued, botenqueued;
/**
* @brief Counter for the total number of frames acquired / sent since the
* start of the stream.
*/
us totalFramesCount = 0;
long long buffer_mid_idx;
public:
/**
* @brief Initialize bufhandler
*
* @param daq
* @param nchannels
*/
BufHandler(DT9837A &daq, const us nchannels)
: daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels),
nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()),
buf(2 * nchannels *
nFramesPerBlock, // Watch the two here, the top and the bottom!
0),
buffer_mid_idx(nchannels * nFramesPerBlock) {
assert(nchannels > 0);
}
};
/**
* @brief Specific helper for the input buffer
*/
class InBufHandler : public BufHandler {
bool monitorOutput;
InDaqCallback cb;
public:
InBufHandler(DT9837A &daq, InDaqCallback cb);
void start();
/**
* @brief InBufHandler::operator()()
*
* @return true on success
*/
bool operator()();
~InBufHandler();
};
class OutBufHandler : public BufHandler {
OutDaqCallback cb;
public:
OutBufHandler(DT9837A &daq, OutDaqCallback cb);
void start();
/**
* @brief OutBufHandler::operator()
*
* @return true on success
*/
bool operator()();
~OutBufHandler();
};

View File

@ -3,192 +3,8 @@
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_daqconfig.h"
#include "lasp_uldaq.h"
#include "lasp_uldaq_impl.h"
using namespace std::literals::chrono_literals;
/**
* @brief Reserve some space for an error message from UlDaq
*/
const us UL_ERR_MSG_LEN = 512;
/**
* @brief Return a string corresponding to the UlDaq API error
*
* @param err error code
*
* @return Error string
*/
string getErrMsg(UlError err) {
string errstr;
errstr.reserve(UL_ERR_MSG_LEN);
char errmsg[UL_ERR_MSG_LEN];
errstr = "UlDaq API Error: ";
ulGetErrMsg(err, errmsg);
errstr += errmsg;
return errstr;
}
inline void showErr(string errstr) {
std::cerr << "\b\n**************** UlDAQ backend error **********\n";
std::cerr << errstr << std::endl;
std::cerr << "***********************************************\n\n";
}
inline void showErr(UlError err) {
if (err != ERR_NO_ERROR)
showErr(getErrMsg(err));
}
DT9837A::~DT9837A() {
UlError err;
if (isRunning()) {
stop();
}
if (_handle) {
err = ulDisconnectDaqDevice(_handle);
showErr(err);
err = ulReleaseDaqDevice(_handle);
showErr(err);
}
}
DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config),
_nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) {
// Some sanity checks
if (inchannel_config.size() != 4) {
throw rte("Invalid length of enabled inChannels vector");
}
if (outchannel_config.size() != 1) {
throw rte("Invalid length of enabled outChannels vector");
}
if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) {
throw rte("Unsensible number of samples per block chosen");
}
if (samplerate() < ULDAQ_SAMPLERATES.at(0) ||
samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) {
throw rte("Invalid sample rate");
}
const UlDaqDeviceInfo *_info =
dynamic_cast<const UlDaqDeviceInfo *>(&devinfo);
if (_info == nullptr) {
throw rte("BUG: Could not cast DeviceInfo to UlDaqDeviceInfo");
}
// get a handle to the DAQ device associated with the first descriptor
_handle = ulCreateDaqDevice(_info->_uldaqDescriptor);
if (_handle == 0) {
throw rte("Unable to create a handle to the specified DAQ "
"device. Is the device currently in use? Please make sure to set "
"the DAQ configuration in duplex mode if simultaneous input and "
"output is required.");
}
UlError err = ulConnectDaqDevice(_handle);
if (err != ERR_NO_ERROR) {
ulReleaseDaqDevice(_handle);
_handle = 0;
throw rte("Unable to connect to device: " + getErrMsg(err));
}
/// Loop over input channels, set parameters
for (us ch = 0; ch < 4; ch++) {
err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0);
showErr(err);
if (err != ERR_NO_ERROR) {
throw rte("Fatal: could normalize channel sensitivity");
}
CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC;
err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe =
inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set IEPE mode");
}
}
}
bool DT9837A::isRunning() const {
DEBUGTRACE_ENTER;
return _thread.joinable();
}
void DT9837A::stop() {
DEBUGTRACE_ENTER;
StreamStatus status = _streamStatus;
if (!isRunning()) {
throw rte("No data acquisition running");
}
// Stop the thread and join it
_stopThread = true;
assert(_thread.joinable());
_thread.join();
_stopThread = false;
// Update stream status
status.isRunning = false;
_streamStatus = status;
}
/**
* @brief Throws an appropriate stream exception based on the UlError number.
* The mapping is based on the error numbers as given in uldaq.h. There are a
* log of errors definded here (109 in total). Except for some, we will map
* most of them to a driver error.
*
* @param e The backend error code.
*/
inline void throwOnPossibleUlException(UlError err) {
if (err == ERR_NO_ERROR) {
return;
}
string errstr = getErrMsg(err);
showErr(errstr);
Daq::StreamStatus::StreamError serr;
if ((int)err == 18) {
serr = Daq::StreamStatus::StreamError::inputXRun;
} else if ((int)err == 19) {
serr = Daq::StreamStatus::StreamError::outputXRun;
} else {
serr = Daq::StreamStatus::StreamError::driverError;
}
throw Daq::StreamException(serr, errstr);
}
void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw rte("DAQ is already running");
}
if (neninchannels() > 0) {
if (!inCallback)
throw rte("DAQ requires a callback for input data");
}
if (nenoutchannels() > 0) {
if (!outCallback)
throw rte("DAQ requires a callback for output data");
}
assert(neninchannels() + nenoutchannels() > 0);
_thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
}
#include "lasp_uldaq_bufhandler.h"
#include "lasp_daq.h"
InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb)
: BufHandler(daq, daq.neninchannels()), cb(cb)
@ -371,7 +187,7 @@ void OutBufHandler::start() {
}
bool OutBufHandler::operator()() {
/* DEBUGTRACE_ENTER; */
DEBUGTRACE_ENTER;
bool res = true;
assert(daq.getHandle() != 0);
@ -389,7 +205,8 @@ bool OutBufHandler::operator()() {
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun);
cerr << "totalFramesCount: " << totalFramesCount << ". Detected output underrun" << endl;
/* throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun); */
}
if (transferStatus.currentIndex < buffer_mid_idx) {
@ -425,70 +242,4 @@ OutBufHandler::~OutBufHandler() {
showErr(err);
}
}
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
try {
std::unique_ptr<OutBufHandler> obh;
std::unique_ptr<InBufHandler> ibh;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
if (nenoutchannels() > 0) {
assert(outCallback);
obh = std::make_unique<OutBufHandler>(*this, outCallback);
}
if (neninchannels() > 0) {
assert(inCallback);
ibh = std::make_unique<InBufHandler>(*this, inCallback);
}
if (obh)
obh->start();
if (ibh)
ibh->start();
const double sleeptime_s =
static_cast<double>(_nFramesPerBlock) / (16 * samplerate());
const us sleeptime_us = static_cast<us>(sleeptime_s * 1e6);
while (!_stopThread) {
if (ibh) {
if (!(*ibh)()) {
_stopThread = true;
break;
}
}
if (obh) {
if (!(*obh)()) {
_stopThread = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
}
/// Update stream status that we are not running anymore
status.isRunning = false;
_streamStatus = status;
_stopThread = false;
} catch (StreamException &e) {
StreamStatus status = _streamStatus;
// Copy over error type
status.errorType = e.e;
_streamStatus = status;
cerr << "\n******************\n";
cerr << "Catched error in UlDAQ thread: " << e.what() << endl;
cerr << "\n******************\n";
}
}
#endif // LASP_HAS_ULDAQ
#endif

View File

@ -0,0 +1,99 @@
#pragma once
#include <uldaq.h>
#include "lasp_types.h"
#include "lasp_uldaq_impl.h"
#include "lasp_uldaq_common.h"
class DT9837A;
/**
* @brief Helper class for managing input and output samples of the DAQ device.
*/
class BufHandler {
protected:
/**
* @brief Reference to underlying Daq
*/
DT9837A &daq;
/**
* @brief The type of data, in this case always double precision floats
*/
const DataTypeDescriptor dtype_descr = dtype_desc_fl64;
/**
* @brief The number of channels, number of frames per callback (block).
*/
us nchannels, nFramesPerBlock;
/**
* @brief Sampling frequency in Hz
*/
double samplerate;
std::vector<double> buf;
/**
* @brief Whether the top part of the buffer is enqueued
*/
bool topenqueued = false;
/**
* @brief Whether the bottom part of the buffer is enqueued
* enqueued
*/
bool botenqueued = false;
/**
* @brief Counter for the total number of frames acquired / sent since the
* start of the stream.
*/
us totalFramesCount = 0;
long long buffer_mid_idx;
public:
/**
* @brief Initialize bufhandler
*
* @param daq
* @param nchannels
*/
BufHandler(DT9837A &daq, const us nchannels)
: daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels),
nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()),
buf(2 * nchannels *
nFramesPerBlock, // Watch the two here, the top and the bottom!
0),
buffer_mid_idx(nchannels * nFramesPerBlock) {
assert(nchannels > 0);
assert(nFramesPerBlock > 0);
}
};
/**
* @brief Specific helper for the input buffer
*/
class InBufHandler : public BufHandler {
bool monitorOutput;
InDaqCallback cb;
public:
InBufHandler(DT9837A &daq, InDaqCallback cb);
void start();
/**
* @brief InBufHandler::operator()()
*
* @return true on success
*/
bool operator()();
~InBufHandler();
};
class OutBufHandler : public BufHandler {
OutDaqCallback cb;
public:
OutBufHandler(DT9837A &daq, OutDaqCallback cb);
void start();
/**
* @brief OutBufHandler::operator()
*
* @return true on success
*/
bool operator()();
~OutBufHandler();
};

View File

@ -0,0 +1,45 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_uldaq_common.h"
#include "lasp_daq.h"
string getErrMsg(UlError err) {
string errstr;
errstr.reserve(ERR_MSG_LEN);
char errmsg[ERR_MSG_LEN];
errstr = "UlDaq API Error: ";
ulGetErrMsg(err, errmsg);
errstr += errmsg;
return errstr;
}
void showErr(string errstr) {
std::cerr << "\b\n**************** UlDAQ backend error **********\n";
std::cerr << errstr << std::endl;
std::cerr << "***********************************************\n\n";
}
void showErr(UlError err) {
if (err != ERR_NO_ERROR)
showErr(getErrMsg(err));
}
#endif
void throwOnPossibleUlException(UlError err) {
if (err == ERR_NO_ERROR) {
return;
}
string errstr = getErrMsg(err);
showErr(errstr);
Daq::StreamStatus::StreamError serr;
if ((int)err == 18) {
serr = Daq::StreamStatus::StreamError::inputXRun;
} else if ((int)err == 19) {
serr = Daq::StreamStatus::StreamError::outputXRun;
} else {
serr = Daq::StreamStatus::StreamError::driverError;
}
throw Daq::StreamException(serr, errstr);
}

View File

@ -0,0 +1,60 @@
#pragma once
#include <uldaq.h>
#include <string>
#include "lasp_deviceinfo.h"
/**
* @brief Throws an appropriate stream exception based on the UlError number.
* The mapping is based on the error numbers as given in uldaq.h. There are a
* log of errors definded here (109 in total). Except for some, we will map
* most of them to a driver error.
*
* @param e The backend error code.
*/
void throwOnPossibleUlException(UlError err);
/**
* @brief Return a string corresponding to the UlDaq API error
*
* @param err error code
*
* @return Error string
*/
string getErrMsg(UlError err);
/**
* @brief Print error message to stderr
*
* @param errstr The string to print
*/
void showErr(UlError err);
/**
* @brief Get a string representation of the error
*
* @param errstr
*/
void showErr(std::string errstr);
/**
* @brief UlDaq-specific device information. Adds a copy of the underlying
* DaqDeDaqDeviceDescriptor.
*/
class UlDaqDeviceInfo : public DeviceInfo {
public:
DaqDeviceDescriptor _uldaqDescriptor;
virtual std::unique_ptr<DeviceInfo> clone() const override {
DEBUGTRACE_ENTER;
return std::make_unique<UlDaqDeviceInfo>(*this);
}
};
/**
* @brief List of available sampling frequencies for DT9837A
*/
const std::vector<d> ULDAQ_SAMPLERATES = {8000, 10000, 11025, 16000, 20000,
22050, 24000, 32000, 44056, 44100,
47250, 48000, 50000, 50400, 51000};

View File

@ -0,0 +1,212 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_daqconfig.h"
#include "lasp_uldaq.h"
#include "lasp_uldaq_bufhandler.h"
#include "lasp_uldaq_impl.h"
using namespace std::literals::chrono_literals;
DT9837A::~DT9837A() {
DEBUGTRACE_ENTER;
UlError err;
if (isRunning()) {
DEBUGTRACE_PRINT("Stop UlDAQ from destructor");
stop();
}
if (_handle) {
DEBUGTRACE_PRINT("Disconnecting and releasing DaqDevice");
/* err = ulDisconnectDaqDevice(_handle); */
/* showErr(err); */
err = ulReleaseDaqDevice(_handle);
showErr(err);
}
}
DT9837A::DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config),
_nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) {
const DaqDeviceDescriptor &descriptor = devinfo._uldaqDescriptor;
DEBUGTRACE_PRINT(string("Device: ") + descriptor.productName);
DEBUGTRACE_PRINT(string("Product id: ") + to_string(descriptor.productId));
DEBUGTRACE_PRINT(string("Dev string: ") + descriptor.devString);
DEBUGTRACE_PRINT(string("Unique id: ") + descriptor.uniqueId);
// get a handle to the DAQ device associated with the first descriptor
_handle = ulCreateDaqDevice(descriptor);
if (_handle == 0) {
throw rte("Unable to create a handle to the specified DAQ "
"device. Is the device currently in use? Please make sure to set "
"the DAQ configuration in duplex mode if simultaneous input and "
"output is required.");
}
UlError err = ulConnectDaqDevice(_handle);
if (err != ERR_NO_ERROR) {
ulReleaseDaqDevice(_handle);
_handle = 0;
throw rte("Unable to connect to device: " + getErrMsg(err));
}
/// Loop over input channels, set parameters
for (us ch = 0; ch < 4; ch++) {
err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0);
showErr(err);
if (err != ERR_NO_ERROR) {
throw rte("Fatal: could normalize channel sensitivity");
}
CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC;
err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe =
inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set IEPE mode");
}
}
}
bool DT9837A::isRunning() const {
DEBUGTRACE_ENTER;
/* return _thread.joinable(); */
StreamStatus status = _streamStatus;
return status.isRunning;
}
void DT9837A::stop() {
DEBUGTRACE_ENTER;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
/* if (!isRunning()) { */
/* throw rte("No data acquisition running"); */
/* } */
// Stop the thread and join it
/* _stopThread = true; */
/* assert(_thread.joinable()); */
/* _thread.join(); */
/* _stopThread = false; */
// Update stream status
status.isRunning = false;
_streamStatus = status;
}
void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw rte("DAQ is already running");
}
if (neninchannels() > 0) {
if (!inCallback)
throw rte("DAQ requires a callback for input data");
}
if (nenoutchannels() > 0) {
if (!outCallback)
throw rte("DAQ requires a callback for output data");
}
assert(neninchannels() + nenoutchannels() > 0);
/* _thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
*/
}
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
try {
std::unique_ptr<OutBufHandler> obh;
std::unique_ptr<InBufHandler> ibh;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
if (nenoutchannels() > 0) {
assert(outCallback);
obh = std::make_unique<OutBufHandler>(*this, outCallback);
}
if (neninchannels() > 0) {
assert(inCallback);
ibh = std::make_unique<InBufHandler>(*this, inCallback);
}
if (obh)
obh->start();
if (ibh)
ibh->start();
const double sleeptime_s =
static_cast<double>(_nFramesPerBlock) / (16 * samplerate());
const us sleeptime_us = static_cast<us>(sleeptime_s * 1e6);
while (!_stopThread) {
if (ibh) {
if (!(*ibh)()) {
_stopThread = true;
break;
}
}
if (obh) {
if (!(*obh)()) {
_stopThread = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
}
/// Update stream status that we are not running anymore
status.isRunning = false;
_streamStatus = status;
_stopThread = false;
} catch (StreamException &e) {
StreamStatus status = _streamStatus;
// Copy over error type
status.errorType = e.e;
_streamStatus = status;
cerr << "\n******************\n";
cerr << "Catched error in UlDAQ thread: " << e.what() << endl;
cerr << "\n******************\n";
}
}
void DT9837A::sanityChecks() const {
// Some sanity checks
if (inchannel_config.size() != 4) {
throw rte("Invalid length of enabled inChannels vector");
}
if (outchannel_config.size() != 1) {
throw rte("Invalid length of enabled outChannels vector");
}
if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) {
throw rte("Unsensible number of samples per block chosen");
}
if (samplerate() < ULDAQ_SAMPLERATES.at(0) ||
samplerate() > ULDAQ_SAMPLERATES.at(ULDAQ_SAMPLERATES.size() - 1)) {
throw rte("Invalid sample rate");
}
}
#endif // LASP_HAS_ULDAQ

View File

@ -0,0 +1,96 @@
#pragma once
#include "debugtrace.hpp"
#include "lasp_uldaq_common.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>
#include "lasp_daq.h"
using std::atomic;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
class InBufHandler;
class OutBufHandler;
class DT9837A : public Daq {
DaqDeviceHandle _handle = 0;
std::mutex _daqmutex;
/**
* @brief The thread that is doing I/O with UlDaq
*/
std::thread _thread;
/**
* @brief Flag indicating the thread to stop processing.
*/
atomic<bool> _stopThread{false};
/**
* @brief Storage for exchanging information on the stream
*/
atomic<StreamStatus> _streamStatus;
const us _nFramesPerBlock;
/**
* @brief The function that is running in a thread
*
* @param inCallback
* @param outcallback
*/
void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback);
/**
* @brief Obtain a handle to the underlying device
*
* @return Handle
*/
DaqDeviceHandle getHandle() const { return _handle; }
/**
* @brief Perform several sanity checks
*/
void sanityChecks() const;
public:
/**
* @brief Create a DT9837A instance.
*
* @param devinfo DeviceInfo to connect to
* @param config DaqConfiguration settings
*/
DT9837A(const UlDaqDeviceInfo &devinfo, const DaqConfiguration &config);
virtual ~DT9837A();
bool isRunning() const;
/**
* @brief Stop the data-acquisition
*/
void stop() override final;
friend class InBufHandler;
friend class OutBufHandler;
virtual void start(InDaqCallback inCallback,
OutDaqCallback outCallback) override final;
/**
* @brief Obtain copy of stream status (thread-safe function)
*
* @return StreamStatus object
*/
virtual StreamStatus getStreamStatus() const override {
return _streamStatus;
}
};

View File

@ -11,7 +11,7 @@ using std::endl;
using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
ClipHandler::ClipHandler(StreamMgr &mgr)
ClipHandler::ClipHandler(SmgrHandle mgr)
: ThreadedInDataHandler(mgr){
DEBUGTRACE_ENTER;

View File

@ -58,7 +58,7 @@ class ClipHandler: public ThreadedInDataHandler {
*
* @param mgr Stream Mgr to operate on
*/
ClipHandler(StreamMgr& mgr);
ClipHandler(SmgrHandle mgr);
~ClipHandler();
/**

View File

@ -11,7 +11,7 @@ using std::endl;
using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps)
PPMHandler::PPMHandler(SmgrHandle mgr, const d decay_dBps)
: ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) {
DEBUGTRACE_ENTER;

View File

@ -73,7 +73,7 @@ class PPMHandler: public ThreadedInDataHandler {
* @param decay_dBps The level decay in units dB/s, after a peak has been
* hit.
*/
PPMHandler(StreamMgr& mgr,const d decay_dBps = 20.0);
PPMHandler(SmgrHandle mgr,const d decay_dBps = 20.0);
~PPMHandler();
/**

View File

@ -9,7 +9,7 @@ using std::cerr;
using std::endl;
using Lck = std::scoped_lock<std::mutex>;
RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter,
RtAps::RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter,
const us nfft,
const Window::WindowType w,
const d overlap_percentage, const d time_constant)

View File

@ -49,7 +49,7 @@ public:
*
* For all other arguments, see constructor of AvPowerSpectra
*/
RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft = 2048,
RtAps(SmgrHandle mgr, const Filter *freqWeightingFilter, const us nfft = 2048,
const Window::WindowType w = Window::WindowType::Hann,
const d overlap_percentage = 50., const d time_constant = -1);
~RtAps();

View File

@ -11,7 +11,7 @@ using std::endl;
using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
RtSignalViewer::RtSignalViewer(StreamMgr &mgr, const d approx_time_hist,
RtSignalViewer::RtSignalViewer(SmgrHandle mgr, const d approx_time_hist,
const us resolution, const us channel)
: ThreadedInDataHandler(mgr), _approx_time_hist(approx_time_hist),
_resolution(resolution), _channel(channel) {

View File

@ -71,7 +71,7 @@ public:
* @param resolution Number of time points
* @param channel The channel number
*/
RtSignalViewer(StreamMgr &mgr, const d approx_time_hist, const us resolution,
RtSignalViewer(SmgrHandle mgr, const d approx_time_hist, const us resolution,
const us channel);
~RtSignalViewer();

View File

@ -49,7 +49,7 @@ public:
bool empty() const { return _contents == 0; }
};
ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr)
ThreadedInDataHandler::ThreadedInDataHandler(SmgrHandle mgr)
: InDataHandler(mgr), _queue(std::make_unique<SafeQueue>()) {
DEBUGTRACE_ENTER;
@ -58,6 +58,7 @@ ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr)
getPool();
}
void ThreadedInDataHandler::startThread() {
DEBUGTRACE_ENTER;
_thread_can_safely_run = true;
start();
}
@ -85,7 +86,7 @@ bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) {
return _lastCallbackResult;
}
void ThreadedInDataHandler::stopThread() {
DEBUGTRACE_ENTER;
// Make sure inCallback is no longer called
_thread_can_safely_run = false;
stop();

View File

@ -52,8 +52,8 @@ public:
*
* @param mgr StreamMgr singleton reference
*/
ThreadedInDataHandler(StreamMgr &mgr);
~ThreadedInDataHandler();
ThreadedInDataHandler(SmgrHandle mgr);
virtual ~ThreadedInDataHandler();
/**
* @brief Pushes a copy of the daqdata to the thread queue and returns

View File

@ -1,14 +1,14 @@
/* #define DEBUGTRACE_ENABLED */
#include "arma_npy.h"
#include "debugtrace.hpp"
#include "lasp_ppm.h"
#include "lasp_clip.h"
#include "lasp_daq.h"
#include "lasp_daqdata.h"
#include "lasp_ppm.h"
#include "lasp_rtaps.h"
#include "lasp_rtsignalviewer.h"
#include "lasp_threadedindatahandler.h"
#include "lasp_daqdata.h"
#include "lasp_daq.h"
#include "lasp_streammgr.h"
#include "lasp_threadedindatahandler.h"
#include <armadillo>
#include <atomic>
#include <chrono>
@ -107,7 +107,7 @@ class PyIndataHandler : public ThreadedInDataHandler {
py::function cb, reset_callback;
public:
PyIndataHandler(StreamMgr &mgr, py::function cb, py::function reset_callback)
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) {
DEBUGTRACE_ENTER;
@ -119,7 +119,25 @@ public:
DEBUGTRACE_ENTER;
stopThread();
}
void reset(const Daq *daq) override final { reset_callback(daq); }
void reset(const Daq *daq) override final {
DEBUGTRACE_ENTER;
py::gil_scoped_acquire acquire;
try {
if (daq) {
reset_callback(daq);
} else {
reset_callback(py::none());
}
} catch (py::error_already_set &e) {
cerr << "*************** Error calling reset callback!\n";
cerr << e.what() << endl;
cerr << "*************** \n";
/// Throwing a runtime error here does not work out one way or another.
/// Therefore, it is better to dive out and prevent undefined behaviour
abort();
/* throw std::runtime_error(e.what()); */
}
}
/**
* @brief Reads from the buffer
@ -174,12 +192,12 @@ void init_datahandler(py::module &m) {
/// The C++ class is PyIndataHandler, but for Python, it is called
/// InDataHandler
py::class_<PyIndataHandler> pyidh(m, "InDataHandler");
pyidh.def(py::init<StreamMgr &, py::function, py::function>());
pyidh.def(py::init<SmgrHandle, py::function, py::function>());
/// Peak Programme Meter
py::class_<PPMHandler> ppm(m, "PPMHandler");
ppm.def(py::init<StreamMgr &, const d>());
ppm.def(py::init<StreamMgr &>());
ppm.def(py::init<SmgrHandle, const d>());
ppm.def(py::init<SmgrHandle>());
ppm.def("getCurrentValue", [](const PPMHandler &ppm) {
std::tuple<vd, arma::uvec> tp = ppm.getCurrentValue();
@ -190,10 +208,9 @@ void init_datahandler(py::module &m) {
/// Clip Detector
py::class_<ClipHandler> clip(m, "ClipHandler");
clip.def(py::init<StreamMgr &>());
clip.def(py::init<SmgrHandle>());
clip.def("getCurrentValue", [](const ClipHandler &clip) {
arma::uvec cval = clip.getCurrentValue();
return ColToNpy<arma::uword>(cval); // something goes wrong here
@ -202,7 +219,7 @@ void init_datahandler(py::module &m) {
/// Real time Aps
///
py::class_<RtAps> rtaps(m, "RtAps");
rtaps.def(py::init<StreamMgr &, // StreamMgr
rtaps.def(py::init<SmgrHandle, // StreamMgr
Filter *const, // FreqWeighting filter
const us, // Nfft
const Window::WindowType, // Window
@ -229,10 +246,10 @@ void init_datahandler(py::module &m) {
/// Real time Signal Viewer
///
py::class_<RtSignalViewer> rtsv(m, "RtSignalViewer");
rtsv.def(py::init<StreamMgr &, // StreamMgr
const d, // Time history
const us, // Resolution
const us // Channel number
rtsv.def(py::init<SmgrHandle, // StreamMgr
const d, // Time history
const us, // Resolution
const us // Channel number
>());
rtsv.def("getCurrentValue", [](RtSignalViewer &rt) {

View File

@ -13,7 +13,7 @@ void init_streammgr(py::module &m) {
/// The stream manager is a singleton, and the lifetime is managed elsewhere.
// It should not be deleted.
py::class_<StreamMgr, std::unique_ptr<StreamMgr, py::nodelete>> smgr(
py::class_<StreamMgr, std::shared_ptr<StreamMgr>> smgr(
m, "StreamMgr");
py::enum_<StreamMgr::StreamType>(smgr, "StreamType")
@ -23,7 +23,7 @@ void init_streammgr(py::module &m) {
smgr.def("startStream", &StreamMgr::startStream);
smgr.def("stopStream", &StreamMgr::stopStream);
smgr.def_static("getInstance", []() {
return std::unique_ptr<StreamMgr, py::nodelete>(&StreamMgr::getInstance());
return StreamMgr::getInstance();
});
smgr.def("stopAllStreams", &StreamMgr::stopAllStreams);