Cleaned up recording code. Added start delay to recording

This commit is contained in:
Anne de Jong 2021-05-19 21:19:31 +02:00
parent 7153096552
commit 377291ccf4
3 changed files with 84 additions and 50 deletions

View File

@ -405,6 +405,7 @@ class AvStreamProcess(mp.Process):
# Wrapper functions that safe some typing, they do not require an
# explanation.
def sendInQueues(self, msg, *data):
# logging.debug('sendInQueues()')
for q in self.indata_qlist:
# Fan out the input data to all queues in the queue list
q.put((msg, data))

View File

@ -49,14 +49,14 @@ class AvType(Enum):
the stream."""
# Input stream
audio_input = auto()
audio_input = (0, 'input')
# Output stream
audio_output = auto()
audio_output = (1, 'output')
# Both input as well as output
audio_duplex = auto()
video = 4
audio_duplex = (2, 'duplex')
# video = 4
@dataclass_json

View File

@ -15,10 +15,13 @@ class RecordStatus:
class Recording:
"""
Class used to perform a recording.
"""
def __init__(self, fn: str, streammgr: StreamManager,
rectime: float = None, wait: bool = True,
progressCallback=None):
progressCallback=None,
startDelay: float=0):
"""
Start a recording. Blocks if wait is set to True.
@ -31,6 +34,8 @@ class Recording:
to None, or np.inf, the recording continues indefintely.
progressCallback: callable that is called with an instance of
RecordStatus instance as argument.
startDelay: Optional delay added before the recording is *actually*
started in [s].
"""
ext = '.h5'
if ext not in fn:
@ -39,33 +44,37 @@ class Recording:
self.smgr = streammgr
self.metadata = None
self.rectime = rectime
self._fn = fn
assert startDelay >= 0
self.startDelay = startDelay
# Flag used to indicate that we have passed the start delay
self.startDelay_passed = False
self.rectime = rectime
self.fn = fn
self._video_frame_positions = []
self._curT_rounded_to_seconds = 0
self.video_frame_positions = []
self.curT_rounded_to_seconds = 0
# Counter of the number of blocks
self._ablockno = 0
self._vframeno = 0
self.ablockno = 0
self.vframeno = 0
self._progressCallback = progressCallback
self._wait = wait
self.progressCallback = progressCallback
self.wait = wait
self._f = h5py.File(self._fn, 'w')
self.f = h5py.File(self.fn, 'w')
# This flag is used to delete the file on finish(), and can be used
# when a recording is canceled.
self._deleteFile = False
self.deleteFile = False
try:
# Input queue
self.inq = streammgr.addListener()
self.inq = streammgr.addInQueueListener()
except RuntimeError:
# Cleanup stuff, something is going wrong when starting the stream
try:
self._f.close()
self.f.close()
except Exception as e:
logging.error(
'Error preliminary closing measurement file {fn}: {str(e)}')
@ -77,15 +86,15 @@ class Recording:
streammgr.getStreamStatus(AvType.audio_input)
streammgr.getStreamStatus(AvType.audio_duplex)
self._ad = None
self.ad = None
logging.debug('Starting record....')
# TODO: Fix this later when we want video
# if stream.hasVideo():
# stream.addCallback(self._aCallback, AvType.audio_input)
# stream.addCallback(self.aCallback, AvType.audio_input)
self.stop = False
if self._wait:
if self.wait:
logging.debug('Stop recording with CTRL-C')
try:
while not self.stop:
@ -107,8 +116,10 @@ class Recording:
"""
# logging.debug('handleQueue()')
while self.inq.qsize() > 0:
msg, data = self.inq.get()
# logging.debug(f'Obtained message: {msg}')
if msg == StreamMsg.streamData:
samples, = data
self.__addTimeData(samples)
@ -117,12 +128,15 @@ class Recording:
avtype, metadata = data
if metadata is None:
raise RuntimeError('BUG: no stream metadata')
self.processStreamMetaData(metadata)
if avtype in (AvType.audio_duplex, AvType.audio_input):
self.processStreamMetaData(metadata)
elif msg == StreamMsg.streamMetaData:
logging.debug(f'handleQueue obtained message {msg}')
avtype, metadata = data
if metadata is not None:
self.processStreamMetaData(metadata)
elif msg == StreamMsg.streamTemporaryError:
pass
else:
logging.debug(f'handleQueue obtained message {msg}')
# An error occured, we do not remove the file, but we stop.
@ -138,13 +152,20 @@ class Recording:
"""
logging.debug('Recording::processStreamMetaData()')
if self.metadata is not None:
# Metadata already obtained. We check whether the new metadata is
# compatible. Otherwise an error occurs
if md != self.metadata:
raise RuntimeError('BUG: Incompatible stream metadata!')
return
# The 'Audio' dataset as specified in lasp_measurement, where data is
# send to. We use gzip as compression, this gives moderate a moderate
# compression to the data.
f = self._f
f = self.f
blocksize = md.blocksize
nchannels = len(md.in_ch)
self._ad = f.create_dataset('audio',
self.ad = f.create_dataset('audio',
(1, blocksize, nchannels),
dtype=md.dtype,
maxshape=(
@ -160,7 +181,7 @@ class Recording:
# with audio.
# if smgr.hasVideo():
# video_x, video_y = smgr.video_x, smgr.video_y
# self._vd = f.create_dataset('video',
# self.vd = f.create_dataset('video',
# (1, video_y, video_x, 3),
# dtype='uint8',
# maxshape=(
@ -188,7 +209,7 @@ class Recording:
the recording. Typically used for cleaning up after canceling a
recording.
"""
self._deleteFile = val
self.deleteFile = val
def finish(self):
"""
@ -201,22 +222,22 @@ class Recording:
# TODO: Fix when video
# if smgr.hasVideo():
# smgr.removeCallback(self._vCallback, AvType.video_input)
# self._f['video_frame_positions'] = self._video_frame_positions
# smgr.removeCallback(self.vCallback, AvType.video_input)
# self.f['video_frame_positions'] = self.video_frame_positions
try:
smgr.removeListener(self.inq)
smgr.removeInQueueListener(self.inq)
except Exception as e:
logging.error(f'Could not remove queue from smgr: {e}')
try:
# Close the recording file
self._f.close()
self.f.close()
except Exception as e:
logging.error(f'Error closing file: {e}')
logging.debug('Recording ended')
if self._deleteFile:
if self.deleteFile:
self.__deleteFile()
def __deleteFile(self):
@ -224,9 +245,9 @@ class Recording:
Cleanup the recording file.
"""
try:
os.remove(self._fn)
os.remove(self.fn)
except Exception as e:
logging.error(f'Error deleting file: {self._fn}')
logging.error(f'Error deleting file: {self.fn}')
def __addTimeData(self, indata):
"""
@ -248,39 +269,51 @@ class Recording:
self.smgr.getStreamStatus(AvType.audio_duplex)
return
curT = self._ablockno*self.blocksize/self.fs
curT = self.ablockno*self.blocksize/self.fs
# Increase the block counter
self.ablockno += 1
if curT < self.startDelay and not self.startDelay_passed:
# Start delay has not been passed
return
elif curT >= 0 and not self.startDelay_passed:
# Start delay passed, switch the flag!
self.startDelay_passed = True
# Reset the audio block counter and the time
self.ablockno = 1
curT = 0
recstatus = RecordStatus(
curT=curT,
done=False)
if self._progressCallback is not None:
self._progressCallback(recstatus)
if self.progressCallback is not None:
self.progressCallback(recstatus)
curT_rounded_to_seconds = int(curT)
if curT_rounded_to_seconds > self._curT_rounded_to_seconds:
self._curT_rounded_to_seconds = curT_rounded_to_seconds
if curT_rounded_to_seconds > self.curT_rounded_to_seconds:
self.curT_rounded_to_seconds = curT_rounded_to_seconds
print(f'{curT_rounded_to_seconds}', end='', flush=True)
else:
print('.', end='', flush=True)
if self.rectime is not None and curT > self.rectime:
# We are done!
if self._progressCallback is not None:
if self.progressCallback is not None:
recstatus.done = True
self._progressCallback(recstatus)
self.progressCallback(recstatus)
self.stop = True
return
# Add the data to the file
self._ad.resize(self._ablockno+1, axis=0)
self._ad[self._ablockno, :, :] = indata
# Add the data to the file, and resize the audio data blocks
self.ad.resize(self.ablockno, axis=0)
self.ad[self.ablockno-1, :, :] = indata
# Increase the block counter
self._ablockno += 1
# def _vCallback(self, frame, framectr):
# self._video_frame_positions.append(self._ablockno())
# vframeno = self._vframeno
# self._vd.resize(vframeno+1, axis=0)
# self._vd[vframeno, :, :] = frame
# self._vframeno += 1
# self.video_frame_positions.append(self.ablockno())
# vframeno = self.vframeno
# self.vd.resize(vframeno+1, axis=0)
# self.vd[vframeno, :, :] = frame
# self.vframeno += 1