lasp/lasp/device/lasp_cppuldaq.cpp

637 lines
18 KiB
C++

#include "lasp_cppuldaq.h"
#include "lasp_config.h"
#include "lasp_tracer.h"
#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <thread>
#include <uldaq.h>
#include <vector>
using std::atomic;
/* using std::this_thread; */
const us MAX_DEV_COUNT_PER_API = 100;
const us UL_ERR_MSG_LEN = 512;
inline void showErr(UlError err) {
if (err != ERR_NO_ERROR) {
char errmsg[UL_ERR_MSG_LEN];
ulGetErrMsg(err, errmsg);
std::cerr << "UlError: " << errmsg << std::endl;
}
}
class DT9837A;
void threadfcn(DT9837A *td);
class DT9837A : public Daq {
atomic<bool> stopThread;
DaqDeviceHandle handle = 0;
std::thread *thread = NULL;
SafeQueue<void *> *inqueue = NULL;
SafeQueue<void *> *outqueue = NULL;
double *inbuffer = NULL;
double *outbuffer = NULL;
us nFramesPerBlock;
public:
DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config) {
// Some sanity checks
if (eninchannels.size() != 4) {
throw runtime_error("Invalid length of enabled inChannels vector");
}
if (enoutchannels.size() != 1) {
throw runtime_error("Invalid length of enabled outChannels vector");
}
stopThread = false;
nFramesPerBlock = availableFramesPerBlock[framesPerBlockIndex];
if (nFramesPerBlock < 24 || nFramesPerBlock > 8192) {
throw runtime_error("Unsensible number of samples per block chosen");
}
if (samplerate() < 10000 || samplerate() > 51000) {
throw runtime_error("Invalid sample rate");
}
DaqDeviceDescriptor devdescriptors[MAX_DEV_COUNT_PER_API];
DaqDeviceDescriptor descriptor;
DaqDeviceInterface interfaceType = ANY_IFC;
UlError err;
us numdevs = MAX_DEV_COUNT_PER_API;
err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, (unsigned*) &numdevs);
if (err != ERR_NO_ERROR) {
throw runtime_error("Device inventarization failed");
}
if ((us) api_specific_devindex >= numdevs) {
throw runtime_error("Device number {deviceno} too high {err}. This could "
"happen when the device is currently not connected");
}
descriptor = devdescriptors[api_specific_devindex];
// get a handle to the DAQ device associated with the first descriptor
handle = ulCreateDaqDevice(descriptor);
if (handle == 0) {
throw runtime_error("Unable to create a handle to the specified DAQ "
"device. Is the device currently in use? Please make sure to set "
"the DAQ configuration in duplex mode if simultaneous input and "
"output is required.");
}
err = ulConnectDaqDevice(handle);
if (err != ERR_NO_ERROR) {
showErr(err);
ulReleaseDaqDevice(handle);
handle = 0;
throw runtime_error("Unable to connect to device: {err}");
}
for (us ch = 0; ch < 4; ch++) {
err = ulAISetConfigDbl(handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, ch, 1.0);
showErr(err);
if (err != ERR_NO_ERROR) {
throw runtime_error("Fatal: could normalize channel sensitivity");
}
CouplingMode cm = inputACCouplingMode[ch] ? CM_AC : CM_DC;
err = ulAISetConfig(handle, AI_CFG_CHAN_COUPLING_MODE, ch, cm);
if (err != ERR_NO_ERROR) {
showErr(err);
throw runtime_error("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe = inputIEPEEnabled[ch] ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);
throw runtime_error("Fatal: could not set IEPE mode");
}
}
}
DT9837A(const DT9837A &) = delete;
~DT9837A() {
UlError err;
if (isRunning()) {
stop();
}
if (handle) {
err = ulDisconnectDaqDevice(handle);
showErr(err);
err = ulReleaseDaqDevice(handle);
showErr(err);
}
}
bool isRunning() const { return bool(thread); }
void start(SafeQueue<void *> *inqueue, SafeQueue<void *> *outqueue) {
if (isRunning()) {
throw runtime_error("Thread is already running");
}
bool hasinput = neninchannels() > 0;
bool hasoutput = nenoutchannels() > 0;
if (hasinput && !inqueue) {
throw runtime_error("Inqueue not given, while input is enabled");
}
if (hasoutput && !outqueue) {
throw runtime_error("outqueue not given, while output is enabled");
}
if (hasinput) {
assert(!inbuffer);
inbuffer =
new double[neninchannels() * nFramesPerBlock * 2]; // Watch the 2!
}
if (hasoutput) {
assert(!outbuffer);
outbuffer =
new double[nenoutchannels() * nFramesPerBlock * 2]; // Watch the 2!
}
this->inqueue = inqueue;
this->outqueue = outqueue;
stopThread = false;
thread = new std::thread(threadfcn, this);
}
void stop() {
if (!isRunning()) {
throw runtime_error("No data acquisition running");
}
assert(thread);
stopThread = true;
thread->join();
delete thread;
thread = NULL;
outqueue = NULL;
inqueue = NULL;
if (inbuffer) {
delete inbuffer;
inbuffer = nullptr;
}
if (outbuffer) {
delete outbuffer;
outbuffer = nullptr;
}
}
friend void threadfcn(DT9837A *);
};
/**
* @brief Create an empty buffer and fill it with zeros.
*
* @param size The number of elements in the array
*
* @return Pointer to the array
*/
static double* createZeroBuffer(size_t size) {
double* buf = static_cast<double *>(
malloc(sizeof(double) * size));
for (us sample = 0; sample < size; sample++) {
buf[sample] = 0;
}
return buf;
}
/**
* @brief Copy samples from one linear array to the next.
*
* @param[in] from Buffer to copy from
* @param[out] to Buffer to copy to
* @param startFrom The position to start in the from-buffer
* @param startTo The position to start in the to-buffer
* @param N The number of samples to copy.
*/
static inline void copySamples(double* from,double* to,
const us startFrom,const us startTo,const us N) {
for (us sample = 0; sample < N; sample++) {
to[startTo + sample] = from[startFrom + sample];
}
}
void threadfcn(DT9837A *td) {
std::cerr << "Starting DAQ Thread fcn" << endl;
const us nenoutchannels = td->nenoutchannels();
us neninchannels = td->neninchannels();
const us nFramesPerBlock = td->nFramesPerBlock;
const bool hasinput = neninchannels > 0;
const bool hasoutput = nenoutchannels > 0;
bool monitorOutput = td->monitorOutput;
double *inbuffer = td->inbuffer;
double *outbuffer = td->outbuffer;
SafeQueue<void *> *inqueue = td->inqueue;
SafeQueue<void *> *outqueue = td->outqueue;
ScanStatus inscanstat;
ScanStatus outscanstat;
TransferStatus inxstat, outxstat;
double samplerate = td->samplerate();
const double sleeptime = ((double)nFramesPerBlock) / (4 * samplerate);
const us sleeptime_us = (us)(sleeptime * 1e6);
/* cerr << "Sleep time in loop: " << sleeptime_us << "us." << endl; */
if (sleeptime_us < 10) {
cerr << "ERROR: Too small buffer size (nFramesPerBlock) chosen!" << endl;
return;
}
const us buffer_mid_idx_in = neninchannels * nFramesPerBlock;
const us buffer_mid_idx_out = nenoutchannels * nFramesPerBlock;
DaqDeviceHandle handle = td->handle;
assert(handle);
DaqInScanFlag inscanflags = DAQINSCAN_FF_DEFAULT;
AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err = ERR_NO_ERROR;
DaqInChanDescriptor *indesc = NULL;
bool topinenqueued = true;
// Start with true here, to not copy here the first time
bool botinenqueued = true;
bool topoutenqueued = true;
bool botoutenqueued = true;
size_t inTotalCount = 0;
size_t outTotalCount = 0;
// initialize output, if any
if (hasoutput) {
assert(nenoutchannels == 1);
assert(outqueue);
// Initialize the buffer with zeros, before pushing any data.
for (us sample = 0; sample < 2 * nFramesPerBlock; sample++) {
outbuffer[sample] = 0;
}
cerr << "Starting output DAC" << endl;
err = ulAOutScan(handle, 0, 0, BIP10VOLTS,
/* BIP60VOLTS, */
2 * td->nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, outscanflags, outbuffer);
if (err != ERR_NO_ERROR) {
showErr(err);
goto exit;
}
}
// Initialize input, if any
if (hasinput) {
indesc = new DaqInChanDescriptor[neninchannels];
us j = 0;
for (us chin = 0; chin < 4; chin++) {
if (td->eninchannels[chin] == true) {
indesc[j].type = DAQI_ANALOG_SE;
indesc[j].channel = chin;
double rangeval = td->inputRangeForChannel(chin);
Range rangenum;
if (abs(rangeval - 1.0) < 1e-8) {
rangenum = BIP1VOLTS;
} else if (abs(rangeval - 10.0) < 1e-8) {
rangenum = BIP10VOLTS;
} else {
std::cerr << "Fatal: input range value is invalid" << endl;
goto exit;
}
indesc[j].range = rangenum;
j++;
}
}
// Overwrite last channel
if (monitorOutput) {
indesc[j].type = DAQI_DAC;
indesc[j].channel = 0;
indesc[j].range = BIP10VOLTS;
j++;
}
assert(j == neninchannels);
cerr << "Starting input ADC" << endl;
err = ulDaqInScan(handle, indesc, neninchannels,
2 * td->nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, inscanflags, inbuffer);
if (err != ERR_NO_ERROR) {
showErr(err);
goto exit;
}
}
// Runs scan status on output, to catch up with position
if (hasoutput) {
err = ulAOutScanStatus(handle, &outscanstat, &outxstat);
if (err != ERR_NO_ERROR) {
showErr(err);
goto exit;
}
outTotalCount = outxstat.currentScanCount;
assert(outscanstat == SS_RUNNING);
}
/* std::cerr << "Entering while loop" << endl; */
/* std::cerr << "hasinput: " << hasinput << endl; */
while (!td->stopThread && err == ERR_NO_ERROR) {
/* std::cerr << "While..." << endl; */
if (hasoutput) {
err = ulAOutScanStatus(handle, &outscanstat, &outxstat);
if (err != ERR_NO_ERROR) {
showErr(err);
goto exit;
}
assert(outscanstat == SS_RUNNING);
if (outxstat.currentScanCount > outTotalCount + 2 * nFramesPerBlock) {
cerr << "***** WARNING: Missing output sample blocks, DAQ Scan count="
<< outxstat.currentScanCount
<< " while loop count = " << outTotalCount
<< ", probably due to too small buffer size. *****" << endl;
}
outTotalCount = outxstat.currentScanCount;
/* std::cerr << "Samples scanned: " << outxstat.currentTotalCount <<
* endl;
*/
if (outxstat.currentIndex < buffer_mid_idx_out) {
topoutenqueued = false;
if (!botoutenqueued) {
/* cerr << "Copying output buffer to bottom" << endl; */
double *bufcpy;
assert(nenoutchannels > 0);
if (!outqueue->empty()) {
bufcpy = (double *)outqueue->dequeue();
} else {
cerr << "******* WARNING: OUTPUTQUEUE UNDERFLOW, FILLING SIGNAL "
"QUEUE WITH ZEROS ***********"
<< endl;
bufcpy = createZeroBuffer(nFramesPerBlock*nenoutchannels);
}
copySamples(bufcpy, outbuffer, 0, buffer_mid_idx_out, nFramesPerBlock);
free(bufcpy);
botoutenqueued = true;
}
} else {
botoutenqueued = false;
if (!topoutenqueued) {
/* cerr << "Copying output buffer to top" << endl; */
double *bufcpy;
assert(nenoutchannels > 0);
if (!outqueue->empty()) {
bufcpy = (double *)outqueue->dequeue();
} else {
cerr << "******* WARNING: OUTPUTQUEUE UNDERFLOW, FILLING SIGNAL "
"QUEUE WITH ZEROS ***********"
<< endl;
bufcpy = createZeroBuffer(nFramesPerBlock*nenoutchannels);
}
copySamples(bufcpy, outbuffer, 0, 0, nFramesPerBlock);
free(bufcpy);
topoutenqueued = true;
}
}
}
if (hasinput) {
err = ulDaqInScanStatus(handle, &inscanstat, &inxstat);
if (err != ERR_NO_ERROR) {
showErr(err);
goto exit;
}
assert(inscanstat == SS_RUNNING);
if (inxstat.currentScanCount > inTotalCount + 2 * nFramesPerBlock) {
cerr << "***** ERROR: Missing input sample blocks, count="
<< inxstat.currentScanCount
<< ", probably due to too small buffer size. Exiting thread. "
"*****"
<< endl;
break;
}
inTotalCount = inxstat.currentScanCount;
if (inxstat.currentIndex < buffer_mid_idx_in) {
topinenqueued = false;
if (!botinenqueued) {
/* cerr << "Copying in buffer bot" << endl; */
double *bufcpy = static_cast<double *>(
malloc(sizeof(double) * nFramesPerBlock * neninchannels));
us monitoroffset = monitorOutput ? 1 : 0;
assert(neninchannels > 0);
for (us channel = 0; channel < (neninchannels - monitoroffset);
channel++) {
for (us sample = 0; sample < nFramesPerBlock; sample++) {
bufcpy[(monitoroffset + channel) * nFramesPerBlock + sample] =
inbuffer[buffer_mid_idx_in + sample * neninchannels +
channel];
}
}
if (monitorOutput) {
// Monitor output goes to first channel, that is
// our convention
us channel = neninchannels - 1;
for (us sample = 0; sample < nFramesPerBlock; sample++) {
bufcpy[sample] = inbuffer[buffer_mid_idx_in +
sample * neninchannels + channel];
}
}
inqueue->enqueue((void *)bufcpy);
botinenqueued = true;
}
} else {
botinenqueued = false;
if (!topinenqueued) {
double *bufcpy = static_cast<double *>(
malloc(sizeof(double) * nFramesPerBlock * neninchannels));
us monitoroffset = monitorOutput ? 1 : 0;
assert(neninchannels > 0);
for (us channel = 0; channel < (neninchannels - monitoroffset);
channel++) {
for (us sample = 0; sample < nFramesPerBlock; sample++) {
bufcpy[(monitoroffset + channel) * nFramesPerBlock + sample] =
inbuffer[sample * neninchannels + channel];
}
}
if (monitorOutput) {
// Monitor output goes to first channel, that is
// our convention
us channel = neninchannels - 1;
for (us sample = 0; sample < nFramesPerBlock; sample++) {
bufcpy[sample] = inbuffer[sample * neninchannels + channel];
}
}
/* cerr << "Copying in buffer top" << endl; */
inqueue->enqueue((void *)bufcpy);
topinenqueued = true;
}
}
}
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
} // End of while loop
/* std::cerr << "Exit of while loop" << endl; */
exit:
if (hasoutput) {
ulAOutScanStop(handle);
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
if (hasinput) {
ulDaqInScanStop(handle);
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
if (indesc)
delete indesc;
std::cerr << "Exit of DAQ thread fcn" << endl;
}
Daq *createUlDaqDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
DT9837A *daq = NULL;
try {
daq = new DT9837A(devinfo, config);
} catch (runtime_error &e) {
if (daq)
delete daq;
throw;
}
return daq;
}
void fillUlDaqDeviceInfo(vector<DeviceInfo> &devinfolist) {
fsTRACE(15);
UlError err;
unsigned int numdevs = MAX_DEV_COUNT_PER_API;
DaqDeviceDescriptor devdescriptors[MAX_DEV_COUNT_PER_API];
DaqDeviceDescriptor descriptor;
DaqDeviceInterface interfaceType = ANY_IFC;
err = ulGetDaqDeviceInventory(interfaceType, devdescriptors,static_cast<unsigned*>(&numdevs));
if (err != ERR_NO_ERROR) {
throw std::runtime_error("UlDaq device inventarization failed");
}
for (unsigned i = 0; i < numdevs; i++) {
descriptor = devdescriptors[i];
DeviceInfo devinfo;
devinfo.api = uldaqapi;
string name, interface;
if (string(descriptor.productName) == "DT9837A") {
if (descriptor.devInterface == USB_IFC) {
name = "USB - ";
} else if (descriptor.devInterface == BLUETOOTH_IFC) {
/* devinfo. */
name = "Bluetooth - ";
} else if (descriptor.devInterface == ETHERNET_IFC) {
/* devinfo. */
name = "Ethernet - ";
}
name += string(descriptor.productName) + " ";
name += string(descriptor.uniqueId);
devinfo.device_name = name;
devinfo.api_specific_devindex = i;
devinfo.availableDataTypes.push_back(dtype_fl64);
devinfo.prefDataTypeIndex = 0;
devinfo.availableSampleRates.push_back(8000);
devinfo.availableSampleRates.push_back(10000);
devinfo.availableSampleRates.push_back(11025);
devinfo.availableSampleRates.push_back(16000);
devinfo.availableSampleRates.push_back(20000);
devinfo.availableSampleRates.push_back(22050);
devinfo.availableSampleRates.push_back(24000);
devinfo.availableSampleRates.push_back(32000);
devinfo.availableSampleRates.push_back(44056);
devinfo.availableSampleRates.push_back(44100);
devinfo.availableSampleRates.push_back(47250);
devinfo.availableSampleRates.push_back(48000);
devinfo.availableSampleRates.push_back(50000);
devinfo.availableSampleRates.push_back(50400);
devinfo.availableSampleRates.push_back(51000);
devinfo.prefSampleRateIndex = 11;
devinfo.availableFramesPerBlock.push_back(512);
devinfo.availableFramesPerBlock.push_back(1024);
devinfo.availableFramesPerBlock.push_back(2048);
devinfo.availableFramesPerBlock.push_back(4096);
devinfo.availableFramesPerBlock.push_back(8192);
devinfo.prefFramesPerBlockIndex = 2;
devinfo.availableInputRanges = {1.0, 10.0};
devinfo.prefInputRangeIndex = 0;
devinfo.ninchannels = 4;
devinfo.noutchannels = 1;
devinfo.hasInputIEPE = true;
devinfo.hasInputACCouplingSwitch = true;
devinfo.hasInputTrigger = true;
// Finally, this devinfo is pushed back in list
devinfolist.push_back(devinfo);
}
}
feTRACE(15);
}