Record is back working. Now ready for testing signal generator

This commit is contained in:
Anne de Jong 2021-05-04 15:10:13 +02:00
parent b9e31d79fd
commit 4657063467
7 changed files with 613 additions and 344 deletions

View File

@ -175,7 +175,7 @@ class AudioDaq: public Daq {
&streamoptions, &streamoptions,
&myerrorcallback &myerrorcallback
); );
} catch(...) { } catch(RtAudioError& e) {
if(rtaudio) delete rtaudio; if(rtaudio) delete rtaudio;
if(instreamparams) delete instreamparams; if(instreamparams) delete instreamparams;
if(outstreamparams) delete outstreamparams; if(outstreamparams) delete outstreamparams;

View File

@ -166,7 +166,6 @@ cdef class Daq:
try: try:
self.daq_device = cppDaq.createDaq(devinfo[0], daqconfig[0]) self.daq_device = cppDaq.createDaq(devinfo[0], daqconfig[0])
except Exception as e: except Exception as e:
print(e)
raise raise
self.nFramesPerBlock = self.daq_device.framesPerBlock() self.nFramesPerBlock = self.daq_device.framesPerBlock()
self.samplerate = self.daq_device.samplerate() self.samplerate = self.daq_device.samplerate()

View File

@ -45,6 +45,8 @@ cdef class DaqConfigurations:
def loadConfigs(): def loadConfigs():
""" """
Returns a list of currently available configurations Returns a list of currently available configurations
The first configuration is for input, the second for output
""" """
with lasp_shelve() as sh: with lasp_shelve() as sh:
configs_json = sh.load('daqconfigs', {}) configs_json = sh.load('daqconfigs', {})
@ -66,13 +68,16 @@ cdef class DaqConfigurations:
del configs_json[name] del configs_json[name]
sh.store('daqconfigs', configs_json) sh.store('daqconfigs', configs_json)
def constructDaqConfig(dict_data):
return DaqConfiguration.from_dict(dict_data)
cdef class DaqConfiguration: cdef class DaqConfiguration:
""" """
Initialize a device descriptor Initialize a device descriptor
""" """
def __init__(self):
pass def __str__(self):
return str(self.to_json())
@staticmethod @staticmethod
def fromDeviceInfo(DeviceInfo devinfo): def fromDeviceInfo(DeviceInfo devinfo):
@ -89,6 +94,9 @@ cdef class DaqConfiguration:
config_dict = json.loads(jsonstring) config_dict = json.loads(jsonstring)
return DaqConfiguration.from_dict(config_dict) return DaqConfiguration.from_dict(config_dict)
def __reduce__(self):
return (constructDaqConfig, (self.to_dict(),))
@staticmethod @staticmethod
def from_dict(pydict): def from_dict(pydict):
cdef: cdef:
@ -126,8 +134,8 @@ cdef class DaqConfiguration:
return pydaqcfg return pydaqcfg
def to_json(self): def to_dict(self):
return json.dumps(dict( return dict(
apicode = self.config.api.apicode, apicode = self.config.api.apicode,
device_name = self.config.device_name.decode('utf-8'), device_name = self.config.device_name.decode('utf-8'),
@ -158,8 +166,10 @@ cdef class DaqConfiguration:
inputIEPEEnabled = self.config.inputIEPEEnabled, inputIEPEEnabled = self.config.inputIEPEEnabled,
inputACCouplingMode = self.config.inputACCouplingMode, inputACCouplingMode = self.config.inputACCouplingMode,
inputRangeIndices = self.config.inputRangeIndices, inputRangeIndices = self.config.inputRangeIndices,
)
)) def to_json(self):
return json.dumps(self.to_dict())
def getInChannel(self, i:int): def getInChannel(self, i:int):
return DaqChannel( return DaqChannel(

View File

@ -1,28 +1,213 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Author: J.A. de Jong
Description: Read data from image stream and record sound at the same time Description: Read data from image stream and record sound at the same time
""" """
#import cv2 as cv #import cv2 as cv
import multiprocessing as mp
import signal
from .lasp_multiprocessingpatch import apply_patch
apply_patch()
from .lasp_atomic import Atomic from .lasp_atomic import Atomic
from .lasp_common import AvType from .lasp_common import AvType
from .device import (Daq, DeviceInfo, DaqConfiguration) from .device import (Daq, DeviceInfo, DaqConfiguration)
from threading import Thread, Lock from threading import Thread, Lock
import numpy as np import numpy as np
import time import time, logging
from enum import unique, Enum, auto
__all__ = ['AvStream'] __all__ = ['AvStream']
video_x, video_y = 640, 480 video_x, video_y = 640, 480
@unique
class StreamMsg(Enum):
"""
First part, control messages that can be send to the stream
"""
startStream = auto()
stopStream = auto()
getStreamMetaData = auto()
endProcess = auto()
activateSiggen = auto()
deactivateSiggen = auto()
"""
Second part, status messages that are send back on all listeners
"""
# "Normal messages"
streamStarted = auto()
streamStopped = auto()
streamMetaData = auto()
streamData = auto()
# Error messages
streamError = auto()
streamFatalError = auto()
class AvStreamProcess(mp.Process):
def __init__(self, daqconfig: DaqConfiguration,
pipe, in_qlist, outq):
"""
Args:
device: DeviceInfo
"""
self.daqconfig = daqconfig
self.pipe = pipe
self.in_qlist = in_qlist
self.outq = outq
self.aframectr = 0
self.daq = None
self.streamdata = None
super().__init__()
def run(self):
"""
The actual function running in a different process.
"""
# https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.siggen_activated = Atomic(False)
self.running = Atomic(False)
self.aframectr = Atomic(0)
daqconfig = self.daqconfig
devices = Daq.getDeviceInfo()
api_devices = devices[daqconfig.api]
matching_devices = [
device for device in api_devices if device.name == daqconfig.device_name]
if len(matching_devices) == 0:
self.pipe.send((StreamMsg.streamFatalError, f"Device {daqconfig.device_name} not available"))
self.device = matching_devices[0]
# logging.debug(self.device)
# logging.debug(self.daqconfig)
while True:
msg, data = self.pipe.recv()
logging.debug(f'Obtained message {msg}')
if msg == StreamMsg.activateSiggen:
self.siggen_activated <<= True
elif msg == StreamMsg.deactivateSiggen:
self.siggen_activated <<= False
elif msg == StreamMsg.endProcess:
if self.streamdata is not None and self.running:
logging.error('Process exit while stream is still running')
return
elif msg == StreamMsg.getStreamMetaData:
self.pipe.send((StreamMsg.streamMetaData, self.streamdata))
for q in self.in_qlist:
q.put((StreamMsg.streamMetaData, self.streamdata))
elif msg == StreamMsg.startStream:
self.startStream()
elif msg == StreamMsg.stopStream:
self.stopStream()
def startStream(self):
"""
Start the DAQ stream.
"""
if self.daq is not None:
self.pipe.send((StreamMsg.streamError, 'Stream has already been started'))
return
try:
self.daq = Daq(self.device,
self.daqconfig)
samplerate = self.daq.start(self.streamCallback)
streamdata = {
'blocksize': self.daq.nFramesPerBlock,
'samplerate': samplerate,
'dtype': self.daq.getNumpyDataType(),
}
self.streamdata = streamdata
self.pipe.send((StreamMsg.streamStarted, streamdata))
self.putAllInQueues(StreamMsg.streamStarted, streamdata)
except Exception as e:
logging.debug(f'Error starting stream: {e}')
self.daq = None
self.pipe.send((StreamMsg.streamError, str(e)))
def stopStream(self):
"""
Stop the DAQ stream.
"""
if self.daq is None:
self.pipe.send((StreamMsg.streamError, 'Stream is not running'))
return
try:
self.daq.stop()
self.running <<= False
self.streamdata = None
self.pipe.send((StreamMsg.streamStopped, None))
self.putAllInQueues(StreamMsg.streamStopped, None)
except Exception as e:
self.pipe.send((StreamMsg.streamError, f'Error stopping stream: {e}'))
self.streamdata
self.daq = None
def streamCallback(self, indata, outdata, nframes):
"""This is called (from a separate thread) for each audio block."""
self.aframectr += nframes
if self.siggen_activated:
if self.outq.empty():
outdata[:, :] = 0
msgtxt = 'Output signal buffer underflow'
self.pipe.send((StreamMsg.streamError, msgtxt))
self.putAllInQueues(StreamMsg.streamError, msgtxt)
else:
newdata = self.outq.get()
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
self.pipe.send(StreamMsg.streamFatalError, 'Invalid output data obtained from queue')
return 1
if indata is not None:
self.putAllInQueues(StreamMsg.streamData, indata)
return 0 if self.running else 1
def putAllInQueues(self, msg, data):
"""
Put a message and data on all input queues in the queue list
"""
for q in self.in_qlist:
# Fan out the input data to all queues in the queue list
q.put((msg, data))
def ignoreSigInt():
signal.signal(signal.SIGINT, signal.SIG_IGN)
class AvStream: class AvStream:
"""Audio and video data stream, to which callbacks can be added for """Audio and video data stream, to which callbacks can be adde
processing the data.""" daqconfig: DAQConfiguration instance. If duplex mode flag is set,
please make sure that output_device is None, as in that case the
output config will be taken from the input device.
video:
"""
def __init__(self, def __init__(self,
avtype: AvType, avtype: AvType,
device: DeviceInfo,
daqconfig: DaqConfiguration, daqconfig: DaqConfiguration,
video=None): video=None):
"""Open a stream for audio in/output and video input. For audio output, """Open a stream for audio in/output and video input. For audio output,
@ -50,146 +235,160 @@ class AvStream:
self.input_sensitivity = np.asarray(self.input_sensitivity) self.input_sensitivity = np.asarray(self.input_sensitivity)
self.input_qtys = [ch.qty for ch in en_in_ch] self.input_qtys = [ch.qty for ch in en_in_ch]
# Counters for the number of frames that have been coming in # Counters for the number of frames that have been coming in
self._aframectr = Atomic(0)
self._vframectr = Atomic(0) self._vframectr = Atomic(0)
# Lock
self._callbacklock = Lock()
self._running = Atomic(False) # Multiprocessing manager, pipe, output queue, input queue,
self.manager = mp.managers.SyncManager()
# https://stackoverflow.com/questions/21104997/keyboard-interrupt-with-pythons-multiprocessing
self.manager.start(ignoreSigInt)
self._video = video # List of queues for all entities that require 'microphone' or input
self._video_started = Atomic(False) # data. We need a local list, to manage listener queues, as the queues
# which are in the manager list get a new object id. The local list is
# used to find the index in the manager queues list upon deletion by
# 'removeListener()'
self.in_qlist = self.manager.list([])
self.in_qlist_local = []
# Storage for callbacks, specified by type # Queue used for signal generator data
self._callbacks = { self.outq = self.manager.Queue()
AvType.audio_input: [],
AvType.audio_output: [], # Messaging pipe
AvType.video: [] self.pipe, child_pipe = mp.Pipe(duplex=True)
} self.streamProcess = AvStreamProcess(
daqconfig,
child_pipe,
self.in_qlist,
self.outq)
self.streamProcess.start()
# Possible, but long not tested: store video # Possible, but long not tested: store video
# self._video = video
# self._video_started = Atomic(False)
self._videothread = None self._videothread = None
# self._audiobackend = RtAudio(daqconfig.api) self.daqconfig = daqconfig
self._daq = Daq(device, daqconfig) self.streammetadata = None
self.blocksize = self._daq.nFramesPerBlock
self.samplerate = self._daq.samplerate
self.dtype = self._daq.getNumpyDataType()
def nCallbacks(self): def getStreamMetaData(self):
"""Returns the current number of installed callbacks.""" return self.streammetadata
return len(self._callbacks[AvType.audio_input]) + \
len(self._callbacks[AvType.audio_output]) + \
len(self._callbacks[AvType.video])
def addCallback(self, cb: callable, cbtype: AvType): def getOutputQueue(self):
"""Add as stream callback to the list of callbacks.""" """
with self._callbacklock: Returns the output queue object.
outputcallbacks = self._callbacks[AvType.audio_output]
if cbtype == AvType.audio_output and len(outputcallbacks) > 0:
raise RuntimeError(
'Only one audio output callback can be allowed')
if cb not in self._callbacks[cbtype]: Note, should only be used by one signal generator at the time!
self._callbacks[cbtype].append(cb) """
return self.outq
def removeCallback(self, cb, cbtype: AvType): def addListener(self):
with self._callbacklock: """
if cb in self._callbacks[cbtype]: Add a listener queue to the list of queues, and return the queue.
self._callbacks[cbtype].remove(cb)
Returns:
listener queue
"""
newqueue = self.manager.Queue()
self.in_qlist.append(newqueue)
self.in_qlist_local.append(newqueue)
return newqueue
def removeListener(self, queue):
idx = self.in_qlist_local.index(queue)
del self.in_qlist[idx]
del self.in_qlist_local[idx]
def nListeners(self):
"""Returns the current number of installed listeners."""
return len(self.in_qlist)
def start(self): def start(self):
"""Start the stream, which means the callbacks are called with stream """Start the stream, which means the callbacks are called with stream
data (audio/video)""" data (audio/video)"""
logging.debug('Starting stream...')
if self._running: self.pipe.send((StreamMsg.startStream, None))
raise RuntimeError('Stream already started') msg, data = self.pipe.recv()
if msg == StreamMsg.streamStarted:
assert self._videothread is None self.streammetadata = data
return data
self._running <<= True elif msg == StreamMsg.streamError:
if self._video is not None: raise RuntimeError(data)
self._videothread = Thread(target=self._videoThread)
self._videothread.start()
else: else:
self._video_started <<= True raise RuntimeError('BUG: got unexpected message: {msg}')
self.samplerate = self._daq.start(self._audioCallback)
def _videoThread(self):
cap = cv.VideoCapture(self._video)
if not cap.isOpened():
cap.open()
vframectr = 0
loopctr = 0
while self._running:
ret, frame = cap.read()
# print(frame.shape)
if ret is True:
if vframectr == 0:
self._video_started <<= True
with self._callbacklock:
for cb in self._callbacks[AvType.video]:
cb(frame, vframectr)
vframectr += 1
self._vframectr += 1
else:
loopctr += 1
if loopctr == 10:
print('Error: no video capture!')
time.sleep(0.2)
cap.release()
print('stopped videothread')
def _audioCallback(self, indata, outdata, nframes):
"""This is called (from a separate thread) for each audio block."""
self._aframectr += nframes
with self._callbacklock:
# Count the number of output callbacks. If no output callbacks are
# present, and there should be output callbacks, we explicitly set
# the output buffer to zero
noutput_cb = len(self._callbacks[AvType.audio_output])
# Loop over callbacks
if outdata is not None:
try:
if len(self._callbacks[AvType.audio_output]) == 0:
outdata[:, :] = 0
for cb in self._callbacks[AvType.audio_output]:
cb(indata, outdata, self._aframectr())
except Exception as e:
print(e)
return 2
if indata is not None:
try:
for cb in self._callbacks[AvType.audio_input]:
cb(indata, outdata, self._aframectr())
except Exception as e:
print(e)
return 1
return 0 if self._running else 1
def stop(self): def stop(self):
self._running <<= False self.pipe.send((StreamMsg.stopStream, None))
msg, data = self.pipe.recv()
if msg == StreamMsg.streamStopped:
return
elif msg == StreamMsg.streamError:
raise RuntimeError(data)
else:
raise RuntimeError('BUG: got unexpected message: {msg}')
if self._video: def cleanup(self):
self._videothread.join() """
self._videothread = None Stops the stream if it is still running, and after that, it stops the
stream process.
self._aframectr <<= 0 This method SHOULD always be called before removing a AvStream object.
self._vframectr <<= 0 Otherwise things will wait forever...
self._video_started <<= False
self._daq.stop() """
self._daq = None self.pipe.send((StreamMsg.endProcess, None))
logging.debug('Joining stream process...')
def isRunning(self): self.streamProcess.join()
return self._running() logging.debug('Joining stream process done')
def hasVideo(self): def hasVideo(self):
return True if self._video is not None else False """
Stub, TODO: for future
"""
return False
def isRunning(self):
self.pipe.send((StreamMsg.getStreamMetaData, None))
msg, data = self.pipe.recv()
if msg == StreamMsg.streamMetaData:
streamdata = data
return streamdata is not None
elif msg == StreamMsg.streamError:
raise RuntimeError(data)
else:
raise RuntimeError('BUG: got unexpected message: {msg}')
# def _videoThread(self):
# cap = cv.VideoCapture(self._video)
# if not cap.isOpened():
# cap.open()
# vframectr = 0
# loopctr = 0
# while self._running:
# ret, frame = cap.read()
# # print(frame.shape)
# if ret is True:
# if vframectr == 0:
# self._video_started <<= True
# with self._callbacklock:
# for cb in self._callbacks[AvType.video]:
# cb(frame, vframectr)
# vframectr += 1
# self._vframectr += 1
# else:
# loopctr += 1
# if loopctr == 10:
# print('Error: no video capture!')
# time.sleep(0.2)
# cap.release()
# print('stopped videothread')
# def hasVideo(self):
# return True if self._video is not None else False

View File

@ -3,87 +3,103 @@
""" """
Read data from stream and record sound and video at the same time Read data from stream and record sound and video at the same time
""" """
from .lasp_atomic import Atomic
from threading import Condition
from .lasp_avstream import AvType, AvStream
import h5py
import dataclasses import dataclasses
import logging
import os import os
import time import time
import h5py
from .lasp_avstream import AvStream, AvType, StreamMsg
@dataclasses.dataclass @dataclasses.dataclass
class RecordStatus: class RecordStatus:
curT: float curT: float
done: bool done: bool
class Recording: class Recording:
def __init__(self, fn: str, stream: AvStream, def __init__(self, fn: str, stream: AvStream,
rectime: float=None, wait: bool = True, rectime: float = None, wait: bool = True,
progressCallback=None): progressCallback=None):
""" """
Start a recording. Blocks if wait is set to True.
Args: Args:
fn: Filename to record to. extension is added fn: Filename to record to. Extension is automatically added if not
provided.
stream: AvStream instance to record from. Should have input stream: AvStream instance to record from. Should have input
channels! channels!
rectime: Recording time, None for infinite rectime: Recording time [s], None for infinite, in seconds. If set
to None, or np.inf, the recording continues indefintely.
progressCallback: callable that is called with an instance of
RecordStatus instance as argument.
""" """
ext = '.h5' ext = '.h5'
if ext not in fn: if ext not in fn:
fn += ext fn += ext
self._stream = stream self._stream = stream
self.blocksize = stream.blocksize
self.samplerate = stream.samplerate
self._running = Atomic(False)
self._running_cond = Condition()
self.rectime = rectime self.rectime = rectime
self._fn = fn self._fn = fn
self._video_frame_positions = [] self._video_frame_positions = []
self._curT_rounded_to_seconds = 0 self._curT_rounded_to_seconds = 0
self._ablockno = Atomic(0) # Counter of the number of blocks
self._ablockno = 0
self._vframeno = 0 self._vframeno = 0
self._progressCallback = progressCallback self._progressCallback = progressCallback
self._wait = wait self._wait = wait
self._f = h5py.File(self._fn, 'w') self._f = h5py.File(self._fn, 'w')
# This flag is used to delete the file on finish(), and can be used
# when a recording is canceled.
self._deleteFile = False self._deleteFile = False
def setDelete(self, val: bool):
"""
Set the delete flag. If set, measurement file is deleted at the end of
the recording. Typically used for cleaning up after canceling a
recording.
"""
self._deleteFile = val
def __enter__(self):
"""
with Recording(fn, stream, wait=False):
event_loop_here()
or:
with Recording(fn, stream, wait=True):
pass
"""
stream = self._stream
f = self._f f = self._f
nchannels = len(stream.input_channel_names) nchannels = len(stream.input_channel_names)
# Input queue
self.inq = stream.addListener()
# Start the stream, if it is not running
try:
if not stream.isRunning():
metadata = stream.start()
else:
metadata = stream.getStreamMetaData()
except:
# Cleanup stuff, something is going wrong when starting the stream
try:
f.close()
except:
pass
self.__deleteFile()
self.blocksize = metadata['blocksize']
self.samplerate = metadata['samplerate']
self.dtype = metadata['dtype']
# The 'Audio' dataset as specified in lasp_measurement, where data is
# send to. We use gzip as compression, this gives moderate a moderate
# compression to the data.
self._ad = f.create_dataset('audio', self._ad = f.create_dataset('audio',
(1, stream.blocksize, nchannels), (1, self.blocksize, nchannels),
dtype=stream.dtype, dtype=self.dtype,
maxshape=(None, stream.blocksize, maxshape=(
None, # This means, we can add blocks
# indefinitely
self.blocksize,
nchannels), nchannels),
compression='gzip' compression='gzip'
) )
# TODO: This piece of code is not up-to-date and should be changed at a
# later instance once we really want to record video simultaneously
# with audio.
if stream.hasVideo(): if stream.hasVideo():
video_x, video_y = stream.video_x, stream.video_y video_x, video_y = stream.video_x, stream.video_y
self._vd = f.create_dataset('video', self._vd = f.create_dataset('video',
@ -94,59 +110,113 @@ class Recording:
compression='gzip' compression='gzip'
) )
f.attrs['samplerate'] = stream.samplerate # Set the bunch of attributes
f.attrs['samplerate'] = self.samplerate
f.attrs['nchannels'] = nchannels f.attrs['nchannels'] = nchannels
f.attrs['blocksize'] = stream.blocksize f.attrs['blocksize'] = self.blocksize
f.attrs['sensitivity'] = stream.input_sensitivity f.attrs['sensitivity'] = stream.input_sensitivity
f.attrs['channel_names'] = stream.input_channel_names f.attrs['channel_names'] = stream.input_channel_names
f.attrs['time'] = time.time() f.attrs['time'] = time.time()
# Measured quantities
f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys] f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys]
self._running <<= True
if not stream.isRunning(): logging.debug('Starting record....')
stream.start() # TODO: Fix this later when we want video
# if stream.hasVideo():
print('Starting record....') # stream.addCallback(self._aCallback, AvType.audio_input)
stream.addCallback(self._aCallback, AvType.audio_input) self.stop = False
if stream.hasVideo():
stream.addCallback(self._aCallback, AvType.audio_input)
if self._wait: if self._wait:
with self._running_cond: logging.debug('Stop recording with CTRL-C')
print('Stop recording with CTRL-C')
try: try:
while self._running: while not self.stop:
self._running_cond.wait() self.handleQueue()
time.sleep(0.01)
except KeyboardInterrupt: except KeyboardInterrupt:
print("Keyboard interrupt on record") logging.debug("Keyboard interrupt on record")
self._running <<= False finally:
self.finish()
def setDelete(self, val: bool):
"""
Set the delete flag. If set, measurement file is deleted at the end of
the recording. Typically used for cleaning up after canceling a
recording.
"""
self._deleteFile = val
def __exit__(self, type, value, traceback): def finish(self):
self._running <<= False """
This method should be called to finish and a close a recording file,
remove the queue from the stream, etc.
"""
stream = self._stream stream = self._stream
stream.removeCallback(self._aCallback, AvType.audio_input) # TODO: Fix when video
if stream.hasVideo(): # if stream.hasVideo():
stream.removeCallback(self._vCallback, AvType.video_input) # stream.removeCallback(self._vCallback, AvType.video_input)
self._f['video_frame_positions'] = self._video_frame_positions # self._f['video_frame_positions'] = self._video_frame_positions
try:
stream.removeListener(self.inq)
except Exception as e:
logging.error(f'Could not remove queue from stream: {e}')
try:
# Close the recording file
self._f.close() self._f.close()
print('\nEnding record') except Exception as e:
logging.error(f'Error closing file: {e}')
logging.debug('Recording ended')
if self._deleteFile: if self._deleteFile:
self.__deleteFile()
def __deleteFile(self):
"""
Cleanup the recording file.
"""
try: try:
os.remove(self._fn) os.remove(self._fn)
except Exception as e: except Exception as e:
print(f'Error deleting file: {self._fn}') logging.debug(f'Error deleting file: {self._fn}')
def handleQueue(self):
"""
This method should be called to grab data from the input queue, which
is filled by the stream, and put it into a file. It should be called at
a regular interval to prevent overflowing of the queue. It is called
within the start() method of the recording, if block is set to True.
Otherwise, it should be called from its parent at regular intervals.
For example, in Qt this can be done using a QTimer.
def _aCallback(self, indata, outdata, aframe): """
if indata is None: while self.inq.qsize() > 0:
return msg, data = self.inq.get()
if msg == StreamMsg.streamData:
self.__addTimeData(data)
elif msg == StreamMsg.streamStarted:
pass
elif msg == StreamMsg.streamMetaData:
pass
else:
# An error occured, we do not remove the file, but we stop.
self.stop = True
curT = self._ablockno()*self.blocksize/self.samplerate def __addTimeData(self, indata):
"""
Called by handleQueue() and adds new time data to the storage file.
"""
# The current time that is recorded and stored into the file, without
# the new data
curT = self._ablockno*self.blocksize/self.samplerate
recstatus = RecordStatus( recstatus = RecordStatus(
curT = curT, curT=curT,
done = False) done=False)
if self._progressCallback is not None: if self._progressCallback is not None:
self._progressCallback(recstatus) self._progressCallback(recstatus)
@ -159,22 +229,22 @@ class Recording:
if self.rectime is not None and curT > self.rectime: if self.rectime is not None and curT > self.rectime:
# We are done! # We are done!
self._running <<= False
with self._running_cond:
self._running_cond.notify()
if self._progressCallback is not None: if self._progressCallback is not None:
recstatus.done = True recstatus.done = True
self._progressCallback(recstatus) self._progressCallback(recstatus)
self.stop = True
return return
self._ad.resize(self._ablockno()+1, axis=0) # Add the data to the file
self._ad[self._ablockno(), :, :] = indata self._ad.resize(self._ablockno+1, axis=0)
self._ad[self._ablockno, :, :] = indata
# Increase the block counter
self._ablockno += 1 self._ablockno += 1
def _vCallback(self, frame, framectr): # def _vCallback(self, frame, framectr):
self._video_frame_positions.append(self._ablockno()) # self._video_frame_positions.append(self._ablockno())
vframeno = self._vframeno # vframeno = self._vframeno
self._vd.resize(vframeno+1, axis=0) # self._vd.resize(vframeno+1, axis=0)
self._vd[vframeno, :, :] = frame # self._vd[vframeno, :, :] = frame
self._vframeno += 1 # self._vframeno += 1

View File

@ -6,11 +6,10 @@ Author: J.A. de Jong - ASCEE
Description: Signal generator code Description: Signal generator code
""" """
import multiprocessing as mp
import dataclasses import dataclasses
import logging import logging
import multiprocessing as mp
from typing import Tuple from typing import Tuple
import numpy as np import numpy as np
from .filter import PinkNoise from .filter import PinkNoise
@ -18,6 +17,7 @@ from .lasp_octavefilter import SosOctaveFilterBank, SosThirdOctaveFilterBank
from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner from .filter import OctaveBankDesigner, PinkNoise, ThirdOctaveBankDesigner
from .lasp_avstream import AvStream, AvType from .lasp_avstream import AvStream, AvType
from .wrappers import Siggen as pyxSiggen, Equalizer from .wrappers import Siggen as pyxSiggen, Equalizer
from enum import Enum, unique, auto
QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering
# of data, larger is more stable, but also enlarges latency # of data, larger is more stable, but also enlarges latency
@ -25,26 +25,30 @@ QUEUE_BUFFER_TIME = 0.3 # The amount of time used in the queues for buffering
__all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"] __all__ = ["SignalType", "NoiseType", "SiggenMessage", "SiggenData", "Siggen"]
class SignalType:
Periodic = 0 class SignalType(Enum):
Noise = 1 Periodic = auto()
Sweep = 2 Noise = auto()
Meas = 3 Sweep = auto()
Meas = auto()
class NoiseType: @unique
white = (0, "White noise") class NoiseType(Enum):
pink = (1, "Pink noise") white = "White noise"
types = (white, pink) pink = "Pink noise"
def __str__(self):
return self.value
@staticmethod @staticmethod
def fillComboBox(combo): def fillComboBox(combo):
for type_ in NoiseType.types: for type_ in list(NoiseType):
combo.addItem(type_[1]) combo.addItem(str(type_))
@staticmethod @staticmethod
def getCurrent(cb): def getCurrent(cb):
return NoiseType.types[cb.currentIndex()] return list(NoiseType)[cb.currentIndex()]
class SiggenWorkerDone(Exception): class SiggenWorkerDone(Exception):
@ -52,21 +56,22 @@ class SiggenWorkerDone(Exception):
return "Done generating signal" return "Done generating signal"
class SiggenMessage: @unique
class SiggenMessage(Enum):
""" """
Different messages that can be send to the signal generator over the pipe Different messages that can be send to the signal generator over the pipe
connection. connection.
""" """
stop = 0 # Stop and quit the signal generator stop = auto() # Stop and quit the signal generator
generate = 1 generate = auto()
adjustVolume = 2 # Adjust the volume adjustVolume = auto() # Adjust the volume
newEqSettings = 3 # Forward new equalizer settings newEqSettings = auto() # Forward new equalizer settings
# These messages are send back to the main thread over the pipe # These messages are send back to the main thread over the pipe
ready = 4 ready = auto()
error = 5 error = auto()
done = 6 done = auto()
@dataclasses.dataclass @dataclasses.dataclass
@ -90,13 +95,16 @@ class SiggenData:
signaltypedata: Tuple = None signaltypedata: Tuple = None
def siggenFcn(siggendata: SiggenData, nblocks_buffer: int, def siggenFcn(siggendata: SiggenData, dataq: mp.Queue, pipe):
dataq: mp.Queue, pipe
):
""" """
Main function running in a different process, is responsible for generating Main function running in a different process, is responsible for generating
new signal data. Uses the signal queue to push new generated signal data new signal data. Uses the signal queue to push new generated signal data
on. on.
Args:
siggendata: The signal generator data to start with.
dataq: The queue to put generated signal on
pipe: Control and status messaging pipe
""" """
fs = siggendata.fs fs = siggendata.fs
nframes_per_block = siggendata.nframes_per_block nframes_per_block = siggendata.nframes_per_block
@ -106,6 +114,9 @@ def siggenFcn(siggendata: SiggenData, nblocks_buffer: int,
signaltype = siggendata.signaltype signaltype = siggendata.signaltype
signaltypedata = siggendata.signaltypedata signaltypedata = siggendata.signaltypedata
nblocks_buffer = max(
1, int(QUEUE_BUFFER_TIME * fs/ nframes_per_block)
)
def generate(siggen, eq): def generate(siggen, eq):
""" """
@ -188,19 +199,11 @@ def siggenFcn(siggendata: SiggenData, nblocks_buffer: int,
pipe.send((SiggenMessage.done, None)) pipe.send((SiggenMessage.done, None))
while True: while True:
if pipe.poll(timeout=QUEUE_BUFFER_TIME / 2):
msg, data = pipe.recv() msg, data = pipe.recv()
if msg == SiggenMessage.stop: if msg == SiggenMessage.stop:
logging.debug("Signal generator caught 'stop' message. Exiting.") logging.debug("Signal generator caught 'stop' message. Exiting.")
return 0 return 0
elif msg == SiggenMessage.generate:
# logging.debug(f"Signal generator caught 'generate' message")
try:
while dataq.qsize() < nblocks_buffer:
# Generate new data and put it in the queue!
generate(siggen, eq)
except SiggenWorkerDone:
pipe.send(SiggenMessage.done)
return 0
elif msg == SiggenMessage.adjustVolume: elif msg == SiggenMessage.adjustVolume:
logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS") logging.debug(f"Signal generator caught 'adjustVolume' message. New volume = {level_dB:.1f} dB FS")
level_dB = data level_dB = data
@ -212,6 +215,14 @@ def siggenFcn(siggendata: SiggenData, nblocks_buffer: int,
pipe.send( pipe.send(
SiggenMessage.error, "BUG: Generator caught unknown message. Quiting" SiggenMessage.error, "BUG: Generator caught unknown message. Quiting"
) )
elif dataq.qsize() < nblocks_buffer:
# Generate new data and put it in the queue!
try:
generate(siggen, eq)
except SiggenWorkerDone:
pipe.send(SiggenMessage.done)
return 0
return 1 return 1
@ -221,20 +232,14 @@ class Siggen:
unload the work in the calling thread. unload the work in the calling thread.
""" """
def __init__(self, stream: AvStream, siggendata: SiggenData): def __init__(self, dataq, siggendata: SiggenData):
"""""" """"""
self.stream = stream
self.nblocks_buffer = max(
1, int(QUEUE_BUFFER_TIME * stream.samplerate / stream.blocksize)
)
qs = [mp.Queue() for i in range(3)]
self.dataq = mp.Queue()
self.pipe, client_end = mp.Pipe(duplex=True) self.pipe, client_end = mp.Pipe(duplex=True)
self.process = mp.Process( self.process = mp.Process(
target=siggenFcn, target=siggenFcn,
args=(siggendata, self.nblocks_buffer, self.dataq, client_end), args=(siggendata, dataq, client_end),
) )
self.process.start() self.process.start()
@ -274,7 +279,6 @@ class Siggen:
if self.stopped: if self.stopped:
raise RuntimeError('BUG: This Siggen object cannot be used again.') raise RuntimeError('BUG: This Siggen object cannot be used again.')
self.stream.addCallback(self.streamCallback, AvType.audio_output)
self.handle_msgs() self.handle_msgs()
def stop(self): def stop(self):

View File

@ -1,77 +1,64 @@
#!/usr/bin/python3 #!/usr/bin/python3
import argparse import sys, logging, os, argparse
import sys logging.basicConfig(level=logging.DEBUG)
import multiprocessing
from lasp.lasp_multiprocessingpatch import apply_patch
parser = argparse.ArgumentParser(
description='Acquire data and store a measurement file'
)
parser.add_argument('filename', type=str,
help='File name to record to.'
' Extension is automatically added.')
parser.add_argument('--duration', '-d', type=float,
help='The recording duration in [s]')
device_help = 'DAQ Device to record from'
parser.add_argument('--input-daq', '-i', help=device_help, type=str,
default='Default')
args = parser.parse_args()
from lasp.device import Daq, DaqChannel, DaqConfigurations
from lasp.lasp_avstream import AvStream, AvType from lasp.lasp_avstream import AvStream, AvType
from lasp.lasp_record import Recording from lasp.lasp_record import Recording
from lasp.device import DaqConfiguration, Daq, DaqChannel
configs = DaqConfiguration.loadConfigs() if __name__ == '__main__':
multiprocessing.set_start_method('forkserver', force=True)
apply_patch()
for i, (key, val) in enumerate(configs.items()): parser = argparse.ArgumentParser(
description='Acquire data and store a measurement file'
)
parser.add_argument('filename', type=str,
help='File name to record to.'
' Extension is automatically added.')
parser.add_argument('--duration', '-d', type=float,
help='The recording duration in [s]')
device_help = 'DAQ Device to record from'
parser.add_argument('--input-daq', '-i', help=device_help, type=str,
default='Default')
args = parser.parse_args()
configs = DaqConfigurations.loadConfigs()
config_keys = [key for key in configs.keys()]
for i, key in enumerate(config_keys):
print(f'{i:2} : {key}') print(f'{i:2} : {key}')
daqindex = input('Please enter required config: ') choosen_index = input('Number of configuration to use: ')
try: try:
daqindex = int(daqindex) daqindex = int(choosen_index)
except: except:
sys.exit(0) sys.exit(0)
for i, (key, val) in enumerate(configs.items()): choosen_key = config_keys[daqindex]
if i == daqindex: config = configs[choosen_key].input_config
config = configs[key]
config.__reduce__()
config = configs[key] print(f'Choosen configuration: {choosen_key}')
try:
stream = AvStream(
print(config)
# daq = RtAudio()
devices = Daq.getDeviceInfo()
input_devices = {}
for device in devices:
if device.inputchannels >= 0:
input_devices[device.name] = device
try:
input_device = input_devices[config.input_device_name]
except KeyError:
raise RuntimeError(f'Input device {config.input_device_name} not available')
print(input_device)
stream = AvStream(input_device,
AvType.audio_input, AvType.audio_input,
config) config)
# stream.start()
rec = Recording(args.filename, stream, args.duration)
rec = Recording(args.filename, stream, args.duration) # input('Stream started, press any key to start record')
stream.start() finally:
with rec: try:
stream.cleanup()
del stream
except NameError:
pass pass
print('Stopping stream...')
stream.stop()
print('Stream stopped')
print('Closing stream...')
print('Stream closed')