Signal generator adjustments to let it run more smoothly. Output buffer is initially filled with some blocks of 0's to give the generator some headstart

This commit is contained in:
Anne de Jong 2021-05-16 14:24:03 +02:00
parent ee58f19251
commit ea24459d4d
4 changed files with 150 additions and 80 deletions

View File

@ -287,7 +287,6 @@ int mycallback(
AudioDaq* daq = (AudioDaq*) userData;
DataType dtype = daq->dataType();
/* us neninchannels = daq->neninchannels(); */
us neninchannels_inc_mon = daq->neninchannels();
us nenoutchannels = daq->nenoutchannels();
@ -343,6 +342,7 @@ int mycallback(
us j=0; // OUR buffer channel counter
us i=0; // RtAudio channel counter
for(us ch=0;ch<=daq->getHighestOutChannel();ch++) {
/* cerr << "Copying from queue... " << endl; */
if(enoutchannels[ch]) {
memcpy(
&(outputBuffer[i*bytesperchan]),
@ -351,6 +351,7 @@ int mycallback(
j++;
}
else {
/* cerr << "unused output channel in list" << endl; */
memset(
&(outputBuffer[i*bytesperchan]),0,bytesperchan);
}
@ -364,7 +365,7 @@ int mycallback(
}
}
else {
cerr << "Stream output buffer underflow, zero-ing buffer... " << endl;
cerr << "RtAudio backend: stream output buffer underflow!" << endl;
}

View File

@ -85,29 +85,41 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
# print(f'Number of out channels: {noutchannels}')
# fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us)
# Fill a couple of empty blocks ot the outQueue
if sd.outQueue:
for i in range(30):
outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels)
memset(outbuffer, 0, sizeof(double)*nBytesPerChan*noutchannels)
sd.outQueue.enqueue(<double*> outbuffer)
outbuffer = NULL
while not sd.stopThread.load():
with gil:
if sd.outQueue and sd.outQueue.size() < 10:
outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels)
if sd.outQueue:
while sd.outQueue.size() < 10:
outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels)
npy_output = <object> data_to_ndarray(
outbuffer,
nFramesPerBlock,
noutchannels,
sd.npy_format,
False, # Do not transfer ownership
True) # F-contiguous
try:
rval = callback(None,
npy_output,
npy_output = <object> data_to_ndarray(
outbuffer,
nFramesPerBlock,
)
noutchannels,
sd.npy_format,
False, # Do not transfer ownership to the temporary
# Numpy container
True) # F-contiguous
try:
rval = callback(None,
npy_output,
nFramesPerBlock,
)
except Exception as e:
logging.error('exception in Cython callback for audio output: ', str(e))
return
except Exception as e:
logging.error('exception in Cython callback for audio output: ', str(e))
return
sd.outQueue.enqueue(<double*> outbuffer)
sd.outQueue.enqueue(<double*> outbuffer)
if sd.inQueue and not sd.inQueue.empty():
@ -266,8 +278,8 @@ cdef class Daq:
CPPsleep_ms(300)
self.daq_device.start(
self.sd.inQueue,
self.sd.outQueue)
self.sd.inQueue,
self.sd.outQueue)
return self.daq_device.samplerate()

View File

@ -23,7 +23,7 @@ from .lasp_multiprocessingpatch import apply_patch
apply_patch()
__all__ = ["StreamManager", "ignoreSigInt"]
__all__ = ['StreamManager', 'ignoreSigInt', 'StreamStatus']
def ignoreSigInt():
@ -144,6 +144,9 @@ class AudioStream:
"""
if not self.running():
return 1
self.aframectr += 1
rv = self.processCallback(self, indata, outdata)
if rv != 0:
self.running <<= False
@ -163,18 +166,26 @@ class AudioStream:
class AvStreamProcess(mp.Process):
"""
Different process on which all audio streams are running
Different process on which all audio streams are running.
"""
def __init__(self, pipe, in_qlist, outq):
def __init__(self, pipe, msg_qlist, indata_qlist, outq):
"""
Args:
device: DeviceInfo
pipe: Message control pipe on which commands are received.
msg_qlist: List of queues on which stream status and events are
sent. Here, everything is send, except for the captured data
itself.
indata_qlist: List of queues on which captured data from a DAQ is
send. This one gets all events, but also captured data.
outq: On this queue, the stream process receives data to be send as
output to the devices.
"""
self.pipe = pipe
self.in_qlist = in_qlist
self.msg_qlist = msg_qlist
self.indata_qlist = indata_qlist
self.outq = outq
self.devices = {}
@ -222,11 +233,12 @@ class AvStreamProcess(mp.Process):
(avtype,) = data
stream = self.streams[avtype]
if stream is not None:
self.sendPipeAndAllQueues(
self.sendAllQueues(
StreamMsg.streamMetaData, avtype, stream.streammetadata
)
else:
self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype, None)
self.sendAllQueues(
StreamMsg.streamMetaData, avtype, None)
elif msg == StreamMsg.startStream:
avtype, daqconfig = data
@ -243,16 +255,17 @@ class AvStreamProcess(mp.Process):
"""
self.stopRequiredExistingStreams(avtype)
try:
stream = AudioStream(avtype, self.devices, daqconfig, self.streamCallback)
stream = AudioStream(avtype, self.devices,
daqconfig, self.streamCallback)
self.streams[avtype] = stream
except Exception as e:
self.sendPipeAndAllQueues(
self.sendAllQueues(
StreamMsg.streamError, avtype, "Error starting stream {str(e)}"
)
return
self.sendPipeAndAllQueues(
self.sendAllQueues(
StreamMsg.streamStarted, avtype, stream.streammetadata
)
@ -268,9 +281,10 @@ class AvStreamProcess(mp.Process):
if stream is not None:
try:
stream.stop()
self.sendPipeAndAllQueues(StreamMsg.streamStopped, stream.avtype)
self.sendAllQueues(
StreamMsg.streamStopped, stream.avtype)
except Exception as e:
self.sendPipeAndAllQueues(
self.sendAllQueues(
StreamMsg.streamError,
stream.avtype,
"Error occured in stopping stream: {str(e)}",
@ -331,7 +345,7 @@ class AvStreamProcess(mp.Process):
"""
if self.isStreamRunning():
self.sendPipe(
self.sendAllQueues(
StreamMsg.streamError,
None,
"A stream is running, cannot rescan DAQ devices.",
@ -339,7 +353,7 @@ class AvStreamProcess(mp.Process):
return
self.devices = Daq.getDeviceInfo()
self.sendPipe(StreamMsg.deviceList, self.devices)
self.sendAllQueues(StreamMsg.deviceList, self.devices)
def streamCallback(self, audiostream, indata, outdata):
"""This is called (from a separate thread) for each audio block."""
@ -351,18 +365,18 @@ class AvStreamProcess(mp.Process):
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
msgtxt = "Invalid output data obtained from queue"
logging.fatal(msgtxt)
self.sendPipeAndAllQueues(
self.sendAllQueues(
StreamMsg.streamFatalError, audiostream.avtype, msgtxt
)
return 1
outdata[:, :] = newdata[:, None]
else:
outdata[:, :] = 0
msgtxt = "Output signal buffer underflow"
logging.error(msgtxt)
self.sendPipeAndAllQueues(
# logging.error(msgtxt)
self.sendAllQueues(
StreamMsg.streamError, audiostream.avtype, msgtxt
)
outdata[:, :] = 0
# Siggen not activated
else:
@ -370,26 +384,25 @@ class AvStreamProcess(mp.Process):
outdata[:, :] = 0
if indata is not None:
self.putAllInQueues(StreamMsg.streamData, indata)
self.sendInQueues(StreamMsg.streamData, indata)
return 0
def putAllInQueues(self, msg, *data):
"""
Put a message and data on all input queues in the queue list
"""
for q in self.in_qlist:
# Wrapper functions that safe some typing, they do not require an
# explanation.
def sendInQueues(self, msg, *data):
for q in self.indata_qlist:
# Fan out the input data to all queues in the queue list
q.put((msg, data))
# Wrapper functions that safe some typing, they do not require an
# explanation.
def sendPipe(self, msg, *data):
self.pipe.send((msg, data))
def sendPipeAndAllQueues(self, msg, *data):
self.sendPipe(msg, *data)
self.putAllInQueues(msg, *data)
def sendAllQueues(self, msg, *data):
"""
Destined for all queues, including capture data queues
"""
self.sendInQueues(msg, *data)
for q in self.msg_qlist:
# Fan out the input data to all queues in the queue list
q.put((msg, data))
@dataclass
@ -426,8 +439,11 @@ class StreamManager:
# which are in the manager list get a new object id. The local list is
# used to find the index in the manager queues list upon deletion by
# 'removeListener()'
self.in_qlist = self.manager.list([])
self.in_qlist_local = []
self.indata_qlist = self.manager.list([])
self.indata_qlist_local = []
self.msg_qlist = self.manager.list([])
self.msg_qlist_local = []
# Queue used for signal generator data
self.outq = self.manager.Queue()
@ -435,19 +451,23 @@ class StreamManager:
# Messaging pipe
self.pipe, child_pipe = mp.Pipe(duplex=True)
# This is the queue on which this class listens for stream process
# messages.
self.our_msgqueue = self.addMsgQueueListener()
# Create the stream process
self.streamProcess = AvStreamProcess(child_pipe, self.in_qlist, self.outq)
self.streamProcess = AvStreamProcess(child_pipe,
self.msg_qlist,
self.indata_qlist, self.outq)
self.streamProcess.start()
def handleMessages(self):
"""
Handle messages that are still on the pipe.
"""
# logging.debug('StreamManager::handleMessages()')
while self.pipe.poll():
msg, data = self.pipe.recv()
while not self.our_msgqueue.empty():
msg, data = self.our_msgqueue.get()
logging.debug(f'StreamManager obtained message {msg}')
if msg == StreamMsg.streamStarted:
avtype, streammetadata = data
@ -466,7 +486,9 @@ class StreamManager:
avtype, errorTxt = data
if avtype is not None:
self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = None
self.streamstatus[avtype].errorTxt = errorTxt
logging.debug(f'Message: {errorTxt}')
elif msg == StreamMsg.streamFatalError:
avtype, errorTxt = data
@ -515,35 +537,51 @@ class StreamManager:
def deactivateSiggen(self):
self.handleMessages()
logging.debug("activateSiggen()")
logging.debug("deactivateSiggen()")
self.sendPipe(StreamMsg.deactivateSiggen, None)
def addListener(self):
def addMsgQueueListener(self):
"""
Add a listener queue to the list of message queues, and return the
queue.
Returns:
listener queue
"""
newqueue = self.manager.Queue()
self.msg_qlist.append(newqueue)
self.msg_qlist_local.append(newqueue)
return newqueue
def removeMsgQueueListener(self, queue):
"""
Remove an input listener queue from the message queue list.
"""
# Uses a local queue list to find the index, based on the queue
idx = self.msg_qlist_local.index(queue)
del self.msg_qlist_local[idx]
del self.msg_qlist[idx]
def addInQueueListener(self):
"""
Add a listener queue to the list of queues, and return the queue.
Returns:
listener queue
"""
self.handleMessages()
newqueue = self.manager.Queue()
self.in_qlist.append(newqueue)
self.in_qlist_local.append(newqueue)
self.indata_qlist.append(newqueue)
self.indata_qlist_local.append(newqueue)
return newqueue
def removeListener(self, queue):
def removeInQueueListener(self, queue):
"""
Remove an input listener queue from the queue list.
"""
# Uses a local queue list to find the index, based on the queue
self.handleMessages()
idx = self.in_qlist_local.index(queue)
del self.in_qlist[idx]
del self.in_qlist_local[idx]
def nListeners(self):
"""Returns the current number of installed listeners."""
return len(self.in_qlist)
idx = self.indata_qlist_local.index(queue)
del self.indata_qlist[idx]
del self.indata_qlist_local[idx]
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration, wait=False):
"""
@ -592,6 +630,7 @@ class StreamManager:
Stub, TODO: for future
"""
return False
def sendPipe(self, msg, *data):
"""
Send a message with data over the control pipe

View File

@ -25,11 +25,12 @@ QUEUE_BUFFER_TIME = 0.5 # The amount of time used in the queues for buffering
__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"]
@unique
class SignalType(Enum):
Periodic = auto()
Noise = auto()
Sweep = auto()
Meas = auto()
Periodic = 0
Noise = 1
Sweep = 2
Meas = 3
@unique
@ -64,6 +65,7 @@ class SiggenMessage(Enum):
endProcess = auto() # Stop and quit the signal generator
adjustVolume = auto() # Adjust the volume
newEqSettings = auto() # Forward new equalizer settings
newSiggenData = auto() # Forward new equalizer settings
ready = auto() # Send out once, once the signal generator is ready with
# pre-generating data.
@ -132,6 +134,12 @@ class SiggenProcess(mp.Process):
Args:
siggendata: SiggenData. Metadata to create a new signal generator.
"""
logging.debug('newSiggen')
# Cleanup old data queue.
while not self.dataq.empty():
self.dataq.get()
fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block
level_dB = siggendata.level_dB
@ -220,6 +228,7 @@ class SiggenProcess(mp.Process):
self.pipe.send((SiggenMessage.ready, None))
while True:
# Wait here for a while, to check for messages to consume
if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2):
msg, data = self.pipe.recv()
if msg == SiggenMessage.endProcess:
@ -232,6 +241,9 @@ class SiggenProcess(mp.Process):
elif msg == SiggenMessage.newEqSettings:
eqdata = data
eq = self.newEqualizer(eqdata)
elif msg == SiggenMessage.newSiggenData:
siggendata = data
self.siggen = self.newSiggen(siggendata)
else:
self.pipe.send(
SiggenMessage.error, "BUG: Generator caught unknown message. Quiting"
@ -272,7 +284,7 @@ class Siggen:
logging.debug('Signal generator ready')
elif msg == SiggenMessage.error:
e = data
raise RuntimeError('Signal generator exception: {str(e)}')
raise RuntimeError(f'Signal generator exception: {str(e)}')
else:
# Done, or something
if msg == SiggenMessage.done:
@ -292,6 +304,12 @@ class Siggen:
def setEqData(self, eqdata):
self.pipe.send((SiggenMessage.newEqSettings, eqdata))
def setSiggenData(self, siggendata: SiggenData):
"""
Updates the whole signal generator, based on new signal generator data.
"""
self.pipe.send((SiggenMessage.newSiggenData, siggendata))
def handle_msgs(self):
while self.pipe.poll():
msg, data = self.pipe.recv()