Improved code for cubes. RtAps is not working. Don't know why. TimeBuffer code has better readability. Bugfix in output from Column to Numpy. GIL release for AvPowerSpectra::compute via Pybind.

This commit is contained in:
Anne de Jong 2022-10-11 14:50:44 +02:00
parent 6aa1262c73
commit 4764a52de8
16 changed files with 157 additions and 114 deletions

View File

@ -53,11 +53,11 @@ Daq::Daq(const DeviceInfo &devinfo, const DaqConfiguration &config)
if (!config.match(devinfo)) {
throw rte("DaqConfiguration does not match device info");
}
if (neninchannels(false) > ninchannels) {
if (neninchannels(false) > devinfo.ninchannels) {
throw rte(
"Number of enabled input channels is higher than device capability");
}
if (nenoutchannels() > noutchannels) {
if (nenoutchannels() > devinfo.noutchannels) {
throw rte(
"Number of enabled output channels is higher than device capability");
}

View File

@ -25,6 +25,7 @@ DaqData::DaqData(const us nframes, const us nchannels,
DEBUGTRACE_ENTER;
DEBUGTRACE_PRINT(sw);
assert(sw > 0 && sw <= 8);
_data = new (std::align_val_t{8}) byte_t[sw * nchannels * nframes];
if (!_data) {
throw rte("Could not allocate memory for DaqData!");
@ -180,15 +181,18 @@ dmat DaqData::toFloat() const {
return toFloat<int8_t>();
break;
case (DataType::dtype_int16):
DEBUGTRACE_PRINT("Dtype = int16");
return toFloat<int16_t>();
break;
case (DataType::dtype_int32):
return toFloat<int32_t>();
break;
case (DataType::dtype_fl32):
DEBUGTRACE_PRINT("Dtype = float32");
return toFloat<float>();
break;
case (DataType::dtype_fl64):
DEBUGTRACE_PRINT("Dtype = float64");
return toFloat<double>();
break;
default:
@ -206,5 +210,10 @@ void DaqData::print() const {
cout << "First sample of first channel (as float)" << toFloat(0,0) << endl;
cout << "Last sample of first channel (as float)" << toFloat(nframes-1,0) << endl;
cout << "Last sample of last channel (as float)" << toFloat(nframes-1,nchannels-1) << endl;
dmat data = toFloat();
vrd max = arma::max(data, 0);
vrd min = arma::min(data, 0);
cout << "Maximum value in buf: " << max << endl;
cout << "Minumum value in buf: " << min << endl;
}

View File

@ -260,9 +260,9 @@ public:
us monitorOffset = monitorOutput ? 1 : 0;
/* /// Put the output monitor in front */
if (monitorOutput) {
for (us sample = 0; sample < nFramesPerBlock; sample++) {
data.value<double>(sample, 0) =
buf[totalOffset + sample * nchannels + (nchannels - 1)];
for (us frame = 0; frame < nFramesPerBlock; frame++) {
data.value<double>(frame, 0) =
buf[totalOffset + (frame * nchannels) + (nchannels - 1)];
}
}

View File

@ -24,7 +24,7 @@ PowerSpectra::PowerSpectra(const vd &window)
DEBUGTRACE_PRINT(win_pow);
}
arma::Cube<c> PowerSpectra::compute(const dmat &input) {
ccube PowerSpectra::compute(const dmat &input) {
/// Run very often. Silence this one.
/* DEBUGTRACE_ENTER; */
@ -36,7 +36,7 @@ arma::Cube<c> PowerSpectra::compute(const dmat &input) {
cmat rfft = _fft.fft(input_tmp);
arma::cx_cube output(rfft.n_rows, input.n_cols, input.n_cols);
ccube output(rfft.n_rows, input.n_cols, input.n_cols);
for (us i = 0; i < input.n_cols; i++) {
/// This one can be run in parallel without any problem. Note that it is
/// the inner loop that is run in parallel.
@ -63,6 +63,7 @@ AvPowerSpectra::AvPowerSpectra(const us nfft, const Window::WindowType w,
}
_overlap_keep = (nfft * overlap_percentage) / 100;
DEBUGTRACE_PRINT(_overlap_keep);
if (_overlap_keep >= nfft) {
throw rte("Overlap is too high. Results in no jump. Please "
"choose a smaller overlap percentage or a higher nfft");
@ -77,39 +78,40 @@ AvPowerSpectra::AvPowerSpectra(const us nfft, const Window::WindowType w,
DEBUGTRACE_PRINT(_alpha);
}
}
void AvPowerSpectra::reset() {
_timeBuf.reset();
_est.reset();
std::optional<arma::cx_cube> AvPowerSpectra::compute(const dmat &timedata) {
/* DEBUGTRACE_ENTER; */
/* DEBUGTRACE_PRINT(timedata.n_rows); */
/* DEBUGTRACE_PRINT(_ps.nfft); */
_timeBuf.push(timedata);
if (_est.n_cols == 0) {
_est = arma::cx_cube(_ps.nfft / 2 + 1, timedata.n_cols, timedata.n_cols,
arma::fill::zeros);
}
std::optional<arma::cx_cube> res;
std::optional<ccube> AvPowerSpectra::compute(const dmat &timedata) {
DEBUGTRACE_ENTER;
DEBUGTRACE_PRINT(timedata.n_rows);
DEBUGTRACE_PRINT(_ps.nfft);
_timeBuf.push(timedata);
us i = 0;
while (auto samples = _timeBuf.pop(_ps.nfft, _overlap_keep)) {
/* DEBUGTRACE_PRINT((int)_mode); */
DEBUGTRACE_PRINT((int)_mode);
switch (_mode) {
case (Mode::Spectrogram): {
DEBUGTRACE_PRINT("Spectrogram mode");
_est = _ps.compute(samples.value());
} break;
case (Mode::Averaging): {
DEBUGTRACE_PRINT("Averaging mode");
n_averages++;
if (n_averages == 1) {
_est = _ps.compute(samples.value());
} else {
_est = _est * (static_cast<d>(n_averages - 1) / n_averages) +
_est = (static_cast<d>(n_averages - 1) / n_averages) * _est +
_ps.compute(samples.value()) / n_averages;
}
} break;
case (Mode::Leaking): {
/* DEBUGTRACE_PRINT("Leaking mode"); */
DEBUGTRACE_PRINT("Leaking mode");
if (arma::size(_est) == arma::size(0, 0, 0)) {
_est = _ps.compute(samples.value());
} else {
@ -124,7 +126,7 @@ std::optional<arma::cx_cube> AvPowerSpectra::compute(const dmat &timedata) {
}
return std::nullopt;
}
std::optional<arma::cx_cube> AvPowerSpectra::get_est() {
std::optional<ccube> AvPowerSpectra::get_est() const {
if (_est.n_cols > 0)
return _est;
return std::nullopt;

View File

@ -65,7 +65,7 @@ public:
* can be accessed by: C(f, i, j). Storage is such that frequency components
* are contiguous.
*/
arma::Cube<c> compute(const dmat &input);
ccube compute(const dmat &input);
};
/**
@ -89,7 +89,7 @@ class AvPowerSpectra {
/**
* @brief Current estimate of cross-spectral density
*/
arma::cx_cube _est;
ccube _est;
/**
* @brief Buffer of storage of time data.
@ -128,7 +128,11 @@ public:
AvPowerSpectra(const AvPowerSpectra &) = delete;
AvPowerSpectra &operator=(const AvPowerSpectra &) = delete;
void reset() { _est.reset(); }
/**
* @brief Reset to empty state. Clears the time buffer and sets estimator to
* empty.
*/
void reset();
/**
* @brief Compute an update of the power spectra based on given time data.
@ -143,7 +147,7 @@ public:
* new estimate. It can be checked by operator bool().
*
*/
std::optional<arma::cx_cube> compute(const dmat &timedata);
std::optional<ccube> compute(const dmat &timedata);
/**
* @brief Returns the latest estimate of cps (cross-power spectra.
@ -151,7 +155,7 @@ public:
* @return Pointer (reference, not owning!) to spectral estimate date, or
* nullptr, in case nothing could yet be computed.
*/
std::optional<arma::cx_cube> get_est();
std::optional<ccube> get_est() const;
/**
* @brief The overlap is rounded to a certain amount of time samples. This

View File

@ -47,6 +47,7 @@ using vc = arma::Col<c>;
using vrc = arma::Row<c>;
using dmat = arma::Mat<d>;
using cmat = arma::Mat<c>;
using cube = arma::Cube<c>;
using ccube = arma::Cube<c>;
using dcube = arma::Cube<d>;
const d number_pi = arma::datum::pi;

View File

@ -6,42 +6,55 @@
using std::cerr;
using std::endl;
RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter,
const us nfft,
const Window::WindowType w,
const d overlap_percentage, const d time_constant)
: ThreadedInDataHandler(mgr),
_ps(nfft, w, overlap_percentage, time_constant) {
/* if (freqWeightingFilter != nullptr) { */
/* _filterPrototype = freqWeightingFilter->clone(); */
/* } */
start();
}
RtAps::~RtAps() {
std::scoped_lock<std::mutex> lck(_mtx);
stop();
}
bool RtAps::inCallback_threaded(const DaqData &data) {
/* DEBUGTRACE_ENTER; */
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
dmat fltdata = data.toFloat();
const us nchannels = fltdata.n_cols;
data.print();
/* const us nchannels = fltdata.n_cols; */
if (_filterPrototype) {
/* if (_filterPrototype) { */
// Adjust number of filters, if necessary
if (nchannels > _freqWeightingFilter.size()) {
while (nchannels > _freqWeightingFilter.size()) {
_freqWeightingFilter.emplace_back(_filterPrototype->clone());
}
/* // Adjust number of filters, if necessary */
/* if (nchannels > _freqWeightingFilter.size()) { */
/* while (nchannels > _freqWeightingFilter.size()) { */
/* _freqWeightingFilter.emplace_back(_filterPrototype->clone()); */
/* } */
for (auto &filter : _freqWeightingFilter) {
filter->reset();
}
}
/* for (auto &filter : _freqWeightingFilter) { */
/* filter->reset(); */
/* } */
/* } */
// Apply filtering
#pragma omp parallel for
for (us i = 0; i < nchannels; i++) {
vd col = fltdata.col(i);
_freqWeightingFilter.at(i)->filter(col);
fltdata.col(i) = col;
}
} // End of if(_filterPrototype)
/* // Apply filtering */
/* #pragma omp parallel for */
/* for (us i = 0; i < nchannels; i++) { */
/* vd col = fltdata.col(i); */
/* _freqWeightingFilter.at(i)->filter(col); */
/* fltdata.col(i) = col; */
/* } */
/* } // End of if(_filterPrototype) */
std::optional<arma::cx_cube> res = _ps.compute(fltdata);
if (res.has_value()) {
/* DEBUGTRACE_PRINT("Data ready!"); */
_latest_est = std::make_unique<cube>(std::move(res.value()));
}
_ps.compute(fltdata);
return true;
}
@ -53,13 +66,18 @@ void RtAps::reset(const Daq *daq) { // Explicitly say
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
_ps.reset();
_latest_est.reset();
}
std::unique_ptr<cube> RtAps::getCurrentValue() {
std::unique_ptr<ccube> RtAps::getCurrentValue() {
/* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_mtx);
auto est = _ps.get_est();
return std::make_unique<ccube>(est.value_or(ccube()));
return std::move(_latest_est);
/* return std::move(_latest_est); */
/* if (_latest_est) { */
/* return std::make_unique<cube>(cube(*_latest_est)); */
/* } else */
/* return nullptr; */
}

View File

@ -24,8 +24,6 @@ class RtAps : public ThreadedInDataHandler {
std::mutex _mtx;
std::unique_ptr<Filter> _filterPrototype;
std::vector<std::unique_ptr<Filter>> _freqWeightingFilter;
bool _data_ready = false;
std::unique_ptr<cube> _latest_est;
AvPowerSpectra _ps;
@ -40,29 +38,15 @@ public:
*/
RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft = 2048,
const Window::WindowType w = Window::WindowType::Hann,
const d overlap_percentage = 50., const d time_constant = -1)
: ThreadedInDataHandler(mgr),
_ps(nfft, w, overlap_percentage, time_constant) {
{
std::scoped_lock<std::mutex> lck(_mtx);
if (freqWeightingFilter != nullptr) {
_filterPrototype = freqWeightingFilter->clone();
}
}
start();
}
~RtAps() {
std::scoped_lock<std::mutex> lck(_mtx);
stop();
}
const d overlap_percentage = 50., const d time_constant = -1);
~RtAps();
/**
* @brief Get the latest estimate of the power spectra
*
* @return Optionally, if available, the latest values
*/
std::unique_ptr<cube> getCurrentValue();
std::unique_ptr<ccube> getCurrentValue();
bool inCallback_threaded(const DaqData &) override final;
void reset(const Daq *) override final;

View File

@ -1,6 +1,6 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_timebuffer.h"
#include "debugtrace.hpp"
#include <algorithm>
#include <cassert>
#include <deque>
@ -35,32 +35,36 @@ public:
}
}
std::optional<dmat> pop(const us nsamples, us keep) {
std::optional<dmat> pop(const us nframes, const us keep) {
DEBUGTRACE_ENTER;
if (keep > nsamples)
throw rte("keep should be <= nsamples");
if (keep >= nframes) {
throw rte("keep should be < nframes");
}
if (nsamples <= n_frames()) {
if (nframes <= n_frames()) {
assert(!_storage.empty());
dmat res(nsamples, _storage.front().n_cols);
dmat res(nframes, _storage.front().n_cols);
for (us i = 0; i < nsamples; i++) {
us j = 0;
for (us i = 0; i < nframes; i++) {
if (i + keep < nframes) {
// Just pop elements and copy over
res.row(i) = _storage.front();
_storage.pop_front();
} else {
if (i + keep >= nsamples) {
// Suppose keep == 0, then we never arrive here
// Suppose keep == 1, then storage[0] is copyied over.
// Suppose keep == 2, then storage[0[ and storage[1] is copyied over.
// Etc.
res.row(i) = _storage[i + keep - nsamples];
} else {
// Just pop elements and copy over
res.row(i) = _storage.front();
_storage.pop_front();
res.row(i) = _storage[j];
j++;
}
}
return res;
@ -83,7 +87,5 @@ std::optional<dmat> TimeBuffer::pop(const us n_rows,const us keep) {
return _imp->pop(n_rows, keep);
}
TimeBuffer::~TimeBuffer() {}
void TimeBuffer::reset() {
_imp->reset();
}
void TimeBuffer::reset() { _imp->reset(); }
void TimeBuffer::push(const dmat &dat) { _imp->push(dat); }

View File

@ -9,6 +9,7 @@ namespace py = pybind11;
template<typename T>
using pyarray = py::array_t<T, py::array::f_style | py::array::forcecast>;
using dpyarray = pyarray<d>;
using cpyarray = pyarray<c>;
/**
@ -127,6 +128,6 @@ arma::Mat<T> NpyToCol(pyarray<T> data) {
if (data.ndim() != 1) {
throw std::runtime_error("Expects a 1D array");
}
return arma::Col<d>(data.mutable_data(0), data.size(), copy);
return arma::Col<T>(data.mutable_data(0), data.size(), copy);
}

View File

@ -12,6 +12,7 @@
using std::cerr;
using std::endl;
namespace py = pybind11;
using rte = std::runtime_error;
/**
* \ingroup pybind
@ -29,11 +30,25 @@ void init_dsp(py::module &m) {
py::class_<Fft> fft(m, "Fft");
fft.def(py::init<us>());
fft.def("fft", py::overload_cast<const vd &>(&Fft::fft));
fft.def("fft", py::overload_cast<const dmat &>(&Fft::fft));
fft.def("fft", [](Fft &f, dpyarray dat) {
if (dat.ndim() == 1) {
return ColToNpy<c>(f.fft(NpyToCol<d, false>(dat)));
} else if (dat.ndim() == 2) {
return MatToNpy<c>(f.fft(NpyToMat<d, false>(dat)));
} else {
throw rte("Invalid dimensions of array");
}
});
fft.def("ifft", [](Fft &f, cpyarray dat) {
if (dat.ndim() == 1) {
return ColToNpy<d>(f.ifft(NpyToCol<c, false>(dat)));
} else if (dat.ndim() == 2) {
return MatToNpy<d>(f.ifft(NpyToMat<c, false>(dat)));
} else {
throw rte("Invalid dimensions of array");
}
});
fft.def("ifft", py::overload_cast<const vc &>(&Fft::ifft));
fft.def("ifft", py::overload_cast<const cmat &>(&Fft::ifft));
fft.def_static("load_fft_wisdom", &Fft::load_fft_wisdom);
fft.def_static("store_fft_wisdom", &Fft::store_fft_wisdom);
@ -89,10 +104,17 @@ void init_dsp(py::module &m) {
py::arg("overlap_percentage") = 50.0, py::arg("time_constant") = -1);
aps.def("compute", [](AvPowerSpectra &aps, dpyarray timedata) {
std::optional<arma::cx_cube> res =
aps.compute(NpyToMat<d, false>(timedata));
std::optional<ccube> res;
{
py::gil_scoped_release release;
res = aps.compute(NpyToMat<d, false>(timedata));
}
return CubeToNpy<c>(res.value_or(cube(0, 0, 0)));
return CubeToNpy<c>(res.value_or(ccube(0, 0, 0)));
});
aps.def("get_est", [](const AvPowerSpectra &ps) {
auto est = ps.get_est();
return CubeToNpy<c>(est.value_or(ccube(0, 0, 0)));
});
py::class_<SLM> slm(m, "cppSLM");

View File

@ -215,10 +215,10 @@ void init_datahandler(py::module &m) {
);
rtaps.def("getCurrentValue", [](RtAps &rt) {
std::unique_ptr<cube> val = rt.getCurrentValue();
std::unique_ptr<ccube> val = rt.getCurrentValue();
if (val) {
return CubeToNpy<c> (*val);
}
return CubeToNpy<c>(cube(1,0,0));
return CubeToNpy<c>(ccube(1,0,0));
});
}

View File

@ -23,7 +23,7 @@ def test_aps1():
sig = np.random.randn(int(fs*tend))
cps = aps.compute(sig)
cps = aps.compute(sig[:,None])
pow1 = np.sum(sig**2)/sig.size
pow2 = np.sum((cps[:,0,0]).real)

View File

@ -27,7 +27,7 @@ out = bq.filter(in_)
from scipy.signal import welch
nfft = 48000
freq, H1 = welch(out[:,0],
freq, H1 = welch(out,
# scaling
fs=fs,nfft=nfft)

View File

@ -20,7 +20,7 @@ def test_forward_fft():
sig = np.random.randn(nfft)
fft_lasp = Fft(nfft)
res_lasp = fft_lasp.fft(sig)[:,0]
res_lasp = fft_lasp.fft(sig)
res_npy = np.fft.rfft(sig)
assert(np.isclose(np.linalg.norm(res_lasp- res_npy), 0))
@ -38,11 +38,11 @@ def test_backward_fft():
Sig = Sigr + 1j*Sigi
fft_lasp = Fft(nfft)
sig_lasp = fft_lasp.ifft(Sig)[:,0]
sig_lasp = fft_lasp.ifft(Sig)
sig_py = np.fft.irfft(Sig)
assert(np.isclose(np.linalg.norm(sig_py- sig_lasp), 0))
if __name__ == '__main__':
test_forward_fft()
# test_backward_fft()()
test_backward_fft()

View File

@ -23,7 +23,7 @@ def test_ps():
# sig = 8*np.cos(omg*t)
cps = ps.compute(sig)
cps = ps.compute(sig[:,None])
pow1 = np.sum(sig**2)/sig.size
pow2 = np.sum((cps[:,0,0]).real)