From fae906884ed9628be77df90c67a2dba7f9629102 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Mon, 10 Oct 2022 19:17:38 +0200 Subject: [PATCH] Textual improvements. Formatting (Neoformat) improvements. Let DataTypeDescriptor be a reference (to a global const std map in Daq class. Improved naming of certain functions. Better DaqData implementation, now we make sure memory alignment is good at all times. Switched functions arguments in DaqData constructor to comply with all other cases of first frame, then channel. Better naming of stream in RtAudio. Better handling of faulty function calling in RtAudio start(). Bugfix in RtAudio, did call right Daq::dtypeDescr() function --> result was that only first channel was copied to all channels. Added extra check in StreamMgr. Removed unnecessary TypedDaqData class. Use a safe queue in threaded in data handler. We can now remove the Boost code for that. --- CMakeLists.txt | 9 +- README.md | 19 +- cmake/rtaudio.cmake | 1 + pyproject.toml | 11 -- setup.py | 48 ++++- src/lasp/device/lasp_daq.cpp | 46 ++--- src/lasp/device/lasp_daq.h | 2 +- src/lasp/device/lasp_daqconfig.cpp | 8 +- src/lasp/device/lasp_daqconfig.h | 12 +- src/lasp/device/lasp_daqdata.cpp | 202 +++++++++++++++----- src/lasp/device/lasp_daqdata.h | 141 +++++++++----- src/lasp/device/lasp_rtaudiodaq.cpp | 103 ++++++---- src/lasp/device/lasp_streammgr.cpp | 52 +++-- src/lasp/device/lasp_uldaq.cpp | 18 +- src/lasp/dsp/lasp_ppm.cpp | 47 +++-- src/lasp/dsp/lasp_ppm.h | 2 +- src/lasp/dsp/lasp_threadedindatahandler.cpp | 57 +++++- src/lasp/dsp/lasp_threadedindatahandler.h | 6 +- src/lasp/dsp/lasp_types.h | 1 + src/lasp/lasp_record.py | 1 + src/lasp/pybind11/lasp_pyindatahandler.cpp | 76 ++++++-- src/lasp/pybind11/lasp_streammgr.cpp | 22 ++- third_party/armadillo-code | 2 +- 23 files changed, 590 insertions(+), 296 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 92963a5..81ee98e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,9 +45,14 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON) if(${CMAKE_BUILD_TYPE} STREQUAL "Debug") set(LASP_DEBUG True) - # This does not work nicely with RtAudio. However, if we compile it + # This does not work nicely with RtAudio if it is precompiled (undefined + # symbols). However, if we compile it # ourselves as a third_party ref, this works nicely together. - add_definitions(-D_GLIBCXX_DEBUG) + + # This flag gives "floating point exception: core dumped" + # add_definitions(-D_GLIBCXX_DEBUG) + + # add_definitions(ARMA_EXTRA_DEBUG) else() set(LASP_DEBUG False) diff --git a/README.md b/README.md index 89892a4..98bfcc0 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,11 @@ # Library for Acoustic Signal Processing -Welcome to LASP: Library for Acoustic Signal Processing. LASP is a C library +Welcome to LASP: Library for Acoustic Signal Processing. LASP is a C++ library with a Python interface which is supposed to process (multi-) microphone acoustic data in real time on a PC and output results. -The main goal of this library will be the processing of data from an -array of microphones real time, on a Raspberry PI. At the point in -time of this writing, we are yet unsure whether the Raspberry PI will -have enough computational power to this end, but may be by the time it +At the point in time of this writing, we are yet unsure whether the Raspberry +PI will have enough computational power to this end, but may be by the time it is finished, we have a new faster generation :). Current features that are implemented: @@ -41,7 +39,7 @@ If you have any question(s), please feel free to contact us: info@ascee.nl. Two commands that install all requirements (for Ubuntu / Linux Mint) - `pip install scipy numpy build scikit-build appdirs` -- `sudo apt install libusb-dev +- `sudo apt install libusb-dev libpulse-dev libboost-dev` ## Runtime dependencies (Linux) @@ -52,14 +50,12 @@ Two commands that install all requirements (for Ubuntu / Linux Mint) - RtAudio, for Audio DAQ backends - libusb - BLAS (OpenBLAS, other). -- RtAudio (optional) -- UlDaq (optional) ## Editable install In the root directory of the repository, run: -- `pip3 isntall --user --prefix=~/.local -e .` +- `pip3 isntall --user -e .` - `cmake .` - `make -j` @@ -72,7 +68,6 @@ Optional dependencies, which can be turned ON/OFF using CMake: - Build tools: compiler [http://cmake.org](CMake), the Python packages: - Scipy - Numpy - - py-build-cmake - appdirs These can all be installed using: @@ -103,7 +98,11 @@ compilation: - cython - python3-numpy - libopenblas + +If building RtAudio with the ALSA backend: + - libclalsadrv-dev - libopenblas-base - libopenblas-dev +- libusb-1.0-dev diff --git a/cmake/rtaudio.cmake b/cmake/rtaudio.cmake index ca36779..835fde6 100644 --- a/cmake/rtaudio.cmake +++ b/cmake/rtaudio.cmake @@ -6,6 +6,7 @@ if(LASP_HAS_RTAUDIO) else() set(RTAUDIO_API_PULSE TRUE CACHE BOOL "Build with PulseAudio backend" FORCE) set(RTAUDIO_API_ALSA OFF CACHE BOOL "Do not build with Alsa backend" FORCE) + set(RTAUDIO_API_JACK OFF CACHE BOOL "Do not build with Jack backend" FORCE) endif() set(RTAUDIO_BUILD_STATIC_LIBS ON CACHE BOOL "Build static libs for RtAudio" FORCE) add_subdirectory(third_party/rtaudio) diff --git a/pyproject.toml b/pyproject.toml index 771e14f..6ac7893 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,6 @@ readme = "README.md" requires-python = ">=3.8" license = { "file" = "LICENSE" } authors = [{ "name" = "J.A. de Jong et al.", "email" = "info@ascee.nl" }] -keywords = ["DSP", "DAQ", "Signal processing"] classifiers = [ "Topic :: Scientific/Engineering", @@ -14,15 +13,5 @@ classifiers = [ ] # urls = { "Documentation" = "https://" } -dependencies = ["numpy", "scipy", "appdirs", "h5py", "appdirs", "dataclasses_json", ] dynamic = ["version", "description"] -[build-system] # How pip and other frontends should build this project -requires = [ - "setuptools>=42", - "wheel", - "scikit-build", - "cmake", -] -build-backend = "setuptools.build_meta" - diff --git a/setup.py b/setup.py index 65fef76..b46819d 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,51 @@ -from skbuild import setup +import glob +import platform + +from setuptools import setup + +if 'Linux' in platform.platform(): + extension = list(glob.glob('src/lasp/lasp_cpp.cpython*')) + if len(extension) == 0: + raise RuntimeError('Please first run CMake to build extension') + elif len(extension) > 1: + raise RuntimeError('Too many extension files found') + + pkgdata = extension + +else: + raise RuntimeError('Not yet Windows-proof') + +classifiers = [ + "Topic :: Scientific/Engineering", + "Programming Language :: Python :: 3.8", + "Operating System :: POSIX :: Linux", + "Operating System :: Microsoft :: Windows", +] + +keywords = ["DSP", "DAQ", "Signal processing"] + +with open('README.md', 'r') as f: + readme = f.read() + setup( name="lasp", version="1.0", - description="LASP Library of Acoustic Signal Processing", + description="LASP: Library for Acoustic Signal Processing", + author='J.A. de Jong (ASCEE / Redu-Sone)', author_email='info@ascee.nl', + + url='https://www.ascee.nl/lasp', + classifiers=classifiers, + keywords=keywords, license="MIT", - packages=['lasp'], - package_dir= {'': 'src'}, - cmake_install_dir='src/lasp', - # cmake_install_target='src', + readme=readme, + dependencies=["numpy", "scipy", "appdirs", "h5py", "appdirs", + "dataclasses_json"], + packages=['lasp', 'lasp.filter', 'lasp.tools'], + data_files = pkgdata, + include_package_data=True, + package_dir={'': 'src'}, python_requires='>=3.8', ) diff --git a/src/lasp/device/lasp_daq.cpp b/src/lasp/device/lasp_daq.cpp index 1892426..158ac04 100644 --- a/src/lasp/device/lasp_daq.cpp +++ b/src/lasp/device/lasp_daq.cpp @@ -14,7 +14,7 @@ using std::runtime_error; Daq::~Daq() { DEBUGTRACE_ENTER; } std::unique_ptr Daq::createDaq(const DeviceInfo &devinfo, - const DaqConfiguration &config) { + const DaqConfiguration &config) { DEBUGTRACE_ENTER; #if LASP_HAS_ULDAQ == 1 @@ -23,37 +23,37 @@ std::unique_ptr Daq::createDaq(const DeviceInfo &devinfo, } #endif #if LASP_HAS_RTAUDIO == 1 - // See lasp_daqconfig.h:114 ALSA, up to + // See lasp_daqconfig.h:114 ALSA, up to if (devinfo.api.apicode == LASP_RTAUDIO_APICODE) { return createRtAudioDevice(devinfo, config); } #endif throw std::runtime_error(string("Unable to match Device API: ") + - devinfo.api.apiname); + devinfo.api.apiname); } Daq::Daq(const DeviceInfo &devinfo, const DaqConfiguration &config) - : DaqConfiguration(config), DeviceInfo(devinfo) { - DEBUGTRACE_ENTER; + : DaqConfiguration(config), DeviceInfo(devinfo) { + DEBUGTRACE_ENTER; - if (!hasInternalOutputMonitor && monitorOutput) { - throw std::runtime_error( - "Output monitor flag set, but device does not have output monitor"); - } - - if (!config.match(devinfo)) { - throw std::runtime_error("DaqConfiguration does not match device info"); - } - if (neninchannels(false) > ninchannels) { - throw std::runtime_error( - "Number of enabled input channels is higher than device capability"); - } - if (nenoutchannels() > noutchannels) { - throw std::runtime_error( - "Number of enabled output channels is higher than device capability"); - } + if (!hasInternalOutputMonitor && monitorOutput) { + throw std::runtime_error( + "Output monitor flag set, but device does not have output monitor"); } + if (!config.match(devinfo)) { + throw std::runtime_error("DaqConfiguration does not match device info"); + } + if (neninchannels(false) > ninchannels) { + throw std::runtime_error( + "Number of enabled input channels is higher than device capability"); + } + if (nenoutchannels() > noutchannels) { + throw std::runtime_error( + "Number of enabled output channels is higher than device capability"); + } +} + double Daq::samplerate() const { DEBUGTRACE_ENTER; return availableSampleRates.at(sampleRateIndex); @@ -62,7 +62,9 @@ double Daq::samplerate() const { DataTypeDescriptor::DataType Daq::dataType() const { return availableDataTypes.at(dataTypeIndex); } -DataTypeDescriptor Daq::dtypeDescr() const { return dtype_map.at(dataType()); } +const DataTypeDescriptor &Daq::dtypeDescr() const { + return dtype_map.at(dataType()); +} double Daq::inputRangeForChannel(us ch) const { if (!(ch < ninchannels)) { diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index fe57355..6779cda 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -176,7 +176,7 @@ public: * * @return A DataTypeDescriptor */ - DataTypeDescriptor dtypeDescr() const; + const DataTypeDescriptor& dtypeDescr() const; /** * @brief The number of frames that is send in a block of DaqData. diff --git a/src/lasp/device/lasp_daqconfig.cpp b/src/lasp/device/lasp_daqconfig.cpp index bb6d33b..e8b2010 100644 --- a/src/lasp/device/lasp_daqconfig.cpp +++ b/src/lasp/device/lasp_daqconfig.cpp @@ -57,7 +57,7 @@ bool DaqConfiguration::match(const DeviceInfo &dev) const { return (dev.device_name == device_name && dev.api == api); } -int DaqConfiguration::getHighestInChannel() const { +int DaqConfiguration::getHighestEnabledInChannel() const { for (int i = inchannel_config.size() - 1; i > -1; i--) { if (inchannel_config.at(i).enabled) return i; @@ -65,21 +65,21 @@ int DaqConfiguration::getHighestInChannel() const { return -1; } -int DaqConfiguration::getHighestOutChannel() const { +int DaqConfiguration::getHighestEnabledOutChannel() const { for (us i = outchannel_config.size() - 1; i >= 0; i--) { if (outchannel_config.at(i).enabled) return i; } return -1; } -int DaqConfiguration::getLowestInChannel() const { +int DaqConfiguration::getLowestEnabledInChannel() const { for (us i = 0; i < inchannel_config.size(); i++) { if (inchannel_config.at(i).enabled) return i; } return -1; } -int DaqConfiguration::getLowestOutChannel() const { +int DaqConfiguration::getLowestEnabledOutChannel() const { for (us i = 0; i < outchannel_config.size(); i++) { if (outchannel_config.at(i).enabled) return i; diff --git a/src/lasp/device/lasp_daqconfig.h b/src/lasp/device/lasp_daqconfig.h index dc5248d..ae334bb 100644 --- a/src/lasp/device/lasp_daqconfig.h +++ b/src/lasp/device/lasp_daqconfig.h @@ -303,13 +303,13 @@ public: bool match(const DeviceInfo &devinfo) const; /** - * @brief Get the highest channel number from the list of enabled input + * @brief Get the enabled highest channel number from the list of enabled input * channels. * * @return Index to the highest input channel. -1 if no input channels are * enabled. */ - int getHighestInChannel() const; + int getHighestEnabledInChannel() const; /** * @brief Get the highest channel number from the list of enabled output * channels. @@ -317,7 +317,7 @@ public: * @return Index to the highest input channel. -1 if no output channels are * enabled. */ - int getHighestOutChannel() const; + int getHighestEnabledOutChannel() const; /** * @brief Get the lowest channel number from the list of enabled input @@ -326,15 +326,15 @@ public: * @return Index to the lowest input channel. -1 if no input channels are * enabled. */ - int getLowestInChannel() const; + int getLowestEnabledInChannel() const; /** * @brief Get the lowest channel number from the list of enabled output * channels. * - * @return Index to the lowest input channel. -1 if no output channels are + * @return Index to the lowest output channel. -1 if no output channels are * enabled. */ - int getLowestOutChannel() const; + int getLowestEnabledOutChannel() const; /** * @brief Set all input channels to enabled / disabled state. diff --git a/src/lasp/device/lasp_daqdata.cpp b/src/lasp/device/lasp_daqdata.cpp index 1e26091..dd52d0d 100644 --- a/src/lasp/device/lasp_daqdata.cpp +++ b/src/lasp/device/lasp_daqdata.cpp @@ -1,106 +1,210 @@ /* #define DEBUGTRACE_ENABLED */ -#include "lasp_daqdata.h" #include "debugtrace.hpp" +#include +#include "lasp_daqdata.h" #include "lasp_mathtypes.h" #include +#include + +using std::cerr; +using std::cout; +using std::endl; +using rte = std::runtime_error; + +static_assert(sizeof(byte_t) == 1, "Invalid char size"); DEBUGTRACE_VARIABLES; -DaqData::DaqData(const us nchannels, const us nframes, +/// Constructors and destructors +DaqData::DaqData(const us nframes, const us nchannels, const DataTypeDescriptor::DataType dtype) - : nchannels(nchannels), nframes(nframes), dtype(dtype), - dtype_descr(dtype_map.at(dtype)), sw(dtype_descr.sw) { - static_assert(sizeof(char) == 1, "Invalid char size"); - const DataTypeDescriptor &desc = dtype_map.at(dtype); - _data.resize(nframes * nchannels * desc.sw); + : nframes(nframes), nchannels(nchannels), dtype(dtype), + dtype_descr(dtype_map.at(dtype)), sw(dtype_descr.sw) { + + DEBUGTRACE_ENTER; + DEBUGTRACE_PRINT(sw); + + _data = new (std::align_val_t{8}) byte_t[sw * nchannels * nframes]; + if (!_data) { + throw rte("Could not allocate memory for DaqData!"); + } +} +DaqData::DaqData(const DaqData &o) : DaqData(o.nframes, o.nchannels, o.dtype) { + + DEBUGTRACE_ENTER; + /* std::copy(o._data, &o._data[sw * nchannels * nframes], _data); */ + memcpy(_data, o._data, sw * nchannels * nframes); } -void DaqData::copyInFromRaw(const std::vector &ptrs) { +/* DaqData::DaqData(DaqData &&o) */ +/* : nframes(o.nframes), nchannels(o.nchannels), dtype(o.dtype), */ +/* dtype_descr(std::move(o.dtype_descr)), sw(o.sw) { */ + +/* DEBUGTRACE_ENTER; */ +/* _data = o._data; */ +/* /// Nullptrs do not get deleted */ +/* o._data = nullptr; */ +/* } */ + +DaqData::~DaqData() { + DEBUGTRACE_ENTER; + if (_data) + delete[] _data; +} + +void DaqData::copyInFromRaw(const std::vector &ptrs) { + DEBUGTRACE_ENTER; us ch = 0; assert(ptrs.size() == nchannels); - for (auto ptr : ptrs) { - std::copy(ptr, ptr + sw * nframes, &_data[sw * ch * nframes]); + for (auto& ptr : ptrs) { + assert(ch < nchannels); + memcpy(&_data[sw * ch * nframes], ptr, sw * nframes); + /* std::copy(ptr, ptr + sw * nframes, &_data[sw * ch * nframes]); */ ch++; } + dmat data2(nframes, nchannels); } -void DaqData::copyToRaw(const us channel, uint8_t *ptr) { - std::copy(raw_ptr(0, channel), - raw_ptr(nframes, channel), ptr); +void DaqData::copyToRaw(const us channel, byte_t *ptr) { + /* std::copy(raw_ptr(0, channel), raw_ptr(nframes, channel), ptr); */ + memcpy(ptr, raw_ptr(0, channel), sw*nframes); } -template d convertToFloat(const int_type val) { - if constexpr (std::is_integral::value) { - return static_cast(val) / std::numeric_limits::max(); +template +d DaqData::toFloat(const us frame, const us channel) const { + DEBUGTRACE_ENTER; + if constexpr (std::is_integral::value) { + return static_cast(value(frame, channel)) / + std::numeric_limits::max(); } else { - return static_cast(val); + return static_cast(value(frame, channel)); } } -template -inline vd channelToFloat(const byte_t *vals, const us nframes) { + +template vd DaqData::toFloat(const us channel) const { + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + check_type(); +#endif vd res(nframes); for (us i = 0; i < nframes; i++) { - res(i) = convertToFloat(reinterpret_cast(vals)[i]); + res(i) = toFloat(i, channel); } return res; } -template -inline dmat allToFloat(const byte_t *vals, const us nframes, const us nchannels) { +template dmat DaqData::toFloat() const { + + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + check_type(); +#endif dmat res(nframes, nchannels); - for (us j = 0; j < nchannels; j++) { - for (us i = 0; i < nframes; i++) { - res(i, j) = convertToFloat( - reinterpret_cast(vals)[i + j * nframes]); + + for (us i = 0; i < nframes; i++) { + for (us j = 0; j < nchannels; j++) { + res(i, j) = toFloat(i, j); } } return res; } -vd DaqData::toFloat(const us channel_no) const { +d DaqData::toFloat(const us frame, const us channel) const { + DEBUGTRACE_ENTER; using DataType = DataTypeDescriptor::DataType; switch (dtype) { + case (DataType::dtype_int8): + return toFloat(frame, channel); + break; + case (DataType::dtype_int16): + return toFloat(frame, channel); + break; + case (DataType::dtype_int32): + return toFloat(frame, channel); + break; + case (DataType::dtype_fl32): + return toFloat(frame, channel); + break; + case (DataType::dtype_fl64): + return toFloat(frame, channel); + break; + default: + throw std::runtime_error("BUG"); + } // End of switch + + // Never arrives her + return 0; +} + +vd DaqData::toFloat(const us channel_no) const { + DEBUGTRACE_ENTER; + using DataType = DataTypeDescriptor::DataType; + cerr << (int) dtype << endl; + switch (dtype) { case (DataType::dtype_int8): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; case (DataType::dtype_int16): { - return channelToFloat(raw_ptr(0, channel_no), nframes); - + return toFloat(channel_no); } break; case (DataType::dtype_int32): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; case (DataType::dtype_fl32): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; case (DataType::dtype_fl64): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; default: throw std::runtime_error("BUG"); } // End of switch + + // Never arrives here + return vd(); } + dmat DaqData::toFloat() const { - dmat result(nframes, nchannels); + + DEBUGTRACE_ENTER; + /* DEBUGTRACE_PRINT(nframes); */ + /* DEBUGTRACE_PRINT(nchannels); */ + using DataType = DataTypeDescriptor::DataType; + + /* cerr << "DataType: " << (int) dtype << endl; */ + switch (dtype) { - case (DataType::dtype_int8): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_int16): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_int32): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_fl32): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_fl64): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; + case (DataType::dtype_int8): + return toFloat(); + break; + case (DataType::dtype_int16): + return toFloat(); + break; + case (DataType::dtype_int32): + return toFloat(); + break; + case (DataType::dtype_fl32): + return toFloat(); + break; + case (DataType::dtype_fl64): + return toFloat(); + break; default: throw std::runtime_error("BUG"); } // End of switch + + // Never reached + return dmat(); +} + +void DaqData::print() const { + cout << "Number of frames: " << nframes << endl; + cout << "Number of channels: " << nchannels << endl; + cout << "DataType: " << dtype_map.at(dtype).name << endl; + cout << "First sample of first channel (as float)" << toFloat(0,0) << endl; + cout << "Last sample of first channel (as float)" << toFloat(nframes-1,0) << endl; + cout << "Last sample of last channel (as float)" << toFloat(nframes-1,nchannels-1) << endl; + } diff --git a/src/lasp/device/lasp_daqdata.h b/src/lasp/device/lasp_daqdata.h index 84e91ee..034ebfa 100644 --- a/src/lasp/device/lasp_daqdata.h +++ b/src/lasp/device/lasp_daqdata.h @@ -1,10 +1,12 @@ #pragma once #include "lasp_daqconfig.h" #include "lasp_types.h" +#include +#include #include #include #include -#include +#include /** \addtogroup device * @{ @@ -22,19 +24,19 @@ protected: /** * @brief Storage for the actual data. */ - std::vector _data; + byte_t *_data; public: - /** - * @brief The number of channels - */ - us nchannels; - /** * @brief The number of frames in this block of data. */ us nframes; + /** + * @brief The number of channels + */ + us nchannels; + /** * @brief The data type corresponding to a sample */ @@ -53,22 +55,23 @@ public: /** * @brief Initialize an empty frame of data * - * @param nchannels The number of channels * @param nframes The number of frames + * @param nchannels The number of channels * @param dtype The data type */ - DaqData(const us nchannels, const us nframes, + DaqData(const us nframes, const us nchannels, const DataTypeDescriptor::DataType dtype); /** * @brief Initialize using no allocation */ - DaqData() - : DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {} - virtual ~DaqData() = default; - DaqData &operator=(const DaqData &) = default; + DaqData(const DaqData &); + /* DaqData(DaqData &&); */ + DaqData &operator=(const DaqData &) = delete; + virtual ~DaqData(); /** - * @brief Return pointer to the raw data corresponding to a certain sample. + * @brief Return pointer to the raw data corresponding to a certain sample + * (frame, channel combo). * * @param frame The frame number * @param channel The channel number @@ -76,16 +79,14 @@ public: * @return Pointer to sample, not casted to final type */ byte_t *raw_ptr(const us frame = 0, const us channel = 0) { - return &(_data.data()[sw * (frame + channel * nframes)]); + assert(frame < nframes); + assert(channel < nchannels); + return &(_data[sw * (frame + channel * nframes)]); } const byte_t *raw_ptr(const us frame = 0, const us channel = 0) const { - return &(_data.data()[sw * (frame + channel * nframes)]); - } - - void setSlow(const us frame, const us channel, const int8_t *val) { - for (us i = 0; i < sw; i++) { - _data.at(i + sw * (frame + channel * nframes)) = val[i]; - } + assert(frame < nframes); + assert(channel < nchannels); + return &(_data[sw * (frame + channel * nframes)]); } /** @@ -93,7 +94,7 @@ public: * * @return Number of bytes of data. */ - us size_bytes() const { return _data.size(); } + us size_bytes() const { return sw * nchannels * nframes; } /** * @brief Copy data from a set of raw pointers of *uninterleaved* data. @@ -101,7 +102,7 @@ public: * * @param ptrs Pointers to data from channels */ - void copyInFromRaw(const std::vector &ptrs); + void copyInFromRaw(const std::vector &ptrs); /** * @brief Copy contents of DaqData for a certain channel to a raw pointer. @@ -109,7 +110,7 @@ public: * @param channel The channel to copy. * @param ptr The pointer where data is copied to. */ - void copyToRaw(const us channel, uint8_t *ptr); + void copyToRaw(const us channel, byte_t *ptr); /** * @brief Convert samples to floating point values and return a nframes x @@ -121,7 +122,7 @@ public: arma::Mat toFloat() const; /** - * @brief Convert samples to floating point values and return a nframes + * @brief Convert samples to floating point value; and return a nframes * column vector of floats. For data that is not already floating-point, * the data is scaled back from MAX_INT to +1.0. * @@ -130,39 +131,73 @@ public: * @return Array of floats */ arma::Col toFloat(const us channel_no) const; -}; - -template class TypedDaqData : public DaqData { -public: - TypedDaqData(const us nchannels, const us nframes, - const DataTypeDescriptor::DataType dtype_descr) - : DaqData(nchannels, nframes, dtype_descr) {} - - T &operator[](const us i) { return _data[sw * i]; } /** - * @brief Reference of sample, casted to the right type. Same as raw_ptr(), - * but then as reference, and corresponding to right type. + * @brief Convert single sample to floating point value; and return a nframes + * column vector of floats. For data that is not already floating-point, + * the data is scaled back from MAX_INT to +1.0. * + * @param channel The channel to convert + * @param frame_no The frame number to convert + * + * @return Float value + */ + d toFloat(const us frame_no, const us channel_no) const; + + // Return value based on type + template T &value(const us frame, const us channel) { +#if LASP_DEBUG == 1 + check_type(); +#endif + return *reinterpret_cast(raw_ptr(frame, channel)); + } + template const T &value(const us frame, const us channel) const { +#if LASP_DEBUG == 1 + check_type(); +#endif + return *reinterpret_cast(raw_ptr(frame, channel)); + } + + /** + * @brief For debugging purposes: prints some stats + */ + void print() const; + +protected: + template void check_type() const { + using DataType = DataTypeDescriptor::DataType; + + bool correct = false; + static_assert(std::is_arithmetic::value); + + if constexpr (std::is_floating_point::value) { + correct |= sizeof(T) == 4 && dtype == DataType::dtype_fl32; + correct |= sizeof(T) == 8 && dtype == DataType::dtype_fl64; + + } else { + correct |= sizeof(T) == 1 && dtype == DataType::dtype_int8; + correct |= sizeof(T) == 2 && dtype == DataType::dtype_int16; + correct |= sizeof(T) == 4 && dtype == DataType::dtype_int32; + } + if (!correct) { + throw std::runtime_error("Wrong datatype for template argument"); + } + } + + template arma::Mat toFloat() const; + template arma::Col toFloat(const us channel_no) const; + template d toFloat(const us frame_no, const us channel_no) const; + + /** + * @brief Return a value as floating point. Does a conversion from integer to + * float, if the stored type is integer. Also scales by its maximum value. + * + * @tparam T The original type * @param frame Frame number * @param channel Channel number * - * @return + * @return Value converted to floating point */ - T &operator()(const us frame = 0, const us channel = 0) { - return reinterpret_cast(*raw_ptr(frame, channel)); - } - - /** - * @brief Copy out the data for a certain channel to a buffer - * - * @param channel The channel to copy - * @param buffer The buffer to copy to. *Make sure* it is allocated with at - * least `sw*nframes` bytes. - */ - void copyToRaw(const us channel, T *buffer) { - DaqData::copyToRaw(channel, reinterpret_cast(buffer)); - } - }; + /** @} */ diff --git a/src/lasp/device/lasp_rtaudiodaq.cpp b/src/lasp/device/lasp_rtaudiodaq.cpp index 46e30fe..6716770 100644 --- a/src/lasp/device/lasp_rtaudiodaq.cpp +++ b/src/lasp/device/lasp_rtaudiodaq.cpp @@ -1,18 +1,21 @@ -/* #define DEBUGTRACE_ENABLED */ +#include +#define DEBUGTRACE_ENABLED #include "debugtrace.hpp" +#include "lasp_mathtypes.h" #include "lasp_rtaudiodaq.h" #if LASP_HAS_RTAUDIO == 1 -#include "lasp_daq.h" #include "RtAudio.h" +#include "lasp_daq.h" #include #include using std::atomic; using std::cerr; using std::endl; -using std::runtime_error; +using rte = std::runtime_error; using std::vector; +using lck = std::scoped_lock; DEBUGTRACE_VARIABLES; @@ -81,7 +84,7 @@ void fillRtAudioDeviceInfo(vector &devinfolist) { d.availableDataTypes.push_back( DataTypeDescriptor::DataType::dtype_int16); } - /* if (formats & RTAUDIO_SINT32) { */ + /* if (formats & RTAUDIO_SINT24) { *1/ */ /* d.availableDataTypes.push_back(DataTypeDescriptor::DataType::dtype_int24); */ /* } */ @@ -137,7 +140,7 @@ public: // We make sure not to run RtAudio in duplex mode. This seems to be buggy // and untested. Better to use a hardware-type loopback into the system. if (duplexMode()) { - throw runtime_error("RtAudio backend cannot run in duplex mode."); + throw rte("RtAudio backend cannot run in duplex mode."); } assert(!monitorOutput); @@ -148,9 +151,9 @@ public: inParams = std::make_unique(); // +1 to get the count. - inParams->nChannels = getHighestInChannel() + 1; + inParams->nChannels = getHighestEnabledInChannel() + 1; if (inParams->nChannels < 1) { - throw runtime_error("Invalid input number of channels"); + throw rte("Invalid input number of channels"); } inParams->firstChannel = 0; inParams->deviceId = devinfo.api_specific_devindex; @@ -159,9 +162,9 @@ public: outParams = std::make_unique(); - outParams->nChannels = getHighestOutChannel() + 1; + outParams->nChannels = getHighestEnabledOutChannel() + 1; if (outParams->nChannels < 1) { - throw runtime_error("Invalid output number of channels"); + throw rte("Invalid output number of channels"); } outParams->firstChannel = 0; outParams->deviceId = devinfo.api_specific_devindex; @@ -171,7 +174,7 @@ public: streamoptions.flags = RTAUDIO_HOG_DEVICE | RTAUDIO_NONINTERLEAVED; streamoptions.numberOfBuffers = 2; - streamoptions.streamName = "RtAudio stream"; + streamoptions.streamName = "LASP RtAudio DAQ stream"; streamoptions.priority = 0; RtAudioFormat format; @@ -179,22 +182,27 @@ public: const Dtype dtype = dataType(); switch (dtype) { case Dtype::dtype_fl32: + DEBUGTRACE_PRINT("Datatype float32"); format = RTAUDIO_FLOAT32; break; case Dtype::dtype_fl64: + DEBUGTRACE_PRINT("Datatype float64"); format = RTAUDIO_FLOAT64; break; case Dtype::dtype_int8: + DEBUGTRACE_PRINT("Datatype int8"); format = RTAUDIO_SINT8; break; case Dtype::dtype_int16: + DEBUGTRACE_PRINT("Datatype int16"); format = RTAUDIO_SINT16; break; case Dtype::dtype_int32: + DEBUGTRACE_PRINT("Datatype int32"); format = RTAUDIO_SINT32; break; default: - throw runtime_error("Invalid data type specified for DAQ stream."); + throw rte("Invalid data type specified for DAQ stream."); break; } @@ -207,38 +215,46 @@ public: static_cast(samplerate()), &nFramesPerBlock_copy, mycallback, (void *)this, &streamoptions, &myerrorcallback); + + if (nFramesPerBlock_copy != nFramesPerBlock) { + throw rte("Got different number of frames per block back from RtAudio " + "backend. Do not know what to do"); + } } virtual void start(InDaqCallback inCallback, OutDaqCallback outCallback) override final { DEBUGTRACE_ENTER; + lck lock(_mtx); assert(!monitorOutput); - if (StreamStatus().runningOK()) { - throw runtime_error("Stream already running"); + if (getStreamStatus().runningOK()) { + throw rte("Stream already running"); } // Logical XOR if (inCallback && outCallback) { - throw runtime_error("Either input or output stream possible for RtAudio. " - "Stream duplex mode not provided."); + throw rte("Either input or output stream possible for RtAudio. " + "Stream duplex mode not provided."); } - if (inCallback) { - _incallback = inCallback; - if (neninchannels() == 0) { - throw runtime_error( + if (neninchannels() > 0) { + if (!inCallback) { + throw rte( + "Input callback given, but stream does not provide input data"); } + + _incallback = inCallback; } - if (outCallback) { - _outcallback = outCallback; - if (nenoutchannels() == 0) { - throw runtime_error( + if (nenoutchannels() > 0) { + if (!outCallback) { + throw rte( "Output callback given, but stream does not provide output data"); } + _outcallback = outCallback; } // Start the stream. Throws on error. @@ -264,10 +280,9 @@ public: } int streamCallback(void *outputBuffer, void *inputBuffer, - unsigned int nFrames, double streamTime, - RtAudioStreamStatus status) { + unsigned int nFrames, RtAudioStreamStatus status) { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; using se = StreamStatus::StreamError; @@ -294,8 +309,9 @@ public: break; } - const auto &dtype_descr = DataTypeDescriptor(); + const auto &dtype_descr = dtypeDescr(); const auto dtype = dataType(); + us neninchannels = this->neninchannels(); us nenoutchannels = this->nenoutchannels(); us sw = dtype_descr.sw; @@ -308,18 +324,22 @@ public: if (inputBuffer) { assert(_incallback); - std::vector ptrs; + std::vector ptrs; ptrs.reserve(neninchannels); + const us ch_min = getLowestEnabledInChannel(); + const us ch_max = getHighestEnabledInChannel(); us i = 0; - for (int ch = getLowestInChannel(); ch <= getHighestInChannel(); ch++) { + for (us ch = ch_min; ch <= ch_max; ch++) { if (inchannel_config.at(ch).enabled) { - ptrs.push_back(static_cast(inputBuffer) + - sw * i * nFramesPerBlock); - i++; + byte_t *ptr = + static_cast(inputBuffer) + sw * i * nFramesPerBlock; + DEBUGTRACE_PRINT((us)ptr); + ptrs.push_back(ptr); } + i++; } - DaqData d{neninchannels, nFramesPerBlock, dtype}; + DaqData d{nFramesPerBlock, neninchannels, dtype}; d.copyInFromRaw(ptrs); bool ret = _incallback(d); @@ -331,22 +351,23 @@ public: if (outputBuffer) { assert(_outcallback); - std::vector ptrs; + std::vector ptrs; ptrs.reserve(nenoutchannels); /* outCallback */ - us i = 0; - for (int ch = getLowestOutChannel(); ch <= getHighestOutChannel(); ch++) { + const us ch_min = getLowestEnabledOutChannel(); + const us ch_max = getHighestEnabledOutChannel(); + us i = 0; + for (us ch = ch_min; ch <= ch_max; ch++) { if (outchannel_config.at(ch).enabled) { - ptrs.push_back(static_cast(outputBuffer) + + ptrs.push_back(static_cast(outputBuffer) + sw * i * nFramesPerBlock); - - i++; } + i++; } - DaqData d{nenoutchannels, nFramesPerBlock, dtype}; + DaqData d{nFramesPerBlock, nenoutchannels, dtype}; bool ret = _outcallback(d); if (!ret) { @@ -381,7 +402,7 @@ int mycallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames, double streamTime, RtAudioStreamStatus status, void *userData) { return static_cast(userData)->streamCallback( - outputBuffer, inputBuffer, nFrames, streamTime, status); + outputBuffer, inputBuffer, nFrames, status); } #endif // LASP_HAS_RTAUDIO == 1 diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index cdf9118..1d07dad 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -31,6 +31,7 @@ InDataHandler::~InDataHandler() { "InDataHandler's destructor. Fix this by calling " "InDataHandler::stop() from the derived class' destructor." << endl; + abort(); } #endif } @@ -60,7 +61,7 @@ void StreamMgr::rescanDAQDevices(bool background, DEBUGTRACE_ENTER; checkRightThread(); - if(!_devices_mtx.try_lock()) { + if (!_devices_mtx.try_lock()) { throw rte("A background DAQ device scan is probably already running"); } _devices_mtx.unlock(); @@ -69,11 +70,11 @@ void StreamMgr::rescanDAQDevices(bool background, throw rte("Rescanning DAQ devices only possible when no stream is running"); } _devices.clear(); - auto &pool = getPool(); - if (background) { - pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); - } else { + /* auto &pool = getPool(); */ + if (!background) { rescanDAQDevices_impl(callback); + } else { + /* pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); */ } } void StreamMgr::rescanDAQDevices_impl(std::function callback) { @@ -86,7 +87,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function callback) { } bool StreamMgr::inCallback(const DaqData &data) { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; std::scoped_lock lck(_inDataHandler_mtx); @@ -95,7 +96,6 @@ bool StreamMgr::inCallback(const DaqData &data) { bool res = handler->inCallback(data); if (!res) return false; - } return true; } @@ -197,6 +197,12 @@ StreamMgr::~StreamMgr() { DEBUGTRACE_ENTER; checkRightThread(); stopAllStreams(); + if (!_inDataHandlers.empty()) { + cerr << "*** WARNING: InDataHandlers have not been all stopped, while " + "StreamMgr destructor is called. This is a misuse BUG" + << endl; + abort(); + } } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; @@ -213,6 +219,10 @@ void StreamMgr::startStream(const DaqConfiguration &config) { config.inchannel_config.cend(), [](auto &i) { return i.enabled; }); + bool isOutput = std::count_if(config.outchannel_config.cbegin(), + config.outchannel_config.cend(), + [](auto &i) { return i.enabled; }); + // Find the first device that matches with the configuration DeviceInfo devinfo; @@ -231,10 +241,6 @@ void StreamMgr::startStream(const DaqConfiguration &config) { isInput |= config.monitorOutput && devinfo.hasInternalOutputMonitor; - bool isOutput = std::count_if(config.outchannel_config.cbegin(), - config.outchannel_config.cend(), - [](auto &i) { return i.enabled; }); - bool isDuplex = isInput && isOutput; if (!isInput && !isOutput) { @@ -242,12 +248,18 @@ void StreamMgr::startStream(const DaqConfiguration &config) { "stream. Cannot start."); } - if ((isDuplex || isInput) && _inputStream) { + if (isInput && _inputStream) { throw rte("Error: an input stream is already running. Please " "first stop existing stream"); } else if (isOutput && _outputStream) { throw rte("Error: output stream is already running. Please " "first stop existing stream"); + } else if (_inputStream) { + if (_inputStream->duplexMode() && isOutput) { + throw rte( + "Error: output stream is already running (in duplex mode). Please " + "first stop existing stream"); + } } InDaqCallback inCallback; @@ -275,7 +287,7 @@ void StreamMgr::startStream(const DaqConfiguration &config) { if (isInput) { _inputStream = std::move(daq); - for(auto& handler: _inDataHandlers) { + for (auto &handler : _inDataHandlers) { handler->reset(_inputStream.get()); } if (_inputStream->duplexMode()) { @@ -297,7 +309,7 @@ void StreamMgr::stopStream(const StreamType t) { /// Kills input stream _inputStream = nullptr; /// Send reset to all in data handlers - for(auto& handler: _inDataHandlers) { + for (auto &handler : _inDataHandlers) { handler->reset(nullptr); } } break; @@ -305,6 +317,7 @@ void StreamMgr::stopStream(const StreamType t) { if (_inputStream && _inputStream->duplexMode()) { _inputStream = nullptr; } else { + if (!_outputStream) { throw rte("Output stream is not running"); } @@ -322,7 +335,7 @@ void StreamMgr::addInDataHandler(InDataHandler &handler) { checkRightThread(); std::scoped_lock lck(_inDataHandler_mtx); _inDataHandlers.push_back(&handler); - if(_inputStream) { + if (_inputStream) { handler.reset(_inputStream.get()); } else { handler.reset(nullptr); @@ -340,16 +353,19 @@ Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { checkRightThread(); // Default constructor, says stream is not running, but also no errors - Daq::StreamStatus s; const Daq *daq = getDaq(type); if (daq) { - s = daq->getStreamStatus(); + return daq->getStreamStatus(); + } else { + return Daq::StreamStatus(); } - return s; } const Daq *StreamMgr::getDaq(StreamType type) const { + + checkRightThread(); + if (type == StreamType::input) { return _inputStream.get(); } else { diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index 24fb61a..c32c6cb 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -254,22 +254,22 @@ public: auto runCallback = ([&](us totalOffset) { /* DEBUGTRACE_ENTER; */ - TypedDaqData data(nchannels, nFramesPerBlock, - DataTypeDescriptor::DataType::dtype_fl64); + DaqData data(nFramesPerBlock, nchannels, + DataTypeDescriptor::DataType::dtype_fl64); us monitorOffset = monitorOutput ? 1 : 0; /* /// Put the output monitor in front */ if (monitorOutput) { for (us sample = 0; sample < nFramesPerBlock; sample++) { - data(sample, 0) = + data.value(sample, 0) = buf[totalOffset + sample * nchannels + (nchannels - 1)]; } } - for (us channel = 0; channel < nchannels-monitorOffset; channel++) { + for (us channel = 0; channel < nchannels - monitorOffset; channel++) { /* DEBUGTRACE_PRINT(channel); */ for (us frame = 0; frame < nFramesPerBlock; frame++) { - data(frame, channel + monitorOffset) = + data.value(frame, channel + monitorOffset) = buf[totalOffset + (frame * nchannels) + channel]; } } @@ -392,20 +392,20 @@ public: if (transferStatus.currentIndex < buffer_mid_idx) { topenqueued = false; if (!botenqueued) { - TypedDaqData d(1, nFramesPerBlock, + DaqData d(nFramesPerBlock,1, DataTypeDescriptor::DataType::dtype_fl64); res = cb(d); - d.copyToRaw(0, &buf[buffer_mid_idx]); + d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); botenqueued = true; } } else { botenqueued = false; if (!topenqueued) { - TypedDaqData d(1, nFramesPerBlock, + DaqData d(nFramesPerBlock,1, DataTypeDescriptor::DataType::dtype_fl64); res = cb(d); - d.copyToRaw(0, buf.data()); + d.copyToRaw(0, reinterpret_cast(&(buf[0]))); topenqueued = true; } diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 2e1f276..68b8d20 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -1,6 +1,6 @@ /* #define DEBUGTRACE_ENABLED */ -#include "lasp_ppm.h" #include "debugtrace.hpp" +#include "lasp_ppm.h" #include using std::cerr; @@ -11,26 +11,40 @@ using rte = std::runtime_error; PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { + DEBUGTRACE_ENTER; - // This results in a deadlock! - /* std::scoped_lock lck(_mtx); */ - start(); } + bool PPMHandler::inCallback_threaded(const DaqData &d) { - /* DEBUGTRACE_ENTER; */ + + DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); + dmat data = d.toFloat(); + /* data.print(); */ - const us nchannels = _cur_max.size(); + const us nchannels = d.nchannels; + assert(data.n_cols == nchannels); - vrd maxabs = arma::max(arma::abs(data)); + if (nchannels != _cur_max.size()) { + DEBUGTRACE_PRINT("Resizing clip and cur max"); + _cur_max = vd(nchannels, arma::fill::value(1e-80)); + _clip_time = vd(nchannels, arma::fill::value(-1)); + } + assert(_clip_time.size() == _cur_max.size()); + + /// Obtain max abs values + vd maxabs = arma::max(arma::abs(data), 0).as_col(); + /* maxabs.print(); */ arma::uvec clip_indices = arma::find(maxabs > clip_point); arma::uvec clip(nchannels, arma::fill::zeros); clip.elem(clip_indices).fill(1); arma::uvec update_max_idx = arma::find(maxabs > _cur_max); + /* update_max_idx.print(); */ arma::uvec update_max(nchannels, arma::fill::zeros); + /* update_max.print(); */ update_max.elem(update_max_idx).fill(1); assert(_cur_max.size() == _clip_time.size()); @@ -47,8 +61,6 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { _clip_time(i) += _dt; } - /* cerr << "maxabs(i)" << maxabs(i) << endl; */ - /* cerr << "curmax(i)" << _cur_max(i) << endl; */ if (update_max(i)) { _cur_max(i) = maxabs(i); } else { @@ -60,7 +72,7 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { std::tuple PPMHandler::getCurrentValue() const { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); arma::uvec clips(_clip_time.size(), arma::fill::zeros); @@ -71,24 +83,21 @@ std::tuple PPMHandler::getCurrentValue() const { void PPMHandler::reset(const Daq *daq) { - DEBUGTRACE_ENTER; + /* DEBUGTRACE_ENTER; */ std::scoped_lock lck(_mtx); if (daq) { - _cur_max = vrd(daq->neninchannels(), arma::fill::zeros); + _cur_max.fill(1e-80); + ; + _clip_time.fill(-1); - _clip_time = vd(daq->neninchannels(), arma::fill::value(-1)); const d fs = daq->samplerate(); - DEBUGTRACE_PRINT(fs); + /* DEBUGTRACE_PRINT(fs); */ _dt = daq->framesPerBlock() / fs; _alpha = std::max(d_pow(10, -_dt * _decay_dBps / (20)), 0); - DEBUGTRACE_PRINT(_alpha); - - } else { - _cur_max.clear(); - _clip_time.clear(); + /* DEBUGTRACE_PRINT(_alpha); */ } } diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index cbc7776..6988605 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -52,7 +52,7 @@ class PPMHandler: public ThreadedInDataHandler { /** * @brief Current maximum values */ - vrd _cur_max; + vd _cur_max; /** * @brief How long ago the last clip has happened. Negative in case no clip diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 8d3682c..4ee16ac 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -4,25 +4,62 @@ #include "lasp_thread.h" #include #include +#include +#include using namespace std::literals::chrono_literals; +using lck = std::scoped_lock; +using rte = std::runtime_error; + +class SafeQueue { + std::queue _queue; + std::mutex _mtx; + std::atomic_int32_t _contents {0}; + public: + void push(const DaqData& d) { + DEBUGTRACE_ENTER; + lck lock(_mtx); + _queue.push(d); + _contents++; + } + DaqData pop() { + DEBUGTRACE_ENTER; + if(_contents ==0) { + throw rte("BUG: Pop on empty queue"); + } + lck lock(_mtx); + + /* DaqData d(std::move(_queue.front())); */ + DaqData d(_queue.front()); + _queue.pop(); + _contents--; + + return d; + } + bool empty() const { + return _contents == 0; + } + + +}; ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) - : InDataHandler(mgr) { + : InDataHandler(mgr), _queue(std::make_unique()) { - DEBUGTRACE_ENTER; + DEBUGTRACE_ENTER; - // Initialize thread pool, if not already done - getPool(); -} + // Initialize thread pool, if not already done + getPool(); + } bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; if (!_lastCallbackResult) { return false; } - dataqueue.push(daqdata); + + _queue->push(daqdata); auto &pool = getPool(); @@ -46,12 +83,12 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { void ThreadedInDataHandler::threadFcn() { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; _thread_running = true; - DaqData d{}; - if (dataqueue.pop(d) && !_stopThread) { + if (!_queue->empty() && !_stopThread) { + DaqData d(_queue->pop()); // Call inCallback_threaded if (inCallback_threaded(d) == false) { diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 0161d11..f628f44 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -13,7 +13,7 @@ const us RINGBUFFER_SIZE = 1024; * @{ */ - +class SafeQueue; /** * @brief Threaded in data handler. Buffers inCallback data and calls a * callback with the same signature on a different thread. @@ -22,9 +22,7 @@ class ThreadedInDataHandler: public InDataHandler { /** * @brief The queue used to push elements to the handling thread. */ - boost::lockfree::spsc_queue> - dataqueue; + std::unique_ptr _queue; std::atomic _thread_running{false}; std::atomic _stopThread{false}; diff --git a/src/lasp/dsp/lasp_types.h b/src/lasp/dsp/lasp_types.h index 692a9f5..4fd92a4 100644 --- a/src/lasp/dsp/lasp_types.h +++ b/src/lasp/dsp/lasp_types.h @@ -25,6 +25,7 @@ #include #endif +static_assert(sizeof(size_t) == 8); typedef size_t us; /* Size type I always use */ // To change the whole code to 32-bit floating points, change this to diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 3dacecf..8f83ae5 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -99,6 +99,7 @@ class Recording: logging.debug("Starting record....") self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) + self.indh.start() if wait: logging.debug("Stop recording with CTRL-C") diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index 694b700..046620d 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -25,13 +25,45 @@ namespace py = pybind11; * * @return Numpy array */ -template py::array_t getPyArray(const DaqData &d) { +template py::array_t getPyArrayNoCpy(const DaqData &d) { // https://github.com/pybind/pybind11/issues/323 // // When a valid object is passed as 'base', it tells pybind not to take // ownership of the data, because 'base' will own it. In fact 'packet' will // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY - // valid object is good for this purpose, so I chose "str"... + // valid object is good for this purpose, so I choose "str"... + + py::str dummyDataOwner; + /* + * Signature: + array_t(ShapeContainer shape, + StridesContainer strides, + const T *ptr = nullptr, + handle base = handle()); + */ + + return py::array_t( + py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape + + py::array::StridesContainer( // Strides + {sizeof(T), + sizeof(T) * d.nframes}), // Strides (in bytes) for each index + + reinterpret_cast( + const_cast(d).raw_ptr()), // Pointer to buffer + + dummyDataOwner // As stated above, now Numpy does not take ownership of + // the data pointer. + ); +} + +template py::array_t dmat_to_ndarray(const DaqData &d) { + // https://github.com/pybind/pybind11/issues/323 + // + // When a valid object is passed as 'base', it tells pybind not to take + // ownership of the data, because 'base' will own it. In fact 'packet' will + // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY + // valid object is good for this purpose, so I choose "str"... py::str dummyDataOwner; /* @@ -73,12 +105,9 @@ public: : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { DEBUGTRACE_ENTER; - /// TODO: Note that if start() throws an exception, which means that the - /// destructor of PyIndataHandler is not called and the thread destructor - /// calls terminate(). It is a kind of rude way to crash, but it is also - /// *very* unlikely to happen, as start() does only add a reference to this - /// handler to a list in the stream mgr. - start(); + /// Start should be called externally, as at constructor time no virtual + /// functions should be called. + /* start(); */ } ~PyIndataHandler() { DEBUGTRACE_ENTER; @@ -100,19 +129,19 @@ public: py::object bool_val; switch (d.dtype) { case (DataType::dtype_int8): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_int16): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_int32): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_fl32): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_fl64): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; default: throw std::runtime_error("BUG"); @@ -134,13 +163,22 @@ public: } }; - void init_datahandler(py::module &m) { - py::class_ h(m, "InDataHandler"); - h.def(py::init()); + + py::class_ idh(m, "InDataHandler_base"); + idh.def("start", &InDataHandler::start); + idh.def("stop", &InDataHandler::stop); + + py::class_ tidh( + m, "ThreadedInDataHandler"); + + /// The C++ class is PyIndataHandler, but for Python, it is called + /// InDataHandler + py::class_ pyidh(m, "InDataHandler"); + pyidh.def(py::init()); /// Peak Programme Meter - py::class_ ppm(m, "PPMHandler"); + py::class_ ppm(m, "PPMHandler"); ppm.def(py::init()); ppm.def(py::init()); @@ -153,8 +191,8 @@ void init_datahandler(py::module &m) { /// Real time Aps /// - py::class_ rtaps(m, "RtAps"); - rtaps.def(py::init rtaps(m, "RtAps"); + rtaps.def(py::init(smgr, "StreamType") - .value("input", StreamMgr::StreamType::input) - .value("output", StreamMgr::StreamType::output); + .value("input", StreamMgr::StreamType::input) + .value("output", StreamMgr::StreamType::output); smgr.def("startStream", &StreamMgr::startStream); smgr.def("stopStream", &StreamMgr::stopStream); smgr.def_static("getInstance", []() { - return std::unique_ptr(&StreamMgr::getInstance()); - }); + return std::unique_ptr(&StreamMgr::getInstance()); + }); smgr.def("stopAllStreams", &StreamMgr::stopAllStreams); smgr.def("setSiggen", &StreamMgr::setSiggen); @@ -32,11 +32,13 @@ void init_streammgr(py::module &m) { smgr.def("isStreamRunningOK", &StreamMgr::isStreamRunningOK); smgr.def("isStreamRunning", &StreamMgr::isStreamRunning); smgr.def("getDaq", &StreamMgr::getDaq, py::return_value_policy::reference); - smgr.def("rescanDAQDevices", [](StreamMgr& smgr, bool background) { - // A pure C++ callback is the second argument to rescanDAQDevices, which - // cannot be wrapped to Pybind11. Only the one without callback is - // forwarded here to Python code. - smgr.rescanDAQDevices(background); + smgr.def( + "rescanDAQDevices", + [](StreamMgr &smgr, bool background) { + // A pure C++ callback is the second argument to rescanDAQDevices, which + // cannot be wrapped to Pybind11. Only the one without callback is + // forwarded here to Python code. + smgr.rescanDAQDevices(background); }, - py::arg("background")=false); + py::arg("background") = false); } diff --git a/third_party/armadillo-code b/third_party/armadillo-code index b572743..3865a05 160000 --- a/third_party/armadillo-code +++ b/third_party/armadillo-code @@ -1 +1 @@ -Subproject commit b5727433d342afca53aca0ee16ecf1caaa661821 +Subproject commit 3865a0520d577ac293d88c4fd726a41bda949869