lasp/lasp/lasp_avstream.py

627 lines
20 KiB
Python

# -*- coding: utf-8 -*-
"""
Author: J.A. de Jong
Description: Controlling an audio stream in a different process.
"""
import logging
import multiprocessing as mp
# import cv2 as cv
import signal
import time
from dataclasses import dataclass
from enum import Enum, auto, unique
from typing import List
import numpy as np
from .device import Daq, DaqChannel, DaqConfiguration, DeviceInfo
from .filter import highpass
from .lasp_atomic import Atomic
from .lasp_common import AvType
from .lasp_multiprocessingpatch import apply_patch
from .wrappers import SosFilterBank
# Run the patch function imported from .lasp_multiprocessingpatch once, at
# import time, before any of the multiprocessing objects below are created.
# NOTE(review): what exactly is patched is defined in that module, not here.
apply_patch()
# Names exported by `from lasp_avstream import *`.
__all__ = ["StreamManager", "ignoreSigInt", "StreamStatus"]
def ignoreSigInt():
    """
    Install the "ignore" disposition for SIGINT in the calling process.

    Meant to be applied to every helper process, so that the interrupt signal
    is left to the main process to deal with.
    """
    signum, disposition = signal.SIGINT, signal.SIG_IGN
    signal.signal(signum, disposition)
@dataclass
class StreamMetaData:
    """
    Description of a running stream.

    Attributes:
        fs: Sample rate [Hz].
        in_ch: Input channels.
        out_ch: Output channels.
        blocksize: Block size.
        dtype: The data type of input and output blocks.
    """

    fs: float
    in_ch: List[DaqChannel]
    out_ch: List[DaqChannel]
    blocksize: int
    dtype: np.dtype
@unique
class StreamMsg(Enum):
    """
    Messages exchanged with the stream process.

    The members come in two groups: control messages that can be sent to the
    stream, followed by status messages that are sent back on all listeners.
    """

    # ---- Part one: control messages that can be sent to the stream ----
    startStream = auto()
    stopStream = auto()
    stopAllStreams = auto()
    getStreamMetaData = auto()
    endProcess = auto()
    scanDaqDevices = auto()

    # ---- Part two: status messages sent back on all listeners ----
    # Regular status messages
    deviceList = auto()
    streamStarted = auto()
    streamStopped = auto()
    streamMetaData = auto()
    streamData = auto()

    # Error messages
    # An error occurred, which mostly leads to a stop of the stream
    streamError = auto()
    # An error occurred, but we recovered from it
    streamTemporaryError = auto()
    # A fatal error occurred. This leads to serious errors in the application
    streamFatalError = auto()
class AudioStream:
    """
    Audio stream on a single DAQ device.

    Wraps a `Daq` instance, applies the optional per-channel high-pass filter
    to incoming blocks and forwards every block to a user-supplied callback.
    """

    def __init__(
        self,
        avtype: AvType,
        devices: list,
        daqconfig: DaqConfiguration,
        processCallback: callable,
    ):
        """
        Initializes the audio stream and tries to start it.

        Args:
            avtype: AvType of the stream (input, output or duplex).
            devices: Device information, indexed by `daqconfig.api`; each
                entry is an iterable of devices that have a `name`.
            daqconfig: DaqConfiguration used to create the audio stream
                backend.
            processCallback: Callback that is called from streamCallback()
                for each block, as `processCallback(stream, indata, outdata)`.
                A nonzero return value marks the stream as no longer running.

        Raises:
            RuntimeError: If no device with the configured name is found, or
                if the channels required for `avtype` are not enabled.
        """
        logging.debug("AudioStream()")

        # Plain attributes (the Atomic wrappers are not used here), hence they
        # are updated with ordinary assignment everywhere in this class.
        self.running = False
        self.aframectr = 0
        self.avtype = avtype
        self.processCallback = processCallback

        api_devices = devices[daqconfig.api]
        matching_devices = [
            device for device in api_devices if device.name == daqconfig.device_name
        ]
        if not matching_devices:
            raise RuntimeError(f"Could not find device {daqconfig.device_name}")

        # TODO: We pick the first one, what to do if we have multiple matches?
        # Is that even possible?
        device = matching_devices[0]
        self.daq = Daq(device, daqconfig)

        en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True)
        en_out_ch = daqconfig.getEnabledOutChannels()

        # The enabled channels come back as sequences, so test their length.
        # (Comparing the sequence itself with 0 is always False.)
        no_inputs = len(en_in_ch) == 0
        no_outputs = len(en_out_ch) == 0
        if no_inputs and no_outputs:
            raise RuntimeError("No enabled input / output channels")
        elif no_outputs and avtype in (AvType.audio_duplex, AvType.audio_output):
            raise RuntimeError("No enabled output channels")
        elif no_inputs and avtype in (AvType.audio_input, AvType.audio_duplex):
            raise RuntimeError("No enabled input channels")

        logging.debug("Ready to start device...")
        # The callback may already be invoked from here on; it drops blocks
        # until self.running is set at the end of this constructor.
        samplerate = self.daq.start(self.streamCallback)

        # Create required high-pass filters for incoming data. A None entry
        # means: no filtering for that channel.
        self.hpfs = [None] * len(en_in_ch)
        for i, ch in enumerate(en_in_ch):
            if ch.highpass > 0:
                # Simple filter with a single bank and one section
                fb = SosFilterBank(1, 1)
                hpf = highpass(samplerate, ch.highpass, Q=np.sqrt(2) / 2)
                fb.setFilter(0, hpf[None, :])
                self.hpfs[i] = fb

        self.streammetadata = StreamMetaData(
            fs=samplerate,
            in_ch=en_in_ch,
            out_ch=en_out_ch,
            blocksize=self.daq.nFramesPerBlock,
            dtype=self.daq.getNumpyDataType(),
        )
        self.running = True

    def streamCallback(self, indata, outdata, nframes):
        """
        This is called (from a separate thread) for each block
        of audio data.

        Args:
            indata: Block of input data indexed as [frame, channel], or None.
            outdata: Buffer for output data, or None. Passed on untouched to
                the process callback.
            nframes: Not used here.

        Returns:
            1 if the stream is not running, otherwise the return value of the
            process callback (nonzero marks the stream as not running).
        """
        if not self.running:
            return 1
        self.aframectr += 1

        if indata is not None:
            indata_filtered = np.empty_like(indata)
            for i in range(indata.shape[1]):
                hpf = self.hpfs[i]
                if hpf is None:
                    # One-to-one copy
                    indata_filtered[:, i] = indata[:, i]
                else:
                    # Filter as a floating point column, then convert back to
                    # the stream data type. (np.float was only an alias of the
                    # builtin float and no longer exists in NumPy.)
                    indata_float = indata[:, [i]].astype(float)
                    filtered_ch_float = hpf.filter_(indata_float)
                    indata_filtered[:, i] = filtered_ch_float.astype(
                        self.streammetadata.dtype
                    )[:, 0]
        else:
            indata_filtered = indata

        rv = self.processCallback(self, indata_filtered, outdata)
        if rv != 0:
            # Plain assignment: `<<=` on a bool is a left shift, which leaves
            # a True flag truthy.
            self.running = False
        return rv

    def stop(self):
        """
        Stop the DAQ stream.

        The running flag is cleared before the device is stopped, so that
        callbacks arriving in the meantime return immediately. Calling this
        again after the stream has been stopped does nothing.
        """
        daq = self.daq
        self.daq = None
        self.running = False
        if daq is not None:
            daq.stop()
        self.streammetadata = None
class AvStreamProcess(mp.Process):
    """
    Different process on which all audio streams are running.
    """

    def __init__(self, pipe, msg_qlist, indata_qlist, outq):
        """
        Args:
            pipe: Message control pipe on which commands are received.
            msg_qlist: List of queues on which stream status and events are
                sent. Here, everything is sent, except for the captured data
                itself.
            indata_qlist: List of queues on which captured data from a DAQ is
                sent. This one gets all events, but also captured data.
            outq: On this queue, the stream process receives data to be sent
                as output to the devices.
        """
        super().__init__()
        self.pipe = pipe
        self.msg_qlist = msg_qlist
        self.indata_qlist = indata_qlist
        self.outq = outq
        self.devices = {}
        self.daqconfigs = None

        # One slot per AvType; None means: no stream of that type.
        self.streams = {t: None for t in list(AvType)}

        # When this is set, a kill on the main process will also kill this
        # process. Highly wanted feature
        self.daemon = True

    def run(self):
        """
        The actual function running in a different process.

        Scans for devices once, then handles (msg, data) commands from the
        control pipe until StreamMsg.endProcess arrives or the pipe fails.
        """
        # First things first, ignore interrupt signals for THIS process
        # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Check for devices
        self.rescanDaqDevices()

        while True:
            try:
                msg, data = self.pipe.recv()
            except (OSError, EOFError):
                # EOFError: the other end of the pipe was closed. Either way
                # there is no message to handle, so leave run() right here
                # instead of carrying on with an unbound `msg`.
                logging.error("Error with pipe, terminating process")
                self.stopAllStreams()
                return

            logging.debug(f"Streamprocess obtained message {msg}")

            if msg == StreamMsg.scanDaqDevices:
                self.rescanDaqDevices()
            elif msg == StreamMsg.stopAllStreams:
                self.stopAllStreams()
            elif msg == StreamMsg.endProcess:
                self.stopAllStreams()
                # and.. exit!
                return
            elif msg == StreamMsg.getStreamMetaData:
                (avtype,) = data
                stream = self.streams[avtype]
                metadata = stream.streammetadata if stream is not None else None
                self.sendAllQueues(StreamMsg.streamMetaData, avtype, metadata)
            elif msg == StreamMsg.startStream:
                avtype, daqconfig = data
                self.startStream(avtype, daqconfig)
            elif msg == StreamMsg.stopStream:
                (avtype,) = data
                self.stopStream(avtype)
            else:
                logging.warning(f"Streamprocess ignores unknown message {msg}")

    def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
        """
        Start a stream, based on type and configuration.

        Conflicting streams are stopped first. On success streamStarted is
        sent to all queues, on failure streamError.
        """
        self.stopRequiredExistingStreams(avtype)

        # Empty the queue from existing stuff (puts the signal generator
        # directly in action!).
        if avtype in (AvType.audio_duplex, AvType.audio_output):
            while not self.outq.empty():
                self.outq.get()

        try:
            stream = AudioStream(avtype, self.devices, daqconfig, self.streamCallback)
            self.streams[avtype] = stream
            self.sendAllQueues(StreamMsg.streamStarted, avtype, stream.streammetadata)
        except Exception as e:
            self.sendAllQueues(
                StreamMsg.streamError, avtype, f"Error starting stream: {str(e)}"
            )

    def stopStream(self, avtype: AvType):
        """
        Stop an existing stream, and set its entry in the stream registry to
        None. Does nothing if there is no stream of this type.

        Args:
            avtype: Type of the stream to stop.
        """
        stream = self.streams[avtype]
        if stream is None:
            return

        # Clear the slot first: if sending the status message below fails, the
        # queue error handling calls stopAllStreams(), which must not come
        # back to this same stream.
        self.streams[avtype] = None
        try:
            stream.stop()
            self.sendAllQueues(StreamMsg.streamStopped, stream.avtype)
        except Exception as e:
            self.sendAllQueues(
                StreamMsg.streamError,
                stream.avtype,
                f"Error occured in stopping stream: {str(e)}",
            )

    def stopRequiredExistingStreams(self, avtype: AvType):
        """
        Stop all existing streams that conflict with the current avtype.

        Raises:
            ValueError: If avtype is not an input, output or duplex type.
        """
        if avtype == AvType.audio_input:
            # For a new input, duplex and input needs to be stopped
            types_to_stop = (AvType.audio_input, AvType.audio_duplex)
        elif avtype == AvType.audio_output:
            # For a new output, duplex and output needs to be stopped
            types_to_stop = (AvType.audio_output, AvType.audio_duplex)
        elif avtype == AvType.audio_duplex:
            # All others have to stop
            types_to_stop = list(AvType)
        else:
            raise ValueError("BUG")

        for stream_type in types_to_stop:
            self.stopStream(stream_type)

    def stopAllStreams(self):
        """
        Stops all streams
        """
        for avtype in list(self.streams):
            self.stopStream(avtype)

    def isStreamRunning(self, avtype: AvType = None):
        """
        Check whether a stream is running

        Args:
            avtype: The stream type to check whether it is still running. If
                None, it checks all streams.

        Returns:
            True if a stream is running, otherwise false
        """
        avtypes = list(AvType) if avtype is None else (avtype,)
        for t in avtypes:
            stream = self.streams[t]
            # `running` is a plain attribute of AudioStream, not a method.
            if stream is not None and stream.running:
                return True
        return False

    def rescanDaqDevices(self):
        """
        Rescan the available DaQ devices and send the result to all queues.
        Refused with a streamError message while a stream is running.
        """
        if self.isStreamRunning():
            self.sendAllQueues(
                StreamMsg.streamError,
                None,
                "A stream is running, cannot rescan DAQ devices.",
            )
            return

        self.devices = Daq.getDeviceInfo()
        self.sendAllQueues(StreamMsg.deviceList, self.devices)

    def streamCallback(self, audiostream, indata, outdata):
        """
        This is called (from a separate thread) for each audio block.

        Fills `outdata` from the output queue (zeros on underflow) and fans
        out `indata` to the input data queues.

        Returns:
            0 on success, 1 if invalid output data was taken from the queue.
        """
        if outdata is not None:
            if not self.outq.empty():
                newdata = self.outq.get()
                if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
                    msgtxt = "Invalid output data obtained from queue"
                    logging.fatal(msgtxt)
                    self.sendAllQueues(
                        StreamMsg.streamFatalError, audiostream.avtype, msgtxt
                    )
                    return 1
                # The same 1D signal is written to every output column
                outdata[:, :] = newdata[:, None]
            else:
                msgtxt = "Signal generator buffer underflow. Signal generator cannot keep up with data generation."
                self.sendAllQueues(
                    StreamMsg.streamTemporaryError, audiostream.avtype, msgtxt
                )
                outdata[:, :] = 0

        if indata is not None:
            self.sendInQueues(StreamMsg.streamData, indata)

        return 0

    def _fanOut(self, qlist, msg, data):
        """
        Put (msg, data) on every queue in qlist.
        """
        try:
            for q in qlist:
                q.put((msg, data))
        except ValueError:
            logging.error("Error with data queue, terminating process")
            self.stopAllStreams()
            # NOTE(review): terminate() is normally called on the Process
            # object from the parent; confirm that it has the intended effect
            # when invoked from inside the running process itself.
            self.terminate()

    def sendInQueues(self, msg, *data):
        """
        Send a message only to the queues that also receive captured data.
        """
        self._fanOut(self.indata_qlist, msg, data)

    def sendAllQueues(self, msg, *data):
        """
        Destined for all queues, including capture data queues
        """
        self.sendInQueues(msg, *data)
        self._fanOut(self.msg_qlist, msg, data)
@dataclass
class StreamStatus:
    """
    Status record for one stream.

    Attributes:
        lastStatus: A StreamMsg member; starts out as streamStopped.
        errorTxt: Error text, None by default.
        streammetadata: StreamMetaData of the stream, None by default.
    """

    lastStatus: StreamMsg = StreamMsg.streamStopped
    errorTxt: str = None
    streammetadata: StreamMetaData = None
class StreamManager:
    """
    Manager of the audio (and, in future, video) data streams. Listener
    queues can be registered on it; the streams themselves live in a separate
    AvStreamProcess that is controlled over a pipe.
    """

    def __init__(self):
        """
        Set up the shared manager objects, the control pipe and the listener
        queue of this class, then start the stream process.
        """
        # One status record per stream type, and no device list known yet
        self.devices = None
        self.streamstatus = {avtype: StreamStatus() for avtype in AvType}

        # The multiprocessing manager is started with interrupts ignored, see
        # https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
        self.manager = mp.managers.SyncManager()
        self.manager.start(ignoreSigInt)

        # Each listener list exists twice. Queues stored in a manager list get
        # a new object id, so a purely local list is kept next to it, only to
        # look up the index of a queue when a listener is removed.
        self.indata_qlist = self.manager.list([])
        self.indata_qlist_local = []
        self.msg_qlist = self.manager.list([])
        self.msg_qlist_local = []

        # Signal generator data goes over this queue
        self.outq = self.manager.Queue()

        # Control pipe: we keep one end, the stream process gets the other
        our_end, process_end = mp.Pipe(duplex=True)
        self.pipe = our_end

        # Messages of the stream process meant for this class arrive here
        self.our_msgqueue = self.addMsgQueueListener()

        # Finally, create and launch the stream process
        self.streamProcess = AvStreamProcess(
            process_end, self.msg_qlist, self.indata_qlist, self.outq
        )
        self.streamProcess.start()

    def scanDaqDevices(self):
        """
        Ask the stream process to rescan the list of devices.
        """
        self.sendPipe(StreamMsg.scanDaqDevices, None)

    def getStreamStatus(self, avtype: AvType):
        """
        Put a request for the stream meta data of the given AvType on the
        control pipe.
        """
        self.sendPipe(StreamMsg.getStreamMetaData, avtype)

    def getOutputQueue(self):
        """
        Give access to the output queue object.

        Note, should (of course) only be used by one signal generator at the
        time!
        """
        return self.outq

    def addMsgQueueListener(self):
        """
        Create a new queue and register it as a listener for messages.

        Returns:
            The newly created listener queue
        """
        listener = self.manager.Queue()
        self.msg_qlist.append(listener)
        self.msg_qlist_local.append(listener)
        return listener

    def removeMsgQueueListener(self, queue):
        """
        Unregister a listener queue from the message queue list.
        """
        # The position is looked up in the local bookkeeping list
        position = self.msg_qlist_local.index(queue)
        del self.msg_qlist_local[position]
        del self.msg_qlist[position]

    def addInQueueListener(self):
        """
        Create a new queue and register it as a listener for input data.

        Returns:
            The newly created listener queue
        """
        listener = self.manager.Queue()
        self.indata_qlist.append(listener)
        self.indata_qlist_local.append(listener)
        return listener

    def removeInQueueListener(self, queue):
        """
        Unregister a listener queue from the input data queue list.
        """
        # The position is looked up in the local bookkeeping list
        position = self.indata_qlist_local.index(queue)
        del self.indata_qlist[position]
        del self.indata_qlist_local[position]

    def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
        """
        Request the stream process to start a stream.

        Args:
            avtype: Type of stream to start.
            daqconfig: Configuration to start the stream with.
        """
        logging.debug("Starting stream...")
        self.sendPipe(StreamMsg.startStream, avtype, daqconfig)

    def stopStream(self, avtype: AvType):
        """
        Request the stream process to stop the stream of the given type.
        """
        logging.debug(f"StreamManager::stopStream({avtype})")
        self.sendPipe(StreamMsg.stopStream, avtype)

    def stopAllStreams(self):
        """
        Request the stream process to stop all streams.
        """
        self.sendPipe(StreamMsg.stopAllStreams)

    def cleanup(self):
        """
        Tell the stream process to end, and wait until it has done so.

        This method SHOULD always be called before removing a StreamManager
        object. Otherwise things will wait forever...
        """
        self.sendPipe(StreamMsg.endProcess, None)
        logging.debug("Joining stream process...")
        self.streamProcess.join()
        logging.debug("Joining stream process done")

    def hasVideo(self):
        """
        Stub, TODO: for future
        """
        return False

    def sendPipe(self, msg, *data):
        """
        Put a (msg, data) pair on the control pipe; data is the tuple of the
        remaining arguments.
        """
        packet = (msg, data)
        self.pipe.send(packet)