#!/usr/bin/env python3 # -*- coding: utf-8 -*- """! Author: J.A. de Jong - ASCEE Description: Measurement class The ASCEE hdf5 measurement file format contains the following fields: - Attributes: 'version': If not given, version 1 is assumed. For version 1, measurement data is assumed to be acoustic data. 'samplerate': The audio data sample rate in Hz. 'nchannels': The number of audio channels in the file 'sensitivity': (Optionally) the stored sensitivity of the record channels. This can be a single value, or a list of sensitivities for each channel. Both representations are allowed. For measurement files of LASP < v1.0 'qtys' : (Optionally): list of quantities that is recorded for each channel', if this array is not found. Quantities are defaulted to 'Number / Full scale' For measurement files of LASP >= 1.0 - Datasets: 'audio': 3-dimensional array of blocks of audio data. The first axis is the block index, the second axis the sample number and the third axis is the channel number. The data type is either int16, int32 or float64 / float32. If raw data is stored as integer values (int16, int32), the actual values should be pre-scaled by its maximum positive number (2**(nb-1) - 1), such that the corresponding 'number' lies between -1.0 and 1.0. 'video': 4-dimensional array of video frames. The first index is the frame number, the second the x-value of the pixel and the third is the y-value of the pixel. Then, the last axis is the color. This axis has length 3 and the colors are stored as (r,g,b). Where typically a color depth of 256 is used (np.uint8 data format) The video dataset can possibly be not present in the data. """ __all__ = ['Measurement', 'scaleBlockSens'] from contextlib import contextmanager import h5py as h5 import numpy as np from .lasp_config import LASP_NUMPY_FLOAT_TYPE from scipy.io import wavfile import os, time, wave, logging from .lasp_common import SIQtys, Qty, getFreq from .lasp_cpp import Window, DaqChannel, LASP_VERSION_MAJOR from typing import List def getSampWidth(dtype): """Returns the width of a single sample in **bytes**. Args: dtype: numpy dtype Returns: Size of a sample in bytes (int) """ if dtype in (np.int32, np.float32): return 4 elif dtype == np.int16: return 2 elif dtype == np.float64: return 8 else: raise ValueError('Invalid data type: %s' % dtype) def scaleBlockSens(block, sens): """Scale a block of raw data to return raw acoustic pressure data. Args: block: block of raw data with integer data type sens: array of sensitivity coeficients for each channel. """ sens = np.asarray(sens) assert sens.size == block.shape[1] if np.issubdtype(block.dtype.type, np.integer): sw = getSampWidth(block.dtype) fac = 2**(8 * sw - 1) - 1 else: fac = 1. return block.astype(LASP_NUMPY_FLOAT_TYPE) / fac / sens[np.newaxis, :] class IterRawData: """Iterate over stored blocks if the raw measurement data of a h5 file.""" def __init__(self, f, channels, **kwargs): """Initialize a BlockIter object. Args: f: Audio dataset in the h5 file, accessed as f['audio'] channels: list of channel indices to use istart: index of first sample istop: index of last sample (not including istop) """ assert isinstance(channels, list) fa = f['audio'] self.fa = fa self.i = 0 nblocks = fa.shape[0] blocksize = fa.shape[1] self.blocksize = blocksize nchannels = fa.shape[2] self.channels = channels self.istart = kwargs.pop('istart', 0) self.istop = kwargs.pop('istop', blocksize*nblocks) self.firstblock = self.istart // blocksize self.lastblock = self.istop // blocksize if self.istop % blocksize == 0: self.lastblock -= 1 self.firstblock_start_offset = self.istart % blocksize if self.istop < 0: self.istop += blocksize*nblocks if self.istop <= self.istart: raise ValueError('Stop index is smaller than start index') if self.istop != blocksize*nblocks: self.lastblock_stop_offset = self.istop % blocksize else: self.lastblock_stop_offset = blocksize def __iter__(self): return self def __next__(self): """Return the next block.""" fa = self.fa nblocks_to_return = self.lastblock-self.firstblock+1 block = self.firstblock + self.i if block > self.lastblock: raise StopIteration if block == self.firstblock: start_offset = self.firstblock_start_offset else: start_offset = 0 if block == self.lastblock: stop_offset = self.lastblock_stop_offset else: stop_offset = self.blocksize # print(f'block: {block}, starto: {start_offset}, stopo {stop_offset}') self.i += 1 return self.fa[block, start_offset:stop_offset, :][:, self.channels] class IterData(IterRawData): """ Iterate over blocks of data, scaled with sensitivity and integer scaling between 0 and 1 """ def __init__(self, fa, channels, sensitivity, **kwargs): super().__init__(fa, channels, **kwargs) self.sens = np.asarray(sensitivity)[self.channels] assert self.sens.ndim == 1 def __next__(self): nextraw = super().__next__() return scaleBlockSens(nextraw, self.sens) class Measurement: """Provides access to measurement data stored in the h5 measurement file format.""" def __init__(self, fn): """Initialize a Measurement object based on the filename.""" if '.h5' not in fn: fn += '.h5' # Full filepath self.fn = fn # Base filename self.fn_base = os.path.split(fn)[1] # Open the h5 file in read-plus mode, to allow for changing the # measurement comment. with h5.File(fn, 'r+') as f: # Check for video data try: f['video'] self.has_video = True except KeyError: self.has_video = False self.nblocks, self.blocksize, self.nchannels = f['audio'].shape dtype = f['audio'].dtype self.dtype = dtype self.sampwidth = getSampWidth(dtype) self.samplerate = f.attrs['samplerate'] self.N = (self.nblocks * self.blocksize) self.T = self.N / self.samplerate try: self.version_major = f.attrs['LASP_VERSION_MAJOR'] self.version_minor = f.attrs['LASP_VERSION_MINOR'] except KeyError: self.version_major = 0 self.version_minor = 1 # Due to a previous bug, the channel names were not stored # consistently, i.e. as 'channel_names' and later camelcase. try: self._channelNames = f.attrs['channelNames'] except KeyError: try: self._channelNames = f.attrs['channel_names'] logging.info("Measurement file obtained which stores channel names with *old* attribute 'channel_names'") except KeyError: # No channel names found in measurement file logging.info('No channel name data found in measurement') self._channelNames = [f'Unnamed {i}' for i in range(self.nchannels)] # comment = read-write thing try: self._comment = f.attrs['comment'] except KeyError: f.attrs['comment'] = '' self._comment = '' # Sensitivity try: sens = f.attrs['sensitivity'] self._sens = sens * \ np.ones(self.nchannels) if isinstance( sens, float) else sens except KeyError: self._sens = np.ones(self.nchannels) self._time = f.attrs['time'] self._qtys = None try: qtys_json = f.attrs['qtys'] # Load quantity data self._qtys = [Qty.from_json(qty_json) for qty_json in qtys_json] except KeyError: # If quantity data is not available, this is an 'old' # measurement file. pass try: qtys_enum_idx = f.attrs['qtys_enum_idx'] self._qtys = [SIQtys.fromInt(idx) for idx in qtys_enum_idx] except KeyError: pass if self._qtys is None: self._qtys = [SIQtys.default() for i in range(self.nchannels)] logging.debug(f'Physical quantity data not available in measurement file. Assuming {SIQtys.default}') logging.debug(f'Physical quantity data not available in measurement file. Assuming {SIQtys.default}') def setAttribute(self, atrname, value): """ Set an attribute in the measurement file, and keep a local copy in memory for efficient accessing. """ with self.file('r+') as f: # Update comment attribute in the file f.attrs[atrname] = value setattr(self, '_' + atrname, value) @property def name(self): """Returns filename base without extension.""" return os.path.splitext(self.fn_base)[0] @property def channelNames(self): return self._channelNames @channelNames.setter def channelNames(self, newchnames): if len(newchnames) != self.nchannels: raise RuntimeError('Invalid length of new channel names') self.setAttribute('channelNames', newchnames) @property def channelConfig(self): chcfg = [] for chname, sens, qty in zip(self.channelNames, self.sensitivity, self.qtys): ch = DaqChannel() ch.enabled = True ch.name = chname ch.sensitivity = sens ch.qty = qty.cpp_enum chcfg.append(ch) return chcfg @channelConfig.setter def channelConfig(self, chcfg: List[DaqChannel]): chname = [] sens = [] qtys = [] for ch in chcfg: chname.append(ch.name) sens.append(ch.sensitivity) qtys.append(SIQtys.fromCppEnum(ch.qty)) self.channelNames = chname self.sensitivity = sens self.qtys = qtys @property def qtys(self): return self._qtys @qtys.setter def qtys(self, newqtys): if not len(newqtys) == len(self._qtys): raise ValueError('Invalid number of quantities') qtys_int = [qty.toInt() for qty in newqtys] # Use setAttribute here, but thos store the jsonified version as well, # which we have to overwrite again with the deserialized ones. This is # actually not a very nice way of coding. with self.file('r+') as f: # Update comment attribute in the file f.attrs['qtys_enum_idx'] = qtys_int self._qtys = newqtys @contextmanager def file(self, mode='r'): """Contextmanager which opens the storage file and yields the file. Args: mode: Opening mode for the file. Should either be 'r', or 'r+' """ if mode not in ('r', 'r+'): raise ValueError('Invalid file opening mode.') with h5.File(self.fn, mode) as f: yield f @property def comment(self): """Return the measurement comment. Returns: The measurement comment (text string) """ return self._comment @comment.setter def comment(self, cmt): """Set the measurement comment. Args: cmt: Comment text string to set """ with self.file('r+') as f: # Update comment attribute in the file f.attrs['comment'] = cmt self._comment = cmt @property def recTime(self): """Returns the total recording time of the measurement, in float seconds.""" return self.blocksize * self.nblocks / self.samplerate @property def time(self): """Returns the measurement time in seconds since the epoch.""" return self._time def rms(self, channels=None, substract_average=False): """Returns the root mean square values for each channel Args: channels: list of channels substract_average: If set to true, computes the rms of only the oscillating component of the signal, which is in fact the signal variance. Returns: 1D array with rms values for each channel """ meansquare = 0. # Mean square of the signal, including its average sum_ = 0. # Sumf of the values of the signal, used to compute average N = 0 with self.file() as f: for block in self.iterData(channels): Nblock = block.shape[0] sum_ += np.sum(block, axis=0) N += Nblock meansquare += np.sum(block**2, axis=0) / self.N avg = sum_/N # In fact, this is not the complete RMS, as in includes the DC # If p = p_dc + p_osc, then rms(p_osc) = sqrt(ms(p)-ms(p_dc)) if substract_average: meansquare -= avg**2 rms = np.sqrt(meansquare) return rms def variance(self, channels=None): return self.rms(substract_average=True) def rawData(self, channels=None, **kwargs): """Returns the raw data as stored in the measurement file, without any transformations applied args: channels: list, or tuple of channel numbers to export. If not defined, all channels in the measurement are returned returns: Numpy array with data. The first axis is always the time instance, the second axis the channel number. """ if channels is None: channels = list(range(self.nchannels)) rawdata = [] with self.file() as f: for block in IterRawData(f, channels, **kwargs): rawdata.append(block) return np.concatenate(rawdata, axis=0) def iterData(self, channels, **kwargs): sensitivity = kwargs.pop('sensitivity', self.sensitivity) if channels is None: channels = list(range(self.nchannels)) with self.file() as f: for block in IterData(f, channels, sensitivity, **kwargs): yield block def data(self, channels=None, **kwargs): """ Returns the measurement data, scaled and sensitivity applied. """ data = [] for d in self.iterData(channels, **kwargs): data.append(d) return np.concatenate(data, axis=0) def CPS(self, channels=None, **kwargs): """ Compute single-sided Cross-Power-Spectrum of the measurement channels Args: channels: Channels to compute for (numbers) Optional arguments: nfft: FFT length window: Window type overlap: Overlap percentage (value between 0.0 and up to and including 100.0) weighting: Returns: Cross-power-spectra. C[freq, ch_i, ch_j] = C_ij """ nfft = kwargs.pop('nfft', 2048) window = kwargs.pop('window', Window.hann) overlap = kwargs.pop('overlap', 50.0) if channels is None: channels = list(range(self.nchannels)) nchannels = len(channels) aps = AvPowerSpectra(nfft, nchannels, overlap, window) freq = getFreq(self.samplerate, nfft) for data in self.iterData(channels, **kwargs): CS = aps.addTimeData(data) return freq, CS def periodicAverage(self, N, channels=None, noiseCorrection=True, **kwargs): """ Return the (coherent) periodic average the measurement. This method is useful for a situation of periodic excitation. Args: N: The number of samples in one period. This value should correspond with the period of the excitation! noiseCorrection: whether to apply coherent averaging, according to the Sliding Window correlation method (SWiC): Telle et al.: A Novel Approach for Impulse Response Measurements with Time-Varying Noise. If set to False, just the arithmetic average is used. """ # Create blocks of equal length N Ntot = self.N Nblocks = Ntot//N # TODO: This method graps the whole measurement file into memory. Can # only be done with relatively small measurement files. signal = self.data(channels) # Estimate noise power in block blocks = [signal[i*N:(i+1)*N] for i in range(Nblocks)] if noiseCorrection: # The difference between the measured signal in the previous block and # the current block en = [None] + [blocks[i] - blocks[i-1] for i in range(1,Nblocks)] noise_est = [None] + [-np.average(en[i]*en[i+1]) for i in range(1,len(en)-1)] # Create weighting coefficients sum_inverse_noise = sum([1/n for n in noise_est[1:]]) c_n = [1/(ni*sum_inverse_noise) for ni in noise_est[1:]] else: c_n = [1/(Nblocks-2)]*(Nblocks-2) assert np.isclose(sum(c_n), 1.0) assert Nblocks-2 == len(c_n) # Average signal over blocks avg = np.zeros((blocks[0].shape), dtype=float) for n in range(0, Nblocks-2): avg += c_n[n]*blocks[n+1] return avg def periodicCPS(self, N, channels=None, **kwargs): """ Compute Cross-Spectral Density based on periodic excitation. Uses noise reduction by time-averaging the data. """ if channels is None: channels = list(range(self.nchannels)) nchannels = len(channels) window = Window.rectangular ps = PowerSpectra(N, window) avg = np.asfortranarray(self.periodicAverage(N, channels, **kwargs)) CS = ps.compute(avg) freq = getFreq(self.samplerate, N) return freq, CS @property def sensitivity(self): """Sensitivity of the data in U^-1, from floating point data scaled between -1.0 and 1.0 to Units [U]. If the sensitivity is not stored in the measurement file, this function returns 1.0 for each channel """ return self._sens @sensitivity.setter def sensitivity(self, sens): """Set the sensitivity of the measurement in the file. Args: sens: sensitivity data, should be a float, or an array of floats equal to the number of channels. """ if isinstance(sens, float): # Put all sensitivities equal sens = sens * np.ones(self.nchannels) elif isinstance(sens, list): sens = np.asarray(sens) valid = sens.ndim == 1 valid &= sens.shape[0] == self.nchannels valid &= sens.dtype == float if not valid: raise ValueError('Invalid sensitivity value(s) given') with self.file('r+') as f: f.attrs['sensitivity'] = sens self._sens = sens def checkOverflow(self, channels): """Coarse check for overflow in measurement. Return: True if overflow is possible, else False """ for block in self.iterData(channels): dtype = block.dtype if dtype.kind == 'i': # minvalue = np.iinfo(dtype).min maxvalue = np.iinfo(dtype).max if np.max(np.abs(block)) >= 0.9*maxvalue: return True else: # Cannot check for floating point values. return False return False def exportAsWave(self, fn=None, force=False, newsampwidth=None, normalize=True, **kwargs): """Export measurement file as wave. In case the measurement data is stored as floats, the values are scaled to the proper integer (PCM) data format. Args: fn: If given, this will be the filename to write to. If the filename does not end with '.wav', this extension is added. force: If True, overwrites any existing files with the given name , otherwise a RuntimeError is raised. newsampwidth: sample width in bytes with which to export the data. This should only be given in case the measurement data is stored as floating point values, otherwise an error is thrown normalize: If set: normalize the level to something sensible. """ if fn is None: fn = self.fn fn = os.path.splitext(fn)[0] if os.path.splitext(fn)[1] != '.wav': fn += '.wav' if os.path.exists(fn) and not force: raise RuntimeError(f'File already exists: {fn}') if not np.isclose(self.samplerate%1,0): raise RuntimeError(f'Sample rates should be approximately integer for exporting to Wave to work') # TODO: With VERY large measurment files, this is not possible! Is this # a theoretical case? data = self.rawData(**kwargs) if np.issubdtype(data.dtype, np.floating) and newsampwidth is None: raise ValueError('Newsampwidth parameter should be given for floating point raw data') if normalize: # Scale back to maximum of absolute value maxabs = np.max(np.abs(data)) data = data / maxabs # "data /= maxabs" fails if dtpyes differ if newsampwidth is not None: # Convert to floats, then to new sample width sensone = np.ones_like(self.sensitivity) data = scaleBlockSens(data, sensone) if newsampwidth == 2: newtype = np.int16 elif newsampwidth == 4: newtype = np.int32 else: raise ValueError('Invalid sample width, should be 2 or 4') scalefac = 2**(8*newsampwidth-1)-1 # Scale data to integer range, and then convert to integers data = (data*scalefac).astype(newtype) wavfile.write(fn, int(self.samplerate), data) @staticmethod def fromtxt(fn, skiprows, samplerate, sensitivity, mfn=None, timestamp=None, delimiter='\t', firstcoltime=True): """Converts a txt file to a LASP Measurement file, opens the associated Measurement object and returns it. The measurement file will have the same file name as the txt file, except with h5 extension. Args: fn: Filename of text file skiprows: Number of header rows in text file to skip samplerate: Sampling frequency in [Hz] sensitivity: 1D array of channel sensitivities mfn: Filepath where measurement file is stored. If not given, a h5 file will be created along fn, which shares its basename timestamp: If given, a custom timestamp for the measurement (integer containing seconds since epoch). If not given, the timestamp is obtained from the last modification time. delimiter: Column delimiter firstcoltime: If true, the first column is the treated as the sample time. """ if not os.path.exists(fn): raise ValueError(f'File {fn} does not exist.') if timestamp is None: timestamp = os.path.getmtime(fn) if mfn is None: mfn = os.path.splitext(fn)[0] + '.h5' else: mfn = os.path.splitext(mfn)[0] + '.h5' dat = np.loadtxt(fn, skiprows=skiprows, delimiter=delimiter) if firstcoltime: time = dat[:, 0] if not np.isclose(time[1] - time[0], 1 / samplerate): raise ValueError('Samplerate given does not agree with ' 'samplerate in file') # Chop off first column dat = dat[:, 1:] nchannels = dat.shape[1] if nchannels != sensitivity.shape[0]: raise ValueError( f'Invalid sensitivity length given. Should be: {nchannels}') with h5.File(mfn, 'w') as hf: hf.attrs['samplerate'] = samplerate hf.attrs['sensitivity'] = sensitivity hf.attrs['time'] = timestamp hf.attrs['blocksize'] = 1 hf.attrs['nchannels'] = nchannels ad = hf.create_dataset('audio', (1, dat.shape[0], dat.shape[1]), dtype=dat.dtype, maxshape=(1, dat.shape[0], dat.shape[1]), compression='gzip') ad[0] = dat return Measurement(mfn) @staticmethod def fromnpy(data, samplerate, sensitivity, mfn, timestamp=None): """Converts a numpy array to a LASP Measurement file, opens the associated Measurement object and returns it. The measurement file will have the same file name as the txt file, except with h5 extension. Args: data: Numpy array, first column is sample, second is channel. Can also be specified with a single column for single-channel data samplerate: Sampling frequency in [Hz] sensitivity: 1D array of channel sensitivities [Pa^-1] mfn: Filepath where measurement file is stored. timestamp: If given, a custom timestamp for the measurement (integer containing seconds since epoch). If not given, the timestamp is obtained from the last modification time. delimiter: Column delimiter firstcoltime: If true, the first column is the treated as the sample time. force: If True, overwrites existing files with specified `mfn` name. """ if os.path.splitext(mfn)[1] != '.h5': mfn += '.h5' if os.path.exists(mfn) and not force: raise ValueError(f'File {mfn} already exist.') if timestamp is None: timestamp = int(time.time()) if data.ndim != 2: data = data[:, np.newaxis] try: len(sensitivity) except: raise ValueError('Sensitivity should be given as array-like data type') sensitivity = np.asarray(sensitivity) nchannels = data.shape[1] if nchannels != sensitivity.shape[0]: raise ValueError( f'Invalid sensitivity length given. Should be: {nchannels}') with h5.File(mfn, 'w') as hf: hf.attrs['samplerate'] = samplerate hf.attrs['sensitivity'] = sensitivity hf.attrs['time'] = timestamp hf.attrs['blocksize'] = 1 hf.attrs['nchannels'] = nchannels ad = hf.create_dataset('audio', (1, data.shape[0], data.shape[1]), dtype=data.dtype, maxshape=(1, data.shape[0], data.shape[1]), compression='gzip') ad[0] = data return Measurement(mfn) @staticmethod def fromWaveFile(fn, newfn=None, force=False, timestamp=None): """Convert a measurement file to a wave file, and return the measurement handle.""" if timestamp is None: timestamp = int(time.time()) base_fn = os.path.splitext(fn)[0] if newfn is None: newfn = base_fn + '.h5' if os.path.exists(newfn) and not force: raise RuntimeError(f'Measurement file name {newfn} already exists in path, set "force" to true to overwrite') samplerate, data = wavfile.read(fn) if data.ndim == 2: nframes, nchannels = data.shape else: nchannels = 1 nframes = len(data) data = data[:, np.newaxis] sensitivity = np.ones(nchannels) with h5.File(newfn, 'w') as hf: hf.attrs['samplerate'] = samplerate hf.attrs['nchannels'] = nchannels hf.attrs['time'] = timestamp hf.attrs['blocksize'] = 1 hf.attrs['sensitivity'] = sensitivity ad = hf.create_dataset('audio', (1, nframes, nchannels), dtype=data.dtype, maxshape=(1, nframes, nchannels), compression='gzip') ad[0] = data return Measurement(newfn)