From c8bfc461ce35cead5b2d89775454d35cee8bbd71 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Sun, 27 Sep 2020 19:38:49 +0200 Subject: [PATCH] Final bugfix, of memory leak in Python thread for uldaq, proper sleep time in Python thread. The whole train with UlDaq now seems to work properly --- lasp/device/lasp_cppuldaq.cpp | 104 +++++++++++++++++----------------- lasp/device/lasp_uldaq.pyx | 95 ++++++++++++++----------------- lasp/device/test_uldaq.cpp | 32 +++++++++-- 3 files changed, 120 insertions(+), 111 deletions(-) diff --git a/lasp/device/lasp_cppuldaq.cpp b/lasp/device/lasp_cppuldaq.cpp index d94ba8c..dd3fa07 100644 --- a/lasp/device/lasp_cppuldaq.cpp +++ b/lasp/device/lasp_cppuldaq.cpp @@ -185,30 +185,6 @@ void DT9837A::setInputRange(boolvec& high_range) { this->high_range = high_range; } -/* static inline void copysamples_transpose(const us samplesPerBlock,const us nchannels, */ -/* double* from, double* to, */ -/* const us from_startidx, */ -/* const us to_startidx, */ -/* const bool touldaqbuffer) { */ - -/* // Copy from "our type of buffer" to an uldaq buffer */ -/* if(touldaqbuffer) { */ -/* for(us sample=0;sample< samplesPerBlock;sample++) { */ -/* for(us channel=0;channelsamplerate(); + const double sleeptime = ((double) samplesPerBlock)/(4*samplerate); + const us sleeptime_us = (us) (sleeptime*1e6); + /* cerr << "Sleep time in loop: " << sleeptime_us << "us." << endl; */ + if(sleeptime_us < 10) { + cerr << "ERROR: Too small buffer size (samplesPerBlock) chosen!" << endl; + return; + } + const us buffer_mid_idx_in = neninchannels*samplesPerBlock; const us buffer_mid_idx_out = nenoutchannels*samplesPerBlock; @@ -248,37 +232,27 @@ void threadfcn(DT9837A* td) { DaqInChanDescriptor *indesc = NULL; - bool topinenqueued = false; + bool topinenqueued = true; // Start with true here, to not copy here the first time bool botinenqueued = true; - bool topoutenqueued = false; - bool botoutenqueued = false; - std::cerr << "SFSG" << endl; + bool topoutenqueued = true; + bool botoutenqueued = true; - // outitialize output, if any + size_t inTotalCount = 0; + size_t outTotalCount = 0; + + // initialize output, if any if(hasoutput) { assert(nenoutchannels == 1); assert(outqueue); - double *firstout; - if(outqueue->empty()) { - cerr << "******* WARNING: OUTPUTQUEUE UNDERFLOW, FILLING SIGNAL QUEUE WITH ZEROS ***********" << endl; - firstout = static_cast(malloc(sizeof(double)*samplesPerBlock*nenoutchannels)); - for(us sample=0;sample< samplesPerBlock;sample++) { - firstout[sample] = 0; - } - } - else { - firstout = outqueue->dequeue(); - } + // Initialize the buffer with zeros, before pushing any data. for(us sample=0;sample<2*samplesPerBlock;sample++) { - outbuffer[sample] = firstout[sample]; + outbuffer[sample] = 0; } - memcpy(td->outbuffer, firstout, samplesPerBlock*nenoutchannels); - /* free(firstout); */ - topoutenqueued = true; + cerr << "Starting output DAC" << endl; err = ulAOutScan(handle, 0, 0, @@ -320,6 +294,7 @@ void threadfcn(DT9837A* td) { } assert(j==neninchannels); + cerr << "Starting input ADC" << endl; err = ulDaqInScan(handle, indesc, neninchannels, @@ -335,8 +310,19 @@ void threadfcn(DT9837A* td) { } - std::cerr << "Entering while loop" << endl; - std::cerr << "hasinput: " << hasinput << endl; + // Runs scan status on output, to catch up with position + if(hasoutput) { + err = ulAOutScanStatus(handle, &outscanstat, &outxstat); + if(err != ERR_NO_ERROR) { + showErr(err); + goto exit; + } + outTotalCount = outxstat.currentScanCount; + assert(outscanstat == SS_RUNNING); + } + + /* std::cerr << "Entering while loop" << endl; */ + /* std::cerr << "hasinput: " << hasinput << endl; */ while(!td->stopThread && err == ERR_NO_ERROR) { /* std::cerr << "While..." << endl; */ if(hasoutput) { @@ -346,6 +332,12 @@ void threadfcn(DT9837A* td) { goto exit; } assert(outscanstat == SS_RUNNING); + + if(outxstat.currentScanCount > outTotalCount+2*samplesPerBlock) { + cerr << "***** WARNING: Missing output sample blocks, DAQ Scan count=" << outxstat.currentScanCount << " while loop count = " << outTotalCount <<", probably due to too small buffer size. *****" << endl; + } + outTotalCount = outxstat.currentScanCount; + /* std::cerr << "Samples scanned: " << outxstat.currentTotalCount << endl; */ if(outxstat.currentIndex < buffer_mid_idx_out) { topoutenqueued = false; @@ -373,7 +365,6 @@ void threadfcn(DT9837A* td) { else { botoutenqueued = false; if(!topoutenqueued) { - topoutenqueued = true; /* cerr << "Copying output buffer to top" << endl; */ double* bufcpy; if(!outqueue->empty()) { @@ -403,6 +394,11 @@ void threadfcn(DT9837A* td) { goto exit; } assert(inscanstat == SS_RUNNING); + if(inxstat.currentScanCount > inTotalCount+2*samplesPerBlock) { + cerr << "***** ERROR: Missing input sample blocks, count=" << inxstat.currentScanCount << ", probably due to too small buffer size. Exiting thread. *****" << endl; + break; + } + inTotalCount = inxstat.currentScanCount; if(inxstat.currentIndex < buffer_mid_idx_in) { topinenqueued = false; @@ -456,10 +452,10 @@ void threadfcn(DT9837A* td) { } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us)); } // End of while loop - std::cerr << "Exit of while loop" << endl; + /* std::cerr << "Exit of while loop" << endl; */ exit: @@ -479,7 +475,7 @@ exit: } if(indesc) delete indesc; - std::cerr << "Exit of thread fcn" << endl; + std::cerr << "Exit of DAQ thread fcn" << endl; } @@ -501,15 +497,19 @@ void DT9837A::start( SafeQueue *inqueue, SafeQueue *outqueue) if(hasinput) { assert(!inbuffer); - inbuffer = new double[neninchannels()*samplesPerBlock*2]; + inbuffer = new double[neninchannels()*samplesPerBlock*2]; // Watch the 2! } if(hasoutput) { assert(!outbuffer); - outbuffer = new double[nenoutchannels()*samplesPerBlock*2]; + outbuffer = new double[nenoutchannels()*samplesPerBlock*2]; // Watch the 2! } this->inqueue = inqueue; this->outqueue = outqueue; + /* std::cerr << "************************ WARNING: Forcing coupling mode to AC **************************" << endl; */ + /* boolvec couplingmode = {true, true, true, true}; */ + /* setACCouplingMode(couplingmode); */ + stopThread = false; thread = new std::thread(threadfcn, this); diff --git a/lasp/device/lasp_uldaq.pyx b/lasp/device/lasp_uldaq.pyx index 74532e5..250e801 100644 --- a/lasp/device/lasp_uldaq.pyx +++ b/lasp/device/lasp_uldaq.pyx @@ -26,13 +26,13 @@ cdef extern from "lasp_cppuldaq.h": vector[bool] outChannels, double samplerate, bint monitorOutput, - us deviceno) + us deviceno) except + void start(SafeQueue[double*] *inQueue, - SafeQueue[double*] *outQueue) - void stop() - void setACCouplingMode(vector[bool] accoupling) - void setInputRange(vector[bool] accoupling) - void setIEPEEnabled(vector[bool] iepe) + SafeQueue[double*] *outQueue) except + + void stop() except + + void setACCouplingMode(vector[bool] accoupling) except + + void setInputRange(vector[bool] accoupling) except + + void setIEPEEnabled(vector[bool] iepe) except + us neninchannels() us nenoutchannels() @@ -51,6 +51,8 @@ ctypedef struct PyStreamData: unsigned ninchannels unsigned noutchannels + double samplerate + # If these queue pointers are NULL, it means the stream does not have an # input, or output. SafeQueue[double*] *inQueue @@ -73,6 +75,9 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: unsigned sw = sizeof(double) sd = voidsd + cdef: + double sleeptime = ( nFramesPerBlock)/(4*sd.samplerate); + us sleeptime_us = (sleeptime*1e6); ninchannels = sd.ninchannels noutchannels = sd.noutchannels @@ -82,34 +87,43 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: with gil: npy_format = cnp.NPY_FLOAT64 callback = sd.pyCallback - print(f'Number of input channels: {ninchannels}') - print(f'Number of out channels: {noutchannels}') + # print(f'Number of input channels: {ninchannels}') + # print(f'Number of out channels: {noutchannels}') + fprintf(stderr, 'Sleep time: %d us', sleeptime_us) while not sd.stopThread.load(): + with gil: + if sd.outQueue and sd.outQueue.size() < 10: + outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) - if sd.outQueue: - outbuffer = malloc(sizeof(double)*nBytesPerChan*noutchannels) + npy_output = data_to_ndarray( + outbuffer, + nFramesPerBlock, + noutchannels, + npy_format, + False, # Do not transfer ownership + True) # F-contiguous + try: - if sd.inQueue: - if not sd.outQueue: + rval = callback(None, + npy_output, + nFramesPerBlock, + ) + + except Exception as e: + print('exception in Cython callback for audio output: ', str(e)) + return + + sd.outQueue.enqueue( outbuffer) + + + if sd.inQueue and not sd.inQueue.empty(): # Waiting indefinitely on the queue... inbuffer = sd.inQueue.dequeue() if inbuffer == NULL: printf('Stopping thread...\n') return - else: - if not sd.inQueue.empty(): - inbuffer = sd.inQueue.dequeue() - if inbuffer == NULL: - printf('Stopping thread...\n') - return - else: - inbuffer = NULL - with gil: - - npy_output = None - if sd.inQueue and inbuffer: try: npy_input = data_to_ndarray( inbuffer, @@ -120,7 +134,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: True) # F-contiguous is True: data is Fortran-cont. rval = callback(npy_input, - npy_output, + None, nFramesPerBlock, ) @@ -128,34 +142,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil: print('exception in cython callback for audio input: ', str(e)) return - npy_input = None - if sd.outQueue: - # fprintf(stderr, 'Copying output buffer to Numpy...\n') - rval = callback(npy_input, - npy_output, - nFramesPerBlock, - ) - try: - npy_output = data_to_ndarray( - outbuffer, - nFramesPerBlock, - noutchannels, - npy_format, - False, # Do not transfer ownership - True) # F-contiguous - - except Exception as e: - print('exception in Cython callback for audio output: ', str(e)) - return - - - if sd.outQueue: - sd.outQueue.enqueue( outbuffer) - if not sd.inQueue: - while sd.outQueue.size() > 10 and not sd.stopThread.load(): - # printf('Sleeping...\n') - # No input queue to wait on, so we relax a bit here. - CPPsleep_us(10); + CPPsleep_us(sleeptime_us); # Outputbuffer is free'ed by the audiothread, so should not be touched # here. @@ -264,6 +251,7 @@ cdef class UlDaq: self.sd.outQueue = NULL self.sd.thread = NULL + self.sd.samplerate = samplerate self.sd.ninchannels = 0 self.sd.noutchannels = 0 @@ -309,7 +297,6 @@ cdef class UlDaq: # Increase reference count to the callback Py_INCREF( avstream._audioCallback) - with nogil: self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self.sd) diff --git a/lasp/device/test_uldaq.cpp b/lasp/device/test_uldaq.cpp index 51dd1c9..f180325 100644 --- a/lasp/device/test_uldaq.cpp +++ b/lasp/device/test_uldaq.cpp @@ -1,33 +1,51 @@ #include "lasp_cppuldaq.h" #include #include +#include using std::cout; using std::endl; int main() { - boolvec inChannels = {true, false, false, false}; - /* boolvec inChannels = {false, false, false, false}; */ + /* boolvec inChannels = {true, false, false, false}; */ + boolvec inChannels = {true, true, false, false}; boolvec outChannels = {true}; double samplerate = 10000; - const us samplesPerBlock = 2048; + const us samplesPerBlock = 256; DT9837A daq( samplesPerBlock, inChannels, outChannels, samplerate, - false // monitor Output + true // monitor Output ); SafeQueue inqueue; SafeQueue outqueue; + double totalTime = 5; + double t = 0; + double freq = 1000; + us nblocks = ((us) totalTime*samplerate/samplesPerBlock) + 10; + + for(us i=0;i(malloc(sizeof(double)*samplesPerBlock)); + for(us sample=0;sample> a; */ @@ -43,6 +61,10 @@ int main() { } free(buf); } + while(!outqueue.empty()){ + double* dat = outqueue.dequeue(); + free(dat); + } return 0; }