
146 lines
4.0 KiB
Raw Normal View History

#include "debugtrace.hpp"
#include "lasp_streammgr.h"
#include "lasp_threadedindatahandler.h"
#include <atomic>
#include <chrono>
#include <pybind11/buffer_info.h>
#include <pybind11/cast.h>
#include <pybind11/gil.h>
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pybind11/stl.h>
#include <thread>
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
namespace py = pybind11;
* @brief Generate a Numpy array from daqdata, does *NOT* create a copy of the
* data!. Instead, it shares the data from the DaqData container.
* @tparam T The type of the stored sample
* @param d The daqdata to convert
* @return Numpy array
template <typename T> py::array_t<T> getPyArray(const DaqData &d) {
// When a valid object is passed as 'base', it tells pybind not to take
// ownership of the data, because 'base' will own it. In fact 'packet' will
// own it, but - psss! - , we don't tell it to pybind... Alos note that ANY
// valid object is good for this purpose, so I chose "str"...
py::str dummyDataOwner;
* Signature:
array_t(ShapeContainer shape,
StridesContainer strides,
const T *ptr = nullptr,
handle base = handle());
return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::StridesContainer( // Strides
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
reinterpret_cast<T *>(
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer.
* @brief Wraps the InDataHandler such that it calls a Python callback with a
* buffer of sample data. The Python callback is called from a different
* thread, using a Numpy argument as
class PyIndataHandler : public ThreadedInDataHandler {
* @brief The callback function that is called.
py::function cb;
PyIndataHandler(StreamMgr &mgr, py::function cb)
: ThreadedInDataHandler(mgr), cb(cb) {
/// TODO: Note that if start() throws an exception, which means that the
/// destructor of PyIndataHandler is not called and the thread destructor
/// calls terminate(). It is a kind of rude way to crash, but it is also
/// *very* unlikely to happen, as start() does only add a reference to this
/// handler to a list in the stream mgr.
~PyIndataHandler() {
* @brief Reads from the buffer
bool inCallback_threaded(const DaqData &d) {
using DataType = DataTypeDescriptor::DataType;
py::gil_scoped_acquire acquire;
try {
py::array binfo;
switch (d.dtype) {
case (DataType::dtype_int8):
binfo = getPyArray<int8_t>(d);
case (DataType::dtype_int16):
binfo = getPyArray<int16_t>(d);
case (DataType::dtype_int32):
binfo = getPyArray<int32_t>(d);
case (DataType::dtype_fl32):
binfo = getPyArray<float>(d);
case (DataType::dtype_fl64):
binfo = getPyArray<double>(d);
throw std::runtime_error("BUG");
} // End of switch
py::object bool_val = cb(binfo);
bool res = bool_val.cast<bool>();
if (!res)
return false;
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
return false;
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." << endl;
return false;
return true;
// Registers the C++ class PyIndataHandler in the pybind11 module `m` under the
// Python name "InDataHandler", with a constructor taking a StreamMgr reference
// and a Python callable.
// NOTE(review): the end of this function is not visible in this view of the
// file; anything after the constructor binding is unknown.
void init_datahandler(py::module &m) {
py::class_<PyIndataHandler> h(m, "InDataHandler");
h.def(py::init<StreamMgr &, py::function>());