Compare commits

...

2 Commits

Author SHA1 Message Date
Anne de Jong e973f14884 Weak refs to Recording methods. Made the mutexes more simple for stream manager. Added extra guards and statements here and there. Code passes a sever stress test.
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m3s Details
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details
2024-03-13 12:19:24 +01:00
Anne de Jong e24cac2805 Some more bugfixes: weak references stored in indatahandler, to avoid calling destructor from wrong thread. Removed some unneccessary include statements on the way 2024-03-12 21:13:13 +01:00
13 changed files with 255 additions and 186 deletions

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_daqconfig.h"

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "lasp_indatahandler.h"
#include "debugtrace.hpp"
#include "lasp_streammgr.h"
@ -29,12 +29,14 @@ void InDataHandler::start() {
}
void InDataHandler::stop() {
DEBUGTRACE_ENTER;
checkRightThread();
// checkRightThread();
#if LASP_DEBUG == 1
stopCalled = true;
#endif
if (SmgrHandle handle = _mgr.lock()) {
handle->removeInDataHandler(*this);
if (SmgrHandle smgr = _mgr.lock()) {
smgr->removeInDataHandler(*this);
} else {
DEBUGTRACE_PRINT("No stream manager alive anymore!");
}
}
@ -42,7 +44,7 @@ InDataHandler::~InDataHandler() {
DEBUGTRACE_ENTER;
#if LASP_DEBUG == 1
checkRightThread();
// checkRightThread();
if (!stopCalled) {
std::cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "

View File

@ -1,7 +1,8 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "lasp_streammgr.h"
#include <assert.h>
#include <thread>
#include <algorithm>
#include <functional>
@ -14,6 +15,8 @@
#include "lasp_indatahandler.h"
#include "lasp_thread.h"
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
@ -27,7 +30,7 @@ using rte = std::runtime_error;
std::weak_ptr<StreamMgr> _mgr;
std::mutex _mgr_mutex;
using Lck = std::scoped_lock<std::mutex>;
using Lck = std::scoped_lock<std::recursive_mutex>;
/**
* @brief The only way to obtain a stream manager, can only be called from the
@ -38,11 +41,11 @@ using Lck = std::scoped_lock<std::mutex>;
SmgrHandle StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mgr_mutex);
auto mgr = _mgr.lock();
if (!mgr) {
// Double Check Locking Pattern, if two threads would simultaneously
// instantiate the singleton instance.
Lck lck(_mgr_mutex);
auto mgr = _mgr.lock();
if (mgr) {
@ -72,7 +75,7 @@ StreamMgr::StreamMgr()
{
DEBUGTRACE_ENTER;
// Trigger a scan for the available devices, in the background.
rescanDAQDevices(true);
rescanDAQDevices(false);
}
#if LASP_DEBUG == 1
void StreamMgr::checkRightThread() const {
@ -84,37 +87,39 @@ void StreamMgr::rescanDAQDevices(bool background,
std::function<void()> callback) {
DEBUGTRACE_ENTER;
DEBUGTRACE_PRINT(background);
if(_scanningDevices) {
throw rte("A background device scan is already busy");
}
Lck lck(_mtx);
checkRightThread();
if (_inputStream || _outputStream) {
throw rte("Rescanning DAQ devices only possible when no stream is running");
}
if (!_devices_mtx.try_lock()) {
throw rte("A background DAQ device scan is probably already running");
}
_devices_mtx.unlock();
std::scoped_lock lck(_devices_mtx);
_devices.clear();
if (!background) {
_scanningDevices = true;
rescanDAQDevices_impl(callback);
} else {
DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
_scanningDevices = true;
_pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
}
}
void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
DEBUGTRACE_ENTER;
std::scoped_lock lck(_devices_mtx);
Lck lck(_mtx);
_devices = DeviceInfo::getDeviceInfo();
if (callback) {
callback();
}
_scanningDevices = false;
}
void StreamMgr::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
Lck lck(_mtx);
assert(_inputFilters.size() == data.nchannels);
@ -139,12 +144,14 @@ void StreamMgr::inCallback(const DaqData &data) {
}
}
DEBUGTRACE_PRINT("Calling incallback for handlers (filtered)...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(input_filtered);
}
} else {
/// No input filters
DEBUGTRACE_PRINT("Calling incallback for handlers...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(data);
}
@ -155,8 +162,7 @@ void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
DEBUGTRACE_ENTER;
checkRightThread();
std::scoped_lock<std::mutex> lck(_siggen_mtx);
Lck lck(_mtx);
// If not set to nullptr, and a stream is running, we update the signal
// generator by resetting it.
if (isStreamRunningOK(StreamType::output) && siggen) {
@ -213,9 +219,9 @@ bool fillData(DaqData &data, const vd &signal) {
return true;
}
void StreamMgr::outCallback(DaqData &data) {
/* DEBUGTRACE_ENTER; */
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_siggen_mtx);
Lck lck(_mtx);
if (_siggen) {
vd signal = _siggen->genSignal(data.nframes);
@ -244,7 +250,17 @@ void StreamMgr::outCallback(DaqData &data) {
StreamMgr::~StreamMgr() {
DEBUGTRACE_ENTER;
checkRightThread();
while (_scanningDevices) {
std::this_thread::sleep_for(10us);
}
#if LASP_DEBUG == 1
{ // Careful, this lock needs to be released to make sure the streams can
// obtain a lock to the stream manager.
Lck lck(_mtx);
checkRightThread();
}
#endif
// Stream manager now handled by shared pointer. Each indata handler gets a
// shared pointer to the stream manager, and stores a weak pointer to it.
// Hence, we do not have to do any cleanup here. It also makes sure that the
@ -260,6 +276,8 @@ StreamMgr::~StreamMgr() {
}
void StreamMgr::stopAllStreams() {
DEBUGTRACE_ENTER;
// No lock here!
// Lck lck(_mtx);
checkRightThread();
_inputStream.reset();
_outputStream.reset();
@ -267,6 +285,10 @@ void StreamMgr::stopAllStreams() {
void StreamMgr::startStream(const DaqConfiguration &config) {
DEBUGTRACE_ENTER;
if(_scanningDevices) {
throw rte("DAQ device scan is busy. Cannot start stream.");
}
Lck lck(_mtx);
checkRightThread();
bool isInput = std::count_if(config.inchannel_config.cbegin(),
@ -278,8 +300,6 @@ void StreamMgr::startStream(const DaqConfiguration &config) {
[](auto &i) { return i.enabled; }) > 0;
// Find the first device that matches with the configuration
std::scoped_lock lck(_devices_mtx);
DeviceInfo *devinfo = nullptr;
// Match configuration to a device in the list of devices
@ -411,7 +431,9 @@ void StreamMgr::stopStream(const StreamType t) {
}
/// Kills input stream
_inputStream.reset();
/// Send reset to all in data handlers
Lck lck(_mtx);
for (auto &handler : _inDataHandlers) {
handler->reset(nullptr);
}
@ -425,6 +447,7 @@ void StreamMgr::stopStream(const StreamType t) {
if (!_outputStream) {
throw rte("Output stream is not running");
}
Lck lck(_mtx);
_outputStream.reset();
} // end else
}
@ -432,9 +455,9 @@ void StreamMgr::stopStream(const StreamType t) {
void StreamMgr::addInDataHandler(InDataHandler *handler) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
checkRightThread();
assert(handler);
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
handler->reset(_inputStream.get());
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
@ -449,16 +472,17 @@ void StreamMgr::addInDataHandler(InDataHandler *handler) {
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER;
checkRightThread();
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
Lck lck(_mtx);
// checkRightThread();
_inDataHandlers.remove(&handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
/* DEBUGTRACE_ENTER; */
DEBUGTRACE_ENTER;
Lck lck(_mtx);
checkRightThread();
// Default constructor, says stream is not running, but also no errors
@ -471,6 +495,7 @@ Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
}
const Daq *StreamMgr::getDaq(StreamType type) const {
Lck lck(_mtx);
checkRightThread();
if (type == StreamType::input) {

View File

@ -1,19 +1,19 @@
#pragma once
#include "lasp_daq.h"
#include "lasp_siggen.h"
#include "lasp_thread.h"
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include "lasp_daq.h"
#include "lasp_siggen.h"
#include "lasp_thread.h"
/** \addtogroup device
* @{
*/
class StreamMgr;
class InDataHandler;
class SeriesBiquad;
/**
@ -25,12 +25,15 @@ class SeriesBiquad;
* fact is asserted.
*/
class StreamMgr {
mutable std::recursive_mutex _mtx;
/**
* @brief Storage for streams.
*/
std::unique_ptr<Daq> _inputStream, _outputStream;
std::atomic<bool> _scanningDevices{false};
GlobalThreadPool _pool;
/**
@ -39,22 +42,18 @@ class StreamMgr {
* thread-safety.
*/
std::list<InDataHandler *> _inDataHandlers;
mutable std::mutex _inDataHandler_mtx;
/**
* @brief Signal generator in use to generate output data. Currently
* implemented as to generate the same data for all output channels.
*/
std::shared_ptr<Siggen> _siggen;
std::mutex _siggen_mtx;
/**
* @brief Filters on input stream. For example, a digital high pass filter.
*/
std::vector<std::unique_ptr<SeriesBiquad>> _inputFilters;
mutable std::recursive_mutex _devices_mtx;
/**
* @brief Current storage for the device list
*/
@ -67,9 +66,7 @@ class StreamMgr {
friend class InDataHandler;
friend class Siggen;
public:
public:
~StreamMgr();
enum class StreamType : us {
@ -100,9 +97,10 @@ class StreamMgr {
* @return A copy of the internal stored list of devices
*/
DeviceInfoList getDeviceInfo() const {
std::scoped_lock lck(_devices_mtx);
std::scoped_lock lck(_mtx);
DeviceInfoList d2;
for(const auto& dev: _devices) {
for (const auto &dev : _devices) {
assert(dev != nullptr);
d2.push_back(dev->clone());
}
return d2;
@ -118,9 +116,9 @@ class StreamMgr {
* set to true, the function returns immediately.
* @param callback Function to call when complete.
*/
void
rescanDAQDevices(bool background = false,
std::function<void()> callback = std::function<void()>());
void rescanDAQDevices(
bool background = false,
std::function<void()> callback = std::function<void()>());
/**
* @brief Start a stream based on given configuration.
@ -141,12 +139,12 @@ class StreamMgr {
}
bool isStreamRunning(const StreamType type) const {
switch (type) {
case (StreamType::input):
return bool(_inputStream);
break;
case (StreamType::output):
return bool(_outputStream);
break;
case (StreamType::input):
return bool(_inputStream);
break;
case (StreamType::output):
return bool(_outputStream);
break;
}
return false;
}
@ -193,11 +191,10 @@ class StreamMgr {
*/
void setSiggen(std::shared_ptr<Siggen> s);
private:
private:
void inCallback(const DaqData &data);
void outCallback(DaqData &data);
/**
* @brief Add an input data handler. The handler's inCallback() function is
* called with data when available. This function should *NOT* be called by

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_config.h"
@ -422,6 +422,7 @@ Daq::StreamStatus PortAudioDaq::getStreamStatus() const {
}
PortAudioDaq::~PortAudioDaq() {
DEBUGTRACE_ENTER;
PaError err;
assert(_stream);
if (Pa_IsStreamActive(_stream)) {
@ -445,7 +446,7 @@ int PortAudioDaq::memberPaCallback(const void *inputBuffer, void *outputBuffer,
unsigned long framesPerBuffer,
const PaStreamCallbackTimeInfo *timeInfo,
PaStreamCallbackFlags statusFlags) {
// DEBUGTRACE_ENTER;
DEBUGTRACE_ENTER;
typedef Daq::StreamStatus::StreamError se;
if (statusFlags & paPrimingOutput) {
// Initial output buffers generated. So nothing with input yet

View File

@ -2,12 +2,9 @@
#include "lasp_avpowerspectra.h"
#include "debugtrace.hpp"
#include "lasp_mathtypes.h"
#include <cmath>
#include <optional>
#include <stdexcept>
using rte = std::runtime_error;
using std::cerr;
using std::endl;
PowerSpectra::PowerSpectra(const us nfft, const Window::WindowType w)
: PowerSpectra(Window::create(w, nfft)) {}

View File

@ -3,8 +3,6 @@
#include "lasp_mathtypes.h"
#include "lasp_timebuffer.h"
#include "lasp_window.h"
#include <memory>
#include <optional>
/** \defgroup dsp Digital Signal Processing utilities
* These are classes and functions used for processing raw signal data, to

View File

@ -2,7 +2,6 @@
#include "lasp_biquadbank.h"
#include "debugtrace.hpp"
#include "lasp_thread.h"
#include <cassert>
#include <vector>
using std::cerr;

View File

@ -4,12 +4,9 @@
//
// Description: Real Time Signal Viewer.
#pragma once
#include "lasp_avpowerspectra.h"
#include "lasp_filter.h"
#include "lasp_mathtypes.h"
#include "lasp_threadedindatahandler.h"
#include "lasp_timebuffer.h"
#include <memory>
#include <mutex>
/**

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "lasp_threadedindatahandler.h"
#include <future>
@ -78,7 +78,7 @@ void ThreadedInDataHandlerBase::startThread() {
_1),
resetCallback);
_thread_can_safely_run = true;
_thread_allowed_to_run = true;
_indatahandler->start();
}
@ -87,7 +87,7 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
DEBUGTRACE_ENTER;
// Early return in case object is under DESTRUCTION
if (!_thread_can_safely_run) return;
if (!_thread_allowed_to_run) return;
_queue->push(daqdata);
if (!_thread_running) {
@ -103,23 +103,26 @@ void ThreadedInDataHandlerBase::stopThread() {
throw rte("BUG: ThreadedIndataHandler not running");
}
// Stop the existing thread
_thread_allowed_to_run = false;
// Make sure no new data arrives
_indatahandler->stop();
_indatahandler.reset();
// Stop the existing thread
_thread_can_safely_run = false;
DEBUGTRACE_PRINT("Indatahandler stopped. Waiting for thread to finish...");
// Then wait in steps for the thread to stop running.
while (_thread_running) {
std::this_thread::sleep_for(10us);
}
DEBUGTRACE_PRINT("Thread stopped");
// Kill the handler
_indatahandler.reset();
DEBUGTRACE_PRINT("Handler resetted");
}
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
DEBUGTRACE_ENTER;
if (_thread_can_safely_run) {
if (_thread_allowed_to_run) {
stopThread();
cerr << "*** BUG: InDataHandlers have not been all stopped, while "
"StreamMgr destructor is called. This is a misuse BUG."
@ -131,7 +134,7 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
void ThreadedInDataHandlerBase::threadFcn() {
DEBUGTRACE_ENTER;
while (!_queue->empty() && _thread_can_safely_run) {
while (!_queue->empty() && _thread_allowed_to_run) {
// Call inCallback_threaded
inCallback(_queue->pop());
}

View File

@ -46,7 +46,7 @@ class ThreadedInDataHandlerBase {
std::unique_ptr<InDataHandler> _indatahandler;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _thread_can_safely_run{false};
std::atomic<bool> _thread_allowed_to_run{false};
GlobalThreadPool _pool;

View File

@ -1,4 +1,10 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <armadillo>
#include <atomic>
#include "arma_npy.h"
#include "debugtrace.hpp"
#include "lasp_clip.h"
@ -9,14 +15,12 @@
#include "lasp_rtsignalviewer.h"
#include "lasp_streammgr.h"
#include "lasp_threadedindatahandler.h"
#include <armadillo>
#include <atomic>
#include <chrono>
#include <pybind11/pybind11.h>
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
using Lck = std::scoped_lock<std::recursive_mutex>;
namespace py = pybind11;
@ -48,17 +52,17 @@ py::array_t<T> getPyArrayNoCpy(const DaqData &d) {
*/
return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::StridesContainer( // Strides
py::array::StridesContainer( // Strides
{sizeof(T),
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
reinterpret_cast<T *>(
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer.
dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer.
);
}
@ -81,17 +85,17 @@ py::array_t<d> dmat_to_ndarray(const DaqData &d) {
*/
return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::StridesContainer( // Strides
py::array::StridesContainer( // Strides
{sizeof(T),
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
reinterpret_cast<T *>(
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer.
dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer.
);
}
@ -104,8 +108,9 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
/**
* @brief The callback functions that is called.
*/
std::unique_ptr<py::function> cb, reset_callback;
bool _done{false};
py::object _cb, _reset_callback;
std::atomic<bool> _done{false};
std::recursive_mutex _mtx;
public:
/**
@ -119,19 +124,25 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
*/
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr),
cb(std::make_unique<py::function>(cb)),
reset_callback(std::make_unique<py::function>(reset_callback)) {
_cb(py::weakref(cb)),
_reset_callback(py::weakref(reset_callback)) {
DEBUGTRACE_ENTER;
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
/// Start should be called externally, as at constructor time no virtual
/// functions should be called.
py::gil_scoped_release release;
if (_cb().is_none() || _reset_callback().is_none()) {
throw rte("cb or reset_callback is none!");
}
startThread();
}
~PyIndataHandler() {
DEBUGTRACE_ENTER;
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
/// Callback cannot be called, which results in a deadlock on the GIL
/// without this release.
py::gil_scoped_release release;
DEBUGTRACE_PRINT("Gil released");
_done = true;
stopThread();
}
/**
@ -139,91 +150,123 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
*
* @param daq Daq device, or nullptr in case no input stream is running.
*/
void reset(const Daq *daq) {
void reset(const Daq *daqi) {
DEBUGTRACE_ENTER;
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
if (_done) return;
try {
py::gil_scoped_acquire acquire;
if (daq) {
(*reset_callback)(daq);
} else {
(*reset_callback)(py::none());
{
try {
py::object reset_callback = _reset_callback();
if (reset_callback.is_none()) {
DEBUGTRACE_PRINT("reset_callback is none, weakref killed");
_done = true;
return;
}
if (daqi != nullptr) {
assert(reset_callback);
reset_callback(daqi);
} else {
assert(reset_callback);
reset_callback(py::none());
}
} catch (py::error_already_set &e) {
cerr << "*************** Error calling reset callback!\n";
cerr << e.what() << endl;
cerr << "*************** \n";
/// Throwing a runtime error here does not work out one way or another.
/// Therefore, it is better to dive out and prevent undefined behaviour
abort();
/* throw std::runtime_error(e.what()); */
} catch (std::exception &e) {
cerr << "Caught unknown exception in reset callback:" << e.what()
<< endl;
abort();
}
} catch (py::error_already_set &e) {
cerr << "*************** Error calling reset callback!\n";
cerr << e.what() << endl;
cerr << "*************** \n";
/// Throwing a runtime error here does not work out one way or another.
/// Therefore, it is better to dive out and prevent undefined behaviour
abort();
/* throw std::runtime_error(e.what()); */
} catch (std::exception &e) {
cerr << "Caught unknown exception in reset callback:" << e.what() << endl;
abort();
}
}
} // end of GIL scope
} // end of function reset()
/**
* @brief Calls the Python callback method / function with a Numpy array of
* stream data.
*/
void inCallback(const DaqData &d) {
/* DEBUGTRACE_ENTER; */
DEBUGTRACE_ENTER;
// cerr << "=== Enter incallback for thread ID: " << std::this_thread::get_id() << endl;
using DataType = DataTypeDescriptor::DataType;
if (_done) return;
try {
py::gil_scoped_acquire acquire;
py::object bool_val;
switch (d.dtype) {
case (DataType::dtype_int8): {
bool_val = (*cb)(getPyArrayNoCpy<int8_t>(d));
} break;
case (DataType::dtype_int16): {
bool_val = (*cb)(getPyArrayNoCpy<int16_t>(d));
} break;
case (DataType::dtype_int32): {
bool_val = (*cb)(getPyArrayNoCpy<int32_t>(d));
} break;
case (DataType::dtype_fl32): {
bool_val = (*cb)(getPyArrayNoCpy<float>(d));
} break;
case (DataType::dtype_fl64): {
bool_val = (*cb)(getPyArrayNoCpy<double>(d));
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
bool res = bool_val.cast<bool>();
if (res == false) {
DEBUGTRACE_PRINT("Setting callbacks to None")
_done = true;
// cb = py::function(py::none());
// reset_callback = py::function(py::none());
cb.reset();
reset_callback.reset();
}
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
abort();
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." << endl;
abort();
} catch (std::exception &e) {
cerr << "Caught unknown exception in Python callback:" << e.what()
<< endl;
abort();
if (_done) {
DEBUGTRACE_PRINT("Early stop, done");
return;
}
}
{
DEBUGTRACE_PRINT("================ TRYING TO OBTAIN GIL in inCallback...");
py::gil_scoped_acquire acquire;
try {
py::object py_bool;
py::object cb = _cb();
if (cb.is_none()) {
DEBUGTRACE_PRINT("cb is none, weakref killed");
_done = true;
return;
}
switch (d.dtype) {
case (DataType::dtype_int8): {
py_bool = cb(getPyArrayNoCpy<int8_t>(d));
} break;
case (DataType::dtype_int16): {
py_bool = cb(getPyArrayNoCpy<int16_t>(d));
} break;
case (DataType::dtype_int32): {
py_bool = cb(getPyArrayNoCpy<int32_t>(d));
} break;
case (DataType::dtype_fl32): {
py_bool = cb(getPyArrayNoCpy<float>(d));
} break;
case (DataType::dtype_fl64): {
py_bool = cb(getPyArrayNoCpy<double>(d));
} break;
default:
throw std::runtime_error("BUG");
} // End of switch
bool res = py_bool.cast<bool>();
if (res == false) {
DEBUGTRACE_PRINT("Setting callbacks to None")
_done = true;
// By doing this, we remove the references, but in the mean time this
// might also trigger removing Python objects. Including itself, as
// there is no reference to it anymore. The consequence is that the
// current object might be destroyed from this thread. However, if we
// do not remove these references and in lasp_record.py finish() is
// not called, we end up with not-garbage collected recordings in
// memory. This is also not good. How can we force Python to not yet
// destroy this object?
// cb.reset();
// reset_callback.reset();
}
} catch (py::error_already_set &e) {
cerr << "ERROR (BUG): Python raised exception from callback function: ";
cerr << e.what() << endl;
abort();
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR (BUG): Python callback does not return boolean value."
<< endl;
abort();
} catch (std::exception &e) {
cerr << "Caught unknown exception in Python callback:" << e.what()
<< endl;
abort();
}
} // End of scope in which the GIL is acquired
// cerr << "=== LEAVE incallback for thread ID: " << std::this_thread::get_id() << endl;
} // End of function inCallback()
};
void init_datahandler(py::module &m) {
/// The C++ class is PyIndataHandler, but for Python, it is called
/// InDataHandler
py::class_<PyIndataHandler> pyidh(m, "InDataHandler");
@ -256,29 +299,29 @@ void init_datahandler(py::module &m) {
cval = clip.getCurrentValue();
}
return ColToNpy<arma::uword>(cval); // something goes wrong here
return ColToNpy<arma::uword>(cval); // something goes wrong here
});
/// Real time Aps
///
py::class_<RtAps> rtaps(m, "RtAps");
rtaps.def(py::init<SmgrHandle, // StreamMgr
Filter *const, // FreqWeighting filter
const us, // Nfft
const Window::WindowType, // Window
const d, // Overlap percentage 0<=o<100
rtaps.def(py::init<SmgrHandle, // StreamMgr
Filter *const, // FreqWeighting filter
const us, // Nfft
const Window::WindowType, // Window
const d, // Overlap percentage 0<=o<100
const d // Time constant
const d // Time constant
>(),
py::arg("streammgr"), // StreamMgr
py::arg("streammgr"), // StreamMgr
py::arg("preFilter").none(true),
/// Below list of arguments *SHOULD* be same as for
/// AvPowerSpectra constructor!
py::arg("nfft") = 2048, //
py::arg("windowType") = Window::WindowType::Hann, //
py::arg("overlap_percentage") = 50.0, //
py::arg("time_constant") = -1 //
py::arg("nfft") = 2048, //
py::arg("windowType") = Window::WindowType::Hann, //
py::arg("overlap_percentage") = 50.0, //
py::arg("time_constant") = -1 //
);
rtaps.def("getCurrentValue", [](RtAps &rt) {
@ -293,10 +336,10 @@ void init_datahandler(py::module &m) {
/// Real time Signal Viewer
///
py::class_<RtSignalViewer> rtsv(m, "RtSignalViewer");
rtsv.def(py::init<SmgrHandle, // StreamMgr
const d, // Time history
const us, // Resolution
const us // Channel number
rtsv.def(py::init<SmgrHandle, // StreamMgr
const d, // Time history
const us, // Resolution
const us // Channel number
>());
rtsv.def("getCurrentValue", [](RtSignalViewer &rt) {

View File

@ -126,8 +126,15 @@ class Recording:
logger.debug("Starting record....")
# In the PyInDataHandler, a weak reference is stored to the python
# methods reset and incallback. One way or another, the weak ref is gone
# on the callback thread. If we store an "extra" ref to this method over
# here, the weak ref stays alive. We do not know whether this is a bug
# or a feature, but in any case storing this extra ref to inCallback
# solves the problem.
self._incalback_cpy = self.inCallback
self._indataHandler = InDataHandler(
streammgr, self.inCallback, self.resetCallback
streammgr, self._incalback_cpy, self.resetCallback
)
if wait:
@ -190,7 +197,7 @@ class Recording:
When returning False, it will stop the stream.
"""
logger.debug(f"inCallback({adata})")
logger.debug(f"inCallback()")
if self._stop():
logger.debug("Stop flag set, early return in inCallback")
# Stop flag is raised. We do not add any data anymore.
@ -228,10 +235,10 @@ class Recording:
self._progressCallback(recstatus)
case RecordingState.AllDataStored:
pass
return False
case RecordingState.Finished:
pass
return False
return True