diff --git a/src/bin/lasp_inputdefault.rs b/src/bin/lasp_inputdefault.rs index de62152..b525b0c 100644 --- a/src/bin/lasp_inputdefault.rs +++ b/src/bin/lasp_inputdefault.rs @@ -41,7 +41,7 @@ fn main() -> Result<()> { println!("Stream started metadata: {meta:#?}"); }, InStreamMsg::InStreamData(_) => {} - _ => { println!("Other msg...");} + _ => { println!("Instreamdata arrived...");} } } Err(e) => match e { diff --git a/src/daq/streammetadata.rs b/src/daq/streammetadata.rs index c999d0a..2f0f6fd 100644 --- a/src/daq/streammetadata.rs +++ b/src/daq/streammetadata.rs @@ -1,8 +1,10 @@ use super::*; use crate::config::Flt; use anyhow::Result; + /// Stream metadata. All information required for properly interpreting the raw /// data that is coming from the stream. +#[cfg_attr(feature = "python-bindings", pyclass)] #[derive(Clone, Debug)] pub struct StreamMetaData { /// Information for each channel in the stream @@ -58,3 +60,26 @@ impl StreamMetaData { self.channelInfo.len() } } + +/// Simple getters for all sub-attributes of the stream metadata. +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl StreamMetaData { + #[getter] + pub fn channelInfo(&self) -> Vec { + self.channelInfo.clone() + } + #[getter] + fn rawDatatype(&self) -> DataType { + self.rawDatatype + } + + #[getter] + fn samplerate(&self) -> Flt { + self.samplerate + } + #[getter] + fn framesPerBlock(&self) -> usize { + self.framesPerBlock + } +} diff --git a/src/daq/streammgr.rs b/src/daq/streammgr.rs index 16511a2..d79cca9 100644 --- a/src/daq/streammgr.rs +++ b/src/daq/streammgr.rs @@ -111,6 +111,12 @@ impl StreamMgr { fn setSiggen_py(&mut self, siggen: Siggen) { self.setSiggen(siggen) } + #[pyo3(name = "getStreamMetaData")] + fn getStreamMetaData_py(&self, st: StreamType) -> Option { + // Unfortunately (but not really, only cosmetically), the underlying + // value (not the Arc) has to be cloned. + self.getStreamMetaData(st).map(|b| (*b).clone()) + } } impl Default for StreamMgr { fn default() -> Self { @@ -274,7 +280,6 @@ impl StreamMgr { } } if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) { - sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg); } } diff --git a/src/lib.rs b/src/lib.rs index 18016c6..8854c35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,22 +55,31 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + + // Signal generator + m.add_class::()?; + + // SLM m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + + // Power spectra classes + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + + // Real time classes m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/rt/ppm.rs b/src/rt/ppm.rs index 010a70c..ac5a84b 100644 --- a/src/rt/ppm.rs +++ b/src/rt/ppm.rs @@ -14,10 +14,10 @@ use std::time::{Duration, Instant}; /// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the /// full scale, we mark a channel as almost clipped. -const ALMOST_CLIPPED_REL_AMP: Flt = 0.95; +const ALMOST_CLIPPED_REL_AMP: Flt = 0.98; /// If clipping occured, this is the time it keeps saying 'signal is clipped' -const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(4); +const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2); /// If the signal level falls below this value, we indicate that the signal level is low. const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.; @@ -69,9 +69,11 @@ impl PPM { *status = Default::default(); slms.clear(); }; - let mut sleep_dur = Duration::from_millis(50); loop { - for msg in rxstream.try_iter() { + if let Some(msg) = rxstream + .recv_timeout(std::time::Duration::from_millis(10)) + .ok() + { match msg { InStreamMsg::InStreamData(d) => { let mut status = status.lock(); @@ -120,11 +122,6 @@ impl PPM { resetall(&mut slms); } InStreamMsg::StreamStarted(meta) => { - // Compute 'good' sleep time, as half the approximate time between two blocks - let block_time_us = - ((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64; - sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2)); - // Re-initalize sound level meters slms.clear(); let mut s = status.lock(); @@ -136,6 +133,8 @@ impl PPM { .freqWeighting(FreqWeighting::Z) .Lref(1.0) .timeWeighting(TimeWeighting::Impulse {}) + // .timeWeighting(TimeWeighting::CustomAsymmetric { tup: 0.01, tdown: 1.0 }) + // .timeWeighting(TimeWeighting::Fast {}) .filterDescriptors([ StandardFilterDescriptor::Overall().unwrap() ]) @@ -152,10 +151,7 @@ impl PPM { }); }); } - InStreamMsg::StreamStopped => { - // Restore sleep time to sensible polling default - sleep_dur = Duration::from_millis(50); - } + InStreamMsg::StreamStopped => {} } // Loop over any messages coming in from main thread for msg in rxmsg.try_iter() { @@ -171,7 +167,6 @@ impl PPM { } } } - std::thread::sleep(sleep_dur); } } }); diff --git a/src/rt/rtaps.rs b/src/rt/rtaps.rs index 77ca460..95a1b7a 100644 --- a/src/rt/rtaps.rs +++ b/src/rt/rtaps.rs @@ -1,23 +1,26 @@ -use std::ops::Deref; use crate::config::*; -use std::thread::{self, JoinHandle}; - use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr}; use crate::ps::ApsSettings; use crate::ps::{AvPowerSpectra, CPSResult}; use crate::I; use anyhow::Result; +use crossbeam::channel::{unbounded, Receiver, Sender}; use parking_lot::Mutex; use rayon::ThreadPool; +use std::ops::Deref; use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +type SharedRtApsStatus = Arc>>; #[derive(Debug)] -enum RtApsComm { - CommStopThread, - NewResult(CPSResult), - NewMeta(Arc), +enum RtApsMessage { + StopThread, + ResetStatus, } -/// Result type coming from Real time Averaged Power Spectra computing engine +/// Result type coming from Real time Averaged Power Spectra computation engine +#[derive(Debug)] pub enum RtApsResult { /// New result NewResult(CPSResult), @@ -26,34 +29,48 @@ pub enum RtApsResult { } /// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent' +#[cfg_attr(feature = "python-bindings", pyclass)] +#[derive(Debug)] pub struct RtAps { /// Storage for optional last result - comm: Arc>>, - /// Settings used for real time power spectra. - pub settings: ApsSettings, + status: SharedRtApsStatus, + + // For sending messages to the data processing thread + sender: Sender, } impl RtAps { /// Create new Real time power spectra computing engine. pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps { // Handler needs to be created here. - let handler = StreamHandler::new(mgr); - let last_result = Arc::new(Mutex::new(None)); - let last_result2 = last_result.clone(); - let settings2 = settings.clone(); + let status = Arc::new(Mutex::new(None)); - let mut aps = AvPowerSpectra::new(settings); + let (sender, rx) = unbounded(); + let aps = AvPowerSpectra::new(settings); + Self::startThread(aps, status.clone(), mgr, rx); - let thread = std::thread::spawn(move || { - // println!("Thread started..."); - let rx = handler.rx; + RtAps { status, sender } + } + + fn startThread( + mut aps: AvPowerSpectra, + status: SharedRtApsStatus, + smgr: &mut StreamMgr, + rxmsg: Receiver, + ) { + // Obtain messages from stream manager + let (tx, rxstream) = unbounded(); + + // Add queue sender part of queue to stream manager + smgr.addInQueue(tx); + + rayon::spawn(move || { // What is running on the thread - 'mainloop: loop { let mut last_cps: Option = None; let mut meta: Option> = None; - if let Some(msg) = rx.recv_timeout(std::time::Duration::from_millis(10)).ok() { + if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() { match msg { InStreamMsg::StreamStarted(new_meta) => { aps.reset(); @@ -74,22 +91,33 @@ impl RtAps { } } + // Check for messages and act accordingly + if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(10)).ok() { + match msg { + RtApsMessage::StopThread => { + let mut status = status.lock(); + *status = None; + break 'mainloop; + } + RtApsMessage::ResetStatus => { + aps.reset(); + } + } + } + // Communicate last result, if any. 'commscope: { - let mut last_result_lock = last_result.lock(); + let mut status = status.lock(); - if let Some(RtApsComm::CommStopThread) = *last_result_lock { - break 'mainloop; - } if let Some(newmeta) = meta.take() { // New metadata has arrived. This is always the first // thing to push. Only when it is read, we will start // pushing actual data. - *last_result_lock = Some(RtApsComm::NewMeta(newmeta)); + *status = Some(RtApsResult::NewMeta(newmeta)); break 'commscope; } - if let Some(RtApsComm::NewMeta(_)) = *last_result_lock { + if let Some(RtApsResult::NewMeta(_)) = status.deref() { // New metadata is not yet read by reading thread. It // basically means we are not yet ready to give actual // data back. @@ -97,36 +125,39 @@ impl RtAps { } // Move last_cps into mutex. if let Some(last_cps) = last_cps.take() { - *last_result_lock = Some(RtApsComm::NewResult(last_cps)); + *status = Some(RtApsResult::NewResult(last_cps)); } - } - } // End of loop + } // end of commscope + } // End of mainloop }); - assert!(!thread.is_finished()); - - RtAps { - comm: last_result2, - settings: settings2, - } } - /// Get last computed value. When new stream metadata is + + /// Take last updated result. pub fn get_last(&self) -> Option { - let mut lck = self.comm.lock(); - let res = lck.take(); - if let Some(res) = res { - match res { - RtApsComm::CommStopThread => panic!("BUG: CommStopThread should never be set!"), - RtApsComm::NewMeta(m) => return Some(RtApsResult::NewMeta(m)), - RtApsComm::NewResult(r) => return Some(RtApsResult::NewResult(r)), - } - } - None + let mut lck = self.status.lock(); + lck.take() } } impl Drop for RtAps { fn drop(&mut self) { - let mut lck = self.comm.lock(); - *lck = Some(RtApsComm::CommStopThread); + self.sender.send(RtApsMessage::StopThread).unwrap(); + } +} +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl RtAps { + #[new] + fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self { + RtAps::new(smgr, settings) + } + // This method does not forward the metadata. Should come from somewhere else + #[pyo3(name = "get_last")] + fn get_last_py<'py>(&self, py: Python<'py>) -> Option>> { + let res = self.get_last(); + if let Some(RtApsResult::NewResult(res)) = res { + return Some(res.to_pyarray_bound(py)); + } + None } } diff --git a/src/slm/slm.rs b/src/slm/slm.rs index 776b537..1c41720 100644 --- a/src/slm/slm.rs +++ b/src/slm/slm.rs @@ -92,7 +92,7 @@ impl SLM { } let prefiltered = self.prefilter.filter(td); - let level = |a| 10. * Flt::log10(a) / self.Lrefsq; + let level_fun = |a| 10. * Flt::log10(a) / self.Lrefsq; let Lt_iter = self.channels.par_iter_mut().map(|ch| { let mut tmp = ch.bp.filter(&prefiltered); @@ -135,7 +135,7 @@ impl SLM { rectifier_down.setToDCValue(fup); fup } else { - rectifier_up.setToDCValue(fup); + rectifier_up.setToDCValue(fdown); fdown } }); @@ -155,7 +155,7 @@ impl SLM { // Update last signal power coming from SLM ch.stat.Pt_last = *filtered_squared.last().unwrap(); // Convert output to levels - filtered_squared.mapv_inplace(level); + filtered_squared.mapv_inplace(level_fun); tmp }); if provide_output { diff --git a/src/slm/tw.rs b/src/slm/tw.rs index 318986a..2367306 100644 --- a/src/slm/tw.rs +++ b/src/slm/tw.rs @@ -49,6 +49,10 @@ impl TimeWeighting { use TimeWeighting::*; vec![Slow {}, Fast {}, Impulse {}] } + #[pyo3(name = "getLowpassPoles")] + fn getLowpassPoles_py(&self) -> (Flt, Option) { + self.getLowpassPoles() + } } impl Default for TimeWeighting { @@ -91,7 +95,7 @@ impl TimeWeighting { CustomAsymmetric { tup, tdown } => { assert!(*tup > 0.); assert!(*tdown > 0.); - (-*tup, Some(-*tdown)) + (-1. / (*tup), Some(-1. / (*tdown))) } } }