Replaced github STL-Threadsafe with boost::lockfree. Added PyIndataHandle to glue Python callbacks to new data. First work on getting recording back to work.

This commit is contained in:
Anne de Jong 2022-07-29 09:32:26 +02:00
parent b35686f79d
commit 3160aacc07
20 changed files with 445 additions and 1292 deletions

3
.gitmodules vendored
View File

@ -13,3 +13,6 @@
[submodule "third_party/tomlplusplus"] [submodule "third_party/tomlplusplus"]
path = third_party/tomlplusplus path = third_party/tomlplusplus
url = https://github.com/marzer/tomlplusplus url = https://github.com/marzer/tomlplusplus
[submodule "third_party/lockfree"]
path = third_party/lockfree
url = https://github.com/boostorg/lockfree

View File

@ -7,7 +7,7 @@ configure_file(lasp_config.h.in lasp_config.h)
include_directories(${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(SYSTEM ../../third_party/armadillo-code/include) include_directories(SYSTEM ../../third_party/armadillo-code/include)
include_directories(../../third_party/DebugTrace-cpp/include) include_directories(../../third_party/DebugTrace-cpp/include)
include_directories(../../third_party/STL-Threadsafe/include) include_directories(../../third_party/lockfreeThreadsafe/include)
include_directories(../../third_party/gsl-lite/include) include_directories(../../third_party/gsl-lite/include)
include_directories(../../third_party/tomlplusplus/include) include_directories(../../third_party/tomlplusplus/include)
@ -21,6 +21,7 @@ pybind11_add_module(lasp_cpp MODULE lasp_cpp.cpp
pybind11/lasp_streammgr.cpp pybind11/lasp_streammgr.cpp
pybind11/lasp_daq.cpp pybind11/lasp_daq.cpp
pybind11/lasp_deviceinfo.cpp pybind11/lasp_deviceinfo.cpp
pybind11/lasp_pyindatahandler.cpp
) )
target_link_libraries(lasp_cpp PRIVATE lasp_device_lib lasp_dsp_lib) target_link_libraries(lasp_cpp PRIVATE lasp_device_lib lasp_dsp_lib)

View File

@ -1,13 +1,20 @@
# Comments are what is imported, state of 6-8-2021 # Comments are what is imported, state of 6-8-2021
"""
LASP: Library for Acoustic Signal Processing
from .lasp_common import * # P_REF, FreqWeighting, TimeWeighting, getTime, getFreq, Qty, SIQtys, Window, lasp_shelve, this_lasp_shelve, W_REF, U_REF, I_REF, dBFS_REF, AvType """
from .lasp_avstream import * # StreamManager, ignoreSigInt, StreamStatus __version__ = "1.0"
from .lasp_atomic import * # Atomic
from .lasp_imptube import * # TwoMicImpedanceTube from .lasp_common import (P_REF, FreqWeighting, TimeWeighting, getTime,
getFreq, Qty, SIQtys, Window, lasp_shelve,
this_lasp_shelve, W_REF, U_REF, I_REF, dBFS_REF,
AvType)
from .lasp_cpp import *
# from .lasp_imptube import * # TwoMicImpedanceTube
from .lasp_measurement import * # Measurement, scaleBlockSens from .lasp_measurement import * # Measurement, scaleBlockSens
from .lasp_octavefilter import * # FirOctaveFilterBank, FirThirdOctaveFilterBank, OverallFilterBank, SosOctaveFilterBank, SosThirdOctaveFilterBank from .lasp_octavefilter import *
from .lasp_slm import * # SLM, Dummy # from .lasp_slm import * # SLM, Dummy
from .lasp_record import * # RecordStatus, Recording from .lasp_record import * # RecordStatus, Recording
from .lasp_siggen import * # SignalType, NoiseType, SiggenMessage, SiggenData, Siggen # from .lasp_siggen import * # SignalType, NoiseType, SiggenMessage, SiggenData, Siggen
from .lasp_weighcal import * # WeighCal # from .lasp_weighcal import * # WeighCal
from .tools import * # SmoothingType, smoothSpectralData, SmoothingWidth # from .tools import * # SmoothingType, smoothSpectralData, SmoothingWidth

View File

@ -1,4 +1,5 @@
#include "debugtrace.hpp" #include "debugtrace.hpp"
#include "lasp_daqconfig.h"
DEBUGTRACE_VARIABLES; DEBUGTRACE_VARIABLES;
@ -11,27 +12,25 @@ DEBUGTRACE_VARIABLES;
#endif #endif
using std::runtime_error; using std::runtime_error;
Daq::~Daq() {} Daq::~Daq() { DEBUGTRACE_ENTER; }
std::unique_ptr<Daq> Daq::createDaq(const DeviceInfo &devinfo, std::unique_ptr<Daq> Daq::createDaq(const DeviceInfo &devinfo,
const DaqConfiguration &config) { const DaqConfiguration &config) {
DEBUGTRACE_ENTER; DEBUGTRACE_ENTER;
int apicode = devinfo.api.apicode;
#if LASP_HAS_ULDAQ == 1 #if LASP_HAS_ULDAQ == 1
if (devinfo.api == uldaqapi) { if (devinfo.api.apicode == LASP_ULDAQ_APICODE) {
return createUlDaqDevice(devinfo, config); return createUlDaqDevice(devinfo, config);
} }
#endif #endif
#if LASP_HAS_RTAUDIO == 1 #if LASP_HAS_RTAUDIO == 1
else if (apicode >= 1 && apicode <= 5) { // See lasp_daqconfig.h:114 ALSA, up to
if (devinfo.api.apicode == LASP_RTAUDIO_APICODE) {
return createRtAudioDevice(devinfo, config); return createRtAudioDevice(devinfo, config);
} }
#endif #endif
else { throw std::runtime_error(string("Unable to match Device API: ") +
throw std::runtime_error(string("Unable to match API: ") +
devinfo.api.apiname); devinfo.api.apiname);
}
} }
Daq::Daq(const DeviceInfo &devinfo, const DaqConfiguration &config) Daq::Daq(const DeviceInfo &devinfo, const DaqConfiguration &config)
@ -46,14 +45,15 @@ Daq::Daq(const DeviceInfo &devinfo, const DaqConfiguration &config)
if (!config.match(devinfo)) { if (!config.match(devinfo)) {
throw std::runtime_error("DaqConfiguration does not match device info"); throw std::runtime_error("DaqConfiguration does not match device info");
} }
if(neninchannels(false) > ninchannels) { if (neninchannels(false) > ninchannels) {
throw std::runtime_error("Number of enabled input channels is higher than device capability"); throw std::runtime_error(
"Number of enabled input channels is higher than device capability");
}
if (nenoutchannels() > noutchannels) {
throw std::runtime_error(
"Number of enabled output channels is higher than device capability");
} }
if(nenoutchannels() > noutchannels) {
throw std::runtime_error("Number of enabled output channels is higher than device capability");
} }
}
double Daq::samplerate() const { double Daq::samplerate() const {
return availableSampleRates.at(sampleRateIndex); return availableSampleRates.at(sampleRateIndex);

View File

@ -107,24 +107,25 @@ public:
}; };
#if LASP_HAS_ULDAQ == 1 #if LASP_HAS_ULDAQ == 1
const us LASP_ULDAQ_APICODE = 0;
const DaqApi uldaqapi("UlDaq", 0); const DaqApi uldaqapi("UlDaq", 0);
#endif #endif
#if LASP_HAS_RTAUDIO == 1 #if LASP_HAS_RTAUDIO == 1
#include <RtAudio.h> #include <RtAudio.h>
const us LASP_RTAUDIO_APICODE = 1;
const DaqApi rtaudioAlsaApi("RtAudio Linux ALSA", 1, RtAudio::Api::LINUX_ALSA); const DaqApi rtaudioAlsaApi("RtAudio Linux ALSA", 1, RtAudio::Api::LINUX_ALSA);
const DaqApi rtaudioPulseaudioApi("RtAudio Linux Pulseaudio", 2, const DaqApi rtaudioPulseaudioApi("RtAudio Linux Pulseaudio", 1,
RtAudio::Api::LINUX_PULSE); RtAudio::Api::LINUX_PULSE);
const DaqApi rtaudioWasapiApi("RtAudio Windows Wasapi", 3, const DaqApi rtaudioWasapiApi("RtAudio Windows Wasapi", 1,
RtAudio::Api::WINDOWS_WASAPI); RtAudio::Api::WINDOWS_WASAPI);
const DaqApi rtaudioDsApi("RtAudio Windows DirectSound", 4, const DaqApi rtaudioDsApi("RtAudio Windows DirectSound", 1,
RtAudio::Api::WINDOWS_DS); RtAudio::Api::WINDOWS_DS);
const DaqApi rtaudioAsioApi("RtAudio Windows ASIO", 5, const DaqApi rtaudioAsioApi("RtAudio Windows ASIO", 1,
RtAudio::Api::WINDOWS_ASIO); RtAudio::Api::WINDOWS_ASIO);
#endif #endif
class DeviceInfo; class DeviceInfo;
/** /**
* @brief Stores channel configuration data for each channel. * @brief Stores channel configuration data for each channel.
*/ */
@ -177,7 +178,6 @@ public:
* output. * output.
*/ */
double digitalHighPassCutOn = -1; double digitalHighPassCutOn = -1;
}; };
/** /**
@ -293,6 +293,4 @@ public:
* enabled. * enabled.
*/ */
int getLowestOutChannel() const; int getLowestOutChannel() const;
}; };

View File

@ -20,27 +20,27 @@ public:
/** /**
* @brief The number of channels * @brief The number of channels
*/ */
const us nchannels; us nchannels;
/** /**
* @brief The number of frames in this block of data. * @brief The number of frames in this block of data.
*/ */
const us nframes; us nframes;
/** /**
* @brief The data type corresponding to a sample * @brief The data type corresponding to a sample
*/ */
const DataTypeDescriptor::DataType dtype; DataTypeDescriptor::DataType dtype;
/** /**
* @brief The data type description corresponding to a sample * @brief The data type description corresponding to a sample
*/ */
const DataTypeDescriptor &dtype_descr; DataTypeDescriptor dtype_descr;
/** /**
* @brief The number of bytes per sample (sample width, sw) * @brief The number of bytes per sample (sample width, sw)
*/ */
const us sw; us sw;
/** /**
* @brief Initialize an empty frame of data * @brief Initialize an empty frame of data
@ -51,7 +51,12 @@ public:
*/ */
DaqData(const us nchannels, const us nframes, DaqData(const us nchannels, const us nframes,
const DataTypeDescriptor::DataType dtype); const DataTypeDescriptor::DataType dtype);
/**
* @brief Initialize using no allocation
*/
DaqData() : DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {}
virtual ~DaqData() = default; virtual ~DaqData() = default;
DaqData& operator=(const DaqData&) = default;
/** /**
* @brief Return pointer to the raw data corresponding to a certain sample. * @brief Return pointer to the raw data corresponding to a certain sample.

View File

@ -1,21 +1,50 @@
#include "lasp_streammgr.h" #include "lasp_streammgr.h"
#include "debugtrace.hpp"
#include <algorithm> #include <algorithm>
#include <assert.h> #include <assert.h>
#include <functional> #include <functional>
#include "debugtrace.hpp" #include <iostream>
using std::cerr;
using std::endl;
InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) { InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) {
mgr.addInDataHandler(*this); DEBUGTRACE_ENTER;
}
void InDataHandler::start() {
DEBUGTRACE_ENTER;
_mgr.addInDataHandler(*this);
}
void InDataHandler::stop() {
#if LASP_DEBUG == 1
stopCalled = true;
#endif
_mgr.removeInDataHandler(*this);
}
InDataHandler::~InDataHandler() {
DEBUGTRACE_ENTER;
#if LASP_DEBUG == 1
if (!stopCalled) {
cerr << "************ BUG: Stop function not called while arriving at "
"InDataHandler's destructor. Fix this by calling "
"InDataHandler::stop() from the derived class' destructor."
<< endl;
}
#endif
} }
InDataHandler::~InDataHandler() { _mgr.removeInDataHandler(*this); }
StreamMgr &StreamMgr::getInstance() { StreamMgr &StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
static StreamMgr mgr; static StreamMgr mgr;
return mgr; return mgr;
} }
StreamMgr::StreamMgr() {} StreamMgr::StreamMgr() { DEBUGTRACE_ENTER; }
bool StreamMgr::inCallback(const DaqData &data) { bool StreamMgr::inCallback(const DaqData &data) {
/* DEBUGTRACE_ENTER; */
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx); std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);
for (auto &handler : _inDataHandlers) { for (auto &handler : _inDataHandlers) {
@ -104,7 +133,10 @@ bool StreamMgr::outCallback(DaqData &data) {
return true; return true;
} }
StreamMgr::~StreamMgr() { stopAllStreams(); } StreamMgr::~StreamMgr() {
DEBUGTRACE_ENTER;
stopAllStreams();
}
void StreamMgr::stopAllStreams() { void StreamMgr::stopAllStreams() {
_inputStream.reset(); _inputStream.reset();
_outputStream.reset(); _outputStream.reset();
@ -125,12 +157,13 @@ void StreamMgr::startStream(const DeviceInfo &devinfo,
bool isDuplex = isInput && isOutput; bool isDuplex = isInput && isOutput;
if (!isInput && !isOutput) { if (!isInput && !isOutput) {
throw std::runtime_error( throw std::runtime_error("Neither input, nor output channels enabled for "
"Neither input, nor output channels enabled for stream. Cannotr start."); "stream. Cannotr start.");
} }
if ((isDuplex || isInput) && _inputStream) { if ((isDuplex || isInput) && _inputStream) {
throw std::runtime_error("Error: an input stream is already running. Please " throw std::runtime_error(
"Error: an input stream is already running. Please "
"first stop existing stream"); "first stop existing stream");
} else if (isOutput && _outputStream) { } else if (isOutput && _outputStream) {
throw std::runtime_error("Error: output stream is already running. Please " throw std::runtime_error("Error: output stream is already running. Please "
@ -143,12 +176,12 @@ void StreamMgr::startStream(const DeviceInfo &devinfo,
using namespace std::placeholders; using namespace std::placeholders;
std::unique_ptr<Daq> daq = Daq::createDaq(devinfo, config); std::unique_ptr<Daq> daq = Daq::createDaq(devinfo, config);
if(isInput) { if (isInput) {
inCallback = std::bind(&StreamMgr::inCallback, this, _1); inCallback = std::bind(&StreamMgr::inCallback, this, _1);
} }
if(isOutput) { if (isOutput) {
if(_siggen) { if (_siggen) {
_siggen->reset(daq->samplerate()); _siggen->reset(daq->samplerate());
} }
outCallback = std::bind(&StreamMgr::outCallback, this, _1); outCallback = std::bind(&StreamMgr::outCallback, this, _1);
@ -158,13 +191,12 @@ void StreamMgr::startStream(const DeviceInfo &devinfo,
daq->start(inCallback, outCallback); daq->start(inCallback, outCallback);
if(isInput) { if (isInput) {
_inputStream = std::move(daq); _inputStream = std::move(daq);
} else { } else {
assert(isOutput); assert(isOutput);
_outputStream = std::move(daq); _outputStream = std::move(daq);
} }
} }
void StreamMgr::addInDataHandler(InDataHandler &handler) { void StreamMgr::addInDataHandler(InDataHandler &handler) {

View File

@ -11,13 +11,16 @@ class InDataHandler {
protected: protected:
StreamMgr &_mgr; StreamMgr &_mgr;
#if LASP_DEBUG == 1
std::atomic<bool> stopCalled {false};
#endif
public: public:
virtual ~InDataHandler(); virtual ~InDataHandler();
/** /**
* @brief When constructed, the handler is added to the stream manager, which * @brief When constructed, the handler is added to the stream manager, which
* will call the handlers's inCallback() until the point of destruction. * will call the handlers's inCallback() until stop() is called.
* *
* @param mgr Stream manager. * @param mgr Stream manager.
*/ */
@ -31,6 +34,24 @@ public:
* @return true if no error. False to stop the stream from running. * @return true if no error. False to stop the stream from running.
*/ */
virtual bool inCallback(const DaqData &daqdata) = 0; virtual bool inCallback(const DaqData &daqdata) = 0;
/**
* @brief This function should be called from the constructor of the
* implementation of InDataHandler. It will start the stream's calling of
* inCallback().
*/
void start();
/**
* @brief This function should be called from the destructor of derived
* classes, to disable the calls to inCallback(), such that proper
* destruction of the object is allowed and no other threads call methods
* from the object. It removes the inCallback() from the callback list of the
* StreamMgr(). **Failing to call this function results in deadlocks, errors
* like "pure virtual function called", or other**.
*/
void stop();
}; };
/** /**
@ -40,17 +61,6 @@ public:
*/ */
class StreamMgr { class StreamMgr {
enum class StreamType : us {
/**
* @brief Input stream
*/
input = 1 << 0,
/**
* @brief Output stream
*/
output = 1 << 1,
};
/** /**
* @brief Storage for streams. * @brief Storage for streams.
*/ */
@ -80,6 +90,18 @@ class StreamMgr {
~StreamMgr(); ~StreamMgr();
public: public:
enum class StreamType : us {
/**
* @brief Input stream
*/
input = 1 << 0,
/**
* @brief Output stream
*/
output = 1 << 1,
};
StreamMgr(const StreamMgr &) = delete; StreamMgr(const StreamMgr &) = delete;
StreamMgr &operator=(const StreamMgr &) = delete; StreamMgr &operator=(const StreamMgr &) = delete;
@ -165,5 +187,4 @@ private:
* @param handler The handler to add. * @param handler The handler to add.
*/ */
void addInDataHandler(InDataHandler &handler); void addInDataHandler(InDataHandler &handler);
}; };

View File

@ -1,626 +0,0 @@
# -*- coding: utf-8 -*-
"""
Author: J.A. de Jong
Description: Controlling an audio stream in a different process.
"""
import logging
import multiprocessing as mp
# import cv2 as cv
import signal
import time
from dataclasses import dataclass
from enum import Enum, auto, unique
from typing import List
import numpy as np
from .device import Daq, DaqChannel, DaqConfiguration, DeviceInfo
from .filter import highpass
from .lasp_atomic import Atomic
from .lasp_common import AvType
from .lasp_multiprocessingpatch import apply_patch
from .wrappers import SosFilterBank
apply_patch()
__all__ = ["StreamManager", "ignoreSigInt", "StreamStatus"]
def ignoreSigInt():
"""
Ignore sigint signal. Should be set on all processes to let the main
process control this signal.
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
@dataclass
class StreamMetaData:
# Sample rate [Hz]
fs: float
# Input channels
in_ch: List[DaqChannel]
# Output channels
out_ch: List[DaqChannel]
# blocksize
blocksize: int
# The data type of input and output blocks.
dtype: np.dtype
@unique
class StreamMsg(Enum):
"""
First part, control messages that can be send to the stream
"""
startStream = auto()
stopStream = auto()
stopAllStreams = auto()
getStreamMetaData = auto()
endProcess = auto()
scanDaqDevices = auto()
"""
Second part, status messages that are send back on all listeners
"""
# "Normal messages"
deviceList = auto()
streamStarted = auto()
streamStopped = auto()
streamMetaData = auto()
streamData = auto()
# Error messages
# Some error occured, which mostly leads to a stop of the stream
streamError = auto()
# An error occured, but we recovered
streamTemporaryError = auto()
# A fatal error occured. This leads to serious errors in the application
streamFatalError = auto()
class AudioStream:
"""
Audio stream.
"""
def __init__(
self,
avtype: AvType,
devices: list,
daqconfig: DaqConfiguration,
processCallback: callable,
):
"""
Initializes the audio stream and tries to start it.
avtype: AvType
devices: List of device information
daqconfig: DaqConfiguration to used to generate audio stream backend
processCallback: callback function that will be called from a different
thread, with arguments (AudioStream, in
"""
logging.debug("AudioStream()")
# self.running = Atomic(False)
# self.aframectr = Atomic(0)
self.running = False
self.aframectr = 0
self.avtype = avtype
api_devices = devices[daqconfig.api]
self.processCallback = processCallback
matching_devices = [
device for device in api_devices if device.name == daqconfig.device_name
]
if len(matching_devices) == 0:
raise RuntimeError(f"Could not find device {daqconfig.device_name}")
# TODO: We pick te first one, what to do if we have multiple matches?
# Is that even possible?
device = matching_devices[0]
self.daq = Daq(device, daqconfig)
en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True)
en_out_ch = daqconfig.getEnabledOutChannels()
if en_in_ch == 0 and en_out_ch == 0:
raise RuntimeError("No enabled input / output channels")
elif en_out_ch == 0 and avtype in (AvType.audio_duplex, AvType.audio_output):
raise RuntimeError("No enabled output channels")
elif en_in_ch == 0 and avtype in (AvType.audio_input, AvType.audio_duplex):
raise RuntimeError("No enabled input channels")
logging.debug("Ready to start device...")
samplerate = self.daq.start(self.streamCallback)
# Create required Highpass filters for incoming data
self.hpfs = [None] * len(en_in_ch)
for i, ch in enumerate(en_in_ch):
# Simple filter with a single bank and one section
if ch.highpass > 0:
fb = SosFilterBank(1, 1)
hpf = highpass(samplerate, ch.highpass, Q=np.sqrt(2) / 2)
fb.setFilter(0, hpf[None, :])
self.hpfs[i] = fb
self.streammetadata = StreamMetaData(
fs=samplerate,
in_ch=en_in_ch,
out_ch=en_out_ch,
blocksize=self.daq.nFramesPerBlock,
dtype=self.daq.getNumpyDataType(),
)
self.running = True
def streamCallback(self, indata, outdata, nframes):
"""
This is called (from a separate thread) for each block
of audio data.
"""
if not self.running:
return 1
self.aframectr += 1
# TODO: Fix this. This gives bug on Windows, the threading lock does
# give a strange erro.
try:
if not self.running:
return 1
except Exception as e:
print(e)
if indata is not None:
indata_filtered = np.empty_like(indata)
nchannels = indata.shape[1]
for i in range(nchannels):
# Filter each channel to the optional high-pass, which could also
# be an empty filter
if self.hpfs[i] is not None:
indata_float = indata[:, [i]].astype(np.float)
filtered_ch_float = self.hpfs[i].filter_(indata_float)
indata_filtered[:, i] = filtered_ch_float.astype(
self.streammetadata.dtype
)[:, 0]
else:
# One-to-one copy
indata_filtered[:, i] = indata[:, i]
else:
indata_filtered = indata
# rv = self.processCallback(self, indata, outdata)
rv = self.processCallback(self, indata_filtered, outdata)
if rv != 0:
self.running <<= False
return rv
def stop(self):
"""
Stop the DAQ stream. Should be called only once.
"""
daq = self.daq
self.daq = None
self.running <<= False
daq.stop()
self.streammetadata = None
class AvStreamProcess(mp.Process):
"""
Different process on which all audio streams are running.
"""
def __init__(self, pipe, msg_qlist, indata_qlist, outq):
"""
Args:
pipe: Message control pipe on which commands are received.
msg_qlist: List of queues on which stream status and events are
sent. Here, everything is send, except for the captured data
itself.
indata_qlist: List of queues on which captured data from a DAQ is
send. This one gets all events, but also captured data.
outq: On this queue, the stream process receives data to be send as
output to the devices.
"""
super().__init__()
self.pipe = pipe
self.msg_qlist = msg_qlist
self.indata_qlist = indata_qlist
self.outq = outq
self.devices = {}
self.daqconfigs = None
# In, out, duplex
self.streams = {t: None for t in list(AvType)}
# When this is set, a kill on the main process will also kill the
# siggen process. Highly wanted feature
self.daemon = True
def run(self):
"""
The actual function running in a different process.
"""
# First things first, ignore interrupt signals for THIS process
# https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Check for devices
self.rescanDaqDevices()
while True:
try:
msg, data = self.pipe.recv()
except OSError:
logging.error("Error with pipe, terminating process")
self.stopAllStreams()
self.terminate()
logging.debug(f"Streamprocess obtained message {msg}")
if msg == StreamMsg.scanDaqDevices:
self.rescanDaqDevices()
elif msg == StreamMsg.stopAllStreams:
self.stopAllStreams()
elif msg == StreamMsg.endProcess:
self.stopAllStreams()
# and.. exit!
return
elif msg == StreamMsg.getStreamMetaData:
(avtype,) = data
stream = self.streams[avtype]
if stream is not None:
self.sendAllQueues(
StreamMsg.streamMetaData, avtype, stream.streammetadata
)
else:
self.sendAllQueues(StreamMsg.streamMetaData, avtype, None)
elif msg == StreamMsg.startStream:
avtype, daqconfig = data
self.startStream(avtype, daqconfig)
elif msg == StreamMsg.stopStream:
(avtype,) = data
self.stopStream(avtype)
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
"""
Start a stream, based on type and configuration
"""
self.stopRequiredExistingStreams(avtype)
# Empty the queue from existing stuff (puts the signal generator
# directly in action!).
if avtype in (AvType.audio_duplex, AvType.audio_output):
while not self.outq.empty():
self.outq.get()
try:
stream = AudioStream(avtype, self.devices, daqconfig, self.streamCallback)
self.streams[avtype] = stream
self.sendAllQueues(StreamMsg.streamStarted, avtype, stream.streammetadata)
except Exception as e:
self.sendAllQueues(
StreamMsg.streamError, avtype, f"Error starting stream: {str(e)}"
)
return
def stopStream(self, avtype: AvType):
"""
Stop an existing stream, and sets the attribute in the list of streams
to None
Args:
stream: AudioStream instance
"""
stream = self.streams[avtype]
if stream is not None:
try:
stream.stop()
self.sendAllQueues(StreamMsg.streamStopped, stream.avtype)
except Exception as e:
self.sendAllQueues(
StreamMsg.streamError,
stream.avtype,
f"Error occured in stopping stream: {str(e)}",
)
self.streams[avtype] = None
def stopRequiredExistingStreams(self, avtype: AvType):
"""
Stop all existing streams that conflict with the current avtype
"""
if avtype == AvType.audio_input:
# For a new input, duplex and input needs to be stopped
stream_to_stop = (AvType.audio_input, AvType.audio_duplex)
elif avtype == AvType.audio_output:
# For a new output, duplex and output needs to be stopped
stream_to_stop = (AvType.audio_output, AvType.audio_duplex)
elif avtype == AvType.audio_duplex:
# All others have to stop
stream_to_stop = list(AvType) # All of them
else:
raise ValueError("BUG")
for stream in stream_to_stop:
if stream is not None:
self.stopStream(stream)
def stopAllStreams(self):
"""
Stops all streams
"""
for key in self.streams.keys():
self.stopStream(key)
def isStreamRunning(self, avtype: AvType = None):
"""
Check whether a stream is running
Args:
avtype: The stream type to check whether it is still running. If
None, it checks all streams.
Returns:
True if a stream is running, otherwise false
"""
if avtype is None:
avtype = list(AvType)
else:
avtype = (avtype,)
for t in avtype:
if self.streams[t] is not None and self.streams[t].running():
return True
return False
def rescanDaqDevices(self):
"""
Rescan the available DaQ devices.
"""
if self.isStreamRunning():
self.sendAllQueues(
StreamMsg.streamError,
None,
"A stream is running, cannot rescan DAQ devices.",
)
return
self.devices = Daq.getDeviceInfo()
self.sendAllQueues(StreamMsg.deviceList, self.devices)
def streamCallback(self, audiostream, indata, outdata):
"""This is called (from a separate thread) for each audio block."""
# logging.debug('streamCallback()')
if outdata is not None:
if not self.outq.empty():
newdata = self.outq.get()
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
msgtxt = "Invalid output data obtained from queue"
logging.fatal(msgtxt)
self.sendAllQueues(
StreamMsg.streamFatalError, audiostream.avtype, msgtxt
)
return 1
outdata[:, :] = newdata[:, None]
else:
msgtxt = "Signal generator buffer underflow. Signal generator cannot keep up with data generation."
# logging.error(msgtxt)
self.sendAllQueues(
StreamMsg.streamTemporaryError, audiostream.avtype, msgtxt
)
outdata[:, :] = 0
if indata is not None:
self.sendInQueues(StreamMsg.streamData, indata)
return 0
# Wrapper functions that safe some typing, they do not require an
# explanation.
def sendInQueues(self, msg, *data):
# logging.debug('sendInQueues()')
try:
for q in self.indata_qlist:
# Fan out the input data to all queues in the queue list
q.put((msg, data))
except ValueError:
logging.error("Error with data queue, terminating process")
self.stopAllStreams()
self.terminate()
def sendAllQueues(self, msg, *data):
"""
Destined for all queues, including capture data queues
"""
self.sendInQueues(msg, *data)
try:
for q in self.msg_qlist:
# Fan out the input data to all queues in the queue list
q.put((msg, data))
except ValueError:
logging.error("Error with data queue, terminating process")
self.stopAllStreams()
self.terminate()
@dataclass
class StreamStatus:
lastStatus: StreamMsg = StreamMsg.streamStopped
errorTxt: str = None
streammetadata: StreamMetaData = None
class StreamManager:
"""
Audio and video data stream manager, to which queus can be added
"""
def __init__(self):
"""Open a stream for audio in/output and video input. For audio output,"""
# Initialize streamstatus
self.streamstatus = {t: StreamStatus() for t in list(AvType)}
self.devices = None
# Multiprocessing manager, pipe, output queue, input queue,
self.manager = mp.managers.SyncManager()
# Start this manager and ignore interrupts
# https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
self.manager.start(ignoreSigInt)
# List of queues for all entities that require 'microphone' or input
# data. We need a local list, to manage listener queues, as the queues
# which are in the manager list get a new object id. The local list is
# used to find the index in the manager queues list upon deletion by
# 'removeListener()'
self.indata_qlist = self.manager.list([])
self.indata_qlist_local = []
self.msg_qlist = self.manager.list([])
self.msg_qlist_local = []
# Queue used for signal generator data
self.outq = self.manager.Queue()
# Messaging pipe
self.pipe, child_pipe = mp.Pipe(duplex=True)
# This is the queue on which this class listens for stream process
# messages.
self.our_msgqueue = self.addMsgQueueListener()
# Create the stream process
self.streamProcess = AvStreamProcess(
child_pipe, self.msg_qlist, self.indata_qlist, self.outq
)
self.streamProcess.start()
def scanDaqDevices(self):
"""
Output the message to the stream process to rescan the list of devices
"""
self.sendPipe(StreamMsg.scanDaqDevices, None)
def getStreamStatus(self, avtype: AvType):
"""
Sends a request for the stream status over the pipe, for given AvType
"""
self.sendPipe(StreamMsg.getStreamMetaData, avtype)
def getOutputQueue(self):
"""
Returns the output queue object.
Note, should (of course) only be used by one signal generator at the time!
"""
return self.outq
def addMsgQueueListener(self):
"""
Add a listener queue to the list of message queues, and return the
queue.
Returns:
listener queue
"""
newqueue = self.manager.Queue()
self.msg_qlist.append(newqueue)
self.msg_qlist_local.append(newqueue)
return newqueue
def removeMsgQueueListener(self, queue):
"""
Remove an input listener queue from the message queue list.
"""
# Uses a local queue list to find the index, based on the queue
idx = self.msg_qlist_local.index(queue)
del self.msg_qlist_local[idx]
del self.msg_qlist[idx]
def addInQueueListener(self):
"""
Add a listener queue to the list of queues, and return the queue.
Returns:
listener queue
"""
newqueue = self.manager.Queue()
self.indata_qlist.append(newqueue)
self.indata_qlist_local.append(newqueue)
return newqueue
def removeInQueueListener(self, queue):
"""
Remove an input listener queue from the queue list.
"""
# Uses a local queue list to find the index, based on the queue
idx = self.indata_qlist_local.index(queue)
del self.indata_qlist[idx]
del self.indata_qlist_local[idx]
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
"""
Start the stream, which means the callbacks are called with stream
data (audio/video)
Args:
wait: Wait until the stream starts talking before returning from
this function.
"""
logging.debug("Starting stream...")
self.sendPipe(StreamMsg.startStream, avtype, daqconfig)
def stopStream(self, avtype: AvType):
logging.debug(f"StreamManager::stopStream({avtype})")
self.sendPipe(StreamMsg.stopStream, avtype)
def stopAllStreams(self):
self.sendPipe(StreamMsg.stopAllStreams)
def cleanup(self):
"""
Stops the stream if it is still running, and after that, it stops the
stream process.
This method SHOULD always be called before removing a AvStream object.
Otherwise things will wait forever...
"""
self.sendPipe(StreamMsg.endProcess, None)
logging.debug("Joining stream process...")
self.streamProcess.join()
logging.debug("Joining stream process done")
def hasVideo(self):
"""
Stub, TODO: for future
"""
return False
def sendPipe(self, msg, *data):
"""
Send a message with data over the control pipe
"""
self.pipe.send((msg, data))

View File

@ -6,6 +6,7 @@ void init_deviceinfo(py::module& m);
void init_daqconfiguration(py::module& m); void init_daqconfiguration(py::module& m);
void init_daq(py::module& m); void init_daq(py::module& m);
void init_streammgr(py::module& m); void init_streammgr(py::module& m);
void init_datahandler(py::module& m);
PYBIND11_MODULE(lasp_cpp, m) { PYBIND11_MODULE(lasp_cpp, m) {
@ -15,5 +16,6 @@ PYBIND11_MODULE(lasp_cpp, m) {
init_daqconfiguration(m); init_daqconfiguration(m);
init_daq(m); init_daq(m);
init_streammgr(m); init_streammgr(m);
init_datahandler(m);
} }

View File

@ -46,8 +46,7 @@ from .lasp_config import LASP_NUMPY_FLOAT_TYPE
from scipy.io import wavfile from scipy.io import wavfile
import os, time, wave, logging import os, time, wave, logging
from .lasp_common import SIQtys, Qty, getFreq from .lasp_common import SIQtys, Qty, getFreq
from .device import DaqChannel from .lasp_cpp import Window, DaqChannel
from .wrappers import AvPowerSpectra, Window, PowerSpectra
def getSampWidth(dtype): def getSampWidth(dtype):

View File

@ -1,57 +0,0 @@
# -*- coding: utf-8 -*-
"""
Author: J.A. de Jong
Description: MonkeyPatch required to let the Multiprocessing library work properly.
Should be applied prior to running any other multiprocessing code. Comes from
Stackoverflow and is mainly used for managing a list of queues that can be
shared between processes.
For more information, see:
https://stackoverflow.com/questions/46779860/multiprocessing-managers-and-custom-classes
"""
from multiprocessing import managers
import logging
from functools import wraps
from inspect import signature
orig_AutoProxy = managers.AutoProxy
__all__ = ['apply_patch']
@wraps(managers.AutoProxy)
def AutoProxy(*args, incref=True, manager_owned=False, **kwargs):
# Create the autoproxy without the manager_owned flag, then
# update the flag on the generated instance. If the manager_owned flag
# is set, `incref` is disabled, so set it to False here for the same
# result.
autoproxy_incref = False if manager_owned else incref
proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs)
proxy._owned_by_manager = manager_owned
return proxy
def apply_patch():
if "manager_owned" in signature(managers.AutoProxy).parameters:
return
logging.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned")
managers.AutoProxy = AutoProxy
# re-register any types already registered to SyncManager without a custom
# proxy type, as otherwise these would all be using the old unpatched AutoProxy
SyncManager = managers.SyncManager
registry = managers.SyncManager._registry
for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items():
if proxytype is not orig_AutoProxy:
continue
create_method = hasattr(managers.SyncManager, typeid)
SyncManager.register(
typeid,
callable=callable,
exposed=exposed,
method_to_typeid=method_to_typeid,
create_method=create_method,
)

View File

@ -3,25 +3,29 @@
""" """
Read data from stream and record sound and video at the same time Read data from stream and record sound and video at the same time
""" """
import dataclasses, logging, os, time, h5py import dataclasses
from .lasp_avstream import StreamManager, StreamMetaData, StreamMsg import logging
from .lasp_common import AvType import os
import time
import h5py
from .lasp_cpp import InDataHandler, StreamMgr
@dataclasses.dataclass @dataclasses.dataclass
class RecordStatus: class RecordStatus:
curT: float curT: float = 0
done: bool done: bool = False
class Recording: class Recording:
""" """
Class used to perform a recording. Class used to perform a recording.
""" """
def __init__(self, fn: str, streammgr: StreamManager,
def __init__(self, fn: str, streammgr: StreamMgr,
rectime: float = None, wait: bool = True, rectime: float = None, wait: bool = True,
progressCallback=None, progressCallback=None,
startDelay: float=0): startDelay: float = 0):
""" """
Start a recording. Blocks if wait is set to True. Start a recording. Blocks if wait is set to True.
@ -44,68 +48,104 @@ class Recording:
self.smgr = streammgr self.smgr = streammgr
self.metadata = None self.metadata = None
assert startDelay >= 0 if startDelay < 0:
raise RuntimeError("Invalid start delay value. Should be >= 0")
self.startDelay = startDelay self.startDelay = startDelay
# Flag used to indicate that we have passed the start delay # Flag used to indicate that we have passed the start delay
self.startDelay_passed = False self.startDelay_passed = False
# The amount of seconds (float) that is to be recorded
self.rectime = rectime self.rectime = rectime
# The file name to store data to
self.fn = fn self.fn = fn
self.video_frame_positions = []
self.curT_rounded_to_seconds = 0 self.curT_rounded_to_seconds = 0
# Counter of the number of blocks # Counter of the number of blocks
self.ablockno = 0 self.ablockno = 0
self.vframeno = 0
self.progressCallback = progressCallback self.progressCallback = progressCallback
self.wait = wait
# Open the file
self.f = h5py.File(self.fn, 'w') self.f = h5py.File(self.fn, 'w')
f = self.f
# This flag is used to delete the file on finish(), and can be used # This flag is used to delete the file on finish(), and can be used
# when a recording is canceled. # when a recording is canceled.
self.deleteFile = False self.deleteFile = False
try:
# Input queue
self.inq = streammgr.addInQueueListener()
except RuntimeError:
# Cleanup stuff, something is going wrong when starting the stream
try:
self.f.close()
except Exception as e:
logging.error(
'Error preliminary closing measurement file {fn}: {str(e)}')
self.__deleteFile()
raise
# Try to obtain stream metadata # Try to obtain stream metadata
streammgr.getStreamStatus(AvType.audio_input) streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input)
streammgr.getStreamStatus(AvType.audio_duplex) if not streamstatus.runningOK():
raise RuntimeError(
"Stream is not running properly. Please first start the stream")
self.ad = None self.ad = None
logging.debug('Starting record....') logging.debug('Starting record....')
# TODO: Fix this later when we want video
# if stream.hasVideo():
# stream.addCallback(self.aCallback, AvType.audio_input)
self.stop = False self.stop = False
if self.wait: self.indh = InDataHandler(streammgr, self.inCallback)
if wait:
logging.debug('Stop recording with CTRL-C') logging.debug('Stop recording with CTRL-C')
try: try:
while not self.stop: while not self.stop:
self.handleQueue()
time.sleep(0.01) time.sleep(0.01)
except KeyboardInterrupt: except KeyboardInterrupt:
logging.debug("Keyboard interrupt on record") logging.debug("Keyboard interrupt on record")
finally: finally:
self.finish() self.finish()
def handleQueue(self): def firstFrames(self, data):
"""
When the first frames arrive, we setup the file and add all metadata,
and make ready for storing data.
"""
daq = self.smgr.getDaq(StreamMgr.StreamType.input)
dtype = daq.dtype()
in_ch = daq.inchannel_config
blocksize = daq.framesPerBlock()
self.blocksize = blocksize
f = self.f
# Set the bunch of attributes
f.attrs['samplerate'] = daq.samplerate()
f.attrs['nchannels'] = daq.neninchannels()
f.attrs['blocksize'] = blocksize
f.attrs['sensitivity'] = [ch.sensitivity for ch in in_ch]
f.attrs['channelNames'] = [ch.name for ch in in_ch]
# Add the start delay here, as firstFrames() is called right after the
# constructor is called.
f.attrs['time'] = time.time() + self.startDelay
nchannels = len(in_ch)
self.ad = f.create_dataset('audio',
(1, blocksize, nchannels),
dtype=data.dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
blocksize,
nchannels),
compression='gzip'
)
self.fs = daq.samplerate()
# Measured physical quantity metadata
# f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch]
# In V2, we do not store JSON metadata anymore, but just an enumeration
# index to a physical quantity.
f.attrs['qtys_idx'] = [ch.qty for ch in in_ch]
def inCallback(self, adata):
""" """
This method should be called to grab data from the input queue, which This method should be called to grab data from the input queue, which
is filled by the stream, and put it into a file. It should be called at is filled by the stream, and put it into a file. It should be called at
@ -116,92 +156,11 @@ class Recording:
""" """
# logging.debug('handleQueue()') if self.ad is None:
while self.inq.qsize() > 0: self.firstFrames(adata)
msg, data = self.inq.get()
# logging.debug(f'Obtained message: {msg}')
if msg == StreamMsg.streamData:
samples, = data
self.__addTimeData(samples)
elif msg == StreamMsg.streamStarted:
logging.debug(f'handleQueue obtained message {msg}')
avtype, metadata = data
if metadata is None:
raise RuntimeError('BUG: no stream metadata')
if avtype in (AvType.audio_duplex, AvType.audio_input):
self.processStreamMetaData(metadata)
elif msg == StreamMsg.streamMetaData:
logging.debug(f'handleQueue obtained message {msg}')
avtype, metadata = data
if metadata is not None:
self.processStreamMetaData(metadata)
elif msg == StreamMsg.streamTemporaryError:
pass
else:
logging.debug(f'handleQueue obtained message {msg}')
# An error occured, we do not remove the file, but we stop.
self.stop = True
logging.debug(f'Stream message: {msg}. Recording stopped unexpectedly')
raise RuntimeError('Recording stopped unexpectedly')
self.__addTimeData(adata)
def processStreamMetaData(self, md: StreamMetaData):
"""
Stream metadata has been catched. This is used to set all metadata in
the measurement file
"""
logging.debug('Recording::processStreamMetaData()')
if self.metadata is not None:
# Metadata already obtained. We check whether the new metadata is
# compatible. Otherwise an error occurs
if md != self.metadata:
raise RuntimeError('BUG: Incompatible stream metadata!')
return
# The 'Audio' dataset as specified in lasp_measurement, where data is
# sent to. We use gzip as compression, this gives moderate a moderate
# compression to the data.
f = self.f
blocksize = md.blocksize
nchannels = len(md.in_ch)
self.ad = f.create_dataset('audio',
(1, blocksize, nchannels),
dtype=md.dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
blocksize,
nchannels),
compression='gzip'
)
# TODO: This piece of code is not up-to-date and should be changed at a
# later instance once we really want to record video simultaneously
# with audio.
# if smgr.hasVideo():
# video_x, video_y = smgr.video_x, smgr.video_y
# self.vd = f.create_dataset('video',
# (1, video_y, video_x, 3),
# dtype='uint8',
# maxshape=(
# None, video_y, video_x, 3),
# compression='gzip'
# )
# Set the bunch of attributes
f.attrs['samplerate'] = md.fs
f.attrs['nchannels'] = nchannels
f.attrs['blocksize'] = blocksize
f.attrs['sensitivity'] = [ch.sensitivity for ch in md.in_ch]
f.attrs['channelNames'] = [ch.channel_name for ch in md.in_ch]
f.attrs['time'] = time.time()
self.blocksize = blocksize
self.fs = md.fs
# Measured physical quantity metadata
f.attrs['qtys'] = [ch.qty.to_json() for ch in md.in_ch]
self.metadata = md
def setDelete(self, val: bool): def setDelete(self, val: bool):
""" """
@ -219,6 +178,7 @@ class Recording:
""" """
logging.debug('Recording::finish()') logging.debug('Recording::finish()')
smgr = self.smgr smgr = self.smgr
self.indh = None
# TODO: Fix when video # TODO: Fix when video
# if smgr.hasVideo(): # if smgr.hasVideo():
@ -259,16 +219,6 @@ class Recording:
# Stop flag is raised. We stop recording here. # Stop flag is raised. We stop recording here.
return return
# The current time that is recorded and stored into the file, without
# the new data
if not self.metadata:
# We obtained stream data, but metadata is not yet available.
# Therefore, we request it explicitly and then we return
logging.info('Requesting stream metadata')
self.smgr.getStreamStatus(AvType.audio_input)
self.smgr.getStreamStatus(AvType.audio_duplex)
return
curT = self.ablockno*self.blocksize/self.fs curT = self.ablockno*self.blocksize/self.fs
# Increase the block counter # Increase the block counter
@ -280,7 +230,8 @@ class Recording:
elif curT >= 0 and not self.startDelay_passed: elif curT >= 0 and not self.startDelay_passed:
# Start delay passed, switch the flag! # Start delay passed, switch the flag!
self.startDelay_passed = True self.startDelay_passed = True
# Reset the audio block counter and the time
# Reset the audio block counter and the recording time
self.ablockno = 1 self.ablockno = 1
curT = 0 curT = 0
@ -310,10 +261,3 @@ class Recording:
self.ad.resize(self.ablockno, axis=0) self.ad.resize(self.ablockno, axis=0)
self.ad[self.ablockno-1, :, :] = indata self.ad[self.ablockno-1, :, :] = indata
# def _vCallback(self, frame, framectr):
# self.video_frame_positions.append(self.ablockno())
# vframeno = self.vframeno
# self.vd.resize(vframeno+1, axis=0)
# self.vd[vframeno, :, :] = frame
# self.vframeno += 1

View File

@ -1,369 +0,0 @@
#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
"""
Author: J.A. de Jong - ASCEE
Description: Signal generator code
"""
import multiprocessing as mp
import dataclasses
import logging
from typing import Tuple
import numpy as np
from .filter import PinkNoise
from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank
from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner
from .lasp_avstream import StreamManager, ignoreSigInt
from .wrappers import Siggen as pyxSiggen, Equalizer
from enum import Enum, unique, auto
QUEUE_BUFFER_TIME = 0.5 # The amount of time used in the queues for buffering
# of data, larger is more stable, but also enlarges latency
__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"]
@unique
class SignalType(Enum):
Periodic = 0
Noise = 1
Sweep = 2
Meas = 3
@unique
class NoiseType(Enum):
white = "White noise"
pink = "Pink noise"
def __str__(self):
return str(self.value)
@staticmethod
def fillComboBox(combo):
for type_ in list(NoiseType):
combo.addItem(str(type_))
@staticmethod
def getCurrent(cb):
return list(NoiseType)[cb.currentIndex()]
class SiggenWorkerDone(Exception):
def __str__(self):
return "Done generating signal"
@unique
class SiggenMessage(Enum):
"""
Different messages that can be send to the signal generator over the pipe
connection.
"""
endProcess = auto() # Stop and quit the signal generator
adjustVolume = auto() # Adjust the volume
newEqSettings = auto() # Forward new equalizer settings
newSiggenData = auto() # Forward new equalizer settings
ready = auto() # Send out once, once the signal generator is ready with
# pre-generating data.
mute = auto() # Mute / unmute siggen
# These messages are send back to the main thread over the pipe
error = auto()
done = auto()
@dataclasses.dataclass
class SiggenData:
"""
Metadata used to create a Signal Generator
"""
fs: float # Sample rate [Hz]
# Number of frames "samples" to send in one block
nframes_per_block: int
# The data type to output
dtype: np.dtype
# Muted?
muted: bool
# Level of output signal [dBFS]el
level_dB: float
# Signal type specific data, i.e.
signaltype: SignalType
signaltypedata: Tuple = None
# Settings for the equalizer etc
eqdata: object = None # Equalizer data
class SiggenProcess(mp.Process):
"""
Main function running in a different process, is responsible for generating
new signal data. Uses the signal queue to push new generated signal data
on.
"""
def __init__(self, siggendata, dataq, pipe):
"""
Args:
siggendata: The signal generator data to start with.
dataq: The queue to put generated signal on
pipe: Control and status messaging pipe
"""
super().__init__()
# When this is set, a kill on the main process will also kill the
# siggen process. Highly wanted feature
self.daemon = True
self.dataq = dataq
self.siggendata = siggendata
self.pipe = pipe
self.eq = None
self.siggen = None
fs = self.siggendata.fs
nframes_per_block = siggendata.nframes_per_block
self.nblocks_buffer = max(
1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block)
)
def newSiggen(self, siggendata: SiggenData):
"""
Create a signal generator based on parameters specified in global
function data.
Args:
siggendata: SiggenData. Metadata to create a new signal generator.
"""
logging.debug('newSiggen')
fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block
level_dB = siggendata.level_dB
signaltype = siggendata.signaltype
signaltypedata = siggendata.signaltypedata
# Muted state
self.muted = siggendata.muted
if signaltype == SignalType.Periodic:
freq, = signaltypedata
siggen = pyxSiggen.sineWave(fs, freq, level_dB)
elif signaltype == SignalType.Noise:
noisetype, zerodBpoint = signaltypedata
if noisetype == NoiseType.white:
sos_colorfilter = None
elif noisetype == NoiseType.pink:
sos_colorfilter = PinkNoise(fs, zerodBpoint).flatten()
else:
raise ValueError(f"Unknown noise type")
siggen = pyxSiggen.noise(fs, level_dB, sos_colorfilter)
elif signaltype == SignalType.Sweep:
fl, fu, Ts, Tq, sweep_flags = signaltypedata
siggen = pyxSiggen.sweep(fs, fl, fu, Ts, Tq, sweep_flags, level_dB)
else:
raise ValueError(f"Not implemented signal type: {signaltype}")
logging.debug('newSiggen')
return siggen
def generate(self):
"""
Generate a single block of data and put it on the data queue
"""
signal = self.siggen.genSignal(self.siggendata.nframes_per_block)
dtype = self.siggendata.dtype
if self.eq is not None:
signal = self.eq.equalize(signal)
if np.issubdtype(dtype, np.integer):
bitdepth_fixed = dtype.itemsize * 8
signal *= 2 ** (bitdepth_fixed - 1) - 1
if self.muted:
# Mute it
signal *= 0
try:
self.dataq.put(signal.astype(dtype))
except ValueError:
# As of Python 3.8, a value error on a Queue means that the oter
# end of the process died.
logging.error("Error with data queue, terminating process")
self.terminate()
def newEqualizer(self, eqdata):
"""
Create an equalizer object from equalizer data
Args:
eqdata: dictionary containing equalizer data. TODO: document the
requiring fields.
"""
if eqdata is None:
return None
eq_type = eqdata['type']
eq_levels = eqdata['levels']
fs = self.siggendata.fs
if eq_type == 'three':
fb = SosThirdOctaveFilterBank(fs)
elif eq_type == 'one':
fb = SosOctaveFilterBank(fs)
eq = Equalizer(fb._fb)
if eq_levels is not None:
eq.setLevels(eq_levels)
return eq
def sendPipe(self, msgtype, msg):
try:
self.pipe.send((msgtype, msg))
except OSError:
logging.error("Error with pipe, terminating process")
self.terminate()
def run(self):
# The main function of the actual process
# First things first
ignoreSigInt()
try:
self.siggen = self.newSiggen(self.siggendata)
except Exception as e:
self.sendPipe(SiggenMessage.error, str(e))
try:
self.eq = self.newEqualizer(self.siggendata.eqdata)
except Exception as e:
self.sendPipe(SiggenMessage.error, str(e))
# Pre-generate blocks of signal data
while self.dataq.qsize() < self.nblocks_buffer:
self.generate()
self.sendPipe(SiggenMessage.ready, None)
while True:
# Wait here for a while, to check for messages to consume
if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4):
try:
msg, data = self.pipe.recv()
except OSError:
logging.error("Error with pipe, terminating process")
self.terminate()
if msg == SiggenMessage.endProcess:
logging.debug("Signal generator caught 'endProcess' message. Exiting.")
return 0
elif msg == SiggenMessage.mute:
self.muted = data
elif msg == SiggenMessage.adjustVolume:
level_dB = data
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
self.siggen.setLevel(level_dB)
elif msg == SiggenMessage.newEqSettings:
eqdata = data
self.eq = self.newEqualizer(eqdata)
elif msg == SiggenMessage.newSiggenData:
siggendata = data
self.siggen = self.newSiggen(siggendata)
else:
self.pipe.send(
SiggenMessage.error, "BUG: Generator caught unknown message. Quiting"
)
while self.dataq.qsize() < self.nblocks_buffer:
# Generate new data and put it in the queue!
try:
self.generate()
except SiggenWorkerDone:
self.sendPipe(SiggenMessage.done, None)
return 0
return 1
class Siggen:
"""
Signal generator class, generates signal data in a different process to
unload the work in the calling thread.
"""
def __init__(self, dataq, siggendata: SiggenData):
""""""
self.pipe, client_end = mp.Pipe(duplex=True)
self.stopped = False
self.process = SiggenProcess(siggendata, dataq, client_end)
self.process.start()
if not self.process.is_alive():
raise RuntimeError('Unexpected signal generator exception')
# Block waiting here for signal generator to be ready
msg, data = self.pipe.recv()
if msg == SiggenMessage.ready:
logging.debug('Signal generator ready')
elif msg == SiggenMessage.error:
e = data
raise RuntimeError(f'Signal generator exception: {str(e)}')
else:
# Done, or something
if msg == SiggenMessage.done:
self.stopped = True
self.handle_msgs()
def setLevel(self, new_level):
"""
Set a new signal level to the generator
Args:
new_level: The new level in [dBFS]
"""
self.pipe.send((SiggenMessage.adjustVolume, new_level))
def mute(self, mute):
self.pipe.send((SiggenMessage.mute, mute))
def setEqData(self, eqdata):
self.pipe.send((SiggenMessage.newEqSettings, eqdata))
def setSiggenData(self, siggendata: SiggenData):
"""
Updates the whole signal generator, based on new signal generator data.
"""
self.pipe.send((SiggenMessage.newSiggenData, siggendata))
def handle_msgs(self):
while self.pipe.poll():
msg, data = self.pipe.recv()
if msg == SiggenMessage.error:
raise RuntimeError(
f"Error in initialization of signal generator: {data}"
)
# elif msg == SiggenMessage.done:
# self.stop()
def cleanup(self):
logging.debug('Siggen::stop()')
self.pipe.send((SiggenMessage.endProcess, None))
self.pipe.close()
logging.debug('Joining siggen process')
self.process.join()
logging.debug('Joining siggen process done')
self.process.close()
self.process = None

View File

@ -10,11 +10,14 @@ void init_daq(py::module &m) {
/// Daq /// Daq
py::class_<Daq, DaqConfiguration, DeviceInfo> daq(m, "Daq"); py::class_<Daq, DaqConfiguration, DeviceInfo> daq(m, "Daq");
/// Daq::StreamStatus
py::class_<Daq::StreamStatus> ss(m, "StreamStatus"); py::class_<Daq::StreamStatus> ss(m, "StreamStatus");
ss.def("error", &Daq::StreamStatus::error); ss.def("error", &Daq::StreamStatus::error);
ss.def("runningOK", &Daq::StreamStatus::runningOK); ss.def("runningOK", &Daq::StreamStatus::runningOK);
ss.def_readonly("isRunning", &Daq::StreamStatus::isRunning); ss.def_readonly("isRunning", &Daq::StreamStatus::isRunning);
/// Daq::StreamStatus::StreamError
py::enum_<Daq::StreamStatus::StreamError>(ss, "StreamError") py::enum_<Daq::StreamStatus::StreamError>(ss, "StreamError")
.value("noError", Daq::StreamStatus::StreamError::noError) .value("noError", Daq::StreamStatus::StreamError::noError)
.value("inputXRun", Daq::StreamStatus::StreamError::inputXRun) .value("inputXRun", Daq::StreamStatus::StreamError::inputXRun)
@ -27,6 +30,7 @@ void init_daq(py::module &m) {
Daq::StreamStatus::StreamError::apiSpecificError) Daq::StreamStatus::StreamError::apiSpecificError)
.export_values(); .export_values();
/// Daq
daq.def("neninchannels", &Daq::neninchannels); daq.def("neninchannels", &Daq::neninchannels);
daq.def("nenoutchannels", &Daq::nenoutchannels); daq.def("nenoutchannels", &Daq::nenoutchannels);
daq.def("samplerate", &Daq::samplerate); daq.def("samplerate", &Daq::samplerate);

View File

@ -59,6 +59,7 @@ void init_daqconfiguration(py::module &m) {
daqconfig.def(py::init<>()); daqconfig.def(py::init<>());
daqconfig.def(py::init<const DeviceInfo &>()); daqconfig.def(py::init<const DeviceInfo &>());
daqconfig.def_readwrite("sampleRateIndex", daqconfig.def_readwrite("sampleRateIndex",
&DaqConfiguration::sampleRateIndex); &DaqConfiguration::sampleRateIndex);
daqconfig.def_readwrite("dataTypeIndex", &DaqConfiguration::dataTypeIndex); daqconfig.def_readwrite("dataTypeIndex", &DaqConfiguration::dataTypeIndex);

View File

@ -0,0 +1,183 @@
#include "debugtrace.hpp"
#include "lasp_streammgr.h"
#include <atomic>
#include <boost/lockfree/spsc_queue.hpp>
#include <chrono>
#include <pybind11/buffer_info.h>
#include <pybind11/cast.h>
#include <pybind11/gil.h>
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pybind11/stl.h>
#include <thread>
using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
const us RINGBUFFER_SIZE = 1024;
namespace py = pybind11;
/**
* @brief Generate a Numpy array from daqdata, does *NOT* create a copy of the
* data!.
*
* @tparam T The type of the stored sample
* @param d The daqdata to convert
*
* @return Numpy array
*/
template <typename T> py::array_t<T> getPyArray(DaqData &d) {
// https://github.com/pybind/pybind11/issues/323
//
// When a valid object is passed as 'base', it tells pybind not to take
// ownership of the data, because 'base' will own it. In fact 'packet' will
// own it, but - psss! - , we don't tell it to pybind... Alos note that ANY
// valid object is good for this purpose, so I chose "str"...
py::str dummyDataOwner;
/*
* Signature:
array_t(ShapeContainer shape,
StridesContainer strides,
const T *ptr = nullptr,
handle base = handle());
*/
return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}),
py::array::StridesContainer(
{sizeof(T),
sizeof(T) * d.nframes}), /* Strides (in bytes) for each index */
(T *)d.raw_ptr(), /* Pointer to buffer */
dummyDataOwner);
}
/**
* @brief Wraps the InDataHandler such that it calls a Python callback with a
* buffer of sample data.
*/
class PyIndataHandler : public InDataHandler {
/**
* @brief The callback function that is called.
*/
py::function cb;
/**
* @brief The thread that is handling callbacks from the queue
*/
std::thread pythread;
boost::lockfree::spsc_queue<DaqData> dataqueue{RINGBUFFER_SIZE};
std::atomic<bool> stopThread{false};
public:
~PyIndataHandler() {
DEBUGTRACE_ENTER;
stop();
stopThread = true;
pythread.join();
}
PyIndataHandler(StreamMgr &mgr, py::function cb)
: InDataHandler(mgr), cb(cb),
pythread(&PyIndataHandler::threadfcn, this) {
DEBUGTRACE_ENTER;
/// TODO: Note that if start() throws an exception, which means that the
/// destructor of PyIndataHandler is not called and the thread destructor
/// calls terminate(). It is a kind of rude way to crash, but it is also
/// *very* unlikely to happen, as start() does only add a reference to this
/// handler to a list in the stream mgr.
start();
}
/**
* @brief Reads from the
*/
void threadfcn() {
/* DEBUGTRACE_ENTER; */
using DataType = DataTypeDescriptor::DataType;
DaqData d{};
while (!stopThread) {
if (dataqueue.pop(d)) {
/* DEBUGTRACE_PRINT("pop() returned true"); */
py::gil_scoped_acquire acquire;
try {
py::array binfo;
switch (d.dtype) {
case (DataType::dtype_int8):
binfo = getPyArray<uint8_t>(d);
break;
case (DataType::dtype_int16):
binfo = getPyArray<uint16_t>(d);
break;
case (DataType::dtype_int32):
binfo = getPyArray<uint32_t>(d);
break;
case (DataType::dtype_fl32):
binfo = getPyArray<float>(d);
break;
case (DataType::dtype_fl64):
binfo = getPyArray<double>(d);
break;
default:
throw std::runtime_error("BUG");
} // End of switch
py::object bool_val = cb(binfo);
bool res = bool_val.cast<bool>();
if (!res)
stopThread = true;
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
stopThread = true;
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value."
<< endl;
stopThread = true;
}
}
// When there is nothing to pop from the queue, just sleep for a some
// time.
else {
std::this_thread::sleep_for(2ms);
}
} // end of while loop
} // end of threadfcn
/**
* @brief Pushes a copy of the daqdata to the thread queuue and returns
*
* @param daqdata the daq info to push
*
* @return true, to continue with sampling.
*/
virtual bool inCallback(const DaqData &daqdata) override {
/* DEBUGTRACE_ENTER; */
if (!stopThread) {
if (!dataqueue.push(daqdata)) {
stopThread = true;
cerr << "Pushing DaqData object failed. Probably the ringbuffer is "
"full. Try reducing the load"
<< endl;
}
}
return true;
}
};
void init_datahandler(py::module &m) {
py::class_<PyIndataHandler> h(m, "InDataHandler");
h.def(py::init<StreamMgr &, py::function>());
}

View File

@ -1,5 +1,6 @@
#include "lasp_streammgr.h" #include "lasp_streammgr.h"
#include <pybind11/numpy.h> #include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> #include <pybind11/stl.h>
#include <stdint.h> #include <stdint.h>
@ -11,6 +12,11 @@ void init_streammgr(py::module &m) {
py::class_<StreamMgr, std::unique_ptr<StreamMgr, py::nodelete>> smgr( py::class_<StreamMgr, std::unique_ptr<StreamMgr, py::nodelete>> smgr(
m, "StreamMgr"); m, "StreamMgr");
py::enum_<StreamMgr::StreamType>(smgr, "StreamType")
.value("input", StreamMgr::StreamType::input)
.value("output", StreamMgr::StreamType::output)
.export_values();
smgr.def("startStream", &StreamMgr::startStream); smgr.def("startStream", &StreamMgr::startStream);
smgr.def("stopStream", &StreamMgr::startStream); smgr.def("stopStream", &StreamMgr::startStream);
smgr.def_static("getInstance", []() { smgr.def_static("getInstance", []() {
@ -19,5 +25,4 @@ void init_streammgr(py::module &m) {
smgr.def("stopAllStreams", &StreamMgr::stopAllStreams); smgr.def("stopAllStreams", &StreamMgr::stopAllStreams);
smgr.def("setSiggen", &StreamMgr::setSiggen); smgr.def("setSiggen", &StreamMgr::setSiggen);
} }

@ -1 +0,0 @@
Subproject commit 08b2d9e7f487121088a817071d1d42b2736996e9

1
third_party/lockfree vendored Submodule

@ -0,0 +1 @@
Subproject commit fdd4d0632dd0904f6e9c656c45397fe8ef985bc9