Added lots of comments to some C-code. Moved some Python code to lasp_
common.py. Added lasp_measurement, lasp_slm, lasp_weighcal. Lots of bugfixes, renamed fir_design to filter_design, cleaned up frequency weighting filter code.
This commit is contained in:
parent
3823d51074
commit
2842fb5714
2
.gitignore
vendored
2
.gitignore
vendored
@ -18,4 +18,4 @@ test/test_workers
|
||||
doc
|
||||
LASP.egg-info
|
||||
lasp_octave_fir.*
|
||||
lasp/aps_ui.py
|
||||
lasp/ui_*
|
||||
|
@ -1,3 +1,3 @@
|
||||
from .wrappers import *
|
||||
from .tools import *
|
||||
P_REF = 2e-5
|
||||
from .lasp_tools import *
|
||||
from .lasp_common import *
|
@ -25,10 +25,10 @@ static inline d vd_dot(const vd * a,const vd* b) {
|
||||
return d_dot(getvdval(a,0),getvdval(b,0),a->n_rows);
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* y = fac * y
|
||||
*
|
||||
* @param y
|
||||
* @param y
|
||||
* @param fac scale factor
|
||||
*/
|
||||
static inline void dmat_scale(dmat* y,const c fac){
|
||||
@ -38,10 +38,10 @@ static inline void dmat_scale(dmat* y,const c fac){
|
||||
}
|
||||
|
||||
}
|
||||
/**
|
||||
/**
|
||||
* y = fac * y
|
||||
*
|
||||
* @param y
|
||||
* @param y
|
||||
* @param fac scale factor
|
||||
*/
|
||||
static inline void cmat_scale(cmat* y,const c fac){
|
||||
|
@ -98,11 +98,13 @@ dmat FilterBank_filter(FilterBank* fb,
|
||||
|
||||
dmat input_block = dmat_alloc(nfft,1);
|
||||
|
||||
/* Output is ready to be multiplied with FFt'th filter */
|
||||
/* FFT'th filter coefficients */
|
||||
cmat input_fft = cmat_alloc(nfft/2+1,1);
|
||||
|
||||
/* Output is ready to be multiplied with FFt'th filter */
|
||||
/* Output of the fast convolution */
|
||||
cmat output_fft = cmat_alloc(nfft/2+1,nfilters);
|
||||
|
||||
/* Inverse FFT'th output */
|
||||
dmat output_block = dmat_alloc(nfft,nfilters);
|
||||
|
||||
while (dFifo_size(input_fifo) >= nfft) {
|
||||
@ -111,6 +113,7 @@ dmat FilterBank_filter(FilterBank* fb,
|
||||
&input_block,
|
||||
fb->P_m_1 /* save P-1 samples */
|
||||
);
|
||||
|
||||
dbgassert(nsamples == nfft,"BUG in dFifo");
|
||||
|
||||
Fft_fft(fb->fft,&input_block,&input_fft);
|
||||
@ -127,7 +130,9 @@ dmat FilterBank_filter(FilterBank* fb,
|
||||
|
||||
vc_free(&output_fft_col);
|
||||
vc_free(&filter_col);
|
||||
|
||||
}
|
||||
|
||||
vc_free(&input_fft_col);
|
||||
|
||||
Fft_ifft(fb->fft,&output_fft,&output_block);
|
||||
@ -136,6 +141,8 @@ dmat FilterBank_filter(FilterBank* fb,
|
||||
fb->P_m_1,0, /* startrow, startcol */
|
||||
nfft-fb->P_m_1, /* Number of rows */
|
||||
output_block.n_cols);
|
||||
|
||||
/* Push the valid samples to the output FIFO */
|
||||
dFifo_push(fb->output_fifo,&valid_samples);
|
||||
dmat_free(&valid_samples);
|
||||
|
||||
|
@ -357,9 +357,11 @@ static inline void dmat_copy_rows(dmat* to,const dmat* from,
|
||||
us startrow_from,
|
||||
us nrows) {
|
||||
us col,ncols = to->n_cols;
|
||||
|
||||
dbgassert(to && from,NULLPTRDEREF);
|
||||
dbgassert(to->n_cols == from->n_cols,SIZEINEQUAL);
|
||||
dbgassert(startrow_from+nrows <= from->n_rows,OUTOFBOUNDSMATR);
|
||||
dbgassert(startrow_to+nrows <= to->n_rows,OUTOFBOUNDSMATR);
|
||||
|
||||
for(col=0;col<ncols;col++) {
|
||||
d* to_d = getdmatval(to,startrow_to,col);
|
||||
d* from_d = getdmatval(from,startrow_from,col);
|
||||
|
@ -16,7 +16,14 @@
|
||||
#define LASP_NUMPY_FLOAT_TYPE NPY_FLOAT32
|
||||
#endif
|
||||
|
||||
|
||||
/**
|
||||
* Create a numpy array from an existing dmat.
|
||||
*
|
||||
* @param mat
|
||||
* @param transfer_ownership
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
static inline PyObject* dmat_to_ndarray(dmat* mat,bool transfer_ownership) {
|
||||
fsTRACE(15);
|
||||
dbgassert(mat,NULLPTRDEREF);
|
||||
|
147
lasp/filter_design/freqweighting_fir.py
Normal file
147
lasp/filter_design/freqweighting_fir.py
Normal file
@ -0,0 +1,147 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""!
|
||||
Author: J.A. de Jong - ASCEE
|
||||
|
||||
Description: Filter design for frequency weightings A and C.
|
||||
"""
|
||||
from .fir_design import freqResponse, arbitrary_fir_design
|
||||
from ..lasp_common import FreqWeighting
|
||||
import numpy as np
|
||||
|
||||
__all__ = ['A','C','A_fir_design','C_fir_design',
|
||||
'show_Afir','show_Cfir']
|
||||
|
||||
fr = 1000.
|
||||
fL = 10**1.5
|
||||
fH = 10**3.9
|
||||
fLsq = fL**2
|
||||
fHsq = fH**2
|
||||
frsq = fr**2
|
||||
fA = 10**2.45
|
||||
D = np.sqrt(.5)
|
||||
|
||||
b = (1/(1-D))*(frsq+fLsq*fHsq/frsq-D*(fLsq+fHsq))
|
||||
c = fLsq*fHsq
|
||||
f2 = (3-np.sqrt(5.))/2*fA
|
||||
f3 = (3+np.sqrt(5.))/2*fA
|
||||
f1 = np.sqrt((-b-np.sqrt(b**2-4*c))/2)
|
||||
f4 = np.sqrt((-b+np.sqrt(b**2-4*c))/2)
|
||||
f4sq = f4**2
|
||||
|
||||
def A_uncor(f):
|
||||
"""
|
||||
Computes the uncorrected frequency response of the A-filter
|
||||
"""
|
||||
fsq = f**2
|
||||
num = f4sq*fsq**2
|
||||
denom1 = (fsq+f1**2)
|
||||
denom2 = np.sqrt((fsq+f2**2)*(fsq+f3**2))*(fsq+f4sq)
|
||||
|
||||
return (num/(denom1*denom2))
|
||||
|
||||
def A(f):
|
||||
"""
|
||||
Computes the linear A-weighting freqency response
|
||||
"""
|
||||
Auncor = A_uncor(f)
|
||||
A1000 = A_uncor(1000.)
|
||||
return Auncor/A1000
|
||||
|
||||
def C_uncor(f):
|
||||
"""
|
||||
Computes the uncorrected frequency response of the C-filter
|
||||
"""
|
||||
fsq = f**2
|
||||
num = f4sq*fsq
|
||||
denom1 = (fsq+f1**2)
|
||||
denom2 = (fsq+f4**2)
|
||||
return num/(denom1*denom2)
|
||||
|
||||
def C(f):
|
||||
"""
|
||||
Computes the linear A-weighting freqency response
|
||||
"""
|
||||
Cuncor = C_uncor(f)
|
||||
C1000 = C_uncor(1000.)
|
||||
return Cuncor/C1000
|
||||
|
||||
|
||||
def A_fir_design():
|
||||
fs = 48000.
|
||||
freq_design = np.linspace(0,17e3,3000)
|
||||
freq_design[-1] = fs/2
|
||||
amp_design = A(freq_design)
|
||||
amp_design[-1] = 0.
|
||||
|
||||
L = 2048 # Filter order
|
||||
fir = arbitrary_fir_design(fs,L,freq_design,amp_design,
|
||||
window='rectangular')
|
||||
return fir
|
||||
|
||||
|
||||
def C_fir_design():
|
||||
fs = 48000.
|
||||
freq_design = np.linspace(0,17e3,3000)
|
||||
freq_design[-1] = fs/2
|
||||
amp_design = C(freq_design)
|
||||
amp_design[-1] = 0.
|
||||
|
||||
L = 2048 # Filter order
|
||||
fir = arbitrary_fir_design(fs,L,freq_design,amp_design,
|
||||
window='rectangular')
|
||||
return fir
|
||||
|
||||
|
||||
def show_Afir():
|
||||
from asceefigs.plot import Bode, close, Figure
|
||||
close('all')
|
||||
|
||||
fs = 48000.
|
||||
freq_design = np.linspace(0,17e3,3000)
|
||||
freq_design[-1] = fs/2
|
||||
amp_design = A(freq_design)
|
||||
amp_design[-1] = 0.
|
||||
firs = []
|
||||
|
||||
# firs.append(arbitrary_fir_design(fs,L,freq_design,amp_design,window='hamming'))
|
||||
# firs.append(arbitrary_fir_design(fs,L,freq_design,amp_design,window='hann'))
|
||||
firs.append(A_fir_design())
|
||||
#from scipy.signal import iirdesign
|
||||
#b,a = iirdesign()
|
||||
freq_check = np.logspace(0,np.log10(fs/2),5000)
|
||||
f=Figure()
|
||||
|
||||
f.semilogx(freq_check,20*np.log10(A(freq_check)))
|
||||
for fir in firs:
|
||||
H = freqResponse(fs,freq_check, fir)
|
||||
f.plot(freq_check,20*np.log10(np.abs(H)))
|
||||
|
||||
f.fig.get_axes()[0].set_ylim(-75,3)
|
||||
|
||||
|
||||
def show_Cfir():
|
||||
from asceefigs.plot import Bode, close, Figure
|
||||
close('all')
|
||||
|
||||
fs = 48000.
|
||||
freq_design = np.linspace(0,17e3,3000)
|
||||
freq_design[-1] = fs/2
|
||||
amp_design = C(freq_design)
|
||||
amp_design[-1] = 0.
|
||||
firs = []
|
||||
|
||||
# firs.append(arbitrary_fir_design(fs,L,freq_design,amp_design,window='hamming'))
|
||||
# firs.append(arbitrary_fir_design(fs,L,freq_design,amp_design,window='hann'))
|
||||
firs.append(C_fir_design())
|
||||
#from scipy.signal import iirdesign
|
||||
#b,a = iirdesign()
|
||||
freq_check = np.logspace(0,np.log10(fs/2),5000)
|
||||
f=Figure()
|
||||
|
||||
f.semilogx(freq_check,20*np.log10(C(freq_check)))
|
||||
for fir in firs:
|
||||
H = freqResponse(fs,freq_check, fir)
|
||||
f.plot(freq_check,20*np.log10(np.abs(H)))
|
||||
|
||||
f.fig.get_axes()[0].set_ylim(-30,1)
|
86
lasp/lasp_common.py
Normal file
86
lasp/lasp_common.py
Normal file
@ -0,0 +1,86 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#!/usr/bin/env python3
|
||||
import numpy as np
|
||||
"""
|
||||
Common definitions used throughout the code.
|
||||
"""
|
||||
|
||||
__all__ = ['P_REF','FreqWeighting','TimeWeighting','getTime']
|
||||
|
||||
# Reference sound pressure level
|
||||
P_REF = 2e-5
|
||||
|
||||
class TimeWeighting:
|
||||
none = (None,'Raw (no time weighting)')
|
||||
ufast = (30e-3,'30 ms')
|
||||
fast = (0.125,'Fast')
|
||||
slow = (1.0,'Slow')
|
||||
types = (none, ufast, fast, slow)
|
||||
default = 2
|
||||
|
||||
@staticmethod
|
||||
def fillComboBox(cb):
|
||||
"""
|
||||
Fill TimeWeightings to a combobox
|
||||
|
||||
Args:
|
||||
cb: QComboBox to fill
|
||||
"""
|
||||
cb.clear()
|
||||
for tw in TimeWeighting.types:
|
||||
cb.addItem(tw[1],tw)
|
||||
cb.setCurrentIndex(TimeWeighting.default)
|
||||
|
||||
def getCurrent(cb):
|
||||
return TimeWeighting.types[cb.currentIndex()]
|
||||
|
||||
class FreqWeighting:
|
||||
"""
|
||||
Frequency weighting types
|
||||
"""
|
||||
A=('A','A-weighting')
|
||||
C=('C','C-weighting')
|
||||
Z=('Z','Z-weighting')
|
||||
types = (A,C,Z)
|
||||
default = 0
|
||||
|
||||
@staticmethod
|
||||
def fillComboBox(cb):
|
||||
"""
|
||||
Fill FreqWeightings to a combobox
|
||||
|
||||
Args:
|
||||
cb: QComboBox to fill
|
||||
"""
|
||||
cb.clear()
|
||||
for fw in FreqWeighting.types:
|
||||
cb.addItem(fw[1],fw)
|
||||
cb.setCurrentIndex(FreqWeighting.default)
|
||||
|
||||
def getCurrent(cb):
|
||||
return FreqWeighting.types[cb.currentIndex()]
|
||||
|
||||
def getTime(fs,N,start=0):
|
||||
"""
|
||||
Return a time array for given number of points and sampling frequency.
|
||||
|
||||
Args:
|
||||
fs: Sampling frequency [Hz]
|
||||
N: Number of time samples
|
||||
start: Optional start ofset in number of samples
|
||||
"""
|
||||
|
||||
return np.linspace(start/fs,N/fs,N,endpoint=False)
|
||||
|
||||
|
||||
def getFreq(fs, nfft):
|
||||
"""
|
||||
return an array of frequencies for single-sided spectra
|
||||
|
||||
Args:
|
||||
fs: Sampling frequency [Hz]
|
||||
nfft: Fft length (int)
|
||||
"""
|
||||
df = fs/nfft # frequency resolution
|
||||
K = nfft//2+1 # number of frequency bins
|
||||
return np.linspace(0, (K-1)*df, K)
|
17
lasp/lasp_config.py
Normal file
17
lasp/lasp_config.py
Normal file
@ -0,0 +1,17 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""!
|
||||
Author: J.A. de Jong - ASCEE
|
||||
|
||||
Description: LASP configuration
|
||||
"""
|
||||
import numpy as np
|
||||
|
||||
LASP_NUMPY_FLOAT_TYPE = np.float64
|
||||
|
||||
def zeros(shape):
|
||||
return np.zeros(shape,dtype=LASP_NUMPY_FLOAT_TYPE)
|
||||
def ones(shape):
|
||||
return np.ones(shape,dtype=LASP_NUMPY_FLOAT_TYPE)
|
||||
def empty(shape):
|
||||
return np.empty(shape,dtype=LASP_NUMPY_FLOAT_TYPE)
|
171
lasp/lasp_measurement.py
Normal file
171
lasp/lasp_measurement.py
Normal file
@ -0,0 +1,171 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""!
|
||||
Author: J.A. de Jong - ASCEE
|
||||
|
||||
Description: Measurement class
|
||||
"""
|
||||
import h5py as h5
|
||||
import numpy as np
|
||||
from .lasp_config import LASP_NUMPY_FLOAT_TYPE
|
||||
import wave,os
|
||||
|
||||
class BlockIter:
|
||||
def __init__(self,nblocks,faudio):
|
||||
self.i = 0
|
||||
self.nblocks = nblocks
|
||||
self.fa = faudio
|
||||
def __iter__(self):
|
||||
return self
|
||||
def __next__(self):
|
||||
if self.i == self.nblocks:
|
||||
raise StopIteration
|
||||
self.i+=1
|
||||
return self.fa[self.i-1][:,:]
|
||||
|
||||
|
||||
def getSampWidth(dtype):
|
||||
if dtype == np.int32:
|
||||
return 4
|
||||
elif dtype == np.int16:
|
||||
return 2
|
||||
else:
|
||||
raise ValueError('Invalid data type: %s' %dtype)
|
||||
|
||||
|
||||
def exportAsWave(fn,fs,data,force=False):
|
||||
if not '.wav' in fn[-4:]:
|
||||
fn += '.wav'
|
||||
|
||||
nchannels = data.shape[1]
|
||||
sampwidth = getSampWidth(data.dtype)
|
||||
|
||||
if os.path.exists(fn) and not force:
|
||||
raise RuntimeError('File already exists: %s', fn)
|
||||
|
||||
with wave.open(fn,'w') as wf:
|
||||
wf.setparams((nchannels,sampwidth,fs,0,'NONE','NONE'))
|
||||
wf.writeframes(np.asfortranarray(data).tobytes())
|
||||
|
||||
|
||||
class Measurement:
|
||||
def __init__(self, fn):
|
||||
|
||||
if not '.h5' in fn:
|
||||
fn += '.h5'
|
||||
|
||||
self.fn = fn
|
||||
|
||||
f = h5.File(fn,'r+')
|
||||
self.f = f
|
||||
try:
|
||||
f['video']
|
||||
self.has_video = True
|
||||
except KeyError:
|
||||
self.has_video = False
|
||||
|
||||
self.nblocks, self.blocksize, self.nchannels = f['audio'].shape
|
||||
dtype = f['audio'].dtype
|
||||
self.sampwidth = getSampWidth(dtype)
|
||||
|
||||
self.samplerate = f.attrs['samplerate']
|
||||
self.N = (self.nblocks*self.blocksize)
|
||||
self.T = self.N/self.samplerate
|
||||
|
||||
# comment = read-write thing
|
||||
try:
|
||||
self.f.attrs['comment']
|
||||
except KeyError:
|
||||
self.f.attrs['comment'] = ''
|
||||
|
||||
@property
|
||||
def comment(self):
|
||||
return self.f.attrs['comment']
|
||||
|
||||
@comment.setter
|
||||
def comment(self, cmt):
|
||||
self.f.attrs['comment'] = cmt
|
||||
|
||||
@property
|
||||
def recTime(self):
|
||||
return (self.blocksize*self.nblocks)/self.samplerate
|
||||
|
||||
@property
|
||||
def time(self):
|
||||
return self.f.attrs['time']
|
||||
|
||||
@property
|
||||
def prms(self):
|
||||
try:
|
||||
return self._prms
|
||||
except AttributeError:
|
||||
pass
|
||||
sens = self.sensitivity
|
||||
pms = 0.
|
||||
for block in self.iterBlocks():
|
||||
pms += np.sum(block/sens)**2/self.N
|
||||
self._prms = np.sqrt(pms)
|
||||
return self._prms
|
||||
|
||||
|
||||
def praw(self,block=None):
|
||||
if block is not None:
|
||||
blocks = self.f['audio'][block]
|
||||
else:
|
||||
blocks = []
|
||||
for block in self.iterBlocks():
|
||||
blocks.append(block)
|
||||
blocks = np.asarray(blocks)
|
||||
|
||||
blocks = blocks.reshape(self.nblocks*self.blocksize,
|
||||
self.nchannels)
|
||||
if blocks.dtype == np.int32:
|
||||
fac = 2**31
|
||||
elif blocks.dtype == np.int16:
|
||||
fac = 2**15
|
||||
else:
|
||||
raise RuntimeError('Unimplemented data type from recording: %s' %str(blocks.dtype))
|
||||
|
||||
blocks = blocks.astype(LASP_NUMPY_FLOAT_TYPE)/fac/self.sensitivity
|
||||
|
||||
return blocks
|
||||
|
||||
def iterBlocks(self):
|
||||
return BlockIter(self.nblocks,self.f['audio'])
|
||||
|
||||
@property
|
||||
def sensitivity(self):
|
||||
try:
|
||||
return self.f.attrs['sensitivity']
|
||||
except:
|
||||
return 1.0
|
||||
|
||||
@sensitivity.setter
|
||||
def sensitivity(self, sens):
|
||||
self.f.attrs['sensitivity'] = sens
|
||||
|
||||
def exportAsWave(self, fn=None, force=False):
|
||||
"""
|
||||
Export measurement file as wave
|
||||
"""
|
||||
if fn is None:
|
||||
fn = self.fn
|
||||
fn = os.path.splitext(fn)[0]
|
||||
|
||||
if not '.wav' in fn[-4:]:
|
||||
fn += '.wav'
|
||||
|
||||
if os.path.exists(fn) and not force:
|
||||
raise RuntimeError('File already exists: %s', fn)
|
||||
|
||||
with wave.open(fn,'w') as wf:
|
||||
wf.setparams((self.nchannels,self.sampwidth,self.samplerate,0,'NONE','NONE'))
|
||||
for block in self.iterBlocks():
|
||||
wf.writeframes(np.asfortranarray(block).tobytes())
|
||||
|
||||
|
||||
def __del__(self):
|
||||
try:
|
||||
self.f.close()
|
||||
except AttributeError:
|
||||
pass
|
58
lasp/lasp_slm.py
Normal file
58
lasp/lasp_slm.py
Normal file
@ -0,0 +1,58 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Sound level meter implementation
|
||||
@author: J.A. de Jong - ASCEE
|
||||
"""
|
||||
from .wrappers import FilterBank, SPLowpass
|
||||
import numpy as np
|
||||
from .lasp_config import zeros
|
||||
from .lasp_common import TimeWeighting, P_REF
|
||||
__all__ = ['SLM']
|
||||
|
||||
class Dummy:
|
||||
def filter_(self, data):
|
||||
return data
|
||||
|
||||
class SLM:
|
||||
"""
|
||||
Sound Level Meter, implements the single pole lowpass filter
|
||||
"""
|
||||
def __init__(self, fs, weighcal,
|
||||
tw=TimeWeighting.default,
|
||||
nchannels = 1,
|
||||
freq=None, cal=None):
|
||||
|
||||
self._weighcal = weighcal
|
||||
if tw is not TimeWeighting.none:
|
||||
self._lps = [SPLowpass(fs, tw[0]) for i in range(nchannels)]
|
||||
else:
|
||||
self._lpw = [Dummy() for i in range(nchannels)]
|
||||
self._Lmax = zeros(nchannels)
|
||||
|
||||
@property
|
||||
def Lmax(self):
|
||||
return self._Lmax
|
||||
|
||||
def addData(self, data):
|
||||
assert data.ndim == 2
|
||||
data_weighted = self._weighcal.filter_(data)
|
||||
|
||||
# Squared
|
||||
sq = data_weighted**2
|
||||
if sq.shape[0] == 0:
|
||||
return np.array([])
|
||||
|
||||
tw = []
|
||||
# Time-weight the signal
|
||||
for chan,lp in enumerate(self._lps):
|
||||
tw.append(lp.filter_(sq[:,chan])[:,0])
|
||||
tw = np.asarray(tw).transpose()
|
||||
|
||||
Level = 10*np.log10(tw/P_REF**2)
|
||||
|
||||
curmax = np.max(Level)
|
||||
if curmax > self._Lmax:
|
||||
self._Lmax = curmax
|
||||
|
||||
return Level
|
8
lasp/lasp_tools.py
Normal file
8
lasp/lasp_tools.py
Normal file
@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""!
|
||||
Author: J.A. de Jong - ASCEE
|
||||
|
||||
Description:
|
||||
"""
|
||||
import numpy as np
|
136
lasp/lasp_weighcal.py
Normal file
136
lasp/lasp_weighcal.py
Normal file
@ -0,0 +1,136 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Weighting and calibration filter in one
|
||||
@author: J.A. de Jong - ASCEE
|
||||
"""
|
||||
from .filter_design.freqweighting_fir import A,C
|
||||
from .lasp_common import FreqWeighting
|
||||
from .filter_design.fir_design import (arbitrary_fir_design,
|
||||
freqResponse as frp)
|
||||
from lasp.lasp_config import ones, empty
|
||||
from .wrappers import FilterBank
|
||||
import numpy as np
|
||||
|
||||
__all__ = ['WeighCal']
|
||||
|
||||
|
||||
class WeighCal:
|
||||
"""
|
||||
Time weighting and calibration FIR filter
|
||||
"""
|
||||
def __init__(self, fw = FreqWeighting.default,
|
||||
nchannels = 1,
|
||||
fs = 48000.,
|
||||
calfile = None,
|
||||
sens=1.0):
|
||||
"""
|
||||
Initialize the frequency weighting and calibration FIR filters.
|
||||
|
||||
Args:
|
||||
fw: Frequency weighting to apply
|
||||
nchannels: Number of channels for the input data
|
||||
fs: Sampling frequency [Hz]
|
||||
calfile: Calibration file to load.
|
||||
sens: Sensitivity in units [\f$ Pa^{-1} \f$]
|
||||
"""
|
||||
|
||||
self.nchannels = nchannels
|
||||
self.fs = fs
|
||||
self.fw = fw
|
||||
self.sens = sens
|
||||
self.calfile = calfile
|
||||
|
||||
# Frequencies used for the filter design
|
||||
freq_design = np.linspace(0,17e3,3000)
|
||||
freq_design[-1] = fs/2
|
||||
|
||||
# Objective function for the frequency response
|
||||
frp_obj = self.frpObj(freq_design)
|
||||
|
||||
self._firs = []
|
||||
self._fbs = []
|
||||
for chan in range(self.nchannels):
|
||||
fir = arbitrary_fir_design(fs,2048,freq_design,
|
||||
frp_obj[:,chan],
|
||||
window='rectangular')
|
||||
self._firs.append(fir)
|
||||
|
||||
self._fbs.append(FilterBank(fir[:,np.newaxis],4096))
|
||||
|
||||
self._freq_design = freq_design
|
||||
|
||||
def filter_(self,data):
|
||||
"""
|
||||
Filter data using the calibration and frequency weighting filter.
|
||||
"""
|
||||
nchan = self.nchannels
|
||||
assert data.shape[1] == nchan
|
||||
assert data.shape[0] > 0
|
||||
|
||||
filtered = []
|
||||
for chan in range(nchan):
|
||||
filtered.append(self._fbs[chan].filter_(data[:,chan])[:,0])
|
||||
filtered = np.asarray(filtered).transpose()/self.sens
|
||||
if filtered.ndim == 1:
|
||||
filtered = filtered[:,np.newaxis]
|
||||
return filtered
|
||||
|
||||
|
||||
def frpCalObj(self, freq_design):
|
||||
"""
|
||||
Computes the objective frequency response of the calibration filter
|
||||
"""
|
||||
calfile = self.calfile
|
||||
if calfile is not None:
|
||||
cal = np.loadtxt(calfile,skiprows=2)
|
||||
freq = cal[:,0]
|
||||
cal = cal[:,1:]
|
||||
if cal.shape[1] != self.nchannels:
|
||||
raise ValueError('Number of channels in calibration file does'
|
||||
' not equal to given number of channels')
|
||||
calfac = 10**(-cal/20)
|
||||
filter_calfac = empty((freq_design.shape[0],self.nchannels))
|
||||
|
||||
for chan in range(self.nchannels):
|
||||
filter_calfac[:,chan] = np.interp(freq_design,freq,calfac[:,chan])
|
||||
|
||||
else:
|
||||
filter_calfac = ones((freq_design.shape[0],self.nchannels,))
|
||||
|
||||
return filter_calfac
|
||||
|
||||
def frpWeightingObj(self, freq_design):
|
||||
"""
|
||||
Computes the objective frequency response of the frequency weighting
|
||||
filter.
|
||||
"""
|
||||
fw = self.fw
|
||||
if fw == FreqWeighting.A:
|
||||
return A(freq_design)
|
||||
elif fw == FreqWeighting.C:
|
||||
return C(freq_design)
|
||||
elif fw == FreqWeighting.Z:
|
||||
return ones(freq_design.shape[0])
|
||||
else:
|
||||
raise ValueError('Invalid fw parameter')
|
||||
|
||||
def frpObj(self, freq_design):
|
||||
"""
|
||||
Combines the frequency weighting and the calibration filter into
|
||||
one frequency response objective function.
|
||||
"""
|
||||
# Objective function for the frequency response
|
||||
frp_objective = self.frpCalObj(freq_design) * \
|
||||
self.frpWeightingObj(freq_design)[:,np.newaxis]
|
||||
frp_objective[-1] = 0.
|
||||
|
||||
return frp_objective
|
||||
|
||||
def freqResponse(self, chan=0, freq=None):
|
||||
"""
|
||||
Returns the frequency response of the designed FIR filter
|
||||
"""
|
||||
if freq is None:
|
||||
freq = np.logspace(1,np.log10(self.fs/2),500)
|
||||
return freq,frp(self.fs,freq,self._firs[chan]),self.frpObj(freq)[:,chan]
|
@ -1,13 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""!
|
||||
Author: J.A. de Jong - ASCEE
|
||||
|
||||
Description:
|
||||
"""
|
||||
import numpy as np
|
||||
|
||||
def getFreq(fs, nfft):
|
||||
df = fs/nfft # frequency resolution
|
||||
K = nfft//2+1 # number of frequency bins
|
||||
return np.linspace(0, (K-1)*df, K)
|
@ -98,8 +98,6 @@ cdef class Fft:
|
||||
|
||||
return timedata
|
||||
|
||||
|
||||
|
||||
cdef extern from "lasp_window.h":
|
||||
ctypedef enum WindowType:
|
||||
Hann
|
||||
@ -214,7 +212,7 @@ cdef class AvPowerSpectra:
|
||||
cdef vd weighting_vd
|
||||
cdef vd* weighting_ptr = NULL
|
||||
if(weighting.size != 0):
|
||||
weighting_vd = dmat_foreign_data(weighting.n_rows,1,
|
||||
weighting_vd = dmat_foreign_data(weighting.size,1,
|
||||
&weighting[0],False)
|
||||
weighting_ptr = &weighting_vd
|
||||
|
||||
@ -295,7 +293,7 @@ cdef class FilterBank:
|
||||
cdef dmat hmat = dmat_foreign_data(h.shape[0],
|
||||
h.shape[1],
|
||||
&h[0,0],
|
||||
True)
|
||||
False)
|
||||
|
||||
self.fb = FilterBank_create(&hmat,nfft)
|
||||
dmat_free(&hmat)
|
||||
@ -376,6 +374,9 @@ cdef class SPLowpass:
|
||||
SPLowpass_free(self.lp)
|
||||
|
||||
def filter_(self,d[:] input_):
|
||||
if input_.shape[0] == 0:
|
||||
return np.array([],dtype=NUMPY_FLOAT_TYPE)
|
||||
|
||||
cdef vd input_vd = dmat_foreign_data(input_.shape[0],1,
|
||||
&input_[0],False)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user