355 lines
12 KiB
Python
355 lines
12 KiB
Python
#!/usr/bin/python3.8
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Read data from stream and record sound and video at the same time
|
|
"""
|
|
import dataclasses, logging, os, time, h5py, threading
|
|
import numpy as np
|
|
|
|
from .lasp_atomic import Atomic
|
|
from enum import Enum, auto, unique
|
|
from .lasp_cpp import InDataHandler, StreamMgr
|
|
from .lasp_version import LASP_VERSION_MAJOR, LASP_VERSION_MINOR
|
|
import uuid
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
# logger.setLevel(logging.DEBUG)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
|
|
@dataclasses.dataclass
class RecordStatus:
    """Progress snapshot that is handed to the progress callback of a recording."""

    # Time recorded so far, in seconds.
    curT: float = 0

    # Set to True on the snapshot that reports the required recording length
    # has been reached.
    done: bool = False
|
|
|
|
|
|
class RecordingState(Enum):
    """Enumeration for the recording state"""

    # Initial state: incoming blocks are counted, but nothing is stored until
    # the start delay has elapsed.
    Waiting = auto()

    # Incoming blocks are written to the measurement file.
    Recording = auto()

    # The required recording length has been reached; no more data is stored.
    AllDataStored = auto()

    # finish() has run for this recording.
    Finished = auto()

    # Error state. It is not assigned anywhere in the code visible here.
    Error = auto()
|
|
|
|
|
|
class Recording:
|
|
"""
|
|
Class used to perform a recording. Recording data can come in from a
|
|
different thread, that is supposed to call the `inCallback` method, with
|
|
audio data as an argument.
|
|
"""
|
|
|
|
def __init__(
    self,
    fn: str,
    streammgr: StreamMgr,
    rectime: float = None,
    wait: bool = True,
    progressCallback=None,
    startDelay: float = 0,
):
    """
    Start a recording. Blocks if wait is set to True.

    Args:
        fn: Filename to record to. The extension '.h5' is appended when
            the name does not already end with it.
        streammgr: StreamMgr instance to record from. Its input stream
            should be running.
        rectime: Recording time [s]. If set to None, or np.inf, the
            recording continues indefinitely.
        wait: When True, the constructor blocks until the stop flag is
            raised (or CTRL-C is pressed) and then calls finish() itself.
        progressCallback: callable that is called with a RecordStatus
            instance as argument, once for every stored block.
        startDelay: Optional delay added before the recording is
            *actually* started in [s].

    Raises:
        RuntimeError: when the file already exists, when startDelay is
            negative, or when the input stream is not running properly.
    """
    logger.debug("__init__()")
    ext = ".h5"
    # Check the suffix, not mere containment: a name such as "run.h5.bak"
    # contains ".h5" but still needs the extension.
    if not fn.endswith(ext):
        fn += ext

    if os.path.exists(fn):
        raise RuntimeError("Recording file name already exists / is in use")

    if startDelay < 0:
        raise RuntimeError("Invalid start delay value. Should be >= 0")

    # Verify the stream *before* the measurement file is created, such that
    # a failing check does not leave an empty file (and an open file handle)
    # behind.
    streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input)
    if not streamstatus.runningOK():
        raise RuntimeError("Stream is not running properly. Cannot start recording")

    self._smgr = streammgr
    self._metadata = None

    self._recState = RecordingState.Waiting

    self._startDelay = startDelay

    # The amount of seconds (float) that is to be recorded
    self._requiredRecordingLength = rectime

    # The file name to store data to
    self._fn = fn

    # Counter of the number of blocks that have been recorded
    self._recordedBlocks = 0

    # Counter of the overall number of blocks that have passed (including
    # the blocks that passed during waiting prior to recording)
    self._allBlocks = 0

    # Stop flag, set when recording is finished.
    self._stop = Atomic(False)

    # Mutex, on who is working with the H5py data and the class settings
    self._rec_mutex = threading.RLock()

    self._progressCallback = progressCallback

    try:
        # Open the file
        self._h5file = h5py.File(self._fn, "w", "stdio")
        self._h5file.flush()
    except Exception as e:
        logger.error(f"Error creating measurement file {e}")
        raise

    # This flag is used to delete the file on finish(), and can be used
    # when a recording is canceled. It is set to True at start, as the file
    # will be deleted when no data is in it.
    self._deleteFile = True

    # Audio dataset, created when the first block is stored
    self._ad = None

    logger.debug("Starting record....")

    # In the PyInDataHandler, a weak reference is stored to the python
    # methods reset and incallback. One way or another, the weak ref is gone
    # on the callback thread. If we store an "extra" ref to this method over
    # here, the weak ref stays alive. We do not know whether this is a bug
    # or a feature, but in any case storing this extra ref to inCallback
    # solves the problem.
    self._incalback_cpy = self.inCallback
    self._indataHandler = InDataHandler(
        streammgr, self._incalback_cpy, self.resetCallback
    )

    if wait:
        logger.debug("Stop recording with CTRL-C")
        try:
            # Poll the stop flag; it is raised by inCallback once the
            # required recording length has been stored.
            while not self._stop():
                time.sleep(0.01)
        except KeyboardInterrupt:
            logger.debug("Keyboard interrupt on record")
        finally:
            self.finish()
|
|
|
|
def curT(self):
|
|
"""Return currently recorded time as float"""
|
|
|
|
def resetCallback(self, daq):
    """
    Function called with initial stream data.

    Caches block size, number of channels and sample rate of the stream on
    the instance, and writes the measurement metadata as attributes of the
    HDF5 file.

    Args:
        daq: Object describing the stream. Only the accessors used below
            are relied upon.
    """
    logger.debug(f"resetCallback({daq})")
    with self._rec_mutex:
        channels = daq.enabledInChannels()
        frames_per_block = daq.framesPerBlock()

        # Cached for the time computations and the dataset layout
        self._blocksize = frames_per_block
        self._nchannels = daq.neninchannels()
        self._fs = daq.samplerate()

        h5 = self._h5file

        h5.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR
        h5.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR

        # Stream and channel description
        h5.attrs["samplerate"] = daq.samplerate()
        h5.attrs["nchannels"] = daq.neninchannels()
        h5.attrs["blocksize"] = frames_per_block
        h5.attrs["sensitivity"] = [c.sensitivity for c in channels]
        h5.attrs["channelNames"] = [c.name for c in channels]
        h5.attrs["UUID"] = str(uuid.uuid1())

        # Time stamp in seconds since epoch (float). The start delay is
        # added, as storing of data only begins after that delay.
        h5.attrs["time"] = time.time() + self._startDelay

        # In V2, we do not store JSON metadata anymore, but just an
        # enumeration index to a physical quantity.
        # (LASP version < 1.0 stored ch.qty.to_json() under 'qtys'.)
        h5.attrs["qtys_enum_idx"] = [c.qty.value for c in channels]

        h5.flush()
|
|
|
|
def inCallback(self, adata):
    """
    This method is called when a block of audio data from the stream is
    available.

    Args:
        adata: Block of audio data from the stream.

    Returns:
        False when the stop flag is raised or when the recording is in the
        AllDataStored / Finished state, True otherwise. When returning
        False, it will stop the stream (NOTE(review): as stated by the
        original author; confirm against InDataHandler).
    """
    logger.debug(f"inCallback()")
    if self._stop():
        # Stop flag is raised. We do not add any data anymore.
        logger.debug("Stop flag set, early return in inCallback")
        return False

    with self._rec_mutex:
        self._allBlocks += 1
        state = self._recState

        if state is RecordingState.Waiting:
            # Time that has passed since the first block came in. The block
            # that triggers the transition is itself not stored.
            elapsed = self._allBlocks * self._blocksize / self._fs
            if elapsed > self._startDelay:
                self._recState = RecordingState.Recording
            return True

        if state in (RecordingState.AllDataStored, RecordingState.Finished):
            return False

        if state is not RecordingState.Recording:
            # Remaining state (Error): nothing is stored.
            return True

        # --- RecordingState.Recording ---
        if self._ad is None:
            # First block: the dataset still has to be created
            self.__addFirstFramesToFile(adata)
        else:
            self.__addTimeDataToFile(adata)

        # Increase the block counter
        self._recordedBlocks += 1

        recorded = self.recordedTime
        status = RecordStatus(curT=recorded, done=False)

        target = self._requiredRecordingLength
        if target is not None and recorded >= target:
            self._recState = RecordingState.AllDataStored
            self._stop <<= True
            status.done = True

        callback = self._progressCallback
        if callback is not None:
            callback(status)

    return True
|
|
|
|
@property
|
|
def recordedTime(self):
|
|
"""Return recorded time (not rounded) as float"""
|
|
with self._rec_mutex:
|
|
if self._ad is None:
|
|
return 0.0
|
|
return self._recordedBlocks * self._blocksize / self._fs
|
|
|
|
def __addFirstFramesToFile(self, adata):
    """
    Create the dataset "audio" in the measurement file and store the first
    block of data in it. This sets the attribute `self._ad` and clears the
    _deleteFile flag, as the file contains data from now on.

    Args:
        adata: Array with data from DAQ. Its `dtype` determines the data
            type of the dataset.
    """
    with self._rec_mutex:
        sample_type = np.dtype(adata.dtype)

        # The dataset may only be created once
        assert self._ad is None

        block_shape = (self._blocksize, self._nchannels)
        dataset = self._h5file.create_dataset(
            "audio",
            (1,) + block_shape,
            dtype=sample_type,
            # None on the first axis: blocks can be added indefinitely
            maxshape=(None,) + block_shape,
            compression="gzip",
        )
        self._ad = dataset
        dataset[0, :, :] = adata

        self._h5file.flush()
        self._deleteFile = False
|
|
|
|
def setDelete(self, val: bool):
|
|
"""
|
|
Set the delete flag. If set, measurement file is deleted at the end of
|
|
the recording. Typically used for cleaning up after canceling a
|
|
recording.
|
|
"""
|
|
with self._rec_mutex:
|
|
self._deleteFile = val
|
|
|
|
def finish(self):
    """
    This method should be called to finish and close a recording file,
    remove the in-data handler from the stream, etc.

    Raises:
        RuntimeError: when the recording has already been finished.
    """
    logger.debug("Recording::finish()")

    # Raised before the lock is taken, such that inCallback returns early
    # from now on.
    self._stop <<= True

    with self._rec_mutex:
        if self._recState is RecordingState.Finished:
            raise RuntimeError("Recording has already finished")

        # Remove indata handler, which also should remove callback function
        # from StreamMgr. This, however does not have to happen
        # instantaneously. For which we have to implement extra mutex
        # guards in this class
        del self._indataHandler

        self._h5file.flush()

        # The handle to the dataset has to go first, otherwise the h5 file
        # is not closed properly.
        del self._ad

        try:
            # Close the recording file
            self._h5file.close()
            del self._h5file
        except Exception as err:
            logger.error(f"Error closing file: {err}")

        logger.debug("Recording ended")

        # Still set when no data was stored, or when requested by setDelete()
        if self._deleteFile:
            self.__deleteFile()

        self._recState = RecordingState.Finished
|
|
|
|
def __deleteFile(self):
|
|
"""
|
|
Cleanup the recording file.
|
|
"""
|
|
try:
|
|
os.remove(self._fn)
|
|
except Exception as e:
|
|
logger.error(f"Error deleting file: {self._fn}: {str(e)}")
|
|
|
|
def __addTimeDataToFile(self, indata):
    """
    Append one block of time data to the audio dataset in the storage file.
    Called from inCallback() for every block after the first one.

    Args:
        indata: Block of audio data to store.
    """
    with self._rec_mutex:
        # Blocks stored so far, which is also the index of the new block
        block_idx = self._recordedBlocks
        dataset = self._ad

        # Grow the dataset by one block along the first axis, then fill it
        dataset.resize(block_idx + 1, axis=0)
        dataset[block_idx, :, :] = indata
        self._h5file.flush()
|