Compare commits
7 Commits
15cd62baf8
...
da023273d8
Author | SHA1 | Date |
---|---|---|
Anne de Jong | da023273d8 | |
Anne de Jong | 84db689e56 | |
Anne de Jong | 83c7aa6ade | |
Anne de Jong | 3c16e33453 | |
Anne de Jong | e973f14884 | |
Anne de Jong | e24cac2805 | |
Anne de Jong | d0d494fcb2 |
|
@ -25,7 +25,7 @@ jobs:
|
||||||
|
|
||||||
- name: Cleanup old dist files and copy new to /dist dir
|
- name: Cleanup old dist files and copy new to /dist dir
|
||||||
run: |-
|
run: |-
|
||||||
rm /dist/*
|
rm -f /dist/*
|
||||||
cp -v dist/* /dist
|
cp -v dist/* /dist
|
||||||
|
|
||||||
Release-Ubuntu:
|
Release-Ubuntu:
|
||||||
|
|
|
@ -21,3 +21,6 @@ acme_log.log
|
||||||
.venv
|
.venv
|
||||||
.py-build-cmake_cache
|
.py-build-cmake_cache
|
||||||
cpp_src/lasp_config.h
|
cpp_src/lasp_config.h
|
||||||
|
.cache
|
||||||
|
.vscode
|
||||||
|
build
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/* #define DEBUGTRACE_ENABLED */
|
// #define DEBUGTRACE_ENABLED
|
||||||
#include "debugtrace.hpp"
|
#include "debugtrace.hpp"
|
||||||
#include "lasp_daqconfig.h"
|
#include "lasp_daqconfig.h"
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/* #define DEBUGTRACE_ENABLED */
|
// #define DEBUGTRACE_ENABLED
|
||||||
#include "lasp_indatahandler.h"
|
#include "lasp_indatahandler.h"
|
||||||
#include "debugtrace.hpp"
|
#include "debugtrace.hpp"
|
||||||
#include "lasp_streammgr.h"
|
#include "lasp_streammgr.h"
|
||||||
|
@ -29,12 +29,14 @@ void InDataHandler::start() {
|
||||||
}
|
}
|
||||||
void InDataHandler::stop() {
|
void InDataHandler::stop() {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
checkRightThread();
|
// checkRightThread();
|
||||||
#if LASP_DEBUG == 1
|
#if LASP_DEBUG == 1
|
||||||
stopCalled = true;
|
stopCalled = true;
|
||||||
#endif
|
#endif
|
||||||
if (SmgrHandle handle = _mgr.lock()) {
|
if (SmgrHandle smgr = _mgr.lock()) {
|
||||||
handle->removeInDataHandler(*this);
|
smgr->removeInDataHandler(*this);
|
||||||
|
} else {
|
||||||
|
DEBUGTRACE_PRINT("No stream manager alive anymore!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,7 +44,7 @@ InDataHandler::~InDataHandler() {
|
||||||
|
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
#if LASP_DEBUG == 1
|
#if LASP_DEBUG == 1
|
||||||
checkRightThread();
|
// checkRightThread();
|
||||||
if (!stopCalled) {
|
if (!stopCalled) {
|
||||||
std::cerr << "************ BUG: Stop function not called while arriving at "
|
std::cerr << "************ BUG: Stop function not called while arriving at "
|
||||||
"InDataHandler's destructor. Fix this by calling "
|
"InDataHandler's destructor. Fix this by calling "
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/* #define DEBUGTRACE_ENABLED */
|
// #define DEBUGTRACE_ENABLED
|
||||||
#include "lasp_streammgr.h"
|
#include "lasp_streammgr.h"
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
@ -8,12 +8,15 @@
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include "debugtrace.hpp"
|
#include "debugtrace.hpp"
|
||||||
#include "lasp_biquadbank.h"
|
#include "lasp_biquadbank.h"
|
||||||
#include "lasp_indatahandler.h"
|
#include "lasp_indatahandler.h"
|
||||||
#include "lasp_thread.h"
|
#include "lasp_thread.h"
|
||||||
|
|
||||||
|
using namespace std::literals::chrono_literals;
|
||||||
|
|
||||||
using std::cerr;
|
using std::cerr;
|
||||||
using std::endl;
|
using std::endl;
|
||||||
using rte = std::runtime_error;
|
using rte = std::runtime_error;
|
||||||
|
@ -27,7 +30,7 @@ using rte = std::runtime_error;
|
||||||
std::weak_ptr<StreamMgr> _mgr;
|
std::weak_ptr<StreamMgr> _mgr;
|
||||||
std::mutex _mgr_mutex;
|
std::mutex _mgr_mutex;
|
||||||
|
|
||||||
using Lck = std::scoped_lock<std::mutex>;
|
using Lck = std::scoped_lock<std::recursive_mutex>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief The only way to obtain a stream manager, can only be called from the
|
* @brief The only way to obtain a stream manager, can only be called from the
|
||||||
|
@ -38,11 +41,11 @@ using Lck = std::scoped_lock<std::mutex>;
|
||||||
SmgrHandle StreamMgr::getInstance() {
|
SmgrHandle StreamMgr::getInstance() {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
|
||||||
|
std::scoped_lock<std::mutex> lck(_mgr_mutex);
|
||||||
auto mgr = _mgr.lock();
|
auto mgr = _mgr.lock();
|
||||||
if (!mgr) {
|
if (!mgr) {
|
||||||
// Double Check Locking Pattern, if two threads would simultaneously
|
// Double Check Locking Pattern, if two threads would simultaneously
|
||||||
// instantiate the singleton instance.
|
// instantiate the singleton instance.
|
||||||
Lck lck(_mgr_mutex);
|
|
||||||
|
|
||||||
auto mgr = _mgr.lock();
|
auto mgr = _mgr.lock();
|
||||||
if (mgr) {
|
if (mgr) {
|
||||||
|
@ -72,7 +75,7 @@ StreamMgr::StreamMgr()
|
||||||
{
|
{
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
// Trigger a scan for the available devices, in the background.
|
// Trigger a scan for the available devices, in the background.
|
||||||
rescanDAQDevices(true);
|
rescanDAQDevices(false);
|
||||||
}
|
}
|
||||||
#if LASP_DEBUG == 1
|
#if LASP_DEBUG == 1
|
||||||
void StreamMgr::checkRightThread() const {
|
void StreamMgr::checkRightThread() const {
|
||||||
|
@ -84,37 +87,39 @@ void StreamMgr::rescanDAQDevices(bool background,
|
||||||
std::function<void()> callback) {
|
std::function<void()> callback) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
DEBUGTRACE_PRINT(background);
|
DEBUGTRACE_PRINT(background);
|
||||||
|
if (_scanningDevices) {
|
||||||
|
throw rte("A background device scan is already busy");
|
||||||
|
}
|
||||||
|
|
||||||
|
Lck lck(_mtx);
|
||||||
checkRightThread();
|
checkRightThread();
|
||||||
if (_inputStream || _outputStream) {
|
if (_inputStream || _outputStream) {
|
||||||
throw rte("Rescanning DAQ devices only possible when no stream is running");
|
throw rte("Rescanning DAQ devices only possible when no stream is running");
|
||||||
}
|
}
|
||||||
if (!_devices_mtx.try_lock()) {
|
|
||||||
throw rte("A background DAQ device scan is probably already running");
|
|
||||||
}
|
|
||||||
_devices_mtx.unlock();
|
|
||||||
|
|
||||||
std::scoped_lock lck(_devices_mtx);
|
|
||||||
_devices.clear();
|
_devices.clear();
|
||||||
if (!background) {
|
if (!background) {
|
||||||
|
_scanningDevices = true;
|
||||||
rescanDAQDevices_impl(callback);
|
rescanDAQDevices_impl(callback);
|
||||||
} else {
|
} else {
|
||||||
DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
|
DEBUGTRACE_PRINT("Rescanning DAQ devices on different thread...");
|
||||||
|
_scanningDevices = true;
|
||||||
_pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
|
_pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
|
void StreamMgr::rescanDAQDevices_impl(std::function<void()> callback) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
std::scoped_lock lck(_devices_mtx);
|
Lck lck(_mtx);
|
||||||
_devices = DeviceInfo::getDeviceInfo();
|
_devices = DeviceInfo::getDeviceInfo();
|
||||||
if (callback) {
|
if (callback) {
|
||||||
callback();
|
callback();
|
||||||
}
|
}
|
||||||
|
_scanningDevices = false;
|
||||||
}
|
}
|
||||||
void StreamMgr::inCallback(const DaqData &data) {
|
void StreamMgr::inCallback(const DaqData &data) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
|
||||||
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
|
Lck lck(_mtx);
|
||||||
|
|
||||||
assert(_inputFilters.size() == data.nchannels);
|
assert(_inputFilters.size() == data.nchannels);
|
||||||
|
|
||||||
|
@ -139,12 +144,13 @@ void StreamMgr::inCallback(const DaqData &data) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DEBUGTRACE_PRINT("Calling incallback for handlers (filtered)...");
|
||||||
for (auto &handler : _inDataHandlers) {
|
for (auto &handler : _inDataHandlers) {
|
||||||
handler->inCallback(input_filtered);
|
handler->inCallback(input_filtered);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
/// No input filters
|
/// No input filters
|
||||||
|
DEBUGTRACE_PRINT("Calling incallback for handlers...");
|
||||||
for (auto &handler : _inDataHandlers) {
|
for (auto &handler : _inDataHandlers) {
|
||||||
handler->inCallback(data);
|
handler->inCallback(data);
|
||||||
}
|
}
|
||||||
|
@ -155,8 +161,7 @@ void StreamMgr::setSiggen(std::shared_ptr<Siggen> siggen) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
checkRightThread();
|
checkRightThread();
|
||||||
|
|
||||||
std::scoped_lock<std::mutex> lck(_siggen_mtx);
|
Lck lck(_mtx);
|
||||||
|
|
||||||
// If not set to nullptr, and a stream is running, we update the signal
|
// If not set to nullptr, and a stream is running, we update the signal
|
||||||
// generator by resetting it.
|
// generator by resetting it.
|
||||||
if (isStreamRunningOK(StreamType::output) && siggen) {
|
if (isStreamRunningOK(StreamType::output) && siggen) {
|
||||||
|
@ -213,9 +218,9 @@ bool fillData(DaqData &data, const vd &signal) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
void StreamMgr::outCallback(DaqData &data) {
|
void StreamMgr::outCallback(DaqData &data) {
|
||||||
/* DEBUGTRACE_ENTER; */
|
DEBUGTRACE_ENTER;
|
||||||
|
|
||||||
std::scoped_lock<std::mutex> lck(_siggen_mtx);
|
Lck lck(_mtx);
|
||||||
|
|
||||||
if (_siggen) {
|
if (_siggen) {
|
||||||
vd signal = _siggen->genSignal(data.nframes);
|
vd signal = _siggen->genSignal(data.nframes);
|
||||||
|
@ -244,7 +249,17 @@ void StreamMgr::outCallback(DaqData &data) {
|
||||||
|
|
||||||
StreamMgr::~StreamMgr() {
|
StreamMgr::~StreamMgr() {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
checkRightThread();
|
while (_scanningDevices) {
|
||||||
|
std::this_thread::sleep_for(10us);
|
||||||
|
}
|
||||||
|
|
||||||
|
#if LASP_DEBUG == 1
|
||||||
|
{ // Careful, this lock needs to be released to make sure the streams can
|
||||||
|
// obtain a lock to the stream manager.
|
||||||
|
Lck lck(_mtx);
|
||||||
|
checkRightThread();
|
||||||
|
}
|
||||||
|
#endif
|
||||||
// Stream manager now handled by shared pointer. Each indata handler gets a
|
// Stream manager now handled by shared pointer. Each indata handler gets a
|
||||||
// shared pointer to the stream manager, and stores a weak pointer to it.
|
// shared pointer to the stream manager, and stores a weak pointer to it.
|
||||||
// Hence, we do not have to do any cleanup here. It also makes sure that the
|
// Hence, we do not have to do any cleanup here. It also makes sure that the
|
||||||
|
@ -260,13 +275,21 @@ StreamMgr::~StreamMgr() {
|
||||||
}
|
}
|
||||||
void StreamMgr::stopAllStreams() {
|
void StreamMgr::stopAllStreams() {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
checkRightThread();
|
{
|
||||||
|
Lck lck(_mtx);
|
||||||
|
checkRightThread();
|
||||||
|
}
|
||||||
|
// No lock here!
|
||||||
_inputStream.reset();
|
_inputStream.reset();
|
||||||
_outputStream.reset();
|
_outputStream.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
void StreamMgr::startStream(const DaqConfiguration &config) {
|
void StreamMgr::startStream(const DaqConfiguration &config) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
if (_scanningDevices) {
|
||||||
|
throw rte("DAQ device scan is busy. Cannot start stream.");
|
||||||
|
}
|
||||||
|
Lck lck(_mtx);
|
||||||
checkRightThread();
|
checkRightThread();
|
||||||
|
|
||||||
bool isInput = std::count_if(config.inchannel_config.cbegin(),
|
bool isInput = std::count_if(config.inchannel_config.cbegin(),
|
||||||
|
@ -278,8 +301,6 @@ void StreamMgr::startStream(const DaqConfiguration &config) {
|
||||||
[](auto &i) { return i.enabled; }) > 0;
|
[](auto &i) { return i.enabled; }) > 0;
|
||||||
|
|
||||||
// Find the first device that matches with the configuration
|
// Find the first device that matches with the configuration
|
||||||
std::scoped_lock lck(_devices_mtx);
|
|
||||||
|
|
||||||
DeviceInfo *devinfo = nullptr;
|
DeviceInfo *devinfo = nullptr;
|
||||||
|
|
||||||
// Match configuration to a device in the list of devices
|
// Match configuration to a device in the list of devices
|
||||||
|
@ -404,37 +425,49 @@ void StreamMgr::startStream(const DaqConfiguration &config) {
|
||||||
void StreamMgr::stopStream(const StreamType t) {
|
void StreamMgr::stopStream(const StreamType t) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
checkRightThread();
|
checkRightThread();
|
||||||
|
bool resetHandlers = false;
|
||||||
|
std::unique_ptr<Daq> *streamToStop = nullptr;
|
||||||
|
|
||||||
if (t == StreamType::input) {
|
{ // Mutex locked in this scope
|
||||||
if (!_inputStream) {
|
Lck lck(_mtx);
|
||||||
throw rte("Input stream is not running");
|
if (t == StreamType::input) {
|
||||||
|
if (!_inputStream) {
|
||||||
|
throw rte("Input stream is not running");
|
||||||
|
}
|
||||||
|
streamToStop = std::addressof(_inputStream);
|
||||||
|
resetHandlers = true;
|
||||||
|
} else {
|
||||||
|
/// t == output
|
||||||
|
/// Kill input stream in case that one is a duplex stream
|
||||||
|
if (_inputStream && _inputStream->duplexMode()) {
|
||||||
|
streamToStop = std::addressof(_inputStream);
|
||||||
|
} else {
|
||||||
|
if (!_outputStream) {
|
||||||
|
throw rte("Output stream is not running");
|
||||||
|
}
|
||||||
|
streamToStop = std::addressof(_outputStream);
|
||||||
|
} // end else
|
||||||
}
|
}
|
||||||
/// Kills input stream
|
} // End of mutex lock. When stopping stream, mutex should be unlocked.
|
||||||
_inputStream.reset();
|
|
||||||
/// Send reset to all in data handlers
|
// If we arrive here, we should have a stream to stop.
|
||||||
|
assert(streamToStop != nullptr);
|
||||||
|
streamToStop->reset();
|
||||||
|
|
||||||
|
/// Send reset to all in data handlers
|
||||||
|
if (resetHandlers) {
|
||||||
|
Lck lck(_mtx);
|
||||||
for (auto &handler : _inDataHandlers) {
|
for (auto &handler : _inDataHandlers) {
|
||||||
handler->reset(nullptr);
|
handler->reset(nullptr);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
/// t == output
|
|
||||||
|
|
||||||
/// Kill input stream in case that one is a duplex stream
|
|
||||||
if (_inputStream && _inputStream->duplexMode()) {
|
|
||||||
_inputStream.reset();
|
|
||||||
} else {
|
|
||||||
if (!_outputStream) {
|
|
||||||
throw rte("Output stream is not running");
|
|
||||||
}
|
|
||||||
_outputStream.reset();
|
|
||||||
} // end else
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StreamMgr::addInDataHandler(InDataHandler *handler) {
|
void StreamMgr::addInDataHandler(InDataHandler *handler) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
Lck lck(_mtx);
|
||||||
checkRightThread();
|
checkRightThread();
|
||||||
assert(handler);
|
assert(handler);
|
||||||
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
|
|
||||||
handler->reset(_inputStream.get());
|
handler->reset(_inputStream.get());
|
||||||
|
|
||||||
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
|
if (std::find(_inDataHandlers.cbegin(), _inDataHandlers.cend(), handler) !=
|
||||||
|
@ -449,16 +482,17 @@ void StreamMgr::addInDataHandler(InDataHandler *handler) {
|
||||||
|
|
||||||
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
|
void StreamMgr::removeInDataHandler(InDataHandler &handler) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
checkRightThread();
|
Lck lck(_mtx);
|
||||||
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
|
// checkRightThread();
|
||||||
_inDataHandlers.remove(&handler);
|
_inDataHandlers.remove(&handler);
|
||||||
|
|
||||||
DEBUGTRACE_PRINT(_inDataHandlers.size());
|
DEBUGTRACE_PRINT(_inDataHandlers.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
|
Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
|
||||||
/* DEBUGTRACE_ENTER; */
|
DEBUGTRACE_ENTER;
|
||||||
|
|
||||||
|
Lck lck(_mtx);
|
||||||
checkRightThread();
|
checkRightThread();
|
||||||
// Default constructor, says stream is not running, but also no errors
|
// Default constructor, says stream is not running, but also no errors
|
||||||
|
|
||||||
|
@ -471,6 +505,7 @@ Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
const Daq *StreamMgr::getDaq(StreamType type) const {
|
const Daq *StreamMgr::getDaq(StreamType type) const {
|
||||||
|
Lck lck(_mtx);
|
||||||
checkRightThread();
|
checkRightThread();
|
||||||
|
|
||||||
if (type == StreamType::input) {
|
if (type == StreamType::input) {
|
||||||
|
|
|
@ -1,19 +1,19 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
#include "lasp_daq.h"
|
|
||||||
#include "lasp_siggen.h"
|
|
||||||
#include "lasp_thread.h"
|
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
#include "lasp_daq.h"
|
||||||
|
#include "lasp_siggen.h"
|
||||||
|
#include "lasp_thread.h"
|
||||||
|
|
||||||
/** \addtogroup device
|
/** \addtogroup device
|
||||||
* @{
|
* @{
|
||||||
*/
|
*/
|
||||||
class StreamMgr;
|
class StreamMgr;
|
||||||
class InDataHandler;
|
class InDataHandler;
|
||||||
|
|
||||||
|
|
||||||
class SeriesBiquad;
|
class SeriesBiquad;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -25,12 +25,15 @@ class SeriesBiquad;
|
||||||
* fact is asserted.
|
* fact is asserted.
|
||||||
*/
|
*/
|
||||||
class StreamMgr {
|
class StreamMgr {
|
||||||
|
mutable std::recursive_mutex _mtx;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Storage for streams.
|
* @brief Storage for streams.
|
||||||
*/
|
*/
|
||||||
std::unique_ptr<Daq> _inputStream, _outputStream;
|
std::unique_ptr<Daq> _inputStream, _outputStream;
|
||||||
|
|
||||||
|
std::atomic<bool> _scanningDevices{false};
|
||||||
|
|
||||||
GlobalThreadPool _pool;
|
GlobalThreadPool _pool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -39,22 +42,18 @@ class StreamMgr {
|
||||||
* thread-safety.
|
* thread-safety.
|
||||||
*/
|
*/
|
||||||
std::list<InDataHandler *> _inDataHandlers;
|
std::list<InDataHandler *> _inDataHandlers;
|
||||||
mutable std::mutex _inDataHandler_mtx;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Signal generator in use to generate output data. Currently
|
* @brief Signal generator in use to generate output data. Currently
|
||||||
* implemented as to generate the same data for all output channels.
|
* implemented as to generate the same data for all output channels.
|
||||||
*/
|
*/
|
||||||
std::shared_ptr<Siggen> _siggen;
|
std::shared_ptr<Siggen> _siggen;
|
||||||
std::mutex _siggen_mtx;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Filters on input stream. For example, a digital high pass filter.
|
* @brief Filters on input stream. For example, a digital high pass filter.
|
||||||
*/
|
*/
|
||||||
std::vector<std::unique_ptr<SeriesBiquad>> _inputFilters;
|
std::vector<std::unique_ptr<SeriesBiquad>> _inputFilters;
|
||||||
|
|
||||||
|
|
||||||
mutable std::recursive_mutex _devices_mtx;
|
|
||||||
/**
|
/**
|
||||||
* @brief Current storage for the device list
|
* @brief Current storage for the device list
|
||||||
*/
|
*/
|
||||||
|
@ -67,9 +66,7 @@ class StreamMgr {
|
||||||
friend class InDataHandler;
|
friend class InDataHandler;
|
||||||
friend class Siggen;
|
friend class Siggen;
|
||||||
|
|
||||||
|
public:
|
||||||
public:
|
|
||||||
|
|
||||||
~StreamMgr();
|
~StreamMgr();
|
||||||
|
|
||||||
enum class StreamType : us {
|
enum class StreamType : us {
|
||||||
|
@ -100,9 +97,10 @@ class StreamMgr {
|
||||||
* @return A copy of the internal stored list of devices
|
* @return A copy of the internal stored list of devices
|
||||||
*/
|
*/
|
||||||
DeviceInfoList getDeviceInfo() const {
|
DeviceInfoList getDeviceInfo() const {
|
||||||
std::scoped_lock lck(_devices_mtx);
|
std::scoped_lock lck(_mtx);
|
||||||
DeviceInfoList d2;
|
DeviceInfoList d2;
|
||||||
for(const auto& dev: _devices) {
|
for (const auto &dev : _devices) {
|
||||||
|
assert(dev != nullptr);
|
||||||
d2.push_back(dev->clone());
|
d2.push_back(dev->clone());
|
||||||
}
|
}
|
||||||
return d2;
|
return d2;
|
||||||
|
@ -118,9 +116,9 @@ class StreamMgr {
|
||||||
* set to true, the function returns immediately.
|
* set to true, the function returns immediately.
|
||||||
* @param callback Function to call when complete.
|
* @param callback Function to call when complete.
|
||||||
*/
|
*/
|
||||||
void
|
void rescanDAQDevices(
|
||||||
rescanDAQDevices(bool background = false,
|
bool background = false,
|
||||||
std::function<void()> callback = std::function<void()>());
|
std::function<void()> callback = std::function<void()>());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Start a stream based on given configuration.
|
* @brief Start a stream based on given configuration.
|
||||||
|
@ -141,12 +139,12 @@ class StreamMgr {
|
||||||
}
|
}
|
||||||
bool isStreamRunning(const StreamType type) const {
|
bool isStreamRunning(const StreamType type) const {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case (StreamType::input):
|
case (StreamType::input):
|
||||||
return bool(_inputStream);
|
return bool(_inputStream);
|
||||||
break;
|
break;
|
||||||
case (StreamType::output):
|
case (StreamType::output):
|
||||||
return bool(_outputStream);
|
return bool(_outputStream);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -193,11 +191,10 @@ class StreamMgr {
|
||||||
*/
|
*/
|
||||||
void setSiggen(std::shared_ptr<Siggen> s);
|
void setSiggen(std::shared_ptr<Siggen> s);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void inCallback(const DaqData &data);
|
void inCallback(const DaqData &data);
|
||||||
void outCallback(DaqData &data);
|
void outCallback(DaqData &data);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Add an input data handler. The handler's inCallback() function is
|
* @brief Add an input data handler. The handler's inCallback() function is
|
||||||
* called with data when available. This function should *NOT* be called by
|
* called with data when available. This function should *NOT* be called by
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/* #define DEBUGTRACE_ENABLED */
|
// #define DEBUGTRACE_ENABLED
|
||||||
#include "debugtrace.hpp"
|
#include "debugtrace.hpp"
|
||||||
#include "lasp_config.h"
|
#include "lasp_config.h"
|
||||||
|
|
||||||
|
@ -422,6 +422,7 @@ Daq::StreamStatus PortAudioDaq::getStreamStatus() const {
|
||||||
}
|
}
|
||||||
|
|
||||||
PortAudioDaq::~PortAudioDaq() {
|
PortAudioDaq::~PortAudioDaq() {
|
||||||
|
DEBUGTRACE_ENTER;
|
||||||
PaError err;
|
PaError err;
|
||||||
assert(_stream);
|
assert(_stream);
|
||||||
if (Pa_IsStreamActive(_stream)) {
|
if (Pa_IsStreamActive(_stream)) {
|
||||||
|
@ -445,7 +446,7 @@ int PortAudioDaq::memberPaCallback(const void *inputBuffer, void *outputBuffer,
|
||||||
unsigned long framesPerBuffer,
|
unsigned long framesPerBuffer,
|
||||||
const PaStreamCallbackTimeInfo *timeInfo,
|
const PaStreamCallbackTimeInfo *timeInfo,
|
||||||
PaStreamCallbackFlags statusFlags) {
|
PaStreamCallbackFlags statusFlags) {
|
||||||
// DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
typedef Daq::StreamStatus::StreamError se;
|
typedef Daq::StreamStatus::StreamError se;
|
||||||
if (statusFlags & paPrimingOutput) {
|
if (statusFlags & paPrimingOutput) {
|
||||||
// Initial output buffers generated. So nothing with input yet
|
// Initial output buffers generated. So nothing with input yet
|
||||||
|
|
|
@ -2,12 +2,9 @@
|
||||||
#include "lasp_avpowerspectra.h"
|
#include "lasp_avpowerspectra.h"
|
||||||
#include "debugtrace.hpp"
|
#include "debugtrace.hpp"
|
||||||
#include "lasp_mathtypes.h"
|
#include "lasp_mathtypes.h"
|
||||||
#include <cmath>
|
#include <stdexcept>
|
||||||
#include <optional>
|
|
||||||
|
|
||||||
using rte = std::runtime_error;
|
using rte = std::runtime_error;
|
||||||
using std::cerr;
|
|
||||||
using std::endl;
|
|
||||||
|
|
||||||
PowerSpectra::PowerSpectra(const us nfft, const Window::WindowType w)
|
PowerSpectra::PowerSpectra(const us nfft, const Window::WindowType w)
|
||||||
: PowerSpectra(Window::create(w, nfft)) {}
|
: PowerSpectra(Window::create(w, nfft)) {}
|
||||||
|
|
|
@ -3,8 +3,6 @@
|
||||||
#include "lasp_mathtypes.h"
|
#include "lasp_mathtypes.h"
|
||||||
#include "lasp_timebuffer.h"
|
#include "lasp_timebuffer.h"
|
||||||
#include "lasp_window.h"
|
#include "lasp_window.h"
|
||||||
#include <memory>
|
|
||||||
#include <optional>
|
|
||||||
|
|
||||||
/** \defgroup dsp Digital Signal Processing utilities
|
/** \defgroup dsp Digital Signal Processing utilities
|
||||||
* These are classes and functions used for processing raw signal data, to
|
* These are classes and functions used for processing raw signal data, to
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
#include "lasp_biquadbank.h"
|
#include "lasp_biquadbank.h"
|
||||||
#include "debugtrace.hpp"
|
#include "debugtrace.hpp"
|
||||||
#include "lasp_thread.h"
|
#include "lasp_thread.h"
|
||||||
#include <cassert>
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
using std::cerr;
|
using std::cerr;
|
||||||
|
|
|
@ -4,12 +4,9 @@
|
||||||
//
|
//
|
||||||
// Description: Real Time Signal Viewer.
|
// Description: Real Time Signal Viewer.
|
||||||
#pragma once
|
#pragma once
|
||||||
#include "lasp_avpowerspectra.h"
|
|
||||||
#include "lasp_filter.h"
|
|
||||||
#include "lasp_mathtypes.h"
|
#include "lasp_mathtypes.h"
|
||||||
#include "lasp_threadedindatahandler.h"
|
#include "lasp_threadedindatahandler.h"
|
||||||
#include "lasp_timebuffer.h"
|
#include "lasp_timebuffer.h"
|
||||||
#include <memory>
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/* #define DEBUGTRACE_ENABLED */
|
// #define DEBUGTRACE_ENABLED
|
||||||
#include "lasp_threadedindatahandler.h"
|
#include "lasp_threadedindatahandler.h"
|
||||||
|
|
||||||
#include <future>
|
#include <future>
|
||||||
|
@ -78,7 +78,7 @@ void ThreadedInDataHandlerBase::startThread() {
|
||||||
_1),
|
_1),
|
||||||
resetCallback);
|
resetCallback);
|
||||||
|
|
||||||
_thread_can_safely_run = true;
|
_thread_allowed_to_run = true;
|
||||||
_indatahandler->start();
|
_indatahandler->start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler(
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
|
||||||
// Early return in case object is under DESTRUCTION
|
// Early return in case object is under DESTRUCTION
|
||||||
if (!_thread_can_safely_run) return;
|
if (!_thread_allowed_to_run) return;
|
||||||
|
|
||||||
_queue->push(daqdata);
|
_queue->push(daqdata);
|
||||||
if (!_thread_running) {
|
if (!_thread_running) {
|
||||||
|
@ -103,23 +103,26 @@ void ThreadedInDataHandlerBase::stopThread() {
|
||||||
throw rte("BUG: ThreadedIndataHandler not running");
|
throw rte("BUG: ThreadedIndataHandler not running");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop the existing thread
|
||||||
|
_thread_allowed_to_run = false;
|
||||||
|
|
||||||
// Make sure no new data arrives
|
// Make sure no new data arrives
|
||||||
_indatahandler->stop();
|
_indatahandler->stop();
|
||||||
|
_indatahandler.reset();
|
||||||
|
|
||||||
// Stop the existing thread
|
DEBUGTRACE_PRINT("Indatahandler stopped. Waiting for thread to finish...");
|
||||||
_thread_can_safely_run = false;
|
|
||||||
|
|
||||||
// Then wait in steps for the thread to stop running.
|
// Then wait in steps for the thread to stop running.
|
||||||
while (_thread_running) {
|
while (_thread_running) {
|
||||||
std::this_thread::sleep_for(10us);
|
std::this_thread::sleep_for(10us);
|
||||||
}
|
}
|
||||||
|
DEBUGTRACE_PRINT("Thread stopped");
|
||||||
// Kill the handler
|
// Kill the handler
|
||||||
_indatahandler.reset();
|
DEBUGTRACE_PRINT("Handler resetted");
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
|
ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
if (_thread_can_safely_run) {
|
if (_thread_allowed_to_run) {
|
||||||
stopThread();
|
stopThread();
|
||||||
cerr << "*** BUG: InDataHandlers have not been all stopped, while "
|
cerr << "*** BUG: InDataHandlers have not been all stopped, while "
|
||||||
"StreamMgr destructor is called. This is a misuse BUG."
|
"StreamMgr destructor is called. This is a misuse BUG."
|
||||||
|
@ -131,7 +134,7 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() {
|
||||||
void ThreadedInDataHandlerBase::threadFcn() {
|
void ThreadedInDataHandlerBase::threadFcn() {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
|
||||||
while (!_queue->empty() && _thread_can_safely_run) {
|
while (!_queue->empty() && _thread_allowed_to_run) {
|
||||||
// Call inCallback_threaded
|
// Call inCallback_threaded
|
||||||
inCallback(_queue->pop());
|
inCallback(_queue->pop());
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ class ThreadedInDataHandlerBase {
|
||||||
std::unique_ptr<InDataHandler> _indatahandler;
|
std::unique_ptr<InDataHandler> _indatahandler;
|
||||||
|
|
||||||
std::atomic<bool> _thread_running{false};
|
std::atomic<bool> _thread_running{false};
|
||||||
std::atomic<bool> _thread_can_safely_run{false};
|
std::atomic<bool> _thread_allowed_to_run{false};
|
||||||
|
|
||||||
GlobalThreadPool _pool;
|
GlobalThreadPool _pool;
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,10 @@
|
||||||
/* #define DEBUGTRACE_ENABLED */
|
// #define DEBUGTRACE_ENABLED
|
||||||
|
#include <pybind11/pybind11.h>
|
||||||
|
#include <pybind11/pytypes.h>
|
||||||
|
|
||||||
|
#include <armadillo>
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
#include "arma_npy.h"
|
#include "arma_npy.h"
|
||||||
#include "debugtrace.hpp"
|
#include "debugtrace.hpp"
|
||||||
#include "lasp_clip.h"
|
#include "lasp_clip.h"
|
||||||
|
@ -9,14 +15,12 @@
|
||||||
#include "lasp_rtsignalviewer.h"
|
#include "lasp_rtsignalviewer.h"
|
||||||
#include "lasp_streammgr.h"
|
#include "lasp_streammgr.h"
|
||||||
#include "lasp_threadedindatahandler.h"
|
#include "lasp_threadedindatahandler.h"
|
||||||
#include <armadillo>
|
|
||||||
#include <atomic>
|
|
||||||
#include <chrono>
|
|
||||||
#include <pybind11/pybind11.h>
|
|
||||||
|
|
||||||
using namespace std::literals::chrono_literals;
|
using namespace std::literals::chrono_literals;
|
||||||
using std::cerr;
|
using std::cerr;
|
||||||
using std::endl;
|
using std::endl;
|
||||||
|
using rte = std::runtime_error;
|
||||||
|
using Lck = std::scoped_lock<std::recursive_mutex>;
|
||||||
|
|
||||||
namespace py = pybind11;
|
namespace py = pybind11;
|
||||||
|
|
||||||
|
@ -48,17 +52,17 @@ py::array_t<T> getPyArrayNoCpy(const DaqData &d) {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
return py::array_t<T>(
|
return py::array_t<T>(
|
||||||
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
|
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
|
||||||
|
|
||||||
py::array::StridesContainer( // Strides
|
py::array::StridesContainer( // Strides
|
||||||
{sizeof(T),
|
{sizeof(T),
|
||||||
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
|
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
|
||||||
|
|
||||||
reinterpret_cast<T *>(
|
reinterpret_cast<T *>(
|
||||||
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
|
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
|
||||||
|
|
||||||
dummyDataOwner // As stated above, now Numpy does not take ownership of
|
dummyDataOwner // As stated above, now Numpy does not take ownership of
|
||||||
// the data pointer.
|
// the data pointer.
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,17 +85,17 @@ py::array_t<d> dmat_to_ndarray(const DaqData &d) {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
return py::array_t<T>(
|
return py::array_t<T>(
|
||||||
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
|
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
|
||||||
|
|
||||||
py::array::StridesContainer( // Strides
|
py::array::StridesContainer( // Strides
|
||||||
{sizeof(T),
|
{sizeof(T),
|
||||||
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
|
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
|
||||||
|
|
||||||
reinterpret_cast<T *>(
|
reinterpret_cast<T *>(
|
||||||
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
|
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
|
||||||
|
|
||||||
dummyDataOwner // As stated above, now Numpy does not take ownership of
|
dummyDataOwner // As stated above, now Numpy does not take ownership of
|
||||||
// the data pointer.
|
// the data pointer.
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,8 +108,9 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
|
||||||
/**
|
/**
|
||||||
* @brief The callback functions that is called.
|
* @brief The callback functions that is called.
|
||||||
*/
|
*/
|
||||||
std::unique_ptr<py::function> cb, reset_callback;
|
py::object _cb, _reset_callback;
|
||||||
bool _done{false};
|
std::atomic<bool> _done{false};
|
||||||
|
std::recursive_mutex _mtx;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
|
@ -119,19 +124,25 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
|
||||||
*/
|
*/
|
||||||
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
|
PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback)
|
||||||
: ThreadedInDataHandler(mgr),
|
: ThreadedInDataHandler(mgr),
|
||||||
cb(std::make_unique<py::function>(cb)),
|
_cb(py::weakref(cb)),
|
||||||
reset_callback(std::make_unique<py::function>(reset_callback)) {
|
_reset_callback(py::weakref(reset_callback)) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
|
||||||
/// Start should be called externally, as at constructor time no virtual
|
/// Start should be called externally, as at constructor time no virtual
|
||||||
/// functions should be called.
|
/// functions should be called.
|
||||||
py::gil_scoped_release release;
|
if (_cb().is_none() || _reset_callback().is_none()) {
|
||||||
|
throw rte("cb or reset_callback is none!");
|
||||||
|
}
|
||||||
startThread();
|
startThread();
|
||||||
}
|
}
|
||||||
~PyIndataHandler() {
|
~PyIndataHandler() {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
|
||||||
/// Callback cannot be called, which results in a deadlock on the GIL
|
/// Callback cannot be called, which results in a deadlock on the GIL
|
||||||
/// without this release.
|
/// without this release.
|
||||||
py::gil_scoped_release release;
|
py::gil_scoped_release release;
|
||||||
|
DEBUGTRACE_PRINT("Gil released");
|
||||||
|
_done = true;
|
||||||
stopThread();
|
stopThread();
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
|
@ -139,91 +150,123 @@ class PyIndataHandler : public ThreadedInDataHandler<PyIndataHandler> {
|
||||||
*
|
*
|
||||||
* @param daq Daq device, or nullptr in case no input stream is running.
|
* @param daq Daq device, or nullptr in case no input stream is running.
|
||||||
*/
|
*/
|
||||||
void reset(const Daq *daq) {
|
void reset(const Daq *daqi) {
|
||||||
DEBUGTRACE_ENTER;
|
DEBUGTRACE_ENTER;
|
||||||
|
// cerr << "Thread ID: " << std::this_thread::get_id() << endl;
|
||||||
if (_done) return;
|
if (_done) return;
|
||||||
try {
|
{
|
||||||
py::gil_scoped_acquire acquire;
|
try {
|
||||||
if (daq) {
|
py::object reset_callback = _reset_callback();
|
||||||
(*reset_callback)(daq);
|
if (reset_callback.is_none()) {
|
||||||
} else {
|
DEBUGTRACE_PRINT("reset_callback is none, weakref killed");
|
||||||
(*reset_callback)(py::none());
|
_done = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (daqi != nullptr) {
|
||||||
|
assert(reset_callback);
|
||||||
|
reset_callback(daqi);
|
||||||
|
} else {
|
||||||
|
assert(reset_callback);
|
||||||
|
reset_callback(py::none());
|
||||||
|
}
|
||||||
|
} catch (py::error_already_set &e) {
|
||||||
|
cerr << "*************** Error calling reset callback!\n";
|
||||||
|
cerr << e.what() << endl;
|
||||||
|
cerr << "*************** \n";
|
||||||
|
/// Throwing a runtime error here does not work out one way or another.
|
||||||
|
/// Therefore, it is better to dive out and prevent undefined behaviour
|
||||||
|
abort();
|
||||||
|
/* throw std::runtime_error(e.what()); */
|
||||||
|
} catch (std::exception &e) {
|
||||||
|
cerr << "Caught unknown exception in reset callback:" << e.what()
|
||||||
|
<< endl;
|
||||||
|
abort();
|
||||||
}
|
}
|
||||||
} catch (py::error_already_set &e) {
|
} // end of GIL scope
|
||||||
cerr << "*************** Error calling reset callback!\n";
|
} // end of function reset()
|
||||||
cerr << e.what() << endl;
|
|
||||||
cerr << "*************** \n";
|
|
||||||
/// Throwing a runtime error here does not work out one way or another.
|
|
||||||
/// Therefore, it is better to dive out and prevent undefined behaviour
|
|
||||||
abort();
|
|
||||||
/* throw std::runtime_error(e.what()); */
|
|
||||||
} catch (std::exception &e) {
|
|
||||||
cerr << "Caught unknown exception in reset callback:" << e.what() << endl;
|
|
||||||
abort();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Calls the Python callback method / function with a Numpy array of
|
* @brief Calls the Python callback method / function with a Numpy array of
|
||||||
* stream data.
|
* stream data.
|
||||||
*/
|
*/
|
||||||
void inCallback(const DaqData &d) {
|
void inCallback(const DaqData &d) {
|
||||||
|
DEBUGTRACE_ENTER;
|
||||||
/* DEBUGTRACE_ENTER; */
|
// cerr << "=== Enter incallback for thread ID: " << std::this_thread::get_id() << endl;
|
||||||
|
|
||||||
using DataType = DataTypeDescriptor::DataType;
|
using DataType = DataTypeDescriptor::DataType;
|
||||||
if (_done) return;
|
if (_done) {
|
||||||
|
DEBUGTRACE_PRINT("Early stop, done");
|
||||||
try {
|
return;
|
||||||
py::gil_scoped_acquire acquire;
|
|
||||||
py::object bool_val;
|
|
||||||
switch (d.dtype) {
|
|
||||||
case (DataType::dtype_int8): {
|
|
||||||
bool_val = (*cb)(getPyArrayNoCpy<int8_t>(d));
|
|
||||||
} break;
|
|
||||||
case (DataType::dtype_int16): {
|
|
||||||
bool_val = (*cb)(getPyArrayNoCpy<int16_t>(d));
|
|
||||||
} break;
|
|
||||||
case (DataType::dtype_int32): {
|
|
||||||
bool_val = (*cb)(getPyArrayNoCpy<int32_t>(d));
|
|
||||||
} break;
|
|
||||||
case (DataType::dtype_fl32): {
|
|
||||||
bool_val = (*cb)(getPyArrayNoCpy<float>(d));
|
|
||||||
} break;
|
|
||||||
case (DataType::dtype_fl64): {
|
|
||||||
bool_val = (*cb)(getPyArrayNoCpy<double>(d));
|
|
||||||
} break;
|
|
||||||
default:
|
|
||||||
throw std::runtime_error("BUG");
|
|
||||||
} // End of switch
|
|
||||||
|
|
||||||
bool res = bool_val.cast<bool>();
|
|
||||||
if (res == false) {
|
|
||||||
DEBUGTRACE_PRINT("Setting callbacks to None")
|
|
||||||
_done = true;
|
|
||||||
// cb = py::function(py::none());
|
|
||||||
// reset_callback = py::function(py::none());
|
|
||||||
cb.reset();
|
|
||||||
reset_callback.reset();
|
|
||||||
}
|
|
||||||
} catch (py::error_already_set &e) {
|
|
||||||
cerr << "ERROR: Python raised exception from callback function: ";
|
|
||||||
cerr << e.what() << endl;
|
|
||||||
abort();
|
|
||||||
} catch (py::cast_error &e) {
|
|
||||||
cerr << e.what() << endl;
|
|
||||||
cerr << "ERROR: Python callback does not return boolean value." << endl;
|
|
||||||
abort();
|
|
||||||
} catch (std::exception &e) {
|
|
||||||
cerr << "Caught unknown exception in Python callback:" << e.what()
|
|
||||||
<< endl;
|
|
||||||
abort();
|
|
||||||
}
|
}
|
||||||
}
|
{
|
||||||
|
DEBUGTRACE_PRINT("================ TRYING TO OBTAIN GIL in inCallback...");
|
||||||
|
py::gil_scoped_acquire acquire;
|
||||||
|
try {
|
||||||
|
py::object py_bool;
|
||||||
|
py::object cb = _cb();
|
||||||
|
if (cb.is_none()) {
|
||||||
|
DEBUGTRACE_PRINT("cb is none, weakref killed");
|
||||||
|
_done = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
switch (d.dtype) {
|
||||||
|
case (DataType::dtype_int8): {
|
||||||
|
py_bool = cb(getPyArrayNoCpy<int8_t>(d));
|
||||||
|
} break;
|
||||||
|
case (DataType::dtype_int16): {
|
||||||
|
py_bool = cb(getPyArrayNoCpy<int16_t>(d));
|
||||||
|
} break;
|
||||||
|
case (DataType::dtype_int32): {
|
||||||
|
py_bool = cb(getPyArrayNoCpy<int32_t>(d));
|
||||||
|
} break;
|
||||||
|
case (DataType::dtype_fl32): {
|
||||||
|
py_bool = cb(getPyArrayNoCpy<float>(d));
|
||||||
|
} break;
|
||||||
|
case (DataType::dtype_fl64): {
|
||||||
|
py_bool = cb(getPyArrayNoCpy<double>(d));
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
throw std::runtime_error("BUG");
|
||||||
|
} // End of switch
|
||||||
|
|
||||||
|
bool res = py_bool.cast<bool>();
|
||||||
|
if (res == false) {
|
||||||
|
DEBUGTRACE_PRINT("Setting callbacks to None")
|
||||||
|
_done = true;
|
||||||
|
|
||||||
|
// By doing this, we remove the references, but in the mean time this
|
||||||
|
// might also trigger removing Python objects. Including itself, as
|
||||||
|
// there is no reference to it anymore. The consequence is that the
|
||||||
|
// current object might be destroyed from this thread. However, if we
|
||||||
|
// do not remove these references and in lasp_record.py finish() is
|
||||||
|
// not called, we end up with not-garbage collected recordings in
|
||||||
|
// memory. This is also not good. How can we force Python to not yet
|
||||||
|
// destroy this object?
|
||||||
|
// cb.reset();
|
||||||
|
// reset_callback.reset();
|
||||||
|
}
|
||||||
|
} catch (py::error_already_set &e) {
|
||||||
|
cerr << "ERROR (BUG): Python raised exception from callback function: ";
|
||||||
|
cerr << e.what() << endl;
|
||||||
|
abort();
|
||||||
|
} catch (py::cast_error &e) {
|
||||||
|
cerr << e.what() << endl;
|
||||||
|
cerr << "ERROR (BUG): Python callback does not return boolean value."
|
||||||
|
<< endl;
|
||||||
|
abort();
|
||||||
|
} catch (std::exception &e) {
|
||||||
|
cerr << "Caught unknown exception in Python callback:" << e.what()
|
||||||
|
<< endl;
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // End of scope in which the GIL is acquired
|
||||||
|
// cerr << "=== LEAVE incallback for thread ID: " << std::this_thread::get_id() << endl;
|
||||||
|
|
||||||
|
} // End of function inCallback()
|
||||||
};
|
};
|
||||||
|
|
||||||
void init_datahandler(py::module &m) {
|
void init_datahandler(py::module &m) {
|
||||||
|
|
||||||
/// The C++ class is PyIndataHandler, but for Python, it is called
|
/// The C++ class is PyIndataHandler, but for Python, it is called
|
||||||
/// InDataHandler
|
/// InDataHandler
|
||||||
py::class_<PyIndataHandler> pyidh(m, "InDataHandler");
|
py::class_<PyIndataHandler> pyidh(m, "InDataHandler");
|
||||||
|
@ -256,29 +299,29 @@ void init_datahandler(py::module &m) {
|
||||||
cval = clip.getCurrentValue();
|
cval = clip.getCurrentValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ColToNpy<arma::uword>(cval); // something goes wrong here
|
return ColToNpy<arma::uword>(cval); // something goes wrong here
|
||||||
});
|
});
|
||||||
|
|
||||||
/// Real time Aps
|
/// Real time Aps
|
||||||
///
|
///
|
||||||
py::class_<RtAps> rtaps(m, "RtAps");
|
py::class_<RtAps> rtaps(m, "RtAps");
|
||||||
rtaps.def(py::init<SmgrHandle, // StreamMgr
|
rtaps.def(py::init<SmgrHandle, // StreamMgr
|
||||||
Filter *const, // FreqWeighting filter
|
Filter *const, // FreqWeighting filter
|
||||||
const us, // Nfft
|
const us, // Nfft
|
||||||
const Window::WindowType, // Window
|
const Window::WindowType, // Window
|
||||||
const d, // Overlap percentage 0<=o<100
|
const d, // Overlap percentage 0<=o<100
|
||||||
|
|
||||||
const d // Time constant
|
const d // Time constant
|
||||||
>(),
|
>(),
|
||||||
py::arg("streammgr"), // StreamMgr
|
py::arg("streammgr"), // StreamMgr
|
||||||
py::arg("preFilter").none(true),
|
py::arg("preFilter").none(true),
|
||||||
/// Below list of arguments *SHOULD* be same as for
|
/// Below list of arguments *SHOULD* be same as for
|
||||||
|
|
||||||
/// AvPowerSpectra constructor!
|
/// AvPowerSpectra constructor!
|
||||||
py::arg("nfft") = 2048, //
|
py::arg("nfft") = 2048, //
|
||||||
py::arg("windowType") = Window::WindowType::Hann, //
|
py::arg("windowType") = Window::WindowType::Hann, //
|
||||||
py::arg("overlap_percentage") = 50.0, //
|
py::arg("overlap_percentage") = 50.0, //
|
||||||
py::arg("time_constant") = -1 //
|
py::arg("time_constant") = -1 //
|
||||||
);
|
);
|
||||||
|
|
||||||
rtaps.def("getCurrentValue", [](RtAps &rt) {
|
rtaps.def("getCurrentValue", [](RtAps &rt) {
|
||||||
|
@ -293,10 +336,10 @@ void init_datahandler(py::module &m) {
|
||||||
/// Real time Signal Viewer
|
/// Real time Signal Viewer
|
||||||
///
|
///
|
||||||
py::class_<RtSignalViewer> rtsv(m, "RtSignalViewer");
|
py::class_<RtSignalViewer> rtsv(m, "RtSignalViewer");
|
||||||
rtsv.def(py::init<SmgrHandle, // StreamMgr
|
rtsv.def(py::init<SmgrHandle, // StreamMgr
|
||||||
const d, // Time history
|
const d, // Time history
|
||||||
const us, // Resolution
|
const us, // Resolution
|
||||||
const us // Channel number
|
const us // Channel number
|
||||||
>());
|
>());
|
||||||
|
|
||||||
rtsv.def("getCurrentValue", [](RtSignalViewer &rt) {
|
rtsv.def("getCurrentValue", [](RtSignalViewer &rt) {
|
||||||
|
|
|
@ -5,7 +5,7 @@ requires-python = ">=3.10"
|
||||||
description = "Library for Acoustic Signal Processing"
|
description = "Library for Acoustic Signal Processing"
|
||||||
license = { "file" = "LICENSE" }
|
license = { "file" = "LICENSE" }
|
||||||
authors = [{ "name" = "J.A. de Jong", "email" = "j.a.dejong@ascee.nl" }]
|
authors = [{ "name" = "J.A. de Jong", "email" = "j.a.dejong@ascee.nl" }]
|
||||||
version = "1.5.0"
|
version = "1.5.1"
|
||||||
|
|
||||||
keywords = ["DSP", "DAQ", "Signal processing"]
|
keywords = ["DSP", "DAQ", "Signal processing"]
|
||||||
|
|
||||||
|
@ -24,7 +24,6 @@ urls = { "Documentation" = "https://lasp.ascee.nl" }
|
||||||
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"scipy",
|
"scipy",
|
||||||
"numpy",
|
|
||||||
"matplotlib>=3.7.2",
|
"matplotlib>=3.7.2",
|
||||||
"appdirs",
|
"appdirs",
|
||||||
"dataclasses_json",
|
"dataclasses_json",
|
||||||
|
|
|
@ -126,8 +126,15 @@ class Recording:
|
||||||
|
|
||||||
logger.debug("Starting record....")
|
logger.debug("Starting record....")
|
||||||
|
|
||||||
|
# In the PyInDataHandler, a weak reference is stored to the python
|
||||||
|
# methods reset and incallback. One way or another, the weak ref is gone
|
||||||
|
# on the callback thread. If we store an "extra" ref to this method over
|
||||||
|
# here, the weak ref stays alive. We do not know whether this is a bug
|
||||||
|
# or a feature, but in any case storing this extra ref to inCallback
|
||||||
|
# solves the problem.
|
||||||
|
self._incalback_cpy = self.inCallback
|
||||||
self._indataHandler = InDataHandler(
|
self._indataHandler = InDataHandler(
|
||||||
streammgr, self.inCallback, self.resetCallback
|
streammgr, self._incalback_cpy, self.resetCallback
|
||||||
)
|
)
|
||||||
|
|
||||||
if wait:
|
if wait:
|
||||||
|
@ -190,7 +197,7 @@ class Recording:
|
||||||
When returning False, it will stop the stream.
|
When returning False, it will stop the stream.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
logger.debug(f"inCallback({adata})")
|
logger.debug(f"inCallback()")
|
||||||
if self._stop():
|
if self._stop():
|
||||||
logger.debug("Stop flag set, early return in inCallback")
|
logger.debug("Stop flag set, early return in inCallback")
|
||||||
# Stop flag is raised. We do not add any data anymore.
|
# Stop flag is raised. We do not add any data anymore.
|
||||||
|
@ -228,10 +235,10 @@ class Recording:
|
||||||
self._progressCallback(recstatus)
|
self._progressCallback(recstatus)
|
||||||
|
|
||||||
case RecordingState.AllDataStored:
|
case RecordingState.AllDataStored:
|
||||||
pass
|
return False
|
||||||
|
|
||||||
case RecordingState.Finished:
|
case RecordingState.Finished:
|
||||||
pass
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue