diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 6c1253c..6b9c0ed 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -405,6 +405,7 @@ class AvStreamProcess(mp.Process): # Wrapper functions that safe some typing, they do not require an # explanation. def sendInQueues(self, msg, *data): + # logging.debug('sendInQueues()') for q in self.indata_qlist: # Fan out the input data to all queues in the queue list q.put((msg, data)) diff --git a/lasp/lasp_common.py b/lasp/lasp_common.py index 2377a39..f6003c9 100644 --- a/lasp/lasp_common.py +++ b/lasp/lasp_common.py @@ -49,14 +49,14 @@ class AvType(Enum): the stream.""" # Input stream - audio_input = auto() + audio_input = (0, 'input') # Output stream - audio_output = auto() + audio_output = (1, 'output') # Both input as well as output - audio_duplex = auto() - video = 4 + audio_duplex = (2, 'duplex') + # video = 4 @dataclass_json diff --git a/lasp/lasp_record.py b/lasp/lasp_record.py index 3974f95..6407e5a 100644 --- a/lasp/lasp_record.py +++ b/lasp/lasp_record.py @@ -15,10 +15,13 @@ class RecordStatus: class Recording: - + """ + Class used to perform a recording. + """ def __init__(self, fn: str, streammgr: StreamManager, rectime: float = None, wait: bool = True, - progressCallback=None): + progressCallback=None, + startDelay: float=0): """ Start a recording. Blocks if wait is set to True. @@ -31,6 +34,8 @@ class Recording: to None, or np.inf, the recording continues indefintely. progressCallback: callable that is called with an instance of RecordStatus instance as argument. + startDelay: Optional delay added before the recording is *actually* + started in [s]. """ ext = '.h5' if ext not in fn: @@ -39,33 +44,37 @@ class Recording: self.smgr = streammgr self.metadata = None - self.rectime = rectime - self._fn = fn + assert startDelay >= 0 + self.startDelay = startDelay + # Flag used to indicate that we have passed the start delay + self.startDelay_passed = False + self.rectime = rectime + self.fn = fn - self._video_frame_positions = [] - self._curT_rounded_to_seconds = 0 + self.video_frame_positions = [] + self.curT_rounded_to_seconds = 0 # Counter of the number of blocks - self._ablockno = 0 - self._vframeno = 0 + self.ablockno = 0 + self.vframeno = 0 - self._progressCallback = progressCallback - self._wait = wait + self.progressCallback = progressCallback + self.wait = wait - self._f = h5py.File(self._fn, 'w') + self.f = h5py.File(self.fn, 'w') # This flag is used to delete the file on finish(), and can be used # when a recording is canceled. - self._deleteFile = False + self.deleteFile = False try: # Input queue - self.inq = streammgr.addListener() + self.inq = streammgr.addInQueueListener() except RuntimeError: # Cleanup stuff, something is going wrong when starting the stream try: - self._f.close() + self.f.close() except Exception as e: logging.error( 'Error preliminary closing measurement file {fn}: {str(e)}') @@ -77,15 +86,15 @@ class Recording: streammgr.getStreamStatus(AvType.audio_input) streammgr.getStreamStatus(AvType.audio_duplex) - self._ad = None + self.ad = None logging.debug('Starting record....') # TODO: Fix this later when we want video # if stream.hasVideo(): - # stream.addCallback(self._aCallback, AvType.audio_input) + # stream.addCallback(self.aCallback, AvType.audio_input) self.stop = False - if self._wait: + if self.wait: logging.debug('Stop recording with CTRL-C') try: while not self.stop: @@ -107,8 +116,10 @@ class Recording: """ + # logging.debug('handleQueue()') while self.inq.qsize() > 0: msg, data = self.inq.get() + # logging.debug(f'Obtained message: {msg}') if msg == StreamMsg.streamData: samples, = data self.__addTimeData(samples) @@ -117,12 +128,15 @@ class Recording: avtype, metadata = data if metadata is None: raise RuntimeError('BUG: no stream metadata') - self.processStreamMetaData(metadata) + if avtype in (AvType.audio_duplex, AvType.audio_input): + self.processStreamMetaData(metadata) elif msg == StreamMsg.streamMetaData: logging.debug(f'handleQueue obtained message {msg}') avtype, metadata = data if metadata is not None: self.processStreamMetaData(metadata) + elif msg == StreamMsg.streamTemporaryError: + pass else: logging.debug(f'handleQueue obtained message {msg}') # An error occured, we do not remove the file, but we stop. @@ -138,13 +152,20 @@ class Recording: """ logging.debug('Recording::processStreamMetaData()') + if self.metadata is not None: + # Metadata already obtained. We check whether the new metadata is + # compatible. Otherwise an error occurs + if md != self.metadata: + raise RuntimeError('BUG: Incompatible stream metadata!') + return + # The 'Audio' dataset as specified in lasp_measurement, where data is # send to. We use gzip as compression, this gives moderate a moderate # compression to the data. - f = self._f + f = self.f blocksize = md.blocksize nchannels = len(md.in_ch) - self._ad = f.create_dataset('audio', + self.ad = f.create_dataset('audio', (1, blocksize, nchannels), dtype=md.dtype, maxshape=( @@ -160,7 +181,7 @@ class Recording: # with audio. # if smgr.hasVideo(): # video_x, video_y = smgr.video_x, smgr.video_y - # self._vd = f.create_dataset('video', + # self.vd = f.create_dataset('video', # (1, video_y, video_x, 3), # dtype='uint8', # maxshape=( @@ -188,7 +209,7 @@ class Recording: the recording. Typically used for cleaning up after canceling a recording. """ - self._deleteFile = val + self.deleteFile = val def finish(self): """ @@ -201,22 +222,22 @@ class Recording: # TODO: Fix when video # if smgr.hasVideo(): - # smgr.removeCallback(self._vCallback, AvType.video_input) - # self._f['video_frame_positions'] = self._video_frame_positions + # smgr.removeCallback(self.vCallback, AvType.video_input) + # self.f['video_frame_positions'] = self.video_frame_positions try: - smgr.removeListener(self.inq) + smgr.removeInQueueListener(self.inq) except Exception as e: logging.error(f'Could not remove queue from smgr: {e}') try: # Close the recording file - self._f.close() + self.f.close() except Exception as e: logging.error(f'Error closing file: {e}') logging.debug('Recording ended') - if self._deleteFile: + if self.deleteFile: self.__deleteFile() def __deleteFile(self): @@ -224,9 +245,9 @@ class Recording: Cleanup the recording file. """ try: - os.remove(self._fn) + os.remove(self.fn) except Exception as e: - logging.error(f'Error deleting file: {self._fn}') + logging.error(f'Error deleting file: {self.fn}') def __addTimeData(self, indata): """ @@ -248,39 +269,51 @@ class Recording: self.smgr.getStreamStatus(AvType.audio_duplex) return - curT = self._ablockno*self.blocksize/self.fs + curT = self.ablockno*self.blocksize/self.fs + + # Increase the block counter + self.ablockno += 1 + + if curT < self.startDelay and not self.startDelay_passed: + # Start delay has not been passed + return + elif curT >= 0 and not self.startDelay_passed: + # Start delay passed, switch the flag! + self.startDelay_passed = True + # Reset the audio block counter and the time + self.ablockno = 1 + curT = 0 + recstatus = RecordStatus( curT=curT, done=False) - if self._progressCallback is not None: - self._progressCallback(recstatus) + if self.progressCallback is not None: + self.progressCallback(recstatus) curT_rounded_to_seconds = int(curT) - if curT_rounded_to_seconds > self._curT_rounded_to_seconds: - self._curT_rounded_to_seconds = curT_rounded_to_seconds + if curT_rounded_to_seconds > self.curT_rounded_to_seconds: + self.curT_rounded_to_seconds = curT_rounded_to_seconds print(f'{curT_rounded_to_seconds}', end='', flush=True) else: print('.', end='', flush=True) if self.rectime is not None and curT > self.rectime: # We are done! - if self._progressCallback is not None: + if self.progressCallback is not None: recstatus.done = True - self._progressCallback(recstatus) + self.progressCallback(recstatus) self.stop = True return - # Add the data to the file - self._ad.resize(self._ablockno+1, axis=0) - self._ad[self._ablockno, :, :] = indata + # Add the data to the file, and resize the audio data blocks + self.ad.resize(self.ablockno, axis=0) + self.ad[self.ablockno-1, :, :] = indata - # Increase the block counter - self._ablockno += 1 # def _vCallback(self, frame, framectr): - # self._video_frame_positions.append(self._ablockno()) - # vframeno = self._vframeno - # self._vd.resize(vframeno+1, axis=0) - # self._vd[vframeno, :, :] = frame - # self._vframeno += 1 + # self.video_frame_positions.append(self.ablockno()) + # vframeno = self.vframeno + # self.vd.resize(vframeno+1, axis=0) + # self.vd[vframeno, :, :] = frame + # self.vframeno += 1