diff --git a/lasp/device/lasp_cpprtaudio.cpp b/lasp/device/lasp_cpprtaudio.cpp index d5969ff..53f9e91 100644 --- a/lasp/device/lasp_cpprtaudio.cpp +++ b/lasp/device/lasp_cpprtaudio.cpp @@ -175,7 +175,7 @@ class AudioDaq: public Daq { &streamoptions, &myerrorcallback ); - } catch(...) { + } catch(RtAudioError& e) { if(rtaudio) delete rtaudio; if(instreamparams) delete instreamparams; if(outstreamparams) delete outstreamparams; diff --git a/lasp/device/lasp_daq.pyx b/lasp/device/lasp_daq.pyx index 8b1039c..fdeabe9 100644 --- a/lasp/device/lasp_daq.pyx +++ b/lasp/device/lasp_daq.pyx @@ -166,7 +166,6 @@ cdef class Daq: try: self.daq_device = cppDaq.createDaq(devinfo[0], daqconfig[0]) except Exception as e: - print(e) raise self.nFramesPerBlock = self.daq_device.framesPerBlock() self.samplerate = self.daq_device.samplerate() diff --git a/lasp/device/lasp_daqconfig.pyx b/lasp/device/lasp_daqconfig.pyx index a34063d..a3cad81 100644 --- a/lasp/device/lasp_daqconfig.pyx +++ b/lasp/device/lasp_daqconfig.pyx @@ -45,6 +45,8 @@ cdef class DaqConfigurations: def loadConfigs(): """ Returns a list of currently available configurations + + The first configuration is for input, the second for output """ with lasp_shelve() as sh: configs_json = sh.load('daqconfigs', {}) @@ -66,13 +68,16 @@ cdef class DaqConfigurations: del configs_json[name] sh.store('daqconfigs', configs_json) +def constructDaqConfig(dict_data): + return DaqConfiguration.from_dict(dict_data) cdef class DaqConfiguration: """ Initialize a device descriptor """ - def __init__(self): - pass + + def __str__(self): + return str(self.to_json()) @staticmethod def fromDeviceInfo(DeviceInfo devinfo): @@ -89,6 +94,9 @@ cdef class DaqConfiguration: config_dict = json.loads(jsonstring) return DaqConfiguration.from_dict(config_dict) + def __reduce__(self): + return (constructDaqConfig, (self.to_dict(),)) + @staticmethod def from_dict(pydict): cdef: @@ -126,8 +134,8 @@ cdef class DaqConfiguration: return pydaqcfg - def to_json(self): - return json.dumps(dict( + def to_dict(self): + return dict( apicode = self.config.api.apicode, device_name = self.config.device_name.decode('utf-8'), @@ -158,8 +166,10 @@ cdef class DaqConfiguration: inputIEPEEnabled = self.config.inputIEPEEnabled, inputACCouplingMode = self.config.inputACCouplingMode, inputRangeIndices = self.config.inputRangeIndices, + ) - )) + def to_json(self): + return json.dumps(self.to_dict()) def getInChannel(self, i:int): return DaqChannel( diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 79a0664..82e2688 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -1,28 +1,213 @@ # -*- coding: utf-8 -*- """ +Author: J.A. de Jong + Description: Read data from image stream and record sound at the same time """ #import cv2 as cv +import multiprocessing as mp +import signal +from .lasp_multiprocessingpatch import apply_patch +apply_patch() + from .lasp_atomic import Atomic from .lasp_common import AvType from .device import (Daq, DeviceInfo, DaqConfiguration) from threading import Thread, Lock import numpy as np -import time +import time, logging +from enum import unique, Enum, auto __all__ = ['AvStream'] video_x, video_y = 640, 480 +@unique +class StreamMsg(Enum): + """ + First part, control messages that can be send to the stream + """ + startStream = auto() + stopStream = auto() + getStreamMetaData = auto() + endProcess = auto() + + activateSiggen = auto() + deactivateSiggen = auto() + + + """ + Second part, status messages that are send back on all listeners + """ + # "Normal messages" + streamStarted = auto() + streamStopped = auto() + streamMetaData = auto() + streamData = auto() + + # Error messages + streamError = auto() + streamFatalError = auto() + + + +class AvStreamProcess(mp.Process): + + def __init__(self, daqconfig: DaqConfiguration, + pipe, in_qlist, outq): + """ + + Args: + device: DeviceInfo + + """ + self.daqconfig = daqconfig + self.pipe = pipe + self.in_qlist = in_qlist + self.outq = outq + self.aframectr = 0 + self.daq = None + self.streamdata = None + + super().__init__() + + def run(self): + """ + The actual function running in a different process. + """ + # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing + signal.signal(signal.SIGINT, signal.SIG_IGN) + + self.siggen_activated = Atomic(False) + self.running = Atomic(False) + self.aframectr = Atomic(0) + + daqconfig = self.daqconfig + devices = Daq.getDeviceInfo() + api_devices = devices[daqconfig.api] + + matching_devices = [ + device for device in api_devices if device.name == daqconfig.device_name] + + if len(matching_devices) == 0: + self.pipe.send((StreamMsg.streamFatalError, f"Device {daqconfig.device_name} not available")) + + self.device = matching_devices[0] + # logging.debug(self.device) + # logging.debug(self.daqconfig) + while True: + msg, data = self.pipe.recv() + logging.debug(f'Obtained message {msg}') + + if msg == StreamMsg.activateSiggen: + self.siggen_activated <<= True + elif msg == StreamMsg.deactivateSiggen: + self.siggen_activated <<= False + + elif msg == StreamMsg.endProcess: + if self.streamdata is not None and self.running: + logging.error('Process exit while stream is still running') + return + + elif msg == StreamMsg.getStreamMetaData: + self.pipe.send((StreamMsg.streamMetaData, self.streamdata)) + for q in self.in_qlist: + q.put((StreamMsg.streamMetaData, self.streamdata)) + + elif msg == StreamMsg.startStream: + self.startStream() + elif msg == StreamMsg.stopStream: + self.stopStream() + + def startStream(self): + """ + Start the DAQ stream. + """ + if self.daq is not None: + self.pipe.send((StreamMsg.streamError, 'Stream has already been started')) + return + + try: + self.daq = Daq(self.device, + self.daqconfig) + samplerate = self.daq.start(self.streamCallback) + streamdata = { + 'blocksize': self.daq.nFramesPerBlock, + 'samplerate': samplerate, + 'dtype': self.daq.getNumpyDataType(), + } + + self.streamdata = streamdata + self.pipe.send((StreamMsg.streamStarted, streamdata)) + self.putAllInQueues(StreamMsg.streamStarted, streamdata) + except Exception as e: + logging.debug(f'Error starting stream: {e}') + self.daq = None + self.pipe.send((StreamMsg.streamError, str(e))) + + def stopStream(self): + """ + Stop the DAQ stream. + """ + + if self.daq is None: + self.pipe.send((StreamMsg.streamError, 'Stream is not running')) + return + + try: + self.daq.stop() + self.running <<= False + self.streamdata = None + self.pipe.send((StreamMsg.streamStopped, None)) + self.putAllInQueues(StreamMsg.streamStopped, None) + except Exception as e: + self.pipe.send((StreamMsg.streamError, f'Error stopping stream: {e}')) + + self.streamdata + self.daq = None + + def streamCallback(self, indata, outdata, nframes): + """This is called (from a separate thread) for each audio block.""" + self.aframectr += nframes + + if self.siggen_activated: + if self.outq.empty(): + outdata[:, :] = 0 + msgtxt = 'Output signal buffer underflow' + self.pipe.send((StreamMsg.streamError, msgtxt)) + self.putAllInQueues(StreamMsg.streamError, msgtxt) + else: + newdata = self.outq.get() + if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: + self.pipe.send(StreamMsg.streamFatalError, 'Invalid output data obtained from queue') + return 1 + + if indata is not None: + self.putAllInQueues(StreamMsg.streamData, indata) + + return 0 if self.running else 1 + + def putAllInQueues(self, msg, data): + """ + Put a message and data on all input queues in the queue list + """ + for q in self.in_qlist: + # Fan out the input data to all queues in the queue list + q.put((msg, data)) + +def ignoreSigInt(): + signal.signal(signal.SIGINT, signal.SIG_IGN) class AvStream: - """Audio and video data stream, to which callbacks can be added for - processing the data.""" - + """Audio and video data stream, to which callbacks can be adde + daqconfig: DAQConfiguration instance. If duplex mode flag is set, + please make sure that output_device is None, as in that case the + output config will be taken from the input device. + video: + """ def __init__(self, avtype: AvType, - device: DeviceInfo, daqconfig: DaqConfiguration, video=None): """Open a stream for audio in/output and video input. For audio output, @@ -50,146 +235,160 @@ class AvStream: self.input_sensitivity = np.asarray(self.input_sensitivity) self.input_qtys = [ch.qty for ch in en_in_ch] - # Counters for the number of frames that have been coming in - self._aframectr = Atomic(0) self._vframectr = Atomic(0) - # Lock - self._callbacklock = Lock() - self._running = Atomic(False) + # Multiprocessing manager, pipe, output queue, input queue, + self.manager = mp.managers.SyncManager() + # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing + self.manager.start(ignoreSigInt) - self._video = video - self._video_started = Atomic(False) + # List of queues for all entities that require 'microphone' or input + # data. We need a local list, to manage listener queues, as the queues + # which are in the manager list get a new object id. The local list is + # used to find the index in the manager queues list upon deletion by + # 'removeListener()' + self.in_qlist = self.manager.list([]) + self.in_qlist_local = [] - # Storage for callbacks, specified by type - self._callbacks = { - AvType.audio_input: [], - AvType.audio_output: [], - AvType.video: [] - } + # Queue used for signal generator data + self.outq = self.manager.Queue() + + # Messaging pipe + self.pipe, child_pipe = mp.Pipe(duplex=True) + self.streamProcess = AvStreamProcess( + daqconfig, + child_pipe, + self.in_qlist, + self.outq) + self.streamProcess.start() # Possible, but long not tested: store video + # self._video = video + # self._video_started = Atomic(False) self._videothread = None - # self._audiobackend = RtAudio(daqconfig.api) - self._daq = Daq(device, daqconfig) - self.blocksize = self._daq.nFramesPerBlock - self.samplerate = self._daq.samplerate - self.dtype = self._daq.getNumpyDataType() + self.daqconfig = daqconfig + self.streammetadata = None - def nCallbacks(self): - """Returns the current number of installed callbacks.""" - return len(self._callbacks[AvType.audio_input]) + \ - len(self._callbacks[AvType.audio_output]) + \ - len(self._callbacks[AvType.video]) + def getStreamMetaData(self): + return self.streammetadata - def addCallback(self, cb: callable, cbtype: AvType): - """Add as stream callback to the list of callbacks.""" - with self._callbacklock: - outputcallbacks = self._callbacks[AvType.audio_output] - if cbtype == AvType.audio_output and len(outputcallbacks) > 0: - raise RuntimeError( - 'Only one audio output callback can be allowed') + def getOutputQueue(self): + """ + Returns the output queue object. - if cb not in self._callbacks[cbtype]: - self._callbacks[cbtype].append(cb) + Note, should only be used by one signal generator at the time! + """ + return self.outq - def removeCallback(self, cb, cbtype: AvType): - with self._callbacklock: - if cb in self._callbacks[cbtype]: - self._callbacks[cbtype].remove(cb) + def addListener(self): + """ + Add a listener queue to the list of queues, and return the queue. + + Returns: + listener queue + """ + newqueue = self.manager.Queue() + self.in_qlist.append(newqueue) + self.in_qlist_local.append(newqueue) + return newqueue + + def removeListener(self, queue): + idx = self.in_qlist_local.index(queue) + del self.in_qlist[idx] + del self.in_qlist_local[idx] + + def nListeners(self): + """Returns the current number of installed listeners.""" + return len(self.in_qlist) def start(self): """Start the stream, which means the callbacks are called with stream data (audio/video)""" - - if self._running: - raise RuntimeError('Stream already started') - - assert self._videothread is None - - self._running <<= True - if self._video is not None: - self._videothread = Thread(target=self._videoThread) - self._videothread.start() + logging.debug('Starting stream...') + self.pipe.send((StreamMsg.startStream, None)) + msg, data = self.pipe.recv() + if msg == StreamMsg.streamStarted: + self.streammetadata = data + return data + elif msg == StreamMsg.streamError: + raise RuntimeError(data) else: - self._video_started <<= True - - self.samplerate = self._daq.start(self._audioCallback) - - def _videoThread(self): - cap = cv.VideoCapture(self._video) - if not cap.isOpened(): - cap.open() - vframectr = 0 - loopctr = 0 - while self._running: - ret, frame = cap.read() - # print(frame.shape) - if ret is True: - if vframectr == 0: - self._video_started <<= True - with self._callbacklock: - for cb in self._callbacks[AvType.video]: - cb(frame, vframectr) - vframectr += 1 - self._vframectr += 1 - else: - loopctr += 1 - if loopctr == 10: - print('Error: no video capture!') - time.sleep(0.2) - - cap.release() - print('stopped videothread') - - def _audioCallback(self, indata, outdata, nframes): - """This is called (from a separate thread) for each audio block.""" - self._aframectr += nframes - with self._callbacklock: - # Count the number of output callbacks. If no output callbacks are - # present, and there should be output callbacks, we explicitly set - # the output buffer to zero - noutput_cb = len(self._callbacks[AvType.audio_output]) - - # Loop over callbacks - if outdata is not None: - try: - if len(self._callbacks[AvType.audio_output]) == 0: - outdata[:, :] = 0 - for cb in self._callbacks[AvType.audio_output]: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 2 - if indata is not None: - try: - for cb in self._callbacks[AvType.audio_input]: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 1 - - return 0 if self._running else 1 + raise RuntimeError('BUG: got unexpected message: {msg}') def stop(self): - self._running <<= False + self.pipe.send((StreamMsg.stopStream, None)) + msg, data = self.pipe.recv() + if msg == StreamMsg.streamStopped: + return + elif msg == StreamMsg.streamError: + raise RuntimeError(data) + else: + raise RuntimeError('BUG: got unexpected message: {msg}') - if self._video: - self._videothread.join() - self._videothread = None + def cleanup(self): + """ + Stops the stream if it is still running, and after that, it stops the + stream process. - self._aframectr <<= 0 - self._vframectr <<= 0 - self._video_started <<= False + This method SHOULD always be called before removing a AvStream object. + Otherwise things will wait forever... - self._daq.stop() - self._daq = None - - def isRunning(self): - return self._running() + """ + self.pipe.send((StreamMsg.endProcess, None)) + logging.debug('Joining stream process...') + self.streamProcess.join() + logging.debug('Joining stream process done') def hasVideo(self): - return True if self._video is not None else False + """ + Stub, TODO: for future + """ + return False + + + def isRunning(self): + self.pipe.send((StreamMsg.getStreamMetaData, None)) + msg, data = self.pipe.recv() + if msg == StreamMsg.streamMetaData: + streamdata = data + return streamdata is not None + + elif msg == StreamMsg.streamError: + raise RuntimeError(data) + else: + raise RuntimeError('BUG: got unexpected message: {msg}') + + +# def _videoThread(self): +# cap = cv.VideoCapture(self._video) +# if not cap.isOpened(): +# cap.open() +# vframectr = 0 +# loopctr = 0 +# while self._running: +# ret, frame = cap.read() +# # print(frame.shape) +# if ret is True: +# if vframectr == 0: +# self._video_started <<= True +# with self._callbacklock: +# for cb in self._callbacks[AvType.video]: +# cb(frame, vframectr) +# vframectr += 1 +# self._vframectr += 1 +# else: +# loopctr += 1 +# if loopctr == 10: +# print('Error: no video capture!') +# time.sleep(0.2) + +# cap.release() +# print('stopped videothread') + + +# def hasVideo(self): +# return True if self._video is not None else False + diff --git a/lasp/lasp_record.py b/lasp/lasp_record.py index b208f36..336f1ab 100644 --- a/lasp/lasp_record.py +++ b/lasp/lasp_record.py @@ -3,56 +3,141 @@ """ Read data from stream and record sound and video at the same time """ -from .lasp_atomic import Atomic -from threading import Condition -from .lasp_avstream import AvType, AvStream -import h5py import dataclasses +import logging import os import time +import h5py +from .lasp_avstream import AvStream, AvType, StreamMsg + @dataclasses.dataclass class RecordStatus: curT: float done: bool + class Recording: - def __init__(self, fn: str, stream: AvStream, - rectime: float=None, wait: bool = True, + def __init__(self, fn: str, stream: AvStream, + rectime: float = None, wait: bool = True, progressCallback=None): """ + Start a recording. Blocks if wait is set to True. Args: - fn: Filename to record to. extension is added + fn: Filename to record to. Extension is automatically added if not + provided. stream: AvStream instance to record from. Should have input channels! - rectime: Recording time, None for infinite + rectime: Recording time [s], None for infinite, in seconds. If set + to None, or np.inf, the recording continues indefintely. + progressCallback: callable that is called with an instance of + RecordStatus instance as argument. """ ext = '.h5' if ext not in fn: fn += ext self._stream = stream - self.blocksize = stream.blocksize - self.samplerate = stream.samplerate - self._running = Atomic(False) - self._running_cond = Condition() + self.rectime = rectime self._fn = fn self._video_frame_positions = [] self._curT_rounded_to_seconds = 0 - self._ablockno = Atomic(0) + # Counter of the number of blocks + self._ablockno = 0 self._vframeno = 0 - self._progressCallback = progressCallback + self._progressCallback = progressCallback self._wait = wait self._f = h5py.File(self._fn, 'w') + + # This flag is used to delete the file on finish(), and can be used + # when a recording is canceled. self._deleteFile = False + f = self._f + nchannels = len(stream.input_channel_names) + + # Input queue + self.inq = stream.addListener() + + # Start the stream, if it is not running + try: + if not stream.isRunning(): + metadata = stream.start() + else: + metadata = stream.getStreamMetaData() + except: + # Cleanup stuff, something is going wrong when starting the stream + try: + f.close() + except: + pass + self.__deleteFile() + self.blocksize = metadata['blocksize'] + self.samplerate = metadata['samplerate'] + self.dtype = metadata['dtype'] + + # The 'Audio' dataset as specified in lasp_measurement, where data is + # send to. We use gzip as compression, this gives moderate a moderate + # compression to the data. + self._ad = f.create_dataset('audio', + (1, self.blocksize, nchannels), + dtype=self.dtype, + maxshape=( + None, # This means, we can add blocks + # indefinitely + self.blocksize, + nchannels), + compression='gzip' + ) + + # TODO: This piece of code is not up-to-date and should be changed at a + # later instance once we really want to record video simultaneously + # with audio. + if stream.hasVideo(): + video_x, video_y = stream.video_x, stream.video_y + self._vd = f.create_dataset('video', + (1, video_y, video_x, 3), + dtype='uint8', + maxshape=( + None, video_y, video_x, 3), + compression='gzip' + ) + + # Set the bunch of attributes + f.attrs['samplerate'] = self.samplerate + f.attrs['nchannels'] = nchannels + f.attrs['blocksize'] = self.blocksize + f.attrs['sensitivity'] = stream.input_sensitivity + f.attrs['channel_names'] = stream.input_channel_names + f.attrs['time'] = time.time() + + # Measured quantities + f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys] + + logging.debug('Starting record....') + # TODO: Fix this later when we want video + # if stream.hasVideo(): + # stream.addCallback(self._aCallback, AvType.audio_input) + self.stop = False + + if self._wait: + logging.debug('Stop recording with CTRL-C') + try: + while not self.stop: + self.handleQueue() + time.sleep(0.01) + except KeyboardInterrupt: + logging.debug("Keyboard interrupt on record") + finally: + self.finish() + def setDelete(self, val: bool): """ Set the delete flag. If set, measurement file is deleted at the end of @@ -61,92 +146,77 @@ class Recording: """ self._deleteFile = val - def __enter__(self): + def finish(self): """ + This method should be called to finish and a close a recording file, + remove the queue from the stream, etc. - with Recording(fn, stream, wait=False): - event_loop_here() - - or: - - with Recording(fn, stream, wait=True): - pass """ - stream = self._stream - f = self._f - nchannels = len(stream.input_channel_names) + # TODO: Fix when video + # if stream.hasVideo(): + # stream.removeCallback(self._vCallback, AvType.video_input) + # self._f['video_frame_positions'] = self._video_frame_positions - self._ad = f.create_dataset('audio', - (1, stream.blocksize, nchannels), - dtype=stream.dtype, - maxshape=(None, stream.blocksize, - nchannels), - compression='gzip' - ) - if stream.hasVideo(): - video_x, video_y = stream.video_x, stream.video_y - self._vd = f.create_dataset('video', - (1, video_y, video_x, 3), - dtype='uint8', - maxshape=( - None, video_y, video_x, 3), - compression='gzip' - ) + try: + stream.removeListener(self.inq) + except Exception as e: + logging.error(f'Could not remove queue from stream: {e}') - f.attrs['samplerate'] = stream.samplerate - f.attrs['nchannels'] = nchannels - f.attrs['blocksize'] = stream.blocksize - f.attrs['sensitivity'] = stream.input_sensitivity - f.attrs['channel_names'] = stream.input_channel_names - f.attrs['time'] = time.time() - f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys] - self._running <<= True + try: + # Close the recording file + self._f.close() + except Exception as e: + logging.error(f'Error closing file: {e}') - if not stream.isRunning(): - stream.start() - - print('Starting record....') - stream.addCallback(self._aCallback, AvType.audio_input) - if stream.hasVideo(): - stream.addCallback(self._aCallback, AvType.audio_input) - - if self._wait: - with self._running_cond: - print('Stop recording with CTRL-C') - try: - while self._running: - self._running_cond.wait() - except KeyboardInterrupt: - print("Keyboard interrupt on record") - self._running <<= False - - - def __exit__(self, type, value, traceback): - self._running <<= False - stream = self._stream - stream.removeCallback(self._aCallback, AvType.audio_input) - if stream.hasVideo(): - stream.removeCallback(self._vCallback, AvType.video_input) - self._f['video_frame_positions'] = self._video_frame_positions - - self._f.close() - print('\nEnding record') + logging.debug('Recording ended') if self._deleteFile: - try: - os.remove(self._fn) - except Exception as e: - print(f'Error deleting file: {self._fn}') + self.__deleteFile() + + def __deleteFile(self): + """ + Cleanup the recording file. + """ + try: + os.remove(self._fn) + except Exception as e: + logging.debug(f'Error deleting file: {self._fn}') + + def handleQueue(self): + """ + This method should be called to grab data from the input queue, which + is filled by the stream, and put it into a file. It should be called at + a regular interval to prevent overflowing of the queue. It is called + within the start() method of the recording, if block is set to True. + Otherwise, it should be called from its parent at regular intervals. + For example, in Qt this can be done using a QTimer. - def _aCallback(self, indata, outdata, aframe): - if indata is None: - return + """ + while self.inq.qsize() > 0: + msg, data = self.inq.get() + if msg == StreamMsg.streamData: + self.__addTimeData(data) + elif msg == StreamMsg.streamStarted: + pass + elif msg == StreamMsg.streamMetaData: + pass + else: + # An error occured, we do not remove the file, but we stop. + self.stop = True - curT = self._ablockno()*self.blocksize/self.samplerate + def __addTimeData(self, indata): + """ + Called by handleQueue() and adds new time data to the storage file. + """ + + # The current time that is recorded and stored into the file, without + # the new data + curT = self._ablockno*self.blocksize/self.samplerate recstatus = RecordStatus( - curT = curT, - done = False) + curT=curT, + done=False) + if self._progressCallback is not None: self._progressCallback(recstatus) @@ -159,22 +229,22 @@ class Recording: if self.rectime is not None and curT > self.rectime: # We are done! - self._running <<= False - with self._running_cond: - self._running_cond.notify() if self._progressCallback is not None: recstatus.done = True self._progressCallback(recstatus) + self.stop = True return - self._ad.resize(self._ablockno()+1, axis=0) - self._ad[self._ablockno(), :, :] = indata + # Add the data to the file + self._ad.resize(self._ablockno+1, axis=0) + self._ad[self._ablockno, :, :] = indata + + # Increase the block counter self._ablockno += 1 - def _vCallback(self, frame, framectr): - self._video_frame_positions.append(self._ablockno()) - vframeno = self._vframeno - self._vd.resize(vframeno+1, axis=0) - self._vd[vframeno, :, :] = frame - self._vframeno += 1 - + # def _vCallback(self, frame, framectr): + # self._video_frame_positions.append(self._ablockno()) + # vframeno = self._vframeno + # self._vd.resize(vframeno+1, axis=0) + # self._vd[vframeno, :, :] = frame + # self._vframeno += 1 diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py index 9f8fcd5..fcd5e7b 100644 --- a/lasp/lasp_siggen.py +++ b/lasp/lasp_siggen.py @@ -6,11 +6,10 @@ Author: J.A. de Jong - ASCEE Description: Signal generator code """ +import multiprocessing as mp import dataclasses import logging -import multiprocessing as mp from typing import Tuple - import numpy as np from .filter import PinkNoise @@ -18,6 +17,7 @@ from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner from .lasp_avstream import AvStream, AvType from .wrappers import Siggen as pyxSiggen, Equalizer +from enum import Enum, unique, auto QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering # of data, larger is more stable, but also enlarges latency @@ -25,26 +25,30 @@ QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering __all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] -class SignalType: - Periodic = 0 - Noise = 1 - Sweep = 2 - Meas = 3 + +class SignalType(Enum): + Periodic = auto() + Noise = auto() + Sweep = auto() + Meas = auto() -class NoiseType: - white = (0, "White noise") - pink = (1, "Pink noise") - types = (white, pink) +@unique +class NoiseType(Enum): + white = "White noise" + pink = "Pink noise" + + def __str__(self): + return self.value @staticmethod def fillComboBox(combo): - for type_ in NoiseType.types: - combo.addItem(type_[1]) + for type_ in list(NoiseType): + combo.addItem(str(type_)) @staticmethod def getCurrent(cb): - return NoiseType.types[cb.currentIndex()] + return list(NoiseType)[cb.currentIndex()] class SiggenWorkerDone(Exception): @@ -52,21 +56,22 @@ class SiggenWorkerDone(Exception): return "Done generating signal" -class SiggenMessage: +@unique +class SiggenMessage(Enum): """ Different messages that can be send to the signal generator over the pipe connection. """ - stop = 0 # Stop and quit the signal generator - generate = 1 - adjustVolume = 2 # Adjust the volume - newEqSettings = 3 # Forward new equalizer settings + stop = auto() # Stop and quit the signal generator + generate = auto() + adjustVolume = auto() # Adjust the volume + newEqSettings = auto() # Forward new equalizer settings # These messages are send back to the main thread over the pipe - ready = 4 - error = 5 - done = 6 + ready = auto() + error = auto() + done = auto() @dataclasses.dataclass @@ -90,13 +95,16 @@ class SiggenData: signaltypedata: Tuple = None -def siggenFcn(siggendata: SiggenData, nblocks_buffer: int, - dataq: mp.Queue, pipe - ): +def siggenFcn(siggendata: SiggenData, dataq: mp.Queue, pipe): """ Main function running in a different process, is responsible for generating new signal data. Uses the signal queue to push new generated signal data on. + + Args: + siggendata: The signal generator data to start with. + dataq: The queue to put generated signal on + pipe: Control and status messaging pipe """ fs = siggendata.fs nframes_per_block = siggendata.nframes_per_block @@ -106,6 +114,9 @@ def siggenFcn(siggendata: SiggenData, nblocks_buffer: int, signaltype = siggendata.signaltype signaltypedata = siggendata.signaltypedata + nblocks_buffer = max( + 1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block) + ) def generate(siggen, eq): """ @@ -188,31 +199,31 @@ def siggenFcn(siggendata: SiggenData, nblocks_buffer: int, pipe.send((SiggenMessage.done, None)) while True: - msg, data = pipe.recv() - if msg == SiggenMessage.stop: - logging.debug("Signal generator caught 'stop' message. Exiting.") - return 0 - elif msg == SiggenMessage.generate: - # logging.debug(f"Signal generator caught 'generate' message") + if pipe.poll(timeout=QUEUE_BUFFER_TIME / 2): + msg, data = pipe.recv() + if msg == SiggenMessage.stop: + logging.debug("Signal generator caught 'stop' message. Exiting.") + return 0 + elif msg == SiggenMessage.adjustVolume: + logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") + level_dB = data + siggen.setLevel(level_dB) + elif msg == SiggenMessage.newEqSettings: + eqdata = data + eq = createEqualizer(eqdata) + else: + pipe.send( + SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" + ) + elif dataq.qsize() < nblocks_buffer: + # Generate new data and put it in the queue! try: - while dataq.qsize() < nblocks_buffer: - # Generate new data and put it in the queue! - generate(siggen, eq) + generate(siggen, eq) except SiggenWorkerDone: pipe.send(SiggenMessage.done) return 0 - elif msg == SiggenMessage.adjustVolume: - logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") - level_dB = data - siggen.setLevel(level_dB) - elif msg == SiggenMessage.newEqSettings: - eqdata = data - eq = createEqualizer(eqdata) - else: - pipe.send( - SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" - ) - return 1 + + return 1 class Siggen: @@ -221,20 +232,14 @@ class Siggen: unload the work in the calling thread. """ - def __init__(self, stream: AvStream, siggendata: SiggenData): + def __init__(self, dataq, siggendata: SiggenData): """""" - self.stream = stream - self.nblocks_buffer = max( - 1, int(QUEUE_BUFFER_TIME * stream.samplerate / stream.blocksize) - ) - qs = [mp.Queue() for i in range(3)] - self.dataq = mp.Queue() self.pipe, client_end = mp.Pipe(duplex=True) self.process = mp.Process( target=siggenFcn, - args=(siggendata, self.nblocks_buffer, self.dataq, client_end), + args=(siggendata, dataq, client_end), ) self.process.start() @@ -273,8 +278,7 @@ class Siggen: def start(self): if self.stopped: raise RuntimeError('BUG: This Siggen object cannot be used again.') - - self.stream.addCallback(self.streamCallback, AvType.audio_output) + self.handle_msgs() def stop(self): diff --git a/scripts/lasp_record b/scripts/lasp_record index 0d42279..fc64a2d 100755 --- a/scripts/lasp_record +++ b/scripts/lasp_record @@ -1,77 +1,64 @@ #!/usr/bin/python3 -import argparse -import sys - - -parser = argparse.ArgumentParser( - description='Acquire data and store a measurement file' -) -parser.add_argument('filename', type=str, - help='File name to record to.' - ' Extension is automatically added.') -parser.add_argument('--duration', '-d', type=float, - help='The recording duration in [s]') - -device_help = 'DAQ Device to record from' -parser.add_argument('--input-daq', '-i', help=device_help, type=str, - default='Default') - -args = parser.parse_args() +import sys, logging, os, argparse +logging.basicConfig(level=logging.DEBUG) +import multiprocessing +from lasp.lasp_multiprocessingpatch import apply_patch +from lasp.device import Daq, DaqChannel, DaqConfigurations from lasp.lasp_avstream import AvStream, AvType from lasp.lasp_record import Recording -from lasp.device import DaqConfiguration, Daq, DaqChannel -configs = DaqConfiguration.loadConfigs() +if __name__ == '__main__': + multiprocessing.set_start_method('forkserver', force=True) + apply_patch() -for i, (key, val) in enumerate(configs.items()): - print(f'{i:2} : {key}') + parser = argparse.ArgumentParser( + description='Acquire data and store a measurement file' + ) + parser.add_argument('filename', type=str, + help='File name to record to.' + ' Extension is automatically added.') + parser.add_argument('--duration', '-d', type=float, + help='The recording duration in [s]') -daqindex = input('Please enter required config: ') -try: - daqindex = int(daqindex) -except: - sys.exit(0) + device_help = 'DAQ Device to record from' + parser.add_argument('--input-daq', '-i', help=device_help, type=str, + default='Default') -for i, (key, val) in enumerate(configs.items()): - if i == daqindex: - config = configs[key] + args = parser.parse_args() -config = configs[key] + configs = DaqConfigurations.loadConfigs() + config_keys = [key for key in configs.keys()] + for i, key in enumerate(config_keys): + print(f'{i:2} : {key}') + choosen_index = input('Number of configuration to use: ') + try: + daqindex = int(choosen_index) + except: + sys.exit(0) -print(config) -# daq = RtAudio() -devices = Daq.getDeviceInfo() + choosen_key = config_keys[daqindex] + config = configs[choosen_key].input_config -input_devices = {} -for device in devices: - if device.inputchannels >= 0: - input_devices[device.name] = device + config.__reduce__() -try: - input_device = input_devices[config.input_device_name] -except KeyError: - raise RuntimeError(f'Input device {config.input_device_name} not available') + print(f'Choosen configuration: {choosen_key}') -print(input_device) + try: + stream = AvStream( + AvType.audio_input, + config) + # stream.start() + rec = Recording(args.filename, stream, args.duration) + # input('Stream started, press any key to start record') + finally: + try: + stream.cleanup() + del stream + except NameError: + pass -stream = AvStream(input_device, - AvType.audio_input, - config) - - -rec = Recording(args.filename, stream, args.duration) -stream.start() -with rec: - pass - -print('Stopping stream...') -stream.stop() - -print('Stream stopped') -print('Closing stream...') -print('Stream closed')