All hdf5 code on main thread. Using Python Queue() to buffer, and added handleQueue() method to Recording() that should be called occasionally
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Anne de Jong 2023-06-21 11:38:11 +02:00
parent 30ce35d29b
commit e5ee46dd20

View File

@ -3,8 +3,14 @@
""" """
Read data from stream and record sound and video at the same time Read data from stream and record sound and video at the same time
""" """
import dataclasses, logging, os, time, h5py, threading import dataclasses
import logging
import os
import time
import h5py
import threading
import numpy as np import numpy as np
from queue import Queue
from .lasp_atomic import Atomic from .lasp_atomic import Atomic
from .lasp_cpp import (LASP_VERSION_MAJOR, LASP_VERSION_MINOR, InDataHandler, from .lasp_cpp import (LASP_VERSION_MAJOR, LASP_VERSION_MINOR, InDataHandler,
@ -72,13 +78,14 @@ class Recording:
self.curT_rounded_to_seconds = 0 self.curT_rounded_to_seconds = 0
# Counter of the number of blocks # Counter of the number of blocks
self.ablockno = Atomic(0) self.ablockno = 0
# Stop flag, set when recording is finished. # Stop flag, set when recording is finished.
self.stop = Atomic(False) self.stop = Atomic(False)
# Mutex, on who is working with the H5py data # Mutex, on who is working with the H5py data
self.file_mtx = threading.Lock() self.queue_mtx = threading.Lock()
self.queue = Queue()
self.progressCallback = progressCallback self.progressCallback = progressCallback
@ -105,12 +112,14 @@ class Recording:
logging.debug("Starting record....") logging.debug("Starting record....")
self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) self.indh = InDataHandler(
streammgr, self.inCallback, self.resetCallback)
if wait: if wait:
logging.debug("Stop recording with CTRL-C") logging.debug("Stop recording with CTRL-C")
try: try:
while not self.stop(): while not self.stop():
self.handleQueue()
time.sleep(0.01) time.sleep(0.01)
except KeyboardInterrupt: except KeyboardInterrupt:
logging.debug("Keyboard interrupt on record") logging.debug("Keyboard interrupt on record")
@ -120,39 +129,46 @@ class Recording:
def resetCallback(self, daq): def resetCallback(self, daq):
""" """
Function called with initial stream data. Function called with initial stream data.
Important notes: we found that the h5py HDF5 library does not really
work properly in a multi-threaded context. Therefore, this method
SHOULD be called from the same thread, as the one that created self.f.
Which for the current implementation of StreamMgr and friends is true.
Args:
daq: Daq device.
""" """
with self.file_mtx: in_ch = daq.enabledInChannels()
in_ch = daq.enabledInChannels() blocksize = daq.framesPerBlock()
blocksize = daq.framesPerBlock() self.blocksize = blocksize
self.blocksize = blocksize self.nchannels = daq.neninchannels()
self.nchannels = daq.neninchannels() self.fs = daq.samplerate()
self.fs = daq.samplerate()
f = self.f f = self.f
f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR
f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR
# Set the bunch of attributes # Set the bunch of attributes
f.attrs["samplerate"] = daq.samplerate() f.attrs["samplerate"] = daq.samplerate()
f.attrs["nchannels"] = daq.neninchannels() f.attrs["nchannels"] = daq.neninchannels()
f.attrs["blocksize"] = blocksize f.attrs["blocksize"] = blocksize
f.attrs["sensitivity"] = [ch.sensitivity for ch in in_ch] f.attrs["sensitivity"] = [ch.sensitivity for ch in in_ch]
f.attrs["channelNames"] = [ch.name for ch in in_ch] f.attrs["channelNames"] = [ch.name for ch in in_ch]
# Add the start delay here, as firstFrames() is called right after the # Add the start delay here, as firstFrames() is called right after the
# constructor is called. time.time() returns a floating point # constructor is called. time.time() returns a floating point
# number of seconds after epoch. # number of seconds after epoch.
f.attrs["time"] = time.time() + self.startDelay f.attrs["time"] = time.time() + self.startDelay
# In V2, we do not store JSON metadata anymore, but just an enumeration # In V2, we do not store JSON metadata anymore, but just an enumeration
# index to a physical quantity. # index to a physical quantity.
f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch] f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch]
# Measured physical quantity metadata # Measured physical quantity metadata
# This was how it was in LASP version < 1.0 # This was how it was in LASP version < 1.0
# f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch]
f.flush() f.flush()
def firstFrames(self, adata): def firstFrames(self, adata):
""" """
@ -183,6 +199,18 @@ class Recording:
self.f.flush() self.f.flush()
def inCallback(self, adata): def inCallback(self, adata):
"""
Called from different thread.
"""
if self.stop():
return True
with self.queue_mtx:
self.queue.put(np.copy(adata))
return True
def handleQueue(self):
""" """
This method is called when a block of audio data from the stream is This method is called when a block of audio data from the stream is
available. It should return either True or False. available. It should return either True or False.
@ -191,17 +219,18 @@ class Recording:
""" """
if self.stop(): if self.stop():
logging.debug('Stop flag set, early return in inCallback') logging.debug('Stop flag set, early return in handleQueue')
# Stop flag is raised. We do not add any data anymore. # Stop flag is raised. We do not add any data anymore.
return True return True
with self.file_mtx: with self.queue_mtx:
while not self.queue.empty():
adata = self.queue.get()
if self.ad is None: if self.ad is None:
self.firstFrames(adata) self.firstFrames(adata)
self.__addTimeData(adata) self.__addTimeData(adata)
return True
def setDelete(self, val: bool): def setDelete(self, val: bool):
""" """
@ -209,8 +238,7 @@ class Recording:
the recording. Typically used for cleaning up after canceling a the recording. Typically used for cleaning up after canceling a
recording. recording.
""" """
with self.file_mtx: self.deleteFile = val
self.deleteFile = val
def finish(self): def finish(self):
""" """
@ -222,30 +250,29 @@ class Recording:
self.stop <<= True self.stop <<= True
with self.file_mtx: self.f.flush()
self.f.flush() # Remove indata handler, which also should remove callback function
# Remove indata handler, which also should remove callback function # from StreamMgr. This, however does not have to happen
# from StreamMgr. This, however does not have to happen # instantaneously. For which we have to implement extra mutex
# instantaneously. For which we have to implement extra mutex # guards in this class
# guards in this class del self.indh
del self.indh self.indh = None
self.indh = None
# Remove handle to dataset otherwise the h5 file is not closed
# properly.
del self.ad
self.ad = None
try: # Remove handle to dataset otherwise the h5 file is not closed
# Close the recording file # properly.
self.f.close() del self.ad
del self.f self.ad = None
except Exception as e:
logging.error(f"Error closing file: {e}")
logging.debug("Recording ended") try:
if self.deleteFile: # Close the recording file
self.__deleteFile() self.f.close()
del self.f
except Exception as e:
logging.error(f"Error closing file: {e}")
logging.debug("Recording ended")
if self.deleteFile:
self.__deleteFile()
def __deleteFile(self): def __deleteFile(self):
""" """
@ -262,7 +289,7 @@ class Recording:
""" """
# logging.debug('Recording::__addTimeData()') # logging.debug('Recording::__addTimeData()')
curT = self.ablockno() * self.blocksize / self.fs curT = self.ablockno * self.blocksize / self.fs
# Increase the block counter # Increase the block counter
self.ablockno += 1 self.ablockno += 1
@ -275,10 +302,10 @@ class Recording:
self.startDelay_passed = True self.startDelay_passed = True
# Reset the audio block counter and the recording time # Reset the audio block counter and the recording time
self.ablockno = Atomic(1) self.ablockno = 1
curT = 0 curT = 0
ablockno = self.ablockno() ablockno = self.ablockno
recstatus = RecordStatus(curT=curT, done=False) recstatus = RecordStatus(curT=curT, done=False)
if self.progressCallback is not None: if self.progressCallback is not None: