Splitted UlDAQ code in header and cpp file to make a more logical separation between declaration and implementation. BUGFIX: lasp_record did not return True in self.finish(), resulting in a stream stop.

This commit is contained in:
Anne de Jong 2023-01-04 14:21:39 +01:00
parent 44c3e390b3
commit 8befe4afc8
9 changed files with 669 additions and 603 deletions

View File

@ -8,6 +8,7 @@ add_library(lasp_device_lib OBJECT
lasp_rtaudiodaq.cpp lasp_rtaudiodaq.cpp
lasp_streammgr.cpp lasp_streammgr.cpp
lasp_uldaq.cpp lasp_uldaq.cpp
lasp_uldaq_impl.cpp
) )
# Callback requires certain arguments that are not used by code. This disables # Callback requires certain arguments that are not used by code. This disables

View File

@ -44,7 +44,6 @@ public:
systemError, systemError,
threadError, threadError,
logicError, logicError,
apiSpecificError
}; };
/** /**
@ -59,6 +58,7 @@ public:
{StreamError::threadError, "Thread error"}, {StreamError::threadError, "Thread error"},
{StreamError::logicError, "Logic error (probably a bug)"}, {StreamError::logicError, "Logic error (probably a bug)"},
}; };
bool isRunning = false; bool isRunning = false;
/** /**
* @brief Check if stream has error * @brief Check if stream has error
@ -78,6 +78,25 @@ public:
* @return as described above. * @return as described above.
*/ */
bool runningOK() const { return isRunning && !error(); } bool runningOK() const { return isRunning && !error(); }
}; // End of class StreamStatus
using rte = std::runtime_error;
/**
* @brief Used for internal throwing of exceptions.
*/
class StreamException : public rte {
using StreamError = StreamStatus::StreamError;
public:
StreamStatus::StreamError e;
StreamException(const StreamStatus::StreamError e)
: rte(StreamStatus::errorMessages.at(e)), e(e) {}
StreamException(const StreamStatus::StreamError e,
const std::string &additional_info)
: rte(StreamStatus::errorMessages.at(e) + ": " + additional_info),
e(e) {}
operator StreamError() { return e; }
}; };
/** /**
@ -161,7 +180,7 @@ public:
* * * *
* @return Maximum offset from 0 before clipping. * @return Maximum offset from 0 before clipping.
*/ */
dvec inputRangeForEnabledChannels(const bool include_monitor=true) const; dvec inputRangeForEnabledChannels(const bool include_monitor = true) const;
/** /**
* @brief Returns datatype (enum) corresponding to the datatype of the * @brief Returns datatype (enum) corresponding to the datatype of the
@ -176,7 +195,7 @@ public:
* *
* @return A DataTypeDescriptor * @return A DataTypeDescriptor
*/ */
const DataTypeDescriptor& dtypeDescr() const; const DataTypeDescriptor &dtypeDescr() const;
/** /**
* @brief The number of frames that is send in a block of DaqData. * @brief The number of frames that is send in a block of DaqData.

View File

@ -17,8 +17,6 @@ using rte = std::runtime_error;
using std::vector; using std::vector;
using lck = std::scoped_lock<std::mutex>; using lck = std::scoped_lock<std::mutex>;
DEBUGTRACE_VARIABLES;
void fillRtAudioDeviceInfo(vector<DeviceInfo> &devinfolist) { void fillRtAudioDeviceInfo(vector<DeviceInfo> &devinfolist) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;

View File

@ -1,599 +1,22 @@
/* #define DEBUGTRACE_ENABLED */ /* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp" #include "debugtrace.hpp"
#include "lasp_config.h" #include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1 #if LASP_HAS_ULDAQ == 1
#include "lasp_daqconfig.h"
#include "lasp_uldaq.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <gsl/gsl-lite.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <uldaq.h> #include <uldaq.h>
#include <vector> #include "lasp_uldaq.h"
#include "lasp_uldaq_impl.h"
using namespace std::literals::chrono_literals;
using std::atomic;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
#include "debugtrace.hpp" void fillUlDaqDeviceInfo(std::vector<DeviceInfo> &devinfolist,void* vDescriptors) {
DEBUGTRACE_VARIABLES;
const us MAX_DEV_COUNT_PER_API = 100;
/**
* @brief Reserve some space for an error message from UlDaq
*/
const us UL_ERR_MSG_LEN = 512;
/**
* @brief Show the error to default error stream and return a string
* corresponding to the error
*
* @param err Error string
*/
string showErr(UlError err) {
string errstr;
errstr.reserve(UL_ERR_MSG_LEN);
if (err != ERR_NO_ERROR) {
char errmsg[UL_ERR_MSG_LEN];
errstr = "UlDaq API Error: ";
ulGetErrMsg(err, errmsg);
errstr += errmsg;
std::cerr << "\b\n**************** UlDAQ backend error **********\n";
std::cerr << errstr << std::endl;
std::cerr << "***********************************************\n\n";
return errstr;
}
return errstr;
}
class DT9837A : public Daq {
DaqDeviceHandle _handle = 0;
std::mutex _daqmutex;
std::thread _thread;
atomic<bool> _stopThread{false};
atomic<StreamStatus> _streamStatus;
const us _nFramesPerBlock;
void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback);
public:
DaqDeviceHandle getHandle() const { return _handle; }
DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config);
~DT9837A() {
UlError err;
if (isRunning()) {
stop();
}
if (_handle) {
err = ulDisconnectDaqDevice(_handle);
showErr(err);
err = ulReleaseDaqDevice(_handle);
showErr(err);
}
}
bool isRunning() const {
DEBUGTRACE_ENTER;
return _thread.joinable();
}
virtual void start(InDaqCallback inCallback,
OutDaqCallback outCallback) override final;
virtual StreamStatus getStreamStatus() const override {
return _streamStatus;
}
void stop() override final {
DEBUGTRACE_ENTER;
StreamStatus status = _streamStatus;
if (!isRunning()) {
throw rte("No data acquisition running");
}
_stopThread = true;
if (_thread.joinable()) {
_thread.join();
}
_stopThread = false;
status.isRunning = false;
_streamStatus = status;
}
friend class InBufHandler;
friend class OutBufHandler;
};
void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw rte("DAQ is already running");
}
if (neninchannels() > 0) {
if (!inCallback)
throw rte("DAQ requires a callback for input data");
}
if (nenoutchannels() > 0) {
if (!outCallback)
throw rte("DAQ requires a callback for output data");
}
assert(neninchannels() + nenoutchannels() > 0);
_thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
}
class BufHandler {
protected:
DT9837A &daq;
const DataTypeDescriptor dtype_descr;
us nchannels, nFramesPerBlock;
double samplerate;
std::vector<double> buf;
bool topenqueued, botenqueued;
us increment = 0;
us totalFramesCount = 0;
long long buffer_mid_idx;
public:
BufHandler(DT9837A &daq, const us nchannels)
: daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels),
nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()),
buf(2 * nchannels *
nFramesPerBlock, // Watch the two here, the top and the bottom!
0),
buffer_mid_idx(nchannels * nFramesPerBlock) {
assert(nchannels > 0);
}
};
class InBufHandler : public BufHandler {
bool monitorOutput;
InDaqCallback cb;
public:
InBufHandler(DT9837A &daq, InDaqCallback cb)
: BufHandler(daq, daq.neninchannels()), cb(cb)
{
DEBUGTRACE_ENTER;
assert(daq.getHandle() != 0);
monitorOutput = daq.monitorOutput;
DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ERR_NO_ERROR;
std::vector<DaqInChanDescriptor> indescs;
boolvec eninchannels_without_mon = daq.eninchannels(false);
DEBUGTRACE_PRINT(eninchannels_without_mon.size());
// Initialize input, if any
dvec ranges = daq.inputRangeForEnabledChannels(false);
// Update range index only when an enabled channel is found.
us range_idx = 0;
for (us chin = 0; chin < 4; chin++) {
if (eninchannels_without_mon[chin] == true) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_ANALOG_SE;
indesc.channel = chin;
double rangeval = ranges.at(range_idx);
range_idx++;
Range rangenum;
if (fabs(rangeval - 1.0) < 1e-8) {
rangenum = BIP1VOLTS;
} else if (fabs(rangeval - 10.0) < 1e-8) {
rangenum = BIP10VOLTS;
} else {
std::cerr << "Fatal: input range value is invalid" << endl;
return;
}
indesc.range = rangenum;
indescs.push_back(indesc);
}
}
// Add possibly last channel as monitor
if (monitorOutput) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_DAC;
indesc.channel = 0;
/// The output only has a range of 10V, therefore the monitor of the
/// output also has to be set to this value.
indesc.range = BIP10VOLTS;
indescs.push_back(indesc);
}
assert(indescs.size() == nchannels);
DEBUGTRACE_MESSAGE("Starting input scan");
err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, inscanflags, buf.data());
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Could not start input DAQ");
}
}
void start() {
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Unable to start input on DAQ");
}
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
/**
* @brief InBufHandler::operator()()
*
* @return true on success
*/
bool operator()() {
/* DEBUGTRACE_ENTER; */
bool ret = true;
auto runCallback = ([&](us totalOffset) {
/* DEBUGTRACE_ENTER; */
DaqData data(nFramesPerBlock, nchannels,
DataTypeDescriptor::DataType::dtype_fl64);
us monitorOffset = monitorOutput ? 1 : 0;
/* /// Put the output monitor in front */
if (monitorOutput) {
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, 0) =
buf[totalOffset + (frame * nchannels) + (nchannels - 1)];
}
}
for (us channel = 0; channel < nchannels - monitorOffset; channel++) {
/* DEBUGTRACE_PRINT(channel); */
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, channel + monitorOffset) =
buf[totalOffset + (frame * nchannels) + channel];
}
}
return cb(data);
});
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
return false;
}
increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
cerr << "Error: overrun for input of DAQ!" << endl;
return false;
}
assert(status == SS_RUNNING);
if (transferStatus.currentIndex < (long long)buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
ret = runCallback(nchannels * nFramesPerBlock);
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
ret = runCallback(0);
topenqueued = true;
}
}
return ret;
}
~InBufHandler() {
// At exit of the function, stop scanning.
DEBUGTRACE_ENTER;
UlError err = ulDaqInScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
};
class OutBufHandler : public BufHandler {
OutDaqCallback cb;
public:
OutBufHandler(DT9837A &daq, OutDaqCallback cb)
: BufHandler(daq, daq.nenoutchannels()), cb(cb) {
DEBUGTRACE_MESSAGE("Starting output scan");
DEBUGTRACE_PRINT(nchannels);
AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err =
ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, outscanflags, buf.data());
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Unable to start output on DAQ");
}
}
void start() {
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Unable to start output on DAQ");
}
if (status != SS_RUNNING) {
throw rte("Unable to start output on DAQ");
}
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
/**
* @brief OutBufHandler::operator()
*
* @return true on success
*/
bool operator()() {
/* DEBUGTRACE_ENTER; */
bool res = true;
assert(daq.getHandle() != 0);
UlError err = ERR_NO_ERROR;
ScanStatus status;
TransferStatus transferStatus;
err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
return false;
}
if (status != SS_RUNNING) {
return false;
}
increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
cerr << "Error: underrun for output of DAQ!" << endl;
return false;
}
if (transferStatus.currentIndex < buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
DaqData d(nFramesPerBlock,1,
DataTypeDescriptor::DataType::dtype_fl64);
res = cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t*>(&(buf[buffer_mid_idx])));
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
DaqData d(nFramesPerBlock,1,
DataTypeDescriptor::DataType::dtype_fl64);
res = cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t*>(&(buf[0])));
topenqueued = true;
}
}
return res;
}
~OutBufHandler() {
DEBUGTRACE_ENTER;
UlError err = ulAOutScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
};
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
/* cerr << "******************\n" */ DaqDeviceDescriptor* descriptors_copy = static_cast<DaqDeviceDescriptor*>(vDescriptors);
/* "Todo: the current way of handling timing in this DAQ thread is not " */
/* "really robust, due " */
/* "to input / output callbacks that can be too time-consuming. We have " */
/* "to fix the " */
/* "sleep_for to properly deal with longer callbacks." */
/* "\n*****************" */
/* << endl; */
try {
std::unique_ptr<OutBufHandler> obh;
std::unique_ptr<InBufHandler> ibh;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
if (nenoutchannels() > 0) {
assert(outCallback);
obh = std::make_unique<OutBufHandler>(*this, outCallback);
}
if (neninchannels() > 0) {
assert(inCallback);
ibh = std::make_unique<InBufHandler>(*this, inCallback);
}
if (obh)
obh->start();
if (ibh)
ibh->start();
const double sleeptime_s =
static_cast<double>(_nFramesPerBlock) / (16 * samplerate());
const us sleeptime_us = static_cast<us>(sleeptime_s * 1e6);
while (!_stopThread) {
if (ibh) {
if (!(*ibh)()) {
_stopThread = true;
}
}
if (obh) {
if (!(*obh)()) {
_stopThread = true;
}
} else {
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
}
}
} catch (rte &e) {
StreamStatus status = _streamStatus;
status.isRunning = false;
status.errorType = StreamStatus::StreamError::systemError;
_streamStatus = status;
cerr << "\n******************\n";
cerr << "Catched error in UlDAQ thread: " << e.what() << endl;
cerr << "\n******************\n";
}
StreamStatus status = _streamStatus;
status.isRunning = false;
_streamStatus = status;
_stopThread = false;
}
std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
return std::make_unique<DT9837A>(devinfo, config);
}
DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config),
_nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) {
// Some sanity checks
if (inchannel_config.size() != 4) {
throw rte("Invalid length of enabled inChannels vector");
}
if (outchannel_config.size() != 1) {
throw rte("Invalid length of enabled outChannels vector");
}
if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) {
throw rte("Unsensible number of samples per block chosen");
}
if (samplerate() < 10000 || samplerate() > 51000) {
throw rte("Invalid sample rate");
}
DaqDeviceDescriptor devdescriptors[MAX_DEV_COUNT_PER_API];
DaqDeviceDescriptor descriptor;
DaqDeviceInterface interfaceType = ANY_IFC;
UlError err; UlError err;
unsigned int numdevs = MAX_ULDAQ_DEV_COUNT_PER_API;
us numdevs = MAX_DEV_COUNT_PER_API; DaqDeviceDescriptor devdescriptors[MAX_ULDAQ_DEV_COUNT_PER_API];
err = ulGetDaqDeviceInventory(interfaceType, devdescriptors,
(unsigned *)&numdevs);
if (err != ERR_NO_ERROR) {
throw rte("Device inventarization failed");
}
if ((us)api_specific_devindex >= numdevs) {
throw rte("Device number {deviceno} too high {err}. This could "
"happen when the device is currently not connected");
}
descriptor = devdescriptors[api_specific_devindex];
// get a handle to the DAQ device associated with the first descriptor
_handle = ulCreateDaqDevice(descriptor);
if (_handle == 0) {
throw rte("Unable to create a handle to the specified DAQ "
"device. Is the device currently in use? Please make sure to set "
"the DAQ configuration in duplex mode if simultaneous input and "
"output is required.");
}
err = ulConnectDaqDevice(_handle);
if (err != ERR_NO_ERROR) {
ulReleaseDaqDevice(_handle);
_handle = 0;
throw rte(string("Unable to connect to device: " + showErr(err)));
}
for (us ch = 0; ch < 4; ch++) {
err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0);
showErr(err);
if (err != ERR_NO_ERROR) {
throw rte("Fatal: could normalize channel sensitivity");
}
CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC;
err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe =
inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set IEPE mode");
}
}
}
void fillUlDaqDeviceInfo(std::vector<DeviceInfo> &devinfolist) {
DEBUGTRACE_ENTER;
UlError err;
unsigned int numdevs = MAX_DEV_COUNT_PER_API;
DaqDeviceDescriptor devdescriptors[MAX_DEV_COUNT_PER_API];
DaqDeviceDescriptor descriptor; DaqDeviceDescriptor descriptor;
DaqDeviceInterface interfaceType = ANY_IFC; DaqDeviceInterface interfaceType = ANY_IFC;
@ -608,6 +31,11 @@ void fillUlDaqDeviceInfo(std::vector<DeviceInfo> &devinfolist) {
descriptor = devdescriptors[i]; descriptor = devdescriptors[i];
// Copy structure over, if given as not nullptr
if(descriptors_copy) {
descriptors_copy[i] = descriptor;
}
DeviceInfo devinfo; DeviceInfo devinfo;
devinfo.api = uldaqapi; devinfo.api = uldaqapi;
string name, interface; string name, interface;
@ -666,4 +94,11 @@ void fillUlDaqDeviceInfo(std::vector<DeviceInfo> &devinfolist) {
devinfolist.push_back(devinfo); devinfolist.push_back(devinfo);
} }
} }
std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
return std::make_unique<DT9837A>(devinfo, config);
}
#endif // LASP_HAS_ULDAQ #endif // LASP_HAS_ULDAQ

View File

@ -1,6 +1,13 @@
#pragma once #pragma once
#include "lasp_daq.h" #include "lasp_daq.h"
/**
* @brief The maximum number of devices that can be enumerated when calling
* ulGetDaqDeviceInventory()
*/
const us MAX_ULDAQ_DEV_COUNT_PER_API = 100;
std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo& devinfo, std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo& devinfo,
const DaqConfiguration& config); const DaqConfiguration& config);
@ -8,7 +15,10 @@ std::unique_ptr<Daq> createUlDaqDevice(const DeviceInfo& devinfo,
* @brief Fill device info list with UlDaq specific devices, if any. * @brief Fill device info list with UlDaq specific devices, if any.
* *
* @param devinfolist Info list to append to * @param devinfolist Info list to append to
* @param descriptors Pointer to array
* DaqDeviceDescriptors[MAX_ULDAQ_DEV_COUNT_PER_API]. If a pointer is given, a
* copy of the device descriptors is set to the memory of this pointer. We use
* a void* pointer here to not expose the implementation of UlDaq.
*/ */
void fillUlDaqDeviceInfo(std::vector<DeviceInfo> &devinfolist); void fillUlDaqDeviceInfo(std::vector<DeviceInfo>, void* descriptors=nullptr);

View File

@ -0,0 +1,487 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_config.h"
#if LASP_HAS_ULDAQ == 1
#include "lasp_daqconfig.h"
#include "lasp_uldaq.h"
#include "lasp_uldaq_impl.h"
using namespace std::literals::chrono_literals;
/**
* @brief Reserve some space for an error message from UlDaq
*/
const us UL_ERR_MSG_LEN = 512;
/**
* @brief Return a string corresponding to the UlDaq API error
*
* @param err error code
*
* @return Error string
*/
string getErrMsg(UlError err) {
string errstr;
errstr.reserve(UL_ERR_MSG_LEN);
char errmsg[UL_ERR_MSG_LEN];
errstr = "UlDaq API Error: ";
ulGetErrMsg(err, errmsg);
errstr += errmsg;
return errstr;
}
inline void showErr(string errstr) {
std::cerr << "\b\n**************** UlDAQ backend error **********\n";
std::cerr << errstr << std::endl;
std::cerr << "***********************************************\n\n";
}
inline void showErr(UlError err) { showErr(getErrMsg(err)); }
DT9837A::~DT9837A() {
UlError err;
if (isRunning()) {
stop();
}
if (_handle) {
err = ulDisconnectDaqDevice(_handle);
showErr(err);
err = ulReleaseDaqDevice(_handle);
showErr(err);
}
}
DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config),
_nFramesPerBlock(availableFramesPerBlock.at(framesPerBlockIndex)) {
// Some sanity checks
if (inchannel_config.size() != 4) {
throw rte("Invalid length of enabled inChannels vector");
}
if (outchannel_config.size() != 1) {
throw rte("Invalid length of enabled outChannels vector");
}
if (_nFramesPerBlock < 24 || _nFramesPerBlock > 8192) {
throw rte("Unsensible number of samples per block chosen");
}
if (samplerate() < 10000 || samplerate() > 51000) {
throw rte("Invalid sample rate");
}
std::vector<DeviceInfo> devs;
DaqDeviceDescriptor devdescriptors[MAX_ULDAQ_DEV_COUNT_PER_API];
fillUlDaqDeviceInfo(devs, static_cast<void *>(devdescriptors));
if (devs.size() == 0) {
throw rte("Unable to find any UlDaq devices");
}
if (devinfo.api_specific_devindex < 0 ||
devinfo.api_specific_devindex >= (int) MAX_ULDAQ_DEV_COUNT_PER_API) {
throw rte("Invalid device index");
}
// get a handle to the DAQ device associated with the first descriptor
_handle = ulCreateDaqDevice(devdescriptors[devinfo.api_specific_devindex]);
if (_handle == 0) {
throw rte("Unable to create a handle to the specified DAQ "
"device. Is the device currently in use? Please make sure to set "
"the DAQ configuration in duplex mode if simultaneous input and "
"output is required.");
}
UlError err = ulConnectDaqDevice(_handle);
if (err != ERR_NO_ERROR) {
ulReleaseDaqDevice(_handle);
_handle = 0;
throw rte("Unable to connect to device: " + getErrMsg(err));
}
/// Loop over input channels, set parameters
for (us ch = 0; ch < 4; ch++) {
err = ulAISetConfigDbl(_handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0);
showErr(err);
if (err != ERR_NO_ERROR) {
throw rte("Fatal: could normalize channel sensitivity");
}
CouplingMode cm = inchannel_config.at(ch).ACCouplingMode ? CM_AC : CM_DC;
err = ulAISetConfig(_handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe =
inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Fatal: could not set IEPE mode");
}
}
}
bool DT9837A::isRunning() const {
DEBUGTRACE_ENTER;
return _thread.joinable();
}
void DT9837A::stop() {
DEBUGTRACE_ENTER;
StreamStatus status = _streamStatus;
if (!isRunning()) {
throw rte("No data acquisition running");
}
_stopThread = true;
if (_thread.joinable()) {
_thread.join();
}
_stopThread = false;
status.isRunning = false;
_streamStatus = status;
}
/**
* @brief Throws an exception in case it happens. Does nothing in case of no
* error.
*
* @param e
*/
inline void throwUlException(UlError err) {
if (err == ERR_NO_ERROR) {
return;
}
string errstr = getErrMsg(err);
showErr(errstr);
Daq::StreamStatus::StreamError serr;
if ((int)err < 5) {
serr = Daq::StreamStatus::StreamError::logicError;
} else if ((int)err < 9) {
serr = Daq::StreamStatus::StreamError::systemError;
} else if ((int)err < 18) {
serr = Daq::StreamStatus::StreamError::logicError;
} else if ((int)err == 18) {
serr = Daq::StreamStatus::StreamError::inputXRun;
} else if ((int)err == 19) {
serr = Daq::StreamStatus::StreamError::outputXRun;
} else {
serr = Daq::StreamStatus::StreamError::driverError;
}
throw Daq::StreamException(serr, errstr);
}
void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw rte("DAQ is already running");
}
if (neninchannels() > 0) {
if (!inCallback)
throw rte("DAQ requires a callback for input data");
}
if (nenoutchannels() > 0) {
if (!outCallback)
throw rte("DAQ requires a callback for output data");
}
assert(neninchannels() + nenoutchannels() > 0);
_thread = std::thread(&DT9837A::threadFcn, this, inCallback, outCallback);
}
InBufHandler::InBufHandler(DT9837A &daq, InDaqCallback cb)
: BufHandler(daq, daq.neninchannels()), cb(cb)
{
DEBUGTRACE_ENTER;
assert(daq.getHandle() != 0);
monitorOutput = daq.monitorOutput;
DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ERR_NO_ERROR;
std::vector<DaqInChanDescriptor> indescs;
boolvec eninchannels_without_mon = daq.eninchannels(false);
// Set ranges for each input. Below asks only channels that are not a
// monitor channel (hence the false flag).
dvec ranges = daq.inputRangeForEnabledChannels(false);
for (us chin = 0; chin < 4; chin++) {
if (eninchannels_without_mon[chin] == true) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_ANALOG_SE;
indesc.channel = chin;
double rangeval = ranges.at(chin);
Range rangenum;
if (fabs(rangeval - 1.0) < 1e-8) {
rangenum = BIP1VOLTS;
} else if (fabs(rangeval - 10.0) < 1e-8) {
rangenum = BIP10VOLTS;
} else {
throw Daq::StreamException(Daq::StreamStatus::StreamError::logicError);
std::cerr << "Fatal: input range value is invalid" << endl;
return;
}
indesc.range = rangenum;
indescs.push_back(indesc);
}
}
// Add possibly last channel as monitor
if (monitorOutput) {
DaqInChanDescriptor indesc;
indesc.type = DAQI_DAC;
indesc.channel = 0;
/// The output only has a range of 10V, therefore the monitor of the
/// output also has to be set to this value.
indesc.range = BIP10VOLTS;
indescs.push_back(indesc);
}
assert(indescs.size() == nchannels);
DEBUGTRACE_MESSAGE("Starting input scan");
err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, inscanflags, buf.data());
throwUlException(err);
}
void InBufHandler::start() {
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
throwUlException(err);
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
bool InBufHandler::operator()() {
/* DEBUGTRACE_ENTER; */
bool ret = true;
auto runCallback = ([&](us totalOffset) {
/* DEBUGTRACE_ENTER; */
DaqData data(nFramesPerBlock, nchannels,
DataTypeDescriptor::DataType::dtype_fl64);
us monitorOffset = monitorOutput ? 1 : 0;
/* /// Put the output monitor in front */
if (monitorOutput) {
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, 0) =
buf[totalOffset + (frame * nchannels) + (nchannels - 1)];
}
}
for (us channel = 0; channel < nchannels - monitorOffset; channel++) {
/* DEBUGTRACE_PRINT(channel); */
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, channel + monitorOffset) =
buf[totalOffset + (frame * nchannels) + channel];
}
}
return cb(data);
});
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
throwUlException(err);
increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
throw Daq::StreamException(Daq::StreamStatus::StreamError::inputXRun);
}
assert(status == SS_RUNNING);
if (transferStatus.currentIndex < (long long)buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
ret = runCallback(nchannels * nFramesPerBlock);
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
ret = runCallback(0);
topenqueued = true;
}
}
return ret;
}
InBufHandler::~InBufHandler() {
// At exit of the function, stop scanning.
DEBUGTRACE_ENTER;
UlError err = ulDaqInScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
OutBufHandler::OutBufHandler(DT9837A &daq, OutDaqCallback cb)
: BufHandler(daq, daq.nenoutchannels()), cb(cb) {
DEBUGTRACE_MESSAGE("Starting output scan");
DEBUGTRACE_PRINT(nchannels);
AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, outscanflags, buf.data());
throwUlException(err);
}
void OutBufHandler::start() {
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
throw rte("Unable to start output on DAQ");
}
if (status != SS_RUNNING) {
throw rte("Unable to start output on DAQ");
}
totalFramesCount = transferStatus.currentTotalCount;
topenqueued = true;
botenqueued = true;
}
bool OutBufHandler::operator()() {
/* DEBUGTRACE_ENTER; */
bool res = true;
assert(daq.getHandle() != 0);
UlError err = ERR_NO_ERROR;
ScanStatus status;
TransferStatus transferStatus;
err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
throwUlException(err);
if (status != SS_RUNNING) {
return false;
}
increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
throw Daq::StreamException(Daq::StreamStatus::StreamError::outputXRun);
}
if (transferStatus.currentIndex < buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
DaqData d(nFramesPerBlock, 1, DataTypeDescriptor::DataType::dtype_fl64);
res = cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[buffer_mid_idx])));
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
DaqData d(nFramesPerBlock, 1, DataTypeDescriptor::DataType::dtype_fl64);
res = cb(d);
d.copyToRaw(0, reinterpret_cast<byte_t *>(&(buf[0])));
topenqueued = true;
}
}
return res;
}
OutBufHandler::~OutBufHandler() {
DEBUGTRACE_ENTER;
UlError err = ulAOutScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
try {
std::unique_ptr<OutBufHandler> obh;
std::unique_ptr<InBufHandler> ibh;
StreamStatus status = _streamStatus;
status.isRunning = true;
_streamStatus = status;
if (nenoutchannels() > 0) {
assert(outCallback);
obh = std::make_unique<OutBufHandler>(*this, outCallback);
}
if (neninchannels() > 0) {
assert(inCallback);
ibh = std::make_unique<InBufHandler>(*this, inCallback);
}
if (obh)
obh->start();
if (ibh)
ibh->start();
const double sleeptime_s =
static_cast<double>(_nFramesPerBlock) / (16 * samplerate());
const us sleeptime_us = static_cast<us>(sleeptime_s * 1e6);
while (!_stopThread) {
if (ibh) {
if (!(*ibh)()) {
_stopThread = true;
}
}
if (obh) {
if (!(*obh)()) {
_stopThread = true;
}
} else {
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
}
}
/// Update stream status that we are not running anymore
status.isRunning = false;
_streamStatus = status;
_stopThread = false;
} catch (StreamException &e) {
StreamStatus status = _streamStatus;
// Copy over error type
status.errorType = e.e;
_streamStatus = status;
/*
cerr << "\n******************\n";
cerr << "Catched error in UlDAQ thread: " << e.what() << endl;
cerr << "\n******************\n";
*/
}
}
#endif // LASP_HAS_ULDAQ

View File

@ -0,0 +1,118 @@
#pragma once
#include "lasp_daq.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <uldaq.h>
#include <vector>
using std::atomic;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
class DT9837A : public Daq {
DaqDeviceHandle _handle = 0;
std::mutex _daqmutex;
std::thread _thread;
atomic<bool> _stopThread{false};
atomic<StreamStatus> _streamStatus;
const us _nFramesPerBlock;
void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback);
public:
DaqDeviceHandle getHandle() const { return _handle; }
/**
* @brief Create a DT9837A instance.
*
* @param devinfo DeviceInfo to connect to
* @param config DaqConfiguration settings
*/
DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config);
virtual ~DT9837A();
bool isRunning() const;
void stop() override final;
friend class InBufHandler;
friend class OutBufHandler;
virtual void start(InDaqCallback inCallback,
OutDaqCallback outCallback) override final;
virtual StreamStatus getStreamStatus() const override {
return _streamStatus;
}
};
/**
* @brief Helper class for managing input and output samples of the DAQ device.
*/
class BufHandler {
protected:
DT9837A &daq;
const DataTypeDescriptor dtype_descr;
us nchannels, nFramesPerBlock;
double samplerate;
std::vector<double> buf;
bool topenqueued, botenqueued;
us increment = 0;
us totalFramesCount = 0;
long long buffer_mid_idx;
public:
BufHandler(DT9837A &daq, const us nchannels)
: daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels),
nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()),
buf(2 * nchannels *
nFramesPerBlock, // Watch the two here, the top and the bottom!
0),
buffer_mid_idx(nchannels * nFramesPerBlock) {
assert(nchannels > 0);
}
};
/**
* @brief Specific helper for the input buffer
*/
class InBufHandler : public BufHandler {
bool monitorOutput;
InDaqCallback cb;
public:
InBufHandler(DT9837A &daq, InDaqCallback cb);
void start();
/**
* @brief InBufHandler::operator()()
*
* @return true on success
*/
bool operator()();
~InBufHandler();
};
class OutBufHandler : public BufHandler {
OutDaqCallback cb;
public:
OutBufHandler(DT9837A &daq, OutDaqCallback cb);
void start();
/**
* @brief OutBufHandler::operator()
*
* @return true on success
*/
bool operator()();
~OutBufHandler();
};

View File

@ -135,7 +135,8 @@ class Recording:
f.attrs["channelNames"] = [ch.name for ch in in_ch] f.attrs["channelNames"] = [ch.name for ch in in_ch]
# Add the start delay here, as firstFrames() is called right after the # Add the start delay here, as firstFrames() is called right after the
# constructor is called. # constructor is called. time.time() returns a floating point
# number of seconds after epoch.
f.attrs["time"] = time.time() + self.startDelay f.attrs["time"] = time.time() + self.startDelay
# In V2, we do not store JSON metadata anymore, but just an enumeration # In V2, we do not store JSON metadata anymore, but just an enumeration
@ -143,6 +144,7 @@ class Recording:
f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch] f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch]
# Measured physical quantity metadata # Measured physical quantity metadata
# This was how it was in LASP version < 1.0
# f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch]
def firstFrames(self, adata): def firstFrames(self, adata):
@ -174,18 +176,15 @@ class Recording:
def inCallback(self, adata): def inCallback(self, adata):
""" """
This method should be called to grab data from the input queue, which This method is called when a block of audio data from the stream is
is filled by the stream, and put it into a file. It should be called at available. It should return either True or False.
a regular interval to prevent overflowing of the queue. It is called
within the start() method of the recording, if block is set to True.
Otherwise, it should be called from its parent at regular intervals.
For example, in Qt this can be done using a QTimer.
When returning False, it will stop the stream.
""" """
if self.stop(): if self.stop():
# Stop flag is raised. We do not add any data anymore. # Stop flag is raised. We do not add any data anymore.
return return True
with self.file_mtx: with self.file_mtx:

View File

@ -25,18 +25,17 @@ void init_daq(py::module &m) {
.value("driverError", Daq::StreamStatus::StreamError::driverError) .value("driverError", Daq::StreamStatus::StreamError::driverError)
.value("systemError", Daq::StreamStatus::StreamError::systemError) .value("systemError", Daq::StreamStatus::StreamError::systemError)
.value("threadError", Daq::StreamStatus::StreamError::threadError) .value("threadError", Daq::StreamStatus::StreamError::threadError)
.value("logicError", Daq::StreamStatus::StreamError::logicError) .value("logicError", Daq::StreamStatus::StreamError::logicError);
.value("apiSpecificError",
Daq::StreamStatus::StreamError::apiSpecificError);
ss.def("errorMsg", &Daq::StreamStatus::errorMsg); ss.def("errorMsg", &Daq::StreamStatus::errorMsg);
/// Daq /// Daq
daq.def("neninchannels", &Daq::neninchannels, py::arg("include_monitor") = true); daq.def("neninchannels", &Daq::neninchannels,
py::arg("include_monitor") = true);
daq.def("nenoutchannels", &Daq::nenoutchannels); daq.def("nenoutchannels", &Daq::nenoutchannels);
daq.def("samplerate", &Daq::samplerate); daq.def("samplerate", &Daq::samplerate);
daq.def("dataType", &Daq::dataType); daq.def("dataType", &Daq::dataType);
daq.def("framesPerBlock", &Daq::framesPerBlock); daq.def("framesPerBlock", &Daq::framesPerBlock);
daq.def("getStreamStatus", &Daq::getStreamStatus); daq.def("getStreamStatus", &Daq::getStreamStatus);
} }