From fa0c241fb911ed9c0daa27a19fac06ec20437ddd Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Fri, 18 Sep 2020 08:52:56 +0200 Subject: [PATCH] Daq measurement interface changed to permanently work with UlDaq. Should become a choice in future. Simultaneous output and input not yet working. Probably the DAQ API is not working properly with threads. We should find a minimum working example of simultaneous input/output. Then, the UlDAQ and RtAudio details should be abstracted away in a common interface, written in C++ --- lasp/device/lasp_cppthread.h | 2 + lasp/device/lasp_uldaq.pyx | 262 +++++++++++++++++++++++------------ lasp/lasp_avstream.py | 7 +- scripts/lasp_record | 32 ++++- 4 files changed, 209 insertions(+), 94 deletions(-) diff --git a/lasp/device/lasp_cppthread.h b/lasp/device/lasp_cppthread.h index 77bd61a..2f9311a 100644 --- a/lasp/device/lasp_cppthread.h +++ b/lasp/device/lasp_cppthread.h @@ -29,4 +29,6 @@ void CPPsleep_ms(unsigned int ms) { void CPPsleep_us(unsigned int us) { std::this_thread::sleep_for(std::chrono::microseconds(us)); } + + #endif // LASP_CPPTHREAD_H diff --git a/lasp/device/lasp_uldaq.pyx b/lasp/device/lasp_uldaq.pyx index deddfa9..add7700 100644 --- a/lasp/device/lasp_uldaq.pyx +++ b/lasp/device/lasp_uldaq.pyx @@ -24,6 +24,8 @@ cdef struct DaqThreadData: unsigned noutChanDescriptors atomic[bool] stopThread + atomic[bool] outThread_ready + mutex* tdmutex CPPThread[void*, void (*)(void*)] *inThread CPPThread[void*, void (*)(void*)] *outThread @@ -70,7 +72,10 @@ cdef void inThreadFunction(void* threaddata_void) nogil: bottom_enqueued = True fprintf(stderr, 'Starting input thread\n') + while not td.outThread_ready.load(): + CPPsleep_ms(50) + td.tdmutex.lock() err = ulDaqInScan(td.handle, td.inChanDescriptors, td.ninChanDescriptors, @@ -79,6 +84,7 @@ cdef void inThreadFunction(void* threaddata_void) nogil: scanoptions, inscanflags, td.inbuffer) + td.tdmutex.unlock() fprintf(stderr, 'Actual input sampling rate: %0.2f\n', samplerate) if err != ERR_NO_ERROR: @@ -87,16 +93,21 @@ cdef void inThreadFunction(void* threaddata_void) nogil: return + td.tdmutex.lock() err = ulDaqInScanStatus(td.handle, &scanstat, &xstat) + td.tdmutex.unlock() if err != ERR_NO_ERROR: fprintf(stderr, 'Error obtaining input scan status\n') showErr(err) return while td.stopThread.load() == False and err == ERR_NO_ERROR: + td.tdmutex.lock() err = ulDaqInScanStatus(td.handle, &scanstat, &xstat) + td.tdmutex.unlock() if err != ERR_NO_ERROR: showErr(err) + break if xstat.currentIndex < buffer_mid_idx: top_enqueued = False @@ -132,7 +143,9 @@ cdef void inThreadFunction(void* threaddata_void) nogil: fprintf(stderr, 'Exit while loop input thread\n') + td.tdmutex.lock() err = ulDaqInScanStop(td.handle) + td.tdmutex.unlock() if err != ERR_NO_ERROR: fprintf(stderr, "Error stopping DAQ input thread\n") showErr(err) @@ -184,6 +197,9 @@ cdef void outThreadFunction(void* threaddata_void) nogil: fprintf(stderr, 'Starting output thread\n') + td.tdmutex.lock() + fprintf(stderr, 'mutex locked\n') + err = ulAOutScan(td.handle, 0, 0, @@ -193,6 +209,8 @@ cdef void outThreadFunction(void* threaddata_void) nogil: scanoptions, outscanflags, td.outbuffer) + fprintf(stderr, 'returned\n') + td.tdmutex.unlock() fprintf(stderr, 'Actual output sampling rate: %0.2f\n', samplerate) if err != ERR_NO_ERROR: @@ -201,29 +219,40 @@ cdef void outThreadFunction(void* threaddata_void) nogil: return + td.tdmutex.lock() err = ulAOutScanStatus(td.handle, &scanstat, &xstat) + td.tdmutex.unlock() if err != ERR_NO_ERROR: showErr(err) return + td.outThread_ready.store(True) + while td.stopThread.load() == False: + # printf('Running output thread in loop\n') + td.tdmutex.lock() err = ulAOutScanStatus(td.handle, &scanstat, &xstat) + td.tdmutex.unlock() if err != ERR_NO_ERROR: showErr(err) break if xstat.currentIndex < buffer_mid_idx: top_enqueued = False + # fprintf('xstat.currentIndex' if not bottom_enqueued: # Copy the bottom of the buffer to the queue, while transposing # it. if not td.outQueue.empty(): outbuffer_cpy = td.outQueue.dequeue() else: - outbuffer_cpy = malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock) - for chan in range(td.noutChanDescriptors): - for sample in range(td.samplesPerBlock): - outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0 + # outbuffer_cpy = malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock) + outbuffer_cpy = malloc(sizeof(double)*td.samplesPerBlock) + # for chan in range(td.noutChanDescriptors): + # for sample in range(td.samplesPerBlock): + # outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0 + for sample in range(td.samplesPerBlock): + outbuffer_cpy[sample] = 0.0 fprintf(stderr, 'Output buffer queue empty! Filling with zeros\n') for chan in range(td.noutChanDescriptors): @@ -243,9 +272,11 @@ cdef void outThreadFunction(void* threaddata_void) nogil: outbuffer_cpy = td.outQueue.dequeue() else: outbuffer_cpy = malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock) - for chan in range(td.noutChanDescriptors): - for sample in range(td.samplesPerBlock): - outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0 + # for chan in range(td.noutChanDescriptors): + # for sample in range(td.samplesPerBlock): + # outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0 + for sample in range(td.samplesPerBlock): + outbuffer_cpy[sample] = 0.0 fprintf(stderr, 'Output buffer queue empty! Filling with zeros\n') for chan in range(td.noutChanDescriptors): @@ -263,7 +294,10 @@ cdef void outThreadFunction(void* threaddata_void) nogil: fprintf(stderr, 'Exit while loop output thread\n') + + td.tdmutex.lock() err = ulAOutScanStop(td.handle) + td.tdmutex.unlock() if err != ERR_NO_ERROR: fprintf(stderr, "Error stopping AOut output thread\n") showErr(err) @@ -366,9 +400,14 @@ cdef class UlDT9837A: td.outThread = NULL td.stopThread.store(False) + td.tdmutex = new mutex() td.inbuffer = NULL td.outbuffer = NULL + if not outQueue: + td.outThread_ready.store(True) + else: + td.outThread_ready.store(False) # Configure INPUTS py_nnormalinchannels = sum([1 if en_input else 0 for en_input in @@ -447,50 +486,51 @@ cdef class UlDT9837A: raise RuntimeError('Error creating thread') self.td = td - def start(self): - cdef: - SafeQueue[void*] *inqueue - SafeQueue[void*] *outqueue - int i, sample, samples, samplesperbuf - double meas_seconds, samplerate - double* inbuf - double* outbuf + # def start(self): + # cdef: + # SafeQueue[void*] *inqueue + # SafeQueue[void*] *outqueue + # int i, sample, samples, samplesperbuf + # double meas_seconds, samplerate + # double* inbuf + # double* outbuf - inqueue = new SafeQueue[void*]() - outqueue = new SafeQueue[void*]() + # inqueue = new SafeQueue[void*]() + # outqueue = new SafeQueue[void*]() - samplesperbuf = 512 - samplerate = 10000 - meas_seconds = 4 + # samplesperbuf = 512 + # samplerate = 10000 + # meas_seconds = 4 - for i in range(int(meas_seconds*samplerate/samplesperbuf)): - outbuf = malloc(sizeof(double)*samplesperbuf) - for sample in range(samplesperbuf): - outbuf[sample] = 1 - outqueue.enqueue( outbuf) + # for i in range(int(meas_seconds*samplerate/samplesperbuf)): + # outbuf = malloc(sizeof(double)*samplesperbuf) + # for sample in range(samplesperbuf): + # outbuf[sample] = 1 + # outqueue.enqueue( outbuf) - self.startScan( - samplesperbuf, - samplerate, # Sample rate - inqueue, - outqueue) + # self.startScan( + # samplesperbuf, + # samplerate, # Sample rate + # inqueue, + # outqueue) - CPPsleep_ms(int(0.5*1000*meas_seconds)) - self.stop() - while not inqueue.empty(): - inbuf = inqueue.dequeue() - for sample in range(samplesperbuf): - print(f'Value monitor: {inbuf[sample]:1.2f}, value input: {inbuf[samplesperbuf+sample]:1.2f}') - pass - free(inbuf) + # CPPsleep_ms(int(0.5*1000*meas_seconds)) + # self.stop() + # while not inqueue.empty(): + # inbuf = inqueue.dequeue() + # for sample in range(samplesperbuf): + # print(f'Value monitor: {inbuf[sample]:1.2f}, value input: {inbuf[samplesperbuf+sample]:1.2f}') + # pass + # free(inbuf) - while not outqueue.empty(): - outbuf = outqueue.dequeue() - free(outbuf) - del inqueue - del outqueue + # while not outqueue.empty(): + # outbuf = outqueue.dequeue() + # free(outbuf) + # del inqueue + # del outqueue def stopScan(self): + print(f'UlDT9837A: stopScan()') self.cleanupThreadData(self.td) self.td = NULL @@ -503,9 +543,16 @@ cdef class UlDT9837A: IepeMode iepe CouplingMode cm - if chnum >= self.ninchannels: + if chnum > 3: raise RuntimeError('Invalid input channel number') + # if chnum == 0: + # fprintf(stderr, '====================== BIG WARNING ==============\n') + # fprintf(stderr, 'We override IEPE to enabled on ch 0\n') + # channelconfig.IEPE_enabled = True + # fprintf(stderr, '====================== END BIG WARNING ==============\n') + + self.input_range[chnum] = True if channelconfig.range_ == pyRange.tenV else False self.enabled_inputs[chnum] = channelconfig.channel_enabled @@ -521,8 +568,12 @@ cdef class UlDT9837A: if err != ERR_NO_ERROR: raise RuntimeError('Fatal: could not set coupling mode') + # err = ulAISetConfigDbl(self.handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, + # chnum, channelconfig.sensitivity) err = ulAISetConfigDbl(self.handle, AI_CFG_CHAN_SENSOR_SENSITIVITY, - chnum, channelconfig.sensitivity) + chnum, 1.0) + # TODO: Fix this problem, of setting sensitivity twice, how do we do it + # in the future? if err != ERR_NO_ERROR: raise RuntimeError('Fatal: could not set sensitivity') @@ -552,6 +603,7 @@ cdef class UlDT9837A: fprintf(stderr, 'cleanupThreadData()\n') if td is NULL: + printf('TD is zero\n') return td.stopThread.store(True) @@ -565,6 +617,8 @@ cdef class UlDT9837A: del td.outThread fprintf(stderr, 'SFSG1\n') + del td.tdmutex + if td.inChanDescriptors: free(td.inChanDescriptors) @@ -627,33 +681,33 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: with gil: npy_format = cnp.NPY_FLOAT64 callback = sd.pyCallback + print(f'Number of input channels: {ninchannels}') + print(f'Number of out channels: {noutchannels}') - while True: - if sd.stopThread.load() == True: - printf('Stopping thread...\n') - return - - if sd.inQueue: - # fprintf(stderr, "Waiting on in queue\n") - inbuffer = sd.inQueue.dequeue() - if not inbuffer: - printf('Stopping thread...\n') - return + while not sd.stopThread.load(): if sd.outQueue: - # fprintf(stderr, 'Allocating output buffer...\n') outbuffer = malloc(nBytesPerChan*noutchannels) - # memset(outbuffer, 0, nBytesPerChan*noutchannels) + + if sd.inQueue: + if not sd.outQueue: + inbuffer = sd.inQueue.dequeue() + if inbuffer == NULL: + printf('Stopping thread...\n') + return + else: + if not sd.inQueue.empty(): + inbuffer = sd.inQueue.dequeue() + else: + inbuffer = NULL with gil: # Obtain stream information npy_input = None npy_output = None - - if sd.inQueue: + if sd.inQueue and inbuffer: try: - npy_input = data_to_ndarray( inbuffer, nFramesPerBlock, @@ -666,6 +720,20 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: print('exception in cython callback for audio input: ', str(e)) return + if sd.outQueue: + # fprintf(stderr, 'Copying output buffer to Numpy...\n') + try: + npy_output = data_to_ndarray( + outbuffer, + nFramesPerBlock, + noutchannels, + npy_format, + False, # Do not transfer ownership + True) # F-contiguous + + except Exception as e: + print('exception in Cython callback for audio output: ', str(e)) + return try: rval = callback(npy_input, @@ -721,6 +789,10 @@ cdef class UlDaq: unsigned int numdevs = MAX_DEF_COUNT unsigned deviceno + if self.sd is not NULL or self.daq_device is not None: + assert self.daq_device is not None + raise RuntimeError('Cannot acquire device info: stream is already opened.') + err = ulGetDaqDeviceInventory(interfaceType, devdescriptors, &numdevs) @@ -772,6 +844,7 @@ cdef class UlDaq: """ if self.sd is not NULL: + assert self.daq_device is not None raise RuntimeError('Stream is already opened.') daqconfig = avstream.daqconfig @@ -785,44 +858,47 @@ cdef class UlDaq: unsigned int nFramesPerBlock = daqconfig.nFramesPerBlock unsigned int samplerate - unsigned int ninchannels = 0, noutchannels = 0 - int i - bint in_stream=False, output_stream=False + bint in_stream=False + bint out_stream=False if daqconfig.nFramesPerBlock > 8192 or daqconfig.nFramesPerBlock < 512: raise ValueError('Invalid number of nFramesPerBlock') if daqconfig.outputDelayBlocks != 0: - raise ValueError('OutputDelayBlocks not supported by API') + print('WARNING: OutputDelayBlocks not supported by API') # Determine sample rate and sample format, determine whether we are an # in or an output stream, or both + print(f'AvType: {avtype}') + print(f'Dup: {duplex_mode}') if avtype == AvType.audio_input or duplex_mode: # Here, we override the sample format in case of duplex mode. sampleformat = daqconfig.en_input_sample_format samplerate = int(daqconfig.en_input_rate) in_stream = True if duplex_mode: + fprintf(stderr, 'Duplex mode enabled\n') out_stream = True - else: + elif avtype == AvType.audio_output: sampleformat = daqconfig.en_output_sample_format samplerate = int(daqconfig.en_output_rate) out_stream = True - - if 'DT9837A' in device.name: - self.daq_device = UlDT9837A(device.deviceno) else: - raise RuntimeError(f'Device {device.name} not found or not configured') + raise ValueError(f'Invalid stream type {avtype}') + + if out_stream and daqconfig.firstEnabledOutputChannelNumber() == -1: + raise RuntimeError('No output channels enabled') + + if in_stream and daqconfig.firstEnabledInputChannelNumber() == -1: + raise RuntimeError('No input channels enabled') + # All set, allocate the stream! self.sd = malloc(sizeof(PyStreamData)) if self.sd == NULL: raise MemoryError('Could not allocate stream: memory error.') - self.sd.pyCallback = avstream._audioCallback - # Increase reference count to the callback - Py_INCREF( avstream._audioCallback) self.sd.stopThread.store(False) self.sd.inQueue = NULL @@ -833,31 +909,48 @@ cdef class UlDaq: self.sd.ninchannels = 0 self.sd.noutchannels = 0 self.sd.nBytesPerChan = daqconfig.nFramesPerBlock*sizeof(double) + self.sd.nFramesPerBlock = daqconfig.nFramesPerBlock + + if 'DT9837A' in device.name: + self.daq_device = UlDT9837A(device.index) + else: + raise RuntimeError(f'Device {device.name} not found or not configured') # Create channel maps for in channels, set in stream # parameters if in_stream: + print('Stream is input stream') for i, ch in enumerate(daqconfig.getInputChannels()): if ch.channel_enabled: - ninchannels += 1 + self.sd.ninchannels += 1 self.daq_device.setInputChannelConfig(i, ch) self.sd.inQueue = new SafeQueue[void*]() - self.sd.ninchannels = ninchannels - + # Create channel maps for output channels if out_stream: + print('Stream is output stream') for i, ch in enumerate(daqconfig.getOutputChannels()): if ch.channel_enabled: - noutchannels += 1 - self.daq_device.setOutputChannelConfig(i, ch) + self.sd.noutchannels += 1 + self.daq_device.setOutputChannelConfig(i, ch, monitorOutput) self.sd.outQueue = new SafeQueue[void*]() - self.sd.noutchannels = noutchannels if monitorOutput and duplex_mode: - self.sd.ninchannels += noutchannels + self.sd.ninchannels += self.sd.noutchannels + + self.sd.pyCallback = avstream._audioCallback + # Increase reference count to the callback + Py_INCREF( avstream._audioCallback) + + + self.daq_device.startScan( + daqconfig.nFramesPerBlock, + samplerate, + self.sd.inQueue, + self.sd.outQueue) with nogil: self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, @@ -866,11 +959,6 @@ cdef class UlDaq: # Allow it to start CPPsleep_ms(500) - self.daq_device.startScan( - daqconfig.nFramesPerBlock, - samplerate, - self.sd.inQueue, - self.sd.outQueue) return nFramesPerBlock, self.daq_device.td.samplerate @@ -878,10 +966,12 @@ cdef class UlDaq: if self.sd is NULL: raise RuntimeError('Stream is not opened') - self.uldaq.stopScan() + self.daq_device.stopScan() + self.daq_device = None self.cleanupStream(self.sd) self.sd = NULL + cdef cleanupStream(self, PyStreamData* sd): with nogil: diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 6ffdaf0..8f8f91e 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -11,7 +11,8 @@ import numpy as np import time from .device import (RtAudio, DeviceInfo, DAQConfiguration, get_numpy_dtype_from_format_string, - get_sampwidth_from_format_string, AvType) + get_sampwidth_from_format_string, AvType, + UlDaq) __all__ = ['AvStream'] @@ -108,7 +109,9 @@ class AvStream: # Possible, but long not tested: store video self._videothread = None - self._audiobackend = RtAudio(daqconfig.api) + # self._audiobackend = RtAudio(daqconfig.api) + self._audiobackend = UlDaq() + self.blocksize = daqconfig.nFramesPerBlock def nCallbacks(self): """Returns the current number of installed callbacks.""" diff --git a/scripts/lasp_record b/scripts/lasp_record index cb5c5d0..d2a4ade 100755 --- a/scripts/lasp_record +++ b/scripts/lasp_record @@ -1,6 +1,6 @@ #!/usr/bin/python3 import argparse - +import sys parser = argparse.ArgumentParser( @@ -21,13 +21,32 @@ args = parser.parse_args() from lasp.lasp_avstream import AvStream, AvType from lasp.lasp_record import Recording -from lasp.device import DAQConfiguration, RtAudio +from lasp.device import DAQConfiguration, RtAudio, UlDaq + +configs = DAQConfiguration.loadConfigs() + +for i, (key, val) in enumerate(configs.items()): + print(f'{i:2} : {key}') + +daqindex = input('Please enter required config: ') +try: + daqindex = int(daqindex) +except: + sys.exit(0) + +for i, (key, val) in enumerate(configs.items()): + if i == daqindex: + config = configs[key] + + +config = configs[key] + -config = DAQConfiguration.loadConfigs()[args.input_daq] print(config) -rtaudio = RtAudio() -devices = rtaudio.getDeviceInfo() +# daq = RtAudio() +daq = UlDaq() +devices = daq.getDeviceInfo() input_devices = {} for device in devices: @@ -45,7 +64,9 @@ stream = AvStream(input_device, AvType.audio_input, config) + rec = Recording(args.filename, stream, args.duration) +stream.start() with rec: pass @@ -54,5 +75,4 @@ stream.stop() print('Stream stopped') print('Closing stream...') -stream.close() print('Stream closed')