Compare commits

...

3 Commits

8 changed files with 313 additions and 11 deletions

View File

@ -1,23 +1,44 @@
//! Provides stream messages that come from a running stream //! Provides stream messages that come from a running stream
use strum_macros::Display; use strum_macros::{Display, EnumMessage};
use super::*; use super::*;
/// Gives the stream status of a (possible) stream, either input / output or duplex. /// Gives the stream status of a (possible) stream, either input / output or duplex.
#[derive(strum_macros::EnumMessage, Debug, Clone, Copy, Display)] #[derive(EnumMessage, Debug, Clone, Copy, Display, PartialEq)]
#[cfg_attr(feature = "python-bindings", pyclass)] #[cfg_attr(feature = "python-bindings", pyclass)]
pub enum StreamStatus { pub enum StreamStatus {
/// Stream is not running /// Stream is not running
#[strum(message = "NotRunning", detailed_message = "Stream is not running")] #[strum(message = "NotRunning", detailed_message = "Stream is not running")]
NotRunning{}, NotRunning {},
/// Stream is running properly /// Stream is running properly
#[strum(message = "Running", detailed_message = "Stream is running")] #[strum(message = "Running", detailed_message = "Stream is running")]
Running{}, Running {},
/// An error occured in the stream. /// An error occured in the stream.
#[strum(message = "Error", detailed_message = "An error occured with the stream")] #[strum(
Error{ message = "Error",
detailed_message = "An error occured with the stream"
)]
Error {
/// In case the stream has an error: e is the field name /// In case the stream has an error: e is the field name
e: StreamError e: StreamError,
},
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamStatus {
fn __eq__(&self, other: &Self) -> bool {
self == other
}
fn hasError(&self) -> bool {
matches!(self, StreamStatus::Error { .. })
}
fn getError(&self) -> Option<StreamError> {
use StreamStatus::*;
if let Error { e } = self {
return Some(*e);
}
None
} }
} }

View File

@ -38,6 +38,7 @@ pub use config::Flt;
pub mod daq; pub mod daq;
pub mod filter; pub mod filter;
pub mod ps; pub mod ps;
mod math;
pub mod siggen; pub mod siggen;
use filter::*; use filter::*;
pub mod rt; pub mod rt;
@ -68,6 +69,8 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<ps::ApsMode>()?; m.add_class::<ps::ApsMode>()?;
m.add_class::<ps::ApsSettings>()?; m.add_class::<ps::ApsSettings>()?;
m.add_class::<ps::AvPowerSpectra>()?; m.add_class::<ps::AvPowerSpectra>()?;
m.add_class::<rt::PPM>()?;
m.add_class::<rt::ClipState>()?;
Ok(()) Ok(())
} }

View File

@ -1,4 +1,26 @@
//! General math tools that are internally required //! General math tools that are internally required
//! //!
//! //!
use crate::config::*;
use ndarray::ArrayView1;
/// Compute maximum value of an array of float values
pub fn max(arr: ArrayView1<Flt>) -> Flt {
arr.fold(
-Flt::INFINITY,
|acc, new| if *new > acc { *new } else { acc },
)
}
/// Compute minimum value of an array of float values
pub fn min(arr: ArrayView1<Flt>) -> Flt {
arr.fold(
Flt::INFINITY,
|acc, new| if *new < acc { *new } else { acc },
)
}
/// Compute maximum absolute value of an array of float values
pub fn maxabs(arr: ArrayView1<Flt>) -> Flt {
arr.fold(0., |acc, new| if new.abs() > acc { new.abs() } else { acc })
}

View File

@ -11,7 +11,6 @@ use freqweighting::FreqWeighting;
/// ///
/// For more information, see the book on numerical recipes. /// For more information, see the book on numerical recipes.
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pyclass)] #[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)] #[derive(Debug)]

View File

@ -1,5 +1,7 @@
//! Real time signal analysis blocks, used for visual inspection and showing //! Real time signal analysis blocks, used for visual inspection and showing
//! data 'on the fly'. Examples are real time power spectra plotting //! data 'on the fly'. Examples are real time power spectra plotting
//! (Spectrograms, Auto powers, ..., or ) //! (Spectrograms, Auto powers, ..., or )
mod ppm;
mod rtaps; mod rtaps;
pub use rtaps::{RtAps, RtApsResult}; pub use ppm::{PPM, ClipState};
pub use rtaps::{RtAps, RtApsResult};

252
src/rt/ppm.rs Normal file
View File

@ -0,0 +1,252 @@
use crate::daq::InStreamMsg;
use crate::math::maxabs;
use crate::slm::{self, SLMSettingsBuilder, TimeWeighting, SLM};
use crate::{config::*, FreqWeighting, StandardFilterDescriptor};
use crate::{daq::StreamMgr, Dcol};
use crossbeam::channel::{unbounded, Receiver, Sender};
use crossbeam::utils::Backoff;
use ndarray_stats::QuantileExt;
use parking_lot::Mutex;
use std::default;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the
/// full scale, we mark a channel as almost clipped.
const ALMOST_CLIPPED_REL_AMP: Flt = 0.95;
/// If clipping occured, this is the time it keeps saying 'signal is clipped'
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(4);
/// If the signal level falls below this value, we indicate that the signal level is low.
const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.;
/// If the signal level falls below this value, we indicate that the signal level is low.
const LEVEL_THRESHOLD_FOR_HIGH_LEVEL: Flt = -10.;
type SharedPPMStatus = Arc<Mutex<Vec<PPMChannelStatus>>>;
/// Peak programme meter implementation, including clip detector. Effectively uses a realtime SLM on all
/// input channels. Also includes a clipping detector.
#[derive(Debug)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct PPM {
// Latest and createst status
status: SharedPPMStatus,
sender: Sender<PPMMessage>,
}
impl PPM {
/// Initialize a new PPM meter.
///
/// Args
///
/// - `mgr`: Stream manager instance.
pub fn new(mgr: &mut StreamMgr) -> Self {
let (sender, rxmsg) = unbounded();
// Shared status object
let status: SharedPPMStatus = Arc::new(Mutex::new(vec![]));
// Start the thread that calculates PPM and clip values
Self::startThread(status.clone(), mgr, rxmsg);
PPM { status, sender }
}
fn startThread(status: SharedPPMStatus, mgr: &mut StreamMgr, rxmsg: Receiver<PPMMessage>) {
// Obtain messages from stream manager
let (tx, rxstream) = unbounded();
// Add queue sender part of queue to stream manager
mgr.addInQueue(tx);
rayon::spawn(move || {
let mut slms: Vec<SLM> = vec![];
let resetall = |slms: &mut Vec<SLM>| {
let mut status = status.lock();
*status = Default::default();
slms.clear();
};
let mut sleep_dur = Duration::from_millis(50);
loop {
for msg in rxstream.try_iter() {
match msg {
InStreamMsg::InStreamData(d) => {
let mut status = status.lock();
let floatdata = d.getFloatData();
'channel: for (chno, (slm, ppmstatus)) in
slms.iter_mut().zip(status.iter_mut()).enumerate()
{
let chdata = floatdata.slice(s![.., chno]);
let maxabs_new = maxabs(chdata);
let chdata = chdata
.as_slice()
.expect("Data not contiguous on sample axis");
slm.run(chdata, false);
// Update levels
let last_level = slm.Ltlast()[0];
ppmstatus.level = last_level;
// If previous clip is there, and some time has elapsed, we remove the clip state
if let Some(moment) = ppmstatus.clip_time {
if moment.elapsed() > CLIP_INDICATOR_WAIT_S {
ppmstatus.clip = ClipState::LevelFine;
ppmstatus.clip_time = None;
}
// Do not update anything else if we are
// still in clipping mode. We are done
// updating PPM status for this channel.
continue 'channel;
}
// Update clip status, if we were not clipping
ppmstatus.clip = if maxabs_new > ALMOST_CLIPPED_REL_AMP {
ppmstatus.clip_time = Some(Instant::now());
ClipState::Clipped
} else if last_level > LEVEL_THRESHOLD_FOR_HIGH_LEVEL {
ClipState::HighLevel
} else if last_level < LEVEL_THRESHOLD_FOR_LOW_LEVEL {
ClipState::LowLevel
} else {
ClipState::LevelFine
}
}
}
InStreamMsg::StreamError(_e) => {
resetall(&mut slms);
}
InStreamMsg::StreamStarted(meta) => {
// Compute 'good' sleep time, as half the approximate time between two blocks
let block_time_us =
((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64;
sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2));
// Re-initalize sound level meters
slms.clear();
let mut s = status.lock();
(0..meta.nchannels()).for_each(|_ch_index| {
// Create SLM settings. These might be different
// depending on channel in future
let slmsettings = SLMSettingsBuilder::default()
.fs(meta.samplerate)
.freqWeighting(FreqWeighting::Z)
.Lref(1.0)
.timeWeighting(TimeWeighting::Impulse {})
.filterDescriptors([
StandardFilterDescriptor::Overall().unwrap()
])
.build()
.unwrap();
slms.push(SLM::new(slmsettings.clone()));
// Initialize levels at -300 dB, and clip state
// at low levels
s.push(PPMChannelStatus {
clip: ClipState::LowLevel,
level: -300.,
clip_time: None,
});
});
}
InStreamMsg::StreamStopped => {
// Restore sleep time to sensible polling default
sleep_dur = Duration::from_millis(50);
}
}
// Loop over any messages coming in from main thread
for msg in rxmsg.try_iter() {
match msg {
PPMMessage::ResetClip => {
// Reset clip state to not clipped.
let mut s = status.lock();
s.iter_mut().for_each(|c| c.clip = ClipState::LevelFine);
}
PPMMessage::StopThread => {
resetall(&mut slms);
return;
}
}
}
std::thread::sleep(sleep_dur);
}
}
});
}
/// Reset clip state. Used to quickly restore the clipping state.
pub fn resetClip(&self) {
self.sender.send(PPMMessage::ResetClip).unwrap();
}
/// Returns the current state: levels and clip state
pub fn getState(&self) -> (Dcol, Vec<ClipState>) {
let status = self.status.lock();
let levels = Dcol::from_iter(status.iter().map(|s| s.level));
let clips = status.iter().map(|s| s.clip).collect();
(levels, clips)
}
}
impl Drop for PPM {
fn drop(&mut self) {
// Stop the thread
self.sender.send(PPMMessage::StopThread).unwrap();
}
}
/// Enumerator denoting, for each channel what the level approximately is. Low,
/// fine, high or clipped.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Copy, Debug, Clone, Default)]
pub enum ClipState {
/// Level is rather low
#[default]
LowLevel,
/// Default state, fine levels
LevelFine,
/// High levels: warning!
HighLevel,
/// Signal probably clipped. The instant denotes when in history the clip
/// happened.
Clipped,
}
#[derive(Debug, Default)]
struct PPMChannelStatus {
// Current levels
level: Flt,
// If clipped in the past, it gives a timestamp on the time the clip
// happened. A call to [PPM::resetClip] will reset the clip state.
clip: ClipState,
/// Store when clip was
clip_time: Option<Instant>,
}
enum PPMMessage {
ResetClip,
StopThread,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl PPM {
#[new]
fn new_py(smgr: &mut StreamMgr) -> Self {
Self::new(smgr)
}
#[pyo3(name = "getState")]
fn getState_py<'py>(&self, py: Python<'py>) -> (Bound<'py, PyArray1<Flt>>, Vec<ClipState>) {
let (levels, clips) = self.getState();
let levels = levels.to_pyarray_bound(py);
(levels, clips)
}
#[pyo3(name = "resetClip")]
fn resetClip_py(&self) {
self.resetClip()
}
}

View File

@ -1,4 +1,5 @@
use std::ops::Deref; use std::ops::Deref;
use crate::config::*;
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr}; use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
@ -10,6 +11,7 @@ use parking_lot::Mutex;
use rayon::ThreadPool; use rayon::ThreadPool;
use std::sync::Arc; use std::sync::Arc;
#[derive(Debug)]
enum RtApsComm { enum RtApsComm {
CommStopThread, CommStopThread,
NewResult(CPSResult), NewResult(CPSResult),

View File

@ -34,6 +34,7 @@ pub enum TimeWeighting {
}, },
} }
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)] #[cfg_attr(feature = "python-bindings", pymethods)]
impl TimeWeighting { impl TimeWeighting {
// This method is still required in Pyo3 0.21, not anymore in 0.22 // This method is still required in Pyo3 0.21, not anymore in 0.22