Final bugfix, of memory leak in Python thread for uldaq, proper sleep time in Python thread. The whole train with UlDaq now seems to work properly

This commit is contained in:
Anne de Jong 2020-09-27 19:38:49 +02:00
parent 21e1b9d811
commit c8bfc461ce
3 changed files with 120 additions and 111 deletions

View File

@ -185,30 +185,6 @@ void DT9837A::setInputRange(boolvec& high_range) {
this->high_range = high_range; this->high_range = high_range;
} }
/* static inline void copysamples_transpose(const us samplesPerBlock,const us nchannels, */
/* double* from, double* to, */
/* const us from_startidx, */
/* const us to_startidx, */
/* const bool touldaqbuffer) { */
/* // Copy from "our type of buffer" to an uldaq buffer */
/* if(touldaqbuffer) { */
/* for(us sample=0;sample< samplesPerBlock;sample++) { */
/* for(us channel=0;channel<nchannels;channel++) { */
/* to[to_startidx+sample*nchannels + channel] = from[from_startidx + channel*samplesPerBlock+ sample]; */
/* } */
/* } */
/* } */
/* // Copy from an uldaq buffer to our type of buffer */
/* else { */
/* for(us sample=0;sample< samplesPerBlock;sample++) { */
/* for(us channel=0;channel<nchannels;channel++) { */
/* to[to_startidx + channel*samplesPerBlock+ sample] = from[from_startidx+sample*nchannels + channel]; */
/* } */
/* } */
/* } */
/* } */
void threadfcn(DT9837A* td) { void threadfcn(DT9837A* td) {
@ -235,6 +211,14 @@ void threadfcn(DT9837A* td) {
double samplerate = td->samplerate(); double samplerate = td->samplerate();
const double sleeptime = ((double) samplesPerBlock)/(4*samplerate);
const us sleeptime_us = (us) (sleeptime*1e6);
/* cerr << "Sleep time in loop: " << sleeptime_us << "us." << endl; */
if(sleeptime_us < 10) {
cerr << "ERROR: Too small buffer size (samplesPerBlock) chosen!" << endl;
return;
}
const us buffer_mid_idx_in = neninchannels*samplesPerBlock; const us buffer_mid_idx_in = neninchannels*samplesPerBlock;
const us buffer_mid_idx_out = nenoutchannels*samplesPerBlock; const us buffer_mid_idx_out = nenoutchannels*samplesPerBlock;
@ -248,37 +232,27 @@ void threadfcn(DT9837A* td) {
DaqInChanDescriptor *indesc = NULL; DaqInChanDescriptor *indesc = NULL;
bool topinenqueued = false; bool topinenqueued = true;
// Start with true here, to not copy here the first time // Start with true here, to not copy here the first time
bool botinenqueued = true; bool botinenqueued = true;
bool topoutenqueued = false; bool topoutenqueued = true;
bool botoutenqueued = false; bool botoutenqueued = true;
std::cerr << "SFSG" << endl;
// outitialize output, if any size_t inTotalCount = 0;
size_t outTotalCount = 0;
// initialize output, if any
if(hasoutput) { if(hasoutput) {
assert(nenoutchannels == 1); assert(nenoutchannels == 1);
assert(outqueue); assert(outqueue);
double *firstout; // Initialize the buffer with zeros, before pushing any data.
if(outqueue->empty()) {
cerr << "******* WARNING: OUTPUTQUEUE UNDERFLOW, FILLING SIGNAL QUEUE WITH ZEROS ***********" << endl;
firstout = static_cast<double*>(malloc(sizeof(double)*samplesPerBlock*nenoutchannels));
for(us sample=0;sample< samplesPerBlock;sample++) {
firstout[sample] = 0;
}
}
else {
firstout = outqueue->dequeue();
}
for(us sample=0;sample<2*samplesPerBlock;sample++) { for(us sample=0;sample<2*samplesPerBlock;sample++) {
outbuffer[sample] = firstout[sample]; outbuffer[sample] = 0;
} }
memcpy(td->outbuffer, firstout, samplesPerBlock*nenoutchannels);
/* free(firstout); */
topoutenqueued = true;
cerr << "Starting output DAC" << endl;
err = ulAOutScan(handle, err = ulAOutScan(handle,
0, 0,
0, 0,
@ -320,6 +294,7 @@ void threadfcn(DT9837A* td) {
} }
assert(j==neninchannels); assert(j==neninchannels);
cerr << "Starting input ADC" << endl;
err = ulDaqInScan(handle, err = ulDaqInScan(handle,
indesc, indesc,
neninchannels, neninchannels,
@ -335,8 +310,19 @@ void threadfcn(DT9837A* td) {
} }
std::cerr << "Entering while loop" << endl; // Runs scan status on output, to catch up with position
std::cerr << "hasinput: " << hasinput << endl; if(hasoutput) {
err = ulAOutScanStatus(handle, &outscanstat, &outxstat);
if(err != ERR_NO_ERROR) {
showErr(err);
goto exit;
}
outTotalCount = outxstat.currentScanCount;
assert(outscanstat == SS_RUNNING);
}
/* std::cerr << "Entering while loop" << endl; */
/* std::cerr << "hasinput: " << hasinput << endl; */
while(!td->stopThread && err == ERR_NO_ERROR) { while(!td->stopThread && err == ERR_NO_ERROR) {
/* std::cerr << "While..." << endl; */ /* std::cerr << "While..." << endl; */
if(hasoutput) { if(hasoutput) {
@ -346,6 +332,12 @@ void threadfcn(DT9837A* td) {
goto exit; goto exit;
} }
assert(outscanstat == SS_RUNNING); assert(outscanstat == SS_RUNNING);
if(outxstat.currentScanCount > outTotalCount+2*samplesPerBlock) {
cerr << "***** WARNING: Missing output sample blocks, DAQ Scan count=" << outxstat.currentScanCount << " while loop count = " << outTotalCount <<", probably due to too small buffer size. *****" << endl;
}
outTotalCount = outxstat.currentScanCount;
/* std::cerr << "Samples scanned: " << outxstat.currentTotalCount << endl; */ /* std::cerr << "Samples scanned: " << outxstat.currentTotalCount << endl; */
if(outxstat.currentIndex < buffer_mid_idx_out) { if(outxstat.currentIndex < buffer_mid_idx_out) {
topoutenqueued = false; topoutenqueued = false;
@ -373,7 +365,6 @@ void threadfcn(DT9837A* td) {
else { else {
botoutenqueued = false; botoutenqueued = false;
if(!topoutenqueued) { if(!topoutenqueued) {
topoutenqueued = true;
/* cerr << "Copying output buffer to top" << endl; */ /* cerr << "Copying output buffer to top" << endl; */
double* bufcpy; double* bufcpy;
if(!outqueue->empty()) { if(!outqueue->empty()) {
@ -403,6 +394,11 @@ void threadfcn(DT9837A* td) {
goto exit; goto exit;
} }
assert(inscanstat == SS_RUNNING); assert(inscanstat == SS_RUNNING);
if(inxstat.currentScanCount > inTotalCount+2*samplesPerBlock) {
cerr << "***** ERROR: Missing input sample blocks, count=" << inxstat.currentScanCount << ", probably due to too small buffer size. Exiting thread. *****" << endl;
break;
}
inTotalCount = inxstat.currentScanCount;
if(inxstat.currentIndex < buffer_mid_idx_in) { if(inxstat.currentIndex < buffer_mid_idx_in) {
topinenqueued = false; topinenqueued = false;
@ -456,10 +452,10 @@ void threadfcn(DT9837A* td) {
} }
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::microseconds(sleeptime_us));
} // End of while loop } // End of while loop
std::cerr << "Exit of while loop" << endl; /* std::cerr << "Exit of while loop" << endl; */
exit: exit:
@ -479,7 +475,7 @@ exit:
} }
if(indesc) delete indesc; if(indesc) delete indesc;
std::cerr << "Exit of thread fcn" << endl; std::cerr << "Exit of DAQ thread fcn" << endl;
} }
@ -501,15 +497,19 @@ void DT9837A::start( SafeQueue<double*> *inqueue, SafeQueue<double*> *outqueue)
if(hasinput) { if(hasinput) {
assert(!inbuffer); assert(!inbuffer);
inbuffer = new double[neninchannels()*samplesPerBlock*2]; inbuffer = new double[neninchannels()*samplesPerBlock*2]; // Watch the 2!
} }
if(hasoutput) { if(hasoutput) {
assert(!outbuffer); assert(!outbuffer);
outbuffer = new double[nenoutchannels()*samplesPerBlock*2]; outbuffer = new double[nenoutchannels()*samplesPerBlock*2]; // Watch the 2!
} }
this->inqueue = inqueue; this->inqueue = inqueue;
this->outqueue = outqueue; this->outqueue = outqueue;
/* std::cerr << "************************ WARNING: Forcing coupling mode to AC **************************" << endl; */
/* boolvec couplingmode = {true, true, true, true}; */
/* setACCouplingMode(couplingmode); */
stopThread = false; stopThread = false;
thread = new std::thread(threadfcn, this); thread = new std::thread(threadfcn, this);

View File

@ -26,13 +26,13 @@ cdef extern from "lasp_cppuldaq.h":
vector[bool] outChannels, vector[bool] outChannels,
double samplerate, double samplerate,
bint monitorOutput, bint monitorOutput,
us deviceno) us deviceno) except +
void start(SafeQueue[double*] *inQueue, void start(SafeQueue[double*] *inQueue,
SafeQueue[double*] *outQueue) SafeQueue[double*] *outQueue) except +
void stop() void stop() except +
void setACCouplingMode(vector[bool] accoupling) void setACCouplingMode(vector[bool] accoupling) except +
void setInputRange(vector[bool] accoupling) void setInputRange(vector[bool] accoupling) except +
void setIEPEEnabled(vector[bool] iepe) void setIEPEEnabled(vector[bool] iepe) except +
us neninchannels() us neninchannels()
us nenoutchannels() us nenoutchannels()
@ -51,6 +51,8 @@ ctypedef struct PyStreamData:
unsigned ninchannels unsigned ninchannels
unsigned noutchannels unsigned noutchannels
double samplerate
# If these queue pointers are NULL, it means the stream does not have an # If these queue pointers are NULL, it means the stream does not have an
# input, or output. # input, or output.
SafeQueue[double*] *inQueue SafeQueue[double*] *inQueue
@ -73,6 +75,9 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
unsigned sw = sizeof(double) unsigned sw = sizeof(double)
sd = <PyStreamData*> voidsd sd = <PyStreamData*> voidsd
cdef:
double sleeptime = (<double> nFramesPerBlock)/(4*sd.samplerate);
us sleeptime_us = <us> (sleeptime*1e6);
ninchannels = sd.ninchannels ninchannels = sd.ninchannels
noutchannels = sd.noutchannels noutchannels = sd.noutchannels
@ -82,34 +87,43 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
with gil: with gil:
npy_format = cnp.NPY_FLOAT64 npy_format = cnp.NPY_FLOAT64
callback = <object> sd.pyCallback callback = <object> sd.pyCallback
print(f'Number of input channels: {ninchannels}') # print(f'Number of input channels: {ninchannels}')
print(f'Number of out channels: {noutchannels}') # print(f'Number of out channels: {noutchannels}')
fprintf(stderr, 'Sleep time: %d us', sleeptime_us)
while not sd.stopThread.load(): while not sd.stopThread.load():
with gil:
if sd.outQueue and sd.outQueue.size() < 10:
outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels)
if sd.outQueue: npy_output = <object> data_to_ndarray(
outbuffer = <double*> malloc(sizeof(double)*nBytesPerChan*noutchannels) outbuffer,
nFramesPerBlock,
noutchannels,
npy_format,
False, # Do not transfer ownership
True) # F-contiguous
try:
if sd.inQueue: rval = callback(None,
if not sd.outQueue: npy_output,
nFramesPerBlock,
)
except Exception as e:
print('exception in Cython callback for audio output: ', str(e))
return
sd.outQueue.enqueue(<double*> outbuffer)
if sd.inQueue and not sd.inQueue.empty():
# Waiting indefinitely on the queue... # Waiting indefinitely on the queue...
inbuffer = sd.inQueue.dequeue() inbuffer = sd.inQueue.dequeue()
if inbuffer == NULL: if inbuffer == NULL:
printf('Stopping thread...\n') printf('Stopping thread...\n')
return return
else:
if not sd.inQueue.empty():
inbuffer = sd.inQueue.dequeue()
if inbuffer == NULL:
printf('Stopping thread...\n')
return
else:
inbuffer = NULL
with gil:
npy_output = None
if sd.inQueue and inbuffer:
try: try:
npy_input = <object> data_to_ndarray( npy_input = <object> data_to_ndarray(
inbuffer, inbuffer,
@ -120,7 +134,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
True) # F-contiguous is True: data is Fortran-cont. True) # F-contiguous is True: data is Fortran-cont.
rval = callback(npy_input, rval = callback(npy_input,
npy_output, None,
nFramesPerBlock, nFramesPerBlock,
) )
@ -128,34 +142,7 @@ cdef void audioCallbackPythonThreadFunction(void* voidsd) nogil:
print('exception in cython callback for audio input: ', str(e)) print('exception in cython callback for audio input: ', str(e))
return return
npy_input = None CPPsleep_us(sleeptime_us);
if sd.outQueue:
# fprintf(stderr, 'Copying output buffer to Numpy...\n')
rval = callback(npy_input,
npy_output,
nFramesPerBlock,
)
try:
npy_output = <object> data_to_ndarray(
outbuffer,
nFramesPerBlock,
noutchannels,
npy_format,
False, # Do not transfer ownership
True) # F-contiguous
except Exception as e:
print('exception in Cython callback for audio output: ', str(e))
return
if sd.outQueue:
sd.outQueue.enqueue(<double*> outbuffer)
if not sd.inQueue:
while sd.outQueue.size() > 10 and not sd.stopThread.load():
# printf('Sleeping...\n')
# No input queue to wait on, so we relax a bit here.
CPPsleep_us(10);
# Outputbuffer is free'ed by the audiothread, so should not be touched # Outputbuffer is free'ed by the audiothread, so should not be touched
# here. # here.
@ -264,6 +251,7 @@ cdef class UlDaq:
self.sd.outQueue = NULL self.sd.outQueue = NULL
self.sd.thread = NULL self.sd.thread = NULL
self.sd.samplerate = <double> samplerate
self.sd.ninchannels = 0 self.sd.ninchannels = 0
self.sd.noutchannels = 0 self.sd.noutchannels = 0
@ -309,7 +297,6 @@ cdef class UlDaq:
# Increase reference count to the callback # Increase reference count to the callback
Py_INCREF(<object> avstream._audioCallback) Py_INCREF(<object> avstream._audioCallback)
with nogil: with nogil:
self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction, self.sd.thread = new CPPThread[void*, void (*)(void*)](audioCallbackPythonThreadFunction,
<void*> self.sd) <void*> self.sd)

View File

@ -1,33 +1,51 @@
#include "lasp_cppuldaq.h" #include "lasp_cppuldaq.h"
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <cmath>
using std::cout; using std::cout;
using std::endl; using std::endl;
int main() { int main() {
boolvec inChannels = {true, false, false, false}; /* boolvec inChannels = {true, false, false, false}; */
/* boolvec inChannels = {false, false, false, false}; */ boolvec inChannels = {true, true, false, false};
boolvec outChannels = {true}; boolvec outChannels = {true};
double samplerate = 10000; double samplerate = 10000;
const us samplesPerBlock = 2048; const us samplesPerBlock = 256;
DT9837A daq( DT9837A daq(
samplesPerBlock, samplesPerBlock,
inChannels, inChannels,
outChannels, outChannels,
samplerate, samplerate,
false // monitor Output true // monitor Output
); );
SafeQueue<double*> inqueue; SafeQueue<double*> inqueue;
SafeQueue<double*> outqueue; SafeQueue<double*> outqueue;
double totalTime = 5;
double t = 0;
double freq = 1000;
us nblocks = ((us) totalTime*samplerate/samplesPerBlock) + 10;
for(us i=0;i<nblocks;i++) {
double* data = static_cast<double*>(malloc(sizeof(double)*samplesPerBlock));
for(us sample=0;sample<samplesPerBlock;sample++) {
data[sample] = sin(2*M_PI*freq*t);
t+= 1.0/samplerate;
}
outqueue.enqueue(data);
}
daq.start(&inqueue, &outqueue); daq.start(&inqueue, &outqueue);
/* daq.start(NULL, &outqueue); */ /* daq.start(NULL, &outqueue); */
std::this_thread::sleep_for(std::chrono::seconds(5)); std::this_thread::sleep_for(std::chrono::seconds((int) totalTime));
/* std::string a; */ /* std::string a; */
/* std::cin >> a; */ /* std::cin >> a; */
@ -43,6 +61,10 @@ int main() {
} }
free(buf); free(buf);
} }
while(!outqueue.empty()){
double* dat = outqueue.dequeue();
free(dat);
}
return 0; return 0;
} }