lasp/lasp/lasp_slm.py

209 lines
6.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sound level meter implementation
@author: J.A. de Jong - ASCEE
"""
from .wrappers import Slm as pyxSlm
import numpy as np
from .lasp_common import (TimeWeighting, FreqWeighting, P_REF)
from .filter import SPLFilterDesigner
__all__ = ['SLM', 'Dummy']
class Dummy:
    """
    Stand-in for a filter object: exposes the same `filter_` entry point, but
    passes the samples through untouched.
    """

    def filter_(self, data):
        """
        Return `data` with an extra axis of length one inserted at position 1.

        Args:
            data: array supporting NumPy-style indexing. No samples are
            modified.

        Returns:
            `data` indexed as `data[:, np.newaxis]`
        """
        with_extra_axis = data[:, np.newaxis]
        return with_extra_axis
class SLM:
    """
    Multi-channel Sound Level Meter. Input data: time data with a certain
    sampling frequency. Output: time-weighted (fast/slow) sound pressure
    levels in dB(A/C/Z). Possibly in octave bands.
    """

    # Number of second-order sections used for the pass-through ('overall')
    # filter when there is no filter bank to take the section count from.
    _DEFAULT_NSECTIONS = 5

    def __init__(self,
                 fs,
                 fbdesigner=None,
                 tw=TimeWeighting.fast,
                 fw=FreqWeighting.A,
                 xmin=None,
                 xmax=None,
                 include_overall=True,
                 level_ref_value=P_REF):
        """
        Initialize a sound level meter object.

        Args:
            fs: Sampling frequency of input data [Hz]
            fbdesigner: FilterBankDesigner to use for creating the
            (fractional) octave bank filters. Set this one to None to only do
            overalls
            tw: Time Weighting to apply. Its first element (`tw[0]`) is what
            is handed to the underlying Slm implementation.
            fw: Frequency weighting to apply
            xmin: Filter designator of lowest band. Defaults to the first
            designator of `fbdesigner`. Ignored when `fbdesigner` is None.
            xmax: Filter designator of highest band. Defaults to the last
            designator of `fbdesigner`. Ignored when `fbdesigner` is None.
            include_overall: If true, a non-functioning (pass-through) filter
            is added which is used to compute the overall level. When
            `fbdesigner` is None, the overall channel is the only channel and
            it is always created, whatever the value of this flag.
            level_ref_value: Reference value for computing the levels in dB

        Raises:
            ValueError: if `fw` is not one of the A, C or Z weightings.
            AssertionError: if the sampling frequency of `fbdesigner` differs
            from `fs`.
        """
        self.fbdesigner = fbdesigner
        self.include_overall = include_overall

        if fbdesigner is None:
            # Only a single (overall) channel, which gets designator 0
            xmin = 0
            xmax = 0
        else:
            if xmin is None:
                xmin = fbdesigner.xs[0]
            if xmax is None:
                xmax = fbdesigner.xs[-1]

        self.xs = list(range(xmin, xmax + 1))

        # Frequency weighting is applied as a pre-filter, before the bank
        spld = SPLFilterDesigner(fs)
        if fw == FreqWeighting.A:
            prefilter = spld.A_Sos_design().flatten()
        elif fw == FreqWeighting.C:
            prefilter = spld.C_Sos_design().flatten()
        elif fw == FreqWeighting.Z:
            prefilter = None
        else:
            raise ValueError(f'Not implemented prefilter {fw}')

        # Nominal text of each channel, in the same order as the rows of sos
        self.nom_txt = []

        if fbdesigner is not None:
            assert fbdesigner.fs == fs, \
                "sampling frequency of fbdesigner differs from fs"

            nfilters = len(self.xs)
            if include_overall:
                nfilters += 1

            # 'Probe' size of filter coefficients with the first band
            sos_firstx = fbdesigner.createSOSFilter(self.xs[0]).flatten()
            self.nom_txt.append(fbdesigner.nominal_txt(self.xs[0]))

            sos = np.empty((nfilters, sos_firstx.size), dtype=float,
                           order='C')
            sos[0, :] = sos_firstx

            for row, x in enumerate(self.xs[1:], start=1):
                sos[row, :] = fbdesigner.createSOSFilter(x).flatten()
                self.nom_txt.append(fbdesigner.nominal_txt(x))

            if include_overall:
                # The pass-through filter needs as many sections as the band
                # filters, as all rows of sos have the same length. Six
                # coefficients per second-order section.
                sos[-1, :] = self._passthrough_sos(sos_firstx.size // 6)
                self.nom_txt.append('overall')
        else:
            # No filterbank, means we do only compute the overall values.
            # The pass-through channel is the one and only channel here, so
            # it is created unconditionally: honoring include_overall=False
            # would leave the meter without any filter at all.
            sos = self._passthrough_sos(self._DEFAULT_NSECTIONS)[np.newaxis, :]
            self.nom_txt.append('overall')

        self.slm = pyxSlm(prefilter, sos,
                          fs, tw[0], level_ref_value)

        dsfac = self.slm.downsampling_fac
        if dsfac > 0:
            # Not unfiltered data
            self.fs_slm = fs / dsfac
        else:
            self.fs_slm = fs

        # Initialize counter of returned level samples to 0
        self.N = 0

    @staticmethod
    def _passthrough_sos(nsections):
        """
        Flattened coefficients of `nsections` second-order sections that each
        have b0 = 1 and a0 = 1, and all other coefficients zero.
        """
        return np.array([1, 0, 0, 1, 0, 0] * nsections, dtype=float)

    def _channels(self):
        """
        Yield (column index, nominal text, filter designator) for each output
        channel, in the order in which the filters were created.
        """
        for i, x in enumerate(self.xs):
            # '31.5' to '16k'
            yield i, self.nom_txt[i], x

        if self.include_overall and self.fbdesigner is not None:
            # The overall filter was stored right after the band filters
            yield len(self.xs), 'overall', 0

    def run(self, data):
        """
        Add new fresh timedata to the Sound Level Meter

        Args:
            data: two-dimensional input data with exactly one column, i.e. of
            shape (number of samples, 1)

        Returns:
            Dictionary with one entry per channel, keyed by the nominal text
            of the channel. Each entry is a dictionary with 't' (time
            instances of the levels, continuing from earlier calls), 'data'
            (the column of levels of that channel) and 'x' (the filter
            designator, 0 for the overall channel). An empty dictionary is
            returned when `data` contains no samples.
        """
        assert data.ndim == 2
        assert data.shape[1] == 1, "invalid number of channels, should be 1"

        if data.shape[0] == 0:
            return {}

        levels = self.slm.run(data)

        # Time axis continues where the previous call stopped
        tstart = self.N / self.fs_slm
        Ncur = levels.shape[0]
        tend = tstart + Ncur / self.fs_slm

        t = np.linspace(tstart, tend, Ncur, endpoint=False)
        self.N += Ncur

        return {name: {'t': t, 'data': levels[:, [i]], 'x': x}
                for i, name, x in self._channels()}

    def return_as_dict(self, dat):
        """
        Helper function used to label one value per filter channel.

        Args:
            dat: indexable holding one item per filter channel, in the order
            in which the filters were created

        Returns:
            Dictionary keyed by the nominal text of each channel. Each entry
            is a dictionary with 'data' (the item of `dat` for that channel)
            and 'x' (the filter designator, 0 for the overall channel).
        """
        return {name: {'data': dat[i], 'x': x}
                for i, name, x in self._channels()}

    def Leq(self):
        """
        Returns the computed equivalent levels for each filter channel
        """
        return self.return_as_dict(self.slm.Leq())

    def Lmax(self):
        """
        Returns the computed max levels for each filter channel
        """
        return self.return_as_dict(self.slm.Lmax())

    def Lpeak(self):
        """
        Returns the computed peak levels for each filter channel
        """
        return self.return_as_dict(self.slm.Lpeak())

    def Leq_array(self):
        """
        Returns the equivalent levels exactly as given by the underlying Slm
        implementation, without channel labels
        """
        return self.slm.Leq()

    def Lmax_array(self):
        """
        Returns the max levels exactly as given by the underlying Slm
        implementation, without channel labels
        """
        return self.slm.Lmax()

    def Lpeak_array(self):
        """
        Returns the peak levels exactly as given by the underlying Slm
        implementation, without channel labels
        """
        return self.slm.Lpeak()