DaqConfiguration(s) back in place. Time to couple some stuff to ACME

This commit is contained in:
Anne de Jong 2022-08-01 17:26:22 +02:00
parent 0421236ed0
commit 12cf9586eb
18 changed files with 331 additions and 89 deletions

View File

@ -1,6 +1,8 @@
cmake_minimum_required (VERSION 3.16) cmake_minimum_required (VERSION 3.16)
project(LASP LANGUAGES C CXX) project(LASP LANGUAGES C CXX)
set(LASP_VERSION_MAJOR 1)
set(LASP_VERSION_MINOR 0)
# To allow linking to static libs from other directories # To allow linking to static libs from other directories
cmake_policy(SET CMP0079 NEW) cmake_policy(SET CMP0079 NEW)
@ -32,7 +34,6 @@ set(PY_FULL_VERSION ${PROJECT_VERSION}${PY_VERSION_SUFFIX})
set(CMAKE_POSITION_INDEPENDENT_CODE ON) set(CMAKE_POSITION_INDEPENDENT_CODE ON)
if(CMAKE_BUILD_TYPE STREQUAL Debug) if(CMAKE_BUILD_TYPE STREQUAL Debug)
add_definitions(-DDEBUG=1)
set(LASP_DEBUG True) set(LASP_DEBUG True)
else() else()
set(LASP_DEBUG False) set(LASP_DEBUG False)

View File

@ -5,12 +5,14 @@ requires-python = ">=3.8"
license = { "file" = "LICENSE" } license = { "file" = "LICENSE" }
authors = [{ "name" = "J.A. de Jong et al.", "email" = "info@ascee.nl" }] authors = [{ "name" = "J.A. de Jong et al.", "email" = "info@ascee.nl" }]
keywords = ["DSP", "DAQ", "Signal processing"] keywords = ["DSP", "DAQ", "Signal processing"]
classifiers = [ classifiers = [
"Topic :: Scientific/Engineering", "Topic :: Scientific/Engineering",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Operating System :: POSIX :: Linux", "Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows", "Operating System :: Microsoft :: Windows",
] ]
# urls = { "Documentation" = "https://" } # urls = { "Documentation" = "https://" }
dependencies = ["numpy", "scipy", "appdirs"] dependencies = ["numpy", "scipy", "appdirs"]
dynamic = ["version", "description"] dynamic = ["version", "description"]

View File

@ -26,12 +26,11 @@ FORMAT = "[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s"
logging.basicConfig(format=FORMAT, level=numeric_level) logging.basicConfig(format=FORMAT, level=numeric_level)
import multiprocessing import multiprocessing
from lasp.device import DaqConfigurations from lasp import StreamMgr, Recording, DaqConfiguration
from lasp import AvType, StreamManager, Recording# configureLogging
def main(args): def main(args):
try: try:
streammgr = StreamManager() streammgr = StreamMgr.getInstance()
configs = DaqConfigurations.loadAllConfigs() configs = DaqConfigurations.loadAllConfigs()
config_keys = [key for key in configs.keys()] config_keys = [key for key in configs.keys()]

View File

@ -1,4 +1,4 @@
# cpp_src/CMakeLists.txt # src/lasp/CMakeLists.txt
# Armadillo # Armadillo
add_definitions(-DARMA_DONT_USE_WRAPPER) add_definitions(-DARMA_DONT_USE_WRAPPER)

View File

@ -3,18 +3,20 @@
LASP: Library for Acoustic Signal Processing LASP: Library for Acoustic Signal Processing
""" """
__version__ = "1.0"
from .lasp_common import (P_REF, FreqWeighting, TimeWeighting, getTime, from .lasp_common import (P_REF, FreqWeighting, TimeWeighting, getTime,
getFreq, Qty, SIQtys, Window, lasp_shelve, getFreq, Qty, SIQtys, Window, lasp_shelve,
this_lasp_shelve, W_REF, U_REF, I_REF, dBFS_REF, this_lasp_shelve, W_REF, U_REF, I_REF, dBFS_REF,
AvType) AvType)
from .lasp_cpp import * from .lasp_cpp import *
__version__ = lasp_cpp.__version__
# from .lasp_imptube import * # TwoMicImpedanceTube # from .lasp_imptube import * # TwoMicImpedanceTube
from .lasp_measurement import * # Measurement, scaleBlockSens from .lasp_measurement import * # Measurement, scaleBlockSens
from .lasp_octavefilter import * from .lasp_octavefilter import *
# from .lasp_slm import * # SLM, Dummy # from .lasp_slm import * # SLM, Dummy
from .lasp_record import * # RecordStatus, Recording from .lasp_record import * # RecordStatus, Recording
from .lasp_daqconfigs import DaqConfigurations
# from .lasp_siggen import * # SignalType, NoiseType, SiggenMessage, SiggenData, Siggen # from .lasp_siggen import * # SignalType, NoiseType, SiggenMessage, SiggenData, Siggen
# from .lasp_weighcal import * # WeighCal # from .lasp_weighcal import * # WeighCal
# from .tools import * # SmoothingType, smoothSpectralData, SmoothingWidth # from .tools import * # SmoothingType, smoothSpectralData, SmoothingWidth

View File

@ -1,4 +1,4 @@
# cpp_src/src/device/CMakeLists.txt # src/lasp/device/CMakeLists.txt
add_library(lasp_device_lib OBJECT add_library(lasp_device_lib OBJECT
lasp_daq.cpp lasp_daq.cpp

View File

@ -1,7 +1,11 @@
/* #define DEBUGTRACE_ENABLED */
#include "debugtrace.hpp"
#include "lasp_daqconfig.h" #include "lasp_daqconfig.h"
#include "lasp_deviceinfo.h" #include "lasp_deviceinfo.h"
#include <algorithm> #include <algorithm>
#include <cassert> #include <cassert>
#include <stdexcept>
#define MAX_DEV_COUNT_PER_API 20 #define MAX_DEV_COUNT_PER_API 20
@ -10,16 +14,15 @@ using std::vector;
vector<DaqApi> DaqApi::getAvailableApis() { vector<DaqApi> DaqApi::getAvailableApis() {
vector<DaqApi> apis; vector<DaqApi> apis;
apis.resize(6);
#if LASP_HAS_ULDAQ == 1 #if LASP_HAS_ULDAQ == 1
apis.at(uldaqapi.apicode) = uldaqapi; apis.push_back(uldaqapi);
#endif #endif
#if LASP_HAS_RTAUDIO == 1 #if LASP_HAS_RTAUDIO == 1
apis.at(rtaudioAlsaApi.apicode) = rtaudioAlsaApi; apis.push_back(rtaudioAlsaApi);
apis.at(rtaudioPulseaudioApi.apicode) = rtaudioPulseaudioApi; apis.push_back(rtaudioPulseaudioApi);
apis.at(rtaudioWasapiApi.apicode) = rtaudioWasapiApi; apis.push_back(rtaudioWasapiApi);
apis.at(rtaudioDsApi.apicode) = rtaudioDsApi; apis.push_back(rtaudioDsApi);
apis.at(rtaudioAsioApi.apicode) = rtaudioAsioApi; apis.push_back(rtaudioAsioApi);
#endif #endif
return apis; return apis;
} }
@ -32,13 +35,13 @@ DaqConfiguration::DaqConfiguration(const DeviceInfo &device) {
inchannel_config.resize(device.ninchannels); inchannel_config.resize(device.ninchannels);
outchannel_config.resize(device.noutchannels); outchannel_config.resize(device.noutchannels);
us i = 0; us i = 0;
for(auto& inch: inchannel_config) { for (auto &inch : inchannel_config) {
inch.name = "Unnamed input channel " + std::to_string(i); inch.name = "Unnamed input channel " + std::to_string(i);
i++; i++;
} }
i = 0; i = 0;
for(auto& outch: outchannel_config) { for (auto &outch : outchannel_config) {
outch.name = "Unnamed output channel " + std::to_string(i); outch.name = "Unnamed output channel " + std::to_string(i);
i++; i++;
} }
@ -89,7 +92,8 @@ int DaqConfiguration::getLowestOutChannel() const {
#include "toml++/toml.h" #include "toml++/toml.h"
#include <sstream> #include <sstream>
toml::table daqChannelToTOML(const DaqChannel& ch) { toml::table daqChannelToTOML(const DaqChannel &ch) {
DEBUGTRACE_ENTER;
toml::table tbl; toml::table tbl;
tbl.emplace("enabled", ch.enabled); tbl.emplace("enabled", ch.enabled);
tbl.emplace("name", ch.name); tbl.emplace("name", ch.name);
@ -105,9 +109,12 @@ toml::table daqChannelToTOML(const DaqChannel& ch) {
string DaqConfiguration::toTOML() const { string DaqConfiguration::toTOML() const {
toml::table apitbl{{"name", api.apiname}, DEBUGTRACE_ENTER;
{"code", api.apicode}, toml::table apitbl{
{"subcode", api.api_specific_subcode}}; {"apiname", api.apiname}, // Api name
{"apicode", api.apicode}, // Api code
{"api_specific_subcode", api.api_specific_subcode} // Subcode
};
toml::table tbl{{"daqapi", apitbl}}; toml::table tbl{{"daqapi", apitbl}};
@ -118,13 +125,13 @@ string DaqConfiguration::toTOML() const {
tbl.emplace("monitorOutput", monitorOutput); tbl.emplace("monitorOutput", monitorOutput);
toml::array inchannel_config_tbl; toml::array inchannel_config_tbl;
for(const auto& ch: inchannel_config) { for (const auto &ch : inchannel_config) {
inchannel_config_tbl.emplace_back(daqChannelToTOML(ch)); inchannel_config_tbl.emplace_back(daqChannelToTOML(ch));
} }
tbl.emplace("inchannel_config", inchannel_config_tbl); tbl.emplace("inchannel_config", inchannel_config_tbl);
toml::array outchannel_config_tbl; toml::array outchannel_config_tbl;
for(const auto& ch: outchannel_config) { for (const auto &ch : outchannel_config) {
outchannel_config_tbl.emplace_back(daqChannelToTOML(ch)); outchannel_config_tbl.emplace_back(daqChannelToTOML(ch));
} }
tbl.emplace("outchannel_config", outchannel_config_tbl); tbl.emplace("outchannel_config", outchannel_config_tbl);
@ -135,3 +142,82 @@ string DaqConfiguration::toTOML() const {
return str.str(); return str.str();
} }
template <typename T,typename R> T getValue(R &tbl, const char* key) {
using std::runtime_error;
DEBUGTRACE_ENTER;
// Yeah, weird syntax quirck of C++
std::optional<T> val = tbl[key].template value<T>();
if (!val) {
throw runtime_error(string("Error while parsing Daq configuration. Table "
"does not contain key: ") +
key);
}
return val.value();
}
template<typename T> DaqChannel TOMLToDaqChannel(T& node) {
DEBUGTRACE_ENTER;
DaqChannel d;
toml::table& tbl = *node.as_table();
d.enabled = getValue<bool>(tbl, "enabled");
d.name = getValue<string>(tbl, "name");
d.sensitivity = getValue<double>(tbl, "sensitivity");
d.IEPEEnabled = getValue<bool>(tbl, "IEPEEnabled");
d.ACCouplingMode = getValue<bool>(tbl, "ACCouplingMode");
d.rangeIndex = getValue<bool>(tbl, "rangeIndex");
d.qty = static_cast<DaqChannel::Qty>(getValue<int64_t>(tbl, "qty"));
d.digitalHighPassCutOn = getValue<double>(tbl, "digitalHighpassCutOn");
return d;
}
DaqConfiguration DaqConfiguration::fromTOML(const std::string &tomlstr) {
DEBUGTRACE_ENTER;
using std::runtime_error;
try {
toml::table tbl = toml::parse(tomlstr);
DaqConfiguration config;
auto daqapi = tbl["daqapi"];
DaqApi api;
api.apicode = getValue<int64_t>(daqapi, "apicode");
api.api_specific_subcode = getValue<int64_t>(daqapi, "api_specific_subcode");
api.apiname = getValue<string>(daqapi, "apiname");
config.api = api;
config.device_name = getValue<string>(tbl, "device_name");
config.sampleRateIndex = getValue<int64_t>(tbl, "sampleRateIndex");
config.dataTypeIndex = getValue<int>(tbl, "dataTypeIndex");
config.framesPerBlockIndex = getValue<int64_t>(tbl, "framesPerBlockIndex");
config.monitorOutput = getValue<bool>(tbl, "monitorOutput");
if(toml::array* in_arr = tbl["inchannel_config"].as_array()) {
for(auto& el: *in_arr) {
config.inchannel_config.push_back(TOMLToDaqChannel(el));
}
} else {
throw runtime_error("inchannel_config is not an array");
}
if(toml::array* out_arr = tbl["outchannel_config"].as_array()) {
for(auto& el: *out_arr) {
config.outchannel_config.push_back(TOMLToDaqChannel(el));
}
} else {
throw runtime_error("outchannel_config is not an array");
}
return config;
} catch (const toml::parse_error &e) {
throw std::runtime_error(string("Error parsing TOML DaqConfiguration: ") +
e.what());
}
}

View File

@ -109,7 +109,7 @@ public:
* @param frame Frame number * @param frame Frame number
* @param channel Channel number * @param channel Channel number
* *
* @return * @return
*/ */
T &operator()(const us frame = 0, const us channel = 0) { T &operator()(const us frame = 0, const us channel = 0) {
return reinterpret_cast<T &>(*raw_ptr(frame, channel)); return reinterpret_cast<T &>(*raw_ptr(frame, channel));

View File

@ -22,16 +22,27 @@ from threading import Lock
class Atomic: class Atomic:
"""
Implementation of atomic operations on integers and booleans.
"""
def __init__(self, val): def __init__(self, val):
self.checkType(val)
self._val = val self._val = val
self._lock = Lock() self._lock = Lock()
def checkType(self, val):
if not (type(val) == bool | type(val) == int):
raise RuntimeError("Invalid type for Atomic")
def __iadd__(self, toadd): def __iadd__(self, toadd):
self.checkType(toadd)
with self._lock: with self._lock:
self._val += toadd self._val += toadd
return self return self
def __isub__(self, toadd): def __isub__(self, toadd):
self.checkType(toadd)
with self._lock: with self._lock:
self._val -= toadd self._val -= toadd
return self return self
@ -41,6 +52,7 @@ class Atomic:
return self._val return self._val
def __ilshift__(self, other): def __ilshift__(self, other):
self.checkType(other)
with self._lock: with self._lock:
self._val = other self._val = other
return self return self

View File

@ -9,6 +9,9 @@
#ifndef LASP_CONFIG_H #ifndef LASP_CONFIG_H
#define LASP_CONFIG_H #define LASP_CONFIG_H
const int LASP_VERSION_MAJOR = @LASP_VERSION_MAJOR@;
const int LASP_VERSION_MINOR = @LASP_VERSION_MINOR@;
/* Debug flag */ /* Debug flag */
#cmakedefine01 LASP_DEBUG #cmakedefine01 LASP_DEBUG
@ -32,6 +35,7 @@
#cmakedefine01 LASP_DOUBLE_PRECISION #cmakedefine01 LASP_DOUBLE_PRECISION
#cmakedefine01 LASP_USE_BLAS #cmakedefine01 LASP_USE_BLAS
/* Single / double precision */ /* Single / double precision */
#ifdef LASP_DOUBLE_PRECISION #ifdef LASP_DOUBLE_PRECISION
#define LASP_FLOAT_SIZE 64 #define LASP_FLOAT_SIZE 64

View File

@ -1,13 +1,13 @@
#include "lasp_config.h"
#include <pybind11/pybind11.h> #include <pybind11/pybind11.h>
namespace py = pybind11; namespace py = pybind11;
void init_dsp(py::module& m); void init_dsp(py::module &m);
void init_deviceinfo(py::module& m); void init_deviceinfo(py::module &m);
void init_daqconfiguration(py::module& m); void init_daqconfiguration(py::module &m);
void init_daq(py::module& m); void init_daq(py::module &m);
void init_streammgr(py::module& m); void init_streammgr(py::module &m);
void init_datahandler(py::module& m); void init_datahandler(py::module &m);
PYBIND11_MODULE(lasp_cpp, m) { PYBIND11_MODULE(lasp_cpp, m) {
@ -18,4 +18,9 @@ PYBIND11_MODULE(lasp_cpp, m) {
init_streammgr(m); init_streammgr(m);
init_datahandler(m); init_datahandler(m);
m.attr("__version__") = std::to_string(LASP_VERSION_MAJOR) + "." +
std::to_string(LASP_VERSION_MINOR);
m.attr("LASP_VERSION_MAJOR") = LASP_VERSION_MAJOR;
m.attr("LASP_VERSION_MINOR") = LASP_VERSION_MINOR;
} }

121
src/lasp/lasp_daqconfigs.py Normal file
View File

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
from .lasp_cpp import DaqConfiguration, LASP_VERSION_MAJOR
"""!
Author: J.A. de Jong - ASCEE
Description:
Data Acquistiion (DAQ) device descriptors, and the DAQ devices themselves
"""
__all__ = ["DaqConfigurations"]
import json
from .lasp_common import Qty, SIQtys, lasp_shelve
from .lasp_cpp import DaqChannel, DaqConfiguration
class DaqConfigurations:
"""
DaqConfigurations stores a set containing an input configuration and an
output configuration.
"""
def __init__(
self,
duplex_mode: bool,
input_config: DaqConfiguration,
output_config: DaqConfiguration,
):
"""
Initialize set of DaqConfigurations.
Args:
duplex_mode: If true, the input configuration is used for output as
well. This makes only sense when the device is capable of having
simultaneous input / output.
input_config: The configuration settings for the input
output_config: The configuration settoutgs for the output
"""
self.input_config = input_config
self.output_config = output_config
self.duplex_mode = duplex_mode
@staticmethod
def getNames():
"""
Get a list of all names of DaqConfigurations sets.
Returns:
list of names
"""
with lasp_shelve() as sh:
configs_ser = sh.load(f"daqconfigs_v{LASP_VERSION_MAJOR}", {})
return list(configs_ser.keys())
@staticmethod
def loadAll():
"""
Returns a dictionary of all configurations presets. The dictionary keys
are the names of the configurations
Returns:
all configurations, as a dictionary
"""
with lasp_shelve() as sh:
configs_ser = sh.load(f"daqconfigs_v{LASP_VERSION_MAJOR}", {})
configs = {}
for name, val in configs_ser.items():
configs[name] = DaqConfigurations.load(name)
return configs
@staticmethod
def load(name: str):
"""
Load a single configuration preset, containing input config and output config
Args:
name: The name of the configuration to load.
"""
with lasp_shelve() as sh:
configs_str = sh.load(f"daqconfigs_v{LASP_VERSION_MAJOR}", {})
config_str = configs_str[name]
duplex_mode = config_str[0]
input_config = DaqConfiguration.fromTOML(config_str[1])
output_config = DaqConfiguration.fromTOML(config_str[2])
return DaqConfigurations(duplex_mode, input_config, output_config)
def save(self, name: str):
"""
Save the current set of configurations to the shelve store.
Args:
name: The name of the configuration set.
"""
with lasp_shelve() as sh:
# Convert to TOML
input_str = self.input_config.toTOML()
output_str = self.output_config.toTOML()
configs_str = sh.load(f"daqconfigs_v{LASP_VERSION_MAJOR}", {})
configs_str[name] = [self.duplex_mode, input_str, output_str]
sh.store(f"daqconfigs_v{LASP_VERSION_MAJOR}", configs_str)
@staticmethod
def delete(name: str):
"""
Delete a DaqConfigurations set from the store.
"""
with lasp_shelve() as sh:
configs = sh.load("daqconfigs", {})
del configs[name]
sh.store(f"daqconfigs_v{LASP_VERSION_MAJOR}", configs)

View File

@ -12,8 +12,6 @@ __all__ = ['FirOctaveFilterBank', 'FirThirdOctaveFilterBank',
from .filter.filterbank_design import (OctaveBankDesigner, from .filter.filterbank_design import (OctaveBankDesigner,
ThirdOctaveBankDesigner) ThirdOctaveBankDesigner)
from .wrappers import (Decimator, FilterBank as pyxFilterBank,
SosFilterBank as pyxSosFilterBank)
import numpy as np import numpy as np

View File

@ -9,6 +9,7 @@ import os
import time import time
import h5py import h5py
from .lasp_cpp import InDataHandler, StreamMgr from .lasp_cpp import InDataHandler, StreamMgr
from .lasp_atomic import Atomic
@dataclasses.dataclass @dataclasses.dataclass
@ -65,7 +66,7 @@ class Recording:
self.curT_rounded_to_seconds = 0 self.curT_rounded_to_seconds = 0
# Counter of the number of blocks # Counter of the number of blocks
self.ablockno = 0 self.ablockno = Atomic(0)
self.progressCallback = progressCallback self.progressCallback = progressCallback
@ -87,14 +88,14 @@ class Recording:
logging.debug('Starting record....') logging.debug('Starting record....')
self.stop = False self.stop = Atomic(False)
self.indh = InDataHandler(streammgr, self.inCallback) self.indh = InDataHandler(streammgr, self.inCallback)
if wait: if wait:
logging.debug('Stop recording with CTRL-C') logging.debug('Stop recording with CTRL-C')
try: try:
while not self.stop: while not self.stop():
time.sleep(0.01) time.sleep(0.01)
except KeyboardInterrupt: except KeyboardInterrupt:
logging.debug("Keyboard interrupt on record") logging.debug("Keyboard interrupt on record")
@ -176,8 +177,10 @@ class Recording:
remove the queue from the stream, etc. remove the queue from the stream, etc.
""" """
assert self.stop() == True
logging.debug('Recording::finish()') logging.debug('Recording::finish()')
smgr = self.smgr smgr = self.smgr
self.indh = None self.indh = None
# TODO: Fix when video # TODO: Fix when video
@ -185,11 +188,6 @@ class Recording:
# smgr.removeCallback(self.vCallback, AvType.video_input) # smgr.removeCallback(self.vCallback, AvType.video_input)
# self.f['video_frame_positions'] = self.video_frame_positions # self.f['video_frame_positions'] = self.video_frame_positions
try:
smgr.removeInQueueListener(self.inq)
except Exception as e:
logging.error(f'Could not remove queue from smgr: {e}')
try: try:
# Close the recording file # Close the recording file
self.f.close() self.f.close()
@ -215,7 +213,7 @@ class Recording:
""" """
# logging.debug('Recording::__addTimeData()') # logging.debug('Recording::__addTimeData()')
if self.stop: if self.stop():
# Stop flag is raised. We stop recording here. # Stop flag is raised. We stop recording here.
return return
@ -254,7 +252,7 @@ class Recording:
if self.progressCallback is not None: if self.progressCallback is not None:
recstatus.done = True recstatus.done = True
self.progressCallback(recstatus) self.progressCallback(recstatus)
self.stop = True self.stop << True
return return
# Add the data to the file, and resize the audio data blocks # Add the data to the file, and resize the audio data blocks

View File

@ -58,14 +58,14 @@ class ReverbTime:
# Solve the least-squares problem, by creating a matrix of # Solve the least-squares problem, by creating a matrix of
A = np.hstack([x, np.ones(x.shape)]) A = np.hstack([x, np.ones(x.shape)])
print(A.shape) # print(A.shape)
print(points.shape) # print(points.shape)
# derivative is dB/s of increase/decrease # derivative is dB/s of increase/decrease
sol, residuals, rank, s = np.linalg.lstsq(A, points) sol, residuals, rank, s = np.linalg.lstsq(A, points)
print(f'sol: {sol}') # print(f'sol: {sol}')
# Derivative of the decay in dB/s # Derivative of the decay in dB/s
derivative = sol[0][0] derivative = sol[0][0]
@ -79,5 +79,5 @@ class ReverbTime:
'const': const, 'const': const,
'derivative': derivative, 'derivative': derivative,
'T60': T60} 'T60': T60}
print(res) # print(res)
return res return res

View File

@ -70,6 +70,7 @@ void init_daqconfiguration(py::module &m) {
daqconfig.def("match", &DaqConfiguration::match); daqconfig.def("match", &DaqConfiguration::match);
daqconfig.def_static("fromTOML", &DaqConfiguration::fromTOML);
daqconfig.def("toTOML", &DaqConfiguration::toTOML); daqconfig.def("toTOML", &DaqConfiguration::toTOML);
daqconfig.def_readwrite("inchannel_config", daqconfig.def_readwrite("inchannel_config",
&DaqConfiguration::inchannel_config); &DaqConfiguration::inchannel_config);

View File

@ -22,7 +22,7 @@ namespace py = pybind11;
/** /**
* @brief Generate a Numpy array from daqdata, does *NOT* create a copy of the * @brief Generate a Numpy array from daqdata, does *NOT* create a copy of the
* data!. * data!. Instead, it shares the data from the DaqData container.
* *
* @tparam T The type of the stored sample * @tparam T The type of the stored sample
* @param d The daqdata to convert * @param d The daqdata to convert
@ -47,17 +47,23 @@ template <typename T> py::array_t<T> getPyArray(DaqData &d) {
*/ */
return py::array_t<T>( return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}), py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
py::array::StridesContainer( //
{sizeof(T), py::array::StridesContainer( // Strides
sizeof(T) * d.nframes}), /* Strides (in bytes) for each index */ {sizeof(T),
(T *)d.raw_ptr(), /* Pointer to buffer */ sizeof(T) * d.nframes}), // Strides (in bytes) for each index
dummyDataOwner);
(T *)d.raw_ptr(), // Pointer to buffer
dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer.
);
} }
/** /**
* @brief Wraps the InDataHandler such that it calls a Python callback with a * @brief Wraps the InDataHandler such that it calls a Python callback with a
* buffer of sample data. * buffer of sample data. The Python callback is called from a different
* thread, using a Numpy argument as
*/ */
class PyIndataHandler : public InDataHandler { class PyIndataHandler : public InDataHandler {
/** /**
@ -68,28 +74,35 @@ class PyIndataHandler : public InDataHandler {
* @brief The thread that is handling callbacks from the queue * @brief The thread that is handling callbacks from the queue
*/ */
std::thread pythread; std::thread pythread;
boost::lockfree::spsc_queue<DaqData> dataqueue{RINGBUFFER_SIZE}; /**
* @brief The queue used to push elements to the handling thread.
*/
boost::lockfree::spsc_queue<DaqData,
boost::lockfree::capacity<RINGBUFFER_SIZE>>
dataqueue;
std::atomic<bool> stopThread{false}; std::atomic<bool> stopThread{false};
public: public:
~PyIndataHandler() { ~PyIndataHandler() {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
stop(); stop();
stopThread = true; stopThread = true;
pythread.join(); if(pythread.joinable()) {
pythread.join();
}
} }
PyIndataHandler(StreamMgr &mgr, py::function cb) PyIndataHandler(StreamMgr &mgr, py::function cb)
: InDataHandler(mgr), cb(cb), : InDataHandler(mgr), cb(cb),
pythread(&PyIndataHandler::threadfcn, this) { pythread(&PyIndataHandler::threadfcn, this) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
/// TODO: Note that if start() throws an exception, which means that the /// TODO: Note that if start() throws an exception, which means that the
/// destructor of PyIndataHandler is not called and the thread destructor /// destructor of PyIndataHandler is not called and the thread destructor
/// calls terminate(). It is a kind of rude way to crash, but it is also /// calls terminate(). It is a kind of rude way to crash, but it is also
/// *very* unlikely to happen, as start() does only add a reference to this /// *very* unlikely to happen, as start() does only add a reference to this
/// handler to a list in the stream mgr. /// handler to a list in the stream mgr.
start(); start();
} }
/** /**
* @brief Reads from the * @brief Reads from the
@ -110,23 +123,23 @@ public:
try { try {
py::array binfo; py::array binfo;
switch (d.dtype) { switch (d.dtype) {
case (DataType::dtype_int8): case (DataType::dtype_int8):
binfo = getPyArray<uint8_t>(d); binfo = getPyArray<uint8_t>(d);
break; break;
case (DataType::dtype_int16): case (DataType::dtype_int16):
binfo = getPyArray<uint16_t>(d); binfo = getPyArray<uint16_t>(d);
break; break;
case (DataType::dtype_int32): case (DataType::dtype_int32):
binfo = getPyArray<uint32_t>(d); binfo = getPyArray<uint32_t>(d);
break; break;
case (DataType::dtype_fl32): case (DataType::dtype_fl32):
binfo = getPyArray<float>(d); binfo = getPyArray<float>(d);
break; break;
case (DataType::dtype_fl64): case (DataType::dtype_fl64):
binfo = getPyArray<double>(d); binfo = getPyArray<double>(d);
break; break;
default: default:
throw std::runtime_error("BUG"); throw std::runtime_error("BUG");
} // End of switch } // End of switch
py::object bool_val = cb(binfo); py::object bool_val = cb(binfo);
@ -142,7 +155,7 @@ public:
} catch (py::cast_error &e) { } catch (py::cast_error &e) {
cerr << e.what() << endl; cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value." cerr << "ERROR: Python callback does not return boolean value."
<< endl; << endl;
stopThread = true; stopThread = true;
} }
} }
@ -168,8 +181,8 @@ public:
if (!dataqueue.push(daqdata)) { if (!dataqueue.push(daqdata)) {
stopThread = true; stopThread = true;
cerr << "Pushing DaqData object failed. Probably the ringbuffer is " cerr << "Pushing DaqData object failed. Probably the ringbuffer is "
"full. Try reducing the load" "full. Try reducing the load"
<< endl; << endl;
} }
} }
return true; return true;

@ -1 +1 @@
Subproject commit 6a47ce703d2825dd1da0776c946f13c293e096a2 Subproject commit b1ecdf0ed86faeeaebe13d0db16ad04c20519527