#!/usr/bin/python3.8 # -*- coding: utf-8 -*- """ Read data from stream and record sound and video at the same time """ import dataclasses, logging, os, time, h5py from .lasp_avstream import StreamManager, StreamMetaData, StreamMsg from .lasp_common import AvType @dataclasses.dataclass class RecordStatus: curT: float done: bool class Recording: def __init__(self, fn: str, streammgr: StreamManager, rectime: float = None, wait: bool = True, progressCallback=None): """ Start a recording. Blocks if wait is set to True. Args: fn: Filename to record to. Extension is automatically added if not provided. stream: AvStream instance to record from. Should have input channels! rectime: Recording time [s], None for infinite, in seconds. If set to None, or np.inf, the recording continues indefintely. progressCallback: callable that is called with an instance of RecordStatus instance as argument. """ ext = '.h5' if ext not in fn: fn += ext self.smgr = streammgr self.metadata = None self.rectime = rectime self._fn = fn self._video_frame_positions = [] self._curT_rounded_to_seconds = 0 # Counter of the number of blocks self._ablockno = 0 self._vframeno = 0 self._progressCallback = progressCallback self._wait = wait self._f = h5py.File(self._fn, 'w') # This flag is used to delete the file on finish(), and can be used # when a recording is canceled. self._deleteFile = False try: # Input queue self.inq = streammgr.addListener() except RuntimeError: # Cleanup stuff, something is going wrong when starting the stream try: self._f.close() except Exception as e: logging.error( 'Error preliminary closing measurement file {fn}: {str(e)}') self.__deleteFile() raise # Try to obtain stream metadata streammgr.getStreamStatus(AvType.audio_input) streammgr.getStreamStatus(AvType.audio_duplex) self._ad = None logging.debug('Starting record....') # TODO: Fix this later when we want video # if stream.hasVideo(): # stream.addCallback(self._aCallback, AvType.audio_input) self.stop = False if self._wait: logging.debug('Stop recording with CTRL-C') try: while not self.stop: self.handleQueue() time.sleep(0.01) except KeyboardInterrupt: logging.debug("Keyboard interrupt on record") finally: self.finish() def handleQueue(self): """ This method should be called to grab data from the input queue, which is filled by the stream, and put it into a file. It should be called at a regular interval to prevent overflowing of the queue. It is called within the start() method of the recording, if block is set to True. Otherwise, it should be called from its parent at regular intervals. For example, in Qt this can be done using a QTimer. """ while self.inq.qsize() > 0: msg, data = self.inq.get() if msg == StreamMsg.streamData: samples, = data self.__addTimeData(samples) elif msg == StreamMsg.streamStarted: logging.debug(f'handleQueue obtained message {msg}') avtype, metadata = data if metadata is None: raise RuntimeError('BUG: no stream metadata') self.processStreamMetaData(metadata) elif msg == StreamMsg.streamMetaData: logging.debug(f'handleQueue obtained message {msg}') avtype, metadata = data if metadata is not None: self.processStreamMetaData(metadata) else: logging.debug(f'handleQueue obtained message {msg}') # An error occured, we do not remove the file, but we stop. self.stop = True logging.debug(f'Stream message: {msg}. Recording stopped unexpectedly') raise RuntimeError('Recording stopped unexpectedly') def processStreamMetaData(self, md: StreamMetaData): """ Stream metadata has been catched. This is used to set all metadata in the measurement file """ logging.debug('Recording::processStreamMetaData()') # The 'Audio' dataset as specified in lasp_measurement, where data is # send to. We use gzip as compression, this gives moderate a moderate # compression to the data. f = self._f blocksize = md.blocksize nchannels = len(md.in_ch) self._ad = f.create_dataset('audio', (1, blocksize, nchannels), dtype=md.dtype, maxshape=( None, # This means, we can add blocks # indefinitely blocksize, nchannels), compression='gzip' ) # TODO: This piece of code is not up-to-date and should be changed at a # later instance once we really want to record video simultaneously # with audio. # if smgr.hasVideo(): # video_x, video_y = smgr.video_x, smgr.video_y # self._vd = f.create_dataset('video', # (1, video_y, video_x, 3), # dtype='uint8', # maxshape=( # None, video_y, video_x, 3), # compression='gzip' # ) # Set the bunch of attributes f.attrs['samplerate'] = md.fs f.attrs['nchannels'] = nchannels f.attrs['blocksize'] = blocksize f.attrs['sensitivity'] = [ch.sensitivity for ch in md.in_ch] f.attrs['channel_names'] = [ch.channel_name for ch in md.in_ch] f.attrs['time'] = time.time() self.blocksize = blocksize self.fs = md.fs # Measured physical quantity metadata f.attrs['qtys'] = [ch.qty.to_json() for ch in md.in_ch] self.metadata = md def setDelete(self, val: bool): """ Set the delete flag. If set, measurement file is deleted at the end of the recording. Typically used for cleaning up after canceling a recording. """ self._deleteFile = val def finish(self): """ This method should be called to finish and a close a recording file, remove the queue from the stream, etc. """ logging.debug('Recording::finish()') smgr = self.smgr # TODO: Fix when video # if smgr.hasVideo(): # smgr.removeCallback(self._vCallback, AvType.video_input) # self._f['video_frame_positions'] = self._video_frame_positions try: smgr.removeListener(self.inq) except Exception as e: logging.error(f'Could not remove queue from smgr: {e}') try: # Close the recording file self._f.close() except Exception as e: logging.error(f'Error closing file: {e}') logging.debug('Recording ended') if self._deleteFile: self.__deleteFile() def __deleteFile(self): """ Cleanup the recording file. """ try: os.remove(self._fn) except Exception as e: logging.error(f'Error deleting file: {self._fn}') def __addTimeData(self, indata): """ Called by handleQueue() and adds new time data to the storage file. """ # logging.debug('Recording::__addTimeData()') if self.stop: # Stop flag is raised. We stop recording here. return # The current time that is recorded and stored into the file, without # the new data if not self.metadata: # We obtained stream data, but metadata is not yet available. # Therefore, we request it explicitly and then we return logging.info('Requesting stream metadata') self.smgr.getStreamStatus(AvType.audio_input) self.smgr.getStreamStatus(AvType.audio_duplex) return curT = self._ablockno*self.blocksize/self.fs recstatus = RecordStatus( curT=curT, done=False) if self._progressCallback is not None: self._progressCallback(recstatus) curT_rounded_to_seconds = int(curT) if curT_rounded_to_seconds > self._curT_rounded_to_seconds: self._curT_rounded_to_seconds = curT_rounded_to_seconds print(f'{curT_rounded_to_seconds}', end='', flush=True) else: print('.', end='', flush=True) if self.rectime is not None and curT > self.rectime: # We are done! if self._progressCallback is not None: recstatus.done = True self._progressCallback(recstatus) self.stop = True return # Add the data to the file self._ad.resize(self._ablockno+1, axis=0) self._ad[self._ablockno, :, :] = indata # Increase the block counter self._ablockno += 1 # def _vCallback(self, frame, framectr): # self._video_frame_positions.append(self._ablockno()) # vframeno = self._vframeno # self._vd.resize(vframeno+1, axis=0) # self._vd[vframeno, :, :] = frame # self._vframeno += 1