diff --git a/examples/test_input.ipynb b/examples/test_input.ipynb index 78895bd..ffeab5b 100644 --- a/examples/test_input.ipynb +++ b/examples/test_input.ipynb @@ -10,10 +10,37 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "ac06df04", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "make: Entering directory '/home/anne/wip/mycode/lasp'\n", + "make[1]: Entering directory '/home/anne/wip/mycode/lasp'\n", + "make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n", + "\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_dsp_lib\u001b[0m\n", + "make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n", + "[ 42%] Built target lasp_dsp_lib\n", + "make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n", + "\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_device_lib\u001b[0m\n", + "make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n", + "[ 67%] Built target lasp_device_lib\n", + "make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n", + "\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_cpp\u001b[0m\n", + "make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n", + "make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n", + "[ 71%] \u001b[32mBuilding CXX object src/lasp/CMakeFiles/lasp_cpp.dir/pybind11/lasp_dsp_pybind.cpp.o\u001b[0m\n", + "[ 75%] \u001b[32m\u001b[1mLinking CXX shared module lasp_cpp.cpython-310-x86_64-linux-gnu.so\u001b[0m\n", + "make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n", + "[100%] Built target lasp_cpp\n", + "make[1]: Leaving directory '/home/anne/wip/mycode/lasp'\n", + "make: Leaving directory '/home/anne/wip/mycode/lasp'\n" + ] + } + ], "source": [ "# !make -j -C ~/wip/mycode/lasp" ] diff --git a/src/lasp/device/lasp_daqdata.cpp b/src/lasp/device/lasp_daqdata.cpp index 94174b3..8a4b34b 100644 --- a/src/lasp/device/lasp_daqdata.cpp +++ b/src/lasp/device/lasp_daqdata.cpp @@ -1,16 +1,15 @@ /* #define DEBUGTRACE_ENABLED */ -#include "debugtrace.hpp" -#include #include "lasp_daqdata.h" +#include "debugtrace.hpp" +#include "lasp_mathtypes.h" +#include DEBUGTRACE_VARIABLES; - DaqData::DaqData(const us nchannels, const us nframes, const DataTypeDescriptor::DataType dtype) : nchannels(nchannels), nframes(nframes), dtype(dtype), - dtype_descr(dtype_map.at(dtype)), -sw(dtype_descr.sw) { + dtype_descr(dtype_map.at(dtype)), sw(dtype_descr.sw) { static_assert(sizeof(char) == 1, "Invalid char size"); const DataTypeDescriptor &desc = dtype_map.at(dtype); @@ -26,7 +25,82 @@ void DaqData::copyInFromRaw(const std::vector &ptrs) { } } -void DaqData::copyToRaw(const us channel,uint8_t *ptr) { +void DaqData::copyToRaw(const us channel, uint8_t *ptr) { std::copy(&_data[sw * nframes * channel], &_data[sw * nframes * (channel + 1)], ptr); } + +template d convertToFloat(const int_type val) { + if constexpr (std::is_integral::value) { + return static_cast(val) / std::numeric_limits::max(); + } else { + return static_cast(val); + } +} +template +inline vd channelToFloat(const byte_t *vals, const us nframes) { + vd res(nframes); + for (us i = 0; i < nframes; i++) { + res(i) = convertToFloat(reinterpret_cast(vals)[i]); + } + return res; +} +template +inline dmat allToFloat(const byte_t *vals, const us nframes, const us nchannels) { + dmat res(nframes, nchannels); + for (us j = 0; j < nchannels; j++) { + for (us i = 0; i < nframes; i++) { + res(i, j) = convertToFloat( + reinterpret_cast(vals)[i + j * nframes]); + } + } + return res; +} + +vd DaqData::toFloat(const us channel_no) const { + using DataType = DataTypeDescriptor::DataType; + switch (dtype) { + case (DataType::dtype_int8): { + return channelToFloat(raw_ptr(0, channel_no), nframes); + } break; + case (DataType::dtype_int16): { + return channelToFloat(raw_ptr(0, channel_no), nframes); + + } break; + case (DataType::dtype_int32): { + return channelToFloat(raw_ptr(0, channel_no), nframes); + } break; + case (DataType::dtype_fl32): { + return channelToFloat(raw_ptr(0, channel_no), nframes); + } break; + case (DataType::dtype_fl64): { + return channelToFloat(raw_ptr(0, channel_no), nframes); + } break; + default: + throw std::runtime_error("BUG"); + } // End of switch +} + +dmat DaqData::toFloat() const { + dmat result(nframes, nchannels); + using DataType = DataTypeDescriptor::DataType; + switch (dtype) { + case (DataType::dtype_int8): { + return allToFloat(raw_ptr(0), nframes, nchannels); + } break; + case (DataType::dtype_int16): { + return allToFloat(raw_ptr(0), nframes, nchannels); + } break; + case (DataType::dtype_int32): { + return allToFloat(raw_ptr(0), nframes, nchannels); + } break; + case (DataType::dtype_fl32): { + return allToFloat(raw_ptr(0), nframes, nchannels); + } break; + case (DataType::dtype_fl64): { + return allToFloat(raw_ptr(0), nframes, nchannels); + } break; + default: + throw std::runtime_error("BUG"); + } // End of switch +} diff --git a/src/lasp/device/lasp_daqdata.h b/src/lasp/device/lasp_daqdata.h index 4cdfba7..84e91ee 100644 --- a/src/lasp/device/lasp_daqdata.h +++ b/src/lasp/device/lasp_daqdata.h @@ -4,10 +4,14 @@ #include #include #include +#include /** \addtogroup device * @{ */ +class Daq; + +using byte_t = char; /** * @brief Data coming from / going to DAQ. **Non-interleaved format**, which @@ -18,7 +22,7 @@ protected: /** * @brief Storage for the actual data. */ - std::vector _data; + std::vector _data; public: /** @@ -58,7 +62,8 @@ public: /** * @brief Initialize using no allocation */ - DaqData() : DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {} + DaqData() + : DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {} virtual ~DaqData() = default; DaqData &operator=(const DaqData &) = default; @@ -70,13 +75,16 @@ public: * * @return Pointer to sample, not casted to final type */ - int8_t *raw_ptr(const us frame = 0, const us channel = 0) { + byte_t *raw_ptr(const us frame = 0, const us channel = 0) { + return &(_data.data()[sw * (frame + channel * nframes)]); + } + const byte_t *raw_ptr(const us frame = 0, const us channel = 0) const { return &(_data.data()[sw * (frame + channel * nframes)]); } - void setSlow(const us frame, const us channel, const int8_t* val) { - for(us i=0;i toFloat() const; + + /** + * @brief Convert samples to floating point values and return a nframes + * column vector of floats. For data that is not already floating-point, + * the data is scaled back from MAX_INT to +1.0. + * + * @param channel The channel to convert + * + * @return Array of floats + */ + arma::Col toFloat(const us channel_no) const; }; template class TypedDaqData : public DaqData { @@ -135,5 +163,6 @@ public: void copyToRaw(const us channel, T *buffer) { DaqData::copyToRaw(channel, reinterpret_cast(buffer)); } + }; /** @} */ diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index b42eee5..9e8156c 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -275,6 +275,9 @@ void StreamMgr::startStream(const DaqConfiguration &config) { if (isInput) { _inputStream = std::move(daq); + for(auto& handler: _inDataHandlers) { + handler->reset(_inputStream.get()); + } if (_inputStream->duplexMode()) { assert(!_outputStream); } @@ -291,7 +294,12 @@ void StreamMgr::stopStream(const StreamType t) { if (!_inputStream) { throw rte("Input stream is not running"); } + /// Kills input stream _inputStream = nullptr; + /// Send reset to all in data handlers + for(auto& handler: _inDataHandlers) { + handler->reset(nullptr); + } } break; case (StreamType::output): { if (_inputStream && _inputStream->duplexMode()) { @@ -313,6 +321,9 @@ void StreamMgr::addInDataHandler(InDataHandler &handler) { checkRightThread(); std::scoped_lock lck(_inDataHandler_mtx); _inDataHandlers.push_back(&handler); + if(_inputStream) { + handler.reset(_inputStream.get()); + } } void StreamMgr::removeInDataHandler(InDataHandler &handler) { checkRightThread(); diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index 8498dbb..cc16cb2 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -39,6 +39,14 @@ class InDataHandler { */ virtual bool inCallback(const DaqData &daqdata) = 0; + /** + * @brief Reset in-data handler. + * + * @param daq New DAQ configuration of inCallback(). If nullptr is given, + * it means that the stream is stopped. + */ + virtual void reset(const Daq* daq = nullptr) = 0; + /** * @brief This function should be called from the constructor of the * implementation of InDataHandler. It will start the stream's calling of diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index a335ffd..d4eb78e 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -60,10 +60,10 @@ class DT9837A : public Daq { const us _nFramesPerBlock; - void threadFcn(InDaqCallback inCallback, - OutDaqCallback outcallback); + void threadFcn(InDaqCallback inCallback, OutDaqCallback outcallback); public: + DaqDeviceHandle getHandle() const { return _handle; } DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config); ~DT9837A() { @@ -108,8 +108,7 @@ public: friend class OutBufHandler; }; -void DT9837A::start(InDaqCallback inCallback, - OutDaqCallback outCallback) { +void DT9837A::start(InDaqCallback inCallback, OutDaqCallback outCallback) { DEBUGTRACE_ENTER; if (isRunning()) { throw runtime_error("DAQ is already running"); @@ -128,7 +127,7 @@ void DT9837A::start(InDaqCallback inCallback, class BufHandler { protected: - DaqDeviceHandle _handle; + DT9837A &daq; const DataTypeDescriptor dtype_descr; us nchannels, nFramesPerBlock; double samplerate; @@ -139,11 +138,9 @@ protected: long long buffer_mid_idx; public: - BufHandler(DaqDeviceHandle handle, const DataTypeDescriptor dtype_descr, - const us nchannels, const us nFramesPerBlock, - const double samplerate) - : _handle(handle), dtype_descr(dtype_descr), nchannels(nchannels), - nFramesPerBlock(nFramesPerBlock), samplerate(samplerate), + BufHandler(DT9837A &daq, const us nchannels) + : daq(daq), dtype_descr(daq.dtypeDescr()), nchannels(nchannels), + nFramesPerBlock(daq.framesPerBlock()), samplerate(daq.samplerate()), buf(2 * nchannels * nFramesPerBlock, // Watch the two here, the top and the bottom! 0), @@ -157,9 +154,7 @@ class InBufHandler : public BufHandler { public: InBufHandler(DT9837A &daq, InDaqCallback cb) - : BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(), - daq._nFramesPerBlock, daq.samplerate()), - cb(cb) + : BufHandler(daq, daq.neninchannels()), cb(cb) { DEBUGTRACE_ENTER; @@ -206,7 +201,7 @@ public: } assert(indescs.size() == nchannels); DEBUGTRACE_MESSAGE("Starting input scan"); - err = ulDaqInScan(_handle, indescs.data(), nchannels, + err = ulDaqInScan(daq.getHandle(), indescs.data(), nchannels, 2 * nFramesPerBlock, // Watch the 2 here! &samplerate, scanoptions, inscanflags, buf.data()); if (err != ERR_NO_ERROR) { @@ -248,7 +243,7 @@ public: ScanStatus status; TransferStatus transferStatus; - UlError err = ulDaqInScanStatus(_handle, &status, &transferStatus); + UlError err = ulDaqInScanStatus(daq.getHandle(), &status, &transferStatus); if (err != ERR_NO_ERROR) { showErr(err); return false; @@ -281,7 +276,7 @@ public: ~InBufHandler() { // At exit of the function, stop scanning. DEBUGTRACE_ENTER; - UlError err = ulDaqInScanStop(_handle); + UlError err = ulDaqInScanStop(daq.getHandle()); if (err != ERR_NO_ERROR) { showErr(err); } @@ -293,15 +288,13 @@ class OutBufHandler : public BufHandler { public: OutBufHandler(DT9837A &daq, OutDaqCallback cb) - : BufHandler(daq._handle, daq.dtypeDescr(), daq.neninchannels(), - daq._nFramesPerBlock, daq.samplerate()), - cb(cb) { + : BufHandler(daq, daq.nenoutchannels()), cb(cb) { DEBUGTRACE_MESSAGE("Starting output scan"); AOutScanFlag outscanflags = AOUTSCAN_FF_DEFAULT; ScanOption scanoptions = SO_CONTINUOUS; UlError err = - ulAOutScan(_handle, 0, 0, BIP10VOLTS, + ulAOutScan(daq.getHandle(), 0, 0, BIP10VOLTS, 2 * nFramesPerBlock, // Watch the 2 here! &samplerate, scanoptions, outscanflags, buf.data()); if (err != ERR_NO_ERROR) { @@ -321,7 +314,7 @@ public: ScanStatus status; TransferStatus transferStatus; - err = ulAOutScanStatus(_handle, &status, &transferStatus); + err = ulAOutScanStatus(daq.getHandle(), &status, &transferStatus); if (err != ERR_NO_ERROR) { showErr(err); return false; @@ -363,15 +356,14 @@ public: ~OutBufHandler() { DEBUGTRACE_ENTER; - UlError err = ulAOutScanStop(_handle); + UlError err = ulAOutScanStop(daq.getHandle()); if (err != ERR_NO_ERROR) { showErr(err); } } }; -void DT9837A::threadFcn(InDaqCallback inCallback, - OutDaqCallback outCallback) { +void DT9837A::threadFcn(InDaqCallback inCallback, OutDaqCallback outCallback) { DEBUGTRACE_ENTER; @@ -501,7 +493,8 @@ DT9837A::DT9837A(const DeviceInfo &devinfo, const DaqConfiguration &config) throw runtime_error("Fatal: could not set AC/DC coupling mode"); } - IepeMode iepe = inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; + IepeMode iepe = + inchannel_config.at(ch).IEPEEnabled ? IEPE_ENABLED : IEPE_DISABLED; err = ulAISetConfig(_handle, AI_CFG_CHAN_IEPE_MODE, ch, iepe); if (err != ERR_NO_ERROR) { showErr(err); diff --git a/src/lasp/dsp/CMakeLists.txt b/src/lasp/dsp/CMakeLists.txt index b5284cf..981cebd 100644 --- a/src/lasp/dsp/CMakeLists.txt +++ b/src/lasp/dsp/CMakeLists.txt @@ -12,6 +12,7 @@ set(lasp_dsp_files lasp_timebuffer.cpp lasp_slm.cpp lasp_threadedindatahandler.cpp + lasp_ppm.cpp ) diff --git a/src/lasp/dsp/lasp_biquadbank.cpp b/src/lasp/dsp/lasp_biquadbank.cpp index 51fc976..a1cb18b 100644 --- a/src/lasp/dsp/lasp_biquadbank.cpp +++ b/src/lasp/dsp/lasp_biquadbank.cpp @@ -12,13 +12,13 @@ SeriesBiquad::SeriesBiquad(const vd &filter_coefs) { DEBUGTRACE_ENTER; if (filter_coefs.n_cols != 1) { throw rte("Expected filter coefficients for a single SeriesBiquad as a " - "single column with length 6 x n_filters"); + "single column with length 6 x n_filters"); } if (filter_coefs.n_rows % 6 != 0) { cerr << "Number of rows given: " << filter_coefs.n_rows << endl; throw rte("filter_coefs should be multiple of 6, given: " + - std::to_string(filter_coefs.n_rows)); + std::to_string(filter_coefs.n_rows)); } us nfilters = filter_coefs.n_rows / 6; @@ -32,13 +32,17 @@ SeriesBiquad::SeriesBiquad(const vd &filter_coefs) { /// Check if third row in this matrix equals unity. if (!arma::approx_equal(sos.row(3), arma::rowvec(nfilters, arma::fill::ones), - "absdiff", 1e-9)) { + "absdiff", 1e-9)) { std::cerr << "Read row: " << sos.row(3) << endl; throw rte( "Filter coefficients should have fourth element (a0) equal to 1.0"); } } +std::unique_ptr SeriesBiquad::clone() const { + // sos.as_col() concatenates all columns, exactly what we want. + return std::make_unique(sos.as_col()); +} void SeriesBiquad::reset() { DEBUGTRACE_ENTER; state.zeros(); @@ -107,14 +111,14 @@ void BiquadBank::filter(vd &inout) { auto &pool = getPool(); for (us i = 0; i < res.n_cols; i++) { futs.emplace_back(pool.submit( - [&](us i) { + [&](us i) { // Copy column vd col = inout; _filters[i].filter(col); return col; - }, // Launch a task to filter. - i // Column i as argument to the lambda function above. - )); + }, // Launch a task to filter. + i // Column i as argument to the lambda function above. + )); } // Zero-out in-out and sum-up the filtered values @@ -129,3 +133,6 @@ void BiquadBank::reset() { f.reset(); } } +std::unique_ptr BiquadBank::clone() const { + return std::make_unique(_filters, _gains); +} diff --git a/src/lasp/dsp/lasp_biquadbank.h b/src/lasp/dsp/lasp_biquadbank.h index f4bd3d3..0fd6fab 100644 --- a/src/lasp/dsp/lasp_biquadbank.h +++ b/src/lasp/dsp/lasp_biquadbank.h @@ -38,6 +38,7 @@ public: virtual void filter(vd &inout) override final; virtual ~SeriesBiquad() override {} void reset() override final; + std::unique_ptr clone() const override final; }; /** @@ -49,6 +50,7 @@ class BiquadBank : public Filter { std::vector _filters; vd _gains; + public: /** * @brief Initialize biquadbank. @@ -60,6 +62,15 @@ public: */ BiquadBank(const dmat &filters, const vd *gains = nullptr); + /** + * @brief Construct biquad bank from already given set of series biquad filters and gain values + * + * @param filters The filters to set + * @param gains The gain values for each filter + */ + BiquadBank(std::vector filters,vd gains): + _filters(std::move(filters)),_gains(std::move(gains)) {} + /** * @brief Set new gain values for each filter in the BiquadBank * @@ -78,6 +89,7 @@ public: virtual void filter(vd &inout) override final; void reset() override final; + std::unique_ptr clone() const override final; }; /** @} */ diff --git a/src/lasp/dsp/lasp_fft.h b/src/lasp/dsp/lasp_fft.h index 099784d..6e3bcf6 100644 --- a/src/lasp/dsp/lasp_fft.h +++ b/src/lasp/dsp/lasp_fft.h @@ -10,20 +10,9 @@ #include "lasp_mathtypes.h" /** - * Load wisdom data from a NULL-terminated character string. Note that at this - * point in time this is only used by the FFTW fft backend. - * - * @param[in] wisdom: Wisdom string + * \addtogroup dsp + * @{ */ -void load_fft_wisdom(const char* wisdom); - -/** - * Creates a NULL-terminated string containing FFTW wisdom, or state knowledge - * from other libraries. - * - * @returns A NULL-terminated string. *Should be deallocated after being used* - */ -char* store_fft_wisdom(void); class Fft_impl; @@ -105,3 +94,5 @@ class Fft { static std::string store_fft_wisdom(); }; + +/** @} */ diff --git a/src/lasp/dsp/lasp_filter.h b/src/lasp/dsp/lasp_filter.h index e895643..8cf4a12 100644 --- a/src/lasp/dsp/lasp_filter.h +++ b/src/lasp/dsp/lasp_filter.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include "lasp_types.h" #include "lasp_mathtypes.h" /** @@ -19,6 +20,13 @@ public: * @brief Reset filter state to 0 (history was all-zero). */ virtual void reset() = 0; + + /** + * @brief Clone a filter, to generate a copy. + * + * @return Copy of filter with state set to zero. + */ + virtual std::unique_ptr clone() const = 0; }; diff --git a/src/lasp/dsp/lasp_mathtypes.h b/src/lasp/dsp/lasp_mathtypes.h index f7078e2..9d808d9 100644 --- a/src/lasp/dsp/lasp_mathtypes.h +++ b/src/lasp/dsp/lasp_mathtypes.h @@ -40,7 +40,9 @@ #endif // LASP_DOUBLE_PRECISION using vd = arma::Col; +using vrd = arma::Row; using vc = arma::Col; +using vrc = arma::Row; using dmat = arma::Mat; using cmat = arma::Mat; diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp new file mode 100644 index 0000000..f374d41 --- /dev/null +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -0,0 +1,89 @@ +#define DEBUGTRACE_ENABLED +#include "lasp_ppm.h" +#include "debugtrace.hpp" +#include + +using std::cerr; +using std::endl; + +using Lck = std::scoped_lock; +using rte = std::runtime_error; + +PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) + : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { + DEBUGTRACE_ENTER; + start(); + } +bool PPMHandler::inCallback_threaded(const DaqData &d) { + /* DEBUGTRACE_ENTER; */ + std::scoped_lock lck(_mtx); + dmat data = d.toFloat(); + + const us nchannels = _cur_max.size(); + + vrd maxabs = arma::max(arma::abs(data)); + + arma::uvec clip_indices = arma::find(maxabs > clip_point); + arma::uvec clip(nchannels, arma::fill::zeros); + clip.elem(clip_indices).fill(1); + + arma::uvec update_max_idx = arma::find(maxabs > _cur_max); + arma::uvec update_max(nchannels, arma::fill::zeros); + update_max.elem(update_max_idx).fill(1); + + assert(_cur_max.size() == _clip_time.size()); + + for (us i = 0; i < nchannels; i++) { + if (clip(i)) { + /// Reset clip counter + _clip_time(i) = 0; + } else if (_clip_time(i) > clip_indication_time) { + /// Reset to 'unclipped' + _clip_time(i) = -1; + } else if(_clip_time(i) >= 0) { + /// Add a bit of clip time + _clip_time(i) += _dt; + } + + /* cerr << "maxabs(i)" << maxabs(i) << endl; */ + /* cerr << "curmax(i)" << _cur_max(i) << endl; */ + if (update_max(i)) { + _cur_max(i) = maxabs(i); + } else { + _cur_max(i) *= _alpha; + } + } + return true; +} + +std::tuple PPMHandler::getCurrentValue() const { + std::scoped_lock lck(_mtx); + + return {20 * arma::log10(_cur_max + arma::datum::eps).as_col(), + arma::find(_clip_time >= 1.0)}; + /* return {(_cur_max + arma::datum::eps).as_col(), */ + /* arma::find(_clip_time >= 1.0)}; */ +} + +void PPMHandler::reset(const Daq *daq) { + DEBUGTRACE_ENTER; + std::scoped_lock lck(_mtx); + _cur_max.zeros(); + _clip_time.fill(-1); + + if (daq) { + _cur_max = vrd(daq->neninchannels(), arma::fill::zeros); + _clip_time = vd(daq->neninchannels(), arma::fill::value(-1)); + const d fs = daq->samplerate(); + DEBUGTRACE_PRINT(fs); + _dt = daq->framesPerBlock() / fs; + + _alpha = std::max(d_pow(10, -_dt*_decay_dBps / (20)), 0); + DEBUGTRACE_PRINT(_alpha); + } +} + +PPMHandler::~PPMHandler() { + DEBUGTRACE_ENTER; + stop(); +} diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h new file mode 100644 index 0000000..a9926fb --- /dev/null +++ b/src/lasp/dsp/lasp_ppm.h @@ -0,0 +1,84 @@ +// lasp_ppm.h +// +// Author: J.A. de Jong - ASCEE +// +// Description: Peak Programme Meter +#pragma once +#include +#include "lasp_filter.h" +#include "lasp_mathtypes.h" +#include "lasp_threadedindatahandler.h" + +/** + * \addtogroup dsp + * @{ + */ + + +/** + * @brief Digital Peak Programme Meter (PPM). Let the latest maximum flow away + * with a certain amount of dB/s. If a new peak is found, it goes up again. + * Also detects clipping. + * */ +class PPMHandler: public ThreadedInDataHandler { + + /** + * @brief Assuming full scale of a signal is +/- 1.0. If a value is found + */ + static inline const d clip_point = 0.98; + + /** + * @brief How long it takes in [s] after a clip event has happened, that we + * are actually still in 'clip' mode. + */ + static inline const d clip_indication_time = 3.0; + + const d _decay_dBps; + + /** + * @brief Inverse of block sampling frequency [s]: (framesPerBlock/fs) + */ + d _dt; + + /** + * @brief 1st order low_pass value of levels, based on _decay_dBps; + */ + d _alpha; + + mutable std::mutex _mtx; + /** + * @brief Current maximum values + */ + vrd _cur_max; + + /** + * @brief How long ago the last clip has happened. Negative in case no clip + * has happened. + */ + vd _clip_time; + + public: + /** + * @brief Constructs Peak Programme Meter + * + * @param mgr Stream Mgr to operate on + * @param decay_dBps The level decay in units dB/s, after a peak has been + * hit. + */ + PPMHandler(StreamMgr& mgr,const d decay_dBps = 20.0); + ~PPMHandler(); + + /** + * @brief Get the current values of the PPM. Returns an array of levels in dB + * and a vector of True's. + * + * @return Current levels in [dB], clipping indication + */ + std::tuple getCurrentValue() const; + + bool inCallback_threaded(const DaqData& ) override final; + void reset(const Daq*) override final; + +}; + +/** @} */ diff --git a/src/lasp/dsp/lasp_slm.cpp b/src/lasp/dsp/lasp_slm.cpp index 6e6c4fd..6ed72a1 100644 --- a/src/lasp/dsp/lasp_slm.cpp +++ b/src/lasp/dsp/lasp_slm.cpp @@ -165,10 +165,10 @@ dmat SLM::run(const vd &input_orig) { #pragma omp parallel for for (us i = 0; i < _bandpass.size(); i++) { res.col(i) = run_single(input, i); - /* DEBUGTRACE_PRINT(_bandpass.size()); */ - /* DEBUGTRACE_PRINT(res.n_cols); */ - /* DEBUGTRACE_PRINT(res.n_rows); */ - /* DEBUGTRACE_PRINT(futs.size()); */ + /* DEBUGTRACE_PRINT(_bandpass.size()); */ + /* DEBUGTRACE_PRINT(res.n_cols); */ + /* DEBUGTRACE_PRINT(res.n_rows); */ + /* DEBUGTRACE_PRINT(futs.size()); */ // Update the total number of samples harvested so far. NOTE: *This should be // done AFTER the threads are done!!!* diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 9556ad9..02ef725 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -33,8 +33,16 @@ class ThreadedInDataHandler: public InDataHandler { * * @return true, to continue with sampling. */ - virtual bool inCallback(const DaqData &daqdata) override; + virtual bool inCallback(const DaqData &daqdata) override final; + /** + * @brief This function should be overridden with an actual implementation, + * of what should happen on a different thread. + * + * @param DaqData Input daq data + * + * @return true on succes. False when an error occured. + */ virtual bool inCallback_threaded(const DaqData&) = 0; }; diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 6d402a3..3dacecf 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -3,15 +3,12 @@ """ Read data from stream and record sound and video at the same time """ -import dataclasses -import logging -import os -import time -import h5py +import dataclasses, logging, os, time, h5py, threading import numpy as np -from .lasp_cpp import (InDataHandler, StreamMgr, LASP_VERSION_MAJOR, -LASP_VERSION_MINOR) + from .lasp_atomic import Atomic +from .lasp_cpp import (LASP_VERSION_MAJOR, LASP_VERSION_MINOR, InDataHandler, + StreamMgr) @dataclasses.dataclass @@ -25,10 +22,15 @@ class Recording: Class used to perform a recording. """ - def __init__(self, fn: str, streammgr: StreamMgr, - rectime: float = None, wait: bool = True, - progressCallback=None, - startDelay: float = 0): + def __init__( + self, + fn: str, + streammgr: StreamMgr, + rectime: float = None, + wait: bool = True, + progressCallback=None, + startDelay: float = 0, + ): """ Start a recording. Blocks if wait is set to True. @@ -44,7 +46,7 @@ class Recording: startDelay: Optional delay added before the recording is *actually* started in [s]. """ - ext = '.h5' + ext = ".h5" if ext not in fn: fn += ext @@ -70,11 +72,16 @@ class Recording: # Counter of the number of blocks self.ablockno = Atomic(0) + # Stop flag, set when recording is finished. + self.stop = Atomic(False) + + # Mutex, on who is working with the H5py data + self.file_mtx = threading.Lock() + self.progressCallback = progressCallback # Open the file - self.f = h5py.File(self.fn, 'w') - f = self.f + self.f = h5py.File(self.fn, "w") # This flag is used to delete the file on finish(), and can be used # when a recording is canceled. @@ -84,18 +91,17 @@ class Recording: streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input) if not streamstatus.runningOK(): raise RuntimeError( - "Stream is not running properly. Please first start the stream") + "Stream is not running properly. Please first start the stream" + ) self.ad = None - logging.debug('Starting record....') + logging.debug("Starting record....") - self.stop = Atomic(False) - - self.indh = InDataHandler(streammgr, self.inCallback) + self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) if wait: - logging.debug('Stop recording with CTRL-C') + logging.debug("Stop recording with CTRL-C") try: while not self.stop(): time.sleep(0.01) @@ -104,56 +110,66 @@ class Recording: finally: self.finish() - def firstFrames(self, data): + def resetCallback(self, daq): """ - When the first frames arrive, we setup the file and add all metadata, - and make ready for storing data. + Function called with initial stream data. + """ + with self.file_mtx: + in_ch = daq.enabledInChannels() + blocksize = daq.framesPerBlock() + self.blocksize = blocksize + self.nchannels = daq.neninchannels() + self.fs = daq.samplerate() + + f = self.f + + f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR + f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR + + # Set the bunch of attributes + f.attrs["samplerate"] = daq.samplerate() + f.attrs["nchannels"] = daq.neninchannels() + f.attrs["blocksize"] = blocksize + f.attrs["sensitivity"] = [ch.sensitivity for ch in in_ch] + f.attrs["channelNames"] = [ch.name for ch in in_ch] + + # Add the start delay here, as firstFrames() is called right after the + # constructor is called. + f.attrs["time"] = time.time() + self.startDelay + + # In V2, we do not store JSON metadata anymore, but just an enumeration + # index to a physical quantity. + f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch] + + # Measured physical quantity metadata + # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] + + def firstFrames(self, adata): + """ + Set up the dataset in which to store the audio data. This will create + the attribute `self.ad` + + Args: + adata: Numpy array with data from DAQ + """ - daq = self.smgr.getDaq(StreamMgr.StreamType.input) - in_ch = daq.enabledInChannels() - blocksize = daq.framesPerBlock() - self.blocksize = blocksize # The array data type cannot # datatype = daq.dataType() - dtype = np.dtype(data.dtype) + dtype = np.dtype(adata.dtype) - f = self.f - - f.attrs['LASP_VERSION_MAJOR'] = LASP_VERSION_MAJOR - f.attrs['LASP_VERSION_MINOR'] = LASP_VERSION_MINOR - - # Set the bunch of attributes - f.attrs['samplerate'] = daq.samplerate() - f.attrs['nchannels'] = daq.neninchannels() - f.attrs['blocksize'] = blocksize - f.attrs['sensitivity'] = [ch.sensitivity for ch in in_ch] - f.attrs['channelNames'] = [ch.name for ch in in_ch] - # f.attrs[' - - # Add the start delay here, as firstFrames() is called right after the - # constructor is called. - f.attrs['time'] = time.time() + self.startDelay - - nchannels = len(in_ch) - self.ad = f.create_dataset('audio', - (1, blocksize, nchannels), - dtype=dtype, - maxshape=( - None, # This means, we can add blocks - # indefinitely - blocksize, - nchannels), - compression='gzip' - ) - self.fs = daq.samplerate() - - # Measured physical quantity metadata - # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] - - # In V2, we do not store JSON metadata anymore, but just an enumeration - # index to a physical quantity. - f.attrs['qtys_enum_idx'] = [ch.qty.value for ch in in_ch] + self.ad = self.f.create_dataset( + "audio", + (1, self.blocksize, self.nchannels), + dtype=dtype, + maxshape=( + None, # This means, we can add blocks + # indefinitely + self.blocksize, + self.nchannels, + ), + compression="gzip", + ) def inCallback(self, adata): """ @@ -166,11 +182,17 @@ class Recording: """ - if self.ad is None: - self.firstFrames(adata) + if self.stop(): + # Stop flag is raised. We do not add any data anymore. + return - self.__addTimeData(adata) - return True + with self.file_mtx: + + if self.ad is None: + self.firstFrames(adata) + + self.__addTimeData(adata) + return True def setDelete(self, val: bool): """ @@ -186,21 +208,25 @@ class Recording: remove the queue from the stream, etc. """ + logging.debug("Recording::finish()") + self.stop <<= True - logging.debug('Recording::finish()') - # Remove indata handler - self.indh = None + with self.file_mtx: - try: - # Close the recording file - self.f.close() - except Exception as e: - logging.error(f'Error closing file: {e}') + # Remove indata handler, which also should remove callback function + # from StreamMgr. + self.indh = None - logging.debug('Recording ended') - if self.deleteFile: - self.__deleteFile() + try: + # Close the recording file + self.f.close() + except Exception as e: + logging.error(f"Error closing file: {e}") + + logging.debug("Recording ended") + if self.deleteFile: + self.__deleteFile() def __deleteFile(self): """ @@ -209,7 +235,7 @@ class Recording: try: os.remove(self.fn) except Exception as e: - logging.error(f'Error deleting file: {self.fn}') + logging.error(f"Error deleting file: {self.fn}: {str(e)}") def __addTimeData(self, indata): """ @@ -217,11 +243,7 @@ class Recording: """ # logging.debug('Recording::__addTimeData()') - if self.stop(): - # Stop flag is raised. We stop recording here. - return - - curT = self.ablockno()*self.blocksize/self.fs + curT = self.ablockno() * self.blocksize / self.fs # Increase the block counter self.ablockno += 1 @@ -238,9 +260,7 @@ class Recording: curT = 0 ablockno = self.ablockno() - recstatus = RecordStatus( - curT=curT, - done=False) + recstatus = RecordStatus(curT=curT, done=False) if self.progressCallback is not None: self.progressCallback(recstatus) @@ -248,9 +268,9 @@ class Recording: curT_rounded_to_seconds = int(curT) if curT_rounded_to_seconds > self.curT_rounded_to_seconds: self.curT_rounded_to_seconds = curT_rounded_to_seconds - print(f'{curT_rounded_to_seconds}', end='', flush=True) + print(f"{curT_rounded_to_seconds}", end="", flush=True) else: - print('.', end='', flush=True) + print(".", end="", flush=True) if self.rectime is not None and curT > self.rectime: # We are done! @@ -262,5 +282,4 @@ class Recording: # Add the data to the file, and resize the audio data blocks self.ad.resize(ablockno, axis=0) - self.ad[ablockno-1, :, :] = indata - + self.ad[ablockno - 1, :, :] = indata diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index ab0a79a..7c42fd5 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -1,5 +1,7 @@ -#define DEBUGTRACE_ENABLED +/* #define DEBUGTRACE_ENABLED */ +#include #include "debugtrace.hpp" +#include "lasp_ppm.h" #include "lasp_streammgr.h" #include "lasp_threadedindatahandler.h" #include @@ -63,17 +65,17 @@ template py::array_t getPyArray(const DaqData &d) { /** * @brief Wraps the InDataHandler such that it calls a Python callback with a * buffer of sample data. The Python callback is called from a different - * thread, using a Numpy argument as + * thread, using a Numpy array as argument. */ class PyIndataHandler : public ThreadedInDataHandler { /** - * @brief The callback function that is called. + * @brief The callback functions that is called. */ - py::function cb; + py::function cb, reset_callback; public: - PyIndataHandler(StreamMgr &mgr, py::function cb) - : ThreadedInDataHandler(mgr), cb(cb) { + PyIndataHandler(StreamMgr &mgr, py::function cb, py::function reset_callback) + : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { DEBUGTRACE_ENTER; /// TODO: Note that if start() throws an exception, which means that the @@ -87,11 +89,12 @@ public: DEBUGTRACE_ENTER; stop(); } + void reset(const Daq *daq) override final { reset_callback(daq); } /** * @brief Reads from the buffer */ - bool inCallback_threaded(const DaqData &d) { + bool inCallback_threaded(const DaqData &d) override final { /* DEBUGTRACE_ENTER; */ @@ -137,6 +140,15 @@ public: }; void init_datahandler(py::module &m) { py::class_ h(m, "InDataHandler"); + h.def(py::init()); - h.def(py::init()); + py::class_ ppm(m, "PPMHandler"); + ppm.def(py::init()); + ppm.def(py::init()); + + ppm.def("getCurrentValue", [](const PPMHandler &ppm) { + auto [level, clip] = ppm.getCurrentValue(); + + return py::make_tuple(carma::col_to_arr(level), carma::col_to_arr(clip)); + }); } diff --git a/test/test_biquadbank.py b/test/test_biquadbank.py index 3809f5e..ed6f6ab 100644 --- a/test/test_biquadbank.py +++ b/test/test_biquadbank.py @@ -24,6 +24,7 @@ t = np.linspace(0, int((tend*fs)//fs), int(tend*fs), endpoint=False) in_ = np.random.randn(t.size) out = bq.filter(in_) + from scipy.signal import welch nfft = 48000 freq, H1 = welch(out[:,0], @@ -37,4 +38,4 @@ freq, H2 = welch(in_, plt.semilogx(freq,10*np.log10(np.abs(H1/H2))) omg, H_ex = sosfreqz(filt) -plt.semilogx(omg/(2*np.pi)*fs, 20*np.log10(np.abs(H_ex))) \ No newline at end of file +plt.semilogx(omg/(2*np.pi)*fs, 20*np.log10(np.abs(H_ex)))