diff --git a/CMakeLists.txt b/CMakeLists.txt index c96fad9..664e68b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,7 +76,8 @@ else() set(win32 false) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -std=c11 \ -Werror=incompatible-pointer-types") - + include_directories(/usr/local/include/rtaudio) + link_directories(/usr/local/lib) endif(CMAKE_SYSTEM_NAME STREQUAL "Windows") diff --git a/lasp/__init__.py b/lasp/__init__.py index a921472..60b611a 100644 --- a/lasp/__init__.py +++ b/lasp/__init__.py @@ -6,3 +6,4 @@ from .lasp_octavefilter import * from .lasp_slm import * from .lasp_weighcal import * from .wrappers import * +from .device import AvType diff --git a/lasp/c/lasp_pyarray.h b/lasp/c/lasp_pyarray.h new file mode 100644 index 0000000..54e0a22 --- /dev/null +++ b/lasp/c/lasp_pyarray.h @@ -0,0 +1,76 @@ +// ascee_python.h +// +// Author: J.A. de Jong - ASCEE +// +// Description: +// Routine to generate a Numpy array from an arbitrary buffer. Careful, this +// code should both be C and C++ compatible!! +////////////////////////////////////////////////////////////////////// +#pragma once +#ifndef LASP_PYARRAY_H +#define LASP_PYARRAY_H + +#define TRACERPLUS (-10) +#include +#ifdef LASP_DOUBLE_PRECISION +#define LASP_NUMPY_FLOAT_TYPE NPY_FLOAT64 +#define LASP_NUMPY_COMPLEX_TYPE NPY_COMPLEX128 +#else +#define LASP_NUMPY_FLOAT_TYPE NPY_FLOAT32 +#endif + +#ifdef MS_WIN64 +/** + * Function passed to Python to use for cleanup of + * foreignly obtained data. + **/ +static inline void capsule_cleanup(void *capsule) { + void *memory = PyCapsule_GetPointer(capsule, NULL); + free(memory); +} + +#endif + +static inline PyObject *data_to_ndarray(void *data, int n_rows, int n_cols, + int typenum, bool transfer_ownership, + bool F_contiguous) { + + /* fprintf(stderr, "Enter data_to_ndarray\n"); */ + assert(data); + import_array(); + + npy_intp dims[2] = {n_rows, n_cols}; + assert(n_rows > 0); + assert(n_cols > 0); + + PyArrayObject *arr = + (PyArrayObject *)PyArray_SimpleNewFromData(2, dims, typenum, data); + + if (!arr) { + return NULL; + } + if (F_contiguous) { + PyArray_ENABLEFLAGS(arr, NPY_ARRAY_F_CONTIGUOUS); + } + + if (transfer_ownership == true) { + #ifdef MS_WIN64 + // The default destructor of Python cannot free the data, as it is allocated + // with malloc. Therefore, with this code, we tell Numpy/Python to use + // the capsule_cleanup constructor. See: + // https://stackoverflow.com/questions/54269956/crash-of-jupyter-due-to-the-use-of-pyarray-enableflags/54278170#54278170 + // Note that in general it was disadvised to build all C code with MinGW on + // Windows. We do it anyway, see if we find any problems on the way. + void *capsule = PyCapsule_New(mat->_data, NULL, capsule_cleanup); + PyArray_SetBaseObject(arr, capsule); + #endif + /* fprintf(stderr, "============Ownership transfer================\n"); */ + PyArray_ENABLEFLAGS(arr, NPY_OWNDATA); + } + /* fprintf(stderr, "Exit data_to_ndarray\n"); */ + + return (PyObject *) arr; +} + +#endif // LASP_PYARRAY_H +////////////////////////////////////////////////////////////////////// diff --git a/lasp/c/lasp_python.h b/lasp/c/lasp_python.h index 4ce31d6..71cdb15 100644 --- a/lasp/c/lasp_python.h +++ b/lasp/c/lasp_python.h @@ -9,25 +9,8 @@ #ifndef LASP_PYTHON_H #define LASP_PYTHON_H #define TRACERPLUS (-10) -#include -#ifdef LASP_DOUBLE_PRECISION -#define LASP_NUMPY_FLOAT_TYPE NPY_FLOAT64 -#define LASP_NUMPY_COMPLEX_TYPE NPY_COMPLEX128 -#else -#define LASP_NUMPY_FLOAT_TYPE NPY_FLOAT32 -#endif +#include "lasp_pyarray.h" -#ifdef MS_WIN64 -/** - * Function passed to Python to use for cleanup of - * foreignly obtained data. - **/ -static inline void capsule_cleanup(void* capsule) { - void *memory = PyCapsule_GetPointer(capsule, NULL); - free(memory); - } - -#endif /** * Create a numpy array from an existing dmat. * @@ -38,50 +21,31 @@ static inline void capsule_cleanup(void* capsule) { * @return Numpy array */ static inline PyObject* dmat_to_ndarray(dmat* mat,bool transfer_ownership) { - fsTRACE(15); dbgassert(mat,NULLPTRDEREF); + dbgassert(mat->_data,NULLPTRDEREF); + /* fprintf(stderr, "Enter dmat_to_ndarray\n"); */ - import_array(); - // Dimensions given in wrong order, as mat is // Fortran-contiguous. Later on we transpose the result. This is // more easy than using the PyArray_New syntax. - npy_intp dims[] = {mat->n_cols,mat->n_rows}; - PyObject* arr_t = PyArray_SimpleNewFromData(2,dims, - LASP_NUMPY_FLOAT_TYPE, - mat->_data); - if(!arr_t) { + PyObject* arr = data_to_ndarray(mat->_data, + mat->n_rows, + mat->n_cols, + LASP_NUMPY_FLOAT_TYPE, + transfer_ownership, + true); // Fortran-contiguous + + if(transfer_ownership) { + mat->_foreign_data = true; + } + + if(!arr) { WARN("Array creation failure"); feTRACE(15); return NULL; } + /* fprintf(stderr, "Exit dmat_to_ndarray\n"); */ - if(transfer_ownership) { - mat->_foreign_data = true; -#ifdef MS_WIN64 - // The default destructor of Python cannot free the data, as it is allocated - // with malloc. Therefore, with this code, we tell Numpy/Python to use - // the capsule_cleanup constructor. See: - // https://stackoverflow.com/questions/54269956/crash-of-jupyter-due-to-the-use-of-pyarray-enableflags/54278170#54278170 - // Note that in general it was disadvised to build all C code with MinGW on Windows. - // We do it anyway, see if we find any problems on the way. - void* capsule = PyCapsule_New(mat->_data, NULL, capsule_cleanup); - PyArray_SetBaseObject( arr_t, capsule); -#else - PyArray_ENABLEFLAGS(arr_t, NPY_OWNDATA); -#endif - } - - // Transpose the array - PyObject* arr = PyArray_Transpose((PyArrayObject*) arr_t,NULL); - if(!arr) { - WARN("Array transpose failure"); - feTRACE(15); - return NULL; - } - Py_DECREF(arr_t); - - feTRACE(15); return arr; } diff --git a/lasp/device/__init__.py b/lasp/device/__init__.py index 760e8f8..9fa6d59 100644 --- a/lasp/device/__init__.py +++ b/lasp/device/__init__.py @@ -1,6 +1,3 @@ #!/usr/bin/python3 -__all__ = ['DAQConfiguration'] -from .lasp_daqconfig import DAQConfiguration, DAQInputChannel, DeviceInfo -from .lasp_rtaudio import (RtAudio, - get_numpy_dtype_from_format_string, - get_sampwidth_from_format_string) +from .lasp_rtaudio import * +from .lasp_daqconfig import * diff --git a/lasp/device/lasp_daqconfig.py b/lasp/device/lasp_daqconfig.py index 0a908d6..77be942 100644 --- a/lasp/device/lasp_daqconfig.py +++ b/lasp/device/lasp_daqconfig.py @@ -9,11 +9,14 @@ Data Acquistiion (DAQ) device descriptors, and the DAQ devices themselves """ from dataclasses import dataclass, field +from dataclasses_json import dataclass_json import numpy as np +from ..lasp_common import lasp_shelve, Qty @dataclass class DeviceInfo: + api: int index: int probed: bool name: str @@ -25,17 +28,17 @@ class DeviceInfo: prefsamplerate: int -from ..lasp_common import lasp_shelve +@dataclass_json @dataclass -class DAQInputChannel: +class DAQChannel: channel_enabled: bool channel_name: str sensitivity: float + unit: Qty - - +@dataclass_json @dataclass class DAQConfiguration: """ @@ -44,6 +47,9 @@ class DAQConfiguration: Args: duplex_mode: Set device to duplex mode, if possible monitor_gen: If set to true, add monitor channel to recording. + outputDelayBlocks: number of blocks to delay output stream when added + to input for monitoring the output synchronously with input. + input_device_name: ASCII name with which to open the device when connected outut_device_name: ASCII name with which to open the device when connected @@ -66,6 +72,7 @@ class DAQConfiguration: """ + api: int duplex_mode: bool input_device_name: str @@ -76,28 +83,12 @@ class DAQConfiguration: en_input_rate: int en_output_rate: int - input_channel_configs: list = None - monitor_gen: bool = False + input_channel_configs: list + output_channel_configs: list + monitor_gen: bool - - def __post_init__(self): - """ - We do a check here to see whether the list of enabled channels is - contiguous. Non-contiguous is not yet implemented in RtAudio backend. - """ - en_input = self.input_channel_configs - first_ch_enabled_found = False - ch_disabled_found_after = False - for ch in en_input: - if ch.channel_enabled: - first_ch_enabled_found = True - if ch_disabled_found_after: - raise ValueError('Error: non-contiguous array of channels' - ' found. This is not yet implemented in' - ' backend') - else: - if first_ch_enabled_found: - ch_disabled_found_after = True + outputDelayBlocks: int + nFramesPerBlock: int def firstEnabledInputChannelNumber(self): """ @@ -109,18 +100,55 @@ class DAQConfiguration: return i return -1 + def firstEnabledOutputChannelNumber(self): + """ + Returns the channel number of the first enabled output channel. Returns -1 if + no channels are enabled. + """ + for i, ch in enumerate(self.output_channel_configs): + if ch.channel_enabled: + return i + return -1 - def getEnabledChannels(self): + def lastEnabledInputChannelNumber(self): + last = -1 + for i, ch in enumerate(self.input_channel_configs): + if ch.channel_enabled: + last = i + return last + + def lastEnabledOutputChannelNumber(self): + last = -1 + for i, ch in enumerate(self.output_channel_configs): + print(ch) + if ch.channel_enabled: + last = i + return last + + def getEnabledInputChannels(self): en_channels = [] for chan in self.input_channel_configs: if chan.channel_enabled: en_channels.append(chan) return en_channels - def getEnabledChannelSensitivities(self): - return np.array( - [float(channel.sensitivity) for channel in - self.getEnabledChannels()]) + def getEnabledInputChannelNames(self): + return [ch.channel_name for ch in self.getEnabledInputChannels()] + + def getEnabledInputChannelSensitivities(self): + return [float(channel.sensitivity) for channel in + self.getEnabledInputChannels()] + + def getEnabledOutputChannels(self): + en_channels = [] + for chan in self.output_channel_configs: + if chan.channel_enabled: + en_channels.append(chan) + return en_channels + + def getEnabledOutputChannelSensitivities(self): + return [float(channel.sensitivity) for channel in + self.getEnabledOutputChannels()] @staticmethod def loadConfigs(): diff --git a/lasp/device/lasp_rtaudio.pyx b/lasp/device/lasp_rtaudio.pyx index 2c54f26..2342b74 100644 --- a/lasp/device/lasp_rtaudio.pyx +++ b/lasp/device/lasp_rtaudio.pyx @@ -9,6 +9,15 @@ from libc.stdio cimport printf, fprintf, stderr from libc.string cimport memcpy, memset from cpython.ref cimport PyObject,Py_INCREF, Py_DECREF +__all__ = ['AvType', 'RtAudio', 'get_numpy_dtype_from_format_string', + 'get_sampwidth_from_format_string'] + +class AvType: + """Specificying the type of data, for adding and removing callbacks from + the stream.""" + audio_input = 1 + audio_output = 2 + video = 4 cdef extern from "RtAudio.h" nogil: ctypedef unsigned long RtAudioStreamStatus @@ -47,7 +56,7 @@ cdef extern from "RtAudio.h" nogil: ctypedef int (*RtAudioCallback)(void* outputBuffer, void* inputBuffer, - unsigned int nFrames, + unsigned int nFramesPerBlock, double streamTime, RtAudioStreamStatus status, void* userData) @@ -56,6 +65,15 @@ cdef extern from "RtAudio.h" nogil: const string& errortxt) cdef cppclass cppRtAudio "RtAudio": + enum Api: + UNSPECIFIED + LINUX_ALSA + LINUX_PULSE + MACOSX_CORE + WINDOWS_WASAPI + WINDOWS_ASIO + WINDOWS_DS + cppclass DeviceInfo: bool probed string name @@ -69,17 +87,35 @@ cdef extern from "RtAudio.h" nogil: RtAudioFormat nativeFormats cppclass StreamOptions: + StreamOptions() RtAudioStreamFlags flags unsigned int numberOfBuffers string streamName int priority cppclass StreamParameters: + StreamParameters() unsigned int deviceId unsigned int nChannels unsigned int firstChannel - RtAudio() except + + @staticmethod + void getCompiledApi(vector[cppRtAudio.Api]& apis) + + @staticmethod + cppRtAudio.Api getCompiledApiByName(string& name) + + @staticmethod + string getApiDisplayName(cppRtAudio.Api api) + + @staticmethod + string getApiName(cppRtAudio.Api api) + + # RtAudio() except + + cppRtAudio() except + + cppRtAudio(cppRtAudio.Api api) except + + # ~RtAudio() Destructors should not be listed + unsigned int getDeviceCount() DeviceInfo getDeviceInfo(unsigned int device) unsigned int getDefaultOutputDevice() @@ -126,6 +162,13 @@ cdef extern from "atomic" namespace "std" nogil: T load() void store(T) +cdef extern from "lasp_pyarray.h": + PyObject* data_to_ndarray(void* data, + int n_rows,int n_cols, + int typenum, + bool transfer_ownership, + bool F_contiguous) + _formats_strkey = { '8-bit integers': (RTAUDIO_SINT8, 1, np.int8), '16-bit integers': (RTAUDIO_SINT16, 2, np.int16), @@ -135,10 +178,10 @@ _formats_strkey = { '64-bit floats': (RTAUDIO_FLOAT64, 8, np.float64), } _formats_rtkey = { - RTAUDIO_SINT8: ('8-bit integers', 1, cnp.NPY_INT8), - RTAUDIO_SINT16: ('16-bit integers',2, cnp.NPY_INT16), - RTAUDIO_SINT24: ('24-bit integers',3), - RTAUDIO_SINT32: ('32-bit integers',4, cnp.NPY_INT32), + RTAUDIO_SINT8: ('8-bit integers', 1, cnp.NPY_INT8), + RTAUDIO_SINT16: ('16-bit integers', 2, cnp.NPY_INT16), + RTAUDIO_SINT24: ('24-bit integers', 3), + RTAUDIO_SINT32: ('32-bit integers', 4, cnp.NPY_INT32), RTAUDIO_FLOAT32: ('32-bit floats', 4, cnp.NPY_FLOAT32), RTAUDIO_FLOAT64: ('64-bit floats', 8, cnp.NPY_FLOAT64), } @@ -153,52 +196,60 @@ def get_sampwidth_from_format_string(format_string): # It took me quite a long time to fully understand Cython's idiosyncrasies # concerning C(++) callbacks, the GIL and passing Python objects as pointers # into C(++) functions. But finally, here it is! -cdef object fromBufferToNPYNoCopy( - cnp.NPY_TYPES buffer_format_type, - void* buf, - size_t nchannels, - size_t nframes): - cdef cnp.npy_intp[2] dims = [nframes, nchannels] - - # Interleaved data is C-style contiguous. Therefore, we can directly use - # SimpleNewFromData() - array = cnp.PyArray_SimpleNewFromData(2, &dims[0], buffer_format_type, - buf) - return array +# cdef void fromNPYToBuffer(cnp.ndarray arr, +# void* buf): +# """ +# Copy a Python numpy array over to a buffer +# No checks, just memcpy! Careful! +# """ +# memcpy(buf, arr.data, arr.size*arr.itemsize) +cdef void copyChannel(void* to, void* from_, + unsigned bytesperchan, + unsigned toindex, + unsigned fromindex) nogil: + memcpy( &(( to)[bytesperchan*toindex]), + &(( from_)[bytesperchan*fromindex]), + bytesperchan) -cdef void fromNPYToBuffer(cnp.ndarray arr, - void* buf): - """ - Copy a Python numpy array over to a buffer - No checks, just memcpy! Careful! - """ - memcpy(buf, arr.data, arr.size*arr.itemsize) ctypedef struct _Stream: PyObject* pyCallback RtAudioFormat sampleformat + # Flag used to pass the stopThread. atomic[bool] stopThread - unsigned int nFrames + # Number of frames per block + unsigned nFramesPerBlock + # Number of bytes per channel + unsigned int nBytesPerChan + # Number of blocks to delay the output before adding to the input + unsigned int outputDelayBlocks + + # The structures as used by RtAudio cppRtAudio.StreamParameters inputParams cppRtAudio.StreamParameters outputParams + bool* inputChannelsEnabled + bool* outputChannelsEnabled + + unsigned ninputchannels_forwarded + unsigned noutputchannels_forwarded + # If these queue pointers are NULL, it means the stream does not have an # input, or output. + SafeQueue[void*] *outputDelayQueue SafeQueue[void*] *inputQueue SafeQueue[void*] *outputQueue - size_t inputbuffersize # Full size of the output buffer, in BYTES - size_t outputbuffersize # Full size of the output buffer, in BYTES CPPThread[void*, void (*)(void*)] *thread - + cdef int audioCallback(void* outputbuffer, void* inputbuffer, - unsigned int nFrames, + unsigned int nFramesPerBlock, double streamTime, RtAudioStreamStatus status, void* userData) nogil: @@ -211,8 +262,22 @@ cdef int audioCallback(void* outputbuffer, _Stream* stream void* outputbuffercpy = NULL void* inputbuffercpy = NULL + unsigned j, i + unsigned bytesperchan + unsigned noutputchannels_forwarded, ninputchannels_forwarded + bint ch_en stream = <_Stream*>(userData) + bytesperchan = stream.nBytesPerChan + ninputchannels_forwarded = stream.ninputchannels_forwarded + noutputchannels_forwarded = stream.noutputchannels_forwarded + + # with gil: + # print(f'bytesperchan: {bytesperchan}') + # print(f'ninputchannels_forwarded:: {ninputchannels_forwarded}') + # print(f'noutputchannels_forwarded:: {noutputchannels_forwarded}') + # fprintf(stderr, "Stream heartbeat...\n") + # Returning 2 means aborting the stream immediately if status == RTAUDIO_INPUT_OVERFLOW: @@ -224,35 +289,78 @@ cdef int audioCallback(void* outputbuffer, # stream.stopThread.store(True) return 0 - if nFrames != stream.nFrames: + if nFramesPerBlock != stream.nFramesPerBlock: printf('Number of frames mismath in callback data!\n') stream.stopThread.store(True) return 2 if inputbuffer: - # assert stream.inputQueue is not NULL - inputbuffercpy = malloc(stream.inputbuffersize) - memcpy(inputbuffercpy, inputbuffer, - stream.inputbuffersize) + # fprintf(stderr, "enter copying input buffer code\n") + # with gil: + # assert stream.inputQueue is not NULL + inputbuffercpy = malloc(bytesperchan*ninputchannels_forwarded) + if not inputbuffercpy: + fprintf(stderr, "Error allocating buffer\n") + return 2 + + if stream.outputDelayQueue: + if stream.outputDelayQueue.size() > stream.outputDelayBlocks: + outputbuffercpy = stream.outputDelayQueue.dequeue() + memcpy(inputbuffercpy, outputbuffercpy, + bytesperchan*noutputchannels_forwarded) + + # Cleanup buffer + free(outputbuffercpy) + outputbuffercpy = NULL + else: + memset(inputbuffercpy, 0, + bytesperchan*noutputchannels_forwarded) + + # Starting channel for copying input channels according to the channel + # map + j = stream.noutputchannels_forwarded + else: + j = 0 + i = 0 + for i in range(stream.inputParams.nChannels): + ch_en = stream.inputChannelsEnabled[i] + if ch_en: + copyChannel(inputbuffercpy, inputbuffer, bytesperchan, j, i) + j+=1 + i+=1 + stream.inputQueue.enqueue(inputbuffercpy) if outputbuffer: - # assert stream.outputQueue is not NULL + # with gil: + # assert stream.outputQueue + # fprintf(stderr, "enter copying output buffer code\n") if stream.outputQueue.empty(): fprintf(stderr, 'Stream output buffer underflow, zero-ing buffer...\n') - # Pre-stack three empty output buffers - # printf('Pre-stacking\n') - # outputbuffer = malloc(stream.outputbuffersize) - memset(outputbuffer, 0, stream.outputbuffersize) - if stream.inputQueue: - stream.inputQueue.enqueue(NULL) - return 0 - - outputbuffercpy = stream.outputQueue.dequeue() - memcpy(outputbuffer, outputbuffercpy, - stream.outputbuffersize) - free(outputbuffercpy) + memset(outputbuffer, 0, stream.outputParams.nChannels*bytesperchan) + else: + outputbuffercpy = stream.outputQueue.dequeue() + # fprintf(stderr, 'Copying data to stream output buffer...\n') + j = 0 + i = 0 + for i in range(stream.outputParams.nChannels): + ch_en = stream.outputChannelsEnabled[i] + if ch_en: + copyChannel(outputbuffer, outputbuffercpy, bytesperchan, i, j) + j+=1 + else: + # If channel is not enabled, we set the data to zero + memset( &(( outputbuffer)[bytesperchan*i]), 0, bytesperchan) + pass + i+=1 + if stream.outputDelayQueue: + # fprintf(stderr, "Adding to delay queue\n") + stream.outputDelayQueue.enqueue(outputbuffercpy) + else: + free(outputbuffercpy) + + outputbuffercpy = NULL return 0 @@ -263,69 +371,104 @@ cdef void audioCallbackPythonThreadFunction(void* voidstream) nogil: cnp.NPY_TYPES npy_format void* inputbuffer = NULL void* outputbuffer = NULL + unsigned noutputchannels_forwarded + unsigned ninputchannels_forwarded + unsigned nBytesPerChan + unsigned nFramesPerBlock + + void* _TESTDATA stream = <_Stream*> voidstream - printf('Thread started\n') + ninputchannels_forwarded = stream.ninputchannels_forwarded + noutputchannels_forwarded = stream.noutputchannels_forwarded + nBytesPerChan = stream.nBytesPerChan + nFramesPerBlock = stream.nFramesPerBlock with gil: npy_format = _formats_rtkey[stream.sampleformat][2] callback = stream.pyCallback + print(f'noutputchannels_forwarded: {noutputchannels_forwarded}') + print(f'ninputchannels_forwarded: {ninputchannels_forwarded}') + print(f'nBytesPerChan: {nBytesPerChan}') + print(f'nFramesPerBlock: {nFramesPerBlock}') + while True: if stream.stopThread.load() == True: printf('Stopping thread...\n') return if stream.inputQueue: + # fprintf(stderr, "Waiting on input queue\n") inputbuffer = stream.inputQueue.dequeue() - # if inputbuffer == NULL: - # continue + if not inputbuffer: + printf('Stopping thread...\n') + return if stream.outputQueue: - outputbuffer = malloc(stream.outputbuffersize) + # fprintf(stderr, 'Allocating output buffer...\n') + outputbuffer = malloc(nBytesPerChan*noutputchannels_forwarded) + # memset(outputbuffer, 0, nBytesPerChan*noutputchannels_forwarded) with gil: # Obtain stream information npy_input = None npy_output = None - - if stream.inputQueue and inputbuffer: + + if stream.inputQueue: + # assert(inputbuffer) try: - npy_input = fromBufferToNPYNoCopy( - npy_format, - inputbuffer, - stream.inputParams.nChannels, - stream.nFrames) + # print(f'========ninputchannels_forwarded: {ninputchannels_forwarded}') + # print(f'========nFramesPerBlock: {nFramesPerBlock}') + # print(f'========npy_format: {npy_format}') + + npy_input = data_to_ndarray( + inputbuffer, + nFramesPerBlock, + ninputchannels_forwarded, + npy_format, + True, # Do transfer ownership + True) # F-contiguous is True: data is Fortran-cont. + # fprintf(stderr, "Copying array...\n") + # fprintf(stderr, "End Copying array...\n") except Exception as e: print('exception in cython callback for audio input: ', str(e)) return if stream.outputQueue: + # fprintf(stderr, 'Copying output buffer to Numpy...\n') try: - assert outputbuffer != NULL - npy_output = fromBufferToNPYNoCopy( - npy_format, - outputbuffer, - stream.outputParams.nChannels, - stream.nFrames) + npy_output = data_to_ndarray( + outputbuffer, + nFramesPerBlock, + noutputchannels_forwarded, + npy_format, + False, # Do not transfer ownership + True) # F-contiguous except Exception as e: print('exception in Cython callback for audio output: ', str(e)) return + try: + # fprintf(stderr, "Python callback...\n") rval = callback(npy_input, npy_output, - stream.nFrames, + nFramesPerBlock, ) + # fprintf(stderr, "Return from Python callback...\n") except Exception as e: print('Exception in Cython callback: ', str(e)) return - if stream.outputQueue: + if stream.outputQueue: + # fprintf(stderr, 'Enqueuing output buffer...\n') + stream.outputQueue.enqueue(outputbuffer) if not stream.inputQueue: + # fprintf(stderr, 'No input queue!\n') while stream.outputQueue.size() > 10 and not stream.stopThread.load(): # printf('Sleeping...\n') # No input queue to wait on, so we relax a bit here. @@ -334,30 +477,50 @@ cdef void audioCallbackPythonThreadFunction(void* voidstream) nogil: # Outputbuffer is free'ed by the audiothread, so should not be touched # here. outputbuffer = NULL + # Inputbuffer memory is owned by Numpy, so should not be free'ed inputbuffer = NULL cdef void errorCallback(RtAudioError.Type _type,const string& errortxt) nogil: - printf('RtAudio error callback called: ') - printf(errortxt.c_str()) - printf('\n') + fprintf(stderr, 'RtAudio error callback called: ') + fprintf(stderr, errortxt.c_str()) + fprintf(stderr, '\n') cdef class RtAudio: cdef: - cppRtAudio _rtaudio + cppRtAudio* _rtaudio _Stream* _stream + int api - def __cinit__(self): + def __cinit__(self, unsigned int iapi): + cdef: + cppRtAudio.Api api = iapi + self._rtaudio = new cppRtAudio(api) self._stream = NULL self._rtaudio.showWarnings(True) + self.api = api def __dealloc__(self): if self._stream is not NULL: - fprintf(stderr, 'Force closing stream...') + # fprintf(stderr, 'Force closing stream...') self._rtaudio.closeStream() self.cleanupStream(self._stream) + del self._rtaudio + + @staticmethod + def getApi(): + cdef: + vector[cppRtAudio.Api] apis + cppRtAudio.getCompiledApi(apis) + apidict = {} + for api in apis: + apidict[ api] = { + 'displayname': cppRtAudio.getApiDisplayName(api).decode('utf-8'), + 'name': cppRtAudio.getApiName(api).decode('utf-8') + } + return apidict cpdef unsigned int getDeviceCount(self): return self._rtaudio.getDeviceCount() @@ -372,14 +535,17 @@ cdef class RtAudio: """ Return device information of the current device """ - cdef cppRtAudio.DeviceInfo devinfo = self._rtaudio.getDeviceInfo(device) sampleformats = [] + cdef: + cppRtAudio.DeviceInfo devinfo + devinfo = self._rtaudio.getDeviceInfo(device) nf = devinfo.nativeFormats for format_ in [ RTAUDIO_SINT8, RTAUDIO_SINT16, RTAUDIO_SINT24, RTAUDIO_SINT32, RTAUDIO_FLOAT32, RTAUDIO_FLOAT64]: if nf & format_: sampleformats.append(_formats_rtkey[format_][0]) return DeviceInfo( + api = self.api, index = device, probed = devinfo.probed, name = devinfo.name.decode('utf-8'), @@ -391,109 +557,204 @@ cdef class RtAudio: prefsamplerate = devinfo.preferredSampleRate) @cython.nonecheck(True) - def openStream(self,object outputParams, - object inputParams, - str sampleformat, - unsigned int sampleRate, - unsigned int bufferFrames, - object pyCallback, - object options = None, - object pyErrorCallback = None): + def openStream(self, + avstream + ): """ Opening a stream with specified parameters Args: - outputParams: dictionary of stream outputParameters, set to None - if no outputPararms are specified - inputParams: dictionary of stream inputParameters, set to None - if no inputPararms are specified - sampleRate: desired sample rate. - bufferFrames: the amount of frames in a callback buffer - callback: callback to call. Note: this callback is called on a - different thread! - options: A dictionary of optional additional stream options - errorCallback: client-defined function that will be invoked when an - error has occured. + avstream: AvStream instance Returns: None """ + if self._stream is not NULL: raise RuntimeError('Stream is already opened.') + daqconfig = avstream.daqconfig + avtype = avstream.avtype + device = avstream.device + cdef: + bint duplex_mode = daqconfig.duplex_mode + bint monitorOutput = daqconfig.monitor_gen + unsigned int outputDelayBlocks = daqconfig.outputDelayBlocks cppRtAudio.StreamParameters *rtOutputParams_ptr = NULL cppRtAudio.StreamParameters *rtInputParams_ptr = NULL cppRtAudio.StreamOptions streamoptions - size_t bytespersample - unsigned int bufferFrames_local + size_t sw + unsigned int nFramesPerBlock = int(daqconfig.nFramesPerBlock) + int firstinputchannel, firstoutputchannel + int lastinputchannel, lastoutputchannel + unsigned int ninputchannels_forwarded=0 + unsigned int ninputchannels_rtaudio=0 + unsigned int noutputchannels_rtaudio=0 + unsigned int noutputchannels_forwarded=0 + unsigned int samplerate + int i + bint input_stream=False, output_stream=False + bool* inputChannelsEnabled + bool* outputChannelsEnabled - streamoptions.flags = RTAUDIO_HOG_DEVICE - streamoptions.numberOfBuffers = 4 - bufferFrames_local = bufferFrames + if daqconfig.nFramesPerBlock > 8192 or daqconfig.nFramesPerBlock < 512: + raise ValueError('Invalid number of nFramesPerBlock') - self._stream = <_Stream*> malloc(sizeof(_Stream)) - if self._stream == NULL: - raise MemoryError() - - self._stream.pyCallback = pyCallback - Py_INCREF(pyCallback) - self._stream.sampleformat = _formats_strkey[sampleformat][0] - self._stream.inputQueue = NULL - self._stream.outputQueue = NULL - self._stream.outputbuffersize = 0 - self._stream.inputbuffersize = 0 - self._stream.stopThread.store(False) - self._stream.thread = NULL - - bytespersample = get_sampwidth_from_format_string(sampleformat) - - if outputParams is not None: - rtOutputParams_ptr = &self._stream.outputParams - rtOutputParams_ptr.deviceId = outputParams['deviceid'] - rtOutputParams_ptr.nChannels = outputParams['nchannels'] - rtOutputParams_ptr.firstChannel = outputParams['firstchannel'] - self._stream.outputQueue = new SafeQueue[void*]() - - if inputParams is not None: - rtInputParams_ptr = &self._stream.inputParams - rtInputParams_ptr.deviceId = inputParams['deviceid'] - rtInputParams_ptr.nChannels = inputParams['nchannels'] - rtInputParams_ptr.firstChannel = inputParams['firstchannel'] - self._stream.inputQueue = new SafeQueue[void*]() + if daqconfig.outputDelayBlocks < 0 or daqconfig.outputDelayBlocks > 10: + raise ValueError('Invalid number of outputDelayBlocks') try: + + # Determine sample rate and sample format, determine whether we are an + # input or an output stream, or both + if avtype == AvType.audio_input or duplex_mode: + # Here, we override the sample format in case of duplex mode. + sampleformat = daqconfig.en_input_sample_format + samplerate = int(daqconfig.en_input_rate) + input_stream = True + if duplex_mode: + output_stream = True + else: + sampleformat = daqconfig.en_output_sample_format + samplerate = int(daqconfig.en_output_rate) + output_stream = True + + print(f'Is input stream: {input_stream}') + print(f'Is output stream: {output_stream}') + + sw = get_sampwidth_from_format_string(sampleformat) + print(f'samplewidth: {sw}') + + # All set, allocate the stream! + self._stream = <_Stream*> malloc(sizeof(_Stream)) + if self._stream == NULL: + raise MemoryError('Could not allocate stream: memory error.') + + self._stream.pyCallback = avstream._audioCallback + # Increase reference count to the callback + Py_INCREF( avstream._audioCallback) + + self._stream.sampleformat = _formats_strkey[sampleformat][0] + self._stream.stopThread.store(False) + self._stream.inputQueue = NULL + self._stream.outputQueue = NULL + self._stream.outputDelayQueue = NULL + + self._stream.thread = NULL + + self._stream.outputDelayBlocks = outputDelayBlocks + self._stream.ninputchannels_forwarded = 0 + self._stream.noutputchannels_forwarded = 0 + self._stream.inputChannelsEnabled = NULL + self._stream.outputChannelsEnabled = NULL + + # Create channel maps for input channels, set RtAudio input stream + # parameters + if input_stream: + firstinputchannel = daqconfig.firstEnabledInputChannelNumber() + lastinputchannel = daqconfig.lastEnabledInputChannelNumber() + ninputchannels_rtaudio = lastinputchannel-firstinputchannel+1 + + # print(firstinputchannel) + # print(lastinputchannel) + # print(ninputchannels_rtaudio) + + if lastinputchannel < 0 or ninputchannels_rtaudio < 1: + raise ValueError('Not enough input channels selected') + input_ch = daqconfig.input_channel_configs + + inputChannelsEnabled = malloc(sizeof(bool)*ninputchannels_rtaudio) + self._stream.inputChannelsEnabled = inputChannelsEnabled + + for i in range(firstinputchannel, lastinputchannel+1): + ch_en = input_ch[i].channel_enabled + if ch_en: + ninputchannels_forwarded += 1 + inputChannelsEnabled[i] = ch_en + + rtInputParams_ptr = &self._stream.inputParams + rtInputParams_ptr.deviceId = device.index + rtInputParams_ptr.nChannels = ninputchannels_rtaudio + rtInputParams_ptr.firstChannel = firstinputchannel + + self._stream.inputQueue = new SafeQueue[void*]() + self._stream.ninputchannels_forwarded = ninputchannels_forwarded + + + # Create channel maps for output channels + if output_stream: + firstoutputchannel = daqconfig.firstEnabledOutputChannelNumber() + lastoutputchannel = daqconfig.lastEnabledOutputChannelNumber() + noutputchannels_rtaudio = lastoutputchannel-firstoutputchannel+1 + + # print(firstoutputchannel) + # print(lastoutputchannel) + # print(noutputchannels_rtaudio) + + if lastoutputchannel < 0 or noutputchannels_rtaudio < 1: + raise ValueError('Not enough output channels selected') + output_ch = daqconfig.output_channel_configs + + outputChannelsEnabled = malloc(sizeof(bool)*noutputchannels_rtaudio) + self._stream.outputChannelsEnabled = outputChannelsEnabled + for i in range(firstoutputchannel, lastoutputchannel+1): + ch_en = output_ch[i].channel_enabled + if ch_en: + noutputchannels_forwarded += 1 + outputChannelsEnabled[i] = ch_en + + rtOutputParams_ptr = &self._stream.outputParams + rtOutputParams_ptr.deviceId = device.index + rtOutputParams_ptr.nChannels = noutputchannels_rtaudio + rtOutputParams_ptr.firstChannel = firstoutputchannel + + self._stream.outputQueue = new SafeQueue[void*]() + self._stream.noutputchannels_forwarded = noutputchannels_forwarded + + if monitorOutput and duplex_mode: + self._stream.outputDelayQueue = new SafeQueue[void*]() + self._stream.ninputchannels_forwarded += noutputchannels_forwarded + + + streamoptions.flags = RTAUDIO_HOG_DEVICE + streamoptions.flags |= RTAUDIO_NONINTERLEAVED + streamoptions.numberOfBuffers = 4 + streamoptions.streamName = "LASP Audio stream".encode('utf-8') + streamoptions.priority = 1 + self._rtaudio.openStream(rtOutputParams_ptr, rtInputParams_ptr, _formats_strkey[sampleformat][0], - sampleRate, - &bufferFrames_local, + samplerate, + &nFramesPerBlock, audioCallback, self._stream, &streamoptions, # Stream options errorCallback # Error callback ) - self._stream.nFrames = bufferFrames_local + + self._stream.nBytesPerChan = nFramesPerBlock*sw + self._stream.nFramesPerBlock = nFramesPerBlock except Exception as e: - print('Exception occured in stream opening: ', str(e)) + print('Exception occured in stream opening: ', e) self.cleanupStream(self._stream) self._stream = NULL - raise - - if inputParams is not None: - self._stream.inputbuffersize = bufferFrames_local*bytespersample*inputParams['nchannels'] - if outputParams is not None: - self._stream.outputbuffersize = bufferFrames_local*bytespersample*outputParams['nchannels'] + raise e with nogil: self._stream.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self._stream) + # Allow it to start CPPsleep(500) + pass - return bufferFrames_local + return nFramesPerBlock cdef cleanupStream(self, _Stream* stream): # printf('Entrance function cleanupStream...\n') + cdef: + void* ptr if stream == NULL: return @@ -509,17 +770,32 @@ cdef class RtAudio: stream.thread.join() # printf('Thread joined!\n') del stream.thread + stream.thread = NULL + + if stream.inputChannelsEnabled: + free(stream.inputChannelsEnabled) + if stream.outputChannelsEnabled: + free(stream.outputChannelsEnabled) if stream.outputQueue: + while not stream.outputQueue.empty(): + free(stream.outputQueue.dequeue()) del stream.outputQueue if stream.inputQueue: + while not stream.inputQueue.empty(): + free(stream.inputQueue.dequeue()) del stream.inputQueue + if stream.outputDelayQueue: + while not stream.outputDelayQueue.empty(): + free(stream.outputDelayQueue.dequeue()) + del stream.outputDelayQueue + fprintf(stderr, "End cleanup stream queues...\n") if stream.pyCallback: Py_DECREF( stream.pyCallback) - + stream.pyCallback = NULL + # fprintf(stderr, "End cleanup callback...\n") free(stream) - # printf('Cleanup of stream is done\n') def startStream(self): self._rtaudio.startStream() diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index f8dcd51..3abd4fb 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -11,20 +11,13 @@ import numpy as np import time from .device import (RtAudio, DeviceInfo, DAQConfiguration, get_numpy_dtype_from_format_string, - get_sampwidth_from_format_string) + get_sampwidth_from_format_string, AvType) -__all__ = ['AvType', 'AvStream'] +__all__ = ['AvStream'] video_x, video_y = 640, 480 -class AvType: - """Specificying the type of data, for adding and removing callbacks from - the stream.""" - audio_input = 1 - audio_output = 2 - video = 4 - class AvStream: """Audio and video data stream, to which callbacks can be added for @@ -49,59 +42,28 @@ class AvStream: """ self.daqconfig = daqconfig - self._device = device + self.device = device self.avtype = avtype self.duplex_mode = daqconfig.duplex_mode - self.monitor_gen = daqconfig.monitor_gen - # Determine highest input channel number - channelconfigs = daqconfig.input_channel_configs - firstchannel = daqconfig.firstEnabledInputChannelNumber() - if firstchannel < 0: - raise ValueError('No input channels enabled') - - self.channel_names = [] - self.sensitivity = self.daqconfig.getEnabledChannelSensitivities() + self.output_channel_names = [ch.channel_name for ch in daqconfig.getEnabledOutputChannels()] if daqconfig.monitor_gen: - assert self.duplex_mode - self.channel_names.append('Generated signal') - self.sensitivity = np.concatenate([np.array([1.]), - self.sensitivity]) + self.input_channel_names = self.output_channel_names + self.input_sensitivity = daqconfig.getEnabledOutputChannelSensitivities() + else: + self.input_channel_names = [] + self.input_sensitivity = [] + + self.input_sensitivity += daqconfig.getEnabledInputChannelSensitivities() + self.input_sensitivity = np.asarray(self.input_sensitivity) - rtaudio_inputparams = None - rtaudio_outputparams = None + self.input_channel_names += [ + ch.channel_name for ch in daqconfig.getEnabledInputChannels()] - self.nframes_per_block = 1024 - - if self.duplex_mode or avtype == AvType.audio_output: - rtaudio_outputparams = {'deviceid': device.index, - # TODO: Add option to specify the number of output channels to use - 'nchannels': 1, # device.outputchannels, - 'firstchannel': 0} - self.sampleformat = daqconfig.en_output_sample_format - self.samplerate = int(daqconfig.en_output_rate) - - if avtype == AvType.audio_input or self.duplex_mode: - for i, channelconfig in enumerate(channelconfigs): - if channelconfig.channel_enabled: - self.nchannels = i+1 - self.channel_names.append(channelconfig.channel_name) - rtaudio_inputparams = {'deviceid': device.index, - 'nchannels': self.nchannels, - 'firstchannel': firstchannel} - - # Here, we override the sample format in case of duplex mode. - self.sampleformat = daqconfig.en_input_sample_format - self.samplerate = int(daqconfig.en_input_rate) - - - # Fill in numpy data type, and sample width - self.numpy_dtype = get_numpy_dtype_from_format_string( - self.sampleformat) - self.sampwidth = get_sampwidth_from_format_string( - self.sampleformat) + self.input_samplerate = float(daqconfig.en_input_rate) + self.output_samplerate = float(daqconfig.en_output_rate) # Counters for the number of frames that have been coming in self._aframectr = Atomic(0) @@ -125,19 +87,19 @@ class AvStream: # Possible, but long not tested: store video self._videothread = None + self._rtaudio = RtAudio(daqconfig.api) + self.blocksize = self._rtaudio.openStream(self) - try: - self._rtaudio = RtAudio() - self.blocksize = self._rtaudio.openStream( - rtaudio_outputparams, # Outputparams - rtaudio_inputparams, # Inputparams - self.sampleformat, # Sampleformat - self.samplerate, - self.nframes_per_block, # Buffer size in frames - self._audioCallback) + # Fill in numpy data type, and sample width + self.input_numpy_dtype = get_numpy_dtype_from_format_string( + daqconfig.en_input_sample_format) + self.output_numpy_dtype = get_numpy_dtype_from_format_string( + daqconfig.en_output_sample_format) - except Exception as e: - raise RuntimeError(f'Could not initialize DAQ device: {str(e)}') + self.input_sampwidth = get_sampwidth_from_format_string( + daqconfig.en_input_sample_format) + self.output_sampwidth = get_sampwidth_from_format_string( + daqconfig.en_output_sample_format) def close(self): self._rtaudio.closeStream() @@ -212,29 +174,30 @@ class AvStream: """This is called (from a separate thread) for each audio block.""" self._aframectr += nframes with self._callbacklock: - # Count the number of output callbacks. If no output callbacks are # present, and there should be output callbacks, we explicitly set # the output buffer to zero noutput_cb = len(self._callbacks[AvType.audio_output]) shouldhaveoutput = (self.avtype == AvType.audio_output or - self.duplex_mode) - if noutput_cb == 0 and shouldhaveoutput: + self.daqconfig.duplex_mode) + if noutput_cb == 0 and shouldhaveoutput and outdata is not None: outdata[:, :] = 0 # Loop over callbacks - for cb in self._callbacks[AvType.audio_output]: - try: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 1 - for cb in self._callbacks[AvType.audio_input]: - try: - cb(indata, outdata, self._aframectr()) - except Exception as e: - print(e) - return 1 + if outdata is not None: + for cb in self._callbacks[AvType.audio_output]: + try: + cb(indata, outdata, self._aframectr()) + except Exception as e: + print(e) + return 2 + if indata is not None: + for cb in self._callbacks[AvType.audio_input]: + try: + cb(indata, outdata, self._aframectr()) + except Exception as e: + print(e) + return 1 return 0 if self._running else 1 diff --git a/lasp/lasp_common.py b/lasp/lasp_common.py index 3406138..37be552 100644 --- a/lasp/lasp_common.py +++ b/lasp/lasp_common.py @@ -9,6 +9,8 @@ import numpy as np from .wrappers import Window as wWindow from collections import namedtuple +from dataclasses import dataclass +from dataclasses_json import dataclass_json """ Common definitions used throughout the code. @@ -39,9 +41,24 @@ U_REF = 5e-8 # 50 nano meter / s # hence this is the reference level as specified below. dBFS_REF = 0.5*2**0.5 # Which level would be -3.01 dBFS -Qty = namedtuple('Qty', 'name unit_name unit_symb level_unit level_ref_name level_ref_value') +@dataclass_json +@dataclass +class Qty: + name: str + unit_name: str + unit_symb: str + level_unit: str + level_ref_name: str + level_ref_value: str class SIQtys: + N = Qty(name='Number', + unit_name='No unit / full scale', + unit_symb=('-'), + level_unit=('dBFS',), + level_ref_name=('Full scale sine wave',), + level_ref_value=(dBFS_REF,) + ) AP = Qty(name='Acoustic Pressure', unit_name='Pascal', unit_symb=('Pa', 'muPa'), @@ -51,21 +68,14 @@ class SIQtys: ) V = Qty(name='Voltage', - unit_name='volt', + unit_name='Volt', unit_symb='V', level_unit=('dBV',), # dBV level_ref_name=('1V',), level_ref_value=(1.0,), ) - N = Qty(name='No unit', - unit_name='', - unit_symb='[-]', - level_unit=('dBFS',), - level_ref_name=('Full scale sine wave',), - level_ref_value=(dBFS_REF,) - ) types = (AP, V, N) - default = AP + default = N default_index = 0 @staticmethod @@ -78,7 +88,7 @@ class SIQtys: """ cb.clear() for ty in SIQtys.types: - cb.addItem(f'{ty.name} [{ty.unit_symb[0]}') + cb.addItem(f'{ty.unit_name}') cb.setCurrentIndex(SIQtys.default_index) @staticmethod diff --git a/lasp/lasp_measurement.py b/lasp/lasp_measurement.py index 39efbce..09fa710 100644 --- a/lasp/lasp_measurement.py +++ b/lasp/lasp_measurement.py @@ -9,6 +9,8 @@ The ASCEE hdf5 measurement file format contains the following fields: - Attributes: +'version': If not given, version 1 is assumed. For version 1, measurement data +is assumed to be acoustic data. 'samplerate': The audio data sample rate in Hz. 'nchannels': The number of audio channels in the file 'sensitivity': (Optionally) the stored sensitivity of the record channels. @@ -42,6 +44,7 @@ from .lasp_config import LASP_NUMPY_FLOAT_TYPE from scipy.io import wavfile import os import time +import wave class BlockIter: diff --git a/lasp/lasp_record.py b/lasp/lasp_record.py index 95c372c..1878998 100644 --- a/lasp/lasp_record.py +++ b/lasp/lasp_record.py @@ -78,10 +78,6 @@ class Recording: stream = self._stream f = self._f nchannels = stream.nchannels - if stream.monitor_gen: - nchannels += 1 - - self.monitor_gen = stream.monitor_gen self._ad = f.create_dataset('audio', (1, stream.blocksize, nchannels), @@ -173,11 +169,7 @@ class Recording: return self._ad.resize(self._ablockno()+1, axis=0) - if self.monitor_gen: - self._ad[self._ablockno(), :, 0] = outdata[:, 0] - self._ad[self._ablockno(), :, 1:] = indata - else: - self._ad[self._ablockno(), :, :] = indata + self._ad[self._ablockno(), :, :] = indata self._ablockno += 1 def _vCallback(self, frame, framectr): diff --git a/setup.py b/setup.py index ebd4746..322e3f6 100644 --- a/setup.py +++ b/setup.py @@ -32,6 +32,7 @@ setup( author_email="j.a.dejong@ascee.nl", install_requires=['matplotlib>=1.0', 'scipy>=1.0', 'numpy>=1.0', 'h5py', + 'dataclasses_json', ], license='MIT', description="Library for Acoustic Signal Processing",