diff --git a/CMakeLists.txt b/CMakeLists.txt index 2f229c0..7351fae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,7 +19,7 @@ option(LASP_RTAUDIO "Compile with RtAudio Daq backend" ON) option(LASP_ULDAQ "Compile with UlDaq backend" ON) option(LASP_DEBUG "Compile in debug mode" ON) option(LASP_FFTW_BACKEND "Compile with FFTW fft backend" ON) -option(LAS_FFTPACK_BACKEND "Compile with Fftpack fft backend" OFF) +option(LASP_FFTPACK_BACKEND "Compile with Fftpack fft backend" OFF) if(LASP_PARALLEL) add_definitions(-DLASP_MAX_NUM_THREADS=30) @@ -39,10 +39,11 @@ add_definitions(-DLASP_MAX_NUM_CHANNELS=80) add_definitions(-DLASP_MAX_NFFT=33554432) # 2**25 # ####################################### End of user-adjustable variables section - +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_C_STANDARD 11) # ############### Choose an fft backend here -if(LASP_FFTW_BACKEND AND LASP_FFTPACK_BACKEND) +if(((LASP_FFTW_BACKEND AND LASP_FFTPACK_BACKEND) OR ((NOT (LASP_FFTPACK_BACKEND) AND (NOT LASP_FFTW_BACKEND))))) message(FATAL_ERROR "Either FFTW or Fftpack backend should be chosen. Please disable one of them") endif() @@ -61,6 +62,7 @@ if(LASP_FLOAT STREQUAL "double") add_definitions(-DLASP_FLOAT=64) add_definitions(-DLASP_DOUBLE_PRECISION) else() + # TODO: This has not been tested for a long time. add_definitions(-DLASP_FLOAT=32) add_definitions(-DLASP_SINGLE_PRECISION) endif(LASP_FLOAT STREQUAL "double") @@ -68,6 +70,7 @@ endif(LASP_FLOAT STREQUAL "double") # ##################### END Cmake variables converted to a macro set(Python_ADDITIONAL_VERSIONS "3.8") +set(python_version_windll "38") # #################### Setting definitions and debug-specific compilation flags # General make flags @@ -77,21 +80,28 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11 -Wall -Wextra -Wno-type-limits \ if(CMAKE_SYSTEM_NAME STREQUAL "Windows") set(win32 true) + set(home $ENV{USERPROFILE}) + # set(miniconda_dir ${home}\\Miniconda3) + message("Building for Windows") include_directories( ..\\rtaudio - C:\\mingw\\include\\OpenBLAS + C:\\mingw\\mingw64\\include\\OpenBLAS + link_directories(${home}\\miniconda3\\Library\\include) ) + set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH} $miniconda_dir\\Lib\\cmake") + # include( add_definitions(-DMS_WIN64) - link_directories(C:\\mingw\\lib) - link_directories(C:\\mingw\\bin) + link_directories(C:\\mingw\\mingw64\\lib) + link_directories(C:\\mingw\\mingw64\\bin) link_directories(..\\rtaudio) - link_directories(C:\\Users\\User\\Miniconda3) + link_directories(${home}\\Miniconda3) add_definitions(-DHAS_RTAUDIO_WIN_WASAPI_API) -else() +else() # Linux compile set(win32 false) - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -std=c11 \ - -Werror=incompatible-pointer-types") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11 -Werror=incompatible-pointer-types") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") include_directories(/usr/local/include/rtaudio) include_directories(/usr/include/rtaudio) link_directories(/usr/local/lib) @@ -136,7 +146,7 @@ set(CYTHON_EXTRA_C_FLAGS "-Wno-sign-compare -Wno-cpp -Wno-implicit-fallthrough - set(CYTHON_EXTRA_CXX_FLAGS "-Wno-sign-compare -Wno-cpp -Wno-implicit-fallthrough -Wno-strict-aliasing") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -std=c++11 -Wall -Wextra \ +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra \ -Wno-type-limits") # Debug make flags diff --git a/README.md b/README.md index be7c5ba..cdb46df 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,14 @@ compilation: - libopenblas-base - libopenblas-dev 
+#### Windows specific + +Tested using a Anacond / Miniconda Python environment. Please first run the following command: + +`conda install fftw` + +in case you want the *FFTW* fft backend. + ### Dependencies #### Ubuntu / Linux Mint diff --git a/lasp/CMakeLists.txt b/lasp/CMakeLists.txt index 8a827ad..5760574 100644 --- a/lasp/CMakeLists.txt +++ b/lasp/CMakeLists.txt @@ -17,5 +17,5 @@ set_source_files_properties(wrappers.c PROPERTIES COMPILE_FLAGS "${CMAKE_C_FLAGS cython_add_module(wrappers wrappers.pyx) target_link_libraries(wrappers lasp_lib) if(win32) -target_link_libraries(wrappers python37) +target_link_libraries(wrappers python${python_version_windll}) endif(win32) diff --git a/lasp/__init__.py b/lasp/__init__.py index bb7dfc5..85eb2d1 100644 --- a/lasp/__init__.py +++ b/lasp/__init__.py @@ -1,10 +1,12 @@ -from .lasp_atomic import * -from .lasp_avstream import * from .lasp_common import * +from .lasp_avstream import * +from .wrappers import * +from .lasp_atomic import * from .lasp_imptube import * from .lasp_measurement import * from .lasp_octavefilter import * from .lasp_slm import * +from .lasp_record import * +from .lasp_siggen import * from .lasp_weighcal import * -from .wrappers import * -from .device import AvType +from .tools import * diff --git a/lasp/c/CMakeLists.txt b/lasp/c/CMakeLists.txt index a68d4ef..d1d5b3d 100644 --- a/lasp/c/CMakeLists.txt +++ b/lasp/c/CMakeLists.txt @@ -17,6 +17,7 @@ add_library(lasp_lib lasp_mq.c lasp_siggen.c lasp_worker.c + lasp_nprocs.c lasp_dfifo.c lasp_firfilterbank.c lasp_sosfilterbank.c diff --git a/lasp/c/lasp_nprocs.c b/lasp/c/lasp_nprocs.c new file mode 100644 index 0000000..706bec6 --- /dev/null +++ b/lasp/c/lasp_nprocs.c @@ -0,0 +1,18 @@ +#ifdef MS_WIN64 +#include +#else +// Used for obtaining the number of processors +#include +#endif +#include "lasp_nprocs.h" + +us getNumberOfProcs() { +#if MS_WIN64 +// https://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine + SYSTEM_INFO sysinfo; + GetSystemInfo(&sysinfo); + return sysinfo.dwNumberOfProcessors; +#else + return get_nprocs(); +#endif +} \ No newline at end of file diff --git a/lasp/c/lasp_nprocs.h b/lasp/c/lasp_nprocs.h new file mode 100644 index 0000000..7ba5458 --- /dev/null +++ b/lasp/c/lasp_nprocs.h @@ -0,0 +1,18 @@ +// lasp_nprocs.h +// +// Author: J.A. de Jong - ASCEE +// +// Description: Implemententation of a function to determine the number +// of processors. +////////////////////////////////////////////////////////////////////// +#pragma once +#ifndef LASP_NPROCS_H +#define LASP_NPROCS_H +#include "lasp_types.h" + +/** + * @return The number of SMP processors + */ +us getNumberOfProcs(); + +#endif // LASP_NPROCS_H \ No newline at end of file diff --git a/lasp/c/lasp_pyarray.h b/lasp/c/lasp_pyarray.h index 8a0169a..cbe8792 100644 --- a/lasp/c/lasp_pyarray.h +++ b/lasp/c/lasp_pyarray.h @@ -22,7 +22,7 @@ * Function passed to Python to use for cleanup of * foreignly obtained data. **/ -static inline void capsule_cleanup(void *capsule) { +static inline void capsule_cleanup(PyObject *capsule) { void *memory = PyCapsule_GetPointer(capsule, NULL); free(memory); } @@ -67,8 +67,12 @@ static inline PyObject *data_to_ndarray(void *data, int n_rows, int n_cols, // https://stackoverflow.com/questions/54269956/crash-of-jupyter-due-to-the-use-of-pyarray-enableflags/54278170#54278170 // Note that in general it was disadvised to build all C code with MinGW on // Windows. We do it anyway, see if we find any problems on the way. 
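  // NOTE: capsule_cleanup() above looks the pointer up with
  // PyCapsule_GetPointer(capsule, NULL). PyCapsule_GetPointer only succeeds
  // when its name argument matches the name the capsule was created with, so
  // creating the capsule below with the name "data destructor" appears to make
  // that lookup fail in the destructor, in which case the buffer is never
  // freed. Passing NULL here (or the same string in capsule_cleanup) would
  // keep the two consistent.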
- void *capsule = PyCapsule_New(mat->_data, NULL, capsule_cleanup); - PyArray_SetBaseObject(arr, capsule); + PyObject *capsule = PyCapsule_New(data, "data destructor", capsule_cleanup); + int res = PyArray_SetBaseObject(arr, capsule); + if(res != 0) { + fprintf(stderr, "Failed to set base object of array!"); + return NULL; + } #endif /* fprintf(stderr, "============Ownership transfer================\n"); */ PyArray_ENABLEFLAGS(arr, NPY_OWNDATA); diff --git a/lasp/c/lasp_siggen.c b/lasp/c/lasp_siggen.c index d9610b4..766af76 100644 --- a/lasp/c/lasp_siggen.c +++ b/lasp/c/lasp_siggen.c @@ -332,6 +332,13 @@ us Siggen_getN(const Siggen* siggen) { feTRACE(15); return 0; } +void Siggen_setLevel(Siggen* siggen, const d new_level_dB) { + fsTRACE(15); + + siggen->level_amp = d_pow(10, new_level_dB/20); + + feTRACE(15); +} void Siggen_free(Siggen* siggen) { fsTRACE(15); diff --git a/lasp/c/lasp_siggen.h b/lasp/c/lasp_siggen.h index d5ec290..52e7862 100644 --- a/lasp/c/lasp_siggen.h +++ b/lasp/c/lasp_siggen.h @@ -46,6 +46,14 @@ Siggen* Siggen_Sinewave_create(const d fs,const d freq,const d level_dB); */ Siggen* Siggen_Noise_create(const d fs, const d level_dB, Sosfilterbank* colorfilter); +/** + * Set the level of the signal generator + * @param[in] Siggen* Signal generator handle + * + * @param[in] new_level_dB The new level, in dBFS + */ +void Siggen_setLevel(Siggen*, const d new_level_dB); + /** * Obtain the repetition period for a periodic excitation. diff --git a/lasp/c/lasp_sosfilterbank.c b/lasp/c/lasp_sosfilterbank.c index 7e237cb..e3c652d 100644 --- a/lasp/c/lasp_sosfilterbank.c +++ b/lasp/c/lasp_sosfilterbank.c @@ -2,7 +2,8 @@ #include "lasp_sosfilterbank.h" #include "lasp_mq.h" #include "lasp_worker.h" -#include +#include "lasp_nprocs.h" + typedef struct Sosfilterbank { @@ -89,7 +90,7 @@ Sosfilterbank* Sosfilterbank_create( vd_free(&imp_response); us nthreads; - us nprocs = (us) get_nprocs(); + us nprocs = getNumberOfProcs(); if(nthreads_ == 0) { nthreads = min(max(nprocs/2,1), filterbank_size); diff --git a/lasp/device/CMakeLists.txt b/lasp/device/CMakeLists.txt index 320d528..cec4915 100644 --- a/lasp/device/CMakeLists.txt +++ b/lasp/device/CMakeLists.txt @@ -11,7 +11,7 @@ if(LASP_ULDAQ) list(PREPEND cpp_daq_linklibs uldaq) endif() if(win32) - list(APPEND cpp_daq_linklibs python) + list(APPEND cpp_daq_linklibs python${python_version_windll}) endif(win32) add_library(cpp_daq ${cpp_daq_files}) diff --git a/lasp/device/__init__.py b/lasp/device/__init__.py index 7bd944a..6fde552 100644 --- a/lasp/device/__init__.py +++ b/lasp/device/__init__.py @@ -1,6 +1,4 @@ -#!/usr/bin/python3 from .lasp_device_common import * -from .lasp_daq import * from .lasp_deviceinfo import * from .lasp_daqconfig import * from .lasp_daq import * diff --git a/lasp/device/lasp_common_decls.pxd b/lasp/device/lasp_common_decls.pxd index 144b418..8152470 100644 --- a/lasp/device/lasp_common_decls.pxd +++ b/lasp/device/lasp_common_decls.pxd @@ -87,6 +87,9 @@ cdef extern from "lasp_cppdaq.h" nogil: unsigned ninchannels unsigned noutchannels + string serialize() + + cppDeviceInfo deserialize(string) bool hasInputIEPE bool hasInputACCouplingSwitch bool hasInputTrigger diff --git a/lasp/device/lasp_cppdaq.h b/lasp/device/lasp_cppdaq.h index 29eaa44..c238c12 100644 --- a/lasp/device/lasp_cppdaq.h +++ b/lasp/device/lasp_cppdaq.h @@ -19,9 +19,12 @@ using std::cerr; using std::cout; using std::endl; +using std::getline; using std::runtime_error; using std::string; using std::vector; +using std::to_string; + typedef unsigned int 
us; typedef vector boolvec; @@ -47,11 +50,12 @@ const DataType dtype_invalid; const DataType dtype_fl32("32-bits floating point", 4, true); const DataType dtype_fl64("64-bits floating point", 8, true); const DataType dtype_int8("8-bits integers", 1, false); +const DataType dtype_int24("24-bits integers", 1, false); const DataType dtype_int16("16-bits integers", 2, false); const DataType dtype_int32("32-bits integers", 4, false); const std::vector dataTypes = { - dtype_int8, dtype_int16, dtype_int32, dtype_fl32, dtype_fl64, + dtype_int8, dtype_int16,dtype_int24, dtype_int32, dtype_fl32, dtype_fl64, }; class DaqApi { @@ -68,7 +72,7 @@ class DaqApi { return (apiname == other.apiname && apicode == other.apicode && api_specific_subcode == other.api_specific_subcode); } - + operator string() const { return apiname + ", code: " + to_string(apicode); } static vector getAvailableApis(); }; @@ -78,106 +82,215 @@ const DaqApi uldaqapi("UlDaq", 0); #ifdef HAS_RTAUDIO_API const DaqApi rtaudioAlsaApi("RtAudio Linux ALSA", 1, RtAudio::Api::LINUX_ALSA); const DaqApi rtaudioPulseaudioApi("RtAudio Linux Pulseaudio", 2, - RtAudio::Api::LINUX_PULSE); + RtAudio::Api::LINUX_PULSE); const DaqApi rtaudioWasapiApi("RtAudio Windows Wasapi", 3, - RtAudio::Api::WINDOWS_WASAPI); + RtAudio::Api::WINDOWS_WASAPI); const DaqApi rtaudioDsApi("RtAudio Windows DirectSound", 4, - RtAudio::Api::WINDOWS_DS); + RtAudio::Api::WINDOWS_DS); const DaqApi rtaudioAsioApi("RtAudio Windows ASIO", 5, - RtAudio::Api::WINDOWS_ASIO); + RtAudio::Api::WINDOWS_ASIO); #endif // Structure containing device info parameters class DeviceInfo { -public: - DaqApi api; - string device_name = ""; + public: + DaqApi api; + string device_name = ""; - int api_specific_devindex = -1; + int api_specific_devindex = -1; - vector availableDataTypes; - int prefDataTypeIndex = 0; + vector availableDataTypes; + int prefDataTypeIndex = 0; - vector availableSampleRates; - int prefSampleRateIndex = -1; + vector availableSampleRates; + int prefSampleRateIndex = -1; - vector availableFramesPerBlock; - unsigned prefFramesPerBlockIndex = 0; + vector availableFramesPerBlock; + unsigned prefFramesPerBlockIndex = 0; - dvec availableInputRanges; - int prefInputRangeIndex = 0; + dvec availableInputRanges; + int prefInputRangeIndex = 0; - unsigned ninchannels = 0; - unsigned noutchannels = 0; + unsigned ninchannels = 0; + unsigned noutchannels = 0; - bool hasInputIEPE = false; - bool hasInputACCouplingSwitch = false; - bool hasInputTrigger = false; + bool hasInputIEPE = false; + bool hasInputACCouplingSwitch = false; + bool hasInputTrigger = false; - /* DeviceInfo(): */ - /* datatype(dtype_invalid) { } */ + /* DeviceInfo(): */ + /* datatype(dtype_invalid) { } */ - double prefSampleRate() const { - if (((us)prefSampleRateIndex < availableSampleRates.size()) && - (prefSampleRateIndex >= 0)) { - return availableSampleRates.at(prefSampleRateIndex); - } else { - throw std::runtime_error("No prefered sample rate available"); + double prefSampleRate() const { + if (((us)prefSampleRateIndex < availableSampleRates.size()) && + (prefSampleRateIndex >= 0)) { + return availableSampleRates.at(prefSampleRateIndex); + } else { + throw std::runtime_error("No prefered sample rate available"); + } } - } - operator string() const { - std::stringstream str; - str << api.apiname + " " << api_specific_devindex - << " number of input channels: " << ninchannels - << " number of output channels: " << noutchannels; - return str.str(); - } + operator string() const { + std::stringstream str; + str << 
api.apiname + " " << api_specific_devindex << endl + << " number of input channels: " << ninchannels << endl + << " number of output channels: " << noutchannels << endl; + return str.str(); + } + + string serialize() const { + // Simple serializer for this object, used because we found a bit late that + // this object needs to be send over the wire. We do not want to make this + // implementation in Python, as these objects are created here, in the C++ + // code. The Python wrapper is just a readonly wrapper. + std::stringstream str; + + str << api.apiname << "\t"; + str << api.apicode << "\t"; + str << api.api_specific_subcode << "\t"; + str << device_name << "\t"; + + str << availableDataTypes.size() << "\t"; + for(const DataType& dtype: availableDataTypes) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << dtype.name << "\t"; + str << dtype.sw << "\t"; + str << dtype.is_floating << "\t"; + } + str << prefDataTypeIndex << "\t"; + + str << availableSampleRates.size() << "\t"; + for(const double& fs: availableSampleRates) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << fs << "\t"; + } + str << prefSampleRateIndex << "\t"; + + str << availableFramesPerBlock.size() << "\t"; + for(const us& fb: availableFramesPerBlock) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << fb << "\t"; + } + str << prefFramesPerBlockIndex << "\t"; + + str << availableInputRanges.size() << "\t"; + for(const double& ir: availableInputRanges) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << ir << "\t"; + } + str << prefInputRangeIndex << "\t"; + + str << ninchannels << "\t"; + str << noutchannels << "\t"; + str << int(hasInputIEPE) << "\t"; + str << int(hasInputACCouplingSwitch) << "\t"; + str << int(hasInputTrigger) << "\t"; + + return str.str(); + } + + static DeviceInfo deserialize(const string& dstr) { + DeviceInfo devinfo; + + std::stringstream str(dstr); + string tmp; + us N; + // Lambda functions for deserializing + auto nexts = [&]() { getline(str, tmp, '\t'); return tmp; }; + auto nexti = [&]() { getline(str, tmp, '\t'); return std::atoi(tmp.c_str()); }; + auto nextf = [&]() { getline(str, tmp, '\t'); return std::atof(tmp.c_str()); }; + + // Api + string apiname = nexts(); + auto apicode = nexti(); + auto api_specific_subcode = nexti(); + DaqApi api(apiname, apicode, api_specific_subcode); + devinfo.api = api; + + devinfo.device_name = nexts(); + + N = us(nexti()); + for(us i=0;i inchannel_sensitivities; - vector inchannel_names; - vector inchannel_metadata; + vector inchannel_sensitivities; + vector inchannel_names; + vector inchannel_metadata; - vector outchannel_sensitivities; - vector outchannel_names; - vector outchannel_metadata; + vector outchannel_sensitivities; + vector outchannel_names; + vector outchannel_metadata; - us sampleRateIndex = 0; // Index in list of sample rates + us sampleRateIndex = 0; // Index in list of sample rates - us dataTypeIndex = 0; // Required datatype for output, should be - // present in the list + us dataTypeIndex = 0; // Required datatype for output, should be + // present in the list - us framesPerBlockIndex = 0; + us framesPerBlockIndex = 0; - bool monitorOutput = false; + bool monitorOutput = false; - boolvec inputIEPEEnabled; - boolvec inputACCouplingMode; + boolvec inputIEPEEnabled; + boolvec inputACCouplingMode; - usvec inputRangeIndices; + usvec inputRangeIndices; - // Create a default configuration, with all channels disabled on both - // input 
and output, and default channel names - DaqConfiguration(const DeviceInfo &device); - DaqConfiguration() {} + // Create a default configuration, with all channels disabled on both + // input and output, and default channel names + DaqConfiguration(const DeviceInfo &device); + DaqConfiguration() {} - bool match(const DeviceInfo &devinfo) const; + bool match(const DeviceInfo &devinfo) const; - int getHighestInChannel() const; - int getHighestOutChannel() const; + int getHighestInChannel() const; + int getHighestOutChannel() const; - int getLowestInChannel() const; - int getLowestOutChannel() const; + int getLowestInChannel() const; + int getLowestOutChannel() const; }; class Daq; @@ -185,7 +298,7 @@ class Daq : public DaqConfiguration, public DeviceInfo { mutable std::mutex mutex; -public: + public: static vector getDeviceInfo(); static Daq *createDaq(const DeviceInfo &, const DaqConfiguration &config); @@ -193,7 +306,7 @@ public: Daq(const DeviceInfo &devinfo, const DaqConfiguration &config); virtual void start(SafeQueue *inqueue, - SafeQueue *outqueue) = 0; + SafeQueue *outqueue) = 0; virtual void stop() = 0; diff --git a/lasp/device/lasp_cpprtaudio.cpp b/lasp/device/lasp_cpprtaudio.cpp index d5969ff..8a77d01 100644 --- a/lasp/device/lasp_cpprtaudio.cpp +++ b/lasp/device/lasp_cpprtaudio.cpp @@ -4,6 +4,9 @@ #include #include #include +#if MS_WIN64 +typedef uint8_t u_int8_t; +#endif using std::atomic; @@ -18,7 +21,10 @@ void fillRtAudioDeviceInfo(vector &devinfolist) { for(us devno = 0; devno< count;devno++) { RtAudio::DeviceInfo devinfo = rtaudio.getDeviceInfo(devno); - + if(!devinfo.probed) { + // Device capabilities not successfully probed. Continue to next + continue; + } DeviceInfo d; switch(api){ case RtAudio::LINUX_ALSA: @@ -65,12 +71,18 @@ void fillRtAudioDeviceInfo(vector &devinfolist) { if(formats & RTAUDIO_SINT16) { d.availableDataTypes.push_back(dtype_int16); } + if(formats & RTAUDIO_SINT32) { + d.availableDataTypes.push_back(dtype_int24); + } if(formats & RTAUDIO_SINT32) { d.availableDataTypes.push_back(dtype_fl32); } if(formats & RTAUDIO_FLOAT64) { d.availableDataTypes.push_back(dtype_fl64); } + if(d.availableDataTypes.size() == 0) { + std::cerr << "RtAudio: No data types found in device!" << endl; + } d.prefDataTypeIndex = d.availableDataTypes.size() - 1; @@ -175,7 +187,7 @@ class AudioDaq: public Daq { &streamoptions, &myerrorcallback ); - } catch(...) { + } catch(RtAudioError& e) { if(rtaudio) delete rtaudio; if(instreamparams) delete instreamparams; if(outstreamparams) delete outstreamparams; @@ -287,7 +299,6 @@ int mycallback( AudioDaq* daq = (AudioDaq*) userData; DataType dtype = daq->dataType(); - /* us neninchannels = daq->neninchannels(); */ us neninchannels_inc_mon = daq->neninchannels(); us nenoutchannels = daq->nenoutchannels(); @@ -343,6 +354,7 @@ int mycallback( us j=0; // OUR buffer channel counter us i=0; // RtAudio channel counter for(us ch=0;ch<=daq->getHighestOutChannel();ch++) { + /* cerr << "Copying from queue... " << endl; */ if(enoutchannels[ch]) { memcpy( &(outputBuffer[i*bytesperchan]), @@ -351,6 +363,7 @@ int mycallback( j++; } else { + /* cerr << "unused output channel in list" << endl; */ memset( &(outputBuffer[i*bytesperchan]),0,bytesperchan); } @@ -364,7 +377,7 @@ int mycallback( } } else { - cerr << "Stream output buffer underflow, zero-ing buffer... " << endl; + cerr << "RtAudio backend: stream output buffer underflow!" 
<< endl; } diff --git a/lasp/device/lasp_daq.pyx b/lasp/device/lasp_daq.pyx index af09b20..6557583 100644 --- a/lasp/device/lasp_daq.pyx +++ b/lasp/device/lasp_daq.pyx @@ -1,10 +1,11 @@ cimport cython +from ..lasp_common import AvType from .lasp_deviceinfo cimport DeviceInfo from .lasp_daqconfig cimport DaqConfiguration from cpython.ref cimport PyObject,Py_INCREF, Py_DECREF import numpy as np -from .lasp_device_common import AvType +import logging __all__ = ['Daq'] @@ -36,6 +37,7 @@ cdef getNumpyDataType(DataType& dt): else: raise ValueError('Unknown data type') +DEF QUEUE_BUFFER_TIME = 0.5 ctypedef struct PyStreamData: PyObject* pyCallback @@ -43,6 +45,10 @@ ctypedef struct PyStreamData: # Flag used to pass the stopThread. atomic[bool] stopThread + # Flag to indicate that the signal generator queue has been filled for the + # first time. + atomic[bool] ready + # Number of frames per block unsigned nFramesPerBlock @@ -74,46 +80,58 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: unsigned nBytesPerChan= sd.nBytesPerChan unsigned nFramesPerBlock= sd.nFramesPerBlock - double sleeptime = ( sd.nFramesPerBlock)/(4*sd.samplerate); + double sleeptime = ( sd.nFramesPerBlock)/(8*sd.samplerate); + # Sleep time in microseconds us sleeptime_us = (sleeptime*1e6); + us nblocks_buffer = max(1, (QUEUE_BUFFER_TIME * sd.samplerate / + sd.nFramesPerBlock)) + with gil: npy_format = cnp.NPY_FLOAT64 callback = sd.pyCallback # print(f'Number of input channels: {ninchannels}') # print(f'Number of out channels: {noutchannels}') - fprintf(stderr, 'Sleep time: %d us', sleeptime_us) + # fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us) + + if sd.outQueue: + for i in range(nblocks_buffer): + outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) + memset(outbuffer, 0, sizeof(double)*nBytesPerChan*noutchannels) + sd.outQueue.enqueue( outbuffer) + sd.ready.store(True) while not sd.stopThread.load(): with gil: - if sd.outQueue and sd.outQueue.size() < 10: - outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) + if sd.outQueue: + while sd.outQueue.size() < nblocks_buffer: + outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) - npy_output = data_to_ndarray( - outbuffer, - nFramesPerBlock, - noutchannels, - sd.npy_format, - False, # Do not transfer ownership - True) # F-contiguous - try: - rval = callback(None, - npy_output, + npy_output = data_to_ndarray( + outbuffer, nFramesPerBlock, - ) + noutchannels, + sd.npy_format, + False, # Do not transfer ownership to the temporary + # Numpy container + True) # F-contiguous + try: + rval = callback(None, + npy_output, + nFramesPerBlock, + ) - except Exception as e: - print('exception in Cython callback for audio output: ', str(e)) - return - - sd.outQueue.enqueue( outbuffer) + except Exception as e: + logging.error('exception in Cython callback for audio output: ', str(e)) + return + sd.outQueue.enqueue( outbuffer) if sd.inQueue and not sd.inQueue.empty(): # Waiting indefinitely on the queue... 
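            # NOTE: because of the empty() check above, this dequeue() does not
            # actually block; the loop polls and sleeps sleeptime_us at the end
            # of each iteration instead.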
inbuffer = sd.inQueue.dequeue() if inbuffer == NULL: - printf('Stopping thread...\n') + logging.debug('Stopping thread...\n') return try: @@ -130,7 +148,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: ) except Exception as e: - print('exception in cython callback for audio input: ', str(e)) + logging.error('exception in cython callback for audio input: ', str(e)) return CPPsleep_us(sleeptime_us); @@ -142,8 +160,6 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: # Inputbuffer memory is owned by Numpy, so should not be free'ed inbuffer = NULL - fprintf(stderr, 'Exiting python thread...\n') - cdef class Daq: def __cinit__(self, DeviceInfo pydevinfo, DaqConfiguration pydaqconfig): @@ -166,7 +182,6 @@ cdef class Daq: try: self.daq_device = cppDaq.createDaq(devinfo[0], daqconfig[0]) except Exception as e: - print(e) raise self.nFramesPerBlock = self.daq_device.framesPerBlock() self.samplerate = self.daq_device.samplerate() @@ -178,7 +193,7 @@ cdef class Daq: def __dealloc__(self): # fprintf(stderr, "UlDaq.__dealloc__\n") if self.sd is not NULL: - fprintf(stderr, "UlDaq.__dealloc__: stopping stream.\n") + logging.debug("UlDaq.__dealloc__: stopping stream.") self.stop() if self.daq_device is not NULL: @@ -238,6 +253,8 @@ cdef class Daq: self.sd.stopThread.store(False) + self.sd.ready.store(False) + self.sd.inQueue = NULL self.sd.outQueue = NULL @@ -264,12 +281,13 @@ cdef class Daq: self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self.sd) - # Allow stream stome time to start - CPPsleep_ms(500) + while not self.sd.ready.load(): + # Allow stream stome time to start + CPPsleep_ms(100) self.daq_device.start( - self.sd.inQueue, - self.sd.outQueue) + self.sd.inQueue, + self.sd.outQueue) return self.daq_device.samplerate() @@ -304,7 +322,7 @@ cdef class Daq: free(sd.outQueue.dequeue()) del sd.outQueue sd.outQueue = NULL - fprintf(stderr, "End cleanup stream queues...\n") + logging.debug("End cleanup stream queues...\n") if sd.pyCallback: Py_DECREF( sd.pyCallback) diff --git a/lasp/device/lasp_daqconfig.pyx b/lasp/device/lasp_daqconfig.pyx index b1d382f..3c9c437 100644 --- a/lasp/device/lasp_daqconfig.pyx +++ b/lasp/device/lasp_daqconfig.pyx @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- """! Author: J.A. de Jong - ASCEE @@ -43,9 +42,11 @@ cdef class DaqConfigurations: output_config) @staticmethod - def loadConfigs(): + def loadAllConfigs(): """ - Returns a list of currently available configurations + Returns a dictionary of all configurations presets. 
The dictionary keys + are the names of the configurations + """ with lasp_shelve() as sh: configs_json = sh.load('daqconfigs', {}) @@ -54,6 +55,16 @@ cdef class DaqConfigurations: configs[name] = DaqConfigurations.from_json(val) return configs + @staticmethod + def loadConfigs(name: str): + """ + Load a configuration preset, containing input config and output config + """ + + with lasp_shelve() as sh: + configs_json = sh.load('daqconfigs', {}) + return DaqConfigurations.from_json(configs_json[name]) + def saveConfigs(self, name): with lasp_shelve() as sh: configs_json = sh.load('daqconfigs', {}) @@ -61,20 +72,25 @@ cdef class DaqConfigurations: sh.store('daqconfigs', configs_json) @staticmethod - def deleteConfig(name): + def deleteConfigs(name): with lasp_shelve() as sh: configs_json = sh.load('daqconfigs', {}) del configs_json[name] sh.store('daqconfigs', configs_json) +def constructDaqConfig(dict_data): + return DaqConfiguration.from_dict(dict_data) cdef class DaqConfiguration: """ Initialize a device descriptor """ - def __init__(self): + def __cinit__(self): pass + def __str__(self): + return str(self.to_json()) + @staticmethod def fromDeviceInfo(DeviceInfo devinfo): cdef: @@ -90,6 +106,9 @@ cdef class DaqConfiguration: config_dict = json.loads(jsonstring) return DaqConfiguration.from_dict(config_dict) + def __reduce__(self): + return (constructDaqConfig, (self.to_dict(),)) + @staticmethod def from_dict(pydict): cdef: @@ -127,8 +146,8 @@ cdef class DaqConfiguration: return pydaqcfg - def to_json(self): - return json.dumps(dict( + def to_dict(self): + return dict( apicode = self.config.api.apicode, device_name = self.config.device_name.decode('utf-8'), @@ -159,8 +178,10 @@ cdef class DaqConfiguration: inputIEPEEnabled = self.config.inputIEPEEnabled, inputACCouplingMode = self.config.inputACCouplingMode, inputRangeIndices = self.config.inputRangeIndices, + ) - )) + def to_json(self): + return json.dumps(self.to_dict()) def getInChannel(self, i:int): return DaqChannel( diff --git a/lasp/device/lasp_device_common.py b/lasp/device/lasp_device_common.py index 5cedd8e..40c9bde 100644 --- a/lasp/device/lasp_device_common.py +++ b/lasp/device/lasp_device_common.py @@ -1,17 +1,10 @@ -__all__ = ['AvType', 'DaqChannel'] +__all__ = ['DaqChannel'] from ..lasp_common import Qty, SIQtys from dataclasses import dataclass, field from dataclasses_json import dataclass_json from typing import List import json -class AvType: - """Specificying the type of data, for adding and removing callbacks from - the stream.""" - audio_input = 1 - audio_output = 2 - video = 4 - @dataclass_json @dataclass class DaqChannel: diff --git a/lasp/device/lasp_deviceinfo.pyx b/lasp/device/lasp_deviceinfo.pyx index 2ead824..95bce55 100644 --- a/lasp/device/lasp_deviceinfo.pyx +++ b/lasp/device/lasp_deviceinfo.pyx @@ -1,5 +1,19 @@ +# -*- coding: utf-8 -*- +"""! +Author: J.A. 
de Jong - ASCEE + +Description: + +DeviceInfo C++ object wrapper + +""" __all__ = ['DeviceInfo'] +def pickle(dat): + dev = DeviceInfo() + # print('DESERIALIZE****') + dev.devinfo = dev.devinfo.deserialize(dat) + return dev cdef class DeviceInfo: def __cinit__(self): @@ -8,6 +22,11 @@ cdef class DeviceInfo: def __init__(self): pass + def __reduce__(self): + serialized = self.devinfo.serialize() + # print('SERIALIZE****') + return (pickle, (serialized,)) + @property def api(self): return self.devinfo.api.apiname.decode('utf-8') diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 70bfc87..03a8071 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -1,200 +1,674 @@ -#!/usr/bin/env python3.6 # -*- coding: utf-8 -*- """ -Description: Read data from image stream and record sound at the same time +Author: J.A. de Jong + +Description: Controlling an audio stream in a different process. """ -#import cv2 as cv -from .lasp_atomic import Atomic -from threading import Thread, Lock +import logging +import multiprocessing as mp +# import cv2 as cv +import signal +import time +from dataclasses import dataclass +from enum import Enum, auto, unique +from typing import List + import numpy as np -class DAQConfiguration: - pass +from .device import Daq, DaqChannel, DaqConfiguration, DeviceInfo +from .lasp_atomic import Atomic +from .lasp_common import AvType +from .lasp_multiprocessingpatch import apply_patch -import time -from .device import (Daq, DeviceInfo, - AvType, - ) - -__all__ = ['AvStream'] - -video_x, video_y = 640, 480 +apply_patch() -class AvStream: - """Audio and video data stream, to which callbacks can be added for - processing the data.""" +__all__ = ['StreamManager', 'ignoreSigInt', 'StreamStatus'] - def __init__(self, - avtype: AvType, - device: DeviceInfo, - daqconfig: DAQConfiguration, - video=None): - """Open a stream for audio in/output and video input. For audio output, - by default all available channels are opened for outputting data. - Args: - device: DeviceInfo for the audio device - avtype: Type of stream. Input, output or duplex +def ignoreSigInt(): + """ + Ignore sigint signal. Should be set on all processes to let the main + process control this signal. + """ + signal.signal(signal.SIGINT, signal.SIG_IGN) - daqconfig: DAQConfiguration instance. If duplex mode flag is set, - please make sure that output_device is None, as in that case the - output config will be taken from the input device. - video: + +@dataclass +class StreamMetaData: + # Sample rate [Hz] + fs: float + + # Input channels + in_ch: List[DaqChannel] + + # Output channels + out_ch: List[DaqChannel] + + # blocksize + blocksize: int + + # The data type of input and output blocks. + dtype: np.dtype + + +@unique +class StreamMsg(Enum): + """ + First part, control messages that can be send to the stream + """ + + startStream = auto() + stopStream = auto() + stopAllStreams = auto() + getStreamMetaData = auto() + endProcess = auto() + scanDaqDevices = auto() + + activateSiggen = auto() + deactivateSiggen = auto() + """ + Second part, status messages that are send back on all listeners + """ + # "Normal messages" + deviceList = auto() + streamStarted = auto() + streamStopped = auto() + streamMetaData = auto() + streamData = auto() + + # Error messages + # Some error occured, which mostly leads to a stop of the stream + streamError = auto() + # An error occured, but we recovered + streamTemporaryError = auto() + # A fatal error occured. 
This leads to serious errors in the application + streamFatalError = auto() + + +class AudioStream: + """ + Audio stream. + """ + + def __init__( + self, + avtype: AvType, + devices: list, + daqconfig: DaqConfiguration, + processCallback: callable, + ): """ + Initializes the audio stream and tries to start it. + avtype: AvType + devices: List of device information + daqconfig: DaqConfiguration to used to generate audio stream backend + processCallback: callback function that will be called from a different + thread, with arguments (AudioStream, in + """ + logging.debug('AudioStream()') + + # self.running = Atomic(False) + # self.aframectr = Atomic(0) + self.running = False + self.aframectr = 0 self.avtype = avtype + self.siggen_activated = Atomic(False) + api_devices = devices[daqconfig.api] + self.processCallback = processCallback + + matching_devices = [ + device for device in api_devices if device.name == daqconfig.device_name + ] + + if len(matching_devices) == 0: + raise RuntimeError(f"Could not find device {daqconfig.device_name}") + + # TODO: We pick te first one, what to do if we have multiple matches? + # Is that even possible? + device = matching_devices[0] + + self.daq = Daq(device, daqconfig) en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True) en_out_ch = daqconfig.getEnabledOutChannels() + if en_in_ch == 0 and en_out_ch == 0: + raise RuntimeError('No enabled input / output channels') - self.input_channel_names = [ch.channel_name for ch in en_in_ch] - self.output_channel_names = [ch.channel_name for ch in en_out_ch] + logging.debug('Ready to start device...') - self.input_sensitivity = [ch.sensitivity for ch in en_in_ch] - self.input_sensitivity = np.asarray(self.input_sensitivity) - self.input_qtys = [ch.qty for ch in en_in_ch] + samplerate = self.daq.start(self.streamCallback) + self.streammetadata = StreamMetaData( + fs=samplerate, + in_ch=daqconfig.getEnabledInChannels(), + out_ch=daqconfig.getEnabledOutChannels(), + blocksize=self.daq.nFramesPerBlock, + dtype=self.daq.getNumpyDataType(), + ) + self.running = True + def streamCallback(self, indata, outdata, nframes): + """ + This is called (from a separate thread) for each block + of audio data. + """ + if not self.running: + return 1 + self.aframectr += 1 - # Counters for the number of frames that have been coming in - self._aframectr = Atomic(0) - self._vframectr = Atomic(0) + # TODO: Fix this. This gives bug on Windows, the threading lock does + # give a strange erro. 
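        # NOTE: self.running is a plain bool here (the Atomic variant in
        # __init__ is commented out), while the code below still uses the
        # Atomic-style `self.running <<= False`. On a plain bool that is a
        # left-shift rather than an assignment, so the flag may never actually
        # be cleared.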
+ try: + if not self.running: + return 1 + except Exception as e: + print(e) - # Lock - self._callbacklock = Lock() - - self._running = Atomic(False) - - self._video = video - self._video_started = Atomic(False) - - # Storage for callbacks, specified by type - self._callbacks = { - AvType.audio_input: [], - AvType.audio_output: [], - AvType.video: [] - } - - # Possible, but long not tested: store video - self._videothread = None - - # self._audiobackend = RtAudio(daqconfig.api) - self._daq = Daq(device, daqconfig) - self.blocksize = self._daq.nFramesPerBlock - self.samplerate = self._daq.samplerate - self.dtype = self._daq.getNumpyDataType() - - def nCallbacks(self): - """Returns the current number of installed callbacks.""" - return len(self._callbacks[AvType.audio_input]) + \ - len(self._callbacks[AvType.audio_output]) + \ - len(self._callbacks[AvType.video]) - - def addCallback(self, cb: callable, cbtype: AvType): - """Add as stream callback to the list of callbacks.""" - with self._callbacklock: - outputcallbacks = self._callbacks[AvType.audio_output] - if cbtype == AvType.audio_output and len(outputcallbacks) > 0: - raise RuntimeError( - 'Only one audio output callback can be allowed') - - if cb not in self._callbacks[cbtype]: - self._callbacks[cbtype].append(cb) - - def removeCallback(self, cb, cbtype: AvType): - with self._callbacklock: - if cb in self._callbacks[cbtype]: - self._callbacks[cbtype].remove(cb) - - def start(self): - """Start the stream, which means the callbacks are called with stream - data (audio/video)""" - - if self._running: - raise RuntimeError('Stream already started') - - assert self._videothread is None - - self._running <<= True - if self._video is not None: - self._videothread = Thread(target=self._videoThread) - self._videothread.start() - else: - self._video_started <<= True - - self.samplerate = self._daq.start(self._audioCallback) - - def _videoThread(self): - cap = cv.VideoCapture(self._video) - if not cap.isOpened(): - cap.open() - vframectr = 0 - loopctr = 0 - while self._running: - ret, frame = cap.read() - # print(frame.shape) - if ret is True: - if vframectr == 0: - self._video_started <<= True - with self._callbacklock: - for cb in self._callbacks[AvType.video]: - cb(frame, vframectr) - vframectr += 1 - self._vframectr += 1 - else: - loopctr += 1 - if loopctr == 10: - print('Error: no video capture!') - time.sleep(0.2) - - cap.release() - print('stopped videothread') - - def _audioCallback(self, indata, outdata, nframes): - """This is called (from a separate thread) for each audio block.""" - self._aframectr += nframes - with self._callbacklock: - # Count the number of output callbacks. If no output callbacks are - # present, and there should be output callbacks, we explicitly set - # the output buffer to zero - noutput_cb = len(self._callbacks[AvType.audio_output]) - - # Loop over callbacks - if outdata is not None: - try: - if len(self._callbacks[AvType.audio_output]) == 0: - outdata[:, :] = 0 - for cb in self._callbacks[AvType.audio_output]: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 2 - if indata is not None: - try: - for cb in self._callbacks[AvType.audio_input]: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 1 - - return 0 if self._running else 1 + rv = self.processCallback(self, indata, outdata) + if rv != 0: + self.running <<= False + return rv def stop(self): - self._running <<= False + """ + Stop the DAQ stream. Should be called only once. 
+ """ + daq = self.daq + self.daq = None + self.running <<= False + daq.stop() - if self._video: - self._videothread.join() - self._videothread = None + self.streammetadata = None - self._aframectr <<= 0 - self._vframectr <<= 0 - self._video_started <<= False - self._daq.stop() - self._daq = None +class AvStreamProcess(mp.Process): + """ + Different process on which all audio streams are running. + """ - def isRunning(self): - return self._running() + def __init__(self, pipe, msg_qlist, indata_qlist, outq): + """ + + Args: + pipe: Message control pipe on which commands are received. + msg_qlist: List of queues on which stream status and events are + sent. Here, everything is send, except for the captured data + itself. + indata_qlist: List of queues on which captured data from a DAQ is + send. This one gets all events, but also captured data. + outq: On this queue, the stream process receives data to be send as + output to the devices. + + """ + super().__init__() + + self.pipe = pipe + self.msg_qlist = msg_qlist + self.indata_qlist = indata_qlist + self.outq = outq + + self.devices = {} + self.daqconfigs = None + + # In, out, duplex + self.streams = {t: None for t in list(AvType)} + + # When this is set, a kill on the main process will also kill the + # siggen process. Highly wanted feature + self.daemon = True + + def run(self): + """ + The actual function running in a different process. + """ + # First things first, ignore interrupt signals + # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Check for devices + self.rescanDaqDevices() + self.siggen_activated = Atomic(False) + + while True: + msg, data = self.pipe.recv() + logging.debug(f"Streamprocess obtained message {msg}") + + if msg == StreamMsg.activateSiggen: + self.siggen_activated <<= True + + elif msg == StreamMsg.deactivateSiggen: + self.siggen_activated <<= False + + elif msg == StreamMsg.scanDaqDevices: + self.rescanDaqDevices() + + elif msg == StreamMsg.stopAllStreams: + self.stopAllStreams() + + elif msg == StreamMsg.endProcess: + self.stopAllStreams() + # and.. exit! + return + + elif msg == StreamMsg.getStreamMetaData: + (avtype,) = data + stream = self.streams[avtype] + if stream is not None: + self.sendAllQueues( + StreamMsg.streamMetaData, avtype, stream.streammetadata + ) + else: + self.sendAllQueues( + StreamMsg.streamMetaData, avtype, None) + + elif msg == StreamMsg.startStream: + avtype, daqconfig = data + self.startStream(avtype, daqconfig) + + elif msg == StreamMsg.stopStream: + (avtype,) = data + self.stopStream(avtype) + + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): + """ + Start a stream, based on type and configuration + + """ + self.stopRequiredExistingStreams(avtype) + # Empty the queue from existing stuff (puts the signal generator + # directly in action!). 
+ if avtype in (AvType.audio_duplex, AvType.audio_output): + while not self.outq.empty(): + self.outq.get() + try: + stream = AudioStream(avtype, self.devices, + daqconfig, self.streamCallback) + self.streams[avtype] = stream + self.sendAllQueues( + StreamMsg.streamStarted, avtype, stream.streammetadata + ) + + except Exception as e: + self.sendAllQueues( + StreamMsg.streamError, avtype, f"Error starting stream: {str(e)}" + ) + return + + + def stopStream(self, avtype: AvType): + """ + Stop an existing stream, and sets the attribute in the list of streams + to None + + Args: + stream: AudioStream instance + """ + stream = self.streams[avtype] + if stream is not None: + try: + stream.stop() + self.sendAllQueues( + StreamMsg.streamStopped, stream.avtype) + except Exception as e: + self.sendAllQueues( + StreamMsg.streamError, + stream.avtype, + "Error occured in stopping stream: {str(e)}", + ) + self.streams[avtype] = None + + def stopRequiredExistingStreams(self, avtype: AvType): + """ + Stop all existing streams that conflict with the current avtype + """ + if avtype == AvType.audio_input: + # For a new input, duplex and input needs to be stopped + stream_to_stop = (AvType.audio_input, AvType.audio_duplex) + elif avtype == AvType.audio_output: + # For a new output, duplex and output needs to be stopped + stream_to_stop = (AvType.audio_output, AvType.audio_duplex) + elif avtype == AvType.audio_duplex: + # All others have to stop + stream_to_stop = list(AvType) # All of them + else: + raise ValueError("BUG") + + for stream in stream_to_stop: + if stream is not None: + self.stopStream(stream) + + def stopAllStreams(self): + """ + Stops all streams + """ + for key in self.streams.keys(): + self.stopStream(key) + + def isStreamRunning(self, avtype: AvType = None): + """ + Check whether a stream is running + + Args: + avtype: The stream type to check whether it is still running. If + None, it checks all streams. + + Returns: + True if a stream is running, otherwise false + """ + if avtype is None: + avtype = list(AvType) + else: + avtype = (avtype,) + for t in avtype: + if self.streams[t] is not None and self.streams[t].running(): + return True + + return False + + def rescanDaqDevices(self): + """ + Rescan the available DaQ devices. + + """ + if self.isStreamRunning(): + self.sendAllQueues( + StreamMsg.streamError, + None, + "A stream is running, cannot rescan DAQ devices.", + ) + return + + self.devices = Daq.getDeviceInfo() + self.sendAllQueues(StreamMsg.deviceList, self.devices) + + def streamCallback(self, audiostream, indata, outdata): + """This is called (from a separate thread) for each audio block.""" + # logging.debug('streamCallback()') + if outdata is not None: + if self.siggen_activated(): + if not self.outq.empty(): + newdata = self.outq.get() + if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: + msgtxt = "Invalid output data obtained from queue" + logging.fatal(msgtxt) + self.sendAllQueues( + StreamMsg.streamFatalError, audiostream.avtype, msgtxt + ) + return 1 + outdata[:, :] = newdata[:, None] + else: + msgtxt = "Signal generator buffer underflow. Signal generator cannot keep up with data generation." 
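                    # The underflow is reported as a temporary error and the
                    # output block is zero-filled below, so the stream itself
                    # keeps running.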
+ # logging.error(msgtxt) + self.sendAllQueues( + StreamMsg.streamTemporaryError, audiostream.avtype, msgtxt + ) + outdata[:, :] = 0 + + # Siggen not activated + else: + logging.debug("siggen not activated") + outdata[:, :] = 0 + + if indata is not None: + self.sendInQueues(StreamMsg.streamData, indata) + + return 0 + + # Wrapper functions that safe some typing, they do not require an + # explanation. + def sendInQueues(self, msg, *data): + # logging.debug('sendInQueues()') + for q in self.indata_qlist: + # Fan out the input data to all queues in the queue list + q.put((msg, data)) + + def sendAllQueues(self, msg, *data): + """ + Destined for all queues, including capture data queues + """ + self.sendInQueues(msg, *data) + for q in self.msg_qlist: + # Fan out the input data to all queues in the queue list + q.put((msg, data)) + + +@dataclass +class StreamStatus: + lastStatus: StreamMsg = StreamMsg.streamStopped + errorTxt: str = None + streammetadata: StreamMetaData = None + + +class StreamManager: + """ + Audio and video data stream manager, to which queus can be added + """ + + def __init__(self): + """Open a stream for audio in/output and video input. For audio output, + + """ + + # Initialize streamstatus + self.streamstatus = {t: StreamStatus() for t in list(AvType)} + + self.devices = None + + # Multiprocessing manager, pipe, output queue, input queue, + self.manager = mp.managers.SyncManager() + + # Start this manager and ignore interrupts + # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing + self.manager.start(ignoreSigInt) + + # List of queues for all entities that require 'microphone' or input + # data. We need a local list, to manage listener queues, as the queues + # which are in the manager list get a new object id. The local list is + # used to find the index in the manager queues list upon deletion by + # 'removeListener()' + self.indata_qlist = self.manager.list([]) + self.indata_qlist_local = [] + + self.msg_qlist = self.manager.list([]) + self.msg_qlist_local = [] + + # Queue used for signal generator data + self.outq = self.manager.Queue() + + # Messaging pipe + self.pipe, child_pipe = mp.Pipe(duplex=True) + + # This is the queue on which this class listens for stream process + # messages. + self.our_msgqueue = self.addMsgQueueListener() + + # Create the stream process + self.streamProcess = AvStreamProcess(child_pipe, + self.msg_qlist, + self.indata_qlist, self.outq) + self.streamProcess.start() + + def handleMessages(self): + """ + Handle messages that are still on the pipe. 
+ """ + # logging.debug('StreamManager::handleMessages()') + msgs = [] + while not self.our_msgqueue.empty(): + msg, data = self.our_msgqueue.get() + logging.debug(f'StreamManager obtained message {msg}') + if msg == StreamMsg.streamStarted: + avtype, streammetadata = data + # logging.debug(f'{avtype}, {streammetadata}') + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = None + self.streamstatus[avtype].streammetadata = streammetadata + + elif msg == StreamMsg.streamStopped: + (avtype,) = data + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = None + self.streamstatus[avtype].streammetadata = None + + elif msg == StreamMsg.streamError: + avtype, errorTxt = data + if avtype is not None: + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = errorTxt + logging.debug(f'Message: {errorTxt}') + + elif msg == StreamMsg.streamTemporaryError: + avtype, errorTxt = data + if avtype is not None: + logging.debug(f'Message: {errorTxt}') + + elif msg == StreamMsg.streamFatalError: + avtype, errorTxt = data + logging.critical(f"Streamprocess fatal error: {errorTxt}") + self.cleanup() + + elif msg == StreamMsg.streamMetaData: + avtype, metadata = data + self.streamstatus[avtype].streammetadata = metadata + + elif msg == StreamMsg.deviceList: + devices, = data + # logging.debug(devices) + self.devices = devices + msgs.append((msg, data)) + + return msgs + + def getDeviceList(self): + self.handleMessages() + return self.devices + + def rescanDaqDevices(self): + """ + Output the message to the stream process to rescan the list of devices + """ + self.sendPipe(StreamMsg.scanDaqDevices, None) + + def getStreamStatus(self, avtype: AvType): + """ + Sends a request for the stream status over the pipe, for given AvType + """ + self.handleMessages() + self.sendPipe(StreamMsg.getStreamMetaData, avtype) + + def getOutputQueue(self): + """ + Returns the output queue object. + + Note, should (of course) only be used by one signal generator at the time! + """ + self.handleMessages() + return self.outq + + def activateSiggen(self): + self.handleMessages() + logging.debug("activateSiggen()") + self.sendPipe(StreamMsg.activateSiggen, None) + + def deactivateSiggen(self): + self.handleMessages() + logging.debug("deactivateSiggen()") + self.sendPipe(StreamMsg.deactivateSiggen, None) + + def addMsgQueueListener(self): + """ + Add a listener queue to the list of message queues, and return the + queue. + + Returns: + listener queue + """ + newqueue = self.manager.Queue() + self.msg_qlist.append(newqueue) + self.msg_qlist_local.append(newqueue) + return newqueue + + def removeMsgQueueListener(self, queue): + """ + Remove an input listener queue from the message queue list. + """ + # Uses a local queue list to find the index, based on the queue + idx = self.msg_qlist_local.index(queue) + del self.msg_qlist_local[idx] + del self.msg_qlist[idx] + + def addInQueueListener(self): + """ + Add a listener queue to the list of queues, and return the queue. + + Returns: + listener queue + """ + newqueue = self.manager.Queue() + self.indata_qlist.append(newqueue) + self.indata_qlist_local.append(newqueue) + return newqueue + + def removeInQueueListener(self, queue): + """ + Remove an input listener queue from the queue list. 
+ """ + # Uses a local queue list to find the index, based on the queue + idx = self.indata_qlist_local.index(queue) + del self.indata_qlist[idx] + del self.indata_qlist_local[idx] + + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration, wait=False): + """ + Start the stream, which means the callbacks are called with stream + data (audio/video) + + Args: + wait: Wait until the stream starts talking before returning from + this function. + + """ + logging.debug("Starting stream...") + self.handleMessages() + self.sendPipe(StreamMsg.startStream, avtype, daqconfig) + if wait: + # Wait for a message to come into the pipe + while True: + if self.pipe.poll(): + self.handleMessages() + if self.streamstatus[avtype].lastStatus != StreamMsg.streamStopped: + break + + def stopStream(self, avtype: AvType): + self.handleMessages() + self.sendPipe(StreamMsg.stopStream, avtype) + + def stopAllStreams(self): + self.sendPipe(StreamMsg.stopAllStreams) + + def cleanup(self): + """ + Stops the stream if it is still running, and after that, it stops the + stream process. + + This method SHOULD always be called before removing a AvStream object. + Otherwise things will wait forever... + + """ + self.sendPipe(StreamMsg.endProcess, None) + logging.debug("Joining stream process...") + self.streamProcess.join() + logging.debug("Joining stream process done") def hasVideo(self): - return True if self._video is not None else False + """ + Stub, TODO: for future + """ + return False + + def sendPipe(self, msg, *data): + """ + Send a message with data over the control pipe + """ + self.pipe.send((msg, data)) diff --git a/lasp/lasp_common.py b/lasp/lasp_common.py index 2125b73..3490e5e 100644 --- a/lasp/lasp_common.py +++ b/lasp/lasp_common.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import os -import platform +import os, platform import shelve import sys import appdirs @@ -11,6 +10,7 @@ from .wrappers import Window as wWindow from collections import namedtuple from dataclasses import dataclass from dataclasses_json import dataclass_json +from enum import Enum, unique, auto """ Common definitions used throughout the code. @@ -18,8 +18,9 @@ Common definitions used throughout the code. __all__ = [ 'P_REF', 'FreqWeighting', 'TimeWeighting', 'getTime', 'getFreq', 'Qty', - 'SIQtys', + 'SIQtys', 'Window', 'lasp_shelve', 'this_lasp_shelve', 'W_REF', 'U_REF', 'I_REF', 'dBFS_REF', + 'AvType' ] # Reference sound pressure level @@ -42,6 +43,22 @@ U_REF = 5e-8 # 50 nano meter / s # hence this is the reference level as specified below. 
dBFS_REF = 0.5*2**0.5 # Which level would be -3.01 dBFS +@unique +class AvType(Enum): + """Specificying the type of data, for adding and removing callbacks from + the stream.""" + + # Input stream + audio_input = (0, 'input') + + # Output stream + audio_output = (1, 'output') + + # Both input as well as output + audio_duplex = (2, 'duplex') + # video = 4 + + @dataclass_json @dataclass class Qty: @@ -62,7 +79,7 @@ class SIQtys: unit_name='No unit / full scale', unit_symb='-', level_unit=('dBFS',), - level_ref_name=('Full scale sine wave',), + level_ref_name=('Relative to full scale sine wave',), level_ref_value=(dBFS_REF,) ) AP = Qty(name='Acoustic Pressure', @@ -81,13 +98,13 @@ class SIQtys: level_ref_value=(1.0,), ) types = (N, AP, V) - default = AP + default = N default_index = 0 @staticmethod def fillComboBox(cb): """ - Fill FreqWeightings to a combobox + Fill to a combobox Args: cb: QComboBox to fill @@ -126,7 +143,7 @@ class CalibrationSettings: @staticmethod def fillComboBox(cb): """ - Fill FreqWeightings to a combobox + Fill Calibration Settings to a combobox Args: cb: QComboBox to fill @@ -246,16 +263,14 @@ class this_lasp_shelve(Shelve): return os.path.join(lasp_appdir, f'{node}_config.shelve') -class Window: +@unique +class Window(Enum): hann = (wWindow.hann, 'Hann') hamming = (wWindow.hamming, 'Hamming') rectangular = (wWindow.rectangular, 'Rectangular') bartlett = (wWindow.bartlett, 'Bartlett') blackman = (wWindow.blackman, 'Blackman') - types = (hann, hamming, rectangular, bartlett, blackman) - default = 0 - @staticmethod def fillComboBox(cb): """ @@ -265,12 +280,13 @@ class Window: cb: QComboBox to fill """ cb.clear() - for tw in Window.types: - cb.addItem(tw[1], tw) - cb.setCurrentIndex(Window.default) + for w in list(Window): + cb.addItem(w.value[1], w) + cb.setCurrentIndex(0) + @staticmethod def getCurrent(cb): - return Window.types[cb.currentIndex()] + return list(Window)[cb.currentIndex()] class TimeWeighting: @@ -283,6 +299,7 @@ class TimeWeighting: infinite = (0, 'Infinite') types_realtime = (ufast, fast, slow, tens, infinite) types_all = (none, uufast, ufast, fast, slow, tens, infinite) + default = fast default_index = 3 default_index_realtime = 1 @@ -319,11 +336,11 @@ class FreqWeighting: """ Frequency weighting types """ + Z = ('Z', 'Z-weighting') A = ('A', 'A-weighting') C = ('C', 'C-weighting') - Z = ('Z', 'Z-weighting') types = (A, C, Z) - default = A + default = Z default_index = 0 @staticmethod diff --git a/lasp/lasp_imptube.py b/lasp/lasp_imptube.py index 56d471d..8a00a89 100644 --- a/lasp/lasp_imptube.py +++ b/lasp/lasp_imptube.py @@ -6,11 +6,13 @@ Author: J.A. 
de Jong - ASCEE Description: Two-microphone impedance tube methods """ __all__ = ['TwoMicImpedanceTube'] -from lrftubes import Air +# from lrftubes import Air from .lasp_measurement import Measurement from numpy import pi, sqrt, exp import numpy as np from scipy.interpolate import UnivariateSpline +# from lrftubes import PrsDuct +from functools import lru_cache class TwoMicImpedanceTube: def __init__(self, mnormal: Measurement, @@ -21,7 +23,9 @@ class TwoMicImpedanceTube: fl: float = None, fu: float = None, periodic_method=False, - mat= Air(), + # mat= Air(), + D_imptube = 50e-3, + thermoviscous = True, **kwargs): """ @@ -60,6 +64,9 @@ class TwoMicImpedanceTube: kmax = ksmax/s self.fu = kmax*mat.c0/2/pi + self.thermoviscous = thermoviscous + self.D_imptube = D_imptube + self.periodic_method = periodic_method self.channels = [kwargs.pop('chan0', 0), kwargs.pop('chan1', 1)] # Compute calibration correction @@ -82,8 +89,9 @@ class TwoMicImpedanceTube: # Calibration correction factor # self.K = 0*self.freq + 1.0 K = sqrt(C2[:,0,1]*C1[:,0,0]/(C2[:,1,1]*C1[:,1,0])) - self.K = UnivariateSpline(self.freq, K.real)(self.freq) +\ - 1j*UnivariateSpline(self.freq, K.imag)(self.freq) + # self.K = UnivariateSpline(self.freq, K.real)(self.freq) +\ + # 1j*UnivariateSpline(self.freq, K.imag)(self.freq) + self.K = K def cut_to_limits(self, ar): return ar[self.il:self.ul] @@ -94,6 +102,7 @@ class TwoMicImpedanceTube: """ return self.cut_to_limits(self.freq) + @lru_cache def G_AB(self, meas): if meas is self.mnormal: C = self.C1 diff --git a/lasp/lasp_logging.py b/lasp/lasp_logging.py new file mode 100644 index 0000000..0b90e20 --- /dev/null +++ b/lasp/lasp_logging.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +""" +Author: J.A. de Jong + +Description: configure the logging of messages +""" +import logging, sys +# __all__ = ['configureLogging'] + +# global_loglevel = None + +# def configureLogging(level=None): + +# # Oh yeah, one global variable +# global global_loglevel +# if level is None: +# level is global_loglevel +# else: +# global_loglevel = level + +# if level is None: +# raise RuntimeError('Log level has not yet been set application wide') + + diff --git a/lasp/lasp_measurement.py b/lasp/lasp_measurement.py index 1e86a24..94120ee 100644 --- a/lasp/lasp_measurement.py +++ b/lasp/lasp_measurement.py @@ -48,8 +48,6 @@ import os, time, wave, logging from .lasp_common import SIQtys, Qty, getFreq from .device import DaqChannel from .wrappers import AvPowerSpectra, Window, PowerSpectra -logger = logging.Logger(__name__) - def getSampWidth(dtype): @@ -210,11 +208,18 @@ class Measurement: self.N = (self.nblocks * self.blocksize) self.T = self.N / self.samplerate + # Due to a previous bug, the channel names were not stored + # consistently, i.e. as 'channel_names' and later camelcase. 
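        # Lookup order: first the current 'channelNames' attribute, then the
        # legacy 'channel_names' spelling; if neither is present, placeholder
        # names are generated.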
try: - self._channel_names = f.attrs['channel_names'] + self._channelNames = f.attrs['channelNames'] except KeyError: - # No channel names found in measurement file - self._channel_names = [f'Unnamed {i}' for i in range(self.nchannels)] + try: + self._channelNames = f.attrs['channel_names'] + logging.info("Measurement file obtained which stores channel names with *old* attribute 'channel_names'") + except KeyError: + # No channel names found in measurement file + logging.info('No channel name data found in measurement') + self._channelNames = [f'Unnamed {i}' for i in range(self.nchannels)] # comment = read-write thing try: @@ -234,7 +239,6 @@ class Measurement: self._time = f.attrs['time'] - try: qtys_json = f.attrs['qtys'] # Load quantity data @@ -242,14 +246,18 @@ class Measurement: except KeyError: # If quantity data is not available, this is an 'old' # measurement file. - logger.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}') + logging.debug(f'Physical quantity data not available in measurement file. Assuming {SIQtys.default}') self._qtys = [SIQtys.default for i in range(self.nchannels)] def setAttribute(self, atrname, value): + """ + Set an attribute in the measurement file, and keep a local copy in + memory for efficient accessing. + """ with self.file('r+') as f: # Update comment attribute in the file f.attrs[atrname] = value - setattr(self, '_' + atrname, value) + setattr(self, '_' + atrname, value) @property def name(self): @@ -258,7 +266,7 @@ class Measurement: @property def channelNames(self): - return self._channel_names + return self._channelNames @channelNames.setter def channelNames(self, newchnames): diff --git a/lasp/lasp_multiprocessingpatch.py b/lasp/lasp_multiprocessingpatch.py new file mode 100644 index 0000000..fcd8e79 --- /dev/null +++ b/lasp/lasp_multiprocessingpatch.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +""" +Author: J.A. de Jong + +Description: MonkeyPatch required to let the Multiprocessing library work properly. +Should be applied prior to running any other multiprocessing code. Comes from +Stackoverflow and is mainly used for managing a list of queues that can be +shared between processes. + +For more information, see: +https://stackoverflow.com/questions/46779860/multiprocessing-managers-and-custom-classes +""" +from multiprocessing import managers +import logging +from functools import wraps +from inspect import signature + +orig_AutoProxy = managers.AutoProxy + +__all__ = ['apply_patch'] + + +@wraps(managers.AutoProxy) +def AutoProxy(*args, incref=True, manager_owned=False, **kwargs): + # Create the autoproxy without the manager_owned flag, then + # update the flag on the generated instance. If the manager_owned flag + # is set, `incref` is disabled, so set it to False here for the same + # result. 
+ autoproxy_incref = False if manager_owned else incref + proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs) + proxy._owned_by_manager = manager_owned + return proxy + + +def apply_patch(): + if "manager_owned" in signature(managers.AutoProxy).parameters: + return + + logging.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned") + managers.AutoProxy = AutoProxy + + # re-register any types already registered to SyncManager without a custom + # proxy type, as otherwise these would all be using the old unpatched AutoProxy + SyncManager = managers.SyncManager + registry = managers.SyncManager._registry + for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items(): + if proxytype is not orig_AutoProxy: + continue + create_method = hasattr(managers.SyncManager, typeid) + SyncManager.register( + typeid, + callable=callable, + exposed=exposed, + method_to_typeid=method_to_typeid, + create_method=create_method, + ) + diff --git a/lasp/lasp_record.py b/lasp/lasp_record.py index b208f36..e94f748 100644 --- a/lasp/lasp_record.py +++ b/lasp/lasp_record.py @@ -1,57 +1,207 @@ -#!/usr/bin/python3 +#!/usr/bin/python3.8 # -*- coding: utf-8 -*- """ Read data from stream and record sound and video at the same time """ -from .lasp_atomic import Atomic -from threading import Condition -from .lasp_avstream import AvType, AvStream -import h5py -import dataclasses -import os -import time +import dataclasses, logging, os, time, h5py +from .lasp_avstream import StreamManager, StreamMetaData, StreamMsg +from .lasp_common import AvType + @dataclasses.dataclass class RecordStatus: curT: float done: bool -class Recording: - def __init__(self, fn: str, stream: AvStream, - rectime: float=None, wait: bool = True, - progressCallback=None): +class Recording: + """ + Class used to perform a recording. + """ + def __init__(self, fn: str, streammgr: StreamManager, + rectime: float = None, wait: bool = True, + progressCallback=None, + startDelay: float=0): """ + Start a recording. Blocks if wait is set to True. Args: - fn: Filename to record to. extension is added + fn: Filename to record to. Extension is automatically added if not + provided. stream: AvStream instance to record from. Should have input channels! - rectime: Recording time, None for infinite + rectime: Recording time [s], None for infinite, in seconds. If set + to None, or np.inf, the recording continues indefintely. + progressCallback: callable that is called with an instance of + RecordStatus instance as argument. + startDelay: Optional delay added before the recording is *actually* + started in [s]. 
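For the lasp_multiprocessingpatch module introduced above, a minimal sketch of the intended call order (apply_patch and its import path are as defined in that file; the manager/queue usage is only an assumed example of the nested-proxy case the patch addresses):

    import multiprocessing as mp
    from lasp.lasp_multiprocessingpatch import apply_patch

    if __name__ == '__main__':
        # Patch AutoProxy before any manager is created, so that nested
        # proxies (e.g. a Queue proxy stored in a managed list) accept the
        # 'manager_owned' keyword.
        apply_patch()

        manager = mp.Manager()           # multiprocessing.managers.SyncManager
        queues = manager.list()          # shared list holding queue proxies
        queues.append(manager.Queue())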
""" ext = '.h5' if ext not in fn: fn += ext - self._stream = stream - self.blocksize = stream.blocksize - self.samplerate = stream.samplerate - self._running = Atomic(False) - self._running_cond = Condition() - self.rectime = rectime - self._fn = fn + self.smgr = streammgr + self.metadata = None - self._video_frame_positions = [] - self._curT_rounded_to_seconds = 0 + assert startDelay >= 0 + self.startDelay = startDelay + # Flag used to indicate that we have passed the start delay + self.startDelay_passed = False + self.rectime = rectime + self.fn = fn - self._ablockno = Atomic(0) - self._vframeno = 0 + self.video_frame_positions = [] + self.curT_rounded_to_seconds = 0 - self._progressCallback = progressCallback - self._wait = wait + # Counter of the number of blocks + self.ablockno = 0 + self.vframeno = 0 - self._f = h5py.File(self._fn, 'w') - self._deleteFile = False + self.progressCallback = progressCallback + self.wait = wait + + self.f = h5py.File(self.fn, 'w') + + # This flag is used to delete the file on finish(), and can be used + # when a recording is canceled. + self.deleteFile = False + + try: + # Input queue + self.inq = streammgr.addInQueueListener() + + except RuntimeError: + # Cleanup stuff, something is going wrong when starting the stream + try: + self.f.close() + except Exception as e: + logging.error( + 'Error preliminary closing measurement file {fn}: {str(e)}') + + self.__deleteFile() + raise + + # Try to obtain stream metadata + streammgr.getStreamStatus(AvType.audio_input) + streammgr.getStreamStatus(AvType.audio_duplex) + + self.ad = None + + logging.debug('Starting record....') + # TODO: Fix this later when we want video + # if stream.hasVideo(): + # stream.addCallback(self.aCallback, AvType.audio_input) + self.stop = False + + if self.wait: + logging.debug('Stop recording with CTRL-C') + try: + while not self.stop: + self.handleQueue() + time.sleep(0.01) + except KeyboardInterrupt: + logging.debug("Keyboard interrupt on record") + finally: + self.finish() + + def handleQueue(self): + """ + This method should be called to grab data from the input queue, which + is filled by the stream, and put it into a file. It should be called at + a regular interval to prevent overflowing of the queue. It is called + within the start() method of the recording, if block is set to True. + Otherwise, it should be called from its parent at regular intervals. + For example, in Qt this can be done using a QTimer. + + + """ + # logging.debug('handleQueue()') + while self.inq.qsize() > 0: + msg, data = self.inq.get() + # logging.debug(f'Obtained message: {msg}') + if msg == StreamMsg.streamData: + samples, = data + self.__addTimeData(samples) + elif msg == StreamMsg.streamStarted: + logging.debug(f'handleQueue obtained message {msg}') + avtype, metadata = data + if metadata is None: + raise RuntimeError('BUG: no stream metadata') + if avtype in (AvType.audio_duplex, AvType.audio_input): + self.processStreamMetaData(metadata) + elif msg == StreamMsg.streamMetaData: + logging.debug(f'handleQueue obtained message {msg}') + avtype, metadata = data + if metadata is not None: + self.processStreamMetaData(metadata) + elif msg == StreamMsg.streamTemporaryError: + pass + else: + logging.debug(f'handleQueue obtained message {msg}') + # An error occured, we do not remove the file, but we stop. + self.stop = True + logging.debug(f'Stream message: {msg}. 
Recording stopped unexpectedly') + raise RuntimeError('Recording stopped unexpectedly') + + + def processStreamMetaData(self, md: StreamMetaData): + """ + Stream metadata has been caught. This is used to set all metadata in + the measurement file + + """ + logging.debug('Recording::processStreamMetaData()') + if self.metadata is not None: + # Metadata already obtained. We check whether the new metadata is + # compatible. Otherwise an error occurs + if md != self.metadata: + raise RuntimeError('BUG: Incompatible stream metadata!') + return + + # The 'Audio' dataset as specified in lasp_measurement, where data is + # sent to. We use gzip as compression, this gives a moderate + # compression of the data. + f = self.f + blocksize = md.blocksize + nchannels = len(md.in_ch) + self.ad = f.create_dataset('audio', + (1, blocksize, nchannels), + dtype=md.dtype, + maxshape=( + None, # This means, we can add blocks + # indefinitely + blocksize, + nchannels), + compression='gzip' + ) + + # TODO: This piece of code is not up-to-date and should be changed at a + # later stage once we really want to record video simultaneously + # with audio. + # if smgr.hasVideo(): + # video_x, video_y = smgr.video_x, smgr.video_y + # self.vd = f.create_dataset('video', + # (1, video_y, video_x, 3), + # dtype='uint8', + # maxshape=( + # None, video_y, video_x, 3), + # compression='gzip' + # ) + + # Set the measurement file attributes + f.attrs['samplerate'] = md.fs + f.attrs['nchannels'] = nchannels + f.attrs['blocksize'] = blocksize + f.attrs['sensitivity'] = [ch.sensitivity for ch in md.in_ch] + f.attrs['channelNames'] = [ch.channel_name for ch in md.in_ch] + f.attrs['time'] = time.time() + self.blocksize = blocksize + self.fs = md.fs + + # Measured physical quantity metadata + f.attrs['qtys'] = [ch.qty.to_json() for ch in md.in_ch] + self.metadata = md def setDelete(self, val: bool): """ @@ -59,122 +209,111 @@ class Recording: the recording. Typically used for cleaning up after canceling a recording. """ - self._deleteFile = val + self.deleteFile = val - def __enter__(self): + def finish(self): """ + This method should be called to finish and close a recording file, + remove the queue from the stream, etc.
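A usage sketch of this Recording class, assuming a StreamManager with a running input stream (the non-blocking variant drives handleQueue() itself, as suggested in the handleQueue() docstring above; file names and timings are arbitrary examples):

    import time
    from lasp import StreamManager, Recording

    if __name__ == '__main__':
        smgr = StreamManager()
        # ... start an input stream on smgr here (DAQ configuration omitted) ...

        # Blocking use: records 10 s to 'meas.h5' and returns when finished.
        Recording('meas', smgr, rectime=10.0, wait=True)

        # Non-blocking use: the caller polls the queue, e.g. from a Qt QTimer.
        rec = Recording('meas2', smgr, rectime=None, wait=False)
        for _ in range(500):
            rec.handleQueue()
            time.sleep(0.01)
        rec.finish()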
- with Recording(fn, stream, wait=False): - event_loop_here() - - or: - - with Recording(fn, stream, wait=True): - pass """ + logging.debug('Recording::finish()') + smgr = self.smgr - stream = self._stream - f = self._f - nchannels = len(stream.input_channel_names) + # TODO: Fix when video + # if smgr.hasVideo(): + # smgr.removeCallback(self.vCallback, AvType.video_input) + # self.f['video_frame_positions'] = self.video_frame_positions - self._ad = f.create_dataset('audio', - (1, stream.blocksize, nchannels), - dtype=stream.dtype, - maxshape=(None, stream.blocksize, - nchannels), - compression='gzip' - ) - if stream.hasVideo(): - video_x, video_y = stream.video_x, stream.video_y - self._vd = f.create_dataset('video', - (1, video_y, video_x, 3), - dtype='uint8', - maxshape=( - None, video_y, video_x, 3), - compression='gzip' - ) + try: + smgr.removeInQueueListener(self.inq) + except Exception as e: + logging.error(f'Could not remove queue from smgr: {e}') - f.attrs['samplerate'] = stream.samplerate - f.attrs['nchannels'] = nchannels - f.attrs['blocksize'] = stream.blocksize - f.attrs['sensitivity'] = stream.input_sensitivity - f.attrs['channel_names'] = stream.input_channel_names - f.attrs['time'] = time.time() - f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys] - self._running <<= True + try: + # Close the recording file + self.f.close() + except Exception as e: + logging.error(f'Error closing file: {e}') - if not stream.isRunning(): - stream.start() + logging.debug('Recording ended') + if self.deleteFile: + self.__deleteFile() - print('Starting record....') - stream.addCallback(self._aCallback, AvType.audio_input) - if stream.hasVideo(): - stream.addCallback(self._aCallback, AvType.audio_input) + def __deleteFile(self): + """ + Cleanup the recording file. + """ + try: + os.remove(self.fn) + except Exception as e: + logging.error(f'Error deleting file: {self.fn}') - if self._wait: - with self._running_cond: - print('Stop recording with CTRL-C') - try: - while self._running: - self._running_cond.wait() - except KeyboardInterrupt: - print("Keyboard interrupt on record") - self._running <<= False + def __addTimeData(self, indata): + """ + Called by handleQueue() and adds new time data to the storage file. + """ + # logging.debug('Recording::__addTimeData()') - - def __exit__(self, type, value, traceback): - self._running <<= False - stream = self._stream - stream.removeCallback(self._aCallback, AvType.audio_input) - if stream.hasVideo(): - stream.removeCallback(self._vCallback, AvType.video_input) - self._f['video_frame_positions'] = self._video_frame_positions - - self._f.close() - print('\nEnding record') - if self._deleteFile: - try: - os.remove(self._fn) - except Exception as e: - print(f'Error deleting file: {self._fn}') - - - def _aCallback(self, indata, outdata, aframe): - if indata is None: + if self.stop: + # Stop flag is raised. We stop recording here. return - curT = self._ablockno()*self.blocksize/self.samplerate + # The current time that is recorded and stored into the file, without + # the new data + if not self.metadata: + # We obtained stream data, but metadata is not yet available. 
+ # Therefore, we request it explicitly and then we return + logging.info('Requesting stream metadata') + self.smgr.getStreamStatus(AvType.audio_input) + self.smgr.getStreamStatus(AvType.audio_duplex) + return + + curT = self.ablockno*self.blocksize/self.fs + + # Increase the block counter + self.ablockno += 1 + + if curT < self.startDelay and not self.startDelay_passed: + # Start delay has not been passed + return + elif curT >= 0 and not self.startDelay_passed: + # Start delay passed, switch the flag! + self.startDelay_passed = True + # Reset the audio block counter and the time + self.ablockno = 1 + curT = 0 + recstatus = RecordStatus( - curT = curT, - done = False) - if self._progressCallback is not None: - self._progressCallback(recstatus) + curT=curT, + done=False) + + if self.progressCallback is not None: + self.progressCallback(recstatus) curT_rounded_to_seconds = int(curT) - if curT_rounded_to_seconds > self._curT_rounded_to_seconds: - self._curT_rounded_to_seconds = curT_rounded_to_seconds + if curT_rounded_to_seconds > self.curT_rounded_to_seconds: + self.curT_rounded_to_seconds = curT_rounded_to_seconds print(f'{curT_rounded_to_seconds}', end='', flush=True) else: print('.', end='', flush=True) if self.rectime is not None and curT > self.rectime: # We are done! - self._running <<= False - with self._running_cond: - self._running_cond.notify() - if self._progressCallback is not None: + if self.progressCallback is not None: recstatus.done = True - self._progressCallback(recstatus) + self.progressCallback(recstatus) + self.stop = True return - self._ad.resize(self._ablockno()+1, axis=0) - self._ad[self._ablockno(), :, :] = indata - self._ablockno += 1 + # Add the data to the file, and resize the audio data blocks + self.ad.resize(self.ablockno, axis=0) + self.ad[self.ablockno-1, :, :] = indata - def _vCallback(self, frame, framectr): - self._video_frame_positions.append(self._ablockno()) - vframeno = self._vframeno - self._vd.resize(vframeno+1, axis=0) - self._vd[vframeno, :, :] = frame - self._vframeno += 1 + # def _vCallback(self, frame, framectr): + # self.video_frame_positions.append(self.ablockno()) + # vframeno = self.vframeno + # self.vd.resize(vframeno+1, axis=0) + # self.vd[vframeno, :, :] = frame + # self.vframeno += 1 diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py new file mode 100644 index 0000000..1f73647 --- /dev/null +++ b/lasp/lasp_siggen.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python3.6 +# -*- coding: utf-8 -*- +""" +Author: J.A. 
de Jong - ASCEE + +Description: Signal generator code + +""" +import multiprocessing as mp +import dataclasses +import logging +from typing import Tuple +import numpy as np + +from .filter import PinkNoise +from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank +from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner +from .lasp_avstream import StreamManager, ignoreSigInt +from .wrappers import Siggen as pyxSiggen, Equalizer +from enum import Enum, unique, auto + +QUEUE_BUFFER_TIME = 0.5 # The amount of time used in the queues for buffering +# of data, larger is more stable, but also enlarges latency + +__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] + + +@unique +class SignalType(Enum): + Periodic = 0 + Noise = 1 + Sweep = 2 + Meas = 3 + + +@unique +class NoiseType(Enum): + white = "White noise" + pink = "Pink noise" + + def __str__(self): + return str(self.value) + + @staticmethod + def fillComboBox(combo): + for type_ in list(NoiseType): + combo.addItem(str(type_)) + + @staticmethod + def getCurrent(cb): + return list(NoiseType)[cb.currentIndex()] + + +class SiggenWorkerDone(Exception): + def __str__(self): + return "Done generating signal" + + +@unique +class SiggenMessage(Enum): + """ + Different messages that can be send to the signal generator over the pipe + connection. + """ + endProcess = auto() # Stop and quit the signal generator + adjustVolume = auto() # Adjust the volume + newEqSettings = auto() # Forward new equalizer settings + newSiggenData = auto() # Forward new equalizer settings + ready = auto() # Send out once, once the signal generator is ready with + # pre-generating data. + + # These messages are send back to the main thread over the pipe + error = auto() + done = auto() + + +@dataclasses.dataclass +class SiggenData: + """ + Metadata used to create a Signal Generator + """ + fs: float # Sample rate [Hz] + + # Number of frames "samples" to send in one block + nframes_per_block: int + + # The data type to output + dtype: np.dtype + + # Level of output signal [dBFS]el + level_dB: float + + # Signal type specific data, i.e. + signaltype: SignalType + signaltypedata: Tuple = None + + # Settings for the equalizer etc + eqdata: object = None # Equalizer data + +class SiggenProcess(mp.Process): + """ + Main function running in a different process, is responsible for generating + new signal data. Uses the signal queue to push new generated signal data + on. + """ + def __init__(self, siggendata, dataq, pipe): + + """ + Args: + siggendata: The signal generator data to start with. + dataq: The queue to put generated signal on + pipe: Control and status messaging pipe + """ + super().__init__() + + # When this is set, a kill on the main process will also kill the + # siggen process. Highly wanted feature + self.daemon = True + + self.dataq = dataq + self.siggendata = siggendata + self.pipe = pipe + self.eq = None + self.siggen = None + + fs = self.siggendata.fs + nframes_per_block = siggendata.nframes_per_block + self.nblocks_buffer = max( + 1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block) + ) + + def newSiggen(self, siggendata: SiggenData): + """ + Create a signal generator based on parameters specified in global + function data. + + Args: + siggendata: SiggenData. Metadata to create a new signal generator. 
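As an illustration of the SiggenData fields defined above (the sine numbers mirror the play_sine script added later in this patch; the pink-noise values are arbitrary examples):

    import numpy as np
    from lasp import SiggenData, SignalType, NoiseType

    # 1 kHz sine at -20 dBFS, int16 blocks of 1024 frames at 48 kHz
    sine_data = SiggenData(fs=48e3,
                           nframes_per_block=1024,
                           dtype=np.dtype(np.int16),
                           level_dB=-20.,
                           signaltype=SignalType.Periodic,
                           signaltypedata=(1000.,))

    # Pink noise; the second element is the 'zerodBpoint' parameter that is
    # passed on to PinkNoise() below (1000. is just an example value).
    noise_data = SiggenData(fs=48e3,
                            nframes_per_block=1024,
                            dtype=np.dtype(np.int16),
                            level_dB=-20.,
                            signaltype=SignalType.Noise,
                            signaltypedata=(NoiseType.pink, 1000.))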
+ """ + + logging.debug('newSiggen') + + fs = siggendata.fs + nframes_per_block = siggendata.nframes_per_block + level_dB = siggendata.level_dB + signaltype = siggendata.signaltype + signaltypedata = siggendata.signaltypedata + + if signaltype == SignalType.Periodic: + freq, = signaltypedata + siggen = pyxSiggen.sineWave(fs, freq, level_dB) + elif signaltype == SignalType.Noise: + noisetype, zerodBpoint = signaltypedata + if noisetype == NoiseType.white: + sos_colorfilter = None + elif noisetype == NoiseType.pink: + sos_colorfilter = PinkNoise(fs, zerodBpoint).flatten() + else: + raise ValueError(f"Unknown noise type") + + siggen = pyxSiggen.noise(fs, level_dB, sos_colorfilter) + + elif signaltype == SignalType.Sweep: + fl, fu, Ts, Tq, sweep_flags = signaltypedata + siggen = pyxSiggen.sweep(fs, fl, fu, Ts, Tq, sweep_flags, level_dB) + + else: + raise ValueError(f"Not implemented signal type: {signaltype}") + + logging.debug('newSiggen') + return siggen + + def generate(self): + """ + Generate a single block of data and put it on the data queue + """ + signal = self.siggen.genSignal(self.siggendata.nframes_per_block) + dtype = self.siggendata.dtype + if self.eq is not None: + signal = self.eq.equalize(signal) + if np.issubdtype(dtype, np.integer): + bitdepth_fixed = dtype.itemsize * 8 + signal *= 2 ** (bitdepth_fixed - 1) - 1 + self.dataq.put(signal.astype(dtype)) + + def newEqualizer(self, eqdata): + """ + Create an equalizer object from equalizer data + + Args: + eqdata: dictionary containing equalizer data. TODO: document the + requiring fields. + """ + if eqdata is None: + return None + eq_type = eqdata['type'] + eq_levels = eqdata['levels'] + fs = self.siggendata.fs + + if eq_type == 'three': + fb = SosThirdOctaveFilterBank(fs) + elif eq_type == 'one': + fb = SosOctaveFilterBank(fs) + + eq = Equalizer(fb._fb) + if eq_levels is not None: + eq.setLevels(eq_levels) + return eq + + def run(self): + # The main function of the actual process + # First things first + ignoreSigInt() + + try: + self.siggen = self.newSiggen(self.siggendata) + except Exception as e: + self.pipe.send((SiggenMessage.error, str(e))) + + try: + self.eq = self.newEqualizer(self.siggendata.eqdata) + except Exception as e: + self.pipe.send((SiggenMessage.error, str(e))) + + # Pre-generate blocks of signal data + while self.dataq.qsize() < self.nblocks_buffer: + self.generate() + + self.pipe.send((SiggenMessage.ready, None)) + + while True: + # Wait here for a while, to check for messages to consume + if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4): + msg, data = self.pipe.recv() + if msg == SiggenMessage.endProcess: + logging.debug("Signal generator caught 'endProcess' message. Exiting.") + return 0 + elif msg == SiggenMessage.adjustVolume: + level_dB = data + logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") + self.siggen.setLevel(level_dB) + elif msg == SiggenMessage.newEqSettings: + eqdata = data + self.eq = self.newEqualizer(eqdata) + elif msg == SiggenMessage.newSiggenData: + siggendata = data + self.siggen = self.newSiggen(siggendata) + else: + self.pipe.send( + SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" + ) + while self.dataq.qsize() < self.nblocks_buffer: + # Generate new data and put it in the queue! 
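# Aside (illustrative numbers, not taken from this file): with the values used
# in the play_sine script further down (fs = 48 kHz, nframes_per_block = 1024)
# and QUEUE_BUFFER_TIME = 0.5 s, the buffer limit computed in __init__ is
#   nblocks_buffer = max(1, int(0.5 * 48000 / 1024)) = 23,
# i.e. roughly half a second of signal is kept queued ahead of the stream.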
+ try: + self.generate() + except SiggenWorkerDone: + self.pipe.send(SiggenMessage.done) + return 0 + + return 1 + + +class Siggen: + """ + Signal generator class, generates signal data in a different process to + unload the work in the calling thread. + """ + + def __init__(self, dataq, siggendata: SiggenData): + """""" + + self.pipe, client_end = mp.Pipe(duplex=True) + + self.stopped = False + + self.process = SiggenProcess(siggendata, dataq, client_end) + self.process.start() + + if not self.process.is_alive(): + raise RuntimeError('Unexpected signal generator exception') + + # Block waiting here for signal generator to be ready + msg, data = self.pipe.recv() + if msg == SiggenMessage.ready: + logging.debug('Signal generator ready') + elif msg == SiggenMessage.error: + e = data + raise RuntimeError(f'Signal generator exception: {str(e)}') + else: + # Done, or something + if msg == SiggenMessage.done: + self.stopped = True + + self.handle_msgs() + + def setLevel(self, new_level): + """ + Set a new signal level to the generator + + Args: + new_level: The new level in [dBFS] + """ + self.pipe.send((SiggenMessage.adjustVolume, new_level)) + + def setEqData(self, eqdata): + self.pipe.send((SiggenMessage.newEqSettings, eqdata)) + + def setSiggenData(self, siggendata: SiggenData): + """ + Updates the whole signal generator, based on new signal generator data. + """ + self.pipe.send((SiggenMessage.newSiggenData, siggendata)) + + def handle_msgs(self): + while self.pipe.poll(): + msg, data = self.pipe.recv() + if msg == SiggenMessage.error: + raise RuntimeError( + f"Error in initialization of signal generator: {data}" + ) + # elif msg == SiggenMessage.done: + # self.stop() + + + def cleanup(self): + logging.debug('Siggen::stop()') + self.pipe.send((SiggenMessage.endProcess, None)) + self.pipe.close() + + logging.debug('Joining siggen process') + self.process.join() + logging.debug('Joining siggen process done') + self.process.close() + + self.process = None + diff --git a/lasp/tools/__init__.py b/lasp/tools/__init__.py index 04da443..5a820b5 100644 --- a/lasp/tools/__init__.py +++ b/lasp/tools/__init__.py @@ -1,6 +1,3 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from .config import init_backend -from lasp.lasp_common import FreqWeighting, TimeWeighting -__all__ = ['init_backend', - 'FreqWeighting', 'TimeWeighting'] +from .tools import * diff --git a/lasp/tools/config.py b/lasp/tools/config.py deleted file mode 100644 index 4e7a3f5..0000000 --- a/lasp/tools/config.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""! -Author: J.A. 
de Jong - ASCEE - -Description: -""" -__all__ = ['init_backend', 'getReportQuality'] - -_report_quality = False -_init = False - - -def init_matplotlib(report_quality=False): - global _init - if not _init: - _init = True - print('Initializing matplotlib...') - preamble = [ - r'\usepackage{libertine-type1}' - r'\usepackage[libertine]{newtxmath}' - # r'\usepackage{fontspec}', - # r'\setmainfont{Libertine}', - ] - params = { - 'font.family': 'serif', - 'text.usetex': True, - 'text.latex.unicode': True, - 'pgf.rcfonts': False, - 'pgf.texsystem': 'pdflatex', - 'pgf.preamble': preamble, - } - import matplotlib - matplotlib.rcParams.update(params) - global _report_quality - _report_quality = report_quality - - -def init_backend(report_quality=False): - global _init - if not _init: - import matplotlib - matplotlib.use('Qt5Agg', warn=False, force=True) - init_matplotlib(report_quality) - _init = True - import matplotlib.pyplot as plt - plt.ion() - - -def getReportQuality(): - global _report_quality - return _report_quality diff --git a/lasp/tools/plot.py b/lasp/tools/plot.py deleted file mode 100644 index 660edc9..0000000 --- a/lasp/tools/plot.py +++ /dev/null @@ -1,210 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""! -Author: J.A. de Jong - ASCEE - -Description: -""" -__all__ = ['Figure', 'Bode', 'PS', 'PSD'] - -from .config import getReportQuality -import matplotlib.pyplot as plt -import numpy as np -from cycler import cycler -from lasp.lasp_common import (PLOT_COLORS_LIST, PLOT_NOCOLORS_LIST, - DEFAULT_FIGSIZE_H, DEFAULT_FIGSIZE_W) - - -class Figure: - def __init__(self, **kwargs): - ncols = kwargs.pop('ncols', 1) - nrows = kwargs.pop('nrows', 1) - color = kwargs.pop('color', (PLOT_NOCOLORS_LIST if getReportQuality() - else PLOT_COLORS_LIST)) - if isinstance(color, bool): - if color: - color = PLOT_COLORS_LIST - else: - color = PLOT_NOCOLORS_LIST - colors = cycler('color', color) - - figsize = kwargs.pop('figsize', (DEFAULT_FIGSIZE_W, DEFAULT_FIGSIZE_H)) - self._f = plt.figure(figsize=figsize) - - marker = kwargs.pop('marker', False) - if marker: - markers = cycler(marker=['o', 's', 'D', 'X', 'v', '^', '<', '>']) - else: - markers = cycler(marker=[None]*8) - - linewidths = cycler(linewidth=[1, 2, 1, 2, 2, 3, 2, 1]) - - linestyles = cycler( - linestyle=['-', '-', '--', ':', '-', '--', ':', '-.', ]) - - self._ax = [] - self._legend = {} - for row in range(nrows): - self._legend[row] = {} - for col in range(ncols): - self._legend[row][col] = [] - ax = self._f.add_subplot(100*nrows - + 10*ncols - + (row*ncols + col)+1) - ax.set_prop_cycle( - colors+linestyles+markers+linewidths) - self._ax.append(ax) - self._ncols = ncols - self._cur_ax = self._ax[0] - self._cur_col = 0 - self._cur_row = 0 - - self._zorder = -1 - - def setAx(self, row, col): - self._cur_ax = self._ax[row*self._ncols+col] - - @property - def fig(self): - return self._f - - def markup(self): - for ax in self._ax: - ax.grid(True, 'both') - self._zorder -= 1 - self.fig.show() - - def vline(self, x): - self._ax[0].axvline(x) - - def plot(self, *args, **kwargs): - line = self._cur_ax.plot(*args, **kwargs, zorder=self._zorder) - self.markup() - return line - - def loglog(self, *args, **kwargs): - line = self._cur_ax.loglog(*args, **kwargs, zorder=self._zorder) - self.markup() - return line - - def semilogx(self, *args, **kwargs): - line = self._cur_ax.semilogx(*args, **kwargs, zorder=self._zorder) - self.markup() - return line - - def xlabel(self, *args, **kwargs): - all_ax = kwargs.pop('all_ax', False) - if all_ax: - for ax 
in self._ax: - ax.set_xlabel(*args, **kwargs) - else: - self._cur_ax.set_xlabel(*args, **kwargs) - - def ylabel(self, *args, **kwargs): - all_ax = kwargs.pop('all_ax', False) - if all_ax: - for ax in self._ax: - ax.set_ylabel(*args, **kwargs) - else: - self._cur_ax.set_ylabel(*args, **kwargs) - - def legend(self, leg, *args, **kwargs): - # all_ax = kwargs.pop('all_ax', False) - if isinstance(leg, list) or isinstance(leg, tuple): - self._legend[self._cur_col][self._cur_col] = list(leg) - else: - self._legend[self._cur_col][self._cur_col].append(leg) - self._cur_ax.legend(self._legend[self._cur_col][self._cur_col]) - - def savefig(self, *args, **kwargs): - self.fig.savefig(*args, **kwargs) - - def xlim(self, *args, **kwargs): - all_ax = kwargs.pop('all_ax', False) - if all_ax: - for ax in self._ax: - ax.set_xlim(*args, **kwargs) - else: - self._cur_ax.set_xlim(*args, **kwargs) - - def ylim(self, *args, **kwargs): - all_ax = kwargs.pop('all_ax', False) - if all_ax: - for ax in self._ax: - ax.set_ylim(*args, **kwargs) - else: - self._cur_ax.set_ylim(*args, **kwargs) - - def title(self, *args, **kwargs): - self._cur_ax.set_title(*args, **kwargs) - - def xticks(self, ticks): - for ax in self._ax: - ax.set_xticks(ticks) - - def close(self): - plt.close(self._f) - - def xscale(self, scale): - for ax in self._ax: - ax.set_xscale(scale) - - -class Bode(Figure): - def __init__(self, *args, **kwargs): - super().__init__(naxes=2, *args, **kwargs) - - def add(self, freq, phasor, qtyname='G', **kwargs): - L = 20*np.log10(np.abs(phasor)) - phase = np.angle(phasor)*180/np.pi - self.semilogx(freq, L, axno=0, **kwargs) - self.semilogx(freq, phase, axno=1, **kwargs) - self.ylabel('$L$ [%s] [dB]' % qtyname, axno=0) - self.ylabel(fr'$\angle$ {qtyname} [$^\circ$]', axno=1) - self.xlabel('Frequency [Hz]', axno=1) - - -class PS(Figure): - def __init__(self, ref, *args, **kwargs): - super().__init__(naxes=1, *args, **kwargs) - self.ref = ref - - def add(self, fs, freq, ps, qtyname='C', **kwargs): - - overall = np.sum(ps) - print(overall) - overall_db = 10*np.log10(overall/self.ref**2) - L = 10*np.log10(np.abs(ps)/self.ref**2) - - self.semilogx(freq, L, **kwargs) - # self.plot(freq,L,**kwargs) - self.ylabel('Level [dB re 20$\\mu$Pa]') - self.xlabel('Frequency [Hz]') - self.legend('%s. Overall SPL = %0.1f dB SPL' % (qtyname, overall_db)) - - -class PSD(PS): - def __init__(self, ref, *args, **kwargs): - """ - Initialize a PSD plot - - Args: - ref: Reference value for level in dB's - - """ - super().__init__(ref, *args, **kwargs) - - def add(self, fs, freq, ps, qtyname='C', **kwargs): - df = freq[1]-freq[0] - nfft = fs/df - df = fs/nfft - psd = ps / df - - overall = np.sum(np.abs(ps), axis=0) - overall_db = 10*np.log10(overall/self.ref**2) - L = 10*np.log10(abs(psd)/self.ref**2) - - self.semilogx(freq, L, **kwargs) - self.ylabel('$L$ [%s] [dB re %0.0e]' % (qtyname, self.ref)) - self.xlabel('Frequency [Hz]') - self.legend('%s. Overall SPL = %0.1f dB SPL' % (qtyname, overall_db)) diff --git a/lasp/tools/report_tools.py b/lasp/tools/report_tools.py deleted file mode 100644 index e17db31..0000000 --- a/lasp/tools/report_tools.py +++ /dev/null @@ -1,198 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Author: J.A. 
de Jong - ASCEE - -Description: backend tools for easy postprocessing of measurements -""" -from .plot import Figure -from lasp.wrappers import AvPowerSpectra -from lasp.lasp_measurement import Measurement -from lasp.lasp_common import (FreqWeighting, TimeWeighting, - getFreq, getTime, Window, P_REF) -from lasp.lasp_weighcal import WeighCal -from lasp.lasp_octavefilter import OctaveFilterBank, ThirdOctaveFilterBank -from lasp.lasp_figuredialog import FigureDialog -from lasp.lasp_figure import Plotable, PlotOptions -from lasp.lasp_slm import SLM -import numpy as np -import sys - - -def close(): - import matplotlib.pyplot as plt - plt.close('all') - - -def PSPlot(fn_list, **kwargs): - """ - Create a power spectral density plot, ASCEE style - - Args: - fn_list: list of measurement filenames to plot PSD for - fw: - fs: - nfft: - xscale: - yscale: - """ - - fw = kwargs.pop('fw', FreqWeighting.A) - nfft = kwargs.pop('nfft', 2048) - xscale = kwargs.pop('xscale', 'log') - yscale = kwargs.pop('yscale', 'PSD') - ylim = kwargs.pop('ylim', (0, 100)) - xlim = kwargs.pop('xlim', (100, 10000)) - f = Figure(**kwargs) - - print(kwargs) - if xscale == 'log': - pltfun = f.semilogx - else: - pltfun = f.plot - - for fn in fn_list: - meas = Measurement(fn) - fs = meas.samplerate - data = meas.praw() - aps = AvPowerSpectra(nfft, 1, 50.) - weighcal = WeighCal(fw, nchannels=1, - fs=fs) - weighted = weighcal.filter_(data) - ps = aps.addTimeData(weighted) - freq = getFreq(fs, nfft) - if yscale == 'PSD': - df = fs/nfft - type_str = '/$\\sqrt{\\mathrm{Hz}}$' - elif yscale == 'PS': - df = 1. - type_str = '' - else: - raise ValueError("'type' should be either 'PS' or 'PSD'") - - psd_log = 10*np.log10(ps[:, 0, 0].real/df/2e-5**2) - - pltfun(freq, psd_log) - - f.xlabel('Frequency [Hz]') - f.ylabel(f'Level [dB({fw[0]}) re (20$\\mu$ Pa){type_str}') - f.ylim(ylim) - f.xlim(xlim) - return f - - -def PowerSpectra(fn_list, **kwargs): - nfft = kwargs.pop('nfft', 2048) - window = kwargs.pop('window', Window.hann) - fw = kwargs.pop('fw', FreqWeighting.A) - overlap = kwargs.pop('overlap', 50.) 
- ptas = [] - for fn in fn_list: - meas = Measurement(fn) - fs = meas.samplerate - weighcal = WeighCal(fw, nchannels=1, - fs=fs, calfile=None) - praw = meas.praw() - weighted = weighcal.filter_(praw) - aps = AvPowerSpectra(nfft, 1, overlap, window[0]) - result = aps.addTimeData(weighted)[:, 0, 0].real - pwr = 10*np.log10(result/P_REF**2) - - freq = getFreq(fs, nfft) - ptas.append(Plotable(freq, pwr)) - - pto = PlotOptions.forPower() - pto.ylabel = f'L{fw[0]} [dB({fw[0]})]' - return ptas - - -def Levels(fn_list, **kwargs): - - bank = kwargs.pop('bank', 'third') - fw = kwargs.pop('fw', FreqWeighting.A) - tw = kwargs.pop('tw', TimeWeighting.fast) - xmin_txt = kwargs.pop('xmin', '100') - xmax_txt = kwargs.pop('xmax', '16k') - - levels = [] - leveltype = 'eq' - - for fn in fn_list: - meas = Measurement(fn) - fs = meas.samplerate - weighcal = WeighCal(fw, nchannels=1, - fs=fs, calfile=None) - praw = meas.praw() - weighted = weighcal.filter_(praw) - if bank == 'third': - filt = ThirdOctaveFilterBank(fs) - xmin = filt.nominal_txt_tox(xmin_txt) - xmax = filt.nominal_txt_tox(xmax_txt) - - elif bank == 'overall': - slm = SLM(meas.samplerate, tw) - slm.addData(weighted) - levels.append( - Plotable(' ', slm.Lmax if leveltype == 'max' else slm.Leq, - name=meas.name)) - continue - else: - raise NotImplementedError() - - # Octave bands - # filt = OctaveFilterBank(fs) - filtered_out = filt.filter_(weighted) - level = np.empty((xmax - xmin + 1)) - xlabels = [] - for i, x in enumerate(range(xmin, xmax+1)): - nom = filt.nominal_txt(x) - xlabels.append(nom) - filt_x = filtered_out[nom]['data'] - slm = SLM(filt.fs, tw) - slm.addData(filt_x) - leveli = slm.Lmax if leveltype == 'max' else slm.Leq - level[i] = leveli - levels.append(Plotable(xlabels, level, name=meas.name)) - return levels - - -def LevelDifference(levels): - assert len(levels) == 2 - return Plotable(name='Difference', x=levels[0].x, - y=levels[1].y-levels[0].y) - - -def LevelFigure(levels, show=True, **kwargs): - figtype = kwargs.pop('figtype', 'bar') - from PySide.QtGui import QApplication, QFont - app = QApplication.instance() - if not app: - app = QApplication(sys.argv) - app.setFont(QFont('Linux Libertine')) - size = kwargs.pop('size', (1200, 600)) - if figtype == 'bar': - opts = PlotOptions.forLevelBars() - elif figtype == 'line': - opts = PlotOptions.forPower() - else: - raise RuntimeError('figtype should be either line or bar') - opts.ylim = kwargs.pop('ylim', (0, 100)) - opts.ylabel = kwargs.pop('ylabel', 'LAeq [dB(A)]') - opts.xlabel = kwargs.pop('xlabel', None) - opts.legend = kwargs.pop('legend', [level.name for level in levels]) - opts.legendpos = kwargs.pop('legendpos', None) - opts.title = kwargs.pop('title', None) - - def Plotter(ptas, pto): - fig = FigureDialog(None, None, pto, figtype) - for pta in ptas: - fig.fig.add(pta) - return fig - - fig = Plotter(levels, opts) - if show: - fig.show() - fig.resize(*size) - if show: - app.exec_() - return fig diff --git a/lasp/tools/tools.py b/lasp/tools/tools.py index 08ff96d..742f7b0 100644 --- a/lasp/tools/tools.py +++ b/lasp/tools/tools.py @@ -1,24 +1,58 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Created on Thu May 6 14:49:03 2021 +Author: C. Jansen, J.A. de Jong - ASCEE V.O.F. -@author: Casper - -Smooth data in the frequency domain +Smooth data in the frequency domain. TODO: This function is rather slow as it +used Python for loops. The implementations should be speed up in the near +future. 
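A small usage sketch of the smoothing API introduced below (function and enum names as defined in this file; the spectrum itself is synthetic example data):

    import numpy as np
    from lasp.tools import SmoothingWidth, SmoothingType, smoothSpectralData

    # Synthetic level spectrum on an equally spaced frequency axis
    freq = np.linspace(20., 20480., 1024)          # df = 20 Hz
    levels_dB = 90. + 5.*np.random.randn(freq.size)

    # 1/6th octave smoothing of level (dB) data
    smoothed = smoothSpectralData(freq, levels_dB, SmoothingWidth.six,
                                  SmoothingType.levels)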
""" +from enum import Enum, unique + +__all__ = ['SmoothingType', 'smoothSpectralData', 'SmoothingWidth'] + + +@unique +class SmoothingWidth(Enum): + none = (0, 'No smoothing') + # three = (3, '1/3th octave smoothing') + six = (6, '1/6th octave smoothing') + twelve = (12, '1/12th octave smoothing') + twfo = (24, '1/24th octave smoothing') + ftei = (48, '1/48th octave smoothing') + + @staticmethod + def fillComboBox(cb): + """ + Fill Windows to a combobox + + Args: + cb: QComboBox to fill + """ + cb.clear() + for w in list(SmoothingWidth): + cb.addItem(w.value[1], w) + cb.setCurrentIndex(0) + + @staticmethod + def getCurrent(cb): + return list(SmoothingWidth)[cb.currentIndex()] + +class SmoothingType: + levels = 'l', 'Levels' + # tf = 'tf', 'Transfer function', + ps = 'ps', '(Auto) powers' + # TO DO: check if everything is correct # TO DO: add possibility to insert data that is not lin spaced in frequency -import matplotlib.pyplot as plt import numpy as np from scipy.signal.windows import gaussian - -# %% Smoothing function -def oct_smooth(f, M, Noct, dB=False): +def smoothSpectralData(freq, M, sw: SmoothingWidth, + st: SmoothingType = SmoothingType.levels): """ Apply fractional octave smoothing to magnitude data in frequency domain. Smoothing is performed to power, using a sliding Gaussian window with @@ -30,49 +64,58 @@ def oct_smooth(f, M, Noct, dB=False): side. The deviation is largest when Noct is small (e.g. coarse smoothing). Casper Jansen, 07-05-2021 - Parameters - ---------- - f : float - frequencies of data points [Hz] - equally spaced - M : float - magnitude of data points [- or dB, specify in paramater 'dB'] - Noct : int - smoothing strength: Noct=12 means 1/12 octave smoothing - dB : Bool - True if [M]=dB, False if [M]=absolute + Args: + freq: array of frequencies of data points [Hz] - equally spaced + M: array of either power, transfer functin or dB points. Depending on + the smoothing type `st`, the smoothing is applied. - Returns - ------- - f : float - frequencies of data points [Hz] - Msm : float - smoothed magnitude of data points + Returns: + freq : array frequencies of data points [Hz] + Msm : float smoothed magnitude of data points """ + # TODO: Make this function multi-dimensional array aware. # Settings tr = 2 # truncate window after 2x std # Safety - assert Noct > 0, '\'Noct\' must be absolute positive' + Noct = sw.value[0] + assert Noct > 0, "'Noct' must be absolute positive" if Noct < 1: raise Warning('Check if \'Noct\' is entered correctly') - assert len(f)==len(M), 'f and M should have equal length' - if not dB: assert np.min(M) >= 0, 'absolute magnitude M cannot be negative' + assert len(freq)==len(M), 'f and M should have equal length' + + if st == SmoothingType.ps: + assert np.min(M) >= 0, 'absolute magnitude M cannot be negative' + if st == SmoothingType.levels and isinstance(M.dtype, complex): + raise RuntimeError('Decibel input should be real-valued') # Initialize - L = len(M) # number of data points - P = 10**(M/10) if dB else M**2 # convert magnitude --> power - Psm = np.zeros(L) # smoothed power - to be calculated - x0 = 1 if f[0]==0 else 0 # skip first data point if zero frequency - df = f[1] - f[0] # frequency step + L = M.shape[0] # number of data points + + P = M + if st == SmoothingType.levels: + P = 10**(P/10) + # TODO: This does not work due to complex numbers. Should be split up in + # magnitude and phase. 
+ # elif st == SmoothingType.tf: + # P = P**2 + + Psm = np.zeros_like(P) # smoothed power - to be calculated + x0 = 1 if freq[0]==0 else 0 # skip first data point if zero frequency + df = freq[1] - freq[0] # frequency step # Loop through data points for x in range(x0, L): # Find indices of data points to calculate current (smoothed) magnitude - fc = f[x] # center freq. of smoothing window + fc = freq[x] # center freq. of smoothing window Df = tr * fc / Noct # freq. range of smoothing window - xl = int(np.ceil(x - 0.5*Df/df)) # desired lower index of frequency array to be used during smoothing - xu = int(np.floor(x + 0.5*Df/df)) + 1 # upper index + 1 (because half-open interval) + + # desired lower index of frequency array to be used during smoothing + xl = int(np.ceil(x - 0.5*Df/df)) + + # upper index + 1 (because half-open interval) + xu = int(np.floor(x + 0.5*Df/df)) + 1 # Create window Np = xu - xl # number of points @@ -110,13 +153,15 @@ def oct_smooth(f, M, Noct, dB=False): wind_int = np.sum(wind) # integral Psm[x] = np.dot(wind, P[xl:xu]) / wind_int # apply window - Msm = 10*np.log10(Psm) if dB else Psm**0.5 # convert power --> magnitude + if st == SmoothingType.levels: + Psm = 10*np.log10(Psm) - return Msm + return Psm # %% Test if __name__ == "__main__": + import matplotlib.pyplot as plt # Initialize Noct = 6 # 1/6 oct. smoothing diff --git a/lasp/wrappers.pyx b/lasp/wrappers.pyx index e9e6fa9..8d7d350 100644 --- a/lasp/wrappers.pyx +++ b/lasp/wrappers.pyx @@ -75,7 +75,6 @@ __all__ = ['AvPowerSpectra', 'SosFilterBank', 'FilterBank', 'Siggen', 'sweep_flag_forward', 'sweep_flag_backward', 'sweep_flag_linear', 'sweep_flag_exponential', 'load_fft_wisdom', 'store_fft_wisdom'] - setTracerLevel(15) @@ -635,6 +634,7 @@ cdef extern from "lasp_siggen.h": c_Siggen* Siggen_Sweep_create(d fs, d fl, d fu, d Ts,d Tq, us sweep_flags, d level_dB) + void Siggen_setLevel(c_Siggen*,d new_level_dB) us Siggen_getN(const c_Siggen*) void Siggen_genSignal(c_Siggen*, vd* samples) nogil void Siggen_free(c_Siggen*) @@ -659,6 +659,9 @@ cdef class Siggen: if self._siggen: Siggen_free(self._siggen) + def setLevel(self,d level_dB): + Siggen_setLevel(self._siggen, level_dB) + def genSignal(self, us nsamples): output = np.empty(nsamples, dtype=np.float) assert self._siggen != NULL diff --git a/scripts/lasp_record b/scripts/lasp_record index 0d42279..9d2edef 100755 --- a/scripts/lasp_record +++ b/scripts/lasp_record @@ -1,10 +1,7 @@ -#!/usr/bin/python3 -import argparse -import sys - - +#!/usr/bin/python3.8 +import sys, logging, os, argparse parser = argparse.ArgumentParser( - description='Acquire data and store a measurement file' + description='Acquire data and store to a measurement file.' ) parser.add_argument('filename', type=str, help='File name to record to.' 
@@ -15,63 +12,57 @@ parser.add_argument('--duration', '-d', type=float, device_help = 'DAQ Device to record from' parser.add_argument('--input-daq', '-i', help=device_help, type=str, default='Default') +parser.add_argument('--log', '-l', + help='Specify log level [info, debug, warning, ...]', + type=str, default='info') + args = parser.parse_args() +numeric_level = getattr(logging, args.log.upper(), None) +if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % args.log) +FORMAT = "[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s" +logging.basicConfig(format=FORMAT, level=numeric_level) -from lasp.lasp_avstream import AvStream, AvType -from lasp.lasp_record import Recording -from lasp.device import DaqConfiguration, Daq, DaqChannel +import multiprocessing +from lasp.device import DaqConfigurations +from lasp import AvType, StreamManager, Recording  # configureLogging -configs = DaqConfiguration.loadConfigs() +def main(args): + try: + streammgr = StreamManager() + configs = DaqConfigurations.loadAllConfigs() -for i, (key, val) in enumerate(configs.items()): - print(f'{i:2} : {key}') + config_keys = [key for key in configs.keys()] + for i, key in enumerate(config_keys): + print(f'{i:2} : {key}') -daqindex = input('Please enter required config: ') -try: - daqindex = int(daqindex) -except: - sys.exit(0) + chosen_index = input('Number of configuration to use: ') + try: + daqindex = int(chosen_index) + except ValueError: + print('Invalid configuration number. Exiting.') + sys.exit(0) -for i, (key, val) in enumerate(configs.items()): - if i == daqindex: - config = configs[key] + chosen_key = config_keys[daqindex] + config = configs[chosen_key].input_config + print(f'Chosen configuration: {chosen_key}') -config = configs[key] + streammgr.startStream(AvType.audio_input, config, wait=True) + rec = Recording(args.filename, streammgr, args.duration) + streammgr.stopStream(AvType.audio_input) + finally: + try: + streammgr.cleanup() + del streammgr + except NameError: + pass +if __name__ == '__main__': -print(config) -# daq = RtAudio() -devices = Daq.getDeviceInfo() + multiprocessing.set_start_method('forkserver', force=True) -input_devices = {} -for device in devices: - if device.inputchannels >= 0: - input_devices[device.name]
= device - -try: - output_device = output_devices[config.output_device_name] -except KeyError: - raise RuntimeError(f'output device {config.output_device_name} not available') - -samplerate = int(config.en_output_rate) -stream = AvStream(output_device, - AvType.audio_output, - config) - -# freq = 440. -freq = 1000. -omg = 2*np.pi*freq - - -def mycallback(indata, outdata, blockctr): - frames = outdata.shape[0] - nchannels = outdata.shape[1] - # nchannels = 1 - streamtime = blockctr*frames/samplerate - t = np.linspace(streamtime, streamtime + frames/samplerate, - frames)[np.newaxis, :] - outp = 0.01*np.sin(omg*t) - for i in range(nchannels): - outdata[:,i] = ((2**16-1)*outp).astype(np.int16) - -stream.addCallback(mycallback, AvType.audio_output) -stream.start() - -input() - -print('Stopping stream...') -stream.stop() - -print('Stream stopped') -print('Closing stream...') -stream.close() -print('Stream closed') diff --git a/scripts/play_sine b/scripts/play_sine new file mode 100755 index 0000000..ffda324 --- /dev/null +++ b/scripts/play_sine @@ -0,0 +1,78 @@ +#!/usr/bin/python3 +import numpy as np +import sys, logging, os, argparse + +parser = argparse.ArgumentParser( + description='Play a sine wave' +) + +parser.add_argument('--freq', '-f', help='Sine frequency [Hz]', type=float, + default=1000.) + +parser.add_argument('--log', '-l', + help='Specify log level [info, debug, warning, ...]', + type=str, default='info') + +args = parser.parse_args() + +numeric_level = getattr(logging, args.log.upper(), None) +if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % args.loglevel) + +FORMAT = "[%(levelname)s %(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s" +logging.basicConfig(format=FORMAT, level=numeric_level) + +import multiprocessing +from lasp import (StreamManager, AvType, Siggen, SignalType, SiggenData) +from lasp.device import DaqConfigurations + + +if __name__ == '__main__': + multiprocessing.set_start_method('forkserver', force=True) + logging.info(f'Playing frequency {args.freq} [Hz]') + + configs = DaqConfigurations.loadAllConfigs() + + config_keys = [key for key in configs.keys()] + for i, key in enumerate(config_keys): + print(f'{i:2} : {key}') + + choosen_index = input('Number of configuration to use: ') + try: + daqindex = int(choosen_index) + except: + print('Invalid configuration number. Exiting.') + sys.exit(0) + + + choosen_key = config_keys[daqindex] + daqconfig = configs[choosen_key].output_config + + print(f'Choosen configuration: {choosen_key}') + + try: + streammgr = StreamManager() + outq = streammgr.getOutputQueue() + + siggendata = SiggenData( + fs=48e3, + nframes_per_block=1024, + dtype=np.dtype(np.int16), + eqdata=None, + level_dB=-20, + signaltype=SignalType.Periodic, + signaltypedata=(args.freq,) + ) + siggen = Siggen(outq, siggendata) + + streammgr.activateSiggen() + + streammgr.startStream(AvType.audio_output, daqconfig) + + input('Press any key to stop...') + streammgr.stopStream(AvType.audio_output) + finally: + siggen.cleanup() + streammgr.cleanup() + + diff --git a/setup.py b/setup.py index 322e3f6..902c565 100644 --- a/setup.py +++ b/setup.py @@ -1,24 +1,10 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3.8 # -*- coding: utf-8 -*- """ @author: J.A. 
de Jong - ASCEE """ from setuptools import setup, find_packages - -# class CMakeExtension(Extension): -# """ -# An extension to run the cmake build -# -# This simply overrides the base extension class so that setuptools -# doesn't try to build your sources for you -# """ -# -# def __init__(self, name, sources=[]): -# -# super().__init__(name=name, sources=sources) - - setup( name="LASP", version="1.0", @@ -27,16 +13,16 @@ setup( long_description_content_type="text/markdown", # ext_modules=[CMakeExtension('lasp/wrappers.so'), # ], - package_data={'lasp': ['wrappers.so']}, + #package_data={'lasp': ['wrappers.so']}, author='J.A. de Jong - ASCEE', author_email="j.a.dejong@ascee.nl", install_requires=['matplotlib>=1.0', - 'scipy>=1.0', 'numpy>=1.0', 'h5py', - 'dataclasses_json', + 'scipy>=1.0', 'numpy>=1.0', 'h5py==3.2.0', + 'dataclasses_json', 'cython', ], license='MIT', description="Library for Acoustic Signal Processing", keywords="", - url="https://www.ascee.nl/lasp/", # project home page, if any + url="https://www.ascee.nl/lasp/", # project home page ) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 15442c7..f077ccc 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -11,5 +11,7 @@ target_link_libraries(test_fft lasp_lib) target_link_libraries(test_workers lasp_lib) target_link_libraries(test_math lasp_lib) -add_executable(test_uldaq test_uldaq.cpp) -target_link_libraries(test_uldaq cpp_daq) +if(LASP_ULDAQ) + add_executable(test_uldaq test_uldaq.cpp) + target_link_libraries(test_uldaq cpp_daq) +endif(LASP_ULDAQ)
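The wrappers.pyx hunk above exposes the new Siggen_setLevel call as Siggen.setLevel. A minimal sketch of that low-level generator API (the extension module must be built first; frequency and level values are arbitrary examples):

    from lasp.wrappers import Siggen

    fs = 48000.
    siggen = Siggen.sineWave(fs, 1000., -20.)   # 1 kHz sine at -20 dBFS
    block = siggen.genSignal(1024)              # one block of samples
    siggen.setLevel(-40.)                       # lower the level [dBFS]
    quieter = siggen.genSignal(1024)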