228 lines
7.4 KiB
Python
228 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Sound level meter implementation
|
|
@author: J.A. de Jong - ASCEE
|
|
"""
|
|
from .lasp_cpp import cppSLM
|
|
import numpy as np
|
|
from .lasp_common import (TimeWeighting, FreqWeighting, P_REF)
|
|
from .filter import SPLFilterDesigner
|
|
import logging
|
|
|
|
# Names exported by a star-import of this module.
__all__ = ['SLM', 'Dummy']
|
|
|
|
|
|
class Dummy:
    """
    Stand-in with a filter-like interface that leaves the samples untouched.
    """

    def filter_(self, data):
        """
        Return `data` unchanged, with a new length-one axis inserted at
        position 1 (a one-dimensional input of N samples comes back with
        shape (N, 1)).

        Args:
            data: array-like object supporting numpy-style indexing
        """
        # np.newaxis is an alias for None, so this is the same indexing
        # operation, merely spelled differently.
        with_extra_axis = data[:, None]
        return with_extra_axis
|
|
|
|
|
|
class SLM:
    """
    Multi-channel Sound Level Meter. Input data: time data with a certain
    sampling frequency. Output: time-weighted (fast/slow) sound pressure
    levels in dB(A/C/Z). Possibly in octave bands.
    """

    # Number of second-order sections used for the pass-through 'overall'
    # filter when there is no filterbank to derive the count from.
    _DEFAULT_OVERALL_SECTIONS = 5

    def __init__(self,
                 fs,
                 fbdesigner=None,
                 tw: TimeWeighting = TimeWeighting.fast,
                 fw: FreqWeighting = FreqWeighting.A,
                 xmin=None,
                 xmax=None,
                 include_overall=True,
                 level_ref_value=P_REF,
                 offset_t=0):
        """
        Initialize a sound level meter object.

        Args:
            fs: Sampling frequency of input data [Hz]
            fbdesigner: FilterBankDesigner to use for creating the
            (fractional) octave bank filters. Set this one to None to only do
            overalls
            tw: Time Weighting to apply. Its first element (tw[0]) is what
            is handed to cppSLM.
            fw: Frequency weighting to apply (FreqWeighting.A, .C or .Z)
            xmin: Filter designator of lowest band. Defaults to the first
            designator of fbdesigner. Ignored when fbdesigner is None.
            xmax: Filter designator of highest band. Defaults to the last
            designator of fbdesigner. Ignored when fbdesigner is None.
            include_overall: If true, a non-functioning filter is added which
            is used to compute the overall level.
            level_ref_value: Reference value for computing the levels in dB
            offset_t: Offset to be added to output time data [s]

        Raises:
            ValueError: for an unsupported frequency weighting, when
            fbdesigner was designed for a different sampling frequency, or
            when xmin > xmax.
        """

        self.fbdesigner = fbdesigner
        self.include_overall = include_overall
        self.offset_t = offset_t

        if fbdesigner is None:
            # Overall-only meter: a single channel with designator 0,
            # whatever was passed for xmin / xmax.
            xmin = 0
            xmax = 0
        else:
            if xmin is None:
                xmin = fbdesigner.xs[0]
            if xmax is None:
                xmax = fbdesigner.xs[-1]
            if xmin > xmax:
                raise ValueError(
                    f'Empty band range: xmin={xmin} is larger than '
                    f'xmax={xmax}')

        self.xs = list(range(xmin, xmax + 1))

        # Frequency weighting is applied as a pre-filter in front of the
        # filterbank. Z-weighting means: no pre-filter at all.
        spld = SPLFilterDesigner(fs)
        if fw == FreqWeighting.A:
            prefilter = spld.A_Sos_design().flatten()
        elif fw == FreqWeighting.C:
            prefilter = spld.C_Sos_design().flatten()
        elif fw == FreqWeighting.Z:
            prefilter = None
        else:
            raise ValueError(f'Not implemented prefilter {fw}')

        # Nominal text of each output channel, in column order of `sos`
        self.nom_txt = []

        if fbdesigner is not None:
            if fbdesigner.fs != fs:
                raise ValueError(
                    f'Filterbank designer sampling frequency '
                    f'({fbdesigner.fs}) does not match fs ({fs})')

            nfilters = len(self.xs) + (1 if include_overall else 0)

            # One column of flattened second-order-section coefficients per
            # channel. The matrix is allocated once the first band has been
            # designed, as that tells us the number of coefficients.
            sos = None
            for col, x in enumerate(self.xs):
                coefs = fbdesigner.createSOSFilter(x).flatten()
                if sos is None:
                    sos = np.empty((coefs.size, nfilters), dtype=float,
                                   order='C')
                sos[:, col] = coefs
                self.nom_txt.append(fbdesigner.nominal_txt(x))

            if include_overall:
                # Pass-through filter in the last column, with as many
                # sections as the band filters have (6 coefficients each),
                # so it always fits the matrix.
                sos[:, -1] = self._identity_sos(sos.shape[0] // 6)
                self.nom_txt.append('overall')

        else:
            # No filterbank, means we do only compute the overall values.
            # A single pass-through channel is created, irrespective of
            # include_overall, as two overall channels would be confusing.
            sos = self._identity_sos(
                self._DEFAULT_OVERALL_SECTIONS)[:, np.newaxis]
            self.nom_txt.append('overall')

        # Downsampling factor of the level output, as suggested by the C++
        # implementation for this sampling frequency and time weighting.
        dsfac = cppSLM.suggestedDownSamplingFac(fs, tw[0])

        if prefilter is not None:
            self.slm = cppSLM.fromBiquads(fs, level_ref_value, dsfac,
                                          tw[0],
                                          prefilter, sos)
        else:
            self.slm = cppSLM.fromBiquads(fs, level_ref_value, dsfac,
                                          tw[0],
                                          sos)

        # Sampling frequency of the (downsampled) level output
        self.fs_slm = fs / dsfac

        # Number of output samples produced so far; used by run() to build a
        # continuous time axis over successive calls.
        self.N = 0

    @staticmethod
    def _identity_sos(nsections):
        """
        Return the flattened coefficients of `nsections` pass-through
        biquads: per section b0 = 1 and a0 = 1, all other coefficients zero.
        """
        return np.array([1, 0, 0, 1, 0, 0] * nsections, dtype=float)

    def run(self, data):
        """
        Add new fresh timedata to the Sound Level Meter

        Args:
            data: one-dimensional input data

        Returns:
            Dictionary keyed by the nominal text of each channel (plus
            'overall' if present). Each value is a dictionary with keys 't'
            (time axis, including offset_t), 'data' (the levels of that
            channel, as a single-column slice of the result of the
            underlying meter) and 'x' (the band designator, 0 for overall).

        Raises:
            ValueError: if data is not one-dimensional
        """

        if data.ndim != 1:
            raise ValueError(
                f'Input data should be one-dimensional, got {data.ndim} '
                f'dimensions')

        # presumably (output samples x channels); indexed as such below
        levels = self.slm.run(data)

        # Time axis continues where the previous call stopped
        nnew = levels.shape[0]
        tstart = self.N / self.fs_slm
        tend = tstart + nnew / self.fs_slm
        t = np.linspace(tstart, tend, nnew, endpoint=False) + self.offset_t
        self.N += nnew

        output = {}
        for col, x in enumerate(self.xs):
            # '31.5' to '16k'
            output[self.nom_txt[col]] = {'t': t,
                                         'data': levels[:, [col]],
                                         'x': x}

        if self.include_overall and self.fbdesigner is not None:
            # The overall channel is the column right after the band columns
            output['overall'] = {'t': t,
                                 'data': levels[:, [len(self.xs)]],
                                 'x': 0}

        return output

    def return_as_dict(self, dat):
        """
        Helper function used to return resulting data in a proper way.

        Args:
            dat: Sequence with one value per filter channel, with the
            overall channel (if present) as last item.

        Returns a dictionary with the following keys:
            'nom': The nominal frequency array text, as textual output,
            they are '16', .. up to '16k'. Contains an additional 'overall'
            entry if an overall channel is present.
            'x': The exponents of the octave, such that the midband frequency
            corresponds to 1000*G**(x/b), where b is the bands, either 1, 3, or
            6
            'mid': The center frequencies of each band, as returned by the
            filterbank designer. None if there is no filterbank designer.
            'overall': The overall value, in [dB] **COULD BE NOT PART OF
            OUTPUT**. Only present when a filterbank is used together with
            include_overall.
            'y': The y-values of Lmax, Lpeak, etc. as numpy array, without
            the overall value in case that one is given under 'overall'.
        """
        output = {}
        output['nom'] = self.nom_txt
        output['x'] = list(self.xs)

        if self.fbdesigner is not None:
            output['mid'] = self.fbdesigner.fm(list(self.xs))
        else:
            # Overall-only meter: there are no bands, hence no midband
            # frequencies to report.
            output['mid'] = None

        logging.debug(list(self.xs))
        logging.debug(output['mid'])

        if self.include_overall and self.fbdesigner is not None:
            output['overall'] = dat[-1]
            output['y'] = np.asarray(dat[:-1])
        else:
            output['y'] = np.asarray(dat[:])
        return output

    def Leq(self):
        """
        Returns the computed equivalent levels for each filter channel
        """
        return self.return_as_dict(self.slm.Leq())

    def Lmax(self):
        """
        Returns the computed max levels for each filter channel
        """
        return self.return_as_dict(self.slm.Lmax())

    def Lpeak(self):
        """
        Returns the computed peak levels for each filter channel
        """
        return self.return_as_dict(self.slm.Lpeak())

    def Leq_array(self):
        """
        Returns the equivalent levels exactly as given by the underlying
        meter, not wrapped in a dictionary.
        """
        return self.slm.Leq()

    def Lmax_array(self):
        """
        Returns the max levels exactly as given by the underlying meter, not
        wrapped in a dictionary.
        """
        return self.slm.Lmax()

    def Lpeak_array(self):
        """
        Returns the peak levels exactly as given by the underlying meter, not
        wrapped in a dictionary.
        """
        return self.slm.Lpeak()
|