diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index e79d9ba..f9c2384 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -336,12 +336,15 @@ void StreamMgr::addInDataHandler(InDataHandler &handler) { DEBUGTRACE_ENTER; checkRightThread(); std::scoped_lock lck(_inDataHandler_mtx); - _inDataHandlers.push_back(&handler); if (_inputStream) { handler.reset(_inputStream.get()); } else { handler.reset(nullptr); } + if(std::find(_inDataHandlers.cbegin(),_inDataHandlers.cend(), &handler) != _inDataHandlers.cend()) { + throw std::runtime_error("Error: handler already added. Probably start() is called more than once on a handler object"); + } + _inDataHandlers.push_back(&handler); } void StreamMgr::removeInDataHandler(InDataHandler &handler) { DEBUGTRACE_ENTER; diff --git a/src/lasp/dsp/lasp_avpowerspectra.cpp b/src/lasp/dsp/lasp_avpowerspectra.cpp index 83c7f05..59ccb08 100644 --- a/src/lasp/dsp/lasp_avpowerspectra.cpp +++ b/src/lasp/dsp/lasp_avpowerspectra.cpp @@ -1,7 +1,7 @@ /* #define DEBUGTRACE_ENABLED */ #include "lasp_avpowerspectra.h" -#include "lasp_mathtypes.h" #include "debugtrace.hpp" +#include "lasp_mathtypes.h" #include #include @@ -85,7 +85,7 @@ void AvPowerSpectra::reset() { } -std::optional AvPowerSpectra::compute(const dmat &timedata) { +ccube AvPowerSpectra::compute(const dmat &timedata) { DEBUGTRACE_ENTER; DEBUGTRACE_PRINT(timedata.n_rows); @@ -93,42 +93,44 @@ std::optional AvPowerSpectra::compute(const dmat &timedata) { _timeBuf.push(timedata); - us i = 0; - while (auto samples = _timeBuf.pop(_ps.nfft, _overlap_keep)) { + bool run_once = false; + while (_timeBuf.size() >= _ps.nfft) { + DEBUGTRACE_PRINT((int)_mode); + + dmat samples = _timeBuf.pop(_ps.nfft, _overlap_keep); + switch (_mode) { - case (Mode::Spectrogram): { + case (Mode::Spectrogram): DEBUGTRACE_PRINT("Spectrogram mode"); - _est = _ps.compute(samples.value()); - } break; - case (Mode::Averaging): { + _est = _ps.compute(samples); + break; + case (Mode::Averaging): DEBUGTRACE_PRINT("Averaging mode"); n_averages++; if (n_averages == 1) { - _est = _ps.compute(samples.value()); + _est = _ps.compute(samples); } else { _est = (static_cast(n_averages - 1) / n_averages) * _est + - _ps.compute(samples.value()) / n_averages; + _ps.compute(samples) / n_averages; } - } break; - case (Mode::Leaking): { + break; + case (Mode::Leaking): DEBUGTRACE_PRINT("Leaking mode"); if (arma::size(_est) == arma::size(0, 0, 0)) { - _est = _ps.compute(samples.value()); + _est = _ps.compute(samples); } else { - _est = _alpha * _est + (1 - _alpha) * _ps.compute(samples.value()); + _est = _alpha * _est + (1 - _alpha) * _ps.compute(samples); } - } break; + break; } // end switch mode - i++; + run_once = true; } - if (i > 0) { - return std::make_optional(_est); - } - return std::nullopt; + + /// Return a copy of current estimator in case we have done one update. + /// Othewise, we return an empty ccube. + return run_once ? _est : ccube(); } -std::optional AvPowerSpectra::get_est() const { - if (_est.n_cols > 0) - return _est; - return std::nullopt; +ccube AvPowerSpectra::get_est() const { + return _est; } diff --git a/src/lasp/dsp/lasp_avpowerspectra.h b/src/lasp/dsp/lasp_avpowerspectra.h index 95092e4..e195839 100644 --- a/src/lasp/dsp/lasp_avpowerspectra.h +++ b/src/lasp/dsp/lasp_avpowerspectra.h @@ -142,20 +142,21 @@ public: * * @param timedata * - * @return Optionally, a copy of the latest estimate of the power spectra. An + * @return a copy of the latest estimate of the power spectra. an * update is only given if the amount of new time data is enough to compute a - * new estimate. It can be checked by operator bool(). - * - */ - std::optional compute(const dmat &timedata); + * new estimate. if no new estimate is available, it returns an empty ccube. + * Note that the latest available estimate can be obtained using get_est(). + * */ + ccube compute(const dmat &timedata); /** * @brief Returns the latest estimate of cps (cross-power spectra. * - * @return Pointer (reference, not owning!) to spectral estimate date, or - * nullptr, in case nothing could yet be computed. - */ - std::optional get_est() const; + * @return a copy of the latest estimate of the power spectra. an + * update is only given if the amount of new time data is enough to compute a + * new estimate. If no estimate is available, it returns an empty ccube. + * */ + ccube get_est() const; /** * @brief The overlap is rounded to a certain amount of time samples. This diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 68b8d20..6ef3d14 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -18,7 +18,7 @@ PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) bool PPMHandler::inCallback_threaded(const DaqData &d) { DEBUGTRACE_ENTER; - std::scoped_lock lck(_mtx); + Lck lck(_mtx); dmat data = d.toFloat(); /* data.print(); */ @@ -73,7 +73,7 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { std::tuple PPMHandler::getCurrentValue() const { DEBUGTRACE_ENTER; - std::scoped_lock lck(_mtx); + Lck lck(_mtx); arma::uvec clips(_clip_time.size(), arma::fill::zeros); clips.elem(arma::find(_clip_time >= 0)).fill(1); @@ -83,13 +83,13 @@ std::tuple PPMHandler::getCurrentValue() const { void PPMHandler::reset(const Daq *daq) { - /* DEBUGTRACE_ENTER; */ - std::scoped_lock lck(_mtx); + DEBUGTRACE_ENTER; + Lck lck(_mtx); if (daq) { _cur_max.fill(1e-80); - ; + _clip_time.fill(-1); const d fs = daq->samplerate(); @@ -103,6 +103,6 @@ void PPMHandler::reset(const Daq *daq) { PPMHandler::~PPMHandler() { DEBUGTRACE_ENTER; - std::scoped_lock lck(_mtx); + Lck lck(_mtx); stop(); } diff --git a/src/lasp/dsp/lasp_rtaps.cpp b/src/lasp/dsp/lasp_rtaps.cpp index 4f3af10..3768c8e 100644 --- a/src/lasp/dsp/lasp_rtaps.cpp +++ b/src/lasp/dsp/lasp_rtaps.cpp @@ -5,6 +5,7 @@ using std::cerr; using std::endl; +using Lck = std::scoped_lock; RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, const us nfft, @@ -17,17 +18,16 @@ RtAps::RtAps(StreamMgr &mgr, const Filter *freqWeightingFilter, _filterPrototype = freqWeightingFilter->clone(); } - start(); } RtAps::~RtAps() { - std::scoped_lock lck(_mtx); + Lck lck(_ps_mtx); stop(); } bool RtAps::inCallback_threaded(const DaqData &data) { DEBUGTRACE_ENTER; - std::scoped_lock lck(_mtx); + dmat fltdata = data.toFloat(); const us nchannels = fltdata.n_cols; @@ -53,6 +53,7 @@ bool RtAps::inCallback_threaded(const DaqData &data) { } } // End of if(_filterPrototype) + Lck lck(_ps_mtx); _ps.compute(fltdata); return true; @@ -63,15 +64,14 @@ void RtAps::reset(const Daq *daq) { // Explicitly say // not used. DEBUGTRACE_ENTER; - std::scoped_lock lck(_mtx); + Lck lck(_ps_mtx); _ps.reset(); } -std::unique_ptr RtAps::getCurrentValue() { +ccube RtAps::getCurrentValue() const { /* DEBUGTRACE_ENTER; */ - std::scoped_lock lck(_mtx); - auto est = _ps.get_est(); - return std::make_unique(est.value_or(ccube())); + Lck lck(_ps_mtx); + return _ps.get_est(); } diff --git a/src/lasp/dsp/lasp_rtaps.h b/src/lasp/dsp/lasp_rtaps.h index 9443c75..3cb631c 100644 --- a/src/lasp/dsp/lasp_rtaps.h +++ b/src/lasp/dsp/lasp_rtaps.h @@ -21,10 +21,13 @@ class RtAps : public ThreadedInDataHandler { - std::mutex _mtx; std::unique_ptr _filterPrototype; std::vector> _freqWeightingFilters; + /** + * @brief Mutex only for _ps. Other members are only accessed in thread. + */ + mutable std::mutex _ps_mtx; AvPowerSpectra _ps; public: @@ -44,9 +47,10 @@ public: /** * @brief Get the latest estimate of the power spectra * - * @return Optionally, if available, the latest values + * @return If available, the latest estimate. If no estimate available, an + * empty ccube(). */ - std::unique_ptr getCurrentValue(); + ccube getCurrentValue() const; bool inCallback_threaded(const DaqData &) override final; void reset(const Daq *) override final; diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 4ee16ac..9a6d724 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -24,7 +24,7 @@ class SafeQueue { } DaqData pop() { DEBUGTRACE_ENTER; - if(_contents ==0) { + if (empty()) { throw rte("BUG: Pop on empty queue"); } lck lock(_mtx); @@ -36,11 +36,13 @@ class SafeQueue { return d; } - bool empty() const { - return _contents == 0; - } - - + /** + * @brief Empty implemented using atomic var, safes some mutex lock/unlock + * cycles. + * + * @return true if queue is empty + */ + bool empty() const { return _contents == 0; } }; ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) @@ -61,9 +63,10 @@ bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { _queue->push(daqdata); - auto &pool = getPool(); - if (!_thread_running && (!_stopThread) && _lastCallbackResult) { + auto &pool = getPool(); + DEBUGTRACE_PRINT("Pushing new thread in pool"); + _thread_running = true; pool.push_task(&ThreadedInDataHandler::threadFcn, this); } @@ -84,19 +87,15 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { void ThreadedInDataHandler::threadFcn() { DEBUGTRACE_ENTER; - _thread_running = true; - - if (!_queue->empty() && !_stopThread) { - DaqData d(_queue->pop()); + while(!_queue->empty() && !_stopThread) { // Call inCallback_threaded - if (inCallback_threaded(d) == false) { + if (!inCallback_threaded(_queue->pop())) { + cerr << "*********** Callback result returned false! *************" + << endl; _lastCallbackResult = false; - _thread_running = false; - return; } } - _lastCallbackResult = true; _thread_running = false; } diff --git a/src/lasp/dsp/lasp_timebuffer.cpp b/src/lasp/dsp/lasp_timebuffer.cpp index f2d4fe1..80755f1 100644 --- a/src/lasp/dsp/lasp_timebuffer.cpp +++ b/src/lasp/dsp/lasp_timebuffer.cpp @@ -35,43 +35,40 @@ public: } } - std::optional pop(const us nframes, const us keep) { + dmat pop(const us nframes, const us keep) { DEBUGTRACE_ENTER; if (keep >= nframes) { throw rte("keep should be < nframes"); } - - if (nframes <= n_frames()) { - - assert(!_storage.empty()); - - dmat res(nframes, _storage.front().n_cols); - - us j = 0; - for (us i = 0; i < nframes; i++) { - - if (i + keep < nframes) { - // Just pop elements and copy over - res.row(i) = _storage.front(); - _storage.pop_front(); - - } else { - - // Suppose keep == 0, then we never arrive here - // Suppose keep == 1, then storage[0] is copyied over. - // Suppose keep == 2, then storage[0[ and storage[1] is copyied over. - // Etc. - res.row(i) = _storage[j]; - j++; - } - } - return res; + if (nframes > n_frames()) { + throw rte("Requested more than currently in storage"); } - // If nsamples is too much for what we have, we just return nothing. - return std::nullopt; + assert(!_storage.empty()); + + dmat res(nframes, _storage.front().n_cols); + + us j = 0; + for (us i = 0; i < nframes; i++) { + if (i + keep < nframes) { + // Just pop elements and copy over + res.row(i) = _storage.front(); + _storage.pop_front(); + + } else { + + // Suppose keep == 0, then we never arrive here + // Suppose keep == 1, then storage[0] is copyied over. + // Suppose keep == 2, then storage[0[ and storage[1] is copyied over. + // Etc. + res.row(i) = _storage[j]; + j++; + } + } + + return res; } /** @@ -83,9 +80,10 @@ public: }; TimeBuffer::TimeBuffer() : _imp(std::make_unique()) {} -std::optional TimeBuffer::pop(const us n_rows, const us keep) { +dmat TimeBuffer::pop(const us n_rows, const us keep) { return _imp->pop(n_rows, keep); } TimeBuffer::~TimeBuffer() {} void TimeBuffer::reset() { _imp->reset(); } void TimeBuffer::push(const dmat &dat) { _imp->push(dat); } +us TimeBuffer::size() const { return _imp->n_frames(); } diff --git a/src/lasp/dsp/lasp_timebuffer.h b/src/lasp/dsp/lasp_timebuffer.h index baa4f07..ae6fb13 100644 --- a/src/lasp/dsp/lasp_timebuffer.h +++ b/src/lasp/dsp/lasp_timebuffer.h @@ -27,14 +27,21 @@ public: void reset(); /** - * @brief Try top pop frames from the buffer. + * @brief Returns current size of stored amount of frames. + * + * @return The current amount of frames in the storage + */ + us size() const; + + /** + * @brief Pop frames from the buffer. Throws a runtime error if more frames + * are requested than actually stored. * * @param nframes The number of rows * @param keep The number of frames to copy, but also to keep in the buffer * (usage: overlap) * - * @return An optional container, containing a matrix of time samples for - * each channel + * @return Array time samples for each channel */ - std::optional pop(const us nframes, const us keep = 0); + dmat pop(const us nframes, const us keep = 0); }; diff --git a/src/lasp/pybind11/lasp_dsp_pybind.cpp b/src/lasp/pybind11/lasp_dsp_pybind.cpp index 48d0273..a770647 100644 --- a/src/lasp/pybind11/lasp_dsp_pybind.cpp +++ b/src/lasp/pybind11/lasp_dsp_pybind.cpp @@ -122,8 +122,8 @@ void init_dsp(py::module &m) { return CubeToNpy(res.value_or(ccube(0, 0, 0))); }); aps.def("get_est", [](const AvPowerSpectra &ps) { - auto est = ps.get_est(); - return CubeToNpy(est.value_or(ccube(0, 0, 0))); + ccube est = ps.get_est(); + return CubeToNpy(est); }); py::class_ slm(m, "cppSLM"); diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index 8f909c9..69af06f 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -215,10 +215,7 @@ void init_datahandler(py::module &m) { ); rtaps.def("getCurrentValue", [](RtAps &rt) { - std::unique_ptr val = rt.getCurrentValue(); - if (val) { - return CubeToNpy (*val); - } - return CubeToNpy(ccube(1,0,0)); + ccube val = rt.getCurrentValue(); + return CubeToNpy(val); }); }