From 2192f5a7fcf948b3187392094b0e64f33c115f84 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Sat, 29 Aug 2020 20:11:09 +0200 Subject: [PATCH] Parallelized the SOS filter implementation. Updated worker implementation to deal with NULL pointers for initialization and destructor function pointers --- lasp/c/lasp_slm.c | 2 +- lasp/c/lasp_sosfilterbank.c | 150 +++++++++++++++++++++++++++++------- lasp/c/lasp_sosfilterbank.h | 17 ++-- lasp/c/lasp_worker.c | 60 ++++++++------- lasp/wrappers.pyx | 12 +-- 5 files changed, 173 insertions(+), 68 deletions(-) diff --git a/lasp/c/lasp_slm.c b/lasp/c/lasp_slm.c index 6dacae0..ed69052 100644 --- a/lasp/c/lasp_slm.c +++ b/lasp/c/lasp_slm.c @@ -84,7 +84,7 @@ Slm *Slm_create(Sosfilterbank *prefilter, Sosfilterbank *bandpass, const d fs, slm->splowpass = a_malloc(filterbank_size * sizeof(Sosfilterbank *)); for (us ch = 0; ch < filterbank_size; ch++) { /// Allocate a filterbank with one channel and one section. - slm->splowpass[ch] = Sosfilterbank_create(1, 1); + slm->splowpass[ch] = Sosfilterbank_create(0, 1, 1); Sosfilterbank_setFilter(slm->splowpass[ch], 0, lowpass_sos); } vd_free(&lowpass_sos); diff --git a/lasp/c/lasp_sosfilterbank.c b/lasp/c/lasp_sosfilterbank.c index c7e5acf..c2fdab9 100644 --- a/lasp/c/lasp_sosfilterbank.c +++ b/lasp/c/lasp_sosfilterbank.c @@ -1,5 +1,8 @@ #define TRACERPLUS (-5) #include "lasp_sosfilterbank.h" +#include "lasp_mq.h" +#include "lasp_worker.h" +#include typedef struct Sosfilterbank { @@ -19,6 +22,11 @@ typedef struct Sosfilterbank { /// Storage for the current state of the output, first axis correspond to /// the filter number axis, the second axis contains state coefficients dmat state; + + JobQueue* jq; + Workers* workers; + + } Sosfilterbank; us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb) { @@ -29,15 +37,30 @@ us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb) { } -Sosfilterbank* Sosfilterbank_create(const us filterbank_size, +int filter_single(void* worker_data,void* job); + +static inline min(us a, us b){ + return ab?a:b; +} + +Sosfilterbank* Sosfilterbank_create( + const us nthreads_, + const us filterbank_size, const us nsections) { fsTRACE(15); + dbgassert(nthreads_ <= LASP_MAX_NUM_THREADS, "Illegal number of threads"); + dbgassert(filterbank_size <= MAX_SOS_FILTER_BANK_SIZE, "Illegal filterbank size. Max size is " annestr(MAX_SOS_FILTER_BANK_SIZE)); Sosfilterbank* fb = (Sosfilterbank*) a_malloc(sizeof(Sosfilterbank)); + fb->jq = NULL; + fb->workers = NULL; fb->filterbank_size = filterbank_size; dbgassert(nsections < MAX_SOS_FILTER_BANK_NSECTIONS,"Illegal number of sections"); fb->nsections = nsections; @@ -64,6 +87,36 @@ Sosfilterbank* Sosfilterbank_create(const us filterbank_size, // Check if coefficients are properly initialized // print_dmat(&(fb->sos)); vd_free(&imp_response); + + us nthreads; + us nprocs = (us) get_nprocs(); + + if(nthreads_ == 0) { + nthreads = min(max(nprocs/2,1), filterbank_size); + } else { + nthreads = nthreads_; + } + iVARTRACE(15, nthreads); + + if(nthreads > 1) { + if(!(fb->jq = JobQueue_alloc(filterbank_size))) { + Sosfilterbank_free(fb); + feTRACE(15); + return NULL; + } + + if(!(fb->workers = Workers_create(nthreads, + fb->jq, + NULL, + &filter_single, + NULL, + NULL))) { + Sosfilterbank_free(fb); + feTRACE(15); + return NULL; + } + } + feTRACE(15); return fb; } @@ -80,7 +133,7 @@ void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no, "Illegal filter coefficient length"); dmat *sos = &fb->sos; - dmat *state = &fb->state; + /* dmat *state = &fb->state; */ us nsections = fb->nsections; for(us index=0;indexsos)); dmat_free(&(fb->state)); + if(fb->workers) Workers_free(fb->workers); + if(fb->jq) JobQueue_free(fb->jq); + a_free(fb); feTRACE(15); } +typedef struct { + us filter_no; + us nsections; + us nsamples; + dmat sos; + dmat state; + dmat ys; +} Job; + +int filter_single(void* worker_data, void* job_) { + fsTRACE(15); + Job* job = (Job*) job_; + + us nsections = job->nsections; + us nsamples = job->nsamples; + dmat sos = job->sos; + + + for(us section=0;sectionstate),job->filter_no,section*2); + d w2 = *getdmatval(&(job->state),job->filter_no,section*2+1); + + d b0 = *getdmatval(&sos,job->filter_no,section*6+0); + d b1 = *getdmatval(&sos,job->filter_no,section*6+1); + d b2 = *getdmatval(&sos,job->filter_no,section*6+2); + /* d a0 = *getdmatval(&sos,job->filter_no,section*6+3); */ + d a1 = *getdmatval(&sos,job->filter_no,section*6+4); + d a2 = *getdmatval(&sos,job->filter_no,section*6+5); + + d* y = getdmatval(&(job->ys), 0, job->filter_no); + + for(us sample=0;samplestate),job->filter_no,section*2) = w1; + *getdmatval(&(job->state),job->filter_no,section*2+1) = w2; + + } + + feTRACE(15); + return 0; +} + + dmat Sosfilterbank_filter(Sosfilterbank* fb,const vd* xs) { fsTRACE(15); @@ -121,39 +225,27 @@ dmat Sosfilterbank_filter(Sosfilterbank* fb,const vd* xs) { d_copy(getdmatval(&ys,0,filter),getvdval(xs,0),nsamples,1,1); } + Job jobs[MAX_SOS_FILTER_BANK_SIZE]; /// Implementation is based on Proakis & Manolakis - Digital Signal /// Processing, Fourth Edition, p. 550 - for(us section=0;sectionworkers) { + assertvalidptr(fb->jq); + JobQueue_push(fb->jq, &(jobs[filter])); + } else { + /* No workers, we have to do it ourselves */ + filter_single(NULL,(void*) &(jobs[filter])); } } - - - + if(fb->workers) { + JobQueue_wait_alldone(fb->jq); + } feTRACE(15); return ys; } diff --git a/lasp/c/lasp_sosfilterbank.h b/lasp/c/lasp_sosfilterbank.h index 73c2159..1b851a0 100644 --- a/lasp/c/lasp_sosfilterbank.h +++ b/lasp/c/lasp_sosfilterbank.h @@ -19,9 +19,16 @@ typedef struct Sosfilterbank Sosfilterbank; /** * Initializes a Sosfilterbank. Sets all coefficients in such a way that the * filter effectively does nothing (unit impulse response). + * @param[in] nthreads: The number of threads used in computation should be 1 + * <= nthreads <= LASP_MAX_NUM_THREADS. If set to 0, a sensible value is + * computed based on the number of filterbanks. + * @param[in] filterbank_size: The number of parallel filters in the bank + * + * @return Sosfilterbank handle */ -Sosfilterbank* Sosfilterbank_create(const us filterbank_size, - const us nsections); +Sosfilterbank* Sosfilterbank_create(const us nthreads, + const us filterbank_size, + const us nsections); /** * Returns the number of channels in the filterbank (the filberbank size). @@ -41,9 +48,9 @@ us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb); * a1, a2), where b are the numerator coefficients and a are the denominator * coefficients. * -*/ + */ void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no, - const vd coefs); + const vd coefs); /** * Filters x using h, returns y @@ -56,7 +63,7 @@ void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no, * input samples in x. */ dmat Sosfilterbank_filter(Sosfilterbank* fb, - const vd* x); + const vd* x); /** * Cleans up an existing filter bank. diff --git a/lasp/c/lasp_worker.c b/lasp/c/lasp_worker.c index e139432..6af9824 100644 --- a/lasp/c/lasp_worker.c +++ b/lasp/c/lasp_worker.c @@ -29,22 +29,22 @@ typedef struct Workers_s { static void* threadfcn(void* data); Workers* Workers_create(const us num_workers, - JobQueue* jq, - worker_alloc_function init_fn, - worker_function fn, - worker_free_function free_fn, - void* thread_global_data) { + JobQueue* jq, + worker_alloc_function init_fn, + worker_function fn, + worker_free_function free_fn, + void* thread_global_data) { TRACE(15,"Workers_create"); - + if(num_workers > LASP_MAX_NUM_THREADS) { WARN("Number of workers too high in Workers_create"); return NULL; } - dbgassert(init_fn,NULLPTRDEREF "init_fn"); + /* dbgassert(init_fn,NULLPTRDEREF "init_fn"); */ dbgassert(fn,NULLPTRDEREF "fn"); - dbgassert(free_fn,NULLPTRDEREF "free_fn"); + /* dbgassert(free_fn,NULLPTRDEREF "free_fn"); */ Workers* w = a_malloc(sizeof(Workers)); if(!w){ @@ -72,9 +72,9 @@ Workers* Workers_create(const us num_workers, for(us i = 0; i < num_workers; i++) { TRACE(15,"Creating thread"); int rv = pthread_create(thread, - NULL, /* Thread attributes */ - threadfcn, /* Function */ - w); /* Data */ + NULL, /* Thread attributes */ + threadfcn, /* Function */ + w); /* Data */ if(rv!=0) { WARN("Thread creation failed"); return NULL; @@ -83,23 +83,23 @@ Workers* Workers_create(const us num_workers, } return w; - + } - + void Workers_free(Workers* w) { TRACE(15,"Workers_free"); dbgassert(w,NULLPTRDEREF "w in Workers_free"); dbgassert(w->jq,NULLPTRDEREF "w->jq in Workers_free"); for(us i=0;inum_workers;i++) { - /* Push the special NULL job. This will make the worker - * threads stop their execution. */ + /* Push the special NULL job. This will make the worker + * threads stop their execution. */ JobQueue_push(w->jq,NULL); } JobQueue_wait_alldone(w->jq); -/* Join the threads */ + /* Join the threads */ pthread_t* thread = w->worker_threads; for(us i=0;inum_workers;i++) { void* retval; @@ -126,8 +126,8 @@ void Workers_free(Workers* w) { static void* threadfcn(void* thread_global_data) { TRACE(15,"Started worker thread function"); - dbgassert(thread_global_data,NULLPTRDEREF "thread_data in" - " threadfcn"); + /* dbgassert(thread_global_data,NULLPTRDEREF "thread_data in" */ + /* " threadfcn"); */ Workers* w = (Workers*) thread_global_data; JobQueue* jq = w->jq; @@ -137,21 +137,23 @@ static void* threadfcn(void* thread_global_data) { void* global_data = w->global_data; dbgassert(jq,NULLPTRDEREF "jq in threadfcn"); - dbgassert(walloc,NULLPTRDEREF "walloc in threadfcn"); - dbgassert(wfree,NULLPTRDEREF "wfree in threadfcn"); + /* dbgassert(walloc,NULLPTRDEREF "walloc in threadfcn"); */ + /* dbgassert(wfree,NULLPTRDEREF "wfree in threadfcn"); */ int rv = pthread_mutex_lock(&w->global_data_mutex); if(rv !=0) { WARN("Global data mutex lock failed"); pthread_exit((void*) 1); } - - void* w_data = walloc(global_data); - if(!w_data) { - WARN(ALLOCFAILED); - pthread_exit((void*) 1); + void* w_data = NULL; + if(walloc) { + w_data = walloc(global_data); + if(!w_data) { + WARN(ALLOCFAILED); + pthread_exit((void*) 1); + } } - + rv = pthread_mutex_unlock(&w->global_data_mutex); if(rv !=0) { WARN("Global data mutex unlock failed"); @@ -161,7 +163,7 @@ static void* threadfcn(void* thread_global_data) { void* job = NULL; TRACE(20,"Worker ready"); while (true) { - + TRACE(10,"--------------- START CYCLE -------------"); job = JobQueue_assign(jq); @@ -183,7 +185,9 @@ static void* threadfcn(void* thread_global_data) { JobQueue_done(jq,job); /* Call the cleanup function */ - wfree(w_data); + if(wfree) { + wfree(w_data); + } TRACE(15,"Exiting thread. Goodbye"); pthread_exit((void*) NULL); diff --git a/lasp/wrappers.pyx b/lasp/wrappers.pyx index 2263a1c..b6ee1ea 100644 --- a/lasp/wrappers.pyx +++ b/lasp/wrappers.pyx @@ -78,6 +78,7 @@ __all__ = ['AvPowerSpectra', 'SosFilterBank', 'FilterBank', 'Siggen', setTracerLevel(15) + cdef extern from "cblas.h": int openblas_get_num_threads() void openblas_set_num_threads(int) @@ -421,7 +422,7 @@ cdef class FilterBank: cdef extern from "lasp_sosfilterbank.h": ctypedef struct c_Sosfilterbank "Sosfilterbank" - c_Sosfilterbank* Sosfilterbank_create(const us filterbank_size, const us nsections) nogil + c_Sosfilterbank* Sosfilterbank_create(const us nthreads, const us filterbank_size, const us nsections) nogil void Sosfilterbank_setFilter(c_Sosfilterbank* fb,const us filter_no, const vd filter_coefs) dmat Sosfilterbank_filter(c_Sosfilterbank* fb,const vd* x) nogil void Sosfilterbank_free(c_Sosfilterbank* fb) nogil @@ -432,7 +433,7 @@ cdef class SosFilterBank: c_Sosfilterbank* fb def __cinit__(self,const us filterbank_size, const us nsections): - self.fb = Sosfilterbank_create(filterbank_size,nsections) + self.fb = Sosfilterbank_create(0, filterbank_size,nsections) def setFilter(self,us filter_no, d[:, ::1] sos): """ @@ -520,7 +521,7 @@ cdef class Slm: if sos_prefilter is not None: assert sos_prefilter.size % 6 == 0 prefilter_nsections = sos_prefilter.size // 6 - prefilter = Sosfilterbank_create(1,prefilter_nsections) + prefilter = Sosfilterbank_create(0, 1,prefilter_nsections) coefs = sos_prefilter coefs_vd = dmat_foreign_data(prefilter_nsections*6,1, &coefs[0],False) @@ -533,7 +534,8 @@ cdef class Slm: assert sos_bandpass.shape[1] % 6 == 0 bandpass_nsections = sos_bandpass.shape[1] // 6 bandpass_nchannels = sos_bandpass.shape[0] - bandpass = Sosfilterbank_create(bandpass_nchannels, + bandpass = Sosfilterbank_create(0, + bandpass_nchannels, bandpass_nsections) if bandpass == NULL: if prefilter: @@ -696,7 +698,7 @@ cdef class Siggen: if colorfilter_coefs is not None: assert colorfilter_coefs.size % 6 == 0 colorfilter_nsections = colorfilter_coefs.size // 6 - colorfilter = Sosfilterbank_create(1,colorfilter_nsections) + colorfilter = Sosfilterbank_create(0, 1,colorfilter_nsections) coefs = colorfilter_coefs coefs_vd = dmat_foreign_data(colorfilter_nsections*6,1, &colorfilter_coefs[0],False)