From e72e9154aa280812131cd9567d5704d7c6fa84c6 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Fri, 7 May 2021 22:53:29 +0200 Subject: [PATCH] StreamManager is new frontend to all DAQ. --- lasp/device/lasp_daq.pyx | 2 +- lasp/lasp_avstream.py | 549 ++++++++++++++++++++++++--------------- lasp/lasp_common.py | 15 +- lasp/lasp_record.py | 5 +- lasp/lasp_siggen.py | 73 +++--- scripts/lasp_siggen | 35 +-- 6 files changed, 420 insertions(+), 259 deletions(-) diff --git a/lasp/device/lasp_daq.pyx b/lasp/device/lasp_daq.pyx index 36170d7..42cb7ca 100644 --- a/lasp/device/lasp_daq.pyx +++ b/lasp/device/lasp_daq.pyx @@ -1,10 +1,10 @@ cimport cython +from ..lasp_common import AvType from .lasp_deviceinfo cimport DeviceInfo from .lasp_daqconfig cimport DaqConfiguration from cpython.ref cimport PyObject,Py_INCREF, Py_DECREF import numpy as np -from ..lasp_common import AvType __all__ = ['Daq'] diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 110e7ed..d43fe96 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -2,26 +2,50 @@ """ Author: J.A. de Jong -Description: Read data from image stream and record sound at the same time +Description: Controlling an audio stream in a different process. """ #import cv2 as cv import multiprocessing as mp -import signal +import time, logging, signal +import numpy as np +from enum import unique, Enum, auto +from dataclasses import dataclass from .lasp_multiprocessingpatch import apply_patch apply_patch() from .lasp_atomic import Atomic from .lasp_common import AvType -from .device import (Daq, DeviceInfo, DaqConfiguration) -from threading import Thread, Lock -import numpy as np -import time, logging -from enum import unique, Enum, auto +from .device import (Daq, DeviceInfo, DaqConfiguration, DaqChannel) +from typing import List -__all__ = ['AvStream'] +__all__ = ['StreamManager', 'ignoreSigInt'] + + +def ignoreSigInt(): + """ + Ignore sigint signal. Should be set on all processes to let the main + process control this signal. + """ + signal.signal(signal.SIGINT, signal.SIG_IGN) + +@dataclass +class StreamMetaData: + # Sample rate [Hz] + fs: float + + # Input channels + in_ch: List[DaqChannel] + + # Output channels + out_ch: List[DaqChannel] + + # blocksize + blocksize: int + + # The data type of input and output blocks. + dtype: np.dtype -video_x, video_y = 640, 480 @unique class StreamMsg(Enum): @@ -30,17 +54,19 @@ class StreamMsg(Enum): """ startStream = auto() stopStream = auto() + stopAllStreams = auto() getStreamMetaData = auto() endProcess = auto() + scanDaqDevices = auto() activateSiggen = auto() deactivateSiggen = auto() - """ Second part, status messages that are send back on all listeners """ # "Normal messages" + deviceList = auto() streamStarted = auto() streamStopped = auto() streamMetaData = auto() @@ -51,10 +77,83 @@ class StreamMsg(Enum): streamFatalError = auto() +class AudioStream: + """ + Audio stream. + """ + def __init__(self, + avtype: AvType, + devices: list, + daqconfig: DaqConfiguration, + processCallback: callable): + """ + Initializes the audio stream and tries to start it. + + avtype: AvType + devices: List of device information + daqconfig: DaqConfiguration to used to generate audio stream backend + processCallback: callback function that will be called from a different + thread, with arguments (AudioStream, in + """ + + self.running = Atomic(False) + self.aframectr = Atomic(0) + self.avtype = avtype + self.siggen_activated = Atomic(False) + + api_devices = devices[daqconfig.api] + self.processCallback = processCallback + + matching_devices = [ + device for device in api_devices if + device.name == daqconfig.device_name] + + if len(matching_devices) == 0: + raise RuntimeError('Could not find device {daqconfig.device_name}') + + # TODO: We pick te first one, what to do if we have multiple matches? + # Is that even possible? + device = matching_devices[0] + + self.daq = Daq(device, daqconfig) + en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True) + en_out_ch = daqconfig.getEnabledOutChannels() + + samplerate = self.daq.start(self.streamCallback) + self.streammetadata = StreamMetaData( + fs = samplerate, + in_ch = daqconfig.getEnabledInChannels(), + out_ch = daqconfig.getEnabledOutChannels(), + blocksize = self.daq.nFramesPerBlock, + dtype = self.daq.getNumpyDataType() + ) + + + def streamCallback(self, indata, outdata, nframes): + """ + This is called (from a separate thread) for each block + of audio data. + """ + return self.processCallback(self, indata, outdata) + + def stop(self): + """ + Stop the DAQ stream. Should be called only once. + """ + daq = self.daq + self.daq = None + self.running <<= False + daq.stop() + + self.streammetadata = None + class AvStreamProcess(mp.Process): + """ + Different process on which all audio streams are running + """ - def __init__(self, daqconfig: DaqConfiguration, + def __init__(self, pipe, in_qlist, outq): """ @@ -62,13 +161,15 @@ class AvStreamProcess(mp.Process): device: DeviceInfo """ - self.daqconfig = daqconfig self.pipe = pipe self.in_qlist = in_qlist self.outq = outq - self.aframectr = 0 - self.daq = None - self.streamdata = None + + self.devices = {} + self.daqconfigs = None + + # In, out, duplex + self.streams = {t: None for t in list(AvType)} super().__init__() @@ -76,122 +177,175 @@ class AvStreamProcess(mp.Process): """ The actual function running in a different process. """ + # First things first, ignore interrupt signals # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing signal.signal(signal.SIGINT, signal.SIG_IGN) + # Check for devices + self.rescanDaqDevices() self.siggen_activated = Atomic(False) - self.running = Atomic(False) - self.aframectr = Atomic(0) - daqconfig = self.daqconfig - devices = Daq.getDeviceInfo() - api_devices = devices[daqconfig.api] - - matching_devices = [ - device for device in api_devices if device.name == daqconfig.device_name] - - if len(matching_devices) == 0: - self.pipe.send((StreamMsg.streamFatalError, f"Device {daqconfig.device_name} not available")) - - self.device = matching_devices[0] - # logging.debug(self.device) - # logging.debug(self.daqconfig) while True: msg, data = self.pipe.recv() logging.debug(f'Obtained message {msg}') if msg == StreamMsg.activateSiggen: self.siggen_activated <<= True + elif msg == StreamMsg.deactivateSiggen: self.siggen_activated <<= False + elif msg == StreamMsg.scanDaqDevices: + self.rescanDaqDevices() + + elif msg == StreamMsg.stopAllStreams: + self.stopAllStreams() + elif msg == StreamMsg.endProcess: - if self.streamdata is not None and self.running: - logging.error('Process exit while stream is still running') + self.stopAllStreams() + # and.. exit! return elif msg == StreamMsg.getStreamMetaData: - self.pipe.send((StreamMsg.streamMetaData, self.streamdata)) - for q in self.in_qlist: - q.put((StreamMsg.streamMetaData, self.streamdata)) + avtype = data + stream = self.streams[avtype] + if stream is not None: + self.sendPipe(StreamMsg.streamMetaData, avtype, stream.streammetadata) + else: + self.sendPipe(StreamMsg.streamMetaData, avtype, None) elif msg == StreamMsg.startStream: - self.startStream() + avtype, daqconfig = data + self.startStream(avtype, daqconfig) + elif msg == StreamMsg.stopStream: - self.stopStream() + avtype, = data + self.stopStream(avtype) + + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): + """ + Start a stream, based on type and configuration + + """ + self.stopRequiredExistingStreams(avtype) + try: + stream = AudioStream(avtype, + self.devices, + daqconfig, self.streamCallback) + self.streams[avtype] = stream + + except Exception as e: + self.sendPipeAndAllQueues(StreamMsg.streamError, avtype, "Error starting stream {str(e)}") + return + + self.sendPipeAndAllQueues(StreamMsg.streamStarted, avtype, stream.streammetadata) + + def stopStream(self, avtype: AvType): + """ + Stop an existing stream, and sets the attribute in the list of streams + to None + + Args: + stream: AudioStream instance + """ + stream = self.streams[avtype] + if stream is not None: + try: + stream.stop() + self.sendPipeAndAllQueues(StreamMsg.streamStopped, stream.avtype) + except Exception as e: + self.sendPipeAndAllQueues(StreamMsg.streamError, stream.avtype, "Error occured in stopping stream: {str(e)}") + self.streams[avtype] = None + + + def stopRequiredExistingStreams(self, avtype: AvType): + """ + Stop all existing streams that conflict with the current avtype + """ + if avtype == AvType.audio_input: + # For a new input, duplex and input needs to be stopped + stream_to_stop = (AvType.audio_input, AvType.audio_duplex) + elif avtype == AvType.audio_output: + # For a new output, duplex and output needs to be stopped + stream_to_stop = (AvType.audio_output, AvType.audio_duplex) + elif avtype == AvType.audio_duplex: + # All others have to stop + stream_to_stop = list(AvType) # All of them + else: + raise ValueError('BUG') + + for stream in stream_to_stop: + if stream is not None: + self.stopStream(stream) - def startStream(self): + def stopAllStreams(self): """ - Start the DAQ stream. + Stops all streams """ - if self.daq is not None: - self.pipe.send((StreamMsg.streamError, 'Stream has already been started')) + for key in self.streams.keys(): + self.stopStream(key) + + def isStreamRunning(self, avtype: AvType = None): + """ + Check whether a stream is running + + Args: + avtype: The stream type to check whether it is still running. If + None, it checks all streams. + + Returns: + True if a stream is running, otherwise false + """ + if avtype is None: + avtype = list(AvType) + else: + avtype = (avtype,) + for t in avtype: + if self.streams[t] is not None: + return True + + return False + + def rescanDaqDevices(self): + """ + Rescan the available DaQ devices. + + """ + if self.isStreamRunning(): + self.sendPipe(StreamMsg.streamError, None, "A stream is running, cannot rescan DAQ devices.") return - try: - self.daq = Daq(self.device, - self.daqconfig) - samplerate = self.daq.start(self.streamCallback) - streamdata = { - 'blocksize': self.daq.nFramesPerBlock, - 'samplerate': samplerate, - 'dtype': self.daq.getNumpyDataType(), - } - - self.streamdata = streamdata - self.pipe.send((StreamMsg.streamStarted, streamdata)) - self.putAllInQueues(StreamMsg.streamStarted, streamdata) - except Exception as e: - logging.debug(f'Error starting stream: {e}') - self.daq = None - self.pipe.send((StreamMsg.streamError, str(e))) - - def stopStream(self): - """ - Stop the DAQ stream. - """ - - if self.daq is None: - self.pipe.send((StreamMsg.streamError, 'Stream is not running')) - return - - try: - self.daq.stop() - self.running <<= False - self.streamdata = None - self.pipe.send((StreamMsg.streamStopped, None)) - self.putAllInQueues(StreamMsg.streamStopped, None) - except Exception as e: - self.pipe.send((StreamMsg.streamError, f'Error stopping stream: {e}')) - - self.streamdata - self.daq = None - - def streamCallback(self, indata, outdata, nframes): + self.devices = Daq.getDeviceInfo() + self.sendPipe(StreamMsg.deviceList, self.devices) + + def streamCallback(self, audiostream, indata, outdata): """This is called (from a separate thread) for each audio block.""" # logging.debug('streamCallback()') - self.aframectr += nframes if self.siggen_activated: # logging.debug('siggen_activated') if self.outq.empty(): outdata[:, :] = 0 msgtxt = 'Output signal buffer underflow' - self.pipe.send((StreamMsg.streamError, msgtxt)) - self.putAllInQueues(StreamMsg.streamError, msgtxt) + self.sendPipeAndAllQueues(StreamMsg.streamError, + audiostream.avtype, + msgtxt) else: newdata = self.outq.get() if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: - self.pipe.send(StreamMsg.streamFatalError, 'Invalid output data obtained from queue') + self.sendPipeAndAllQueues(StreamMsg.streamFatalError, + audiostream.avtype, + 'Invalid output data obtained from queue') return 1 - outdata[:, :] = newdata[:, np.newaxis] + outdata[:, :] = newdata[:, None] if indata is not None: - self.putAllInQueues(StreamMsg.streamData, indata) + self.putAllInQueues(StreamMsg.streamData, audiostream.avtype, + indata) - return 0 if self.running else 1 + return 0 - def putAllInQueues(self, msg, data): + def putAllInQueues(self, msg, *data): """ Put a message and data on all input queues in the queue list """ @@ -199,51 +353,42 @@ class AvStreamProcess(mp.Process): # Fan out the input data to all queues in the queue list q.put((msg, data)) -def ignoreSigInt(): - signal.signal(signal.SIGINT, signal.SIG_IGN) + # Wrapper functions that safe some typing, they do not require an + # explanation. + def sendPipe(self, msg, *data): + self.pipe.send((msg, data)) -class AvStream: - """Audio and video data stream, to which callbacks can be adde - daqconfig: DAQConfiguration instance. If duplex mode flag is set, - please make sure that output_device is None, as in that case the - output config will be taken from the input device. - video: - """ - def __init__(self, - avtype: AvType, - daqconfig: DaqConfiguration, - video=None): + def sendPipeAndAllQueues(self, msg, *data): + self.sendPipe(msg, *data) + self.putAllInQueues(msg, *data) + + +@dataclass +class StreamStatus: + lastStatus: StreamMsg = StreamMsg.streamStopped + errorTxt: str = None + streammetadata: StreamMetaData = None + + +class StreamManager: + """ + Audio and video data stream manager, to which queus can be added + """ + def __init__(self): """Open a stream for audio in/output and video input. For audio output, by default all available channels are opened for outputting data. - Args: - device: DeviceInfo for the audio device - avtype: Type of stream. Input, output or duplex - - daqconfig: DAQConfiguration instance. If duplex mode flag is set, - please make sure that output_device is None, as in that case the - output config will be taken from the input device. - video: """ - self.avtype = avtype - - en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True) - en_out_ch = daqconfig.getEnabledOutChannels() - - self.input_channel_names = [ch.channel_name for ch in en_in_ch] - self.output_channel_names = [ch.channel_name for ch in en_out_ch] - - self.input_sensitivity = [ch.sensitivity for ch in en_in_ch] - self.input_sensitivity = np.asarray(self.input_sensitivity) - self.input_qtys = [ch.qty for ch in en_in_ch] - - # Counters for the number of frames that have been coming in - self._vframectr = Atomic(0) + # Initialize streamstatus + self.streamstatus = {t: StreamStatus() for t in list(AvType)} + self.devices = None # Multiprocessing manager, pipe, output queue, input queue, self.manager = mp.managers.SyncManager() + + # Start this manager and ignore interrupts # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing self.manager.start(ignoreSigInt) @@ -260,40 +405,77 @@ class AvStream: # Messaging pipe self.pipe, child_pipe = mp.Pipe(duplex=True) + + # Create the stream process self.streamProcess = AvStreamProcess( - daqconfig, child_pipe, self.in_qlist, self.outq) self.streamProcess.start() - # Possible, but long not tested: store video - # self._video = video - # self._video_started = Atomic(False) - self._videothread = None + def handleMessages(self): + """ - self.daqconfig = daqconfig - self.streammetadata = None + Handle messages that are still on the pipe. - def getStreamMetaData(self): - return self.streammetadata + """ + while self.pipe.poll(): + msg, data = self.pipe.recv() + if msg == StreamMsg.streamStarted: + avtype, streammetadata = data + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = None + self.streamstatus[avtype].streammetadata = streammetadata + + elif msg == StreamMsg.streamStopped: + avtype, = data + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = None + self.streamstatus[avtype].streammetadata = None + + elif msg == StreamMsg.streamError: + avtype, errorTxt = data + if avtype is not None: + self.streamstatus[avtype].lastStatus = msg + self.streamstatus[avtype].errorTxt = None + + elif msg == StreamMsg.streamMetaData: + avtype, metadata = data + self.streamstatus[avtype].streammetadata = metadata + + elif msg == StreamMsg.deviceList: + devices = data + self.devices = devices + + def getDeviceList(self): + self.handleMessages() + return self.devices + + + def getStreamStatus(self, avtype: AvType): + """ + Returns the current stream Status. + """ + self.handleMessages() + return self.streamstatus[avtype] def getOutputQueue(self): """ Returns the output queue object. - Note, should only be used by one signal generator at the time! + Note, should (of course) only be used by one signal generator at the time! """ return self.outq def activateSiggen(self): + self.handleMessages() logging.debug('activateSiggen()') - self.pipe.send((StreamMsg.activateSiggen, None)) + self.sendPipe(StreamMsg.activateSiggen, None) def deactivateSiggen(self): + self.handleMessages() logging.debug('activateSiggen()') - self.pipe.send((StreamMsg.deactivateSiggen, None)) - + self.sendPipe(StreamMsg.deactivateSiggen, None) def addListener(self): """ @@ -302,12 +484,18 @@ class AvStream: Returns: listener queue """ + self.handleMessages() newqueue = self.manager.Queue() self.in_qlist.append(newqueue) self.in_qlist_local.append(newqueue) return newqueue def removeListener(self, queue): + """ + Remove an input listener queue from the queue list. + """ + # Uses a local queue list to find the index, based on the queue + self.handleMessages() idx = self.in_qlist_local.index(queue) del self.in_qlist[idx] del self.in_qlist_local[idx] @@ -316,29 +504,22 @@ class AvStream: """Returns the current number of installed listeners.""" return len(self.in_qlist) - def start(self): - """Start the stream, which means the callbacks are called with stream - data (audio/video)""" - logging.debug('Starting stream...') - self.pipe.send((StreamMsg.startStream, None)) - msg, data = self.pipe.recv() - if msg == StreamMsg.streamStarted: - self.streammetadata = data - return data - elif msg == StreamMsg.streamError: - raise RuntimeError(data) - else: - raise RuntimeError('BUG: got unexpected message: {msg}') + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): + """ + Start the stream, which means the callbacks are called with stream + data (audio/video) - def stop(self): - self.pipe.send((StreamMsg.stopStream, None)) - msg, data = self.pipe.recv() - if msg == StreamMsg.streamStopped: - return - elif msg == StreamMsg.streamError: - raise RuntimeError(data) - else: - raise RuntimeError('BUG: got unexpected message: {msg}') + """ + logging.debug('Starting stream...') + self.sendPipe(StreamMsg.startStream, avtype, daqconfig) + self.handleMessages() + + def stopStream(self, avtype: AvType): + self.handleMessages() + self.sendPipe(StreamMsg.stopStream, avtype) + + def stopAllStreams(self): + self.sendPipe(StreamMsg.stopAllStreams) def cleanup(self): """ @@ -349,7 +530,7 @@ class AvStream: Otherwise things will wait forever... """ - self.pipe.send((StreamMsg.endProcess, None)) + self.sendPipe(StreamMsg.endProcess, None) logging.debug('Joining stream process...') self.streamProcess.join() logging.debug('Joining stream process done') @@ -360,47 +541,9 @@ class AvStream: """ return False - - def isRunning(self): - self.pipe.send((StreamMsg.getStreamMetaData, None)) - msg, data = self.pipe.recv() - if msg == StreamMsg.streamMetaData: - streamdata = data - return streamdata is not None - - elif msg == StreamMsg.streamError: - raise RuntimeError(data) - else: - raise RuntimeError('BUG: got unexpected message: {msg}') - - -# def _videoThread(self): -# cap = cv.VideoCapture(self._video) -# if not cap.isOpened(): -# cap.open() -# vframectr = 0 -# loopctr = 0 -# while self._running: -# ret, frame = cap.read() -# # print(frame.shape) -# if ret is True: -# if vframectr == 0: -# self._video_started <<= True -# with self._callbacklock: -# for cb in self._callbacks[AvType.video]: -# cb(frame, vframectr) -# vframectr += 1 -# self._vframectr += 1 -# else: -# loopctr += 1 -# if loopctr == 10: -# print('Error: no video capture!') -# time.sleep(0.2) - -# cap.release() -# print('stopped videothread') - - -# def hasVideo(self): -# return True if self._video is not None else False + def sendPipe(self, msg, *data): + """ + Send a message with data over the control pipe + """ + self.pipe.send((msg, data)) diff --git a/lasp/lasp_common.py b/lasp/lasp_common.py index 1fb8249..4ad5ca8 100644 --- a/lasp/lasp_common.py +++ b/lasp/lasp_common.py @@ -11,6 +11,7 @@ from .wrappers import Window as wWindow from collections import namedtuple from dataclasses import dataclass from dataclasses_json import dataclass_json +from enum import Enum, unique, auto """ Common definitions used throughout the code. @@ -43,11 +44,19 @@ U_REF = 5e-8 # 50 nano meter / s # hence this is the reference level as specified below. dBFS_REF = 0.5*2**0.5 # Which level would be -3.01 dBFS -class AvType: +@unique +class AvType(Enum): """Specificying the type of data, for adding and removing callbacks from the stream.""" - audio_input = 1 - audio_output = 2 + + # Input stream + audio_input = auto() + + # Output stream + audio_output = auto() + + # Both input as well as output + audio_duplex = auto() video = 4 diff --git a/lasp/lasp_record.py b/lasp/lasp_record.py index 336f1ab..7062055 100644 --- a/lasp/lasp_record.py +++ b/lasp/lasp_record.py @@ -8,7 +8,8 @@ import logging import os import time import h5py -from .lasp_avstream import AvStream, AvType, StreamMsg +from .lasp_avstream import StreamMsg, StreamManager +from .lasp_common import AvType @dataclasses.dataclass @@ -19,7 +20,7 @@ class RecordStatus: class Recording: - def __init__(self, fn: str, stream: AvStream, + def __init__(self, fn: str, streammgr: StreamManager, rectime: float = None, wait: bool = True, progressCallback=None): """ diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py index ce776f2..03cc6cf 100644 --- a/lasp/lasp_siggen.py +++ b/lasp/lasp_siggen.py @@ -15,17 +15,16 @@ import numpy as np from .filter import PinkNoise from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner -from .lasp_avstream import AvStream, AvType +from .lasp_avstream import StreamManager, ignoreSigInt from .wrappers import Siggen as pyxSiggen, Equalizer from enum import Enum, unique, auto -QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering +QUEUE_BUFFER_TIME = 0.5 # The amount of time used in the queues for buffering # of data, larger is more stable, but also enlarges latency __all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] - class SignalType(Enum): Periodic = auto() Noise = auto() @@ -39,7 +38,7 @@ class NoiseType(Enum): pink = "Pink noise" def __str__(self): - return self.value + return str(self.value) @staticmethod def fillComboBox(combo): @@ -62,9 +61,11 @@ class SiggenMessage(Enum): Different messages that can be send to the signal generator over the pipe connection. """ - stop = auto() # Stop and quit the signal generator + endProcess = auto() # Stop and quit the signal generator adjustVolume = auto() # Adjust the volume newEqSettings = auto() # Forward new equalizer settings + ready = auto() # Send out once, once the signal generator is ready with + # pre-generating data. # These messages are send back to the main thread over the pipe error = auto() @@ -73,6 +74,9 @@ class SiggenMessage(Enum): @dataclasses.dataclass class SiggenData: + """ + Metadata used to create a Signal Generator + """ fs: float # Sample rate [Hz] # Number of frames "samples" to send in one block @@ -120,10 +124,13 @@ class SiggenProcess(mp.Process): ) super().__init__() - def newSiggen(self, siggendata): + def newSiggen(self, siggendata: SiggenData): """ Create a signal generator based on parameters specified in global function data. + + Args: + siggendata: SiggenData. Metadata to create a new signal generator. """ fs = siggendata.fs nframes_per_block = siggendata.nframes_per_block @@ -192,23 +199,31 @@ class SiggenProcess(mp.Process): return eq def run(self): - # Initialization + # The main function of the actual process + # First things first + ignoreSigInt() + try: self.siggen = self.newSiggen(self.siggendata) + except Exception as e: + self.pipe.send((SiggenMessage.error, str(e))) + + try: self.eq = self.newEqualizer(self.siggendata.eqdata) except Exception as e: self.pipe.send((SiggenMessage.error, str(e))) - return 1 # Pre-generate blocks of signal data while self.dataq.qsize() < self.nblocks_buffer: self.generate() + self.pipe.send((SiggenMessage.ready, None)) + while True: if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2): msg, data = self.pipe.recv() - if msg == SiggenMessage.stop: - logging.debug("Signal generator caught 'stop' message. Exiting.") + if msg == SiggenMessage.endProcess: + logging.debug("Signal generator caught 'endProcess' message. Exiting.") return 0 elif msg == SiggenMessage.adjustVolume: logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") @@ -243,14 +258,27 @@ class Siggen: self.pipe, client_end = mp.Pipe(duplex=True) + self.stopped = False + self.process = SiggenProcess(siggendata, dataq, client_end) self.process.start() - self.handle_msgs() if not self.process.is_alive(): raise RuntimeError('Unexpected signal generator exception') - self.stopped = False + # Block waiting here for signal generator to be ready + msg, data = self.pipe.recv() + if msg == SiggenMessage.ready: + logging.debug('Signal generator ready') + elif msg == SiggenMessage.error: + e = data + raise RuntimeError('Signal generator exception: {str(e)}') + else: + # Done, or something + if msg == SiggenMessage.done: + self.stopped = True + + self.handle_msgs() def setLevel(self, new_level): """ @@ -268,27 +296,16 @@ class Siggen: while self.pipe.poll(): msg, data = self.pipe.recv() if msg == SiggenMessage.error: - self.stop() raise RuntimeError( f"Error in initialization of signal generator: {data}" ) - elif msg == SiggenMessage.ready: - return + # elif msg == SiggenMessage.done: + # self.stop() - elif msg == SiggenMessage.done: - self.stop() - def start(self): - if self.stopped: - raise RuntimeError('BUG: This Siggen object cannot be used again.') - - self.handle_msgs() - - def stop(self): + def cleanup(self): logging.debug('Siggen::stop()') - if self.stopped: - raise RuntimeError('BUG: Siggen::stop() is called twice!') - self.pipe.send((SiggenMessage.stop, None)) + self.pipe.send((SiggenMessage.endProcess, None)) self.pipe.close() logging.debug('Joining siggen process') @@ -297,6 +314,4 @@ class Siggen: self.process.close() self.process = None - logging.debug('End Siggen::stop()') - self.stopped = True diff --git a/scripts/lasp_siggen b/scripts/lasp_siggen index eca1f68..41f3d99 100755 --- a/scripts/lasp_siggen +++ b/scripts/lasp_siggen @@ -5,7 +5,8 @@ import sys, logging, os, argparse logging.basicConfig(level=logging.DEBUG) import multiprocessing from lasp.lasp_multiprocessingpatch import apply_patch -from lasp.lasp_avstream import AvStream, AvType +from lasp.lasp_avstream import StreamManager +from lasp.lasp_common import AvType from lasp.lasp_siggen import Siggen, SignalType, SiggenData from lasp.device import DaqConfigurations @@ -36,41 +37,33 @@ if __name__ == '__main__': choosen_key = config_keys[daqindex] - config = configs[choosen_key].output_config + daqconfig = configs[choosen_key].output_config print(f'Choosen configuration: {choosen_key}') - - try: + streammgr = StreamManager() + outq = streammgr.getOutputQueue() + siggendata = SiggenData( fs=48e3, - nframes_per_block=1024, + nframes_per_block=2048, dtype=np.dtype(np.int16), eqdata=None, level_dB=-20, signaltype=SignalType.Periodic, signaltypedata=(1000.,) ) - - stream = AvStream( - AvType.audio_output, - config) - - outq = stream.getOutputQueue() - stream.activateSiggen() siggen = Siggen(outq, siggendata) - stream.start() + streammgr.activateSiggen() + + streammgr.startStream(AvType.audio_output, daqconfig) + input('Press any key to stop...') - stream.stop() - siggen.stop() - + streammgr.stopStream(AvType.audio_output) finally: - try: - stream.cleanup() - del stream - except NameError: - pass + siggen.cleanup() + streammgr.cleanup()