Backend ready for some testing

This commit is contained in:
Anne de Jong 2022-06-13 21:30:02 +02:00
parent 7eaaf43653
commit 3b3bd6d83d
6 changed files with 297 additions and 228 deletions

View File

@ -11,17 +11,6 @@ DEBUGTRACE_VARIABLES;
#endif
using std::runtime_error;
DaqData::DaqData(const us nchannels, const us nframes,
const DataTypeDescriptor::DataType dtype)
: nchannels(nchannels), nframes(nframes), dtype(dtype),
dtype_descr(dtype_map.at(dtype)) {
static_assert(sizeof(char) == 1, "Invalid char size");
const DataTypeDescriptor &desc = dtype_map.at(dtype);
_data.resize(nframes * nchannels * desc.sw);
}
std::unique_ptr<Daq> Daq::createDaq(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
DEBUGTRACE_ENTER;

View File

@ -1,50 +1,22 @@
#pragma once
#include <memory>
#include "lasp_config.h"
#include "lasp_daqconfig.h"
#include "lasp_daqdata.h"
#include "lasp_deviceinfo.h"
#include "lasp_types.h"
#include <functional>
#include <gsl/gsl-lite.hpp>
#include <memory>
#include <optional>
/**
* @brief Data coming from / going to DAQ. **Non-interleaved format**, which means
* data in buffer is ordered by channel: _ptr[sample+channel*nframes]
*/
class DaqData {
protected:
/**
* @brief Storage for actual data.
*/
std::vector<int8_t> _data;
public:
const us nchannels;
const us nframes;
const DataTypeDescriptor::DataType dtype;
const DataTypeDescriptor dtype_descr;
DaqData(const us nchannels, const us nframes,
const DataTypeDescriptor::DataType dtype_descr);
virtual ~DaqData() = default;
/**
* @brief Return reference to internal vector
*
* @return Reference to vector of data storage.
*/
std::vector<int8_t> &raw_vec() { return _data; }
us size_bytes() const { return _data.size(); }
};
/**
* @brief Callback of DAQ. Called with arguments of a vector of data
* spans for each channel, and a datatype descriptor. Callback should return
* @brief Callback of DAQ for input data. Callback should return
* false for a stop request.
*/
using DaqCallback =
std::function<bool(std::unique_ptr<DaqData>)>;
using InDaqCallback = std::function<bool(const DaqData&)>;
/**
* @brief
*/
using OutDaqCallback = std::function<bool(DaqData&)>;
/**
* @brief Base cass for all DAQ instances
@ -75,8 +47,8 @@ public:
* required, the return value of the function should be nullptr. If no input
* data is presented, the function is called with a nullptr as argument.
*/
virtual void start(std::optional<DaqCallback> inCallback,
std::optional<DaqCallback> outCallback);
virtual void start(std::optional<InDaqCallback> inCallback,
std::optional<OutDaqCallback> outCallback);
/**
* @brief Stop the Daq device. Throws an exception if the device is not

View File

@ -0,0 +1,31 @@
#include "lasp_daqdata.h"
#include "debugtrace.hpp"
#include <cassert>
DEBUGTRACE_VARIABLES;
DaqData::DaqData(const us nchannels, const us nframes,
const DataTypeDescriptor::DataType dtype)
: nchannels(nchannels), nframes(nframes), dtype(dtype),
dtype_descr(dtype_map.at(dtype)),
sw(dtype_descr.sw) {
static_assert(sizeof(char) == 1, "Invalid char size");
const DataTypeDescriptor &desc = dtype_map.at(dtype);
_data.resize(nframes * nchannels * desc.sw);
}
void DaqData::copyInFromRaw(const std::vector<uint8_t *> &ptrs) {
us ch = 0;
assert(ptrs.size() == nchannels);
for (auto ptr : ptrs) {
std::copy(ptr, ptr + sw * nframes, &_data[sw * ch * nframes]);
ch++;
}
}
void DaqData::copyToRaw(const us channel,uint8_t *ptr) {
std::copy(&_data[sw * nframes * channel],
&_data[sw * nframes * (channel + 1)], ptr);
}

View File

@ -0,0 +1,88 @@
#pragma once
#include "lasp_daqconfig.h"
#include "lasp_types.h"
#include <functional>
#include <gsl/gsl-lite.hpp>
#include <memory>
#include <optional>
/**
* @brief Data coming from / going to DAQ. **Non-interleaved format**, which
* means data in buffer is ordered by channel: _ptr[sample+channel*nframes]
*/
class DaqData {
protected:
/**
* @brief Storage for the actual data.
*/
std::vector<int8_t> _data;
public:
/**
* @brief The number of channels
*/
const us nchannels;
/**
* @brief The number of frames in this block of data.
*/
const us nframes;
const DataTypeDescriptor::DataType dtype;
const DataTypeDescriptor &dtype_descr;
/**
* @brief The number of bytes per sample (sample width, sw)
*/
const us sw;
DaqData(const us nchannels, const us nframes,
const DataTypeDescriptor::DataType dtype);
virtual ~DaqData() = default;
/**
* @brief Return reference to internal vector
*
* @return Reference to vector of data storage.
*/
int8_t *raw_ptr() { return _data.data(); }
/**
* @brief Return the total number of bytes
*
* @return Number of bytes of data.
*/
us size_bytes() const { return _data.size(); }
/**
* @brief Copy data from a set of raw pointers of *uninterleaved* data.
* Overwrites any existing available data.
*
* @param ptrs Pointers to data from channels
*/
void copyInFromRaw(const std::vector<uint8_t *> &ptrs);
/**
* @brief Copy contents of DaqData for a certain channel to a raw pointer.
*
* @param channel The channel to copy.
* @param ptr The pointer where data is copied to.
*/
void copyToRaw(const us channel, uint8_t *ptr);
};
template <typename T> class TypedDaqData : public DaqData {
public:
TypedDaqData(const us nchannels, const us nframes,
const DataTypeDescriptor::DataType dtype_descr)
: DaqData(nchannels, nframes, dtype_descr) {}
T &operator[](const us i) { return _data[this->sw * i]; }
T &operator()(const us sample, const us channel) {
return reinterpret_cast<T &>(_data[sw * (sample + channel * nframes)]);
}
void copyToRaw(const us channel, T *ptr) {
DaqData::copyToRaw(channel, reinterpret_cast<uint8_t *>(ptr));
}
};

View File

@ -116,6 +116,9 @@ class RtAudioDaq : public Daq {
RtAudioDaq(const RtAudioDaq &) = delete;
RtAudioDaq &operator=(const RtAudioDaq &) = delete;
InDaqCallback _incallback;
OutDaqCallback _outcallback;
public:
RtAudioDaq(const DeviceInfo &devinfo, const DaqConfiguration &config)
: Daq(devinfo, config),
@ -155,7 +158,7 @@ public:
}
RtAudio::StreamOptions streamoptions;
streamoptions.flags = RTAUDIO_HOG_DEVICE;
streamoptions.flags = RTAUDIO_HOG_DEVICE | RTAUDIO_NONINTERLEAVED;
streamoptions.numberOfBuffers = 2;
streamoptions.streamName = "RtAudio stream";
@ -196,148 +199,130 @@ public:
&myerrorcallback);
}
virtual void start(std::optional<InDaqCallback> inCallback,
std::optional<OutDaqCallback> outCallback) override {
assert(!monitorOutput);
if (isRunning()) {
throw runtime_error("Stream already running");
}
// Logical XOR
if (!inCallback != !outCallback) {
throw runtime_error("Either input or output stream possible for RtAudio. "
"Stream duplex mode not provided.");
}
if (inCallback) {
_incallback = *inCallback;
if (!neninchannels()) {
throw runtime_error(
"Input callback given, but stream does not provide input data");
}
}
if (outCallback) {
_outcallback = *outCallback;
if (!nenoutchannels()) {
throw runtime_error(
"Output callback given, but stream does not provide output data");
}
}
rtaudio.startStream();
}
bool isRunning() const override { return (rtaudio.isStreamRunning()); }
void stop() override {
if (!isRunning()) {
/* cerr << "Stream is already stopped" << endl; */
} else {
rtaudio.stopStream();
}
}
int streamCallback(void *outputBuffer, void *inputBuffer,
unsigned int nFrames, double streamTime,
RtAudioStreamStatus status) {
auto dtype = dataType();
us neninchannels_inc_mon = neninchannels();
const auto &dtype_descr = DataTypeDescriptor();
const auto dtype = dataType();
us neninchannels = this->neninchannels();
us nenoutchannels = this->nenoutchannels();
us bytesperchan = dtype_map.at(dtype).sw * nFrames;
us monitorOffset = ((us)monitorOutput) * bytesperchan;
us sw = dtype_descr.sw;
if (nFrames != nFramesPerBlock) {
cerr << "RtAudio backend error: nFrames does not match block size!"
<< endl;
return 1;
}
if (inputBuffer) {
u_int8_t *inbuffercpy =
(u_int8_t *)malloc(bytesperchan * neninchannels_inc_mon);
if (inputBuffer) {
us j = 0; // OUR buffer channel counter
us i = 0; // RtAudio channel counter
for (int ch = getLowestInChannel(); ch <= getHighestInChannel(); ch++) {
if (eninchannels[ch]) {
memcpy(&(inbuffercpy[monitorOffset + j * bytesperchan]),
&(inputBuffer[i * bytesperchan]), bytesperchan);
j++;
}
i++;
std::vector<uint8_t *> ptrs;
ptrs.reserve(neninchannels);
/* DaqData(neninchannels_inc_mon, nFramesPerBlock, dtype); */
for (int ch = getLowestInChannel(); ch <= getHighestInChannel(); ch++) {
if (eninchannels[ch]) {
ptrs.push_back(&static_cast<uint8_t *>(
inputBuffer)[sw * ninchannels * ch * nFramesPerBlock]);
}
}
DaqData d{neninchannels, nFramesPerBlock, dtype};
d.copyInFromRaw(ptrs);
bool ret = _incallback(d);
if (!ret) {
return 1;
}
}
if (outputBuffer) {
if (!outqueue->empty()) {
u_int8_t *outbuffercpy = (u_int8_t *)outqueue->dequeue();
us j = 0; // OUR buffer channel counter
us i = 0; // RtAudio channel counter
for (us ch = 0; ch <= daq->getHighestOutChannel(); ch++) {
/* cerr << "Copying from queue... " << endl; */
if (enoutchannels[ch]) {
memcpy(&(outputBuffer[i * bytesperchan]),
&(outbuffercpy[j * bytesperchan]), bytesperchan);
j++;
} else {
/* cerr << "unused output channel in list" << endl; */
memset(&(outputBuffer[i * bytesperchan]), 0, bytesperchan);
}
i++;
std::vector<uint8_t *> ptrs;
ptrs.reserve(neninchannels);
DaqData data(nenoutchannels, nFramesPerBlock, dtype);
/* outCallback */
for (int ch = 0; ch <= getHighestOutChannel(); ch++) {
if (enoutchannels[ch]) {
ptrs.push_back(&static_cast<uint8_t *>(
outputBuffer)[sw * nenoutchannels * ch * nFramesPerBlock]);
}
if (!monitorOutput) {
free(outbuffercpy);
} else {
assert(outDelayqueue);
outDelayqueue->enqueue((void *)outbuffercpy);
}
} else {
cerr << "RtAudio backend: stream output buffer underflow!" << endl;
}
DaqData d{nenoutchannels, nFramesPerBlock, dtype};
bool ret = _outcallback(d);
if (!ret) {
return 1;
}
us j = 0;
for (auto ptr : ptrs) {
d.copyToRaw(j, ptr);
j++;
}
}
return 0;
}
~RtAudioDaq() {
if (isRunning()) {
stop();
}
if (rtaudio.isStreamOpen()) {
rtaudio.closeStream();
}
}
};
std::unique_ptr<Daq> createRtAudioDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
return std::make_unique<RtAudioDaq>(devinfo, config);
}
virtual void
start(std::optional<DaqCallback> inCallback,
std::optional<DaqCallback> outCallback) {
assert(!monitorOutput);
if (isRunning()) {
throw runtime_error("Stream already running");
}
if (neninchannels(false) > 0 && !inqueue) {
throw runtime_error("inqueue argument not given");
}
if (nenoutchannels() > 0 && !outqueue) {
throw runtime_error("outqueue argument not given");
}
assert(rtaudio);
rtaudio->startStream();
void myerrorcallback(RtAudioError::Type, const string &errorText) {
cerr << errorText << endl;
}
int mycallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames,
double streamTime, RtAudioStreamStatus status, void *userData) {
void stop() {
if (!isRunning()) {
cerr << "Stream is already stopped" << endl;
} else {
assert(rtaudio);
rtaudio->stopStream();
}
if (inqueue) {
inqueue = nullptr;
}
if (outqueue) {
outqueue = nullptr;
}
if (outDelayqueue) {
delete outDelayqueue;
outDelayqueue = nullptr;
}
return static_cast<RtAudioDaq *>(userData)->streamCallback(
outputBuffer, inputBuffer, nFrames, streamTime, status);
}
bool isRunning() const { return (rtaudio && rtaudio->isStreamRunning()); }
~RtAudioDaq() {
assert(rtaudio);
if (isRunning()) {
stop();
}
if (rtaudio->isStreamOpen()) {
rtaudio->closeStream();
}
}
}
;
Daq *createRtAudioDevice(const DeviceInfo &devinfo,
const DaqConfiguration &config) {
AudioDaq *daq = NULL;
try {
daq = new AudioDaq(devinfo, config);
} catch (runtime_error &e) {
if (daq)
delete daq;
throw;
}
return daq;
}
int mycallback(void *outputBuffervoid, void *inputBuffervoid,
unsigned int nFrames, double streamTime,
RtAudioStreamStatus status, void *userData) {
void myerrorcallback(RtAudioError::Type, const string &errorText) {
cerr << errorText << endl;
}
int mycallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames,
double streamTime, RtAudioStreamStatus status,
void *userData) {
return static_cast<RtAudioDaq *>(userData)->streamCallback(
outputBuffer, inputBuffer, nFrames, streamTime, status);
}
#endif // LASP_HAS_RTAUDIO == 1

View File

@ -59,8 +59,8 @@ class DT9837A : public Daq {
const us _nFramesPerBlock;
void threadFcn(std::optional<DaqCallback> inCallback,
std::optional<DaqCallback> outcallback);
void threadFcn(std::optional<InDaqCallback> inCallback,
std::optional<OutDaqCallback> outcallback);
public:
DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config);
@ -80,8 +80,8 @@ public:
}
bool isRunning() const override final { return _thread.joinable(); }
virtual void start(std::optional<DaqCallback> inCallback,
std::optional<DaqCallback> outCallback) override final;
virtual void start(std::optional<InDaqCallback> inCallback,
std::optional<OutDaqCallback> outCallback) override final;
void stop() override final {
DEBUGTRACE_ENTER;
@ -99,8 +99,8 @@ public:
friend class OutBufHandler;
};
void DT9837A::start(std::optional<DaqCallback> inCallback,
std::optional<DaqCallback> outCallback) {
void DT9837A::start(std::optional<InDaqCallback> inCallback,
std::optional<OutDaqCallback> outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw runtime_error("DAQ is already running");
@ -122,7 +122,6 @@ protected:
DaqDeviceHandle _handle;
const DataTypeDescriptor dtype_descr;
us nchannels, nFramesPerBlock;
DaqCallback cb;
double samplerate;
dvec buf;
bool topenqueued, botenqueued;
@ -132,10 +131,10 @@ protected:
public:
BufHandler(DaqDeviceHandle handle, const DataTypeDescriptor dtype_descr,
const us nchannels, const us nFramesPerBlock, DaqCallback cb,
const us nchannels, const us nFramesPerBlock,
const double samplerate)
: _handle(handle), dtype_descr(dtype_descr), nchannels(nchannels),
nFramesPerBlock(nFramesPerBlock), cb(cb), samplerate(samplerate),
nFramesPerBlock(nFramesPerBlock), samplerate(samplerate),
buf(2 * nchannels *
nFramesPerBlock, // Watch the two here, the top and the bottom!
0),
@ -145,11 +144,13 @@ public:
};
class InBufHandler : public BufHandler {
bool monitorOutput;
InDaqCallback cb;
public:
InBufHandler(DT9837A &daq, DaqCallback cb)
InBufHandler(DT9837A &daq, InDaqCallback cb)
: BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(),
daq._nFramesPerBlock, cb, daq.samplerate())
daq._nFramesPerBlock, daq.samplerate()),
cb(cb)
{
DEBUGTRACE_ENTER;
@ -210,31 +211,29 @@ public:
bool operator()() {
bool ret = true;
auto runCallback = ([&](us totalOffset) {
us monitoroffset = monitorOutput ? 1 : 0;
DaqData data(nchannels, nFramesPerBlock,
DataTypeDescriptor::DataType::dtype_fl64);
us ch_no = 0;
TypedDaqData<double> data(nchannels, nFramesPerBlock,
DataTypeDescriptor::DataType::dtype_fl64);
if (monitorOutput) {
reinterpret_cast<uint8_t *>(
&buf[totalOffset + (nchannels - 1) * nFramesPerBlock]),
nFramesPerBlock * sizeof(double));
for (us sample = 0; sample < nFramesPerBlock; sample++) {
data(0, sample) =
buf[totalOffset + (sample * nchannels) + (nchannels - 1)];
}
}
/* if(mon */
/* for (us channel = monitoroffset; channel < (nchannels - monitoroffset);
*/
/* channel++) { */
/* cv[channel] = */
/* gsl::span(reinterpret_cast<uint8_t *>( */
/* &buf[totalOffset + channel * nFramesPerBlock]), */
/* nFramesPerBlock * sizeof(double)); */
/* } */
/* cv[0] = gsl::span( */
/* cb(cv, dtype_descr); */
for (us channel = monitoroffset; channel < (nchannels - monitoroffset);
channel++) {
for (us sample = 0; sample < nFramesPerBlock; sample++) {
data(channel, sample) =
buf[totalOffset + (sample * nchannels) + channel];
}
}
return cb(data);
});
ScanStatus status;
@ -243,7 +242,7 @@ public:
UlError err = ulDaqInScanStatus(_handle, &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
return;
return false;
}
increment = transferStatus.currentTotalCount - totalFramesCount;
@ -251,23 +250,24 @@ public:
if (increment > nFramesPerBlock) {
cerr << "Error: overrun for input of DAQ!" << endl;
return;
return false;
}
assert(status == SS_RUNNING);
if (transferStatus.currentIndex < (long long)buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
runCallback(nchannels * nFramesPerBlock);
ret = runCallback(nchannels * nFramesPerBlock);
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
runCallback(0);
ret = runCallback(0);
topenqueued = true;
}
}
return ret;
}
~InBufHandler() {
// At exit of the function, stop scanning.
@ -280,10 +280,13 @@ public:
};
class OutBufHandler : public BufHandler {
OutDaqCallback cb;
public:
OutBufHandler(DT9837A &daq, DaqCallback cb)
OutBufHandler(DT9837A &daq, OutDaqCallback cb)
: BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(),
daq._nFramesPerBlock, cb, daq.samplerate()) {
daq._nFramesPerBlock, daq.samplerate()),
cb(cb) {
DEBUGTRACE_MESSAGE("Starting output scan");
AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT;
@ -298,14 +301,10 @@ public:
}
botenqueued = false, topenqueued = true;
// Run callback to first fill top part
ChannelView cv{gsl::span(reinterpret_cast<uint8_t *>(&buf[0]),
nFramesPerBlock * sizeof(double))};
cb(cv, dtype_descr);
}
void operator()() {
bool operator()() {
bool res = true;
assert(_handle != 0);
UlError err = ERR_NO_ERROR;
@ -316,40 +315,41 @@ public:
err = ulAOutScanStatus(_handle, &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
return;
return false;
}
if (status != SS_RUNNING) {
return;
return false;
}
increment = transferStatus.currentTotalCount - totalFramesCount;
totalFramesCount += increment;
if (increment > nFramesPerBlock) {
cerr << "Error: underrun for output of DAQ!" << endl;
return;
return false;
}
if (transferStatus.currentIndex < buffer_mid_idx) {
topenqueued = false;
if (!botenqueued) {
TypedDaqData<double> d(1, nFramesPerBlock,
DataTypeDescriptor::DataType::dtype_fl64);
res = cb(d);
d.copyToRaw(0, &buf[buffer_mid_idx]);
ChannelView cv{
gsl::span(reinterpret_cast<uint8_t *>(&buf[buffer_mid_idx]),
nFramesPerBlock * sizeof(double))};
cb(cv, dtype_descr);
botenqueued = true;
}
} else {
botenqueued = false;
if (!topenqueued) {
ChannelView cv{gsl::span(reinterpret_cast<uint8_t *>(&buf[0]),
nFramesPerBlock * sizeof(double))};
cb(cv, dtype_descr);
TypedDaqData<double> d(1, nFramesPerBlock,
DataTypeDescriptor::DataType::dtype_fl64);
res = cb(d);
d.copyToRaw(0, buf.data());
topenqueued = true;
}
}
return res;
}
~OutBufHandler() {
@ -361,8 +361,8 @@ public:
}
};
void DT9837A::threadFcn(std::optional<DaqCallback> inCallback,
std::optional<DaqCallback> outCallback) {
void DT9837A::threadFcn(std::optional<InDaqCallback> inCallback,
std::optional<OutDaqCallback> outCallback) {
DEBUGTRACE_ENTER;
std::unique_ptr<InBufHandler> ibh;
@ -381,10 +381,14 @@ void DT9837A::threadFcn(std::optional<DaqCallback> inCallback,
while (!_stopThread) {
if (ibh) {
(*ibh)();
if (!(*ibh)()) {
_stopThread = true;
}
}
if (obh) {
(*obh)();
if (!(*obh)()) {
_stopThread = true;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
}