From 3160aacc074563c7497508b1e949c40a220c557f Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Fri, 29 Jul 2022 09:32:26 +0200 Subject: [PATCH] Replaced github STL-Threadsafe with boost::lockfree. Added PyIndataHandle to glue Python callbacks to new data. First work on getting recording back to work. --- .gitmodules | 3 + src/lasp/CMakeLists.txt | 3 +- src/lasp/__init__.py | 25 +- src/lasp/device/lasp_daq.cpp | 50 +- src/lasp/device/lasp_daqconfig.h | 14 +- src/lasp/device/lasp_daqdata.h | 15 +- src/lasp/device/lasp_streammgr.cpp | 104 ++-- src/lasp/device/lasp_streammgr.h | 47 +- src/lasp/lasp_avstream.py | 626 -------------------- src/lasp/lasp_cpp.cpp | 2 + src/lasp/lasp_measurement.py | 3 +- src/lasp/lasp_multiprocessingpatch.py | 57 -- src/lasp/lasp_record.py | 222 +++---- src/lasp/lasp_siggen.py | 369 ------------ src/lasp/pybind11/lasp_daq.cpp | 4 + src/lasp/pybind11/lasp_daqconfiguration.cpp | 1 + src/lasp/pybind11/lasp_pyindatahandler.cpp | 183 ++++++ src/lasp/pybind11/lasp_streammgr.cpp | 7 +- third_party/STL-Threadsafe | 1 - third_party/lockfree | 1 + 20 files changed, 445 insertions(+), 1292 deletions(-) delete mode 100644 src/lasp/lasp_avstream.py delete mode 100644 src/lasp/lasp_multiprocessingpatch.py delete mode 100644 src/lasp/lasp_siggen.py create mode 100644 src/lasp/pybind11/lasp_pyindatahandler.cpp delete mode 160000 third_party/STL-Threadsafe create mode 160000 third_party/lockfree diff --git a/.gitmodules b/.gitmodules index d7155c2..61100c0 100644 --- a/.gitmodules +++ b/.gitmodules @@ -13,3 +13,6 @@ [submodule "third_party/tomlplusplus"] path = third_party/tomlplusplus url = https://github.com/marzer/tomlplusplus +[submodule "third_party/lockfree"] + path = third_party/lockfree + url = https://github.com/boostorg/lockfree diff --git a/src/lasp/CMakeLists.txt b/src/lasp/CMakeLists.txt index 671406a..13a993e 100644 --- a/src/lasp/CMakeLists.txt +++ b/src/lasp/CMakeLists.txt @@ -7,7 +7,7 @@ configure_file(lasp_config.h.in lasp_config.h) include_directories(${CMAKE_CURRENT_BINARY_DIR}) include_directories(SYSTEM ../../third_party/armadillo-code/include) include_directories(../../third_party/DebugTrace-cpp/include) -include_directories(../../third_party/STL-Threadsafe/include) +include_directories(../../third_party/lockfreeThreadsafe/include) include_directories(../../third_party/gsl-lite/include) include_directories(../../third_party/tomlplusplus/include) @@ -21,6 +21,7 @@ pybind11_add_module(lasp_cpp MODULE lasp_cpp.cpp pybind11/lasp_streammgr.cpp pybind11/lasp_daq.cpp pybind11/lasp_deviceinfo.cpp + pybind11/lasp_pyindatahandler.cpp ) target_link_libraries(lasp_cpp PRIVATE lasp_device_lib lasp_dsp_lib) diff --git a/src/lasp/__init__.py b/src/lasp/__init__.py index b879372..8a238b0 100644 --- a/src/lasp/__init__.py +++ b/src/lasp/__init__.py @@ -1,13 +1,20 @@ # Comments are what is imported, state of 6-8-2021 +""" +LASP: Library for Acoustic Signal Processing -from .lasp_common import * # P_REF, FreqWeighting, TimeWeighting, getTime, getFreq, Qty, SIQtys, Window, lasp_shelve, this_lasp_shelve, W_REF, U_REF, I_REF, dBFS_REF, AvType -from .lasp_avstream import * # StreamManager, ignoreSigInt, StreamStatus -from .lasp_atomic import * # Atomic -from .lasp_imptube import * # TwoMicImpedanceTube +""" +__version__ = "1.0" + +from .lasp_common import (P_REF, FreqWeighting, TimeWeighting, getTime, + getFreq, Qty, SIQtys, Window, lasp_shelve, + this_lasp_shelve, W_REF, U_REF, I_REF, dBFS_REF, + AvType) +from .lasp_cpp import * +# from .lasp_imptube import * # TwoMicImpedanceTube from .lasp_measurement import * # Measurement, scaleBlockSens -from .lasp_octavefilter import * # FirOctaveFilterBank, FirThirdOctaveFilterBank, OverallFilterBank, SosOctaveFilterBank, SosThirdOctaveFilterBank -from .lasp_slm import * # SLM, Dummy +from .lasp_octavefilter import * +# from .lasp_slm import * # SLM, Dummy from .lasp_record import * # RecordStatus, Recording -from .lasp_siggen import * # SignalType, NoiseType, SiggenMessage, SiggenData, Siggen -from .lasp_weighcal import * # WeighCal -from .tools import * # SmoothingType, smoothSpectralData, SmoothingWidth +# from .lasp_siggen import * # SignalType, NoiseType, SiggenMessage, SiggenData, Siggen +# from .lasp_weighcal import * # WeighCal +# from .tools import * # SmoothingType, smoothSpectralData, SmoothingWidth diff --git a/src/lasp/device/lasp_daq.cpp b/src/lasp/device/lasp_daq.cpp index 9a4821c..5a474aa 100644 --- a/src/lasp/device/lasp_daq.cpp +++ b/src/lasp/device/lasp_daq.cpp @@ -1,4 +1,5 @@ #include "debugtrace.hpp" +#include "lasp_daqconfig.h" DEBUGTRACE_VARIABLES; @@ -11,49 +12,48 @@ DEBUGTRACE_VARIABLES; #endif using std::runtime_error; -Daq::~Daq() {} +Daq::~Daq() { DEBUGTRACE_ENTER; } std::unique_ptr Daq::createDaq(const DeviceInfo &devinfo, - const DaqConfiguration &config) { + const DaqConfiguration &config) { DEBUGTRACE_ENTER; - int apicode = devinfo.api.apicode; #if LASP_HAS_ULDAQ == 1 - if (devinfo.api == uldaqapi) { + if (devinfo.api.apicode == LASP_ULDAQ_APICODE) { return createUlDaqDevice(devinfo, config); } #endif #if LASP_HAS_RTAUDIO == 1 - else if (apicode >= 1 && apicode <= 5) { + // See lasp_daqconfig.h:114 ALSA, up to + if (devinfo.api.apicode == LASP_RTAUDIO_APICODE) { return createRtAudioDevice(devinfo, config); } #endif - else { - throw std::runtime_error(string("Unable to match API: ") + - devinfo.api.apiname); - } + throw std::runtime_error(string("Unable to match Device API: ") + + devinfo.api.apiname); } Daq::Daq(const DeviceInfo &devinfo, const DaqConfiguration &config) - : DaqConfiguration(config), DeviceInfo(devinfo) { - DEBUGTRACE_ENTER; + : DaqConfiguration(config), DeviceInfo(devinfo) { + DEBUGTRACE_ENTER; - if (!hasInternalOutputMonitor && monitorOutput) { - throw std::runtime_error( - "Output monitor flag set, but device does not have output monitor"); - } + if (!hasInternalOutputMonitor && monitorOutput) { + throw std::runtime_error( + "Output monitor flag set, but device does not have output monitor"); + } - if (!config.match(devinfo)) { - throw std::runtime_error("DaqConfiguration does not match device info"); + if (!config.match(devinfo)) { + throw std::runtime_error("DaqConfiguration does not match device info"); + } + if (neninchannels(false) > ninchannels) { + throw std::runtime_error( + "Number of enabled input channels is higher than device capability"); + } + if (nenoutchannels() > noutchannels) { + throw std::runtime_error( + "Number of enabled output channels is higher than device capability"); + } } - if(neninchannels(false) > ninchannels) { - throw std::runtime_error("Number of enabled input channels is higher than device capability"); - } - if(nenoutchannels() > noutchannels) { - throw std::runtime_error("Number of enabled output channels is higher than device capability"); - } - -} double Daq::samplerate() const { return availableSampleRates.at(sampleRateIndex); diff --git a/src/lasp/device/lasp_daqconfig.h b/src/lasp/device/lasp_daqconfig.h index cd922e6..68960e7 100644 --- a/src/lasp/device/lasp_daqconfig.h +++ b/src/lasp/device/lasp_daqconfig.h @@ -107,24 +107,25 @@ public: }; #if LASP_HAS_ULDAQ == 1 +const us LASP_ULDAQ_APICODE = 0; const DaqApi uldaqapi("UlDaq", 0); #endif #if LASP_HAS_RTAUDIO == 1 #include +const us LASP_RTAUDIO_APICODE = 1; const DaqApi rtaudioAlsaApi("RtAudio Linux ALSA", 1, RtAudio::Api::LINUX_ALSA); -const DaqApi rtaudioPulseaudioApi("RtAudio Linux Pulseaudio", 2, +const DaqApi rtaudioPulseaudioApi("RtAudio Linux Pulseaudio", 1, RtAudio::Api::LINUX_PULSE); -const DaqApi rtaudioWasapiApi("RtAudio Windows Wasapi", 3, +const DaqApi rtaudioWasapiApi("RtAudio Windows Wasapi", 1, RtAudio::Api::WINDOWS_WASAPI); -const DaqApi rtaudioDsApi("RtAudio Windows DirectSound", 4, +const DaqApi rtaudioDsApi("RtAudio Windows DirectSound", 1, RtAudio::Api::WINDOWS_DS); -const DaqApi rtaudioAsioApi("RtAudio Windows ASIO", 5, +const DaqApi rtaudioAsioApi("RtAudio Windows ASIO", 1, RtAudio::Api::WINDOWS_ASIO); #endif class DeviceInfo; - /** * @brief Stores channel configuration data for each channel. */ @@ -177,7 +178,6 @@ public: * output. */ double digitalHighPassCutOn = -1; - }; /** @@ -293,6 +293,4 @@ public: * enabled. */ int getLowestOutChannel() const; - - }; diff --git a/src/lasp/device/lasp_daqdata.h b/src/lasp/device/lasp_daqdata.h index 7ee1567..317db52 100644 --- a/src/lasp/device/lasp_daqdata.h +++ b/src/lasp/device/lasp_daqdata.h @@ -20,27 +20,27 @@ public: /** * @brief The number of channels */ - const us nchannels; + us nchannels; /** * @brief The number of frames in this block of data. */ - const us nframes; + us nframes; /** * @brief The data type corresponding to a sample */ - const DataTypeDescriptor::DataType dtype; + DataTypeDescriptor::DataType dtype; /** * @brief The data type description corresponding to a sample */ - const DataTypeDescriptor &dtype_descr; + DataTypeDescriptor dtype_descr; /** * @brief The number of bytes per sample (sample width, sw) */ - const us sw; + us sw; /** * @brief Initialize an empty frame of data @@ -51,7 +51,12 @@ public: */ DaqData(const us nchannels, const us nframes, const DataTypeDescriptor::DataType dtype); + /** + * @brief Initialize using no allocation + */ + DaqData() : DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {} virtual ~DaqData() = default; + DaqData& operator=(const DaqData&) = default; /** * @brief Return pointer to the raw data corresponding to a certain sample. diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index cd3d7ab..ba03437 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -1,21 +1,50 @@ #include "lasp_streammgr.h" +#include "debugtrace.hpp" #include #include #include -#include "debugtrace.hpp" +#include + +using std::cerr; +using std::endl; InDataHandler::InDataHandler(StreamMgr &mgr) : _mgr(mgr) { - mgr.addInDataHandler(*this); + DEBUGTRACE_ENTER; +} +void InDataHandler::start() { + DEBUGTRACE_ENTER; + _mgr.addInDataHandler(*this); +} +void InDataHandler::stop() { +#if LASP_DEBUG == 1 + stopCalled = true; +#endif + _mgr.removeInDataHandler(*this); +} +InDataHandler::~InDataHandler() { + + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + if (!stopCalled) { + cerr << "************ BUG: Stop function not called while arriving at " + "InDataHandler's destructor. Fix this by calling " + "InDataHandler::stop() from the derived class' destructor." + << endl; + } +#endif } -InDataHandler::~InDataHandler() { _mgr.removeInDataHandler(*this); } StreamMgr &StreamMgr::getInstance() { + + DEBUGTRACE_ENTER; static StreamMgr mgr; return mgr; } -StreamMgr::StreamMgr() {} +StreamMgr::StreamMgr() { DEBUGTRACE_ENTER; } bool StreamMgr::inCallback(const DaqData &data) { + + /* DEBUGTRACE_ENTER; */ std::scoped_lock lck(_inDataHandler_mtx); for (auto &handler : _inDataHandlers) { @@ -50,7 +79,7 @@ template bool fillData(DaqData &data, const vd &signal) { for (us ch = 0; ch < data.nchannels; ch++) { for (us frame = 0; frame < data.nframes; frame++) { res[ch * data.nframes + frame] = - (signal[frame] * std::numeric_limits::max()); + (signal[frame] * std::numeric_limits::max()); } } } @@ -81,21 +110,21 @@ bool StreamMgr::outCallback(DaqData &data) { if (_siggen) { vd signal = _siggen->genSignal(data.nframes); switch (data.dtype) { - case (DataTypeDescriptor::DataType::dtype_fl32): - fillData(data, signal); - break; - case (DataTypeDescriptor::DataType::dtype_fl64): - fillData(data, signal); - break; - case (DataTypeDescriptor::DataType::dtype_int8): - fillData(data, signal); - break; - case (DataTypeDescriptor::DataType::dtype_int16): - fillData(data, signal); - break; - case (DataTypeDescriptor::DataType::dtype_int32): - fillData(data, signal); - break; + case (DataTypeDescriptor::DataType::dtype_fl32): + fillData(data, signal); + break; + case (DataTypeDescriptor::DataType::dtype_fl64): + fillData(data, signal); + break; + case (DataTypeDescriptor::DataType::dtype_int8): + fillData(data, signal); + break; + case (DataTypeDescriptor::DataType::dtype_int16): + fillData(data, signal); + break; + case (DataTypeDescriptor::DataType::dtype_int32): + fillData(data, signal); + break; } } else { // Set all values to 0. @@ -104,37 +133,41 @@ bool StreamMgr::outCallback(DaqData &data) { return true; } -StreamMgr::~StreamMgr() { stopAllStreams(); } +StreamMgr::~StreamMgr() { + DEBUGTRACE_ENTER; + stopAllStreams(); +} void StreamMgr::stopAllStreams() { _inputStream.reset(); _outputStream.reset(); } void StreamMgr::startStream(const DeviceInfo &devinfo, - const DaqConfiguration &config) { + const DaqConfiguration &config) { bool isInput = std::count_if(config.inchannel_config.cbegin(), - config.inchannel_config.cend(), - [](auto &i) { return i.enabled; }); + config.inchannel_config.cend(), + [](auto &i) { return i.enabled; }); isInput |= config.monitorOutput && devinfo.hasInternalOutputMonitor; bool isOutput = std::count_if(config.outchannel_config.cbegin(), - config.outchannel_config.cend(), - [](auto &i) { return i.enabled; }); + config.outchannel_config.cend(), + [](auto &i) { return i.enabled; }); bool isDuplex = isInput && isOutput; if (!isInput && !isOutput) { - throw std::runtime_error( - "Neither input, nor output channels enabled for stream. Cannotr start."); + throw std::runtime_error("Neither input, nor output channels enabled for " + "stream. Cannotr start."); } if ((isDuplex || isInput) && _inputStream) { - throw std::runtime_error("Error: an input stream is already running. Please " + throw std::runtime_error( + "Error: an input stream is already running. Please " "first stop existing stream"); } else if (isOutput && _outputStream) { throw std::runtime_error("Error: output stream is already running. Please " - "first stop existing stream"); + "first stop existing stream"); } InDaqCallback inCallback; @@ -143,12 +176,12 @@ void StreamMgr::startStream(const DeviceInfo &devinfo, using namespace std::placeholders; std::unique_ptr daq = Daq::createDaq(devinfo, config); - if(isInput) { - inCallback = std::bind(&StreamMgr::inCallback, this, _1); + if (isInput) { + inCallback = std::bind(&StreamMgr::inCallback, this, _1); } - if(isOutput) { - if(_siggen) { + if (isOutput) { + if (_siggen) { _siggen->reset(daq->samplerate()); } outCallback = std::bind(&StreamMgr::outCallback, this, _1); @@ -158,13 +191,12 @@ void StreamMgr::startStream(const DeviceInfo &devinfo, daq->start(inCallback, outCallback); - if(isInput) { + if (isInput) { _inputStream = std::move(daq); } else { assert(isOutput); _outputStream = std::move(daq); } - } void StreamMgr::addInDataHandler(InDataHandler &handler) { diff --git a/src/lasp/device/lasp_streammgr.h b/src/lasp/device/lasp_streammgr.h index 450f046..1ddab52 100644 --- a/src/lasp/device/lasp_streammgr.h +++ b/src/lasp/device/lasp_streammgr.h @@ -11,13 +11,16 @@ class InDataHandler { protected: StreamMgr &_mgr; +#if LASP_DEBUG == 1 + std::atomic stopCalled {false}; +#endif public: virtual ~InDataHandler(); /** * @brief When constructed, the handler is added to the stream manager, which - * will call the handlers's inCallback() until the point of destruction. + * will call the handlers's inCallback() until stop() is called. * * @param mgr Stream manager. */ @@ -31,6 +34,24 @@ public: * @return true if no error. False to stop the stream from running. */ virtual bool inCallback(const DaqData &daqdata) = 0; + + /** + * @brief This function should be called from the constructor of the + * implementation of InDataHandler. It will start the stream's calling of + * inCallback(). + */ + void start(); + + /** + * @brief This function should be called from the destructor of derived + * classes, to disable the calls to inCallback(), such that proper + * destruction of the object is allowed and no other threads call methods + * from the object. It removes the inCallback() from the callback list of the + * StreamMgr(). **Failing to call this function results in deadlocks, errors + * like "pure virtual function called", or other**. + */ + void stop(); + }; /** @@ -40,17 +61,6 @@ public: */ class StreamMgr { - enum class StreamType : us { - /** - * @brief Input stream - */ - input = 1 << 0, - /** - * @brief Output stream - */ - output = 1 << 1, - }; - /** * @brief Storage for streams. */ @@ -80,6 +90,18 @@ class StreamMgr { ~StreamMgr(); public: + + enum class StreamType : us { + /** + * @brief Input stream + */ + input = 1 << 0, + /** + * @brief Output stream + */ + output = 1 << 1, + }; + StreamMgr(const StreamMgr &) = delete; StreamMgr &operator=(const StreamMgr &) = delete; @@ -165,5 +187,4 @@ private: * @param handler The handler to add. */ void addInDataHandler(InDataHandler &handler); - }; diff --git a/src/lasp/lasp_avstream.py b/src/lasp/lasp_avstream.py deleted file mode 100644 index e9f7fb5..0000000 --- a/src/lasp/lasp_avstream.py +++ /dev/null @@ -1,626 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Author: J.A. de Jong - -Description: Controlling an audio stream in a different process. -""" -import logging -import multiprocessing as mp -# import cv2 as cv -import signal -import time -from dataclasses import dataclass -from enum import Enum, auto, unique -from typing import List - -import numpy as np - -from .device import Daq, DaqChannel, DaqConfiguration, DeviceInfo -from .filter import highpass -from .lasp_atomic import Atomic -from .lasp_common import AvType -from .lasp_multiprocessingpatch import apply_patch -from .wrappers import SosFilterBank - -apply_patch() - -__all__ = ["StreamManager", "ignoreSigInt", "StreamStatus"] - - -def ignoreSigInt(): - """ - Ignore sigint signal. Should be set on all processes to let the main - process control this signal. - """ - signal.signal(signal.SIGINT, signal.SIG_IGN) - - -@dataclass -class StreamMetaData: - # Sample rate [Hz] - fs: float - - # Input channels - in_ch: List[DaqChannel] - - # Output channels - out_ch: List[DaqChannel] - - # blocksize - blocksize: int - - # The data type of input and output blocks. - dtype: np.dtype - - -@unique -class StreamMsg(Enum): - """ - First part, control messages that can be send to the stream - """ - - startStream = auto() - stopStream = auto() - stopAllStreams = auto() - getStreamMetaData = auto() - endProcess = auto() - scanDaqDevices = auto() - """ - Second part, status messages that are send back on all listeners - """ - # "Normal messages" - deviceList = auto() - streamStarted = auto() - streamStopped = auto() - streamMetaData = auto() - streamData = auto() - - # Error messages - # Some error occured, which mostly leads to a stop of the stream - streamError = auto() - # An error occured, but we recovered - streamTemporaryError = auto() - # A fatal error occured. This leads to serious errors in the application - streamFatalError = auto() - - -class AudioStream: - """ - Audio stream. - """ - - def __init__( - self, - avtype: AvType, - devices: list, - daqconfig: DaqConfiguration, - processCallback: callable, - ): - """ - Initializes the audio stream and tries to start it. - - avtype: AvType - devices: List of device information - daqconfig: DaqConfiguration to used to generate audio stream backend - processCallback: callback function that will be called from a different - thread, with arguments (AudioStream, in - """ - logging.debug("AudioStream()") - - # self.running = Atomic(False) - # self.aframectr = Atomic(0) - self.running = False - self.aframectr = 0 - self.avtype = avtype - - api_devices = devices[daqconfig.api] - self.processCallback = processCallback - - matching_devices = [ - device for device in api_devices if device.name == daqconfig.device_name - ] - - if len(matching_devices) == 0: - raise RuntimeError(f"Could not find device {daqconfig.device_name}") - - # TODO: We pick te first one, what to do if we have multiple matches? - # Is that even possible? - device = matching_devices[0] - - self.daq = Daq(device, daqconfig) - en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True) - en_out_ch = daqconfig.getEnabledOutChannels() - - if en_in_ch == 0 and en_out_ch == 0: - raise RuntimeError("No enabled input / output channels") - elif en_out_ch == 0 and avtype in (AvType.audio_duplex, AvType.audio_output): - raise RuntimeError("No enabled output channels") - elif en_in_ch == 0 and avtype in (AvType.audio_input, AvType.audio_duplex): - raise RuntimeError("No enabled input channels") - - logging.debug("Ready to start device...") - samplerate = self.daq.start(self.streamCallback) - - # Create required Highpass filters for incoming data - self.hpfs = [None] * len(en_in_ch) - for i, ch in enumerate(en_in_ch): - # Simple filter with a single bank and one section - if ch.highpass > 0: - fb = SosFilterBank(1, 1) - hpf = highpass(samplerate, ch.highpass, Q=np.sqrt(2) / 2) - fb.setFilter(0, hpf[None, :]) - self.hpfs[i] = fb - - self.streammetadata = StreamMetaData( - fs=samplerate, - in_ch=en_in_ch, - out_ch=en_out_ch, - blocksize=self.daq.nFramesPerBlock, - dtype=self.daq.getNumpyDataType(), - ) - self.running = True - - def streamCallback(self, indata, outdata, nframes): - """ - This is called (from a separate thread) for each block - of audio data. - """ - if not self.running: - return 1 - self.aframectr += 1 - - # TODO: Fix this. This gives bug on Windows, the threading lock does - # give a strange erro. - try: - if not self.running: - return 1 - except Exception as e: - print(e) - - if indata is not None: - indata_filtered = np.empty_like(indata) - nchannels = indata.shape[1] - - for i in range(nchannels): - # Filter each channel to the optional high-pass, which could also - # be an empty filter - if self.hpfs[i] is not None: - indata_float = indata[:, [i]].astype(np.float) - filtered_ch_float = self.hpfs[i].filter_(indata_float) - - indata_filtered[:, i] = filtered_ch_float.astype( - self.streammetadata.dtype - )[:, 0] - else: - # One-to-one copy - indata_filtered[:, i] = indata[:, i] - else: - indata_filtered = indata - - # rv = self.processCallback(self, indata, outdata) - rv = self.processCallback(self, indata_filtered, outdata) - if rv != 0: - self.running <<= False - return rv - - def stop(self): - """ - Stop the DAQ stream. Should be called only once. - """ - daq = self.daq - self.daq = None - self.running <<= False - daq.stop() - - self.streammetadata = None - - -class AvStreamProcess(mp.Process): - """ - Different process on which all audio streams are running. - """ - - def __init__(self, pipe, msg_qlist, indata_qlist, outq): - """ - - Args: - pipe: Message control pipe on which commands are received. - msg_qlist: List of queues on which stream status and events are - sent. Here, everything is send, except for the captured data - itself. - indata_qlist: List of queues on which captured data from a DAQ is - send. This one gets all events, but also captured data. - outq: On this queue, the stream process receives data to be send as - output to the devices. - - """ - super().__init__() - - self.pipe = pipe - self.msg_qlist = msg_qlist - self.indata_qlist = indata_qlist - self.outq = outq - - self.devices = {} - self.daqconfigs = None - - # In, out, duplex - self.streams = {t: None for t in list(AvType)} - - # When this is set, a kill on the main process will also kill the - # siggen process. Highly wanted feature - self.daemon = True - - def run(self): - """ - The actual function running in a different process. - """ - # First things first, ignore interrupt signals for THIS process - # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing - signal.signal(signal.SIGINT, signal.SIG_IGN) - - # Check for devices - self.rescanDaqDevices() - - while True: - try: - msg, data = self.pipe.recv() - except OSError: - logging.error("Error with pipe, terminating process") - self.stopAllStreams() - self.terminate() - logging.debug(f"Streamprocess obtained message {msg}") - - if msg == StreamMsg.scanDaqDevices: - self.rescanDaqDevices() - - elif msg == StreamMsg.stopAllStreams: - self.stopAllStreams() - - elif msg == StreamMsg.endProcess: - self.stopAllStreams() - # and.. exit! - return - - elif msg == StreamMsg.getStreamMetaData: - (avtype,) = data - stream = self.streams[avtype] - if stream is not None: - self.sendAllQueues( - StreamMsg.streamMetaData, avtype, stream.streammetadata - ) - else: - self.sendAllQueues(StreamMsg.streamMetaData, avtype, None) - - elif msg == StreamMsg.startStream: - avtype, daqconfig = data - self.startStream(avtype, daqconfig) - - elif msg == StreamMsg.stopStream: - (avtype,) = data - self.stopStream(avtype) - - def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): - """ - Start a stream, based on type and configuration - - """ - self.stopRequiredExistingStreams(avtype) - # Empty the queue from existing stuff (puts the signal generator - # directly in action!). - if avtype in (AvType.audio_duplex, AvType.audio_output): - while not self.outq.empty(): - self.outq.get() - try: - stream = AudioStream(avtype, self.devices, daqconfig, self.streamCallback) - self.streams[avtype] = stream - self.sendAllQueues(StreamMsg.streamStarted, avtype, stream.streammetadata) - - except Exception as e: - self.sendAllQueues( - StreamMsg.streamError, avtype, f"Error starting stream: {str(e)}" - ) - return - - def stopStream(self, avtype: AvType): - """ - Stop an existing stream, and sets the attribute in the list of streams - to None - - Args: - stream: AudioStream instance - """ - stream = self.streams[avtype] - if stream is not None: - try: - stream.stop() - self.sendAllQueues(StreamMsg.streamStopped, stream.avtype) - except Exception as e: - self.sendAllQueues( - StreamMsg.streamError, - stream.avtype, - f"Error occured in stopping stream: {str(e)}", - ) - self.streams[avtype] = None - - def stopRequiredExistingStreams(self, avtype: AvType): - """ - Stop all existing streams that conflict with the current avtype - """ - if avtype == AvType.audio_input: - # For a new input, duplex and input needs to be stopped - stream_to_stop = (AvType.audio_input, AvType.audio_duplex) - elif avtype == AvType.audio_output: - # For a new output, duplex and output needs to be stopped - stream_to_stop = (AvType.audio_output, AvType.audio_duplex) - elif avtype == AvType.audio_duplex: - # All others have to stop - stream_to_stop = list(AvType) # All of them - else: - raise ValueError("BUG") - - for stream in stream_to_stop: - if stream is not None: - self.stopStream(stream) - - def stopAllStreams(self): - """ - Stops all streams - """ - for key in self.streams.keys(): - self.stopStream(key) - - def isStreamRunning(self, avtype: AvType = None): - """ - Check whether a stream is running - - Args: - avtype: The stream type to check whether it is still running. If - None, it checks all streams. - - Returns: - True if a stream is running, otherwise false - """ - if avtype is None: - avtype = list(AvType) - else: - avtype = (avtype,) - for t in avtype: - if self.streams[t] is not None and self.streams[t].running(): - return True - - return False - - def rescanDaqDevices(self): - """ - Rescan the available DaQ devices. - - """ - if self.isStreamRunning(): - self.sendAllQueues( - StreamMsg.streamError, - None, - "A stream is running, cannot rescan DAQ devices.", - ) - return - - self.devices = Daq.getDeviceInfo() - self.sendAllQueues(StreamMsg.deviceList, self.devices) - - def streamCallback(self, audiostream, indata, outdata): - """This is called (from a separate thread) for each audio block.""" - # logging.debug('streamCallback()') - if outdata is not None: - if not self.outq.empty(): - newdata = self.outq.get() - if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: - msgtxt = "Invalid output data obtained from queue" - logging.fatal(msgtxt) - self.sendAllQueues( - StreamMsg.streamFatalError, audiostream.avtype, msgtxt - ) - return 1 - outdata[:, :] = newdata[:, None] - else: - msgtxt = "Signal generator buffer underflow. Signal generator cannot keep up with data generation." - # logging.error(msgtxt) - self.sendAllQueues( - StreamMsg.streamTemporaryError, audiostream.avtype, msgtxt - ) - outdata[:, :] = 0 - - if indata is not None: - self.sendInQueues(StreamMsg.streamData, indata) - - return 0 - - # Wrapper functions that safe some typing, they do not require an - # explanation. - def sendInQueues(self, msg, *data): - # logging.debug('sendInQueues()') - try: - for q in self.indata_qlist: - # Fan out the input data to all queues in the queue list - q.put((msg, data)) - except ValueError: - logging.error("Error with data queue, terminating process") - self.stopAllStreams() - self.terminate() - - def sendAllQueues(self, msg, *data): - """ - Destined for all queues, including capture data queues - """ - self.sendInQueues(msg, *data) - try: - for q in self.msg_qlist: - # Fan out the input data to all queues in the queue list - q.put((msg, data)) - except ValueError: - logging.error("Error with data queue, terminating process") - self.stopAllStreams() - self.terminate() - - -@dataclass -class StreamStatus: - lastStatus: StreamMsg = StreamMsg.streamStopped - errorTxt: str = None - streammetadata: StreamMetaData = None - - -class StreamManager: - """ - Audio and video data stream manager, to which queus can be added - """ - - def __init__(self): - """Open a stream for audio in/output and video input. For audio output,""" - - # Initialize streamstatus - self.streamstatus = {t: StreamStatus() for t in list(AvType)} - - self.devices = None - - # Multiprocessing manager, pipe, output queue, input queue, - self.manager = mp.managers.SyncManager() - - # Start this manager and ignore interrupts - # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing - self.manager.start(ignoreSigInt) - - # List of queues for all entities that require 'microphone' or input - # data. We need a local list, to manage listener queues, as the queues - # which are in the manager list get a new object id. The local list is - # used to find the index in the manager queues list upon deletion by - # 'removeListener()' - self.indata_qlist = self.manager.list([]) - self.indata_qlist_local = [] - - self.msg_qlist = self.manager.list([]) - self.msg_qlist_local = [] - - # Queue used for signal generator data - self.outq = self.manager.Queue() - - # Messaging pipe - self.pipe, child_pipe = mp.Pipe(duplex=True) - - # This is the queue on which this class listens for stream process - # messages. - self.our_msgqueue = self.addMsgQueueListener() - - # Create the stream process - self.streamProcess = AvStreamProcess( - child_pipe, self.msg_qlist, self.indata_qlist, self.outq - ) - self.streamProcess.start() - - def scanDaqDevices(self): - """ - Output the message to the stream process to rescan the list of devices - """ - self.sendPipe(StreamMsg.scanDaqDevices, None) - - def getStreamStatus(self, avtype: AvType): - """ - Sends a request for the stream status over the pipe, for given AvType - """ - self.sendPipe(StreamMsg.getStreamMetaData, avtype) - - def getOutputQueue(self): - """ - Returns the output queue object. - - Note, should (of course) only be used by one signal generator at the time! - """ - return self.outq - - def addMsgQueueListener(self): - """ - Add a listener queue to the list of message queues, and return the - queue. - - Returns: - listener queue - """ - newqueue = self.manager.Queue() - self.msg_qlist.append(newqueue) - self.msg_qlist_local.append(newqueue) - return newqueue - - def removeMsgQueueListener(self, queue): - """ - Remove an input listener queue from the message queue list. - """ - # Uses a local queue list to find the index, based on the queue - idx = self.msg_qlist_local.index(queue) - del self.msg_qlist_local[idx] - del self.msg_qlist[idx] - - def addInQueueListener(self): - """ - Add a listener queue to the list of queues, and return the queue. - - Returns: - listener queue - """ - newqueue = self.manager.Queue() - self.indata_qlist.append(newqueue) - self.indata_qlist_local.append(newqueue) - return newqueue - - def removeInQueueListener(self, queue): - """ - Remove an input listener queue from the queue list. - """ - # Uses a local queue list to find the index, based on the queue - idx = self.indata_qlist_local.index(queue) - del self.indata_qlist[idx] - del self.indata_qlist_local[idx] - - def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): - """ - Start the stream, which means the callbacks are called with stream - data (audio/video) - - Args: - wait: Wait until the stream starts talking before returning from - this function. - - """ - logging.debug("Starting stream...") - self.sendPipe(StreamMsg.startStream, avtype, daqconfig) - - def stopStream(self, avtype: AvType): - logging.debug(f"StreamManager::stopStream({avtype})") - self.sendPipe(StreamMsg.stopStream, avtype) - - def stopAllStreams(self): - self.sendPipe(StreamMsg.stopAllStreams) - - def cleanup(self): - """ - Stops the stream if it is still running, and after that, it stops the - stream process. - - This method SHOULD always be called before removing a AvStream object. - Otherwise things will wait forever... - - """ - self.sendPipe(StreamMsg.endProcess, None) - logging.debug("Joining stream process...") - self.streamProcess.join() - logging.debug("Joining stream process done") - - def hasVideo(self): - """ - Stub, TODO: for future - """ - return False - - def sendPipe(self, msg, *data): - """ - Send a message with data over the control pipe - """ - self.pipe.send((msg, data)) diff --git a/src/lasp/lasp_cpp.cpp b/src/lasp/lasp_cpp.cpp index 25d7609..3f16f99 100644 --- a/src/lasp/lasp_cpp.cpp +++ b/src/lasp/lasp_cpp.cpp @@ -6,6 +6,7 @@ void init_deviceinfo(py::module& m); void init_daqconfiguration(py::module& m); void init_daq(py::module& m); void init_streammgr(py::module& m); +void init_datahandler(py::module& m); PYBIND11_MODULE(lasp_cpp, m) { @@ -15,5 +16,6 @@ PYBIND11_MODULE(lasp_cpp, m) { init_daqconfiguration(m); init_daq(m); init_streammgr(m); + init_datahandler(m); } diff --git a/src/lasp/lasp_measurement.py b/src/lasp/lasp_measurement.py index fb7dd52..f470fdf 100644 --- a/src/lasp/lasp_measurement.py +++ b/src/lasp/lasp_measurement.py @@ -46,8 +46,7 @@ from .lasp_config import LASP_NUMPY_FLOAT_TYPE from scipy.io import wavfile import os, time, wave, logging from .lasp_common import SIQtys, Qty, getFreq -from .device import DaqChannel -from .wrappers import AvPowerSpectra, Window, PowerSpectra +from .lasp_cpp import Window, DaqChannel def getSampWidth(dtype): diff --git a/src/lasp/lasp_multiprocessingpatch.py b/src/lasp/lasp_multiprocessingpatch.py deleted file mode 100644 index fcd8e79..0000000 --- a/src/lasp/lasp_multiprocessingpatch.py +++ /dev/null @@ -1,57 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Author: J.A. de Jong - -Description: MonkeyPatch required to let the Multiprocessing library work properly. -Should be applied prior to running any other multiprocessing code. Comes from -Stackoverflow and is mainly used for managing a list of queues that can be -shared between processes. - -For more information, see: -https://stackoverflow.com/questions/46779860/multiprocessing-managers-and-custom-classes -""" -from multiprocessing import managers -import logging -from functools import wraps -from inspect import signature - -orig_AutoProxy = managers.AutoProxy - -__all__ = ['apply_patch'] - - -@wraps(managers.AutoProxy) -def AutoProxy(*args, incref=True, manager_owned=False, **kwargs): - # Create the autoproxy without the manager_owned flag, then - # update the flag on the generated instance. If the manager_owned flag - # is set, `incref` is disabled, so set it to False here for the same - # result. - autoproxy_incref = False if manager_owned else incref - proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs) - proxy._owned_by_manager = manager_owned - return proxy - - -def apply_patch(): - if "manager_owned" in signature(managers.AutoProxy).parameters: - return - - logging.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned") - managers.AutoProxy = AutoProxy - - # re-register any types already registered to SyncManager without a custom - # proxy type, as otherwise these would all be using the old unpatched AutoProxy - SyncManager = managers.SyncManager - registry = managers.SyncManager._registry - for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items(): - if proxytype is not orig_AutoProxy: - continue - create_method = hasattr(managers.SyncManager, typeid) - SyncManager.register( - typeid, - callable=callable, - exposed=exposed, - method_to_typeid=method_to_typeid, - create_method=create_method, - ) - diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index ff99b3e..972afec 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -3,25 +3,29 @@ """ Read data from stream and record sound and video at the same time """ -import dataclasses, logging, os, time, h5py -from .lasp_avstream import StreamManager, StreamMetaData, StreamMsg -from .lasp_common import AvType +import dataclasses +import logging +import os +import time +import h5py +from .lasp_cpp import InDataHandler, StreamMgr @dataclasses.dataclass class RecordStatus: - curT: float - done: bool + curT: float = 0 + done: bool = False class Recording: """ Class used to perform a recording. """ - def __init__(self, fn: str, streammgr: StreamManager, + + def __init__(self, fn: str, streammgr: StreamMgr, rectime: float = None, wait: bool = True, progressCallback=None, - startDelay: float=0): + startDelay: float = 0): """ Start a recording. Blocks if wait is set to True. @@ -44,68 +48,104 @@ class Recording: self.smgr = streammgr self.metadata = None - assert startDelay >= 0 + if startDelay < 0: + raise RuntimeError("Invalid start delay value. Should be >= 0") + self.startDelay = startDelay + # Flag used to indicate that we have passed the start delay self.startDelay_passed = False - self.rectime = rectime + + # The amount of seconds (float) that is to be recorded + self.rectime = rectime + + # The file name to store data to self.fn = fn - self.video_frame_positions = [] self.curT_rounded_to_seconds = 0 # Counter of the number of blocks self.ablockno = 0 - self.vframeno = 0 self.progressCallback = progressCallback - self.wait = wait + # Open the file self.f = h5py.File(self.fn, 'w') + f = self.f # This flag is used to delete the file on finish(), and can be used # when a recording is canceled. self.deleteFile = False - try: - # Input queue - self.inq = streammgr.addInQueueListener() - - except RuntimeError: - # Cleanup stuff, something is going wrong when starting the stream - try: - self.f.close() - except Exception as e: - logging.error( - 'Error preliminary closing measurement file {fn}: {str(e)}') - - self.__deleteFile() - raise - # Try to obtain stream metadata - streammgr.getStreamStatus(AvType.audio_input) - streammgr.getStreamStatus(AvType.audio_duplex) + streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input) + if not streamstatus.runningOK(): + raise RuntimeError( + "Stream is not running properly. Please first start the stream") self.ad = None logging.debug('Starting record....') - # TODO: Fix this later when we want video - # if stream.hasVideo(): - # stream.addCallback(self.aCallback, AvType.audio_input) + self.stop = False - if self.wait: + self.indh = InDataHandler(streammgr, self.inCallback) + + if wait: logging.debug('Stop recording with CTRL-C') try: while not self.stop: - self.handleQueue() time.sleep(0.01) except KeyboardInterrupt: logging.debug("Keyboard interrupt on record") finally: self.finish() - def handleQueue(self): + def firstFrames(self, data): + """ + When the first frames arrive, we setup the file and add all metadata, + and make ready for storing data. + """ + daq = self.smgr.getDaq(StreamMgr.StreamType.input) + dtype = daq.dtype() + in_ch = daq.inchannel_config + blocksize = daq.framesPerBlock() + self.blocksize = blocksize + + f = self.f + + # Set the bunch of attributes + f.attrs['samplerate'] = daq.samplerate() + f.attrs['nchannels'] = daq.neninchannels() + f.attrs['blocksize'] = blocksize + f.attrs['sensitivity'] = [ch.sensitivity for ch in in_ch] + f.attrs['channelNames'] = [ch.name for ch in in_ch] + + # Add the start delay here, as firstFrames() is called right after the + # constructor is called. + f.attrs['time'] = time.time() + self.startDelay + + nchannels = len(in_ch) + self.ad = f.create_dataset('audio', + (1, blocksize, nchannels), + dtype=data.dtype, + maxshape=( + None, # This means, we can add blocks + # indefinitely + blocksize, + nchannels), + compression='gzip' + ) + self.fs = daq.samplerate() + + # Measured physical quantity metadata + # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] + + # In V2, we do not store JSON metadata anymore, but just an enumeration + # index to a physical quantity. + f.attrs['qtys_idx'] = [ch.qty for ch in in_ch] + + def inCallback(self, adata): """ This method should be called to grab data from the input queue, which is filled by the stream, and put it into a file. It should be called at @@ -116,92 +156,11 @@ class Recording: """ - # logging.debug('handleQueue()') - while self.inq.qsize() > 0: - msg, data = self.inq.get() - # logging.debug(f'Obtained message: {msg}') - if msg == StreamMsg.streamData: - samples, = data - self.__addTimeData(samples) - elif msg == StreamMsg.streamStarted: - logging.debug(f'handleQueue obtained message {msg}') - avtype, metadata = data - if metadata is None: - raise RuntimeError('BUG: no stream metadata') - if avtype in (AvType.audio_duplex, AvType.audio_input): - self.processStreamMetaData(metadata) - elif msg == StreamMsg.streamMetaData: - logging.debug(f'handleQueue obtained message {msg}') - avtype, metadata = data - if metadata is not None: - self.processStreamMetaData(metadata) - elif msg == StreamMsg.streamTemporaryError: - pass - else: - logging.debug(f'handleQueue obtained message {msg}') - # An error occured, we do not remove the file, but we stop. - self.stop = True - logging.debug(f'Stream message: {msg}. Recording stopped unexpectedly') - raise RuntimeError('Recording stopped unexpectedly') + if self.ad is None: + self.firstFrames(adata) + self.__addTimeData(adata) - def processStreamMetaData(self, md: StreamMetaData): - """ - Stream metadata has been catched. This is used to set all metadata in - the measurement file - - """ - logging.debug('Recording::processStreamMetaData()') - if self.metadata is not None: - # Metadata already obtained. We check whether the new metadata is - # compatible. Otherwise an error occurs - if md != self.metadata: - raise RuntimeError('BUG: Incompatible stream metadata!') - return - - # The 'Audio' dataset as specified in lasp_measurement, where data is - # sent to. We use gzip as compression, this gives moderate a moderate - # compression to the data. - f = self.f - blocksize = md.blocksize - nchannels = len(md.in_ch) - self.ad = f.create_dataset('audio', - (1, blocksize, nchannels), - dtype=md.dtype, - maxshape=( - None, # This means, we can add blocks - # indefinitely - blocksize, - nchannels), - compression='gzip' - ) - - # TODO: This piece of code is not up-to-date and should be changed at a - # later instance once we really want to record video simultaneously - # with audio. - # if smgr.hasVideo(): - # video_x, video_y = smgr.video_x, smgr.video_y - # self.vd = f.create_dataset('video', - # (1, video_y, video_x, 3), - # dtype='uint8', - # maxshape=( - # None, video_y, video_x, 3), - # compression='gzip' - # ) - - # Set the bunch of attributes - f.attrs['samplerate'] = md.fs - f.attrs['nchannels'] = nchannels - f.attrs['blocksize'] = blocksize - f.attrs['sensitivity'] = [ch.sensitivity for ch in md.in_ch] - f.attrs['channelNames'] = [ch.channel_name for ch in md.in_ch] - f.attrs['time'] = time.time() - self.blocksize = blocksize - self.fs = md.fs - - # Measured physical quantity metadata - f.attrs['qtys'] = [ch.qty.to_json() for ch in md.in_ch] - self.metadata = md def setDelete(self, val: bool): """ @@ -219,6 +178,7 @@ class Recording: """ logging.debug('Recording::finish()') smgr = self.smgr + self.indh = None # TODO: Fix when video # if smgr.hasVideo(): @@ -259,17 +219,7 @@ class Recording: # Stop flag is raised. We stop recording here. return - # The current time that is recorded and stored into the file, without - # the new data - if not self.metadata: - # We obtained stream data, but metadata is not yet available. - # Therefore, we request it explicitly and then we return - logging.info('Requesting stream metadata') - self.smgr.getStreamStatus(AvType.audio_input) - self.smgr.getStreamStatus(AvType.audio_duplex) - return - - curT = self.ablockno*self.blocksize/self.fs + curT = self.ablockno*self.blocksize/self.fs # Increase the block counter self.ablockno += 1 @@ -280,9 +230,10 @@ class Recording: elif curT >= 0 and not self.startDelay_passed: # Start delay passed, switch the flag! self.startDelay_passed = True - # Reset the audio block counter and the time + + # Reset the audio block counter and the recording time self.ablockno = 1 - curT = 0 + curT = 0 recstatus = RecordStatus( curT=curT, @@ -310,10 +261,3 @@ class Recording: self.ad.resize(self.ablockno, axis=0) self.ad[self.ablockno-1, :, :] = indata - - # def _vCallback(self, frame, framectr): - # self.video_frame_positions.append(self.ablockno()) - # vframeno = self.vframeno - # self.vd.resize(vframeno+1, axis=0) - # self.vd[vframeno, :, :] = frame - # self.vframeno += 1 diff --git a/src/lasp/lasp_siggen.py b/src/lasp/lasp_siggen.py deleted file mode 100644 index a8cfada..0000000 --- a/src/lasp/lasp_siggen.py +++ /dev/null @@ -1,369 +0,0 @@ -#!/usr/bin/env python3.6 -# -*- coding: utf-8 -*- -""" -Author: J.A. de Jong - ASCEE - -Description: Signal generator code - -""" -import multiprocessing as mp -import dataclasses -import logging -from typing import Tuple -import numpy as np - -from .filter import PinkNoise -from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank -from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner -from .lasp_avstream import StreamManager, ignoreSigInt -from .wrappers import Siggen as pyxSiggen, Equalizer -from enum import Enum, unique, auto - -QUEUE_BUFFER_TIME = 0.5 # The amount of time used in the queues for buffering -# of data, larger is more stable, but also enlarges latency - -__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] - - -@unique -class SignalType(Enum): - Periodic = 0 - Noise = 1 - Sweep = 2 - Meas = 3 - - -@unique -class NoiseType(Enum): - white = "White noise" - pink = "Pink noise" - - def __str__(self): - return str(self.value) - - @staticmethod - def fillComboBox(combo): - for type_ in list(NoiseType): - combo.addItem(str(type_)) - - @staticmethod - def getCurrent(cb): - return list(NoiseType)[cb.currentIndex()] - - -class SiggenWorkerDone(Exception): - def __str__(self): - return "Done generating signal" - - -@unique -class SiggenMessage(Enum): - """ - Different messages that can be send to the signal generator over the pipe - connection. - """ - endProcess = auto() # Stop and quit the signal generator - adjustVolume = auto() # Adjust the volume - newEqSettings = auto() # Forward new equalizer settings - newSiggenData = auto() # Forward new equalizer settings - ready = auto() # Send out once, once the signal generator is ready with - # pre-generating data. - mute = auto() # Mute / unmute siggen - - # These messages are send back to the main thread over the pipe - error = auto() - done = auto() - - -@dataclasses.dataclass -class SiggenData: - """ - Metadata used to create a Signal Generator - """ - fs: float # Sample rate [Hz] - - # Number of frames "samples" to send in one block - nframes_per_block: int - - # The data type to output - dtype: np.dtype - - # Muted? - muted: bool - - # Level of output signal [dBFS]el - level_dB: float - - # Signal type specific data, i.e. - signaltype: SignalType - signaltypedata: Tuple = None - - # Settings for the equalizer etc - eqdata: object = None # Equalizer data - -class SiggenProcess(mp.Process): - """ - Main function running in a different process, is responsible for generating - new signal data. Uses the signal queue to push new generated signal data - on. - """ - def __init__(self, siggendata, dataq, pipe): - - """ - Args: - siggendata: The signal generator data to start with. - dataq: The queue to put generated signal on - pipe: Control and status messaging pipe - """ - super().__init__() - - # When this is set, a kill on the main process will also kill the - # siggen process. Highly wanted feature - self.daemon = True - - self.dataq = dataq - self.siggendata = siggendata - self.pipe = pipe - self.eq = None - self.siggen = None - - fs = self.siggendata.fs - nframes_per_block = siggendata.nframes_per_block - self.nblocks_buffer = max( - 1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block) - ) - - def newSiggen(self, siggendata: SiggenData): - """ - Create a signal generator based on parameters specified in global - function data. - - Args: - siggendata: SiggenData. Metadata to create a new signal generator. - """ - - logging.debug('newSiggen') - - fs = siggendata.fs - nframes_per_block = siggendata.nframes_per_block - level_dB = siggendata.level_dB - signaltype = siggendata.signaltype - signaltypedata = siggendata.signaltypedata - - # Muted state - self.muted = siggendata.muted - - if signaltype == SignalType.Periodic: - freq, = signaltypedata - siggen = pyxSiggen.sineWave(fs, freq, level_dB) - elif signaltype == SignalType.Noise: - noisetype, zerodBpoint = signaltypedata - if noisetype == NoiseType.white: - sos_colorfilter = None - elif noisetype == NoiseType.pink: - sos_colorfilter = PinkNoise(fs, zerodBpoint).flatten() - else: - raise ValueError(f"Unknown noise type") - - siggen = pyxSiggen.noise(fs, level_dB, sos_colorfilter) - - elif signaltype == SignalType.Sweep: - fl, fu, Ts, Tq, sweep_flags = signaltypedata - siggen = pyxSiggen.sweep(fs, fl, fu, Ts, Tq, sweep_flags, level_dB) - - else: - raise ValueError(f"Not implemented signal type: {signaltype}") - - logging.debug('newSiggen') - return siggen - - def generate(self): - """ - Generate a single block of data and put it on the data queue - """ - signal = self.siggen.genSignal(self.siggendata.nframes_per_block) - dtype = self.siggendata.dtype - if self.eq is not None: - signal = self.eq.equalize(signal) - if np.issubdtype(dtype, np.integer): - bitdepth_fixed = dtype.itemsize * 8 - signal *= 2 ** (bitdepth_fixed - 1) - 1 - if self.muted: - # Mute it - signal *= 0 - try: - self.dataq.put(signal.astype(dtype)) - except ValueError: - # As of Python 3.8, a value error on a Queue means that the oter - # end of the process died. - logging.error("Error with data queue, terminating process") - self.terminate() - - def newEqualizer(self, eqdata): - """ - Create an equalizer object from equalizer data - - Args: - eqdata: dictionary containing equalizer data. TODO: document the - requiring fields. - """ - if eqdata is None: - return None - eq_type = eqdata['type'] - eq_levels = eqdata['levels'] - fs = self.siggendata.fs - - if eq_type == 'three': - fb = SosThirdOctaveFilterBank(fs) - elif eq_type == 'one': - fb = SosOctaveFilterBank(fs) - - eq = Equalizer(fb._fb) - if eq_levels is not None: - eq.setLevels(eq_levels) - return eq - - def sendPipe(self, msgtype, msg): - try: - self.pipe.send((msgtype, msg)) - except OSError: - logging.error("Error with pipe, terminating process") - self.terminate() - - def run(self): - # The main function of the actual process - # First things first - ignoreSigInt() - - try: - self.siggen = self.newSiggen(self.siggendata) - except Exception as e: - self.sendPipe(SiggenMessage.error, str(e)) - - try: - self.eq = self.newEqualizer(self.siggendata.eqdata) - except Exception as e: - self.sendPipe(SiggenMessage.error, str(e)) - - # Pre-generate blocks of signal data - while self.dataq.qsize() < self.nblocks_buffer: - self.generate() - - self.sendPipe(SiggenMessage.ready, None) - - while True: - # Wait here for a while, to check for messages to consume - if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4): - try: - msg, data = self.pipe.recv() - except OSError: - logging.error("Error with pipe, terminating process") - self.terminate() - - if msg == SiggenMessage.endProcess: - logging.debug("Signal generator caught 'endProcess' message. Exiting.") - return 0 - elif msg == SiggenMessage.mute: - self.muted = data - elif msg == SiggenMessage.adjustVolume: - level_dB = data - logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") - self.siggen.setLevel(level_dB) - elif msg == SiggenMessage.newEqSettings: - eqdata = data - self.eq = self.newEqualizer(eqdata) - elif msg == SiggenMessage.newSiggenData: - siggendata = data - self.siggen = self.newSiggen(siggendata) - else: - self.pipe.send( - SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" - ) - while self.dataq.qsize() < self.nblocks_buffer: - # Generate new data and put it in the queue! - try: - self.generate() - except SiggenWorkerDone: - self.sendPipe(SiggenMessage.done, None) - return 0 - - return 1 - - -class Siggen: - """ - Signal generator class, generates signal data in a different process to - unload the work in the calling thread. - """ - - def __init__(self, dataq, siggendata: SiggenData): - """""" - - self.pipe, client_end = mp.Pipe(duplex=True) - - self.stopped = False - - self.process = SiggenProcess(siggendata, dataq, client_end) - self.process.start() - - if not self.process.is_alive(): - raise RuntimeError('Unexpected signal generator exception') - - # Block waiting here for signal generator to be ready - msg, data = self.pipe.recv() - if msg == SiggenMessage.ready: - logging.debug('Signal generator ready') - elif msg == SiggenMessage.error: - e = data - raise RuntimeError(f'Signal generator exception: {str(e)}') - else: - # Done, or something - if msg == SiggenMessage.done: - self.stopped = True - - self.handle_msgs() - - def setLevel(self, new_level): - """ - Set a new signal level to the generator - - Args: - new_level: The new level in [dBFS] - """ - self.pipe.send((SiggenMessage.adjustVolume, new_level)) - - def mute(self, mute): - self.pipe.send((SiggenMessage.mute, mute)) - - def setEqData(self, eqdata): - self.pipe.send((SiggenMessage.newEqSettings, eqdata)) - - def setSiggenData(self, siggendata: SiggenData): - """ - Updates the whole signal generator, based on new signal generator data. - """ - self.pipe.send((SiggenMessage.newSiggenData, siggendata)) - - def handle_msgs(self): - while self.pipe.poll(): - msg, data = self.pipe.recv() - if msg == SiggenMessage.error: - raise RuntimeError( - f"Error in initialization of signal generator: {data}" - ) - # elif msg == SiggenMessage.done: - # self.stop() - - - def cleanup(self): - logging.debug('Siggen::stop()') - self.pipe.send((SiggenMessage.endProcess, None)) - self.pipe.close() - - logging.debug('Joining siggen process') - self.process.join() - logging.debug('Joining siggen process done') - self.process.close() - - self.process = None - diff --git a/src/lasp/pybind11/lasp_daq.cpp b/src/lasp/pybind11/lasp_daq.cpp index b213688..80578f8 100644 --- a/src/lasp/pybind11/lasp_daq.cpp +++ b/src/lasp/pybind11/lasp_daq.cpp @@ -10,11 +10,14 @@ void init_daq(py::module &m) { /// Daq py::class_ daq(m, "Daq"); + + /// Daq::StreamStatus py::class_ ss(m, "StreamStatus"); ss.def("error", &Daq::StreamStatus::error); ss.def("runningOK", &Daq::StreamStatus::runningOK); ss.def_readonly("isRunning", &Daq::StreamStatus::isRunning); + /// Daq::StreamStatus::StreamError py::enum_(ss, "StreamError") .value("noError", Daq::StreamStatus::StreamError::noError) .value("inputXRun", Daq::StreamStatus::StreamError::inputXRun) @@ -27,6 +30,7 @@ void init_daq(py::module &m) { Daq::StreamStatus::StreamError::apiSpecificError) .export_values(); + /// Daq daq.def("neninchannels", &Daq::neninchannels); daq.def("nenoutchannels", &Daq::nenoutchannels); daq.def("samplerate", &Daq::samplerate); diff --git a/src/lasp/pybind11/lasp_daqconfiguration.cpp b/src/lasp/pybind11/lasp_daqconfiguration.cpp index a4f8edc..add2114 100644 --- a/src/lasp/pybind11/lasp_daqconfiguration.cpp +++ b/src/lasp/pybind11/lasp_daqconfiguration.cpp @@ -59,6 +59,7 @@ void init_daqconfiguration(py::module &m) { daqconfig.def(py::init<>()); daqconfig.def(py::init()); + daqconfig.def_readwrite("sampleRateIndex", &DaqConfiguration::sampleRateIndex); daqconfig.def_readwrite("dataTypeIndex", &DaqConfiguration::dataTypeIndex); diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp new file mode 100644 index 0000000..65f31a5 --- /dev/null +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -0,0 +1,183 @@ +#include "debugtrace.hpp" +#include "lasp_streammgr.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std::literals::chrono_literals; +using std::cerr; +using std::endl; + +const us RINGBUFFER_SIZE = 1024; + +namespace py = pybind11; + +/** + * @brief Generate a Numpy array from daqdata, does *NOT* create a copy of the + * data!. + * + * @tparam T The type of the stored sample + * @param d The daqdata to convert + * + * @return Numpy array + */ +template py::array_t getPyArray(DaqData &d) { + // https://github.com/pybind/pybind11/issues/323 + // + // When a valid object is passed as 'base', it tells pybind not to take + // ownership of the data, because 'base' will own it. In fact 'packet' will + // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY + // valid object is good for this purpose, so I chose "str"... + + py::str dummyDataOwner; + /* + * Signature: + array_t(ShapeContainer shape, + StridesContainer strides, + const T *ptr = nullptr, + handle base = handle()); + */ + + return py::array_t( + py::array::ShapeContainer({d.nframes, d.nchannels}), + py::array::StridesContainer( + {sizeof(T), + sizeof(T) * d.nframes}), /* Strides (in bytes) for each index */ + (T *)d.raw_ptr(), /* Pointer to buffer */ + dummyDataOwner); +} + +/** + * @brief Wraps the InDataHandler such that it calls a Python callback with a + * buffer of sample data. + */ +class PyIndataHandler : public InDataHandler { + /** + * @brief The callback function that is called. + */ + py::function cb; + /** + * @brief The thread that is handling callbacks from the queue + */ + std::thread pythread; + boost::lockfree::spsc_queue dataqueue{RINGBUFFER_SIZE}; + std::atomic stopThread{false}; + +public: + ~PyIndataHandler() { + DEBUGTRACE_ENTER; + stop(); + stopThread = true; + pythread.join(); + } + PyIndataHandler(StreamMgr &mgr, py::function cb) + : InDataHandler(mgr), cb(cb), + pythread(&PyIndataHandler::threadfcn, this) { + + DEBUGTRACE_ENTER; + /// TODO: Note that if start() throws an exception, which means that the + /// destructor of PyIndataHandler is not called and the thread destructor + /// calls terminate(). It is a kind of rude way to crash, but it is also + /// *very* unlikely to happen, as start() does only add a reference to this + /// handler to a list in the stream mgr. + start(); + } + + /** + * @brief Reads from the + */ + void threadfcn() { + + /* DEBUGTRACE_ENTER; */ + + using DataType = DataTypeDescriptor::DataType; + + DaqData d{}; + + while (!stopThread) { + if (dataqueue.pop(d)) { + /* DEBUGTRACE_PRINT("pop() returned true"); */ + + py::gil_scoped_acquire acquire; + try { + py::array binfo; + switch (d.dtype) { + case (DataType::dtype_int8): + binfo = getPyArray(d); + break; + case (DataType::dtype_int16): + binfo = getPyArray(d); + break; + case (DataType::dtype_int32): + binfo = getPyArray(d); + break; + case (DataType::dtype_fl32): + binfo = getPyArray(d); + break; + case (DataType::dtype_fl64): + binfo = getPyArray(d); + break; + default: + throw std::runtime_error("BUG"); + } // End of switch + + py::object bool_val = cb(binfo); + bool res = bool_val.cast(); + if (!res) + stopThread = true; + + } catch (py::error_already_set &e) { + cerr << "ERROR: Python raised exception from callback function: "; + cerr << e.what() << endl; + stopThread = true; + + } catch (py::cast_error &e) { + cerr << e.what() << endl; + cerr << "ERROR: Python callback does not return boolean value." + << endl; + stopThread = true; + } + } + // When there is nothing to pop from the queue, just sleep for a some + // time. + else { + std::this_thread::sleep_for(2ms); + } + } // end of while loop + + } // end of threadfcn + + /** + * @brief Pushes a copy of the daqdata to the thread queuue and returns + * + * @param daqdata the daq info to push + * + * @return true, to continue with sampling. + */ + virtual bool inCallback(const DaqData &daqdata) override { + /* DEBUGTRACE_ENTER; */ + if (!stopThread) { + if (!dataqueue.push(daqdata)) { + stopThread = true; + cerr << "Pushing DaqData object failed. Probably the ringbuffer is " + "full. Try reducing the load" + << endl; + } + } + return true; + } +}; + +void init_datahandler(py::module &m) { + py::class_ h(m, "InDataHandler"); + + h.def(py::init()); +} diff --git a/src/lasp/pybind11/lasp_streammgr.cpp b/src/lasp/pybind11/lasp_streammgr.cpp index 9c996b8..ac18b45 100644 --- a/src/lasp/pybind11/lasp_streammgr.cpp +++ b/src/lasp/pybind11/lasp_streammgr.cpp @@ -1,5 +1,6 @@ #include "lasp_streammgr.h" #include +#include #include #include @@ -11,6 +12,11 @@ void init_streammgr(py::module &m) { py::class_> smgr( m, "StreamMgr"); + py::enum_(smgr, "StreamType") + .value("input", StreamMgr::StreamType::input) + .value("output", StreamMgr::StreamType::output) + .export_values(); + smgr.def("startStream", &StreamMgr::startStream); smgr.def("stopStream", &StreamMgr::startStream); smgr.def_static("getInstance", []() { @@ -19,5 +25,4 @@ void init_streammgr(py::module &m) { smgr.def("stopAllStreams", &StreamMgr::stopAllStreams); smgr.def("setSiggen", &StreamMgr::setSiggen); - } diff --git a/third_party/STL-Threadsafe b/third_party/STL-Threadsafe deleted file mode 160000 index 08b2d9e..0000000 --- a/third_party/STL-Threadsafe +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 08b2d9e7f487121088a817071d1d42b2736996e9 diff --git a/third_party/lockfree b/third_party/lockfree new file mode 160000 index 0000000..fdd4d06 --- /dev/null +++ b/third_party/lockfree @@ -0,0 +1 @@ +Subproject commit fdd4d0632dd0904f6e9c656c45397fe8ef985bc9