lasp/lasp/lasp_record.py

251 lines
8.4 KiB
Python
Raw Normal View History

2019-12-22 14:00:50 +00:00
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Read data from stream and record sound and video at the same time
"""
import dataclasses
import logging
import os
import time
import h5py
from .lasp_avstream import AvStream, AvType, StreamMsg
@dataclasses.dataclass
class RecordStatus:
    """
    Progress information of a recording, as handed to the progress callback.
    """
    # Recorded time so far, in seconds
    curT: float
    # True once the requested recording time has been reached
    done: bool
class Recording:
    """
    Record the input data of an AvStream into an HDF5 file.

    Data blocks are taken from a listener queue of the stream and appended to
    the 'audio' dataset of the file. Stream properties (sample rate, block
    size, channel names, ...) are stored as attributes of the file.
    """

    @staticmethod
    def _ensureExtension(fn: str, ext: str = '.h5') -> str:
        """
        Return fn, with ext appended when fn does not already end with it.
        """
        return fn if fn.endswith(ext) else fn + ext

    def __init__(self, fn: str, stream: 'AvStream',
                 rectime: float = None, wait: bool = True,
                 progressCallback=None):
        """
        Start a recording. Blocks if wait is set to True.

        Args:
            fn: Filename to record to. Extension is automatically added if not
                provided.
            stream: AvStream instance to record from. Should have input
                channels!
            rectime: Recording time [s]. If set to None, or np.inf, the
                recording continues indefinitely.
            wait: If True, the constructor keeps polling the input queue
                until the recording is stopped (recording time reached,
                stream error or CTRL-C) and then calls finish(). If False,
                the constructor returns right after setting up the recording;
                handleQueue() and finish() then have to be called from
                outside.
            progressCallback: callable that is called with a RecordStatus
                instance as argument.

        Raises:
            Any exception raised while setting up the stream or the file is
            propagated, after the listener queue has been removed and the
            recording file has been closed and deleted.
        """
        fn = self._ensureExtension(fn)

        self._stream = stream
        self.rectime = rectime
        self._fn = fn

        self._video_frame_positions = []
        self._curT_rounded_to_seconds = 0

        # Counter of the number of blocks
        self._ablockno = 0
        self._vframeno = 0

        self._progressCallback = progressCallback
        self._wait = wait

        # This flag is used to delete the file on finish(), and can be used
        # when a recording is canceled.
        self._deleteFile = False

        # Set to True to end the polling loop below
        self.stop = False

        self._f = h5py.File(self._fn, 'w')

        # Input queue, assigned in _setupRecording()
        self.inq = None

        try:
            self._setupRecording(stream)
        except BaseException:
            # Something went wrong while starting the stream or preparing the
            # file. Detach from the stream, close and remove the file, and
            # let the caller see the original error.
            self.setDelete(True)
            self.finish()
            raise

        logging.debug('Starting record....')
        # TODO: Fix this later when we want video
        # if stream.hasVideo():
        #     stream.addCallback(self._aCallback, AvType.audio_input)

        if self._wait:
            logging.debug('Stop recording with CTRL-C')
            try:
                while not self.stop:
                    self.handleQueue()
                    time.sleep(0.01)
            except KeyboardInterrupt:
                logging.debug("Keyboard interrupt on record")
            finally:
                self.finish()

    def _setupRecording(self, stream):
        """
        Attach to the stream, start it when required, and create the datasets
        and attributes of the recording file.
        """
        f = self._f
        nchannels = len(stream.input_channel_names)

        # Input queue
        self.inq = stream.addListener()

        # Start the stream, if it is not running
        if not stream.isRunning():
            metadata = stream.start()
        else:
            metadata = stream.getStreamMetaData()

        self.blocksize = metadata['blocksize']
        self.samplerate = metadata['samplerate']
        self.dtype = metadata['dtype']

        # The 'Audio' dataset as specified in lasp_measurement, where data is
        # sent to. We use gzip as compression, this gives a moderate
        # compression to the data.
        self._ad = f.create_dataset('audio',
                                    (1, self.blocksize, nchannels),
                                    dtype=self.dtype,
                                    maxshape=(
                                        None,  # This means, we can add blocks
                                        # indefinitely
                                        self.blocksize,
                                        nchannels),
                                    compression='gzip'
                                    )

        # TODO: This piece of code is not up-to-date and should be changed at a
        # later instance once we really want to record video simultaneously
        # with audio.
        if stream.hasVideo():
            video_x, video_y = stream.video_x, stream.video_y
            self._vd = f.create_dataset('video',
                                        (1, video_y, video_x, 3),
                                        dtype='uint8',
                                        maxshape=(
                                            None, video_y, video_x, 3),
                                        compression='gzip'
                                        )

        # Set the bunch of attributes
        f.attrs['samplerate'] = self.samplerate
        f.attrs['nchannels'] = nchannels
        f.attrs['blocksize'] = self.blocksize
        f.attrs['sensitivity'] = stream.input_sensitivity
        f.attrs['channel_names'] = stream.input_channel_names
        f.attrs['time'] = time.time()

        # Measured quantities
        f.attrs['qtys'] = [qty.to_json() for qty in stream.input_qtys]

    def setDelete(self, val: bool):
        """
        Set the delete flag. If set, measurement file is deleted at the end of
        the recording. Typically used for cleaning up after canceling a
        recording.
        """
        self._deleteFile = val

    def finish(self):
        """
        This method should be called to finish and close a recording file,
        remove the queue from the stream, etc.

        Errors while removing the queue or closing the file are logged and
        not propagated. The file is deleted afterwards when the delete flag
        is set (see setDelete()).
        """
        # TODO: Fix when video
        # if stream.hasVideo():
        #     stream.removeCallback(self._vCallback, AvType.video_input)
        #     self._f['video_frame_positions'] = self._video_frame_positions

        # The queue is None only when setting up the recording failed before
        # a listener could be added to the stream.
        if self.inq is not None:
            try:
                self._stream.removeListener(self.inq)
            except Exception as e:
                logging.error(f'Could not remove queue from stream: {e}')

        try:
            # Close the recording file
            self._f.close()
        except Exception as e:
            logging.error(f'Error closing file: {e}')

        logging.debug('Recording ended')
        if self._deleteFile:
            self.__deleteFile()

    def __deleteFile(self):
        """
        Cleanup the recording file. A failure to remove the file is logged
        and otherwise ignored.
        """
        try:
            os.remove(self._fn)
        except Exception:
            logging.debug(f'Error deleting file: {self._fn}')

    def handleQueue(self):
        """
        This method should be called to grab data from the input queue, which
        is filled by the stream, and put it into a file. It should be called at
        a regular interval to prevent overflowing of the queue. It is called
        from the constructor if wait is set to True. Otherwise, it should be
        called from its parent at regular intervals. For example, in Qt this
        can be done using a QTimer.
        """
        while self.inq.qsize() > 0:
            msg, data = self.inq.get()
            if msg == StreamMsg.streamData:
                self.__addTimeData(data)
            elif msg == StreamMsg.streamStarted:
                pass
            elif msg == StreamMsg.streamMetaData:
                pass
            else:
                # An error occured, we do not remove the file, but we stop.
                self.stop = True

    def __addTimeData(self, indata):
        """
        Called by handleQueue() and adds new time data to the storage file.

        Reports progress to the progress callback (when set) and to stdout.
        Once the recorded time exceeds rectime, the stop flag is set and the
        data is not stored.
        """
        # The current time that is recorded and stored into the file, without
        # the new data
        curT = self._ablockno*self.blocksize/self.samplerate

        callback = self._progressCallback
        recstatus = None
        if callback is not None:
            recstatus = RecordStatus(curT=curT, done=False)
            callback(recstatus)

        # Print the elapsed time once for every full second, and a dot for
        # every other block.
        curT_rounded_to_seconds = int(curT)
        if curT_rounded_to_seconds > self._curT_rounded_to_seconds:
            self._curT_rounded_to_seconds = curT_rounded_to_seconds
            print(f'{curT_rounded_to_seconds}', end='', flush=True)
        else:
            print('.', end='', flush=True)

        if self.rectime is not None and curT > self.rectime:
            # We are done!
            if callback is not None:
                recstatus.done = True
                callback(recstatus)
            self.stop = True
            return

        # Add the data to the file
        self._ad.resize(self._ablockno+1, axis=0)
        self._ad[self._ablockno, :, :] = indata

        # Increase the block counter
        self._ablockno += 1

    # def _vCallback(self, frame, framectr):
    #     self._video_frame_positions.append(self._ablockno())
    #     vframeno = self._vframeno
    #     self._vd.resize(vframeno+1, axis=0)
    #     self._vd[vframeno, :, :] = frame
    #     self._vframeno += 1