diff --git a/python_src/lasp/lasp_atomic.py b/python_src/lasp/lasp_atomic.py index 6897683..c0096a9 100644 --- a/python_src/lasp/lasp_atomic.py +++ b/python_src/lasp/lasp_atomic.py @@ -33,7 +33,7 @@ class Atomic: def checkType(self, val): if not (type(val) == bool or type(val) == int): - raise RuntimeError("Invalid type for Atomic") + raise ValueError("Invalid type for Atomic") def __iadd__(self, toadd): self.checkType(toadd) diff --git a/python_src/lasp/lasp_record.py b/python_src/lasp/lasp_record.py index 07172a4..482c324 100644 --- a/python_src/lasp/lasp_record.py +++ b/python_src/lasp/lasp_record.py @@ -7,6 +7,7 @@ import dataclasses, logging, os, time, h5py, threading import numpy as np from .lasp_atomic import Atomic +from enum import Enum, auto, unique from .lasp_cpp import InDataHandler, StreamMgr from .lasp_version import LASP_VERSION_MAJOR, LASP_VERSION_MINOR import uuid @@ -18,6 +19,16 @@ class RecordStatus: done: bool = False +class RecordingState(Enum): + """Enumeration for the recording state""" + + Waiting = auto() + Recording = auto() + AllDataStored = auto() + Finished = auto() + Error = auto() + + class Recording: """ Class used to perform a recording. Recording data can come in from a @@ -53,83 +64,88 @@ class Recording: if ext not in fn: fn += ext - self.smgr = streammgr - self.metadata = None + self._smgr = streammgr + self._metadata = None + + self._recState = RecordingState.Waiting if startDelay < 0: raise RuntimeError("Invalid start delay value. Should be >= 0") - self.startDelay = startDelay - - # Flag used to indicate that we have passed the start delay - self.startDelay_passed = False + self._startDelay = startDelay # The amount of seconds (float) that is to be recorded - self.rectime = rectime + self._requiredRecordingLength = rectime # The file name to store data to - self.fn = fn + self._fn = fn - self.curT_rounded_to_seconds = 0 + # Counter of the number of blocks that have been recorded + self._recordedBlocks = 0 - # Counter of the number of blocks - self.ablockno = Atomic(0) + # Counter of the overall number of blocks that have passed (including + # the blocks that passed during waiting prior to recording) + self._allBlocks = 0 # Stop flag, set when recording is finished. - self.stop = Atomic(False) + self._stop = Atomic(False) - # Mutex, on who is working with the H5py data - self.file_mtx = threading.Lock() + # Mutex, on who is working with the H5py data and the class settings + self._rec_mutex = threading.Lock() - self.progressCallback = progressCallback + self._progressCallback = progressCallback try: # Open the file - self.f = h5py.File(self.fn, "w", "stdio") - self.f.flush() + self._h5file = h5py.File(self._fn, "w", "stdio") + self._h5file.flush() except Exception as e: logging.error(f"Error creating measurement file {e}") raise # This flag is used to delete the file on finish(), and can be used - # when a recording is canceled. - self.deleteFile = False + # when a recording is canceled. It is set to True at start, as the file will be deleted when no data is in it. + self._deleteFile = True # Try to obtain stream metadata streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input) if not streamstatus.runningOK(): - raise RuntimeError( - "Stream is not running properly. Please first start the stream" - ) + raise RuntimeError("Stream is not running properly. Cannot start recording") - self.ad = None + # Audio dataset + self._ad = None logging.debug("Starting record....") - self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) + self._indataHandler = InDataHandler( + streammgr, self.inCallback, self.resetCallback + ) if wait: logging.debug("Stop recording with CTRL-C") try: - while not self.stop(): + while not self._stop(): time.sleep(0.01) except KeyboardInterrupt: logging.debug("Keyboard interrupt on record") finally: self.finish() + def curT(self): + """Return currently recorded time as float""" + def resetCallback(self, daq): """ Function called with initial stream data. """ - with self.file_mtx: + with self._rec_mutex: in_ch = daq.enabledInChannels() blocksize = daq.framesPerBlock() - self.blocksize = blocksize - self.nchannels = daq.neninchannels() - self.fs = daq.samplerate() + self._blocksize = blocksize + self._nchannels = daq.neninchannels() + self._fs = daq.samplerate() - f = self.f + f = self._h5file f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR @@ -145,7 +161,7 @@ class Recording: # Add the start delay here, as firstFrames() is called right after the # constructor is called. time.time() returns a floating point # number of seconds after epoch. - f.attrs["time"] = time.time() + self.startDelay + f.attrs["time"] = time.time() + self._startDelay # In V2, we do not store JSON metadata anymore, but just an enumeration # index to a physical quantity. @@ -156,10 +172,69 @@ class Recording: # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] f.flush() - def firstFrames(self, adata): + def inCallback(self, adata): + """ + This method is called when a block of audio data from the stream is + available. It should return either True or False. + + When returning False, it will stop the stream. + + """ + if self._stop(): + logging.debug("Stop flag set, early return in inCallback") + # Stop flag is raised. We do not add any data anymore. + return True + + with self._rec_mutex: + + self._allBlocks += 1 + + match self._recState: + case RecordingState.Waiting: + if ( + self._allBlocks * self._blocksize / self._fs + > self._startDelay + ): + self._recState = RecordingState.Recording + + case RecordingState.Recording: + if self._ad is None: + self.__addFirstFramesToFile(adata) + else: + self.__addTimeDataToFile(adata) + + # Increase the block counter + self._recordedBlocks += 1 + + recstatus = RecordStatus(curT=self.recordedTime, done=False) + + if self.recordedTime >= self._requiredRecordingLength: + self._recState = RecordingState.AllDataStored + self._stop <<= True + recstatus.done = True + + if self._progressCallback is not None: + self._progressCallback(recstatus) + + case RecordingState.AllDataStored: + pass + + case RecordingState.Finished: + pass + + return True + + @property + def recordedTime(self): + """Return recorded time (not rounded) as float""" + if self._ad is None: + return 0.0 + return self._recordedBlocks * self._blocksize / self._fs + + def __addFirstFramesToFile(self, adata): """ Set up the dataset in which to store the audio data. This will create - the attribute `self.ad` + the attribute `self.ad` and flip around the _deleteFile flag. Args: adata: Numpy array with data from DAQ @@ -170,40 +245,24 @@ class Recording: # datatype = daq.dataType() dtype = np.dtype(adata.dtype) - self.ad = self.f.create_dataset( + assert self._ad is None + + self._ad = self._h5file.create_dataset( "audio", - (1, self.blocksize, self.nchannels), + (1, self._blocksize, self._nchannels), dtype=dtype, maxshape=( None, # This means, we can add blocks # indefinitely - self.blocksize, - self.nchannels, + self._blocksize, + self._nchannels, ), compression="gzip", ) - self.f.flush() + self._ad[0, :, :] = adata - def inCallback(self, adata): - """ - This method is called when a block of audio data from the stream is - available. It should return either True or False. - - When returning False, it will stop the stream. - - """ - if self.stop(): - logging.debug("Stop flag set, early return in inCallback") - # Stop flag is raised. We do not add any data anymore. - return True - - with self.file_mtx: - - if self.ad is None: - self.firstFrames(adata) - - self.__addTimeData(adata) - return True + self._h5file.flush() + self._deleteFile = False def setDelete(self, val: bool): """ @@ -211,8 +270,8 @@ class Recording: the recording. Typically used for cleaning up after canceling a recording. """ - with self.file_mtx: - self.deleteFile = val + with self._rec_mutex: + self._deleteFile = val def finish(self): """ @@ -222,86 +281,55 @@ class Recording: """ logging.debug("Recording::finish()") - self.stop <<= True + self._stop <<= True - with self.file_mtx: - self.f.flush() + + with self._rec_mutex: + if self._recState == RecordingState.Finished: + raise RuntimeError('Recording has already finished') + + self._h5file.flush() # Remove indata handler, which also should remove callback function # from StreamMgr. This, however does not have to happen # instantaneously. For which we have to implement extra mutex # guards in this class - del self.indh - self.indh = None + del self._indataHandler + self._indataHandler = None # Remove handle to dataset otherwise the h5 file is not closed # properly. - del self.ad - self.ad = None + del self._ad + self._ad = None try: # Close the recording file - self.f.close() - del self.f + self._h5file.close() + del self._h5file except Exception as e: logging.error(f"Error closing file: {e}") logging.debug("Recording ended") - if self.deleteFile: + if self._deleteFile: self.__deleteFile() + self._recState = RecordingState.Finished def __deleteFile(self): """ Cleanup the recording file. """ try: - os.remove(self.fn) + os.remove(self._fn) except Exception as e: - logging.error(f"Error deleting file: {self.fn}: {str(e)}") + logging.error(f"Error deleting file: {self._fn}: {str(e)}") - def __addTimeData(self, indata): + def __addTimeDataToFile(self, indata): """ Called by handleQueue() and adds new time data to the storage file. """ - # logging.debug('Recording::__addTimeData()') - curT = self.ablockno() * self.blocksize / self.fs - - # Increase the block counter - self.ablockno += 1 - - if curT < self.startDelay and not self.startDelay_passed: - # Start delay has not been passed - return - elif curT >= 0 and not self.startDelay_passed: - # Start delay passed, switch the flag! - self.startDelay_passed = True - - # Reset the audio block counter and the recording time - self.ablockno = Atomic(1) - curT = 0 - - ablockno = self.ablockno() - recstatus = RecordStatus(curT=curT, done=False) - - if self.progressCallback is not None: - self.progressCallback(recstatus) - - curT_rounded_to_seconds = int(curT) - if curT_rounded_to_seconds > self.curT_rounded_to_seconds: - self.curT_rounded_to_seconds = curT_rounded_to_seconds - print(f"{curT_rounded_to_seconds}", end="", flush=True) - else: - print(".", end="", flush=True) - - if self.rectime is not None and curT > self.rectime: - # We are done! - if self.progressCallback is not None: - recstatus.done = True - self.progressCallback(recstatus) - self.stop <<= True - return + ablockno = self._recordedBlocks # Add the data to the file, and resize the audio data blocks - self.ad.resize(ablockno, axis=0) - self.ad[ablockno - 1, :, :] = indata - self.f.flush() + self._ad.resize(ablockno + 1, axis=0) + self._ad[ablockno, :, :] = indata + self._h5file.flush()