diff --git a/CMakeLists.txt b/CMakeLists.txt index 3e53eb5..7fb0fd9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,6 +109,10 @@ set(CMAKE_C_FLAGS_RELEASE "-O3 -flto -mfpmath=sse -march=x86-64 -mtune=native \ -fdata-sections -ffunction-sections -fomit-frame-pointer -finline-functions") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-type-limits -Werror=return-type") +set(CMAKE_CXX_FLAGS_RELEASE "-O3 -flto -mfpmath=sse -march=x86-64 -mtune=native \ +-fdata-sections -ffunction-sections -fomit-frame-pointer -finline-functions") + +set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g -Wall ") # ############################# End compilation flags include_directories(/usr/lib/python3.10/site-packages/numpy/core/include) diff --git a/cpp_src/device/lasp_indatahandler.cpp b/cpp_src/device/lasp_indatahandler.cpp index c9a4d15..a6a0946 100644 --- a/cpp_src/device/lasp_indatahandler.cpp +++ b/cpp_src/device/lasp_indatahandler.cpp @@ -41,8 +41,8 @@ void InDataHandler::stop() { InDataHandler::~InDataHandler() { DEBUGTRACE_ENTER; - checkRightThread(); #if LASP_DEBUG == 1 + checkRightThread(); if (!stopCalled) { std::cerr << "************ BUG: Stop function not called while arriving at " "InDataHandler's destructor. Fix this by calling " diff --git a/cpp_src/dsp/lasp_threadedindatahandler.cpp b/cpp_src/dsp/lasp_threadedindatahandler.cpp index cd60750..bf47d4b 100644 --- a/cpp_src/dsp/lasp_threadedindatahandler.cpp +++ b/cpp_src/dsp/lasp_threadedindatahandler.cpp @@ -78,8 +78,8 @@ void ThreadedInDataHandlerBase::startThread() { _1), resetCallback); - _indatahandler->start(); _thread_can_safely_run = true; + _indatahandler->start(); } void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( @@ -92,13 +92,14 @@ void ThreadedInDataHandlerBase::_inCallbackFromInDataHandler( _queue->push(daqdata); if (!_thread_running) { DEBUGTRACE_PRINT("Pushing new thread in pool"); + _thread_running = true; _pool.push_task(&ThreadedInDataHandlerBase::threadFcn, this); } } void ThreadedInDataHandlerBase::stopThread() { DEBUGTRACE_ENTER; - if(!_indatahandler) { + if (!_indatahandler) { throw rte("BUG: ThreadedIndataHandler not running"); } @@ -129,7 +130,6 @@ ThreadedInDataHandlerBase::~ThreadedInDataHandlerBase() { void ThreadedInDataHandlerBase::threadFcn() { DEBUGTRACE_ENTER; - _thread_running = true; while (!_queue->empty() && _thread_can_safely_run) { // Call inCallback_threaded diff --git a/cpp_src/pybind11/lasp_pyindatahandler.cpp b/cpp_src/pybind11/lasp_pyindatahandler.cpp index adb107d..ce1ca9f 100644 --- a/cpp_src/pybind11/lasp_pyindatahandler.cpp +++ b/cpp_src/pybind11/lasp_pyindatahandler.cpp @@ -104,9 +104,10 @@ class PyIndataHandler : public ThreadedInDataHandler { /** * @brief The callback functions that is called. */ - py::function cb, reset_callback; + std::unique_ptr cb, reset_callback; + bool _done{false}; -public: + public: /** * @brief Initialize PyIndataHandler * @@ -117,8 +118,9 @@ public: * is called, when a stream stops, this pointer / handle will dangle. */ PyIndataHandler(SmgrHandle mgr, py::function cb, py::function reset_callback) - : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { - + : ThreadedInDataHandler(mgr), + cb(std::make_unique(cb)), + reset_callback(std::make_unique(reset_callback)) { DEBUGTRACE_ENTER; /// Start should be called externally, as at constructor time no virtual /// functions should be called. @@ -139,12 +141,13 @@ public: */ void reset(const Daq *daq) { DEBUGTRACE_ENTER; + if (_done) return; try { py::gil_scoped_acquire acquire; if (daq) { - reset_callback(daq); + (*reset_callback)(daq); } else { - reset_callback(py::none()); + (*reset_callback)(py::none()); } } catch (py::error_already_set &e) { cerr << "*************** Error calling reset callback!\n"; @@ -169,31 +172,40 @@ public: /* DEBUGTRACE_ENTER; */ using DataType = DataTypeDescriptor::DataType; + if (_done) return; try { py::gil_scoped_acquire acquire; py::object bool_val; switch (d.dtype) { - case (DataType::dtype_int8): { - bool_val = cb(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_int16): { - bool_val = cb(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_int32): { - bool_val = cb(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_fl32): { - bool_val = cb(getPyArrayNoCpy(d)); - } break; - case (DataType::dtype_fl64): { - bool_val = cb(getPyArrayNoCpy(d)); - } break; - default: - throw std::runtime_error("BUG"); - } // End of switch + case (DataType::dtype_int8): { + bool_val = (*cb)(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_int16): { + bool_val = (*cb)(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_int32): { + bool_val = (*cb)(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_fl32): { + bool_val = (*cb)(getPyArrayNoCpy(d)); + } break; + case (DataType::dtype_fl64): { + bool_val = (*cb)(getPyArrayNoCpy(d)); + } break; + default: + throw std::runtime_error("BUG"); + } // End of switch bool res = bool_val.cast(); + if (res == false) { + DEBUGTRACE_PRINT("Setting callbacks to None") + _done = true; + // cb = py::function(py::none()); + // reset_callback = py::function(py::none()); + cb.reset(); + reset_callback.reset(); + } } catch (py::error_already_set &e) { cerr << "ERROR: Python raised exception from callback function: "; cerr << e.what() << endl; diff --git a/pyproject.toml b/pyproject.toml index 2c1cc60..c7aabce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ requires-python = ">=3.10" description = "Library for Acoustic Signal Processing" license = { "file" = "LICENSE" } authors = [{ "name" = "J.A. de Jong", "email" = "j.a.dejong@ascee.nl" }] -version = "1.4.6" +version = "1.4.7" keywords = ["DSP", "DAQ", "Signal processing"] diff --git a/python_src/lasp/lasp_record.py b/python_src/lasp/lasp_record.py index 66adef2..c918ceb 100644 --- a/python_src/lasp/lasp_record.py +++ b/python_src/lasp/lasp_record.py @@ -11,6 +11,11 @@ from enum import Enum, auto, unique from .lasp_cpp import InDataHandler, StreamMgr from .lasp_version import LASP_VERSION_MAJOR, LASP_VERSION_MINOR import uuid +import logging + +logger = logging.getLogger(__name__) +# logger.setLevel(logging.DEBUG) +logger.setLevel(logging.INFO) @dataclasses.dataclass @@ -60,10 +65,14 @@ class Recording: startDelay: Optional delay added before the recording is *actually* started in [s]. """ + logger.debug("__init__()") ext = ".h5" if ext not in fn: fn += ext + if os.path.exists(fn): + raise RuntimeError("Recording file name already exists / is in use") + self._smgr = streammgr self._metadata = None @@ -91,7 +100,7 @@ class Recording: self._stop = Atomic(False) # Mutex, on who is working with the H5py data and the class settings - self._rec_mutex = threading.Lock() + self._rec_mutex = threading.RLock() self._progressCallback = progressCallback @@ -100,7 +109,7 @@ class Recording: self._h5file = h5py.File(self._fn, "w", "stdio") self._h5file.flush() except Exception as e: - logging.error(f"Error creating measurement file {e}") + logger.error(f"Error creating measurement file {e}") raise # This flag is used to delete the file on finish(), and can be used @@ -115,19 +124,19 @@ class Recording: # Audio dataset self._ad = None - logging.debug("Starting record....") + logger.debug("Starting record....") self._indataHandler = InDataHandler( streammgr, self.inCallback, self.resetCallback ) if wait: - logging.debug("Stop recording with CTRL-C") + logger.debug("Stop recording with CTRL-C") try: while not self._stop(): time.sleep(0.01) except KeyboardInterrupt: - logging.debug("Keyboard interrupt on record") + logger.debug("Keyboard interrupt on record") finally: self.finish() @@ -138,6 +147,7 @@ class Recording: """ Function called with initial stream data. """ + logger.debug(f"resetCallback({daq})") with self._rec_mutex: in_ch = daq.enabledInChannels() blocksize = daq.framesPerBlock() @@ -180,10 +190,11 @@ class Recording: When returning False, it will stop the stream. """ + logger.debug(f"inCallback({adata})") if self._stop(): - logging.debug("Stop flag set, early return in inCallback") + logger.debug("Stop flag set, early return in inCallback") # Stop flag is raised. We do not add any data anymore. - return True + return False with self._rec_mutex: @@ -191,10 +202,7 @@ class Recording: match self._recState: case RecordingState.Waiting: - if ( - self._allBlocks * self._blocksize / self._fs - > self._startDelay - ): + if self._allBlocks * self._blocksize / self._fs > self._startDelay: self._recState = RecordingState.Recording case RecordingState.Recording: @@ -208,7 +216,10 @@ class Recording: recstatus = RecordStatus(curT=self.recordedTime, done=False) - if self._requiredRecordingLength is not None and self.recordedTime >= self._requiredRecordingLength: + if ( + self._requiredRecordingLength is not None + and self.recordedTime >= self._requiredRecordingLength + ): self._recState = RecordingState.AllDataStored self._stop <<= True recstatus.done = True @@ -227,9 +238,10 @@ class Recording: @property def recordedTime(self): """Return recorded time (not rounded) as float""" - if self._ad is None: - return 0.0 - return self._recordedBlocks * self._blocksize / self._fs + with self._rec_mutex: + if self._ad is None: + return 0.0 + return self._recordedBlocks * self._blocksize / self._fs def __addFirstFramesToFile(self, adata): """ @@ -240,29 +252,29 @@ class Recording: adata: Numpy array with data from DAQ """ + with self._rec_mutex: + # The array data type cannot + # datatype = daq.dataType() + dtype = np.dtype(adata.dtype) - # The array data type cannot - # datatype = daq.dataType() - dtype = np.dtype(adata.dtype) + assert self._ad is None - assert self._ad is None + self._ad = self._h5file.create_dataset( + "audio", + (1, self._blocksize, self._nchannels), + dtype=dtype, + maxshape=( + None, # This means, we can add blocks + # indefinitely + self._blocksize, + self._nchannels, + ), + compression="gzip", + ) + self._ad[0, :, :] = adata - self._ad = self._h5file.create_dataset( - "audio", - (1, self._blocksize, self._nchannels), - dtype=dtype, - maxshape=( - None, # This means, we can add blocks - # indefinitely - self._blocksize, - self._nchannels, - ), - compression="gzip", - ) - self._ad[0, :, :] = adata - - self._h5file.flush() - self._deleteFile = False + self._h5file.flush() + self._deleteFile = False def setDelete(self, val: bool): """ @@ -279,36 +291,35 @@ class Recording: remove the queue from the stream, etc. """ - logging.debug("Recording::finish()") + logger.debug("Recording::finish()") self._stop <<= True - with self._rec_mutex: - if self._recState == RecordingState.Finished: - raise RuntimeError('Recording has already finished') - self._h5file.flush() + if self._recState == RecordingState.Finished: + raise RuntimeError("Recording has already finished") + # Remove indata handler, which also should remove callback function # from StreamMgr. This, however does not have to happen # instantaneously. For which we have to implement extra mutex # guards in this class del self._indataHandler - self._indataHandler = None + + self._h5file.flush() # Remove handle to dataset otherwise the h5 file is not closed # properly. del self._ad - self._ad = None try: # Close the recording file self._h5file.close() del self._h5file except Exception as e: - logging.error(f"Error closing file: {e}") + logger.error(f"Error closing file: {e}") - logging.debug("Recording ended") + logger.debug("Recording ended") if self._deleteFile: self.__deleteFile() self._recState = RecordingState.Finished @@ -320,16 +331,17 @@ class Recording: try: os.remove(self._fn) except Exception as e: - logging.error(f"Error deleting file: {self._fn}: {str(e)}") + logger.error(f"Error deleting file: {self._fn}: {str(e)}") def __addTimeDataToFile(self, indata): """ Called by handleQueue() and adds new time data to the storage file. """ + with self._rec_mutex: - ablockno = self._recordedBlocks + ablockno = self._recordedBlocks - # Add the data to the file, and resize the audio data blocks - self._ad.resize(ablockno + 1, axis=0) - self._ad[ablockno, :, :] = indata - self._h5file.flush() + # Add the data to the file, and resize the audio data blocks + self._ad.resize(ablockno + 1, axis=0) + self._ad[ablockno, :, :] = indata + self._h5file.flush()