Better tuning of realtime ppm. Set timeouts in rtaps. Rtaps wrapped to Python. Bugfix in time constants for time weighting to compute the poles (not inverse of time weighting constant token). Bugfix in asymmetric time weighting for slm

This commit is contained in:
Anne de Jong 2024-10-06 14:26:41 +02:00
parent 94e478d372
commit 2f95c1a95d
8 changed files with 140 additions and 71 deletions

View File

@ -41,7 +41,7 @@ fn main() -> Result<()> {
println!("Stream started metadata: {meta:#?}");
},
InStreamMsg::InStreamData(_) => {}
_ => { println!("Other msg...");}
_ => { println!("Instreamdata arrived...");}
}
}
Err(e) => match e {

View File

@ -1,8 +1,10 @@
use super::*;
use crate::config::Flt;
use anyhow::Result;
/// Stream metadata. All information required for properly interpreting the raw
/// data that is coming from the stream.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone, Debug)]
pub struct StreamMetaData {
/// Information for each channel in the stream
@ -58,3 +60,26 @@ impl StreamMetaData {
self.channelInfo.len()
}
}
/// Simple getters for all sub-attributes of the stream metadata.
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMetaData {
#[getter]
pub fn channelInfo(&self) -> Vec<DaqChannel> {
self.channelInfo.clone()
}
#[getter]
fn rawDatatype(&self) -> DataType {
self.rawDatatype
}
#[getter]
fn samplerate(&self) -> Flt {
self.samplerate
}
#[getter]
fn framesPerBlock(&self) -> usize {
self.framesPerBlock
}
}

View File

@ -111,6 +111,12 @@ impl StreamMgr {
fn setSiggen_py(&mut self, siggen: Siggen) {
self.setSiggen(siggen)
}
#[pyo3(name = "getStreamMetaData")]
fn getStreamMetaData_py(&self, st: StreamType) -> Option<StreamMetaData> {
// Unfortunately (but not really, only cosmetically), the underlying
// value (not the Arc) has to be cloned.
self.getStreamMetaData(st).map(|b| (*b).clone())
}
}
impl Default for StreamMgr {
fn default() -> Self {
@ -274,7 +280,6 @@ impl StreamMgr {
}
}
if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
}
}

View File

@ -55,22 +55,31 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<filter::Biquad>()?;
m.add_class::<filter::SeriesBiquad>()?;
m.add_class::<filter::BiquadBank>()?;
m.add_class::<siggen::Siggen>()?;
m.add_class::<filter::FilterSpec>()?;
m.add_class::<filter::ZPKModel>()?;
m.add_class::<filter::StandardFilterDescriptor>()?;
// Signal generator
m.add_class::<siggen::Siggen>()?;
// SLM
m.add_class::<slm::TimeWeighting>()?;
m.add_class::<ps::FreqWeighting>()?;
m.add_class::<slm::SLMSettings>()?;
m.add_class::<slm::SLM>()?;
m.add_class::<slm::TimeWeighting>()?;
// Power spectra classes
m.add_class::<ps::FreqWeighting>()?;
m.add_class::<ps::WindowType>()?;
m.add_class::<ps::Overlap>()?;
m.add_class::<ps::ApsMode>()?;
m.add_class::<ps::ApsSettings>()?;
m.add_class::<ps::AvPowerSpectra>()?;
// Real time classes
m.add_class::<rt::PPM>()?;
m.add_class::<rt::ClipState>()?;
m.add_class::<rt::RtAps>()?;
Ok(())
}

View File

@ -14,10 +14,10 @@ use std::time::{Duration, Instant};
/// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the
/// full scale, we mark a channel as almost clipped.
const ALMOST_CLIPPED_REL_AMP: Flt = 0.95;
const ALMOST_CLIPPED_REL_AMP: Flt = 0.98;
/// If clipping occured, this is the time it keeps saying 'signal is clipped'
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(4);
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2);
/// If the signal level falls below this value, we indicate that the signal level is low.
const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.;
@ -69,9 +69,11 @@ impl PPM {
*status = Default::default();
slms.clear();
};
let mut sleep_dur = Duration::from_millis(50);
loop {
for msg in rxstream.try_iter() {
if let Some(msg) = rxstream
.recv_timeout(std::time::Duration::from_millis(10))
.ok()
{
match msg {
InStreamMsg::InStreamData(d) => {
let mut status = status.lock();
@ -120,11 +122,6 @@ impl PPM {
resetall(&mut slms);
}
InStreamMsg::StreamStarted(meta) => {
// Compute 'good' sleep time, as half the approximate time between two blocks
let block_time_us =
((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64;
sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2));
// Re-initalize sound level meters
slms.clear();
let mut s = status.lock();
@ -136,6 +133,8 @@ impl PPM {
.freqWeighting(FreqWeighting::Z)
.Lref(1.0)
.timeWeighting(TimeWeighting::Impulse {})
// .timeWeighting(TimeWeighting::CustomAsymmetric { tup: 0.01, tdown: 1.0 })
// .timeWeighting(TimeWeighting::Fast {})
.filterDescriptors([
StandardFilterDescriptor::Overall().unwrap()
])
@ -152,10 +151,7 @@ impl PPM {
});
});
}
InStreamMsg::StreamStopped => {
// Restore sleep time to sensible polling default
sleep_dur = Duration::from_millis(50);
}
InStreamMsg::StreamStopped => {}
}
// Loop over any messages coming in from main thread
for msg in rxmsg.try_iter() {
@ -171,7 +167,6 @@ impl PPM {
}
}
}
std::thread::sleep(sleep_dur);
}
}
});

View File

@ -1,23 +1,26 @@
use std::ops::Deref;
use crate::config::*;
use std::thread::{self, JoinHandle};
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::ps::ApsSettings;
use crate::ps::{AvPowerSpectra, CPSResult};
use crate::I;
use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use rayon::ThreadPool;
use std::ops::Deref;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
type SharedRtApsStatus = Arc<Mutex<Option<RtApsResult>>>;
#[derive(Debug)]
enum RtApsComm {
CommStopThread,
NewResult(CPSResult),
NewMeta(Arc<StreamMetaData>),
enum RtApsMessage {
StopThread,
ResetStatus,
}
/// Result type coming from Real time Averaged Power Spectra computing engine
/// Result type coming from Real time Averaged Power Spectra computation engine
#[derive(Debug)]
pub enum RtApsResult {
/// New result
NewResult(CPSResult),
@ -26,34 +29,48 @@ pub enum RtApsResult {
}
/// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent'
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)]
pub struct RtAps {
/// Storage for optional last result
comm: Arc<Mutex<Option<RtApsComm>>>,
/// Settings used for real time power spectra.
pub settings: ApsSettings,
status: SharedRtApsStatus,
// For sending messages to the data processing thread
sender: Sender<RtApsMessage>,
}
impl RtAps {
/// Create new Real time power spectra computing engine.
pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps {
// Handler needs to be created here.
let handler = StreamHandler::new(mgr);
let last_result = Arc::new(Mutex::new(None));
let last_result2 = last_result.clone();
let settings2 = settings.clone();
let status = Arc::new(Mutex::new(None));
let mut aps = AvPowerSpectra::new(settings);
let (sender, rx) = unbounded();
let aps = AvPowerSpectra::new(settings);
Self::startThread(aps, status.clone(), mgr, rx);
let thread = std::thread::spawn(move || {
// println!("Thread started...");
let rx = handler.rx;
RtAps { status, sender }
}
fn startThread(
mut aps: AvPowerSpectra,
status: SharedRtApsStatus,
smgr: &mut StreamMgr,
rxmsg: Receiver<RtApsMessage>,
) {
// Obtain messages from stream manager
let (tx, rxstream) = unbounded();
// Add queue sender part of queue to stream manager
smgr.addInQueue(tx);
rayon::spawn(move || {
// What is running on the thread
'mainloop: loop {
let mut last_cps: Option<CPSResult> = None;
let mut meta: Option<Arc<StreamMetaData>> = None;
if let Some(msg) = rx.recv_timeout(std::time::Duration::from_millis(10)).ok() {
if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() {
match msg {
InStreamMsg::StreamStarted(new_meta) => {
aps.reset();
@ -74,22 +91,33 @@ impl RtAps {
}
}
// Communicate last result, if any.
'commscope: {
let mut last_result_lock = last_result.lock();
if let Some(RtApsComm::CommStopThread) = *last_result_lock {
// Check for messages and act accordingly
if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(10)).ok() {
match msg {
RtApsMessage::StopThread => {
let mut status = status.lock();
*status = None;
break 'mainloop;
}
RtApsMessage::ResetStatus => {
aps.reset();
}
}
}
// Communicate last result, if any.
'commscope: {
let mut status = status.lock();
if let Some(newmeta) = meta.take() {
// New metadata has arrived. This is always the first
// thing to push. Only when it is read, we will start
// pushing actual data.
*last_result_lock = Some(RtApsComm::NewMeta(newmeta));
*status = Some(RtApsResult::NewMeta(newmeta));
break 'commscope;
}
if let Some(RtApsComm::NewMeta(_)) = *last_result_lock {
if let Some(RtApsResult::NewMeta(_)) = status.deref() {
// New metadata is not yet read by reading thread. It
// basically means we are not yet ready to give actual
// data back.
@ -97,36 +125,39 @@ impl RtAps {
}
// Move last_cps into mutex.
if let Some(last_cps) = last_cps.take() {
*last_result_lock = Some(RtApsComm::NewResult(last_cps));
*status = Some(RtApsResult::NewResult(last_cps));
}
}
} // End of loop
} // end of commscope
} // End of mainloop
});
assert!(!thread.is_finished());
}
RtAps {
comm: last_result2,
settings: settings2,
}
}
/// Get last computed value. When new stream metadata is
/// Take last updated result.
pub fn get_last(&self) -> Option<RtApsResult> {
let mut lck = self.comm.lock();
let res = lck.take();
if let Some(res) = res {
match res {
RtApsComm::CommStopThread => panic!("BUG: CommStopThread should never be set!"),
RtApsComm::NewMeta(m) => return Some(RtApsResult::NewMeta(m)),
RtApsComm::NewResult(r) => return Some(RtApsResult::NewResult(r)),
}
}
None
let mut lck = self.status.lock();
lck.take()
}
}
impl Drop for RtAps {
fn drop(&mut self) {
let mut lck = self.comm.lock();
*lck = Some(RtApsComm::CommStopThread);
self.sender.send(RtApsMessage::StopThread).unwrap();
}
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RtAps {
#[new]
fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self {
RtAps::new(smgr, settings)
}
// This method does not forward the metadata. Should come from somewhere else
#[pyo3(name = "get_last")]
fn get_last_py<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyArray3<Cflt>>> {
let res = self.get_last();
if let Some(RtApsResult::NewResult(res)) = res {
return Some(res.to_pyarray_bound(py));
}
None
}
}

View File

@ -92,7 +92,7 @@ impl SLM {
}
let prefiltered = self.prefilter.filter(td);
let level = |a| 10. * Flt::log10(a) / self.Lrefsq;
let level_fun = |a| 10. * Flt::log10(a) / self.Lrefsq;
let Lt_iter = self.channels.par_iter_mut().map(|ch| {
let mut tmp = ch.bp.filter(&prefiltered);
@ -135,7 +135,7 @@ impl SLM {
rectifier_down.setToDCValue(fup);
fup
} else {
rectifier_up.setToDCValue(fup);
rectifier_up.setToDCValue(fdown);
fdown
}
});
@ -155,7 +155,7 @@ impl SLM {
// Update last signal power coming from SLM
ch.stat.Pt_last = *filtered_squared.last().unwrap();
// Convert output to levels
filtered_squared.mapv_inplace(level);
filtered_squared.mapv_inplace(level_fun);
tmp
});
if provide_output {

View File

@ -49,6 +49,10 @@ impl TimeWeighting {
use TimeWeighting::*;
vec![Slow {}, Fast {}, Impulse {}]
}
#[pyo3(name = "getLowpassPoles")]
fn getLowpassPoles_py(&self) -> (Flt, Option<Flt>) {
self.getLowpassPoles()
}
}
impl Default for TimeWeighting {
@ -91,7 +95,7 @@ impl TimeWeighting {
CustomAsymmetric { tup, tdown } => {
assert!(*tup > 0.);
assert!(*tdown > 0.);
(-*tup, Some(-*tdown))
(-1. / (*tup), Some(-1. / (*tdown)))
}
}
}