From 16390352dc562d4b89540c492272e4d106cbbb9f Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Tue, 17 May 2022 13:52:34 +0200 Subject: [PATCH] Removed stupid handleMessages from streammanager. --- .gitmodules | 3 + CMakeLists.txt | 7 +-- STL-Threadsafe | 1 + lasp/lasp_avstream.py | 134 ++++++++---------------------------------- 4 files changed, 32 insertions(+), 113 deletions(-) create mode 100644 .gitmodules create mode 160000 STL-Threadsafe diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..36b3976 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "STL-Threadsafe"] + path = STL-Threadsafe + url = https://github.com/miachm/STL-Threadsafe diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ecccff..672bee0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -136,7 +136,6 @@ set(CMAKE_C_FLAGS_RELEASE "-O2 -mfpmath=sse -march=x86-64 -mtune=native \ # ############################# End compilation flags - # Python searching. set(Python_ADDITIONAL_VERSIONS "3.8") set(python_version_windll "38") @@ -151,10 +150,8 @@ if(LASP_FFTPACK_BACKEND) ) endif() -include_directories( - lasp/c - ) - +include_directories(lasp/c) +include_directories(STL-Threadsafe/include) add_subdirectory(lasp) add_subdirectory(test) diff --git a/STL-Threadsafe b/STL-Threadsafe new file mode 160000 index 0000000..08b2d9e --- /dev/null +++ b/STL-Threadsafe @@ -0,0 +1 @@ +Subproject commit 08b2d9e7f487121088a817071d1d42b2736996e9 diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index e7eba83..e9f7fb5 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -16,16 +16,15 @@ from typing import List import numpy as np from .device import Daq, DaqChannel, DaqConfiguration, DeviceInfo +from .filter import highpass from .lasp_atomic import Atomic from .lasp_common import AvType from .lasp_multiprocessingpatch import apply_patch -from .filter import highpass from .wrappers import SosFilterBank apply_patch() - -__all__ = ['StreamManager', 'ignoreSigInt', 'StreamStatus'] +__all__ = ["StreamManager", "ignoreSigInt", "StreamStatus"] def ignoreSigInt(): @@ -66,7 +65,6 @@ class StreamMsg(Enum): getStreamMetaData = auto() endProcess = auto() scanDaqDevices = auto() - """ Second part, status messages that are send back on all listeners """ @@ -107,7 +105,7 @@ class AudioStream: processCallback: callback function that will be called from a different thread, with arguments (AudioStream, in """ - logging.debug('AudioStream()') + logging.debug("AudioStream()") # self.running = Atomic(False) # self.aframectr = Atomic(0) @@ -134,24 +132,22 @@ class AudioStream: en_out_ch = daqconfig.getEnabledOutChannels() if en_in_ch == 0 and en_out_ch == 0: - raise RuntimeError('No enabled input / output channels') - elif en_out_ch == 0 and avtype in (AvType.audio_duplex, - AvType.audio_output): - raise RuntimeError('No enabled output channels') - elif en_in_ch == 0 and avtype in (AvType.audio_input, - AvType.audio_duplex): - raise RuntimeError('No enabled input channels') + raise RuntimeError("No enabled input / output channels") + elif en_out_ch == 0 and avtype in (AvType.audio_duplex, AvType.audio_output): + raise RuntimeError("No enabled output channels") + elif en_in_ch == 0 and avtype in (AvType.audio_input, AvType.audio_duplex): + raise RuntimeError("No enabled input channels") - logging.debug('Ready to start device...') + logging.debug("Ready to start device...") samplerate = self.daq.start(self.streamCallback) # Create required Highpass filters for incoming data - self.hpfs = [None]*len(en_in_ch) + self.hpfs = [None] * len(en_in_ch) for i, ch in enumerate(en_in_ch): # Simple filter with a single bank and one section if ch.highpass > 0: fb = SosFilterBank(1, 1) - hpf = highpass(samplerate, ch.highpass, Q=np.sqrt(2)) + hpf = highpass(samplerate, ch.highpass, Q=np.sqrt(2) / 2) fb.setFilter(0, hpf[None, :]) self.hpfs[i] = fb @@ -164,7 +160,6 @@ class AudioStream: ) self.running = True - def streamCallback(self, indata, outdata, nframes): """ This is called (from a separate thread) for each block @@ -191,12 +186,11 @@ class AudioStream: # be an empty filter if self.hpfs[i] is not None: indata_float = indata[:, [i]].astype(np.float) - filtered_ch_float = self.hpfs[i].filter_( - indata_float - ) + filtered_ch_float = self.hpfs[i].filter_(indata_float) indata_filtered[:, i] = filtered_ch_float.astype( - self.streammetadata.dtype)[:, 0] + self.streammetadata.dtype + )[:, 0] else: # One-to-one copy indata_filtered[:, i] = indata[:, i] @@ -261,7 +255,7 @@ class AvStreamProcess(mp.Process): """ The actual function running in a different process. """ - # First things first, ignore interrupt signals + # First things first, ignore interrupt signals for THIS process # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -296,8 +290,7 @@ class AvStreamProcess(mp.Process): StreamMsg.streamMetaData, avtype, stream.streammetadata ) else: - self.sendAllQueues( - StreamMsg.streamMetaData, avtype, None) + self.sendAllQueues(StreamMsg.streamMetaData, avtype, None) elif msg == StreamMsg.startStream: avtype, daqconfig = data @@ -319,12 +312,9 @@ class AvStreamProcess(mp.Process): while not self.outq.empty(): self.outq.get() try: - stream = AudioStream(avtype, self.devices, - daqconfig, self.streamCallback) + stream = AudioStream(avtype, self.devices, daqconfig, self.streamCallback) self.streams[avtype] = stream - self.sendAllQueues( - StreamMsg.streamStarted, avtype, stream.streammetadata - ) + self.sendAllQueues(StreamMsg.streamStarted, avtype, stream.streammetadata) except Exception as e: self.sendAllQueues( @@ -332,7 +322,6 @@ class AvStreamProcess(mp.Process): ) return - def stopStream(self, avtype: AvType): """ Stop an existing stream, and sets the attribute in the list of streams @@ -345,8 +334,7 @@ class AvStreamProcess(mp.Process): if stream is not None: try: stream.stop() - self.sendAllQueues( - StreamMsg.streamStopped, stream.avtype) + self.sendAllQueues(StreamMsg.streamStopped, stream.avtype) except Exception as e: self.sendAllQueues( StreamMsg.streamError, @@ -487,9 +475,7 @@ class StreamManager: """ def __init__(self): - """Open a stream for audio in/output and video input. For audio output, - - """ + """Open a stream for audio in/output and video input. For audio output,""" # Initialize streamstatus self.streamstatus = {t: StreamStatus() for t in list(AvType)} @@ -525,67 +511,12 @@ class StreamManager: self.our_msgqueue = self.addMsgQueueListener() # Create the stream process - self.streamProcess = AvStreamProcess(child_pipe, - self.msg_qlist, - self.indata_qlist, self.outq) + self.streamProcess = AvStreamProcess( + child_pipe, self.msg_qlist, self.indata_qlist, self.outq + ) self.streamProcess.start() - def handleMessages(self): - """ - Handle messages that are still on the pipe. - """ - # logging.debug('StreamManager::handleMessages()') - msgs = [] - while not self.our_msgqueue.empty(): - msg, data = self.our_msgqueue.get() - logging.debug(f'StreamManager obtained message {msg}') - if msg == StreamMsg.streamStarted: - avtype, streammetadata = data - # logging.debug(f'{avtype}, {streammetadata}') - self.streamstatus[avtype].lastStatus = msg - self.streamstatus[avtype].errorTxt = None - self.streamstatus[avtype].streammetadata = streammetadata - - elif msg == StreamMsg.streamStopped: - (avtype,) = data - self.streamstatus[avtype].lastStatus = msg - self.streamstatus[avtype].errorTxt = None - self.streamstatus[avtype].streammetadata = None - - elif msg == StreamMsg.streamError: - avtype, errorTxt = data - if avtype is not None: - self.streamstatus[avtype].lastStatus = msg - self.streamstatus[avtype].errorTxt = errorTxt - logging.debug(f'Message: {errorTxt}') - - elif msg == StreamMsg.streamTemporaryError: - avtype, errorTxt = data - if avtype is not None: - logging.debug(f'Message: {errorTxt}') - - elif msg == StreamMsg.streamFatalError: - avtype, errorTxt = data - logging.critical(f"Streamprocess fatal error: {errorTxt}") - self.cleanup() - - elif msg == StreamMsg.streamMetaData: - avtype, metadata = data - self.streamstatus[avtype].streammetadata = metadata - - elif msg == StreamMsg.deviceList: - devices, = data - # logging.debug(devices) - self.devices = devices - msgs.append((msg, data)) - - return msgs - - def getDeviceList(self): - self.handleMessages() - return self.devices - - def rescanDaqDevices(self): + def scanDaqDevices(self): """ Output the message to the stream process to rescan the list of devices """ @@ -595,7 +526,6 @@ class StreamManager: """ Sends a request for the stream status over the pipe, for given AvType """ - self.handleMessages() self.sendPipe(StreamMsg.getStreamMetaData, avtype) def getOutputQueue(self): @@ -604,11 +534,8 @@ class StreamManager: Note, should (of course) only be used by one signal generator at the time! """ - self.handleMessages() return self.outq - - def addMsgQueueListener(self): """ Add a listener queue to the list of message queues, and return the @@ -652,7 +579,7 @@ class StreamManager: del self.indata_qlist[idx] del self.indata_qlist_local[idx] - def startStream(self, avtype: AvType, daqconfig: DaqConfiguration, wait=False): + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): """ Start the stream, which means the callbacks are called with stream data (audio/video) @@ -663,19 +590,10 @@ class StreamManager: """ logging.debug("Starting stream...") - self.handleMessages() self.sendPipe(StreamMsg.startStream, avtype, daqconfig) - if wait: - # Wait for a message to come into the pipe - while True: - if self.pipe.poll(): - self.handleMessages() - if self.streamstatus[avtype].lastStatus != StreamMsg.streamStopped: - break def stopStream(self, avtype: AvType): - logging.debug(f'StreamManager::stopStream({avtype})') - self.handleMessages() + logging.debug(f"StreamManager::stopStream({avtype})") self.sendPipe(StreamMsg.stopStream, avtype) def stopAllStreams(self):