#define DEBUGTRACE_ENABLED #include "debugtrace.hpp" #include "lasp_streammgr.h" #include "lasp_threadedindatahandler.h" #include #include #include #include #include #include #include #include #include #include using namespace std::literals::chrono_literals; using std::cerr; using std::endl; namespace py = pybind11; /** * @brief Generate a Numpy array from daqdata, does *NOT* create a copy of the * data!. Instead, it shares the data from the DaqData container. * * @tparam T The type of the stored sample * @param d The daqdata to convert * * @return Numpy array */ template py::array_t getPyArray(const DaqData &d) { // https://github.com/pybind/pybind11/issues/323 // // When a valid object is passed as 'base', it tells pybind not to take // ownership of the data, because 'base' will own it. In fact 'packet' will // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY // valid object is good for this purpose, so I chose "str"... py::str dummyDataOwner; /* * Signature: array_t(ShapeContainer shape, StridesContainer strides, const T *ptr = nullptr, handle base = handle()); */ return py::array_t( py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape py::array::StridesContainer( // Strides {sizeof(T), sizeof(T) * d.nframes}), // Strides (in bytes) for each index reinterpret_cast( const_cast(d).raw_ptr()), // Pointer to buffer dummyDataOwner // As stated above, now Numpy does not take ownership of // the data pointer. ); } /** * @brief Wraps the InDataHandler such that it calls a Python callback with a * buffer of sample data. The Python callback is called from a different * thread, using a Numpy argument as */ class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback function that is called. */ py::function cb; public: PyIndataHandler(StreamMgr &mgr, py::function cb) : ThreadedInDataHandler(mgr), cb(cb) { DEBUGTRACE_ENTER; /// TODO: Note that if start() throws an exception, which means that the /// destructor of PyIndataHandler is not called and the thread destructor /// calls terminate(). It is a kind of rude way to crash, but it is also /// *very* unlikely to happen, as start() does only add a reference to this /// handler to a list in the stream mgr. start(); } ~PyIndataHandler() { DEBUGTRACE_ENTER; stop(); } /** * @brief Reads from the buffer */ bool inCallback_threaded(const DaqData &d) { /* DEBUGTRACE_ENTER; */ using DataType = DataTypeDescriptor::DataType; py::gil_scoped_acquire acquire; try { py::object bool_val; switch (d.dtype) { case (DataType::dtype_int8): { bool_val = cb(getPyArray(d)); } break; case (DataType::dtype_int16): { bool_val = cb(getPyArray(d)); } break; case (DataType::dtype_int32): { bool_val = cb(getPyArray(d)); } break; case (DataType::dtype_fl32): { bool_val = cb(getPyArray(d)); } break; case (DataType::dtype_fl64): { bool_val = cb(getPyArray(d)); } break; default: throw std::runtime_error("BUG"); } // End of switch bool res = bool_val.cast(); if (!res) return false; } catch (py::error_already_set &e) { cerr << "ERROR: Python raised exception from callback function: "; cerr << e.what() << endl; return false; } catch (py::cast_error &e) { cerr << e.what() << endl; cerr << "ERROR: Python callback does not return boolean value." << endl; return false; } return true; } }; void init_datahandler(py::module &m) { py::class_ h(m, "InDataHandler"); h.def(py::init()); }