Added Peak Programme Meter, added functionality to clone SeriesBiquads without copying state. Improved recording callback mechanism. Added reset() function for IndataHandlers, which send a pointer to DAQ instance.

This commit is contained in:
Anne de Jong 2022-10-04 09:27:27 +02:00
parent bb26fc6bcc
commit 5f1a207104
19 changed files with 541 additions and 165 deletions

View File

@ -10,10 +10,37 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"id": "ac06df04",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"make: Entering directory '/home/anne/wip/mycode/lasp'\n",
"make[1]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_dsp_lib\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"[ 42%] Built target lasp_dsp_lib\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_device_lib\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"[ 67%] Built target lasp_device_lib\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_cpp\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"[ 71%] \u001b[32mBuilding CXX object src/lasp/CMakeFiles/lasp_cpp.dir/pybind11/lasp_dsp_pybind.cpp.o\u001b[0m\n",
"[ 75%] \u001b[32m\u001b[1mLinking CXX shared module lasp_cpp.cpython-310-x86_64-linux-gnu.so\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"[100%] Built target lasp_cpp\n",
"make[1]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"make: Leaving directory '/home/anne/wip/mycode/lasp'\n"
]
}
],
"source": [
"# !make -j -C ~/wip/mycode/lasp"
]

View File

@ -1,16 +1,15 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include <cassert>
#include "lasp_daqdata.h"
#include "debugtrace.hpp"
#include "lasp_mathtypes.h"
#include <cassert>
DEBUGTRACE_VARIABLES;
DaqData::DaqData(const us nchannels, const us nframes,
const DataTypeDescriptor::DataType dtype)
: nchannels(nchannels), nframes(nframes), dtype(dtype),
dtype_descr(dtype_map.at(dtype)),
sw(dtype_descr.sw) {
dtype_descr(dtype_map.at(dtype)), sw(dtype_descr.sw) {
static_assert(sizeof(char) == 1, "Invalid char size");
const DataTypeDescriptor &desc = dtype_map.at(dtype);
@ -30,3 +29,78 @@ void DaqData::copyToRaw(const us channel,uint8_t *ptr) {
std::copy(&_data[sw * nframes * channel],
&_data[sw * nframes * (channel + 1)], ptr);
}
template <typename int_type> d convertToFloat(const int_type val) {
if constexpr (std::is_integral<int_type>::value) {
return static_cast<d>(val) / std::numeric_limits<int_type>::max();
} else {
return static_cast<d>(val);
}
}
template <typename int_type>
inline vd channelToFloat(const byte_t *vals, const us nframes) {
vd res(nframes);
for (us i = 0; i < nframes; i++) {
res(i) = convertToFloat(reinterpret_cast<const int_type *>(vals)[i]);
}
return res;
}
template <typename int_type>
inline dmat allToFloat(const byte_t *vals, const us nframes, const us nchannels) {
dmat res(nframes, nchannels);
for (us j = 0; j < nchannels; j++) {
for (us i = 0; i < nframes; i++) {
res(i, j) = convertToFloat(
reinterpret_cast<const int_type *>(vals)[i + j * nframes]);
}
}
return res;
}
vd DaqData::toFloat(const us channel_no) const {
using DataType = DataTypeDescriptor::DataType;
switch (dtype) {
case (DataType::dtype_int8): {
return channelToFloat<int8_t>(raw_ptr(0, channel_no), nframes);
} break;
case (DataType::dtype_int16): {
return channelToFloat<int16_t>(raw_ptr(0, channel_no), nframes);
} break;
case (DataType::dtype_int32): {
return channelToFloat<int32_t>(raw_ptr(0, channel_no), nframes);
} break;
case (DataType::dtype_fl32): {
return channelToFloat<float>(raw_ptr(0, channel_no), nframes);
} break;
case (DataType::dtype_fl64): {
return channelToFloat<double>(raw_ptr(0, channel_no), nframes);
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
}
dmat DaqData::toFloat() const {
dmat result(nframes, nchannels);
using DataType = DataTypeDescriptor::DataType;
switch (dtype) {
case (DataType::dtype_int8): {
return allToFloat<int8_t>(raw_ptr(0), nframes, nchannels);
} break;
case (DataType::dtype_int16): {
return allToFloat<int16_t>(raw_ptr(0), nframes, nchannels);
} break;
case (DataType::dtype_int32): {
return allToFloat<int32_t>(raw_ptr(0), nframes, nchannels);
} break;
case (DataType::dtype_fl32): {
return allToFloat<float>(raw_ptr(0), nframes, nchannels);
} break;
case (DataType::dtype_fl64): {
return allToFloat<double>(raw_ptr(0), nframes, nchannels);
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
}

View File

@ -4,10 +4,14 @@
#include <functional>
#include <gsl/gsl-lite.hpp>
#include <memory>
#include <armadillo>
/** \addtogroup device
* @{
*/
class Daq;
using byte_t = char;
/**
* @brief Data coming from / going to DAQ. **Non-interleaved format**, which
@ -18,7 +22,7 @@ protected:
/**
* @brief Storage for the actual data.
*/
std::vector<int8_t> _data;
std::vector<byte_t> _data;
public:
/**
@ -58,7 +62,8 @@ public:
/**
* @brief Initialize using no allocation
*/
DaqData() : DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {}
DaqData()
: DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {}
virtual ~DaqData() = default;
DaqData &operator=(const DaqData &) = default;
@ -70,7 +75,10 @@ public:
*
* @return Pointer to sample, not casted to final type
*/
int8_t *raw_ptr(const us frame = 0, const us channel = 0) {
byte_t *raw_ptr(const us frame = 0, const us channel = 0) {
return &(_data.data()[sw * (frame + channel * nframes)]);
}
const byte_t *raw_ptr(const us frame = 0, const us channel = 0) const {
return &(_data.data()[sw * (frame + channel * nframes)]);
}
@ -102,6 +110,26 @@ public:
* @param ptr The pointer where data is copied to.
*/
void copyToRaw(const us channel, uint8_t *ptr);
/**
* @brief Convert samples to floating point values and return a nframes x
* nchannels array of floats. For data that is not already floating-point,
* the data is scaled back from MAX_INT to +1.0.
*
* @return Array of floats
*/
arma::Mat<d> toFloat() const;
/**
* @brief Convert samples to floating point values and return a nframes
* column vector of floats. For data that is not already floating-point,
* the data is scaled back from MAX_INT to +1.0.
*
* @param channel The channel to convert
*
* @return Array of floats
*/
arma::Col<d> toFloat(const us channel_no) const;
};
template <typename T> class TypedDaqData : public DaqData {
@ -135,5 +163,6 @@ public:
void copyToRaw(const us channel, T *buffer) {
DaqData::copyToRaw(channel, reinterpret_cast<uint8_t *>(buffer));
}
};
/** @} */

View File

@ -275,6 +275,9 @@ void StreamMgr::startStream(const DaqConfiguration &config) {
if (isInput) {
_inputStream = std::move(daq);
for(auto& handler: _inDataHandlers) {
handler->reset(_inputStream.get());
}
if (_inputStream->duplexMode()) {
assert(!_outputStream);
}
@ -291,7 +294,12 @@ void StreamMgr::stopStream(const StreamType t) {
if (!_inputStream) {
throw rte("Input stream is not running");
}
/// Kills input stream
_inputStream = nullptr;
/// Send reset to all in data handlers
for(auto& handler: _inDataHandlers) {
handler->reset(nullptr);
}
} break;
case (StreamType::output): {
if (_inputStream && _inputStream->duplexMode()) {
@ -313,6 +321,9 @@ void StreamMgr::addInDataHandler(InDataHandler &handler) {
checkRightThread();
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
_inDataHandlers.push_back(&handler);
if(_inputStream) {
handler.reset(_inputStream.get());
}
}
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
checkRightThread();

View File

@ -39,6 +39,14 @@ class InDataHandler {
*/
virtual bool inCallback(const DaqData &daqdata) = 0;
/**
* @brief Reset in-data handler.
*
* @param daq New DAQ configuration of inCallback(). If nullptr is given,
* it means that the stream is stopped.
*/
virtual void reset(const Daq* daq = nullptr) = 0;
/**
* @brief This function should be called from the constructor of the
* implementation of InDataHandler. It will start the stream's calling of

View File

@ -60,10 +60,10 @@ class DT9837A : public Daq {
const us _nFramesPerBlock;
void threadFcn(InDaqCallback inCallback,
OutDaqCallback outcallback);
void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback);
public:
DaqDeviceHandle getHandle() const { return _handle; }
DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config);
~DT9837A() {
@ -108,8 +108,7 @@ public:
friend class OutBufHandler;
};
void DT9837A::start(InDaqCallback inCallback,
OutDaqCallback outCallback) {
void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
if (isRunning()) {
throw runtime_error("DAQ is already running");
@ -128,7 +127,7 @@ void DT9837A::start(InDaqCallback inCallback,
class BufHandler {
protected:
DaqDeviceHandle _handle;
DT9837A &daq;
const DataTypeDescriptor dtype_descr;
us nchannels, nFramesPerBlock;
double samplerate;
@ -139,11 +138,9 @@ protected:
long long buffer_mid_idx;
public:
BufHandler(DaqDeviceHandle handle, const DataTypeDescriptor dtype_descr,
const us nchannels, const us nFramesPerBlock,
const double samplerate)
: _handle(handle), dtype_descr(dtype_descr), nchannels(nchannels),
nFramesPerBlock(nFramesPerBlock), samplerate(samplerate),
BufHandler(DT9837A &daq, const us nchannels)
: daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels),
nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()),
buf(2 * nchannels *
nFramesPerBlock, // Watch the two here, the top and the bottom!
0),
@ -157,9 +154,7 @@ class InBufHandler : public BufHandler {
public:
InBufHandler(DT9837A &daq, InDaqCallback cb)
: BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(),
daq._nFramesPerBlock, daq.samplerate()),
cb(cb)
: BufHandler(daq, daq.neninchannels()), cb(cb)
{
DEBUGTRACE_ENTER;
@ -206,7 +201,7 @@ public:
}
assert(indescs.size() == nchannels);
DEBUGTRACE_MESSAGE("Starting input scan");
err = ulDaqInScan(_handle, indescs.data(), nchannels,
err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, inscanflags, buf.data());
if (err != ERR_NO_ERROR) {
@ -248,7 +243,7 @@ public:
ScanStatus status;
TransferStatus transferStatus;
UlError err = ulDaqInScanStatus(_handle, &status, &transferStatus);
UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
return false;
@ -281,7 +276,7 @@ public:
~InBufHandler() {
// At exit of the function, stop scanning.
DEBUGTRACE_ENTER;
UlError err = ulDaqInScanStop(_handle);
UlError err = ulDaqInScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
@ -293,15 +288,13 @@ class OutBufHandler : public BufHandler {
public:
OutBufHandler(DT9837A &daq, OutDaqCallback cb)
: BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(),
daq._nFramesPerBlock, daq.samplerate()),
cb(cb) {
: BufHandler(daq, daq.nenoutchannels()), cb(cb) {
DEBUGTRACE_MESSAGE("Starting output scan");
AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT;
ScanOption scanoptions = SO_CONTINUOUS;
UlError err =
ulAOutScan(_handle, 0, 0, BIP10VOLTS,
ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS,
2 * nFramesPerBlock, // Watch the 2 here!
&samplerate, scanoptions, outscanflags, buf.data());
if (err != ERR_NO_ERROR) {
@ -321,7 +314,7 @@ public:
ScanStatus status;
TransferStatus transferStatus;
err = ulAOutScanStatus(_handle, &status, &transferStatus);
err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus);
if (err != ERR_NO_ERROR) {
showErr(err);
return false;
@ -363,15 +356,14 @@ public:
~OutBufHandler() {
DEBUGTRACE_ENTER;
UlError err = ulAOutScanStop(_handle);
UlError err = ulAOutScanStop(daq.getHandle());
if (err != ERR_NO_ERROR) {
showErr(err);
}
}
};
void DT9837A::threadFcn(InDaqCallback inCallback,
OutDaqCallback outCallback) {
void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) {
DEBUGTRACE_ENTER;
@ -501,7 +493,8 @@ DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config)
throw runtime_error("Fatal: could not set AC/DC coupling mode");
}
IepeMode iepe = inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
IepeMode iepe =
inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED;
err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe);
if (err != ERR_NO_ERROR) {
showErr(err);

View File

@ -12,6 +12,7 @@ set(lasp_dsp_files
lasp_timebuffer.cpp
lasp_slm.cpp
lasp_threadedindatahandler.cpp
lasp_ppm.cpp
)

View File

@ -39,6 +39,10 @@ SeriesBiquad::SeriesBiquad(const vd &filter_coefs) {
"Filter coefficients should have fourth element (a0) equal to 1.0");
}
}
std::unique_ptr<Filter> SeriesBiquad::clone() const {
// sos.as_col() concatenates all columns, exactly what we want.
return std::make_unique<SeriesBiquad>(sos.as_col());
}
void SeriesBiquad::reset() {
DEBUGTRACE_ENTER;
state.zeros();
@ -129,3 +133,6 @@ void BiquadBank::reset() {
f.reset();
}
}
std::unique_ptr<Filter> BiquadBank::clone() const {
return std::make_unique<BiquadBank>(_filters, _gains);
}

View File

@ -38,6 +38,7 @@ public:
virtual void filter(vd &inout) override final;
virtual ~SeriesBiquad() override {}
void reset() override final;
std::unique_ptr<Filter> clone() const override final;
};
/**
@ -49,6 +50,7 @@ class BiquadBank : public Filter {
std::vector<SeriesBiquad> _filters;
vd _gains;
public:
/**
* @brief Initialize biquadbank.
@ -60,6 +62,15 @@ public:
*/
BiquadBank(const dmat &filters, const vd *gains = nullptr);
/**
* @brief Construct biquad bank from already given set of series biquad filters and gain values
*
* @param filters The filters to set
* @param gains The gain values for each filter
*/
BiquadBank(std::vector<SeriesBiquad> filters,vd gains):
_filters(std::move(filters)),_gains(std::move(gains)) {}
/**
* @brief Set new gain values for each filter in the BiquadBank
*
@ -78,6 +89,7 @@ public:
virtual void filter(vd &inout) override final;
void reset() override final;
std::unique_ptr<Filter> clone() const override final;
};
/** @} */

View File

@ -10,20 +10,9 @@
#include "lasp_mathtypes.h"
/**
* Load wisdom data from a NULL-terminated character string. Note that at this
* point in time this is only used by the FFTW fft backend.
*
* @param[in] wisdom: Wisdom string
* \addtogroup dsp
* @{
*/
void load_fft_wisdom(const char* wisdom);
/**
* Creates a NULL-terminated string containing FFTW wisdom, or state knowledge
* from other libraries.
*
* @returns A NULL-terminated string. *Should be deallocated after being used*
*/
char* store_fft_wisdom(void);
class Fft_impl;
@ -105,3 +94,5 @@ class Fft {
static std::string store_fft_wisdom();
};
/** @} */

View File

@ -1,5 +1,6 @@
#pragma once
#include <vector>
#include <memory>
#include "lasp_types.h"
#include "lasp_mathtypes.h"
/**
@ -19,6 +20,13 @@ public:
* @brief Reset filter state to 0 (history was all-zero).
*/
virtual void reset() = 0;
/**
* @brief Clone a filter, to generate a copy.
*
* @return Copy of filter with state set to zero.
*/
virtual std::unique_ptr<Filter> clone() const = 0;
};

View File

@ -40,7 +40,9 @@
#endif // LASP_DOUBLE_PRECISION
using vd = arma::Col<d>;
using vrd = arma::Row<d>;
using vc = arma::Col<c>;
using vrc = arma::Row<c>;
using dmat = arma::Mat<d>;
using cmat = arma::Mat<c>;

89
src/lasp/dsp/lasp_ppm.cpp Normal file
View File

@ -0,0 +1,89 @@
#define DEBUGTRACE_ENABLED
#include "lasp_ppm.h"
#include "debugtrace.hpp"
#include <mutex>
using std::cerr;
using std::endl;
using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps)
: ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) {
DEBUGTRACE_ENTER;
start();
}
bool PPMHandler::inCallback_threaded(const DaqData &d) {
/* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_mtx);
dmat data = d.toFloat();
const us nchannels = _cur_max.size();
vrd maxabs = arma::max(arma::abs(data));
arma::uvec clip_indices = arma::find(maxabs > clip_point);
arma::uvec clip(nchannels, arma::fill::zeros);
clip.elem(clip_indices).fill(1);
arma::uvec update_max_idx = arma::find(maxabs > _cur_max);
arma::uvec update_max(nchannels, arma::fill::zeros);
update_max.elem(update_max_idx).fill(1);
assert(_cur_max.size() == _clip_time.size());
for (us i = 0; i < nchannels; i++) {
if (clip(i)) {
/// Reset clip counter
_clip_time(i) = 0;
} else if (_clip_time(i) > clip_indication_time) {
/// Reset to 'unclipped'
_clip_time(i) = -1;
} else if(_clip_time(i) >= 0) {
/// Add a bit of clip time
_clip_time(i) += _dt;
}
/* cerr << "maxabs(i)" << maxabs(i) << endl; */
/* cerr << "curmax(i)" << _cur_max(i) << endl; */
if (update_max(i)) {
_cur_max(i) = maxabs(i);
} else {
_cur_max(i) *= _alpha;
}
}
return true;
}
std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const {
std::scoped_lock<std::mutex> lck(_mtx);
return {20 * arma::log10(_cur_max + arma::datum::eps).as_col(),
arma::find(_clip_time >= 1.0)};
/* return {(_cur_max + arma::datum::eps).as_col(), */
/* arma::find(_clip_time >= 1.0)}; */
}
void PPMHandler::reset(const Daq *daq) {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
_cur_max.zeros();
_clip_time.fill(-1);
if (daq) {
_cur_max = vrd(daq->neninchannels(), arma::fill::zeros);
_clip_time = vd(daq->neninchannels(), arma::fill::value(-1));
const d fs = daq->samplerate();
DEBUGTRACE_PRINT(fs);
_dt = daq->framesPerBlock() / fs;
_alpha = std::max<d>(d_pow(10, -_dt*_decay_dBps / (20)), 0);
DEBUGTRACE_PRINT(_alpha);
}
}
PPMHandler::~PPMHandler() {
DEBUGTRACE_ENTER;
stop();
}

84
src/lasp/dsp/lasp_ppm.h Normal file
View File

@ -0,0 +1,84 @@
// lasp_ppm.h
//
// Author: J.A. de Jong - ASCEE
//
// Description: Peak Programme Meter
#pragma once
#include <memory>
#include "lasp_filter.h"
#include "lasp_mathtypes.h"
#include "lasp_threadedindatahandler.h"
/**
* \addtogroup dsp
* @{
*/
/**
* @brief Digital Peak Programme Meter (PPM). Let the latest maximum flow away
* with a certain amount of dB/s. If a new peak is found, it goes up again.
* Also detects clipping.
* */
class PPMHandler: public ThreadedInDataHandler {
/**
* @brief Assuming full scale of a signal is +/- 1.0. If a value is found
*/
static inline const d clip_point = 0.98;
/**
* @brief How long it takes in [s] after a clip event has happened, that we
* are actually still in 'clip' mode.
*/
static inline const d clip_indication_time = 3.0;
const d _decay_dBps;
/**
* @brief Inverse of block sampling frequency [s]: (framesPerBlock/fs)
*/
d _dt;
/**
* @brief 1st order low_pass value of levels, based on _decay_dBps;
*/
d _alpha;
mutable std::mutex _mtx;
/**
* @brief Current maximum values
*/
vrd _cur_max;
/**
* @brief How long ago the last clip has happened. Negative in case no clip
* has happened.
*/
vd _clip_time;
public:
/**
* @brief Constructs Peak Programme Meter
*
* @param mgr Stream Mgr to operate on
* @param decay_dBps The level decay in units dB/s, after a peak has been
* hit.
*/
PPMHandler(StreamMgr& mgr,const d decay_dBps = 20.0);
~PPMHandler();
/**
* @brief Get the current values of the PPM. Returns an array of levels in dB
* and a vector of True's.
*
* @return Current levels in [dB], clipping indication
*/
std::tuple<vd,arma::uvec> getCurrentValue() const;
bool inCallback_threaded(const DaqData& ) override final;
void reset(const Daq*) override final;
};
/** @} */

View File

@ -33,8 +33,16 @@ class ThreadedInDataHandler: public InDataHandler {
*
* @return true, to continue with sampling.
*/
virtual bool inCallback(const DaqData &daqdata) override;
virtual bool inCallback(const DaqData &daqdata) override final;
/**
* @brief This function should be overridden with an actual implementation,
* of what should happen on a different thread.
*
* @param DaqData Input daq data
*
* @return true on succes. False when an error occured.
*/
virtual bool inCallback_threaded(const DaqData&) = 0;
};

View File

@ -3,15 +3,12 @@
"""
Read data from stream and record sound and video at the same time
"""
import dataclasses
import logging
import os
import time
import h5py
import dataclasses, logging, os, time, h5py, threading
import numpy as np
from .lasp_cpp import (InDataHandler, StreamMgr, LASP_VERSION_MAJOR,
LASP_VERSION_MINOR)
from .lasp_atomic import Atomic
from .lasp_cpp import (LASP_VERSION_MAJOR, LASP_VERSION_MINOR, InDataHandler,
StreamMgr)
@dataclasses.dataclass
@ -25,10 +22,15 @@ class Recording:
Class used to perform a recording.
"""
def __init__(self, fn: str, streammgr: StreamMgr,
rectime: float = None, wait: bool = True,
def __init__(
self,
fn: str,
streammgr: StreamMgr,
rectime: float = None,
wait: bool = True,
progressCallback=None,
startDelay: float = 0):
startDelay: float = 0,
):
"""
Start a recording. Blocks if wait is set to True.
@ -44,7 +46,7 @@ class Recording:
startDelay: Optional delay added before the recording is *actually*
started in [s].
"""
ext = '.h5'
ext = ".h5"
if ext not in fn:
fn += ext
@ -70,11 +72,16 @@ class Recording:
# Counter of the number of blocks
self.ablockno = Atomic(0)
# Stop flag, set when recording is finished.
self.stop = Atomic(False)
# Mutex, on who is working with the H5py data
self.file_mtx = threading.Lock()
self.progressCallback = progressCallback
# Open the file
self.f = h5py.File(self.fn, 'w')
f = self.f
self.f = h5py.File(self.fn, "w")
# This flag is used to delete the file on finish(), and can be used
# when a recording is canceled.
@ -84,18 +91,17 @@ class Recording:
streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input)
if not streamstatus.runningOK():
raise RuntimeError(
"Stream is not running properly. Please first start the stream")
"Stream is not running properly. Please first start the stream"
)
self.ad = None
logging.debug('Starting record....')
logging.debug("Starting record....")
self.stop = Atomic(False)
self.indh = InDataHandler(streammgr, self.inCallback)
self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback)
if wait:
logging.debug('Stop recording with CTRL-C')
logging.debug("Stop recording with CTRL-C")
try:
while not self.stop():
time.sleep(0.01)
@ -104,56 +110,66 @@ class Recording:
finally:
self.finish()
def firstFrames(self, data):
def resetCallback(self, daq):
"""
When the first frames arrive, we setup the file and add all metadata,
and make ready for storing data.
Function called with initial stream data.
"""
daq = self.smgr.getDaq(StreamMgr.StreamType.input)
with self.file_mtx:
in_ch = daq.enabledInChannels()
blocksize = daq.framesPerBlock()
self.blocksize = blocksize
# The array data type cannot
# datatype = daq.dataType()
dtype = np.dtype(data.dtype)
self.nchannels = daq.neninchannels()
self.fs = daq.samplerate()
f = self.f
f.attrs['LASP_VERSION_MAJOR'] = LASP_VERSION_MAJOR
f.attrs['LASP_VERSION_MINOR'] = LASP_VERSION_MINOR
f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR
f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR
# Set the bunch of attributes
f.attrs['samplerate'] = daq.samplerate()
f.attrs['nchannels'] = daq.neninchannels()
f.attrs['blocksize'] = blocksize
f.attrs['sensitivity'] = [ch.sensitivity for ch in in_ch]
f.attrs['channelNames'] = [ch.name for ch in in_ch]
# f.attrs['
f.attrs["samplerate"] = daq.samplerate()
f.attrs["nchannels"] = daq.neninchannels()
f.attrs["blocksize"] = blocksize
f.attrs["sensitivity"] = [ch.sensitivity for ch in in_ch]
f.attrs["channelNames"] = [ch.name for ch in in_ch]
# Add the start delay here, as firstFrames() is called right after the
# constructor is called.
f.attrs['time'] = time.time() + self.startDelay
f.attrs["time"] = time.time() + self.startDelay
nchannels = len(in_ch)
self.ad = f.create_dataset('audio',
(1, blocksize, nchannels),
dtype=dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
blocksize,
nchannels),
compression='gzip'
)
self.fs = daq.samplerate()
# In V2, we do not store JSON metadata anymore, but just an enumeration
# index to a physical quantity.
f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch]
# Measured physical quantity metadata
# f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch]
# In V2, we do not store JSON metadata anymore, but just an enumeration
# index to a physical quantity.
f.attrs['qtys_enum_idx'] = [ch.qty.value for ch in in_ch]
def firstFrames(self, adata):
"""
Set up the dataset in which to store the audio data. This will create
the attribute `self.ad`
Args:
adata: Numpy array with data from DAQ
"""
# The array data type cannot
# datatype = daq.dataType()
dtype = np.dtype(adata.dtype)
self.ad = self.f.create_dataset(
"audio",
(1, self.blocksize, self.nchannels),
dtype=dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
self.blocksize,
self.nchannels,
),
compression="gzip",
)
def inCallback(self, adata):
"""
@ -166,6 +182,12 @@ class Recording:
"""
if self.stop():
# Stop flag is raised. We do not add any data anymore.
return
with self.file_mtx:
if self.ad is None:
self.firstFrames(adata)
@ -186,19 +208,23 @@ class Recording:
remove the queue from the stream, etc.
"""
self.stop <<= True
logging.debug('Recording::finish()')
logging.debug("Recording::finish()")
# Remove indata handler
self.stop <<= True
with self.file_mtx:
# Remove indata handler, which also should remove callback function
# from StreamMgr.
self.indh = None
try:
# Close the recording file
self.f.close()
except Exception as e:
logging.error(f'Error closing file: {e}')
logging.error(f"Error closing file: {e}")
logging.debug('Recording ended')
logging.debug("Recording ended")
if self.deleteFile:
self.__deleteFile()
@ -209,7 +235,7 @@ class Recording:
try:
os.remove(self.fn)
except Exception as e:
logging.error(f'Error deleting file: {self.fn}')
logging.error(f"Error deleting file: {self.fn}: {str(e)}")
def __addTimeData(self, indata):
"""
@ -217,10 +243,6 @@ class Recording:
"""
# logging.debug('Recording::__addTimeData()')
if self.stop():
# Stop flag is raised. We stop recording here.
return
curT = self.ablockno() * self.blocksize / self.fs
# Increase the block counter
@ -238,9 +260,7 @@ class Recording:
curT = 0
ablockno = self.ablockno()
recstatus = RecordStatus(
curT=curT,
done=False)
recstatus = RecordStatus(curT=curT, done=False)
if self.progressCallback is not None:
self.progressCallback(recstatus)
@ -248,9 +268,9 @@ class Recording:
curT_rounded_to_seconds = int(curT)
if curT_rounded_to_seconds > self.curT_rounded_to_seconds:
self.curT_rounded_to_seconds = curT_rounded_to_seconds
print(f'{curT_rounded_to_seconds}', end='', flush=True)
print(f"{curT_rounded_to_seconds}", end="", flush=True)
else:
print('.', end='', flush=True)
print(".", end="", flush=True)
if self.rectime is not None and curT > self.rectime:
# We are done!
@ -263,4 +283,3 @@ class Recording:
# Add the data to the file, and resize the audio data blocks
self.ad.resize(ablockno, axis=0)
self.ad[ablockno - 1, :, :] = indata

View File

@ -1,5 +1,7 @@
#define DEBUGTRACE_ENABLED
/* #define DEBUGTRACE_ENABLED */
#include <carma>
#include "debugtrace.hpp"
#include "lasp_ppm.h"
#include "lasp_streammgr.h"
#include "lasp_threadedindatahandler.h"
#include <atomic>
@ -63,17 +65,17 @@ template <typename T> py::array_t<T> getPyArray(const DaqData &d) {
/**
* @brief Wraps the InDataHandler such that it calls a Python callback with a
* buffer of sample data. The Python callback is called from a different
* thread, using a Numpy argument as
* thread, using a Numpy array as argument.
*/
class PyIndataHandler : public ThreadedInDataHandler {
/**
* @brief The callback function that is called.
* @brief The callback functions that is called.
*/
py::function cb;
py::function cb, reset_callback;
public:
PyIndataHandler(StreamMgr &mgr, py::function cb)
: ThreadedInDataHandler(mgr), cb(cb) {
PyIndataHandler(StreamMgr &mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) {
DEBUGTRACE_ENTER;
/// TODO: Note that if start() throws an exception, which means that the
@ -87,11 +89,12 @@ public:
DEBUGTRACE_ENTER;
stop();
}
void reset(const Daq *daq) override final { reset_callback(daq); }
/**
* @brief Reads from the buffer
*/
bool inCallback_threaded(const DaqData &d) {
bool inCallback_threaded(const DaqData &d) override final {
/* DEBUGTRACE_ENTER; */
@ -137,6 +140,15 @@ public:
};
void init_datahandler(py::module &m) {
py::class_<PyIndataHandler> h(m, "InDataHandler");
h.def(py::init<StreamMgr &, py::function, py::function>());
h.def(py::init<StreamMgr &, py::function>());
py::class_<PPMHandler> ppm(m, "PPMHandler");
ppm.def(py::init<StreamMgr &, const d>());
ppm.def(py::init<StreamMgr &>());
ppm.def("getCurrentValue", [](const PPMHandler &ppm) {
auto [level, clip] = ppm.getCurrentValue();
return py::make_tuple(carma::col_to_arr(level), carma::col_to_arr(clip));
});
}

View File

@ -24,6 +24,7 @@ t = np.linspace(0, int((tend*fs)//fs), int(tend*fs), endpoint=False)
in_ = np.random.randn(t.size)
out = bq.filter(in_)
from scipy.signal import welch
nfft = 48000
freq, H1 = welch(out[:,0],