From 57fe4e6b7c894f892b999bd0a1052b7d41ba72f2 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Sat, 8 May 2021 15:06:11 +0200 Subject: [PATCH] Playing a sine wave works, recording seems to work, although we have not checked what is actually recorded. For that we now switch to ACME! --- lasp/lasp_avstream.py | 101 ++++++++++++------ lasp/lasp_common.py | 3 +- lasp/lasp_measurement.py | 4 +- lasp/lasp_record.py | 219 +++++++++++++++++++++++---------------- scripts/lasp_record | 100 +++++++++--------- scripts/play_sine | 43 +++++--- 6 files changed, 278 insertions(+), 192 deletions(-) diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index d43fe96..0bc5d73 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -5,13 +5,14 @@ Author: J.A. de Jong Description: Controlling an audio stream in a different process. """ #import cv2 as cv +from .lasp_multiprocessingpatch import apply_patch +apply_patch() + import multiprocessing as mp import time, logging, signal import numpy as np from enum import unique, Enum, auto from dataclasses import dataclass -from .lasp_multiprocessingpatch import apply_patch -apply_patch() from .lasp_atomic import Atomic from .lasp_common import AvType @@ -127,6 +128,7 @@ class AudioStream: blocksize = self.daq.nFramesPerBlock, dtype = self.daq.getNumpyDataType() ) + self.running <<= True def streamCallback(self, indata, outdata, nframes): @@ -134,7 +136,12 @@ class AudioStream: This is called (from a separate thread) for each block of audio data. """ - return self.processCallback(self, indata, outdata) + if not self.running(): + return 1 + rv = self.processCallback(self, indata, outdata) + if rv != 0: + self.running <<= False + return rv def stop(self): """ @@ -187,7 +194,7 @@ class AvStreamProcess(mp.Process): while True: msg, data = self.pipe.recv() - logging.debug(f'Obtained message {msg}') + logging.debug(f'Streamprocess obtained message {msg}') if msg == StreamMsg.activateSiggen: self.siggen_activated <<= True @@ -207,12 +214,14 @@ class AvStreamProcess(mp.Process): return elif msg == StreamMsg.getStreamMetaData: - avtype = data + avtype, = data stream = self.streams[avtype] if stream is not None: - self.sendPipe(StreamMsg.streamMetaData, avtype, stream.streammetadata) + self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype, + stream.streammetadata) else: - self.sendPipe(StreamMsg.streamMetaData, avtype, None) + self.sendPipeAndAllQueues(StreamMsg.streamMetaData, avtype, + None) elif msg == StreamMsg.startStream: avtype, daqconfig = data @@ -254,7 +263,8 @@ class AvStreamProcess(mp.Process): stream.stop() self.sendPipeAndAllQueues(StreamMsg.streamStopped, stream.avtype) except Exception as e: - self.sendPipeAndAllQueues(StreamMsg.streamError, stream.avtype, "Error occured in stopping stream: {str(e)}") + self.sendPipeAndAllQueues(StreamMsg.streamError, stream.avtype, + "Error occured in stopping stream: {str(e)}") self.streams[avtype] = None @@ -301,7 +311,7 @@ class AvStreamProcess(mp.Process): else: avtype = (avtype,) for t in avtype: - if self.streams[t] is not None: + if self.streams[t] is not None and self.streams[t].running(): return True return False @@ -321,27 +331,34 @@ class AvStreamProcess(mp.Process): def streamCallback(self, audiostream, indata, outdata): """This is called (from a separate thread) for each audio block.""" # logging.debug('streamCallback()') - - if self.siggen_activated: - # logging.debug('siggen_activated') - if self.outq.empty(): - outdata[:, :] = 0 - msgtxt = 'Output signal buffer underflow' - self.sendPipeAndAllQueues(StreamMsg.streamError, - audiostream.avtype, - msgtxt) - else: - newdata = self.outq.get() - if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: - self.sendPipeAndAllQueues(StreamMsg.streamFatalError, + if outdata is not None: + if self.siggen_activated(): + if not self.outq.empty(): + newdata = self.outq.get() + if newdata.shape[0] != outdata.shape[0] or newdata.ndim != 1: + msgtxt = 'Invalid output data obtained from queue' + logging.fatal(msgtxt) + self.sendPipeAndAllQueues(StreamMsg.streamFatalError, + audiostream.avtype, + msgtxt + ) + return 1 + outdata[:, :] = newdata[:, None] + else: + outdata[:, :] = 0 + msgtxt = 'Output signal buffer underflow' + logging.error(msgtxt) + self.sendPipeAndAllQueues(StreamMsg.streamError, audiostream.avtype, - 'Invalid output data obtained from queue') - return 1 - outdata[:, :] = newdata[:, None] + msgtxt) + + # Siggen not activated + else: + logging.debug('siggen not activated') + outdata[:, :] = 0 if indata is not None: - self.putAllInQueues(StreamMsg.streamData, audiostream.avtype, - indata) + self.putAllInQueues(StreamMsg.streamData, indata) return 0 @@ -419,10 +436,13 @@ class StreamManager: Handle messages that are still on the pipe. """ + # logging.debug('StreamManager::handleMessages()') while self.pipe.poll(): msg, data = self.pipe.recv() + logging.debug(f'StreamManager obtained message {msg}') if msg == StreamMsg.streamStarted: avtype, streammetadata = data + # logging.debug(f'{avtype}, {streammetadata}') self.streamstatus[avtype].lastStatus = msg self.streamstatus[avtype].errorTxt = None self.streamstatus[avtype].streammetadata = streammetadata @@ -439,6 +459,11 @@ class StreamManager: self.streamstatus[avtype].lastStatus = msg self.streamstatus[avtype].errorTxt = None + elif msg == StreamMsg.streamFatalError: + avtype, errorTxt = data + logging.critical(f'Streamprocess fatal error: {errorTxt}') + self.cleanup() + elif msg == StreamMsg.streamMetaData: avtype, metadata = data self.streamstatus[avtype].streammetadata = metadata @@ -454,10 +479,10 @@ class StreamManager: def getStreamStatus(self, avtype: AvType): """ - Returns the current stream Status. + Sends a request for the stream status over the pipe, for given AvType """ self.handleMessages() - return self.streamstatus[avtype] + self.sendPipe(StreamMsg.getStreamMetaData, avtype) def getOutputQueue(self): """ @@ -465,6 +490,7 @@ class StreamManager: Note, should (of course) only be used by one signal generator at the time! """ + self.handleMessages() return self.outq def activateSiggen(self): @@ -504,15 +530,28 @@ class StreamManager: """Returns the current number of installed listeners.""" return len(self.in_qlist) - def startStream(self, avtype: AvType, daqconfig: DaqConfiguration): + def startStream(self, avtype: AvType, daqconfig: DaqConfiguration, + wait=False): """ Start the stream, which means the callbacks are called with stream data (audio/video) + Args: + wait: Wait until the stream starts talking before returning from + this function. + """ logging.debug('Starting stream...') - self.sendPipe(StreamMsg.startStream, avtype, daqconfig) self.handleMessages() + self.sendPipe(StreamMsg.startStream, avtype, daqconfig) + if wait: + # Wait for a message to come into the pipe + while True: + if self.pipe.poll(): + self.handleMessages() + if self.streamstatus[avtype].lastStatus != StreamMsg.streamStopped: + break + def stopStream(self, avtype: AvType): self.handleMessages() diff --git a/lasp/lasp_common.py b/lasp/lasp_common.py index 4ad5ca8..4485d3a 100644 --- a/lasp/lasp_common.py +++ b/lasp/lasp_common.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import os -import platform +import os, platform import shelve import sys import appdirs diff --git a/lasp/lasp_measurement.py b/lasp/lasp_measurement.py index 1e86a24..5f96d0d 100644 --- a/lasp/lasp_measurement.py +++ b/lasp/lasp_measurement.py @@ -48,8 +48,6 @@ import os, time, wave, logging from .lasp_common import SIQtys, Qty, getFreq from .device import DaqChannel from .wrappers import AvPowerSpectra, Window, PowerSpectra -logger = logging.Logger(__name__) - def getSampWidth(dtype): @@ -242,7 +240,7 @@ class Measurement: except KeyError: # If quantity data is not available, this is an 'old' # measurement file. - logger.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}') + logging.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}') self._qtys = [SIQtys.default for i in range(self.nchannels)] def setAttribute(self, atrname, value): diff --git a/lasp/lasp_record.py b/lasp/lasp_record.py index 7062055..3974f95 100644 --- a/lasp/lasp_record.py +++ b/lasp/lasp_record.py @@ -1,14 +1,10 @@ -#!/usr/bin/python3 +#!/usr/bin/python3.8 # -*- coding: utf-8 -*- """ Read data from stream and record sound and video at the same time """ -import dataclasses -import logging -import os -import time -import h5py -from .lasp_avstream import StreamMsg, StreamManager +import dataclasses, logging, os, time, h5py +from .lasp_avstream import StreamManager, StreamMetaData, StreamMsg from .lasp_common import AvType @@ -40,7 +36,8 @@ class Recording: if ext not in fn: fn += ext - self._stream = stream + self.smgr = streammgr + self.metadata = None self.rectime = rectime self._fn = fn @@ -61,66 +58,26 @@ class Recording: # when a recording is canceled. self._deleteFile = False - f = self._f - nchannels = len(stream.input_channel_names) - - # Input queue - self.inq = stream.addListener() - - # Start the stream, if it is not running try: - if not stream.isRunning(): - metadata = stream.start() - else: - metadata = stream.getStreamMetaData() - except: + # Input queue + self.inq = streammgr.addListener() + + except RuntimeError: # Cleanup stuff, something is going wrong when starting the stream try: - f.close() - except: - pass + self._f.close() + except Exception as e: + logging.error( + 'Error preliminary closing measurement file {fn}: {str(e)}') + self.__deleteFile() - self.blocksize = metadata['blocksize'] - self.samplerate = metadata['samplerate'] - self.dtype = metadata['dtype'] + raise - # The 'Audio' dataset as specified in lasp_measurement, where data is - # send to. We use gzip as compression, this gives moderate a moderate - # compression to the data. - self._ad = f.create_dataset('audio', - (1, self.blocksize, nchannels), - dtype=self.dtype, - maxshape=( - None, # This means, we can add blocks - # indefinitely - self.blocksize, - nchannels), - compression='gzip' - ) + # Try to obtain stream metadata + streammgr.getStreamStatus(AvType.audio_input) + streammgr.getStreamStatus(AvType.audio_duplex) - # TODO: This piece of code is not up-to-date and should be changed at a - # later instance once we really want to record video simultaneously - # with audio. - if stream.hasVideo(): - video_x, video_y = stream.video_x, stream.video_y - self._vd = f.create_dataset('video', - (1, video_y, video_x, 3), - dtype='uint8', - maxshape=( - None, video_y, video_x, 3), - compression='gzip' - ) - - # Set the bunch of attributes - f.attrs['samplerate'] = self.samplerate - f.attrs['nchannels'] = nchannels - f.attrs['blocksize'] = self.blocksize - f.attrs['sensitivity'] = stream.input_sensitivity - f.attrs['channel_names'] = stream.input_channel_names - f.attrs['time'] = time.time() - - # Measured quantities - f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys] + self._ad = None logging.debug('Starting record....') # TODO: Fix this later when we want video @@ -139,6 +96,92 @@ class Recording: finally: self.finish() + def handleQueue(self): + """ + This method should be called to grab data from the input queue, which + is filled by the stream, and put it into a file. It should be called at + a regular interval to prevent overflowing of the queue. It is called + within the start() method of the recording, if block is set to True. + Otherwise, it should be called from its parent at regular intervals. + For example, in Qt this can be done using a QTimer. + + + """ + while self.inq.qsize() > 0: + msg, data = self.inq.get() + if msg == StreamMsg.streamData: + samples, = data + self.__addTimeData(samples) + elif msg == StreamMsg.streamStarted: + logging.debug(f'handleQueue obtained message {msg}') + avtype, metadata = data + if metadata is None: + raise RuntimeError('BUG: no stream metadata') + self.processStreamMetaData(metadata) + elif msg == StreamMsg.streamMetaData: + logging.debug(f'handleQueue obtained message {msg}') + avtype, metadata = data + if metadata is not None: + self.processStreamMetaData(metadata) + else: + logging.debug(f'handleQueue obtained message {msg}') + # An error occured, we do not remove the file, but we stop. + self.stop = True + logging.debug(f'Stream message: {msg}. Recording stopped unexpectedly') + raise RuntimeError('Recording stopped unexpectedly') + + + def processStreamMetaData(self, md: StreamMetaData): + """ + Stream metadata has been catched. This is used to set all metadata in + the measurement file + + """ + logging.debug('Recording::processStreamMetaData()') + # The 'Audio' dataset as specified in lasp_measurement, where data is + # send to. We use gzip as compression, this gives moderate a moderate + # compression to the data. + f = self._f + blocksize = md.blocksize + nchannels = len(md.in_ch) + self._ad = f.create_dataset('audio', + (1, blocksize, nchannels), + dtype=md.dtype, + maxshape=( + None, # This means, we can add blocks + # indefinitely + blocksize, + nchannels), + compression='gzip' + ) + + # TODO: This piece of code is not up-to-date and should be changed at a + # later instance once we really want to record video simultaneously + # with audio. + # if smgr.hasVideo(): + # video_x, video_y = smgr.video_x, smgr.video_y + # self._vd = f.create_dataset('video', + # (1, video_y, video_x, 3), + # dtype='uint8', + # maxshape=( + # None, video_y, video_x, 3), + # compression='gzip' + # ) + + # Set the bunch of attributes + f.attrs['samplerate'] = md.fs + f.attrs['nchannels'] = nchannels + f.attrs['blocksize'] = blocksize + f.attrs['sensitivity'] = [ch.sensitivity for ch in md.in_ch] + f.attrs['channel_names'] = [ch.channel_name for ch in md.in_ch] + f.attrs['time'] = time.time() + self.blocksize = blocksize + self.fs = md.fs + + # Measured physical quantity metadata + f.attrs['qtys'] = [ch.qty.to_json() for ch in md.in_ch] + self.metadata = md + def setDelete(self, val: bool): """ Set the delete flag. If set, measurement file is deleted at the end of @@ -153,16 +196,18 @@ class Recording: remove the queue from the stream, etc. """ - stream = self._stream + logging.debug('Recording::finish()') + smgr = self.smgr + # TODO: Fix when video - # if stream.hasVideo(): - # stream.removeCallback(self._vCallback, AvType.video_input) + # if smgr.hasVideo(): + # smgr.removeCallback(self._vCallback, AvType.video_input) # self._f['video_frame_positions'] = self._video_frame_positions try: - stream.removeListener(self.inq) + smgr.removeListener(self.inq) except Exception as e: - logging.error(f'Could not remove queue from stream: {e}') + logging.error(f'Could not remove queue from smgr: {e}') try: # Close the recording file @@ -181,39 +226,29 @@ class Recording: try: os.remove(self._fn) except Exception as e: - logging.debug(f'Error deleting file: {self._fn}') - - def handleQueue(self): - """ - This method should be called to grab data from the input queue, which - is filled by the stream, and put it into a file. It should be called at - a regular interval to prevent overflowing of the queue. It is called - within the start() method of the recording, if block is set to True. - Otherwise, it should be called from its parent at regular intervals. - For example, in Qt this can be done using a QTimer. - - - """ - while self.inq.qsize() > 0: - msg, data = self.inq.get() - if msg == StreamMsg.streamData: - self.__addTimeData(data) - elif msg == StreamMsg.streamStarted: - pass - elif msg == StreamMsg.streamMetaData: - pass - else: - # An error occured, we do not remove the file, but we stop. - self.stop = True + logging.error(f'Error deleting file: {self._fn}') def __addTimeData(self, indata): """ Called by handleQueue() and adds new time data to the storage file. """ + # logging.debug('Recording::__addTimeData()') + + if self.stop: + # Stop flag is raised. We stop recording here. + return # The current time that is recorded and stored into the file, without # the new data - curT = self._ablockno*self.blocksize/self.samplerate + if not self.metadata: + # We obtained stream data, but metadata is not yet available. + # Therefore, we request it explicitly and then we return + logging.info('Requesting stream metadata') + self.smgr.getStreamStatus(AvType.audio_input) + self.smgr.getStreamStatus(AvType.audio_duplex) + return + + curT = self._ablockno*self.blocksize/self.fs recstatus = RecordStatus( curT=curT, done=False) diff --git a/scripts/lasp_record b/scripts/lasp_record index c2fa06e..220603e 100755 --- a/scripts/lasp_record +++ b/scripts/lasp_record @@ -1,61 +1,67 @@ -#!/usr/bin/python3 +#!/usr/bin/python3.8 import sys, logging, os, argparse -logging.basicConfig(level=logging.DEBUG) +FORMAT = "[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s" +parser = argparse.ArgumentParser( + description='Acquire data and store to a measurement file.' +) +parser.add_argument('filename', type=str, + help='File name to record to.' + ' Extension is automatically added.') +parser.add_argument('--duration', '-d', type=float, + help='The recording duration in [s]') + +device_help = 'DAQ Device to record from' +parser.add_argument('--input-daq', '-i', help=device_help, type=str, + default='Default') +parser.add_argument('--log', '-l', + help='Specify log level [info, debug, warning, ...]', + type=str, default='info') + + +args = parser.parse_args() +numeric_level = getattr(logging, args.log.upper(), None) +if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % args.loglevel) +logging.basicConfig(format=FORMAT, level=numeric_level) + import multiprocessing -from lasp.lasp_multiprocessingpatch import apply_patch - from lasp.device import DaqConfigurations -from lasp.lasp_avstream import AvStream, AvType -from lasp.lasp_record import Recording +from lasp import AvType, StreamManager, Recording# configureLogging -if __name__ == '__main__': - multiprocessing.set_start_method('forkserver', force=True) - apply_patch() - - parser = argparse.ArgumentParser( - description='Acquire data and store a measurement file' - ) - parser.add_argument('filename', type=str, - help='File name to record to.' - ' Extension is automatically added.') - parser.add_argument('--duration', '-d', type=float, - help='The recording duration in [s]') - - device_help = 'DAQ Device to record from' - parser.add_argument('--input-daq', '-i', help=device_help, type=str, - default='Default') - - args = parser.parse_args() - - - configs = DaqConfigurations.loadConfigs() - - config_keys = [key for key in configs.keys()] - for i, key in enumerate(config_keys): - print(f'{i:2} : {key}') - - choosen_index = input('Number of configuration to use: ') +def main(args): try: - daqindex = int(choosen_index) - except: - sys.exit(0) + streammgr = StreamManager() + configs = DaqConfigurations.loadConfigs() - choosen_key = config_keys[daqindex] - config = configs[choosen_key].input_config + config_keys = [key for key in configs.keys()] + for i, key in enumerate(config_keys): + print(f'{i:2} : {key}') - print(f'Choosen configuration: {choosen_key}') + choosen_index = input('Number of configuration to use: ') + try: + daqindex = int(choosen_index) + except: + print('Invalid configuration number. Exiting.') + sys.exit(0) - try: - stream = AvStream( - AvType.audio_input, - config) - # stream.start() - rec = Recording(args.filename, stream, args.duration) - # input('Stream started, press any key to start record') + choosen_key = config_keys[daqindex] + config = configs[choosen_key].input_config + + print(f'Choosen configuration: {choosen_key}') + + streammgr.startStream(AvType.audio_input, config, wait=True) + rec = Recording(args.filename, streammgr, args.duration) + + streammgr.stopStream(AvType.audio_output) finally: try: - stream.cleanup() + streammgr.cleanup() del stream except NameError: pass +if __name__ == '__main__': + + multiprocessing.set_start_method('forkserver', force=True) + + main(args) diff --git a/scripts/play_sine b/scripts/play_sine index 41f3d99..227ef37 100755 --- a/scripts/play_sine +++ b/scripts/play_sine @@ -1,27 +1,35 @@ #!/usr/bin/python3 -import argparse import numpy as np import sys, logging, os, argparse -logging.basicConfig(level=logging.DEBUG) + +parser = argparse.ArgumentParser( + description='Play a sine wave' +) + +parser.add_argument('--freq', '-f', help='Sine frequency [Hz]', type=float, + default=1000.) + +parser.add_argument('--log', '-l', + help='Specify log level [info, debug, warning, ...]', + type=str, default='info') + +args = parser.parse_args() + +numeric_level = getattr(logging, args.log.upper(), None) +if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % args.loglevel) + +FORMAT = "[%(levelname)s %(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s" +logging.basicConfig(format=FORMAT, level=numeric_level) + import multiprocessing -from lasp.lasp_multiprocessingpatch import apply_patch -from lasp.lasp_avstream import StreamManager -from lasp.lasp_common import AvType -from lasp.lasp_siggen import Siggen, SignalType, SiggenData +from lasp import (StreamManager, AvType, Siggen, SignalType, SiggenData) from lasp.device import DaqConfigurations if __name__ == '__main__': multiprocessing.set_start_method('forkserver', force=True) - parser = argparse.ArgumentParser( - description='Play a sine wave' - ) - device_help = 'DAQ Device to play to' - parser.add_argument('--device', '-d', help=device_help, type=str, - default='Default') - - args = parser.parse_args() - + logging.info(f'Playing frequency {args.freq} [Hz]') configs = DaqConfigurations.loadConfigs() @@ -33,6 +41,7 @@ if __name__ == '__main__': try: daqindex = int(choosen_index) except: + print('Invalid configuration number. Exiting.') sys.exit(0) @@ -47,12 +56,12 @@ if __name__ == '__main__': siggendata = SiggenData( fs=48e3, - nframes_per_block=2048, + nframes_per_block=1024, dtype=np.dtype(np.int16), eqdata=None, level_dB=-20, signaltype=SignalType.Periodic, - signaltypedata=(1000.,) + signaltypedata=(args.freq,) ) siggen = Siggen(outq, siggendata)