There is still a small tick at the start of the signal generator. Otherwise, it is working properly

This commit is contained in:
Anne de Jong 2021-05-16 16:45:44 +02:00
parent ea24459d4d
commit c792806fad
3 changed files with 43 additions and 27 deletions

View File

@ -37,6 +37,7 @@ cdef getNumpyDataType(DataType& dt):
else: else:
raise ValueError('Unknown data type') raise ValueError('Unknown data type')
DEF QUEUE_BUFFER_TIME = 0.5
ctypedef struct PyStreamData: ctypedef struct PyStreamData:
PyObject* pyCallback PyObject* pyCallback
@ -44,6 +45,10 @@ ctypedef struct PyStreamData:
# Flag used to pass the stopThread. # Flag used to pass the stopThread.
atomic[bool] stopThread atomic[bool] stopThread
# Flag to indicate that the signal generator queue has been filled for the
# first time.
atomic[bool] ready
# Number of frames per block # Number of frames per block
unsigned nFramesPerBlock unsigned nFramesPerBlock
@ -75,30 +80,29 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
unsigned nBytesPerChan= sd.nBytesPerChan unsigned nBytesPerChan= sd.nBytesPerChan
unsigned nFramesPerBlock= sd.nFramesPerBlock unsigned nFramesPerBlock= sd.nFramesPerBlock
double sleeptime = (<double> sd.nFramesPerBlock)/(4*sd.samplerate); double sleeptime = (<double> sd.nFramesPerBlock)/(8*sd.samplerate);
# Sleep time in microseconds
us sleeptime_us = <us> (sleeptime*1e6); us sleeptime_us = <us> (sleeptime*1e6);
us nblocks_buffer = <us> max(1, (QUEUE_BUFFER_TIME * sd.samplerate /
sd.nFramesPerBlock))
with gil: with gil:
npy_format = cnp.NPY_FLOAT64 npy_format = cnp.NPY_FLOAT64
callback = <object> sd.pyCallback callback = <object> sd.pyCallback
# print(f'Number of input channels: {ninchannels}') # print(f'Number of input channels: {ninchannels}')
# print(f'Number of out channels: {noutchannels}') # print(f'Number of out channels: {noutchannels}')
# fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us) # fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us)
for i in range(nblocks_buffer):
# Fill a couple of empty blocks ot the outQueue outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels)
if sd.outQueue: memset(outbuffer, 0, sizeof(double)*nBytesPerChan*noutchannels)
for i in range(30): sd.outQueue.enqueue(<double*> outbuffer)
outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels) sd.ready.store(True)
memset(outbuffer, 0, sizeof(double)*nBytesPerChan*noutchannels)
sd.outQueue.enqueue(<double*> outbuffer)
outbuffer = NULL
while not sd.stopThread.load(): while not sd.stopThread.load():
with gil: with gil:
if sd.outQueue: if sd.outQueue:
while sd.outQueue.size() < 10: while sd.outQueue.size() < nblocks_buffer:
outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels) outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels)
npy_output = <object> data_to_ndarray( npy_output = <object> data_to_ndarray(
@ -121,7 +125,6 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
sd.outQueue.enqueue(<double*> outbuffer) sd.outQueue.enqueue(<double*> outbuffer)
if sd.inQueue and not sd.inQueue.empty(): if sd.inQueue and not sd.inQueue.empty():
# Waiting indefinitely on the queue... # Waiting indefinitely on the queue...
inbuffer = <double*> sd.inQueue.dequeue() inbuffer = <double*> sd.inQueue.dequeue()
@ -248,6 +251,8 @@ cdef class Daq:
self.sd.stopThread.store(False) self.sd.stopThread.store(False)
self.sd.ready.store(False)
self.sd.inQueue = NULL self.sd.inQueue = NULL
self.sd.outQueue = NULL self.sd.outQueue = NULL
@ -273,9 +278,9 @@ cdef class Daq:
with nogil: with nogil:
self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction,
<void*> self.sd) <void*> self.sd)
while not self.sd.ready.load():
# Allow stream stome time to start # Allow stream stome time to start
CPPsleep_ms(300) CPPsleep_ms(100)
self.daq_device.start( self.daq_device.start(
self.sd.inQueue, self.sd.inQueue,

View File

@ -78,7 +78,11 @@ class StreamMsg(Enum):
streamData = auto() streamData = auto()
# Error messages # Error messages
# Some error occured, which mostly leads to a stop of the stream
streamError = auto() streamError = auto()
# An error occured, but we recovered
streamTemporaryError = auto()
# A fatal error occured. This leads to serious errors in the application
streamFatalError = auto() streamFatalError = auto()
@ -254,10 +258,18 @@ class AvStreamProcess(mp.Process):
""" """
self.stopRequiredExistingStreams(avtype) self.stopRequiredExistingStreams(avtype)
# Empty the queue from existing stuff (puts the signal generator
# directly in action!).
if avtype in (AvType.audio_duplex, AvType.audio_output):
while not self.outq.empty():
self.outq.get()
try: try:
stream = AudioStream(avtype, self.devices, stream = AudioStream(avtype, self.devices,
daqconfig, self.streamCallback) daqconfig, self.streamCallback)
self.streams[avtype] = stream self.streams[avtype] = stream
self.sendAllQueues(
StreamMsg.streamStarted, avtype, stream.streammetadata
)
except Exception as e: except Exception as e:
self.sendAllQueues( self.sendAllQueues(
@ -265,9 +277,6 @@ class AvStreamProcess(mp.Process):
) )
return return
self.sendAllQueues(
StreamMsg.streamStarted, avtype, stream.streammetadata
)
def stopStream(self, avtype: AvType): def stopStream(self, avtype: AvType):
""" """
@ -371,10 +380,10 @@ class AvStreamProcess(mp.Process):
return 1 return 1
outdata[:, :] = newdata[:, None] outdata[:, :] = newdata[:, None]
else: else:
msgtxt = "Output signal buffer underflow" msgtxt = "Signal generator buffer underflow. Signal generator cannot keep up with data generation."
# logging.error(msgtxt) # logging.error(msgtxt)
self.sendAllQueues( self.sendAllQueues(
StreamMsg.streamError, audiostream.avtype, msgtxt StreamMsg.streamTemporaryError, audiostream.avtype, msgtxt
) )
outdata[:, :] = 0 outdata[:, :] = 0
@ -487,7 +496,11 @@ class StreamManager:
if avtype is not None: if avtype is not None:
self.streamstatus[avtype].lastStatus = msg self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = errorTxt self.streamstatus[avtype].errorTxt = errorTxt
logging.debug(f'Message: {errorTxt}')
elif msg == StreamMsg.streamTemporaryError:
avtype, errorTxt = data
if avtype is not None:
logging.debug(f'Message: {errorTxt}') logging.debug(f'Message: {errorTxt}')
elif msg == StreamMsg.streamFatalError: elif msg == StreamMsg.streamFatalError:

View File

@ -136,9 +136,6 @@ class SiggenProcess(mp.Process):
""" """
logging.debug('newSiggen') logging.debug('newSiggen')
# Cleanup old data queue.
while not self.dataq.empty():
self.dataq.get()
fs = siggendata.fs fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block nframes_per_block = siggendata.nframes_per_block
@ -167,6 +164,7 @@ class SiggenProcess(mp.Process):
else: else:
raise ValueError(f"Not implemented signal type: {signaltype}") raise ValueError(f"Not implemented signal type: {signaltype}")
logging.debug('newSiggen')
return siggen return siggen
def generate(self): def generate(self):
@ -229,18 +227,18 @@ class SiggenProcess(mp.Process):
while True: while True:
# Wait here for a while, to check for messages to consume # Wait here for a while, to check for messages to consume
if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2): if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4):
msg, data = self.pipe.recv() msg, data = self.pipe.recv()
if msg == SiggenMessage.endProcess: if msg == SiggenMessage.endProcess:
logging.debug("Signal generator caught 'endProcess' message. Exiting.") logging.debug("Signal generator caught 'endProcess' message. Exiting.")
return 0 return 0
elif msg == SiggenMessage.adjustVolume: elif msg == SiggenMessage.adjustVolume:
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
level_dB = data level_dB = data
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
self.siggen.setLevel(level_dB) self.siggen.setLevel(level_dB)
elif msg == SiggenMessage.newEqSettings: elif msg == SiggenMessage.newEqSettings:
eqdata = data eqdata = data
eq = self.newEqualizer(eqdata) self.eq = self.newEqualizer(eqdata)
elif msg == SiggenMessage.newSiggenData: elif msg == SiggenMessage.newSiggenData:
siggendata = data siggendata = data
self.siggen = self.newSiggen(siggendata) self.siggen = self.newSiggen(siggendata)