From 466a6f5cc140f00a8d06e5926eaa7e1f82b2f7f1 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Wed, 5 May 2021 19:48:04 +0200 Subject: [PATCH] Stream and recording seems to work. Also signal generator seems to work. Error handling is not working properly yet. --- lasp/device/lasp_daq.pyx | 2 +- lasp/lasp_avstream.py | 12 +++ lasp/lasp_siggen.py | 205 ++++++++++++++++++--------------------- scripts/lasp_record | 3 +- scripts/lasp_siggen | 121 ++++++++++++----------- 5 files changed, 175 insertions(+), 168 deletions(-) diff --git a/lasp/device/lasp_daq.pyx b/lasp/device/lasp_daq.pyx index fdeabe9..5529dee 100644 --- a/lasp/device/lasp_daq.pyx +++ b/lasp/device/lasp_daq.pyx @@ -82,7 +82,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: callback = sd.pyCallback # print(f'Number of input channels: {ninchannels}') # print(f'Number of out channels: {noutchannels}') - fprintf(stderr, 'Sleep time: %d us', sleeptime_us) + fprintf(stderr, 'Sleep time: %d us\n', sleeptime_us) while not sd.stopThread.load(): with gil: diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 82e2688..110e7ed 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -169,9 +169,11 @@ class AvStreamProcess(mp.Process): def streamCallback(self, indata, outdata, nframes): """This is called (from a separate thread) for each audio block.""" + # logging.debug('streamCallback()') self.aframectr += nframes if self.siggen_activated: + # logging.debug('siggen_activated') if self.outq.empty(): outdata[:, :] = 0 msgtxt = 'Output signal buffer underflow' @@ -182,6 +184,7 @@ class AvStreamProcess(mp.Process): if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: self.pipe.send(StreamMsg.streamFatalError, 'Invalid output data obtained from queue') return 1 + outdata[:, :] = newdata[:, np.newaxis] if indata is not None: self.putAllInQueues(StreamMsg.streamData, indata) @@ -283,6 +286,15 @@ class AvStream: """ return self.outq + def activateSiggen(self): + logging.debug('activateSiggen()') + self.pipe.send((StreamMsg.activateSiggen, None)) + + def deactivateSiggen(self): + logging.debug('activateSiggen()') + self.pipe.send((StreamMsg.deactivateSiggen, None)) + + def addListener(self): """ Add a listener queue to the list of queues, and return the queue. diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py index fcd5e7b..ce776f2 100644 --- a/lasp/lasp_siggen.py +++ b/lasp/lasp_siggen.py @@ -62,14 +62,11 @@ class SiggenMessage(Enum): Different messages that can be send to the signal generator over the pipe connection. """ - stop = auto() # Stop and quit the signal generator - generate = auto() adjustVolume = auto() # Adjust the volume newEqSettings = auto() # Forward new equalizer settings # These messages are send back to the main thread over the pipe - ready = auto() error = auto() done = auto() @@ -95,71 +92,45 @@ class SiggenData: signaltypedata: Tuple = None -def siggenFcn(siggendata: SiggenData, dataq: mp.Queue, pipe): +class SiggenProcess(mp.Process): """ Main function running in a different process, is responsible for generating new signal data. Uses the signal queue to push new generated signal data on. - - Args: - siggendata: The signal generator data to start with. - dataq: The queue to put generated signal on - pipe: Control and status messaging pipe """ - fs = siggendata.fs - nframes_per_block = siggendata.nframes_per_block - level_dB = siggendata.level_dB - dtype = siggendata.dtype + def __init__(self, siggendata, dataq, pipe): - signaltype = siggendata.signaltype - signaltypedata = siggendata.signaltypedata - - nblocks_buffer = max( - 1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block) - ) - - def generate(siggen, eq): """ - Generate a single block of data - """ - signal = siggen.genSignal(nframes_per_block) - if eq is not None: - signal = eq.equalize(signal) - if np.issubdtype((dtype := siggendata.dtype), np.integer): - bitdepth_fixed = dtype.itemsize * 8 - signal *= 2 ** (bitdepth_fixed - 1) - 1 - dataq.put(signal.astype(dtype)) - - def createEqualizer(eqdata): - """ - Create an equalizer object from equalizer data - Args: - eqdata: dictionary containing equalizer data. TODO: document the - requiring fields. + siggendata: The signal generator data to start with. + dataq: The queue to put generated signal on + pipe: Control and status messaging pipe """ - if eqdata is None: - return None - eq_type = eqdata['type'] - eq_levels = eqdata['levels'] - if eq_type == 'three': - fb = SosThirdOctaveFilterBank(fs) - elif eq_type == 'one': - fb = SosOctaveFilterBank(fs) + self.dataq = dataq + self.siggendata = siggendata + self.pipe = pipe + self.eq = None + self.siggen = None - eq = Equalizer(fb._fb) - if eq_levels is not None: - eq.setLevels(eq_levels) - return eq + fs = self.siggendata.fs + nframes_per_block = siggendata.nframes_per_block + self.nblocks_buffer = max( + 1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block) + ) + super().__init__() - eq = createEqualizer(siggendata.eqdata) - - def newSiggen(): + def newSiggen(self, siggendata): """ Create a signal generator based on parameters specified in global function data. """ + fs = siggendata.fs + nframes_per_block = siggendata.nframes_per_block + level_dB = siggendata.level_dB + signaltype = siggendata.signaltype + signaltypedata = siggendata.signaltypedata + if signaltype == SignalType.Periodic: freq, = signaltypedata siggen = pyxSiggen.sineWave(fs, freq, level_dB) @@ -181,47 +152,82 @@ def siggenFcn(siggendata: SiggenData, dataq: mp.Queue, pipe): else: raise ValueError(f"Not implemented signal type: {signaltype}") - # Pre-generate blocks of signal data - while dataq.qsize() < nblocks_buffer: - generate(siggen, eq) - return siggen - # Initialization - try: - siggen = newSiggen() + def generate(self): + """ + Generate a single block of data and put it on the data queue + """ + signal = self.siggen.genSignal(self.siggendata.nframes_per_block) + dtype = self.siggendata.dtype + if self.eq is not None: + signal = self.eq.equalize(signal) + if np.issubdtype(dtype, np.integer): + bitdepth_fixed = dtype.itemsize * 8 + signal *= 2 ** (bitdepth_fixed - 1) - 1 + self.dataq.put(signal.astype(dtype)) - except Exception as e: - pipe.send((SiggenMessage.error, str(e))) - return 1 + def newEqualizer(self, eqdata): + """ + Create an equalizer object from equalizer data - finally: - pipe.send((SiggenMessage.done, None)) + Args: + eqdata: dictionary containing equalizer data. TODO: document the + requiring fields. + """ + if eqdata is None: + return None + eq_type = eqdata['type'] + eq_levels = eqdata['levels'] + fs = self.siggendata.fs - while True: - if pipe.poll(timeout=QUEUE_BUFFER_TIME / 2): - msg, data = pipe.recv() - if msg == SiggenMessage.stop: - logging.debug("Signal generator caught 'stop' message. Exiting.") - return 0 - elif msg == SiggenMessage.adjustVolume: - logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") - level_dB = data - siggen.setLevel(level_dB) - elif msg == SiggenMessage.newEqSettings: - eqdata = data - eq = createEqualizer(eqdata) - else: - pipe.send( - SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" - ) - elif dataq.qsize() < nblocks_buffer: - # Generate new data and put it in the queue! - try: - generate(siggen, eq) - except SiggenWorkerDone: - pipe.send(SiggenMessage.done) - return 0 + if eq_type == 'three': + fb = SosThirdOctaveFilterBank(fs) + elif eq_type == 'one': + fb = SosOctaveFilterBank(fs) + + eq = Equalizer(fb._fb) + if eq_levels is not None: + eq.setLevels(eq_levels) + return eq + + def run(self): + # Initialization + try: + self.siggen = self.newSiggen(self.siggendata) + self.eq = self.newEqualizer(self.siggendata.eqdata) + except Exception as e: + self.pipe.send((SiggenMessage.error, str(e))) + return 1 + + # Pre-generate blocks of signal data + while self.dataq.qsize() < self.nblocks_buffer: + self.generate() + + while True: + if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2): + msg, data = self.pipe.recv() + if msg == SiggenMessage.stop: + logging.debug("Signal generator caught 'stop' message. Exiting.") + return 0 + elif msg == SiggenMessage.adjustVolume: + logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") + level_dB = data + self.siggen.setLevel(level_dB) + elif msg == SiggenMessage.newEqSettings: + eqdata = data + eq = self.newEqualizer(eqdata) + else: + self.pipe.send( + SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" + ) + while self.dataq.qsize() < self.nblocks_buffer: + # Generate new data and put it in the queue! + try: + self.generate() + except SiggenWorkerDone: + self.pipe.send(SiggenMessage.done) + return 0 return 1 @@ -237,10 +243,7 @@ class Siggen: self.pipe, client_end = mp.Pipe(duplex=True) - self.process = mp.Process( - target=siggenFcn, - args=(siggendata, dataq, client_end), - ) + self.process = SiggenProcess(siggendata, dataq, client_end) self.process.start() self.handle_msgs() @@ -285,9 +288,6 @@ class Siggen: logging.debug('Siggen::stop()') if self.stopped: raise RuntimeError('BUG: Siggen::stop() is called twice!') - self.stream.removeCallback(self.streamCallback, AvType.audio_output) - while not self.dataq.empty(): - self.dataq.get() self.pipe.send((SiggenMessage.stop, None)) self.pipe.close() @@ -300,18 +300,3 @@ class Siggen: logging.debug('End Siggen::stop()') self.stopped = True - def streamCallback(self, indata, outdata, blockctr): - """Callback from AvStream. - - Copy generated signal from queue - """ - # logging.debug('Siggen::streamCallback()') - assert outdata is not None - if not self.dataq.empty(): - outdata[:, :] = self.dataq.get()[:, np.newaxis] - else: - logging.warning("Signal generator queue empty!") - outdata[:, :] = 0 - - if self.dataq.qsize() < self.nblocks_buffer: - self.pipe.send((SiggenMessage.generate, None)) diff --git a/scripts/lasp_record b/scripts/lasp_record index fc64a2d..7f6f8fc 100755 --- a/scripts/lasp_record +++ b/scripts/lasp_record @@ -4,8 +4,7 @@ logging.basicConfig(level=logging.DEBUG) import multiprocessing from lasp.lasp_multiprocessingpatch import apply_patch - -from lasp.device import Daq, DaqChannel, DaqConfigurations +from lasp.device import DaqConfigurations from lasp.lasp_avstream import AvStream, AvType from lasp.lasp_record import Recording diff --git a/scripts/lasp_siggen b/scripts/lasp_siggen index c3b1524..eca1f68 100755 --- a/scripts/lasp_siggen +++ b/scripts/lasp_siggen @@ -1,65 +1,76 @@ #!/usr/bin/python3 import argparse import numpy as np - - -parser = argparse.ArgumentParser( - description='Play a sine wave' -) -device_help = 'DAQ Device to play to' -parser.add_argument('--device', '-d', help=device_help, type=str, - default='Default') - -args = parser.parse_args() - +import sys, logging, os, argparse +logging.basicConfig(level=logging.DEBUG) +import multiprocessing +from lasp.lasp_multiprocessingpatch import apply_patch from lasp.lasp_avstream import AvStream, AvType -from lasp.device import DAQConfiguration, RtAudio - -config = DAQConfiguration.loadConfigs()[args.device] - -rtaudio = RtAudio() -devices = rtaudio.getDeviceInfo() - -output_devices = {} -for device in devices: - if device.outputchannels >= 0: - output_devices[device.name] = device - -try: - output_device = output_devices[config.output_device_name] -except KeyError: - raise RuntimeError(f'output device {config.output_device_name} not available') - -samplerate = int(config.en_output_rate) -stream = AvStream(output_device, - AvType.audio_output, - config) - -# freq = 440. -freq = 1000. -omg = 2*np.pi*freq +from lasp.lasp_siggen import Siggen, SignalType, SiggenData +from lasp.device import DaqConfigurations -def mycallback(indata, outdata, blockctr): - frames = outdata.shape[0] - nchannels = outdata.shape[1] - # nchannels = 1 - streamtime = blockctr*frames/samplerate - t = np.linspace(streamtime, streamtime + frames/samplerate, - frames)[np.newaxis, :] - outp = 0.01*np.sin(omg*t) - for i in range(nchannels): - outdata[:,i] = ((2**16-1)*outp).astype(np.int16) +if __name__ == '__main__': + multiprocessing.set_start_method('forkserver', force=True) + parser = argparse.ArgumentParser( + description='Play a sine wave' + ) + device_help = 'DAQ Device to play to' + parser.add_argument('--device', '-d', help=device_help, type=str, + default='Default') -stream.addCallback(mycallback, AvType.audio_output) -stream.start() + args = parser.parse_args() -input() -print('Stopping stream...') -stream.stop() + configs = DaqConfigurations.loadConfigs() + + config_keys = [key for key in configs.keys()] + for i, key in enumerate(config_keys): + print(f'{i:2} : {key}') + + choosen_index = input('Number of configuration to use: ') + try: + daqindex = int(choosen_index) + except: + sys.exit(0) + + + choosen_key = config_keys[daqindex] + config = configs[choosen_key].output_config + + print(f'Choosen configuration: {choosen_key}') + + + + try: + siggendata = SiggenData( + fs=48e3, + nframes_per_block=1024, + dtype=np.dtype(np.int16), + eqdata=None, + level_dB=-20, + signaltype=SignalType.Periodic, + signaltypedata=(1000.,) + ) + + stream = AvStream( + AvType.audio_output, + config) + + outq = stream.getOutputQueue() + stream.activateSiggen() + siggen = Siggen(outq, siggendata) + + stream.start() + input('Press any key to stop...') + stream.stop() + siggen.stop() + + finally: + try: + stream.cleanup() + del stream + except NameError: + pass + -print('Stream stopped') -print('Closing stream...') -stream.close() -print('Stream closed')