From 11cc623363b75b5a453a9d2fd1ec5336a02b6a8b Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Thu, 29 Apr 2021 22:07:20 +0200 Subject: [PATCH] First work on new Siggen implementation --- lasp/lasp_siggen.py | 279 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100644 lasp/lasp_siggen.py diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py new file mode 100644 index 0000000..0144d7f --- /dev/null +++ b/lasp/lasp_siggen.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3.6 +# -*- coding: utf-8 -*- +""" +Author: J.A. de Jong - ASCEE + +Description: Signal generator code + +""" +import dataclasses +import logging +import multiprocessing as mp +from typing import Tuple + +import numpy as np + +from .filter import PinkNoise +from .lasp_avstream import AvStream, AvType +from .wrappers import Siggen as pyxSiggen + +QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering +# of data, larger is more stable, but also enlarges latency + +__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] + + +class SignalType: + Periodic = 0 + Noise = 1 + Sweep = 2 + Meas = 3 + + +class NoiseType: + white = (0, "White noise") + pink = (1, "Pink noise") + types = (white, pink) + + @staticmethod + def fillComboBox(combo): + for type_ in NoiseType.types: + combo.addItem(type_[1]) + + @staticmethod + def getCurrent(cb): + return NoiseType.types[cb.currentIndex()] + + +class SiggenWorkerDone(Exception): + def __str__(self): + return "Done generating signal" + + +class SiggenMessage: + """ + Different messages that can be send to the signal generator over the pipe + connection. + """ + + stop = 0 # Stop and quit the signal generator + generate = 1 + adjustVolume = 2 # Adjust the volume + newEqSettings = 3 # Forward new equalizer settings + + # These messages are send back to the main thread over the pipe + ready = 4 + error = 5 + done = 6 + + +@dataclasses.dataclass +class SiggenData: + fs: float # Sample rate [Hz] + + # Number of frames "samples" to send in one block + nframes_per_block: int + + # The data type to output + dtype: np.dtype + + # Settings for the equalizer etc + eqdata: object # Equalizer data + + # Level of output signal [dBFS]el + level_dB: float + + # Signal type specific data, i.e. + signaltype: SignalType + signaltypedata: Tuple = None + + +def siggenFcn(siggendata: SiggenData, nblocks_buffer: int, + queues: list + ): + """ + Main function running in a different process, is responsible for generating + new signal data. Uses the signal queue to push new generated signal data + on. + """ + fs = siggendata.fs + nframes_per_block = siggendata.nframes_per_block + level_dB = siggendata.level_dB + dtype = siggendata.dtype + eq = None + + signaltype = siggendata.signaltype + signaltypedata = siggendata.signaltypedata + + dataq, msgq, statusq = queues + + def generate(siggen, eq): + """ + Generate a single block of data + """ + signal = siggen.genSignal(nframes_per_block) + if eq is not None: + signal = eq.equalize(signal) + if np.issubdtype((dtype := siggendata.dtype), np.integer): + bitdepth_fixed = dtype.itemsize * 8 + signal *= 2 ** (bitdepth_fixed - 1) - 1 + dataq.put(signal.astype(dtype)) + + def newSiggen(): + """ + Create a signal generator based on parameters specified in global + function data. + """ + if signaltype == SignalType.Periodic: + freq, = signaltypedata + siggen = pyxSiggen.sineWave(fs, freq, level_dB) + elif signaltype == SignalType.Noise: + noisetype, zerodBpoint = signaltypedata + if noisetype == NoiseType.white: + sos_colorfilter = None + elif noisetype == NoiseType.pink: + sos_colorfilter = PinkNoise(fs, zerodBpoint).flatten() + else: + raise ValueError(f"Unknown noise type") + + siggen = pyxSiggen.noise(fs, level_dB, sos_colorfilter) + + elif signaltype == SignalType.Sweep: + fl, fu, Ts, Tq, sweep_flags = signaltypedata + siggen = pyxSiggen.sweep(fs, fl, fu, Ts, Tq, sweep_flags, level_dB) + + else: + raise ValueError(f"Not implemented signal type: {signaltype}") + + # Pre-generate blocks of signal data + while dataq.qsize() < nblocks_buffer: + generate(siggen, eq) + + return siggen + + # Initialization + try: + siggen = newSiggen() + + except Exception as e: + statusq.put((SiggenMessage.error, str(e))) + return 1 + + finally: + statusq.put((SiggenMessage.done, None)) + + while True: + msg, data = msgq.get() + if msg == SiggenMessage.stop: + logging.debug("Signal generator caught 'stop' message. Exiting.") + return 0 + elif msg == SiggenMessage.generate: + # logging.debug(f"Signal generator caught 'generate' message") + try: + while dataq.qsize() < nblocks_buffer: + # Generate new data and put it in the queue! + generate(siggen, eq) + except SiggenWorkerDone: + statusq.put(SiggenMessage.done) + return 0 + elif msg == SiggenMessage.adjustVolume: + logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") + level_dB = data + siggen = newSiggen() + else: + statusq.put( + SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" + ) + return 1 + + +class Siggen: + """ + Signal generator class, generates signal data in a different process to + unload the work in the calling thread. + """ + + def __init__(self, stream: AvStream, siggendata: SiggenData): + """""" + self.stream = stream + self.nblocks_buffer = max( + 1, int(QUEUE_BUFFER_TIME * stream.samplerate / stream.blocksize) + ) + + qs = [mp.Queue() for i in range(3)] + self.dataq, self.msgq, self.statusq = qs + + self.process = mp.Process( + target=siggenFcn, + args=(siggendata, self.nblocks_buffer, qs), + ) + self.process.start() + + self.handle_msgs() + if not self.process.is_alive(): + raise RuntimeError('Unexpected signal generator exception') + + self.stopped = False + + def setLevel(self, new_level): + """ + Set a new signal level to the generator + + Args: + new_level: The new level in [dBFS] + """ + self.msgq.put((SiggenMessage.adjustVolume, new_level)) + + def handle_msgs(self): + while not self.statusq.empty(): + msg, data = self.statusq.get() + if msg == SiggenMessage.error: + self.stop() + raise RuntimeError( + f"Error in initialization of signal generator: {data}" + ) + elif msg == SiggenMessage.ready: + return + + elif msg == SiggenMessage.done: + self.stop() + + def start(self): + self.stream.addCallback(self.streamCallback, AvType.audio_output) + self.handle_msgs() + + def stop(self): + logging.debug('Siggen::stop()') + if self.stopped: + raise RuntimeError('BUG: Siggen::stop() is called twice!') + self.stream.removeCallback(self.streamCallback, AvType.audio_output) + while not self.dataq.empty(): + self.dataq.get() + while not self.statusq.empty(): + self.statusq.get() + self.msgq.put((SiggenMessage.stop, None)) + + logging.debug('Joining siggen process') + self.process.join() + logging.debug('Joining siggen process done') + self.process.close() + + self.process = None + logging.debug('End Siggen::stop()') + self.stopped = True + + def streamCallback(self, indata, outdata, blockctr): + """Callback from AvStream. + + Copy generated signal from queue + """ + # logging.debug('Siggen::streamCallback()') + assert outdata is not None + if not self.dataq.empty(): + outdata[:, :] = self.dataq.get()[:, np.newaxis] + else: + logging.warning("Signal generator queue empty!") + outdata[:, :] = 0 + + if self.dataq.qsize() < self.nblocks_buffer: + self.msgq.put((SiggenMessage.generate, None))