Playing a sine wave works, recording seems to work, although we have not checked what is actually recorded. For that we now switch to ACME!

This commit is contained in:
Anne de Jong 2021-05-08 15:06:11 +02:00
parent b031dfb280
commit 57fe4e6b7c
6 changed files with 278 additions and 192 deletions

View File

@ -5,13 +5,14 @@ Author: J.A. de Jong
Description: Controlling an audio stream in a different process.
"""
#import cv2 as cv
from .lasp_multiprocessingpatch import apply_patch
apply_patch()
import multiprocessing as mp
import time, logging, signal
import numpy as np
from enum import unique, Enum, auto
from dataclasses import dataclass
from .lasp_multiprocessingpatch import apply_patch
apply_patch()
from .lasp_atomic import Atomic
from .lasp_common import AvType
@ -127,6 +128,7 @@ class AudioStream:
blocksize = self.daq.nFramesPerBlock,
dtype = self.daq.getNumpyDataType()
)
self.running <<= True
def streamCallback(self, indata, outdata, nframes):
@ -134,7 +136,12 @@ class AudioStream:
This is called (from a separate thread) for each block
of audio data.
"""
return self.processCallback(self, indata, outdata)
if not self.running():
return 1
rv = self.processCallback(self, indata, outdata)
if rv != 0:
self.running <<= False
return rv
def stop(self):
"""
@ -187,7 +194,7 @@ class AvStreamProcess(mp.Process):
while True:
msg, data = self.pipe.recv()
logging.debug(f'Obtained message {msg}')
logging.debug(f'Streamprocess obtained message {msg}')
if msg == StreamMsg.activateSiggen:
self.siggen_activated <<= True
@ -207,12 +214,14 @@ class AvStreamProcess(mp.Process):
return
elif msg == StreamMsg.getStreamMetaData:
avtype = data
avtype, = data
stream = self.streams[avtype]
if stream is not None:
self.sendPipe(StreamMsg.streamMetaData, avtype, stream.streammetadata)
self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype,
stream.streammetadata)
else:
self.sendPipe(StreamMsg.streamMetaData, avtype, None)
self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype,
None)
elif msg == StreamMsg.startStream:
avtype, daqconfig = data
@ -254,7 +263,8 @@ class AvStreamProcess(mp.Process):
stream.stop()
self.sendPipeAndAllQueues(StreamMsg.streamStopped, stream.avtype)
except Exception as e:
self.sendPipeAndAllQueues(StreamMsg.streamError, stream.avtype, "Error occured in stopping stream: {str(e)}")
self.sendPipeAndAllQueues(StreamMsg.streamError, stream.avtype,
"Error occured in stopping stream: {str(e)}")
self.streams[avtype] = None
@ -301,7 +311,7 @@ class AvStreamProcess(mp.Process):
else:
avtype = (avtype,)
for t in avtype:
if self.streams[t] is not None:
if self.streams[t] is not None and self.streams[t].running():
return True
return False
@ -321,27 +331,34 @@ class AvStreamProcess(mp.Process):
def streamCallback(self, audiostream, indata, outdata):
"""This is called (from a separate thread) for each audio block."""
# logging.debug('streamCallback()')
if self.siggen_activated:
# logging.debug('siggen_activated')
if self.outq.empty():
outdata[:, :] = 0
msgtxt = 'Output signal buffer underflow'
self.sendPipeAndAllQueues(StreamMsg.streamError,
audiostream.avtype,
msgtxt)
else:
newdata = self.outq.get()
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
self.sendPipeAndAllQueues(StreamMsg.streamFatalError,
if outdata is not None:
if self.siggen_activated():
if not self.outq.empty():
newdata = self.outq.get()
if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1:
msgtxt = 'Invalid output data obtained from queue'
logging.fatal(msgtxt)
self.sendPipeAndAllQueues(StreamMsg.streamFatalError,
audiostream.avtype,
msgtxt
)
return 1
outdata[:, :] = newdata[:, None]
else:
outdata[:, :] = 0
msgtxt = 'Output signal buffer underflow'
logging.error(msgtxt)
self.sendPipeAndAllQueues(StreamMsg.streamError,
audiostream.avtype,
'Invalid output data obtained from queue')
return 1
outdata[:, :] = newdata[:, None]
msgtxt)
# Siggen not activated
else:
logging.debug('siggen not activated')
outdata[:, :] = 0
if indata is not None:
self.putAllInQueues(StreamMsg.streamData, audiostream.avtype,
indata)
self.putAllInQueues(StreamMsg.streamData, indata)
return 0
@ -419,10 +436,13 @@ class StreamManager:
Handle messages that are still on the pipe.
"""
# logging.debug('StreamManager::handleMessages()')
while self.pipe.poll():
msg, data = self.pipe.recv()
logging.debug(f'StreamManager obtained message {msg}')
if msg == StreamMsg.streamStarted:
avtype, streammetadata = data
# logging.debug(f'{avtype}, {streammetadata}')
self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = None
self.streamstatus[avtype].streammetadata = streammetadata
@ -439,6 +459,11 @@ class StreamManager:
self.streamstatus[avtype].lastStatus = msg
self.streamstatus[avtype].errorTxt = None
elif msg == StreamMsg.streamFatalError:
avtype, errorTxt = data
logging.critical(f'Streamprocess fatal error: {errorTxt}')
self.cleanup()
elif msg == StreamMsg.streamMetaData:
avtype, metadata = data
self.streamstatus[avtype].streammetadata = metadata
@ -454,10 +479,10 @@ class StreamManager:
def getStreamStatus(self, avtype: AvType):
"""
Returns the current stream Status.
Sends a request for the stream status over the pipe, for given AvType
"""
self.handleMessages()
return self.streamstatus[avtype]
self.sendPipe(StreamMsg.getStreamMetaData, avtype)
def getOutputQueue(self):
"""
@ -465,6 +490,7 @@ class StreamManager:
Note, should (of course) only be used by one signal generator at the time!
"""
self.handleMessages()
return self.outq
def activateSiggen(self):
@ -504,15 +530,28 @@ class StreamManager:
"""Returns the current number of installed listeners."""
return len(self.in_qlist)
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration):
def startStream(self, avtype: AvType, daqconfig: DaqConfiguration,
wait=False):
"""
Start the stream, which means the callbacks are called with stream
data (audio/video)
Args:
wait: Wait until the stream starts talking before returning from
this function.
"""
logging.debug('Starting stream...')
self.sendPipe(StreamMsg.startStream, avtype, daqconfig)
self.handleMessages()
self.sendPipe(StreamMsg.startStream, avtype, daqconfig)
if wait:
# Wait for a message to come into the pipe
while True:
if self.pipe.poll():
self.handleMessages()
if self.streamstatus[avtype].lastStatus != StreamMsg.streamStopped:
break
def stopStream(self, avtype: AvType):
self.handleMessages()

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import platform
import os, platform
import shelve
import sys
import appdirs

View File

@ -48,8 +48,6 @@ import os, time, wave, logging
from .lasp_common import SIQtys, Qty, getFreq
from .device import DaqChannel
from .wrappers import AvPowerSpectra, Window, PowerSpectra
logger = logging.Logger(__name__)
def getSampWidth(dtype):
@ -242,7 +240,7 @@ class Measurement:
except KeyError:
# If quantity data is not available, this is an 'old'
# measurement file.
logger.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}')
logging.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}')
self._qtys = [SIQtys.default for i in range(self.nchannels)]
def setAttribute(self, atrname, value):

View File

@ -1,14 +1,10 @@
#!/usr/bin/python3
#!/usr/bin/python3.8
# -*- coding: utf-8 -*-
"""
Read data from stream and record sound and video at the same time
"""
import dataclasses
import logging
import os
import time
import h5py
from .lasp_avstream import StreamMsg, StreamManager
import dataclasses, logging, os, time, h5py
from .lasp_avstream import StreamManager, StreamMetaData, StreamMsg
from .lasp_common import AvType
@ -40,7 +36,8 @@ class Recording:
if ext not in fn:
fn += ext
self._stream = stream
self.smgr = streammgr
self.metadata = None
self.rectime = rectime
self._fn = fn
@ -61,66 +58,26 @@ class Recording:
# when a recording is canceled.
self._deleteFile = False
f = self._f
nchannels = len(stream.input_channel_names)
# Input queue
self.inq = stream.addListener()
# Start the stream, if it is not running
try:
if not stream.isRunning():
metadata = stream.start()
else:
metadata = stream.getStreamMetaData()
except:
# Input queue
self.inq = streammgr.addListener()
except RuntimeError:
# Cleanup stuff, something is going wrong when starting the stream
try:
f.close()
except:
pass
self._f.close()
except Exception as e:
logging.error(
'Error preliminary closing measurement file {fn}: {str(e)}')
self.__deleteFile()
self.blocksize = metadata['blocksize']
self.samplerate = metadata['samplerate']
self.dtype = metadata['dtype']
raise
# The 'Audio' dataset as specified in lasp_measurement, where data is
# send to. We use gzip as compression, this gives moderate a moderate
# compression to the data.
self._ad = f.create_dataset('audio',
(1, self.blocksize, nchannels),
dtype=self.dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
self.blocksize,
nchannels),
compression='gzip'
)
# Try to obtain stream metadata
streammgr.getStreamStatus(AvType.audio_input)
streammgr.getStreamStatus(AvType.audio_duplex)
# TODO: This piece of code is not up-to-date and should be changed at a
# later instance once we really want to record video simultaneously
# with audio.
if stream.hasVideo():
video_x, video_y = stream.video_x, stream.video_y
self._vd = f.create_dataset('video',
(1, video_y, video_x, 3),
dtype='uint8',
maxshape=(
None, video_y, video_x, 3),
compression='gzip'
)
# Set the bunch of attributes
f.attrs['samplerate'] = self.samplerate
f.attrs['nchannels'] = nchannels
f.attrs['blocksize'] = self.blocksize
f.attrs['sensitivity'] = stream.input_sensitivity
f.attrs['channel_names'] = stream.input_channel_names
f.attrs['time'] = time.time()
# Measured quantities
f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys]
self._ad = None
logging.debug('Starting record....')
# TODO: Fix this later when we want video
@ -139,6 +96,92 @@ class Recording:
finally:
self.finish()
def handleQueue(self):
"""
This method should be called to grab data from the input queue, which
is filled by the stream, and put it into a file. It should be called at
a regular interval to prevent overflowing of the queue. It is called
within the start() method of the recording, if block is set to True.
Otherwise, it should be called from its parent at regular intervals.
For example, in Qt this can be done using a QTimer.
"""
while self.inq.qsize() > 0:
msg, data = self.inq.get()
if msg == StreamMsg.streamData:
samples, = data
self.__addTimeData(samples)
elif msg == StreamMsg.streamStarted:
logging.debug(f'handleQueue obtained message {msg}')
avtype, metadata = data
if metadata is None:
raise RuntimeError('BUG: no stream metadata')
self.processStreamMetaData(metadata)
elif msg == StreamMsg.streamMetaData:
logging.debug(f'handleQueue obtained message {msg}')
avtype, metadata = data
if metadata is not None:
self.processStreamMetaData(metadata)
else:
logging.debug(f'handleQueue obtained message {msg}')
# An error occured, we do not remove the file, but we stop.
self.stop = True
logging.debug(f'Stream message: {msg}. Recording stopped unexpectedly')
raise RuntimeError('Recording stopped unexpectedly')
def processStreamMetaData(self, md: StreamMetaData):
"""
Stream metadata has been catched. This is used to set all metadata in
the measurement file
"""
logging.debug('Recording::processStreamMetaData()')
# The 'Audio' dataset as specified in lasp_measurement, where data is
# send to. We use gzip as compression, this gives moderate a moderate
# compression to the data.
f = self._f
blocksize = md.blocksize
nchannels = len(md.in_ch)
self._ad = f.create_dataset('audio',
(1, blocksize, nchannels),
dtype=md.dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
blocksize,
nchannels),
compression='gzip'
)
# TODO: This piece of code is not up-to-date and should be changed at a
# later instance once we really want to record video simultaneously
# with audio.
# if smgr.hasVideo():
# video_x, video_y = smgr.video_x, smgr.video_y
# self._vd = f.create_dataset('video',
# (1, video_y, video_x, 3),
# dtype='uint8',
# maxshape=(
# None, video_y, video_x, 3),
# compression='gzip'
# )
# Set the bunch of attributes
f.attrs['samplerate'] = md.fs
f.attrs['nchannels'] = nchannels
f.attrs['blocksize'] = blocksize
f.attrs['sensitivity'] = [ch.sensitivity for ch in md.in_ch]
f.attrs['channel_names'] = [ch.channel_name for ch in md.in_ch]
f.attrs['time'] = time.time()
self.blocksize = blocksize
self.fs = md.fs
# Measured physical quantity metadata
f.attrs['qtys'] = [ch.qty.to_json() for ch in md.in_ch]
self.metadata = md
def setDelete(self, val: bool):
"""
Set the delete flag. If set, measurement file is deleted at the end of
@ -153,16 +196,18 @@ class Recording:
remove the queue from the stream, etc.
"""
stream = self._stream
logging.debug('Recording::finish()')
smgr = self.smgr
# TODO: Fix when video
# if stream.hasVideo():
# stream.removeCallback(self._vCallback, AvType.video_input)
# if smgr.hasVideo():
# smgr.removeCallback(self._vCallback, AvType.video_input)
# self._f['video_frame_positions'] = self._video_frame_positions
try:
stream.removeListener(self.inq)
smgr.removeListener(self.inq)
except Exception as e:
logging.error(f'Could not remove queue from stream: {e}')
logging.error(f'Could not remove queue from smgr: {e}')
try:
# Close the recording file
@ -181,39 +226,29 @@ class Recording:
try:
os.remove(self._fn)
except Exception as e:
logging.debug(f'Error deleting file: {self._fn}')
def handleQueue(self):
"""
This method should be called to grab data from the input queue, which
is filled by the stream, and put it into a file. It should be called at
a regular interval to prevent overflowing of the queue. It is called
within the start() method of the recording, if block is set to True.
Otherwise, it should be called from its parent at regular intervals.
For example, in Qt this can be done using a QTimer.
"""
while self.inq.qsize() > 0:
msg, data = self.inq.get()
if msg == StreamMsg.streamData:
self.__addTimeData(data)
elif msg == StreamMsg.streamStarted:
pass
elif msg == StreamMsg.streamMetaData:
pass
else:
# An error occured, we do not remove the file, but we stop.
self.stop = True
logging.error(f'Error deleting file: {self._fn}')
def __addTimeData(self, indata):
"""
Called by handleQueue() and adds new time data to the storage file.
"""
# logging.debug('Recording::__addTimeData()')
if self.stop:
# Stop flag is raised. We stop recording here.
return
# The current time that is recorded and stored into the file, without
# the new data
curT = self._ablockno*self.blocksize/self.samplerate
if not self.metadata:
# We obtained stream data, but metadata is not yet available.
# Therefore, we request it explicitly and then we return
logging.info('Requesting stream metadata')
self.smgr.getStreamStatus(AvType.audio_input)
self.smgr.getStreamStatus(AvType.audio_duplex)
return
curT = self._ablockno*self.blocksize/self.fs
recstatus = RecordStatus(
curT=curT,
done=False)

View File

@ -1,61 +1,67 @@
#!/usr/bin/python3
#!/usr/bin/python3.8
import sys, logging, os, argparse
logging.basicConfig(level=logging.DEBUG)
FORMAT = "[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s"
parser = argparse.ArgumentParser(
description='Acquire data and store to a measurement file.'
)
parser.add_argument('filename', type=str,
help='File name to record to.'
' Extension is automatically added.')
parser.add_argument('--duration', '-d', type=float,
help='The recording duration in [s]')
device_help = 'DAQ Device to record from'
parser.add_argument('--input-daq', '-i', help=device_help, type=str,
default='Default')
parser.add_argument('--log', '-l',
help='Specify log level [info, debug, warning, ...]',
type=str, default='info')
args = parser.parse_args()
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % args.loglevel)
logging.basicConfig(format=FORMAT, level=numeric_level)
import multiprocessing
from lasp.lasp_multiprocessingpatch import apply_patch
from lasp.device import DaqConfigurations
from lasp.lasp_avstream import AvStream, AvType
from lasp.lasp_record import Recording
from lasp import AvType, StreamManager, Recording# configureLogging
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver', force=True)
apply_patch()
parser = argparse.ArgumentParser(
description='Acquire data and store a measurement file'
)
parser.add_argument('filename', type=str,
help='File name to record to.'
' Extension is automatically added.')
parser.add_argument('--duration', '-d', type=float,
help='The recording duration in [s]')
device_help = 'DAQ Device to record from'
parser.add_argument('--input-daq', '-i', help=device_help, type=str,
default='Default')
args = parser.parse_args()
configs = DaqConfigurations.loadConfigs()
config_keys = [key for key in configs.keys()]
for i, key in enumerate(config_keys):
print(f'{i:2} : {key}')
choosen_index = input('Number of configuration to use: ')
def main(args):
try:
daqindex = int(choosen_index)
except:
sys.exit(0)
streammgr = StreamManager()
configs = DaqConfigurations.loadConfigs()
choosen_key = config_keys[daqindex]
config = configs[choosen_key].input_config
config_keys = [key for key in configs.keys()]
for i, key in enumerate(config_keys):
print(f'{i:2} : {key}')
print(f'Choosen configuration: {choosen_key}')
choosen_index = input('Number of configuration to use: ')
try:
daqindex = int(choosen_index)
except:
print('Invalid configuration number. Exiting.')
sys.exit(0)
try:
stream = AvStream(
AvType.audio_input,
config)
# stream.start()
rec = Recording(args.filename, stream, args.duration)
# input('Stream started, press any key to start record')
choosen_key = config_keys[daqindex]
config = configs[choosen_key].input_config
print(f'Choosen configuration: {choosen_key}')
streammgr.startStream(AvType.audio_input, config, wait=True)
rec = Recording(args.filename, streammgr, args.duration)
streammgr.stopStream(AvType.audio_output)
finally:
try:
stream.cleanup()
streammgr.cleanup()
del stream
except NameError:
pass
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver', force=True)
main(args)

View File

@ -1,27 +1,35 @@
#!/usr/bin/python3
import argparse
import numpy as np
import sys, logging, os, argparse
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser(
description='Play a sine wave'
)
parser.add_argument('--freq', '-f', help='Sine frequency [Hz]', type=float,
default=1000.)
parser.add_argument('--log', '-l',
help='Specify log level [info, debug, warning, ...]',
type=str, default='info')
args = parser.parse_args()
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % args.loglevel)
FORMAT = "[%(levelname)s %(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s"
logging.basicConfig(format=FORMAT, level=numeric_level)
import multiprocessing
from lasp.lasp_multiprocessingpatch import apply_patch
from lasp.lasp_avstream import StreamManager
from lasp.lasp_common import AvType
from lasp.lasp_siggen import Siggen, SignalType, SiggenData
from lasp import (StreamManager, AvType, Siggen, SignalType, SiggenData)
from lasp.device import DaqConfigurations
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver', force=True)
parser = argparse.ArgumentParser(
description='Play a sine wave'
)
device_help = 'DAQ Device to play to'
parser.add_argument('--device', '-d', help=device_help, type=str,
default='Default')
args = parser.parse_args()
logging.info(f'Playing frequency {args.freq} [Hz]')
configs = DaqConfigurations.loadConfigs()
@ -33,6 +41,7 @@ if __name__ == '__main__':
try:
daqindex = int(choosen_index)
except:
print('Invalid configuration number. Exiting.')
sys.exit(0)
@ -47,12 +56,12 @@ if __name__ == '__main__':
siggendata = SiggenData(
fs=48e3,
nframes_per_block=2048,
nframes_per_block=1024,
dtype=np.dtype(np.int16),
eqdata=None,
level_dB=-20,
signaltype=SignalType.Periodic,
signaltypedata=(1000.,)
signaltypedata=(args.freq,)
)
siggen = Siggen(outq, siggendata)