Added test files. Debugged quite some things. SLM needs still tests on statistics. Bugfix in frequency weighting filters. Added a lot of wrappers for Python calls.

This commit is contained in:
Anne de Jong 2024-08-26 21:57:11 +02:00
parent 90c7f0eb37
commit 7092ef4717
13 changed files with 694 additions and 147 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@ python/lasprs/_lasprs*
.venv
.vscode/launch.json
.vscode
examples_py/.ipynb_checkpoints

View File

@ -0,0 +1,135 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "bcd07c1e-6722-44c7-b162-267f1341c2fe",
"metadata": {},
"source": [
"# Standard filterbank frequency response and impulse response"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f0c4d6ef-69b2-4b5c-92fd-d987a11f3cbd",
"metadata": {},
"outputs": [],
"source": [
"# Prerequisites, uncomment below in case of errors. Also for ipympl, restart Jupyter Lab if it was not installed\n",
"\n",
"!cd .. && maturin develop -F python-bindings\n",
"#!pip install ipympl scipy matplotlib"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "23b45cf6-85bc-4eaa-8f27-77c5b354e772",
"metadata": {},
"outputs": [],
"source": [
"# If this does not work, install ipympl and reboot Jupyter Lab\n",
"%matplotlib widget"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "96090ac9-2033-411d-8e45-0c6b375a268b",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from numpy import log10, sqrt, exp, pi\n",
"from lasprs._lasprs import StandardFilterDescriptor, SLMSettings, FreqWeighting, TimeWeighting, SLM\n",
"def level(a):\n",
" return 20*np.log10(np.abs(a))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "52343e3d-63ef-4fc2-ad7e-85ca124efdac",
"metadata": {},
"outputs": [],
"source": [
"freq = np.logspace(log10(2), log10(2e4), 200)\n",
"octaves = StandardFilterDescriptor.genFilterSetInRange(1, 10, 16e3, False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2788f062-34ab-4573-9849-459fdccc97d3",
"metadata": {},
"outputs": [],
"source": [
"hs = [ o.genFilter().tf(0,freq) for o in octaves]\n",
"names = [str(o) for o in octaves]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3366f635-29fe-4757-ab0c-9d4929f8cddb",
"metadata": {},
"outputs": [],
"source": [
"plt.figure()\n",
"# plt.subplot(211)\n",
"plt.title('Frequency response of filters - magnitude')\n",
"\n",
"for h in hs:\n",
" plt.semilogx(freq, level(h))\n",
"plt.legend(names)\n",
"plt.ylabel('Magnitude [dB]')\n",
"plt.ylim(-50, 1)\n",
"plt.xlabel('Freq. [Hz]')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b242187a-3449-4c30-8e26-28e75d59c414",
"metadata": {},
"outputs": [],
"source": [
"fs = 48000\n",
"tend = 0.01\n",
"t = np.linspace(0, tend, int(tend*fs), endpoint=False)\n",
"impulse = np.zeros(t.size)\n",
"impulse[0] = 1\n",
"plt.figure()\n",
"plt.title('Filter impulse response')\n",
"for o in octaves:\n",
" i = o.genFilter().bilinear(fs).filter(impulse)\n",
" plt.plot(t, i)\n",
"plt.legend(names, loc='upper right')\n",
"plt.ylabel('Filter output [-]')\n",
"plt.xlabel('Time [s]')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -0,0 +1,110 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "bcd07c1e-6722-44c7-b162-267f1341c2fe",
"metadata": {},
"source": [
"# Frequency weighting plots"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f0c4d6ef-69b2-4b5c-92fd-d987a11f3cbd",
"metadata": {},
"outputs": [],
"source": [
"# Prerequisites, uncomment below in case of errors. Also for ipympl, restart Jupyter Lab if it was not installed\n",
"\n",
"!cd .. && maturin develop -F python-bindings\n",
"#!pip install ipympl scipy matplotlib"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "23b45cf6-85bc-4eaa-8f27-77c5b354e772",
"metadata": {},
"outputs": [],
"source": [
"# If this does not work, install ipympl and reboot Jupyter Lab\n",
"%matplotlib widget"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "96090ac9-2033-411d-8e45-0c6b375a268b",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from numpy import log10, sqrt, exp, pi\n",
"from lasprs._lasprs import ZPKModel, FreqWeighting\n",
"def level(a):\n",
" return 20*np.log10(np.abs(a))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "52343e3d-63ef-4fc2-ad7e-85ca124efdac",
"metadata": {},
"outputs": [],
"source": [
"A = ZPKModel.freqWeightingFilter(FreqWeighting.A)\n",
"C = ZPKModel.freqWeightingFilter(FreqWeighting.C)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7d3cc799-4809-4ea4-8570-89ffbb751a5a",
"metadata": {},
"outputs": [],
"source": [
"freq = np.logspace(log10(2), log10(2e4), 200)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ab22d9cc-03ae-4d58-b730-599b659f9dc0",
"metadata": {},
"outputs": [],
"source": [
"plt.figure()\n",
"plt.title('Frequency response of weighting filters - magnitude')\n",
"plt.semilogx(freq, level(A.tf(0, freq)))\n",
"plt.semilogx(freq, level(C.tf(0, freq)))\n",
"plt.legend(['A-weighting', 'Z-weighting'])\n",
"plt.ylabel('Magnitude [dB]')\n",
"plt.ylim(-50, 3)\n",
"plt.xlabel('Freq. [Hz]')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

175
examples_py/test_slm1.ipynb Normal file
View File

@ -0,0 +1,175 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "bcd07c1e-6722-44c7-b162-267f1341c2fe",
"metadata": {},
"source": [
"# Test Sound Level Meter implementation - 1"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f0c4d6ef-69b2-4b5c-92fd-d987a11f3cbd",
"metadata": {},
"outputs": [],
"source": [
"# Prerequisites, uncomment below in case of errors. Also for ipympl, restart Jupyter Lab if it was not installed\n",
"\n",
"!cd .. && maturin develop -F python-bindings\n",
"#!pip install ipympl scipy matplotlib"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "23b45cf6-85bc-4eaa-8f27-77c5b354e772",
"metadata": {},
"outputs": [],
"source": [
"# If this does not work, install ipympl and reboot Jupyter Lab\n",
"%matplotlib widget"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "96090ac9-2033-411d-8e45-0c6b375a268b",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from numpy import log10, sqrt, exp, pi\n",
"from lasprs._lasprs import StandardFilterDescriptor, SLMSettings, FreqWeighting, TimeWeighting, SLM\n",
"def level(a):\n",
" return 20*np.log10(np.abs(a))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "52343e3d-63ef-4fc2-ad7e-85ca124efdac",
"metadata": {},
"outputs": [],
"source": [
"freq = np.logspace(log10(2), log10(2e4), 200)\n",
"octaves = [StandardFilterDescriptor.Overall()]+StandardFilterDescriptor.genOctaveFilterSet(16, 16e3)\n",
"\n",
"names = [str(o) for o in octaves]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ab22d9cc-03ae-4d58-b730-599b659f9dc0",
"metadata": {},
"outputs": [],
"source": [
"fs = 48000\n",
"\n",
"tw = TimeWeighting.Impulse()\n",
"# tw = TimeWeighting.Fast()\n",
"N = fs\n",
"if tw == TimeWeighting.Slow():\n",
" tau = 1.0\n",
"elif tw == TimeWeighting.Fast():\n",
" tau = 1/8\n",
"elif tw == TimeWeighting.Impulse():\n",
" tau = 35e-3\n",
"\n",
"else:\n",
" raise NotImplementedError()\n",
"settings = SLMSettings(fs, FreqWeighting.Z, tw, filterDescriptors=octaves, Lref=1)\n",
"slm = SLM(settings)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e1960b80-99f9-4744-9b0f-6651ed2693cd",
"metadata": {},
"outputs": [],
"source": [
"inp = np.ones(N)\n",
"# inp = np.zeros(N)\n",
"inp[0] = 1"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "532539b8-7a5e-47c2-afb1-a72d62c445d4",
"metadata": {},
"outputs": [],
"source": [
"t = np.linspace(0, N/fs, N, endpoint=False)\n",
"out = slm.run(inp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "37af9b78-73b9-4228-a758-d9a5362f79f4",
"metadata": {},
"outputs": [],
"source": [
"len(out[0])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4609a63f-e350-4b6b-b898-93c9e6fef804",
"metadata": {},
"outputs": [],
"source": [
"# help(plt.legend)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f446ac32-4ac2-4e85-87c8-43a06be9fb14",
"metadata": {},
"outputs": [],
"source": [
"plt.figure()\n",
"for o in out:\n",
" plt.plot(t,o)\n",
"plt.ylim(-60, 0)\n",
"plt.legend(names)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4303e20e-cc61-4d23-a5f0-cab683d55912",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -101,7 +101,7 @@ impl Biquad {
&mut self,
py: Python<'py>,
input: PyArrayLike1<Flt>,
) -> PyResult<PyArr1Flt<'py>> {
) -> Result<PyArr1Flt<'py>> {
Ok(PyArray1::from_vec_bound(py, self.filter(input.as_slice()?)))
}
}
@ -128,12 +128,25 @@ impl Biquad {
}
}
/// Re-initialize state. *This is an advanced function. You should know what
/// you are doing!*. If not, please use any other function like
/// [Biquad::reset].
pub fn setNextOutputX0(&mut self, out: Flt) {
let (b0, b1, b2, a1, a2) = (self.b0, self.b1, self.b2, self.a1, self.a2);
let w = out / (b1 + b2 - b0 * (a1 + a2));
/// Set the state as if the filter converged to the DC value `val`. Note
/// that this only works for filters that do have a D.C. nonzero gain. If
/// not, this method will do a kind of divide-by-zero.
pub fn setToDCValue(&mut self, val: Flt) {
// D.C. output is:
// y[n] = b0*w[n] + b1*w[n-1] + b2*w[n-2]
// and:
// w[n] = x[n] - a1*w[n-1] -a2*w[n-2]
// Assume w[n] = w[n-1] = w[n-2]:
// We have:
// w[n]*(1+a1+a2) = x
// And:
// y[n] = (b0+b1+b2) * wn
// Hence:
// y[n] = sum_b * w
// So we should set w to val/sum_b
let sumb = self.b0 + self.b1 + self.b2;
let w = val / sumb;
self.w1 = w;
self.w2 = w;
}
@ -442,6 +455,16 @@ mod test {
let filtered = ser.filter(&inp);
assert_eq!(&filtered, &inp);
}
#[test]
fn test_setDC() {
let mut b = Biquad::firstOrderMovingAverage(7., 1.).unwrap();
let dc = 7.8;
b.setToDCValue(dc);
let mut x = dc;
b.filter_inout_single(&mut x);
assert_abs_diff_eq!(x, dc, epsilon = Flt::EPSILON * 10.)
}
#[test]
fn test_firstOrderLowpass() {
@ -502,21 +525,4 @@ mod test {
// let freq = &[0., 10.,100.,1000., 2000.];
// println!("{:?}", b3.tf(fs, freq));
}
#[test]
fn test_setOutput1() {
let mut f = Biquad::firstOrderHighPass(10., 1.).unwrap();
f.setNextOutputX0(1.0);
let mut sample = 0.;
f.filter_inout_single(&mut sample);
assert_abs_diff_eq!(sample, 1.0);
}
#[test]
fn test_setOutput2() {
let mut f = Biquad::bilinear_zpk(1.0, None, Some(PoleOrZero::Real1(-1.)), None, None);
f.setNextOutputX0(4.2);
let mut sample = 0.;
f.filter_inout_single(&mut sample);
assert_abs_diff_eq!(sample, 4.2, epsilon = 1e-6);
}
}

View File

@ -1,4 +1,4 @@
use crate::{Flt, ZPKModel};
use crate::{config::*, ZPKModel};
use anyhow::{anyhow, bail, Result};
use num::{traits::float, Float};
use rayon::iter::Filter;
@ -100,6 +100,7 @@ pub const FREQ_REF: Flt = 1000.;
/// # Ok(())
/// # }
/// ```
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(PartialEq, Clone, Debug)]
pub struct StandardFilterDescriptor {
/// b and x. Bandwidth and offset w.r.t. reference frequency.
@ -128,12 +129,12 @@ impl StandardFilterDescriptor {
// are valid.
fn check_fmid_in_range(&self) -> Result<()> {
if let Some(fm) = self.fm() {
if fm < MIN_MIDBAND_FREQ {
if fm < MIN_MIDBAND_FREQ / 2. {
bail!(
"Invalid x. Computed filter center frequency is {} Hz, which is too low. Lowest allowed is {} Hz",
fm, MIN_MIDBAND_FREQ
)
} else if fm > 20e3 {
} else if fm > 25e3 {
bail!(
"Invalid x. Computed filter center frequency is {} Hz, which is too high. Highest allowed is {} Hz",
fm, MAX_MIDBAND_FREQ
@ -216,9 +217,9 @@ impl StandardFilterDescriptor {
};
loop {
// Get midband. Assuming we are not overall if we arrive
// here in the loop
let fm = desc.fm().unwrap();
// eprintln!("Fmid: {fm:.2e}");
// eprintln!("desc: {desc:#?}");
let ord = f_in_range(&desc);
// Bands for midband frequencies are a bit wider here
if fm < MIN_MIDBAND_FREQ - 3. || fm > MAX_MIDBAND_FREQ * 1.1 {
@ -284,24 +285,24 @@ impl StandardFilterDescriptor {
/// overall, `b=1` is one octave, `b=3`` is one-third, etc.
/// - `xmin` - Band designator of lowest band. Midband frequency can be computed as [FREQ_REF]*[G]^(`xmin/b`)
/// - `xmax` - Band designator of lowest band. Midband frequency can be computed as [FREQ_REF]*[G]^(`xmax/b`)
/// - `include_overall` - If `true`, adds an overall filter (a no-op) as the last designator in the list
/// - `append_overall` - If `true`, adds an overall filter (a no-op) as the last designator in the list
pub fn genFilterSetByDesignator(
b: u32,
xmin: i32,
xmax: i32,
include_overall: bool,
append_overall: bool,
) -> Result<Vec<Self>> {
if xmin > xmax {
bail!("xmin should be <= xmax");
}
let cap = (xmax - xmin) as usize + if include_overall { 1 } else { 0 };
let cap = (xmax - xmin) as usize + if append_overall { 1 } else { 0 };
let mut res = Vec::with_capacity(cap);
for x in xmin..=xmax {
res.push(StandardFilterDescriptor::build(b, x)?);
}
if include_overall {
if append_overall {
res.push(StandardFilterDescriptor::Overall()?)
}
@ -314,16 +315,17 @@ impl StandardFilterDescriptor {
///
/// # Other args
///
/// - `include_overall` - If `true`, adds an overall filter (a no-op) as the
pub fn genFilterSetInRange(
/// - `append_overall` - If `true`, adds an overall filter (a no-op) as the
/// last filter in the list.
pub fn genFilterSetForRange(
b: u32,
fl: Flt,
fu: Flt,
include_overall: bool,
append_overall: bool,
) -> Result<Vec<Self>> {
let xmin = StandardFilterDescriptor::filterForFreq(b, fl)?.x;
let xmax = StandardFilterDescriptor::filterForFreq(b, fu)?.x;
StandardFilterDescriptor::genFilterSetByDesignator(b, xmin, xmax, include_overall)
StandardFilterDescriptor::genFilterSetByDesignator(b, xmin, xmax, append_overall)
}
/// Returns the midband frequency in \[Hz\]
@ -338,7 +340,7 @@ impl StandardFilterDescriptor {
}
/// Cuton frequency and cut-off frequency, in \[Hz\].
/// Returns none if it does not apply, for [FilterDescriptor::Overall].
/// Returns none if it does not apply, for [StandardFilterDescriptor::Overall].
pub fn fl_fh(&self) -> Option<(Flt, Flt)> {
match self.b {
0 => None,
@ -435,6 +437,22 @@ impl TryFrom<&str> for ThirdOctaveBandDescriptor {
})
}
}
impl TryFrom<Flt> for ThirdOctaveBandDescriptor {
type Error = anyhow::Error;
fn try_from(value: Flt) -> Result<Self, Self::Error> {
Ok(ThirdOctaveBandDescriptor {
x: StandardFilterDescriptor::filterForFreq(3, value)?.x,
})
}
}
impl TryFrom<Flt> for OctaveBandDescriptor {
type Error = anyhow::Error;
fn try_from(value: Flt) -> Result<Self, Self::Error> {
Ok(OctaveBandDescriptor {
x: StandardFilterDescriptor::filterForFreq(1, value)?.x,
})
}
}
trait BandDescriptor {
fn name(&self) -> Cow<'static, str>;
@ -460,11 +478,58 @@ impl BandDescriptor for ThirdOctaveBandDescriptor {
}
}
// impl Into<OctaveBandDescriptor> for &str {
// fn into(self) -> OctaveBandDescriptor {
// OctaveBandDescriptor{x: self}
// }
// }
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StandardFilterDescriptor {
#[pyo3(name = "genFilter")]
fn genFilter_py(&self) -> ZPKModel {
self.genFilter()
}
#[staticmethod]
#[pyo3(name = "Overall")]
fn Overall_py() -> StandardFilterDescriptor {
StandardFilterDescriptor::Overall().unwrap()
}
#[staticmethod]
#[pyo3(name = "genThirdOctaveFilterSet")]
fn genThirdOctaveFilterSet_py(fmin: Flt, fmax: Flt) -> PyResult<Vec<StandardFilterDescriptor>> {
Ok(Self::genThirdOctaveFilterSet(fmin, fmax)?)
}
#[staticmethod]
#[pyo3(name = "genOctaveFilterSet")]
fn genOctaveFilterSetFromFreq(fmin: Flt, fmax: Flt) -> PyResult<Vec<StandardFilterDescriptor>> {
Ok(Self::genOctaveFilterSet(fmin, fmax)?)
}
#[staticmethod]
fn genThirdOctaveFilterSetFromFreq(
fmin: Flt,
fmax: Flt,
) -> PyResult<Vec<StandardFilterDescriptor>> {
Ok(Self::genThirdOctaveFilterSet(fmin, fmax)?)
}
fn __repr__(&self) -> String {
format!{"{:#?}", self}
}
fn __str__(&self) -> String {
self.name().into()
}
#[staticmethod]
#[pyo3(name = "genFilterSetInRange")]
fn genFilterSetInRange_py(
b: u32,
fmin: Flt,
fmax: Flt,
append_overall: bool,
) -> PyResult<Vec<StandardFilterDescriptor>> {
Ok(Self::genFilterSetForRange(b, fmin, fmax, append_overall)?)
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@ -87,11 +87,15 @@ pub struct ZPKModel {
// transform to create digital filter of this analogue one.
fwarp: Option<Flt>,
}
impl Default for ZPKModel {
impl Default for ZPKModel {
fn default() -> Self {
ZPKModel{z:vec![], p:vec![], k: 1.0, fwarp: None}
ZPKModel {
z: vec![],
p: vec![],
k: 1.0,
fwarp: None,
}
}
}
impl<'a, T: AsArray<'a, Flt>> TransferFunction<'a, T> for ZPKModel {
fn tf(&self, _fs: Flt, freq: T) -> Ccol {
@ -155,6 +159,12 @@ impl ZPKModel {
ZPKModel::butter(spec)
}
#[pyo3(name = "freqWeightingFilter")]
#[staticmethod]
fn freqWeightingFilter_py(fw: FreqWeighting) ->ZPKModel {
Self::freqWeightingFilter(fw)
}
fn __repr__(&self) -> String {
format!("{self:?}")
}
@ -171,6 +181,12 @@ impl ZPKModel {
let res = PyArray1::from_array_bound(py, &self.tf(fs, freq));
Ok(res)
}
/// See: [ZPKModel::tf]
#[pyo3(name = "bilinear")]
fn bilinear_py(&self, fs: Flt) -> SeriesBiquad {
self.bilinear(fs)
}
}
impl ZPKModel {
/// Creata a new ZPK model, with give list of poles and zeros, and gain
@ -538,20 +554,22 @@ impl ZPKModel {
let fHsq: Flt = fH.powi(2);
let frsq: Flt = fr.powi(2);
let fA = num::Float::powf(10., 2.45);
let D = num::Float::powf(2., 0.5);
let D = Flt::sqrt(0.5);
let b = (1. / (1. - D)) * (frsq + fLsq * fHsq / frsq - D * (fLsq + fHsq));
let c = fLsq * fHsq;
let f2 = (3. - sq5) / 2. * fA;
let f3 = (3. + sq5) / 2. * fA;
let f1 = ((-b - (b.powi(2) - 4. * c).sqrt()) / 2.).sqrt();
let f4 = ((-b + (b.powi(2) - 4. * c).sqrt()) / 2.).sqrt();
let sqrtfac = (b.powi(2) - 4. * c).sqrt();
let f1 = ((-b - sqrtfac) / 2.).sqrt();
let f4 = ((-b + sqrtfac) / 2.).sqrt();
let p1 = 2. * pi * f1;
let p2 = 2. * pi * f2;
let p3 = 2. * pi * f3;
let p4 = 2. * pi * f4;
println!("{b} {p1}, {p2}, {p3}, {p4}");
let (zeros, poles) = match wt {
FreqWeighting::Z => {
@ -589,3 +607,14 @@ pub enum PoleOrZero {
/// Single zero / pole
Real1(Flt),
}
#[cfg(test)]
mod test{
use super::ZPKModel;
#[test]
fn test_A() {
let Aw = ZPKModel::freqWeightingFilter(crate::FreqWeighting::A);
}
}

View File

@ -57,6 +57,11 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<siggen::Siggen>()?;
m.add_class::<filter::FilterSpec>()?;
m.add_class::<filter::ZPKModel>()?;
m.add_class::<filter::StandardFilterDescriptor>()?;
m.add_class::<slm::TimeWeighting>()?;
m.add_class::<ps::FreqWeighting>()?;
m.add_class::<slm::SLMSettings>()?;
m.add_class::<slm::SLM>()?;
Ok(())
}

View File

@ -1,6 +1,8 @@
use crate::config::*;
use strum_macros::{Display, EnumMessage};
/// Sound level frequency weighting type (A, C, Z)
#[derive(Display, Debug, EnumMessage, Default, Clone)]
#[cfg_attr(feature = "python-bindings", pyclass(eq, eq_int))]
#[derive(Display, Debug, EnumMessage, Default, Clone, PartialEq)]
pub enum FreqWeighting {
/// A-weighting
A,

View File

@ -1,18 +1,22 @@
use super::{TimeWeighting, SLM_MAX_CHANNELS};
use crate::{filter::StandardFilterDescriptor, Flt, FreqWeighting};
use super::{TimeWeighting, SLM, SLM_MAX_CHANNELS};
use crate::{config::*, filter::StandardFilterDescriptor, Flt, FreqWeighting};
use anyhow::Result;
use clap::builder;
use derive_builder::Builder;
use smallvec::{smallvec, SmallVec};
const Lref_default: Flt = 20e-6;
/// Settings used to create a Sound Level Meter.
#[derive(Builder, Clone)]
#[builder(setter(into))]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct SLMSettings {
/// Sampling frequency in \[Hz\]
pub fs: Flt,
/// Reference level, in units of measured quantity. For sound pressure in
/// air, this is typically 20 μPa. This is also the default value.
#[builder(default = "2e-5")]
#[builder(default = "Lref_default")]
pub Lref: Flt,
/// Frequency weightin A/C/Z applied to data. Defaults to [FreqWeighting::default()].
#[builder(default)]
@ -24,6 +28,31 @@ pub struct SLMSettings {
pub filterDescriptors: Vec<StandardFilterDescriptor>,
}
#[cfg_attr(feature = "python-bindings", pymethods)]
impl SLMSettings {
#[new]
#[pyo3(signature=
(fs, freqWeighting,
timeWeighting,
filterDescriptors,
Lref=(Lref_default)))]
fn new_py(
fs: Flt,
freqWeighting: FreqWeighting,
timeWeighting: TimeWeighting,
filterDescriptors: Vec<StandardFilterDescriptor>,
Lref: Flt,
) -> SLMSettings {
SLMSettings {
fs,
Lref,
freqWeighting,
timeWeighting,
filterDescriptors,
}
}
}
#[cfg(test)]
mod test {
use super::*;
@ -31,12 +60,12 @@ mod test {
#[test]
fn test_slmsettings1() -> Result<()> {
let desc = StandardFilterDescriptor::genFilterSetInRange(1, 100., 5e3, true).unwrap();
let desc = StandardFilterDescriptor::genFilterSetForRange(1, 100., 5e3, true).unwrap();
let _ = SLMSettingsBuilder::default()
.fs(1e3)
.freqWeighting(FreqWeighting::A)
.timeWeighting(TimeWeighting::Slow)
.timeWeighting(TimeWeighting::Slow {})
.filterDescriptors(desc)
.build()
.unwrap();

View File

@ -1,3 +1,4 @@
use crate::config::*;
use derive_builder::Builder;
use itertools::Itertools;
use ndarray::ArrayView1;
@ -22,6 +23,7 @@ struct SLMChannel {
}
/// Sound Level Meter
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug, Clone)]
pub struct SLM {
// Number of samples processed after last run() is called.
@ -76,16 +78,22 @@ impl SLM {
}
}
/// Push new time data through sound level meter. Returns L(t) data for each
/// channel.
/// channel. Updates the computed statistics and optionally outputs levels
/// vs time if flag `provide_output` is set to `true`. Note that at the end
/// of the block, the `L(t)` can also be obtained by calling [SLM::Ltlast].
///
/// # Args
///
/// - `td`: Time data
pub fn run(&mut self, td: &[Flt]) -> Option<Vec<Vec<Flt>>> {
/// - `provide_output` - Set this to true to give intermediate output data
pub fn run(&mut self, td: &[Flt], provide_output: bool) -> Option<Vec<Vec<Flt>>> {
if td.len() == 0 {
return None;
}
let prefiltered = self.prefilter.filter(td);
let level = |a| 10. * Flt::log10(a) / self.Lrefsq;
let Lt_iter = self.channels.par_iter_mut().map(|ch| {
let mut tmp = ch.bp.filter(&prefiltered);
let mut N = self.N;
@ -111,21 +119,23 @@ impl SLM {
// Run filtered_squared signal throug rectifier
if let Some(rectifier_down) = &mut ch.rect_lowpass_down {
filtered_squared.mapv_inplace(|sample_sq| {
let rectifier_up = &mut ch.rect_lowpass_up;
// Asymmetric up/down case for level
let mut fup = sample_sq;
let mut fdown = sample_sq;
// Filter in up-filter
let rectifier_up = &mut ch.rect_lowpass_up;
rectifier_up.filter_inout_single(&mut fup);
// Filter in down-filter
rectifier_down.filter_inout_single(&mut fdown);
// Check who wins
if fup > fdown {
rectifier_down.setNextOutputX0(fup);
if fup >= fdown {
rectifier_down.setToDCValue(fup);
fup
} else {
rectifier_up.setNextOutputX0(fdown);
rectifier_up.setToDCValue(fup);
fdown
}
});
@ -144,11 +154,20 @@ impl SLM {
});
// Update last signal power coming from SLM
ch.stat.Pt_last = *filtered_squared.last().unwrap();
// Convert output to levels
filtered_squared.mapv_inplace(level);
tmp
});
let Lt: Vec<_> = Lt_iter.collect();
self.N += td.len();
Some(Lt)
if provide_output {
let Lt: Vec<_> = Lt_iter.collect();
self.N += td.len();
Some(Lt)
} else {
// Just consume the iterator
Lt_iter.for_each(|_| {});
self.N += td.len();
None
}
}
/// Number of channels in SLM
@ -163,7 +182,7 @@ impl SLM {
Dcol::from_iter(
self.channels
.iter()
.map(|ch| 20. * Flt::log10(stat_returner(ch) / self.Lrefsq)),
.map(|ch| 10. * Flt::log10(stat_returner(ch) / self.Lrefsq)),
)
}
@ -186,6 +205,36 @@ impl SLM {
}
}
#[cfg_attr(feature = "python-bindings", pymethods)]
impl SLM {
#[new]
fn new_py(settings: SLMSettings) -> SLM {
SLM::new(settings)
}
#[pyo3(name = "run", signature=(dat, provide_output=true))]
fn run_py(&mut self, dat: PyArrayLike1<Flt>, provide_output: bool) -> Option<Vec<Vec<Flt>>> {
self.run(dat.as_array().as_slice()?, provide_output)
}
#[pyo3(name = "Lmax")]
fn Lmax_py<'py>(&self, py: Python<'py>) -> PyArr1Flt<'py> {
PyArray1::from_array_bound(py, &self.Lmax())
}
#[pyo3(name = "Leq")]
fn Leq_py<'py>(&self, py: Python<'py>) -> PyArr1Flt<'py> {
PyArray1::from_array_bound(py, &self.Leq())
}
#[pyo3(name = "Lpk")]
fn Lpk_py<'py>(&self, py: Python<'py>) -> PyArr1Flt<'py> {
PyArray1::from_array_bound(py, &self.Lpk())
}
#[pyo3(name = "Ltlast")]
fn Ltlast_py<'py>(&self, py: Python<'py>) -> PyArr1Flt<'py> {
PyArray1::from_array_bound(py, &self.Ltlast())
}
}
#[derive(Debug, Clone, Default)]
/// Quantities defined as powers, i.e. square of amplitude
struct SLMStat {
@ -213,13 +262,13 @@ mod test {
#[test]
fn test_slm1() {
const fs: Flt = 48e3;
const N: usize = (fs/8.) as usize;
const N: usize = (fs / 8.) as usize;
let desc = StandardFilterDescriptor::Overall().unwrap();
let settings = SLMSettingsBuilder::default()
.fs(fs)
.timeWeighting(TimeWeighting::Fast)
.timeWeighting(TimeWeighting::Fast {})
.freqWeighting(FreqWeighting::Z)
.filterDescriptors(&[desc])
.build()
@ -230,14 +279,12 @@ mod test {
siggen.reset(fs);
let mut data = vec![0.; N];
siggen.genSignal(&mut data);
// println!("{:#?}", data);
let mut slm = SLM::new(settings);
// println!("{slm:#?}");
let res = slm.run(&data).unwrap();
let res = slm.run(&data, true).unwrap();
let res = &res[0];
println!("{slm:#?}");
println!("{:#?}", &res[res.len()- 100..]);
println!("{:#?}", &res[res.len() - 100..]);
}
}

View File

@ -1,18 +1,23 @@
use crate::Flt;
use crate::config::*;
/// Time weighting to use in level detection of Sound Level Meter.
#[derive(Clone, Copy)]
#[cfg_attr(feature = "python-bindings", pyclass(eq))]
#[derive(Clone, Copy, PartialEq)]
pub enum TimeWeighting {
// I know that the curly braces here are not required and add some
// boilerplate, but this is the only way Pyo3 swallows complex enums at the
// moment.
/// Slow time weighting ~ 1 s
Slow,
Slow {},
/// Fast time weighting ~ 1/8 s
Fast,
Fast {},
/// Impulse time weighting ~ 30 ms
Impulse,
Impulse {},
/// A custom symmetric time weighting
CustomSymmetric {
/// Custom time constant [s]
t: Flt,
},
/// A custom symmetric time weighting
CustomAsymmetric {
/// Time weighting when level is increasing
@ -23,7 +28,7 @@ pub enum TimeWeighting {
}
impl Default for TimeWeighting {
fn default() -> Self {
TimeWeighting::Fast
TimeWeighting::Fast {}
}
}
impl TimeWeighting {
@ -32,13 +37,13 @@ impl TimeWeighting {
pub fn getLowpassPoles(&self) -> (Flt, Option<Flt>) {
use TimeWeighting::*;
match self {
Slow => (-1.0, None),
Fast =>
Slow {} => (-1.0, None),
Fast {} =>
// Time constant is 1/8 s, pole is at -8 rad/s
{
(-8., None)
}
Impulse => {
Impulse {} => {
// For the impulse time weighting, some source says ~ 2.9 dB/s
// drop for the decay
// [https://www.nti-audio.com/en/support/know-how/fast-slow-impulse-time-weighting-what-do-they-mean].

View File

@ -1,62 +0,0 @@
#!/usr/bin/env python3
import numpy as np
from lasprs.filter import Biquad, SeriesBiquad, BiquadBank
def test_biquad1():
"""
Test if manual unit biquad matches response for manual input biquad.
"""
input_ = np.array([1.0, 0, 0, 0, 0, 0, 0])
b = Biquad(np.array([1., 0, 0, 1, 0, 0]))
b2 = Biquad.unit()
out1 = b.filter(input_)
out2 = b2.filter(input_)
expected_output = input_
assert(np.linalg.norm(out1 - out2) == 0.)
assert(np.linalg.norm(out1 - expected_output) == 0.)
def test_seriesbiquad():
"""
Test if manual unit biquad matches response for manual input biquad.
"""
input_ = np.array([1.0, 0, 0, 0, 0, 0, 0])
f1 = [1., 0, 0, 1, 0, 0]
f2 = [1., 0, 0, 1, 0, 0]
f = f1+f2
# Two biquads in series
b = SeriesBiquad(np.array(f))
# Single one
b2 = SeriesBiquad.unit()
out1 = b.filter(input_)
out2 = b2.filter(input_)
expected_output = input_
assert(np.linalg.norm(out1 - out2) == 0.)
assert(np.linalg.norm(out1 - expected_output) == 0.)
def test_biquadbank():
"""
See if two filters with half gain produce the output=input
"""
input_ = np.array([1.0, 0, 0, 0, 0, 0, 0])
f1 = [1., 0, 0, 1, 0, 0]
f2 = [1., 0, 0, 1, 0, 0]
filters = np.array([f1, f2]).T
bank = BiquadBank(filters)
bank.set_gains([0.5, 0.5])
filtered = bank.filter(input_)
assert(bank.len() == 2)
assert(np.linalg.norm(input_ - filtered) == 0)
if __name__ == '__main__':
test_biquad1()
test_seriesbiquad()
test_biquadbank()