Daq measurement interface changed to permanently work with UlDaq. Should become a choice in future. Simultaneous output and input not yet working. Probably the DAQ API is not working properly with threads. We should find a minimum working example of simultaneous input/output. Then, the UlDAQ and RtAudio details should be abstracted away in a common interface, written in C++

This commit is contained in:
Anne de Jong 2020-09-18 08:52:56 +02:00
parent 1f9279ff86
commit fa0c241fb9
4 changed files with 209 additions and 94 deletions

View File

@ -29,4 +29,6 @@ void CPPsleep_ms(unsigned int ms) {
void CPPsleep_us(unsigned int us) {
std::this_thread::sleep_for(std::chrono::microseconds(us));
}
#endif // LASP_CPPTHREAD_H

View File

@ -24,6 +24,8 @@ cdef struct DaqThreadData:
unsigned noutChanDescriptors
atomic[bool] stopThread
atomic[bool] outThread_ready
mutex* tdmutex
CPPThread[void*, void (*)(void*)] *inThread
CPPThread[void*, void (*)(void*)] *outThread
@ -70,7 +72,10 @@ cdef void inThreadFunction(void* threaddata_void) nogil:
bottom_enqueued = True
fprintf(stderr, 'Starting input thread\n')
while not td.outThread_ready.load():
CPPsleep_ms(50)
td.tdmutex.lock()
err = ulDaqInScan(td.handle,
td.inChanDescriptors,
td.ninChanDescriptors,
@ -79,6 +84,7 @@ cdef void inThreadFunction(void* threaddata_void) nogil:
scanoptions,
inscanflags,
td.inbuffer)
td.tdmutex.unlock()
fprintf(stderr, 'Actual input sampling rate: %0.2f\n', samplerate)
if err != ERR_NO_ERROR:
@ -87,16 +93,21 @@ cdef void inThreadFunction(void* threaddata_void) nogil:
return
td.tdmutex.lock()
err = ulDaqInScanStatus(td.handle, &scanstat, &xstat)
td.tdmutex.unlock()
if err != ERR_NO_ERROR:
fprintf(stderr, 'Error obtaining input scan status\n')
showErr(err)
return
while td.stopThread.load() == False and err == ERR_NO_ERROR:
td.tdmutex.lock()
err = ulDaqInScanStatus(td.handle, &scanstat, &xstat)
td.tdmutex.unlock()
if err != ERR_NO_ERROR:
showErr(err)
break
if xstat.currentIndex < buffer_mid_idx:
top_enqueued = False
@ -132,7 +143,9 @@ cdef void inThreadFunction(void* threaddata_void) nogil:
fprintf(stderr, 'Exit while loop input thread\n')
td.tdmutex.lock()
err = ulDaqInScanStop(td.handle)
td.tdmutex.unlock()
if err != ERR_NO_ERROR:
fprintf(stderr, "Error stopping DAQ input thread\n")
showErr(err)
@ -184,6 +197,9 @@ cdef void outThreadFunction(void* threaddata_void) nogil:
fprintf(stderr, 'Starting output thread\n')
td.tdmutex.lock()
fprintf(stderr, 'mutex locked\n')
err = ulAOutScan(td.handle,
0,
0,
@ -193,6 +209,8 @@ cdef void outThreadFunction(void* threaddata_void) nogil:
scanoptions,
outscanflags,
td.outbuffer)
fprintf(stderr, 'returned\n')
td.tdmutex.unlock()
fprintf(stderr, 'Actual output sampling rate: %0.2f\n', samplerate)
if err != ERR_NO_ERROR:
@ -201,29 +219,40 @@ cdef void outThreadFunction(void* threaddata_void) nogil:
return
td.tdmutex.lock()
err = ulAOutScanStatus(td.handle, &scanstat, &xstat)
td.tdmutex.unlock()
if err != ERR_NO_ERROR:
showErr(err)
return
td.outThread_ready.store(True)
while td.stopThread.load() == False:
# printf('Running output thread in loop\n')
td.tdmutex.lock()
err = ulAOutScanStatus(td.handle, &scanstat, &xstat)
td.tdmutex.unlock()
if err != ERR_NO_ERROR:
showErr(err)
break
if xstat.currentIndex < buffer_mid_idx:
top_enqueued = False
# fprintf('xstat.currentIndex'
if not bottom_enqueued:
# Copy the bottom of the buffer to the queue, while transposing
# it.
if not td.outQueue.empty():
outbuffer_cpy = <double*> td.outQueue.dequeue()
else:
outbuffer_cpy = <double*> malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock)
for chan in range(td.noutChanDescriptors):
for sample in range(td.samplesPerBlock):
outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0
# outbuffer_cpy = <double*> malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock)
outbuffer_cpy = <double*> malloc(sizeof(double)*td.samplesPerBlock)
# for chan in range(td.noutChanDescriptors):
# for sample in range(td.samplesPerBlock):
# outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0
for sample in range(td.samplesPerBlock):
outbuffer_cpy[sample] = 0.0
fprintf(stderr, 'Output buffer queue empty! Filling with zeros\n')
for chan in range(td.noutChanDescriptors):
@ -243,9 +272,11 @@ cdef void outThreadFunction(void* threaddata_void) nogil:
outbuffer_cpy = <double*> td.outQueue.dequeue()
else:
outbuffer_cpy = <double*> malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock)
for chan in range(td.noutChanDescriptors):
for sample in range(td.samplesPerBlock):
outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0
# for chan in range(td.noutChanDescriptors):
# for sample in range(td.samplesPerBlock):
# outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0
for sample in range(td.samplesPerBlock):
outbuffer_cpy[sample] = 0.0
fprintf(stderr, 'Output buffer queue empty! Filling with zeros\n')
for chan in range(td.noutChanDescriptors):
@ -263,7 +294,10 @@ cdef void outThreadFunction(void* threaddata_void) nogil:
fprintf(stderr, 'Exit while loop output thread\n')
td.tdmutex.lock()
err = ulAOutScanStop(td.handle)
td.tdmutex.unlock()
if err != ERR_NO_ERROR:
fprintf(stderr, "Error stopping AOut output thread\n")
showErr(err)
@ -366,9 +400,14 @@ cdef class UlDT9837A:
td.outThread = NULL
td.stopThread.store(False)
td.tdmutex = new mutex()
td.inbuffer = NULL
td.outbuffer = NULL
if not outQueue:
td.outThread_ready.store(True)
else:
td.outThread_ready.store(False)
# Configure INPUTS
py_nnormalinchannels = sum([1 if en_input else 0 for en_input in
@ -447,50 +486,51 @@ cdef class UlDT9837A:
raise RuntimeError('Error creating thread')
self.td = td
def start(self):
cdef:
SafeQueue[void*] *inqueue
SafeQueue[void*] *outqueue
int i, sample, samples, samplesperbuf
double meas_seconds, samplerate
double* inbuf
double* outbuf
# def start(self):
# cdef:
# SafeQueue[void*] *inqueue
# SafeQueue[void*] *outqueue
# int i, sample, samples, samplesperbuf
# double meas_seconds, samplerate
# double* inbuf
# double* outbuf
inqueue = new SafeQueue[void*]()
outqueue = new SafeQueue[void*]()
# inqueue = new SafeQueue[void*]()
# outqueue = new SafeQueue[void*]()
samplesperbuf = 512
samplerate = 10000
meas_seconds = 4
# samplesperbuf = 512
# samplerate = 10000
# meas_seconds = 4
for i in range(int(meas_seconds*samplerate/samplesperbuf)):
outbuf = <double*> malloc(sizeof(double)*samplesperbuf)
for sample in range(samplesperbuf):
outbuf[sample] = <double> 1
outqueue.enqueue(<void*> outbuf)
# for i in range(int(meas_seconds*samplerate/samplesperbuf)):
# outbuf = <double*> malloc(sizeof(double)*samplesperbuf)
# for sample in range(samplesperbuf):
# outbuf[sample] = <double> 1
# outqueue.enqueue(<void*> outbuf)
self.startScan(
samplesperbuf,
samplerate, # Sample rate
inqueue,
outqueue)
# self.startScan(
# samplesperbuf,
# samplerate, # Sample rate
# inqueue,
# outqueue)
CPPsleep_ms(int(0.5*1000*meas_seconds))
self.stop()
while not inqueue.empty():
inbuf = <double*> inqueue.dequeue()
for sample in range(samplesperbuf):
print(f'Value monitor: {inbuf[sample]:1.2f}, value input: {inbuf[samplesperbuf+sample]:1.2f}')
pass
free(inbuf)
# CPPsleep_ms(int(0.5*1000*meas_seconds))
# self.stop()
# while not inqueue.empty():
# inbuf = <double*> inqueue.dequeue()
# for sample in range(samplesperbuf):
# print(f'Value monitor: {inbuf[sample]:1.2f}, value input: {inbuf[samplesperbuf+sample]:1.2f}')
# pass
# free(inbuf)
while not outqueue.empty():
outbuf = <double*> outqueue.dequeue()
free(outbuf)
del inqueue
del outqueue
# while not outqueue.empty():
# outbuf = <double*> outqueue.dequeue()
# free(outbuf)
# del inqueue
# del outqueue
def stopScan(self):
print(f'UlDT9837A: stopScan()')
self.cleanupThreadData(self.td)
self.td = NULL
@ -503,9 +543,16 @@ cdef class UlDT9837A:
IepeMode iepe
CouplingMode cm
if chnum >= self.ninchannels:
if chnum > 3:
raise RuntimeError('Invalid input channel number')
# if chnum == 0:
# fprintf(stderr, '====================== BIG WARNING ==============\n')
# fprintf(stderr, 'We override IEPE to enabled on ch 0\n')
# channelconfig.IEPE_enabled = True
# fprintf(stderr, '====================== END BIG WARNING ==============\n')
self.input_range[chnum] = True if channelconfig.range_ == pyRange.tenV else False
self.enabled_inputs[chnum] = channelconfig.channel_enabled
@ -521,8 +568,12 @@ cdef class UlDT9837A:
if err != ERR_NO_ERROR:
raise RuntimeError('Fatal: could not set coupling mode')
# err = ulAISetConfigDbl(self.handle, AI_CFG_CHAN_SENSOR_SENSITIVITY,
# chnum, channelconfig.sensitivity)
err = ulAISetConfigDbl(self.handle, AI_CFG_CHAN_SENSOR_SENSITIVITY,
chnum, channelconfig.sensitivity)
chnum, 1.0)
# TODO: Fix this problem, of setting sensitivity twice, how do we do it
# in the future?
if err != ERR_NO_ERROR:
raise RuntimeError('Fatal: could not set sensitivity')
@ -552,6 +603,7 @@ cdef class UlDT9837A:
fprintf(stderr, 'cleanupThreadData()\n')
if td is NULL:
printf('TD is zero\n')
return
td.stopThread.store(True)
@ -565,6 +617,8 @@ cdef class UlDT9837A:
del td.outThread
fprintf(stderr, 'SFSG1\n')
del td.tdmutex
if td.inChanDescriptors:
free(td.inChanDescriptors)
@ -627,33 +681,33 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
with gil:
npy_format = cnp.NPY_FLOAT64
callback = <object> sd.pyCallback
print(f'Number of input channels: {ninchannels}')
print(f'Number of out channels: {noutchannels}')
while True:
if sd.stopThread.load() == True:
printf('Stopping thread...\n')
return
if sd.inQueue:
# fprintf(stderr, "Waiting on in queue\n")
inbuffer = sd.inQueue.dequeue()
if not inbuffer:
printf('Stopping thread...\n')
return
while not sd.stopThread.load():
if sd.outQueue:
# fprintf(stderr, 'Allocating output buffer...\n')
outbuffer = malloc(nBytesPerChan*noutchannels)
# memset(outbuffer, 0, nBytesPerChan*noutchannels)
if sd.inQueue:
if not sd.outQueue:
inbuffer = sd.inQueue.dequeue()
if inbuffer == NULL:
printf('Stopping thread...\n')
return
else:
if not sd.inQueue.empty():
inbuffer = sd.inQueue.dequeue()
else:
inbuffer = NULL
with gil:
# Obtain stream information
npy_input = None
npy_output = None
if sd.inQueue:
if sd.inQueue and inbuffer:
try:
npy_input = <object> data_to_ndarray(
inbuffer,
nFramesPerBlock,
@ -666,6 +720,20 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
print('exception in cython callback for audio input: ', str(e))
return
if sd.outQueue:
# fprintf(stderr, 'Copying output buffer to Numpy...\n')
try:
npy_output = <object> data_to_ndarray(
outbuffer,
nFramesPerBlock,
noutchannels,
npy_format,
False, # Do not transfer ownership
True) # F-contiguous
except Exception as e:
print('exception in Cython callback for audio output: ', str(e))
return
try:
rval = callback(npy_input,
@ -721,6 +789,10 @@ cdef class UlDaq:
unsigned int numdevs = MAX_DEF_COUNT
unsigned deviceno
if self.sd is not NULL or self.daq_device is not None:
assert self.daq_device is not None
raise RuntimeError('Cannot acquire device info: stream is already opened.')
err = ulGetDaqDeviceInventory(interfaceType,
devdescriptors,
&numdevs)
@ -772,6 +844,7 @@ cdef class UlDaq:
"""
if self.sd is not NULL:
assert self.daq_device is not None
raise RuntimeError('Stream is already opened.')
daqconfig = avstream.daqconfig
@ -785,44 +858,47 @@ cdef class UlDaq:
unsigned int nFramesPerBlock = daqconfig.nFramesPerBlock
unsigned int samplerate
unsigned int ninchannels = 0, noutchannels = 0
int i
bint in_stream=False, output_stream=False
bint in_stream=False
bint out_stream=False
if daqconfig.nFramesPerBlock > 8192 or daqconfig.nFramesPerBlock < 512:
raise ValueError('Invalid number of nFramesPerBlock')
if daqconfig.outputDelayBlocks != 0:
raise ValueError('OutputDelayBlocks not supported by API')
print('WARNING: OutputDelayBlocks not supported by API')
# Determine sample rate and sample format, determine whether we are an
# in or an output stream, or both
print(f'AvType: {avtype}')
print(f'Dup: {duplex_mode}')
if avtype == AvType.audio_input or duplex_mode:
# Here, we override the sample format in case of duplex mode.
sampleformat = daqconfig.en_input_sample_format
samplerate = int(daqconfig.en_input_rate)
in_stream = True
if duplex_mode:
fprintf(stderr, 'Duplex mode enabled\n')
out_stream = True
else:
elif avtype == AvType.audio_output:
sampleformat = daqconfig.en_output_sample_format
samplerate = int(daqconfig.en_output_rate)
out_stream = True
if 'DT9837A' in device.name:
self.daq_device = UlDT9837A(device.deviceno)
else:
raise RuntimeError(f'Device {device.name} not found or not configured')
raise ValueError(f'Invalid stream type {avtype}')
if out_stream and daqconfig.firstEnabledOutputChannelNumber() == -1:
raise RuntimeError('No output channels enabled')
if in_stream and daqconfig.firstEnabledInputChannelNumber() == -1:
raise RuntimeError('No input channels enabled')
# All set, allocate the stream!
self.sd = <PyStreamData*> malloc(sizeof(PyStreamData))
if self.sd == NULL:
raise MemoryError('Could not allocate stream: memory error.')
self.sd.pyCallback = <PyObject*> avstream._audioCallback
# Increase reference count to the callback
Py_INCREF(<object> avstream._audioCallback)
self.sd.stopThread.store(False)
self.sd.inQueue = NULL
@ -833,31 +909,48 @@ cdef class UlDaq:
self.sd.ninchannels = 0
self.sd.noutchannels = 0
self.sd.nBytesPerChan = daqconfig.nFramesPerBlock*sizeof(double)
self.sd.nFramesPerBlock = daqconfig.nFramesPerBlock
if 'DT9837A' in device.name:
self.daq_device = UlDT9837A(device.index)
else:
raise RuntimeError(f'Device {device.name} not found or not configured')
# Create channel maps for in channels, set in stream
# parameters
if in_stream:
print('Stream is input stream')
for i, ch in enumerate(daqconfig.getInputChannels()):
if ch.channel_enabled:
ninchannels += 1
self.sd.ninchannels += 1
self.daq_device.setInputChannelConfig(i, ch)
self.sd.inQueue = new SafeQueue[void*]()
self.sd.ninchannels = ninchannels
# Create channel maps for output channels
if out_stream:
print('Stream is output stream')
for i, ch in enumerate(daqconfig.getOutputChannels()):
if ch.channel_enabled:
noutchannels += 1
self.daq_device.setOutputChannelConfig(i, ch)
self.sd.noutchannels += 1
self.daq_device.setOutputChannelConfig(i, ch, monitorOutput)
self.sd.outQueue = new SafeQueue[void*]()
self.sd.noutchannels = noutchannels
if monitorOutput and duplex_mode:
self.sd.ninchannels += noutchannels
self.sd.ninchannels += self.sd.noutchannels
self.sd.pyCallback = <PyObject*> avstream._audioCallback
# Increase reference count to the callback
Py_INCREF(<object> avstream._audioCallback)
self.daq_device.startScan(
daqconfig.nFramesPerBlock,
samplerate,
self.sd.inQueue,
self.sd.outQueue)
with nogil:
self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction,
@ -866,11 +959,6 @@ cdef class UlDaq:
# Allow it to start
CPPsleep_ms(500)
self.daq_device.startScan(
daqconfig.nFramesPerBlock,
samplerate,
self.sd.inQueue,
self.sd.outQueue)
return nFramesPerBlock, self.daq_device.td.samplerate
@ -878,10 +966,12 @@ cdef class UlDaq:
if self.sd is NULL:
raise RuntimeError('Stream is not opened')
self.uldaq.stopScan()
self.daq_device.stopScan()
self.daq_device = None
self.cleanupStream(self.sd)
self.sd = NULL
cdef cleanupStream(self, PyStreamData* sd):
with nogil:

View File

@ -11,7 +11,8 @@ import numpy as np
import time
from .device import (RtAudio, DeviceInfo, DAQConfiguration,
get_numpy_dtype_from_format_string,
get_sampwidth_from_format_string, AvType)
get_sampwidth_from_format_string, AvType,
UlDaq)
__all__ = ['AvStream']
@ -108,7 +109,9 @@ class AvStream:
# Possible, but long not tested: store video
self._videothread = None
self._audiobackend = RtAudio(daqconfig.api)
# self._audiobackend = RtAudio(daqconfig.api)
self._audiobackend = UlDaq()
self.blocksize = daqconfig.nFramesPerBlock
def nCallbacks(self):
"""Returns the current number of installed callbacks."""

View File

@ -1,6 +1,6 @@
#!/usr/bin/python3
import argparse
import sys
parser = argparse.ArgumentParser(
@ -21,13 +21,32 @@ args = parser.parse_args()
from lasp.lasp_avstream import AvStream, AvType
from lasp.lasp_record import Recording
from lasp.device import DAQConfiguration, RtAudio
from lasp.device import DAQConfiguration, RtAudio, UlDaq
configs = DAQConfiguration.loadConfigs()
for i, (key, val) in enumerate(configs.items()):
print(f'{i:2} : {key}')
daqindex = input('Please enter required config: ')
try:
daqindex = int(daqindex)
except:
sys.exit(0)
for i, (key, val) in enumerate(configs.items()):
if i == daqindex:
config = configs[key]
config = configs[key]
config = DAQConfiguration.loadConfigs()[args.input_daq]
print(config)
rtaudio = RtAudio()
devices = rtaudio.getDeviceInfo()
# daq = RtAudio()
daq = UlDaq()
devices = daq.getDeviceInfo()
input_devices = {}
for device in devices:
@ -45,7 +64,9 @@ stream = AvStream(input_device,
AvType.audio_input,
config)
rec = Recording(args.filename, stream, args.duration)
stream.start()
with rec:
pass
@ -54,5 +75,4 @@ stream.stop()
print('Stream stopped')
print('Closing stream...')
stream.close()
print('Stream closed')