Split up indatahandler with python callback in part and part that calls the Python function. Threading is now handled using a thread pool. Some bugfixes

This commit is contained in:
Anne de Jong 2022-08-14 21:00:22 +02:00
parent 1e8b18aabe
commit c75f0dddc5
13 changed files with 315 additions and 195 deletions

View File

@ -1,3 +1,4 @@
#define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_daqconfig.h"

View File

@ -1,4 +1,4 @@
/* #define DEBUGTRACE_ENABLED */
#define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_daqconfig.h"
@ -7,8 +7,6 @@
#include <cassert>
#include <stdexcept>
#define MAX_DEV_COUNT_PER_API 20
using std::vector;
vector<DaqApi> DaqApi::getAvailableApis() {

View File

@ -1,6 +1,8 @@
#define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_rtaudiodaq.h"
#if LASP_HAS_RTAUDIO == 1
#include "debugtrace.hpp"
#include "lasp_daq.h"
#include <RtAudio.h>
#include <atomic>

View File

@ -1,5 +1,7 @@
#include "lasp_streammgr.h"
#define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_thread.h"
#include "lasp_streammgr.h"
#include <algorithm>
#include <assert.h>
#include <functional>
@ -37,6 +39,7 @@ InDataHandler::~InDataHandler() {
StreamMgr &StreamMgr::getInstance() {
DEBUGTRACE_ENTER;
getPool();
static StreamMgr mgr;
return mgr;
}
@ -198,6 +201,29 @@ void StreamMgr::startStream(const DeviceInfo &devinfo,
_outputStream = std::move(daq);
}
}
void StreamMgr::stopStream(const StreamType t) {
switch (t) {
case (StreamType::input): {
if (!_inputStream) {
throw std::runtime_error("Input stream is not running");
}
_inputStream = nullptr;
} break;
case (StreamType::output): {
if (_inputStream && _inputStream->duplexMode()) {
_inputStream = nullptr;
} else {
if (!_outputStream) {
throw std::runtime_error("Output stream is not running");
}
_outputStream = nullptr;
} // end else
} break;
default:
throw std::runtime_error("BUG");
break;
}
}
void StreamMgr::addInDataHandler(InDataHandler &handler) {
std::scoped_lock<std::mutex> lck(_inDataHandler_mtx);

View File

@ -155,7 +155,7 @@ public:
*
* @param stype The stream type to stop.
*/
void stopStream(StreamType stype);
void stopStream(const StreamType stype);
/**
* @brief Stop and delete all streams. Also called on destruction of the

View File

@ -11,12 +11,15 @@ set(lasp_dsp_files
lasp_thread.cpp
lasp_timebuffer.cpp
lasp_slm.cpp
lasp_threadedindatahandler.cpp
)
add_library(lasp_dsp_lib STATIC ${lasp_dsp_files})
target_compile_definitions(lasp_dsp_lib PUBLIC "-DARMA_DONT_USE_WRAPPER")
target_link_libraries(lasp_dsp_lib PRIVATE ${BLAS_LIBRARIES})
target_include_directories(lasp_dsp_lib INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
target_include_directories(lasp_dsp_lib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_include_directories(lasp_dsp_lib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../device)

View File

@ -1,9 +1,25 @@
#define DEBUGTRACE_ENABLED
#include "lasp_thread.h"
#include "BS_thread_pool.hpp"
#include "debugtrace.hpp"
#include <memory>
/**
* @brief It seems to work much better in cooperation with Pybind11 when this
* singleton is implemented with a unique_ptr.
*/
std::unique_ptr<BS::thread_pool> _static_storage_threadpool;
void destroyThreadPool() {
DEBUGTRACE_ENTER;
_static_storage_threadpool = nullptr;
}
BS::thread_pool &getPool() {
static BS::thread_pool pool;
return pool;
/* DEBUGTRACE_ENTER; */
if (!_static_storage_threadpool) {
DEBUGTRACE_PRINT("Creating new thread pool");
_static_storage_threadpool = std::make_unique<BS::thread_pool>();
}
return *_static_storage_threadpool;
}

View File

@ -9,3 +9,11 @@
* @return Thread pool ref.
*/
BS::thread_pool& getPool();
/**
* @brief The global thread pool is stored in a unique_ptr, so in normal C++
* code the thread pool is deleted at the end of main(). However this does not
* hold when LASP code is run
*/
void destroyThreadPool();

View File

@ -0,0 +1,65 @@
#define DEBUGTRACE_ENABLED
#include "lasp_threadedindatahandler.h"
#include "debugtrace.hpp"
#include "lasp_thread.h"
#include <future>
#include <thread>
using namespace std::literals::chrono_literals;
ThreadedInDataHandler::ThreadedInDataHandler(StreamMgr &mgr)
: InDataHandler(mgr) {
DEBUGTRACE_ENTER;
// Initialize thread pool, if not already done
getPool();
}
bool ThreadedInDataHandler::inCallback(const DaqData &daqdata) {
/* DEBUGTRACE_ENTER; */
if (!_lastCallbackResult) {
return false;
}
dataqueue.push(daqdata);
auto &pool = getPool();
if (!_thread_running && (!_stopThread) && _lastCallbackResult) {
pool.push_task(&ThreadedInDataHandler::threadFcn, this);
}
return _lastCallbackResult;
}
ThreadedInDataHandler::~ThreadedInDataHandler() {
DEBUGTRACE_ENTER;
_stopThread = true;
// Then wait in steps for the thread to stop running.
while (_thread_running) {
std::this_thread::sleep_for(10us);
}
}
void ThreadedInDataHandler::threadFcn() {
/* DEBUGTRACE_ENTER; */
_thread_running = true;
DaqData d{};
if (dataqueue.pop(d) && !_stopThread) {
// Call inCallback_threaded
if (inCallback_threaded(d) == false) {
_lastCallbackResult = false;
_thread_running = false;
return;
}
}
_lastCallbackResult = true;
_thread_running = false;
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <boost/lockfree/spsc_queue.hpp>
#include "lasp_streammgr.h"
const us RINGBUFFER_SIZE = 1024;
class ThreadedInDataHandler: public InDataHandler {
/**
* @brief The queue used to push elements to the handling thread.
*/
boost::lockfree::spsc_queue<DaqData,
boost::lockfree::capacity<RINGBUFFER_SIZE>>
dataqueue;
std::atomic<bool> _thread_running{false};
std::atomic<bool> _stopThread{false};
std::atomic<bool> _lastCallbackResult{true};
void threadFcn();
public:
ThreadedInDataHandler(StreamMgr& mgr);
~ThreadedInDataHandler();
/**
* @brief Pushes a copy of the daqdata to the thread queue and returns
*
* @param daqdata the daq info to push
*
* @return true, to continue with sampling.
*/
virtual bool inCallback(const DaqData &daqdata) override;
virtual bool inCallback_threaded(const DaqData&) = 0;
};

View File

@ -1,8 +1,8 @@
#define DEBUGTRACE_ENABLED
#include "debugtrace.hpp"
#include "lasp_streammgr.h"
#include "lasp_threadedindatahandler.h"
#include <atomic>
#include <boost/lockfree/policies.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <chrono>
#include <pybind11/buffer_info.h>
#include <pybind11/cast.h>
@ -17,8 +17,6 @@ using namespace std::literals::chrono_literals;
using std::cerr;
using std::endl;
const us RINGBUFFER_SIZE = 1024;
namespace py = pybind11;
/**
@ -30,7 +28,7 @@ namespace py = pybind11;
*
* @return Numpy array
*/
template <typename T> py::array_t<T> getPyArray(DaqData &d) {
template <typename T> py::array_t<T> getPyArray(const DaqData &d) {
// https://github.com/pybind/pybind11/issues/323
//
// When a valid object is passed as 'base', it tells pybind not to take
@ -49,12 +47,13 @@ template <typename T> py::array_t<T> getPyArray(DaqData &d) {
return py::array_t<T>(
py::array::ShapeContainer({d.nframes, d.nchannels}), // Shape
//
py::array::StridesContainer( // Strides
{sizeof(T),
sizeof(T) * d.nframes}), // Strides (in bytes) for each index
(T *)d.raw_ptr(), // Pointer to buffer
reinterpret_cast<T *>(
const_cast<DaqData &>(d).raw_ptr()), // Pointer to buffer
dummyDataOwner // As stated above, now Numpy does not take ownership of
// the data pointer.
@ -66,35 +65,15 @@ template <typename T> py::array_t<T> getPyArray(DaqData &d) {
* buffer of sample data. The Python callback is called from a different
* thread, using a Numpy argument as
*/
class PyIndataHandler : public InDataHandler {
class PyIndataHandler : public ThreadedInDataHandler {
/**
* @brief The callback function that is called.
*/
py::function cb;
/**
* @brief The thread that is handling callbacks from the queue
*/
std::thread pythread;
/**
* @brief The queue used to push elements to the handling thread.
*/
boost::lockfree::spsc_queue<DaqData,
boost::lockfree::capacity<RINGBUFFER_SIZE>>
dataqueue;
std::atomic<bool> stopThread{false};
public:
~PyIndataHandler() {
DEBUGTRACE_ENTER;
stop();
stopThread = true;
if(pythread.joinable()) {
pythread.join();
}
}
PyIndataHandler(StreamMgr &mgr, py::function cb)
: InDataHandler(mgr), cb(cb),
pythread(&PyIndataHandler::threadfcn, this) {
: ThreadedInDataHandler(mgr), cb(cb) {
DEBUGTRACE_ENTER;
/// TODO: Note that if start() throws an exception, which means that the
@ -104,34 +83,32 @@ class PyIndataHandler : public InDataHandler {
/// handler to a list in the stream mgr.
start();
}
~PyIndataHandler() {
DEBUGTRACE_ENTER;
stop();
}
/**
* @brief Reads from the
* @brief Reads from the buffer
*/
void threadfcn() {
bool inCallback_threaded(const DaqData &d) {
/* DEBUGTRACE_ENTER; */
using DataType = DataTypeDescriptor::DataType;
DaqData d{};
while (!stopThread) {
if (dataqueue.pop(d)) {
/* DEBUGTRACE_PRINT("pop() returned true"); */
py::gil_scoped_acquire acquire;
try {
py::array binfo;
switch (d.dtype) {
case (DataType::dtype_int8):
binfo = getPyArray<uint8_t>(d);
binfo = getPyArray<int8_t>(d);
break;
case (DataType::dtype_int16):
binfo = getPyArray<uint16_t>(d);
binfo = getPyArray<int16_t>(d);
break;
case (DataType::dtype_int32):
binfo = getPyArray<uint32_t>(d);
binfo = getPyArray<int32_t>(d);
break;
case (DataType::dtype_fl32):
binfo = getPyArray<float>(d);
@ -146,50 +123,21 @@ class PyIndataHandler : public InDataHandler {
py::object bool_val = cb(binfo);
bool res = bool_val.cast<bool>();
if (!res)
stopThread = true;
return false;
} catch (py::error_already_set &e) {
cerr << "ERROR: Python raised exception from callback function: ";
cerr << e.what() << endl;
stopThread = true;
return false;
} catch (py::cast_error &e) {
cerr << e.what() << endl;
cerr << "ERROR: Python callback does not return boolean value."
<< endl;
stopThread = true;
}
}
// When there is nothing to pop from the queue, just sleep for a some
// time.
else {
std::this_thread::sleep_for(2ms);
}
} // end of while loop
} // end of threadfcn
/**
* @brief Pushes a copy of the daqdata to the thread queuue and returns
*
* @param daqdata the daq info to push
*
* @return true, to continue with sampling.
*/
virtual bool inCallback(const DaqData &daqdata) override {
/* DEBUGTRACE_ENTER; */
if (!stopThread) {
if (!dataqueue.push(daqdata)) {
stopThread = true;
cerr << "Pushing DaqData object failed. Probably the ringbuffer is "
"full. Try reducing the load"
<< endl;
}
cerr << "ERROR: Python callback does not return boolean value." << endl;
return false;
}
return true;
}
};
void init_datahandler(py::module &m) {
py::class_<PyIndataHandler> h(m, "InDataHandler");

View File

@ -18,7 +18,7 @@ void init_streammgr(py::module &m) {
.export_values();
smgr.def("startStream", &StreamMgr::startStream);
smgr.def("stopStream", &StreamMgr::startStream);
smgr.def("stopStream", &StreamMgr::stopStream);
smgr.def_static("getInstance", []() {
return std::unique_ptr<StreamMgr, py::nodelete>(&StreamMgr::getInstance());
});

View File

@ -3,6 +3,7 @@
{
"cell_type": "code",
"execution_count": 1,
"id": "b0d15138",
"metadata": {},
"outputs": [
{
@ -12,27 +13,15 @@
"make: Entering directory '/home/anne/wip/mycode/lasp'\n",
"make[1]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_dsp_lib\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"[ 26%] Built target lasp_dsp_lib\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"\u001b[35m\u001b[1mScanning dependencies of target lasp_device_lib\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"[ 31%] \u001b[32mBuilding CXX object src/lasp/device/CMakeFiles/lasp_device_lib.dir/lasp_rtaudiodaq.cpp.o\u001b[0m\n",
"\u001b[01m\u001b[K/home/anne/wip/mycode/lasp/src/lasp/device/lasp_rtaudiodaq.cpp:\u001b[m\u001b[K In lambda function:\n",
"\u001b[01m\u001b[K/home/anne/wip/mycode/lasp/src/lasp/device/lasp_rtaudiodaq.cpp:258:20:\u001b[m\u001b[K \u001b[01;35m\u001b[Kwarning: \u001b[m\u001b[Kvariable \u001b[01m\u001b[Kstat\u001b[m\u001b[K set but not used [\u001b[01;35m\u001b[K-Wunused-but-set-variable\u001b[m\u001b[K]\n",
" 258 | StreamStatus \u001b[01;35m\u001b[Kstat\u001b[m\u001b[K = _streamStatus;\n",
" | \u001b[01;35m\u001b[K^~~~\u001b[m\u001b[K\n",
"\u001b[01m\u001b[K/home/anne/wip/mycode/lasp/src/lasp/device/lasp_rtaudiodaq.cpp:\u001b[m\u001b[K In member function \u001b[01m\u001b[Kint RtAudioDaq::streamCallback(void*, void*, unsigned int, double, RtAudioStreamStatus)\u001b[m\u001b[K:\n",
"\u001b[01m\u001b[K/home/anne/wip/mycode/lasp/src/lasp/device/lasp_rtaudiodaq.cpp:250:36:\u001b[m\u001b[K \u001b[01;35m\u001b[Kwarning: \u001b[m\u001b[Kunused parameter \u001b[01m\u001b[KstreamTime\u001b[m\u001b[K [\u001b[01;35m\u001b[K-Wunused-parameter\u001b[m\u001b[K]\n",
" 250 | unsigned int nFrames, \u001b[01;35m\u001b[Kdouble streamTime\u001b[m\u001b[K,\n",
" | \u001b[01;35m\u001b[K~~~~~~~^~~~~~~~~~\u001b[m\u001b[K\n",
"\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_device_lib\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"[ 63%] Built target lasp_device_lib\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"make[2]: Entering directory '/home/anne/wip/mycode/lasp'\n",
"[ 68%] \u001b[32m\u001b[1mLinking CXX shared module lasp_cpp.cpython-38-x86_64-linux-gnu.so\u001b[0m\n",
"\u001b[35m\u001b[1mConsolidate compiler generated dependencies of target lasp_cpp\u001b[0m\n",
"make[2]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
"[100%] Built target lasp_cpp\n",
"make[1]: Leaving directory '/home/anne/wip/mycode/lasp'\n",
@ -47,8 +36,20 @@
{
"cell_type": "code",
"execution_count": 2,
"id": "1787e24c",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-07-20 22:04:50+0200 DebugTrace-cpp 2.0.0a2 (g++ 12.1.0)\n",
"2022-07-20 22:04:50+0200 \n",
"2022-07-20 22:04:50+0200 Enter fillUlDaqDeviceInfo (lasp_uldaq.cpp: 514)\n",
"2022-07-20 22:04:51+0200 Leave fillUlDaqDeviceInfo (lasp_uldaq.cpp)\n"
]
}
],
"source": [
"import lasp\n",
"ds = lasp.DeviceInfo.getDeviceInfo()"
@ -57,6 +58,7 @@
{
"cell_type": "code",
"execution_count": 3,
"id": "cb4ab7d8",
"metadata": {},
"outputs": [
{
@ -74,17 +76,26 @@
{
"cell_type": "code",
"execution_count": 4,
"id": "22ae99b1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0: Camera Mono\n",
"0: Monitor of Starship/Matisse HD Audio Controller Analog Stereo\n",
"1: Starship/Matisse HD Audio Controller Analog Stereo\n",
"2: Baffin HDMI/DP Audio [Radeon RX 550 640SP / RX 560/560X] Digital Stereo (HDMI)\n",
"3: Monitor of Baffin HDMI/DP Audio [Radeon RX 550 640SP / RX 560/560X] Digital Stereo (HDMI)\n",
"4: Monitor of Starship/Matisse HD Audio Controller Analog Stereo\n"
"2: GP108 High Definition Audio Controller Digital Stereo (HDMI)\n",
"3: Monitor of GP108 High Definition Audio Controller Digital Stereo (HDMI)\n",
"4: default\n",
"5: hw:HDA NVidia,3\n",
"6: hw:HDA NVidia,7\n",
"7: hw:HDA NVidia,8\n",
"8: hw:HDA NVidia,9\n",
"9: hw:HDA NVidia,10\n",
"10: hw:HD-Audio Generic,0\n",
"11: hw:HD-Audio Generic,1\n",
"12: hw:HD-Audio Generic,2\n"
]
}
],
@ -97,6 +108,7 @@
{
"cell_type": "code",
"execution_count": 5,
"id": "47385b02",
"metadata": {},
"outputs": [],
"source": [
@ -107,56 +119,28 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": 59,
"id": "d12f84b7",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2"
"name": "stdout",
"output_type": "stream",
"text": [
"Out channels: 2\n",
"In channels: 0\n"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"d.noutchannels"
"print('Out channels:',d.noutchannels)\n",
"print('In channels:',d.ninchannels)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"#daq = lasp.Daq.createDaq(d, config)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"d.ninchannels"
]
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": 52,
"id": "902ce309",
"metadata": {},
"outputs": [],
"source": [
@ -165,16 +149,38 @@
},
{
"cell_type": "code",
"execution_count": 15,
"execution_count": 55,
"id": "b209294b",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-07-20 22:07:02+0200 \n",
"2022-07-20 22:07:02+0200 Enter createDaq (lasp_daq.cpp: 18)\n",
"2022-07-20 22:07:02+0200 | Enter Daq (lasp_daq.cpp: 39)\n",
"2022-07-20 22:07:02+0200 | Leave Daq (lasp_daq.cpp)\n",
"2022-07-20 22:07:02+0200 | \n",
"2022-07-20 22:07:02+0200 | Enter RtAudioDaq (lasp_rtaudiodaq.cpp: 131)\n",
"2022-07-20 22:07:02+0200 | Leave RtAudioDaq (lasp_rtaudiodaq.cpp)\n",
"2022-07-20 22:07:02+0200 Leave createDaq (lasp_daq.cpp)\n",
"2022-07-20 22:07:02+0200 isInput = false\n",
"2022-07-20 22:07:02+0200 isOutput = true\n",
"2022-07-20 22:07:02+0200 \n",
"2022-07-20 22:07:02+0200 Enter start (lasp_rtaudiodaq.cpp: 211)\n",
"2022-07-20 22:07:02+0200 Leave start (lasp_rtaudiodaq.cpp)\n"
]
}
],
"source": [
"mgr.startStream(d, config)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": 27,
"id": "0830ffb5",
"metadata": {},
"outputs": [],
"source": [
@ -183,7 +189,8 @@
},
{
"cell_type": "code",
"execution_count": 16,
"execution_count": 60,
"id": "a7fddc19",
"metadata": {},
"outputs": [],
"source": [
@ -192,26 +199,36 @@
},
{
"cell_type": "code",
"execution_count": 13,
"execution_count": 57,
"id": "f4610574",
"metadata": {},
"outputs": [],
"source": [
"sine = lasp.Sine(1000)"
"sine = lasp.Sine(200)\n",
"sine.setLevel(-20, True)\n",
"mgr.setSiggen(sine)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"execution_count": 21,
"id": "d11c7dae",
"metadata": {},
"outputs": [],
"source": [
"mgr.setSiggen(sine)"
]
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "0eeb2311",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
@ -225,7 +242,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.10.5"
}
},
"nbformat": 4,