Bugfix: reset() was called after inCallback() when adding new handler to StreamMgr. Bugfix: start() was doubly called for RtAPS. Once from Python and once from C++ in constructor. Renamed some scoped_lock to Lck. Added some comments

This commit is contained in:
Anne de Jong 2022-10-16 21:26:06 +02:00
parent e2aa149030
commit bebd270b44
11 changed files with 119 additions and 108 deletions

View File

@ -336,12 +336,15 @@ void StreamMgr::addInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
checkRightThread(); checkRightThread();
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx); std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
_inDataHandlers.push_back(&handler);
if (_inputStream) { if (_inputStream) {
handler.reset(_inputStream.get()); handler.reset(_inputStream.get());
} else { } else {
handler.reset(nullptr); handler.reset(nullptr);
} }
if(std::find(_inDataHandlers.cbegin(),_inDataHandlers.cend(), &handler) != _inDataHandlers.cend()) {
throw std::runtime_error("Error: handler already added. Probably start() is called more than once on a handler object");
}
_inDataHandlers.push_back(&handler);
} }
void StreamMgr::removeInDataHandler(InDataHandler &handler) { void StreamMgr::removeInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;

View File

@ -1,7 +1,7 @@
/* #define DEBUGTRACE_ENABLED */ /* #define DEBUGTRACE_ENABLED */
#include "lasp_avpowerspectra.h" #include "lasp_avpowerspectra.h"
#include "lasp_mathtypes.h"
#include "debugtrace.hpp" #include "debugtrace.hpp"
#include "lasp_mathtypes.h"
#include <cmath> #include <cmath>
#include <optional> #include <optional>
@ -85,7 +85,7 @@ void AvPowerSpectra::reset() {
} }
std::optional<ccube> AvPowerSpectra::compute(const dmat &timedata) { ccube AvPowerSpectra::compute(const dmat &timedata) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
DEBUGTRACE_PRINT(timedata.n_rows); DEBUGTRACE_PRINT(timedata.n_rows);
@ -93,42 +93,44 @@ std::optional<ccube> AvPowerSpectra::compute(const dmat &timedata) {
_timeBuf.push(timedata); _timeBuf.push(timedata);
us i = 0; bool run_once = false;
while (auto samples = _timeBuf.pop(_ps.nfft, _overlap_keep)) { while (_timeBuf.size() >= _ps.nfft) {
DEBUGTRACE_PRINT((int)_mode); DEBUGTRACE_PRINT((int)_mode);
dmat samples = _timeBuf.pop(_ps.nfft, _overlap_keep);
switch (_mode) { switch (_mode) {
case (Mode::Spectrogram): { case (Mode::Spectrogram):
DEBUGTRACE_PRINT("Spectrogram mode"); DEBUGTRACE_PRINT("Spectrogram mode");
_est = _ps.compute(samples.value()); _est = _ps.compute(samples);
} break; break;
case (Mode::Averaging): { case (Mode::Averaging):
DEBUGTRACE_PRINT("Averaging mode"); DEBUGTRACE_PRINT("Averaging mode");
n_averages++; n_averages++;
if (n_averages == 1) { if (n_averages == 1) {
_est = _ps.compute(samples.value()); _est = _ps.compute(samples);
} else { } else {
_est = (static_cast<d>(n_averages - 1) / n_averages) * _est + _est = (static_cast<d>(n_averages - 1) / n_averages) * _est +
_ps.compute(samples.value()) / n_averages; _ps.compute(samples) / n_averages;
} }
} break; break;
case (Mode::Leaking): { case (Mode::Leaking):
DEBUGTRACE_PRINT("Leaking mode"); DEBUGTRACE_PRINT("Leaking mode");
if (arma::size(_est) == arma::size(0, 0, 0)) { if (arma::size(_est) == arma::size(0, 0, 0)) {
_est = _ps.compute(samples.value()); _est = _ps.compute(samples);
} else { } else {
_est = _alpha * _est + (1 - _alpha) * _ps.compute(samples.value()); _est = _alpha * _est + (1 - _alpha) * _ps.compute(samples);
} }
} break; break;
} // end switch mode } // end switch mode
i++; run_once = true;
} }
if (i > 0) {
return std::make_optional(_est); /// Return a copy of current estimator in case we have done one update.
} /// Othewise, we return an empty ccube.
return std::nullopt; return run_once ? _est : ccube();
} }
std::optional<ccube> AvPowerSpectra::get_est() const { ccube AvPowerSpectra::get_est() const {
if (_est.n_cols > 0) return _est;
return _est;
return std::nullopt;
} }

View File

@ -142,20 +142,21 @@ public:
* *
* @param timedata * @param timedata
* *
* @return Optionally, a copy of the latest estimate of the power spectra. An * @return a copy of the latest estimate of the power spectra. an
* update is only given if the amount of new time data is enough to compute a * update is only given if the amount of new time data is enough to compute a
* new estimate. It can be checked by operator bool(). * new estimate. if no new estimate is available, it returns an empty ccube.
* * Note that the latest available estimate can be obtained using get_est().
*/ * */
std::optional<ccube> compute(const dmat &timedata); ccube compute(const dmat &timedata);
/** /**
* @brief Returns the latest estimate of cps (cross-power spectra. * @brief Returns the latest estimate of cps (cross-power spectra.
* *
* @return Pointer (reference, not owning!) to spectral estimate date, or * @return a copy of the latest estimate of the power spectra. an
* nullptr, in case nothing could yet be computed. * update is only given if the amount of new time data is enough to compute a
*/ * new estimate. If no estimate is available, it returns an empty ccube.
std::optional<ccube> get_est() const; * */
ccube get_est() const;
/** /**
* @brief The overlap is rounded to a certain amount of time samples. This * @brief The overlap is rounded to a certain amount of time samples. This

View File

@ -18,7 +18,7 @@ PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps)
bool PPMHandler::inCallback_threaded(const DaqData &d) { bool PPMHandler::inCallback_threaded(const DaqData &d) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx); Lck lck(_mtx);
dmat data = d.toFloat(); dmat data = d.toFloat();
/* data.print(); */ /* data.print(); */
@ -73,7 +73,7 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) {
std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const { std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx); Lck lck(_mtx);
arma::uvec clips(_clip_time.size(), arma::fill::zeros); arma::uvec clips(_clip_time.size(), arma::fill::zeros);
clips.elem(arma::find(_clip_time >= 0)).fill(1); clips.elem(arma::find(_clip_time >= 0)).fill(1);
@ -83,13 +83,13 @@ std::tuple<vd, arma::uvec> PPMHandler::getCurrentValue() const {
void PPMHandler::reset(const Daq *daq) { void PPMHandler::reset(const Daq *daq) {
/* DEBUGTRACE_ENTER; */ DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx); Lck lck(_mtx);
if (daq) { if (daq) {
_cur_max.fill(1e-80); _cur_max.fill(1e-80);
;
_clip_time.fill(-1); _clip_time.fill(-1);
const d fs = daq->samplerate(); const d fs = daq->samplerate();
@ -103,6 +103,6 @@ void PPMHandler::reset(const Daq *daq) {
PPMHandler::~PPMHandler() { PPMHandler::~PPMHandler() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx); Lck lck(_mtx);
stop(); stop();
} }

View File

@ -5,6 +5,7 @@
using std::cerr; using std::cerr;
using std::endl; using std::endl;
using Lck = std::scoped_lock<std::mutex>;
RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter,
const us nfft, const us nfft,
@ -17,17 +18,16 @@ RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter,
_filterPrototype = freqWeightingFilter->clone(); _filterPrototype = freqWeightingFilter->clone();
} }
start();
} }
RtAps::~RtAps() { RtAps::~RtAps() {
std::scoped_lock<std::mutex> lck(_mtx); Lck lck(_ps_mtx);
stop(); stop();
} }
bool RtAps::inCallback_threaded(const DaqData &data) { bool RtAps::inCallback_threaded(const DaqData &data) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx);
dmat fltdata = data.toFloat(); dmat fltdata = data.toFloat();
const us nchannels = fltdata.n_cols; const us nchannels = fltdata.n_cols;
@ -53,6 +53,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) {
} }
} // End of if(_filterPrototype) } // End of if(_filterPrototype)
Lck lck(_ps_mtx);
_ps.compute(fltdata); _ps.compute(fltdata);
return true; return true;
@ -63,15 +64,14 @@ void RtAps::reset(const Daq *daq) { // Explicitly say
// not used. // not used.
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mtx); Lck lck(_ps_mtx);
_ps.reset(); _ps.reset();
} }
std::unique_ptr<ccube> RtAps::getCurrentValue() { ccube RtAps::getCurrentValue() const {
/* DEBUGTRACE_ENTER; */ /* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_mtx); Lck lck(_ps_mtx);
auto est = _ps.get_est(); return _ps.get_est();
return std::make_unique<ccube>(est.value_or(ccube()));
} }

View File

@ -21,10 +21,13 @@
class RtAps : public ThreadedInDataHandler { class RtAps : public ThreadedInDataHandler {
std::mutex _mtx;
std::unique_ptr<Filter> _filterPrototype; std::unique_ptr<Filter> _filterPrototype;
std::vector<std::unique_ptr<Filter>> _freqWeightingFilters; std::vector<std::unique_ptr<Filter>> _freqWeightingFilters;
/**
* @brief Mutex only for _ps. Other members are only accessed in thread.
*/
mutable std::mutex _ps_mtx;
AvPowerSpectra _ps; AvPowerSpectra _ps;
public: public:
@ -44,9 +47,10 @@ public:
/** /**
* @brief Get the latest estimate of the power spectra * @brief Get the latest estimate of the power spectra
* *
* @return Optionally, if available, the latest values * @return If available, the latest estimate. If no estimate available, an
* empty ccube().
*/ */
std::unique_ptr<ccube> getCurrentValue(); ccube getCurrentValue() const;
bool inCallback_threaded(const DaqData &) override final; bool inCallback_threaded(const DaqData &) override final;
void reset(const Daq *) override final; void reset(const Daq *) override final;

View File

@ -24,7 +24,7 @@ class SafeQueue {
} }
DaqData pop() { DaqData pop() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
if(_contents ==0) { if (empty()) {
throw rte("BUG: Pop on empty queue"); throw rte("BUG: Pop on empty queue");
} }
lck lock(_mtx); lck lock(_mtx);
@ -36,11 +36,13 @@ class SafeQueue {
return d; return d;
} }
bool empty() const { /**
return _contents == 0; * @brief Empty implemented using atomic var, safes some mutex lock/unlock
} * cycles.
*
* @return true if queue is empty
*/
bool empty() const { return _contents == 0; }
}; };
ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr)
@ -61,9 +63,10 @@ bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) {
_queue->push(daqdata); _queue->push(daqdata);
auto &pool = getPool();
if (!_thread_running && (!_stopThread) && _lastCallbackResult) { if (!_thread_running && (!_stopThread) && _lastCallbackResult) {
auto &pool = getPool();
DEBUGTRACE_PRINT("Pushing new thread in pool");
_thread_running = true;
pool.push_task(&ThreadedInDataHandler::threadFcn, this); pool.push_task(&ThreadedInDataHandler::threadFcn, this);
} }
@ -84,19 +87,15 @@ ThreadedInDataHandler::~ThreadedInDataHandler() {
void ThreadedInDataHandler::threadFcn() { void ThreadedInDataHandler::threadFcn() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
_thread_running = true;
while(!_queue->empty() && !_stopThread) {
if (!_queue->empty() && !_stopThread) {
DaqData d(_queue->pop());
// Call inCallback_threaded // Call inCallback_threaded
if (inCallback_threaded(d) == false) { if (!inCallback_threaded(_queue->pop())) {
cerr << "*********** Callback result returned false! *************"
<< endl;
_lastCallbackResult = false; _lastCallbackResult = false;
_thread_running = false;
return;
} }
} }
_lastCallbackResult = true;
_thread_running = false; _thread_running = false;
} }

View File

@ -35,43 +35,40 @@ public:
} }
} }
std::optional<dmat> pop(const us nframes, const us keep) { dmat pop(const us nframes, const us keep) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
if (keep >= nframes) { if (keep >= nframes) {
throw rte("keep should be < nframes"); throw rte("keep should be < nframes");
} }
if (nframes > n_frames()) {
if (nframes <= n_frames()) { throw rte("Requested more than currently in storage");
assert(!_storage.empty());
dmat res(nframes, _storage.front().n_cols);
us j = 0;
for (us i = 0; i < nframes; i++) {
if (i + keep < nframes) {
// Just pop elements and copy over
res.row(i) = _storage.front();
_storage.pop_front();
} else {
// Suppose keep == 0, then we never arrive here
// Suppose keep == 1, then storage[0] is copyied over.
// Suppose keep == 2, then storage[0[ and storage[1] is copyied over.
// Etc.
res.row(i) = _storage[j];
j++;
}
}
return res;
} }
// If nsamples is too much for what we have, we just return nothing. assert(!_storage.empty());
return std::nullopt;
dmat res(nframes, _storage.front().n_cols);
us j = 0;
for (us i = 0; i < nframes; i++) {
if (i + keep < nframes) {
// Just pop elements and copy over
res.row(i) = _storage.front();
_storage.pop_front();
} else {
// Suppose keep == 0, then we never arrive here
// Suppose keep == 1, then storage[0] is copyied over.
// Suppose keep == 2, then storage[0[ and storage[1] is copyied over.
// Etc.
res.row(i) = _storage[j];
j++;
}
}
return res;
} }
/** /**
@ -83,9 +80,10 @@ public:
}; };
TimeBuffer::TimeBuffer() : _imp(std::make_unique<TimeBufferImp>()) {} TimeBuffer::TimeBuffer() : _imp(std::make_unique<TimeBufferImp>()) {}
std::optional<dmat> TimeBuffer::pop(const us n_rows, const us keep) { dmat TimeBuffer::pop(const us n_rows, const us keep) {
return _imp->pop(n_rows, keep); return _imp->pop(n_rows, keep);
} }
TimeBuffer::~TimeBuffer() {} TimeBuffer::~TimeBuffer() {}
void TimeBuffer::reset() { _imp->reset(); } void TimeBuffer::reset() { _imp->reset(); }
void TimeBuffer::push(const dmat &dat) { _imp->push(dat); } void TimeBuffer::push(const dmat &dat) { _imp->push(dat); }
us TimeBuffer::size() const { return _imp->n_frames(); }

View File

@ -27,14 +27,21 @@ public:
void reset(); void reset();
/** /**
* @brief Try top pop frames from the buffer. * @brief Returns current size of stored amount of frames.
*
* @return The current amount of frames in the storage
*/
us size() const;
/**
* @brief Pop frames from the buffer. Throws a runtime error if more frames
* are requested than actually stored.
* *
* @param nframes The number of rows * @param nframes The number of rows
* @param keep The number of frames to copy, but also to keep in the buffer * @param keep The number of frames to copy, but also to keep in the buffer
* (usage: overlap) * (usage: overlap)
* *
* @return An optional container, containing a matrix of time samples for * @return Array time samples for each channel
* each channel
*/ */
std::optional<dmat> pop(const us nframes, const us keep = 0); dmat pop(const us nframes, const us keep = 0);
}; };

View File

@ -122,8 +122,8 @@ void init_dsp(py::module &m) {
return CubeToNpy<c>(res.value_or(ccube(0, 0, 0))); return CubeToNpy<c>(res.value_or(ccube(0, 0, 0)));
}); });
aps.def("get_est", [](const AvPowerSpectra &ps) { aps.def("get_est", [](const AvPowerSpectra &ps) {
auto est = ps.get_est(); ccube est = ps.get_est();
return CubeToNpy<c>(est.value_or(ccube(0, 0, 0))); return CubeToNpy<c>(est);
}); });
py::class_<SLM> slm(m, "cppSLM"); py::class_<SLM> slm(m, "cppSLM");

View File

@ -215,10 +215,7 @@ void init_datahandler(py::module &m) {
); );
rtaps.def("getCurrentValue", [](RtAps &rt) { rtaps.def("getCurrentValue", [](RtAps &rt) {
std::unique_ptr<ccube> val = rt.getCurrentValue(); ccube val = rt.getCurrentValue();
if (val) { return CubeToNpy<c>(val);
return CubeToNpy<c> (*val);
}
return CubeToNpy<c>(ccube(1,0,0));
}); });
} }