diff --git a/cpp_src/dsp/lasp_avpowerspectra.cpp b/cpp_src/dsp/lasp_avpowerspectra.cpp index 59ccb08..100fd27 100644 --- a/cpp_src/dsp/lasp_avpowerspectra.cpp +++ b/cpp_src/dsp/lasp_avpowerspectra.cpp @@ -2,12 +2,9 @@ #include "lasp_avpowerspectra.h" #include "debugtrace.hpp" #include "lasp_mathtypes.h" -#include -#include +#include using rte = std::runtime_error; -using std::cerr; -using std::endl; PowerSpectra::PowerSpectra(const us nfft, const Window::WindowType w) : PowerSpectra(Window::create(w, nfft)) {} diff --git a/cpp_src/dsp/lasp_avpowerspectra.h b/cpp_src/dsp/lasp_avpowerspectra.h index e195839..a90e5b9 100644 --- a/cpp_src/dsp/lasp_avpowerspectra.h +++ b/cpp_src/dsp/lasp_avpowerspectra.h @@ -3,8 +3,6 @@ #include "lasp_mathtypes.h" #include "lasp_timebuffer.h" #include "lasp_window.h" -#include -#include /** \defgroup dsp Digital Signal Processing utilities * These are classes and functions used for processing raw signal data, to diff --git a/cpp_src/dsp/lasp_biquadbank.cpp b/cpp_src/dsp/lasp_biquadbank.cpp index d31744e..582a899 100644 --- a/cpp_src/dsp/lasp_biquadbank.cpp +++ b/cpp_src/dsp/lasp_biquadbank.cpp @@ -2,7 +2,6 @@ #include "lasp_biquadbank.h" #include "debugtrace.hpp" #include "lasp_thread.h" -#include #include using std::cerr; diff --git a/cpp_src/dsp/lasp_rtsignalviewer.h b/cpp_src/dsp/lasp_rtsignalviewer.h index 253a934..6502781 100644 --- a/cpp_src/dsp/lasp_rtsignalviewer.h +++ b/cpp_src/dsp/lasp_rtsignalviewer.h @@ -4,12 +4,9 @@ // // Description: Real Time Signal Viewer. #pragma once -#include "lasp_avpowerspectra.h" -#include "lasp_filter.h" #include "lasp_mathtypes.h" #include "lasp_threadedindatahandler.h" #include "lasp_timebuffer.h" -#include #include /** diff --git a/cpp_src/pybind11/lasp_pyindatahandler.cpp b/cpp_src/pybind11/lasp_pyindatahandler.cpp index ce1ca9f..76e072e 100644 --- a/cpp_src/pybind11/lasp_pyindatahandler.cpp +++ b/cpp_src/pybind11/lasp_pyindatahandler.cpp @@ -1,4 +1,10 @@ -/* #define DEBUGTRACE_ENABLED */ +// #define DEBUGTRACE_ENABLED +#include +#include + +#include +#include + #include "arma_npy.h" #include "debugtrace.hpp" #include "lasp_clip.h" @@ -9,10 +15,6 @@ #include "lasp_rtsignalviewer.h" #include "lasp_streammgr.h" #include "lasp_threadedindatahandler.h" -#include -#include -#include -#include using namespace std::literals::chrono_literals; using std::cerr; @@ -48,17 +50,17 @@ py::array_t getPyArrayNoCpy(const DaqData &d) { */ return py::array_t( - py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape + py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape - py::array::StridesContainer( // Strides + py::array::StridesContainer( // Strides {sizeof(T), - sizeof(T) * d.nframes}), // Strides (in bytes) for each index + sizeof(T) * d.nframes}), // Strides (in bytes) for each index reinterpret_cast( - const_cast(d).raw_ptr()), // Pointer to buffer + const_cast(d).raw_ptr()), // Pointer to buffer - dummyDataOwner // As stated above, now Numpy does not take ownership of - // the data pointer. + dummyDataOwner // As stated above, now Numpy does not take ownership of + // the data pointer. ); } @@ -81,17 +83,17 @@ py::array_t dmat_to_ndarray(const DaqData &d) { */ return py::array_t( - py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape + py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape - py::array::StridesContainer( // Strides + py::array::StridesContainer( // Strides {sizeof(T), - sizeof(T) * d.nframes}), // Strides (in bytes) for each index + sizeof(T) * d.nframes}), // Strides (in bytes) for each index reinterpret_cast( - const_cast(d).raw_ptr()), // Pointer to buffer + const_cast(d).raw_ptr()), // Pointer to buffer - dummyDataOwner // As stated above, now Numpy does not take ownership of - // the data pointer. + dummyDataOwner // As stated above, now Numpy does not take ownership of + // the data pointer. ); } @@ -104,8 +106,8 @@ class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ - std::unique_ptr cb, reset_callback; - bool _done{false}; + py::weakref _cb, _reset_callback; + std::atomic _done{false}; public: /** @@ -118,9 +120,7 @@ class PyIndataHandler : public ThreadedInDataHandler { * is called, when a stream stops, this pointer / handle will dangle. */ PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) - : ThreadedInDataHandler(mgr), - cb(std::make_unique(cb)), - reset_callback(std::make_unique(reset_callback)) { + : ThreadedInDataHandler(mgr), _cb(cb), _reset_callback(reset_callback) { DEBUGTRACE_ENTER; /// Start should be called externally, as at constructor time no virtual /// functions should be called. @@ -139,27 +139,38 @@ class PyIndataHandler : public ThreadedInDataHandler { * * @param daq Daq device, or nullptr in case no input stream is running. */ - void reset(const Daq *daq) { + void reset(const Daq *daqi) { DEBUGTRACE_ENTER; if (_done) return; - try { + { py::gil_scoped_acquire acquire; - if (daq) { - (*reset_callback)(daq); - } else { - (*reset_callback)(py::none()); + try { + py::object reset_callback = _reset_callback(); + if (reset_callback.is_none()) { + DEBUGTRACE_PRINT("cb is none, weakref killed"); + _done = true; + return; + } + if (daqi != nullptr) { + assert(reset_callback); + reset_callback(daqi); + } else { + assert(reset_callback); + reset_callback(py::none()); + } + } catch (py::error_already_set &e) { + cerr << "*************** Error calling reset callback!\n"; + cerr << e.what() << endl; + cerr << "*************** \n"; + /// Throwing a runtime error here does not work out one way or another. + /// Therefore, it is better to dive out and prevent undefined behaviour + abort(); + /* throw std::runtime_error(e.what()); */ + } catch (std::exception &e) { + cerr << "Caught unknown exception in reset callback:" << e.what() + << endl; + abort(); } - } catch (py::error_already_set &e) { - cerr << "*************** Error calling reset callback!\n"; - cerr << e.what() << endl; - cerr << "*************** \n"; - /// Throwing a runtime error here does not work out one way or another. - /// Therefore, it is better to dive out and prevent undefined behaviour - abort(); - /* throw std::runtime_error(e.what()); */ - } catch (std::exception &e) { - cerr << "Caught unknown exception in reset callback:" << e.what() << endl; - abort(); } } @@ -168,62 +179,79 @@ class PyIndataHandler : public ThreadedInDataHandler { * stream data. */ void inCallback(const DaqData &d) { - - /* DEBUGTRACE_ENTER; */ + // DEBUGTRACE_ENTER; + // cerr << "Thread ID: " << std::this_thread::get_id() << endl; using DataType = DataTypeDescriptor::DataType; - if (_done) return; - - try { - py::gil_scoped_acquire acquire; - py::object bool_val; - switch (d.dtype) { - case (DataType::dtype_int8): { - bool_val = (*cb)(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_int16): { - bool_val = (*cb)(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_int32): { - bool_val = (*cb)(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_fl32): { - bool_val = (*cb)(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_fl64): { - bool_val = (*cb)(getPyArrayNoCpy(d)); - } break; - default: - throw std::runtime_error("BUG"); - } // End of switch - - bool res = bool_val.cast(); - if (res == false) { - DEBUGTRACE_PRINT("Setting callbacks to None") - _done = true; - // cb = py::function(py::none()); - // reset_callback = py::function(py::none()); - cb.reset(); - reset_callback.reset(); - } - } catch (py::error_already_set &e) { - cerr << "ERROR: Python raised exception from callback function: "; - cerr << e.what() << endl; - abort(); - } catch (py::cast_error &e) { - cerr << e.what() << endl; - cerr << "ERROR: Python callback does not return boolean value." << endl; - abort(); - } catch (std::exception &e) { - cerr << "Caught unknown exception in Python callback:" << e.what() - << endl; - abort(); + if (_done) { + DEBUGTRACE_PRINT("Early stop, done"); + return; } - } + { + py::gil_scoped_acquire acquire; + try { + py::object py_bool; + py::object cb = _cb(); + if (cb.is_none()) { + DEBUGTRACE_PRINT("cb is none, weakref killed"); + _done = true; + return; + } + switch (d.dtype) { + case (DataType::dtype_int8): { + py_bool = cb(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_int16): { + py_bool = cb(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_int32): { + py_bool = cb(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_fl32): { + py_bool = cb(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_fl64): { + py_bool = cb(getPyArrayNoCpy(d)); + } break; + default: + throw std::runtime_error("BUG"); + } // End of switch + + bool res = py_bool.cast(); + if (res == false) { + DEBUGTRACE_PRINT("Setting callbacks to None") + _done = true; + + // By doing this, we remove the references, but in the mean time this + // might also trigger removing Python objects. Including itself, as + // there is no reference to it anymore. The consequence is that the + // current object might be destroyed from this thread. However, if we + // do not remove these references and in lasp_record.py finish() is + // not called, we end up with not-garbage collected recordings in + // memory. This is also not good. How can we force Python to not yet + // destroy this object? + // cb.reset(); + // reset_callback.reset(); + } + } catch (py::error_already_set &e) { + cerr << "ERROR: Python raised exception from callback function: "; + cerr << e.what() << endl; + abort(); + } catch (py::cast_error &e) { + cerr << e.what() << endl; + cerr << "ERROR: Python callback does not return boolean value." << endl; + abort(); + } catch (std::exception &e) { + cerr << "Caught unknown exception in Python callback:" << e.what() + << endl; + abort(); + } + + } // End of scope in which the GIL is acquired + } // End of function }; void init_datahandler(py::module &m) { - /// The C++ class is PyIndataHandler, but for Python, it is called /// InDataHandler py::class_ pyidh(m, "InDataHandler"); @@ -256,29 +284,29 @@ void init_datahandler(py::module &m) { cval = clip.getCurrentValue(); } - return ColToNpy(cval); // something goes wrong here + return ColToNpy(cval); // something goes wrong here }); /// Real time Aps /// py::class_ rtaps(m, "RtAps"); - rtaps.def(py::init(), - py::arg("streammgr"), // StreamMgr + py::arg("streammgr"), // StreamMgr py::arg("preFilter").none(true), /// Below list of arguments *SHOULD* be same as for /// AvPowerSpectra constructor! - py::arg("nfft") = 2048, // - py::arg("windowType") = Window::WindowType::Hann, // - py::arg("overlap_percentage") = 50.0, // - py::arg("time_constant") = -1 // + py::arg("nfft") = 2048, // + py::arg("windowType") = Window::WindowType::Hann, // + py::arg("overlap_percentage") = 50.0, // + py::arg("time_constant") = -1 // ); rtaps.def("getCurrentValue", [](RtAps &rt) { @@ -293,10 +321,10 @@ void init_datahandler(py::module &m) { /// Real time Signal Viewer /// py::class_ rtsv(m, "RtSignalViewer"); - rtsv.def(py::init()); rtsv.def("getCurrentValue", [](RtSignalViewer &rt) { diff --git a/python_src/lasp/lasp_record.py b/python_src/lasp/lasp_record.py index c918ceb..8377a0c 100644 --- a/python_src/lasp/lasp_record.py +++ b/python_src/lasp/lasp_record.py @@ -190,7 +190,7 @@ class Recording: When returning False, it will stop the stream. """ - logger.debug(f"inCallback({adata})") + logger.debug(f"inCallback()") if self._stop(): logger.debug("Stop flag set, early return in inCallback") # Stop flag is raised. We do not add any data anymore. @@ -228,10 +228,10 @@ class Recording: self._progressCallback(recstatus) case RecordingState.AllDataStored: - pass + return False case RecordingState.Finished: - pass + return False return True