// #define DEBUGTRACE_ENABLED #include #include #include #include #include "arma_npy.h" #include "debugtrace.hpp" #include "lasp_clip.h" #include "lasp_daq.h" #include "lasp_daqdata.h" #include "lasp_ppm.h" #include "lasp_rtaps.h" #include "lasp_rtsignalviewer.h" #include "lasp_streammgr.h" #include "lasp_threadedindatahandler.h" using namespace std::literals::chrono_literals; using std::cerr; using std::endl; using rte = std::runtime_error; using Lck = std::scoped_lock; namespace py = pybind11; /** * @brief Generate a Numpy array from daqdata, does *NOT* create a copy of the * data!. Instead, it shares the data from the DaqData container. * * @tparam T The type of the stored sample * @param d The daqdata to convert * * @return Numpy array */ template py::array_t getPyArrayNoCpy(const DaqData &d) { // https://github.com/pybind/pybind11/issues/323 // // When a valid object is passed as 'base', it tells pybind not to take // ownership of the data, because 'base' will own it. In fact 'packet' will // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY // valid object is good for this purpose, so I choose "str"... py::str dummyDataOwner; /* * Signature: array_t(ShapeContainer shape, StridesContainer strides, const T *ptr = nullptr, handle base = handle()); */ return py::array_t( py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape py::array::StridesContainer( // Strides {sizeof(T), sizeof(T) * d.nframes}), // Strides (in bytes) for each index reinterpret_cast( const_cast(d).raw_ptr()), // Pointer to buffer dummyDataOwner // As stated above, now Numpy does not take ownership of // the data pointer. ); } template py::array_t dmat_to_ndarray(const DaqData &d) { // https://github.com/pybind/pybind11/issues/323 // // When a valid object is passed as 'base', it tells pybind not to take // ownership of the data, because 'base' will own it. In fact 'packet' will // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY // valid object is good for this purpose, so I choose "str"... py::str dummyDataOwner; /* * Signature: array_t(ShapeContainer shape, StridesContainer strides, const T *ptr = nullptr, handle base = handle()); */ return py::array_t( py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape py::array::StridesContainer( // Strides {sizeof(T), sizeof(T) * d.nframes}), // Strides (in bytes) for each index reinterpret_cast( const_cast(d).raw_ptr()), // Pointer to buffer dummyDataOwner // As stated above, now Numpy does not take ownership of // the data pointer. ); } /** * @brief Wraps the ThreadedInDataHandler such that it calls a Python callback * with a buffer of sample data. Converts DaqData objects to Numpy arrays and * calls Python given as argument to the constructor */ class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ py::object _cb, _reset_callback; std::atomic _done{false}; std::recursive_mutex _mtx; public: /** * @brief Initialize PyIndataHandler * * @param mgr StreamMgr handle * @param cb Python callback that is called with Numpy input data from device * @param reset_callback Python callback that is called with a Daq pointer. * Careful: do not store this handle, as it is only valid as long as reset() * is called, when a stream stops, this pointer / handle will dangle. */ PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) : ThreadedInDataHandler(mgr), _cb(py::weakref(cb)), _reset_callback(py::weakref(reset_callback)) { DEBUGTRACE_ENTER; // cerr << "Thread ID: " << std::this_thread::get_id() << endl; /// Start should be called externally, as at constructor time no virtual /// functions should be called. if (_cb().is_none() || _reset_callback().is_none()) { throw rte("cb or reset_callback is none!"); } startThread(); } ~PyIndataHandler() { DEBUGTRACE_ENTER; // cerr << "Thread ID: " << std::this_thread::get_id() << endl; /// Callback cannot be called, which results in a deadlock on the GIL /// without this release. py::gil_scoped_release release; DEBUGTRACE_PRINT("Gil released"); _done = true; stopThread(); } /** * @brief Calls the reset callback in Python. * * @param daq Daq device, or nullptr in case no input stream is running. */ void reset(const Daq *daqi) { DEBUGTRACE_ENTER; // cerr << "Thread ID: " << std::this_thread::get_id() << endl; if (_done) return; { try { py::object reset_callback = _reset_callback(); if (reset_callback.is_none()) { DEBUGTRACE_PRINT("reset_callback is none, weakref killed"); _done = true; return; } if (daqi != nullptr) { assert(reset_callback); reset_callback(daqi); } else { assert(reset_callback); reset_callback(py::none()); } } catch (py::error_already_set &e) { cerr << "*************** Error calling reset callback!\n"; cerr << e.what() << endl; cerr << "*************** \n"; /// Throwing a runtime error here does not work out one way or another. /// Therefore, it is better to dive out and prevent undefined behaviour abort(); /* throw std::runtime_error(e.what()); */ } catch (std::exception &e) { cerr << "Caught unknown exception in reset callback:" << e.what() << endl; abort(); } } // end of GIL scope } // end of function reset() /** * @brief Calls the Python callback method / function with a Numpy array of * stream data. */ void inCallback(const DaqData &d) { DEBUGTRACE_ENTER; // cerr << "=== Enter incallback for thread ID: " << std::this_thread::get_id() << endl; using DataType = DataTypeDescriptor::DataType; if (_done) { DEBUGTRACE_PRINT("Early stop, done"); return; } { DEBUGTRACE_PRINT("================ TRYING TO OBTAIN GIL in inCallback..."); py::gil_scoped_acquire acquire; try { py::object py_bool; py::object cb = _cb(); if (cb.is_none()) { DEBUGTRACE_PRINT("cb is none, weakref killed"); _done = true; return; } switch (d.dtype) { case (DataType::dtype_int8): { py_bool = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_int16): { py_bool = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_int32): { py_bool = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_fl32): { py_bool = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_fl64): { py_bool = cb(getPyArrayNoCpy(d)); } break; default: throw std::runtime_error("BUG"); } // End of switch bool res = py_bool.cast(); if (res == false) { DEBUGTRACE_PRINT("Setting callbacks to None") _done = true; // By doing this, we remove the references, but in the mean time this // might also trigger removing Python objects. Including itself, as // there is no reference to it anymore. The consequence is that the // current object might be destroyed from this thread. However, if we // do not remove these references and in lasp_record.py finish() is // not called, we end up with not-garbage collected recordings in // memory. This is also not good. How can we force Python to not yet // destroy this object? // cb.reset(); // reset_callback.reset(); } } catch (py::error_already_set &e) { cerr << "ERROR (BUG): Python raised exception from callback function: "; cerr << e.what() << endl; abort(); } catch (py::cast_error &e) { cerr << e.what() << endl; cerr << "ERROR (BUG): Python callback does not return boolean value." << endl; abort(); } catch (std::exception &e) { cerr << "Caught unknown exception in Python callback:" << e.what() << endl; abort(); } } // End of scope in which the GIL is acquired // cerr << "=== LEAVE incallback for thread ID: " << std::this_thread::get_id() << endl; } // End of function inCallback() }; void init_datahandler(py::module &m) { /// The C++ class is PyIndataHandler, but for Python, it is called /// InDataHandler py::class_ pyidh(m, "InDataHandler"); pyidh.def(py::init()); /// Peak Programme Meter py::class_ ppm(m, "PPMHandler"); ppm.def(py::init()); ppm.def(py::init()); ppm.def("getCurrentValue", [](const PPMHandler &ppm) { std::tuple tp; { py::gil_scoped_release release; tp = ppm.getCurrentValue(); } return py::make_tuple(ColToNpy(std::get<0>(tp)), ColToNpy(std::get<1>(tp))); }); /// Clip Detector py::class_ clip(m, "ClipHandler"); clip.def(py::init()); clip.def("getCurrentValue", [](const ClipHandler &clip) { arma::uvec cval; { py::gil_scoped_release release; cval = clip.getCurrentValue(); } return ColToNpy(cval); // something goes wrong here }); /// Real time Aps /// py::class_ rtaps(m, "RtAps"); rtaps.def(py::init(), py::arg("streammgr"), // StreamMgr py::arg("preFilter").none(true), /// Below list of arguments *SHOULD* be same as for /// AvPowerSpectra constructor! py::arg("nfft") = 2048, // py::arg("windowType") = Window::WindowType::Hann, // py::arg("overlap_percentage") = 50.0, // py::arg("time_constant") = -1 // ); rtaps.def("getCurrentValue", [](RtAps &rt) { ccube val; { py::gil_scoped_release release; val = rt.getCurrentValue(); } return CubeToNpy(val); }); /// Real time Signal Viewer /// py::class_ rtsv(m, "RtSignalViewer"); rtsv.def(py::init()); rtsv.def("getCurrentValue", [](RtSignalViewer &rt) { dmat val; { py::gil_scoped_release release; val = rt.getCurrentValue(); } return MatToNpy(val); }); }