First work on ZPKmodel for the benefit of the sound level meter implementation. Added code for bilinear transforms. Added butterworth analog filters. Documentation improvements. This is an intermediate commit of working code, but nothing finished.

This commit is contained in:
Anne de Jong 2024-08-11 11:58:50 +02:00
parent 649c9f549c
commit 99a8db23b8
15 changed files with 870 additions and 69 deletions

View File

@ -26,7 +26,7 @@ num = "0.4.3"
# openblas-src = { version = "0.10", features = ["cblas", "system"] }
# Parallel iterators
rayon = "1.8.0"
rayon = "1.10.0"
# Python bindings
pyo3 = { version = "0.22.2", optional = true, features = ["extension-module", "anyhow"]}
@ -40,8 +40,8 @@ rand_distr = "0.4.3"
cpal = { version = "0.15.3", optional = true }
# Nice enumerations
strum = "0.26.2"
strum_macros = "0.26.2"
strum = "0.26.3"
strum_macros = "0.26.4"
# Conditional compilation enhancements
cfg-if = "1.0.0"
@ -55,7 +55,7 @@ crossbeam = "0.8.4"
# Serialization
serde = { version = "1.0.193", features = ["derive"] }
toml = "0.8.14"
toml = "0.8.19"
# Initialize array for non-copy type
array-init = "2.1.0"
@ -70,13 +70,16 @@ hdf5 = { version = "0.8.1", optional = true }
# Useful iterator stuff
itertools = "0.13.0"
# Approximate equal stuff
approx = "0.5.1"
# For getting timestamps. Only useful when recording.
chrono = {version = "0.4.38", optional = true}
# For getting UUIDs in recording
uuid = { version = "1.9.1", features = ["v4"] , optional = true}
uuid = { version = "1.10.0", features = ["v4"] , optional = true}
# Command line argument parser, for CLI apps
clap = { version = "4.5.8", features = ["derive", "color", "help", "suggestions"] }
clap = { version = "4.5.13", features = ["derive", "color", "help", "suggestions"] }
# FFT's
realfft = "3.3.0"
@ -86,7 +89,6 @@ parking_lot = "0.12.3"
derive_builder = "0.20.0"
[dev-dependencies]
approx = "0.5.1"
ndarray-rand = "0.14.0"
[features]

View File

@ -1,4 +1 @@
from .._lasprs import filter as _filter
Biquad = _filter.Biquad
SeriesBiquad = _filter.SeriesBiquad
BiquadBank = _filter.BiquadBank
from .._lasprs import (Biquad, BiquadBank, SeriesBiquad, ZPKModel, FilterSpec)

View File

@ -24,30 +24,28 @@ cfg_if::cfg_if! {
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
pub use numpy::ndarray::{ArrayD, ArrayViewD, ArrayViewMutD};
pub use numpy::ndarray::prelude::*;
pub use numpy::{IntoPyArray,PyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn};
pub use numpy::ndarray::Zip;
pub use pyo3::prelude::*;
pub use pyo3::exceptions::PyValueError;
pub use pyo3::{pymodule, types::PyModule, PyResult};
pub use pyo3::anyhow::*;
pub use pyo3;
} else {
pub use ndarray::prelude::*;
pub use ndarray::Zip;
pub use ndarray::{Array1, Array2, ArrayView1};
} }
}
}
pub use ndarray::prelude::*;
pub use ndarray::{Array1, Array2, ArrayView1};
use ndarray::OwnedRepr;
pub use ndarray::Zip;
use num::complex::Complex;
pub use num::complex::ComplexFloat;
/// View into 1D array of floats
#[allow(dead_code)]
pub type VdView<'a> = ArrayView1<'a, Flt>;
/// View into 1D array of complex floats
#[allow(dead_code)]
pub type VcView<'a> = ArrayView1<'a, Cflt>;
/// Complex number floating point

View File

@ -11,7 +11,7 @@ pub struct StreamMetaData {
/// The data type of the device [Number / voltage / Acoustic pressure / ...]
pub rawDatatype: DataType,
/// Sample rate in [Hz]
/// Sample rate in \[Hz\]
pub samplerate: Flt,
/// The number of frames per block of data that comes in. Multiplied by

View File

@ -4,7 +4,7 @@ use anyhow::{bail, Result};
use num::Complex;
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
/// # A biquad is a second order recursive filter structure.
///
/// This implementation only allows for normalized coefficients (a_0 = 1). It
@ -48,7 +48,7 @@ pub struct Biquad {
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Biquad {
#[new]
/// Create new biquad filter. See [Biquad::new()]
/// Create new biquad filter. See [Biquad::new]
///
pub fn new_py<'py>(coefs: PyArrayLike1<Flt>) -> PyResult<Self> {
Ok(Biquad::new(coefs.as_slice()?)?)
@ -61,19 +61,24 @@ impl Biquad {
}
#[pyo3(name = "firstOrderHighPass")]
#[staticmethod]
/// See: [Biquad::firstOrderHighPass()]
/// See: [Biquad::firstOrderHighPass]
pub fn firstOrderHighPass_py(fs: Flt, fc: Flt) -> PyResult<Biquad> {
Ok(Biquad::firstOrderHighPass(fs, fc)?)
}
/// See: [Biquad::firstOrderLowPass()]
#[pyo3(name = "firstOrderLowPass")]
#[staticmethod]
pub fn firstOrderLowPass_py(fs: Flt, fc: Flt) -> PyResult<Biquad> {
Ok(Biquad::firstOrderLowPass(fs, fc)?)
// Print biquad in Python
fn __repr__(&self) -> String {
format!("{self:?}")
}
/// See: [Biquad::tf()]
/// See: [Biquad::firstOrderMovingAverage]
#[pyo3(name = "firstOrderMovingAverage")]
#[staticmethod]
pub fn firstOrderMovingAverage_py(fs: Flt, fc: Flt) -> PyResult<Biquad> {
Ok(Biquad::firstOrderMovingAverage(fs, fc)?)
}
/// See: [Biquad::tf]
#[pyo3(name = "tf")]
pub fn tf_py<'py>(
&self,
@ -101,17 +106,21 @@ impl Biquad {
///
/// # Args
///
/// - coefs: Filter coefficients.
/// - coefs: Filter coefficients. Should be 6 in toal. First 3 coefficients
/// are numerator (forward) coefs. Last 3 are denominator (recursive)
/// coefficients. Note that `coefs[3]` should be equal to 1.0. Hence the
/// normalization is such that a0 equals 1.0. If this is not the case, an
/// error occurs.
///
pub fn new(coefs: &[Flt]) -> Result<Self> {
match coefs {
[b0, b1, b2, a0, a1, a2] => {
if *a0 != 1.0 {
&[b0, b1, b2, a0, a1, a2] => {
if a0 != 1.0 {
bail!("Coefficient a0 should be equal to 1.0")
}
Ok(Biquad { w1: 0., w2: 0., b0: *b0, b1: *b1, b2: *b2, a1: *a1, a2: *a2})
Ok(Biquad { w1: 0., w2: 0., b0, b1, b2, a1, a2})
},
_ => bail!("Could not initialize biquad. Please make sure that the coefficients contain 6 terms")
_ => bail!("Could not initialize biquad. Please make sure that the coefficients contain 6 terms, see documentation for order.")
}
}
@ -132,13 +141,14 @@ impl Biquad {
}
/// Create unit impulse response biquad filter. Input = output
fn unit() -> Biquad {
pub fn unit() -> Biquad {
let filter_coefs = &[1., 0., 0., 1., 0., 0.];
Biquad::new(filter_coefs).unwrap()
}
/// Initialize biquad as first order high pass filter.
///
/// Initialize biquad as first order high pass filter. Pre-warps the
/// bilinear transformation to set the -3 dB point exactly at the cut-on
/// frequency.
///
/// * fs: Sampling frequency in \[Hz\]
/// * cuton_Hz: -3 dB cut-on frequency in \[Hz\]
@ -156,35 +166,38 @@ impl Biquad {
cuton_Hz
);
}
let omgc = 2. * pi * cuton_Hz;
let fwarp = Some(cuton_Hz);
Ok(Biquad::bilinear(fs, &[0., 1., 0.], &[omgc, 1.0, 0.], fwarp))
let tau: Flt = 1. / (2. * pi * cuton_Hz);
let facnum = 2. * fs * tau / (1. + 2. * fs * tau);
let facden = (1. - 2. * fs * tau) / (1. + 2. * fs * tau);
// let tau: Flt = 1. / (2. * pi * cuton_Hz);
// let facnum = 2. * fs * tau / (1. + 2. * fs * tau);
// let facden = (1. - 2. * fs * tau) / (1. + 2. * fs * tau);
Ok(Biquad::fromCoefs(
facnum, // b0
-facnum, // b1
0., // b2,
facden, // a1
0., // a2
))
// Ok(Biquad::fromCoefs(
// facnum, // b0
// -facnum, // b1
// 0., // b2,
// facden, // a1
// 0., // a2
// ))
}
/// First order low pass filter (one pole in the real axis). No pre-warping
/// First order low pass filter, which is a simple moving average (one pole in the real axis). No pre-warping
/// correction done.
///
/// * `fs` - Sampling frequency \[Hz\]
/// * `fc` - Cut-off frequency (-3 dB point) \[Hz\]
pub fn firstOrderLowPass(fs: Flt, fc: Flt) -> Result<Biquad> {
pub fn firstOrderMovingAverage(fs: Flt, fc: Flt) -> Result<Biquad> {
if fc <= 0. {
bail!("Cuton frequency, given: should be > 0")
}
if fc >= fs / 2. {
bail!("Cuton frequency should be smaller than Nyquist frequency")
}
let b0 = pi*fc/(pi*fc+fs);
let b0 = pi * fc / (pi * fc + fs);
let b1 = b0;
let a1 = (pi*fc-fs)/(pi*fc+fs);
let a1 = (pi * fc - fs) / (pi * fc + fs);
Ok(Biquad::fromCoefs(b0, b1, 0., a1, 0.))
}
@ -201,6 +214,154 @@ impl Biquad {
}
// println!("{:?}", inout);
}
/// Create new biquad using bilinear transform. Optionally pre-warps the
/// filter to correct for the mapping of infinite frequency to Nyquist
/// frequency.
///
/// The analog filter is defined as:
///
/// b0 + b1*s + b2*s^2
/// H(s) = --------------------
/// a0 + a1*s + a2*s^2
///
/// # Args
///
/// - `fs` - Sampling frequency in \[Hz\]
/// - `b` - 3 Analog coefficients (numerator) of second order filter
/// - `a` - 3 Analog coefficients of (denominator) second order
/// filter.
/// - `fwarp` - Optional reference frequency for pre-warping in \[Hz\]
///
/// # Panics
///
/// - when a.len() or b.len() not equal to 3
/// - when fref >= fs
/// - when fs <= 0.
///
pub fn bilinear(fs: Flt, b: &[Flt], a: &[Flt], fwarp: Option<Flt>) -> Biquad {
assert!(b.len() == 3);
assert!(a.len() == 3);
assert!(fs > 0.);
let T = 1. / fs;
let (b0a, b1a, b2a, a0a, a1a, a2a) = (b[0], b[1], b[2], a[0], a[1], a[2]);
// See: https://en.wikipedia.org/wiki/Bilinear_transform
let K = if let Some(fref) = fwarp {
assert!(fref < fs);
let omg0 = fref * 2. * pi;
omg0 / (omg0 * T / 2.).tan()
} else {
2. / T
};
let Ksq = K.powi(2);
//
let a0fac = a2a * Ksq + a1a * K + a0a;
println!("Ksq = {Ksq}");
println!("a0fac = {a0fac}");
// Coefficient b0
let b0 = (b2a * Ksq + b1a * K + b0a) / a0fac;
// Coefficient b1
let b1 = (2. * b0a - 2. * b2a * Ksq) / a0fac;
// Coefficient b2
let b2 = (b2a * Ksq - b1a * K + b0a) / a0fac;
// Coefficient a1
let a1 = (2. * a0a - 2. * a2a * Ksq) / a0fac;
// Coefficient a2
let a2 = (a2a * Ksq - a1a * K + a0a) / a0fac;
Biquad::fromCoefs(b0, b1, b2, a1, a2)
}
/// Create biquad using bilinear transform (BLT) and given analogue zeros,
/// poles and gain values. Can only deal with a maximum of two poles /
/// zeros. This restriction is already enforced by only allowing
/// [PoleOrZero] values as inputs.
///
/// # Args
///
/// - `fs` - Sampling frequency in \[Hz\]
/// - `z` - Optional zero, or zero pair, units are \[rad/s\]
/// - `p` - Optional pole or pole pair, units are \[rad/s\]
/// - `k` - Gain value. Arbitrary units. If not given, uses value of 1.0.
/// - `fwarp` - Warp point frequency. Used to correct for frequency warping
/// in BLT for pre-warping the transform.
pub fn bilinear_zpk(
fs: Flt,
z: Option<PoleOrZero>,
p: Option<PoleOrZero>,
k: Option<Flt>,
fwarp: Option<Flt>,
) -> Biquad {
// k defaults to 1.0
let k = if let Some(k) = k { k } else { 1.0 };
// The zpk form:
// (s-z1)*(s-z2)...
// H(s) = k ------------------
// (s-p1)*(s-p2)...
// The nominal form:
// b0 + b1*s + b2*s^2
// H(s) = ---------------------
// a0 + a1*s + a2*s^2
// Note that here also we have one 'DOF' too much, in the sense that an
// infinite number of models of the nominal form can fit a zpk form. We
// restrict ourselve to the case that a0 = Π(-p_i), which is the
// simplest.
// If we have a single zero, the math says:
// k*(s-z) = b0 + b1*s --> b1 = k, b0 = -k*z
// If we have a set of two zeros, the math says:
// k*(s-z1)*(s-z2) = b0 + b1*s + b2*s^2 -->
// b0 = k*z1*z2
// b1 = -k*(z1+z2)
// b2 = k
// Note that when z = z1 = z2.conj(), this simplifies to:
// b0 = k*abs(z)**2
// b1 = -2*k*real(z)
// b2 = k
let b = if let Some(z) = z {
match z {
PoleOrZero::Complex(z) => [k * z.norm_sqr(), -2. * k * z.re(), k],
PoleOrZero::Real1(z) => [-k * z, k, 0.],
PoleOrZero::Real2(z1, z2) => [k * z1 * z2, -k * (z1 + z2), k],
}
} else {
[k, 0., 0.]
};
// For a single pole:
// (s-p) = a0 + a1*s --> a0 = -p, a1 = 1.
// If we have a set of two poles, the math says:
// (s-p1)*(s-p2) = a0 + a1*s + a2*s^2 -->
// a0 = p1*p2
// a1 = -k*(p1+p2)
// a2 = 1.0
// Note that when p = p1 = p2.conj(), this simplifies to:
// a0 = abs(p)**2
// a1 = -2*real(p)
// a2 = 1.0
let a = if let Some(p) = p {
match p {
PoleOrZero::Complex(p) => [p.norm_sqr(), -2. * p.re(), 1.0],
PoleOrZero::Real1(p) => [-p, 1.0, 0.],
PoleOrZero::Real2(p1, p2) => [p1 * p2, -(p1 + p2), 1.0],
}
} else {
[1., 0., 0.]
};
println!("b = {b:?}");
println!("a = {a:?}");
Biquad::bilinear(fs, &b, &a, fwarp)
}
}
impl Default for Biquad {
/// Unit impulse (does not transform signal whatsoever)
@ -229,9 +390,9 @@ impl<'a, T: AsArray<'a, Flt>> TransferFunction<'a, T> for Biquad {
let freq = freq.into();
freq.mapv(|f| {
let z = Complex::exp(I * 2. * pi * f / fs);
let num = self.b0 + self.b1 / z + self.b2 / z / z;
let den = 1. + self.a1 / z + self.a2 / z / z;
let zm = Complex::exp(-I * 2. * pi * f / fs);
let num = self.b0 + self.b1 * zm + self.b2 * zm * zm;
let den = 1. + self.a1 * zm + self.a2 * zm * zm;
num / den
})
}
@ -240,7 +401,7 @@ impl<'a, T: AsArray<'a, Flt>> TransferFunction<'a, T> for Biquad {
#[cfg(test)]
mod test {
use approx::assert_abs_diff_eq;
use num::complex::ComplexFloat;
use num::{complex::ComplexFloat, integer::sqrt};
use super::*;
@ -256,7 +417,7 @@ mod test {
fn test_firstOrderLowpass() {
let fs = 1e5;
let fc = 10.;
let b = Biquad::firstOrderLowPass(fs, fc).unwrap();
let b = Biquad::firstOrderMovingAverage(fs, fc).unwrap();
let mut freq = Dcol::from_elem(5, 0.);
freq[1] = fc;
freq[2] = fs / 2.;
@ -266,4 +427,32 @@ mod test {
assert_abs_diff_eq!(tf[0].im, 0.);
assert_abs_diff_eq!(tf[1].abs(), 1. / Flt::sqrt(2.), epsilon = 1e-6);
}
#[test]
fn test_bilinear() {
let fc = 100.;
let omgc = 2. * pi * fc;
let fs = 2e3;
let b1 = Biquad::firstOrderMovingAverage(fs, fc).unwrap();
let b2 = Biquad::bilinear_zpk(fs, None, Some(PoleOrZero::Real1(-omgc)), Some(omgc), None);
println!("b1 = {b1:?}");
println!("b2 = {b2:?}");
assert_abs_diff_eq!((b1.tf(fs, &[0.])[0] - Cflt::ONE).abs(), 0., epsilon = 1e-9);
assert_abs_diff_eq!((b2.tf(fs, &[0.])[0] - Cflt::ONE).abs(), 0., epsilon = 1e-9);
// assert_eq!(b1, b2);
assert_abs_diff_eq!((b1.tf(fs, &[fs / 2.])[0]).abs(), 0., epsilon = 1e-9);
assert_abs_diff_eq!((b2.tf(fs, &[fs / 2.])[0]).abs(), 0., epsilon = 1e-9);
}
#[test]
fn test_firstOrderHighPass() {
let fc = 100.;
let fs = 4e3;
let b3 = Biquad::firstOrderHighPass(fs, fc).unwrap();
println!("b3 = {b3:?}");
assert_abs_diff_eq!((b3.tf(fs, &[0.])[0]).abs(), 0., epsilon = 1e-9);
assert_abs_diff_eq!((b3.tf(fs, &[(fs-fs/1e9) / 2.])[0]).abs(), 1., epsilon = 1e-9);
assert_abs_diff_eq!((b3.tf(fs, &[fc])[0]).abs(), (0.5).sqrt(), epsilon = 1e-9);
// let freq = &[0., 10.,100.,1000., 2000.];
// println!("{:?}", b3.tf(fs, freq));
}
}

41
src/filter/butter.rs Normal file
View File

@ -0,0 +1,41 @@
//! We implement
//! [Pieter's Pages](https://tttapa.github.io/Pages/Mathematics/Systems-and-Control-Theory/Analog-Filters/Butterworth-Filters.html)
//! is a fine source to understand the theory presented here.
//! A Butterworth lowpass filter has the form
//!
//! 1
//! |H(s)|^2 = ------------------
//! 1+ (omega/omega_c)^(2n)
//!
//! where n is the order of the filter.
use approx::abs_diff_eq;
use super::PoleOrZero;
use crate::config::*;
/// Create iterator that returns the poles of a butterworth lowpass filter.
///
/// # Args
///
/// - `fc` - Cutoff-frequency in \[Hz\]
/// - `n` - Filter order
pub fn butter_lowpass_roots(fc: Flt, n: u32) -> impl Iterator<Item = PoleOrZero> {
let omgc = 2. * pi * fc;
let nf = n as Flt;
(1..=n).filter_map(move |k| {
let kf = k as Flt;
let angle = pi * (2. * kf + nf - 1.) / (2. * nf);
let pole = omgc * Cflt::exp(I * angle);
if abs_diff_eq!(pole.im(), 0., epsilon = 1e-5) {
Some(PoleOrZero::Real1(pole.re()))
} else if pole.im() > 0. {
// We only pick the roots with positive imaginary part
Some(PoleOrZero::Complex(pole))
} else {
// Negative imaginary part. Will be filtered out
None
}
})
}

View File

@ -2,16 +2,18 @@ use itertools::Itertools;
use super::*;
/// A Dummy fillter just does 'nothing', its input equals its output
/// A Dummy fillter just does 'nothing', its input equals its output. It is
/// equal to [Biquad::unit], but then with the option to fully optimize it away.
#[derive(Clone, Copy, Debug)]
pub struct DummyFilter;
impl Filter for DummyFilter {
#[inline]
fn filter(&mut self, input: &[Flt]) -> Vec<Flt> {
// Just returns an allocated copy
input.to_vec()
}
fn reset(&mut self) { }
fn reset(&mut self) {}
fn clone_dyn(&self) -> Box<dyn Filter> {
Box::new(*self)
}

View File

@ -10,11 +10,14 @@ mod biquad;
mod biquadbank;
mod dummy;
mod seriesbiquad;
mod zpkmodel;
mod butter;
pub use biquad::Biquad;
pub use biquadbank::BiquadBank;
pub use dummy::DummyFilter;
pub use seriesbiquad::SeriesBiquad;
pub use zpkmodel::{PoleOrZero, ZPKModel, FilterSpec};
/// Implementations of this trait are able to DSP-filter input data.
pub trait Filter: Send {

View File

@ -19,6 +19,11 @@ pub struct SeriesBiquad {
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl SeriesBiquad {
// Print biquad in Python
fn __repr__(&self) -> String {
format!("{self:?}")
}
/// Create new series filter set. See [SeriesBiquad::new()]
///
#[new]
@ -27,12 +32,13 @@ impl SeriesBiquad {
}
#[pyo3(name = "unit")]
#[staticmethod]
/// See: [Biquad::unit()]
/// See: [Biquad::unit]
pub fn unit_py() -> SeriesBiquad {
SeriesBiquad::unit()
}
/// See: [SeriesBiquad::filter]
#[pyo3(name = "filter")]
/// See: [SeriesBiquad::filter()]
pub fn filter_py<'py>(
&mut self,
py: Python<'py>,
@ -48,6 +54,14 @@ impl SeriesBiquad {
}
}
impl SeriesBiquad {
/// Create new series biquad from vector of biquads. No checks on the
/// validity or the stability of the biquads are performed.
///
pub fn newFromBiqs(biqs: Vec<Biquad>) -> SeriesBiquad {
assert!(biqs.len() > 0);
SeriesBiquad { biqs }
}
/// Create a new series biquad, having an arbitrary number of biquads.
///
/// # Arguments
@ -83,6 +97,7 @@ impl SeriesBiquad {
let filter_coefs = &[1., 0., 0., 1., 0., 0.];
SeriesBiquad::new(filter_coefs).unwrap()
}
// pub fn fromZpk(ZerosOrPoles)
fn clone_dyn(&self) -> Box<dyn Filter> {
Box::new(self.clone())
}

464
src/filter/zpkmodel.rs Normal file
View File

@ -0,0 +1,464 @@
use std::cmp::{max, min};
use super::butter::butter_lowpass_roots;
use itertools::{EitherOrBoth, Itertools};
use num::{zero, Complex};
use crate::config::*;
use super::{Biquad, SeriesBiquad, TransferFunction};
/// Specification of a filter for a certain type.
///
/// The order corresponds to the rolloff in dB/decade. order=1 means 20
/// dB/dec, order=2 40 dB/dec and so on. For a bandpass filter, the order also
/// corresponds to the roll-off and roll-on of the filter. For this case, the
/// order is not 'shared' between the highpass and lowpass part.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum FilterSpec {
/// Bandpass filter. Cuton frequency `fl` in \[Hz\]. Cutoff frequency `fu`
/// in \[Hz\]. Typically implemented as a highpass combined with a lowpass.
Bandpass {
/// Lower cut-on frequency \[Hz\]
fl: Flt,
/// Higher cut-off frequency \[Hz\]
fu: Flt,
/// Filter order n*20 dB/dec roll-on and off
order: u32,
},
/// Lowpass filter. Cutoff frequency `fc` in \[Hz\].
Lowpass {
/// Cut-off frequency \[Hz\]
fc: Flt,
/// Filter order n*20 dB/dec roll-on and off
order: u32,
},
/// Highpass filter. Cuton frequency `fc` in \[Hz\].
Highpass {
/// Cut-on frequency \[Hz\]
fc: Flt,
/// Filter order n*20 dB/dec roll-on and off
order: u32,
},
}
/// Analog zero-pole-gain model for real input to real output. Provides methods
/// to generate analog filters of standard type, i.e. bandpass, lowpass and
/// highpass. These can subsequentially be used to generate a digital filter.
///
/// # Example
///
/// ```rust
/// use lasprs::filter::{FilterSpec, ZPKModel};
///
/// ```
///
/// It has a transfer function that can be described as a rational function of
/// the form:
///
/// ```math
/// Π_i (s-z_i)
/// H(s) = k ------------
/// Π_i (s-p_i)
/// ```
///
/// where `Π` denotes the product of a series, `z_i` are the zeros, and `p_i`
/// are the poles. In order to have real output for a real input, the zeros and
/// poles should either be real, or come in complex conjugate pairs. This is
/// enforced by the way the poles and zero's are internally stored.
///
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct ZPKModel {
// List of zeros
z: Vec<PoleOrZero>,
// List of poles
p: Vec<PoleOrZero>,
// Gain factor
k: Flt,
// Optional: prewarping critical frequency. Used when using bilinear
// transform to create digital filter of this analogue one.
fwarp: Option<Flt>,
}
impl<'a, T: AsArray<'a, Flt>> TransferFunction<'a, T> for ZPKModel {
fn tf(&self, _fs: Flt, freq: T) -> Ccol {
let freq = freq.into();
freq.mapv(|freq| {
let s = 2. * I * pi * freq;
let mut res = Cflt::ONE;
use PoleOrZero::*;
self.z.iter().for_each(|z| match z {
Complex(z) => {
res *= (s - z) * (s - z.conj());
}
Real1(z) => {
res *= s - z;
}
Real2(z1, z2) => {
res *= (s - z1) * (s - z2);
}
});
self.p.iter().for_each(|p| match p {
Complex(p) => {
res *= 1. / ((s - p) * (s - p.conj()));
}
Real1(p) => {
res *= 1. / (s - p);
}
Real2(p1, p2) => {
res *= 1. / ((s - p1) * (s - p2));
}
});
res *= self.k;
res
})
}
}
use std::ops::Mul;
impl Mul for ZPKModel {
type Output = Self;
// Combines two ZPK model transfer functions into one
fn mul(self, rhs: ZPKModel) -> Self {
let (mut z, mut p, mut k) = (self.z, self.p, self.k);
k *= rhs.k;
z.extend(rhs.z);
p.extend(rhs.p);
ZPKModel {
z,
p,
k,
..Default::default()
}
}
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl ZPKModel {
#[pyo3(name = "butter")]
#[staticmethod]
fn butter_py<'py>(spec: FilterSpec) -> ZPKModel {
ZPKModel::butter(spec)
}
fn __repr__(&self) -> String {
format!("{self:?}")
}
/// See: [ZPKModel::tf]
#[pyo3(name = "tf")]
fn tf_py<'py>(
&self,
py: Python<'py>,
fs: Flt,
freq: PyArrayLike1<Flt>,
) -> PyResult<PyArr1Cflt<'py>> {
let freq = freq.as_array();
let res = PyArray1::from_array_bound(py, &self.tf(fs, freq));
Ok(res)
}
}
impl ZPKModel {
/// Creata a new ZPK model, with give list of poles and zeros, and gain
///
/// # Args
///
/// - `zeros` - list like struct of zeros. Can be a `Vec<ZeroOrPole>` or an
/// `&[ZeroOrPole]`.
/// - `poles` - list like struct of poles. Can be a `Vec<ZeroOrPole>` or an
/// `&[ZeroOrPole]`.
/// - `k` - linear gain.
pub fn new<T>(zeros: T, poles: T, k: Flt) -> ZPKModel
where
T: Into<Vec<PoleOrZero>>,
{
let z = zeros.into();
let p = poles.into();
ZPKModel {
z,
p,
k,
..Default::default()
}
.compactize()
}
// Combine real poles / zeros for two Real1s to 1 Real2.
fn combine_reals(v: Vec<PoleOrZero>) -> Vec<PoleOrZero> {
let mut real1: Option<PoleOrZero> = None;
let mut v: Vec<PoleOrZero> = v
.iter()
.filter_map(|z| match z {
PoleOrZero::Complex(z) => Some(PoleOrZero::Complex(*z)),
PoleOrZero::Real2(z1, z2) => Some(PoleOrZero::Real2(*z1, *z2)),
PoleOrZero::Real1(z) => {
if let Some(real1) = real1.take() {
if let PoleOrZero::Real1(z2) = real1 {
return Some(PoleOrZero::Real2(z2, *z));
} else {
unreachable!()
}
} else {
real1 = Some(PoleOrZero::Real1(*z));
return None;
}
}
})
.collect();
// A leftover real1, push it at the end
if let Some(real1) = real1 {
if let PoleOrZero::Real1(z) = real1 {
v.push(PoleOrZero::Real1(z));
} else {
unreachable!()
}
}
v
}
// Compactice filter, combines real1 poles/zeros to create more real2 poles/zeros.
fn compactize(self) -> ZPKModel {
let (z, p, k, fwarp) = (self.z, self.p, self.k, self.fwarp);
let z = Self::combine_reals(z);
let p = Self::combine_reals(p);
ZPKModel { z, p, k, fwarp }
}
// Set critical frequency in filter
fn setWarpFreq(mut self, fcrit: Flt) -> ZPKModel {
self.fwarp = Some(fcrit);
self
}
/// Change the gain value such that it matches `val` at frequency `freq`.
/// Does not change the phase at the given frequency.
pub fn setGainAt(mut self, freq: Flt, required_gain: Flt) -> ZPKModel {
assert!(required_gain > 0.);
let freq = [freq];
let cur_gain_at_freq = self.tf(-1.0, &freq)[0].abs();
let gain_fac = required_gain / cur_gain_at_freq;
// Update overall gain to set it equal to val
self.k *= gain_fac;
self
}
// For each original pole in the lowpass filter, generate new poles and
// zeros that transform the lowpass filter to a bandpass filter with mid
// frequency of `fc` and bandwidth `Bw_Hz` = fu-fl.
// Returns first a list of new poles, and secondly a list of extra zeros.
fn replace_poles_lp2bp(
pzlp: PoleOrZero,
fc: Flt,
Bw_Hz: Flt,
) -> (Vec<PoleOrZero>, Vec<PoleOrZero>) {
let omgc = 2. * pi * fc;
let mut new_poles = Vec::with_capacity(2);
let mut extra_zeros = Vec::with_capacity(2);
let omgcsq = omgc.powi(2);
match pzlp {
PoleOrZero::Real1(pz) => {
// Scale each pole or zero from the original cut-off frequency
// of the low-pass filter to the new bandwidth divided by 2
let pz_lp = pz * Bw_Hz / fc / 2.;
let sq = pz_lp.powi(2) - omgcsq;
if sq >= 0. {
let sqrt = sq.sqrt();
// For every 2 poles that are the result of a single
// original pole, we will have 1 new zero
extra_zeros.push(PoleOrZero::Real1(0.));
new_poles.push(PoleOrZero::Real2(
pz_lp + sqrt, // Two new poles or zeros
pz_lp - sqrt, // Two new poles or zeros
));
} else {
let sqrt = (sq + 0. * I).sqrt();
// For every 2 poles that are the result of a single
// original pole, we will have 1 new zero. A complex pole
// also has its complex conjugate as a new pole, zo we have
// one real zero here.
extra_zeros.push(PoleOrZero::Real1(0.));
new_poles.push(PoleOrZero::Complex(pz_lp + sqrt));
}
}
PoleOrZero::Real2(z1, z2) => {
// We do this in two parts. Not the most efficient, but we see
// filter calculation as a `once in a while calculation`.
for z in [z1, z2] {
let (np, ez) = Self::replace_poles_lp2bp(PoleOrZero::Real1(z), fc, Bw_Hz);
new_poles.extend(np);
extra_zeros.extend(ez);
}
}
PoleOrZero::Complex(pz) => {
// Scale each pole or zero from the original cut-off frequency
// of the low-pass filter to the new bandwidth divided by 2
let pz_lp = pz * Bw_Hz / fc / 2.;
let sqrt = (pz_lp.powi(2) - omgcsq).sqrt();
extra_zeros.push(PoleOrZero::Real2(0., 0.));
new_poles.push(PoleOrZero::Complex(pz_lp + sqrt));
new_poles.push(PoleOrZero::Complex(pz_lp - sqrt));
}
}
(new_poles, extra_zeros)
}
fn lowpass_to_bandpass(self, fc: Flt, Bw_Hz: Flt) -> ZPKModel {
// Lowpass to bandpass transformation. Means, we map:
// s^2 + omg_1 * omg_2
// s -> --------------------
// ( omg_2 - omg_1) + s
// This means, for each (s - z), we get:
//
// s^2 -z*s + omg_1 * omg_2 - z*(omg_2-omg_1)
// (s-z) -> ---------------------------------------------
// ( omg_2 - omg_1) + s
// So:
// - we get a new pole real at omg_1 - omg_2
// - And a new zero at: 0
let (mut z, p, k, fwarp) = (self.z, self.p, self.k, self.fwarp);
// Does not *yet* work with zeros in the lowpass filter.
assert!(z.len() == 0);
let mut new_poles = Vec::with_capacity(2 * p.len());
// Replace poles with new poles of the bandpass flter, add extra zeros
// to the list of zeros
for p in p {
let (new_poles_current, extra_zeros_current) = Self::replace_poles_lp2bp(p, fc, Bw_Hz);
new_poles.extend(new_poles_current);
z.extend(extra_zeros_current);
}
ZPKModel {
z,
p: new_poles,
k,
fwarp,
}
}
/// Create a Butterworth filter according to a certain specification
pub fn butter(spec: FilterSpec) -> ZPKModel {
match spec {
FilterSpec::Lowpass { fc, order } => {
assert!(fc > 0.);
assert!(order > 0);
let p = butter_lowpass_roots(fc, order as u32).collect();
let z = vec![];
ZPKModel {
z,
p,
k: 1.0,
..Default::default()
}
.compactize()
.setGainAt(fc, (0.5).sqrt())
.setWarpFreq(fc)
}
FilterSpec::Highpass { fc, order } => {
assert!(fc > 0.);
assert!(order > 0);
let p = butter_lowpass_roots(fc, order as u32).collect();
let z = vec![PoleOrZero::Real1(0.); order as usize];
ZPKModel {
z,
p,
k: 1.0,
..Default::default()
}
.compactize()
.setGainAt(fc, (0.5).sqrt())
.setWarpFreq(fc)
}
FilterSpec::Bandpass { fl, fu, order } => {
assert!(fl <= fu && fl > 0.);
assert!(order > 0);
let fmid = (fl * fu).sqrt();
let Bw_Hz = fu - fl;
let lp = Self::butter(FilterSpec::Lowpass { fc: fmid, order });
Self::lowpass_to_bandpass(lp, fmid, Bw_Hz)
.compactize()
.setGainAt(fmid, 1.0)
.setWarpFreq(fmid)
}
}
}
/// Apply bilinear transform to obtain series biquads from this ZPK model.
/// No prewarping taken into account.
///
/// # Args
///
/// - `fs` - Sampling frequency \[Hz\]
///
pub fn bilinear(&self, fs: Flt) -> SeriesBiquad {
let mut biqs = vec![];
// We spread the gain over all biquads.
let max_len = max(self.z.len(), self.p.len());
if max_len == 0 {
// No poles or zeros, return a gain-only biquad series with only one
// biquad.
return SeriesBiquad::new(&[self.k, 0., 0., 1., 0., 0.]).unwrap();
}
// Convert to floating point
let max_len = max_len as Flt;
// Spread gain over all biquads
let k_fac = self.k.powf(1. / max_len);
for case in self.z.iter().zip_longest(&self.p) {
match case {
EitherOrBoth::Both(z, p) => {
biqs.push(Biquad::bilinear_zpk(
fs,
Some(*z),
Some(*p),
Some(k_fac),
self.fwarp,
));
}
EitherOrBoth::Left(z) => {
biqs.push(Biquad::bilinear_zpk(
fs,
Some(*z),
None,
Some(k_fac),
self.fwarp,
));
}
EitherOrBoth::Right(p) => {
biqs.push(Biquad::bilinear_zpk(
fs,
None,
Some(*p),
Some(k_fac),
self.fwarp,
));
}
}
}
SeriesBiquad::newFromBiqs(biqs)
}
}
/// Enumeration describing a pole or zero, a complex conjugate pair, a single
/// real pole / zero, or a set of two real poles / zeros, or nothing at all.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PoleOrZero {
/// Complex conjugate pair, only single one listed, other one can be
/// inferred.
Complex(Cflt),
/// Set of two real poles / zeros
Real2(Flt, Flt),
/// Single zero / pole
Real1(Flt),
}

View File

@ -53,6 +53,8 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<filter::SeriesBiquad>()?;
m.add_class::<filter::BiquadBank>()?;
m.add_class::<siggen::Siggen>()?;
m.add_class::<filter::FilterSpec>()?;
m.add_class::<filter::ZPKModel>()?;
Ok(())
}

View File

@ -64,7 +64,7 @@ impl ApsSettings {
pub fn nfft(&self) -> usize {
self.nfft
}
pub fn get_overlap_keep(&self) -> usize {
fn get_overlap_keep(&self) -> usize {
self.validate_get_overlap_keep().unwrap()
}
/// Unpack all, returns parts in tuple
@ -176,7 +176,7 @@ impl Default for Overlap {
/// The 'mode' used in computing averaged power spectra. When providing data in
/// blocks to the [AvPowerSpectra] the resulting 'current estimate' responds
/// differently, depending on the model.
#[derive(Default, Clone)]
#[derive(Default, Copy, Clone)]
pub enum ApsMode {
/// Averaged over all data provided. New averages can be created by calling
/// `AvPowerSpectra::reset()`

View File

@ -1,4 +1,8 @@
use crate::config::{pi, Flt};
use crate::filter::{Filter, PoleOrZero, SeriesBiquad, TransferFunction, ZPKModel};
use num::Float;
use std::default;
use std::ops::Deref;
use strum_macros::{Display, EnumMessage};
/// Sound level frequency weighting type (A, C, Z)
@ -13,7 +17,76 @@ pub enum FreqWeightingType {
Z,
}
struct FreqWeightingFilter;
struct FreqWeightingFilter {
// The calculated filter for this frequency weighting
filter: SeriesBiquad,
}
impl Deref for FreqWeightingFilter {
type Target = SeriesBiquad;
fn deref(&self) -> &Self::Target {
&self.filter
}
}
impl FreqWeightingFilter {
pub fn new(fs: Flt, wt: FreqWeightingType) -> FreqWeightingFilter {
if let FreqWeightingType::Z = wt {
return FreqWeightingFilter {
filter: SeriesBiquad::unit(),
};
}
let fr: Flt = 1000.;
let fL: Flt = Float::powf(10., 1.5);
let fH: Flt = Float::powf(10., 3.9);
let sq5: Flt = Float::powf(5., 0.5);
let fLsq = fL.powi(2);
let fHsq: Flt = fH.powi(2);
let frsq: Flt = fr.powi(2);
let fA = Float::powf(10., 2.45);
let D = Float::powf(2., 0.5);
let b = (1. / (1. - D)) * (frsq + fLsq * fHsq / frsq - D * (fLsq + fHsq));
let c = fLsq * fHsq;
let f2 = (3. - sq5) / 2. * fA;
let f3 = (3. + sq5) / 2. * fA;
let f1 = ((-b - (b.powi(2) - 4. * c).sqrt()) / 2.).sqrt();
let f4 = ((-b + (b.powi(2) - 4. * c).sqrt()) / 2.).sqrt();
let p1 = 2. * pi * f1;
let p2 = 2. * pi * f2;
let p3 = 2. * pi * f3;
let p4 = 2. * pi * f4;
let (zeros, poles) = match wt {
FreqWeightingType::Z => {
unreachable!()
}
FreqWeightingType::C => {
let zeros = vec![PoleOrZero::Real2(0., 0.)];
let poles = vec![PoleOrZero::Real2(-p1, -p1), PoleOrZero::Real2(-p4, -p4)];
(zeros, poles)
}
FreqWeightingType::A => {
let poles = vec![
PoleOrZero::Real2(-p1, -p1),
PoleOrZero::Real2(-p2, -p3),
PoleOrZero::Real2(-p4, -p4),
];
let zeros = vec![PoleOrZero::Real2(0., 0.), PoleOrZero::Real2(0., 0.)];
(zeros, poles)
}
};
return FreqWeightingFilter {
filter: ZPKModel::new(zeros, poles, 1.0)
.setGainAt(1000., 1.0)
.bilinear(fs),
};
}
}
#[cfg(test)]
mod test {

15
src/slm/mod.rs Normal file
View File

@ -0,0 +1,15 @@
//! Sound Level Meter (SLM) module.
//!
//! Provides structs and helpers (SLMBuilder) for creating configurated Sound
//! Level Meters.
//!
/// Sound Level Meter
struct SLM {
}
impl SLM {
}

View File

@ -6,6 +6,6 @@
# $ cargo install cargo-watch cargo-docserver`
# ```
#
cargo watch -s "cargo rustdoc --lib && cargo docserve"
cargo watch -s "clear && cargo rustdoc -p lasprs --lib && cargo docserve"
# Then open: ${browser} http://localhost:4000