Stream and recording seems to work. Also signal generator seems to work. Error handling is not working properly yet.

This commit is contained in:
Anne de Jong 2021-05-05 19:48:04 +02:00
parent 4657063467
commit 466a6f5cc1
5 changed files with 175 additions and 168 deletions

View File

@ -82,7 +82,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
callback = <object> sd.pyCallback
# print(f'Number of input channels: {ninchannels}')
# print(f'Number of out channels: {noutchannels}')
fprintf(stderr, 'Sleep time: %d us', sleeptime_us)
fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us)
while not sd.stopThread.load():
with gil:

View File

@ -169,9 +169,11 @@ class AvStreamProcess(mp.Process):
def streamCallback(self, indata, outdata, nframes):
"""This is called (from a separate thread) for each audio block."""
# logging.debug('streamCallback()')
self.aframectr += nframes
if self.siggen_activated:
# logging.debug('siggen_activated')
if self.outq.empty():
outdata[:, :] = 0
msgtxt = 'Output signal buffer underflow'
@ -182,6 +184,7 @@ class AvStreamProcess(mp.Process):
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
self.pipe.send(StreamMsg.streamFatalError, 'Invalid output data obtained from queue')
return 1
outdata[:, :] = newdata[:, np.newaxis]
if indata is not None:
self.putAllInQueues(StreamMsg.streamData, indata)
@ -283,6 +286,15 @@ class AvStream:
"""
return self.outq
def activateSiggen(self):
logging.debug('activateSiggen()')
self.pipe.send((StreamMsg.activateSiggen, None))
def deactivateSiggen(self):
logging.debug('activateSiggen()')
self.pipe.send((StreamMsg.deactivateSiggen, None))
def addListener(self):
"""
Add a listener queue to the list of queues, and return the queue.

View File

@ -62,14 +62,11 @@ class SiggenMessage(Enum):
Different messages that can be send to the signal generator over the pipe
connection.
"""
stop = auto() # Stop and quit the signal generator
generate = auto()
adjustVolume = auto() # Adjust the volume
newEqSettings = auto() # Forward new equalizer settings
# These messages are send back to the main thread over the pipe
ready = auto()
error = auto()
done = auto()
@ -95,71 +92,45 @@ class SiggenData:
signaltypedata: Tuple = None
def siggenFcn(siggendata: SiggenData, dataq: mp.Queue, pipe):
class SiggenProcess(mp.Process):
"""
Main function running in a different process, is responsible for generating
new signal data. Uses the signal queue to push new generated signal data
on.
Args:
siggendata: The signal generator data to start with.
dataq: The queue to put generated signal on
pipe: Control and status messaging pipe
"""
fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block
level_dB = siggendata.level_dB
dtype = siggendata.dtype
def __init__(self, siggendata, dataq, pipe):
signaltype = siggendata.signaltype
signaltypedata = siggendata.signaltypedata
nblocks_buffer = max(
1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block)
)
def generate(siggen, eq):
"""
Generate a single block of data
"""
signal = siggen.genSignal(nframes_per_block)
if eq is not None:
signal = eq.equalize(signal)
if np.issubdtype((dtype := siggendata.dtype), np.integer):
bitdepth_fixed = dtype.itemsize * 8
signal *= 2 ** (bitdepth_fixed - 1) - 1
dataq.put(signal.astype(dtype))
def createEqualizer(eqdata):
"""
Create an equalizer object from equalizer data
Args:
eqdata: dictionary containing equalizer data. TODO: document the
requiring fields.
siggendata: The signal generator data to start with.
dataq: The queue to put generated signal on
pipe: Control and status messaging pipe
"""
if eqdata is None:
return None
eq_type = eqdata['type']
eq_levels = eqdata['levels']
if eq_type == 'three':
fb = SosThirdOctaveFilterBank(fs)
elif eq_type == 'one':
fb = SosOctaveFilterBank(fs)
self.dataq = dataq
self.siggendata = siggendata
self.pipe = pipe
self.eq = None
self.siggen = None
eq = Equalizer(fb._fb)
if eq_levels is not None:
eq.setLevels(eq_levels)
return eq
fs = self.siggendata.fs
nframes_per_block = siggendata.nframes_per_block
self.nblocks_buffer = max(
1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block)
)
super().__init__()
eq = createEqualizer(siggendata.eqdata)
def newSiggen():
def newSiggen(self, siggendata):
"""
Create a signal generator based on parameters specified in global
function data.
"""
fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block
level_dB = siggendata.level_dB
signaltype = siggendata.signaltype
signaltypedata = siggendata.signaltypedata
if signaltype == SignalType.Periodic:
freq, = signaltypedata
siggen = pyxSiggen.sineWave(fs, freq, level_dB)
@ -181,47 +152,82 @@ def siggenFcn(siggendata: SiggenData, dataq: mp.Queue, pipe):
else:
raise ValueError(f"Not implemented signal type: {signaltype}")
# Pre-generate blocks of signal data
while dataq.qsize() < nblocks_buffer:
generate(siggen, eq)
return siggen
# Initialization
try:
siggen = newSiggen()
def generate(self):
"""
Generate a single block of data and put it on the data queue
"""
signal = self.siggen.genSignal(self.siggendata.nframes_per_block)
dtype = self.siggendata.dtype
if self.eq is not None:
signal = self.eq.equalize(signal)
if np.issubdtype(dtype, np.integer):
bitdepth_fixed = dtype.itemsize * 8
signal *= 2 ** (bitdepth_fixed - 1) - 1
self.dataq.put(signal.astype(dtype))
except Exception as e:
pipe.send((SiggenMessage.error, str(e)))
return 1
def newEqualizer(self, eqdata):
"""
Create an equalizer object from equalizer data
finally:
pipe.send((SiggenMessage.done, None))
Args:
eqdata: dictionary containing equalizer data. TODO: document the
requiring fields.
"""
if eqdata is None:
return None
eq_type = eqdata['type']
eq_levels = eqdata['levels']
fs = self.siggendata.fs
while True:
if pipe.poll(timeout=QUEUE_BUFFER_TIME / 2):
msg, data = pipe.recv()
if msg == SiggenMessage.stop:
logging.debug("Signal generator caught 'stop' message. Exiting.")
return 0
elif msg == SiggenMessage.adjustVolume:
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
level_dB = data
siggen.setLevel(level_dB)
elif msg == SiggenMessage.newEqSettings:
eqdata = data
eq = createEqualizer(eqdata)
else:
pipe.send(
SiggenMessage.error, "BUG: Generator caught unknown message. Quiting"
)
elif dataq.qsize() < nblocks_buffer:
# Generate new data and put it in the queue!
try:
generate(siggen, eq)
except SiggenWorkerDone:
pipe.send(SiggenMessage.done)
return 0
if eq_type == 'three':
fb = SosThirdOctaveFilterBank(fs)
elif eq_type == 'one':
fb = SosOctaveFilterBank(fs)
eq = Equalizer(fb._fb)
if eq_levels is not None:
eq.setLevels(eq_levels)
return eq
def run(self):
# Initialization
try:
self.siggen = self.newSiggen(self.siggendata)
self.eq = self.newEqualizer(self.siggendata.eqdata)
except Exception as e:
self.pipe.send((SiggenMessage.error, str(e)))
return 1
# Pre-generate blocks of signal data
while self.dataq.qsize() < self.nblocks_buffer:
self.generate()
while True:
if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2):
msg, data = self.pipe.recv()
if msg == SiggenMessage.stop:
logging.debug("Signal generator caught 'stop' message. Exiting.")
return 0
elif msg == SiggenMessage.adjustVolume:
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
level_dB = data
self.siggen.setLevel(level_dB)
elif msg == SiggenMessage.newEqSettings:
eqdata = data
eq = self.newEqualizer(eqdata)
else:
self.pipe.send(
SiggenMessage.error, "BUG: Generator caught unknown message. Quiting"
)
while self.dataq.qsize() < self.nblocks_buffer:
# Generate new data and put it in the queue!
try:
self.generate()
except SiggenWorkerDone:
self.pipe.send(SiggenMessage.done)
return 0
return 1
@ -237,10 +243,7 @@ class Siggen:
self.pipe, client_end = mp.Pipe(duplex=True)
self.process = mp.Process(
target=siggenFcn,
args=(siggendata, dataq, client_end),
)
self.process = SiggenProcess(siggendata, dataq, client_end)
self.process.start()
self.handle_msgs()
@ -285,9 +288,6 @@ class Siggen:
logging.debug('Siggen::stop()')
if self.stopped:
raise RuntimeError('BUG: Siggen::stop() is called twice!')
self.stream.removeCallback(self.streamCallback, AvType.audio_output)
while not self.dataq.empty():
self.dataq.get()
self.pipe.send((SiggenMessage.stop, None))
self.pipe.close()
@ -300,18 +300,3 @@ class Siggen:
logging.debug('End Siggen::stop()')
self.stopped = True
def streamCallback(self, indata, outdata, blockctr):
"""Callback from AvStream.
Copy generated signal from queue
"""
# logging.debug('Siggen::streamCallback()')
assert outdata is not None
if not self.dataq.empty():
outdata[:, :] = self.dataq.get()[:, np.newaxis]
else:
logging.warning("Signal generator queue empty!")
outdata[:, :] = 0
if self.dataq.qsize() < self.nblocks_buffer:
self.pipe.send((SiggenMessage.generate, None))

View File

@ -4,8 +4,7 @@ logging.basicConfig(level=logging.DEBUG)
import multiprocessing
from lasp.lasp_multiprocessingpatch import apply_patch
from lasp.device import Daq, DaqChannel, DaqConfigurations
from lasp.device import DaqConfigurations
from lasp.lasp_avstream import AvStream, AvType
from lasp.lasp_record import Recording

View File

@ -1,65 +1,76 @@
#!/usr/bin/python3
import argparse
import numpy as np
parser = argparse.ArgumentParser(
description='Play a sine wave'
)
device_help = 'DAQ Device to play to'
parser.add_argument('--device', '-d', help=device_help, type=str,
default='Default')
args = parser.parse_args()
import sys, logging, os, argparse
logging.basicConfig(level=logging.DEBUG)
import multiprocessing
from lasp.lasp_multiprocessingpatch import apply_patch
from lasp.lasp_avstream import AvStream, AvType
from lasp.device import DAQConfiguration, RtAudio
config = DAQConfiguration.loadConfigs()[args.device]
rtaudio = RtAudio()
devices = rtaudio.getDeviceInfo()
output_devices = {}
for device in devices:
if device.outputchannels >= 0:
output_devices[device.name] = device
try:
output_device = output_devices[config.output_device_name]
except KeyError:
raise RuntimeError(f'output device {config.output_device_name} not available')
samplerate = int(config.en_output_rate)
stream = AvStream(output_device,
AvType.audio_output,
config)
# freq = 440.
freq = 1000.
omg = 2*np.pi*freq
from lasp.lasp_siggen import Siggen, SignalType, SiggenData
from lasp.device import DaqConfigurations
def mycallback(indata, outdata, blockctr):
frames = outdata.shape[0]
nchannels = outdata.shape[1]
# nchannels = 1
streamtime = blockctr*frames/samplerate
t = np.linspace(streamtime, streamtime + frames/samplerate,
frames)[np.newaxis, :]
outp = 0.01*np.sin(omg*t)
for i in range(nchannels):
outdata[:,i] = ((2**16-1)*outp).astype(np.int16)
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver', force=True)
parser = argparse.ArgumentParser(
description='Play a sine wave'
)
device_help = 'DAQ Device to play to'
parser.add_argument('--device', '-d', help=device_help, type=str,
default='Default')
stream.addCallback(mycallback, AvType.audio_output)
stream.start()
args = parser.parse_args()
input()
print('Stopping stream...')
stream.stop()
configs = DaqConfigurations.loadConfigs()
config_keys = [key for key in configs.keys()]
for i, key in enumerate(config_keys):
print(f'{i:2} : {key}')
choosen_index = input('Number of configuration to use: ')
try:
daqindex = int(choosen_index)
except:
sys.exit(0)
choosen_key = config_keys[daqindex]
config = configs[choosen_key].output_config
print(f'Choosen configuration: {choosen_key}')
try:
siggendata = SiggenData(
fs=48e3,
nframes_per_block=1024,
dtype=np.dtype(np.int16),
eqdata=None,
level_dB=-20,
signaltype=SignalType.Periodic,
signaltypedata=(1000.,)
)
stream = AvStream(
AvType.audio_output,
config)
outq = stream.getOutputQueue()
stream.activateSiggen()
siggen = Siggen(outq, siggendata)
stream.start()
input('Press any key to stop...')
stream.stop()
siggen.stop()
finally:
try:
stream.cleanup()
del stream
except NameError:
pass
print('Stream stopped')
print('Closing stream...')
stream.close()
print('Stream closed')