Bugfix in deserializing device info. Added comments and function in stream manager to rescan Daq devices

This commit is contained in:
Anne de Jong 2021-05-13 21:35:51 +02:00
parent 57fe4e6b7c
commit bd4961710e
6 changed files with 130 additions and 87 deletions

View File

@ -23,6 +23,8 @@ using std::getline;
using std::runtime_error;
using std::string;
using std::vector;
using std::to_string;
typedef unsigned int us;
typedef vector<bool> boolvec;
@ -69,7 +71,7 @@ class DaqApi {
return (apiname == other.apiname && apicode == other.apicode &&
api_specific_subcode == other.api_specific_subcode);
}
operator string() const { return apiname + ", code: " + to_string(apicode); }
static vector<DaqApi> getAvailableApis();
};
@ -192,6 +194,7 @@ class DeviceInfo {
std::stringstream str(dstr);
string tmp;
us N;
// Lambda functions for deserializing
auto nexts = [&]() { getline(str, tmp, '\t'); return tmp; };
auto nexti = [&]() { getline(str, tmp, '\t'); return std::atoi(tmp.c_str()); };
auto nextf = [&]() { getline(str, tmp, '\t'); return std::atof(tmp.c_str()); };

View File

@ -11,7 +11,8 @@ __all__ = ['DeviceInfo']
def pickle(dat):
dev = DeviceInfo()
dev.devinfo.deserialize(dat)
# print('DESERIALIZE****')
dev.devinfo = dev.devinfo.deserialize(dat)
return dev
cdef class DeviceInfo:
@ -22,7 +23,9 @@ cdef class DeviceInfo:
pass
def __reduce__(self):
return (pickle, (self.devinfo.serialize(),))
serialized = self.devinfo.serialize()
# print('SERIALIZE****')
return (pickle, (serialized,))
@property
def api(self): return self.devinfo.api.apiname.decode('utf-8')

View File

@ -4,23 +4,26 @@ Author: J.A. de Jong
Description: Controlling an audio stream in a different process.
"""
#import cv2 as cv
from .lasp_multiprocessingpatch import apply_patch
apply_patch()
import logging
import multiprocessing as mp
import time, logging, signal
import numpy as np
from enum import unique, Enum, auto
# import cv2 as cv
import signal
import time
from dataclasses import dataclass
from .lasp_atomic import Atomic
from .lasp_common import AvType
from .device import (Daq, DeviceInfo, DaqConfiguration, DaqChannel)
from enum import Enum, auto, unique
from typing import List
import numpy as np
__all__ = ['StreamManager', 'ignoreSigInt']
from .device import Daq, DaqChannel, DaqConfiguration, DeviceInfo
from .lasp_atomic import Atomic
from .lasp_common import AvType
from .lasp_multiprocessingpatch import apply_patch
apply_patch()
__all__ = ["StreamManager", "ignoreSigInt"]
def ignoreSigInt():
@ -30,6 +33,7 @@ def ignoreSigInt():
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
@dataclass
class StreamMetaData:
# Sample rate [Hz]
@ -53,6 +57,7 @@ class StreamMsg(Enum):
"""
First part, control messages that can be send to the stream
"""
startStream = auto()
stopStream = auto()
stopAllStreams = auto()
@ -62,7 +67,6 @@ class StreamMsg(Enum):
activateSiggen = auto()
deactivateSiggen = auto()
"""
Second part, status messages that are send back on all listeners
"""
@ -82,11 +86,14 @@ class AudioStream:
"""
Audio stream.
"""
def __init__(self,
avtype: AvType,
devices: list,
daqconfig: DaqConfiguration,
processCallback: callable):
def __init__(
self,
avtype: AvType,
devices: list,
daqconfig: DaqConfiguration,
processCallback: callable,
):
"""
Initializes the audio stream and tries to start it.
@ -106,31 +113,30 @@ class AudioStream:
self.processCallback = processCallback
matching_devices = [
device for device in api_devices if
device.name == daqconfig.device_name]
device for device in api_devices if device.name == daqconfig.device_name
]
if len(matching_devices) == 0:
raise RuntimeError('Could not find device {daqconfig.device_name}')
raise RuntimeError("Could not find device {daqconfig.device_name}")
# TODO: We pick te first one, what to do if we have multiple matches?
# Is that even possible?
device = matching_devices[0]
self.daq = Daq(device, daqconfig)
self.daq = Daq(device, daqconfig)
en_in_ch = daqconfig.getEnabledInChannels(include_monitor=True)
en_out_ch = daqconfig.getEnabledOutChannels()
samplerate = self.daq.start(self.streamCallback)
self.streammetadata = StreamMetaData(
fs = samplerate,
in_ch = daqconfig.getEnabledInChannels(),
out_ch = daqconfig.getEnabledOutChannels(),
blocksize = self.daq.nFramesPerBlock,
dtype = self.daq.getNumpyDataType()
fs=samplerate,
in_ch=daqconfig.getEnabledInChannels(),
out_ch=daqconfig.getEnabledOutChannels(),
blocksize=self.daq.nFramesPerBlock,
dtype=self.daq.getNumpyDataType(),
)
self.running <<= True
def streamCallback(self, indata, outdata, nframes):
"""
This is called (from a separate thread) for each block
@ -160,12 +166,11 @@ class AvStreamProcess(mp.Process):
Different process on which all audio streams are running
"""
def __init__(self,
pipe, in_qlist, outq):
def __init__(self, pipe, in_qlist, outq):
"""
Args:
device: DeviceInfo
device: DeviceInfo
"""
self.pipe = pipe
@ -194,7 +199,7 @@ class AvStreamProcess(mp.Process):
while True:
msg, data = self.pipe.recv()
logging.debug(f'Streamprocess obtained message {msg}')
logging.debug(f"Streamprocess obtained message {msg}")
if msg == StreamMsg.activateSiggen:
self.siggen_activated <<= True
@ -214,21 +219,21 @@ class AvStreamProcess(mp.Process):
return
elif msg == StreamMsg.getStreamMetaData:
avtype, = data
(avtype,) = data
stream = self.streams[avtype]
if stream is not None:
self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype,
stream.streammetadata)
self.sendPipeAndAllQueues(
StreamMsg.streamMetaData, avtype, stream.streammetadata
)
else:
self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype,
None)
self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype, None)
elif msg == StreamMsg.startStream:
avtype, daqconfig = data
self.startStream(avtype, daqconfig)
elif msg == StreamMsg.stopStream:
avtype, = data
(avtype,) = data
self.stopStream(avtype)
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
@ -238,16 +243,18 @@ class AvStreamProcess(mp.Process):
"""
self.stopRequiredExistingStreams(avtype)
try:
stream = AudioStream(avtype,
self.devices,
daqconfig, self.streamCallback)
stream = AudioStream(avtype, self.devices, daqconfig, self.streamCallback)
self.streams[avtype] = stream
except Exception as e:
self.sendPipeAndAllQueues(StreamMsg.streamError, avtype, "Error starting stream {str(e)}")
self.sendPipeAndAllQueues(
StreamMsg.streamError, avtype, "Error starting stream {str(e)}"
)
return
self.sendPipeAndAllQueues(StreamMsg.streamStarted, avtype, stream.streammetadata)
self.sendPipeAndAllQueues(
StreamMsg.streamStarted, avtype, stream.streammetadata
)
def stopStream(self, avtype: AvType):
"""
@ -260,14 +267,16 @@ class AvStreamProcess(mp.Process):
stream = self.streams[avtype]
if stream is not None:
try:
stream.stop()
self.sendPipeAndAllQueues(StreamMsg.streamStopped, stream.avtype)
stream.stop()
self.sendPipeAndAllQueues(StreamMsg.streamStopped, stream.avtype)
except Exception as e:
self.sendPipeAndAllQueues(StreamMsg.streamError, stream.avtype,
"Error occured in stopping stream: {str(e)}")
self.sendPipeAndAllQueues(
StreamMsg.streamError,
stream.avtype,
"Error occured in stopping stream: {str(e)}",
)
self.streams[avtype] = None
def stopRequiredExistingStreams(self, avtype: AvType):
"""
Stop all existing streams that conflict with the current avtype
@ -280,14 +289,14 @@ class AvStreamProcess(mp.Process):
stream_to_stop = (AvType.audio_output, AvType.audio_duplex)
elif avtype == AvType.audio_duplex:
# All others have to stop
stream_to_stop = list(AvType) # All of them
stream_to_stop = list(AvType) # All of them
else:
raise ValueError('BUG')
raise ValueError("BUG")
for stream in stream_to_stop:
if stream is not None:
self.stopStream(stream)
def stopAllStreams(self):
"""
Stops all streams
@ -322,12 +331,16 @@ class AvStreamProcess(mp.Process):
"""
if self.isStreamRunning():
self.sendPipe(StreamMsg.streamError, None, "A stream is running, cannot rescan DAQ devices.")
self.sendPipe(
StreamMsg.streamError,
None,
"A stream is running, cannot rescan DAQ devices.",
)
return
self.devices = Daq.getDeviceInfo()
self.sendPipe(StreamMsg.deviceList, self.devices)
def streamCallback(self, audiostream, indata, outdata):
"""This is called (from a separate thread) for each audio block."""
# logging.debug('streamCallback()')
@ -336,25 +349,24 @@ class AvStreamProcess(mp.Process):
if not self.outq.empty():
newdata = self.outq.get()
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
msgtxt = 'Invalid output data obtained from queue'
msgtxt = "Invalid output data obtained from queue"
logging.fatal(msgtxt)
self.sendPipeAndAllQueues(StreamMsg.streamFatalError,
audiostream.avtype,
msgtxt
)
self.sendPipeAndAllQueues(
StreamMsg.streamFatalError, audiostream.avtype, msgtxt
)
return 1
outdata[:, :] = newdata[:, None]
else:
outdata[:, :] = 0
msgtxt = 'Output signal buffer underflow'
msgtxt = "Output signal buffer underflow"
logging.error(msgtxt)
self.sendPipeAndAllQueues(StreamMsg.streamError,
audiostream.avtype,
msgtxt)
self.sendPipeAndAllQueues(
StreamMsg.streamError, audiostream.avtype, msgtxt
)
# Siggen not activated
else:
logging.debug('siggen not activated')
logging.debug("siggen not activated")
outdata[:, :] = 0
if indata is not None:
@ -391,9 +403,9 @@ class StreamManager:
"""
Audio and video data stream manager, to which queus can be added
"""
def __init__(self):
"""Open a stream for audio in/output and video input. For audio output,
by default all available channels are opened for outputting data.
"""
@ -424,10 +436,7 @@ class StreamManager:
self.pipe, child_pipe = mp.Pipe(duplex=True)
# Create the stream process
self.streamProcess = AvStreamProcess(
child_pipe,
self.in_qlist,
self.outq)
self.streamProcess = AvStreamProcess(child_pipe, self.in_qlist, self.outq)
self.streamProcess.start()
def handleMessages(self):
@ -448,7 +457,7 @@ class StreamManager:
self.streamstatus[avtype].streammetadata = streammetadata
elif msg == StreamMsg.streamStopped:
avtype, = data
(avtype,) = data
self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = None
self.streamstatus[avtype].streammetadata = None
@ -461,7 +470,7 @@ class StreamManager:
elif msg == StreamMsg.streamFatalError:
avtype, errorTxt = data
logging.critical(f'Streamprocess fatal error: {errorTxt}')
logging.critical(f"Streamprocess fatal error: {errorTxt}")
self.cleanup()
elif msg == StreamMsg.streamMetaData:
@ -469,13 +478,19 @@ class StreamManager:
self.streamstatus[avtype].streammetadata = metadata
elif msg == StreamMsg.deviceList:
devices = data
devices, = data
# logging.debug(devices)
self.devices = devices
def getDeviceList(self):
self.handleMessages()
return self.devices
def rescanDaqDevices(self):
"""
Output the message to the stream process to rescan the list of devices
"""
self.sendPipe(StreamMsg.scanDaqDevices, None)
def getStreamStatus(self, avtype: AvType):
"""
@ -495,12 +510,12 @@ class StreamManager:
def activateSiggen(self):
self.handleMessages()
logging.debug('activateSiggen()')
logging.debug("activateSiggen()")
self.sendPipe(StreamMsg.activateSiggen, None)
def deactivateSiggen(self):
self.handleMessages()
logging.debug('activateSiggen()')
logging.debug("activateSiggen()")
self.sendPipe(StreamMsg.deactivateSiggen, None)
def addListener(self):
@ -530,8 +545,7 @@ class StreamManager:
"""Returns the current number of installed listeners."""
return len(self.in_qlist)
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration,
wait=False):
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration, wait=False):
"""
Start the stream, which means the callbacks are called with stream
data (audio/video)
@ -541,7 +555,7 @@ class StreamManager:
this function.
"""
logging.debug('Starting stream...')
logging.debug("Starting stream...")
self.handleMessages()
self.sendPipe(StreamMsg.startStream, avtype, daqconfig)
if wait:
@ -552,7 +566,6 @@ class StreamManager:
if self.streamstatus[avtype].lastStatus != StreamMsg.streamStopped:
break
def stopStream(self, avtype: AvType):
self.handleMessages()
self.sendPipe(StreamMsg.stopStream, avtype)
@ -570,19 +583,17 @@ class StreamManager:
"""
self.sendPipe(StreamMsg.endProcess, None)
logging.debug('Joining stream process...')
logging.debug("Joining stream process...")
self.streamProcess.join()
logging.debug('Joining stream process done')
logging.debug("Joining stream process done")
def hasVideo(self):
"""
Stub, TODO: for future
"""
return False
def sendPipe(self, msg, *data):
"""
Send a message with data over the control pipe
"""
self.pipe.send((msg, data))

View File

@ -104,7 +104,7 @@ class SIQtys:
@staticmethod
def fillComboBox(cb):
"""
Fill FreqWeightings to a combobox
Fill to a combobox
Args:
cb: QComboBox to fill
@ -143,7 +143,7 @@ class CalibrationSettings:
@staticmethod
def fillComboBox(cb):
"""
Fill FreqWeightings to a combobox
Fill Calibration Settings to a combobox
Args:
cb: QComboBox to fill
@ -300,6 +300,7 @@ class TimeWeighting:
infinite = (0, 'Infinite')
types_realtime = (ufast, fast, slow, tens, infinite)
types_all = (none, uufast, ufast, fast, slow, tens, infinite)
default = fast
default_index = 3
default_index_realtime = 1

24
lasp/lasp_logging.py Normal file
View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
"""
Author: J.A. de Jong
Description: configure the logging of messages
"""
import logging, sys
# __all__ = ['configureLogging']
# global_loglevel = None
# def configureLogging(level=None):
# # Oh yeah, one global variable
# global global_loglevel
# if level is None:
# level is global_loglevel
# else:
# global_loglevel = level
# if level is None:
# raise RuntimeError('Log level has not yet been set application wide')

View File

@ -1,6 +1,5 @@
#!/usr/bin/python3.8
import sys, logging, os, argparse
FORMAT = "[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s"
parser = argparse.ArgumentParser(
description='Acquire data and store to a measurement file.'
)
@ -22,6 +21,8 @@ args = parser.parse_args()
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % args.loglevel)
FORMAT = "[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s"
logging.basicConfig(format=FORMAT, level=numeric_level)
import multiprocessing