Weak refs to Recording methods. Made the mutexes more simple for stream manager. Added extra guards and statements here and there. Code passes a sever stress test.
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m3s Details
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped Details

This commit is contained in:
Anne de Jong 2024-03-13 12:19:24 +01:00
parent e24cac2805
commit e973f14884
9 changed files with 127 additions and 77 deletions

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_daqconfig.h"

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "lasp_indatahandler.h"
#include "debugtrace.hpp"
#include "lasp_streammgr.h"
@ -29,12 +29,14 @@ void InDataHandler::start() {
}
void InDataHandler::stop() {
DEBUGTRACE_ENTER;
checkRightThread();
// checkRightThread();
#if LASP_DEBUG == 1
stopCalled = true;
#endif
if (SmgrHandle handle = _mgr.lock()) {
handle->removeInDataHandler(*this);
if (SmgrHandle smgr = _mgr.lock()) {
smgr->removeInDataHandler(*this);
} else {
DEBUGTRACE_PRINT("No stream manager alive anymore!");
}
}
@ -42,7 +44,7 @@ InDataHandler::~InDataHandler() {
DEBUGTRACE_ENTER;
#if LASP_DEBUG == 1
checkRightThread();
// checkRightThread();
if (!stopCalled) {
std::cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "

View File

@ -1,7 +1,8 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "lasp_streammgr.h"
#include <assert.h>
#include <thread>
#include <algorithm>
#include <functional>
@ -14,6 +15,8 @@
#include "lasp_indatahandler.h"
#include "lasp_thread.h"
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
@ -27,7 +30,7 @@ using rte = std::runtime_error;
std::weak_ptr<StreamMgr> _mgr;
std::mutex _mgr_mutex;
using Lck = std::scoped_lock<std::mutex>;
using Lck = std::scoped_lock<std::recursive_mutex>;
/**
* @brief The only way to obtain a stream manager, can only be called from the
@ -38,11 +41,11 @@ using Lck = std::scoped_lock<std::mutex>;
SmgrHandle StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_mgr_mutex);
auto mgr = _mgr.lock();
if (!mgr) {
// Double Check Locking Pattern, if two threads would simultaneously
// instantiate the singleton instance.
Lck lck(_mgr_mutex);
auto mgr = _mgr.lock();
if (mgr) {
@ -72,7 +75,7 @@ StreamMgr::StreamMgr()
{
DEBUGTRACE_ENTER;
// Trigger a scan for the available devices, in the background.
rescanDAQDevices(true);
rescanDAQDevices(false);
}
#if LASP_DEBUG == 1
void StreamMgr::checkRightThread() const {
@ -84,37 +87,39 @@ void StreamMgr::rescanDAQDevices(bool background,
std::function<void()> callback) {
DEBUGTRACE_ENTER;
DEBUGTRACE_PRINT(background);
if(_scanningDevices) {
throw rte("A background device scan is already busy");
}
Lck lck(_mtx);
checkRightThread();
if (_inputStream || _outputStream) {
throw rte("Rescanning DAQ devices only possible when no stream is running");
}
if (!_devices_mtx.try_lock()) {
throw rte("A background DAQ device scan is probably already running");
}
_devices_mtx.unlock();
std::scoped_lock lck(_devices_mtx);
_devices.clear();
if (!background) {
_scanningDevices = true;
rescanDAQDevices_impl(callback);
} else {
DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
_scanningDevices = true;
_pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
}
}
void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
DEBUGTRACE_ENTER;
std::scoped_lock lck(_devices_mtx);
Lck lck(_mtx);
_devices = DeviceInfo::getDeviceInfo();
if (callback) {
callback();
}
_scanningDevices = false;
}
void StreamMgr::inCallback(const DaqData &data) {
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
Lck lck(_mtx);
assert(_inputFilters.size() == data.nchannels);
@ -139,12 +144,14 @@ void StreamMgr::inCallback(const DaqData &data) {
}
}
DEBUGTRACE_PRINT("Calling incallback for handlers (filtered)...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(input_filtered);
}
} else {
/// No input filters
DEBUGTRACE_PRINT("Calling incallback for handlers...");
for (auto &handler : _inDataHandlers) {
handler->inCallback(data);
}
@ -155,8 +162,7 @@ void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
DEBUGTRACE_ENTER;
checkRightThread();
std::scoped_lock<std::mutex> lck(_siggen_mtx);
Lck lck(_mtx);
// If not set to nullptr, and a stream is running, we update the signal
// generator by resetting it.
if (isStreamRunningOK(StreamType::output) && siggen) {
@ -213,9 +219,9 @@ bool fillData(DaqData &data, const vd &signal) {
return true;
}
void StreamMgr::outCallback(DaqData &data) {
/* DEBUGTRACE_ENTER; */
DEBUGTRACE_ENTER;
std::scoped_lock<std::mutex> lck(_siggen_mtx);
Lck lck(_mtx);
if (_siggen) {
vd signal = _siggen->genSignal(data.nframes);
@ -244,7 +250,17 @@ void StreamMgr::outCallback(DaqData &data) {
StreamMgr::~StreamMgr() {
DEBUGTRACE_ENTER;
checkRightThread();
while (_scanningDevices) {
std::this_thread::sleep_for(10us);
}
#if LASP_DEBUG == 1
{ // Careful, this lock needs to be released to make sure the streams can
// obtain a lock to the stream manager.
Lck lck(_mtx);
checkRightThread();
}
#endif
// Stream manager now handled by shared pointer. Each indata handler gets a
// shared pointer to the stream manager, and stores a weak pointer to it.
// Hence, we do not have to do any cleanup here. It also makes sure that the
@ -260,6 +276,8 @@ StreamMgr::~StreamMgr() {
}
void StreamMgr::stopAllStreams() {
DEBUGTRACE_ENTER;
// No lock here!
// Lck lck(_mtx);
checkRightThread();
_inputStream.reset();
_outputStream.reset();
@ -267,6 +285,10 @@ void StreamMgr::stopAllStreams() {
void StreamMgr::startStream(const DaqConfiguration &config) {
DEBUGTRACE_ENTER;
if(_scanningDevices) {
throw rte("DAQ device scan is busy. Cannot start stream.");
}
Lck lck(_mtx);
checkRightThread();
bool isInput = std::count_if(config.inchannel_config.cbegin(),
@ -278,8 +300,6 @@ void StreamMgr::startStream(const DaqConfiguration &config) {
[](auto &i) { return i.enabled; }) > 0;
// Find the first device that matches with the configuration
std::scoped_lock lck(_devices_mtx);
DeviceInfo *devinfo = nullptr;
// Match configuration to a device in the list of devices
@ -411,7 +431,9 @@ void StreamMgr::stopStream(const StreamType t) {
}
/// Kills input stream
_inputStream.reset();
/// Send reset to all in data handlers
Lck lck(_mtx);
for (auto &handler : _inDataHandlers) {
handler->reset(nullptr);
}
@ -425,6 +447,7 @@ void StreamMgr::stopStream(const StreamType t) {
if (!_outputStream) {
throw rte("Output stream is not running");
}
Lck lck(_mtx);
_outputStream.reset();
} // end else
}
@ -432,9 +455,9 @@ void StreamMgr::stopStream(const StreamType t) {
void StreamMgr::addInDataHandler(InDataHandler *handler) {
DEBUGTRACE_ENTER;
Lck lck(_mtx);
checkRightThread();
assert(handler);
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
handler->reset(_inputStream.get());
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
@ -449,16 +472,17 @@ void StreamMgr::addInDataHandler(InDataHandler *handler) {
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
DEBUGTRACE_ENTER;
checkRightThread();
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
Lck lck(_mtx);
// checkRightThread();
_inDataHandlers.remove(&handler);
DEBUGTRACE_PRINT(_inDataHandlers.size());
}
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
/* DEBUGTRACE_ENTER; */
DEBUGTRACE_ENTER;
Lck lck(_mtx);
checkRightThread();
// Default constructor, says stream is not running, but also no errors
@ -471,6 +495,7 @@ Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
}
const Daq *StreamMgr::getDaq(StreamType type) const {
Lck lck(_mtx);
checkRightThread();
if (type == StreamType::input) {

View File

@ -1,19 +1,19 @@
#pragma once
#include "lasp_daq.h"
#include "lasp_siggen.h"
#include "lasp_thread.h"
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include "lasp_daq.h"
#include "lasp_siggen.h"
#include "lasp_thread.h"
/** \addtogroup device
* @{
*/
class StreamMgr;
class InDataHandler;
class SeriesBiquad;
/**
@ -25,12 +25,15 @@ class SeriesBiquad;
* fact is asserted.
*/
class StreamMgr {
mutable std::recursive_mutex _mtx;
/**
* @brief Storage for streams.
*/
std::unique_ptr<Daq> _inputStream, _outputStream;
std::atomic<bool> _scanningDevices{false};
GlobalThreadPool _pool;
/**
@ -39,22 +42,18 @@ class StreamMgr {
* thread-safety.
*/
std::list<InDataHandler *> _inDataHandlers;
mutable std::mutex _inDataHandler_mtx;
/**
* @brief Signal generator in use to generate output data. Currently
* implemented as to generate the same data for all output channels.
*/
std::shared_ptr<Siggen> _siggen;
std::mutex _siggen_mtx;
/**
* @brief Filters on input stream. For example, a digital high pass filter.
*/
std::vector<std::unique_ptr<SeriesBiquad>> _inputFilters;
mutable std::recursive_mutex _devices_mtx;
/**
* @brief Current storage for the device list
*/
@ -67,9 +66,7 @@ class StreamMgr {
friend class InDataHandler;
friend class Siggen;
public:
public:
~StreamMgr();
enum class StreamType : us {
@ -100,9 +97,10 @@ class StreamMgr {
* @return A copy of the internal stored list of devices
*/
DeviceInfoList getDeviceInfo() const {
std::scoped_lock lck(_devices_mtx);
std::scoped_lock lck(_mtx);
DeviceInfoList d2;
for(const auto& dev: _devices) {
for (const auto &dev : _devices) {
assert(dev != nullptr);
d2.push_back(dev->clone());
}
return d2;
@ -118,9 +116,9 @@ class StreamMgr {
* set to true, the function returns immediately.
* @param callback Function to call when complete.
*/
void
rescanDAQDevices(bool background = false,
std::function<void()> callback = std::function<void()>());
void rescanDAQDevices(
bool background = false,
std::function<void()> callback = std::function<void()>());
/**
* @brief Start a stream based on given configuration.
@ -141,12 +139,12 @@ class StreamMgr {
}
bool isStreamRunning(const StreamType type) const {
switch (type) {
case (StreamType::input):
return bool(_inputStream);
break;
case (StreamType::output):
return bool(_outputStream);
break;
case (StreamType::input):
return bool(_inputStream);
break;
case (StreamType::output):
return bool(_outputStream);
break;
}
return false;
}
@ -193,11 +191,10 @@ class StreamMgr {
*/
void setSiggen(std::shared_ptr<Siggen> s);
private:
private:
void inCallback(const DaqData &data);
void outCallback(DaqData &data);
/**
* @brief Add an input data handler. The handler's inCallback() function is
* called with data when available. This function should *NOT* be called by

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_config.h"
@ -422,6 +422,7 @@ Daq::StreamStatus PortAudioDaq::getStreamStatus() const {
}
PortAudioDaq::~PortAudioDaq() {
DEBUGTRACE_ENTER;
PaError err;
assert(_stream);
if (Pa_IsStreamActive(_stream)) {
@ -445,7 +446,7 @@ int PortAudioDaq::memberPaCallback(const void *inputBuffer, void *outputBuffer,
unsigned long framesPerBuffer,
const PaStreamCallbackTimeInfo *timeInfo,
PaStreamCallbackFlags statusFlags) {
// DEBUGTRACE_ENTER;
DEBUGTRACE_ENTER;
typedef Daq::StreamStatus::StreamError se;
if (statusFlags & paPrimingOutput) {
// Initial output buffers generated. So nothing with input yet

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
// #define DEBUGTRACE_ENABLED
#include "lasp_threadedindatahandler.h"
#include <future>
@ -78,7 +78,7 @@ void ThreadedInDataHandlerBase::startThread() {
_1),
resetCallback);
_thread_can_safely_run = true;
_thread_allowed_to_run = true;
_indatahandler->start();
}
@ -87,7 +87,7 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
DEBUGTRACE_ENTER;
// Early return in case object is under DESTRUCTION
if (!_thread_can_safely_run) return;
if (!_thread_allowed_to_run) return;
_queue->push(daqdata);
if (!_thread_running) {
@ -103,23 +103,26 @@ void ThreadedInDataHandlerBase::stopThread() {
throw rte("BUG: ThreadedIndataHandler not running");
}
// Stop the existing thread
_thread_allowed_to_run = false;
// Make sure no new data arrives
_indatahandler->stop();
_indatahandler.reset();
// Stop the existing thread
_thread_can_safely_run = false;
DEBUGTRACE_PRINT("Indatahandler stopped. Waiting for thread to finish...");
// Then wait in steps for the thread to stop running.
while (_thread_running) {
std::this_thread::sleep_for(10us);
}
DEBUGTRACE_PRINT("Thread stopped");
// Kill the handler
_indatahandler.reset();
DEBUGTRACE_PRINT("Handler resetted");
}
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
DEBUGTRACE_ENTER;
if (_thread_can_safely_run) {
if (_thread_allowed_to_run) {
stopThread();
cerr << "*** BUG: InDataHandlers have not been all stopped, while "
"StreamMgr destructor is called. This is a misuse BUG."
@ -131,7 +134,7 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
void ThreadedInDataHandlerBase::threadFcn() {
DEBUGTRACE_ENTER;
while (!_queue->empty() && _thread_can_safely_run) {
while (!_queue->empty() && _thread_allowed_to_run) {
// Call inCallback_threaded
inCallback(_queue->pop());
}

View File

@ -46,7 +46,7 @@ class ThreadedInDataHandlerBase {
std::unique_ptr<InDataHandler> _indatahandler;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _thread_can_safely_run{false};
std::atomic<bool> _thread_allowed_to_run{false};
GlobalThreadPool _pool;

View File

@ -19,6 +19,8 @@
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
using rte = std::runtime_error;
using Lck = std::scoped_lock<std::recursive_mutex>;
namespace py = pybind11;
@ -106,8 +108,9 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
/**
* @brief The callback functions that is called.
*/
py::weakref _cb, _reset_callback;
py::object _cb, _reset_callback;
std::atomic<bool> _done{false};
std::recursive_mutex _mtx;
public:
/**
@ -120,18 +123,26 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
* is called, when a stream stops, this pointer / handle will dangle.
*/
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
: ThreadedInDataHandler(mgr), _cb(cb), _reset_callback(reset_callback) {
: ThreadedInDataHandler(mgr),
_cb(py::weakref(cb)),
_reset_callback(py::weakref(reset_callback)) {
DEBUGTRACE_ENTER;
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
/// Start should be called externally, as at constructor time no virtual
/// functions should be called.
py::gil_scoped_release release;
if (_cb().is_none() || _reset_callback().is_none()) {
throw rte("cb or reset_callback is none!");
}
startThread();
}
~PyIndataHandler() {
DEBUGTRACE_ENTER;
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
/// Callback cannot be called, which results in a deadlock on the GIL
/// without this release.
py::gil_scoped_release release;
DEBUGTRACE_PRINT("Gil released");
_done = true;
stopThread();
}
/**
@ -141,13 +152,13 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
*/
void reset(const Daq *daqi) {
DEBUGTRACE_ENTER;
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
if (_done) return;
{
py::gil_scoped_acquire acquire;
try {
py::object reset_callback = _reset_callback();
if (reset_callback.is_none()) {
DEBUGTRACE_PRINT("cb is none, weakref killed");
DEBUGTRACE_PRINT("reset_callback is none, weakref killed");
_done = true;
return;
}
@ -171,16 +182,16 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
<< endl;
abort();
}
}
}
} // end of GIL scope
} // end of function reset()
/**
* @brief Calls the Python callback method / function with a Numpy array of
* stream data.
*/
void inCallback(const DaqData &d) {
// DEBUGTRACE_ENTER;
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
DEBUGTRACE_ENTER;
// cerr << "=== Enter incallback for thread ID: " << std::this_thread::get_id() << endl;
using DataType = DataTypeDescriptor::DataType;
if (_done) {
@ -188,6 +199,7 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
return;
}
{
DEBUGTRACE_PRINT("================ TRYING TO OBTAIN GIL in inCallback...");
py::gil_scoped_acquire acquire;
try {
py::object py_bool;
@ -234,12 +246,13 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
// reset_callback.reset();
}
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << "ERROR (BUG): Python raised exception from callback function: ";
cerr << e.what() << endl;
abort();
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." << endl;
cerr << "ERROR (BUG): Python callback does not return boolean value."
<< endl;
abort();
} catch (std::exception &e) {
cerr << "Caught unknown exception in Python callback:" << e.what()
@ -248,7 +261,9 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
}
} // End of scope in which the GIL is acquired
} // End of function
// cerr << "=== LEAVE incallback for thread ID: " << std::this_thread::get_id() << endl;
} // End of function inCallback()
};
void init_datahandler(py::module &m) {

View File

@ -126,8 +126,15 @@ class Recording:
logger.debug("Starting record....")
# In the PyInDataHandler, a weak reference is stored to the python
# methods reset and incallback. One way or another, the weak ref is gone
# on the callback thread. If we store an "extra" ref to this method over
# here, the weak ref stays alive. We do not know whether this is a bug
# or a feature, but in any case storing this extra ref to inCallback
# solves the problem.
self._incalback_cpy = self.inCallback
self._indataHandler = InDataHandler(
streammgr, self.inCallback, self.resetCallback
streammgr, self._incalback_cpy, self.resetCallback
)
if wait: