diff --git a/lasp/c/lasp_eq.h b/lasp/c/lasp_eq.h index 48eb231..1afefff 100644 --- a/lasp/c/lasp_eq.h +++ b/lasp/c/lasp_eq.h @@ -3,7 +3,8 @@ // Author: J.A. de Jong - ASCEE // // Description: Implementation of an equalizer using the Second Order Sections -// filter bank implementation. +// filter bank implementation. Applies all filterbanks in parallel and +// recombines to create a single output signal. #pragma once #ifndef LASP_EQ_H #define LASP_EQ_H @@ -51,8 +52,6 @@ void Eq_setLevels(Eq* eq, const vd* levels); * @param[in] eq Equalizer handle */ void Eq_free(Eq* eq); - - #endif // LASP_EQ_H diff --git a/lasp/c/lasp_mq.c b/lasp/c/lasp_mq.c index ce2e033..8e96130 100644 --- a/lasp/c/lasp_mq.c +++ b/lasp/c/lasp_mq.c @@ -24,313 +24,344 @@ /* #endif */ typedef struct { - void* job_ptr; - bool running; - bool ready; + void* job_ptr; + bool running; + bool ready; } Job; typedef struct JobQueue_s { - pthread_mutex_t mutex; - pthread_cond_t cv_plus; /**< Condition variable for the - * "workers". */ - pthread_cond_t cv_minus; /**< Condition variable for the - * main thread. */ - - Job* jobs; /**< Pointer to job vector */ - us max_jobs; /**< Stores the maximum number of - * items */ +#ifdef LASP_PARALLEL + pthread_mutex_t mutex; + pthread_cond_t cv_plus; /**< Condition variable for the + * "workers". */ + pthread_cond_t cv_minus; /**< Condition variable for the + * main thread. */ +#endif + Job* jobs; /**< Pointer to job vector */ + us max_jobs; /**< Stores the maximum number of + * items */ } JobQueue; static us count_jobs(JobQueue* jq) { - fsTRACE(15); - us njobs = 0; - for(us i=0;imax_jobs;i++){ - if(jq->jobs[i].ready) - njobs++; - } - return njobs; + fsTRACE(15); + us njobs = 0; + for(us i=0;imax_jobs;i++){ + if(jq->jobs[i].ready) + njobs++; + } + return njobs; } static Job* get_ready_job(JobQueue* jq) { - fsTRACE(15); - Job* j = jq->jobs; - for(us i=0;imax_jobs;i++){ - if(j->ready && !j->running) - return j; - j++; - } - return NULL; + fsTRACE(15); + Job* j = jq->jobs; + for(us i=0;imax_jobs;i++){ + if(j->ready && !j->running) + return j; + j++; + } + return NULL; } void print_job_queue(JobQueue* jq) { - fsTRACE(15); - for(us i=0;imax_jobs;i++) { - printf("Job %zu", i); - if(jq->jobs[i].ready) - printf(" available"); - if(jq->jobs[i].running) - printf(" running"); + fsTRACE(15); + for(us i=0;imax_jobs;i++) { + printf("Job %zu", i); + if(jq->jobs[i].ready) + printf(" available"); + if(jq->jobs[i].running) + printf(" running"); - printf(" - ptr %zu\n", (us) jq->jobs[i].job_ptr); + printf(" - ptr %zu\n", (us) jq->jobs[i].job_ptr); - } - feTRACE(15); + } + feTRACE(15); } - +#ifdef LASP_PARALLEL #define LOCK_MUTEX \ - /* Lock the mutex to let the threads wait initially */ \ - int rv = pthread_mutex_lock(&jq->mutex); \ - if(rv !=0) { \ - WARN("Mutex lock failed"); \ - } - + /* Lock the mutex to let the threads wait initially */ \ + int rv = pthread_mutex_lock(&jq->mutex); \ + if(rv !=0) { \ + WARN("Mutex lock failed"); \ + } #define UNLOCK_MUTEX \ - rv = pthread_mutex_unlock(&jq->mutex); \ - if(rv !=0) { \ - WARN("Mutex unlock failed"); \ - } + rv = pthread_mutex_unlock(&jq->mutex); \ + if(rv !=0) { \ + WARN("Mutex unlock failed"); \ + } + +#else +#define LOCK_MUTEX +#define UNLOCK_MUTEX +#endif // LASP_PARALLEL JobQueue* JobQueue_alloc(const us max_jobs) { - TRACE(15,"JobQueue_alloc"); - if(max_jobs > LASP_MAX_NUM_CHANNELS) { - WARN("Max jobs restricted to LASP_MAX_NUM_CHANNELS"); - return NULL; - } - JobQueue* jq = a_malloc(sizeof(JobQueue)); - + TRACE(15,"JobQueue_alloc"); + if(max_jobs > LASP_MAX_NUM_CHANNELS) { + WARN("Max jobs restricted to LASP_MAX_NUM_CHANNELS"); + return NULL; + } + JobQueue* jq = a_malloc(sizeof(JobQueue)); - if(!jq) { - WARN("Allocation of JobQueue failed"); - return NULL; - } - jq->max_jobs = max_jobs; - jq->jobs = a_malloc(max_jobs*sizeof(Job)); - if(!jq->jobs) { - WARN("Allocation of JobQueue jobs failed"); - return NULL; - } + if(!jq) { + WARN("Allocation of JobQueue failed"); + return NULL; + } + jq->max_jobs = max_jobs; - Job* j = jq->jobs; - for(us jindex=0;jindexjob_ptr = NULL; - j->ready = false; - j->running = false; - j++; - } + jq->jobs = a_malloc(max_jobs*sizeof(Job)); + if(!jq->jobs) { + WARN("Allocation of JobQueue jobs failed"); + return NULL; + } - /* Initialize thread mutex */ - int rv = pthread_mutex_init(&jq->mutex,NULL); - if(rv !=0) { - WARN("Mutex initialization failed"); - return NULL; - } - rv = pthread_cond_init(&jq->cv_plus,NULL); - if(rv !=0) { - WARN("Condition variable initialization failed"); - return NULL; - } + Job* j = jq->jobs; + for(us jindex=0;jindexjob_ptr = NULL; + j->ready = false; + j->running = false; + j++; + } - rv = pthread_cond_init(&jq->cv_minus,NULL); - if(rv !=0) { - WARN("Condition variable initialization failed"); - return NULL; - } +#ifdef LASP_PARALLEL + /* Initialize thread mutex */ + int rv = pthread_mutex_init(&jq->mutex,NULL); + if(rv !=0) { + WARN("Mutex initialization failed"); + return NULL; + } + rv = pthread_cond_init(&jq->cv_plus,NULL); + if(rv !=0) { + WARN("Condition variable initialization failed"); + return NULL; + } - /* print_job_queue(jq); */ - return jq; + rv = pthread_cond_init(&jq->cv_minus,NULL); + if(rv !=0) { + WARN("Condition variable initialization failed"); + return NULL; + } + +#endif // LASP_PARALLEL + /* print_job_queue(jq); */ + return jq; } void JobQueue_free(JobQueue* jq) { - TRACE(15,"JobQueue_free"); - dbgassert(jq,NULLPTRDEREF "jq in JobQueue_free"); - - int rv; - - if(count_jobs(jq) != 0) { - WARN("Job queue not empty!"); - } + TRACE(15,"JobQueue_free"); + dbgassert(jq,NULLPTRDEREF "jq in JobQueue_free"); - a_free(jq->jobs); + int rv; - /* Destroy the mutexes and condition variables */ - rv = pthread_mutex_destroy(&jq->mutex); - if(rv != 0){ - WARN("Mutex destroy failed. Do not know what to do."); - } + if(count_jobs(jq) != 0) { + WARN("Job queue not empty!"); + } - rv = pthread_cond_destroy(&jq->cv_plus); - if(rv != 0){ - WARN("Condition variable destruction failed. " - "Do not know what to do."); - } + a_free(jq->jobs); - rv = pthread_cond_destroy(&jq->cv_minus); - if(rv != 0){ - WARN("Condition variable destruction failed. " - "Do not know what to do."); - } +#ifdef LASP_PARALLEL + /* Destroy the mutexes and condition variables */ + rv = pthread_mutex_destroy(&jq->mutex); + if(rv != 0){ + WARN("Mutex destroy failed. Do not know what to do."); + } + + rv = pthread_cond_destroy(&jq->cv_plus); + if(rv != 0){ + WARN("Condition variable destruction failed. " + "Do not know what to do."); + } + + rv = pthread_cond_destroy(&jq->cv_minus); + if(rv != 0){ + WARN("Condition variable destruction failed. " + "Do not know what to do."); + } +#endif // LASP_PARALLEL } int JobQueue_push(JobQueue* jq,void* job_ptr) { - - TRACE(15,"JobQueue_push"); - dbgassert(jq,NULLPTRDEREF "jq in JobQueue_push"); - /* print_job_queue(jq); */ - /* uVARTRACE(15,(us) job_ptr); */ - - LOCK_MUTEX; - - us max_jobs = jq->max_jobs; + TRACE(15,"JobQueue_push"); + dbgassert(jq,NULLPTRDEREF "jq in JobQueue_push"); - /* Check if queue is full */ - while(count_jobs(jq) == max_jobs) { + /* print_job_queue(jq); */ + /* uVARTRACE(15,(us) job_ptr); */ - WARN("Queue full. Wait until some jobs are done."); - rv = pthread_cond_wait(&jq->cv_minus,&jq->mutex); - if(rv !=0) { - WARN("Condition variable wait failed"); - } + LOCK_MUTEX; + + us max_jobs = jq->max_jobs; + +#ifdef LASP_PARALLEL + /* Check if queue is full */ + while(count_jobs(jq) == max_jobs) { + + WARN("Queue full. Wait until some jobs are done."); + rv = pthread_cond_wait(&jq->cv_minus,&jq->mutex); + if(rv !=0) { + WARN("Condition variable wait failed"); + } + } +#else + /* If job queue is full, not in parallel, we just fail to add something + * without waiting*/ + if(count_jobs(jq) == max_jobs) { + return LASP_FAILURE; + } +#endif // LASP_PARALLEL + + dbgassert(count_jobs(jq) != max_jobs, + "Queue cannot be full!"); + + /* Queue is not full try to find a place, fill it */ + Job* j = jq->jobs; + us i; + for(i=0;iready == false ) { + dbgassert(j->job_ptr==NULL,"Job ptr should be 0"); + dbgassert(j->ready==false,"Job cannot be assigned"); + break; + } + j++; + } + dbgassert(i!=jq->max_jobs,"Should have found a job!"); + + j->job_ptr = job_ptr; + j->ready = true; + +#ifdef LASP_PARALLEL + /* Notify worker threads that a new job has arrived */ + if(count_jobs(jq) == max_jobs) { + /* Notify ALL threads. Action required! */ + rv = pthread_cond_broadcast(&jq->cv_plus); + if(rv !=0) { + WARN("Condition variable broadcast failed"); } - dbgassert(count_jobs(jq) != max_jobs, - "Queue cannot be full!"); - - /* Queue is not full try to find a place, fill it */ - Job* j = jq->jobs; - us i; - for(i=0;iready == false ) { - dbgassert(j->job_ptr==NULL,"Job ptr should be 0"); - dbgassert(j->ready==false,"Job cannot be assigned"); - break; - } - j++; + } else { + /* Notify some thread that there has been some change to + * the Queue */ + rv = pthread_cond_signal(&jq->cv_plus); + if(rv !=0) { + WARN("Condition variable signal failed"); } - dbgassert(i!=jq->max_jobs,"Should have found a job!"); + } +#endif // LASP_PARALLEL - j->job_ptr = job_ptr; - j->ready = true; + /* print_job_queue(jq); */ - /* Notify worker threads that a new job has arrived */ - if(count_jobs(jq) == max_jobs) { - /* Notify ALL threads. Action required! */ - rv = pthread_cond_broadcast(&jq->cv_plus); - if(rv !=0) { - WARN("Condition variable broadcast failed"); - } + UNLOCK_MUTEX; - } else { - /* Notify some thread that there has been some change to - * the Queue */ - rv = pthread_cond_signal(&jq->cv_plus); - if(rv !=0) { - WARN("Condition variable signal failed"); - } - - } - - /* print_job_queue(jq); */ - - UNLOCK_MUTEX; - - return LASP_SUCCESS; + return LASP_SUCCESS; } void* JobQueue_assign(JobQueue* jq) { - TRACE(15,"JobQueue_assign"); + TRACE(15,"JobQueue_assign"); - LOCK_MUTEX; + LOCK_MUTEX; - /* Wait until a job is available */ - Job* j; - while ((j=get_ready_job(jq))==NULL) { + Job* j; +#ifdef LASP_PARALLEL + /* Wait until a job is available */ + while ((j=get_ready_job(jq))==NULL) { - TRACE(15,"JobQueue_assign: no ready job"); - pthread_cond_wait(&jq->cv_plus,&jq->mutex); + TRACE(15,"JobQueue_assign: no ready job"); + pthread_cond_wait(&jq->cv_plus,&jq->mutex); + } +#else + if(count_jobs(jq) == 0) { return NULL; } + else { j = get_ready_job(jq); } +#endif // LASP_PARALLEL + + TRACE(16,"JobQueue_assign: found ready job. Assigned to:"); +#ifdef LASP_DEBUG +#ifdef LASP_PARALLEL + pthread_t thisthread = pthread_self(); + iVARTRACE(16,thisthread); +#endif +#endif + + /* print_job_queue(jq); */ + /* Find a job from the queue, assign it and return it */ + j->running = true; + +#ifdef LASP_PARALLEL + if(count_jobs(jq) > 1) { + /* Signal different thread that there is more work to do */ + rv = pthread_cond_signal(&jq->cv_plus); + if(rv !=0) { + WARN("Condition variable broadcast failed"); } + } +#endif - TRACE(16,"JobQueue_assign: found ready job. Assigned to:"); - #ifdef LASP_DEBUG - pthread_t thisthread = pthread_self(); - iVARTRACE(16,thisthread); - #endif + UNLOCK_MUTEX; - /* print_job_queue(jq); */ - /* Find a job from the queue, assign it and return it */ - j->running = true; - - if(count_jobs(jq) > 1) { - /* Signal different thread that there is more work to do */ - rv = pthread_cond_signal(&jq->cv_plus); - if(rv !=0) { - WARN("Condition variable broadcast failed"); - } - } + TRACE(15,"End JobQueue_assign"); - UNLOCK_MUTEX; - - TRACE(15,"End JobQueue_assign"); - - return j->job_ptr; + return j->job_ptr; } void JobQueue_done(JobQueue* jq,void* job_ptr) { - TRACE(15,"JobQueue_done"); - dbgassert(jq,NULLPTRDEREF "jq in JobQueue_done"); + TRACE(15,"JobQueue_done"); + dbgassert(jq,NULLPTRDEREF "jq in JobQueue_done"); - LOCK_MUTEX; - - /* print_job_queue(jq); */ + LOCK_MUTEX; - /* Find the job from the queue, belonging to the job_ptr */ - Job* j=jq->jobs; - us i; - for(i=0;imax_jobs;i++) { - iVARTRACE(10,i); - if(j->ready && j->running && j->job_ptr == job_ptr) { - TRACE(15,"Found the job that has been done:"); - j->ready = false; - j->job_ptr = NULL; - j->running = false; - break; - } - j++; + /* print_job_queue(jq); */ + + /* Find the job from the queue, belonging to the job_ptr */ + Job* j=jq->jobs; + us i; + for(i=0;imax_jobs;i++) { + iVARTRACE(10,i); + if(j->ready && j->running && j->job_ptr == job_ptr) { + TRACE(15,"Found the job that has been done:"); + j->ready = false; + j->job_ptr = NULL; + j->running = false; + break; } + j++; + } - /* print_job_queue(jq); */ - - /* Job done, broadcast this */ - rv = pthread_cond_signal(&jq->cv_minus); - if(rv !=0) { - WARN("Condition variable broadcast failed"); - } + /* print_job_queue(jq); */ - UNLOCK_MUTEX; +#ifdef LASP_PARALLEL + /* Job done, broadcast this */ + rv = pthread_cond_signal(&jq->cv_minus); + if(rv !=0) { + WARN("Condition variable broadcast failed"); + } +#endif + + UNLOCK_MUTEX; } +#ifdef LASP_PARALLEL void JobQueue_wait_alldone(JobQueue* jq) { - TRACE(15,"JobQueue_wait_alldone"); - dbgassert(jq,NULLPTRDEREF "jq in JobQueue_wait_alldone"); + TRACE(15,"JobQueue_wait_alldone"); + dbgassert(jq,NULLPTRDEREF "jq in JobQueue_wait_alldone"); - LOCK_MUTEX; + LOCK_MUTEX; - /* Wait until number of jobs is 0 */ - while (count_jobs(jq)!=0) { - - if(rv !=0) { - WARN("Condition variable broadcast failed"); - } - - pthread_cond_wait(&jq->cv_minus,&jq->mutex); + /* Wait until number of jobs is 0 */ + while (count_jobs(jq)!=0) { + + if(rv !=0) { + WARN("Condition variable broadcast failed"); } - - UNLOCK_MUTEX; + + pthread_cond_wait(&jq->cv_minus,&jq->mutex); + } + + UNLOCK_MUTEX; } +#endif ////////////////////////////////////////////////////////////////////// diff --git a/lasp/c/lasp_mq.h b/lasp/c/lasp_mq.h index 040d422..e183229 100644 --- a/lasp/c/lasp_mq.h +++ b/lasp/c/lasp_mq.h @@ -33,7 +33,8 @@ void JobQueue_free(JobQueue* jq); /** * Pops a job from the queue. Waits indefinitely until some job is - * available. + * available. If in parallel mode. In serial mode, it returns NULL if no job + * is available (LASP_PARALLEL compilation flag not set). * * @param jq: JobQueue handle * @return Pointer to the job, NULL on error. @@ -68,7 +69,10 @@ int JobQueue_push(JobQueue* jq,void* job_ptr); * until all task workers are finished. * */ + +#ifdef LASP_PARALLEL void JobQueue_wait_alldone(JobQueue*); +#endif // LASP_PARALLEL #endif // MQ_H ////////////////////////////////////////////////////////////////////// diff --git a/lasp/c/lasp_nprocs.c b/lasp/c/lasp_nprocs.c index 706bec6..4467499 100644 --- a/lasp/c/lasp_nprocs.c +++ b/lasp/c/lasp_nprocs.c @@ -1,10 +1,10 @@ +#include "lasp_nprocs.h" #ifdef MS_WIN64 #include #else // Used for obtaining the number of processors #include #endif -#include "lasp_nprocs.h" us getNumberOfProcs() { #if MS_WIN64 @@ -12,7 +12,7 @@ us getNumberOfProcs() { SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); return sysinfo.dwNumberOfProcessors; -#else +#else // Linux, easy return get_nprocs(); #endif -} \ No newline at end of file +} diff --git a/lasp/c/lasp_sosfilterbank.c b/lasp/c/lasp_sosfilterbank.c index e3c652d..7b42147 100644 --- a/lasp/c/lasp_sosfilterbank.c +++ b/lasp/c/lasp_sosfilterbank.c @@ -7,246 +7,258 @@ typedef struct Sosfilterbank { - /// The filter_coefs matrix contains filter coefficients for a SOS filter. - us filterbank_size; - us nsections; + /// The filter_coefs matrix contains filter coefficients for a SOS filter. + us filterbank_size; + us nsections; - /// The filter coefficients for each of the filters in the Filterbank - /// The *first* axis is the filter no, the second axis contains the - /// filter coefficients, in the order, b_0, b_1, b_2, a_0, a_1, a_2, which - /// corresponds to the transfer function - /// b_0 + b_1 z^-1 + b_2 z^-2 - /// H[z] = ------------------------- - /// a_0 + a_1 z^-1 + a_2 z^-2 - dmat sos; /// sos[filter_no, coeff] + /// The filter coefficients for each of the filters in the Filterbank + /// The *first* axis is the filter no, the second axis contains the + /// filter coefficients, in the order, b_0, b_1, b_2, a_0, a_1, a_2, which + /// corresponds to the transfer function + /// b_0 + b_1 z^-1 + b_2 z^-2 + /// H[z] = ------------------------- + /// a_0 + a_1 z^-1 + a_2 z^-2 + dmat sos; /// sos[filter_no, coeff] - /// Storage for the current state of the output, first axis correspond to - /// the filter number axis, the second axis contains state coefficients - dmat state; - - JobQueue* jq; - Workers* workers; + /// Storage for the current state of the output, first axis correspond to + /// the filter number axis, the second axis contains state coefficients + dmat state; +#ifdef LASP_PARALLEL + JobQueue* jq; + Workers* workers; +#endif // LASP_PARALLEL } Sosfilterbank; us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb) { - fsTRACE(15); - assertvalidptr(fb); - return fb->filterbank_size; - feTRACE(15); + fsTRACE(15); + assertvalidptr(fb); + return fb->filterbank_size; + feTRACE(15); } int filter_single(void* worker_data,void* job); static inline us min(us a, us b){ - return ab?a:b; + return a>b?a:b; } Sosfilterbank* Sosfilterbank_create( - const us nthreads_, - const us filterbank_size, - const us nsections) { - fsTRACE(15); - dbgassert(nthreads_ <= LASP_MAX_NUM_THREADS, "Illegal number of threads"); + const us nthreads_, + const us filterbank_size, + const us nsections) { + fsTRACE(15); - dbgassert(filterbank_size <= MAX_SOS_FILTER_BANK_SIZE, - "Illegal filterbank size. Max size is " - annestr(MAX_SOS_FILTER_BANK_SIZE)); + dbgassert(filterbank_size <= MAX_SOS_FILTER_BANK_SIZE, + "Illegal filterbank size. Max size is " + annestr(MAX_SOS_FILTER_BANK_SIZE)); - Sosfilterbank* fb = (Sosfilterbank*) a_malloc(sizeof(Sosfilterbank)); + Sosfilterbank* fb = (Sosfilterbank*) a_malloc(sizeof(Sosfilterbank)); - fb->jq = NULL; - fb->workers = NULL; - fb->filterbank_size = filterbank_size; - dbgassert(nsections < MAX_SOS_FILTER_BANK_NSECTIONS,"Illegal number of sections"); - fb->nsections = nsections; + fb->filterbank_size = filterbank_size; + dbgassert(nsections < MAX_SOS_FILTER_BANK_NSECTIONS,"Illegal number of sections"); + fb->nsections = nsections; - /// Allocate filter coefficients matrix - fb->sos = dmat_alloc(filterbank_size, nsections*6); - fb->state = dmat_alloc(filterbank_size, nsections*2); - dmat_set(&(fb->state), 0); + /// Allocate filter coefficients matrix + fb->sos = dmat_alloc(filterbank_size, nsections*6); + fb->state = dmat_alloc(filterbank_size, nsections*2); + dmat_set(&(fb->state), 0); - /// Set all filter coefficients to unit impulse response - vd imp_response = vd_alloc(6*nsections); - vd_set(&imp_response,0); - for(us section = 0;section < nsections; section++) { - // Set b0 coefficient to 1 - setvecval(&imp_response, 0 + 6*section, 1); - // Set a0 coefficient to 1 - setvecval(&imp_response, 3 + 6*section, 1); + /// Set all filter coefficients to unit impulse response + vd imp_response = vd_alloc(6*nsections); + vd_set(&imp_response,0); + for(us section = 0;section < nsections; section++) { + // Set b0 coefficient to 1 + setvecval(&imp_response, 0 + 6*section, 1); + // Set a0 coefficient to 1 + setvecval(&imp_response, 3 + 6*section, 1); + } + + // Initialize all filters with a simple impulse response, single pass + for(us filter_no = 0; filter_no < filterbank_size; filter_no++) { + Sosfilterbank_setFilter(fb,filter_no,imp_response); + } + // Check if coefficients are properly initialized + // print_dmat(&(fb->sos)); + vd_free(&imp_response); + +#ifdef LASP_PARALLEL + fb->jq = NULL; + fb->workers = NULL; + dbgassert(nthreads_ <= LASP_MAX_NUM_THREADS, "Illegal number of threads"); + us nthreads; + us nprocs = getNumberOfProcs(); + + if(nthreads_ == 0) { + nthreads = min(max(nprocs/2,1), filterbank_size); + } else { + nthreads = nthreads_; + } + iVARTRACE(15, nthreads); + + if(nthreads > 1) { + if(!(fb->jq = JobQueue_alloc(filterbank_size))) { + Sosfilterbank_free(fb); + feTRACE(15); + return NULL; } - // Initialize all filters with a simple impulse response, single pass - for(us filter_no = 0; filter_no < filterbank_size; filter_no++) { - Sosfilterbank_setFilter(fb,filter_no,imp_response); + if(!(fb->workers = Workers_create(nthreads, + fb->jq, + NULL, + &filter_single, + NULL, + NULL))) { + Sosfilterbank_free(fb); + feTRACE(15); + return NULL; } - // Check if coefficients are properly initialized - // print_dmat(&(fb->sos)); - vd_free(&imp_response); + } - us nthreads; - us nprocs = getNumberOfProcs(); - - if(nthreads_ == 0) { - nthreads = min(max(nprocs/2,1), filterbank_size); - } else { - nthreads = nthreads_; - } - iVARTRACE(15, nthreads); - - if(nthreads > 1) { - if(!(fb->jq = JobQueue_alloc(filterbank_size))) { - Sosfilterbank_free(fb); - feTRACE(15); - return NULL; - } - - if(!(fb->workers = Workers_create(nthreads, - fb->jq, - NULL, - &filter_single, - NULL, - NULL))) { - Sosfilterbank_free(fb); - feTRACE(15); - return NULL; - } - } - - feTRACE(15); - return fb; +#endif // LASP_PARALLEL + feTRACE(15); + return fb; } void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no, - const vd filter_coefs) { - fsTRACE(15); - assertvalidptr(fb); - assert_vx(&filter_coefs); - iVARTRACE(15, filter_coefs.n_rows); - iVARTRACE(15, filter_no); - dbgassert(filter_no < fb->filterbank_size, "Illegal filter number"); - dbgassert(filter_coefs.n_rows == fb->nsections * 6, - "Illegal filter coefficient length"); + const vd filter_coefs) { + fsTRACE(15); + assertvalidptr(fb); + assert_vx(&filter_coefs); + iVARTRACE(15, filter_coefs.n_rows); + iVARTRACE(15, filter_no); + dbgassert(filter_no < fb->filterbank_size, "Illegal filter number"); + dbgassert(filter_coefs.n_rows == fb->nsections * 6, + "Illegal filter coefficient length"); - dmat *sos = &fb->sos; - /* dmat *state = &fb->state; */ - us nsections = fb->nsections; + dmat *sos = &fb->sos; + /* dmat *state = &fb->state; */ + us nsections = fb->nsections; - for(us index=0;indexsos)); - dmat_free(&(fb->state)); + dmat_free(&(fb->sos)); + dmat_free(&(fb->state)); - if(fb->workers) Workers_free(fb->workers); - if(fb->jq) JobQueue_free(fb->jq); +#ifdef LASP_PARALLEL + if(fb->workers) Workers_free(fb->workers); + if(fb->jq) JobQueue_free(fb->jq); +#endif // LASP_PARALLEL - a_free(fb); - feTRACE(15); + a_free(fb); + feTRACE(15); } typedef struct { - us filter_no; - us nsections; - us nsamples; - dmat sos; - dmat state; - dmat ys; + us filter_no; + us nsections; + us nsamples; + dmat sos; + dmat state; + dmat ys; } Job; int filter_single(void* worker_data, void* job_) { - fsTRACE(15); - Job* job = (Job*) job_; + fsTRACE(15); + Job* job = (Job*) job_; - us nsections = job->nsections; - us nsamples = job->nsamples; - dmat sos = job->sos; + us nsections = job->nsections; + us nsamples = job->nsamples; + dmat sos = job->sos; - for(us section=0;sectionstate),job->filter_no,section*2); - d w2 = *getdmatval(&(job->state),job->filter_no,section*2+1); + for(us section=0;sectionstate),job->filter_no,section*2); + d w2 = *getdmatval(&(job->state),job->filter_no,section*2+1); - d b0 = *getdmatval(&sos,job->filter_no,section*6+0); - d b1 = *getdmatval(&sos,job->filter_no,section*6+1); - d b2 = *getdmatval(&sos,job->filter_no,section*6+2); - /* d a0 = *getdmatval(&sos,job->filter_no,section*6+3); */ - d a1 = *getdmatval(&sos,job->filter_no,section*6+4); - d a2 = *getdmatval(&sos,job->filter_no,section*6+5); + d b0 = *getdmatval(&sos,job->filter_no,section*6+0); + d b1 = *getdmatval(&sos,job->filter_no,section*6+1); + d b2 = *getdmatval(&sos,job->filter_no,section*6+2); + /* d a0 = *getdmatval(&sos,job->filter_no,section*6+3); */ + d a1 = *getdmatval(&sos,job->filter_no,section*6+4); + d a2 = *getdmatval(&sos,job->filter_no,section*6+5); - d* y = getdmatval(&(job->ys), 0, job->filter_no); - - for(us sample=0;samplestate),job->filter_no,section*2) = w1; - *getdmatval(&(job->state),job->filter_no,section*2+1) = w2; + d* y = getdmatval(&(job->ys), 0, job->filter_no); + for(us sample=0;samplestate),job->filter_no,section*2) = w1; + *getdmatval(&(job->state),job->filter_no,section*2+1) = w2; - feTRACE(15); - return 0; + } + + feTRACE(15); + return 0; } dmat Sosfilterbank_filter(Sosfilterbank* fb,const vd* xs) { - fsTRACE(15); - assertvalidptr(fb); - assert_vx(xs); - dmat state = fb->state; - dmat sos = fb->sos; + fsTRACE(15); + assertvalidptr(fb); + assert_vx(xs); + dmat state = fb->state; + dmat sos = fb->sos; - us nsections = fb->nsections; - us filterbank_size = fb->filterbank_size; - us nsamples = xs->n_rows; + us nsections = fb->nsections; + us filterbank_size = fb->filterbank_size; + us nsamples = xs->n_rows; - dmat ys = dmat_alloc(nsamples, filterbank_size); - /// Copy input signal to output array - for(us filter=0;filterworkers) { - assertvalidptr(fb->jq); - JobQueue_push(fb->jq, &(jobs[filter])); - } else { - /* No workers, we have to do it ourselves */ - filter_single(NULL,(void*) &(jobs[filter])); - } - } +#ifdef LASP_PARALLEL if(fb->workers) { - JobQueue_wait_alldone(fb->jq); + assertvalidptr(fb->jq); + JobQueue_push(fb->jq, &(jobs[filter])); + } else { + +#endif // LASP_PARALLEL + /* No workers, we have to do it ourselves */ + filter_single(NULL,(void*) &(jobs[filter])); +#ifdef LASP_PARALLEL } - feTRACE(15); - return ys; +#endif // LASP_PARALLEL + } +#ifdef LASP_PARALLEL + if(fb->workers) { + JobQueue_wait_alldone(fb->jq); + } +#endif // LASP_PARALLEL + feTRACE(15); + return ys; } diff --git a/lasp/c/lasp_sosfilterbank.h b/lasp/c/lasp_sosfilterbank.h index 6950627..67c1815 100644 --- a/lasp/c/lasp_sosfilterbank.h +++ b/lasp/c/lasp_sosfilterbank.h @@ -2,8 +2,11 @@ // // Author: J.A. de Jong - ASCEE // -// Description: Implemententation of a discrete filterbank using cascaded -// second order sections (sos), also called BiQuads. +// Description: Implemententation of a discrete parallel filterbank using +// cascaded second order sections (sos) for each filter in the filterbank. In +// parallel mode, the filters are allocated over a set of Worker threads that +// actually perform the computation. In serial mode, all filters in the bank +// are computed in series. ////////////////////////////////////////////////////////////////////// #pragma once #ifndef LASP_FILTERBANK_H @@ -28,8 +31,8 @@ typedef struct Sosfilterbank Sosfilterbank; * @return Sosfilterbank handle */ Sosfilterbank* Sosfilterbank_create(const us nthreads, - const us filterbank_size, - const us nsections); + const us filterbank_size, + const us nsections); /** * Returns the number of channels in the filterbank (the filberbank size). @@ -51,7 +54,7 @@ us Sosfilterbank_getFilterbankSize(const Sosfilterbank* fb); * */ void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no, - const vd coefs); + const vd coefs); /** * Filters x using h, returns y @@ -61,10 +64,10 @@ void Sosfilterbank_setFilter(Sosfilterbank* fb,const us filter_no, * @return Filtered output in an allocated array. The number of * columns in this array equals the number of filters in the * filterbank. The number of output samples is equal to the number of - * input samples in x. + * input samples in x (and is equal to the number of rows in the output). */ dmat Sosfilterbank_filter(Sosfilterbank* fb, - const vd* x); + const vd* x); /** * Cleans up an existing filter bank. diff --git a/lasp/c/lasp_worker.c b/lasp/c/lasp_worker.c index 6af9824..7a26a77 100644 --- a/lasp/c/lasp_worker.c +++ b/lasp/c/lasp_worker.c @@ -6,7 +6,12 @@ // ////////////////////////////////////////////////////////////////////// #define TRACERPLUS (-5) + +/* The code in this file is not of any use for systems that are not able to do + * simultaneous multithreading, or should be adjusted for it. It is therefore + * only compiled in case LASP_PARALLEL flag is set */ #include "lasp_worker.h" +#ifdef LASP_PARALLEL #include "lasp_mq.h" #include "lasp_alloc.h" #include @@ -196,5 +201,6 @@ static void* threadfcn(void* thread_global_data) { return NULL; } +#endif // LASP_PARALLEL ////////////////////////////////////////////////////////////////////// diff --git a/lasp/c/lasp_worker.h b/lasp/c/lasp_worker.h index fa9f9e0..9780e14 100644 --- a/lasp/c/lasp_worker.h +++ b/lasp/c/lasp_worker.h @@ -10,8 +10,10 @@ #pragma once #ifndef LASP_WORKER_H #define LASP_WORKER_H +#include "lasp_config.h" #include "lasp_types.h" +#ifdef LASP_PARALLEL typedef struct Workers_s Workers; typedef struct JobQueue_s JobQueue; @@ -57,5 +59,7 @@ Workers* Workers_create(const us num_workers, */ void Workers_free(Workers* w); +#endif // LASP_PARALLEL + #endif // LASP_WORKER_H ////////////////////////////////////////////////////////////////////// diff --git a/test/test_workers.c b/test/test_workers.c index bb2c4a7..fa33a3b 100644 --- a/test/test_workers.c +++ b/test/test_workers.c @@ -5,14 +5,19 @@ // Description: // ////////////////////////////////////////////////////////////////////// -#include "lasp_worker.h" -#include "lasp_mq.h" +#include "lasp_config.h" #include "lasp_tracer.h" #include "lasp_assert.h" #include + +#ifdef LASP_PARALLEL +#include "lasp_worker.h" +#include "lasp_mq.h" + static void* walloc(void*); static int worker(void*,void*); static void wfree(void*); +#endif // LASP_PARALLEL int main() { @@ -21,6 +26,7 @@ int main() { iVARTRACE(15,getTracerLevel()); +#ifdef LASP_PARALLEL us njobs = 4; JobQueue* jq = JobQueue_alloc(njobs); dbgassert(jq,NULLPTRDEREF); @@ -42,8 +48,10 @@ int main() { Workers_free(w); JobQueue_free(jq); +#endif // LASP_PARALLEL return 0; } +#ifdef LASP_PARALLEL static void* walloc(void* data) { TRACE(15,"WALLOC"); uVARTRACE(15,(us) data); @@ -63,6 +71,7 @@ static void wfree(void* w_data) { TRACE(15,"wfree"); } +#endif // LASP_PARALLEL //////////////////////////////////////////////////////////////////////