Compare commits

...

11 Commits

Author SHA1 Message Date
Anne de Jong 15cd62baf8 New smoothing algorithm - minor version bump
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m6s Details
2024-03-12 11:20:31 +01:00
Anne de Jong ab080910fc Made power correction in smoothing algorithm optional. Window decreases in size symmetrically around the edged of the frequency spectrum
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Blocked by required conditions Details
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Has been cancelled Details
2024-03-12 11:19:52 +01:00
Anne de Jong 6799ee9287 Bugfix new smoother, including ac signal power correction
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m1s Details
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
2024-03-12 09:21:07 +01:00
Anne de Jong f9cf059c90 Forgot to actually commit the Cpp files of the smoother
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 8s Details
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
2024-03-11 16:33:28 +01:00
Anne de Jong 3ec15ec645 New smoothing implementation, that runs a bit faster
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after -1m19s Details
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
2024-03-11 16:04:24 +01:00
Anne de Jong 48d262fbf0 Bugfix in sensitivity correction of realtime spectra
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m8s Details
2024-03-07 09:36:50 +01:00
Anne de Jong 204e431d79 Bugfix on GIL release
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m9s Details
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
2024-03-06 22:12:42 +01:00
Anne de Jong bf06402b11 BUGfix of segfault. Very subtle. ThreadedInDataHandler could be deleted, while a task was just pushed to the thread pool. Then, when the task is finally run, the object could be deleted, as the _thread_running flag was not set. Besides this, we made some fixes that makes sure that the handles to a Recording class are stored as a weakref inside of the C++ code. This makes it easier to garbage-collect a recording, even when the IndataHandler is still running. 2024-03-06 21:41:04 +01:00
Anne de Jong 26eef040a4 More locks on signal generator.
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 1m33s Details
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
2024-03-04 15:49:29 +01:00
Anne de Jong b61e836f35 Bumped 1.4.6
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 1m38s Details
2024-03-04 14:44:37 +01:00
Anne de Jong 0841dbd73b Create InDataHandler only from the moment startThread() is called. This is safer, and might fix a segfault 2024-03-04 14:44:00 +01:00
18 changed files with 411 additions and 155 deletions

View File

@ -109,6 +109,10 @@ set(CMAKE_C_FLAGS_RELEASE "-O3 -flto -mfpmath=sse -march=x86-64 -mtune=native \
-fdata-sections -ffunction-sections -fomit-frame-pointer -finline-functions")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-type-limits -Werror=return-type")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -flto -mfpmath=sse -march=x86-64 -mtune=native \
-fdata-sections -ffunction-sections -fomit-frame-pointer -finline-functions")
set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g -Wall ")
# ############################# End compilation flags
include_directories(/usr/lib/python3.10/site-packages/numpy/core/include)

View File

@ -5,7 +5,7 @@
#include <thread>
InDataHandler::InDataHandler(SmgrHandle mgr, const InCallbackType cb,
const InResetType resetfcn)
const ResetCallbackType resetfcn)
: _mgr(mgr), inCallback(cb), reset(resetfcn)
#if LASP_DEBUG == 1
,
@ -41,8 +41,8 @@ void InDataHandler::stop() {
InDataHandler::~InDataHandler() {
DEBUGTRACE_ENTER;
checkRightThread();
#if LASP_DEBUG == 1
checkRightThread();
if (!stopCalled) {
std::cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "

View File

@ -22,7 +22,7 @@ using InCallbackType = std::function<void(const DaqData &)>;
/**
* @brief Function definition for the reset callback.
*/
using InResetType = std::function<void(const Daq *)>;
using ResetCallbackType = std::function<void(const Daq *)>;
class InDataHandler {
@ -38,7 +38,7 @@ protected:
public:
~InDataHandler();
const InCallbackType inCallback;
const InResetType reset;
const ResetCallbackType reset;
/**
* @brief When constructed, the handler is added to the stream manager, which
@ -50,7 +50,7 @@ public:
* changes state.
*/
InDataHandler(SmgrHandle mgr, InCallbackType cb,
InResetType resetfcn);
ResetCallbackType resetfcn);
/**
* @brief Adds the current InDataHandler to the list of handlers in the

View File

@ -16,6 +16,7 @@ set(lasp_dsp_files
lasp_threadedindatahandler.cpp
lasp_ppm.cpp
lasp_clip.cpp
lasp_freqsmooth.cpp
)

View File

@ -0,0 +1,133 @@
// #define DEBUGTRACE_ENABLED
#include "lasp_freqsmooth.h"
#include <cassert>
#include "debugtrace.hpp"
using rte = std::runtime_error;
vd freqSmooth(const vd& freq, const vd& X, const unsigned w,
bool power_correct) {
DEBUGTRACE_ENTER;
if (freq.size() < 2) {
throw rte("Invalid frequency size. Should be > 2");
}
if (freq.size() != X.size()) {
throw rte("Sizes of freq and X do not match");
}
if (freq.size() > std::numeric_limits<long>::max() / 2) {
throw rte("Frequency size limit for smoothing is 2^30");
}
if (w == 0) {
throw rte("Invalid number of octaves");
}
const us Nfreq = freq.size();
// Smoothing width in unit of number of octaves
const d Delta = 1 / d(w);
// Minimum frequency and maximum frequency to smooth on (frequency range that
// is interpolated to a log scale)
d freq_min;
const d freq_max = freq(Nfreq - 1);
const bool firstFreqEqZero = (d_abs(freq(0)) < 1e-15);
// AC-signal power
d ac_pwr;
if (firstFreqEqZero) {
freq_min = freq(1);
if (power_correct) {
ac_pwr = arma::sum(X.subvec(1, Nfreq - 1));
}
} else {
freq_min = freq(0);
if (power_correct) {
ac_pwr = arma::sum(X);
}
}
DEBUGTRACE_PRINT(freq_min);
DEBUGTRACE_PRINT(freq_max);
const vd freq_log =
arma::logspace(d_log10(freq_min), d_log10(freq_max), 10 * Nfreq);
DEBUGTRACE_PRINT("freq_log = ");
const long Nfreq_sm = freq_log.size();
// Interpolate X to logscale
vd X_log;
DEBUGTRACE_PRINT("X_log = :");
arma::interp1(freq, X, freq_log, X_log, "*linear");
// First and last point are not interpolated well, could be minimally out of
// the interpolation range, due to roundoff errors. Armadillo sets these
// points to nan, so we have to manually "interpolate" them.
X_log(Nfreq_sm - 1) = X(X.size() - 1);
if (firstFreqEqZero) {
X_log(0) = X(1);
} else {
X_log(0) = X(0);
}
// Allocate space for smoothed X on log scale
vd Xsm_log(freq_log.size());
const d beta = d_log10(Nfreq_sm) / d_log10(2) / (Nfreq_sm - 1);
// int rounds down
const long mu = int(Delta / d(2) / beta);
DEBUGTRACE_PRINT(mu);
// Long is at least 32 bits. So +/- 2M points length
for (long k = 0; k < Nfreq_sm; k++) {
// const d fcur = freq_log(k);
long idx_start = std::max(k - mu, 0l);
long idx_stop = std::min(k + mu, Nfreq_sm - 1);
// Make window smaller at the sides (close to the end of the array)
if (idx_start == 0 || idx_stop == Nfreq_sm - 1) {
const long mu_edge = std::min(k - idx_start, idx_stop - k);
idx_start = k - mu_edge;
idx_stop = k + mu_edge;
}
assert(idx_stop < Nfreq_sm);
assert(idx_start < Nfreq_sm);
DEBUGTRACE_PRINT(idx_start)
DEBUGTRACE_PRINT(idx_stop);
Xsm_log(k) = arma::mean(X_log.subvec(idx_start, idx_stop));
}
DEBUGTRACE_PRINT("Xsm_log:");
// std::cerr << Xsm_log << std::endl;
// Back-interpolate to a linear scale, and be wary of nans at the start end
// and range. Also interpolates power
vd Xsm(Nfreq);
if (firstFreqEqZero) {
vd Xsm_gt0;
arma::interp1(freq_log, Xsm_log, freq.subvec(1, Nfreq - 1), Xsm_gt0,
"*linear");
Xsm(0) = X(0);
Xsm.subvec(1, Nfreq - 1) = Xsm_gt0;
Xsm(1) = Xsm_log(1);
Xsm(Nfreq - 1) = Xsm_log(Nfreq_sm - 1);
// Final step: power-correct smoothed spectrum
if (power_correct) {
d new_acpwr = arma::sum(Xsm.subvec(1, Nfreq - 1));
Xsm.subvec(1, Nfreq - 1) *= ac_pwr / new_acpwr;
}
} else {
arma::interp1(freq_log, Xsm_log, freq, Xsm, "*linear");
Xsm(0) = X(0);
Xsm(Nfreq - 1) = Xsm_log(Nfreq_sm - 1);
// Final step: power-correct smoothed spectrum
if (power_correct) {
d new_acpwr = arma::sum(Xsm);
Xsm *= ac_pwr / new_acpwr;
}
}
return Xsm;
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <memory>
#include <vector>
#include "lasp_mathtypes.h"
#include "lasp_types.h"
/**
* \addtogroup dsp
* @{
*/
/**
* @brief Apply frequency domain smoothing to a Frequency domain (single
* sided)signal power spectrum
*
* @param freq Frequency range
* @param X Signal pwr
* @param w Parameter determining the smoothing with. 1 = 1/1 octave, 3 = 1/3th
* octave and so on
* @param power_correct Apply a correction to the whole spectrum to make the
* signal power equal to the unsmoothed signal power.
* @return vd Smoothed spectrum
*/
vd freqSmooth(const vd& freq, const vd& X, const unsigned w,
bool power_correct = false);
/** @} */

View File

@ -37,7 +37,7 @@ void RtAps::inCallback(const DaqData &data) {
cerr << "**** Error: sensitivity size does not match! *****" << endl;
return;
}
fltdata.each_row() %= _sens.as_row();
fltdata.each_row() /= _sens.as_row();
if (_filterPrototype) {

View File

@ -10,12 +10,12 @@ using rte = std::runtime_error;
inline d level_amp(d level_dB) { return pow(10, level_dB / 20); }
using mutexlock = std::scoped_lock<std::mutex>;
using slock = std::scoped_lock<std::recursive_mutex>;
vd Siggen::genSignal(const us nframes) {
DEBUGTRACE_ENTER;
mutexlock lck(_mtx);
slock lck(_mtx);
DEBUGTRACE_PRINT(nframes);
vd signal(nframes, arma::fill::value(_dc_offset));
@ -52,7 +52,7 @@ vd Siggen::genSignal(const us nframes) {
return signal;
}
void Siggen::setInterruptPeriod(const d newPeriod) {
mutexlock lck(_mtx);
slock lck(_mtx);
if (newPeriod == 0) {
throw rte("Interruption period cannot be 0");
}
@ -65,7 +65,7 @@ void Siggen::setInterruptPeriod(const d newPeriod) {
void Siggen::setFilter(const std::string &name,
std::shared_ptr<Filter> filter) {
DEBUGTRACE_ENTER;
mutexlock lck(_mtx);
slock lck(_mtx);
if (filter) {
_filters[name] = filter;
} else if (_filters.find(name) != _filters.end()) {
@ -74,17 +74,17 @@ void Siggen::setFilter(const std::string &name,
}
void Siggen::setDCOffset(const d offset) {
DEBUGTRACE_ENTER;
mutexlock lck(_mtx);
slock lck(_mtx);
_dc_offset = offset;
}
void Siggen::setLevel(const d level, bool dB) {
DEBUGTRACE_ENTER;
mutexlock lck(_mtx);
slock lck(_mtx);
_level_linear = dB ? level_amp(level) : level;
}
void Siggen::reset(const d newFs) {
DEBUGTRACE_ENTER;
mutexlock lck(_mtx);
slock lck(_mtx);
_fs = newFs;
for (auto &f : _filters) {
assert(f.second);

View File

@ -27,7 +27,7 @@ private:
bool _muted = false;
protected:
std::mutex _mtx;
mutable std::recursive_mutex _mtx;
d _fs = 0;
/**
* @brief Interuption of period the signal. If set, the signal will be

View File

@ -7,11 +7,14 @@
//////////////////////////////////////////////////////////////////////
/* #define DEBUGTRACE_ENABLED */
#include "lasp_siggen_impl.h"
#include "debugtrace.hpp"
#include "lasp_mathtypes.h"
#include <cassert>
#include "debugtrace.hpp"
#include "lasp_mathtypes.h"
using rte = std::runtime_error;
using slock = std::scoped_lock<std::recursive_mutex>;
DEBUGTRACE_VARIABLES;
@ -30,6 +33,7 @@ Sine::Sine(const d freq) : omg(2 * arma::datum::pi * freq) { DEBUGTRACE_ENTER; }
vd Sine::genSignalUnscaled(const us nframes) {
/* DEBUGTRACE_ENTER; */
slock lck(_mtx);
const d pi = arma::datum::pi;
vd phase_vec =
arma::linspace(phase, phase + omg * (nframes - 1) / _fs, nframes);
@ -41,8 +45,8 @@ vd Sine::genSignalUnscaled(const us nframes) {
}
vd Periodic::genSignalUnscaled(const us nframes) {
vd res(nframes);
slock lck(_mtx);
if (_signal.size() == 0) {
throw rte("No signal defined while calling");
}
@ -74,15 +78,15 @@ Sweep::Sweep(const d fl, const d fu, const d Ts, const d Tq, const us flags)
}
void Sweep::resetImpl() {
DEBUGTRACE_ENTER;
slock lck(_mtx);
_cur_pos = 0;
bool forward_sweep = flags & ForwardSweep;
bool backward_sweep = flags & BackwardSweep;
const d Dt = 1 / _fs; // Deltat
const d Dt = 1 / _fs; // Deltat
// Estimate N, the number of samples in the sweep part (non-quiescent part):
const us Ns = (us)(Ts * _fs);
@ -166,7 +170,6 @@ void Sweep::resetImpl() {
/* dVARTRACE(15, phase); */
}
} else if (flags & LogSweep) {
DEBUGTRACE_PRINT("Log sweep");
if (forward_sweep || backward_sweep) {
/* Forward or backward sweep */
@ -194,7 +197,6 @@ void Sweep::resetImpl() {
phase += 2 * number_pi * Dt * fn;
}
} else {
DEBUGTRACE_PRINT("Continuous sweep");
const us Nf = Ns / 2;
@ -249,17 +251,15 @@ void Sweep::resetImpl() {
/* dbgassert(fn >= 0, "BUG"); */
phase += 2 * number_pi * Dt * fn;
while (phase > 2 * number_pi)
phase -= 2 * number_pi;
while (phase > 2 * number_pi) phase -= 2 * number_pi;
/* dVARTRACE(17, phase); */
}
/* This should be a very small number!! */
DEBUGTRACE_PRINT(phase);
}
} // End of log sweep
} // End of log sweep
else {
// Either log or linear sweep had to be given as flags.
assert(false);
}
}

View File

@ -7,7 +7,7 @@
*/
/**
* \addtogroup siggen
* \addtogroup siggen
* @{
*/
@ -18,8 +18,8 @@ class Noise : public Siggen {
d level_linear;
virtual vd genSignalUnscaled(const us nframes) override;
void resetImpl() override;
public:
public:
/**
* @brief Constructs a noise generator. If no filter is used, the output will
* be white noise. By default, the output will be standard deviation = 1
@ -28,7 +28,6 @@ class Noise : public Siggen {
*/
Noise();
~Noise() = default;
};
/**
@ -37,13 +36,12 @@ class Noise : public Siggen {
class Sine : public Siggen {
d phase = 0;
d omg;
protected:
void resetImpl() override final { phase=0; }
protected:
void resetImpl() override final { phase = 0; }
virtual vd genSignalUnscaled(const us nframes) override final;
public:
public:
/**
* @brief Create a sine wave generator
*
@ -51,7 +49,7 @@ class Sine : public Siggen {
*/
Sine(const d freq_Hz);
~Sine() = default;
void setFreq(const d newFreq) { omg = 2*arma::datum::pi*newFreq; } ;
void setFreq(const d newFreq) { omg = 2 * arma::datum::pi * newFreq; };
};
/**
@ -86,8 +84,7 @@ class Sweep : public Periodic {
void resetImpl() override;
public:
public:
static constexpr int ForwardSweep = 1 << 0;
static constexpr int BackwardSweep = 1 << 1;
static constexpr int LinearSweep = 1 << 2;
@ -103,11 +100,11 @@ class Sweep : public Periodic {
* avoid temporal aliasing in case of measuring impulse responses.
* @param[in] sweep_flags: Sweep period [s]
*/
Sweep(const d fl, const d fu, const d Ts, const d Tq,
const us sweep_flags);
Sweep(const d fl, const d fu, const d Ts, const d Tq, const us sweep_flags);
~Sweep() = default;
};
/** @} */
/** @} */

View File

@ -1,13 +1,15 @@
/* #define DEBUGTRACE_ENABLED */
#include "lasp_threadedindatahandler.h"
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_thread.h"
#include <future>
#include <optional>
#include <queue>
#include <thread>
#include "debugtrace.hpp"
#include "lasp_daqdata.h"
#include "lasp_thread.h"
using namespace std::literals::chrono_literals;
using lck = std::scoped_lock<std::mutex>;
using rte = std::runtime_error;
@ -20,26 +22,26 @@ class SafeQueue {
std::mutex _mtx;
std::atomic<uint32_t> _contents{0};
public:
public:
void push(const DaqData &d) {
DEBUGTRACE_ENTER;
lck lock(_mtx);
_queue.push(d);
_contents++;
assert(_contents == _queue.size());
assert(_contents == _queue.size());
}
DaqData pop() {
DEBUGTRACE_ENTER;
if (empty()) {
throw rte("BUG: Pop on empty queue");
}
}
lck lock(_mtx);
/* DaqData d(std::move(_queue.front())); */
DaqData d(_queue.front());
_queue.pop();
_contents--;
assert(_contents == _queue.size());
assert(_contents == _queue.size());
return d;
}
/**
@ -52,56 +54,70 @@ public:
};
ThreadedInDataHandlerBase::ThreadedInDataHandlerBase(SmgrHandle mgr,
InCallbackType cb,
InResetType reset)
: _indatahandler(
mgr,
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
_1),
reset),
_queue(std::make_unique<SafeQueue>()), inCallback(cb) {
InCallbackType cb,
ResetCallbackType reset)
: _queue(std::make_unique<SafeQueue>()),
inCallback(cb),
resetCallback(reset),
_smgr(mgr) {
DEBUGTRACE_ENTER;
}
void ThreadedInDataHandlerBase::startThread() {
DEBUGTRACE_ENTER;
if (_indatahandler) {
throw rte("BUG: ThreadedIndataHandler already started");
}
SmgrHandle smgr = _smgr.lock();
if (!smgr) {
cerr << "Stream manager destructed" << endl;
return;
}
_indatahandler = std::make_unique<InDataHandler>(
smgr,
std::bind(&ThreadedInDataHandlerBase::_inCallbackFromInDataHandler, this,
_1),
resetCallback);
_thread_can_safely_run = true;
_indatahandler.start();
_indatahandler->start();
}
void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
const DaqData &daqdata) {
DEBUGTRACE_ENTER;
std::scoped_lock lck(_mtx);
// Early return in case object is under DESTRUCTION
if (!_thread_can_safely_run)
return;
if (!_thread_can_safely_run) return;
_queue->push(daqdata);
if (!_thread_running) {
DEBUGTRACE_PRINT("Pushing new thread in pool");
_thread_running = true;
_pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this);
}
}
void ThreadedInDataHandlerBase::stopThread() {
DEBUGTRACE_ENTER;
// Make sure inCallback is no longer called
_thread_can_safely_run = false;
_indatahandler.stop();
if (!_indatahandler) {
throw rte("BUG: ThreadedIndataHandler not running");
}
std::scoped_lock lck(_mtx);
// Make sure no new data arrives
_indatahandler->stop();
// Stop the existing thread
_thread_can_safely_run = false;
// Then wait in steps for the thread to stop running.
while (_thread_running) {
std::this_thread::sleep_for(10us);
}
// Kill the handler
_indatahandler.reset();
}
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
DEBUGTRACE_ENTER;
if (_thread_can_safely_run) {
stopThread();
@ -113,12 +129,9 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
}
void ThreadedInDataHandlerBase::threadFcn() {
DEBUGTRACE_ENTER;
_thread_running = true;
while (!_queue->empty() && _thread_can_safely_run) {
// Call inCallback_threaded
inCallback(_queue->pop());
}

View File

@ -29,21 +29,27 @@ class ThreadedInDataHandlerBase {
* @brief The queue used to push elements to the handling thread.
*/
InDataHandler _indatahandler;
std::unique_ptr<SafeQueue> _queue;
mutable std::recursive_mutex _mtx;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _thread_can_safely_run{false};
GlobalThreadPool _pool;
/**
* @brief Function pointer that is called when new DaqData arrives.
*/
const InCallbackType inCallback;
/**
* @brief Function pointer that is called when reset() is called.
*/
const ResetCallbackType resetCallback;
std::weak_ptr<StreamMgr> _smgr;
std::unique_ptr<InDataHandler> _indatahandler;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _thread_can_safely_run{false};
GlobalThreadPool _pool;
void threadFcn();
@ -58,7 +64,7 @@ class ThreadedInDataHandlerBase {
void _inCallbackFromInDataHandler(const DaqData &daqdata);
public:
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, InResetType reset);
ThreadedInDataHandlerBase(SmgrHandle mgr, InCallbackType cb, ResetCallbackType reset);
~ThreadedInDataHandlerBase();
/**
* @brief This method should be called from the derived class' constructor,

View File

@ -1,13 +1,16 @@
#include <pybind11/pybind11.h>
#include <iostream>
#include "arma_npy.h"
#include "lasp_avpowerspectra.h"
#include "lasp_biquadbank.h"
#include "lasp_fft.h"
#include "lasp_filter.h"
#include "lasp_freqsmooth.h"
#include "lasp_slm.h"
#include "lasp_streammgr.h"
#include "lasp_window.h"
#include <iostream>
#include <pybind11/pybind11.h>
using std::cerr;
using std::endl;
@ -27,7 +30,6 @@ using rte = std::runtime_error;
*/
void init_dsp(py::module &m) {
py::class_<Fft> fft(m, "Fft");
fft.def(py::init<us>());
fft.def("fft", [](Fft &f, dpyarray dat) {
@ -114,9 +116,10 @@ void init_dsp(py::module &m) {
aps.def("compute", [](AvPowerSpectra &aps, dpyarray timedata) {
std::optional<ccube> res;
dmat timedata_mat = NpyToMat<d, false>(timedata);
{
py::gil_scoped_release release;
res = aps.compute(NpyToMat<d, false>(timedata));
res = aps.compute(timedata_mat);
}
return CubeToNpy<c>(res.value_or(ccube(0, 0, 0)));
@ -151,5 +154,12 @@ void init_dsp(py::module &m) {
slm.def("Lmax", [](const SLM &slm) { return ColToNpy<d>(slm.Lmax()); });
slm.def("Lpeak", [](const SLM &slm) { return ColToNpy<d>(slm.Lpeak()); });
slm.def_static("suggestedDownSamplingFac", &SLM::suggestedDownSamplingFac);
// Frequency smoother
m.def("freqSmooth", [](dpyarray freq, dpyarray X, unsigned w) {
vd freqa = NpyToCol<d, false>(freq);
vd Xa = NpyToCol<d, false>(X);
return ColToNpy(freqSmooth(freqa, Xa, w));
});
}
/** @} */

View File

@ -104,9 +104,10 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
/**
* @brief The callback functions that is called.
*/
py::function cb, reset_callback;
std::unique_ptr<py::function> cb, reset_callback;
bool _done{false};
public:
public:
/**
* @brief Initialize PyIndataHandler
*
@ -117,8 +118,9 @@ public:
* is called, when a stream stops, this pointer / handle will dangle.
*/
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) {
: ThreadedInDataHandler(mgr),
cb(std::make_unique<py::function>(cb)),
reset_callback(std::make_unique<py::function>(reset_callback)) {
DEBUGTRACE_ENTER;
/// Start should be called externally, as at constructor time no virtual
/// functions should be called.
@ -139,12 +141,13 @@ public:
*/
void reset(const Daq *daq) {
DEBUGTRACE_ENTER;
if (_done) return;
try {
py::gil_scoped_acquire acquire;
if (daq) {
reset_callback(daq);
(*reset_callback)(daq);
} else {
reset_callback(py::none());
(*reset_callback)(py::none());
}
} catch (py::error_already_set &e) {
cerr << "*************** Error calling reset callback!\n";
@ -169,31 +172,40 @@ public:
/* DEBUGTRACE_ENTER; */
using DataType = DataTypeDescriptor::DataType;
if (_done) return;
try {
py::gil_scoped_acquire acquire;
py::object bool_val;
switch (d.dtype) {
case (DataType::dtype_int8): {
bool_val = cb(getPyArrayNoCpy<int8_t>(d));
} break;
case (DataType::dtype_int16): {
bool_val = cb(getPyArrayNoCpy<int16_t>(d));
} break;
case (DataType::dtype_int32): {
bool_val = cb(getPyArrayNoCpy<int32_t>(d));
} break;
case (DataType::dtype_fl32): {
bool_val = cb(getPyArrayNoCpy<float>(d));
} break;
case (DataType::dtype_fl64): {
bool_val = cb(getPyArrayNoCpy<double>(d));
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
case (DataType::dtype_int8): {
bool_val = (*cb)(getPyArrayNoCpy<int8_t>(d));
} break;
case (DataType::dtype_int16): {
bool_val = (*cb)(getPyArrayNoCpy<int16_t>(d));
} break;
case (DataType::dtype_int32): {
bool_val = (*cb)(getPyArrayNoCpy<int32_t>(d));
} break;
case (DataType::dtype_fl32): {
bool_val = (*cb)(getPyArrayNoCpy<float>(d));
} break;
case (DataType::dtype_fl64): {
bool_val = (*cb)(getPyArrayNoCpy<double>(d));
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
bool res = bool_val.cast<bool>();
if (res == false) {
DEBUGTRACE_PRINT("Setting callbacks to None")
_done = true;
// cb = py::function(py::none());
// reset_callback = py::function(py::none());
cb.reset();
reset_callback.reset();
}
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;

View File

@ -5,7 +5,7 @@ requires-python = ">=3.10"
description = "Library for Acoustic Signal Processing"
license = { "file" = "LICENSE" }
authors = [{ "name" = "J.A. de Jong", "email" = "j.a.dejong@ascee.nl" }]
version = "1.4.5"
version = "1.5.0"
keywords = ["DSP", "DAQ", "Signal processing"]

View File

@ -11,6 +11,11 @@ from enum import Enum, auto, unique
from .lasp_cpp import InDataHandler, StreamMgr
from .lasp_version import LASP_VERSION_MAJOR, LASP_VERSION_MINOR
import uuid
import logging
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)
@dataclasses.dataclass
@ -60,10 +65,14 @@ class Recording:
startDelay: Optional delay added before the recording is *actually*
started in [s].
"""
logger.debug("__init__()")
ext = ".h5"
if ext not in fn:
fn += ext
if os.path.exists(fn):
raise RuntimeError("Recording file name already exists / is in use")
self._smgr = streammgr
self._metadata = None
@ -91,7 +100,7 @@ class Recording:
self._stop = Atomic(False)
# Mutex, on who is working with the H5py data and the class settings
self._rec_mutex = threading.Lock()
self._rec_mutex = threading.RLock()
self._progressCallback = progressCallback
@ -100,7 +109,7 @@ class Recording:
self._h5file = h5py.File(self._fn, "w", "stdio")
self._h5file.flush()
except Exception as e:
logging.error(f"Error creating measurement file {e}")
logger.error(f"Error creating measurement file {e}")
raise
# This flag is used to delete the file on finish(), and can be used
@ -115,19 +124,19 @@ class Recording:
# Audio dataset
self._ad = None
logging.debug("Starting record....")
logger.debug("Starting record....")
self._indataHandler = InDataHandler(
streammgr, self.inCallback, self.resetCallback
)
if wait:
logging.debug("Stop recording with CTRL-C")
logger.debug("Stop recording with CTRL-C")
try:
while not self._stop():
time.sleep(0.01)
except KeyboardInterrupt:
logging.debug("Keyboard interrupt on record")
logger.debug("Keyboard interrupt on record")
finally:
self.finish()
@ -138,6 +147,7 @@ class Recording:
"""
Function called with initial stream data.
"""
logger.debug(f"resetCallback({daq})")
with self._rec_mutex:
in_ch = daq.enabledInChannels()
blocksize = daq.framesPerBlock()
@ -180,10 +190,11 @@ class Recording:
When returning False, it will stop the stream.
"""
logger.debug(f"inCallback({adata})")
if self._stop():
logging.debug("Stop flag set, early return in inCallback")
logger.debug("Stop flag set, early return in inCallback")
# Stop flag is raised. We do not add any data anymore.
return True
return False
with self._rec_mutex:
@ -191,10 +202,7 @@ class Recording:
match self._recState:
case RecordingState.Waiting:
if (
self._allBlocks * self._blocksize / self._fs
> self._startDelay
):
if self._allBlocks * self._blocksize / self._fs > self._startDelay:
self._recState = RecordingState.Recording
case RecordingState.Recording:
@ -208,7 +216,10 @@ class Recording:
recstatus = RecordStatus(curT=self.recordedTime, done=False)
if self._requiredRecordingLength is not None and self.recordedTime >= self._requiredRecordingLength:
if (
self._requiredRecordingLength is not None
and self.recordedTime >= self._requiredRecordingLength
):
self._recState = RecordingState.AllDataStored
self._stop <<= True
recstatus.done = True
@ -227,9 +238,10 @@ class Recording:
@property
def recordedTime(self):
"""Return recorded time (not rounded) as float"""
if self._ad is None:
return 0.0
return self._recordedBlocks * self._blocksize / self._fs
with self._rec_mutex:
if self._ad is None:
return 0.0
return self._recordedBlocks * self._blocksize / self._fs
def __addFirstFramesToFile(self, adata):
"""
@ -240,29 +252,29 @@ class Recording:
adata: Numpy array with data from DAQ
"""
with self._rec_mutex:
# The array data type cannot
# datatype = daq.dataType()
dtype = np.dtype(adata.dtype)
# The array data type cannot
# datatype = daq.dataType()
dtype = np.dtype(adata.dtype)
assert self._ad is None
assert self._ad is None
self._ad = self._h5file.create_dataset(
"audio",
(1, self._blocksize, self._nchannels),
dtype=dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
self._blocksize,
self._nchannels,
),
compression="gzip",
)
self._ad[0, :, :] = adata
self._ad = self._h5file.create_dataset(
"audio",
(1, self._blocksize, self._nchannels),
dtype=dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
self._blocksize,
self._nchannels,
),
compression="gzip",
)
self._ad[0, :, :] = adata
self._h5file.flush()
self._deleteFile = False
self._h5file.flush()
self._deleteFile = False
def setDelete(self, val: bool):
"""
@ -279,36 +291,35 @@ class Recording:
remove the queue from the stream, etc.
"""
logging.debug("Recording::finish()")
logger.debug("Recording::finish()")
self._stop <<= True
with self._rec_mutex:
if self._recState == RecordingState.Finished:
raise RuntimeError('Recording has already finished')
self._h5file.flush()
if self._recState == RecordingState.Finished:
raise RuntimeError("Recording has already finished")
# Remove indata handler, which also should remove callback function
# from StreamMgr. This, however does not have to happen
# instantaneously. For which we have to implement extra mutex
# guards in this class
del self._indataHandler
self._indataHandler = None
self._h5file.flush()
# Remove handle to dataset otherwise the h5 file is not closed
# properly.
del self._ad
self._ad = None
try:
# Close the recording file
self._h5file.close()
del self._h5file
except Exception as e:
logging.error(f"Error closing file: {e}")
logger.error(f"Error closing file: {e}")
logging.debug("Recording ended")
logger.debug("Recording ended")
if self._deleteFile:
self.__deleteFile()
self._recState = RecordingState.Finished
@ -320,16 +331,17 @@ class Recording:
try:
os.remove(self._fn)
except Exception as e:
logging.error(f"Error deleting file: {self._fn}: {str(e)}")
logger.error(f"Error deleting file: {self._fn}: {str(e)}")
def __addTimeDataToFile(self, indata):
"""
Called by handleQueue() and adds new time data to the storage file.
"""
with self._rec_mutex:
ablockno = self._recordedBlocks
ablockno = self._recordedBlocks
# Add the data to the file, and resize the audio data blocks
self._ad.resize(ablockno + 1, axis=0)
self._ad[ablockno, :, :] = indata
self._h5file.flush()
# Add the data to the file, and resize the audio data blocks
self._ad.resize(ablockno + 1, axis=0)
self._ad[ablockno, :, :] = indata
self._h5file.flush()

View File

@ -20,6 +20,7 @@ from enum import Enum, unique
import copy
import numpy as np
from numpy import log2, pi, sin
from ..lasp_cpp import freqSmooth
@unique
@ -152,7 +153,7 @@ def smoothCalcMatrix(freq, sw: SmoothingWidth):
return Q
def smoothSpectralData(freq, M, sw: SmoothingWidth,
def smoothSpectralData_old(freq, M, sw: SmoothingWidth,
st: SmoothingType = SmoothingType.levels):
"""
Apply fractional octave smoothing to data in the frequency domain.
@ -220,6 +221,45 @@ def smoothSpectralData(freq, M, sw: SmoothingWidth,
return Psm
def smoothSpectralData(freq, M, sw: SmoothingWidth,
st: SmoothingType = SmoothingType.levels):
"""
Apply fractional octave smoothing to data in the frequency domain.
Args:
freq: array of frequencies of data points [Hz] - equally spaced
M: array of data, either power or dB
the smoothing type `st`, the smoothing is applied.
sw: smoothing width
st: smoothing type = data type of input data
Returns:
freq : array frequencies of data points [Hz]
Msm : float smoothed magnitude of data points
"""
# Safety
if st == SmoothingType.ps:
assert np.min(M) >= 0, 'Power spectrum values cannot be negative'
if st == SmoothingType.levels and isinstance(M.dtype, complex):
raise RuntimeError('Decibel input should be real-valued')
# Convert to power
if st == SmoothingType.levels:
P = 10**(M/10)
elif st == SmoothingType.ps:
P = M
else:
raise RuntimeError(f"Incorrect SmoothingType: {st}")
Psm = freqSmooth(freq, P, sw.value[0])
# Convert to original format
if st == SmoothingType.levels:
Psm = 10*np.log10(Psm)
return Psm
# %% Test
if __name__ == "__main__":