From c792806fad5453866cdc4973b394f8dc7a4a4980 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Sun, 16 May 2021 16:45:44 +0200 Subject: [PATCH] There is still a small tick at the start of the signal generator. Otherwise, it is working properly --- lasp/device/lasp_daq.pyx | 37 +++++++++++++++++++++---------------- lasp/lasp_avstream.py | 23 ++++++++++++++++++----- lasp/lasp_siggen.py | 10 ++++------ 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/lasp/device/lasp_daq.pyx b/lasp/device/lasp_daq.pyx index 12115cc..ff9fcc5 100644 --- a/lasp/device/lasp_daq.pyx +++ b/lasp/device/lasp_daq.pyx @@ -37,6 +37,7 @@ cdef getNumpyDataType(DataType& dt): else: raise ValueError('Unknown data type') +DEF QUEUE_BUFFER_TIME = 0.5 ctypedef struct PyStreamData: PyObject* pyCallback @@ -44,6 +45,10 @@ ctypedef struct PyStreamData: # Flag used to pass the stopThread. atomic[bool] stopThread + # Flag to indicate that the signal generator queue has been filled for the + # first time. + atomic[bool] ready + # Number of frames per block unsigned nFramesPerBlock @@ -75,30 +80,29 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: unsigned nBytesPerChan= sd.nBytesPerChan unsigned nFramesPerBlock= sd.nFramesPerBlock - double sleeptime = ( sd.nFramesPerBlock)/(4*sd.samplerate); + double sleeptime = ( sd.nFramesPerBlock)/(8*sd.samplerate); + # Sleep time in microseconds us sleeptime_us = (sleeptime*1e6); + us nblocks_buffer = max(1, (QUEUE_BUFFER_TIME * sd.samplerate / + sd.nFramesPerBlock)) + with gil: npy_format = cnp.NPY_FLOAT64 callback = sd.pyCallback # print(f'Number of input channels: {ninchannels}') # print(f'Number of out channels: {noutchannels}') # fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us) - - # Fill a couple of empty blocks ot the outQueue - if sd.outQueue: - for i in range(30): - outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) - memset(outbuffer, 0, sizeof(double)*nBytesPerChan*noutchannels) - sd.outQueue.enqueue( outbuffer) - - outbuffer = NULL - + for i in range(nblocks_buffer): + outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) + memset(outbuffer, 0, sizeof(double)*nBytesPerChan*noutchannels) + sd.outQueue.enqueue( outbuffer) + sd.ready.store(True) while not sd.stopThread.load(): with gil: if sd.outQueue: - while sd.outQueue.size() < 10: + while sd.outQueue.size() < nblocks_buffer: outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) npy_output = data_to_ndarray( @@ -121,7 +125,6 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: sd.outQueue.enqueue( outbuffer) - if sd.inQueue and not sd.inQueue.empty(): # Waiting indefinitely on the queue... inbuffer = sd.inQueue.dequeue() @@ -248,6 +251,8 @@ cdef class Daq: self.sd.stopThread.store(False) + self.sd.ready.store(False) + self.sd.inQueue = NULL self.sd.outQueue = NULL @@ -273,9 +278,9 @@ cdef class Daq: with nogil: self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self.sd) - - # Allow stream stome time to start - CPPsleep_ms(300) + while not self.sd.ready.load(): + # Allow stream stome time to start + CPPsleep_ms(100) self.daq_device.start( self.sd.inQueue, diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 709675f..88299e6 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -78,7 +78,11 @@ class StreamMsg(Enum): streamData = auto() # Error messages + # Some error occured, which mostly leads to a stop of the stream streamError = auto() + # An error occured, but we recovered + streamTemporaryError = auto() + # A fatal error occured. This leads to serious errors in the application streamFatalError = auto() @@ -254,10 +258,18 @@ class AvStreamProcess(mp.Process): """ self.stopRequiredExistingStreams(avtype) + # Empty the queue from existing stuff (puts the signal generator + # directly in action!). + if avtype in (AvType.audio_duplex, AvType.audio_output): + while not self.outq.empty(): + self.outq.get() try: stream = AudioStream(avtype, self.devices, daqconfig, self.streamCallback) self.streams[avtype] = stream + self.sendAllQueues( + StreamMsg.streamStarted, avtype, stream.streammetadata + ) except Exception as e: self.sendAllQueues( @@ -265,9 +277,6 @@ class AvStreamProcess(mp.Process): ) return - self.sendAllQueues( - StreamMsg.streamStarted, avtype, stream.streammetadata - ) def stopStream(self, avtype: AvType): """ @@ -371,10 +380,10 @@ class AvStreamProcess(mp.Process): return 1 outdata[:, :] = newdata[:, None] else: - msgtxt = "Output signal buffer underflow" + msgtxt = "Signal generator buffer underflow. Signal generator cannot keep up with data generation." # logging.error(msgtxt) self.sendAllQueues( - StreamMsg.streamError, audiostream.avtype, msgtxt + StreamMsg.streamTemporaryError, audiostream.avtype, msgtxt ) outdata[:, :] = 0 @@ -487,7 +496,11 @@ class StreamManager: if avtype is not None: self.streamstatus[avtype].lastStatus = msg self.streamstatus[avtype].errorTxt = errorTxt + logging.debug(f'Message: {errorTxt}') + elif msg == StreamMsg.streamTemporaryError: + avtype, errorTxt = data + if avtype is not None: logging.debug(f'Message: {errorTxt}') elif msg == StreamMsg.streamFatalError: diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py index 48b5bdb..34e95e5 100644 --- a/lasp/lasp_siggen.py +++ b/lasp/lasp_siggen.py @@ -136,9 +136,6 @@ class SiggenProcess(mp.Process): """ logging.debug('newSiggen') - # Cleanup old data queue. - while not self.dataq.empty(): - self.dataq.get() fs = siggendata.fs nframes_per_block = siggendata.nframes_per_block @@ -167,6 +164,7 @@ class SiggenProcess(mp.Process): else: raise ValueError(f"Not implemented signal type: {signaltype}") + logging.debug('newSiggen') return siggen def generate(self): @@ -229,18 +227,18 @@ class SiggenProcess(mp.Process): while True: # Wait here for a while, to check for messages to consume - if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2): + if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4): msg, data = self.pipe.recv() if msg == SiggenMessage.endProcess: logging.debug("Signal generator caught 'endProcess' message. Exiting.") return 0 elif msg == SiggenMessage.adjustVolume: - logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") level_dB = data + logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") self.siggen.setLevel(level_dB) elif msg == SiggenMessage.newEqSettings: eqdata = data - eq = self.newEqualizer(eqdata) + self.eq = self.newEqualizer(eqdata) elif msg == SiggenMessage.newSiggenData: siggendata = data self.siggen = self.newSiggen(siggendata)