First work on new Siggen implementation

This commit is contained in:
Anne de Jong 2021-04-29 22:07:20 +02:00
parent 99b8553235
commit 11cc623363

279
lasp/lasp_siggen.py Normal file
View File

@ -0,0 +1,279 @@
#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
"""
Author: J.A. de Jong - ASCEE
Description: Signal generator code
"""
import dataclasses
import logging
import multiprocessing as mp
from typing import Tuple
import numpy as np
from .filter import PinkNoise
from .lasp_avstream import AvStream, AvType
from .wrappers import Siggen as pyxSiggen
QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering
# of data, larger is more stable, but also enlarges latency
__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"]
class SignalType:
Periodic = 0
Noise = 1
Sweep = 2
Meas = 3
class NoiseType:
white = (0, "White noise")
pink = (1, "Pink noise")
types = (white, pink)
@staticmethod
def fillComboBox(combo):
for type_ in NoiseType.types:
combo.addItem(type_[1])
@staticmethod
def getCurrent(cb):
return NoiseType.types[cb.currentIndex()]
class SiggenWorkerDone(Exception):
def __str__(self):
return "Done generating signal"
class SiggenMessage:
"""
Different messages that can be send to the signal generator over the pipe
connection.
"""
stop = 0 # Stop and quit the signal generator
generate = 1
adjustVolume = 2 # Adjust the volume
newEqSettings = 3 # Forward new equalizer settings
# These messages are send back to the main thread over the pipe
ready = 4
error = 5
done = 6
@dataclasses.dataclass
class SiggenData:
fs: float # Sample rate [Hz]
# Number of frames "samples" to send in one block
nframes_per_block: int
# The data type to output
dtype: np.dtype
# Settings for the equalizer etc
eqdata: object # Equalizer data
# Level of output signal [dBFS]el
level_dB: float
# Signal type specific data, i.e.
signaltype: SignalType
signaltypedata: Tuple = None
def siggenFcn(siggendata: SiggenData, nblocks_buffer: int,
queues: list
):
"""
Main function running in a different process, is responsible for generating
new signal data. Uses the signal queue to push new generated signal data
on.
"""
fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block
level_dB = siggendata.level_dB
dtype = siggendata.dtype
eq = None
signaltype = siggendata.signaltype
signaltypedata = siggendata.signaltypedata
dataq, msgq, statusq = queues
def generate(siggen, eq):
"""
Generate a single block of data
"""
signal = siggen.genSignal(nframes_per_block)
if eq is not None:
signal = eq.equalize(signal)
if np.issubdtype((dtype := siggendata.dtype), np.integer):
bitdepth_fixed = dtype.itemsize * 8
signal *= 2 ** (bitdepth_fixed - 1) - 1
dataq.put(signal.astype(dtype))
def newSiggen():
"""
Create a signal generator based on parameters specified in global
function data.
"""
if signaltype == SignalType.Periodic:
freq, = signaltypedata
siggen = pyxSiggen.sineWave(fs, freq, level_dB)
elif signaltype == SignalType.Noise:
noisetype, zerodBpoint = signaltypedata
if noisetype == NoiseType.white:
sos_colorfilter = None
elif noisetype == NoiseType.pink:
sos_colorfilter = PinkNoise(fs, zerodBpoint).flatten()
else:
raise ValueError(f"Unknown noise type")
siggen = pyxSiggen.noise(fs, level_dB, sos_colorfilter)
elif signaltype == SignalType.Sweep:
fl, fu, Ts, Tq, sweep_flags = signaltypedata
siggen = pyxSiggen.sweep(fs, fl, fu, Ts, Tq, sweep_flags, level_dB)
else:
raise ValueError(f"Not implemented signal type: {signaltype}")
# Pre-generate blocks of signal data
while dataq.qsize() < nblocks_buffer:
generate(siggen, eq)
return siggen
# Initialization
try:
siggen = newSiggen()
except Exception as e:
statusq.put((SiggenMessage.error, str(e)))
return 1
finally:
statusq.put((SiggenMessage.done, None))
while True:
msg, data = msgq.get()
if msg == SiggenMessage.stop:
logging.debug("Signal generator caught 'stop' message. Exiting.")
return 0
elif msg == SiggenMessage.generate:
# logging.debug(f"Signal generator caught 'generate' message")
try:
while dataq.qsize() < nblocks_buffer:
# Generate new data and put it in the queue!
generate(siggen, eq)
except SiggenWorkerDone:
statusq.put(SiggenMessage.done)
return 0
elif msg == SiggenMessage.adjustVolume:
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
level_dB = data
siggen = newSiggen()
else:
statusq.put(
SiggenMessage.error, "BUG: Generator caught unknown message. Quiting"
)
return 1
class Siggen:
"""
Signal generator class, generates signal data in a different process to
unload the work in the calling thread.
"""
def __init__(self, stream: AvStream, siggendata: SiggenData):
""""""
self.stream = stream
self.nblocks_buffer = max(
1, int(QUEUE_BUFFER_TIME * stream.samplerate / stream.blocksize)
)
qs = [mp.Queue() for i in range(3)]
self.dataq, self.msgq, self.statusq = qs
self.process = mp.Process(
target=siggenFcn,
args=(siggendata, self.nblocks_buffer, qs),
)
self.process.start()
self.handle_msgs()
if not self.process.is_alive():
raise RuntimeError('Unexpected signal generator exception')
self.stopped = False
def setLevel(self, new_level):
"""
Set a new signal level to the generator
Args:
new_level: The new level in [dBFS]
"""
self.msgq.put((SiggenMessage.adjustVolume, new_level))
def handle_msgs(self):
while not self.statusq.empty():
msg, data = self.statusq.get()
if msg == SiggenMessage.error:
self.stop()
raise RuntimeError(
f"Error in initialization of signal generator: {data}"
)
elif msg == SiggenMessage.ready:
return
elif msg == SiggenMessage.done:
self.stop()
def start(self):
self.stream.addCallback(self.streamCallback, AvType.audio_output)
self.handle_msgs()
def stop(self):
logging.debug('Siggen::stop()')
if self.stopped:
raise RuntimeError('BUG: Siggen::stop() is called twice!')
self.stream.removeCallback(self.streamCallback, AvType.audio_output)
while not self.dataq.empty():
self.dataq.get()
while not self.statusq.empty():
self.statusq.get()
self.msgq.put((SiggenMessage.stop, None))
logging.debug('Joining siggen process')
self.process.join()
logging.debug('Joining siggen process done')
self.process.close()
self.process = None
logging.debug('End Siggen::stop()')
self.stopped = True
def streamCallback(self, indata, outdata, blockctr):
"""Callback from AvStream.
Copy generated signal from queue
"""
# logging.debug('Siggen::streamCallback()')
assert outdata is not None
if not self.dataq.empty():
outdata[:, :] = self.dataq.get()[:, np.newaxis]
else:
logging.warning("Signal generator queue empty!")
outdata[:, :] = 0
if self.dataq.qsize() < self.nblocks_buffer:
self.msgq.put((SiggenMessage.generate, None))