lasp/python_src/lasp/lasp_measurement.py

1221 lines
43 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
"""!
Author: J.A. de Jong - ASCEE
Description: Measurement class
The ASCEE hdf5 measurement file format contains the following fields:
- Attributes:
'LASP_VERSION_MAJOR': int The major version of LASP which which the recording has been performed.
'LASP_VERSION_MINOR': int The minor version of LASP which which the recording has been performed.
'samplerate': The audio data sample rate in Hz [float]
'nchannels': The number of audio channels in the file List[float]
'sensitivity': (Optionally) the stored sensitivity of the record channels.
This can be a single value, or a list of sensitivities for
each channel. Both representations are allowed. List[float]
For measurement files of LASP < v1.0
'qtys' : (Optionally): list of quantities that is recorded for each channel',
if this array is not found. Quantities are defaulted to 'Number / Full scale'
'type_int': A specified measurement type that can be used programmatically. It can be read out as an enumeration variant of type "MeasurementType". See code below of implemented measurement types.
For measurement files of LASP >= 1.0
- Datasets:
'audio': 3-dimensional array of blocks of audio data. The first axis is the
block index, the second axis the sample number and the third axis is the
channel number. The data type is either int16, int32 or float64 / float32. If
raw data is stored as integer values (int16, int32), the actual values should
be pre-scaled by its maximum positive number (2**(nb-1) - 1), such that the
corresponding 'number' lies between -1.0 and 1.0. To stay backwards-compatible, the dataset is always called 'audio' despite it being possible that other types of data is stored in the dataset (such as voltages, accelerations etc).
'video': 4-dimensional array of video frames. The first index is the frame
number, the second the x-value of the pixel and the third is the
y-value of the pixel. Then, the last axis is the color. This axis has
length 3 and the colors are stored as (r,g,b). Where typically a
color depth of 256 is used (np.uint8 data format)
The video dataset can possibly be not present in the data.
"""
__all__ = ["Measurement", "scaleBlockSens", "MeasurementType"]
from contextlib import contextmanager
from weakref import WeakValueDictionary
import h5py as h5
import uuid
import pathlib
import glob
import itertools
import numpy as np
from enum import Enum, unique
from .lasp_config import LASP_NUMPY_FLOAT_TYPE
from scipy.io import wavfile
import os, time, wave, logging
from .lasp_common import SIQtys, Qty, getFreq
from .lasp_version import LASP_VERSION_MAJOR, LASP_VERSION_MINOR
from .lasp_cpp import Window, DaqChannel, AvPowerSpectra
from typing import List
from functools import lru_cache
# Measurement file extension
MEXT = 'h5'
DOTMEXT = f'.{MEXT}'
@unique
class MeasurementType(Enum):
"""
Measurement flags related to the measurement. Stored as bit flags in the measurement file. This is for possible changes in the API later.
"""
# Not specific measurement type
NotSpecific = 0
# Measurement serves as an insertion loss reference measurement
ILReference = 1 << 0
# Measurement is general calibration measurement (to calibrate sensor in a certain way)
CALGeneral = 1 << 1
# Measurement serves as impedance tube calibration (short tube case / ref. plane at origin)
muZCalShort = 1 << 2
# Measurement serves as impedance tube calibration (long tube case)
muZCalLong = 1 << 3
# Series impedance reference
muZSeriesImpedanceRef = 1 << 4
def __str__(self):
match self:
case MeasurementType.NotSpecific: return '-'
case MeasurementType.ILReference: return 'Insertion loss reference'
case MeasurementType.CALGeneral: return 'General calibration'
case MeasurementType.muZCalShort: return 'ASCEE μZ short length calibration'
case MeasurementType.muZCalLong: return 'ASCEE μZ long length calibration'
case MeasurementType.muZSeriesImpedanceRef: return 'ASCEE μZ series impedance empty reference'
case _: raise ValueError("Not a MeasurementType")
def getSampWidth(dtype):
"""Returns the width of a single sample in **bytes**.
Args:
dtype: numpy dtype
Returns:
Size of a sample in bytes (int)
"""
if dtype in (np.int32, np.float32):
return 4
elif dtype == np.int16:
return 2
elif dtype == np.float64:
return 8
else:
raise ValueError("Invalid data type: %s" % dtype)
def scaleBlockSens(block, sens):
"""Scale a block of raw data to return raw acoustic pressure data.
Args:
block: block of raw data with integer data type
sens: array of sensitivity coeficients for
each channel.
"""
sens = np.asarray(sens)
assert sens.size == block.shape[1]
if np.issubdtype(block.dtype.type, np.integer):
sw = getSampWidth(block.dtype)
fac = 2 ** (8 * sw - 1) - 1
else:
fac = 1.0
return block.astype(LASP_NUMPY_FLOAT_TYPE) / fac / sens[np.newaxis,:]
class IterRawData:
"""Iterate over stored blocks if the raw measurement data of a h5 file."""
def __init__(self, f, channels, **kwargs):
"""Initialize a BlockIter object.
Args:
f: Audio dataset in the h5 file, accessed as f['audio']
channels: list of channel indices to use
istart: index of first sample
istop: index of last sample (not including istop)
"""
assert isinstance(channels, list)
fa = f["audio"]
self.fa = fa
self.i = 0
nblocks = fa.shape[0]
blocksize = fa.shape[1]
self.blocksize = blocksize
# nchannels = fa.shape[2]
self.channels = channels
self.istart = kwargs.pop("istart", 0)
self.istop = kwargs.pop("istop", blocksize * nblocks)
self.firstblock = self.istart // blocksize
self.lastblock = self.istop // blocksize
if self.istop % blocksize == 0:
self.lastblock -= 1
self.firstblock_start_offset = self.istart % blocksize
if self.istop < 0:
self.istop += blocksize * nblocks
if self.istop <= self.istart:
raise ValueError("Stop index is smaller than start index")
if self.istop != blocksize * nblocks:
self.lastblock_stop_offset = self.istop % blocksize
else:
self.lastblock_stop_offset = blocksize
def __iter__(self):
return self
def __next__(self):
"""Return the next block."""
fa = self.fa
# nblocks_to_return = self.lastblock-self.firstblock+1
block = self.firstblock + self.i
if block > self.lastblock:
raise StopIteration
if block == self.firstblock:
start_offset = self.firstblock_start_offset
else:
start_offset = 0
if block == self.lastblock:
stop_offset = self.lastblock_stop_offset
else:
stop_offset = self.blocksize
# print(f'block: {block}, starto: {start_offset}, stopo {stop_offset}')
self.i += 1
return fa[block, start_offset:stop_offset,:][:, self.channels]
class IterData(IterRawData):
"""
Iterate over blocks of data, scaled with sensitivity and integer scaling
between 0 and 1
"""
def __init__(self, fa, channels, sensitivity, **kwargs):
super().__init__(fa, channels, **kwargs)
self.sens = np.asarray(sensitivity)[self.channels]
assert self.sens.ndim == 1
def __next__(self):
nextraw = super().__next__()
return scaleBlockSens(nextraw, self.sens)
class Measurement:
"""Provides access to measurement data stored in the h5 measurement file
format."""
# Store a dict of open measurements, with uuid string as a key. We store them as a weak ref.
uuid_s = WeakValueDictionary()
def __init__(self, fn):
"""Initialize a Measurement object based on the filename."""
# Add extension if tried to open without exension
if DOTMEXT not in fn:
fn += DOTMEXT
# Full filepath
self.fn = fn
# Open the h5 file in read-plus mode, to allow for changing the
# measurement comment.
with h5.File(fn, "r") as f:
# Check for video data
try:
f["video"]
self.has_video = True
except KeyError:
self.has_video = False
self.nblocks, self.blocksize, self.nchannels = f["audio"].shape
dtype = f["audio"].dtype
self.dtype = dtype
self.sampwidth = getSampWidth(dtype)
self.samplerate = f.attrs["samplerate"]
self.N = self.nblocks * self.blocksize
self.T = self.N / self.samplerate
try:
self.version_major = f.attrs["LASP_VERSION_MAJOR"]
self.version_minor = f.attrs["LASP_VERSION_MINOR"]
except KeyError:
# No version information stored
self.version_major = 0
self.version_minor = 1
try:
# Try to catch UUID (Universally Unique IDentifier)
self._UUID = f.attrs['UUID']
# Flag indicating we have to add a new UUID
create_new_uuid = False
except KeyError:
create_new_uuid = True
try:
# UUID of the reference measurement. Should be stored as
# a lists of tuples, where each tuple is a combination of (<MeasurementType.value>, <uuid_string>, <last_filename>).
# The last filename is a filename that *probably* is the reference measurement with
# given UUID. If it is not, we will search for it in the same directory as `this` measurement.
# If we cannot find it there, we will give up, and remove the field corresponding to this reference measurement type.
refMeas_list = f.attrs['refMeas']
# Build a tuple string from it
self._refMeas = {}
for (key, val, name) in refMeas_list:
self._refMeas[MeasurementType(int(key))] = (val, name)
except KeyError:
self._refMeas = {}
try:
self._type_int = f.attrs['type_int']
except KeyError:
self._type_int = 0
# Due to a previous bug, the channel names were not stored
# consistently, i.e. as 'channel_names' and later camelcase.
try:
self._channelNames = f.attrs["channelNames"]
except KeyError:
try:
self._channelNames = f.attrs["channel_names"]
logging.info(
"Measurement file obtained which stores channel names with *old* attribute 'channel_names'"
)
except KeyError:
# No channel names found in measurement file
logging.info("No channel name data found in measurement")
self._channelNames = [f"Unnamed {i}" for i in range(self.nchannels)]
# comment = read-write thing
if "comment" in f.attrs:
self._comment = f.attrs["comment"]
else:
self._comment = ""
# Sensitivity
try:
sens = f.attrs["sensitivity"]
self._sens = (
sens * np.ones(self.nchannels) if isinstance(sens, float) else sens
)
except KeyError:
self._sens = np.ones(self.nchannels)
# The time is cached AND ALWAYS ASSUMED TO BE AN IMMUTABLE OBJECT.
# It is also cached. Changing the measurement timestamp should not
# be done.
self._time = f.attrs["time"]
# Quantity stored as channel.
self._qtys = None
try:
qtys_enum_idx = f.attrs["qtys_enum_idx"]
self._qtys = [SIQtys.fromInt(idx) for idx in qtys_enum_idx]
except KeyError:
try:
qtys_json = f.attrs["qtys"]
# Load quantity data
self._qtys = [Qty.from_json(qty_json) for qty_json in qtys_json]
except KeyError:
# If quantity data is not available, this is an 'old'
# measurement file.
pass
if self._qtys is None:
self._qtys = [SIQtys.default() for i in range(self.nchannels)]
logging.debug(
f"Physical quantity data not available in measurement file. Assuming {SIQtys.default}"
)
if create_new_uuid:
# Create and store a random UUID based on *now* and store it forever
# inside of the Measurement file
self.genNewUUID()
else:
if self.UUID in Measurement.uuid_s.keys():
raise RuntimeError(f"Measurement '{self.name}' is already opened. Cannot open measurement twice. Note: this error can happen when measurements are manually copied.")
# Store weak reference to 'self' in list of UUID's. They are removed when no file is open anymore
Measurement.uuid_s[self._UUID] = self
def rename(self, newname: str):
"""
Try to rename the measurement file.
Args:
newname: New name, with or without extension
"""
_ , ext = os.path.splitext(newname)
# Add proper extension if new name is given without extension.
if ext != DOTMEXT:
newname = newname + DOTMEXT
# Folder, Base filename + extension
folder, _ = os.path.split(self.fn)
newname_full = str(pathlib.Path(folder) / newname)
os.rename(self.fn, newname_full)
self.fn = newname_full
def genNewUUID(self):
"""
Create new UUID for measurement and store in file.
"""
self.setAttribute('UUID', str(uuid.uuid1()))
@property
def UUID(self):
"""
Universally unique identifier
"""
return self._UUID
def getRefMeas(self, mtype: MeasurementType):
"""
Return corresponding reference measurement, if configured and can be found. If the reference
measurement is currently not open, it tries to open it by traversing other measurement
files in the current directory. Throws a runtime error in case the reference measurement cannot be found.
Throws a ValueError when the reference measurement is not configured.
"""
# See if we can find the UUID for the required measurement type
try:
required_uuid, possible_name = self._refMeas[mtype]
except KeyError:
raise ValueError(f"No reference measurement configured for '{self.name}'")
m = None
# Try to find it in the dictionary of of open measurements
if required_uuid in Measurement.uuid_s.keys():
m = Measurement.uuid_s[required_uuid]
logging.debug(f'Returned reference measurement {m.name} from list of open measurements')
# Not found in list of openend measurements. See if we can open it using its last stored file name we know of
if m is None:
try:
m = Measurement(possible_name)
if m.UUID == required_uuid:
logging.info(f'Opened reference measurement {m.name} by name')
except Exception as e:
logging.error(f'Could not find reference measurement using file name: {possible_name}')
# Last resort, see if we can find the right measurement in the same folder
if m is None:
try:
folder, _ = os.path.split(self.fn)
m = Measurement.fromFolderWithUUID(required_uuid, folder, skip=[self.name])
logging.info('Found reference measurement in folder with correct UUID. Updating name of reference measurement')
# Update the measurement file name in the list, such that next time it
# can be opened just by its name.
self.setRefMeas(m)
except:
logging.error("Could not find the reference measurement. Is it deleted?")
# Well, we found it. Now make sure the reference measurement actually has the right type (User could have marked it as a NotSpecific for example in the mean time).
if m is not None:
if m.measurementType() != mtype:
m.removeRefMeas(mtype)
raise RuntimeError(f"Reference measurement for {self.name} is not a proper reference (anymore).")
# Whow, we passed all security checks, here we go!
return m
else:
# Nope, not there.
raise RuntimeError(f"Could not find the reference measurement for '{self.name}'. Is it deleted?")
def removeRefMeas(self, mtype: MeasurementType):
"""
Remove an existing reference measurement of specified type from this measurement. Silently ignores this
action if no reference measurement of this type is configured.
"""
try:
del self._refMeas[mtype]
self.__storeReafMeas()
except KeyError:
pass
def __storeReafMeas(self):
"""
Internal method that syncs the dictionary of reference methods with the backing HDF5 file
"""
with self.file("r+") as f:
# Update attribute in file. Stored as a flat list of string tuples:
# [(ref_value1, uuid_1, name_1), (ref_value2, uuid_2, name_2), ...]
reflist = list((str(key.value), val1, val2) for key, (val1, val2) in self._refMeas.items())
# print(reflist)
f.attrs['refMeas'] = reflist
def setRefMeas(self, m: Measurement):
"""
Set a reference measurement for the given measurement. If this measurement is already
a reference measurement, the previous reference measurement type is overwritten, such that there is
only one measurement that is the reference of a certain 'MeasurementType'
"""
mtype = m.measurementType()
if mtype == MeasurementType.NotSpecific:
raise ValueError('Measurement to be set as reference is not a reference measurement')
self._refMeas[mtype] = (m.UUID, m.name)
self.__storeReafMeas()
@staticmethod
def fromFolderWithUUID(uuid_str: str, folder: str='', skip=[]):
"""
Returns Measurement object from a given UUID string. It first tries to find whether there
is an uuid in the static list of weak references. If not, it will try to open files in
the current file path.
"""
for fn in glob.glob(str(pathlib.Path(folder)) + f'/*{DOTMEXT}'):
# Do not try to open this file in case it is in the 'skip' list.
if len(list(filter(lambda a: a in fn, skip))) > 0:
continue
try:
m = Measurement(fn)
if m.UUID == uuid_str:
# Update 'last_fn' attribute in dict of stored reference measurements
return m
except Exception as e:
logging.error(f'Possible measurement file {fn} returned error {e} when opening.')
raise RuntimeError(f'Measurement with UUID {uuid_str} could not be found.')
def setAttribute(self, attrname: str, value):
"""
Set an attribute in the measurement file, and keep a local copy in
memory for efficient accessing.
Args:
attrname: name of attribute, a string
value: the value. Should be anything that can be stored as an attribute in HDF5.
"""
with self.file("r+") as f:
# Update comment attribute in the file
f.attrs[attrname] = value
setattr(self, "_" + attrname, value)
def isType(self, type_: MeasurementType):
"""
Returns True when a measurement is flagged as being of a certaint "MeasurementType"
"""
if (type_.value & self._type_int):
return True
elif type_.value == self._type_int == 0:
return True
return False
def setType(self, type_: MeasurementType):
"""
Set the measurement type to given type
"""
self.setAttribute('type_int', type_.value)
def measurementType(self):
"""
Returns type of measurement
"""
return MeasurementType(self._type_int)
@property
def name(self):
"""Returns filename base without extension."""
_, fn = os.path.split(self.fn)
return os.path.splitext(fn)[0]
@property
def channelNames(self):
return self._channelNames
@channelNames.setter
def channelNames(self, newchnames):
"""
Returns list of the names of the channels
"""
if len(newchnames) != self.nchannels:
raise RuntimeError("Invalid length of new channel names")
self.setAttribute("channelNames", newchnames)
@property
def channelConfig(self):
"""
Returns list of current channel configuration data.
"""
chcfg = []
for chname, sens, qty in zip(self.channelNames, self.sensitivity, self.qtys):
ch = DaqChannel()
ch.enabled = True
ch.name = chname
ch.sensitivity = sens
ch.qty = qty.cpp_enum
chcfg.append(ch)
return chcfg
@channelConfig.setter
def channelConfig(self, chcfg: List[DaqChannel]):
"""
Set new channel configuration from list of DaqChannel objects.
Use cases:
- Update channel types, sensitivities etc.
Args:
chchfg: New channel configuration
"""
if len(chcfg) != self.nchannels:
raise RuntimeError("Invalid number of channels")
chname = []
sens = []
qtys = []
for ch in chcfg:
chname.append(ch.name)
sens.append(ch.sensitivity)
qtys.append(SIQtys.fromCppEnum(ch.qty))
self.channelNames = chname
self.sensitivity = sens
self.qtys = qtys
@property
def qtys(self):
return self._qtys
@qtys.setter
def qtys(self, newqtys):
if not len(newqtys) == len(self._qtys):
raise ValueError("Invalid number of quantities")
qtys_int = [qty.toInt() for qty in newqtys]
# Use setAttribute here, but thos store the jsonified version as well,
# which we have to overwrite again with the deserialized ones. This is
# actually not a very nice way of coding.
with self.file("r+") as f:
# Update comment attribute in the file
f.attrs["qtys_enum_idx"] = qtys_int
self._qtys = newqtys
@contextmanager
def file(self, mode="r"):
"""Contextmanager which opens the storage file and yields the file.
Args:
mode: Opening mode for the file. Should either be 'r', or 'r+'
"""
if mode not in ("r", "r+"):
raise ValueError("Invalid file opening mode.")
with h5.File(self.fn, mode) as f:
yield f
@property
def comment(self):
"""Return the measurement comment.
Returns:
The measurement comment (text string)
"""
return self._comment
@comment.setter
def comment(self, cmt):
"""Set the measurement comment.
Args:
cmt: Comment text string to set
"""
with self.file("r+") as f:
# Update comment attribute in the file
f.attrs["comment"] = cmt
self._comment = cmt
@property
@lru_cache()
def recTime(self):
"""Returns the total recording time of the measurement, in float
seconds."""
return self.blocksize * self.nblocks / self.samplerate
@property
def time(self):
"""Returns the measurement time in seconds since the epoch."""
return self._time
@property
@lru_cache()
def timestr(self):
"""
Return a properly formatted string of the measurement time, in order of
year-month-day hour etc.
"""
time_struct = time.localtime(self.time)
time_string = time.strftime("%Y-%m-%d %H:%M:%S", time_struct)
return time_string
def rms(self, channels=None, substract_average=False):
"""Returns the root mean square values for each channel
Args:
channels: list of channels
substract_average: If set to true, computes the rms of only the
oscillating component of the signal, which is in fact the signal
variance.
Returns:
1D array with rms values for each channel
"""
meansquare = 0.0 # Mean square of the signal, including its average
sum_ = 0.0 # Sumf of the values of the signal, used to compute average
N = 0
with self.file() as f:
for block in self.iterData(channels):
Nblock = block.shape[0]
sum_ += np.sum(block, axis=0)
N += Nblock
meansquare += np.sum(block ** 2, axis=0) / self.N
avg = sum_ / N
# In fact, this is not the complete RMS, as in includes the DC
# If p = p_dc + p_osc, then rms(p_osc) = sqrt(ms(p)-ms(p_dc))
if substract_average:
meansquare -= avg ** 2
rms = np.sqrt(meansquare)
return rms
def variance(self, channels=None):
return self.rms(substract_average=True)
def rawData(self, channels=None, **kwargs):
"""Returns the raw data as stored in the measurement file,
without any transformations applied
args:
channels: list, or tuple of channel numbers to export. If not defined, all
channels in the measurement are returned
returns:
Numpy array with data. The first axis is always the time instance,
the second axis the channel number.
"""
if channels is None:
channels = list(range(self.nchannels))
rawdata = []
with self.file() as f:
for block in IterRawData(f, channels, **kwargs):
rawdata.append(block)
return np.concatenate(rawdata, axis=0)
def iterData(self, channels, **kwargs):
sensitivity = kwargs.pop("sensitivity", self.sensitivity)
if channels is None:
channels = list(range(self.nchannels))
with self.file() as f:
for block in IterData(f, channels, sensitivity, **kwargs):
yield block
def data(self, channels=None, **kwargs):
"""
Returns the measurement data, scaled and sensitivity applied.
"""
data = []
for d in self.iterData(channels, **kwargs):
data.append(d)
return np.concatenate(data, axis=0)
def CPS(self, channels=None, **kwargs):
"""
Compute single-sided Cross-Power-Spectrum of the measurement channels
Args:
channels: Channels to compute for (numbers)
Optional arguments:
nfft: FFT length
window: Window type
overlap: Overlap percentage (value between 0.0 and up to and
including 100.0)
weighting:
Returns:
Cross-power-spectra. C[freq, ch_i, ch_j] = C_ij
"""
nfft = kwargs.pop("nfft", 2048)
window = kwargs.pop("windowType", Window.WindowType.Hann)
overlap = kwargs.pop("overlap", 50.0)
if channels is None:
channels = list(range(self.nchannels))
nchannels = len(channels)
aps = AvPowerSpectra(nfft, window, overlap)
freq = getFreq(self.samplerate, nfft)
for data in self.iterData(channels, **kwargs):
CS = aps.compute(data)
return freq, aps.get_est()
def periodicAverage(self, N, channels=None, noiseCorrection=True, **kwargs):
"""
Return the (coherent) periodic average the measurement. This method is
useful for a situation of periodic excitation.
Args:
N: The number of samples in one period. This value should
correspond with the period of the excitation!
noiseCorrection: whether to apply coherent averaging, according to
the Sliding Window correlation method (SWiC): Telle et al.: A Novel
Approach for Impulse Response Measurements with Time-Varying Noise.
If set to False, just the arithmetic average is used.
"""
# Create blocks of equal length N
Ntot = self.N
Nblocks = Ntot // N
# TODO: This method graps the whole measurement file into memory. Can
# only be done with relatively small measurement files.
signal = self.data(channels)
# Estimate noise power in block
blocks = [signal[i * N: (i + 1) * N] for i in range(Nblocks)]
if noiseCorrection:
# The difference between the measured signal in the previous block and
# the current block
en = [None] + [blocks[i] - blocks[i - 1] for i in range(1, Nblocks)]
noise_est = [None] + [
-np.average(en[i] * en[i + 1]) for i in range(1, len(en) - 1)
]
# Create weighting coefficients
sum_inverse_noise = sum([1 / n for n in noise_est[1:]])
c_n = [1 / (ni * sum_inverse_noise) for ni in noise_est[1:]]
else:
c_n = [1 / (Nblocks - 2)] * (Nblocks - 2)
assert np.isclose(sum(c_n), 1.0)
assert Nblocks - 2 == len(c_n)
# Average signal over blocks
avg = np.zeros((blocks[0].shape), dtype=float)
for n in range(0, Nblocks - 2):
avg += c_n[n] * blocks[n + 1]
return avg
def periodicCPS(self, N, channels=None, **kwargs):
"""
Compute Cross-Spectral Density based on periodic excitation. Uses noise
reduction by time-averaging the data.
"""
if channels is None:
channels = list(range(self.nchannels))
nchannels = len(channels)
window = Window.rectangular
ps = PowerSpectra(N, window)
avg = np.asfortranarray(self.periodicAverage(N, channels, **kwargs))
CS = ps.compute(avg)
freq = getFreq(self.samplerate, N)
return freq, CS
@property
def sensitivity(self):
"""Sensitivity of the data in U^-1, from floating point data scaled
between -1.0 and 1.0 to Units [U].
If the sensitivity is not stored in the measurement file, this
function returns 1.0 for each channel
"""
return self._sens
@sensitivity.setter
def sensitivity(self, sens):
"""Set the sensitivity of the measurement in the file.
Args:
sens: sensitivity data, should be a float, or an array of floats
equal to the number of channels.
"""
if isinstance(sens, float):
# Put all sensitivities equal
sens = sens * np.ones(self.nchannels)
elif isinstance(sens, list):
sens = np.asarray(sens)
valid = sens.ndim == 1
valid &= sens.shape[0] == self.nchannels
valid &= sens.dtype == float
if not valid:
raise ValueError("Invalid sensitivity value(s) given")
with self.file("r+") as f:
f.attrs["sensitivity"] = sens
self._sens = sens
def checkOverflow(self, channels):
"""Coarse check for overflow in measurement.
Return:
True if overflow is possible, else False
"""
for block in self.iterData(channels):
dtype = block.dtype
if dtype.kind == "i":
# minvalue = np.iinfo(dtype).min
maxvalue = np.iinfo(dtype).max
if np.max(np.abs(block)) >= 0.9 * maxvalue:
return True
else:
# Cannot check for floating point values.
return False
return False
def exportAsWave(self, fn=None, force=False, dtype=None, normalize=False, **kwargs):
"""Export measurement file as wave. In case the measurement data is
stored as floats, the values are scaled to the proper integer (PCM)
data format.
Args:
fn: If given, this will be the filename to write to. If the
filename does not end with '.wav', this extension is added.
force: If True, overwrites any existing files with the given name
, otherwise a RuntimeError is raised.
dtype: if not None, convert data to this data type.
Options are 'int16', 'int32', 'float32'.
normalize: If set: normalize the level to something sensible.
"""
if fn is None:
fn = self.fn
fn = os.path.splitext(fn)[0]
if os.path.splitext(fn)[1] != ".wav":
fn += ".wav"
if os.path.exists(fn) and not force:
raise RuntimeError(f"File already exists: {fn}")
if not np.isclose(self.samplerate % 1, 0):
raise RuntimeError(
f"Sample rates should be approximately integer for exporting to Wave to work"
)
# TODO: With VERY large measurment files, this is not possible! Is this
# a theoretical case?
# TODO: add sensitivity? Then use self.data() instead of self.rawData()
data = self.rawData(**kwargs)
if normalize:
# Scale back to maximum of absolute value
maxabs = np.max(np.abs(data))
data = data / maxabs # "data /= maxabs" fails if dtpyes differ
if dtype == None:
dtype = data.dtype # keep existing
logging.debug(f"dtype not passed as arg; using dtype = {dtype}")
# dtype conversion
if dtype == "int16":
newtype = np.int16
newsampwidth = 2
elif dtype == "int32":
newtype = np.int32
newsampwidth = 4
elif dtype == "float32":
newtype = np.float32
elif dtype == "float64":
newtype = np.float64
else:
logging.debug(f"cannot handle this dtype {dtype}")
pass
# Convert range to [-1, 1]
# TODO: this is wrong for float data where full scale > 1
sensone = np.ones_like(self.sensitivity)
data = scaleBlockSens(data, sensone)
if dtype == "int16" or dtype == "int32":
# Scale data to integer range and convert to integers
scalefac = 2 ** (8 * newsampwidth - 1) - 1
data = (data * scalefac).astype(newtype)
wavfile.write(fn, int(self.samplerate), data.astype(newtype))
@staticmethod
def fromFile(fn):
"""
Try to open measurement from a given file name. First checks
whether the measurement is already open. Otherwise it might
happen that a Measurement object is created twice for the same backing file, which we do not allow.
"""
# See if the base part of the filename is referring to a file that is already open
with h5.File(fn, 'r') as f:
try:
theuuid = f.attrs['UUID']
except KeyError:
# No UUID stored in measurement. This is an old measurement that did not have UUID's
# We create a new UUID here such that the file is opened from the filesystem
# anyhow.
theuuid = str(uuid.uuid1())
if theuuid in Measurement.uuid_s.keys():
return Measurement.uuid_s[theuuid]
return Measurement(fn)
@staticmethod
def fromtxt(
fn,
skiprows,
samplerate,
sensitivity,
mfn=None,
timestamp=None,
delimiter="\t",
firstcoltime=True,
):
"""Converts a txt file to a LASP Measurement file, opens the associated
Measurement object and returns it. The measurement file will have the
same file name as the txt file, except with h5 extension.
Args:
fn: Filename of text file
skiprows: Number of header rows in text file to skip
samplerate: Sampling frequency in [Hz]
sensitivity: 1D array of channel sensitivities
mfn: Filepath where measurement file is stored. If not given,
a h5 file will be created along fn, which shares its basename
timestamp: If given, a custom timestamp for the measurement
(integer containing seconds since epoch). If not given, the
timestamp is obtained from the last modification time.
delimiter: Column delimiter
firstcoltime: If true, the first column is the treated as the
sample time.
"""
if not os.path.exists(fn):
raise ValueError(f"File {fn} does not exist.")
if timestamp is None:
timestamp = os.path.getmtime(fn)
if mfn is None:
mfn = os.path.splitext(fn)[0] + DOTMEXT
else:
mfn = os.path.splitext(mfn)[0] + DOTMEXT
dat = np.loadtxt(fn, skiprows=skiprows, delimiter=delimiter)
if firstcoltime:
time = dat[:, 0]
if not np.isclose(time[1] - time[0], 1 / samplerate):
raise ValueError(
"Samplerate given does not agree with " "samplerate in file"
)
# Chop off first column
dat = dat[:, 1:]
nchannels = dat.shape[1]
if nchannels != sensitivity.shape[0]:
raise ValueError(
f"Invalid sensitivity length given. Should be: {nchannels}"
)
with h5.File(mfn, "w") as hf:
hf.attrs["samplerate"] = samplerate
hf.attrs["sensitivity"] = sensitivity
hf.attrs["time"] = timestamp
hf.attrs["blocksize"] = 1
hf.attrs["nchannels"] = nchannels
ad = hf.create_dataset(
"audio",
(1, dat.shape[0], dat.shape[1]),
dtype=dat.dtype,
maxshape=(1, dat.shape[0], dat.shape[1]),
compression="gzip",
)
ad[0] = dat
return Measurement(mfn)
@staticmethod
def fromnpy(
data,
samplerate,
sensitivity,
mfn,
timestamp=None,
qtys: List[SIQtys]=None,
channelNames: List[str]=None,
force=False,
) -> Measurement:
"""
Converts a numpy array to a LASP Measurement file, opens the
associated Measurement object and returns it. The measurement file will
have the same file name as the txt file, except with h5 extension.
Args:
data: Numpy array, first column is sample, second is channel. Can
also be specified with a single column for single-channel data.
samplerate: Sampling frequency in [Hz]
sensitivity: 1D array of channel sensitivities in [U^-1], where U is
the recorded unit.
mfn: Filepath of the file where the data is stored.
timestamp: If given, a custom timestamp for the measurement
(integer containing seconds since epoch).
qtys: If a list of physical quantity data is given here
channelNames: Name of the channels
force: If True, overwrites existing files with specified `mfn`
name.
"""
if os.path.splitext(mfn)[1] != DOTMEXT:
mfn += DOTMEXT
if os.path.exists(mfn) and not force:
raise ValueError(f"File {mfn} already exist.")
if timestamp is None:
timestamp = int(time.time())
if data.ndim != 2:
data = data[:, np.newaxis]
try:
len(sensitivity)
except:
raise ValueError("Sensitivity should be given as array-like data type")
sensitivity = np.asarray(sensitivity)
nchannels = data.shape[1]
if nchannels != sensitivity.shape[0]:
raise ValueError(
f"Invalid sensitivity length given. Should be: {nchannels}"
)
if channelNames is not None:
if len(channelNames) != nchannels:
raise RuntimeError("Illegal length of channelNames list given")
if qtys is None:
qtys = [SIQtys.AP] * nchannels
else:
if len(qtys) != nchannels:
raise RuntimeError("Illegal length of qtys list given")
with h5.File(mfn, "w") as hf:
hf.attrs["samplerate"] = samplerate
hf.attrs["sensitivity"] = sensitivity
hf.attrs["time"] = timestamp
hf.attrs["blocksize"] = 1
hf.attrs["nchannels"] = nchannels
# Add physical quantity indices
hf.attrs['qtys_enum_idx'] = [qty.toInt() for qty in qtys]
# Add channel names in case given
if channelNames is not None:
hf.attrs["channelNames"] = channelNames
ad = hf.create_dataset(
"audio",
(1, data.shape[0], data.shape[1]),
dtype=data.dtype,
maxshape=(1, data.shape[0], data.shape[1]),
compression="gzip",
)
ad[0] = data
return Measurement(mfn)
@staticmethod
def fromWaveFile(fn, newfn=None, force=False, timestamp=None):
"""Convert a measurement file to a wave file, and return the
measurement handle."""
if timestamp is None:
timestamp = int(time.time())
base_fn = os.path.splitext(fn)[0]
if newfn is None:
newfn = base_fn + ".h5"
if os.path.exists(newfn) and not force:
raise RuntimeError(
f'Measurement file name {newfn} already exists in path, set "force" to true to overwrite'
)
samplerate, data = wavfile.read(fn)
if data.ndim == 2:
nframes, nchannels = data.shape
else:
nchannels = 1
nframes = len(data)
data = data[:, np.newaxis]
sensitivity = np.ones(nchannels)
with h5.File(newfn, "w") as hf:
hf.attrs["samplerate"] = samplerate
hf.attrs["nchannels"] = nchannels
hf.attrs["time"] = timestamp
hf.attrs["blocksize"] = 1
hf.attrs["sensitivity"] = sensitivity
ad = hf.create_dataset(
"audio",
(1, nframes, nchannels),
dtype=data.dtype,
maxshape=(1, nframes, nchannels),
compression="gzip",
)
ad[0] = data
return Measurement(newfn)