From 2f95c1a95d7e426ce6e0d9c02211fed5d9e5f918 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F." Date: Sun, 6 Oct 2024 14:26:41 +0200 Subject: [PATCH] Better tuning of realtime ppm. Set timeouts in rtaps. Rtaps wrapped to Python. Bugfix in time constants for time weighting to compute the poles (not inverse of time weighting constant token). Bugfix in asymmetric time weighting for slm --- src/bin/lasp_inputdefault.rs | 2 +- src/daq/streammetadata.rs | 25 +++++++ src/daq/streammgr.rs | 7 +- src/lib.rs | 13 +++- src/rt/ppm.rs | 23 +++---- src/rt/rtaps.rs | 129 ++++++++++++++++++++++------------- src/slm/slm.rs | 6 +- src/slm/tw.rs | 6 +- 8 files changed, 140 insertions(+), 71 deletions(-) diff --git a/src/bin/lasp_inputdefault.rs b/src/bin/lasp_inputdefault.rs index de62152..b525b0c 100644 --- a/src/bin/lasp_inputdefault.rs +++ b/src/bin/lasp_inputdefault.rs @@ -41,7 +41,7 @@ fn main() -> Result<()> { println!("Stream started metadata: {meta:#?}"); }, InStreamMsg::InStreamData(_) => {} - _ => { println!("Other msg...");} + _ => { println!("Instreamdata arrived...");} } } Err(e) => match e { diff --git a/src/daq/streammetadata.rs b/src/daq/streammetadata.rs index c999d0a..2f0f6fd 100644 --- a/src/daq/streammetadata.rs +++ b/src/daq/streammetadata.rs @@ -1,8 +1,10 @@ use super::*; use crate::config::Flt; use anyhow::Result; + /// Stream metadata. All information required for properly interpreting the raw /// data that is coming from the stream. +#[cfg_attr(feature = "python-bindings", pyclass)] #[derive(Clone, Debug)] pub struct StreamMetaData { /// Information for each channel in the stream @@ -58,3 +60,26 @@ impl StreamMetaData { self.channelInfo.len() } } + +/// Simple getters for all sub-attributes of the stream metadata. +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl StreamMetaData { + #[getter] + pub fn channelInfo(&self) -> Vec { + self.channelInfo.clone() + } + #[getter] + fn rawDatatype(&self) -> DataType { + self.rawDatatype + } + + #[getter] + fn samplerate(&self) -> Flt { + self.samplerate + } + #[getter] + fn framesPerBlock(&self) -> usize { + self.framesPerBlock + } +} diff --git a/src/daq/streammgr.rs b/src/daq/streammgr.rs index 16511a2..d79cca9 100644 --- a/src/daq/streammgr.rs +++ b/src/daq/streammgr.rs @@ -111,6 +111,12 @@ impl StreamMgr { fn setSiggen_py(&mut self, siggen: Siggen) { self.setSiggen(siggen) } + #[pyo3(name = "getStreamMetaData")] + fn getStreamMetaData_py(&self, st: StreamType) -> Option { + // Unfortunately (but not really, only cosmetically), the underlying + // value (not the Arc) has to be cloned. + self.getStreamMetaData(st).map(|b| (*b).clone()) + } } impl Default for StreamMgr { fn default() -> Self { @@ -274,7 +280,6 @@ impl StreamMgr { } } if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) { - sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg); } } diff --git a/src/lib.rs b/src/lib.rs index 18016c6..8854c35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,22 +55,31 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + + // Signal generator + m.add_class::()?; + + // SLM m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + + // Power spectra classes + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + + // Real time classes m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/rt/ppm.rs b/src/rt/ppm.rs index 010a70c..ac5a84b 100644 --- a/src/rt/ppm.rs +++ b/src/rt/ppm.rs @@ -14,10 +14,10 @@ use std::time::{Duration, Instant}; /// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the /// full scale, we mark a channel as almost clipped. -const ALMOST_CLIPPED_REL_AMP: Flt = 0.95; +const ALMOST_CLIPPED_REL_AMP: Flt = 0.98; /// If clipping occured, this is the time it keeps saying 'signal is clipped' -const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(4); +const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2); /// If the signal level falls below this value, we indicate that the signal level is low. const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.; @@ -69,9 +69,11 @@ impl PPM { *status = Default::default(); slms.clear(); }; - let mut sleep_dur = Duration::from_millis(50); loop { - for msg in rxstream.try_iter() { + if let Some(msg) = rxstream + .recv_timeout(std::time::Duration::from_millis(10)) + .ok() + { match msg { InStreamMsg::InStreamData(d) => { let mut status = status.lock(); @@ -120,11 +122,6 @@ impl PPM { resetall(&mut slms); } InStreamMsg::StreamStarted(meta) => { - // Compute 'good' sleep time, as half the approximate time between two blocks - let block_time_us = - ((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64; - sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2)); - // Re-initalize sound level meters slms.clear(); let mut s = status.lock(); @@ -136,6 +133,8 @@ impl PPM { .freqWeighting(FreqWeighting::Z) .Lref(1.0) .timeWeighting(TimeWeighting::Impulse {}) + // .timeWeighting(TimeWeighting::CustomAsymmetric { tup: 0.01, tdown: 1.0 }) + // .timeWeighting(TimeWeighting::Fast {}) .filterDescriptors([ StandardFilterDescriptor::Overall().unwrap() ]) @@ -152,10 +151,7 @@ impl PPM { }); }); } - InStreamMsg::StreamStopped => { - // Restore sleep time to sensible polling default - sleep_dur = Duration::from_millis(50); - } + InStreamMsg::StreamStopped => {} } // Loop over any messages coming in from main thread for msg in rxmsg.try_iter() { @@ -171,7 +167,6 @@ impl PPM { } } } - std::thread::sleep(sleep_dur); } } }); diff --git a/src/rt/rtaps.rs b/src/rt/rtaps.rs index 77ca460..95a1b7a 100644 --- a/src/rt/rtaps.rs +++ b/src/rt/rtaps.rs @@ -1,23 +1,26 @@ -use std::ops::Deref; use crate::config::*; -use std::thread::{self, JoinHandle}; - use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr}; use crate::ps::ApsSettings; use crate::ps::{AvPowerSpectra, CPSResult}; use crate::I; use anyhow::Result; +use crossbeam::channel::{unbounded, Receiver, Sender}; use parking_lot::Mutex; use rayon::ThreadPool; +use std::ops::Deref; use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +type SharedRtApsStatus = Arc>>; #[derive(Debug)] -enum RtApsComm { - CommStopThread, - NewResult(CPSResult), - NewMeta(Arc), +enum RtApsMessage { + StopThread, + ResetStatus, } -/// Result type coming from Real time Averaged Power Spectra computing engine +/// Result type coming from Real time Averaged Power Spectra computation engine +#[derive(Debug)] pub enum RtApsResult { /// New result NewResult(CPSResult), @@ -26,34 +29,48 @@ pub enum RtApsResult { } /// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent' +#[cfg_attr(feature = "python-bindings", pyclass)] +#[derive(Debug)] pub struct RtAps { /// Storage for optional last result - comm: Arc>>, - /// Settings used for real time power spectra. - pub settings: ApsSettings, + status: SharedRtApsStatus, + + // For sending messages to the data processing thread + sender: Sender, } impl RtAps { /// Create new Real time power spectra computing engine. pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps { // Handler needs to be created here. - let handler = StreamHandler::new(mgr); - let last_result = Arc::new(Mutex::new(None)); - let last_result2 = last_result.clone(); - let settings2 = settings.clone(); + let status = Arc::new(Mutex::new(None)); - let mut aps = AvPowerSpectra::new(settings); + let (sender, rx) = unbounded(); + let aps = AvPowerSpectra::new(settings); + Self::startThread(aps, status.clone(), mgr, rx); - let thread = std::thread::spawn(move || { - // println!("Thread started..."); - let rx = handler.rx; + RtAps { status, sender } + } + + fn startThread( + mut aps: AvPowerSpectra, + status: SharedRtApsStatus, + smgr: &mut StreamMgr, + rxmsg: Receiver, + ) { + // Obtain messages from stream manager + let (tx, rxstream) = unbounded(); + + // Add queue sender part of queue to stream manager + smgr.addInQueue(tx); + + rayon::spawn(move || { // What is running on the thread - 'mainloop: loop { let mut last_cps: Option = None; let mut meta: Option> = None; - if let Some(msg) = rx.recv_timeout(std::time::Duration::from_millis(10)).ok() { + if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() { match msg { InStreamMsg::StreamStarted(new_meta) => { aps.reset(); @@ -74,22 +91,33 @@ impl RtAps { } } + // Check for messages and act accordingly + if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(10)).ok() { + match msg { + RtApsMessage::StopThread => { + let mut status = status.lock(); + *status = None; + break 'mainloop; + } + RtApsMessage::ResetStatus => { + aps.reset(); + } + } + } + // Communicate last result, if any. 'commscope: { - let mut last_result_lock = last_result.lock(); + let mut status = status.lock(); - if let Some(RtApsComm::CommStopThread) = *last_result_lock { - break 'mainloop; - } if let Some(newmeta) = meta.take() { // New metadata has arrived. This is always the first // thing to push. Only when it is read, we will start // pushing actual data. - *last_result_lock = Some(RtApsComm::NewMeta(newmeta)); + *status = Some(RtApsResult::NewMeta(newmeta)); break 'commscope; } - if let Some(RtApsComm::NewMeta(_)) = *last_result_lock { + if let Some(RtApsResult::NewMeta(_)) = status.deref() { // New metadata is not yet read by reading thread. It // basically means we are not yet ready to give actual // data back. @@ -97,36 +125,39 @@ impl RtAps { } // Move last_cps into mutex. if let Some(last_cps) = last_cps.take() { - *last_result_lock = Some(RtApsComm::NewResult(last_cps)); + *status = Some(RtApsResult::NewResult(last_cps)); } - } - } // End of loop + } // end of commscope + } // End of mainloop }); - assert!(!thread.is_finished()); - - RtAps { - comm: last_result2, - settings: settings2, - } } - /// Get last computed value. When new stream metadata is + + /// Take last updated result. pub fn get_last(&self) -> Option { - let mut lck = self.comm.lock(); - let res = lck.take(); - if let Some(res) = res { - match res { - RtApsComm::CommStopThread => panic!("BUG: CommStopThread should never be set!"), - RtApsComm::NewMeta(m) => return Some(RtApsResult::NewMeta(m)), - RtApsComm::NewResult(r) => return Some(RtApsResult::NewResult(r)), - } - } - None + let mut lck = self.status.lock(); + lck.take() } } impl Drop for RtAps { fn drop(&mut self) { - let mut lck = self.comm.lock(); - *lck = Some(RtApsComm::CommStopThread); + self.sender.send(RtApsMessage::StopThread).unwrap(); + } +} +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl RtAps { + #[new] + fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self { + RtAps::new(smgr, settings) + } + // This method does not forward the metadata. Should come from somewhere else + #[pyo3(name = "get_last")] + fn get_last_py<'py>(&self, py: Python<'py>) -> Option>> { + let res = self.get_last(); + if let Some(RtApsResult::NewResult(res)) = res { + return Some(res.to_pyarray_bound(py)); + } + None } } diff --git a/src/slm/slm.rs b/src/slm/slm.rs index 776b537..1c41720 100644 --- a/src/slm/slm.rs +++ b/src/slm/slm.rs @@ -92,7 +92,7 @@ impl SLM { } let prefiltered = self.prefilter.filter(td); - let level = |a| 10. * Flt::log10(a) / self.Lrefsq; + let level_fun = |a| 10. * Flt::log10(a) / self.Lrefsq; let Lt_iter = self.channels.par_iter_mut().map(|ch| { let mut tmp = ch.bp.filter(&prefiltered); @@ -135,7 +135,7 @@ impl SLM { rectifier_down.setToDCValue(fup); fup } else { - rectifier_up.setToDCValue(fup); + rectifier_up.setToDCValue(fdown); fdown } }); @@ -155,7 +155,7 @@ impl SLM { // Update last signal power coming from SLM ch.stat.Pt_last = *filtered_squared.last().unwrap(); // Convert output to levels - filtered_squared.mapv_inplace(level); + filtered_squared.mapv_inplace(level_fun); tmp }); if provide_output { diff --git a/src/slm/tw.rs b/src/slm/tw.rs index 318986a..2367306 100644 --- a/src/slm/tw.rs +++ b/src/slm/tw.rs @@ -49,6 +49,10 @@ impl TimeWeighting { use TimeWeighting::*; vec![Slow {}, Fast {}, Impulse {}] } + #[pyo3(name = "getLowpassPoles")] + fn getLowpassPoles_py(&self) -> (Flt, Option) { + self.getLowpassPoles() + } } impl Default for TimeWeighting { @@ -91,7 +95,7 @@ impl TimeWeighting { CustomAsymmetric { tup, tdown } => { assert!(*tup > 0.); assert!(*tdown > 0.); - (-*tup, Some(-*tdown)) + (-1. / (*tup), Some(-1. / (*tdown))) } } }