Parallelized the SOS filter implementation. Updated worker implementation to deal with NULL pointers for initialization and destructor function pointers

This commit is contained in:
Anne de Jong 2020-08-29 20:11:09 +02:00
parent 28358f5385
commit 2192f5a7fc
5 changed files with 173 additions and 68 deletions

View File

@ -84,7 +84,7 @@ Slm *Slm_create(Sosfilterbank *prefilter, Sosfilterbank *bandpass, const d fs,
slm->splowpass = a_malloc(filterbank_size * sizeof(Sosfilterbank *)); slm->splowpass = a_malloc(filterbank_size * sizeof(Sosfilterbank *));
for (us ch = 0; ch < filterbank_size; ch++) { for (us ch = 0; ch < filterbank_size; ch++) {
/// Allocate a filterbank with one channel and one section. /// Allocate a filterbank with one channel and one section.
slm->splowpass[ch] = Sosfilterbank_create(1, 1); slm->splowpass[ch] = Sosfilterbank_create(0, 1, 1);
Sosfilterbank_setFilter(slm->splowpass[ch], 0, lowpass_sos); Sosfilterbank_setFilter(slm->splowpass[ch], 0, lowpass_sos);
} }
vd_free(&lowpass_sos); vd_free(&lowpass_sos);

View File

@ -1,5 +1,8 @@
#define TRACERPLUS (-5) #define TRACERPLUS (-5)
#include "lasp_sosfilterbank.h" #include "lasp_sosfilterbank.h"
#include "lasp_mq.h"
#include "lasp_worker.h"
#include <sys/sysinfo.h>
typedef struct Sosfilterbank { typedef struct Sosfilterbank {
@ -19,6 +22,11 @@ typedef struct Sosfilterbank {
/// Storage for the current state of the output, first axis correspond to /// Storage for the current state of the output, first axis correspond to
/// the filter number axis, the second axis contains state coefficients /// the filter number axis, the second axis contains state coefficients
dmat state; dmat state;
JobQueue* jq;
Workers* workers;
} Sosfilterbank; } Sosfilterbank;
us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb) { us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb) {
@ -29,15 +37,30 @@ us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb) {
} }
Sosfilterbank* Sosfilterbank_create(const us filterbank_size, int filter_single(void* worker_data,void* job);
static inline min(us a, us b){
return a<b?a:b;
}
static inline max(us a, us b){
return a>b?a:b;
}
Sosfilterbank* Sosfilterbank_create(
const us nthreads_,
const us filterbank_size,
const us nsections) { const us nsections) {
fsTRACE(15); fsTRACE(15);
dbgassert(nthreads_ <= LASP_MAX_NUM_THREADS, "Illegal number of threads");
dbgassert(filterbank_size <= MAX_SOS_FILTER_BANK_SIZE, dbgassert(filterbank_size <= MAX_SOS_FILTER_BANK_SIZE,
"Illegal filterbank size. Max size is " "Illegal filterbank size. Max size is "
annestr(MAX_SOS_FILTER_BANK_SIZE)); annestr(MAX_SOS_FILTER_BANK_SIZE));
Sosfilterbank* fb = (Sosfilterbank*) a_malloc(sizeof(Sosfilterbank)); Sosfilterbank* fb = (Sosfilterbank*) a_malloc(sizeof(Sosfilterbank));
fb->jq = NULL;
fb->workers = NULL;
fb->filterbank_size = filterbank_size; fb->filterbank_size = filterbank_size;
dbgassert(nsections < MAX_SOS_FILTER_BANK_NSECTIONS,"Illegal number of sections"); dbgassert(nsections < MAX_SOS_FILTER_BANK_NSECTIONS,"Illegal number of sections");
fb->nsections = nsections; fb->nsections = nsections;
@ -64,6 +87,36 @@ Sosfilterbank* Sosfilterbank_create(const us filterbank_size,
// Check if coefficients are properly initialized // Check if coefficients are properly initialized
// print_dmat(&(fb->sos)); // print_dmat(&(fb->sos));
vd_free(&imp_response); vd_free(&imp_response);
us nthreads;
us nprocs = (us) get_nprocs();
if(nthreads_ == 0) {
nthreads = min(max(nprocs/2,1), filterbank_size);
} else {
nthreads = nthreads_;
}
iVARTRACE(15, nthreads);
if(nthreads > 1) {
if(!(fb->jq = JobQueue_alloc(filterbank_size))) {
Sosfilterbank_free(fb);
feTRACE(15);
return NULL;
}
if(!(fb->workers = Workers_create(nthreads,
fb->jq,
NULL,
&filter_single,
NULL,
NULL))) {
Sosfilterbank_free(fb);
feTRACE(15);
return NULL;
}
}
feTRACE(15); feTRACE(15);
return fb; return fb;
} }
@ -80,7 +133,7 @@ void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no,
"Illegal filter coefficient length"); "Illegal filter coefficient length");
dmat *sos = &fb->sos; dmat *sos = &fb->sos;
dmat *state = &fb->state; /* dmat *state = &fb->state; */
us nsections = fb->nsections; us nsections = fb->nsections;
for(us index=0;index<nsections*6;index++){ for(us index=0;index<nsections*6;index++){
@ -98,11 +151,62 @@ void Sosfilterbank_free(Sosfilterbank* fb) {
dmat_free(&(fb->sos)); dmat_free(&(fb->sos));
dmat_free(&(fb->state)); dmat_free(&(fb->state));
if(fb->workers) Workers_free(fb->workers);
if(fb->jq) JobQueue_free(fb->jq);
a_free(fb); a_free(fb);
feTRACE(15); feTRACE(15);
} }
typedef struct {
us filter_no;
us nsections;
us nsamples;
dmat sos;
dmat state;
dmat ys;
} Job;
int filter_single(void* worker_data, void* job_) {
fsTRACE(15);
Job* job = (Job*) job_;
us nsections = job->nsections;
us nsamples = job->nsamples;
dmat sos = job->sos;
for(us section=0;section<nsections;section++) {
d w1 = *getdmatval(&(job->state),job->filter_no,section*2);
d w2 = *getdmatval(&(job->state),job->filter_no,section*2+1);
d b0 = *getdmatval(&sos,job->filter_no,section*6+0);
d b1 = *getdmatval(&sos,job->filter_no,section*6+1);
d b2 = *getdmatval(&sos,job->filter_no,section*6+2);
/* d a0 = *getdmatval(&sos,job->filter_no,section*6+3); */
d a1 = *getdmatval(&sos,job->filter_no,section*6+4);
d a2 = *getdmatval(&sos,job->filter_no,section*6+5);
d* y = getdmatval(&(job->ys), 0, job->filter_no);
for(us sample=0;sample<nsamples;sample++){
d w0 = *y - a1*w1 - a2*w2;
d yn = b0*w0 + b1*w1 + b2*w2;
w2 = w1;
w1 = w0;
*y++ = yn;
}
*getdmatval(&(job->state),job->filter_no,section*2) = w1;
*getdmatval(&(job->state),job->filter_no,section*2+1) = w2;
}
feTRACE(15);
return 0;
}
dmat Sosfilterbank_filter(Sosfilterbank* fb,const vd* xs) { dmat Sosfilterbank_filter(Sosfilterbank* fb,const vd* xs) {
fsTRACE(15); fsTRACE(15);
@ -121,39 +225,27 @@ dmat Sosfilterbank_filter(Sosfilterbank* fb,const vd* xs) {
d_copy(getdmatval(&ys,0,filter),getvdval(xs,0),nsamples,1,1); d_copy(getdmatval(&ys,0,filter),getvdval(xs,0),nsamples,1,1);
} }
Job jobs[MAX_SOS_FILTER_BANK_SIZE];
/// Implementation is based on Proakis & Manolakis - Digital Signal /// Implementation is based on Proakis & Manolakis - Digital Signal
/// Processing, Fourth Edition, p. 550 /// Processing, Fourth Edition, p. 550
for(us section=0;section<nsections;section++) { Job job_template = {0, nsections, nsamples, sos, state, ys};
/// Obtain state information for current section, and all filters
for(us filter=0;filter<filterbank_size;filter++) { for(us filter=0;filter<filterbank_size;filter++) {
d w1 = *getdmatval(&state,filter,section*2); /// Obtain state information for current section, and all filters
d w2 = *getdmatval(&state,filter,section*2+1); jobs[filter] = job_template;
jobs[filter].filter_no = filter;
d b0 = *getdmatval(&sos,filter,section*6+0);
d b1 = *getdmatval(&sos,filter,section*6+1);
d b2 = *getdmatval(&sos,filter,section*6+2);
d a0 = *getdmatval(&sos,filter,section*6+3);
d a1 = *getdmatval(&sos,filter,section*6+4);
d a2 = *getdmatval(&sos,filter,section*6+5);
d* y = getdmatval(&ys, 0, filter);
for(us sample=0;sample<nsamples;sample++){
d w0 = *y - a1*w1 - a2*w2;
d yn = b0*w0 + b1*w1 + b2*w2;
w2 = w1;
w1 = w0;
*y++ = yn;
}
*getdmatval(&state,filter,section*2) = w1;
*getdmatval(&state,filter,section*2+1) = w2;
if(fb->workers) {
assertvalidptr(fb->jq);
JobQueue_push(fb->jq, &(jobs[filter]));
} else {
/* No workers, we have to do it ourselves */
filter_single(NULL,(void*) &(jobs[filter]));
} }
} }
if(fb->workers) {
JobQueue_wait_alldone(fb->jq);
}
feTRACE(15); feTRACE(15);
return ys; return ys;
} }

View File

@ -19,8 +19,15 @@ typedef struct Sosfilterbank Sosfilterbank;
/** /**
* Initializes a Sosfilterbank. Sets all coefficients in such a way that the * Initializes a Sosfilterbank. Sets all coefficients in such a way that the
* filter effectively does nothing (unit impulse response). * filter effectively does nothing (unit impulse response).
* @param[in] nthreads: The number of threads used in computation should be 1
* <= nthreads <= LASP_MAX_NUM_THREADS. If set to 0, a sensible value is
* computed based on the number of filterbanks.
* @param[in] filterbank_size: The number of parallel filters in the bank
*
* @return Sosfilterbank handle
*/ */
Sosfilterbank* Sosfilterbank_create(const us filterbank_size, Sosfilterbank* Sosfilterbank_create(const us nthreads,
const us filterbank_size,
const us nsections); const us nsections);
/** /**
@ -41,7 +48,7 @@ us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb);
* a1, a2), where b are the numerator coefficients and a are the denominator * a1, a2), where b are the numerator coefficients and a are the denominator
* coefficients. * coefficients.
* *
*/ */
void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no, void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no,
const vd coefs); const vd coefs);

View File

@ -42,9 +42,9 @@ Workers* Workers_create(const us num_workers,
return NULL; return NULL;
} }
dbgassert(init_fn,NULLPTRDEREF "init_fn"); /* dbgassert(init_fn,NULLPTRDEREF "init_fn"); */
dbgassert(fn,NULLPTRDEREF "fn"); dbgassert(fn,NULLPTRDEREF "fn");
dbgassert(free_fn,NULLPTRDEREF "free_fn"); /* dbgassert(free_fn,NULLPTRDEREF "free_fn"); */
Workers* w = a_malloc(sizeof(Workers)); Workers* w = a_malloc(sizeof(Workers));
if(!w){ if(!w){
@ -99,7 +99,7 @@ void Workers_free(Workers* w) {
JobQueue_wait_alldone(w->jq); JobQueue_wait_alldone(w->jq);
/* Join the threads */ /* Join the threads */
pthread_t* thread = w->worker_threads; pthread_t* thread = w->worker_threads;
for(us i=0;i<w->num_workers;i++) { for(us i=0;i<w->num_workers;i++) {
void* retval; void* retval;
@ -126,8 +126,8 @@ void Workers_free(Workers* w) {
static void* threadfcn(void* thread_global_data) { static void* threadfcn(void* thread_global_data) {
TRACE(15,"Started worker thread function"); TRACE(15,"Started worker thread function");
dbgassert(thread_global_data,NULLPTRDEREF "thread_data in" /* dbgassert(thread_global_data,NULLPTRDEREF "thread_data in" */
" threadfcn"); /* " threadfcn"); */
Workers* w = (Workers*) thread_global_data; Workers* w = (Workers*) thread_global_data;
JobQueue* jq = w->jq; JobQueue* jq = w->jq;
@ -137,20 +137,22 @@ static void* threadfcn(void* thread_global_data) {
void* global_data = w->global_data; void* global_data = w->global_data;
dbgassert(jq,NULLPTRDEREF "jq in threadfcn"); dbgassert(jq,NULLPTRDEREF "jq in threadfcn");
dbgassert(walloc,NULLPTRDEREF "walloc in threadfcn"); /* dbgassert(walloc,NULLPTRDEREF "walloc in threadfcn"); */
dbgassert(wfree,NULLPTRDEREF "wfree in threadfcn"); /* dbgassert(wfree,NULLPTRDEREF "wfree in threadfcn"); */
int rv = pthread_mutex_lock(&w->global_data_mutex); int rv = pthread_mutex_lock(&w->global_data_mutex);
if(rv !=0) { if(rv !=0) {
WARN("Global data mutex lock failed"); WARN("Global data mutex lock failed");
pthread_exit((void*) 1); pthread_exit((void*) 1);
} }
void* w_data = NULL;
void* w_data = walloc(global_data); if(walloc) {
w_data = walloc(global_data);
if(!w_data) { if(!w_data) {
WARN(ALLOCFAILED); WARN(ALLOCFAILED);
pthread_exit((void*) 1); pthread_exit((void*) 1);
} }
}
rv = pthread_mutex_unlock(&w->global_data_mutex); rv = pthread_mutex_unlock(&w->global_data_mutex);
if(rv !=0) { if(rv !=0) {
@ -183,7 +185,9 @@ static void* threadfcn(void* thread_global_data) {
JobQueue_done(jq,job); JobQueue_done(jq,job);
/* Call the cleanup function */ /* Call the cleanup function */
if(wfree) {
wfree(w_data); wfree(w_data);
}
TRACE(15,"Exiting thread. Goodbye"); TRACE(15,"Exiting thread. Goodbye");
pthread_exit((void*) NULL); pthread_exit((void*) NULL);

View File

@ -78,6 +78,7 @@ __all__ = ['AvPowerSpectra', 'SosFilterBank', 'FilterBank', 'Siggen',
setTracerLevel(15) setTracerLevel(15)
cdef extern from "cblas.h": cdef extern from "cblas.h":
int openblas_get_num_threads() int openblas_get_num_threads()
void openblas_set_num_threads(int) void openblas_set_num_threads(int)
@ -421,7 +422,7 @@ cdef class FilterBank:
cdef extern from "lasp_sosfilterbank.h": cdef extern from "lasp_sosfilterbank.h":
ctypedef struct c_Sosfilterbank "Sosfilterbank" ctypedef struct c_Sosfilterbank "Sosfilterbank"
c_Sosfilterbank* Sosfilterbank_create(const us filterbank_size, const us nsections) nogil c_Sosfilterbank* Sosfilterbank_create(const us nthreads, const us filterbank_size, const us nsections) nogil
void Sosfilterbank_setFilter(c_Sosfilterbank* fb,const us filter_no, const vd filter_coefs) void Sosfilterbank_setFilter(c_Sosfilterbank* fb,const us filter_no, const vd filter_coefs)
dmat Sosfilterbank_filter(c_Sosfilterbank* fb,const vd* x) nogil dmat Sosfilterbank_filter(c_Sosfilterbank* fb,const vd* x) nogil
void Sosfilterbank_free(c_Sosfilterbank* fb) nogil void Sosfilterbank_free(c_Sosfilterbank* fb) nogil
@ -432,7 +433,7 @@ cdef class SosFilterBank:
c_Sosfilterbank* fb c_Sosfilterbank* fb
def __cinit__(self,const us filterbank_size, const us nsections): def __cinit__(self,const us filterbank_size, const us nsections):
self.fb = Sosfilterbank_create(filterbank_size,nsections) self.fb = Sosfilterbank_create(0, filterbank_size,nsections)
def setFilter(self,us filter_no, d[:, ::1] sos): def setFilter(self,us filter_no, d[:, ::1] sos):
""" """
@ -520,7 +521,7 @@ cdef class Slm:
if sos_prefilter is not None: if sos_prefilter is not None:
assert sos_prefilter.size % 6 == 0 assert sos_prefilter.size % 6 == 0
prefilter_nsections = sos_prefilter.size // 6 prefilter_nsections = sos_prefilter.size // 6
prefilter = Sosfilterbank_create(1,prefilter_nsections) prefilter = Sosfilterbank_create(0, 1,prefilter_nsections)
coefs = sos_prefilter coefs = sos_prefilter
coefs_vd = dmat_foreign_data(prefilter_nsections*6,1, coefs_vd = dmat_foreign_data(prefilter_nsections*6,1,
&coefs[0],False) &coefs[0],False)
@ -533,7 +534,8 @@ cdef class Slm:
assert sos_bandpass.shape[1] % 6 == 0 assert sos_bandpass.shape[1] % 6 == 0
bandpass_nsections = sos_bandpass.shape[1] // 6 bandpass_nsections = sos_bandpass.shape[1] // 6
bandpass_nchannels = sos_bandpass.shape[0] bandpass_nchannels = sos_bandpass.shape[0]
bandpass = Sosfilterbank_create(bandpass_nchannels, bandpass = Sosfilterbank_create(0,
bandpass_nchannels,
bandpass_nsections) bandpass_nsections)
if bandpass == NULL: if bandpass == NULL:
if prefilter: if prefilter:
@ -696,7 +698,7 @@ cdef class Siggen:
if colorfilter_coefs is not None: if colorfilter_coefs is not None:
assert colorfilter_coefs.size % 6 == 0 assert colorfilter_coefs.size % 6 == 0
colorfilter_nsections = colorfilter_coefs.size // 6 colorfilter_nsections = colorfilter_coefs.size // 6
colorfilter = Sosfilterbank_create(1,colorfilter_nsections) colorfilter = Sosfilterbank_create(0, 1,colorfilter_nsections)
coefs = colorfilter_coefs coefs = colorfilter_coefs
coefs_vd = dmat_foreign_data(colorfilter_nsections*6,1, coefs_vd = dmat_foreign_data(colorfilter_nsections*6,1,
&colorfilter_coefs[0],False) &colorfilter_coefs[0],False)