diff --git a/src/daq/streamstatus.rs b/src/daq/streamstatus.rs index 4554e94..ec2817b 100644 --- a/src/daq/streamstatus.rs +++ b/src/daq/streamstatus.rs @@ -1,23 +1,44 @@ //! Provides stream messages that come from a running stream -use strum_macros::Display; +use strum_macros::{Display, EnumMessage}; use super::*; /// Gives the stream status of a (possible) stream, either input / output or duplex. -#[derive(strum_macros::EnumMessage, Debug, Clone, Copy, Display)] +#[derive(EnumMessage, Debug, Clone, Copy, Display, PartialEq)] #[cfg_attr(feature = "python-bindings", pyclass)] pub enum StreamStatus { /// Stream is not running #[strum(message = "NotRunning", detailed_message = "Stream is not running")] - NotRunning{}, + NotRunning {}, /// Stream is running properly #[strum(message = "Running", detailed_message = "Stream is running")] - Running{}, + Running {}, /// An error occured in the stream. - #[strum(message = "Error", detailed_message = "An error occured with the stream")] - Error{ + #[strum( + message = "Error", + detailed_message = "An error occured with the stream" + )] + Error { /// In case the stream has an error: e is the field name - e: StreamError + e: StreamError, + }, +} + +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl StreamStatus { + fn __eq__(&self, other: &Self) -> bool { + self == other + } + fn hasError(&self) -> bool { + matches!(self, StreamStatus::Error { .. }) + } + fn getError(&self) -> Option { + use StreamStatus::*; + if let Error { e } = self { + return Some(*e); + } + None } } diff --git a/src/lib.rs b/src/lib.rs index 0b6a80e..18016c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ pub use config::Flt; pub mod daq; pub mod filter; pub mod ps; +mod math; pub mod siggen; use filter::*; pub mod rt; @@ -68,6 +69,8 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/math/mod.rs b/src/math/mod.rs index c0b9747..0a7c5f2 100644 --- a/src/math/mod.rs +++ b/src/math/mod.rs @@ -1,4 +1,26 @@ //! General math tools that are internally required -//! -//! +//! +//! +use crate::config::*; +use ndarray::ArrayView1; +/// Compute maximum value of an array of float values +pub fn max(arr: ArrayView1) -> Flt { + arr.fold( + -Flt::INFINITY, + |acc, new| if *new > acc { *new } else { acc }, + ) +} + +/// Compute minimum value of an array of float values +pub fn min(arr: ArrayView1) -> Flt { + arr.fold( + Flt::INFINITY, + |acc, new| if *new < acc { *new } else { acc }, + ) +} + +/// Compute maximum absolute value of an array of float values +pub fn maxabs(arr: ArrayView1) -> Flt { + arr.fold(0., |acc, new| if new.abs() > acc { new.abs() } else { acc }) +} diff --git a/src/ps/aps.rs b/src/ps/aps.rs index e409260..eb5f879 100644 --- a/src/ps/aps.rs +++ b/src/ps/aps.rs @@ -11,7 +11,6 @@ use freqweighting::FreqWeighting; /// /// For more information, see the book on numerical recipes. -#[cfg(feature = "python-bindings")] #[cfg_attr(feature = "python-bindings", pyclass)] #[derive(Debug)] diff --git a/src/rt/mod.rs b/src/rt/mod.rs index 490ad2d..7d82469 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -1,5 +1,7 @@ //! Real time signal analysis blocks, used for visual inspection and showing //! data 'on the fly'. Examples are real time power spectra plotting //! (Spectrograms, Auto powers, ..., or ) +mod ppm; mod rtaps; -pub use rtaps::{RtAps, RtApsResult}; \ No newline at end of file +pub use ppm::{PPM, ClipState}; +pub use rtaps::{RtAps, RtApsResult}; diff --git a/src/rt/ppm.rs b/src/rt/ppm.rs new file mode 100644 index 0000000..c469c8f --- /dev/null +++ b/src/rt/ppm.rs @@ -0,0 +1,245 @@ +use crate::daq::InStreamMsg; +use crate::math::maxabs; +use crate::slm::{self, SLMSettingsBuilder, SLM}; +use crate::{config::*, FreqWeighting, StandardFilterDescriptor}; +use crate::{daq::StreamMgr, Dcol}; +use crossbeam::channel::{unbounded, Receiver, Sender}; +use crossbeam::utils::Backoff; +use ndarray_stats::QuantileExt; +use parking_lot::Mutex; +use std::default; +use std::ops::DerefMut; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +/// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the +/// full scale, we mark a channel as almost clipped. +const ALMOST_CLIPPED_REL_AMP: Flt = 0.95; + +/// If clipping occured, this is the time it keeps saying 'signal is clipped' +const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2); + +/// If the signal level falls below this value, we indicate that the signal level is low. +const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.; + +/// If the signal level falls below this value, we indicate that the signal level is low. +const LEVEL_THRESHOLD_FOR_HIGH_LEVEL: Flt = -5.; + +type SharedPPMStatus = Arc>>; + +/// Peak programme meter implementation, including clip detector. Effectively uses a realtime SLM on all +/// input channels. Also includes a clipping detector. +#[derive(Debug)] +#[cfg_attr(feature = "python-bindings", pyclass)] +pub struct PPM { + // Latest and createst status + status: SharedPPMStatus, + sender: Sender, +} + +impl PPM { + /// Initialize a new PPM meter. + pub fn new(mgr: &mut StreamMgr) -> Self { + let (sender, rxmsg) = unbounded(); + let (tx, rxstream) = unbounded(); + mgr.addInQueue(tx); + + let status2: SharedPPMStatus = Arc::new(Mutex::new(vec![])); + let status = status2.clone(); + rayon::spawn(move || { + let mut slms: Vec = vec![]; + + let resetall = |slms: &mut Vec| { + let mut status = status.lock(); + *status = Default::default(); + slms.clear(); + }; + let mut sleep_dur = Duration::from_millis(50); + loop { + for msg in rxstream.try_iter() { + match msg { + InStreamMsg::InStreamData(d) => { + let mut status = status.lock(); + let floatdata = d.getFloatData(); + + 'channel: for (chno, (slm, ppmstatus)) in + slms.iter_mut().zip(status.iter_mut()).enumerate() + { + let chdata = floatdata.slice(s![.., chno]); + let maxabs_new = maxabs(chdata); + let chdata = chdata + .as_slice() + .expect("Data not contiguous on sample axis"); + slm.run(chdata, false); + + // Update levels + let last_level = slm.Ltlast()[0]; + ppmstatus.level = last_level; + + // If previous clip is there, and some time has elapsed, we remove the clip state + if let Some(moment) = ppmstatus.clip_time { + if moment.elapsed() > CLIP_INDICATOR_WAIT_S { + ppmstatus.clip = ClipState::LevelFine; + ppmstatus.clip_time = None; + } + // Do not update anything else if we are + // still in clipping mode. We are done + // updating PPM status for this channel. + continue 'channel; + } + + // Update clip status, if we were not clipping + ppmstatus.clip = if maxabs_new > ALMOST_CLIPPED_REL_AMP { + ppmstatus.clip_time = Some(Instant::now()); + ClipState::Clipped + } else if last_level > LEVEL_THRESHOLD_FOR_HIGH_LEVEL { + ClipState::HighLevel + } else if last_level < LEVEL_THRESHOLD_FOR_LOW_LEVEL { + ClipState::LowLevels + } else { + ClipState::LevelFine + } + } + } + InStreamMsg::StreamError(_e) => { + resetall(&mut slms); + } + InStreamMsg::StreamStarted(meta) => { + // Compute 'good' sleep time, as half the approximate time between two blocks + let block_time_us = + ((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64; + sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2)); + + // Re-initalize sound level meters + slms.clear(); + let mut s = status.lock(); + (0..meta.nchannels()).for_each(|_ch_index| { + // Create SLM settings. These might be different + // depending on channel in future + let slmsettings = SLMSettingsBuilder::default() + .fs(meta.samplerate) + .freqWeighting(FreqWeighting::Z) + .Lref(1.0) + .filterDescriptors([ + StandardFilterDescriptor::Overall().unwrap() + ]) + .build() + .unwrap(); + + slms.push(SLM::new(slmsettings.clone())); + // Initialize levels at -300 dB, and clip state + // at low levels + s.push(PPMChannelStatus { + clip: ClipState::LowLevels, + level: -300., + clip_time: None + }); + }); + } + InStreamMsg::StreamStopped => { + // Restore sleep time to sensible polling default + sleep_dur = Duration::from_millis(50); + } + } + // Loop over any messages coming in from main thread + for msg in rxmsg.try_iter() { + match msg { + PPMMessage::ResetClip {} => { + // Reset clip state to not clipped. + let mut s = status.lock(); + s.iter_mut().for_each(|c| c.clip = ClipState::LevelFine); + } + PPMMessage::StopThread => { + resetall(&mut slms); + return; + } + } + } + std::thread::sleep(sleep_dur); + } + } + }); + + PPM { + status: status2, + sender, + } + } + + /// Return array of instantaneous PPM level per channel + pub fn getLevels(&self) -> Dcol { + Dcol::from_iter(self.status.lock().iter().map(|s| s.level)) + } + /// Reset clip state. Used to quickly restore the clipping state. + pub fn resetClip(&self) { + self.sender.send(PPMMessage::ResetClip).unwrap(); + } + fn getClipState(&self) -> Vec { + self.status.lock().iter().map(|s| s.clip).collect() + + } +} +impl Drop for PPM { + fn drop(&mut self) { + // Stop the thread + self.sender.send(PPMMessage::StopThread).unwrap(); + } +} + +/// Enumerator denoting, for each channel what the level approximately is. Low, +/// fine, high or clipped. +#[cfg_attr(feature = "python-bindings", pyclass)] +#[derive(Copy, Debug, Clone, Default)] +pub enum ClipState { + /// Level is rather low + #[default] + LowLevels, + /// Default state, fine levels + LevelFine, + /// High levels: warning! + HighLevel, + /// Signal probably clipped. The instant denotes when in history the clip + /// happened. + Clipped, +} + +#[derive(Debug, Default)] +struct PPMChannelStatus { + // Current levels + level: Flt, + // If clipped in the past, it gives a timestamp on the time the clip + // happened. A call to [PPM::resetClip] will reset the clip state. + clip: ClipState, + + /// Store when clip was + clip_time: Option, +} + +enum PPMMessage { + ResetClip, + StopThread, +} + +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl PPM { + #[new] + fn new_py(smgr: &mut StreamMgr) -> Self { + Self::new(smgr) + } + + #[pyo3(name = "getLevels")] + fn getLevels_py<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray1> { + self.getLevels().to_pyarray_bound(py) + } + + #[pyo3(name = "getClipState")] + fn getClipState_py(&self) -> Vec { + self.getClipState() + } + + #[pyo3(name = "resetClip")] + fn resetClip_py(&self) { + self.resetClip() + } +} diff --git a/src/slm/tw.rs b/src/slm/tw.rs index 82b40d4..318986a 100644 --- a/src/slm/tw.rs +++ b/src/slm/tw.rs @@ -34,6 +34,7 @@ pub enum TimeWeighting { }, } +#[cfg(feature = "python-bindings")] #[cfg_attr(feature = "python-bindings", pymethods)] impl TimeWeighting { // This method is still required in Pyo3 0.21, not anymore in 0.22