#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""!
Author: J.A. de Jong - ASCEE
Description: Measurement class
The ASCEE hdf5 measurement file format contains the following fields:
- Attributes:
'version': If not given, version 1 is assumed. For version 1, measurement data
is assumed to be acoustic data.
'samplerate': The audio data sample rate in Hz.
'nchannels': The number of audio channels in the file
'sensitivity': (Optionally) the stored sensitivity of the recorded channels.
This can be a single value, or a list of sensitivities for
each channel. Both representations are allowed.
'qtys': (Optionally) a list of the physical quantities recorded for each
channel. If this attribute is not found, quantities default to 'Number / Full scale'.
- Datasets:
'audio': 3-dimensional array of blocks of audio data. The first axis is the
block index, the second axis the sample number and the third axis is the channel
number. The data type is either int16, int32, float32 or float64. If raw data
is stored as integer values (int16, int32), the values should be pre-scaled
by the maximum positive value of the data type (2**(nb-1) - 1), such that the
corresponding 'number' lies between -1.0 and 1.0.
'video': 4-dimensional array of video frames. The first index is the frame
number, the second the x-position of the pixel, the third the y-position, and
the last axis is the color. This axis has length 3 and the colors are stored
as (r, g, b), typically with a color depth of 256 (np.uint8 data format).
The video dataset may be absent from the file.
"""
__all__ = ['Measurement', 'scaleBlockSens']
from contextlib import contextmanager
import h5py as h5
import numpy as np
from .lasp_config import LASP_NUMPY_FLOAT_TYPE
from scipy.io import wavfile
import os, time, wave, logging
from .lasp_common import SIQtys, Qty, getFreq
from .device import DaqChannel
from .wrappers import AvPowerSpectra, Window, PowerSpectra
logger = logging.getLogger(__name__)
def getSampWidth(dtype):
"""Returns the width of a single sample in **bytes**.
Args:
dtype: numpy dtype
Returns:
Size of a sample in bytes (int)
"""
if dtype in (np.int32, np.float32):
return 4
elif dtype == np.int16:
return 2
elif dtype == np.float64:
return 8
else:
raise ValueError('Invalid data type: %s' % dtype)
def scaleBlockSens(block, sens):
"""Scale a block of raw data to return raw acoustic pressure data.
Args:
block: block of raw data with integer data type
sens: array of sensitivity coefficients for
each channel.
"""
sens = np.asarray(sens)
assert sens.size == block.shape[1]
if np.issubdtype(block.dtype.type, np.integer):
sw = getSampWidth(block.dtype)
fac = 2**(8 * sw - 1) - 1
else:
fac = 1.
return block.astype(LASP_NUMPY_FLOAT_TYPE) / fac / sens[np.newaxis, :]
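# Illustrative sketch of scaleBlockSens (example numbers are made up): an
# int16 block is first mapped to the range -1.0 .. 1.0 and then divided by
# the per-channel sensitivity.
#
#   block = np.array([[16384, -16384]], dtype=np.int16)  # one sample, two channels
#   sens = np.array([1.0, 2.0])                          # e.g. in [Pa^-1]
#   scaled = scaleBlockSens(block, sens)
#   # scaled ~= [[0.5, -0.25]], i.e. 16384 / 32767 / sens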
class IterRawData:
"""Iterate over stored blocks if the raw measurement data of a h5 file."""
def __init__(self, f, channels, **kwargs):
"""Initialize a BlockIter object.
Args:
f: Audio dataset in the h5 file, accessed as f['audio']
"""
assert isinstance(channels, list)
fa = f['audio']
self.fa = fa
self.i = 0
nblocks = fa.shape[0]
blocksize = fa.shape[1]
self.blocksize = blocksize
nchannels = fa.shape[2]
self.channels = channels
self.istart = kwargs.pop('istart', 0)
self.istop = kwargs.pop('istop', blocksize*nblocks)
# Resolve a negative (from-the-end) stop index before computing block numbers
if self.istop < 0:
self.istop += blocksize*nblocks
if self.istop <= self.istart:
raise ValueError('Stop index is smaller than start index')
self.firstblock = self.istart // blocksize
self.lastblock = self.istop // blocksize
if self.istop % blocksize == 0:
self.lastblock -= 1
self.firstblock_start_offset = self.istart % blocksize
# Number of samples to take from the last block (istop is exclusive)
self.lastblock_stop_offset = self.istop - self.lastblock * blocksize
def __iter__(self):
return self
def __next__(self):
"""Return the next block."""
block = self.firstblock + self.i
if block > self.lastblock:
raise StopIteration
if block == self.firstblock:
start_offset = self.firstblock_start_offset
else:
start_offset = 0
if block == self.lastblock:
stop_offset = self.lastblock_stop_offset
else:
stop_offset = self.blocksize
self.i +=1
return self.fa[block,start_offset:stop_offset,self.channels]
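# Illustrative sketch of IterRawData (assumes a measurement file 'meas.h5'
# exists; 'h5' is the h5py import at the top of this module): iterate over
# raw blocks of channel 0, from sample 100 up to (but not including) 10000.
#
#   with h5.File('meas.h5', 'r') as f:
#       for block in IterRawData(f, [0], istart=100, istop=10000):
#           ...  # each 'block' is a 2D array of raw samples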
class IterData(IterRawData):
"""
Iterate over blocks of data, scaled with sensitivity and integer scaling
between 0 and 1
"""
def __init__(self, fa, channels, sensitivity, **kwargs):
super().__init__(fa, channels, **kwargs)
self.sens = np.asarray(sensitivity)[self.channels]
assert self.sens.ndim == 1
def __next__(self):
nextraw = super().__next__()
return scaleBlockSens(nextraw, self.sens)
def exportAsWave(fn, fs, data, force=False):
if not fn.endswith('.wav'):
fn += '.wav'
nchannels = data.shape[1]
sampwidth = getSampWidth(data.dtype)
if os.path.exists(fn) and not force:
raise RuntimeError(f'File already exists: {fn}')
with wave.open(fn, 'w') as wf:
wf.setparams((nchannels, sampwidth, fs, 0, 'NONE', 'NONE'))
wf.writeframes(np.asfortranarray(data).tobytes())
class Measurement:
"""Provides access to measurement data stored in the h5 measurement file
format."""
def __init__(self, fn):
"""Initialize a Measurement object based on the filename."""
if not fn.endswith('.h5'):
fn += '.h5'
# Full filepath
self.fn = fn
# Base filename
self.fn_base = os.path.split(fn)[1]
# Open the h5 file in read-plus mode, to allow for changing the
# measurement comment.
with h5.File(fn, 'r+') as f:
# Check for video data
try:
f['video']
self.has_video = True
except KeyError:
self.has_video = False
self.nblocks, self.blocksize, self.nchannels = f['audio'].shape
dtype = f['audio'].dtype
self.dtype = dtype
self.sampwidth = getSampWidth(dtype)
self.samplerate = f.attrs['samplerate']
self.N = (self.nblocks * self.blocksize)
self.T = self.N / self.samplerate
try:
self._channel_names = f.attrs['channel_names']
except KeyError:
# No channel names found in measurement file
self._channel_names = [f'Unnamed {i}' for i in range(self.nchannels)]
# comment = read-write thing
try:
self._comment = f.attrs['comment']
except KeyError:
f.attrs['comment'] = ''
self._comment = ''
# Sensitivity
try:
sens = f.attrs['sensitivity']
self._sens = sens * \
np.ones(self.nchannels) if isinstance(
sens, float) else sens
except KeyError:
self._sens = np.ones(self.nchannels)
self._time = f.attrs['time']
try:
qtys_json = f.attrs['qtys']
# Load quantity data
self._qtys = [Qty.from_json(qty_json) for qty_json in qtys_json]
except KeyError:
# If quantity data is not available, this is an 'old'
# measurement file.
logger.debug(f'Physical quantity data not available in measurement file. Assuming {SIQtys.default}')
self._qtys = [SIQtys.default for i in range(self.nchannels)]
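# Minimal usage sketch of the Measurement class (illustrative; assumes a
# measurement file 'meas.h5' in the format described in the module docstring):
#
#   m = Measurement('meas.h5')
#   print(m.samplerate, m.nchannels, m.recTime)  # basic properties
#   print(m.channelNames)                        # stored or default channel names
#   p = m.data()                                 # scaled data, shape (N, nchannels)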
def setAttribute(self, atrname, value):
with self.file('r+') as f:
# Update the attribute in the file
f.attrs[atrname] = value
setattr(self, '_' + atrname, value)
@property
def name(self):
"""Returns filename base without extension."""
return os.path.splitext(self.fn_base)[0]
@property
def channelNames(self):
return self._channel_names
@channelNames.setter
def channelNames(self, newchnames):
if len(newchnames) != self.nchannels:
raise RuntimeError('Invalid length of new channel names')
self.setAttribute('channel_names', newchnames)
@property
def channelConfig(self):
chcfg = [DaqChannel(channel_enabled=True,
channel_name=chname,
sensitivity=sens,)
for chname, sens in zip(
self.channelNames,
self.sensitivity)]
for i, qty in enumerate(self.qtys):
chcfg[i].qty = qty
return chcfg
@channelConfig.setter
def channelConfig(self, chcfg):
chname = []
sens = []
qtys = []
for ch in chcfg:
chname.append(ch.channel_name)
sens.append(ch.sensitivity)
qtys.append(ch.qty)
self.channelNames = chname
self.sensitivity = sens
self.qtys = qtys
@property
def qtys(self):
return self._qtys
@qtys.setter
def qtys(self, newqtys):
if not len(newqtys) == len(self._qtys):
raise ValueError('Invalid number of quantities')
qtys_json = [qty.to_json() for qty in newqtys]
# setAttribute stores the jsonified version in self._qtys as well, which
# we overwrite again below with the deserialized objects. This is
# admittedly not a very nice way of coding.
self.setAttribute('qtys', qtys_json)
self._qtys = newqtys
@contextmanager
def file(self, mode='r'):
"""Contextmanager which opens the storage file and yields the file.
Args:
mode: Opening mode for the file. Should either be 'r', or 'r+'
"""
if mode not in ('r', 'r+'):
raise ValueError('Invalid file opening mode.')
with h5.File(self.fn, mode) as f:
yield f
@property
def comment(self):
"""Return the measurement comment.
Returns:
The measurement comment (text string)
"""
return self._comment
@comment.setter
def comment(self, cmt):
"""Set the measurement comment.
Args:
cmt: Comment text string to set
"""
with self.file('r+') as f:
# Update comment attribute in the file
f.attrs['comment'] = cmt
self._comment = cmt
@property
def recTime(self):
"""Returns the total recording time of the measurement, in float
seconds."""
return self.blocksize * self.nblocks / self.samplerate
@property
def time(self):
"""Returns the measurement time in seconds since the epoch."""
return self._time
def rms(self, channels=None):
"""Returns the root mean square values for each channel.
Args:
channels: list of channel numbers to compute the rms for. If not
given, all channels are used.
Returns:
1D array with an rms value for each selected channel
"""
# The result of the first call is cached and returned on subsequent
# calls, regardless of the channel selection.
try:
return self._rms
except AttributeError:
pass
meansquare = 0.
for block in self.iterData(channels):
meansquare += np.sum(block**2, axis=0) / self.N
self._rms = np.sqrt(meansquare)
return self._rms
def rawData(self, channels=None, **kwargs):
"""Returns the raw data as stored in the measurement file,
without any transformations applied
Args:
channels: list or tuple of channel numbers to export. If not defined,
all channels in the measurement are returned.
Returns:
Numpy array with data. The first axis is always the time instance,
the second axis the channel number.
"""
if channels is None:
channels = list(range(self.nchannels))
rawdata = []
with self.file() as f:
for block in IterRawData(f, channels, **kwargs):
rawdata.append(block)
return np.concatenate(rawdata, axis=0)
def iterData(self, channels, **kwargs):
sensitivity = kwargs.pop('sensitivity', self.sensitivity)
if channels is None:
channels = list(range(self.nchannels))
with self.file() as f:
for block in IterData(f, channels, sensitivity, **kwargs):
yield block
def data(self, channels=None, **kwargs):
"""
Returns the measurement data, scaled and sensitivity applied.
"""
data = []
for d in self.iterData(channels, **kwargs):
data.append(d)
return np.concatenate(data, axis=0)
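# Illustrative sketch of the difference between rawData() and data()
# (assumes 'm' is a Measurement whose audio is stored as int16):
#
#   raw = m.rawData(channels=[0])   # integer numbers exactly as stored on disk
#   phys = m.data(channels=[0])     # floats: raw / 32767 / sensitivity[0]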
def CPS(self, channels=None, **kwargs):
"""
Compute single-sided Cross-Power-Spectrum of the measurement channels
Args:
channels: Channels to compute for (numbers)
Optional arguments:
nfft: FFT length
window: Window type
overlap: Overlap percentage (value between 0.0 and up to and
including 100.0)
weighting:
Returns:
Cross-power-spectra. C[freq, ch_i, ch_j] = C_ij
"""
nfft = kwargs.pop('nfft', 2048)
window = kwargs.pop('window', Window.hann)
overlap = kwargs.pop('overlap', 50.0)
if channels is None:
channels = list(range(self.nchannels))
nchannels = len(channels)
aps = AvPowerSpectra(nfft, nchannels, overlap, window)
freq = getFreq(self.samplerate, nfft)
for data in self.iterData(channels, **kwargs):
CS = aps.addTimeData(data)
return freq, CS
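# Illustrative sketch of CPS() (assumes 'm' is a Measurement with at least two
# channels). The result is indexed as C[freq, ch_i, ch_j], so the auto-power
# spectrum of the first selected channel is the diagonal element [:, 0, 0]:
#
#   freq, C = m.CPS(channels=[0, 1], nfft=4096, window=Window.hann, overlap=50.0)
#   auto0 = np.real(C[:, 0, 0])   # auto-power spectrum of channel 0
#   cross01 = C[:, 0, 1]          # cross-power spectrum between channels 0 and 1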
def periodicAverage(self, N, channels=None, noiseCorrection=True, **kwargs):
"""
Return the (coherent) periodic average of the measurement. This method is
useful for the case of periodic excitation.
Args:
N: The number of samples in one period. This value should
correspond with the period of the excitation!
noiseCorrection: whether to apply coherent averaging, according to
the Sliding Window correlation method (SWiC): Telle et al.: A Novel
Approach for Impulse Response Measurements with Time-Varying Noise.
If set to False, just the arithmetic average is used.
"""
# Create blocks of equal length N
Ntot = self.N
Nblocks = Ntot//N
# TODO: This method loads the whole measurement into memory, which is
# only feasible for relatively small measurement files.
signal = self.data(channels)
# Estimate noise power in block
blocks = [signal[i*N:(i+1)*N] for i in range(Nblocks)]
if noiseCorrection:
enp1 = [blocks[i+1] - blocks[i] for i in range(Nblocks-1)]
noise_est_np1 = [np.average(enp1[i+1]*enp1[i]) for i in range(Nblocks-2)]
# Create weighting coefficients
sum_inverse_noise = sum([1/n for n in noise_est_np1])
c_np1 = [1/(ni*sum_inverse_noise) for ni in noise_est_np1]
else:
c_np1 = [1/(Nblocks-2)]*(Nblocks-2)
assert np.isclose(sum(c_np1), 1.0)
# Average signal over blocks
avg = np.zeros((blocks[0].shape), dtype=float)
for i in range(0, Nblocks-2):
avg += c_np1[i]*blocks[i+1]
return avg
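# For reference, the noise-corrected weighting used above: with a noise power
# estimate n_i per block, the weights are c_i = (1/n_i) / sum_j(1/n_j), so
# blocks with lower estimated noise contribute more and the weights sum to
# one. With equal noise estimates this reduces to the arithmetic mean.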
def periodicCPS(self, N, channels=None, **kwargs):
"""
Compute Cross-Spectral Density based on periodic excitation. Uses noise
reduction by time-averaging the data.
"""
if channels is None:
channels = list(range(self.nchannels))
nchannels = len(channels)
window = Window.rectangular
ps = PowerSpectra(N, window)
avg = self.periodicAverage(N, channels, **kwargs)
CS = ps.compute(avg)
freq = getFreq(self.samplerate, N)
return freq, CS
@property
def sensitivity(self):
"""Sensitivity of the data in U^-1, from floating point data scaled
between -1.0 and 1.0 to Units [U].
If the sensitivity is not stored in the measurement file, this
function returns 1.0 for each channel
"""
return self._sens
@sensitivity.setter
def sensitivity(self, sens):
"""Set the sensitivity of the measurement in the file.
Args:
sens: sensitivity data, should be a float, or an array of floats
equal to the number of channels.
"""
if isinstance(sens, float):
# Put all sensitivities equal
sens = sens * np.ones(self.nchannels)
elif isinstance(sens, list):
sens = np.asarray(sens)
valid = sens.ndim == 1
valid &= sens.shape[0] == self.nchannels
valid &= sens.dtype == float
if not valid:
raise ValueError('Invalid sensitivity value(s) given')
with self.file('r+') as f:
f.attrs['sensitivity'] = sens
self._sens = sens
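# Illustrative sketch of setting the sensitivity (assumes 'm' is a two-channel
# Measurement; the numbers are made up): for microphones of 42.5 mV/Pa
# recorded with a full-scale input range of 1.0 V, the sensitivity in [Pa^-1]
# is 0.0425 per channel.
#
#   m.sensitivity = [0.0425, 0.0425]   # stored in the file, applied by data()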
def checkOverflow(self):
"""Coarse check for overflow in measurement.
Return:
True if overflow is possible, else False
"""
with self.file() as f:
for block in IterRawData(f, list(range(self.nchannels))):
dtype = block.dtype
if dtype.kind == 'i':
# minvalue = np.iinfo(dtype).min
maxvalue = np.iinfo(dtype).max
if np.max(np.abs(block)) >= 0.9*maxvalue:
return True
else:
# Cannot check for floating point values.
return False
return False
def exportAsWave(self, fn=None, force=False, newsampwidth=None, normalize=True):
"""Export measurement file as wave. In case the measurement data is
stored as floats, the values are scaled to the proper integer (PCM)
data format.
Args:
fn: If given, this will be the filename to write to. If the
filename does not end with '.wav', this extension is added. If not
given, the measurement file name is used with a '.wav' extension.
force: If True, overwrite any existing file with the given name;
otherwise a RuntimeError is raised.
newsampwidth: sample width in bytes (2 or 4) with which to export the
data. This should only be given in case the measurement data is
stored as floating point values, otherwise an error is thrown.
normalize: If True, normalize the level to full scale before export.
"""
if fn is None:
fn = self.fn
fn = os.path.splitext(fn)[0]
if os.path.splitext(fn)[1] != '.wav':
fn += '.wav'
if os.path.exists(fn) and not force:
raise RuntimeError(f'File already exists: {fn}')
data = self.rawData()
if normalize:
maxabs = np.max(np.abs(data), axis=0)
data /= maxabs[np.newaxis, :]
if newsampwidth is not None:
# Convert to floats, then to new sample width
data = scaleBlockSens(data, self.sensitivity**0)
if newsampwidth == 2:
newtype = np.int16
elif newsampwidth == 4:
newtype = np.int32
else:
raise ValueError('Invalid sample width, should be 2 or 4')
scalefac = 2**(8*(newsampwidth-1))-1
data = (data*scalefac).astype(newtype)
wavfile.write(fn, self.samplerate, data)
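# Illustrative sketch of exportAsWave() (the output file name is made up;
# assumes the audio of 'm' is stored as floating point data):
#
#   m.exportAsWave('meas_export', force=True, newsampwidth=2, normalize=True)
#   # -> writes 'meas_export.wav' as normalized 16-bit PCM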
@staticmethod
def fromtxt(fn,
skiprows,
samplerate,
sensitivity,
mfn=None,
timestamp=None,
delimiter='\t',
firstcoltime=True):
"""Converts a txt file to a LASP Measurement file, opens the associated
Measurement object and returns it. The measurement file will have the
same file name as the txt file, except with h5 extension.
Args:
fn: Filename of text file
skiprows: Number of header rows in text file to skip
samplerate: Sampling frequency in [Hz]
sensitivity: 1D array of channel sensitivities
mfn: Filepath where the measurement file is stored. If not given,
an h5 file is created alongside fn, sharing its basename.
timestamp: If given, a custom timestamp for the measurement
(integer containing seconds since epoch). If not given, the
timestamp is obtained from the last modification time of fn.
delimiter: Column delimiter
firstcoltime: If True, the first column is treated as the
sample time.
"""
if not os.path.exists(fn):
raise ValueError(f'File {fn} does not exist.')
if timestamp is None:
timestamp = os.path.getmtime(fn)
if mfn is None:
mfn = os.path.splitext(fn)[0] + '.h5'
else:
mfn = os.path.splitext(mfn)[0] + '.h5'
dat = np.loadtxt(fn, skiprows=skiprows, delimiter=delimiter)
if firstcoltime:
time = dat[:, 0]
if not np.isclose(time[1] - time[0], 1 / samplerate):
raise ValueError('Samplerate given does not agree with '
'samplerate in file')
# Chop off first column
dat = dat[:, 1:]
nchannels = dat.shape[1]
if nchannels != sensitivity.shape[0]:
raise ValueError(
f'Invalid sensitivity length given. Should be: {nchannels}')
with h5.File(mfn, 'w') as hf:
hf.attrs['samplerate'] = samplerate
hf.attrs['sensitivity'] = sensitivity
hf.attrs['time'] = timestamp
hf.attrs['blocksize'] = 1
hf.attrs['nchannels'] = nchannels
ad = hf.create_dataset('audio', (1, dat.shape[0], dat.shape[1]),
dtype=dat.dtype,
maxshape=(1, dat.shape[0], dat.shape[1]),
compression='gzip')
ad[0] = dat
return Measurement(mfn)
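# Illustrative sketch of fromtxt() (the file name and layout are made up: a
# tab-separated text file with one header row, a time column and two data
# columns sampled at 10 kHz):
#
#   m = Measurement.fromtxt('recording.txt', skiprows=1, samplerate=10000.,
#                           sensitivity=np.ones(2), firstcoltime=True)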
@staticmethod
def fromnpy(data,
samplerate,
sensitivity,
mfn,
timestamp=None):
"""Converts a numpy array to a LASP Measurement file, opens the
associated Measurement object and returns it. The measurement file will
have the same file name as the txt file, except with h5 extension.
2019-10-27 13:19:26 +00:00
Args:
data: Numpy array, first column is sample, second is channel. Can
2019-10-27 13:19:26 +00:00
also be specified with a single column for single-channel data
samplerate: Sampling frequency in [Hz]
sensitivity: 1D array of channel sensitivities [Pa^-1]
mfn: Filepath where measurement file is stored.
timestamp: If given, a custom timestamp for the measurement
(integer containing seconds since epoch). If not given, the
timestamp is obtained from the last modification time.
delimiter: Column delimiter
firstcoltime: If true, the first column is the treated as the
sample time.
"""
if os.path.splitext(mfn)[1] != '.h5':
mfn += '.h5'
if os.path.exists(mfn):
raise ValueError(f'File {mfn} already exist.')
if timestamp is None:
timestamp = int(time.time())
if data.ndim != 2:
data = data[:, np.newaxis]
try:
len(sensitivity)
except TypeError:
raise ValueError('Sensitivity should be given as an array-like data type')
sensitivity = np.asarray(sensitivity)
nchannels = data.shape[1]
if nchannels != sensitivity.shape[0]:
raise ValueError(
f'Invalid sensitivity length given. Should be: {nchannels}')
with h5.File(mfn, 'w') as hf:
hf.attrs['samplerate'] = samplerate
hf.attrs['sensitivity'] = sensitivity
hf.attrs['time'] = timestamp
hf.attrs['blocksize'] = 1
hf.attrs['nchannels'] = nchannels
ad = hf.create_dataset('audio', (1, data.shape[0], data.shape[1]),
dtype=data.dtype,
maxshape=(1, data.shape[0], data.shape[1]),
compression='gzip')
ad[0] = data
return Measurement(mfn)
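# Illustrative sketch of fromnpy() (the file name and signal are made up):
#
#   fs = 48000.
#   t = np.arange(0, 1.0, 1 / fs)
#   sig = np.sin(2 * np.pi * 1000. * t)[:, np.newaxis]   # 1 kHz tone, one channel
#   m = Measurement.fromnpy(sig, samplerate=fs, sensitivity=np.ones(1),
#                           mfn='tone_meas', timestamp=None)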
@staticmethod
def fromWaveFile(fn, newfn=None, force=False, timestamp=None):
"""Convert a measurement file to a wave file, and return the
measurement handle."""
if timestamp is None:
timestamp = int(time.time())
base_fn = os.path.splitext(fn)[0]
if newfn is None:
newfn = base_fn + '.h5'
if os.path.exists(newfn) and not force:
raise RuntimeError(f'Measurement file name {newfn} already exists in path, set "force" to true to overwrite')
samplerate, data = wavfile.read(fn)
if data.ndim == 2:
nframes, nchannels = data.shape
else:
nchannels = 1
nframes = len(data)
data = data[:, np.newaxis]
sensitivity = np.ones(nchannels)
with h5.File(newfn, 'w') as hf:
hf.attrs['samplerate'] = samplerate
hf.attrs['nchannels'] = nchannels
hf.attrs['time'] = timestamp
hf.attrs['blocksize'] = 1
hf.attrs['sensitivity'] = sensitivity
ad = hf.create_dataset('audio', (1, nframes, nchannels),
dtype=data.dtype,
maxshape=(1, nframes, nchannels),
compression='gzip')
ad[0] = data
return Measurement(newfn)
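# Small self-contained demo, illustrative only: convert a wave file given on
# the command line to a measurement file and print some of its properties.
# Nothing below is part of the library API.
if __name__ == '__main__':
    import sys
    if len(sys.argv) > 1:
        m = Measurement.fromWaveFile(sys.argv[1], force=True)
        print(f'Loaded {m.name}: {m.nchannels} channel(s), '
              f'{m.samplerate} Hz, {m.recTime:.2f} s')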