Some more bugfixes: weak references stored in indatahandler, to avoid calling destructor from wrong thread. Removed some unneccessary include statements on the way

This commit is contained in:
Anne de Jong 2024-03-12 21:13:13 +01:00
parent d0d494fcb2
commit e24cac2805
6 changed files with 137 additions and 118 deletions

View File

@ -2,12 +2,9 @@
#include "lasp_avpowerspectra.h" #include "lasp_avpowerspectra.h"
#include "debugtrace.hpp" #include "debugtrace.hpp"
#include "lasp_mathtypes.h" #include "lasp_mathtypes.h"
#include <cmath> #include <stdexcept>
#include <optional>
using rte = std::runtime_error; using rte = std::runtime_error;
using std::cerr;
using std::endl;
PowerSpectra::PowerSpectra(const us nfft, const Window::WindowType w) PowerSpectra::PowerSpectra(const us nfft, const Window::WindowType w)
: PowerSpectra(Window::create(w, nfft)) {} : PowerSpectra(Window::create(w, nfft)) {}

View File

@ -3,8 +3,6 @@
#include "lasp_mathtypes.h" #include "lasp_mathtypes.h"
#include "lasp_timebuffer.h" #include "lasp_timebuffer.h"
#include "lasp_window.h" #include "lasp_window.h"
#include <memory>
#include <optional>
/** \defgroup dsp Digital Signal Processing utilities /** \defgroup dsp Digital Signal Processing utilities
* These are classes and functions used for processing raw signal data, to * These are classes and functions used for processing raw signal data, to

View File

@ -2,7 +2,6 @@
#include "lasp_biquadbank.h" #include "lasp_biquadbank.h"
#include "debugtrace.hpp" #include "debugtrace.hpp"
#include "lasp_thread.h" #include "lasp_thread.h"
#include <cassert>
#include <vector> #include <vector>
using std::cerr; using std::cerr;

View File

@ -4,12 +4,9 @@
// //
// Description: Real Time Signal Viewer. // Description: Real Time Signal Viewer.
#pragma once #pragma once
#include "lasp_avpowerspectra.h"
#include "lasp_filter.h"
#include "lasp_mathtypes.h" #include "lasp_mathtypes.h"
#include "lasp_threadedindatahandler.h" #include "lasp_threadedindatahandler.h"
#include "lasp_timebuffer.h" #include "lasp_timebuffer.h"
#include <memory>
#include <mutex> #include <mutex>
/** /**

View File

@ -1,4 +1,10 @@
/* #define DEBUGTRACE_ENABLED */ // #define DEBUGTRACE_ENABLED
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <armadillo>
#include <atomic>
#include "arma_npy.h" #include "arma_npy.h"
#include "debugtrace.hpp" #include "debugtrace.hpp"
#include "lasp_clip.h" #include "lasp_clip.h"
@ -9,10 +15,6 @@
#include "lasp_rtsignalviewer.h" #include "lasp_rtsignalviewer.h"
#include "lasp_streammgr.h" #include "lasp_streammgr.h"
#include "lasp_threadedindatahandler.h" #include "lasp_threadedindatahandler.h"
#include <armadillo>
#include <atomic>
#include <chrono>
#include <pybind11/pybind11.h>
using namespace std::literals::chrono_literals; using namespace std::literals::chrono_literals;
using std::cerr; using std::cerr;
@ -48,17 +50,17 @@ py::array_t<T> getPyArrayNoCpy(const DaqData &d) {
*/ */
return py::array_t<T>( return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::StridesContainer( // Strides py::array::StridesContainer( // Strides
{sizeof(T), {sizeof(T),
sizeof(T) * d.nframes}), // Strides (in bytes) for each index sizeof(T) * d.nframes}), // Strides (in bytes) for each index
reinterpret_cast<T *>( reinterpret_cast<T *>(
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
dummyDataOwner // As stated above, now Numpy does not take ownership of dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer. // the data pointer.
); );
} }
@ -81,17 +83,17 @@ py::array_t<d> dmat_to_ndarray(const DaqData &d) {
*/ */
return py::array_t<T>( return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::StridesContainer( // Strides py::array::StridesContainer( // Strides
{sizeof(T), {sizeof(T),
sizeof(T) * d.nframes}), // Strides (in bytes) for each index sizeof(T) * d.nframes}), // Strides (in bytes) for each index
reinterpret_cast<T *>( reinterpret_cast<T *>(
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
dummyDataOwner // As stated above, now Numpy does not take ownership of dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer. // the data pointer.
); );
} }
@ -104,8 +106,8 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
/** /**
* @brief The callback functions that is called. * @brief The callback functions that is called.
*/ */
std::unique_ptr<py::function> cb, reset_callback; py::weakref _cb, _reset_callback;
bool _done{false}; std::atomic<bool> _done{false};
public: public:
/** /**
@ -118,9 +120,7 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
* is called, when a stream stops, this pointer / handle will dangle. * is called, when a stream stops, this pointer / handle will dangle.
*/ */
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr), : ThreadedInDataHandler(mgr), _cb(cb), _reset_callback(reset_callback) {
cb(std::make_unique<py::function>(cb)),
reset_callback(std::make_unique<py::function>(reset_callback)) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
/// Start should be called externally, as at constructor time no virtual /// Start should be called externally, as at constructor time no virtual
/// functions should be called. /// functions should be called.
@ -139,27 +139,38 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
* *
* @param daq Daq device, or nullptr in case no input stream is running. * @param daq Daq device, or nullptr in case no input stream is running.
*/ */
void reset(const Daq *daq) { void reset(const Daq *daqi) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
if (_done) return; if (_done) return;
try { {
py::gil_scoped_acquire acquire; py::gil_scoped_acquire acquire;
if (daq) { try {
(*reset_callback)(daq); py::object reset_callback = _reset_callback();
} else { if (reset_callback.is_none()) {
(*reset_callback)(py::none()); DEBUGTRACE_PRINT("cb is none, weakref killed");
_done = true;
return;
}
if (daqi != nullptr) {
assert(reset_callback);
reset_callback(daqi);
} else {
assert(reset_callback);
reset_callback(py::none());
}
} catch (py::error_already_set &e) {
cerr << "*************** Error calling reset callback!\n";
cerr << e.what() << endl;
cerr << "*************** \n";
/// Throwing a runtime error here does not work out one way or another.
/// Therefore, it is better to dive out and prevent undefined behaviour
abort();
/* throw std::runtime_error(e.what()); */
} catch (std::exception &e) {
cerr << "Caught unknown exception in reset callback:" << e.what()
<< endl;
abort();
} }
} catch (py::error_already_set &e) {
cerr << "*************** Error calling reset callback!\n";
cerr << e.what() << endl;
cerr << "*************** \n";
/// Throwing a runtime error here does not work out one way or another.
/// Therefore, it is better to dive out and prevent undefined behaviour
abort();
/* throw std::runtime_error(e.what()); */
} catch (std::exception &e) {
cerr << "Caught unknown exception in reset callback:" << e.what() << endl;
abort();
} }
} }
@ -168,62 +179,79 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
* stream data. * stream data.
*/ */
void inCallback(const DaqData &d) { void inCallback(const DaqData &d) {
// DEBUGTRACE_ENTER;
/* DEBUGTRACE_ENTER; */ // cerr << "Thread ID: " << std::this_thread::get_id() << endl;
using DataType = DataTypeDescriptor::DataType; using DataType = DataTypeDescriptor::DataType;
if (_done) return; if (_done) {
DEBUGTRACE_PRINT("Early stop, done");
try { return;
py::gil_scoped_acquire acquire;
py::object bool_val;
switch (d.dtype) {
case (DataType::dtype_int8): {
bool_val = (*cb)(getPyArrayNoCpy<int8_t>(d));
} break;
case (DataType::dtype_int16): {
bool_val = (*cb)(getPyArrayNoCpy<int16_t>(d));
} break;
case (DataType::dtype_int32): {
bool_val = (*cb)(getPyArrayNoCpy<int32_t>(d));
} break;
case (DataType::dtype_fl32): {
bool_val = (*cb)(getPyArrayNoCpy<float>(d));
} break;
case (DataType::dtype_fl64): {
bool_val = (*cb)(getPyArrayNoCpy<double>(d));
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
bool res = bool_val.cast<bool>();
if (res == false) {
DEBUGTRACE_PRINT("Setting callbacks to None")
_done = true;
// cb = py::function(py::none());
// reset_callback = py::function(py::none());
cb.reset();
reset_callback.reset();
}
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
abort();
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." << endl;
abort();
} catch (std::exception &e) {
cerr << "Caught unknown exception in Python callback:" << e.what()
<< endl;
abort();
} }
} {
py::gil_scoped_acquire acquire;
try {
py::object py_bool;
py::object cb = _cb();
if (cb.is_none()) {
DEBUGTRACE_PRINT("cb is none, weakref killed");
_done = true;
return;
}
switch (d.dtype) {
case (DataType::dtype_int8): {
py_bool = cb(getPyArrayNoCpy<int8_t>(d));
} break;
case (DataType::dtype_int16): {
py_bool = cb(getPyArrayNoCpy<int16_t>(d));
} break;
case (DataType::dtype_int32): {
py_bool = cb(getPyArrayNoCpy<int32_t>(d));
} break;
case (DataType::dtype_fl32): {
py_bool = cb(getPyArrayNoCpy<float>(d));
} break;
case (DataType::dtype_fl64): {
py_bool = cb(getPyArrayNoCpy<double>(d));
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
bool res = py_bool.cast<bool>();
if (res == false) {
DEBUGTRACE_PRINT("Setting callbacks to None")
_done = true;
// By doing this, we remove the references, but in the mean time this
// might also trigger removing Python objects. Including itself, as
// there is no reference to it anymore. The consequence is that the
// current object might be destroyed from this thread. However, if we
// do not remove these references and in lasp_record.py finish() is
// not called, we end up with not-garbage collected recordings in
// memory. This is also not good. How can we force Python to not yet
// destroy this object?
// cb.reset();
// reset_callback.reset();
}
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
abort();
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." << endl;
abort();
} catch (std::exception &e) {
cerr << "Caught unknown exception in Python callback:" << e.what()
<< endl;
abort();
}
} // End of scope in which the GIL is acquired
} // End of function
}; };
void init_datahandler(py::module &m) { void init_datahandler(py::module &m) {
/// The C++ class is PyIndataHandler, but for Python, it is called /// The C++ class is PyIndataHandler, but for Python, it is called
/// InDataHandler /// InDataHandler
py::class_<PyIndataHandler> pyidh(m, "InDataHandler"); py::class_<PyIndataHandler> pyidh(m, "InDataHandler");
@ -256,29 +284,29 @@ void init_datahandler(py::module &m) {
cval = clip.getCurrentValue(); cval = clip.getCurrentValue();
} }
return ColToNpy<arma::uword>(cval); // something goes wrong here return ColToNpy<arma::uword>(cval); // something goes wrong here
}); });
/// Real time Aps /// Real time Aps
/// ///
py::class_<RtAps> rtaps(m, "RtAps"); py::class_<RtAps> rtaps(m, "RtAps");
rtaps.def(py::init<SmgrHandle, // StreamMgr rtaps.def(py::init<SmgrHandle, // StreamMgr
Filter *const, // FreqWeighting filter Filter *const, // FreqWeighting filter
const us, // Nfft const us, // Nfft
const Window::WindowType, // Window const Window::WindowType, // Window
const d, // Overlap percentage 0<=o<100 const d, // Overlap percentage 0<=o<100
const d // Time constant const d // Time constant
>(), >(),
py::arg("streammgr"), // StreamMgr py::arg("streammgr"), // StreamMgr
py::arg("preFilter").none(true), py::arg("preFilter").none(true),
/// Below list of arguments *SHOULD* be same as for /// Below list of arguments *SHOULD* be same as for
/// AvPowerSpectra constructor! /// AvPowerSpectra constructor!
py::arg("nfft") = 2048, // py::arg("nfft") = 2048, //
py::arg("windowType") = Window::WindowType::Hann, // py::arg("windowType") = Window::WindowType::Hann, //
py::arg("overlap_percentage") = 50.0, // py::arg("overlap_percentage") = 50.0, //
py::arg("time_constant") = -1 // py::arg("time_constant") = -1 //
); );
rtaps.def("getCurrentValue", [](RtAps &rt) { rtaps.def("getCurrentValue", [](RtAps &rt) {
@ -293,10 +321,10 @@ void init_datahandler(py::module &m) {
/// Real time Signal Viewer /// Real time Signal Viewer
/// ///
py::class_<RtSignalViewer> rtsv(m, "RtSignalViewer"); py::class_<RtSignalViewer> rtsv(m, "RtSignalViewer");
rtsv.def(py::init<SmgrHandle, // StreamMgr rtsv.def(py::init<SmgrHandle, // StreamMgr
const d, // Time history const d, // Time history
const us, // Resolution const us, // Resolution
const us // Channel number const us // Channel number
>()); >());
rtsv.def("getCurrentValue", [](RtSignalViewer &rt) { rtsv.def("getCurrentValue", [](RtSignalViewer &rt) {

View File

@ -190,7 +190,7 @@ class Recording:
When returning False, it will stop the stream. When returning False, it will stop the stream.
""" """
logger.debug(f"inCallback({adata})") logger.debug(f"inCallback()")
if self._stop(): if self._stop():
logger.debug("Stop flag set, early return in inCallback") logger.debug("Stop flag set, early return in inCallback")
# Stop flag is raised. We do not add any data anymore. # Stop flag is raised. We do not add any data anymore.
@ -228,10 +228,10 @@ class Recording:
self._progressCallback(recstatus) self._progressCallback(recstatus)
case RecordingState.AllDataStored: case RecordingState.AllDataStored:
pass return False
case RecordingState.Finished: case RecordingState.Finished:
pass return False
return True return True