Added coherent periodic averaging methods to Measurement

This commit is contained in:
Anne de Jong 2021-02-19 15:33:33 +01:00
parent d72a35bfb5
commit a1e4a63043

View File

@ -21,12 +21,12 @@ this array is not found. Quantities are defaulted to 'Number / Full scale'
- Datasets:
'audio': 3-dimensional array of blocks of audio data. The first axis is the
'audio': 3-dimensional array of blocks of audio data. The first axis is the
block index, the second axis the sample number and the third axis is the channel
number. The data type is either int16, int32 or float64 / float32. In case the
data is stored as integers. The raw data should be scaled with the maximum value
that can be stored for the integer bit depth to get a number between -1.0 and
1.0.
number. The data type is either int16, int32 or float64 / float32. If raw data
is stored as integer values (int16, int32), the actual values should be
pre-scaled by its maximum positive number (2**(nb-1) - 1), such that the
corresponding 'number' lies between -1.0 and 1.0.
'video': 4-dimensional array of video frames. The first index is the frame
number, the second the x-value of the pixel and the third is the
@ -44,40 +44,16 @@ import h5py as h5
import numpy as np
from .lasp_config import LASP_NUMPY_FLOAT_TYPE
from scipy.io import wavfile
import os
import time
import wave
from .lasp_common import SIQtys, Qty
import os, time, wave, logging
from .lasp_common import SIQtys, Qty, getFreq
from .device import DaqChannel
import logging
from .wrappers import AvPowerSpectra, Window, PowerSpectra
logger = logging.Logger(__name__)
class BlockIter:
"""Iterate over the blocks in the audio data of a h5 file."""
def __init__(self, f):
"""Initialize a BlockIter object.
Args:
f: Audio dataset in the h5 file, accessed as f['audio']
"""
self.i = 0
self.nblocks = f['audio'].shape[0]
self.fa = f['audio']
def __iter__(self):
return self
def __next__(self):
"""Return the next block."""
if self.i == self.nblocks:
raise StopIteration
self.i += 1
return self.fa[self.i - 1][:, :]
def getSampWidth(dtype):
"""Returns the width of a single sample in bytes.
"""Returns the width of a single sample in **bytes**.
Args:
dtype: numpy dtype
@ -94,17 +70,14 @@ def getSampWidth(dtype):
else:
raise ValueError('Invalid data type: %s' % dtype)
def scaleBlockSens(block, sens):
"""Scale a block of raw data to return raw acoustic pressure data.
Args:
block: block of raw data with integer data type
sensitivity: array of sensitivity coeficients for
each channel
sens: array of sensitivity coeficients for
each channel.
"""
sens = np.asarray(sens)
assert sens.ndim == 1
assert sens.size == block.shape[1]
if np.issubdtype(block.dtype.type, np.integer):
sw = getSampWidth(block.dtype)
@ -113,6 +86,89 @@ def scaleBlockSens(block, sens):
fac = 1.
return block.astype(LASP_NUMPY_FLOAT_TYPE) / fac / sens[np.newaxis, :]
class IterRawData:
"""Iterate over stored blocks if the raw measurement data of a h5 file."""
def __init__(self, f, channels, **kwargs):
"""Initialize a BlockIter object.
Args:
f: Audio dataset in the h5 file, accessed as f['audio']
"""
assert isinstance(channels, list)
fa = f['audio']
self.fa = fa
self.i = 0
nblocks = fa.shape[0]
blocksize = fa.shape[1]
self.blocksize = blocksize
nchannels = fa.shape[2]
self.channels = channels
self.istart = kwargs.pop('istart', 0)
self.istop = kwargs.pop('istop', blocksize*nblocks)
self.firstblock = self.istart // blocksize
self.lastblock = self.istop // blocksize
if self.istop % blocksize == 0:
self.lastblock -= 1
self.firstblock_start_offset = self.istart % blocksize
if self.istop < 0:
self.istop += blocksize*nblocks
if self.istop <= self.istart:
raise ValueError('Stop index is smaller than start index')
if self.istop != blocksize*nblocks:
self.lastblock_stop_offset = self.istop % blocksize
else:
self.lastblock_stop_offset = blocksize
def __iter__(self):
return self
def __next__(self):
"""Return the next block."""
fa = self.fa
nblocks_to_return = self.lastblock-self.firstblock+1
block = self.firstblock + self.i
if block > self.lastblock:
raise StopIteration
if block == self.firstblock:
start_offset = self.firstblock_start_offset
else:
start_offset = 0
if block == self.lastblock:
stop_offset = self.lastblock_stop_offset
else:
stop_offset = self.blocksize
self.i +=1
return self.fa[block,start_offset:stop_offset,self.channels]
class IterData(IterRawData):
"""
Iterate over blocks of data, scaled with sensitivity and integer scaling
between 0 and 1
"""
def __init__(self, fa, channels, sensitivity, **kwargs):
super().__init__(fa, channels, **kwargs)
self.sens = np.asarray(sensitivity)[self.channels]
assert self.sens.ndim == 1
def __next__(self):
nextraw = super().__next__()
return scaleBlockSens(nextraw, self.sens)
def exportAsWave(fn, fs, data, force=False):
if '.wav' not in fn[-4:]:
@ -190,8 +246,12 @@ class Measurement:
try:
qtys_json = f.attrs['qtys']
# Load quantity data
self._qtys = [Qty.from_json(qty_json) for qty_json in qtys_json]
except KeyError:
# If quantity data is not available, this is an 'old'
# measurement file.
logger.debug('Physical quantity data not available in measurement file. Assuming {SIQtys.default}')
self._qtys = [SIQtys.default for i in range(self.nchannels)]
def setAttribute(self, atrname, value):
@ -300,92 +360,164 @@ class Measurement:
"""Returns the measurement time in seconds since the epoch."""
return self._time
def scaleBlock(self, block):
"""When the data is stored as integers, we assume dB full-scale
scaling. Hence, when we convert the data to floats, we divide by the
maximum possible value.
Returns:
Block of measurement data, scaled using sensitivity values and
retured as floating point values
"""
return scaleBlockSens(block, self.sensitivity)
@property
def prms(self):
"""Returns the root mean square of the uncalibrated rms sound pressure
level (equivalend SPL).
def rms(self, channels=None):
"""Returns the root mean square values for each channel
Returns:
1D array with rms values for each channel
"""
#
try:
return self._prms
return self._rms
except AttributeError:
pass
pms = 0.
meansquare = 0.
with self.file() as f:
for block in self.iterBlocks(f):
block = self.scaleBlock(block)
pms += np.sum(block**2, axis=0) / self.N
self._prms = np.sqrt(pms)
return self._prms
for block in self.iterData(channels):
meansquare += np.sum(block**2, axis=0) / self.N
self._rms = np.sqrt(meansquare)
return self._rms
def rawData(self, block=None, channel=None):
"""Returns the raw signal, without any transformations applied
def rawData(self, channels=None, **kwargs):
"""Returns the raw data as stored in the measurement file,
without any transformations applied
args:
block: If specified a certain block is returned
channels: list, or tuple of channel numbers to export. If not defined, all
channels in the measurement are returned
returns:
Numpy array with data. The first axis is always the time instance,
the second axis the channel number.
"""
if block is not None:
with self.file() as f:
if channel is not None:
blocks = f['audio'][block][:, [channel]]
else:
blocks = f['audio'][block]
else:
blocks = []
with self.file() as f:
for block in self.iterBlocks(f):
if channel is not None:
blocks.append(block[:, [channel]])
else:
blocks.append(block)
if channels is None:
channels = list(range(self.nchannels))
blocks = np.asarray(blocks)
rawdata = []
if channel is None:
blocks = blocks.reshape((self.nblocks * self.blocksize,
self.nchannels))
else:
blocks = blocks.reshape((self.nblocks * self.blocksize,
1))
return blocks
with self.file() as f:
for block in IterRawData(f, channels, **kwargs):
rawdata.append(block)
def praw(self, block=None):
"""Returns the uncalibrated acoustic pressure signal, but the
sensitivity is applied, converted to floating point acoustic
pressure values [Pa]."""
return np.concatenate(rawdata, axis=0)
print('TODO: THIS SHOULD BE CHANGED, VERY INCONSISTENT AND CONFUSING API')
blocks = self.rawData(block)
def iterData(self, channels, **kwargs):
sensitivity = kwargs.pop('sensitivity', self.sensitivity)
if channels is None:
channels = list(range(self.nchannels))
with self.file() as f:
for block in IterData(f, channels, sensitivity, **kwargs):
yield block
# Apply scaling (sensitivity, integer -> float)
blocks = self.scaleBlock(blocks)
return blocks
def data(self, channels=None, **kwargs):
"""
Returns the measurement data, scaled and sensitivity applied.
"""
data = []
for d in self.iterData(channels, **kwargs):
data.append(d)
def iterBlocks(self, opened_file):
"""Iterate over all the audio blocks in the opened file.
return np.concatenate(data, axis=0)
def CPS(self, channels=None, **kwargs):
"""
Compute single-sided Cross-Power-Spectrum of the measurement channels
Args:
opened_file: The h5File with the data
channels: Channels to compute for (numbers)
Optional arguments:
nfft: FFT length
window: Window type
overlap: Overlap percentage (value between 0.0 and up to and
including 100.0)
weighting:
Returns:
Cross-power-spectra. C[freq, ch_i, ch_j] = C_ij
"""
return BlockIter(opened_file)
nfft = kwargs.pop('nfft', 2048)
window = kwargs.pop('window', Window.hann)
overlap = kwargs.pop('overlap', 50.0)
if channels is None:
channels = list(range(self.nchannels))
nchannels = len(channels)
aps = AvPowerSpectra(nfft, nchannels, overlap, window)
freq = getFreq(self.samplerate, nfft)
for data in self.iterData(channels, **kwargs):
CS = aps.addTimeData(data)
return freq, CS
def periodicAverage(self, N, channels=None, noiseCorrection=True, **kwargs):
"""
Return the (coherent) periodic average the measurement. This method is
useful for a situation of periodic excitation.
Args:
N: The number of samples in one period. This value should
correspond with the period of the excitation!
noiseCorrection: whether to apply coherent averaging, according to
the Sliding Window correlation method (SWiC): Telle et al.: A Novel
Approach for Impulse Response Measurements with Time-Varying Noise.
If set to False, just the arithmetic average is used.
"""
# Create blocks of equal length N
Ntot = self.N
Nblocks = Ntot//N
# TODO: This method graps the whole measurement file into memory. Can
# only be done with relatively small measurement files.
signal = self.data(channels)
# Estimate noise power in block
blocks = [signal[i*N:(i+1)*N] for i in range(Nblocks)]
if noiseCorrection:
enp1 = [blocks[i+1] - blocks[i] for i in range(Nblocks-1)]
noise_est_np1 = [np.average(enp1[i+1]*enp1[i]) for i in range(Nblocks-2)]
# Create weighting coefficients
sum_inverse_noise = sum([1/n for n in noise_est_np1])
c_np1 = [1/(ni*sum_inverse_noise) for ni in noise_est_np1]
else:
c_np1 = [1/(Nblocks-2)]*(Nblocks-2)
assert np.isclose(sum(c_np1), 1.0)
# Average signal over blocks
avg = np.zeros((blocks[0].shape), dtype=float)
for i in range(0, Nblocks-2):
avg += c_np1[i]*blocks[i+1]
return avg
def periodicCPS(self, N, channels=None, **kwargs):
"""
Compute Cross-Spectral Density based on periodic excitation. Uses noise
reduction by time-averaging the data.
"""
if channels is None:
channels = list(range(self.nchannels))
nchannels = len(channels)
window = Window.rectangular
ps = PowerSpectra(N, window)
avg = self.periodicAverage(N, channels, **kwargs)
CS = ps.compute(avg)
freq = getFreq(self.samplerate, N)
return freq, CS
@property
def sensitivity(self):
@ -492,7 +624,6 @@ class Measurement:
wavfile.write(fn, self.samplerate, data)
@staticmethod
def fromtxt(fn,
skiprows,