diff --git a/lasp/__init__.py b/lasp/__init__.py index bb7dfc5..fdabeb9 100644 --- a/lasp/__init__.py +++ b/lasp/__init__.py @@ -1,10 +1,11 @@ -from .lasp_atomic import * -from .lasp_avstream import * from .lasp_common import * +from .lasp_avstream import * +from .wrappers import * +from .lasp_atomic import * from .lasp_imptube import * from .lasp_measurement import * from .lasp_octavefilter import * from .lasp_slm import * +from .lasp_record import * +from .lasp_siggen import * from .lasp_weighcal import * -from .wrappers import * -from .device import AvType diff --git a/lasp/c/lasp_siggen.c b/lasp/c/lasp_siggen.c index d9610b4..766af76 100644 --- a/lasp/c/lasp_siggen.c +++ b/lasp/c/lasp_siggen.c @@ -332,6 +332,13 @@ us Siggen_getN(const Siggen* siggen) { feTRACE(15); return 0; } +void Siggen_setLevel(Siggen* siggen, const d new_level_dB) { + fsTRACE(15); + + siggen->level_amp = d_pow(10, new_level_dB/20); + + feTRACE(15); +} void Siggen_free(Siggen* siggen) { fsTRACE(15); diff --git a/lasp/c/lasp_siggen.h b/lasp/c/lasp_siggen.h index d5ec290..52e7862 100644 --- a/lasp/c/lasp_siggen.h +++ b/lasp/c/lasp_siggen.h @@ -46,6 +46,14 @@ Siggen* Siggen_Sinewave_create(const d fs,const d freq,const d level_dB); */ Siggen* Siggen_Noise_create(const d fs, const d level_dB, Sosfilterbank* colorfilter); +/** + * Set the level of the signal generator + * @param[in] Siggen* Signal generator handle + * + * @param[in] new_level_dB The new level, in dBFS + */ +void Siggen_setLevel(Siggen*, const d new_level_dB); + /** * Obtain the repetition period for a periodic excitation. diff --git a/lasp/device/__init__.py b/lasp/device/__init__.py index 7bd944a..6fde552 100644 --- a/lasp/device/__init__.py +++ b/lasp/device/__init__.py @@ -1,6 +1,4 @@ -#!/usr/bin/python3 from .lasp_device_common import * -from .lasp_daq import * from .lasp_deviceinfo import * from .lasp_daqconfig import * from .lasp_daq import * diff --git a/lasp/device/lasp_common_decls.pxd b/lasp/device/lasp_common_decls.pxd index 144b418..8152470 100644 --- a/lasp/device/lasp_common_decls.pxd +++ b/lasp/device/lasp_common_decls.pxd @@ -87,6 +87,9 @@ cdef extern from "lasp_cppdaq.h" nogil: unsigned ninchannels unsigned noutchannels + string serialize() + + cppDeviceInfo deserialize(string) bool hasInputIEPE bool hasInputACCouplingSwitch bool hasInputTrigger diff --git a/lasp/device/lasp_cppdaq.h b/lasp/device/lasp_cppdaq.h index 29eaa44..2264dad 100644 --- a/lasp/device/lasp_cppdaq.h +++ b/lasp/device/lasp_cppdaq.h @@ -19,9 +19,12 @@ using std::cerr; using std::cout; using std::endl; +using std::getline; using std::runtime_error; using std::string; using std::vector; +using std::to_string; + typedef unsigned int us; typedef vector boolvec; @@ -68,7 +71,7 @@ class DaqApi { return (apiname == other.apiname && apicode == other.apicode && api_specific_subcode == other.api_specific_subcode); } - + operator string() const { return apiname + ", code: " + to_string(apicode); } static vector getAvailableApis(); }; @@ -78,106 +81,215 @@ const DaqApi uldaqapi("UlDaq", 0); #ifdef HAS_RTAUDIO_API const DaqApi rtaudioAlsaApi("RtAudio Linux ALSA", 1, RtAudio::Api::LINUX_ALSA); const DaqApi rtaudioPulseaudioApi("RtAudio Linux Pulseaudio", 2, - RtAudio::Api::LINUX_PULSE); + RtAudio::Api::LINUX_PULSE); const DaqApi rtaudioWasapiApi("RtAudio Windows Wasapi", 3, - RtAudio::Api::WINDOWS_WASAPI); + RtAudio::Api::WINDOWS_WASAPI); const DaqApi rtaudioDsApi("RtAudio Windows DirectSound", 4, - RtAudio::Api::WINDOWS_DS); + RtAudio::Api::WINDOWS_DS); const DaqApi rtaudioAsioApi("RtAudio Windows ASIO", 5, - RtAudio::Api::WINDOWS_ASIO); + RtAudio::Api::WINDOWS_ASIO); #endif // Structure containing device info parameters class DeviceInfo { -public: - DaqApi api; - string device_name = ""; + public: + DaqApi api; + string device_name = ""; - int api_specific_devindex = -1; + int api_specific_devindex = -1; - vector availableDataTypes; - int prefDataTypeIndex = 0; + vector availableDataTypes; + int prefDataTypeIndex = 0; - vector availableSampleRates; - int prefSampleRateIndex = -1; + vector availableSampleRates; + int prefSampleRateIndex = -1; - vector availableFramesPerBlock; - unsigned prefFramesPerBlockIndex = 0; + vector availableFramesPerBlock; + unsigned prefFramesPerBlockIndex = 0; - dvec availableInputRanges; - int prefInputRangeIndex = 0; + dvec availableInputRanges; + int prefInputRangeIndex = 0; - unsigned ninchannels = 0; - unsigned noutchannels = 0; + unsigned ninchannels = 0; + unsigned noutchannels = 0; - bool hasInputIEPE = false; - bool hasInputACCouplingSwitch = false; - bool hasInputTrigger = false; + bool hasInputIEPE = false; + bool hasInputACCouplingSwitch = false; + bool hasInputTrigger = false; - /* DeviceInfo(): */ - /* datatype(dtype_invalid) { } */ + /* DeviceInfo(): */ + /* datatype(dtype_invalid) { } */ - double prefSampleRate() const { - if (((us)prefSampleRateIndex < availableSampleRates.size()) && - (prefSampleRateIndex >= 0)) { - return availableSampleRates.at(prefSampleRateIndex); - } else { - throw std::runtime_error("No prefered sample rate available"); + double prefSampleRate() const { + if (((us)prefSampleRateIndex < availableSampleRates.size()) && + (prefSampleRateIndex >= 0)) { + return availableSampleRates.at(prefSampleRateIndex); + } else { + throw std::runtime_error("No prefered sample rate available"); + } } - } - operator string() const { - std::stringstream str; - str << api.apiname + " " << api_specific_devindex - << " number of input channels: " << ninchannels - << " number of output channels: " << noutchannels; - return str.str(); - } + operator string() const { + std::stringstream str; + str << api.apiname + " " << api_specific_devindex << endl + << " number of input channels: " << ninchannels << endl + << " number of output channels: " << noutchannels << endl; + return str.str(); + } + + string serialize() const { + // Simple serializer for this object, used because we found a bit late that + // this object needs to be send over the wire. We do not want to make this + // implementation in Python, as these objects are created here, in the C++ + // code. The Python wrapper is just a readonly wrapper. + std::stringstream str; + + str << api.apiname << "\t"; + str << api.apicode << "\t"; + str << api.api_specific_subcode << "\t"; + str << device_name << "\t"; + + str << availableDataTypes.size() << "\t"; + for(const DataType& dtype: availableDataTypes) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << dtype.name << "\t"; + str << dtype.sw << "\t"; + str << dtype.is_floating << "\t"; + } + str << prefDataTypeIndex << "\t"; + + str << availableSampleRates.size() << "\t"; + for(const double& fs: availableSampleRates) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << fs << "\t"; + } + str << prefSampleRateIndex << "\t"; + + str << availableFramesPerBlock.size() << "\t"; + for(const us& fb: availableFramesPerBlock) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << fb << "\t"; + } + str << prefFramesPerBlockIndex << "\t"; + + str << availableInputRanges.size() << "\t"; + for(const double& ir: availableInputRanges) { + // WARNING: THIS GOES COMPLETELY WRONG WHEN NAMES contain A TAB!!! + str << ir << "\t"; + } + str << prefInputRangeIndex << "\t"; + + str << ninchannels << "\t"; + str << noutchannels << "\t"; + str << int(hasInputIEPE) << "\t"; + str << int(hasInputACCouplingSwitch) << "\t"; + str << int(hasInputTrigger) << "\t"; + + return str.str(); + } + + static DeviceInfo deserialize(const string& dstr) { + DeviceInfo devinfo; + + std::stringstream str(dstr); + string tmp; + us N; + // Lambda functions for deserializing + auto nexts = [&]() { getline(str, tmp, '\t'); return tmp; }; + auto nexti = [&]() { getline(str, tmp, '\t'); return std::atoi(tmp.c_str()); }; + auto nextf = [&]() { getline(str, tmp, '\t'); return std::atof(tmp.c_str()); }; + + // Api + string apiname = nexts(); + auto apicode = nexti(); + auto api_specific_subcode = nexti(); + DaqApi api(apiname, apicode, api_specific_subcode); + devinfo.api = api; + + devinfo.device_name = nexts(); + + N = us(nexti()); + for(us i=0;i inchannel_sensitivities; - vector inchannel_names; - vector inchannel_metadata; + vector inchannel_sensitivities; + vector inchannel_names; + vector inchannel_metadata; - vector outchannel_sensitivities; - vector outchannel_names; - vector outchannel_metadata; + vector outchannel_sensitivities; + vector outchannel_names; + vector outchannel_metadata; - us sampleRateIndex = 0; // Index in list of sample rates + us sampleRateIndex = 0; // Index in list of sample rates - us dataTypeIndex = 0; // Required datatype for output, should be - // present in the list + us dataTypeIndex = 0; // Required datatype for output, should be + // present in the list - us framesPerBlockIndex = 0; + us framesPerBlockIndex = 0; - bool monitorOutput = false; + bool monitorOutput = false; - boolvec inputIEPEEnabled; - boolvec inputACCouplingMode; + boolvec inputIEPEEnabled; + boolvec inputACCouplingMode; - usvec inputRangeIndices; + usvec inputRangeIndices; - // Create a default configuration, with all channels disabled on both - // input and output, and default channel names - DaqConfiguration(const DeviceInfo &device); - DaqConfiguration() {} + // Create a default configuration, with all channels disabled on both + // input and output, and default channel names + DaqConfiguration(const DeviceInfo &device); + DaqConfiguration() {} - bool match(const DeviceInfo &devinfo) const; + bool match(const DeviceInfo &devinfo) const; - int getHighestInChannel() const; - int getHighestOutChannel() const; + int getHighestInChannel() const; + int getHighestOutChannel() const; - int getLowestInChannel() const; - int getLowestOutChannel() const; + int getLowestInChannel() const; + int getLowestOutChannel() const; }; class Daq; @@ -185,7 +297,7 @@ class Daq : public DaqConfiguration, public DeviceInfo { mutable std::mutex mutex; -public: + public: static vector getDeviceInfo(); static Daq *createDaq(const DeviceInfo &, const DaqConfiguration &config); @@ -193,7 +305,7 @@ public: Daq(const DeviceInfo &devinfo, const DaqConfiguration &config); virtual void start(SafeQueue *inqueue, - SafeQueue *outqueue) = 0; + SafeQueue *outqueue) = 0; virtual void stop() = 0; diff --git a/lasp/device/lasp_cpprtaudio.cpp b/lasp/device/lasp_cpprtaudio.cpp index d5969ff..0994c24 100644 --- a/lasp/device/lasp_cpprtaudio.cpp +++ b/lasp/device/lasp_cpprtaudio.cpp @@ -175,7 +175,7 @@ class AudioDaq: public Daq { &streamoptions, &myerrorcallback ); - } catch(...) { + } catch(RtAudioError& e) { if(rtaudio) delete rtaudio; if(instreamparams) delete instreamparams; if(outstreamparams) delete outstreamparams; @@ -287,7 +287,6 @@ int mycallback( AudioDaq* daq = (AudioDaq*) userData; DataType dtype = daq->dataType(); - /* us neninchannels = daq->neninchannels(); */ us neninchannels_inc_mon = daq->neninchannels(); us nenoutchannels = daq->nenoutchannels(); @@ -343,6 +342,7 @@ int mycallback( us j=0; // OUR buffer channel counter us i=0; // RtAudio channel counter for(us ch=0;ch<=daq->getHighestOutChannel();ch++) { + /* cerr << "Copying from queue... " << endl; */ if(enoutchannels[ch]) { memcpy( &(outputBuffer[i*bytesperchan]), @@ -351,6 +351,7 @@ int mycallback( j++; } else { + /* cerr << "unused output channel in list" << endl; */ memset( &(outputBuffer[i*bytesperchan]),0,bytesperchan); } @@ -364,7 +365,7 @@ int mycallback( } } else { - cerr << "Stream output buffer underflow, zero-ing buffer... " << endl; + cerr << "RtAudio backend: stream output buffer underflow!" << endl; } diff --git a/lasp/device/lasp_daq.pyx b/lasp/device/lasp_daq.pyx index c05854e..ff9fcc5 100644 --- a/lasp/device/lasp_daq.pyx +++ b/lasp/device/lasp_daq.pyx @@ -1,10 +1,11 @@ cimport cython +from ..lasp_common import AvType from .lasp_deviceinfo cimport DeviceInfo from .lasp_daqconfig cimport DaqConfiguration from cpython.ref cimport PyObject,Py_INCREF, Py_DECREF import numpy as np -from .lasp_device_common import AvType +import logging __all__ = ['Daq'] @@ -36,6 +37,7 @@ cdef getNumpyDataType(DataType& dt): else: raise ValueError('Unknown data type') +DEF QUEUE_BUFFER_TIME = 0.5 ctypedef struct PyStreamData: PyObject* pyCallback @@ -43,6 +45,10 @@ ctypedef struct PyStreamData: # Flag used to pass the stopThread. atomic[bool] stopThread + # Flag to indicate that the signal generator queue has been filled for the + # first time. + atomic[bool] ready + # Number of frames per block unsigned nFramesPerBlock @@ -74,46 +80,56 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: unsigned nBytesPerChan= sd.nBytesPerChan unsigned nFramesPerBlock= sd.nFramesPerBlock - double sleeptime = ( sd.nFramesPerBlock)/(4*sd.samplerate); + double sleeptime = ( sd.nFramesPerBlock)/(8*sd.samplerate); + # Sleep time in microseconds us sleeptime_us = (sleeptime*1e6); + us nblocks_buffer = max(1, (QUEUE_BUFFER_TIME * sd.samplerate / + sd.nFramesPerBlock)) + with gil: npy_format = cnp.NPY_FLOAT64 callback = sd.pyCallback # print(f'Number of input channels: {ninchannels}') # print(f'Number of out channels: {noutchannels}') - fprintf(stderr, 'Sleep time: %d us', sleeptime_us) + # fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us) + for i in range(nblocks_buffer): + outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) + memset(outbuffer, 0, sizeof(double)*nBytesPerChan*noutchannels) + sd.outQueue.enqueue( outbuffer) + sd.ready.store(True) while not sd.stopThread.load(): with gil: - if sd.outQueue and sd.outQueue.size() < 10: - outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) + if sd.outQueue: + while sd.outQueue.size() < nblocks_buffer: + outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) - npy_output = data_to_ndarray( - outbuffer, - nFramesPerBlock, - noutchannels, - sd.npy_format, - False, # Do not transfer ownership - True) # F-contiguous - try: - rval = callback(None, - npy_output, + npy_output = data_to_ndarray( + outbuffer, nFramesPerBlock, - ) + noutchannels, + sd.npy_format, + False, # Do not transfer ownership to the temporary + # Numpy container + True) # F-contiguous + try: + rval = callback(None, + npy_output, + nFramesPerBlock, + ) - except Exception as e: - print('exception in Cython callback for audio output: ', str(e)) - return - - sd.outQueue.enqueue( outbuffer) + except Exception as e: + logging.error('exception in Cython callback for audio output: ', str(e)) + return + sd.outQueue.enqueue( outbuffer) if sd.inQueue and not sd.inQueue.empty(): # Waiting indefinitely on the queue... inbuffer = sd.inQueue.dequeue() if inbuffer == NULL: - printf('Stopping thread...\n') + logging.debug('Stopping thread...\n') return try: @@ -130,7 +146,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: ) except Exception as e: - print('exception in cython callback for audio input: ', str(e)) + logging.error('exception in cython callback for audio input: ', str(e)) return CPPsleep_us(sleeptime_us); @@ -142,8 +158,6 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: # Inputbuffer memory is owned by Numpy, so should not be free'ed inbuffer = NULL - fprintf(stderr, 'Exiting python thread...\n') - cdef class Daq: def __cinit__(self, DeviceInfo pydevinfo, DaqConfiguration pydaqconfig): @@ -166,7 +180,6 @@ cdef class Daq: try: self.daq_device = cppDaq.createDaq(devinfo[0], daqconfig[0]) except Exception as e: - print(e) raise self.nFramesPerBlock = self.daq_device.framesPerBlock() self.samplerate = self.daq_device.samplerate() @@ -178,7 +191,7 @@ cdef class Daq: def __dealloc__(self): # fprintf(stderr, "UlDaq.__dealloc__\n") if self.sd is not NULL: - fprintf(stderr, "UlDaq.__dealloc__: stopping stream.\n") + logging.debug("UlDaq.__dealloc__: stopping stream.") self.stop() if self.daq_device is not NULL: @@ -238,6 +251,8 @@ cdef class Daq: self.sd.stopThread.store(False) + self.sd.ready.store(False) + self.sd.inQueue = NULL self.sd.outQueue = NULL @@ -263,13 +278,13 @@ cdef class Daq: with nogil: self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self.sd) - - # Allow stream stome time to start - CPPsleep_ms(500) + while not self.sd.ready.load(): + # Allow stream stome time to start + CPPsleep_ms(100) self.daq_device.start( - self.sd.inQueue, - self.sd.outQueue) + self.sd.inQueue, + self.sd.outQueue) return self.daq_device.samplerate() @@ -304,7 +319,7 @@ cdef class Daq: free(sd.outQueue.dequeue()) del sd.outQueue sd.outQueue = NULL - fprintf(stderr, "End cleanup stream queues...\n") + logging.debug("End cleanup stream queues...\n") if sd.pyCallback: Py_DECREF( sd.pyCallback) diff --git a/lasp/device/lasp_daqconfig.pyx b/lasp/device/lasp_daqconfig.pyx index b1d382f..3c9c437 100644 --- a/lasp/device/lasp_daqconfig.pyx +++ b/lasp/device/lasp_daqconfig.pyx @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- """! Author: J.A. de Jong - ASCEE @@ -43,9 +42,11 @@ cdef class DaqConfigurations: output_config) @staticmethod - def loadConfigs(): + def loadAllConfigs(): """ - Returns a list of currently available configurations + Returns a dictionary of all configurations presets. The dictionary keys + are the names of the configurations + """ with lasp_shelve() as sh: configs_json = sh.load('daqconfigs', {}) @@ -54,6 +55,16 @@ cdef class DaqConfigurations: configs[name] = DaqConfigurations.from_json(val) return configs + @staticmethod + def loadConfigs(name: str): + """ + Load a configuration preset, containing input config and output config + """ + + with lasp_shelve() as sh: + configs_json = sh.load('daqconfigs', {}) + return DaqConfigurations.from_json(configs_json[name]) + def saveConfigs(self, name): with lasp_shelve() as sh: configs_json = sh.load('daqconfigs', {}) @@ -61,20 +72,25 @@ cdef class DaqConfigurations: sh.store('daqconfigs', configs_json) @staticmethod - def deleteConfig(name): + def deleteConfigs(name): with lasp_shelve() as sh: configs_json = sh.load('daqconfigs', {}) del configs_json[name] sh.store('daqconfigs', configs_json) +def constructDaqConfig(dict_data): + return DaqConfiguration.from_dict(dict_data) cdef class DaqConfiguration: """ Initialize a device descriptor """ - def __init__(self): + def __cinit__(self): pass + def __str__(self): + return str(self.to_json()) + @staticmethod def fromDeviceInfo(DeviceInfo devinfo): cdef: @@ -90,6 +106,9 @@ cdef class DaqConfiguration: config_dict = json.loads(jsonstring) return DaqConfiguration.from_dict(config_dict) + def __reduce__(self): + return (constructDaqConfig, (self.to_dict(),)) + @staticmethod def from_dict(pydict): cdef: @@ -127,8 +146,8 @@ cdef class DaqConfiguration: return pydaqcfg - def to_json(self): - return json.dumps(dict( + def to_dict(self): + return dict( apicode = self.config.api.apicode, device_name = self.config.device_name.decode('utf-8'), @@ -159,8 +178,10 @@ cdef class DaqConfiguration: inputIEPEEnabled = self.config.inputIEPEEnabled, inputACCouplingMode = self.config.inputACCouplingMode, inputRangeIndices = self.config.inputRangeIndices, + ) - )) + def to_json(self): + return json.dumps(self.to_dict()) def getInChannel(self, i:int): return DaqChannel( diff --git a/lasp/device/lasp_device_common.py b/lasp/device/lasp_device_common.py index 5cedd8e..40c9bde 100644 --- a/lasp/device/lasp_device_common.py +++ b/lasp/device/lasp_device_common.py @@ -1,17 +1,10 @@ -__all__ = ['AvType', 'DaqChannel'] +__all__ = ['DaqChannel'] from ..lasp_common import Qty, SIQtys from dataclasses import dataclass, field from dataclasses_json import dataclass_json from typing import List import json -class AvType: - """Specificying the type of data, for adding and removing callbacks from - the stream.""" - audio_input = 1 - audio_output = 2 - video = 4 - @dataclass_json @dataclass class DaqChannel: diff --git a/lasp/device/lasp_deviceinfo.pyx b/lasp/device/lasp_deviceinfo.pyx index 2ead824..95bce55 100644 --- a/lasp/device/lasp_deviceinfo.pyx +++ b/lasp/device/lasp_deviceinfo.pyx @@ -1,5 +1,19 @@ +# -*- coding: utf-8 -*- +"""! +Author: J.A. de Jong - ASCEE + +Description: + +DeviceInfo C++ object wrapper + +""" __all__ = ['DeviceInfo'] +def pickle(dat): + dev = DeviceInfo() + # print('DESERIALIZE****') + dev.devinfo = dev.devinfo.deserialize(dat) + return dev cdef class DeviceInfo: def __cinit__(self): @@ -8,6 +22,11 @@ cdef class DeviceInfo: def __init__(self): pass + def __reduce__(self): + serialized = self.devinfo.serialize() + # print('SERIALIZE****') + return (pickle, (serialized,)) + @property def api(self): return self.devinfo.api.apiname.decode('utf-8') diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 70bfc87..88299e6 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -1,200 +1,651 @@ -#!/usr/bin/env python3.6 # -*- coding: utf-8 -*- """ -Description: Read data from image stream and record sound at the same time +Author: J.A. de Jong + +Description: Controlling an audio stream in a different process. """ -#import cv2 as cv -from .lasp_atomic import Atomic -from threading import Thread, Lock +import logging +import multiprocessing as mp +# import cv2 as cv +import signal +import time +from dataclasses import dataclass +from enum import Enum, auto, unique +from typing import List + import numpy as np -class DAQConfiguration: - pass +from .device import Daq, DaqChannel, DaqConfiguration, DeviceInfo +from .lasp_atomic import Atomic +from .lasp_common import AvType +from .lasp_multiprocessingpatch import apply_patch -import time -from .device import (Daq, DeviceInfo, - AvType, - ) - -__all__ = ['AvStream'] - -video_x, video_y = 640, 480 +apply_patch() -class AvStream: - """Audio and video data stream, to which callbacks can be added for - processing the data.""" +__all__ = ['StreamManager', 'ignoreSigInt', 'StreamStatus'] - def __init__(self, - avtype: AvType, - device: DeviceInfo, - daqconfig: DAQConfiguration, - video=None): - """Open a stream for audio in/output and video input. For audio output, - by default all available channels are opened for outputting data. - Args: - device: DeviceInfo for the audio device - avtype: Type of stream. Input, output or duplex +def ignoreSigInt(): + """ + Ignore sigint signal. Should be set on all processes to let the main + process control this signal. + """ + signal.signal(signal.SIGINT, signal.SIG_IGN) - daqconfig: DAQConfiguration instance. If duplex mode flag is set, - please make sure that output_device is None, as in that case the - output config will be taken from the input device. - video: + +@dataclass +class StreamMetaData: + # Sample rate [Hz] + fs: float + + # Input channels + in_ch: List[DaqChannel] + + # Output channels + out_ch: List[DaqChannel] + + # blocksize + blocksize: int + + # The data type of input and output blocks. + dtype: np.dtype + + +@unique +class StreamMsg(Enum): + """ + First part, control messages that can be send to the stream + """ + + startStream = auto() + stopStream = auto() + stopAllStreams = auto() + getStreamMetaData = auto() + endProcess = auto() + scanDaqDevices = auto() + + activateSiggen = auto() + deactivateSiggen = auto() + """ + Second part, status messages that are send back on all listeners + """ + # "Normal messages" + deviceList = auto() + streamStarted = auto() + streamStopped = auto() + streamMetaData = auto() + streamData = auto() + + # Error messages + # Some error occured, which mostly leads to a stop of the stream + streamError = auto() + # An error occured, but we recovered + streamTemporaryError = auto() + # A fatal error occured. This leads to serious errors in the application + streamFatalError = auto() + + +class AudioStream: + """ + Audio stream. + """ + + def __init__( + self, + avtype: AvType, + devices: list, + daqconfig: DaqConfiguration, + processCallback: callable, + ): + """ + Initializes the audio stream and tries to start it. + + avtype: AvType + devices: List of device information + daqconfig: DaqConfiguration to used to generate audio stream backend + processCallback: callback function that will be called from a different + thread, with arguments (AudioStream, in """ + self.running = Atomic(False) + self.aframectr = Atomic(0) self.avtype = avtype + self.siggen_activated = Atomic(False) + api_devices = devices[daqconfig.api] + self.processCallback = processCallback + + matching_devices = [ + device for device in api_devices if device.name == daqconfig.device_name + ] + + if len(matching_devices) == 0: + raise RuntimeError("Could not find device {daqconfig.device_name}") + + # TODO: We pick te first one, what to do if we have multiple matches? + # Is that even possible? + device = matching_devices[0] + + self.daq = Daq(device, daqconfig) en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True) en_out_ch = daqconfig.getEnabledOutChannels() - self.input_channel_names = [ch.channel_name for ch in en_in_ch] - self.output_channel_names = [ch.channel_name for ch in en_out_ch] + samplerate = self.daq.start(self.streamCallback) + self.streammetadata = StreamMetaData( + fs=samplerate, + in_ch=daqconfig.getEnabledInChannels(), + out_ch=daqconfig.getEnabledOutChannels(), + blocksize=self.daq.nFramesPerBlock, + dtype=self.daq.getNumpyDataType(), + ) + self.running <<= True - self.input_sensitivity = [ch.sensitivity for ch in en_in_ch] - self.input_sensitivity = np.asarray(self.input_sensitivity) - self.input_qtys = [ch.qty for ch in en_in_ch] + def streamCallback(self, indata, outdata, nframes): + """ + This is called (from a separate thread) for each block + of audio data. + """ + if not self.running(): + return 1 + self.aframectr += 1 - # Counters for the number of frames that have been coming in - self._aframectr = Atomic(0) - self._vframectr = Atomic(0) - - # Lock - self._callbacklock = Lock() - - self._running = Atomic(False) - - self._video = video - self._video_started = Atomic(False) - - # Storage for callbacks, specified by type - self._callbacks = { - AvType.audio_input: [], - AvType.audio_output: [], - AvType.video: [] - } - - # Possible, but long not tested: store video - self._videothread = None - - # self._audiobackend = RtAudio(daqconfig.api) - self._daq = Daq(device, daqconfig) - self.blocksize = self._daq.nFramesPerBlock - self.samplerate = self._daq.samplerate - self.dtype = self._daq.getNumpyDataType() - - def nCallbacks(self): - """Returns the current number of installed callbacks.""" - return len(self._callbacks[AvType.audio_input]) + \ - len(self._callbacks[AvType.audio_output]) + \ - len(self._callbacks[AvType.video]) - - def addCallback(self, cb: callable, cbtype: AvType): - """Add as stream callback to the list of callbacks.""" - with self._callbacklock: - outputcallbacks = self._callbacks[AvType.audio_output] - if cbtype == AvType.audio_output and len(outputcallbacks) > 0: - raise RuntimeError( - 'Only one audio output callback can be allowed') - - if cb not in self._callbacks[cbtype]: - self._callbacks[cbtype].append(cb) - - def removeCallback(self, cb, cbtype: AvType): - with self._callbacklock: - if cb in self._callbacks[cbtype]: - self._callbacks[cbtype].remove(cb) - - def start(self): - """Start the stream, which means the callbacks are called with stream - data (audio/video)""" - - if self._running: - raise RuntimeError('Stream already started') - - assert self._videothread is None - - self._running <<= True - if self._video is not None: - self._videothread = Thread(target=self._videoThread) - self._videothread.start() - else: - self._video_started <<= True - - self.samplerate = self._daq.start(self._audioCallback) - - def _videoThread(self): - cap = cv.VideoCapture(self._video) - if not cap.isOpened(): - cap.open() - vframectr = 0 - loopctr = 0 - while self._running: - ret, frame = cap.read() - # print(frame.shape) - if ret is True: - if vframectr == 0: - self._video_started <<= True - with self._callbacklock: - for cb in self._callbacks[AvType.video]: - cb(frame, vframectr) - vframectr += 1 - self._vframectr += 1 - else: - loopctr += 1 - if loopctr == 10: - print('Error: no video capture!') - time.sleep(0.2) - - cap.release() - print('stopped videothread') - - def _audioCallback(self, indata, outdata, nframes): - """This is called (from a separate thread) for each audio block.""" - self._aframectr += nframes - with self._callbacklock: - # Count the number of output callbacks. If no output callbacks are - # present, and there should be output callbacks, we explicitly set - # the output buffer to zero - noutput_cb = len(self._callbacks[AvType.audio_output]) - - # Loop over callbacks - if outdata is not None: - try: - if len(self._callbacks[AvType.audio_output]) == 0: - outdata[:, :] = 0 - for cb in self._callbacks[AvType.audio_output]: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 2 - if indata is not None: - try: - for cb in self._callbacks[AvType.audio_input]: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 1 - - return 0 if self._running else 1 + rv = self.processCallback(self, indata, outdata) + if rv != 0: + self.running <<= False + return rv def stop(self): - self._running <<= False + """ + Stop the DAQ stream. Should be called only once. + """ + daq = self.daq + self.daq = None + self.running <<= False + daq.stop() - if self._video: - self._videothread.join() - self._videothread = None + self.streammetadata = None - self._aframectr <<= 0 - self._vframectr <<= 0 - self._video_started <<= False - self._daq.stop() - self._daq = None +class AvStreamProcess(mp.Process): + """ + Different process on which all audio streams are running. + """ - def isRunning(self): - return self._running() + def __init__(self, pipe, msg_qlist, indata_qlist, outq): + """ + + Args: + pipe: Message control pipe on which commands are received. + msg_qlist: List of queues on which stream status and events are + sent. Here, everything is send, except for the captured data + itself. + indata_qlist: List of queues on which captured data from a DAQ is + send. This one gets all events, but also captured data. + outq: On this queue, the stream process receives data to be send as + output to the devices. + + """ + self.pipe = pipe + self.msg_qlist = msg_qlist + self.indata_qlist = indata_qlist + self.outq = outq + + self.devices = {} + self.daqconfigs = None + + # In, out, duplex + self.streams = {t: None for t in list(AvType)} + + super().__init__() + + def run(self): + """ + The actual function running in a different process. + """ + # First things first, ignore interrupt signals + # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Check for devices + self.rescanDaqDevices() + self.siggen_activated = Atomic(False) + + while True: + msg, data = self.pipe.recv() + logging.debug(f"Streamprocess obtained message {msg}") + + if msg == StreamMsg.activateSiggen: + self.siggen_activated <<= True + + elif msg == StreamMsg.deactivateSiggen: + self.siggen_activated <<= False + + elif msg == StreamMsg.scanDaqDevices: + self.rescanDaqDevices() + + elif msg == StreamMsg.stopAllStreams: + self.stopAllStreams() + + elif msg == StreamMsg.endProcess: + self.stopAllStreams() + # and.. exit! + return + + elif msg == StreamMsg.getStreamMetaData: + (avtype,) = data + stream = self.streams[avtype] + if stream is not None: + self.sendAllQueues( + StreamMsg.streamMetaData, avtype, stream.streammetadata + ) + else: + self.sendAllQueues( + StreamMsg.streamMetaData, avtype, None) + + elif msg == StreamMsg.startStream: + avtype, daqconfig = data + self.startStream(avtype, daqconfig) + + elif msg == StreamMsg.stopStream: + (avtype,) = data + self.stopStream(avtype) + + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): + """ + Start a stream, based on type and configuration + + """ + self.stopRequiredExistingStreams(avtype) + # Empty the queue from existing stuff (puts the signal generator + # directly in action!). + if avtype in (AvType.audio_duplex, AvType.audio_output): + while not self.outq.empty(): + self.outq.get() + try: + stream = AudioStream(avtype, self.devices, + daqconfig, self.streamCallback) + self.streams[avtype] = stream + self.sendAllQueues( + StreamMsg.streamStarted, avtype, stream.streammetadata + ) + + except Exception as e: + self.sendAllQueues( + StreamMsg.streamError, avtype, "Error starting stream {str(e)}" + ) + return + + + def stopStream(self, avtype: AvType): + """ + Stop an existing stream, and sets the attribute in the list of streams + to None + + Args: + stream: AudioStream instance + """ + stream = self.streams[avtype] + if stream is not None: + try: + stream.stop() + self.sendAllQueues( + StreamMsg.streamStopped, stream.avtype) + except Exception as e: + self.sendAllQueues( + StreamMsg.streamError, + stream.avtype, + "Error occured in stopping stream: {str(e)}", + ) + self.streams[avtype] = None + + def stopRequiredExistingStreams(self, avtype: AvType): + """ + Stop all existing streams that conflict with the current avtype + """ + if avtype == AvType.audio_input: + # For a new input, duplex and input needs to be stopped + stream_to_stop = (AvType.audio_input, AvType.audio_duplex) + elif avtype == AvType.audio_output: + # For a new output, duplex and output needs to be stopped + stream_to_stop = (AvType.audio_output, AvType.audio_duplex) + elif avtype == AvType.audio_duplex: + # All others have to stop + stream_to_stop = list(AvType) # All of them + else: + raise ValueError("BUG") + + for stream in stream_to_stop: + if stream is not None: + self.stopStream(stream) + + def stopAllStreams(self): + """ + Stops all streams + """ + for key in self.streams.keys(): + self.stopStream(key) + + def isStreamRunning(self, avtype: AvType = None): + """ + Check whether a stream is running + + Args: + avtype: The stream type to check whether it is still running. If + None, it checks all streams. + + Returns: + True if a stream is running, otherwise false + """ + if avtype is None: + avtype = list(AvType) + else: + avtype = (avtype,) + for t in avtype: + if self.streams[t] is not None and self.streams[t].running(): + return True + + return False + + def rescanDaqDevices(self): + """ + Rescan the available DaQ devices. + + """ + if self.isStreamRunning(): + self.sendAllQueues( + StreamMsg.streamError, + None, + "A stream is running, cannot rescan DAQ devices.", + ) + return + + self.devices = Daq.getDeviceInfo() + self.sendAllQueues(StreamMsg.deviceList, self.devices) + + def streamCallback(self, audiostream, indata, outdata): + """This is called (from a separate thread) for each audio block.""" + # logging.debug('streamCallback()') + if outdata is not None: + if self.siggen_activated(): + if not self.outq.empty(): + newdata = self.outq.get() + if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: + msgtxt = "Invalid output data obtained from queue" + logging.fatal(msgtxt) + self.sendAllQueues( + StreamMsg.streamFatalError, audiostream.avtype, msgtxt + ) + return 1 + outdata[:, :] = newdata[:, None] + else: + msgtxt = "Signal generator buffer underflow. Signal generator cannot keep up with data generation." + # logging.error(msgtxt) + self.sendAllQueues( + StreamMsg.streamTemporaryError, audiostream.avtype, msgtxt + ) + outdata[:, :] = 0 + + # Siggen not activated + else: + logging.debug("siggen not activated") + outdata[:, :] = 0 + + if indata is not None: + self.sendInQueues(StreamMsg.streamData, indata) + + return 0 + + # Wrapper functions that safe some typing, they do not require an + # explanation. + def sendInQueues(self, msg, *data): + for q in self.indata_qlist: + # Fan out the input data to all queues in the queue list + q.put((msg, data)) + + def sendAllQueues(self, msg, *data): + """ + Destined for all queues, including capture data queues + """ + self.sendInQueues(msg, *data) + for q in self.msg_qlist: + # Fan out the input data to all queues in the queue list + q.put((msg, data)) + + +@dataclass +class StreamStatus: + lastStatus: StreamMsg = StreamMsg.streamStopped + errorTxt: str = None + streammetadata: StreamMetaData = None + + +class StreamManager: + """ + Audio and video data stream manager, to which queus can be added + """ + + def __init__(self): + """Open a stream for audio in/output and video input. For audio output, + + """ + + # Initialize streamstatus + self.streamstatus = {t: StreamStatus() for t in list(AvType)} + + self.devices = None + + # Multiprocessing manager, pipe, output queue, input queue, + self.manager = mp.managers.SyncManager() + + # Start this manager and ignore interrupts + # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing + self.manager.start(ignoreSigInt) + + # List of queues for all entities that require 'microphone' or input + # data. We need a local list, to manage listener queues, as the queues + # which are in the manager list get a new object id. The local list is + # used to find the index in the manager queues list upon deletion by + # 'removeListener()' + self.indata_qlist = self.manager.list([]) + self.indata_qlist_local = [] + + self.msg_qlist = self.manager.list([]) + self.msg_qlist_local = [] + + # Queue used for signal generator data + self.outq = self.manager.Queue() + + # Messaging pipe + self.pipe, child_pipe = mp.Pipe(duplex=True) + + # This is the queue on which this class listens for stream process + # messages. + self.our_msgqueue = self.addMsgQueueListener() + + # Create the stream process + self.streamProcess = AvStreamProcess(child_pipe, + self.msg_qlist, + self.indata_qlist, self.outq) + self.streamProcess.start() + + def handleMessages(self): + """ + Handle messages that are still on the pipe. + """ + # logging.debug('StreamManager::handleMessages()') + while not self.our_msgqueue.empty(): + msg, data = self.our_msgqueue.get() + logging.debug(f'StreamManager obtained message {msg}') + if msg == StreamMsg.streamStarted: + avtype, streammetadata = data + # logging.debug(f'{avtype}, {streammetadata}') + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = None + self.streamstatus[avtype].streammetadata = streammetadata + + elif msg == StreamMsg.streamStopped: + (avtype,) = data + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = None + self.streamstatus[avtype].streammetadata = None + + elif msg == StreamMsg.streamError: + avtype, errorTxt = data + if avtype is not None: + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = errorTxt + logging.debug(f'Message: {errorTxt}') + + elif msg == StreamMsg.streamTemporaryError: + avtype, errorTxt = data + if avtype is not None: + logging.debug(f'Message: {errorTxt}') + + elif msg == StreamMsg.streamFatalError: + avtype, errorTxt = data + logging.critical(f"Streamprocess fatal error: {errorTxt}") + self.cleanup() + + elif msg == StreamMsg.streamMetaData: + avtype, metadata = data + self.streamstatus[avtype].streammetadata = metadata + + elif msg == StreamMsg.deviceList: + devices, = data + # logging.debug(devices) + self.devices = devices + + def getDeviceList(self): + self.handleMessages() + return self.devices + + def rescanDaqDevices(self): + """ + Output the message to the stream process to rescan the list of devices + """ + self.sendPipe(StreamMsg.scanDaqDevices, None) + + def getStreamStatus(self, avtype: AvType): + """ + Sends a request for the stream status over the pipe, for given AvType + """ + self.handleMessages() + self.sendPipe(StreamMsg.getStreamMetaData, avtype) + + def getOutputQueue(self): + """ + Returns the output queue object. + + Note, should (of course) only be used by one signal generator at the time! + """ + self.handleMessages() + return self.outq + + def activateSiggen(self): + self.handleMessages() + logging.debug("activateSiggen()") + self.sendPipe(StreamMsg.activateSiggen, None) + + def deactivateSiggen(self): + self.handleMessages() + logging.debug("deactivateSiggen()") + self.sendPipe(StreamMsg.deactivateSiggen, None) + + def addMsgQueueListener(self): + """ + Add a listener queue to the list of message queues, and return the + queue. + + Returns: + listener queue + """ + newqueue = self.manager.Queue() + self.msg_qlist.append(newqueue) + self.msg_qlist_local.append(newqueue) + return newqueue + + def removeMsgQueueListener(self, queue): + """ + Remove an input listener queue from the message queue list. + """ + # Uses a local queue list to find the index, based on the queue + idx = self.msg_qlist_local.index(queue) + del self.msg_qlist_local[idx] + del self.msg_qlist[idx] + + def addInQueueListener(self): + """ + Add a listener queue to the list of queues, and return the queue. + + Returns: + listener queue + """ + newqueue = self.manager.Queue() + self.indata_qlist.append(newqueue) + self.indata_qlist_local.append(newqueue) + return newqueue + + def removeInQueueListener(self, queue): + """ + Remove an input listener queue from the queue list. + """ + # Uses a local queue list to find the index, based on the queue + idx = self.indata_qlist_local.index(queue) + del self.indata_qlist[idx] + del self.indata_qlist_local[idx] + + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration, wait=False): + """ + Start the stream, which means the callbacks are called with stream + data (audio/video) + + Args: + wait: Wait until the stream starts talking before returning from + this function. + + """ + logging.debug("Starting stream...") + self.handleMessages() + self.sendPipe(StreamMsg.startStream, avtype, daqconfig) + if wait: + # Wait for a message to come into the pipe + while True: + if self.pipe.poll(): + self.handleMessages() + if self.streamstatus[avtype].lastStatus != StreamMsg.streamStopped: + break + + def stopStream(self, avtype: AvType): + self.handleMessages() + self.sendPipe(StreamMsg.stopStream, avtype) + + def stopAllStreams(self): + self.sendPipe(StreamMsg.stopAllStreams) + + def cleanup(self): + """ + Stops the stream if it is still running, and after that, it stops the + stream process. + + This method SHOULD always be called before removing a AvStream object. + Otherwise things will wait forever... + + """ + self.sendPipe(StreamMsg.endProcess, None) + logging.debug("Joining stream process...") + self.streamProcess.join() + logging.debug("Joining stream process done") def hasVideo(self): - return True if self._video is not None else False + """ + Stub, TODO: for future + """ + return False + + def sendPipe(self, msg, *data): + """ + Send a message with data over the control pipe + """ + self.pipe.send((msg, data)) diff --git a/lasp/lasp_common.py b/lasp/lasp_common.py index 2125b73..e624830 100644 --- a/lasp/lasp_common.py +++ b/lasp/lasp_common.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import os -import platform +import os, platform import shelve import sys import appdirs @@ -11,6 +10,7 @@ from .wrappers import Window as wWindow from collections import namedtuple from dataclasses import dataclass from dataclasses_json import dataclass_json +from enum import Enum, unique, auto """ Common definitions used throughout the code. @@ -20,6 +20,7 @@ __all__ = [ 'P_REF', 'FreqWeighting', 'TimeWeighting', 'getTime', 'getFreq', 'Qty', 'SIQtys', 'lasp_shelve', 'this_lasp_shelve', 'W_REF', 'U_REF', 'I_REF', 'dBFS_REF', + 'AvType' ] # Reference sound pressure level @@ -42,6 +43,22 @@ U_REF = 5e-8 # 50 nano meter / s # hence this is the reference level as specified below. dBFS_REF = 0.5*2**0.5 # Which level would be -3.01 dBFS +@unique +class AvType(Enum): + """Specificying the type of data, for adding and removing callbacks from + the stream.""" + + # Input stream + audio_input = auto() + + # Output stream + audio_output = auto() + + # Both input as well as output + audio_duplex = auto() + video = 4 + + @dataclass_json @dataclass class Qty: @@ -87,7 +104,7 @@ class SIQtys: @staticmethod def fillComboBox(cb): """ - Fill FreqWeightings to a combobox + Fill to a combobox Args: cb: QComboBox to fill @@ -126,7 +143,7 @@ class CalibrationSettings: @staticmethod def fillComboBox(cb): """ - Fill FreqWeightings to a combobox + Fill Calibration Settings to a combobox Args: cb: QComboBox to fill @@ -283,6 +300,7 @@ class TimeWeighting: infinite = (0, 'Infinite') types_realtime = (ufast, fast, slow, tens, infinite) types_all = (none, uufast, ufast, fast, slow, tens, infinite) + default = fast default_index = 3 default_index_realtime = 1 diff --git a/lasp/lasp_imptube.py b/lasp/lasp_imptube.py index 56d471d..f32ec44 100644 --- a/lasp/lasp_imptube.py +++ b/lasp/lasp_imptube.py @@ -11,6 +11,8 @@ from .lasp_measurement import Measurement from numpy import pi, sqrt, exp import numpy as np from scipy.interpolate import UnivariateSpline +from lrftubes import PrsDuct +from functools import lru_cache class TwoMicImpedanceTube: def __init__(self, mnormal: Measurement, @@ -22,6 +24,8 @@ class TwoMicImpedanceTube: fu: float = None, periodic_method=False, mat= Air(), + D_imptube = 50e-3, + thermoviscous = True, **kwargs): """ @@ -60,6 +64,9 @@ class TwoMicImpedanceTube: kmax = ksmax/s self.fu = kmax*mat.c0/2/pi + self.thermoviscous = thermoviscous + self.D_imptube = D_imptube + self.periodic_method = periodic_method self.channels = [kwargs.pop('chan0', 0), kwargs.pop('chan1', 1)] # Compute calibration correction @@ -82,8 +89,9 @@ class TwoMicImpedanceTube: # Calibration correction factor # self.K = 0*self.freq + 1.0 K = sqrt(C2[:,0,1]*C1[:,0,0]/(C2[:,1,1]*C1[:,1,0])) - self.K = UnivariateSpline(self.freq, K.real)(self.freq) +\ - 1j*UnivariateSpline(self.freq, K.imag)(self.freq) + # self.K = UnivariateSpline(self.freq, K.real)(self.freq) +\ + # 1j*UnivariateSpline(self.freq, K.imag)(self.freq) + self.K = K def cut_to_limits(self, ar): return ar[self.il:self.ul] @@ -94,6 +102,7 @@ class TwoMicImpedanceTube: """ return self.cut_to_limits(self.freq) + @lru_cache def G_AB(self, meas): if meas is self.mnormal: C = self.C1 diff --git a/lasp/lasp_logging.py b/lasp/lasp_logging.py new file mode 100644 index 0000000..0b90e20 --- /dev/null +++ b/lasp/lasp_logging.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +""" +Author: J.A. de Jong + +Description: configure the logging of messages +""" +import logging, sys +# __all__ = ['configureLogging'] + +# global_loglevel = None + +# def configureLogging(level=None): + +# # Oh yeah, one global variable +# global global_loglevel +# if level is None: +# level is global_loglevel +# else: +# global_loglevel = level + +# if level is None: +# raise RuntimeError('Log level has not yet been set application wide') + + diff --git a/lasp/lasp_measurement.py b/lasp/lasp_measurement.py index 1e86a24..5f96d0d 100644 --- a/lasp/lasp_measurement.py +++ b/lasp/lasp_measurement.py @@ -48,8 +48,6 @@ import os, time, wave, logging from .lasp_common import SIQtys, Qty, getFreq from .device import DaqChannel from .wrappers import AvPowerSpectra, Window, PowerSpectra -logger = logging.Logger(__name__) - def getSampWidth(dtype): @@ -242,7 +240,7 @@ class Measurement: except KeyError: # If quantity data is not available, this is an 'old' # measurement file. - logger.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}') + logging.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}') self._qtys = [SIQtys.default for i in range(self.nchannels)] def setAttribute(self, atrname, value): diff --git a/lasp/lasp_multiprocessingpatch.py b/lasp/lasp_multiprocessingpatch.py new file mode 100644 index 0000000..fcd8e79 --- /dev/null +++ b/lasp/lasp_multiprocessingpatch.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +""" +Author: J.A. de Jong + +Description: MonkeyPatch required to let the Multiprocessing library work properly. +Should be applied prior to running any other multiprocessing code. Comes from +Stackoverflow and is mainly used for managing a list of queues that can be +shared between processes. + +For more information, see: +https://stackoverflow.com/questions/46779860/multiprocessing-managers-and-custom-classes +""" +from multiprocessing import managers +import logging +from functools import wraps +from inspect import signature + +orig_AutoProxy = managers.AutoProxy + +__all__ = ['apply_patch'] + + +@wraps(managers.AutoProxy) +def AutoProxy(*args, incref=True, manager_owned=False, **kwargs): + # Create the autoproxy without the manager_owned flag, then + # update the flag on the generated instance. If the manager_owned flag + # is set, `incref` is disabled, so set it to False here for the same + # result. + autoproxy_incref = False if manager_owned else incref + proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs) + proxy._owned_by_manager = manager_owned + return proxy + + +def apply_patch(): + if "manager_owned" in signature(managers.AutoProxy).parameters: + return + + logging.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned") + managers.AutoProxy = AutoProxy + + # re-register any types already registered to SyncManager without a custom + # proxy type, as otherwise these would all be using the old unpatched AutoProxy + SyncManager = managers.SyncManager + registry = managers.SyncManager._registry + for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items(): + if proxytype is not orig_AutoProxy: + continue + create_method = hasattr(managers.SyncManager, typeid) + SyncManager.register( + typeid, + callable=callable, + exposed=exposed, + method_to_typeid=method_to_typeid, + create_method=create_method, + ) + diff --git a/lasp/lasp_record.py b/lasp/lasp_record.py index b208f36..3974f95 100644 --- a/lasp/lasp_record.py +++ b/lasp/lasp_record.py @@ -1,58 +1,187 @@ -#!/usr/bin/python3 +#!/usr/bin/python3.8 # -*- coding: utf-8 -*- """ Read data from stream and record sound and video at the same time """ -from .lasp_atomic import Atomic -from threading import Condition -from .lasp_avstream import AvType, AvStream -import h5py -import dataclasses -import os -import time +import dataclasses, logging, os, time, h5py +from .lasp_avstream import StreamManager, StreamMetaData, StreamMsg +from .lasp_common import AvType + @dataclasses.dataclass class RecordStatus: curT: float done: bool + class Recording: - def __init__(self, fn: str, stream: AvStream, - rectime: float=None, wait: bool = True, + def __init__(self, fn: str, streammgr: StreamManager, + rectime: float = None, wait: bool = True, progressCallback=None): """ + Start a recording. Blocks if wait is set to True. Args: - fn: Filename to record to. extension is added + fn: Filename to record to. Extension is automatically added if not + provided. stream: AvStream instance to record from. Should have input channels! - rectime: Recording time, None for infinite + rectime: Recording time [s], None for infinite, in seconds. If set + to None, or np.inf, the recording continues indefintely. + progressCallback: callable that is called with an instance of + RecordStatus instance as argument. """ ext = '.h5' if ext not in fn: fn += ext - self._stream = stream - self.blocksize = stream.blocksize - self.samplerate = stream.samplerate - self._running = Atomic(False) - self._running_cond = Condition() + self.smgr = streammgr + self.metadata = None + self.rectime = rectime self._fn = fn self._video_frame_positions = [] self._curT_rounded_to_seconds = 0 - self._ablockno = Atomic(0) + # Counter of the number of blocks + self._ablockno = 0 self._vframeno = 0 - self._progressCallback = progressCallback + self._progressCallback = progressCallback self._wait = wait self._f = h5py.File(self._fn, 'w') + + # This flag is used to delete the file on finish(), and can be used + # when a recording is canceled. self._deleteFile = False + try: + # Input queue + self.inq = streammgr.addListener() + + except RuntimeError: + # Cleanup stuff, something is going wrong when starting the stream + try: + self._f.close() + except Exception as e: + logging.error( + 'Error preliminary closing measurement file {fn}: {str(e)}') + + self.__deleteFile() + raise + + # Try to obtain stream metadata + streammgr.getStreamStatus(AvType.audio_input) + streammgr.getStreamStatus(AvType.audio_duplex) + + self._ad = None + + logging.debug('Starting record....') + # TODO: Fix this later when we want video + # if stream.hasVideo(): + # stream.addCallback(self._aCallback, AvType.audio_input) + self.stop = False + + if self._wait: + logging.debug('Stop recording with CTRL-C') + try: + while not self.stop: + self.handleQueue() + time.sleep(0.01) + except KeyboardInterrupt: + logging.debug("Keyboard interrupt on record") + finally: + self.finish() + + def handleQueue(self): + """ + This method should be called to grab data from the input queue, which + is filled by the stream, and put it into a file. It should be called at + a regular interval to prevent overflowing of the queue. It is called + within the start() method of the recording, if block is set to True. + Otherwise, it should be called from its parent at regular intervals. + For example, in Qt this can be done using a QTimer. + + + """ + while self.inq.qsize() > 0: + msg, data = self.inq.get() + if msg == StreamMsg.streamData: + samples, = data + self.__addTimeData(samples) + elif msg == StreamMsg.streamStarted: + logging.debug(f'handleQueue obtained message {msg}') + avtype, metadata = data + if metadata is None: + raise RuntimeError('BUG: no stream metadata') + self.processStreamMetaData(metadata) + elif msg == StreamMsg.streamMetaData: + logging.debug(f'handleQueue obtained message {msg}') + avtype, metadata = data + if metadata is not None: + self.processStreamMetaData(metadata) + else: + logging.debug(f'handleQueue obtained message {msg}') + # An error occured, we do not remove the file, but we stop. + self.stop = True + logging.debug(f'Stream message: {msg}. Recording stopped unexpectedly') + raise RuntimeError('Recording stopped unexpectedly') + + + def processStreamMetaData(self, md: StreamMetaData): + """ + Stream metadata has been catched. This is used to set all metadata in + the measurement file + + """ + logging.debug('Recording::processStreamMetaData()') + # The 'Audio' dataset as specified in lasp_measurement, where data is + # send to. We use gzip as compression, this gives moderate a moderate + # compression to the data. + f = self._f + blocksize = md.blocksize + nchannels = len(md.in_ch) + self._ad = f.create_dataset('audio', + (1, blocksize, nchannels), + dtype=md.dtype, + maxshape=( + None, # This means, we can add blocks + # indefinitely + blocksize, + nchannels), + compression='gzip' + ) + + # TODO: This piece of code is not up-to-date and should be changed at a + # later instance once we really want to record video simultaneously + # with audio. + # if smgr.hasVideo(): + # video_x, video_y = smgr.video_x, smgr.video_y + # self._vd = f.create_dataset('video', + # (1, video_y, video_x, 3), + # dtype='uint8', + # maxshape=( + # None, video_y, video_x, 3), + # compression='gzip' + # ) + + # Set the bunch of attributes + f.attrs['samplerate'] = md.fs + f.attrs['nchannels'] = nchannels + f.attrs['blocksize'] = blocksize + f.attrs['sensitivity'] = [ch.sensitivity for ch in md.in_ch] + f.attrs['channel_names'] = [ch.channel_name for ch in md.in_ch] + f.attrs['time'] = time.time() + self.blocksize = blocksize + self.fs = md.fs + + # Measured physical quantity metadata + f.attrs['qtys'] = [ch.qty.to_json() for ch in md.in_ch] + self.metadata = md + def setDelete(self, val: bool): """ Set the delete flag. If set, measurement file is deleted at the end of @@ -61,92 +190,69 @@ class Recording: """ self._deleteFile = val - def __enter__(self): + def finish(self): """ + This method should be called to finish and a close a recording file, + remove the queue from the stream, etc. - with Recording(fn, stream, wait=False): - event_loop_here() - - or: - - with Recording(fn, stream, wait=True): - pass """ + logging.debug('Recording::finish()') + smgr = self.smgr - stream = self._stream - f = self._f - nchannels = len(stream.input_channel_names) + # TODO: Fix when video + # if smgr.hasVideo(): + # smgr.removeCallback(self._vCallback, AvType.video_input) + # self._f['video_frame_positions'] = self._video_frame_positions - self._ad = f.create_dataset('audio', - (1, stream.blocksize, nchannels), - dtype=stream.dtype, - maxshape=(None, stream.blocksize, - nchannels), - compression='gzip' - ) - if stream.hasVideo(): - video_x, video_y = stream.video_x, stream.video_y - self._vd = f.create_dataset('video', - (1, video_y, video_x, 3), - dtype='uint8', - maxshape=( - None, video_y, video_x, 3), - compression='gzip' - ) + try: + smgr.removeListener(self.inq) + except Exception as e: + logging.error(f'Could not remove queue from smgr: {e}') - f.attrs['samplerate'] = stream.samplerate - f.attrs['nchannels'] = nchannels - f.attrs['blocksize'] = stream.blocksize - f.attrs['sensitivity'] = stream.input_sensitivity - f.attrs['channel_names'] = stream.input_channel_names - f.attrs['time'] = time.time() - f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys] - self._running <<= True + try: + # Close the recording file + self._f.close() + except Exception as e: + logging.error(f'Error closing file: {e}') - if not stream.isRunning(): - stream.start() - - print('Starting record....') - stream.addCallback(self._aCallback, AvType.audio_input) - if stream.hasVideo(): - stream.addCallback(self._aCallback, AvType.audio_input) - - if self._wait: - with self._running_cond: - print('Stop recording with CTRL-C') - try: - while self._running: - self._running_cond.wait() - except KeyboardInterrupt: - print("Keyboard interrupt on record") - self._running <<= False - - - def __exit__(self, type, value, traceback): - self._running <<= False - stream = self._stream - stream.removeCallback(self._aCallback, AvType.audio_input) - if stream.hasVideo(): - stream.removeCallback(self._vCallback, AvType.video_input) - self._f['video_frame_positions'] = self._video_frame_positions - - self._f.close() - print('\nEnding record') + logging.debug('Recording ended') if self._deleteFile: - try: - os.remove(self._fn) - except Exception as e: - print(f'Error deleting file: {self._fn}') + self.__deleteFile() + def __deleteFile(self): + """ + Cleanup the recording file. + """ + try: + os.remove(self._fn) + except Exception as e: + logging.error(f'Error deleting file: {self._fn}') - def _aCallback(self, indata, outdata, aframe): - if indata is None: + def __addTimeData(self, indata): + """ + Called by handleQueue() and adds new time data to the storage file. + """ + # logging.debug('Recording::__addTimeData()') + + if self.stop: + # Stop flag is raised. We stop recording here. return - curT = self._ablockno()*self.blocksize/self.samplerate + # The current time that is recorded and stored into the file, without + # the new data + if not self.metadata: + # We obtained stream data, but metadata is not yet available. + # Therefore, we request it explicitly and then we return + logging.info('Requesting stream metadata') + self.smgr.getStreamStatus(AvType.audio_input) + self.smgr.getStreamStatus(AvType.audio_duplex) + return + + curT = self._ablockno*self.blocksize/self.fs recstatus = RecordStatus( - curT = curT, - done = False) + curT=curT, + done=False) + if self._progressCallback is not None: self._progressCallback(recstatus) @@ -159,22 +265,22 @@ class Recording: if self.rectime is not None and curT > self.rectime: # We are done! - self._running <<= False - with self._running_cond: - self._running_cond.notify() if self._progressCallback is not None: recstatus.done = True self._progressCallback(recstatus) + self.stop = True return - self._ad.resize(self._ablockno()+1, axis=0) - self._ad[self._ablockno(), :, :] = indata + # Add the data to the file + self._ad.resize(self._ablockno+1, axis=0) + self._ad[self._ablockno, :, :] = indata + + # Increase the block counter self._ablockno += 1 - def _vCallback(self, frame, framectr): - self._video_frame_positions.append(self._ablockno()) - vframeno = self._vframeno - self._vd.resize(vframeno+1, axis=0) - self._vd[vframeno, :, :] = frame - self._vframeno += 1 - + # def _vCallback(self, frame, framectr): + # self._video_frame_positions.append(self._ablockno()) + # vframeno = self._vframeno + # self._vd.resize(vframeno+1, axis=0) + # self._vd[vframeno, :, :] = frame + # self._vframeno += 1 diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py new file mode 100644 index 0000000..bbff602 --- /dev/null +++ b/lasp/lasp_siggen.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python3.6 +# -*- coding: utf-8 -*- +""" +Author: J.A. de Jong - ASCEE + +Description: Signal generator code + +""" +import multiprocessing as mp +import dataclasses +import logging +from typing import Tuple +import numpy as np + +from .filter import PinkNoise +from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank +from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner +from .lasp_avstream import StreamManager, ignoreSigInt +from .wrappers import Siggen as pyxSiggen, Equalizer +from enum import Enum, unique, auto + +QUEUE_BUFFER_TIME = 0.5 # The amount of time used in the queues for buffering +# of data, larger is more stable, but also enlarges latency + +__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] + + +@unique +class SignalType(Enum): + Periodic = 0 + Noise = 1 + Sweep = 2 + Meas = 3 + + +@unique +class NoiseType(Enum): + white = "White noise" + pink = "Pink noise" + + def __str__(self): + return str(self.value) + + @staticmethod + def fillComboBox(combo): + for type_ in list(NoiseType): + combo.addItem(str(type_)) + + @staticmethod + def getCurrent(cb): + return list(NoiseType)[cb.currentIndex()] + + +class SiggenWorkerDone(Exception): + def __str__(self): + return "Done generating signal" + + +@unique +class SiggenMessage(Enum): + """ + Different messages that can be send to the signal generator over the pipe + connection. + """ + endProcess = auto() # Stop and quit the signal generator + adjustVolume = auto() # Adjust the volume + newEqSettings = auto() # Forward new equalizer settings + newSiggenData = auto() # Forward new equalizer settings + ready = auto() # Send out once, once the signal generator is ready with + # pre-generating data. + + # These messages are send back to the main thread over the pipe + error = auto() + done = auto() + + +@dataclasses.dataclass +class SiggenData: + """ + Metadata used to create a Signal Generator + """ + fs: float # Sample rate [Hz] + + # Number of frames "samples" to send in one block + nframes_per_block: int + + # The data type to output + dtype: np.dtype + + # Level of output signal [dBFS]el + level_dB: float + + # Signal type specific data, i.e. + signaltype: SignalType + signaltypedata: Tuple = None + + # Settings for the equalizer etc + eqdata: object = None # Equalizer data + +class SiggenProcess(mp.Process): + """ + Main function running in a different process, is responsible for generating + new signal data. Uses the signal queue to push new generated signal data + on. + """ + def __init__(self, siggendata, dataq, pipe): + + """ + Args: + siggendata: The signal generator data to start with. + dataq: The queue to put generated signal on + pipe: Control and status messaging pipe + """ + + self.dataq = dataq + self.siggendata = siggendata + self.pipe = pipe + self.eq = None + self.siggen = None + + fs = self.siggendata.fs + nframes_per_block = siggendata.nframes_per_block + self.nblocks_buffer = max( + 1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block) + ) + super().__init__() + + def newSiggen(self, siggendata: SiggenData): + """ + Create a signal generator based on parameters specified in global + function data. + + Args: + siggendata: SiggenData. Metadata to create a new signal generator. + """ + + logging.debug('newSiggen') + + fs = siggendata.fs + nframes_per_block = siggendata.nframes_per_block + level_dB = siggendata.level_dB + signaltype = siggendata.signaltype + signaltypedata = siggendata.signaltypedata + + if signaltype == SignalType.Periodic: + freq, = signaltypedata + siggen = pyxSiggen.sineWave(fs, freq, level_dB) + elif signaltype == SignalType.Noise: + noisetype, zerodBpoint = signaltypedata + if noisetype == NoiseType.white: + sos_colorfilter = None + elif noisetype == NoiseType.pink: + sos_colorfilter = PinkNoise(fs, zerodBpoint).flatten() + else: + raise ValueError(f"Unknown noise type") + + siggen = pyxSiggen.noise(fs, level_dB, sos_colorfilter) + + elif signaltype == SignalType.Sweep: + fl, fu, Ts, Tq, sweep_flags = signaltypedata + siggen = pyxSiggen.sweep(fs, fl, fu, Ts, Tq, sweep_flags, level_dB) + + else: + raise ValueError(f"Not implemented signal type: {signaltype}") + + logging.debug('newSiggen') + return siggen + + def generate(self): + """ + Generate a single block of data and put it on the data queue + """ + signal = self.siggen.genSignal(self.siggendata.nframes_per_block) + dtype = self.siggendata.dtype + if self.eq is not None: + signal = self.eq.equalize(signal) + if np.issubdtype(dtype, np.integer): + bitdepth_fixed = dtype.itemsize * 8 + signal *= 2 ** (bitdepth_fixed - 1) - 1 + self.dataq.put(signal.astype(dtype)) + + def newEqualizer(self, eqdata): + """ + Create an equalizer object from equalizer data + + Args: + eqdata: dictionary containing equalizer data. TODO: document the + requiring fields. + """ + if eqdata is None: + return None + eq_type = eqdata['type'] + eq_levels = eqdata['levels'] + fs = self.siggendata.fs + + if eq_type == 'three': + fb = SosThirdOctaveFilterBank(fs) + elif eq_type == 'one': + fb = SosOctaveFilterBank(fs) + + eq = Equalizer(fb._fb) + if eq_levels is not None: + eq.setLevels(eq_levels) + return eq + + def run(self): + # The main function of the actual process + # First things first + ignoreSigInt() + + try: + self.siggen = self.newSiggen(self.siggendata) + except Exception as e: + self.pipe.send((SiggenMessage.error, str(e))) + + try: + self.eq = self.newEqualizer(self.siggendata.eqdata) + except Exception as e: + self.pipe.send((SiggenMessage.error, str(e))) + + # Pre-generate blocks of signal data + while self.dataq.qsize() < self.nblocks_buffer: + self.generate() + + self.pipe.send((SiggenMessage.ready, None)) + + while True: + # Wait here for a while, to check for messages to consume + if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4): + msg, data = self.pipe.recv() + if msg == SiggenMessage.endProcess: + logging.debug("Signal generator caught 'endProcess' message. Exiting.") + return 0 + elif msg == SiggenMessage.adjustVolume: + level_dB = data + logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") + self.siggen.setLevel(level_dB) + elif msg == SiggenMessage.newEqSettings: + eqdata = data + self.eq = self.newEqualizer(eqdata) + elif msg == SiggenMessage.newSiggenData: + siggendata = data + self.siggen = self.newSiggen(siggendata) + else: + self.pipe.send( + SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" + ) + while self.dataq.qsize() < self.nblocks_buffer: + # Generate new data and put it in the queue! + try: + self.generate() + except SiggenWorkerDone: + self.pipe.send(SiggenMessage.done) + return 0 + + return 1 + + +class Siggen: + """ + Signal generator class, generates signal data in a different process to + unload the work in the calling thread. + """ + + def __init__(self, dataq, siggendata: SiggenData): + """""" + + self.pipe, client_end = mp.Pipe(duplex=True) + + self.stopped = False + + self.process = SiggenProcess(siggendata, dataq, client_end) + self.process.start() + + if not self.process.is_alive(): + raise RuntimeError('Unexpected signal generator exception') + + # Block waiting here for signal generator to be ready + msg, data = self.pipe.recv() + if msg == SiggenMessage.ready: + logging.debug('Signal generator ready') + elif msg == SiggenMessage.error: + e = data + raise RuntimeError(f'Signal generator exception: {str(e)}') + else: + # Done, or something + if msg == SiggenMessage.done: + self.stopped = True + + self.handle_msgs() + + def setLevel(self, new_level): + """ + Set a new signal level to the generator + + Args: + new_level: The new level in [dBFS] + """ + self.pipe.send((SiggenMessage.adjustVolume, new_level)) + + def setEqData(self, eqdata): + self.pipe.send((SiggenMessage.newEqSettings, eqdata)) + + def setSiggenData(self, siggendata: SiggenData): + """ + Updates the whole signal generator, based on new signal generator data. + """ + self.pipe.send((SiggenMessage.newSiggenData, siggendata)) + + def handle_msgs(self): + while self.pipe.poll(): + msg, data = self.pipe.recv() + if msg == SiggenMessage.error: + raise RuntimeError( + f"Error in initialization of signal generator: {data}" + ) + # elif msg == SiggenMessage.done: + # self.stop() + + + def cleanup(self): + logging.debug('Siggen::stop()') + self.pipe.send((SiggenMessage.endProcess, None)) + self.pipe.close() + + logging.debug('Joining siggen process') + self.process.join() + logging.debug('Joining siggen process done') + self.process.close() + + self.process = None + diff --git a/lasp/wrappers.pyx b/lasp/wrappers.pyx index e9e6fa9..536c396 100644 --- a/lasp/wrappers.pyx +++ b/lasp/wrappers.pyx @@ -74,7 +74,7 @@ cdef extern from "lasp_python.h": __all__ = ['AvPowerSpectra', 'SosFilterBank', 'FilterBank', 'Siggen', 'sweep_flag_forward', 'sweep_flag_backward', 'sweep_flag_linear', 'sweep_flag_exponential', - 'load_fft_wisdom', 'store_fft_wisdom'] + 'load_fft_wisdom', 'store_fft_wisdom', 'Window'] setTracerLevel(15) @@ -635,6 +635,7 @@ cdef extern from "lasp_siggen.h": c_Siggen* Siggen_Sweep_create(d fs, d fl, d fu, d Ts,d Tq, us sweep_flags, d level_dB) + void Siggen_setLevel(c_Siggen*,d new_level_dB) us Siggen_getN(const c_Siggen*) void Siggen_genSignal(c_Siggen*, vd* samples) nogil void Siggen_free(c_Siggen*) @@ -659,6 +660,9 @@ cdef class Siggen: if self._siggen: Siggen_free(self._siggen) + def setLevel(self,d level_dB): + Siggen_setLevel(self._siggen, level_dB) + def genSignal(self, us nsamples): output = np.empty(nsamples, dtype=np.float) assert self._siggen != NULL diff --git a/scripts/lasp_record b/scripts/lasp_record index 0d42279..9d2edef 100755 --- a/scripts/lasp_record +++ b/scripts/lasp_record @@ -1,10 +1,7 @@ -#!/usr/bin/python3 -import argparse -import sys - - +#!/usr/bin/python3.8 +import sys, logging, os, argparse parser = argparse.ArgumentParser( - description='Acquire data and store a measurement file' + description='Acquire data and store to a measurement file.' ) parser.add_argument('filename', type=str, help='File name to record to.' @@ -15,63 +12,57 @@ parser.add_argument('--duration', '-d', type=float, device_help = 'DAQ Device to record from' parser.add_argument('--input-daq', '-i', help=device_help, type=str, default='Default') +parser.add_argument('--log', '-l', + help='Specify log level [info, debug, warning, ...]', + type=str, default='info') + args = parser.parse_args() +numeric_level = getattr(logging, args.log.upper(), None) +if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % args.loglevel) +FORMAT = "[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s" +logging.basicConfig(format=FORMAT, level=numeric_level) -from lasp.lasp_avstream import AvStream, AvType -from lasp.lasp_record import Recording -from lasp.device import DaqConfiguration, Daq, DaqChannel +import multiprocessing +from lasp.device import DaqConfigurations +from lasp import AvType, StreamManager, Recording# configureLogging -configs = DaqConfiguration.loadConfigs() +def main(args): + try: + streammgr = StreamManager() + configs = DaqConfigurations.loadAllConfigs() -for i, (key, val) in enumerate(configs.items()): - print(f'{i:2} : {key}') + config_keys = [key for key in configs.keys()] + for i, key in enumerate(config_keys): + print(f'{i:2} : {key}') -daqindex = input('Please enter required config: ') -try: - daqindex = int(daqindex) -except: - sys.exit(0) + choosen_index = input('Number of configuration to use: ') + try: + daqindex = int(choosen_index) + except: + print('Invalid configuration number. Exiting.') + sys.exit(0) -for i, (key, val) in enumerate(configs.items()): - if i == daqindex: - config = configs[key] + choosen_key = config_keys[daqindex] + config = configs[choosen_key].input_config + print(f'Choosen configuration: {choosen_key}') -config = configs[key] + streammgr.startStream(AvType.audio_input, config, wait=True) + rec = Recording(args.filename, streammgr, args.duration) + streammgr.stopStream(AvType.audio_output) + finally: + try: + streammgr.cleanup() + del stream + except NameError: + pass +if __name__ == '__main__': -print(config) -# daq = RtAudio() -devices = Daq.getDeviceInfo() + multiprocessing.set_start_method('forkserver', force=True) -input_devices = {} -for device in devices: - if device.inputchannels >= 0: - input_devices[device.name] = device - -try: - input_device = input_devices[config.input_device_name] -except KeyError: - raise RuntimeError(f'Input device {config.input_device_name} not available') - -print(input_device) - -stream = AvStream(input_device, - AvType.audio_input, - config) - - -rec = Recording(args.filename, stream, args.duration) -stream.start() -with rec: - pass - -print('Stopping stream...') -stream.stop() - -print('Stream stopped') -print('Closing stream...') -print('Stream closed') + main(args) diff --git a/scripts/lasp_siggen b/scripts/lasp_siggen deleted file mode 100755 index c3b1524..0000000 --- a/scripts/lasp_siggen +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/python3 -import argparse -import numpy as np - - -parser = argparse.ArgumentParser( - description='Play a sine wave' -) -device_help = 'DAQ Device to play to' -parser.add_argument('--device', '-d', help=device_help, type=str, - default='Default') - -args = parser.parse_args() - -from lasp.lasp_avstream import AvStream, AvType -from lasp.device import DAQConfiguration, RtAudio - -config = DAQConfiguration.loadConfigs()[args.device] - -rtaudio = RtAudio() -devices = rtaudio.getDeviceInfo() - -output_devices = {} -for device in devices: - if device.outputchannels >= 0: - output_devices[device.name] = device - -try: - output_device = output_devices[config.output_device_name] -except KeyError: - raise RuntimeError(f'output device {config.output_device_name} not available') - -samplerate = int(config.en_output_rate) -stream = AvStream(output_device, - AvType.audio_output, - config) - -# freq = 440. -freq = 1000. -omg = 2*np.pi*freq - - -def mycallback(indata, outdata, blockctr): - frames = outdata.shape[0] - nchannels = outdata.shape[1] - # nchannels = 1 - streamtime = blockctr*frames/samplerate - t = np.linspace(streamtime, streamtime + frames/samplerate, - frames)[np.newaxis, :] - outp = 0.01*np.sin(omg*t) - for i in range(nchannels): - outdata[:,i] = ((2**16-1)*outp).astype(np.int16) - -stream.addCallback(mycallback, AvType.audio_output) -stream.start() - -input() - -print('Stopping stream...') -stream.stop() - -print('Stream stopped') -print('Closing stream...') -stream.close() -print('Stream closed') diff --git a/scripts/play_sine b/scripts/play_sine new file mode 100755 index 0000000..ffda324 --- /dev/null +++ b/scripts/play_sine @@ -0,0 +1,78 @@ +#!/usr/bin/python3 +import numpy as np +import sys, logging, os, argparse + +parser = argparse.ArgumentParser( + description='Play a sine wave' +) + +parser.add_argument('--freq', '-f', help='Sine frequency [Hz]', type=float, + default=1000.) + +parser.add_argument('--log', '-l', + help='Specify log level [info, debug, warning, ...]', + type=str, default='info') + +args = parser.parse_args() + +numeric_level = getattr(logging, args.log.upper(), None) +if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % args.loglevel) + +FORMAT = "[%(levelname)s %(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s" +logging.basicConfig(format=FORMAT, level=numeric_level) + +import multiprocessing +from lasp import (StreamManager, AvType, Siggen, SignalType, SiggenData) +from lasp.device import DaqConfigurations + + +if __name__ == '__main__': + multiprocessing.set_start_method('forkserver', force=True) + logging.info(f'Playing frequency {args.freq} [Hz]') + + configs = DaqConfigurations.loadAllConfigs() + + config_keys = [key for key in configs.keys()] + for i, key in enumerate(config_keys): + print(f'{i:2} : {key}') + + choosen_index = input('Number of configuration to use: ') + try: + daqindex = int(choosen_index) + except: + print('Invalid configuration number. Exiting.') + sys.exit(0) + + + choosen_key = config_keys[daqindex] + daqconfig = configs[choosen_key].output_config + + print(f'Choosen configuration: {choosen_key}') + + try: + streammgr = StreamManager() + outq = streammgr.getOutputQueue() + + siggendata = SiggenData( + fs=48e3, + nframes_per_block=1024, + dtype=np.dtype(np.int16), + eqdata=None, + level_dB=-20, + signaltype=SignalType.Periodic, + signaltypedata=(args.freq,) + ) + siggen = Siggen(outq, siggendata) + + streammgr.activateSiggen() + + streammgr.startStream(AvType.audio_output, daqconfig) + + input('Press any key to stop...') + streammgr.stopStream(AvType.audio_output) + finally: + siggen.cleanup() + streammgr.cleanup() + +