Added real time spectra: RtAps. All seem to work. Bugfix with SiQtys storage. Added extra lock guards for constructor and destructors of InDataHandlers (otherwise race conditions occur). Changed time_constant integer to fs_tau in AvPowerSpectra.

This commit is contained in:
Anne de Jong 2022-10-06 21:13:21 +02:00
parent 01674db1e8
commit f7a49dc4ff
16 changed files with 249 additions and 39 deletions

View File

@ -1,4 +1,4 @@
#define DEBUGTRACE_ENABLED
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_daqconfig.h"

View File

@ -318,14 +318,18 @@ void StreamMgr::stopStream(const StreamType t) {
}
void StreamMgr::addInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER;
checkRightThread();
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
_inDataHandlers.push_back(&handler);
if(_inputStream) {
handler.reset(_inputStream.get());
} else {
handler.reset(nullptr);
}
}
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER;
checkRightThread();
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
_inDataHandlers.remove(&handler);

View File

@ -6,6 +6,7 @@ set(lasp_dsp_files
lasp_siggen_impl.cpp
lasp_window.cpp
lasp_fft.cpp
lasp_rtaps.cpp
lasp_avpowerspectra.cpp
lasp_biquadbank.cpp
lasp_thread.cpp

View File

@ -1,5 +1,6 @@
/* #define DEBUGTRACE_ENABLED */
#include "lasp_avpowerspectra.h"
#include "lasp_mathtypes.h"
#include "debugtrace.hpp"
#include <cmath>
#include <optional>
@ -39,7 +40,7 @@ arma::Cube<c> PowerSpectra::compute(const dmat &input) {
for (us i = 0; i < input.n_cols; i++) {
/// This one can be run in parallel without any problem. Note that it is
/// the inner loop that is run in parallel.
#pragma omp parallel for
#pragma omp parallel for
for (us j = 0; j < input.n_cols; j++) {
output.slice(j).col(i) =
_scale_fac * (rfft.col(i) % arma::conj(rfft.col(j)));
@ -53,7 +54,7 @@ arma::Cube<c> PowerSpectra::compute(const dmat &input) {
AvPowerSpectra::AvPowerSpectra(const us nfft, const Window::WindowType w,
const d overlap_percentage,
const int time_constant)
const d time_constant_times_fs)
: _ps(nfft, w) {
DEBUGTRACE_ENTER;
@ -66,21 +67,28 @@ AvPowerSpectra::AvPowerSpectra(const us nfft, const Window::WindowType w,
throw rte("Overlap is too high. Results in no jump. Please "
"choose a smaller overlap percentage or a higher nfft");
}
if (time_constant < 0) {
if (time_constant_times_fs < 0) {
_mode = Mode::Averaging;
} else if (time_constant == 0) {
} else if (time_constant_times_fs == 0) {
_mode = Mode::Spectrogram;
} else {
_mode = Mode::Leaking;
_alpha = exp(-static_cast<d>(nfft - _overlap_keep) / time_constant);
_alpha = d_exp(-static_cast<d>((nfft - _overlap_keep)/time_constant_times_fs));
DEBUGTRACE_PRINT(_alpha);
}
}
std::optional<arma::cx_cube> AvPowerSpectra::compute(const dmat &timedata) {
/* DEBUGTRACE_ENTER; */
/* DEBUGTRACE_PRINT(timedata.n_rows); */
/* DEBUGTRACE_PRINT(_ps.nfft); */
_timeBuf.push(timedata);
if (_est.n_cols == 0) {
_est = arma::cx_cube(_ps.nfft / 2 + 1, timedata.n_cols, timedata.n_cols,
arma::fill::zeros);
}
std::optional<arma::cx_cube> res;
@ -101,6 +109,7 @@ std::optional<arma::cx_cube> AvPowerSpectra::compute(const dmat &timedata) {
}
} break;
case (Mode::Leaking): {
/* DEBUGTRACE_PRINT("Leaking mode"); */
if (arma::size(_est) == arma::size(0, 0, 0)) {
_est = _ps.compute(samples.value());
} else {
@ -111,7 +120,7 @@ std::optional<arma::cx_cube> AvPowerSpectra::compute(const dmat &timedata) {
i++;
}
if (i > 0) {
return _est;
return std::make_optional(_est);
}
return std::nullopt;
}

View File

@ -14,7 +14,7 @@
* @{
*/
/**
/**
* @brief Computes single-sided cross-power spectra for a group of channels.
* Only a single block of length fft, no averaging. This class should normally
* not be used. Please refer to AvPowerSpectra instead.
@ -110,12 +110,12 @@ public:
* @param overlap_percentage A number 0 < overlap_percentage <= 100. It
* determines the amount of overlap used in Welch' method. A typical value is
* 50 %, i.e. 50.
* @param time_constant Value should either be < 0, indicating that the
* @param fs_tau Value should either be < 0, indicating that the
* estimate is averages over all time data.
* For a value = 0 the instantaneous power spectrum is returned, which can be
* interpreted as the spectrogram. For a value > 0 a exponential forgetting is
* used, where the value is used as the time constant such that the decay
* follows approximately the trend exp(-n/time_constant), where n is the
* follows approximately the trend exp(-n/fs_tau), where n is the
* sample number in the power spectra. To choose 'fast' time weighting, set
* time_constant to the value of fs*0.125, where fs denotes the sampling
* frequency.
@ -123,7 +123,7 @@ public:
AvPowerSpectra(const us nfft = 2048,
const Window::WindowType w = Window::WindowType::Hann,
const d overlap_percentage = 50.,
const int time_constant = -1);
const d fs_tau = -1);
AvPowerSpectra(const AvPowerSpectra &) = delete;
AvPowerSpectra &operator=(const AvPowerSpectra &) = delete;
@ -131,16 +131,16 @@ public:
void reset() { _est.reset(); }
/**
* @brief Comnpute an update of the power spectra based on given time data.
* @brief Compute an update of the power spectra based on given time data.
* Note that the number of channels is determined from the first time this
* function is called. If a later call has an incompatible number of
* channels, a runtime error is thrown.
*
* @param timedata
*
* @return Optionally, a reference (NOT OWNING POINTER) to a new estimate of
* the power spectra. An update is only given if the amount of new time data
* is enough to compute a new estimate.
* @return Optionally, a copy of the latest estimate of the power spectra. An
* update is only given if the amount of new time data is enough to compute a
* new estimate. It can be checked by operator bool().
*
*/
std::optional<arma::cx_cube> compute(const dmat &timedata);

View File

@ -13,6 +13,7 @@
#define d_acos acos
#define d_sqrt sqrt
#define c_exp cexp
#define d_exp exp
#define d_sin sin
#define d_cos cos
#define d_pow pow
@ -30,6 +31,7 @@
#define d_acos acosf
#define d_sqrt sqrtf
#define c_exp cexpf
#define d_exp expf
#define d_sin sinf
#define d_cos cosf
#define d_pow powf
@ -45,5 +47,6 @@ using vc = arma::Col<c>;
using vrc = arma::Row<c>;
using dmat = arma::Mat<d>;
using cmat = arma::Mat<c>;
using cube = arma::Cube<c>;
const d number_pi = arma::datum::pi;

View File

@ -1,4 +1,4 @@
#define DEBUGTRACE_ENABLED
/* #define DEBUGTRACE_ENABLED */
#include "lasp_ppm.h"
#include "debugtrace.hpp"
#include <mutex>
@ -10,10 +10,11 @@ using Lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps)
: ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) {
DEBUGTRACE_ENTER;
start();
}
: ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
start();
}
bool PPMHandler::inCallback_threaded(const DaqData &d) {
/* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_mtx);
@ -57,6 +58,8 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) {
}
std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const {
/* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_mtx);
arma::uvec clips(_clip_time.size(), arma::fill::zeros);
@ -66,13 +69,14 @@ std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const {
}
void PPMHandler::reset(const Daq *daq) {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
_cur_max.zeros();
_clip_time.fill(-1);
if (daq) {
_cur_max = vrd(daq->neninchannels(), arma::fill::zeros);
_clip_time = vd(daq->neninchannels(), arma::fill::value(-1));
const d fs = daq->samplerate();
DEBUGTRACE_PRINT(fs);
@ -80,10 +84,15 @@ void PPMHandler::reset(const Daq *daq) {
_alpha = std::max<d>(d_pow(10, -_dt * _decay_dBps / (20)), 0);
DEBUGTRACE_PRINT(_alpha);
} else {
_cur_max.clear();
_clip_time.clear();
}
}
PPMHandler::~PPMHandler() {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
stop();
}

View File

@ -0,0 +1,65 @@
#define DEBUGTRACE_ENABLED
#include "lasp_rtaps.h"
#include "debugtrace.hpp"
#include <mutex>
using std::cerr;
using std::endl;
bool RtAps::inCallback_threaded(const DaqData &data) {
/* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_mtx);
dmat fltdata = data.toFloat();
const us nchannels = fltdata.n_cols;
if (_filterPrototype) {
// Adjust number of filters, if necessary
if (nchannels > _freqWeightingFilter.size()) {
while (nchannels > _freqWeightingFilter.size()) {
_freqWeightingFilter.emplace_back(_filterPrototype->clone());
}
for (auto &filter : _freqWeightingFilter) {
filter->reset();
}
}
// Apply filtering
#pragma omp parallel for
for (us i = 0; i < nchannels; i++) {
vd col = fltdata.col(i);
_freqWeightingFilter.at(i)->filter(col);
fltdata.col(i) = col;
}
} // End of if(_filterPrototype)
std::optional<arma::cx_cube> res = _ps.compute(fltdata);
if (res.has_value()) {
/* DEBUGTRACE_PRINT("Data ready!"); */
_latest_est = std::make_unique<cube>(std::move(res.value()));
}
return true;
}
void RtAps::reset(const Daq *daq) { // Explicitly say
// to GCC that
// the argument is
// not used.
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
_ps.reset();
_latest_est.reset();
}
std::unique_ptr<cube> RtAps::getCurrentValue() {
/* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_mtx);
return std::move(_latest_est);
}

70
src/lasp/dsp/lasp_rtaps.h Normal file
View File

@ -0,0 +1,70 @@
// lasp_threadedaps.h
//
// Author: J.A. de Jong - ASCEE
//
// Description: Real Time Spectrum Viewer
#pragma once
#include "lasp_avpowerspectra.h"
#include "lasp_filter.h"
#include "lasp_mathtypes.h"
#include "lasp_threadedindatahandler.h"
#include <memory>
#include <mutex>
/**
* \addtogroup dsp
* @{
*
* \addtogroup rt
* @{
*/
class RtAps : public ThreadedInDataHandler {
std::mutex _mtx;
std::unique_ptr<Filter> _filterPrototype;
std::vector<std::unique_ptr<Filter>> _freqWeightingFilter;
bool _data_ready = false;
std::unique_ptr<cube> _latest_est;
AvPowerSpectra _ps;
public:
/**
* @brief Initialize RtAps.
*
* @param mgr StreamMgr singleton reference
* @param freqWeightingFilter Optionally: the frequency weighting filter.
* Nullptr should be given for Z-weighting.
* @param For all other arguments, see constructor of AvPowerSpectra
*/
RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft = 2048,
const Window::WindowType w = Window::WindowType::Hann,
const d overlap_percentage = 50., const d time_constant = -1)
: ThreadedInDataHandler(mgr),
_ps(nfft, w, overlap_percentage, time_constant) {
std::scoped_lock<std::mutex> lck(_mtx);
if (freqWeightingFilter != nullptr) {
_filterPrototype = freqWeightingFilter->clone();
}
start();
}
~RtAps() {
std::scoped_lock<std::mutex> lck(_mtx);
stop();
}
/**
* @brief Get the latest estimate of the power spectra
*
* @return Optionally, if available, the latest values
*/
std::unique_ptr<cube> getCurrentValue();
bool inCallback_threaded(const DaqData &) override final;
void reset(const Daq *) override final;
};
/** @} */
/** @} */

View File

@ -16,7 +16,9 @@ vd Siggen::genSignal(const us nframes) {
DEBUGTRACE_ENTER;
mutexlock lck(_mtx);
DEBUGTRACE_PRINT(nframes);
vd signal(nframes, arma::fill::value(_dc_offset));
if (!_muted) {
vd signal_dynamic = _level_linear * genSignalUnscaled(nframes);
if (_filter) {

View File

@ -1,4 +1,4 @@
#define DEBUGTRACE_ENABLED
/* #define DEBUGTRACE_ENABLED */
#include "lasp_threadedindatahandler.h"
#include "debugtrace.hpp"
#include "lasp_thread.h"

View File

@ -33,6 +33,11 @@ class ThreadedInDataHandler: public InDataHandler {
void threadFcn();
public:
/**
* @brief Initialize a ThreadedInDataHandler
*
* @param mgr StreamMgr singleton reference
*/
ThreadedInDataHandler(StreamMgr& mgr);
~ThreadedInDataHandler();

View File

@ -123,10 +123,21 @@ class SIQtys(Enum):
physical quantity information.
"""
for qty in SIQtys:
if qty.value.cpp_enum.value == enum:
if qty.value.cpp_enum == enum:
return qty.value
raise RuntimeError(f'Qty corresponding to enum {enum} not found')
@staticmethod
def fromInt(val):
"""
Convert integer index from - say - a measurement file back into
physical quantity information.
"""
for qty in SIQtys:
if qty.value.cpp_enum.value == val:
return qty.value
raise RuntimeError(f'Qty corresponding to integer {val} not found')
@dataclass
class CalSetting:

View File

@ -262,7 +262,7 @@ class Measurement:
try:
qtys_enum_idx = f.attrs['qtys_enum_idx']
self._qtys = [SIQtys.fromCppEnum(idx) for idx in qtys_enum_idx]
self._qtys = [SIQtys.fromInt(idx) for idx in qtys_enum_idx]
except KeyError:
pass
@ -318,7 +318,7 @@ class Measurement:
for ch in chcfg:
chname.append(ch.name)
sens.append(ch.sensitivity)
qtys.append(SIQtys.fromCppEnum(ch.qty))
qtys.append(SIQtys.fromInt(ch.qty))
self.channelNames = chname
self.sensitivity = sens

View File

@ -2,6 +2,8 @@
#include "lasp_avpowerspectra.h"
#include "lasp_biquadbank.h"
#include "lasp_fft.h"
#include "lasp_streammgr.h"
#include "lasp_filter.h"
#include "lasp_slm.h"
#include "lasp_window.h"
#include <iostream>
@ -70,7 +72,7 @@ void init_dsp(py::module &m) {
/// AvPowerSpectra
py::class_<AvPowerSpectra> aps(m, "AvPowerSpectra");
aps.def(py::init<const us, const Window::WindowType, const d, const int>(),
aps.def(py::init<const us, const Window::WindowType, const d, const d>(),
py::arg("nfft") = 2048,
py::arg("windowType") = Window::WindowType::Hann,
py::arg("overlap_percentage") = 50.0, py::arg("time_constant") = -1);
@ -98,5 +100,6 @@ void init_dsp(py::module &m) {
slm.def("Leq", &SLM::Leq);
slm.def("Lmax", &SLM::Lmax);
slm.def_static("suggestedDownSamplingFac", &SLM::suggestedDownSamplingFac);
}
/** @} */

View File

@ -1,19 +1,14 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include <carma>
#include <armadillo>
#define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_ppm.h"
#include "lasp_rtaps.h"
#include "lasp_streammgr.h"
#include "lasp_threadedindatahandler.h"
#include <atomic>
#include <chrono>
#include <pybind11/buffer_info.h>
#include <pybind11/cast.h>
#include <pybind11/gil.h>
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pybind11/stl.h>
#include <thread>
using namespace std::literals::chrono_literals;
using std::cerr;
@ -138,18 +133,51 @@ public:
return true;
}
};
void init_datahandler(py::module &m) {
py::class_<PyIndataHandler> h(m, "InDataHandler");
h.def(py::init<StreamMgr &, py::function, py::function>());
/// Peak Programme Meter
py::class_<PPMHandler> ppm(m, "PPMHandler");
ppm.def(py::init<StreamMgr &, const d>());
ppm.def(py::init<StreamMgr &>());
ppm.def("getCurrentValue", [](const PPMHandler &ppm) {
auto [level, clip] = ppm.getCurrentValue();
std::tuple<vd, arma::uvec> tp = ppm.getCurrentValue();
return py::make_tuple(carma::col_to_arr(std::move(level)),
carma::col_to_arr(std::move(clip)));
return py::make_tuple(carma::col_to_arr(std::get<0>(tp)),
carma::col_to_arr(std::get<1>(tp)));
});
/// Real time Aps
///
py::class_<RtAps> rtaps(m, "RtAps");
rtaps.def(py::init<StreamMgr &, // StreamMgr
Filter *const, // FreqWeighting filter
const us, // Nfft
const Window::WindowType, // Window
const d, // Overlap percentage 0<=o<100
const d // Time constant
>(),
py::arg("streammgr"), // StreamMgr
py::arg("preFilter").none(true),
/// Below list of arguments *SHOULD* be same as for
/// AvPowerSpectra constructor!
py::arg("nfft") = 2048, //
py::arg("windowType") = Window::WindowType::Hann, //
py::arg("overlap_percentage") = 50.0, //
py::arg("time_constant") = -1 //
);
rtaps.def("getCurrentValue", [](RtAps &rt) {
std::unique_ptr<cube> val = rt.getCurrentValue();
if (val) {
return carma::cube_to_arr<c>(std::move(*val));
}
return carma::cube_to_arr<c>(cube(1, 0, 0));
});
}