Cleanup code. Added ApsSettingsBuilder, bugfixes in RtAps.

This commit is contained in:
Anne de Jong 2024-08-03 12:23:39 +02:00
parent ada5bfb427
commit 649c9f549c
10 changed files with 369 additions and 206 deletions

View File

@ -29,8 +29,8 @@ num = "0.4.3"
rayon = "1.8.0"
# Python bindings
pyo3 = { version = "0.21.2", optional = true, features = ["extension-module", "anyhow"]}
numpy = { version = "0.21.0", optional = true}
pyo3 = { version = "0.22.2", optional = true, features = ["extension-module", "anyhow"]}
numpy = { git = "https://github.com/JRRudy1/rust-numpy", branch = "pyo3-0.22.0" , optional = true}
# White noise etc
rand = "0.8.5"
@ -83,6 +83,7 @@ realfft = "3.3.0"
# Fast Mutex
parking_lot = "0.12.3"
derive_builder = "0.20.0"
[dev-dependencies]
approx = "0.5.1"

View File

@ -13,7 +13,7 @@ use super::{StreamStatus, StreamMetaData};
#[cfg(feature = "cpal-api")]
pub mod api_cpal;
#[cfg(feature = "pulse_api")]
#[cfg(feature = "pulse-api")]
pub mod api_pulse;
/// A currently running stream

View File

@ -249,7 +249,6 @@ impl StreamMgr {
.expect("No input streams queues!");
let threadhandle = std::thread::spawn(move || {
let mut ctr: usize = 0;
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
@ -275,13 +274,8 @@ impl StreamMgr {
}
}
if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
// println!("Obtained raw stream data!");
if let InStreamMsg::StreamError(e) = msg {
}
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
ctr += 1;
}
}
iqueues
@ -382,7 +376,6 @@ impl StreamMgr {
break 'infy;
}
}
}
siggen
});

View File

@ -2,87 +2,87 @@ use super::timebuffer::TimeBuffer;
use super::CrossPowerSpecra;
use super::*;
use crate::config::*;
use anyhow::{bail, Result};
use anyhow::{bail, Error, Result};
use derive_builder::Builder;
use freqweighting::FreqWeightingType;
/// Provide the overlap of blocks for computing averaged (cross) power spectra.
/// Can be provided as a percentage of the block size, or as a number of
/// samples.
pub enum Overlap {
/// Overlap specified as a percentage of the total FFT length. Value should
/// be 0<=pct<100.
Percentage(Flt),
/// Number of samples to overlap
Number(usize),
/// No overlap at all, which is the same as Overlap::Number(0)
NoOverlap,
}
impl Default for Overlap {
fn default() -> Self {
Overlap::Percentage(50.)
}
}
/// The 'mode' used in computing averaged power spectra. When providing data in
/// blocks to the [AvPowerSpectra] the resulting 'current estimate' responds
/// differently, depending on the model.
#[derive(Default)]
pub enum ApsMode {
/// Averaged over all data provided. New averages can be created by calling
/// `AvPowerSpectra::reset()`
#[default]
AllAveraging,
/// In this mode, the `AvPowerSpectra` works a bit like a sound level meter,
/// where new data is weighted with old data, and old data exponentially
/// backs off. This mode only makes sense when `tau >> nfft/fs`
ExponentialWeighting {
/// Sampling frequency in `[Hz]`, used for computing IIR filter coefficient.
fs: Flt,
/// Time weighting constant, follows convention of Sound Level Meters.
/// Means the data is approximately low-pass filtered with a cut-off
/// frequency f_c of s/tau ≅ 1 → f_c = (2 * pi * tau)^-1.
tau: Flt,
},
/// Spectrogram mode. Only returns the latest estimate(s).
Spectrogram,
}
/// Averaged power spectra computing engine
/// Used to compute power spectra estimations on
/// long datasets, where nfft << length of data. This way, the variance of a
/// single periodogram is suppressed with increasing number of averages.
///
/// For more information, see the book on numerical recipes.
///
pub struct AvPowerSpectra {
// Power spectra estimator for single block
ps: PowerSpectra,
// The mode with which the AvPowerSpectra is computing.
/// All settings used for computing averaged power spectra using Welch' method.
#[derive(Builder, Clone)]
#[builder(build_fn(validate = "Self::validate", error = "Error"))]
pub struct ApsSettings {
/// Mode of computation, see [ApsMode].
#[builder(default)]
mode: ApsMode,
// The number of samples to keep in the time buffer when overlapping time
// blocks
overlap_keep: usize,
/// The number of blocks of length [self.nfft()] already used in the average
N: usize,
/// Storage for sample data.
timebuf: TimeBuffer,
// Current estimation of the power spectra
cur_est: CPSResult,
/// Overlap in time segments. See [Overlap].
#[builder(default)]
overlap: Overlap,
/// Window applied to time segments. See [WindowType].
#[builder(default)]
windowType: WindowType,
/// Kind of freqency weighting. Defaults to Z
#[builder(default)]
freqWeightingType: FreqWeightingType,
/// FFT Length
nfft: usize,
/// Sampling frequency
fs: Flt,
}
impl AvPowerSpectra {
/// The FFT Length of estimating (cross)power spectra
impl ApsSettingsBuilder {
fn validate(&self) -> Result<()> {
let fs = self.fs.unwrap();
if !fs.is_normal() {
bail!("Sampling frequency not a normal number")
}
if fs <= 0.0 {
bail!("Invalid sampling frequency given as parameter");
}
if self.nfft.is_none() {
bail!("nfft not specified")
};
let nfft = self.nfft.unwrap();
if nfft % 2 != 0 {
bail!("NFFT should be even")
}
if nfft == 0 {
bail!("Invalid NFFT, should be > 0.")
}
// Perform some checks on ApsMode
if let Some(ApsMode::ExponentialWeighting { tau }) = self.mode {
if tau <= 0.0 {
bail!("Invalid time weighting constant [s]. Should be > 0 if given.");
}
}
Ok(())
}
}
impl ApsSettings {
/// Returns nfft
pub fn nfft(&self) -> usize {
self.ps.nfft()
self.nfft
}
pub fn get_overlap_keep(&self) -> usize {
self.validate_get_overlap_keep().unwrap()
}
/// Unpack all, returns parts in tuple
pub fn get(self) -> (ApsMode, Overlap, WindowType, FreqWeightingType, usize, Flt) {
(
self.mode,
self.overlap,
self.windowType,
self.freqWeightingType,
self.nfft,
self.fs,
)
}
/// Returns the amount of samples to `keep` in the time buffer when
/// overlapping time segments using [TimeBuffer].
fn get_overlap_keep(nfft: usize, overlap: Overlap) -> Result<usize> {
let overlap_keep = match overlap {
pub fn validate_get_overlap_keep(&self) -> Result<usize> {
let nfft = self.nfft;
let overlap_keep = match self.overlap {
Overlap::Number(i) if i >= nfft => {
bail!("Invalid overlap number of samples. Should be < nfft, which is {nfft}.")
}
@ -104,6 +104,129 @@ impl AvPowerSpectra {
Ok(overlap_keep)
}
/// Return a reasonable acoustic default with a frequency resolution around
/// ~ 10 Hz, where nfft is still an integer power of 2.
///
/// # Errors
///
/// If `fs` is something odd, i.e. < 1 kHz, or higher than 1 MHz.
///
pub fn reasonableAcousticDefault(fs: Flt, mode: ApsMode) -> Result<ApsSettings> {
if fs < 1e3 || fs > 1e6 {
bail!("Sampling frequency for reasonable acoustic data is >= 1 kHz and <= 1 MHz.");
}
let fs_div_10_rounded = (fs / 10.) as u32;
// 2^30 is about 1 million. We search for a two-power of an nfft that is
// the closest to fs/10. The frequency resolution is about fs/nfft.
let nfft = (0..30).map(|i| 2u32.pow(i) - fs_div_10_rounded).fold(
// Start wth a value that is always too large
fs as u32 * 10,
|cur, new| cur.min(new),
) as usize;
Ok(ApsSettings {
mode,
fs,
nfft,
windowType: WindowType::default(),
overlap: Overlap::default(),
freqWeightingType: FreqWeightingType::default(),
})
}
/// Return sampling frequency
pub fn fs(&self) -> Flt {
self.fs
}
/// Return Nyquist frequency
pub fn fnyq(&self) -> Flt {
self.fs() / 2.
}
/// Returns a single-sided frequency array corresponding to points in Power
/// spectra computation.
pub fn getFreq(&self) -> Array1<Flt> {
let df = self.fs / self.nfft as Flt;
let K = self.nfft / 2 + 1;
Array1::linspace(0., (K - 1) as Flt * df, K)
}
}
/// Provide the overlap of blocks for computing averaged (cross) power spectra.
/// Can be provided as a percentage of the block size, or as a number of
/// samples.
#[derive(Clone, Debug)]
pub enum Overlap {
/// Overlap specified as a percentage of the total FFT length. Value should
/// be 0<=pct<100.
Percentage(Flt),
/// Number of samples to overlap
Number(usize),
/// No overlap at all, which is the same as Overlap::Number(0)
NoOverlap,
}
impl Default for Overlap {
fn default() -> Self {
Overlap::Percentage(50.)
}
}
/// The 'mode' used in computing averaged power spectra. When providing data in
/// blocks to the [AvPowerSpectra] the resulting 'current estimate' responds
/// differently, depending on the model.
#[derive(Default, Clone)]
pub enum ApsMode {
/// Averaged over all data provided. New averages can be created by calling
/// `AvPowerSpectra::reset()`
#[default]
AllAveraging,
/// In this mode, the `AvPowerSpectra` works a bit like a sound level meter,
/// where new data is weighted with old data, and old data exponentially
/// backs off. This mode only makes sense when `tau >> nfft/fs`
ExponentialWeighting {
/// Time weighting constant, follows convention of Sound Level Meters.
/// Means the data is approximately low-pass filtered with a cut-off
/// frequency f_c of s/tau ≅ 1 → f_c = (2 * pi * tau)^-1.
tau: Flt,
},
/// Spectrogram mode. Only returns the latest estimate(s).
Spectrogram,
}
/// Averaged power spectra computing engine
/// Used to compute power spectra estimations on
/// long datasets, where nfft << length of data. This way, the variance of a
/// single periodogram is suppressed with increasing number of averages.
///
/// For more information, see the book on numerical recipes.
///
pub struct AvPowerSpectra {
// Power spectra estimator for single block
ps: PowerSpectra,
settings: ApsSettings,
// The number of samples to keep in the time buffer when overlapping time
// blocks
overlap_keep: usize,
/// The number of blocks of length [self.nfft()] already used in the average
N: usize,
/// Storage for sample data.
timebuf: TimeBuffer,
// Current estimation of the power spectra
cur_est: CPSResult,
}
impl AvPowerSpectra {
/// The FFT Length of estimating (cross)power spectra
pub fn nfft(&self) -> usize {
self.ps.nfft()
}
/// Resets all state, starting with a clean sleave. After this step, also
/// the number of channels can be different on the input.
pub fn reset(&mut self) {
@ -119,14 +242,19 @@ impl AvPowerSpectra {
///
/// # Args
///
/// * `nfft` - FFT Length
/// * `fs` - Sampling frequency in \[Hz\]
/// * `nfft` - FFT Length \[-\]
///
/// # Panics
///
/// - When nfft is not even, or 0.
/// - When providing invalid sampling frequencies
///
pub fn new_simple(nfft: usize) -> AvPowerSpectra {
AvPowerSpectra::build(nfft, None, None, None).unwrap()
pub fn new_simple_all_averaging(fs: Flt, nfft: usize) -> AvPowerSpectra {
let mut settings =
ApsSettings::reasonableAcousticDefault(fs, ApsMode::AllAveraging).unwrap();
settings.nfft = nfft;
AvPowerSpectra::new(settings)
}
/// Create power spectra estimator which weighs either all data
@ -148,52 +276,24 @@ impl AvPowerSpectra {
/// - `overlap` - Amount of overlap in
/// - `mode` - The mode in which the [AvPowerSpectra] runs. See [ApsMode].
///
pub fn build(
nfft: usize,
windowtype: Option<WindowType>,
overlap: Option<Overlap>,
mode: Option<ApsMode>,
) -> Result<AvPowerSpectra> {
if nfft % 2 != 0 {
bail!("NFFT should be even")
}
if nfft == 0 {
bail!("Invalid NFFT")
}
let windowtype = windowtype.unwrap_or_default();
let window = Window::new(windowtype, nfft);
let mode = mode.unwrap_or_default();
let overlap = overlap.unwrap_or_default();
// Perform some checks on ApsMode
if let ApsMode::ExponentialWeighting { fs, tau } = mode {
if fs <= 0.0 {
bail!("Invalid sampling frequency given as parameter");
}
if tau <= 0.0 {
bail!("Invalid time weighting constant [s]. Should be > 0 if given.");
}
}
pub fn new(settings: ApsSettings) -> AvPowerSpectra {
let overlap_keep = settings.get_overlap_keep();
let window = Window::new(settings.windowType, settings.nfft);
let ps = PowerSpectra::newFromWindow(window);
let overlap_keep = Self::get_overlap_keep(nfft, overlap)?;
Ok(AvPowerSpectra {
AvPowerSpectra {
ps,
overlap_keep,
mode,
settings,
N: 0,
cur_est: CPSResult::default((0, 0, 0)),
timebuf: TimeBuffer::new(),
})
}
}
// Update result for single block
fn update_singleblock<'a, T>(&mut self, timedata: T)
where
T: AsArray<'a, Flt, Ix2>,
{
let timeblock = timedata.into();
let Cpsnew = self.ps.compute(timeblock);
fn update_singleblock(&mut self, timedata: ArrayView2<Flt>) {
let Cpsnew = self.ps.compute(timedata);
// println!("Cpsnew: {:?}", Cpsnew[[0, 0, 0]]);
// Initialize to zero
@ -206,7 +306,7 @@ impl AvPowerSpectra {
self.N += 1;
// Apply operation based on mode
match self.mode {
match self.settings.mode {
ApsMode::AllAveraging => {
let Nf = Cflt {
re: self.N as Flt,
@ -214,7 +314,7 @@ impl AvPowerSpectra {
};
self.cur_est = (Nf - 1.) / Nf * &self.cur_est + 1. / Nf * Cpsnew;
}
ApsMode::ExponentialWeighting { fs, tau } => {
ApsMode::ExponentialWeighting { tau } => {
debug_assert!(self.N > 0);
if self.N == 1 {
self.cur_est = Cpsnew;
@ -250,7 +350,7 @@ impl AvPowerSpectra {
// y[n] = alpha * y[n-1] + (1-alpha) * x[n]
// where alpha = exp(-T/tau).
let T = (self.nfft() - self.overlap_keep) as Flt / fs;
let T = (self.nfft() - self.overlap_keep) as Flt / self.settings.fs;
let alpha = Cflt::ONE * Flt::exp(-T / tau);
self.cur_est = alpha * &self.cur_est + (1. - alpha) * Cpsnew;
}
@ -287,7 +387,7 @@ impl AvPowerSpectra {
// Iterate over all blocks that can come,
while let Some(timeblock) = self.timebuf.pop(self.nfft(), self.overlap_keep) {
// Compute cross-power spectra for current time block
self.update_singleblock(&timeblock);
self.update_singleblock(timeblock.view());
computed_single = true;
}
@ -323,7 +423,7 @@ impl AvPowerSpectra {
// Iterate over all blocks that can come,
while let Some(timeblock) = self.timebuf.pop(self.nfft(), self.overlap_keep) {
// Compute cross-power spectra for current time block
self.update_singleblock(&timeblock);
self.update_singleblock(timeblock.view());
result.push(self.cur_est.clone());
}
@ -338,32 +438,34 @@ mod test {
use ndarray_rand::rand_distr::Normal;
use ndarray_rand::RandomExt;
use super::CrossPowerSpecra;
use super::*;
use crate::config::*;
use super::{ApsMode, AvPowerSpectra, CPSResult, Overlap, WindowType};
use Overlap::Percentage;
#[test]
fn test_overlap_keep() {
let nfft = 10;
assert_eq!(
AvPowerSpectra::get_overlap_keep(nfft, Overlap::Percentage(50.)).unwrap(),
nfft / 2
);
let nfft = 11;
assert_eq!(
AvPowerSpectra::get_overlap_keep(nfft, Overlap::Percentage(50.)).unwrap(),
nfft / 2
);
let nfft = 1024;
assert_eq!(
AvPowerSpectra::get_overlap_keep(nfft, Overlap::Percentage(25.)).unwrap(),
nfft / 4
);
assert_eq!(
AvPowerSpectra::get_overlap_keep(nfft, Overlap::Number(10)).unwrap(),
10
);
let ol = [
Overlap::NoOverlap,
Percentage(50.),
Percentage(50.),
Percentage(25.),
Overlap::Number(10),
];
let nffts = [10, 10, 1024, 10];
let expected_keep = [0, 5, 512, 2, 10];
for ((expected, nfft), overlap) in expected_keep.iter().zip(nffts.iter()).zip(ol.iter()) {
let settings = ApsSettingsBuilder::default()
.nfft(*nfft)
.fs(1.)
.overlap(overlap.clone())
.build()
.unwrap();
assert_eq!(settings.get_overlap_keep(), *expected);
}
}
/// When the time constant is 1.0, every second the powers approximately
@ -373,42 +475,33 @@ mod test {
let nfft = 48000;
let fs = nfft as Flt;
let tau = 2.;
let mut aps = AvPowerSpectra::build(
nfft,
None,
Some(Overlap::NoOverlap),
Some(ApsMode::ExponentialWeighting { fs, tau }),
)
let settings = ApsSettingsBuilder::default()
.fs(fs)
.nfft(nfft)
.overlap(Overlap::NoOverlap)
.mode(ApsMode::ExponentialWeighting { tau })
.build()
.unwrap();
let overlap_keep = settings.get_overlap_keep();
let mut aps = AvPowerSpectra::new(settings);
assert_eq!(aps.overlap_keep, 0);
let distr = Normal::new(1.0, 1.0).unwrap();
let timedata_some = Dmat::random((nfft, 1), distr);
let timedata_zeros = Dmat::zeros((nfft, 1));
let mut first_result = CPSResult::zeros((0, 0, 0));
// Clone here, as first_result reference is overwritten by subsequent
// calls to compute_last.
let first_result = aps.compute_last(timedata_some.view()).unwrap().clone();
aps.compute_last(&timedata_zeros).unwrap();
let last = aps.compute_last(&timedata_zeros).unwrap();
if let Some(v) = aps.compute_last(timedata_some.view()) {
first_result = v.clone();
// println!("{:?}", first_result.ap(0)[0]);
} else {
assert!(false, "Should return one value");
}
let overlap_keep = AvPowerSpectra::get_overlap_keep(nfft, Overlap::NoOverlap).unwrap();
if let Some(_) = aps.compute_last(&timedata_zeros) {
// Do nothing with it.
} else {
assert!(false, "Should return one value");
}
if let Some(v) = aps.compute_last(&timedata_zeros) {
let alpha = Flt::exp(-((nfft - overlap_keep) as Flt) / (fs * tau));
// let alpha: Flt = 1.;
for i in 0..nfft / 2 + 1 {
// println!("i={i}");
assert_abs_diff_eq!(first_result.ap(0)[i] * alpha.powi(2), v.ap(0)[i]);
}
} else {
assert!(false, "Should return one value");
assert_abs_diff_eq!(first_result.ap(0)[i] * alpha.powi(2), last.ap(0)[i]);
}
assert_eq!(aps.N, 3);
}
@ -422,7 +515,12 @@ mod test {
.push_column(timedata.column(0).mapv(|a| 2. * a).view())
.unwrap();
let mut aps = AvPowerSpectra::build(nfft, None, None, None).unwrap();
let settings = ApsSettingsBuilder::default()
.fs(1.0)
.nfft(nfft)
.build()
.unwrap();
let mut aps = AvPowerSpectra::new(settings);
if let Some(v) = aps.compute_last(&timedata) {
let tf = v.tf(0, 1, None);
assert_eq!((&tf - 2.0 * Cflt::ONE).sum().abs(), 0.0);
@ -444,7 +542,12 @@ mod test {
.push_column(timedata.column(0).mapv(|a| -1. * a).view())
.unwrap();
let mut aps = AvPowerSpectra::build(nfft, None, None, None).unwrap();
let settings = ApsSettingsBuilder::default()
.fs(1.)
.nfft(nfft)
.build()
.unwrap();
let mut aps = AvPowerSpectra::new(settings);
if let Some(v) = aps.compute_last(&timedata) {
let tf = v.tf(0, 1, Some(2));
assert_eq!((&tf - 2.0 * Cflt::ONE).sum().abs(), 0.0);
@ -466,7 +569,13 @@ mod test {
Some(WindowType::Blackman),
None,
] {
let mut aps = AvPowerSpectra::build(nfft, wt, None, None).unwrap();
let settings = ApsSettingsBuilder::default()
.fs(1.0)
.nfft(nfft)
.windowType(wt.unwrap_or_default())
.build()
.unwrap();
let mut aps = AvPowerSpectra::new(settings);
if let Some(v) = aps.compute_last(&timedata) {
let ap = v.ap(0);
assert_abs_diff_eq!((&ap).sum().abs(), timedata_mean_square, epsilon = 1e-2);
@ -475,4 +584,18 @@ mod test {
}
}
}
#[test]
#[should_panic]
fn test_apssettings1() {
let s = ApsSettingsBuilder::default().build().unwrap();
}
#[test]
fn test_apssettings2() {
let s = ApsSettingsBuilder::default()
.nfft(2048)
.fs(1.0)
.build()
.unwrap();
}
}

30
src/ps/freqweighting.rs Normal file
View File

@ -0,0 +1,30 @@
use std::default;
use strum_macros::{Display, EnumMessage};
/// Sound level frequency weighting type (A, C, Z)
#[derive(Display, Debug, EnumMessage, Default, Clone)]
pub enum FreqWeightingType {
/// A-weighting
A,
/// C-weighting
C,
/// Z-weighting, or no weighting
#[default]
Z,
}
struct FreqWeightingFilter;
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test() {
let a = FreqWeightingType::A;
let c = FreqWeightingType::C;
let z = FreqWeightingType::Z;
println!("A-weighting: {a}");
println!("C-weighting: {c}");
println!("Z-weighting: {z}");
}
}

View File

@ -9,9 +9,11 @@ mod fft;
mod ps;
mod timebuffer;
mod window;
mod freqweighting;
use crate::config::*;
pub use aps::{ApsMode, AvPowerSpectra, Overlap};
pub use freqweighting::FreqWeightingType;
pub use aps::{ApsSettings, ApsSettingsBuilder,ApsMode, AvPowerSpectra, Overlap};
pub use ps::{CrossPowerSpecra, PowerSpectra, CPSResult};
pub use window::{Window, WindowType};

View File

@ -8,6 +8,7 @@ use std::collections::VecDeque;
/// TimeBuffer, storage to add blocks of data in a ring buffer, that can be
/// extracted by blocks of other size. Also, we can keep samples in a buffer to
/// create, for example, overlapping windows of time data.
#[derive(Default)]
pub struct TimeBuffer {
data: Vec<VecDeque<Flt>>,
}

View File

@ -72,8 +72,7 @@ fn hamming(N: usize) -> Dcol {
/// * Blackman
///
/// The [WindowType::default] is [WindowType::Hann].
#[derive(Display, Clone, Debug)]
#[derive(Default)]
#[derive(Display,Default, Copy, Clone, Debug)]
pub enum WindowType {
/// Von Hann window
#[default]

View File

@ -2,3 +2,4 @@
//! data 'on the fly'. Examples are real time power spectra plotting
//! (Spectrograms, Auto powers, ..., or )
mod rtaps;
pub use rtaps::{RtAps, RtApsResult};

View File

@ -2,6 +2,7 @@ use std::ops::Deref;
use std::thread::{self, JoinHandle};
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::ps::ApsSettings;
use crate::ps::{AvPowerSpectra, CPSResult};
use crate::I;
use anyhow::Result;
@ -14,41 +15,49 @@ enum RtApsComm {
NewResult(CPSResult),
NewMeta(Arc<StreamMetaData>),
}
/// Result type coming from Real time Averaged Power Spectra computing engine
pub enum RtApsResult {
/// New result
NewResult(CPSResult),
/// New metadata
NewMeta(Arc<StreamMetaData>),
}
/// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent'
pub struct RtAps {
/// Storage for optional last result
comm: Arc<Mutex<Option<RtApsComm>>>,
/// Settings used for real time power spectra.
pub settings: ApsSettings,
}
impl RtAps {
/// Build new Real time power spectra computing engine.
pub fn build(mgr: &mut StreamMgr) -> Result<RtAps> {
/// Create new Real time power spectra computing engine.
pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps {
// Handler needs to be created here.
let handler = StreamHandler::new(mgr);
let last_result = Arc::new(Mutex::new(None));
let last_result2 = last_result.clone();
let settings2 = settings.clone();
let mut aps = AvPowerSpectra::build(2048, None, None, None)?;
let mut aps = AvPowerSpectra::new(settings);
let thread = std::thread::spawn(move || {
println!("Thread started...");
// println!("Thread started...");
let rx = handler.rx;
// What is running on the thread
'mainloop: loop {
let mut last_cps: Option<CPSResult> = None;
let mut meta: Option<Arc<StreamMetaData>> = None;
'mainloop: loop {
println!("LOOP");
'msgloop: for msg in &rx {
println!("Message found!");
if let Some(msg) = rx.recv_timeout(std::time::Duration::from_millis(10)).ok() {
match msg {
InStreamMsg::StreamStarted(new_meta) => {
aps.reset();
last_cps = None;
meta = Some(new_meta);
break 'msgloop;
}
InStreamMsg::StreamStopped | InStreamMsg::StreamError(_) => {
debug_assert!(meta.is_none());
@ -64,13 +73,11 @@ impl RtAps {
}
}
println!("LOOP2");
// Communicate last result, if any.
'commscope: {
let mut last_result_lock = last_result.lock();
if let Some(RtApsComm::CommStopThread) = *last_result_lock {
println!("Stopping RtAps thread");
break 'mainloop;
}
if let Some(newmeta) = meta.take() {
@ -89,32 +96,37 @@ impl RtAps {
}
// Move last_cps into mutex.
if let Some(last_cps) = last_cps.take() {
*last_result_lock = Some(RtApsComm::NewResult(last_cps));
}
}
} // End of loop
println!("Ending RtAps thread");
});
assert!(!thread.is_finished());
Ok(RtAps { comm: last_result2 })
RtAps {
comm: last_result2,
settings: settings2,
}
}
/// Get last computed value. When new stream metadata is
pub fn get_last(&self) -> Option<RtApsComm> {
pub fn get_last(&self) -> Option<RtApsResult> {
let mut lck = self.comm.lock();
let res = lck.take();
if let Some(RtApsComm::CommStopThread) = res {
panic!("BUG: CommStopThread should never be set!")
if let Some(res) = res {
match res {
RtApsComm::CommStopThread => panic!("BUG: CommStopThread should never be set!"),
RtApsComm::NewMeta(m) => return Some(RtApsResult::NewMeta(m)),
RtApsComm::NewResult(r) => return Some(RtApsResult::NewResult(r)),
}
lck.take()
}
None
}
}
impl Drop for RtAps {
fn drop(&mut self) {
println!("DROP");
let mut lck = self.comm.lock();
*lck = Some(RtApsComm::CommStopThread);
println!("DROP done");
}
}
@ -123,12 +135,13 @@ mod test {
use std::time::Duration;
use super::*;
use crate::daq::StreamMgr;
use crate::{daq::StreamMgr, ps::ApsSettingsBuilder};
#[test]
fn test_rtaps1() -> Result<()> {
{
let settings = ApsSettingsBuilder::default().nfft(2048).build().unwrap();
let mut smgr = StreamMgr::new();
let rtaps = RtAps::build(&mut smgr)?;
let rtaps = RtAps::new(&mut smgr, settings);
smgr.startDefaultInputStream()?;
thread::sleep(Duration::from_secs(2));
}