Updated API. First work on impedance tube code

This commit is contained in:
Anne de Jong 2021-03-31 11:23:57 +02:00
parent 1f9e0325f4
commit 2d33110138
3 changed files with 70 additions and 36 deletions

View File

@ -2,4 +2,4 @@ from .soundpressureweighting import *
from .filterbank_design import *
from .fir_design import *
from .colorednoise import *
from .biquad import *

View File

@ -107,11 +107,27 @@ class TwoMicImpedanceTube:
# Microphone transfer function
G_AB = self.K*C[:,1,0]/C[:,0,0]
return self.getFreq(), self.cut_to_limits(G_AB)
def k(self, freq):
"""
Wave number, or thermoviscous wave number
"""
if self.thermoviscous:
D = self.D_imptube
S = pi/4*D**2
d = PrsDuct(0, S=S, rh=D/4, cs='circ')
d.mat = self.mat
omg = 2*pi*freq
G, Z = d.GammaZc(omg)
return G
else:
return 2*pi*freq/self.mat.c0
def R(self, meas):
freq, G_AB = self.G_AB(meas)
s = self.s
k = 2*pi*freq/self.mat.c0
k = self.k(freq)
d1 = self.d1
RpA = (G_AB - exp(-1j*k*s))/(exp(1j*k*s)-G_AB)

View File

@ -152,9 +152,11 @@ class IterRawData:
stop_offset = self.lastblock_stop_offset
else:
stop_offset = self.blocksize
# print(f'block: {block}, starto: {start_offset}, stopo {stop_offset}')
self.i +=1
return self.fa[block,start_offset:stop_offset,self.channels]
return self.fa[block,start_offset:stop_offset,:][:,self.channels]
class IterData(IterRawData):
"""
@ -362,25 +364,38 @@ class Measurement:
"""Returns the measurement time in seconds since the epoch."""
return self._time
@property
def rms(self, channels=None):
def rms(self, channels=None, substract_average=False):
"""Returns the root mean square values for each channel
Args:
channels: list of channels
substract_average: If set to true, computes the rms of only the
oscillating component of the signal, which is in fact the signal
variance.
Returns:
1D array with rms values for each channel
"""
#
try:
return self._rms
except AttributeError:
pass
meansquare = 0.
meansquare = 0. # Mean square of the signal, including its average
sum_ = 0. # Sumf of the values of the signal, used to compute average
N = 0
with self.file() as f:
for block in self.iterData(channels):
Nblock = block.shape[0]
sum_ += np.sum(block, axis=0)
N += Nblock
meansquare += np.sum(block**2, axis=0) / self.N
self._rms = np.sqrt(meansquare)
return self._rms
avg = sum_/N
# In fact, this is not the complete RMS, as in includes the DC
# If p = p_dc + p_osc, then rms(p_osc) = sqrt(ms(p)-ms(p_dc))
if substract_average:
meansquare -= avg**2
rms = np.sqrt(meansquare)
return rms
def variance(self, channels=None):
return self.rms(substract_average=True)
def rawData(self, channels=None, **kwargs):
"""Returns the raw data as stored in the measurement file,
@ -406,7 +421,6 @@ class Measurement:
return np.concatenate(rawdata, axis=0)
def iterData(self, channels, **kwargs):
sensitivity = kwargs.pop('sensitivity', self.sensitivity)
if channels is None:
@ -484,21 +498,26 @@ class Measurement:
blocks = [signal[i*N:(i+1)*N] for i in range(Nblocks)]
if noiseCorrection:
enp1 = [blocks[i+1] - blocks[i] for i in range(Nblocks-1)]
noise_est_np1 = [np.average(enp1[i+1]*enp1[i]) for i in range(Nblocks-2)]
# The difference between the measured signal in the previous block and
# the current block
en = [None] + [blocks[i] - blocks[i-1] for i in range(1,Nblocks)]
noise_est = [None] + [-np.average(en[i]*en[i+1]) for i in range(1,len(en)-1)]
# Create weighting coefficients
sum_inverse_noise = sum([1/n for n in noise_est_np1])
c_np1 = [1/(ni*sum_inverse_noise) for ni in noise_est_np1]
sum_inverse_noise = sum([1/n for n in noise_est[1:]])
c_n = [1/(ni*sum_inverse_noise) for ni in noise_est[1:]]
else:
c_np1 = [1/(Nblocks-2)]*(Nblocks-2)
c_n = [1/(Nblocks-2)]*(Nblocks-2)
assert np.isclose(sum(c_np1), 1.0)
assert np.isclose(sum(c_n), 1.0)
assert Nblocks-2 == len(c_n)
# Average signal over blocks
avg = np.zeros((blocks[0].shape), dtype=float)
for i in range(0, Nblocks-2):
avg += c_np1[i]*blocks[i+1]
for n in range(0, Nblocks-2):
avg += c_n[n]*blocks[n+1]
return avg
def periodicCPS(self, N, channels=None, **kwargs):
@ -514,7 +533,7 @@ class Measurement:
window = Window.rectangular
ps = PowerSpectra(N, window)
avg = self.periodicAverage(N, channels, **kwargs)
avg = np.asfortranarray(self.periodicAverage(N, channels, **kwargs))
CS = ps.compute(avg)
freq = getFreq(self.samplerate, N)
@ -554,24 +573,23 @@ class Measurement:
f.attrs['sensitivity'] = sens
self._sens = sens
def checkOverflow(self):
def checkOverflow(self, channels):
"""Coarse check for overflow in measurement.
Return:
True if overflow is possible, else False
"""
with self.file() as f:
for block in self.iterBlocks(f):
dtype = block.dtype
if dtype.kind == 'i':
# minvalue = np.iinfo(dtype).min
maxvalue = np.iinfo(dtype).max
if np.max(np.abs(block)) >= 0.9*maxvalue:
return True
else:
# Cannot check for floating point values.
return False
for block in self.iterData(channels):
dtype = block.dtype
if dtype.kind == 'i':
# minvalue = np.iinfo(dtype).min
maxvalue = np.iinfo(dtype).max
if np.max(np.abs(block)) >= 0.9*maxvalue:
return True
else:
# Cannot check for floating point values.
return False
return False