#!/usr/bin/python3.8 # -*- coding: utf-8 -*- """ Read data from stream and record sound and video at the same time """ import dataclasses, logging, os, time, h5py, threading import numpy as np from .lasp_atomic import Atomic from enum import Enum, auto, unique from .lasp_cpp import InDataHandler, StreamMgr from .lasp_version import LASP_VERSION_MAJOR, LASP_VERSION_MINOR import uuid import logging logger = logging.getLogger(__name__) # logger.setLevel(logging.DEBUG) logger.setLevel(logging.INFO) @dataclasses.dataclass class RecordStatus: curT: float = 0 done: bool = False class RecordingState(Enum): """Enumeration for the recording state""" Waiting = auto() Recording = auto() AllDataStored = auto() Finished = auto() Error = auto() class Recording: """ Class used to perform a recording. Recording data can come in from a different thread, that is supposed to call the `inCallback` method, with audio data as an argument. """ def __init__( self, fn: str, streammgr: StreamMgr, rectime: float = None, wait: bool = True, progressCallback=None, startDelay: float = 0, ): """ Start a recording. Blocks if wait is set to True. Args: fn: Filename to record to. Extension is automatically added if not provided. stream: AvStream instance to record from. Should have input channels! rectime: Recording time [s], None for infinite, in seconds. If set to None, or np.inf, the recording continues indefintely. progressCallback: callable that is called with an instance of RecordStatus instance as argument. startDelay: Optional delay added before the recording is *actually* started in [s]. """ logger.debug("__init__()") ext = ".h5" if ext not in fn: fn += ext if os.path.exists(fn): raise RuntimeError("Recording file name already exists / is in use") self._smgr = streammgr self._metadata = None self._recState = RecordingState.Waiting if startDelay < 0: raise RuntimeError("Invalid start delay value. Should be >= 0") self._startDelay = startDelay # The amount of seconds (float) that is to be recorded self._requiredRecordingLength = rectime # The file name to store data to self._fn = fn # Counter of the number of blocks that have been recorded self._recordedBlocks = 0 # Counter of the overall number of blocks that have passed (including # the blocks that passed during waiting prior to recording) self._allBlocks = 0 # Stop flag, set when recording is finished. self._stop = Atomic(False) # Mutex, on who is working with the H5py data and the class settings self._rec_mutex = threading.RLock() self._progressCallback = progressCallback try: # Open the file self._h5file = h5py.File(self._fn, "w", "stdio") self._h5file.flush() except Exception as e: logger.error(f"Error creating measurement file {e}") raise # This flag is used to delete the file on finish(), and can be used # when a recording is canceled. It is set to True at start, as the file will be deleted when no data is in it. self._deleteFile = True # Try to obtain stream metadata streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input) if not streamstatus.runningOK(): raise RuntimeError("Stream is not running properly. Cannot start recording") # Audio dataset self._ad = None logger.debug("Starting record....") # In the PyInDataHandler, a weak reference is stored to the python # methods reset and incallback. One way or another, the weak ref is gone # on the callback thread. If we store an "extra" ref to this method over # here, the weak ref stays alive. We do not know whether this is a bug # or a feature, but in any case storing this extra ref to inCallback # solves the problem. self._incalback_cpy = self.inCallback self._indataHandler = InDataHandler( streammgr, self._incalback_cpy, self.resetCallback ) if wait: logger.debug("Stop recording with CTRL-C") try: while not self._stop(): time.sleep(0.01) except KeyboardInterrupt: logger.debug("Keyboard interrupt on record") finally: self.finish() def curT(self): """Return currently recorded time as float""" def resetCallback(self, daq): """ Function called with initial stream data. """ logger.debug(f"resetCallback({daq})") with self._rec_mutex: in_ch = daq.enabledInChannels() blocksize = daq.framesPerBlock() self._blocksize = blocksize self._nchannels = daq.neninchannels() self._fs = daq.samplerate() f = self._h5file f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR # Set the bunch of attributes f.attrs["samplerate"] = daq.samplerate() f.attrs["nchannels"] = daq.neninchannels() f.attrs["blocksize"] = blocksize f.attrs["sensitivity"] = [ch.sensitivity for ch in in_ch] f.attrs["channelNames"] = [ch.name for ch in in_ch] f.attrs["UUID"] = str(uuid.uuid1()) # Add the start delay here, as firstFrames() is called right after the # constructor is called. time.time() returns a floating point # number of seconds after epoch. f.attrs["time"] = time.time() + self._startDelay # In V2, we do not store JSON metadata anymore, but just an enumeration # index to a physical quantity. f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch] # Measured physical quantity metadata # This was how it was in LASP version < 1.0 # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] f.flush() def inCallback(self, adata): """ This method is called when a block of audio data from the stream is available. It should return either True or False. When returning False, it will stop the stream. """ logger.debug(f"inCallback()") if self._stop(): logger.debug("Stop flag set, early return in inCallback") # Stop flag is raised. We do not add any data anymore. return False with self._rec_mutex: self._allBlocks += 1 match self._recState: case RecordingState.Waiting: if self._allBlocks * self._blocksize / self._fs > self._startDelay: self._recState = RecordingState.Recording case RecordingState.Recording: if self._ad is None: self.__addFirstFramesToFile(adata) else: self.__addTimeDataToFile(adata) # Increase the block counter self._recordedBlocks += 1 recstatus = RecordStatus(curT=self.recordedTime, done=False) if ( self._requiredRecordingLength is not None and self.recordedTime >= self._requiredRecordingLength ): self._recState = RecordingState.AllDataStored self._stop <<= True recstatus.done = True if self._progressCallback is not None: self._progressCallback(recstatus) case RecordingState.AllDataStored: return False case RecordingState.Finished: return False return True @property def recordedTime(self): """Return recorded time (not rounded) as float""" with self._rec_mutex: if self._ad is None: return 0.0 return self._recordedBlocks * self._blocksize / self._fs def __addFirstFramesToFile(self, adata): """ Set up the dataset in which to store the audio data. This will create the attribute `self.ad` and flip around the _deleteFile flag. Args: adata: Numpy array with data from DAQ """ with self._rec_mutex: # The array data type cannot # datatype = daq.dataType() dtype = np.dtype(adata.dtype) assert self._ad is None self._ad = self._h5file.create_dataset( "audio", (1, self._blocksize, self._nchannels), dtype=dtype, maxshape=( None, # This means, we can add blocks # indefinitely self._blocksize, self._nchannels, ), compression="gzip", ) self._ad[0, :, :] = adata self._h5file.flush() self._deleteFile = False def setDelete(self, val: bool): """ Set the delete flag. If set, measurement file is deleted at the end of the recording. Typically used for cleaning up after canceling a recording. """ with self._rec_mutex: self._deleteFile = val def finish(self): """ This method should be called to finish and a close a recording file, remove the queue from the stream, etc. """ logger.debug("Recording::finish()") self._stop <<= True with self._rec_mutex: if self._recState == RecordingState.Finished: raise RuntimeError("Recording has already finished") # Remove indata handler, which also should remove callback function # from StreamMgr. This, however does not have to happen # instantaneously. For which we have to implement extra mutex # guards in this class del self._indataHandler self._h5file.flush() # Remove handle to dataset otherwise the h5 file is not closed # properly. del self._ad try: # Close the recording file self._h5file.close() del self._h5file except Exception as e: logger.error(f"Error closing file: {e}") logger.debug("Recording ended") if self._deleteFile: self.__deleteFile() self._recState = RecordingState.Finished def __deleteFile(self): """ Cleanup the recording file. """ try: os.remove(self._fn) except Exception as e: logger.error(f"Error deleting file: {self._fn}: {str(e)}") def __addTimeDataToFile(self, indata): """ Called by handleQueue() and adds new time data to the storage file. """ with self._rec_mutex: ablockno = self._recordedBlocks # Add the data to the file, and resize the audio data blocks self._ad.resize(ablockno + 1, axis=0) self._ad[ablockno, :, :] = indata self._h5file.flush()