StreamManager is new frontend to all DAQ.

This commit is contained in:
Anne de Jong 2021-05-07 22:53:29 +02:00
parent ee888891d9
commit e72e9154aa
6 changed files with 420 additions and 259 deletions

View File

@ -1,10 +1,10 @@
cimport cython cimport cython
from ..lasp_common import AvType
from .lasp_deviceinfo cimport DeviceInfo from .lasp_deviceinfo cimport DeviceInfo
from .lasp_daqconfig cimport DaqConfiguration from .lasp_daqconfig cimport DaqConfiguration
from cpython.ref cimport PyObject,Py_INCREF, Py_DECREF from cpython.ref cimport PyObject,Py_INCREF, Py_DECREF
import numpy as np import numpy as np
from ..lasp_common import AvType
__all__ = ['Daq'] __all__ = ['Daq']

View File

@ -2,26 +2,50 @@
""" """
Author: J.A. de Jong Author: J.A. de Jong
Description: Read data from image stream and record sound at the same time Description: Controlling an audio stream in a different process.
""" """
#import cv2 as cv #import cv2 as cv
import multiprocessing as mp import multiprocessing as mp
import signal import time, logging, signal
import numpy as np
from enum import unique, Enum, auto
from dataclasses import dataclass
from .lasp_multiprocessingpatch import apply_patch from .lasp_multiprocessingpatch import apply_patch
apply_patch() apply_patch()
from .lasp_atomic import Atomic from .lasp_atomic import Atomic
from .lasp_common import AvType from .lasp_common import AvType
from .device import (Daq, DeviceInfo, DaqConfiguration) from .device import (Daq, DeviceInfo, DaqConfiguration, DaqChannel)
from threading import Thread, Lock from typing import List
import numpy as np
import time, logging
from enum import unique, Enum, auto
__all__ = ['AvStream'] __all__ = ['StreamManager', 'ignoreSigInt']
def ignoreSigInt():
"""
Ignore sigint signal. Should be set on all processes to let the main
process control this signal.
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
@dataclass
class StreamMetaData:
# Sample rate [Hz]
fs: float
# Input channels
in_ch: List[DaqChannel]
# Output channels
out_ch: List[DaqChannel]
# blocksize
blocksize: int
# The data type of input and output blocks.
dtype: np.dtype
video_x, video_y = 640, 480
@unique @unique
class StreamMsg(Enum): class StreamMsg(Enum):
@ -30,17 +54,19 @@ class StreamMsg(Enum):
""" """
startStream = auto() startStream = auto()
stopStream = auto() stopStream = auto()
stopAllStreams = auto()
getStreamMetaData = auto() getStreamMetaData = auto()
endProcess = auto() endProcess = auto()
scanDaqDevices = auto()
activateSiggen = auto() activateSiggen = auto()
deactivateSiggen = auto() deactivateSiggen = auto()
""" """
Second part, status messages that are send back on all listeners Second part, status messages that are send back on all listeners
""" """
# "Normal messages" # "Normal messages"
deviceList = auto()
streamStarted = auto() streamStarted = auto()
streamStopped = auto() streamStopped = auto()
streamMetaData = auto() streamMetaData = auto()
@ -51,10 +77,83 @@ class StreamMsg(Enum):
streamFatalError = auto() streamFatalError = auto()
class AudioStream:
"""
Audio stream.
"""
def __init__(self,
avtype: AvType,
devices: list,
daqconfig: DaqConfiguration,
processCallback: callable):
"""
Initializes the audio stream and tries to start it.
avtype: AvType
devices: List of device information
daqconfig: DaqConfiguration to used to generate audio stream backend
processCallback: callback function that will be called from a different
thread, with arguments (AudioStream, in
"""
self.running = Atomic(False)
self.aframectr = Atomic(0)
self.avtype = avtype
self.siggen_activated = Atomic(False)
api_devices = devices[daqconfig.api]
self.processCallback = processCallback
matching_devices = [
device for device in api_devices if
device.name == daqconfig.device_name]
if len(matching_devices) == 0:
raise RuntimeError('Could not find device {daqconfig.device_name}')
# TODO: We pick te first one, what to do if we have multiple matches?
# Is that even possible?
device = matching_devices[0]
self.daq = Daq(device, daqconfig)
en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True)
en_out_ch = daqconfig.getEnabledOutChannels()
samplerate = self.daq.start(self.streamCallback)
self.streammetadata = StreamMetaData(
fs = samplerate,
in_ch = daqconfig.getEnabledInChannels(),
out_ch = daqconfig.getEnabledOutChannels(),
blocksize = self.daq.nFramesPerBlock,
dtype = self.daq.getNumpyDataType()
)
def streamCallback(self, indata, outdata, nframes):
"""
This is called (from a separate thread) for each block
of audio data.
"""
return self.processCallback(self, indata, outdata)
def stop(self):
"""
Stop the DAQ stream. Should be called only once.
"""
daq = self.daq
self.daq = None
self.running <<= False
daq.stop()
self.streammetadata = None
class AvStreamProcess(mp.Process): class AvStreamProcess(mp.Process):
"""
Different process on which all audio streams are running
"""
def __init__(self, daqconfig: DaqConfiguration, def __init__(self,
pipe, in_qlist, outq): pipe, in_qlist, outq):
""" """
@ -62,13 +161,15 @@ class AvStreamProcess(mp.Process):
device: DeviceInfo device: DeviceInfo
""" """
self.daqconfig = daqconfig
self.pipe = pipe self.pipe = pipe
self.in_qlist = in_qlist self.in_qlist = in_qlist
self.outq = outq self.outq = outq
self.aframectr = 0
self.daq = None self.devices = {}
self.streamdata = None self.daqconfigs = None
# In, out, duplex
self.streams = {t: None for t in list(AvType)}
super().__init__() super().__init__()
@ -76,122 +177,175 @@ class AvStreamProcess(mp.Process):
""" """
The actual function running in a different process. The actual function running in a different process.
""" """
# First things first, ignore interrupt signals
# https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
# Check for devices
self.rescanDaqDevices()
self.siggen_activated = Atomic(False) self.siggen_activated = Atomic(False)
self.running = Atomic(False)
self.aframectr = Atomic(0)
daqconfig = self.daqconfig
devices = Daq.getDeviceInfo()
api_devices = devices[daqconfig.api]
matching_devices = [
device for device in api_devices if device.name == daqconfig.device_name]
if len(matching_devices) == 0:
self.pipe.send((StreamMsg.streamFatalError, f"Device {daqconfig.device_name} not available"))
self.device = matching_devices[0]
# logging.debug(self.device)
# logging.debug(self.daqconfig)
while True: while True:
msg, data = self.pipe.recv() msg, data = self.pipe.recv()
logging.debug(f'Obtained message {msg}') logging.debug(f'Obtained message {msg}')
if msg == StreamMsg.activateSiggen: if msg == StreamMsg.activateSiggen:
self.siggen_activated <<= True self.siggen_activated <<= True
elif msg == StreamMsg.deactivateSiggen: elif msg == StreamMsg.deactivateSiggen:
self.siggen_activated <<= False self.siggen_activated <<= False
elif msg == StreamMsg.scanDaqDevices:
self.rescanDaqDevices()
elif msg == StreamMsg.stopAllStreams:
self.stopAllStreams()
elif msg == StreamMsg.endProcess: elif msg == StreamMsg.endProcess:
if self.streamdata is not None and self.running: self.stopAllStreams()
logging.error('Process exit while stream is still running') # and.. exit!
return return
elif msg == StreamMsg.getStreamMetaData: elif msg == StreamMsg.getStreamMetaData:
self.pipe.send((StreamMsg.streamMetaData, self.streamdata)) avtype = data
for q in self.in_qlist: stream = self.streams[avtype]
q.put((StreamMsg.streamMetaData, self.streamdata)) if stream is not None:
self.sendPipe(StreamMsg.streamMetaData, avtype, stream.streammetadata)
else:
self.sendPipe(StreamMsg.streamMetaData, avtype, None)
elif msg == StreamMsg.startStream: elif msg == StreamMsg.startStream:
self.startStream() avtype, daqconfig = data
self.startStream(avtype, daqconfig)
elif msg == StreamMsg.stopStream: elif msg == StreamMsg.stopStream:
self.stopStream() avtype, = data
self.stopStream(avtype)
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
"""
Start a stream, based on type and configuration
"""
self.stopRequiredExistingStreams(avtype)
try:
stream = AudioStream(avtype,
self.devices,
daqconfig, self.streamCallback)
self.streams[avtype] = stream
except Exception as e:
self.sendPipeAndAllQueues(StreamMsg.streamError, avtype, "Error starting stream {str(e)}")
return
self.sendPipeAndAllQueues(StreamMsg.streamStarted, avtype, stream.streammetadata)
def stopStream(self, avtype: AvType):
"""
Stop an existing stream, and sets the attribute in the list of streams
to None
Args:
stream: AudioStream instance
"""
stream = self.streams[avtype]
if stream is not None:
try:
stream.stop()
self.sendPipeAndAllQueues(StreamMsg.streamStopped, stream.avtype)
except Exception as e:
self.sendPipeAndAllQueues(StreamMsg.streamError, stream.avtype, "Error occured in stopping stream: {str(e)}")
self.streams[avtype] = None
def stopRequiredExistingStreams(self, avtype: AvType):
"""
Stop all existing streams that conflict with the current avtype
"""
if avtype == AvType.audio_input:
# For a new input, duplex and input needs to be stopped
stream_to_stop = (AvType.audio_input, AvType.audio_duplex)
elif avtype == AvType.audio_output:
# For a new output, duplex and output needs to be stopped
stream_to_stop = (AvType.audio_output, AvType.audio_duplex)
elif avtype == AvType.audio_duplex:
# All others have to stop
stream_to_stop = list(AvType) # All of them
else:
raise ValueError('BUG')
for stream in stream_to_stop:
if stream is not None:
self.stopStream(stream)
def startStream(self): def stopAllStreams(self):
""" """
Start the DAQ stream. Stops all streams
""" """
if self.daq is not None: for key in self.streams.keys():
self.pipe.send((StreamMsg.streamError, 'Stream has already been started')) self.stopStream(key)
def isStreamRunning(self, avtype: AvType = None):
"""
Check whether a stream is running
Args:
avtype: The stream type to check whether it is still running. If
None, it checks all streams.
Returns:
True if a stream is running, otherwise false
"""
if avtype is None:
avtype = list(AvType)
else:
avtype = (avtype,)
for t in avtype:
if self.streams[t] is not None:
return True
return False
def rescanDaqDevices(self):
"""
Rescan the available DaQ devices.
"""
if self.isStreamRunning():
self.sendPipe(StreamMsg.streamError, None, "A stream is running, cannot rescan DAQ devices.")
return return
try: self.devices = Daq.getDeviceInfo()
self.daq = Daq(self.device, self.sendPipe(StreamMsg.deviceList, self.devices)
self.daqconfig)
samplerate = self.daq.start(self.streamCallback) def streamCallback(self, audiostream, indata, outdata):
streamdata = {
'blocksize': self.daq.nFramesPerBlock,
'samplerate': samplerate,
'dtype': self.daq.getNumpyDataType(),
}
self.streamdata = streamdata
self.pipe.send((StreamMsg.streamStarted, streamdata))
self.putAllInQueues(StreamMsg.streamStarted, streamdata)
except Exception as e:
logging.debug(f'Error starting stream: {e}')
self.daq = None
self.pipe.send((StreamMsg.streamError, str(e)))
def stopStream(self):
"""
Stop the DAQ stream.
"""
if self.daq is None:
self.pipe.send((StreamMsg.streamError, 'Stream is not running'))
return
try:
self.daq.stop()
self.running <<= False
self.streamdata = None
self.pipe.send((StreamMsg.streamStopped, None))
self.putAllInQueues(StreamMsg.streamStopped, None)
except Exception as e:
self.pipe.send((StreamMsg.streamError, f'Error stopping stream: {e}'))
self.streamdata
self.daq = None
def streamCallback(self, indata, outdata, nframes):
"""This is called (from a separate thread) for each audio block.""" """This is called (from a separate thread) for each audio block."""
# logging.debug('streamCallback()') # logging.debug('streamCallback()')
self.aframectr += nframes
if self.siggen_activated: if self.siggen_activated:
# logging.debug('siggen_activated') # logging.debug('siggen_activated')
if self.outq.empty(): if self.outq.empty():
outdata[:, :] = 0 outdata[:, :] = 0
msgtxt = 'Output signal buffer underflow' msgtxt = 'Output signal buffer underflow'
self.pipe.send((StreamMsg.streamError, msgtxt)) self.sendPipeAndAllQueues(StreamMsg.streamError,
self.putAllInQueues(StreamMsg.streamError, msgtxt) audiostream.avtype,
msgtxt)
else: else:
newdata = self.outq.get() newdata = self.outq.get()
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
self.pipe.send(StreamMsg.streamFatalError, 'Invalid output data obtained from queue') self.sendPipeAndAllQueues(StreamMsg.streamFatalError,
audiostream.avtype,
'Invalid output data obtained from queue')
return 1 return 1
outdata[:, :] = newdata[:, np.newaxis] outdata[:, :] = newdata[:, None]
if indata is not None: if indata is not None:
self.putAllInQueues(StreamMsg.streamData, indata) self.putAllInQueues(StreamMsg.streamData, audiostream.avtype,
indata)
return 0 if self.running else 1 return 0
def putAllInQueues(self, msg, data): def putAllInQueues(self, msg, *data):
""" """
Put a message and data on all input queues in the queue list Put a message and data on all input queues in the queue list
""" """
@ -199,51 +353,42 @@ class AvStreamProcess(mp.Process):
# Fan out the input data to all queues in the queue list # Fan out the input data to all queues in the queue list
q.put((msg, data)) q.put((msg, data))
def ignoreSigInt(): # Wrapper functions that safe some typing, they do not require an
signal.signal(signal.SIGINT, signal.SIG_IGN) # explanation.
def sendPipe(self, msg, *data):
self.pipe.send((msg, data))
class AvStream: def sendPipeAndAllQueues(self, msg, *data):
"""Audio and video data stream, to which callbacks can be adde self.sendPipe(msg, *data)
daqconfig: DAQConfiguration instance. If duplex mode flag is set, self.putAllInQueues(msg, *data)
please make sure that output_device is None, as in that case the
output config will be taken from the input device.
video: @dataclass
""" class StreamStatus:
def __init__(self, lastStatus: StreamMsg = StreamMsg.streamStopped
avtype: AvType, errorTxt: str = None
daqconfig: DaqConfiguration, streammetadata: StreamMetaData = None
video=None):
class StreamManager:
"""
Audio and video data stream manager, to which queus can be added
"""
def __init__(self):
"""Open a stream for audio in/output and video input. For audio output, """Open a stream for audio in/output and video input. For audio output,
by default all available channels are opened for outputting data. by default all available channels are opened for outputting data.
Args:
device: DeviceInfo for the audio device
avtype: Type of stream. Input, output or duplex
daqconfig: DAQConfiguration instance. If duplex mode flag is set,
please make sure that output_device is None, as in that case the
output config will be taken from the input device.
video:
""" """
self.avtype = avtype # Initialize streamstatus
self.streamstatus = {t: StreamStatus() for t in list(AvType)}
en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True)
en_out_ch = daqconfig.getEnabledOutChannels()
self.input_channel_names = [ch.channel_name for ch in en_in_ch]
self.output_channel_names = [ch.channel_name for ch in en_out_ch]
self.input_sensitivity = [ch.sensitivity for ch in en_in_ch]
self.input_sensitivity = np.asarray(self.input_sensitivity)
self.input_qtys = [ch.qty for ch in en_in_ch]
# Counters for the number of frames that have been coming in
self._vframectr = Atomic(0)
self.devices = None
# Multiprocessing manager, pipe, output queue, input queue, # Multiprocessing manager, pipe, output queue, input queue,
self.manager = mp.managers.SyncManager() self.manager = mp.managers.SyncManager()
# Start this manager and ignore interrupts
# https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
self.manager.start(ignoreSigInt) self.manager.start(ignoreSigInt)
@ -260,40 +405,77 @@ class AvStream:
# Messaging pipe # Messaging pipe
self.pipe, child_pipe = mp.Pipe(duplex=True) self.pipe, child_pipe = mp.Pipe(duplex=True)
# Create the stream process
self.streamProcess = AvStreamProcess( self.streamProcess = AvStreamProcess(
daqconfig,
child_pipe, child_pipe,
self.in_qlist, self.in_qlist,
self.outq) self.outq)
self.streamProcess.start() self.streamProcess.start()
# Possible, but long not tested: store video def handleMessages(self):
# self._video = video """
# self._video_started = Atomic(False)
self._videothread = None
self.daqconfig = daqconfig Handle messages that are still on the pipe.
self.streammetadata = None
def getStreamMetaData(self): """
return self.streammetadata while self.pipe.poll():
msg, data = self.pipe.recv()
if msg == StreamMsg.streamStarted:
avtype, streammetadata = data
self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = None
self.streamstatus[avtype].streammetadata = streammetadata
elif msg == StreamMsg.streamStopped:
avtype, = data
self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = None
self.streamstatus[avtype].streammetadata = None
elif msg == StreamMsg.streamError:
avtype, errorTxt = data
if avtype is not None:
self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = None
elif msg == StreamMsg.streamMetaData:
avtype, metadata = data
self.streamstatus[avtype].streammetadata = metadata
elif msg == StreamMsg.deviceList:
devices = data
self.devices = devices
def getDeviceList(self):
self.handleMessages()
return self.devices
def getStreamStatus(self, avtype: AvType):
"""
Returns the current stream Status.
"""
self.handleMessages()
return self.streamstatus[avtype]
def getOutputQueue(self): def getOutputQueue(self):
""" """
Returns the output queue object. Returns the output queue object.
Note, should only be used by one signal generator at the time! Note, should (of course) only be used by one signal generator at the time!
""" """
return self.outq return self.outq
def activateSiggen(self): def activateSiggen(self):
self.handleMessages()
logging.debug('activateSiggen()') logging.debug('activateSiggen()')
self.pipe.send((StreamMsg.activateSiggen, None)) self.sendPipe(StreamMsg.activateSiggen, None)
def deactivateSiggen(self): def deactivateSiggen(self):
self.handleMessages()
logging.debug('activateSiggen()') logging.debug('activateSiggen()')
self.pipe.send((StreamMsg.deactivateSiggen, None)) self.sendPipe(StreamMsg.deactivateSiggen, None)
def addListener(self): def addListener(self):
""" """
@ -302,12 +484,18 @@ class AvStream:
Returns: Returns:
listener queue listener queue
""" """
self.handleMessages()
newqueue = self.manager.Queue() newqueue = self.manager.Queue()
self.in_qlist.append(newqueue) self.in_qlist.append(newqueue)
self.in_qlist_local.append(newqueue) self.in_qlist_local.append(newqueue)
return newqueue return newqueue
def removeListener(self, queue): def removeListener(self, queue):
"""
Remove an input listener queue from the queue list.
"""
# Uses a local queue list to find the index, based on the queue
self.handleMessages()
idx = self.in_qlist_local.index(queue) idx = self.in_qlist_local.index(queue)
del self.in_qlist[idx] del self.in_qlist[idx]
del self.in_qlist_local[idx] del self.in_qlist_local[idx]
@ -316,29 +504,22 @@ class AvStream:
"""Returns the current number of installed listeners.""" """Returns the current number of installed listeners."""
return len(self.in_qlist) return len(self.in_qlist)
def start(self): def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
"""Start the stream, which means the callbacks are called with stream """
data (audio/video)""" Start the stream, which means the callbacks are called with stream
logging.debug('Starting stream...') data (audio/video)
self.pipe.send((StreamMsg.startStream, None))
msg, data = self.pipe.recv()
if msg == StreamMsg.streamStarted:
self.streammetadata = data
return data
elif msg == StreamMsg.streamError:
raise RuntimeError(data)
else:
raise RuntimeError('BUG: got unexpected message: {msg}')
def stop(self): """
self.pipe.send((StreamMsg.stopStream, None)) logging.debug('Starting stream...')
msg, data = self.pipe.recv() self.sendPipe(StreamMsg.startStream, avtype, daqconfig)
if msg == StreamMsg.streamStopped: self.handleMessages()
return
elif msg == StreamMsg.streamError: def stopStream(self, avtype: AvType):
raise RuntimeError(data) self.handleMessages()
else: self.sendPipe(StreamMsg.stopStream, avtype)
raise RuntimeError('BUG: got unexpected message: {msg}')
def stopAllStreams(self):
self.sendPipe(StreamMsg.stopAllStreams)
def cleanup(self): def cleanup(self):
""" """
@ -349,7 +530,7 @@ class AvStream:
Otherwise things will wait forever... Otherwise things will wait forever...
""" """
self.pipe.send((StreamMsg.endProcess, None)) self.sendPipe(StreamMsg.endProcess, None)
logging.debug('Joining stream process...') logging.debug('Joining stream process...')
self.streamProcess.join() self.streamProcess.join()
logging.debug('Joining stream process done') logging.debug('Joining stream process done')
@ -360,47 +541,9 @@ class AvStream:
""" """
return False return False
def sendPipe(self, msg, *data):
def isRunning(self): """
self.pipe.send((StreamMsg.getStreamMetaData, None)) Send a message with data over the control pipe
msg, data = self.pipe.recv() """
if msg == StreamMsg.streamMetaData: self.pipe.send((msg, data))
streamdata = data
return streamdata is not None
elif msg == StreamMsg.streamError:
raise RuntimeError(data)
else:
raise RuntimeError('BUG: got unexpected message: {msg}')
# def _videoThread(self):
# cap = cv.VideoCapture(self._video)
# if not cap.isOpened():
# cap.open()
# vframectr = 0
# loopctr = 0
# while self._running:
# ret, frame = cap.read()
# # print(frame.shape)
# if ret is True:
# if vframectr == 0:
# self._video_started <<= True
# with self._callbacklock:
# for cb in self._callbacks[AvType.video]:
# cb(frame, vframectr)
# vframectr += 1
# self._vframectr += 1
# else:
# loopctr += 1
# if loopctr == 10:
# print('Error: no video capture!')
# time.sleep(0.2)
# cap.release()
# print('stopped videothread')
# def hasVideo(self):
# return True if self._video is not None else False

View File

@ -11,6 +11,7 @@ from .wrappers import Window as wWindow
from collections import namedtuple from collections import namedtuple
from dataclasses import dataclass from dataclasses import dataclass
from dataclasses_json import dataclass_json from dataclasses_json import dataclass_json
from enum import Enum, unique, auto
""" """
Common definitions used throughout the code. Common definitions used throughout the code.
@ -43,11 +44,19 @@ U_REF = 5e-8 # 50 nano meter / s
# hence this is the reference level as specified below. # hence this is the reference level as specified below.
dBFS_REF = 0.5*2**0.5 # Which level would be -3.01 dBFS dBFS_REF = 0.5*2**0.5 # Which level would be -3.01 dBFS
class AvType: @unique
class AvType(Enum):
"""Specificying the type of data, for adding and removing callbacks from """Specificying the type of data, for adding and removing callbacks from
the stream.""" the stream."""
audio_input = 1
audio_output = 2 # Input stream
audio_input = auto()
# Output stream
audio_output = auto()
# Both input as well as output
audio_duplex = auto()
video = 4 video = 4

View File

@ -8,7 +8,8 @@ import logging
import os import os
import time import time
import h5py import h5py
from .lasp_avstream import AvStream, AvType, StreamMsg from .lasp_avstream import StreamMsg, StreamManager
from .lasp_common import AvType
@dataclasses.dataclass @dataclasses.dataclass
@ -19,7 +20,7 @@ class RecordStatus:
class Recording: class Recording:
def __init__(self, fn: str, stream: AvStream, def __init__(self, fn: str, streammgr: StreamManager,
rectime: float = None, wait: bool = True, rectime: float = None, wait: bool = True,
progressCallback=None): progressCallback=None):
""" """

View File

@ -15,17 +15,16 @@ import numpy as np
from .filter import PinkNoise from .filter import PinkNoise
from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank
from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner
from .lasp_avstream import AvStream, AvType from .lasp_avstream import StreamManager, ignoreSigInt
from .wrappers import Siggen as pyxSiggen, Equalizer from .wrappers import Siggen as pyxSiggen, Equalizer
from enum import Enum, unique, auto from enum import Enum, unique, auto
QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering QUEUE_BUFFER_TIME = 0.5 # The amount of time used in the queues for buffering
# of data, larger is more stable, but also enlarges latency # of data, larger is more stable, but also enlarges latency
__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] __all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"]
class SignalType(Enum): class SignalType(Enum):
Periodic = auto() Periodic = auto()
Noise = auto() Noise = auto()
@ -39,7 +38,7 @@ class NoiseType(Enum):
pink = "Pink noise" pink = "Pink noise"
def __str__(self): def __str__(self):
return self.value return str(self.value)
@staticmethod @staticmethod
def fillComboBox(combo): def fillComboBox(combo):
@ -62,9 +61,11 @@ class SiggenMessage(Enum):
Different messages that can be send to the signal generator over the pipe Different messages that can be send to the signal generator over the pipe
connection. connection.
""" """
stop = auto() # Stop and quit the signal generator endProcess = auto() # Stop and quit the signal generator
adjustVolume = auto() # Adjust the volume adjustVolume = auto() # Adjust the volume
newEqSettings = auto() # Forward new equalizer settings newEqSettings = auto() # Forward new equalizer settings
ready = auto() # Send out once, once the signal generator is ready with
# pre-generating data.
# These messages are send back to the main thread over the pipe # These messages are send back to the main thread over the pipe
error = auto() error = auto()
@ -73,6 +74,9 @@ class SiggenMessage(Enum):
@dataclasses.dataclass @dataclasses.dataclass
class SiggenData: class SiggenData:
"""
Metadata used to create a Signal Generator
"""
fs: float # Sample rate [Hz] fs: float # Sample rate [Hz]
# Number of frames "samples" to send in one block # Number of frames "samples" to send in one block
@ -120,10 +124,13 @@ class SiggenProcess(mp.Process):
) )
super().__init__() super().__init__()
def newSiggen(self, siggendata): def newSiggen(self, siggendata: SiggenData):
""" """
Create a signal generator based on parameters specified in global Create a signal generator based on parameters specified in global
function data. function data.
Args:
siggendata: SiggenData. Metadata to create a new signal generator.
""" """
fs = siggendata.fs fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block nframes_per_block = siggendata.nframes_per_block
@ -192,23 +199,31 @@ class SiggenProcess(mp.Process):
return eq return eq
def run(self): def run(self):
# Initialization # The main function of the actual process
# First things first
ignoreSigInt()
try: try:
self.siggen = self.newSiggen(self.siggendata) self.siggen = self.newSiggen(self.siggendata)
except Exception as e:
self.pipe.send((SiggenMessage.error, str(e)))
try:
self.eq = self.newEqualizer(self.siggendata.eqdata) self.eq = self.newEqualizer(self.siggendata.eqdata)
except Exception as e: except Exception as e:
self.pipe.send((SiggenMessage.error, str(e))) self.pipe.send((SiggenMessage.error, str(e)))
return 1
# Pre-generate blocks of signal data # Pre-generate blocks of signal data
while self.dataq.qsize() < self.nblocks_buffer: while self.dataq.qsize() < self.nblocks_buffer:
self.generate() self.generate()
self.pipe.send((SiggenMessage.ready, None))
while True: while True:
if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2): if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 2):
msg, data = self.pipe.recv() msg, data = self.pipe.recv()
if msg == SiggenMessage.stop: if msg == SiggenMessage.endProcess:
logging.debug("Signal generator caught 'stop' message. Exiting.") logging.debug("Signal generator caught 'endProcess' message. Exiting.")
return 0 return 0
elif msg == SiggenMessage.adjustVolume: elif msg == SiggenMessage.adjustVolume:
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
@ -243,14 +258,27 @@ class Siggen:
self.pipe, client_end = mp.Pipe(duplex=True) self.pipe, client_end = mp.Pipe(duplex=True)
self.stopped = False
self.process = SiggenProcess(siggendata, dataq, client_end) self.process = SiggenProcess(siggendata, dataq, client_end)
self.process.start() self.process.start()
self.handle_msgs()
if not self.process.is_alive(): if not self.process.is_alive():
raise RuntimeError('Unexpected signal generator exception') raise RuntimeError('Unexpected signal generator exception')
self.stopped = False # Block waiting here for signal generator to be ready
msg, data = self.pipe.recv()
if msg == SiggenMessage.ready:
logging.debug('Signal generator ready')
elif msg == SiggenMessage.error:
e = data
raise RuntimeError('Signal generator exception: {str(e)}')
else:
# Done, or something
if msg == SiggenMessage.done:
self.stopped = True
self.handle_msgs()
def setLevel(self, new_level): def setLevel(self, new_level):
""" """
@ -268,27 +296,16 @@ class Siggen:
while self.pipe.poll(): while self.pipe.poll():
msg, data = self.pipe.recv() msg, data = self.pipe.recv()
if msg == SiggenMessage.error: if msg == SiggenMessage.error:
self.stop()
raise RuntimeError( raise RuntimeError(
f"Error in initialization of signal generator: {data}" f"Error in initialization of signal generator: {data}"
) )
elif msg == SiggenMessage.ready: # elif msg == SiggenMessage.done:
return # self.stop()
elif msg == SiggenMessage.done:
self.stop()
def start(self): def cleanup(self):
if self.stopped:
raise RuntimeError('BUG: This Siggen object cannot be used again.')
self.handle_msgs()
def stop(self):
logging.debug('Siggen::stop()') logging.debug('Siggen::stop()')
if self.stopped: self.pipe.send((SiggenMessage.endProcess, None))
raise RuntimeError('BUG: Siggen::stop() is called twice!')
self.pipe.send((SiggenMessage.stop, None))
self.pipe.close() self.pipe.close()
logging.debug('Joining siggen process') logging.debug('Joining siggen process')
@ -297,6 +314,4 @@ class Siggen:
self.process.close() self.process.close()
self.process = None self.process = None
logging.debug('End Siggen::stop()')
self.stopped = True

View File

@ -5,7 +5,8 @@ import sys, logging, os, argparse
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
import multiprocessing import multiprocessing
from lasp.lasp_multiprocessingpatch import apply_patch from lasp.lasp_multiprocessingpatch import apply_patch
from lasp.lasp_avstream import AvStream, AvType from lasp.lasp_avstream import StreamManager
from lasp.lasp_common import AvType
from lasp.lasp_siggen import Siggen, SignalType, SiggenData from lasp.lasp_siggen import Siggen, SignalType, SiggenData
from lasp.device import DaqConfigurations from lasp.device import DaqConfigurations
@ -36,41 +37,33 @@ if __name__ == '__main__':
choosen_key = config_keys[daqindex] choosen_key = config_keys[daqindex]
config = configs[choosen_key].output_config daqconfig = configs[choosen_key].output_config
print(f'Choosen configuration: {choosen_key}') print(f'Choosen configuration: {choosen_key}')
try: try:
streammgr = StreamManager()
outq = streammgr.getOutputQueue()
siggendata = SiggenData( siggendata = SiggenData(
fs=48e3, fs=48e3,
nframes_per_block=1024, nframes_per_block=2048,
dtype=np.dtype(np.int16), dtype=np.dtype(np.int16),
eqdata=None, eqdata=None,
level_dB=-20, level_dB=-20,
signaltype=SignalType.Periodic, signaltype=SignalType.Periodic,
signaltypedata=(1000.,) signaltypedata=(1000.,)
) )
stream = AvStream(
AvType.audio_output,
config)
outq = stream.getOutputQueue()
stream.activateSiggen()
siggen = Siggen(outq, siggendata) siggen = Siggen(outq, siggendata)
stream.start() streammgr.activateSiggen()
streammgr.startStream(AvType.audio_output, daqconfig)
input('Press any key to stop...') input('Press any key to stop...')
stream.stop() streammgr.stopStream(AvType.audio_output)
siggen.stop()
finally: finally:
try: siggen.cleanup()
stream.cleanup() streammgr.cleanup()
del stream
except NameError:
pass