Audio thread does not interfere with Python GIL anymore. Instead, GIL-acquiring stuff runs on different thread which communicates using queues. Small API change in callback to Python, no streamtime is given anymore, and buffers for input and output CAN be none.

This commit is contained in:
Anne de Jong 2020-04-03 11:12:49 +02:00
parent 5516fe44ce
commit 8e0b173fdf
9 changed files with 346 additions and 98 deletions

View File

@ -92,6 +92,9 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -std=c11 -Wall -Wextra -Wno-type-limi
-Werror=implicit-function-declaration -Werror=incompatible-pointer-types \
-Werror=return-type")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -std=c++11 -Wall -Wextra \
-Wno-type-limits")
# Debug make flags
set(CMAKE_C_FLAGS_DEBUG "-g" )

View File

@ -8,6 +8,7 @@
#pragma once
#ifndef MQ_H
#define MQ_H
#include "lasp_types.h"
typedef struct JobQueue_s JobQueue;
@ -58,7 +59,7 @@ void JobQueue_done(JobQueue* jq,void* job);
* @param job_ptr Pointer to job to be done
* @return 0 on success.
*/
int JobQueue_push(JobQueue* jp,void* job_ptr);
int JobQueue_push(JobQueue* jq,void* job_ptr);
/**
* Wait until the job queue is empty. Please use this function with

View File

@ -6,4 +6,4 @@ set_source_files_properties(lasp_rtaudio.cxx PROPERTIES COMPILE_FLAGS
cython_add_module(lasp_rtaudio lasp_rtaudio.pyx)
# target_link_libraries(lasp_portaudio portaudio) */
target_link_libraries(lasp_rtaudio rtaudio)
target_link_libraries(lasp_rtaudio pthread rtaudio)

View File

@ -0,0 +1,56 @@
// threadsafe_queue.h
//
// Author: J.A. de Jong
//
// Description:
// Implementation of a thread-safe queue, based on STL queue
//////////////////////////////////////////////////////////////////////
#pragma once
#ifndef THREADSAFE_QUEUE_H
#define THREADSAFE_QUEUE_H
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class SafeQueue {
std::queue<T> _queue;
mutable std::mutex _mutex;
std::condition_variable _cv;
public:
SafeQueue(): _queue(), _mutex() , _cv()
{}
~SafeQueue(){}
void enqueue(T t) {
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(t);
_cv.notify_one();
}
T dequeue() {
std::unique_lock<std::mutex> lock(_mutex);
while(_queue.empty())
{
// release lock as long as the wait and reaquire it afterwards.
_cv.wait(lock);
}
T val = _queue.front();
_queue.pop();
return val;
}
bool empty() const {
std::unique_lock<std::mutex> lock(_mutex);
return _queue.size()==0;
}
size_t size() const {
std::unique_lock<std::mutex> lock(_mutex);
return _queue.size();
}
};
#endif // THREADSAFE_QUEUE_H
//////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,29 @@
#pragma once
#ifndef LASP_CPPTHREAD_H
#define LASP_CPPTHREAD_H
#include <chrono>
#include <thread>
#include <assert.h>
// This is a small wrapper around the std library thread.
template <class T, typename F>
class CPPThread {
std::thread _thread;
public:
CPPThread(F threadfcn, T data) :
_thread(threadfcn, data) { }
void join() {
assert(_thread.joinable());
_thread.join();
}
/* ~CPPThread() { */
/* } */
};
void CPPsleep(unsigned int ms) {
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
#endif // LASP_CPPTHREAD_H

View File

@ -5,10 +5,10 @@ from .lasp_daqconfig import DeviceInfo
from libcpp.string cimport string
from libcpp.vector cimport vector
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy
from libc.stdio cimport printf, fprintf, stderr
from libc.string cimport memcpy, memset
from cpython.ref cimport PyObject,Py_INCREF, Py_DECREF
# cdef extern from "lasp_worker.h":
cdef extern from "RtAudio.h" nogil:
ctypedef unsigned long RtAudioStreamStatus
@ -105,6 +105,27 @@ cdef extern from "RtAudio.h" nogil:
unsigned int getStreamSampleRate()
void showWarnings(bool value)
cdef extern from "lasp_cppthread.h" nogil:
cdef cppclass CPPThread[T,F]:
CPPThread(F threadfunction, T data)
void join()
void CPPsleep(unsigned int ms)
cdef extern from "lasp_cppqueue.h" nogil:
cdef cppclass SafeQueue[T]:
SafeQueue()
void enqueue(T t)
T dequeue()
size_t size() const
bool empty() const
cdef extern from "atomic" namespace "std" nogil:
cdef cppclass atomic[T]:
T load()
void store(T)
_formats_strkey = {
'8-bit integers': (RTAUDIO_SINT8, 1, np.int8),
'16-bit integers': (RTAUDIO_SINT16, 2, np.int16),
@ -127,17 +148,6 @@ def get_numpy_dtype_from_format_string(format_string):
def get_sampwidth_from_format_string(format_string):
return _formats_strkey[format_string][-2]
ctypedef struct _Stream:
PyObject* pyCallback
RtAudioFormat sampleformat
cppRtAudio.StreamParameters inputParams
cppRtAudio.StreamParameters outputParams
# These boolean values tell us whether the structs above here are
# initialized and contain valid data
bool hasInput
bool hasOutput
unsigned int bufferFrames
# It took me quite a long time to fully understand Cython's idiosyncrasies
@ -166,7 +176,26 @@ cdef void fromNPYToBuffer(cnp.ndarray arr,
"""
memcpy(buf, arr.data, arr.size*arr.itemsize)
ctypedef struct _Stream:
PyObject* pyCallback
RtAudioFormat sampleformat
atomic[bool] stopThread
unsigned int nFrames
cppRtAudio.StreamParameters inputParams
cppRtAudio.StreamParameters outputParams
# If these queue pointers are NULL, it means the stream does not have an
# input, or output.
SafeQueue[void*] *inputQueue
SafeQueue[void*] *outputQueue
size_t inputbuffersize # Full size of the output buffer, in BYTES
size_t outputbuffersize # Full size of the output buffer, in BYTES
CPPThread[void*, void (*)(void*)] *thread
cdef int audioCallback(void* outputbuffer,
void* inputbuffer,
unsigned int nFrames,
@ -179,67 +208,140 @@ cdef int audioCallback(void* outputbuffer,
"""
cdef:
int rval = 0
cnp.NPY_TYPES npy_format
_Stream* stream
void* outputbuffercpy = NULL
void* inputbuffercpy = NULL
stream = <_Stream*>(userData)
# Returning 2 means aborting the stream immediately
if status == RTAUDIO_INPUT_OVERFLOW:
fprintf(stderr, 'Input overflow.\n')
stream.stopThread.store(True)
return 2
elif status == RTAUDIO_OUTPUT_UNDERFLOW:
fprintf(stderr, 'Output underflow.\n')
# stream.stopThread.store(True)
return 0
if nFrames != stream.nFrames:
printf('Number of frames mismath in callback data!\n')
stream.stopThread.store(True)
return 2
if inputbuffer:
# assert stream.inputQueue is not NULL
inputbuffercpy = malloc(stream.inputbuffersize)
memcpy(inputbuffercpy, inputbuffer,
stream.inputbuffersize)
stream.inputQueue.enqueue(inputbuffercpy)
if outputbuffer:
# assert stream.outputQueue is not NULL
if stream.outputQueue.empty():
fprintf(stderr, 'Stream output buffer underflow, zero-ing buffer...\n')
# Pre-stack three empty output buffers
# printf('Pre-stacking\n')
# outputbuffer = malloc(stream.outputbuffersize)
memset(outputbuffer, 0, stream.outputbuffersize)
if stream.inputQueue:
stream.inputQueue.enqueue(NULL)
return 0
outputbuffercpy = stream.outputQueue.dequeue()
memcpy(outputbuffer, outputbuffercpy,
stream.outputbuffersize)
free(outputbuffercpy)
return 0
cdef void audioCallbackPythonThreadFunction(void* voidstream) nogil:
cdef:
_Stream* stream
cnp.NPY_TYPES npy_format
void* inputbuffer = NULL
void* outputbuffer = NULL
stream = <_Stream*> voidstream
printf('Thread started\n')
with gil:
if status == RTAUDIO_INPUT_OVERFLOW:
print('Input overflow.')
return 0
elif status == RTAUDIO_OUTPUT_UNDERFLOW:
print('Output underflow.')
return 0
else:
pass
stream = <_Stream*>(userData)
callback = <object> stream[0].pyCallback
# Obtain stream information
npy_input = None
npy_output = None
if stream.hasInput:
npy_format = _formats_rtkey[stream.sampleformat][2]
callback = <object> stream.pyCallback
while True:
if stream.stopThread.load() == True:
printf('Stopping thread...\n')
return
if stream.inputQueue:
inputbuffer = stream.inputQueue.dequeue()
# if inputbuffer == NULL:
# continue
if stream.outputQueue:
outputbuffer = malloc(stream.outputbuffersize)
with gil:
# Obtain stream information
npy_input = None
npy_output = None
if stream.inputQueue and inputbuffer:
try:
npy_input = fromBufferToNPYNoCopy(
npy_format,
inputbuffer,
stream.inputParams.nChannels,
stream.nFrames)
except Exception as e:
print('exception in cython callback for audio input: ', str(e))
return
if stream.outputQueue:
try:
assert outputbuffer != NULL
npy_output = fromBufferToNPYNoCopy(
npy_format,
outputbuffer,
stream.outputParams.nChannels,
stream.nFrames)
except Exception as e:
print('exception in Cython callback for audio output: ', str(e))
return
try:
assert inputbuffer != NULL
npy_format = _formats_rtkey[stream.sampleformat][2]
npy_input = fromBufferToNPYNoCopy(
npy_format,
inputbuffer,
stream[0].inputParams.nChannels,
nFrames)
rval = callback(npy_input,
npy_output,
stream.nFrames,
)
except Exception as e:
print('exception in cython callback for input: ', str(e))
return 1
print('Exception in Cython callback: ', str(e))
return
if stream[0].hasOutput:
try:
assert outputbuffer != NULL
npy_format = _formats_rtkey[stream[0].sampleformat][2]
npy_output = fromBufferToNPYNoCopy(
npy_format,
outputbuffer,
stream[0].outputParams.nChannels,
nFrames)
if stream.outputQueue:
stream.outputQueue.enqueue(outputbuffer)
if not stream.inputQueue:
while stream.outputQueue.size() > 10 and not stream.stopThread.load():
# printf('Sleeping...\n')
# No input queue to wait on, so we relax a bit here.
CPPsleep(1);
except Exception as e:
print('exception in Cython callback for output: ', str(e))
return 1
try:
rval = callback(npy_input,
npy_output,
nFrames,
streamTime)
except Exception as e:
print('Exception in Cython callback: ', str(e))
return 1
# Outputbuffer is free'ed by the audiothread, so should not be touched
# here.
outputbuffer = NULL
# Inputbuffer memory is owned by Numpy, so should not be free'ed
inputbuffer = NULL
return rval
cdef void errorCallback(RtAudioError.Type _type,const string& errortxt) nogil:
with gil:
print('Error callback called: %s', errortxt)
printf('RtAudio error callback called: ')
printf(errortxt.c_str())
printf('\n')
cdef class RtAudio:
@ -249,11 +351,13 @@ cdef class RtAudio:
def __cinit__(self):
self._stream = NULL
self._rtaudio.showWarnings(True)
def __dealloc__(self):
if self._stream is not NULL:
print('Force closing stream')
fprintf(stderr, 'Force closing stream...')
self._rtaudio.closeStream()
self.cleanupStream(self._stream)
cpdef unsigned int getDeviceCount(self):
return self._rtaudio.getDeviceCount()
@ -316,57 +420,106 @@ cdef class RtAudio:
if self._stream is not NULL:
raise RuntimeError('Stream is already opened.')
cdef cppRtAudio.StreamParameters *rtOutputParams_ptr = NULL
cdef cppRtAudio.StreamParameters *rtInputParams_ptr = NULL
cdef:
cppRtAudio.StreamParameters *rtOutputParams_ptr = NULL
cppRtAudio.StreamParameters *rtInputParams_ptr = NULL
cppRtAudio.StreamOptions streamoptions
size_t bytespersample
unsigned int bufferFrames_local
cdef cppRtAudio.StreamOptions streamoptions
streamoptions.flags = RTAUDIO_HOG_DEVICE
streamoptions.numberOfBuffers = 4
bufferFrames_local = bufferFrames
self._stream = <_Stream*> malloc(sizeof(_Stream))
self._stream.hasInput = False
self._stream.hasInput = False
if self._stream is NULL:
if self._stream == NULL:
raise MemoryError()
self._stream[0].pyCallback = <PyObject*> pyCallback
self._stream.pyCallback = <PyObject*> pyCallback
Py_INCREF(pyCallback)
self._stream[0].sampleformat = _formats_strkey[sampleformat][0]
self._stream.sampleformat = _formats_strkey[sampleformat][0]
self._stream.inputQueue = NULL
self._stream.outputQueue = NULL
self._stream.outputbuffersize = 0
self._stream.inputbuffersize = 0
self._stream.stopThread.store(False)
self._stream.thread = NULL
bytespersample = get_sampwidth_from_format_string(sampleformat)
if outputParams is not None:
rtOutputParams_ptr = &self._stream.outputParams
rtOutputParams_ptr.deviceId = outputParams['deviceid']
rtOutputParams_ptr.nChannels = outputParams['nchannels']
rtOutputParams_ptr.firstChannel = outputParams['firstchannel']
self._stream[0].hasOutput = True
self._stream.outputQueue = new SafeQueue[void*]()
if inputParams is not None:
rtInputParams_ptr = &self._stream.inputParams
rtInputParams_ptr.deviceId = inputParams['deviceid']
rtInputParams_ptr.nChannels = inputParams['nchannels']
rtInputParams_ptr.firstChannel = inputParams['firstchannel']
self._stream[0].hasInput = True
self._stream.inputQueue = new SafeQueue[void*]()
try:
self._stream.bufferFrames = bufferFrames
self._rtaudio.openStream(rtOutputParams_ptr,
rtInputParams_ptr,
_formats_strkey[sampleformat][0],
sampleRate,
&self._stream[0].bufferFrames,
&bufferFrames_local,
audioCallback,
<void*> self._stream,
&streamoptions, # Stream options
errorCallback # Error callback
)
self._stream.nFrames = bufferFrames_local
except Exception as e:
print('Exception occured in stream opening: ', str(e))
self.cleanupStream(self._stream)
self._stream = NULL
free(self._stream)
Py_INCREF(pyCallback)
raise
return self._stream.bufferFrames
if inputParams is not None:
self._stream.inputbuffersize = bufferFrames_local*bytespersample*inputParams['nchannels']
if outputParams is not None:
self._stream.outputbuffersize = bufferFrames_local*bytespersample*outputParams['nchannels']
with nogil:
self._stream.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction,
<void*> self._stream)
CPPsleep(500)
return bufferFrames_local
cdef cleanupStream(self, _Stream* stream):
# printf('Entrance function cleanupStream...\n')
if stream == NULL:
return
with nogil:
if stream.thread:
stream.stopThread.store(True)
if stream.inputQueue:
# If waiting in the input queue, hereby we let it run.
stream.inputQueue.enqueue(NULL)
# printf('Joining thread...\n')
# HERE WE SHOULD RELEASE THE GIL, as exiting the thread function
# will require the GIL, which is locked by this thread!
stream.thread.join()
# printf('Thread joined!\n')
del stream.thread
if stream.outputQueue:
del stream.outputQueue
if stream.inputQueue:
del stream.inputQueue
if stream.pyCallback:
Py_DECREF(<object> stream.pyCallback)
free(stream)
# printf('Cleanup of stream is done\n')
def startStream(self):
self._rtaudio.startStream()
@ -374,15 +527,18 @@ cdef class RtAudio:
def stopStream(self):
if self._stream is NULL:
raise RuntimeError('Stream is not opened')
self._rtaudio.stopStream()
try:
self._rtaudio.stopStream()
except:
pass
def closeStream(self):
# print('closeStream')
if self._stream is NULL:
raise RuntimeError('Stream is not opened')
# Closing stream
self._rtaudio.closeStream()
Py_DECREF(<object> self._stream[0].pyCallback)
free(self._stream)
self.cleanupStream(self._stream)
self._stream = NULL
def abortStream(self):

View File

@ -96,18 +96,6 @@ class AvStream:
self.sampleformat = daqconfig.en_input_sample_format
self.samplerate = int(daqconfig.en_input_rate)
try:
self._rtaudio = RtAudio()
self.blocksize = self._rtaudio.openStream(
rtaudio_outputparams, # Outputparams
rtaudio_inputparams, # Inputparams
self.sampleformat, # Sampleformat
self.samplerate,
self.nframes_per_block, # Buffer size in frames
self._audioCallback)
except Exception as e:
raise RuntimeError(f'Could not initialize DAQ device: {str(e)}')
# Fill in numpy data type, and sample width
self.numpy_dtype = get_numpy_dtype_from_format_string(
@ -138,6 +126,19 @@ class AvStream:
# Possible, but long not tested: store video
self._videothread = None
try:
self._rtaudio = RtAudio()
self.blocksize = self._rtaudio.openStream(
rtaudio_outputparams, # Outputparams
rtaudio_inputparams, # Inputparams
self.sampleformat, # Sampleformat
self.samplerate,
self.nframes_per_block, # Buffer size in frames
self._audioCallback)
except Exception as e:
raise RuntimeError(f'Could not initialize DAQ device: {str(e)}')
def close(self):
self._rtaudio.closeStream()
self._rtaudio = None
@ -207,7 +208,7 @@ class AvStream:
cap.release()
print('stopped videothread')
def _audioCallback(self, indata, outdata, nframes, streamtime):
def _audioCallback(self, indata, outdata, nframes):
"""This is called (from a separate thread) for each audio block."""
self._aframectr += nframes
with self._callbacklock:

View File

@ -289,7 +289,7 @@ class Measurement:
def praw(self, block=None):
"""
Returns the uncalibrated acoustic pressure signal, converted to
floating point acoustic pressure values [Pa].
floating point acoustic pressure values [Pa].
"""
if block is not None:
with self.file() as f:

View File

@ -145,6 +145,8 @@ class Recording:
def _aCallback(self, indata, outdata, aframe):
if indata is None:
return
curT = self._ablockno()*self.blocksize/self.samplerate
recstatus = RecordStatus(