From e5ee46dd20a1770993ce16c64ade197b9cb94340 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Wed, 21 Jun 2023 11:38:11 +0200 Subject: [PATCH] All hdf5 code on main thread. Using Python Queue() to buffer, and added handleQueue() method to Recording() that should be called occasionally --- src/lasp/lasp_record.py | 153 +++++++++++++++++++++++----------------- 1 file changed, 90 insertions(+), 63 deletions(-) diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 158665b..995bdee 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -3,8 +3,14 @@ """ Read data from stream and record sound and video at the same time """ -import dataclasses, logging, os, time, h5py, threading +import dataclasses +import logging +import os +import time +import h5py +import threading import numpy as np +from queue import Queue from .lasp_atomic import Atomic from .lasp_cpp import (LASP_VERSION_MAJOR, LASP_VERSION_MINOR, InDataHandler, @@ -72,13 +78,14 @@ class Recording: self.curT_rounded_to_seconds = 0 # Counter of the number of blocks - self.ablockno = Atomic(0) + self.ablockno = 0 # Stop flag, set when recording is finished. self.stop = Atomic(False) # Mutex, on who is working with the H5py data - self.file_mtx = threading.Lock() + self.queue_mtx = threading.Lock() + self.queue = Queue() self.progressCallback = progressCallback @@ -105,12 +112,14 @@ class Recording: logging.debug("Starting record....") - self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) + self.indh = InDataHandler( + streammgr, self.inCallback, self.resetCallback) if wait: logging.debug("Stop recording with CTRL-C") try: while not self.stop(): + self.handleQueue() time.sleep(0.01) except KeyboardInterrupt: logging.debug("Keyboard interrupt on record") @@ -120,39 +129,46 @@ class Recording: def resetCallback(self, daq): """ Function called with initial stream data. 
+ + Important notes: we found that the h5py HDF5 library does not really + work properly in a multi-threaded context. Therefore, this method + SHOULD be called from the same thread as the one that created self.f, + which is true for the current implementation of StreamMgr and friends. + + Args: + daq: Daq device. """ - with self.file_mtx: - in_ch = daq.enabledInChannels() - blocksize = daq.framesPerBlock() - self.blocksize = blocksize - self.nchannels = daq.neninchannels() - self.fs = daq.samplerate() + in_ch = daq.enabledInChannels() + blocksize = daq.framesPerBlock() + self.blocksize = blocksize + self.nchannels = daq.neninchannels() + self.fs = daq.samplerate() - f = self.f + f = self.f - f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR - f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR + f.attrs["LASP_VERSION_MAJOR"] = LASP_VERSION_MAJOR + f.attrs["LASP_VERSION_MINOR"] = LASP_VERSION_MINOR - # Set the bunch of attributes - f.attrs["samplerate"] = daq.samplerate() - f.attrs["nchannels"] = daq.neninchannels() - f.attrs["blocksize"] = blocksize - f.attrs["sensitivity"] = [ch.sensitivity for ch in in_ch] - f.attrs["channelNames"] = [ch.name for ch in in_ch] + # Set the bunch of attributes + f.attrs["samplerate"] = daq.samplerate() + f.attrs["nchannels"] = daq.neninchannels() + f.attrs["blocksize"] = blocksize + f.attrs["sensitivity"] = [ch.sensitivity for ch in in_ch] + f.attrs["channelNames"] = [ch.name for ch in in_ch] - # Add the start delay here, as firstFrames() is called right after the - # constructor is called. time.time() returns a floating point - # number of seconds after epoch. - f.attrs["time"] = time.time() + self.startDelay + # Add the start delay here, as firstFrames() is called right after the + # constructor is called. time.time() returns a floating point + # number of seconds after epoch. 
+ f.attrs["time"] = time.time() + self.startDelay - # In V2, we do not store JSON metadata anymore, but just an enumeration - # index to a physical quantity. - f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch] + # In V2, we do not store JSON metadata anymore, but just an enumeration + # index to a physical quantity. + f.attrs["qtys_enum_idx"] = [ch.qty.value for ch in in_ch] - # Measured physical quantity metadata - # This was how it was in LASP version < 1.0 - # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] - f.flush() + # Measured physical quantity metadata + # This was how it was in LASP version < 1.0 + # f.attrs['qtys'] = [ch.qty.to_json() for ch in in_ch] + f.flush() def firstFrames(self, adata): """ @@ -183,6 +199,18 @@ class Recording: self.f.flush() def inCallback(self, adata): + """ + Called from a different thread. + """ + if self.stop(): + return True + + with self.queue_mtx: + self.queue.put(np.copy(adata)) + + return True + + def handleQueue(self): """ This method is called when a block of audio data from the stream is available. It should return either True or False. @@ -191,17 +219,18 @@ class Recording: """ if self.stop(): - logging.debug('Stop flag set, early return in inCallback') + logging.debug('Stop flag set, early return in handleQueue') # Stop flag is raised. We do not add any data anymore. return True - with self.file_mtx: + with self.queue_mtx: + while not self.queue.empty(): + adata = self.queue.get() - if self.ad is None: - self.firstFrames(adata) + if self.ad is None: + self.firstFrames(adata) - self.__addTimeData(adata) - return True + self.__addTimeData(adata) def setDelete(self, val: bool): """ @@ -209,8 +238,7 @@ class Recording: the recording. Typically used for cleaning up after canceling a recording. 
""" - with self.file_mtx: - self.deleteFile = val + self.deleteFile = val def finish(self): """ @@ -222,30 +250,29 @@ class Recording: self.stop <<= True - with self.file_mtx: - self.f.flush() - # Remove indata handler, which also should remove callback function - # from StreamMgr. This, however does not have to happen - # instantaneously. For which we have to implement extra mutex - # guards in this class - del self.indh - self.indh = None - - # Remove handle to dataset otherwise the h5 file is not closed - # properly. - del self.ad - self.ad = None + self.f.flush() + # Remove indata handler, which also should remove callback function + # from StreamMgr. This, however does not have to happen + # instantaneously. For which we have to implement extra mutex + # guards in this class + del self.indh + self.indh = None - try: - # Close the recording file - self.f.close() - del self.f - except Exception as e: - logging.error(f"Error closing file: {e}") + # Remove handle to dataset otherwise the h5 file is not closed + # properly. + del self.ad + self.ad = None - logging.debug("Recording ended") - if self.deleteFile: - self.__deleteFile() + try: + # Close the recording file + self.f.close() + del self.f + except Exception as e: + logging.error(f"Error closing file: {e}") + + logging.debug("Recording ended") + if self.deleteFile: + self.__deleteFile() def __deleteFile(self): """ @@ -262,7 +289,7 @@ class Recording: """ # logging.debug('Recording::__addTimeData()') - curT = self.ablockno() * self.blocksize / self.fs + curT = self.ablockno * self.blocksize / self.fs # Increase the block counter self.ablockno += 1 @@ -275,10 +302,10 @@ class Recording: self.startDelay_passed = True # Reset the audio block counter and the recording time - self.ablockno = Atomic(1) + self.ablockno = 1 curT = 0 - ablockno = self.ablockno() + ablockno = self.ablockno recstatus = RecordStatus(curT=curT, done=False) if self.progressCallback is not None: