Bugfix (delete measurement when no data is in it) and cleanup of recording code
Some checks failed
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 1m49s

This commit is contained in:
Anne de Jong 2024-02-26 11:51:59 +01:00
parent e9f500d460
commit 878da3369b
2 changed files with 142 additions and 114 deletions

View File

@ -33,7 +33,7 @@ class Atomic:
def checkType(self, val):
if not (type(val) == bool or type(val) == int):
raise RuntimeError("Invalid type for Atomic")
raise ValueError("Invalid type for Atomic")
def __iadd__(self, toadd):
self.checkType(toadd)

View File

@ -7,6 +7,7 @@ import dataclasses, logging, os, time, h5py, threading
import numpy as np
from .lasp_atomic import Atomic
from enum import Enum, auto, unique
from .lasp_cpp import InDataHandler, StreamMgr
from .lasp_version import LASP_VERSION_MAJOR, LASP_VERSION_MINOR
import uuid
@ -18,6 +19,16 @@ class RecordStatus:
done: bool = False
class RecordingState(Enum):
"""Enumeration for the recording state"""
Waiting = auto()
Recording = auto()
AllDataStored = auto()
Finished = auto()
Error = auto()
class Recording:
"""
Class used to perform a recording. Recording data can come in from a
@ -53,83 +64,88 @@ class Recording:
if ext not in fn:
fn += ext
self.smgr = streammgr
self.metadata = None
self._smgr = streammgr
self._metadata = None
self._recState = RecordingState.Waiting
if startDelay < 0:
raise RuntimeError("Invalid start delay value. Should be >= 0")
self.startDelay = startDelay
# Flag used to indicate that we have passed the start delay
self.startDelay_passed = False
self._startDelay = startDelay
# The amount of seconds (float) that is to be recorded
self.rectime = rectime
self._requiredRecordingLength = rectime
# The file name to store data to
self.fn = fn
self._fn = fn
self.curT_rounded_to_seconds = 0
# Counter of the number of blocks that have been recorded
self._recordedBlocks = 0
# Counter of the number of blocks
self.ablockno = Atomic(0)
# Counter of the overall number of blocks that have passed (including
# the blocks that passed during waiting prior to recording)
self._allBlocks = 0
# Stop flag, set when recording is finished.
self.stop = Atomic(False)
self._stop = Atomic(False)
# Mutex, on who is working with the H5py data
self.file_mtx = threading.Lock()
# Mutex, on who is working with the H5py data and the class settings
self._rec_mutex = threading.Lock()
self.progressCallback = progressCallback
self._progressCallback = progressCallback
try:
# Open the file
self.f = h5py.File(self.fn, "w", "stdio")
self.f.flush()
self._h5file = h5py.File(self._fn, "w", "stdio")
self._h5file.flush()
except Exception as e:
logging.error(f"Error creating measurement file {e}")
raise
# This flag is used to delete the file on finish(), and can be used
# when a recording is canceled.
self.deleteFile = False
# when a recording is canceled. It is set to True at start, as the file will be deleted when no data is in it.
self._deleteFile = True
# Try to obtain stream metadata
streamstatus = streammgr.getStreamStatus(StreamMgr.StreamType.input)
if not streamstatus.runningOK():
raise RuntimeError(
"Stream is not running properly. Please first start the stream"
)
raise RuntimeError("Stream is not running properly. Cannot start recording")
self.ad = None
# Audio dataset
self._ad = None
logging.debug("Starting record....")
self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback)
self._indataHandler = InDataHandler(
streammgr, self.inCallback, self.resetCallback
)
if wait:
logging.debug("Stop recording with CTRL-C")
try:
while not self.stop():
while not self._stop():
time.sleep(0.01)
except KeyboardInterrupt:
logging.debug("Keyboard interrupt on record")
finally:
self.finish()
def curT(self):
"""Return currently recorded time as float"""
def resetCallback(self, daq):
"""
Function called with initial stream data.
"""
with self.file_mtx:
with self._rec_mutex:
in_ch = daq.enabledInChannels()
blocksize = daq.framesPerBlock()
self.blocksize = blocksize
self.nchannels = daq.neninchannels()
self.fs = daq.samplerate()
self._blocksize = blocksize
self._nchannels = daq.neninchannels()
self._fs = daq.samplerate()
f = self.f
f = self._h5file
f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR
f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR
@ -145,7 +161,7 @@ class Recording:
# Add the start delay here, as firstFrames() is called right after the
# constructor is called. time.time() returns a floating point
# number of seconds after epoch.
f.attrs["time"] = time.time() + self.startDelay
f.attrs["time"] = time.time() + self._startDelay
# In V2, we do not store JSON metadata anymore, but just an enumeration
# index to a physical quantity.
@ -156,10 +172,69 @@ class Recording:
# f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch]
f.flush()
def firstFrames(self, adata):
def inCallback(self, adata):
"""
This method is called when a block of audio data from the stream is
available. It should return either True or False.
When returning False, it will stop the stream.
"""
if self._stop():
logging.debug("Stop flag set, early return in inCallback")
# Stop flag is raised. We do not add any data anymore.
return True
with self._rec_mutex:
self._allBlocks += 1
match self._recState:
case RecordingState.Waiting:
if (
self._allBlocks * self._blocksize / self._fs
> self._startDelay
):
self._recState = RecordingState.Recording
case RecordingState.Recording:
if self._ad is None:
self.__addFirstFramesToFile(adata)
else:
self.__addTimeDataToFile(adata)
# Increase the block counter
self._recordedBlocks += 1
recstatus = RecordStatus(curT=self.recordedTime, done=False)
if self.recordedTime >= self._requiredRecordingLength:
self._recState = RecordingState.AllDataStored
self._stop <<= True
recstatus.done = True
if self._progressCallback is not None:
self._progressCallback(recstatus)
case RecordingState.AllDataStored:
pass
case RecordingState.Finished:
pass
return True
@property
def recordedTime(self):
"""Return recorded time (not rounded) as float"""
if self._ad is None:
return 0.0
return self._recordedBlocks * self._blocksize / self._fs
def __addFirstFramesToFile(self, adata):
"""
Set up the dataset in which to store the audio data. This will create
the attribute `self.ad`
the attribute `self.ad` and flip around the _deleteFile flag.
Args:
adata: Numpy array with data from DAQ
@ -170,40 +245,24 @@ class Recording:
# datatype = daq.dataType()
dtype = np.dtype(adata.dtype)
self.ad = self.f.create_dataset(
assert self._ad is None
self._ad = self._h5file.create_dataset(
"audio",
(1, self.blocksize, self.nchannels),
(1, self._blocksize, self._nchannels),
dtype=dtype,
maxshape=(
None, # This means, we can add blocks
# indefinitely
self.blocksize,
self.nchannels,
self._blocksize,
self._nchannels,
),
compression="gzip",
)
self.f.flush()
self._ad[0, :, :] = adata
def inCallback(self, adata):
"""
This method is called when a block of audio data from the stream is
available. It should return either True or False.
When returning False, it will stop the stream.
"""
if self.stop():
logging.debug("Stop flag set, early return in inCallback")
# Stop flag is raised. We do not add any data anymore.
return True
with self.file_mtx:
if self.ad is None:
self.firstFrames(adata)
self.__addTimeData(adata)
return True
self._h5file.flush()
self._deleteFile = False
def setDelete(self, val: bool):
"""
@ -211,8 +270,8 @@ class Recording:
the recording. Typically used for cleaning up after canceling a
recording.
"""
with self.file_mtx:
self.deleteFile = val
with self._rec_mutex:
self._deleteFile = val
def finish(self):
"""
@ -222,86 +281,55 @@ class Recording:
"""
logging.debug("Recording::finish()")
self.stop <<= True
self._stop <<= True
with self.file_mtx:
self.f.flush()
with self._rec_mutex:
if self._recState == RecordingState.Finished:
raise RuntimeError('Recording has already finished')
self._h5file.flush()
# Remove indata handler, which also should remove callback function
# from StreamMgr. This, however does not have to happen
# instantaneously. For which we have to implement extra mutex
# guards in this class
del self.indh
self.indh = None
del self._indataHandler
self._indataHandler = None
# Remove handle to dataset otherwise the h5 file is not closed
# properly.
del self.ad
self.ad = None
del self._ad
self._ad = None
try:
# Close the recording file
self.f.close()
del self.f
self._h5file.close()
del self._h5file
except Exception as e:
logging.error(f"Error closing file: {e}")
logging.debug("Recording ended")
if self.deleteFile:
if self._deleteFile:
self.__deleteFile()
self._recState = RecordingState.Finished
def __deleteFile(self):
"""
Cleanup the recording file.
"""
try:
os.remove(self.fn)
os.remove(self._fn)
except Exception as e:
logging.error(f"Error deleting file: {self.fn}: {str(e)}")
logging.error(f"Error deleting file: {self._fn}: {str(e)}")
def __addTimeData(self, indata):
def __addTimeDataToFile(self, indata):
"""
Called by handleQueue() and adds new time data to the storage file.
"""
# logging.debug('Recording::__addTimeData()')
curT = self.ablockno() * self.blocksize / self.fs
# Increase the block counter
self.ablockno += 1
if curT < self.startDelay and not self.startDelay_passed:
# Start delay has not been passed
return
elif curT >= 0 and not self.startDelay_passed:
# Start delay passed, switch the flag!
self.startDelay_passed = True
# Reset the audio block counter and the recording time
self.ablockno = Atomic(1)
curT = 0
ablockno = self.ablockno()
recstatus = RecordStatus(curT=curT, done=False)
if self.progressCallback is not None:
self.progressCallback(recstatus)
curT_rounded_to_seconds = int(curT)
if curT_rounded_to_seconds > self.curT_rounded_to_seconds:
self.curT_rounded_to_seconds = curT_rounded_to_seconds
print(f"{curT_rounded_to_seconds}", end="", flush=True)
else:
print(".", end="", flush=True)
if self.rectime is not None and curT > self.rectime:
# We are done!
if self.progressCallback is not None:
recstatus.done = True
self.progressCallback(recstatus)
self.stop <<= True
return
ablockno = self._recordedBlocks
# Add the data to the file, and resize the audio data blocks
self.ad.resize(ablockno, axis=0)
self.ad[ablockno - 1, :, :] = indata
self.f.flush()
self._ad.resize(ablockno + 1, axis=0)
self._ad[ablockno, :, :] = indata
self._h5file.flush()