diff --git a/CMakeLists.txt b/CMakeLists.txt index 92963a5..81ee98e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,9 +45,14 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON) if(${CMAKE_BUILD_TYPE} STREQUAL "Debug") set(LASP_DEBUG True) - # This does not work nicely with RtAudio. However, if we compile it + # This does not work nicely with RtAudio if it is precompiled (undefined + # symbols). However, if we compile it # ourselves as a third_party ref, this works nicely together. - add_definitions(-D_GLIBCXX_DEBUG) + + # This flag gives "floating point exception: core dumped" + # add_definitions(-D_GLIBCXX_DEBUG) + + # add_definitions(ARMA_EXTRA_DEBUG) else() set(LASP_DEBUG False) diff --git a/README.md b/README.md index 89892a4..98bfcc0 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,11 @@ # Library for Acoustic Signal Processing -Welcome to LASP: Library for Acoustic Signal Processing. LASP is a C library +Welcome to LASP: Library for Acoustic Signal Processing. LASP is a C++ library with a Python interface which is supposed to process (multi-) microphone acoustic data in real time on a PC and output results. -The main goal of this library will be the processing of data from an -array of microphones real time, on a Raspberry PI. At the point in -time of this writing, we are yet unsure whether the Raspberry PI will -have enough computational power to this end, but may be by the time it +At the point in time of this writing, we are yet unsure whether the Raspberry +PI will have enough computational power to this end, but may be by the time it is finished, we have a new faster generation :). Current features that are implemented: @@ -41,7 +39,7 @@ If you have any question(s), please feel free to contact us: info@ascee.nl. Two commands that install all requirements (for Ubuntu / Linux Mint) - `pip install scipy numpy build scikit-build appdirs` -- `sudo apt install libusb-dev +- `sudo apt install libusb-dev libpulse-dev libboost-dev` ## Runtime dependencies (Linux) @@ -52,14 +50,12 @@ Two commands that install all requirements (for Ubuntu / Linux Mint) - RtAudio, for Audio DAQ backends - libusb - BLAS (OpenBLAS, other). -- RtAudio (optional) -- UlDaq (optional) ## Editable install In the root directory of the repository, run: -- `pip3 isntall --user --prefix=~/.local -e .` +- `pip3 isntall --user -e .` - `cmake .` - `make -j` @@ -72,7 +68,6 @@ Optional dependencies, which can be turned ON/OFF using CMake: - Build tools: compiler [http://cmake.org](CMake), the Python packages: - Scipy - Numpy - - py-build-cmake - appdirs These can all be installed using: @@ -103,7 +98,11 @@ compilation: - cython - python3-numpy - libopenblas + +If building RtAudio with the ALSA backend: + - libclalsadrv-dev - libopenblas-base - libopenblas-dev +- libusb-1.0-dev diff --git a/cmake/rtaudio.cmake b/cmake/rtaudio.cmake index ca36779..835fde6 100644 --- a/cmake/rtaudio.cmake +++ b/cmake/rtaudio.cmake @@ -6,6 +6,7 @@ if(LASP_HAS_RTAUDIO) else() set(RTAUDIO_API_PULSE TRUE CACHE BOOL "Build with PulseAudio backend" FORCE) set(RTAUDIO_API_ALSA OFF CACHE BOOL "Do not build with Alsa backend" FORCE) + set(RTAUDIO_API_JACK OFF CACHE BOOL "Do not build with Jack backend" FORCE) endif() set(RTAUDIO_BUILD_STATIC_LIBS ON CACHE BOOL "Build static libs for RtAudio" FORCE) add_subdirectory(third_party/rtaudio) diff --git a/pyproject.toml b/pyproject.toml index 771e14f..6ac7893 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,6 @@ readme = "README.md" requires-python = ">=3.8" license = { "file" = "LICENSE" } authors = [{ "name" = "J.A. de Jong et al.", "email" = "info@ascee.nl" }] -keywords = ["DSP", "DAQ", "Signal processing"] classifiers = [ "Topic :: Scientific/Engineering", @@ -14,15 +13,5 @@ classifiers = [ ] # urls = { "Documentation" = "https://" } -dependencies = ["numpy", "scipy", "appdirs", "h5py", "appdirs", "dataclasses_json", ] dynamic = ["version", "description"] -[build-system] # How pip and other frontends should build this project -requires = [ - "setuptools>=42", - "wheel", - "scikit-build", - "cmake", -] -build-backend = "setuptools.build_meta" - diff --git a/setup.py b/setup.py index 65fef76..b46819d 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,51 @@ -from skbuild import setup +import glob +import platform + +from setuptools import setup + +if 'Linux' in platform.platform(): + extension = list(glob.glob('src/lasp/lasp_cpp.cpython*')) + if len(extension) == 0: + raise RuntimeError('Please first run CMake to build extension') + elif len(extension) > 1: + raise RuntimeError('Too many extension files found') + + pkgdata = extension + +else: + raise RuntimeError('Not yet Windows-proof') + +classifiers = [ + "Topic :: Scientific/Engineering", + "Programming Language :: Python :: 3.8", + "Operating System :: POSIX :: Linux", + "Operating System :: Microsoft :: Windows", +] + +keywords = ["DSP", "DAQ", "Signal processing"] + +with open('README.md', 'r') as f: + readme = f.read() + setup( name="lasp", version="1.0", - description="LASP Library of Acoustic Signal Processing", + description="LASP: Library for Acoustic Signal Processing", + author='J.A. de Jong (ASCEE / Redu-Sone)', author_email='info@ascee.nl', + + url='https://www.ascee.nl/lasp', + classifiers=classifiers, + keywords=keywords, license="MIT", - packages=['lasp'], - package_dir= {'': 'src'}, - cmake_install_dir='src/lasp', - # cmake_install_target='src', + readme=readme, + dependencies=["numpy", "scipy", "appdirs", "h5py", "appdirs", + "dataclasses_json"], + packages=['lasp', 'lasp.filter', 'lasp.tools'], + data_files = pkgdata, + include_package_data=True, + package_dir={'': 'src'}, python_requires='>=3.8', ) diff --git a/src/lasp/device/lasp_daq.cpp b/src/lasp/device/lasp_daq.cpp index 1892426..158ac04 100644 --- a/src/lasp/device/lasp_daq.cpp +++ b/src/lasp/device/lasp_daq.cpp @@ -14,7 +14,7 @@ using std::runtime_error; Daq::~Daq() { DEBUGTRACE_ENTER; } std::unique_ptr Daq::createDaq(const DeviceInfo &devinfo, - const DaqConfiguration &config) { + const DaqConfiguration &config) { DEBUGTRACE_ENTER; #if LASP_HAS_ULDAQ == 1 @@ -23,37 +23,37 @@ std::unique_ptr Daq::createDaq(const DeviceInfo &devinfo, } #endif #if LASP_HAS_RTAUDIO == 1 - // See lasp_daqconfig.h:114 ALSA, up to + // See lasp_daqconfig.h:114 ALSA, up to if (devinfo.api.apicode == LASP_RTAUDIO_APICODE) { return createRtAudioDevice(devinfo, config); } #endif throw std::runtime_error(string("Unable to match Device API: ") + - devinfo.api.apiname); + devinfo.api.apiname); } Daq::Daq(const DeviceInfo &devinfo, const DaqConfiguration &config) - : DaqConfiguration(config), DeviceInfo(devinfo) { - DEBUGTRACE_ENTER; + : DaqConfiguration(config), DeviceInfo(devinfo) { + DEBUGTRACE_ENTER; - if (!hasInternalOutputMonitor && monitorOutput) { - throw std::runtime_error( - "Output monitor flag set, but device does not have output monitor"); - } - - if (!config.match(devinfo)) { - throw std::runtime_error("DaqConfiguration does not match device info"); - } - if (neninchannels(false) > ninchannels) { - throw std::runtime_error( - "Number of enabled input channels is higher than device capability"); - } - if (nenoutchannels() > noutchannels) { - throw std::runtime_error( - "Number of enabled output channels is higher than device capability"); - } + if (!hasInternalOutputMonitor && monitorOutput) { + throw std::runtime_error( + "Output monitor flag set, but device does not have output monitor"); } + if (!config.match(devinfo)) { + throw std::runtime_error("DaqConfiguration does not match device info"); + } + if (neninchannels(false) > ninchannels) { + throw std::runtime_error( + "Number of enabled input channels is higher than device capability"); + } + if (nenoutchannels() > noutchannels) { + throw std::runtime_error( + "Number of enabled output channels is higher than device capability"); + } +} + double Daq::samplerate() const { DEBUGTRACE_ENTER; return availableSampleRates.at(sampleRateIndex); @@ -62,7 +62,9 @@ double Daq::samplerate() const { DataTypeDescriptor::DataType Daq::dataType() const { return availableDataTypes.at(dataTypeIndex); } -DataTypeDescriptor Daq::dtypeDescr() const { return dtype_map.at(dataType()); } +const DataTypeDescriptor &Daq::dtypeDescr() const { + return dtype_map.at(dataType()); +} double Daq::inputRangeForChannel(us ch) const { if (!(ch < ninchannels)) { diff --git a/src/lasp/device/lasp_daq.h b/src/lasp/device/lasp_daq.h index fe57355..6779cda 100644 --- a/src/lasp/device/lasp_daq.h +++ b/src/lasp/device/lasp_daq.h @@ -176,7 +176,7 @@ public: * * @return A DataTypeDescriptor */ - DataTypeDescriptor dtypeDescr() const; + const DataTypeDescriptor& dtypeDescr() const; /** * @brief The number of frames that is send in a block of DaqData. diff --git a/src/lasp/device/lasp_daqconfig.cpp b/src/lasp/device/lasp_daqconfig.cpp index bb6d33b..e8b2010 100644 --- a/src/lasp/device/lasp_daqconfig.cpp +++ b/src/lasp/device/lasp_daqconfig.cpp @@ -57,7 +57,7 @@ bool DaqConfiguration::match(const DeviceInfo &dev) const { return (dev.device_name == device_name && dev.api == api); } -int DaqConfiguration::getHighestInChannel() const { +int DaqConfiguration::getHighestEnabledInChannel() const { for (int i = inchannel_config.size() - 1; i > -1; i--) { if (inchannel_config.at(i).enabled) return i; @@ -65,21 +65,21 @@ int DaqConfiguration::getHighestInChannel() const { return -1; } -int DaqConfiguration::getHighestOutChannel() const { +int DaqConfiguration::getHighestEnabledOutChannel() const { for (us i = outchannel_config.size() - 1; i >= 0; i--) { if (outchannel_config.at(i).enabled) return i; } return -1; } -int DaqConfiguration::getLowestInChannel() const { +int DaqConfiguration::getLowestEnabledInChannel() const { for (us i = 0; i < inchannel_config.size(); i++) { if (inchannel_config.at(i).enabled) return i; } return -1; } -int DaqConfiguration::getLowestOutChannel() const { +int DaqConfiguration::getLowestEnabledOutChannel() const { for (us i = 0; i < outchannel_config.size(); i++) { if (outchannel_config.at(i).enabled) return i; diff --git a/src/lasp/device/lasp_daqconfig.h b/src/lasp/device/lasp_daqconfig.h index dc5248d..ae334bb 100644 --- a/src/lasp/device/lasp_daqconfig.h +++ b/src/lasp/device/lasp_daqconfig.h @@ -303,13 +303,13 @@ public: bool match(const DeviceInfo &devinfo) const; /** - * @brief Get the highest channel number from the list of enabled input + * @brief Get the enabled highest channel number from the list of enabled input * channels. * * @return Index to the highest input channel. -1 if no input channels are * enabled. */ - int getHighestInChannel() const; + int getHighestEnabledInChannel() const; /** * @brief Get the highest channel number from the list of enabled output * channels. @@ -317,7 +317,7 @@ public: * @return Index to the highest input channel. -1 if no output channels are * enabled. */ - int getHighestOutChannel() const; + int getHighestEnabledOutChannel() const; /** * @brief Get the lowest channel number from the list of enabled input @@ -326,15 +326,15 @@ public: * @return Index to the lowest input channel. -1 if no input channels are * enabled. */ - int getLowestInChannel() const; + int getLowestEnabledInChannel() const; /** * @brief Get the lowest channel number from the list of enabled output * channels. * - * @return Index to the lowest input channel. -1 if no output channels are + * @return Index to the lowest output channel. -1 if no output channels are * enabled. */ - int getLowestOutChannel() const; + int getLowestEnabledOutChannel() const; /** * @brief Set all input channels to enabled / disabled state. diff --git a/src/lasp/device/lasp_daqdata.cpp b/src/lasp/device/lasp_daqdata.cpp index 1e26091..dd52d0d 100644 --- a/src/lasp/device/lasp_daqdata.cpp +++ b/src/lasp/device/lasp_daqdata.cpp @@ -1,106 +1,210 @@ /* #define DEBUGTRACE_ENABLED */ -#include "lasp_daqdata.h" #include "debugtrace.hpp" +#include +#include "lasp_daqdata.h" #include "lasp_mathtypes.h" #include +#include + +using std::cerr; +using std::cout; +using std::endl; +using rte = std::runtime_error; + +static_assert(sizeof(byte_t) == 1, "Invalid char size"); DEBUGTRACE_VARIABLES; -DaqData::DaqData(const us nchannels, const us nframes, +/// Constructors and destructors +DaqData::DaqData(const us nframes, const us nchannels, const DataTypeDescriptor::DataType dtype) - : nchannels(nchannels), nframes(nframes), dtype(dtype), - dtype_descr(dtype_map.at(dtype)), sw(dtype_descr.sw) { - static_assert(sizeof(char) == 1, "Invalid char size"); - const DataTypeDescriptor &desc = dtype_map.at(dtype); - _data.resize(nframes * nchannels * desc.sw); + : nframes(nframes), nchannels(nchannels), dtype(dtype), + dtype_descr(dtype_map.at(dtype)), sw(dtype_descr.sw) { + + DEBUGTRACE_ENTER; + DEBUGTRACE_PRINT(sw); + + _data = new (std::align_val_t{8}) byte_t[sw * nchannels * nframes]; + if (!_data) { + throw rte("Could not allocate memory for DaqData!"); + } +} +DaqData::DaqData(const DaqData &o) : DaqData(o.nframes, o.nchannels, o.dtype) { + + DEBUGTRACE_ENTER; + /* std::copy(o._data, &o._data[sw * nchannels * nframes], _data); */ + memcpy(_data, o._data, sw * nchannels * nframes); } -void DaqData::copyInFromRaw(const std::vector &ptrs) { +/* DaqData::DaqData(DaqData &&o) */ +/* : nframes(o.nframes), nchannels(o.nchannels), dtype(o.dtype), */ +/* dtype_descr(std::move(o.dtype_descr)), sw(o.sw) { */ + +/* DEBUGTRACE_ENTER; */ +/* _data = o._data; */ +/* /// Nullptrs do not get deleted */ +/* o._data = nullptr; */ +/* } */ + +DaqData::~DaqData() { + DEBUGTRACE_ENTER; + if (_data) + delete[] _data; +} + +void DaqData::copyInFromRaw(const std::vector &ptrs) { + DEBUGTRACE_ENTER; us ch = 0; assert(ptrs.size() == nchannels); - for (auto ptr : ptrs) { - std::copy(ptr, ptr + sw * nframes, &_data[sw * ch * nframes]); + for (auto& ptr : ptrs) { + assert(ch < nchannels); + memcpy(&_data[sw * ch * nframes], ptr, sw * nframes); + /* std::copy(ptr, ptr + sw * nframes, &_data[sw * ch * nframes]); */ ch++; } + dmat data2(nframes, nchannels); } -void DaqData::copyToRaw(const us channel, uint8_t *ptr) { - std::copy(raw_ptr(0, channel), - raw_ptr(nframes, channel), ptr); +void DaqData::copyToRaw(const us channel, byte_t *ptr) { + /* std::copy(raw_ptr(0, channel), raw_ptr(nframes, channel), ptr); */ + memcpy(ptr, raw_ptr(0, channel), sw*nframes); } -template d convertToFloat(const int_type val) { - if constexpr (std::is_integral::value) { - return static_cast(val) / std::numeric_limits::max(); +template +d DaqData::toFloat(const us frame, const us channel) const { + DEBUGTRACE_ENTER; + if constexpr (std::is_integral::value) { + return static_cast(value(frame, channel)) / + std::numeric_limits::max(); } else { - return static_cast(val); + return static_cast(value(frame, channel)); } } -template -inline vd channelToFloat(const byte_t *vals, const us nframes) { + +template vd DaqData::toFloat(const us channel) const { + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + check_type(); +#endif vd res(nframes); for (us i = 0; i < nframes; i++) { - res(i) = convertToFloat(reinterpret_cast(vals)[i]); + res(i) = toFloat(i, channel); } return res; } -template -inline dmat allToFloat(const byte_t *vals, const us nframes, const us nchannels) { +template dmat DaqData::toFloat() const { + + DEBUGTRACE_ENTER; +#if LASP_DEBUG == 1 + check_type(); +#endif dmat res(nframes, nchannels); - for (us j = 0; j < nchannels; j++) { - for (us i = 0; i < nframes; i++) { - res(i, j) = convertToFloat( - reinterpret_cast(vals)[i + j * nframes]); + + for (us i = 0; i < nframes; i++) { + for (us j = 0; j < nchannels; j++) { + res(i, j) = toFloat(i, j); } } return res; } -vd DaqData::toFloat(const us channel_no) const { +d DaqData::toFloat(const us frame, const us channel) const { + DEBUGTRACE_ENTER; using DataType = DataTypeDescriptor::DataType; switch (dtype) { + case (DataType::dtype_int8): + return toFloat(frame, channel); + break; + case (DataType::dtype_int16): + return toFloat(frame, channel); + break; + case (DataType::dtype_int32): + return toFloat(frame, channel); + break; + case (DataType::dtype_fl32): + return toFloat(frame, channel); + break; + case (DataType::dtype_fl64): + return toFloat(frame, channel); + break; + default: + throw std::runtime_error("BUG"); + } // End of switch + + // Never arrives her + return 0; +} + +vd DaqData::toFloat(const us channel_no) const { + DEBUGTRACE_ENTER; + using DataType = DataTypeDescriptor::DataType; + cerr << (int) dtype << endl; + switch (dtype) { case (DataType::dtype_int8): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; case (DataType::dtype_int16): { - return channelToFloat(raw_ptr(0, channel_no), nframes); - + return toFloat(channel_no); } break; case (DataType::dtype_int32): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; case (DataType::dtype_fl32): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; case (DataType::dtype_fl64): { - return channelToFloat(raw_ptr(0, channel_no), nframes); + return toFloat(channel_no); } break; default: throw std::runtime_error("BUG"); } // End of switch + + // Never arrives here + return vd(); } + dmat DaqData::toFloat() const { - dmat result(nframes, nchannels); + + DEBUGTRACE_ENTER; + /* DEBUGTRACE_PRINT(nframes); */ + /* DEBUGTRACE_PRINT(nchannels); */ + using DataType = DataTypeDescriptor::DataType; + + /* cerr << "DataType: " << (int) dtype << endl; */ + switch (dtype) { - case (DataType::dtype_int8): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_int16): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_int32): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_fl32): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; - case (DataType::dtype_fl64): { - return allToFloat(raw_ptr(0), nframes, nchannels); - } break; + case (DataType::dtype_int8): + return toFloat(); + break; + case (DataType::dtype_int16): + return toFloat(); + break; + case (DataType::dtype_int32): + return toFloat(); + break; + case (DataType::dtype_fl32): + return toFloat(); + break; + case (DataType::dtype_fl64): + return toFloat(); + break; default: throw std::runtime_error("BUG"); } // End of switch + + // Never reached + return dmat(); +} + +void DaqData::print() const { + cout << "Number of frames: " << nframes << endl; + cout << "Number of channels: " << nchannels << endl; + cout << "DataType: " << dtype_map.at(dtype).name << endl; + cout << "First sample of first channel (as float)" << toFloat(0,0) << endl; + cout << "Last sample of first channel (as float)" << toFloat(nframes-1,0) << endl; + cout << "Last sample of last channel (as float)" << toFloat(nframes-1,nchannels-1) << endl; + } diff --git a/src/lasp/device/lasp_daqdata.h b/src/lasp/device/lasp_daqdata.h index 84e91ee..034ebfa 100644 --- a/src/lasp/device/lasp_daqdata.h +++ b/src/lasp/device/lasp_daqdata.h @@ -1,10 +1,12 @@ #pragma once #include "lasp_daqconfig.h" #include "lasp_types.h" +#include +#include #include #include #include -#include +#include /** \addtogroup device * @{ @@ -22,19 +24,19 @@ protected: /** * @brief Storage for the actual data. */ - std::vector _data; + byte_t *_data; public: - /** - * @brief The number of channels - */ - us nchannels; - /** * @brief The number of frames in this block of data. */ us nframes; + /** + * @brief The number of channels + */ + us nchannels; + /** * @brief The data type corresponding to a sample */ @@ -53,22 +55,23 @@ public: /** * @brief Initialize an empty frame of data * - * @param nchannels The number of channels * @param nframes The number of frames + * @param nchannels The number of channels * @param dtype The data type */ - DaqData(const us nchannels, const us nframes, + DaqData(const us nframes, const us nchannels, const DataTypeDescriptor::DataType dtype); /** * @brief Initialize using no allocation */ - DaqData() - : DaqData(0, 0, DataTypeDescriptor::DataType::dtype_int8) {} - virtual ~DaqData() = default; - DaqData &operator=(const DaqData &) = default; + DaqData(const DaqData &); + /* DaqData(DaqData &&); */ + DaqData &operator=(const DaqData &) = delete; + virtual ~DaqData(); /** - * @brief Return pointer to the raw data corresponding to a certain sample. + * @brief Return pointer to the raw data corresponding to a certain sample + * (frame, channel combo). * * @param frame The frame number * @param channel The channel number @@ -76,16 +79,14 @@ public: * @return Pointer to sample, not casted to final type */ byte_t *raw_ptr(const us frame = 0, const us channel = 0) { - return &(_data.data()[sw * (frame + channel * nframes)]); + assert(frame < nframes); + assert(channel < nchannels); + return &(_data[sw * (frame + channel * nframes)]); } const byte_t *raw_ptr(const us frame = 0, const us channel = 0) const { - return &(_data.data()[sw * (frame + channel * nframes)]); - } - - void setSlow(const us frame, const us channel, const int8_t *val) { - for (us i = 0; i < sw; i++) { - _data.at(i + sw * (frame + channel * nframes)) = val[i]; - } + assert(frame < nframes); + assert(channel < nchannels); + return &(_data[sw * (frame + channel * nframes)]); } /** @@ -93,7 +94,7 @@ public: * * @return Number of bytes of data. */ - us size_bytes() const { return _data.size(); } + us size_bytes() const { return sw * nchannels * nframes; } /** * @brief Copy data from a set of raw pointers of *uninterleaved* data. @@ -101,7 +102,7 @@ public: * * @param ptrs Pointers to data from channels */ - void copyInFromRaw(const std::vector &ptrs); + void copyInFromRaw(const std::vector &ptrs); /** * @brief Copy contents of DaqData for a certain channel to a raw pointer. @@ -109,7 +110,7 @@ public: * @param channel The channel to copy. * @param ptr The pointer where data is copied to. */ - void copyToRaw(const us channel, uint8_t *ptr); + void copyToRaw(const us channel, byte_t *ptr); /** * @brief Convert samples to floating point values and return a nframes x @@ -121,7 +122,7 @@ public: arma::Mat toFloat() const; /** - * @brief Convert samples to floating point values and return a nframes + * @brief Convert samples to floating point value; and return a nframes * column vector of floats. For data that is not already floating-point, * the data is scaled back from MAX_INT to +1.0. * @@ -130,39 +131,73 @@ public: * @return Array of floats */ arma::Col toFloat(const us channel_no) const; -}; - -template class TypedDaqData : public DaqData { -public: - TypedDaqData(const us nchannels, const us nframes, - const DataTypeDescriptor::DataType dtype_descr) - : DaqData(nchannels, nframes, dtype_descr) {} - - T &operator[](const us i) { return _data[sw * i]; } /** - * @brief Reference of sample, casted to the right type. Same as raw_ptr(), - * but then as reference, and corresponding to right type. + * @brief Convert single sample to floating point value; and return a nframes + * column vector of floats. For data that is not already floating-point, + * the data is scaled back from MAX_INT to +1.0. * + * @param channel The channel to convert + * @param frame_no The frame number to convert + * + * @return Float value + */ + d toFloat(const us frame_no, const us channel_no) const; + + // Return value based on type + template T &value(const us frame, const us channel) { +#if LASP_DEBUG == 1 + check_type(); +#endif + return *reinterpret_cast(raw_ptr(frame, channel)); + } + template const T &value(const us frame, const us channel) const { +#if LASP_DEBUG == 1 + check_type(); +#endif + return *reinterpret_cast(raw_ptr(frame, channel)); + } + + /** + * @brief For debugging purposes: prints some stats + */ + void print() const; + +protected: + template void check_type() const { + using DataType = DataTypeDescriptor::DataType; + + bool correct = false; + static_assert(std::is_arithmetic::value); + + if constexpr (std::is_floating_point::value) { + correct |= sizeof(T) == 4 && dtype == DataType::dtype_fl32; + correct |= sizeof(T) == 8 && dtype == DataType::dtype_fl64; + + } else { + correct |= sizeof(T) == 1 && dtype == DataType::dtype_int8; + correct |= sizeof(T) == 2 && dtype == DataType::dtype_int16; + correct |= sizeof(T) == 4 && dtype == DataType::dtype_int32; + } + if (!correct) { + throw std::runtime_error("Wrong datatype for template argument"); + } + } + + template arma::Mat toFloat() const; + template arma::Col toFloat(const us channel_no) const; + template d toFloat(const us frame_no, const us channel_no) const; + + /** + * @brief Return a value as floating point. Does a conversion from integer to + * float, if the stored type is integer. Also scales by its maximum value. + * + * @tparam T The original type * @param frame Frame number * @param channel Channel number * - * @return + * @return Value converted to floating point */ - T &operator()(const us frame = 0, const us channel = 0) { - return reinterpret_cast(*raw_ptr(frame, channel)); - } - - /** - * @brief Copy out the data for a certain channel to a buffer - * - * @param channel The channel to copy - * @param buffer The buffer to copy to. *Make sure* it is allocated with at - * least `sw*nframes` bytes. - */ - void copyToRaw(const us channel, T *buffer) { - DaqData::copyToRaw(channel, reinterpret_cast(buffer)); - } - }; + /** @} */ diff --git a/src/lasp/device/lasp_rtaudiodaq.cpp b/src/lasp/device/lasp_rtaudiodaq.cpp index 46e30fe..6716770 100644 --- a/src/lasp/device/lasp_rtaudiodaq.cpp +++ b/src/lasp/device/lasp_rtaudiodaq.cpp @@ -1,18 +1,21 @@ -/* #define DEBUGTRACE_ENABLED */ +#include +#define DEBUGTRACE_ENABLED #include "debugtrace.hpp" +#include "lasp_mathtypes.h" #include "lasp_rtaudiodaq.h" #if LASP_HAS_RTAUDIO == 1 -#include "lasp_daq.h" #include "RtAudio.h" +#include "lasp_daq.h" #include #include using std::atomic; using std::cerr; using std::endl; -using std::runtime_error; +using rte = std::runtime_error; using std::vector; +using lck = std::scoped_lock; DEBUGTRACE_VARIABLES; @@ -81,7 +84,7 @@ void fillRtAudioDeviceInfo(vector &devinfolist) { d.availableDataTypes.push_back( DataTypeDescriptor::DataType::dtype_int16); } - /* if (formats & RTAUDIO_SINT32) { */ + /* if (formats & RTAUDIO_SINT24) { *1/ */ /* d.availableDataTypes.push_back(DataTypeDescriptor::DataType::dtype_int24); */ /* } */ @@ -137,7 +140,7 @@ public: // We make sure not to run RtAudio in duplex mode. This seems to be buggy // and untested. Better to use a hardware-type loopback into the system. if (duplexMode()) { - throw runtime_error("RtAudio backend cannot run in duplex mode."); + throw rte("RtAudio backend cannot run in duplex mode."); } assert(!monitorOutput); @@ -148,9 +151,9 @@ public: inParams = std::make_unique(); // +1 to get the count. - inParams->nChannels = getHighestInChannel() + 1; + inParams->nChannels = getHighestEnabledInChannel() + 1; if (inParams->nChannels < 1) { - throw runtime_error("Invalid input number of channels"); + throw rte("Invalid input number of channels"); } inParams->firstChannel = 0; inParams->deviceId = devinfo.api_specific_devindex; @@ -159,9 +162,9 @@ public: outParams = std::make_unique(); - outParams->nChannels = getHighestOutChannel() + 1; + outParams->nChannels = getHighestEnabledOutChannel() + 1; if (outParams->nChannels < 1) { - throw runtime_error("Invalid output number of channels"); + throw rte("Invalid output number of channels"); } outParams->firstChannel = 0; outParams->deviceId = devinfo.api_specific_devindex; @@ -171,7 +174,7 @@ public: streamoptions.flags = RTAUDIO_HOG_DEVICE | RTAUDIO_NONINTERLEAVED; streamoptions.numberOfBuffers = 2; - streamoptions.streamName = "RtAudio stream"; + streamoptions.streamName = "LASP RtAudio DAQ stream"; streamoptions.priority = 0; RtAudioFormat format; @@ -179,22 +182,27 @@ public: const Dtype dtype = dataType(); switch (dtype) { case Dtype::dtype_fl32: + DEBUGTRACE_PRINT("Datatype float32"); format = RTAUDIO_FLOAT32; break; case Dtype::dtype_fl64: + DEBUGTRACE_PRINT("Datatype float64"); format = RTAUDIO_FLOAT64; break; case Dtype::dtype_int8: + DEBUGTRACE_PRINT("Datatype int8"); format = RTAUDIO_SINT8; break; case Dtype::dtype_int16: + DEBUGTRACE_PRINT("Datatype int16"); format = RTAUDIO_SINT16; break; case Dtype::dtype_int32: + DEBUGTRACE_PRINT("Datatype int32"); format = RTAUDIO_SINT32; break; default: - throw runtime_error("Invalid data type specified for DAQ stream."); + throw rte("Invalid data type specified for DAQ stream."); break; } @@ -207,38 +215,46 @@ public: static_cast(samplerate()), &nFramesPerBlock_copy, mycallback, (void *)this, &streamoptions, &myerrorcallback); + + if (nFramesPerBlock_copy != nFramesPerBlock) { + throw rte("Got different number of frames per block back from RtAudio " + "backend. Do not know what to do"); + } } virtual void start(InDaqCallback inCallback, OutDaqCallback outCallback) override final { DEBUGTRACE_ENTER; + lck lock(_mtx); assert(!monitorOutput); - if (StreamStatus().runningOK()) { - throw runtime_error("Stream already running"); + if (getStreamStatus().runningOK()) { + throw rte("Stream already running"); } // Logical XOR if (inCallback && outCallback) { - throw runtime_error("Either input or output stream possible for RtAudio. " - "Stream duplex mode not provided."); + throw rte("Either input or output stream possible for RtAudio. " + "Stream duplex mode not provided."); } - if (inCallback) { - _incallback = inCallback; - if (neninchannels() == 0) { - throw runtime_error( + if (neninchannels() > 0) { + if (!inCallback) { + throw rte( + "Input callback given, but stream does not provide input data"); } + + _incallback = inCallback; } - if (outCallback) { - _outcallback = outCallback; - if (nenoutchannels() == 0) { - throw runtime_error( + if (nenoutchannels() > 0) { + if (!outCallback) { + throw rte( "Output callback given, but stream does not provide output data"); } + _outcallback = outCallback; } // Start the stream. Throws on error. @@ -264,10 +280,9 @@ public: } int streamCallback(void *outputBuffer, void *inputBuffer, - unsigned int nFrames, double streamTime, - RtAudioStreamStatus status) { + unsigned int nFrames, RtAudioStreamStatus status) { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; using se = StreamStatus::StreamError; @@ -294,8 +309,9 @@ public: break; } - const auto &dtype_descr = DataTypeDescriptor(); + const auto &dtype_descr = dtypeDescr(); const auto dtype = dataType(); + us neninchannels = this->neninchannels(); us nenoutchannels = this->nenoutchannels(); us sw = dtype_descr.sw; @@ -308,18 +324,22 @@ public: if (inputBuffer) { assert(_incallback); - std::vector ptrs; + std::vector ptrs; ptrs.reserve(neninchannels); + const us ch_min = getLowestEnabledInChannel(); + const us ch_max = getHighestEnabledInChannel(); us i = 0; - for (int ch = getLowestInChannel(); ch <= getHighestInChannel(); ch++) { + for (us ch = ch_min; ch <= ch_max; ch++) { if (inchannel_config.at(ch).enabled) { - ptrs.push_back(static_cast(inputBuffer) + - sw * i * nFramesPerBlock); - i++; + byte_t *ptr = + static_cast(inputBuffer) + sw * i * nFramesPerBlock; + DEBUGTRACE_PRINT((us)ptr); + ptrs.push_back(ptr); } + i++; } - DaqData d{neninchannels, nFramesPerBlock, dtype}; + DaqData d{nFramesPerBlock, neninchannels, dtype}; d.copyInFromRaw(ptrs); bool ret = _incallback(d); @@ -331,22 +351,23 @@ public: if (outputBuffer) { assert(_outcallback); - std::vector ptrs; + std::vector ptrs; ptrs.reserve(nenoutchannels); /* outCallback */ - us i = 0; - for (int ch = getLowestOutChannel(); ch <= getHighestOutChannel(); ch++) { + const us ch_min = getLowestEnabledOutChannel(); + const us ch_max = getHighestEnabledOutChannel(); + us i = 0; + for (us ch = ch_min; ch <= ch_max; ch++) { if (outchannel_config.at(ch).enabled) { - ptrs.push_back(static_cast(outputBuffer) + + ptrs.push_back(static_cast(outputBuffer) + sw * i * nFramesPerBlock); - - i++; } + i++; } - DaqData d{nenoutchannels, nFramesPerBlock, dtype}; + DaqData d{nFramesPerBlock, nenoutchannels, dtype}; bool ret = _outcallback(d); if (!ret) { @@ -381,7 +402,7 @@ int mycallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames, double streamTime, RtAudioStreamStatus status, void *userData) { return static_cast(userData)->streamCallback( - outputBuffer, inputBuffer, nFrames, streamTime, status); + outputBuffer, inputBuffer, nFrames, status); } #endif // LASP_HAS_RTAUDIO == 1 diff --git a/src/lasp/device/lasp_streammgr.cpp b/src/lasp/device/lasp_streammgr.cpp index cdf9118..1d07dad 100644 --- a/src/lasp/device/lasp_streammgr.cpp +++ b/src/lasp/device/lasp_streammgr.cpp @@ -31,6 +31,7 @@ InDataHandler::~InDataHandler() { "InDataHandler's destructor. Fix this by calling " "InDataHandler::stop() from the derived class' destructor." << endl; + abort(); } #endif } @@ -60,7 +61,7 @@ void StreamMgr::rescanDAQDevices(bool background, DEBUGTRACE_ENTER; checkRightThread(); - if(!_devices_mtx.try_lock()) { + if (!_devices_mtx.try_lock()) { throw rte("A background DAQ device scan is probably already running"); } _devices_mtx.unlock(); @@ -69,11 +70,11 @@ void StreamMgr::rescanDAQDevices(bool background, throw rte("Rescanning DAQ devices only possible when no stream is running"); } _devices.clear(); - auto &pool = getPool(); - if (background) { - pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); - } else { + /* auto &pool = getPool(); */ + if (!background) { rescanDAQDevices_impl(callback); + } else { + /* pool.push_task(&StreamMgr::rescanDAQDevices_impl, this, callback); */ } } void StreamMgr::rescanDAQDevices_impl(std::function callback) { @@ -86,7 +87,7 @@ void StreamMgr::rescanDAQDevices_impl(std::function callback) { } bool StreamMgr::inCallback(const DaqData &data) { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; std::scoped_lock lck(_inDataHandler_mtx); @@ -95,7 +96,6 @@ bool StreamMgr::inCallback(const DaqData &data) { bool res = handler->inCallback(data); if (!res) return false; - } return true; } @@ -197,6 +197,12 @@ StreamMgr::~StreamMgr() { DEBUGTRACE_ENTER; checkRightThread(); stopAllStreams(); + if (!_inDataHandlers.empty()) { + cerr << "*** WARNING: InDataHandlers have not been all stopped, while " + "StreamMgr destructor is called. This is a misuse BUG" + << endl; + abort(); + } } void StreamMgr::stopAllStreams() { DEBUGTRACE_ENTER; @@ -213,6 +219,10 @@ void StreamMgr::startStream(const DaqConfiguration &config) { config.inchannel_config.cend(), [](auto &i) { return i.enabled; }); + bool isOutput = std::count_if(config.outchannel_config.cbegin(), + config.outchannel_config.cend(), + [](auto &i) { return i.enabled; }); + // Find the first device that matches with the configuration DeviceInfo devinfo; @@ -231,10 +241,6 @@ void StreamMgr::startStream(const DaqConfiguration &config) { isInput |= config.monitorOutput && devinfo.hasInternalOutputMonitor; - bool isOutput = std::count_if(config.outchannel_config.cbegin(), - config.outchannel_config.cend(), - [](auto &i) { return i.enabled; }); - bool isDuplex = isInput && isOutput; if (!isInput && !isOutput) { @@ -242,12 +248,18 @@ void StreamMgr::startStream(const DaqConfiguration &config) { "stream. Cannot start."); } - if ((isDuplex || isInput) && _inputStream) { + if (isInput && _inputStream) { throw rte("Error: an input stream is already running. Please " "first stop existing stream"); } else if (isOutput && _outputStream) { throw rte("Error: output stream is already running. Please " "first stop existing stream"); + } else if (_inputStream) { + if (_inputStream->duplexMode() && isOutput) { + throw rte( + "Error: output stream is already running (in duplex mode). Please " + "first stop existing stream"); + } } InDaqCallback inCallback; @@ -275,7 +287,7 @@ void StreamMgr::startStream(const DaqConfiguration &config) { if (isInput) { _inputStream = std::move(daq); - for(auto& handler: _inDataHandlers) { + for (auto &handler : _inDataHandlers) { handler->reset(_inputStream.get()); } if (_inputStream->duplexMode()) { @@ -297,7 +309,7 @@ void StreamMgr::stopStream(const StreamType t) { /// Kills input stream _inputStream = nullptr; /// Send reset to all in data handlers - for(auto& handler: _inDataHandlers) { + for (auto &handler : _inDataHandlers) { handler->reset(nullptr); } } break; @@ -305,6 +317,7 @@ void StreamMgr::stopStream(const StreamType t) { if (_inputStream && _inputStream->duplexMode()) { _inputStream = nullptr; } else { + if (!_outputStream) { throw rte("Output stream is not running"); } @@ -322,7 +335,7 @@ void StreamMgr::addInDataHandler(InDataHandler &handler) { checkRightThread(); std::scoped_lock lck(_inDataHandler_mtx); _inDataHandlers.push_back(&handler); - if(_inputStream) { + if (_inputStream) { handler.reset(_inputStream.get()); } else { handler.reset(nullptr); @@ -340,16 +353,19 @@ Daq::StreamStatus StreamMgr::getStreamStatus(const StreamType type) const { checkRightThread(); // Default constructor, says stream is not running, but also no errors - Daq::StreamStatus s; const Daq *daq = getDaq(type); if (daq) { - s = daq->getStreamStatus(); + return daq->getStreamStatus(); + } else { + return Daq::StreamStatus(); } - return s; } const Daq *StreamMgr::getDaq(StreamType type) const { + + checkRightThread(); + if (type == StreamType::input) { return _inputStream.get(); } else { diff --git a/src/lasp/device/lasp_uldaq.cpp b/src/lasp/device/lasp_uldaq.cpp index 24fb61a..c32c6cb 100644 --- a/src/lasp/device/lasp_uldaq.cpp +++ b/src/lasp/device/lasp_uldaq.cpp @@ -254,22 +254,22 @@ public: auto runCallback = ([&](us totalOffset) { /* DEBUGTRACE_ENTER; */ - TypedDaqData data(nchannels, nFramesPerBlock, - DataTypeDescriptor::DataType::dtype_fl64); + DaqData data(nFramesPerBlock, nchannels, + DataTypeDescriptor::DataType::dtype_fl64); us monitorOffset = monitorOutput ? 1 : 0; /* /// Put the output monitor in front */ if (monitorOutput) { for (us sample = 0; sample < nFramesPerBlock; sample++) { - data(sample, 0) = + data.value(sample, 0) = buf[totalOffset + sample * nchannels + (nchannels - 1)]; } } - for (us channel = 0; channel < nchannels-monitorOffset; channel++) { + for (us channel = 0; channel < nchannels - monitorOffset; channel++) { /* DEBUGTRACE_PRINT(channel); */ for (us frame = 0; frame < nFramesPerBlock; frame++) { - data(frame, channel + monitorOffset) = + data.value(frame, channel + monitorOffset) = buf[totalOffset + (frame * nchannels) + channel]; } } @@ -392,20 +392,20 @@ public: if (transferStatus.currentIndex < buffer_mid_idx) { topenqueued = false; if (!botenqueued) { - TypedDaqData d(1, nFramesPerBlock, + DaqData d(nFramesPerBlock,1, DataTypeDescriptor::DataType::dtype_fl64); res = cb(d); - d.copyToRaw(0, &buf[buffer_mid_idx]); + d.copyToRaw(0, reinterpret_cast(&(buf[buffer_mid_idx]))); botenqueued = true; } } else { botenqueued = false; if (!topenqueued) { - TypedDaqData d(1, nFramesPerBlock, + DaqData d(nFramesPerBlock,1, DataTypeDescriptor::DataType::dtype_fl64); res = cb(d); - d.copyToRaw(0, buf.data()); + d.copyToRaw(0, reinterpret_cast(&(buf[0]))); topenqueued = true; } diff --git a/src/lasp/dsp/lasp_ppm.cpp b/src/lasp/dsp/lasp_ppm.cpp index 2e1f276..68b8d20 100644 --- a/src/lasp/dsp/lasp_ppm.cpp +++ b/src/lasp/dsp/lasp_ppm.cpp @@ -1,6 +1,6 @@ /* #define DEBUGTRACE_ENABLED */ -#include "lasp_ppm.h" #include "debugtrace.hpp" +#include "lasp_ppm.h" #include using std::cerr; @@ -11,26 +11,40 @@ using rte = std::runtime_error; PPMHandler::PPMHandler(StreamMgr &mgr, const d decay_dBps) : ThreadedInDataHandler(mgr), _decay_dBps(decay_dBps) { + DEBUGTRACE_ENTER; - // This results in a deadlock! - /* std::scoped_lock lck(_mtx); */ - start(); } + bool PPMHandler::inCallback_threaded(const DaqData &d) { - /* DEBUGTRACE_ENTER; */ + + DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); + dmat data = d.toFloat(); + /* data.print(); */ - const us nchannels = _cur_max.size(); + const us nchannels = d.nchannels; + assert(data.n_cols == nchannels); - vrd maxabs = arma::max(arma::abs(data)); + if (nchannels != _cur_max.size()) { + DEBUGTRACE_PRINT("Resizing clip and cur max"); + _cur_max = vd(nchannels, arma::fill::value(1e-80)); + _clip_time = vd(nchannels, arma::fill::value(-1)); + } + assert(_clip_time.size() == _cur_max.size()); + + /// Obtain max abs values + vd maxabs = arma::max(arma::abs(data), 0).as_col(); + /* maxabs.print(); */ arma::uvec clip_indices = arma::find(maxabs > clip_point); arma::uvec clip(nchannels, arma::fill::zeros); clip.elem(clip_indices).fill(1); arma::uvec update_max_idx = arma::find(maxabs > _cur_max); + /* update_max_idx.print(); */ arma::uvec update_max(nchannels, arma::fill::zeros); + /* update_max.print(); */ update_max.elem(update_max_idx).fill(1); assert(_cur_max.size() == _clip_time.size()); @@ -47,8 +61,6 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { _clip_time(i) += _dt; } - /* cerr << "maxabs(i)" << maxabs(i) << endl; */ - /* cerr << "curmax(i)" << _cur_max(i) << endl; */ if (update_max(i)) { _cur_max(i) = maxabs(i); } else { @@ -60,7 +72,7 @@ bool PPMHandler::inCallback_threaded(const DaqData &d) { std::tuple PPMHandler::getCurrentValue() const { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; std::scoped_lock lck(_mtx); arma::uvec clips(_clip_time.size(), arma::fill::zeros); @@ -71,24 +83,21 @@ std::tuple PPMHandler::getCurrentValue() const { void PPMHandler::reset(const Daq *daq) { - DEBUGTRACE_ENTER; + /* DEBUGTRACE_ENTER; */ std::scoped_lock lck(_mtx); if (daq) { - _cur_max = vrd(daq->neninchannels(), arma::fill::zeros); + _cur_max.fill(1e-80); + ; + _clip_time.fill(-1); - _clip_time = vd(daq->neninchannels(), arma::fill::value(-1)); const d fs = daq->samplerate(); - DEBUGTRACE_PRINT(fs); + /* DEBUGTRACE_PRINT(fs); */ _dt = daq->framesPerBlock() / fs; _alpha = std::max(d_pow(10, -_dt * _decay_dBps / (20)), 0); - DEBUGTRACE_PRINT(_alpha); - - } else { - _cur_max.clear(); - _clip_time.clear(); + /* DEBUGTRACE_PRINT(_alpha); */ } } diff --git a/src/lasp/dsp/lasp_ppm.h b/src/lasp/dsp/lasp_ppm.h index cbc7776..6988605 100644 --- a/src/lasp/dsp/lasp_ppm.h +++ b/src/lasp/dsp/lasp_ppm.h @@ -52,7 +52,7 @@ class PPMHandler: public ThreadedInDataHandler { /** * @brief Current maximum values */ - vrd _cur_max; + vd _cur_max; /** * @brief How long ago the last clip has happened. Negative in case no clip diff --git a/src/lasp/dsp/lasp_threadedindatahandler.cpp b/src/lasp/dsp/lasp_threadedindatahandler.cpp index 8d3682c..4ee16ac 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.cpp +++ b/src/lasp/dsp/lasp_threadedindatahandler.cpp @@ -4,25 +4,62 @@ #include "lasp_thread.h" #include #include +#include +#include using namespace std::literals::chrono_literals; +using lck = std::scoped_lock; +using rte = std::runtime_error; + +class SafeQueue { + std::queue _queue; + std::mutex _mtx; + std::atomic_int32_t _contents {0}; + public: + void push(const DaqData& d) { + DEBUGTRACE_ENTER; + lck lock(_mtx); + _queue.push(d); + _contents++; + } + DaqData pop() { + DEBUGTRACE_ENTER; + if(_contents ==0) { + throw rte("BUG: Pop on empty queue"); + } + lck lock(_mtx); + + /* DaqData d(std::move(_queue.front())); */ + DaqData d(_queue.front()); + _queue.pop(); + _contents--; + + return d; + } + bool empty() const { + return _contents == 0; + } + + +}; ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr) - : InDataHandler(mgr) { + : InDataHandler(mgr), _queue(std::make_unique()) { - DEBUGTRACE_ENTER; + DEBUGTRACE_ENTER; - // Initialize thread pool, if not already done - getPool(); -} + // Initialize thread pool, if not already done + getPool(); + } bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; if (!_lastCallbackResult) { return false; } - dataqueue.push(daqdata); + + _queue->push(daqdata); auto &pool = getPool(); @@ -46,12 +83,12 @@ ThreadedInDataHandler::~ThreadedInDataHandler() { void ThreadedInDataHandler::threadFcn() { - /* DEBUGTRACE_ENTER; */ + DEBUGTRACE_ENTER; _thread_running = true; - DaqData d{}; - if (dataqueue.pop(d) && !_stopThread) { + if (!_queue->empty() && !_stopThread) { + DaqData d(_queue->pop()); // Call inCallback_threaded if (inCallback_threaded(d) == false) { diff --git a/src/lasp/dsp/lasp_threadedindatahandler.h b/src/lasp/dsp/lasp_threadedindatahandler.h index 0161d11..f628f44 100644 --- a/src/lasp/dsp/lasp_threadedindatahandler.h +++ b/src/lasp/dsp/lasp_threadedindatahandler.h @@ -13,7 +13,7 @@ const us RINGBUFFER_SIZE = 1024; * @{ */ - +class SafeQueue; /** * @brief Threaded in data handler. Buffers inCallback data and calls a * callback with the same signature on a different thread. @@ -22,9 +22,7 @@ class ThreadedInDataHandler: public InDataHandler { /** * @brief The queue used to push elements to the handling thread. */ - boost::lockfree::spsc_queue> - dataqueue; + std::unique_ptr _queue; std::atomic _thread_running{false}; std::atomic _stopThread{false}; diff --git a/src/lasp/dsp/lasp_types.h b/src/lasp/dsp/lasp_types.h index 692a9f5..4fd92a4 100644 --- a/src/lasp/dsp/lasp_types.h +++ b/src/lasp/dsp/lasp_types.h @@ -25,6 +25,7 @@ #include #endif +static_assert(sizeof(size_t) == 8); typedef size_t us; /* Size type I always use */ // To change the whole code to 32-bit floating points, change this to diff --git a/src/lasp/lasp_record.py b/src/lasp/lasp_record.py index 3dacecf..8f83ae5 100644 --- a/src/lasp/lasp_record.py +++ b/src/lasp/lasp_record.py @@ -99,6 +99,7 @@ class Recording: logging.debug("Starting record....") self.indh = InDataHandler(streammgr, self.inCallback, self.resetCallback) + self.indh.start() if wait: logging.debug("Stop recording with CTRL-C") diff --git a/src/lasp/pybind11/lasp_pyindatahandler.cpp b/src/lasp/pybind11/lasp_pyindatahandler.cpp index 694b700..046620d 100644 --- a/src/lasp/pybind11/lasp_pyindatahandler.cpp +++ b/src/lasp/pybind11/lasp_pyindatahandler.cpp @@ -25,13 +25,45 @@ namespace py = pybind11; * * @return Numpy array */ -template py::array_t getPyArray(const DaqData &d) { +template py::array_t getPyArrayNoCpy(const DaqData &d) { // https://github.com/pybind/pybind11/issues/323 // // When a valid object is passed as 'base', it tells pybind not to take // ownership of the data, because 'base' will own it. In fact 'packet' will // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY - // valid object is good for this purpose, so I chose "str"... + // valid object is good for this purpose, so I choose "str"... + + py::str dummyDataOwner; + /* + * Signature: + array_t(ShapeContainer shape, + StridesContainer strides, + const T *ptr = nullptr, + handle base = handle()); + */ + + return py::array_t( + py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape + + py::array::StridesContainer( // Strides + {sizeof(T), + sizeof(T) * d.nframes}), // Strides (in bytes) for each index + + reinterpret_cast( + const_cast(d).raw_ptr()), // Pointer to buffer + + dummyDataOwner // As stated above, now Numpy does not take ownership of + // the data pointer. + ); +} + +template py::array_t dmat_to_ndarray(const DaqData &d) { + // https://github.com/pybind/pybind11/issues/323 + // + // When a valid object is passed as 'base', it tells pybind not to take + // ownership of the data, because 'base' will own it. In fact 'packet' will + // own it, but - psss! - , we don't tell it to pybind... Alos note that ANY + // valid object is good for this purpose, so I choose "str"... py::str dummyDataOwner; /* @@ -73,12 +105,9 @@ public: : ThreadedInDataHandler(mgr), cb(cb), reset_callback(reset_callback) { DEBUGTRACE_ENTER; - /// TODO: Note that if start() throws an exception, which means that the - /// destructor of PyIndataHandler is not called and the thread destructor - /// calls terminate(). It is a kind of rude way to crash, but it is also - /// *very* unlikely to happen, as start() does only add a reference to this - /// handler to a list in the stream mgr. - start(); + /// Start should be called externally, as at constructor time no virtual + /// functions should be called. + /* start(); */ } ~PyIndataHandler() { DEBUGTRACE_ENTER; @@ -100,19 +129,19 @@ public: py::object bool_val; switch (d.dtype) { case (DataType::dtype_int8): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_int16): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_int32): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_fl32): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; case (DataType::dtype_fl64): { - bool_val = cb(getPyArray(d)); + bool_val = cb(getPyArrayNoCpy(d)); } break; default: throw std::runtime_error("BUG"); @@ -134,13 +163,22 @@ public: } }; - void init_datahandler(py::module &m) { - py::class_ h(m, "InDataHandler"); - h.def(py::init()); + + py::class_ idh(m, "InDataHandler_base"); + idh.def("start", &InDataHandler::start); + idh.def("stop", &InDataHandler::stop); + + py::class_ tidh( + m, "ThreadedInDataHandler"); + + /// The C++ class is PyIndataHandler, but for Python, it is called + /// InDataHandler + py::class_ pyidh(m, "InDataHandler"); + pyidh.def(py::init()); /// Peak Programme Meter - py::class_ ppm(m, "PPMHandler"); + py::class_ ppm(m, "PPMHandler"); ppm.def(py::init()); ppm.def(py::init()); @@ -153,8 +191,8 @@ void init_datahandler(py::module &m) { /// Real time Aps /// - py::class_ rtaps(m, "RtAps"); - rtaps.def(py::init rtaps(m, "RtAps"); + rtaps.def(py::init(smgr, "StreamType") - .value("input", StreamMgr::StreamType::input) - .value("output", StreamMgr::StreamType::output); + .value("input", StreamMgr::StreamType::input) + .value("output", StreamMgr::StreamType::output); smgr.def("startStream", &StreamMgr::startStream); smgr.def("stopStream", &StreamMgr::stopStream); smgr.def_static("getInstance", []() { - return std::unique_ptr(&StreamMgr::getInstance()); - }); + return std::unique_ptr(&StreamMgr::getInstance()); + }); smgr.def("stopAllStreams", &StreamMgr::stopAllStreams); smgr.def("setSiggen", &StreamMgr::setSiggen); @@ -32,11 +32,13 @@ void init_streammgr(py::module &m) { smgr.def("isStreamRunningOK", &StreamMgr::isStreamRunningOK); smgr.def("isStreamRunning", &StreamMgr::isStreamRunning); smgr.def("getDaq", &StreamMgr::getDaq, py::return_value_policy::reference); - smgr.def("rescanDAQDevices", [](StreamMgr& smgr, bool background) { - // A pure C++ callback is the second argument to rescanDAQDevices, which - // cannot be wrapped to Pybind11. Only the one without callback is - // forwarded here to Python code. - smgr.rescanDAQDevices(background); + smgr.def( + "rescanDAQDevices", + [](StreamMgr &smgr, bool background) { + // A pure C++ callback is the second argument to rescanDAQDevices, which + // cannot be wrapped to Pybind11. Only the one without callback is + // forwarded here to Python code. + smgr.rescanDAQDevices(background); }, - py::arg("background")=false); + py::arg("background") = false); } diff --git a/third_party/armadillo-code b/third_party/armadillo-code index b572743..3865a05 160000 --- a/third_party/armadillo-code +++ b/third_party/armadillo-code @@ -1 +1 @@ -Subproject commit b5727433d342afca53aca0ee16ecf1caaa661821 +Subproject commit 3865a0520d577ac293d88c4fd726a41bda949869