From 49ee42bb01733ee03afa57fd4cdf1e2ab42693b7 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Tue, 15 Sep 2020 14:16:48 +0200 Subject: [PATCH] WORKING in and outputgit status --- lasp/device/lasp_common_decls.pxd | 3 +- lasp/device/lasp_cppthread.h | 5 +- lasp/device/lasp_daqconfig.py | 6 +- lasp/device/lasp_rtaudio.pyx | 4 +- lasp/device/lasp_uldaq.pxd | 58 +-- lasp/device/lasp_uldaq.pyx | 799 ++++++++++++++++++++---------- 6 files changed, 576 insertions(+), 299 deletions(-) diff --git a/lasp/device/lasp_common_decls.pxd b/lasp/device/lasp_common_decls.pxd index a086ad4..1f4752c 100644 --- a/lasp/device/lasp_common_decls.pxd +++ b/lasp/device/lasp_common_decls.pxd @@ -19,7 +19,8 @@ cdef extern from "lasp_cppthread.h" nogil: CPPThread(F threadfunction, T data) void join() - void CPPsleep(unsigned int ms) + void CPPsleep_ms(unsigned int ms) + void CPPsleep_us(unsigned int us) cdef extern from "lasp_cppqueue.h" nogil: cdef cppclass SafeQueue[T]: diff --git a/lasp/device/lasp_cppthread.h b/lasp/device/lasp_cppthread.h index d2b3125..77bd61a 100644 --- a/lasp/device/lasp_cppthread.h +++ b/lasp/device/lasp_cppthread.h @@ -23,7 +23,10 @@ class CPPThread { }; -void CPPsleep(unsigned int ms) { +void CPPsleep_ms(unsigned int ms) { std::this_thread::sleep_for(std::chrono::milliseconds(ms)); } +void CPPsleep_us(unsigned int us) { + std::this_thread::sleep_for(std::chrono::microseconds(us)); +} #endif // LASP_CPPTHREAD_H diff --git a/lasp/device/lasp_daqconfig.py b/lasp/device/lasp_daqconfig.py index fbad2b0..c750047 100644 --- a/lasp/device/lasp_daqconfig.py +++ b/lasp/device/lasp_daqconfig.py @@ -49,9 +49,9 @@ class DeviceInfo: @dataclass class DAQChannel: channel_enabled: bool - channel_name: str - sensitivity: float - qty: Qty + channel_name: str = 'Unnamed channel' + sensitivity: float = 1.0 + qty: Qty = SIQtys.default range_: str = 'Undefined' IEPE_enabled: bool = False diff --git a/lasp/device/lasp_rtaudio.pyx b/lasp/device/lasp_rtaudio.pyx index 2c3e7fb..eeb6341 100644 --- a/lasp/device/lasp_rtaudio.pyx +++ b/lasp/device/lasp_rtaudio.pyx @@ -422,7 +422,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidstream) nogil: while stream.outputQueue.size() > 10 and not stream.stopThread.load(): # printf('Sleeping...\n') # No input queue to wait on, so we relax a bit here. - CPPsleep(1); + CPPsleep_ms(1); # Outputbuffer is free'ed by the audiothread, so should not be touched # here. @@ -693,7 +693,7 @@ cdef class RtAudio: self._stream.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self._stream) # Allow it to start - CPPsleep(500) + CPPsleep_ms(500) pass return nFramesPerBlock diff --git a/lasp/device/lasp_uldaq.pxd b/lasp/device/lasp_uldaq.pxd index a109da1..5421f1a 100644 --- a/lasp/device/lasp_uldaq.pxd +++ b/lasp/device/lasp_uldaq.pxd @@ -1,5 +1,6 @@ include "lasp_common_decls.pxd" + cdef extern from "uldaq.h" nogil: ctypedef enum DaqDeviceInterface: @@ -24,6 +25,8 @@ cdef extern from "uldaq.h" nogil: long long currentIndex char reserved[64] + UlError ulGetErrMsg(UlError, char* errMsg) + ctypedef enum UlError: ERR_NO_ERROR ERR_UNHANDLED_EXCEPTION @@ -136,11 +139,6 @@ cdef extern from "uldaq.h" nogil: ERR_NET_BUFFER_OVERRUN ERR_BAD_NET_BUFFER - ctypedef enum AiInputMode: - AI_DIFFERENTIAL = 1, - AI_SINGLE_ENDED = 2, - AI_PSEUDO_DIFFERENTIAL = 3 - ctypedef enum Range: BIP10VOLTS BIP5VOLTS @@ -241,10 +239,11 @@ cdef extern from "uldaq.h" nogil: DAQINSCAN_FF_NOCALIBRATEDATA DAQINSCAN_FF_NOCLEAR - ctypedef enum DaqOutScanFlag: - DAQOUTSCAN_FF_DEFAULT - DAQOUTSCAN_FF_NOSCALEDATA - DAQOUTSCAN_FF_NOCALIBRATEDATA + ctypedef enum AOutScanFlag: + AOUTSCAN_FF_DEFAULT + AOUTSCAN_FF_NOSCALEDATA + AOUTSCAN_FF_NOCALIBRATEDATA + AOUTSCAN_FF_NOCLEAR ctypedef enum DaqInChanType: DAQI_ANALOG_DIFF @@ -260,15 +259,6 @@ cdef extern from "uldaq.h" nogil: DaqInChanType type Range range - ctypedef enum DaqOutChanType: - DAQO_ANALOG - DAQO_DIGITAL - - ctypedef struct DaqOutChanDescriptor: - int channel - DaqOutChanType type - Range range - ctypedef enum DaqEventType: DE_NONE DE_ON_DATA_AVAILABLE @@ -359,29 +349,13 @@ cdef extern from "uldaq.h" nogil: UlError ulDaqInScanStop(DaqDeviceHandle daqDeviceHandle); UlError ulDaqInScanWait(DaqDeviceHandle daqDeviceHandle, WaitType waitType, long long waitParam, double timeout); - UlError ulDaqOutScan(DaqDeviceHandle daqDeviceHandle, DaqOutChanDescriptor chanDescriptors[], int numChans, int samplesPerChan, double* rate, ScanOption options, DaqOutScanFlag flags, double data[]); - UlError ulDaqOutScanStatus(DaqDeviceHandle daqDeviceHandle, ScanStatus* status, TransferStatus* xferStatus); - UlError ulDaqOutScanStop(DaqDeviceHandle daqDeviceHandle); - UlError ulDaqOutSetTrigger(DaqDeviceHandle daqDeviceHandle, TriggerType type, DaqInChanDescriptor trigChanDescriptor, double level, double variance, unsigned int retriggerSampleCount); + ctypedef void (*DaqEventCallback)(DaqDeviceHandle, DaqEventType, unsigned long long, void*) + UlError ulEnableEvent(DaqDeviceHandle daqDeviceHandle, DaqEventType eventTypes, unsigned long long eventParameter, DaqEventCallback eventCallbackFunction, void* userData); + UlError ulDisableEvent(DaqDeviceHandle daqDeviceHandle, DaqEventType eventTypes) - # UlError ulAIGetConfigDbl(DaqDeviceHandle daqDeviceHandle, AiConfigItemDbl configItem, unsigned int index, double* configValue); - # UlError ulAIGetConfigStr(DaqDeviceHandle daqDeviceHandle, AiConfigItemStr configItem, unsigned int index, char* configStr, unsigned int* maxConfigLen); -# ctypedef enum AiIn -# { -# /** Returns the minimum scan rate in samples per second to the \p infoValue argument. Index is ignored. */ -# AI_INFO_MIN_SCAN_RATE = 1000, - -# /** Returns the maximum scan rate in samples per second to the \p infoValue argument. Index is ignored. */ -# AI_INFO_MAX_SCAN_RATE = 1001, - -# /** Returns the maximum throughput in samples per second to the \p infoValue argument. Index is ignored. */ -# AI_INFO_MAX_THROUGHPUT = 1002, - -# /** Returns the maximum scan rate in samples per second when using the ::SO_BURSTIO ScanOption to the \p infoValue argument. Index is ignored. */ -# AI_INFO_MAX_BURST_RATE = 1003, - -# /** Returns the maximum throughput in samples per second when using the ::SO_BURSTIO ScanOption to the \p infoValue argument. Index is ignored. */ -# AI_INFO_MAX_BURST_THROUGHPUT = 1004 -# }AiInfoItemDbl - + UlError ulAOutScan(DaqDeviceHandle daqDeviceHandle,int lowChan,int highChan,Range range,int samplesPerChan,double * rate,ScanOption options,AOutScanFlag flags,double data[]) + UlError ulAOutScanStatus(DaqDeviceHandle daqDeviceHandle,ScanStatus * status,TransferStatus * xferStatus) + UlError ulAOutScanStop(DaqDeviceHandle daqDeviceHandle) + UlError ulAOutScanWait(DaqDeviceHandle daqDeviceHandle,WaitType waitType,long long waitParam,double timeout) + UlError ulAOutSetTrigger(DaqDeviceHandle daqDeviceHandle,TriggerType type,int trigChan,double level,double variance,unsigned int retriggerSampleCount) diff --git a/lasp/device/lasp_uldaq.pyx b/lasp/device/lasp_uldaq.pyx index 04d6f47..f110cbe 100644 --- a/lasp/device/lasp_uldaq.pyx +++ b/lasp/device/lasp_uldaq.pyx @@ -6,6 +6,7 @@ from .lasp_avtype import AvType __all__ = ['UlDT9837A', 'UlDaq'] DEF MAX_DEF_COUNT = 100 +DEF UL_ERR_MSG_LEN = 512 cdef struct DaqThreadData: @@ -14,103 +15,290 @@ cdef struct DaqThreadData: DaqDeviceHandle handle - SafeQueue[void*] *inputQueue - SafeQueue[void*] *outputQueue + SafeQueue[void*] *inQueue + SafeQueue[void*] *outQueue DaqInChanDescriptor* inChanDescriptors - unsigned ninputChanDescriptors - - DaqOutChanDescriptor* outChanDescriptors - unsigned noutputChanDescriptors + unsigned ninChanDescriptors + unsigned noutChanDescriptors atomic[bool] stopThread - CPPThread[void*, void (*)(void*)] *thread -cdef void eventCallbackFunction(DaqDeviceHandle handle, DaqEventType eventType, - unsigned long long eventData, void* userData): + CPPThread[void*, void (*)(void*)] *inThread + CPPThread[void*, void (*)(void*)] *outThread - pass + double* inbuffer + double* outbuffer -cdef void ulThreadFunction(void* threaddata_void) nogil: + +cdef void showErr(UlError err) nogil: + cdef: + char errmsg[UL_ERR_MSG_LEN] + ulGetErrMsg(err, errmsg) + fprintf(stderr, 'UlError: %s\n', errmsg) + + +cdef void inThreadFunction(void* threaddata_void) nogil: """ - Stream thread function + Stream input thread function """ cdef: DaqThreadData* td = threaddata_void - TransferStatus instatus - TransferStatus outstatus + UlError err + ScanOption scanoptions DaqInScanFlag inscanflags - DaqOutScanFlag outscanflags - # AScanFlag inscanflags - double* outbuffer, inbuffer + TransferStatus xstat + ScanStatus scanstat + double samplerate + bint top_enqueued, bottom_enqueued + unsigned buffer_mid_idx, chan, sample - # inbuffer = NULL - # outbuffer = NULL + double sleep_time + double *inbuffer_cpy + unsigned sleep_time_mus # Thread sleep time in ms - inscanflags = DAQINSCAN_FF_NOSCALEDATA - outscanflags = DAQOUTSCAN_FF_NOSCALEDATA + samplerate = td.samplerate + sleep_time = td.samplesPerBlock / td.samplerate / 2 + sleep_time_mus = (sleep_time * 1e6) - scanoptions = SO_CONTINUOUS + inscanflags = DAQINSCAN_FF_DEFAULT + scanoptions = SO_CONTINUOUS + + buffer_mid_idx = td.samplesPerBlock*td.ninChanDescriptors + top_enqueued = True + bottom_enqueued = True + + fprintf(stderr, 'Starting input thread\n') + + err = ulDaqInScan(td.handle, + td.inChanDescriptors, + td.ninChanDescriptors, + 2*td.samplesPerBlock, # Watch the 2 here! + &samplerate, + scanoptions, + inscanflags, + td.inbuffer) + + fprintf(stderr, 'Actual input sampling rate: %0.2f\n', samplerate) + if err != ERR_NO_ERROR: + fprintf(stderr, 'Error starting data in\n') + showErr(err) + return - if td.noutputChanDescriptors > 0: - # Enable input and outputs - err = ulDaqOutScan(td.handle, - td.outChanDescriptors, - td.noutputChanDescriptors, - td.samplesPerBlock, - &td.samplerate, - scanoptions, - outscanflags, - outbuffer) - if err: - fprintf(stderr, 'Error starting data output\n') - return + err = ulDaqInScanStatus(td.handle, &scanstat, &xstat) + if err != ERR_NO_ERROR: + fprintf(stderr, 'Error obtaining input scan status\n') + showErr(err) + return + while td.stopThread.load() == False and err == ERR_NO_ERROR: + err = ulDaqInScanStatus(td.handle, &scanstat, &xstat) + if err != ERR_NO_ERROR: + showErr(err) + if xstat.currentIndex < buffer_mid_idx: + top_enqueued = False + if not bottom_enqueued: + # Copy the bottom of the buffer to the queue, while transposing + # it. + inbuffer_cpy = malloc(sizeof(double)*td.samplesPerBlock*td.ninChanDescriptors) + for chan in range(td.ninChanDescriptors): + for sample in range(td.samplesPerBlock): + inbuffer_cpy[chan*td.samplesPerBlock+sample] = td.inbuffer[buffer_mid_idx+sample*td.ninChanDescriptors+chan] + td.inQueue.enqueue(inbuffer_cpy) - if td.ninputChanDescriptors > 0: - # Enable input and outputs - # err = ulDaqOutScan(td.handle, - # td.outChanDescriptors, - # td.noutputChanDescriptors, - # td.samplesPerBlock, - # &td.samplerate, - # scanoptions, - # outscanflags, - # outbuffer) - # if err: - # fprintf(stderr, 'Error starting data output\n') - # return - pass + bottom_enqueued = True + else: + # fprintf(stderr,'sleep...\n') + CPPsleep_us(sleep_time_mus) + # fprintf(stderr,'awake...\n') + else: + bottom_enqueued = False + if not top_enqueued: + inbuffer_cpy = malloc(sizeof(double)*td.samplesPerBlock*td.ninChanDescriptors) + for chan in range(td.ninChanDescriptors): + for sample in range(td.samplesPerBlock): + inbuffer_cpy[chan*td.samplesPerBlock+sample] = td.inbuffer[sample*td.ninChanDescriptors+chan] + td.inQueue.enqueue(inbuffer_cpy) + + # Enqueue the top part of the queue + top_enqueued = True + else: + # fprintf(stderr,'sleep...\n') + CPPsleep_us(sleep_time_mus) + # fprintf(stderr,'awake...\n') + + fprintf(stderr, 'Exit while loop input thread\n') + + err = ulDaqInScanStop(td.handle) + if err != ERR_NO_ERROR: + fprintf(stderr, "Error stopping DAQ input thread\n") + showErr(err) + + return + +cdef void outThreadFunction(void* threaddata_void) nogil: + """ + Stream output thread function + """ + cdef: + DaqThreadData* td = threaddata_void + + UlError err + + ScanOption scanoptions + AOutScanFlag outscanflags + TransferStatus xstat + ScanStatus scanstat + bint top_enqueued, bottom_enqueued + unsigned buffer_mid_idx, chan, sample + + double sleep_time + double *outbuffer_cpy + unsigned sleep_time_mus # Thread sleep time in ms + double samplerate + + samplerate = td.samplerate + sleep_time = td.samplesPerBlock / td.samplerate / 2 + sleep_time_mus = (sleep_time * 1e6) + + # CAREFULL, a MEMCPY here is only allowed for a single channel!! + if not td.noutChanDescriptors == 1: + fprintf(stderr, 'Error: multiple output channels not implemented!\n') + return + + # Copy first queue blob to top of outbuffer + outbuffer_cpy = td.outQueue.dequeue() + memcpy(td.outbuffer, outbuffer_cpy, sizeof(double)*td.samplesPerBlock) + free(outbuffer_cpy) + + outscanflags = AOUTSCAN_FF_DEFAULT + scanoptions = SO_CONTINUOUS + + buffer_mid_idx = td.samplesPerBlock + + top_enqueued = True + bottom_enqueued = False + + fprintf(stderr, 'Starting output thread\n') + + err = ulAOutScan(td.handle, + 0, + 0, + BIP10VOLTS, + 2*td.samplesPerBlock, # Watch the 2 here! + &samplerate, + scanoptions, + outscanflags, + td.outbuffer) + fprintf(stderr, 'Actual output sampling rate: %0.2f\n', samplerate) + + if err != ERR_NO_ERROR: + fprintf(stderr, 'Error starting data out\n') + showErr(err) + return + + + err = ulAOutScanStatus(td.handle, &scanstat, &xstat) + if err != ERR_NO_ERROR: + showErr(err) + return + + while td.stopThread.load() == False: + err = ulAOutScanStatus(td.handle, &scanstat, &xstat) + if err != ERR_NO_ERROR: + showErr(err) + break + + if xstat.currentIndex < buffer_mid_idx: + top_enqueued = False + if not bottom_enqueued: + # Copy the bottom of the buffer to the queue, while transposing + # it. + if not td.outQueue.empty(): + outbuffer_cpy = td.outQueue.dequeue() + else: + outbuffer_cpy = malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock) + for chan in range(td.noutChanDescriptors): + for sample in range(td.samplesPerBlock): + outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0 + fprintf(stderr, 'Output buffer queue empty! Filling with zeros\n') + + for chan in range(td.noutChanDescriptors): + for sample in range(td.samplesPerBlock): + # td.outbuffer[buffer_mid_idx+sample*td.noutChanDescriptors+chan] = outbuffer_cpy[chan*td.samplesPerBlock+sample] + # Simpler, valid for single chan + td.outbuffer[buffer_mid_idx+sample] = outbuffer_cpy[sample] + free(outbuffer_cpy) + + bottom_enqueued = True + else: + CPPsleep_us(sleep_time_mus) + else: + bottom_enqueued = False + if not top_enqueued: + if not td.outQueue.empty(): + outbuffer_cpy = td.outQueue.dequeue() + else: + outbuffer_cpy = malloc(sizeof(double)*td.noutChanDescriptors*td.samplesPerBlock) + for chan in range(td.noutChanDescriptors): + for sample in range(td.samplesPerBlock): + outbuffer_cpy[chan*td.samplesPerBlock+sample] = 0.0 + fprintf(stderr, 'Output buffer queue empty! Filling with zeros\n') + + for chan in range(td.noutChanDescriptors): + for sample in range(td.samplesPerBlock): + # td.outbuffer[sample*td.noutChanDescriptors+chan] = outbuffer_cpy[chan*td.samplesPerBlock+sample] + # Simpler, valid for single chan + td.outbuffer[sample] = outbuffer_cpy[sample] + + free(outbuffer_cpy) + + # Enqueue the top part of the queue + top_enqueued = True + else: + CPPsleep_us(sleep_time_mus) + + fprintf(stderr, 'Exit while loop output thread\n') + + err = ulAOutScanStop(td.handle) + if err != ERR_NO_ERROR: + fprintf(stderr, "Error stopping AOut output thread\n") + showErr(err) cdef class UlDT9837A: cdef: DaqDeviceHandle handle bint handle_connected - bint stream_running - unsigned int ninputchannels - unsigned int noutputchannels + + unsigned int ninchannels + unsigned int noutchannels object input_range object enabled_inputs + bint output_enabled bint monitor_gen - DaqThreadData *threaddata + + DaqThreadData *td def __cinit__(self, unsigned int deviceno): self.handle = 0 self.handle_connected = False - self.ninputchannels = 4 - self.noutputchannels = 1 + + self.ninchannels = 4 + self.noutchannels = 1 self.input_range = 4*[False] self.enabled_inputs = 4*[False] + self.output_enabled = False - self.threaddata = NULL self.monitor_gen = False + self.td = NULL + cdef: DaqDeviceDescriptor devdescriptors[MAX_DEF_COUNT] DaqDeviceDescriptor descriptor @@ -140,82 +328,176 @@ cdef class UlDT9837A: if err != ERR_NO_ERROR: raise RuntimeError(f'Unable to connect to device: {err}') - - cdef void startScan(self, + cdef void startScan(self, unsigned int samplesPerBlock, - SafeQueue[void*] *inputQueue, - SafeQueue[void*] *outputQueue): + double samplerate, + SafeQueue[void*] *inQueue, + SafeQueue[void*] *outQueue): cdef: - int i, j + int i, j, nnormalinchannels, neninchannels + # Sanity checks - if inputQueue and (not any(self.enabled_inputs) and (not self.monitor_gen)): + if inQueue and (not any(self.enabled_inputs) and (not self.monitor_gen)): raise ValueError('Input queue given, but no input channels enabled and monitor output disabled') - if outputQueue and not self.output_enabled: + if outQueue and not self.output_enabled: raise ValueError('Output queue given, but output channel is not enabled') # End sanity checks cdef: - DaqThreadData *threaddata - threaddata = malloc(sizeof(threaddata)) - if not threaddata: + DaqThreadData *td + + td = malloc(sizeof(DaqThreadData)) + + if not td: raise MemoryError() - threaddata.samplesPerBlock = samplesPerBlock - threaddata.handle = self.handle - threaddata.inputQueue = inputQueue - threaddata.outputQueue = outputQueue - threaddata.inChanDescriptors = NULL - threaddata.outChanDescriptors = NULL - threaddata.ninputChanDescriptors = 0 - threaddata.noutputChanDescriptors = 0 + with nogil: + td.samplesPerBlock = samplesPerBlock + td.samplerate = samplerate + + td.handle = self.handle + td.inQueue = inQueue + td.outQueue = outQueue + + td.inChanDescriptors = NULL + td.ninChanDescriptors = 0 + td.noutChanDescriptors = 0 + + td.inThread = NULL + td.outThread = NULL + + td.stopThread.store(False) + + td.inbuffer = NULL + td.outbuffer = NULL + + # Configure INPUTS + py_nnormalinchannels = sum([1 if en_input else 0 for en_input in + self.enabled_inputs]) + nnormalinchannels = py_nnormalinchannels - neninputchannels = sum([1 if self.enabled_inputs == True else 0]) - - threaddata.inputQueue = inputQueue - threaddata.outputQueue = inputQueue - threaddata.thread = NULL - j = 0 + neninchannels = nnormalinchannels if self.monitor_gen: - neninputchannels +=1 - j+=1 + neninchannels += 1 - threaddata.inChanDescriptors = malloc(neninputchannels*sizeof(DaqInChanDescriptor)) - if not threaddata.inChanDescriptors: - self.cleanupThreadData(threaddata) - raise MemoryError() + fprintf(stderr, 'neninchannels: %u\n', neninchannels) - if self.monitor_gen: - threaddata.inChanDescriptors[0].chantype = DAQI_DAC - threaddata.inChanDescriptors[0].channel = 7 - threaddata.inChanDescriptors[0].range = BIP10VOLTS - - for i in range(neninputchannels): - if self.enabled_inputs[i]: - threaddata.inChanDescriptors[j].chantype = DAQI_ANALOG_DIFF - threaddata.inChanDescriptors[j].channel = i - threaddata.inChanDescriptors[j].range = BIP10VOLTS if self.input_range[i] else BIP1VOLTS - j+=1 - - threaddata.ninputChanDescriptors = neninputchannels - - if self.output_enabled: - threaddata.outChanDescriptors = malloc(sizeof(DaqInChanDescriptor)) - if not threaddata.outChanDescriptors: - self.cleanupThreadData(threaddata) + if neninchannels > 0: + td.inChanDescriptors = malloc(neninchannels* + sizeof(DaqInChanDescriptor)) + if not td.inChanDescriptors: + self.cleanupThreadData(td) + raise MemoryError() + td.ninChanDescriptors = neninchannels + td.inbuffer = malloc(2*sizeof(double)*td.samplesPerBlock* + neninchannels) + if not td.inbuffer: + self.cleanupThreadData(td) raise MemoryError() - self.threaddata.outChanDescriptors[0].channel = 0 - self.threaddata.outChanDescriptors[0].type = DAQO_ANALOG - self.threaddata.outChanDescriptors[0].range = BIP10VOLTS - self.threaddata.thread = new CPPThread[void*, void (*)(void*)]( - ulThreadFunction, - self.threaddata) + j = 0 + for i in range(4): + if self.enabled_inputs[i]: + td.inChanDescriptors[j].type = DAQI_ANALOG_SE + td.inChanDescriptors[j].channel = i + td.inChanDescriptors[j].range = BIP10VOLTS if self.input_range[i] else BIP1VOLTS + j+=1 + td.inbuffer = malloc(2*sizeof(double)*td.samplesPerBlock*neninchannels) + if not td.inbuffer: + self.cleanupThreadData(td) + raise MemoryError() + + # Create input channel descriptors + if self.monitor_gen: + td.inChanDescriptors[neninchannels-1].type = DAQI_DAC + td.inChanDescriptors[neninchannels-1].channel = 0 + td.inChanDescriptors[neninchannels-1].range = BIP10VOLTS + + fprintf(stderr, 'SFSG12\n') + + # CONFIGURE OUTPUTS + if self.output_enabled: + fprintf(stderr, 'SFSG13\n') + td.noutChanDescriptors = 1 + # WATCH THE TWO HERE!!! + td.outbuffer = malloc(2*sizeof(double)*td.samplesPerBlock) + if not td.outbuffer: + self.cleanupThreadData(td) + raise MemoryError() + + # Create DAQ Threads + with nogil: + fprintf(stderr, 'SFSG14\n') + if self.output_enabled: + td.outThread = new CPPThread[void*, void (*)(void*)]( + outThreadFunction, td) + + if not td.outThread: + self.cleanupThreadData(td) + raise RuntimeError('Error creating thread') + + if neninchannels > 0: + fprintf(stderr, 'SFSG15\n') + td.inThread = new CPPThread[void*, void (*)(void*)]( + inThreadFunction, td) + + if not td.inThread: + self.cleanupThreadData(td) + raise RuntimeError('Error creating thread') + self.td = td + + def start(self): + cdef: + SafeQueue[void*] *inqueue + SafeQueue[void*] *outqueue + int i, sample, samples, samplesperbuf + double meas_seconds, samplerate + double* inbuf + double* outbuf + + inqueue = new SafeQueue[void*]() + outqueue = new SafeQueue[void*]() + + samplesperbuf = 512 + samplerate = 10000 + meas_seconds = 4 + + for i in range(int(meas_seconds*samplerate/samplesperbuf)): + outbuf = malloc(sizeof(double)*samplesperbuf) + for sample in range(samplesperbuf): + outbuf[sample] = 1 + outqueue.enqueue( outbuf) + + self.startScan( + samplesperbuf, + samplerate, # Sample rate + inqueue, + outqueue) + + CPPsleep_ms(int(0.5*1000*meas_seconds)) + self.stop() + while not inqueue.empty(): + inbuf = inqueue.dequeue() + for sample in range(samplesperbuf): + print(f'Value monitor: {inbuf[sample]:1.2f}, value input: {inbuf[samplesperbuf+sample]:1.2f}') + pass + free(inbuf) + + while not outqueue.empty(): + outbuf = outqueue.dequeue() + free(outbuf) + del inqueue + del outqueue + + def stop(self): + self.cleanupThreadData(self.td) + self.td = NULL def setInputChannelConfig(self, unsigned chnum, channelconfig: DAQChannel): - if self.threaddata: + if self.td: raise RuntimeError('Cannot change settings while sampling') cdef: int i @@ -223,11 +505,11 @@ cdef class UlDT9837A: IepeMode iepe CouplingMode cm - if chnum >= self.ninputchannels: + if chnum >= self.ninchannels: raise RuntimeError('Invalid input channel number') self.input_range[chnum] = True if channelconfig.range_ == pyRange.tenV else False - self.enabled_input[chnum] = channelconfig.channel_enabled + self.enabled_inputs[chnum] = channelconfig.channel_enabled iepe = IEPE_ENABLED if channelconfig.IEPE_enabled else IEPE_DISABLED cm = CM_AC if channelconfig.IEPE_enabled else CM_DC @@ -248,39 +530,56 @@ cdef class UlDT9837A: def setOutputChannelConfig(self, unsigned chnum, channelconfig: DAQChannel, bint monitor_gen): - if self.threaddata: + if self.td: raise RuntimeError('Cannot change settings while sampling') - if chnum >= self.noutputchannels: + if chnum > 0: raise RuntimeError('Invalid output channel number') if monitor_gen and not channelconfig.channel_enabled: raise RuntimeError('Output channel should be enabled to enable channel monitoring') + self.output_enabled = channelconfig.channel_enabled self.monitor_gen = monitor_gen def __dealloc__(self): + fprintf(stderr, '__dealloc__\n') + self.cleanupThreadData(self.td) + self.td = NULL if self.handle_connected: ulDisconnectDaqDevice(self.handle) ulReleaseDaqDevice(self.handle) - cdef cleanupThreadData(self, DaqThreadData* td): + cdef void cleanupThreadData(self, DaqThreadData* td) nogil: + fprintf(stderr, 'cleanupThreadData()\n') + if td is NULL: return - if td.thread: - td.stopThread.store(True) - td.thread.join() - del td.thread - td.thread = NULL + + td.stopThread.store(True) + if td.inThread: + td.inThread.join() + del td.inThread + fprintf(stderr, 'SFSG0\n') + + if td.outThread: + td.outThread.join() + del td.outThread + fprintf(stderr, 'SFSG1\n') if td.inChanDescriptors: free(td.inChanDescriptors) - td.inChanDescriptors = NULL - if td.outChanDescriptors: - free(td.outChanDescriptors) - td.outChanDescriptors = NULL + fprintf(stderr, 'SFSG1\n') + if td.inbuffer: + free(td.inbuffer) + fprintf(stderr, 'SFSG2\n') + if td.outbuffer: + free(td.outbuffer) + fprintf(stderr, 'SFSG3\n') + free(td) + fprintf(stderr, 'end cleanupThreadData()\n') @@ -299,7 +598,7 @@ cdef class UlDaq: # # Open the device to probe the number of input and output ch. # handle = ulCreateDaqDevice(descriptor) # if not handle: - # raise RuntimeError('Unable to create device handle on device') + # raise RuntimeError('Unable to create device handle on device') cpdef int getDeviceCount(self): cdef: @@ -347,14 +646,14 @@ cdef class UlDaq: if descriptor.productName == b'DT9837A': # Create proper interface name if descriptor.devInterface == DaqDeviceInterface.USB_IFC: - name = 'USB - ' + name = 'USB - ' elif descriptor.devInterface == DaqDeviceInterface.BLUETOOTH_IFC: - name = 'Bluetooth - ' + name = 'Bluetooth - ' elif descriptor.devInterface == DaqDeviceInterface.ETHERNET_IFC: - name = 'Ethernet - ' + name = 'Ethernet - ' name += descriptor.productName.decode('utf-8') + ', id ' + \ - descriptor.uniqueId.decode('utf-8') + descriptor.uniqueId.decode('utf-8') return DeviceInfo( api = -1, @@ -376,67 +675,67 @@ cdef class UlDaq: # def openStream(self, # avstream # ): - # """ - # Opens a stream with specified parameters + # """ + # Opens a stream with specified parameters # Args: - # avstream: AvStream instance + # avstream: AvStream instance # Returns: None # """ # if self._stream is not NULL: - # raise RuntimeError('Stream is already opened.') + # raise RuntimeError('Stream is already opened.') # daqconfig = avstream.daqconfig # avtype = avstream.avtype # device = avstream.device # cdef: - # bint duplex_mode = daqconfig.duplex_mode - # bint monitorOutput = daqconfig.monitor_gen - # size_t sw # Sample width in bytes - # unsigned int nFramesPerBlock = unsigned int(daqconfig.nFramesPerBlock) - # int firstinputchannel, firstoutputchannel - # int lastinputchannel, lastoutputchannel - # unsigned int ninputchannels_forwarded=0 - # unsigned int ninputchannels_uldaq=0 - # unsigned int noutputchannels_uldaq=0 - # unsigned int noutputchannels_forwarded=0 - # unsigned int samplerate - # int i - # bint input_stream=False, output_stream=False - # bool* inputChannelsEnabled - # bool* outputChannelsEnabled + # bint duplex_mode = daqconfig.duplex_mode + # bint monitorOutput = daqconfig.monitor_gen + # size_t sw # Sample width in bytes + # unsigned int nFramesPerBlock = unsigned int(daqconfig.nFramesPerBlock) + # int firstinputchannel, firstoutputchannel + # int lastinputchannel, lastoutputchannel + # unsigned int ninputchannels_forwarded=0 + # unsigned int ninputchannels_uldaq=0 + # unsigned int noutputchannels_uldaq=0 + # unsigned int noutputchannels_forwarded=0 + # unsigned int samplerate + # int i + # bint input_stream=False, output_stream=False + # bool* inputChannelsEnabled + # bool* outputChannelsEnabled # if daqconfig.nFramesPerBlock > 8192 or daqconfig.nFramesPerBlock < 512: - # raise ValueError('Invalid number of nFramesPerBlock') + # raise ValueError('Invalid number of nFramesPerBlock') # if daqconfig.outputDelayBlocks < 0 or daqconfig.outputDelayBlocks > 10: - # raise ValueError('Invalid number of outputDelayBlocks') + # raise ValueError('Invalid number of outputDelayBlocks') # try: # # Determine sample rate and sample format, determine whether we are an # # input or an output stream, or both # if avtype == AvType.audio_input or duplex_mode: - # # Here, we override the sample format in case of duplex mode. - # sampleformat = daqconfig.en_input_sample_format - # samplerate = int(daqconfig.en_input_rate) - # input_stream = True - # if duplex_mode: - # output_stream = True - # else: - # sampleformat = daqconfig.en_output_sample_format - # samplerate = int(daqconfig.en_output_rate) - # output_stream = True + # # Here, we override the sample format in case of duplex mode. + # sampleformat = daqconfig.en_input_sample_format + # samplerate = int(daqconfig.en_input_rate) + # input_stream = True + # if duplex_mode: + # output_stream = True + # else: + # sampleformat = daqconfig.en_output_sample_format + # samplerate = int(daqconfig.en_output_rate) + # output_stream = True # sw = 64 # # All set, allocate the stream! # self._stream = <_Stream*> malloc(sizeof(_Stream)) # if self._stream == NULL: - # raise MemoryError('Could not allocate stream: memory error.') + # raise MemoryError('Could not allocate stream: memory error.') # self._stream.pyCallback = avstream._audioCallback # # Increase reference count to the callback @@ -459,22 +758,22 @@ cdef class UlDaq: # # Create channel maps for input channels, set input stream # # parameters # if input_stream: - # firstinputchannel = daqconfig.firstEnabledInputChannelNumber() - # lastinputchannel = daqconfig.lastEnabledInputChannelNumber() - # ninputchannels_uldaq = lastinputchannel-firstinputchannel+1 + # firstinputchannel = daqconfig.firstEnabledInputChannelNumber() + # lastinputchannel = daqconfig.lastEnabledInputChannelNumber() + # ninputchannels_uldaq = lastinputchannel-firstinputchannel+1 # if lastinputchannel < 0 or ninputchannels_uldaq < 1: - # raise ValueError('Not enough input channels selected') - # input_ch = daqconfig.input_channel_configs + # raise ValueError('Not enough input channels selected') + # input_ch = daqconfig.input_channel_configs # inputChannelsEnabled = malloc(sizeof(bool)*ninputchannels_uldaq) # self._stream.inputChannelsEnabled = inputChannelsEnabled # for i in range(firstinputchannel, lastinputchannel+1): - # ch_en = input_ch[i].channel_enabled - # if ch_en: - # ninputchannels_forwarded += 1 - # inputChannelsEnabled[i] = ch_en + # ch_en = input_ch[i].channel_enabled + # if ch_en: + # ninputchannels_forwarded += 1 + # inputChannelsEnabled[i] = ch_en # self._stream.inputQueue = new SafeQueue[void*]() @@ -483,21 +782,21 @@ cdef class UlDaq: # # Create channel maps for output channels # if output_stream: - # firstoutputchannel = daqconfig.firstEnabledOutputChannelNumber() - # lastoutputchannel = daqconfig.lastEnabledOutputChannelNumber() - # noutputchannels_uldaq = lastoutputchannel-firstoutputchannel+1 + # firstoutputchannel = daqconfig.firstEnabledOutputChannelNumber() + # lastoutputchannel = daqconfig.lastEnabledOutputChannelNumber() + # noutputchannels_uldaq = lastoutputchannel-firstoutputchannel+1 # if lastoutputchannel < 0 or noutputchannels_uldaq < 1: - # raise ValueError('Not enough output channels selected') - # output_ch = daqconfig.output_channel_configs + # raise ValueError('Not enough output channels selected') + # output_ch = daqconfig.output_channel_configs # outputChannelsEnabled = malloc(sizeof(bool)*noutputchannels_uldaq) # self._stream.outputChannelsEnabled = outputChannelsEnabled # for i in range(firstoutputchannel, lastoutputchannel+1): - # ch_en = output_ch[i].channel_enabled - # if ch_en: - # noutputchannels_forwarded += 1 - # outputChannelsEnabled[i] = ch_en + # ch_en = output_ch[i].channel_enabled + # if ch_en: + # noutputchannels_forwarded += 1 + # outputChannelsEnabled[i] = ch_en # rtOutputParams_ptr = &self._stream.outputParams # rtOutputParams_ptr.deviceId = device.index @@ -508,7 +807,7 @@ cdef class UlDaq: # self._stream.noutputchannels_forwarded = noutputchannels_forwarded # if monitorOutput and duplex_mode: - # self._stream.ninputchannels_forwarded += noutputchannels_forwarded + # self._stream.ninputchannels_forwarded += noutputchannels_forwarded # # self._uldaq.openStream(rtOutputParams_ptr, # # rtInputParams_ptr, @@ -519,106 +818,106 @@ cdef class UlDaq: # # self._stream, # # &streamoptions, # Stream options # # errorCallback # Error callback - # # ) + # # ) # # self._stream.nBytesPerChan = nFramesPerBlock*sw # # self._stream.nFramesPerBlock = nFramesPerBlock # except Exception as e: - # print('Exception occured in stream opening: ', e) - # self.cleanupStream(self._stream) - # self._stream = NULL - # raise e + # print('Exception occured in stream opening: ', e) + # self.cleanupStream(self._stream) + # self._stream = NULL + # raise e # with nogil: - # self._stream.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, - # self._stream) - # # Allow it to start - # CPPsleep(500) - # pass + # self._stream.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, + # self._stream) + # # Allow it to start + # CPPsleep(500) + # pass # return nFramesPerBlock # cdef cleanupStream(self, _Stream* stream): - # # printf('Entrance function cleanupStream...\n') - # cdef: - # void* ptr - # if stream == NULL: - # return + # # printf('Entrance function cleanupStream...\n') + # cdef: + # void* ptr + # if stream == NULL: + # return # with nogil: - # if stream.thread: - # stream.stopThread.store(True) - # if stream.inputQueue: - # # If waiting in the input queue, hereby we let it run. - # stream.inputQueue.enqueue(NULL) - # # printf('Joining thread...\n') - # # HERE WE SHOULD RELEASE THE GIL, as exiting the thread function - # # will require the GIL, which is locked by this thread! - # stream.thread.join() - # # printf('Thread joined!\n') - # del stream.thread - # stream.thread = NULL + # if stream.thread: + # stream.stopThread.store(True) + # if stream.inputQueue: + # # If waiting in the input queue, hereby we let it run. + # stream.inputQueue.enqueue(NULL) + # # printf('Joining thread...\n') + # # HERE WE SHOULD RELEASE THE GIL, as exiting the thread function + # # will require the GIL, which is locked by this thread! + # stream.thread.join() + # # printf('Thread joined!\n') + # del stream.thread + # stream.thread = NULL # if stream.inputChannelsEnabled: - # free(stream.inputChannelsEnabled) - # if stream.outputChannelsEnabled: - # free(stream.outputChannelsEnabled) + # free(stream.inputChannelsEnabled) + # if stream.outputChannelsEnabled: + # free(stream.outputChannelsEnabled) # if stream.outputQueue: - # while not stream.outputQueue.empty(): - # free(stream.outputQueue.dequeue()) - # del stream.outputQueue - # if stream.inputQueue: - # while not stream.inputQueue.empty(): - # free(stream.inputQueue.dequeue()) - # del stream.inputQueue - # if stream.outputDelayQueue: - # while not stream.outputDelayQueue.empty(): - # free(stream.outputDelayQueue.dequeue()) - # del stream.outputDelayQueue - # fprintf(stderr, "End cleanup stream queues...\n") + # while not stream.outputQueue.empty(): + # free(stream.outputQueue.dequeue()) + # del stream.outputQueue + # if stream.inputQueue: + # while not stream.inputQueue.empty(): + # free(stream.inputQueue.dequeue()) + # del stream.inputQueue + # if stream.outputDelayQueue: + # while not stream.outputDelayQueue.empty(): + # free(stream.outputDelayQueue.dequeue()) + # del stream.outputDelayQueue + # fprintf(stderr, "End cleanup stream queues...\n") # if stream.pyCallback: - # Py_DECREF( stream.pyCallback) - # stream.pyCallback = NULL - # # fprintf(stderr, "End cleanup callback...\n") - # free(stream) + # Py_DECREF( stream.pyCallback) + # stream.pyCallback = NULL + # # fprintf(stderr, "End cleanup callback...\n") + # free(stream) # def startStream(self): - # self._uldaq.startStream() + # self._uldaq.startStream() # def stopStream(self): - # if self._stream is NULL: - # raise RuntimeError('Stream is not opened') - # try: - # self._uldaq.stopStream() - # except: - # pass + # if self._stream is NULL: + # raise RuntimeError('Stream is not opened') + # try: + # self._uldaq.stopStream() + # except: + # pass # def closeStream(self): - # # print('closeStream') - # if self._stream is NULL: - # raise RuntimeError('Stream is not opened') - # # Closing stream - # self._uldaq.closeStream() - # self.cleanupStream(self._stream) - # self._stream = NULL + # # print('closeStream') + # if self._stream is NULL: + # raise RuntimeError('Stream is not opened') + # # Closing stream + # self._uldaq.closeStream() + # self.cleanupStream(self._stream) + # self._stream = NULL # def abortStream(self): - # if self._stream is NULL: - # raise RuntimeError('Stream is not opened') - # self._uldaq.abortStream() + # if self._stream is NULL: + # raise RuntimeError('Stream is not opened') + # self._uldaq.abortStream() # def isStreamOpen(self): - # return self._uldaq.isStreamOpen() + # return self._uldaq.isStreamOpen() # def isStreamRunning(self): - # return self._uldaq.isStreamRunning() + # return self._uldaq.isStreamRunning() # def getStreamTime(self): - # return self._uldaq.getStreamTime() - - # def setStreamTime(self, double time): - # return self._uldaq.setStreamTime(time) + # return self._uldaq.getStreamTime() + + # def setStreamTime(self, double time): + # return self._uldaq.setStreamTime(time)