Compare commits

..

3 Commits

9 changed files with 141 additions and 73 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lasprs" name = "lasprs"
version = "0.6.2" version = "0.6.3"
edition = "2021" edition = "2021"
authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"] authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"]
description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)" description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)"

View File

@ -41,7 +41,7 @@ fn main() -> Result<()> {
println!("Stream started metadata: {meta:#?}"); println!("Stream started metadata: {meta:#?}");
}, },
InStreamMsg::InStreamData(_) => {} InStreamMsg::InStreamData(_) => {}
_ => { println!("Other msg...");} _ => { println!("Instreamdata arrived...");}
} }
} }
Err(e) => match e { Err(e) => match e {

View File

@ -1,8 +1,10 @@
use super::*; use super::*;
use crate::config::Flt; use crate::config::Flt;
use anyhow::Result; use anyhow::Result;
/// Stream metadata. All information required for properly interpreting the raw /// Stream metadata. All information required for properly interpreting the raw
/// data that is coming from the stream. /// data that is coming from the stream.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct StreamMetaData { pub struct StreamMetaData {
/// Information for each channel in the stream /// Information for each channel in the stream
@ -58,3 +60,26 @@ impl StreamMetaData {
self.channelInfo.len() self.channelInfo.len()
} }
} }
/// Simple getters for all sub-attributes of the stream metadata.
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMetaData {
#[getter]
pub fn channelInfo(&self) -> Vec<DaqChannel> {
self.channelInfo.clone()
}
#[getter]
fn rawDatatype(&self) -> DataType {
self.rawDatatype
}
#[getter]
fn samplerate(&self) -> Flt {
self.samplerate
}
#[getter]
fn framesPerBlock(&self) -> usize {
self.framesPerBlock
}
}

View File

@ -111,6 +111,12 @@ impl StreamMgr {
fn setSiggen_py(&mut self, siggen: Siggen) { fn setSiggen_py(&mut self, siggen: Siggen) {
self.setSiggen(siggen) self.setSiggen(siggen)
} }
#[pyo3(name = "getStreamMetaData")]
fn getStreamMetaData_py(&self, st: StreamType) -> Option<StreamMetaData> {
// Unfortunately (but not really, only cosmetically), the underlying
// value (not the Arc) has to be cloned.
self.getStreamMetaData(st).map(|b| (*b).clone())
}
} }
impl Default for StreamMgr { impl Default for StreamMgr {
fn default() -> Self { fn default() -> Self {
@ -274,7 +280,6 @@ impl StreamMgr {
} }
} }
if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) { if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg); sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
} }
} }

View File

@ -55,22 +55,31 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<filter::Biquad>()?; m.add_class::<filter::Biquad>()?;
m.add_class::<filter::SeriesBiquad>()?; m.add_class::<filter::SeriesBiquad>()?;
m.add_class::<filter::BiquadBank>()?; m.add_class::<filter::BiquadBank>()?;
m.add_class::<siggen::Siggen>()?;
m.add_class::<filter::FilterSpec>()?; m.add_class::<filter::FilterSpec>()?;
m.add_class::<filter::ZPKModel>()?; m.add_class::<filter::ZPKModel>()?;
m.add_class::<filter::StandardFilterDescriptor>()?; m.add_class::<filter::StandardFilterDescriptor>()?;
// Signal generator
m.add_class::<siggen::Siggen>()?;
// SLM
m.add_class::<slm::TimeWeighting>()?; m.add_class::<slm::TimeWeighting>()?;
m.add_class::<ps::FreqWeighting>()?;
m.add_class::<slm::SLMSettings>()?; m.add_class::<slm::SLMSettings>()?;
m.add_class::<slm::SLM>()?; m.add_class::<slm::SLM>()?;
m.add_class::<slm::TimeWeighting>()?; m.add_class::<slm::TimeWeighting>()?;
// Power spectra classes
m.add_class::<ps::FreqWeighting>()?;
m.add_class::<ps::WindowType>()?; m.add_class::<ps::WindowType>()?;
m.add_class::<ps::Overlap>()?; m.add_class::<ps::Overlap>()?;
m.add_class::<ps::ApsMode>()?; m.add_class::<ps::ApsMode>()?;
m.add_class::<ps::ApsSettings>()?; m.add_class::<ps::ApsSettings>()?;
m.add_class::<ps::AvPowerSpectra>()?; m.add_class::<ps::AvPowerSpectra>()?;
// Real time classes
m.add_class::<rt::PPM>()?; m.add_class::<rt::PPM>()?;
m.add_class::<rt::ClipState>()?; m.add_class::<rt::ClipState>()?;
m.add_class::<rt::RtAps>()?;
Ok(()) Ok(())
} }

View File

@ -5,7 +5,6 @@ use crate::{config::*, FreqWeighting, StandardFilterDescriptor};
use crate::{daq::StreamMgr, Dcol}; use crate::{daq::StreamMgr, Dcol};
use crossbeam::channel::{unbounded, Receiver, Sender}; use crossbeam::channel::{unbounded, Receiver, Sender};
use crossbeam::utils::Backoff; use crossbeam::utils::Backoff;
use ndarray_stats::QuantileExt;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::default; use std::default;
use std::ops::DerefMut; use std::ops::DerefMut;
@ -14,10 +13,10 @@ use std::time::{Duration, Instant};
/// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the /// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the
/// full scale, we mark a channel as almost clipped. /// full scale, we mark a channel as almost clipped.
const ALMOST_CLIPPED_REL_AMP: Flt = 0.95; const ALMOST_CLIPPED_REL_AMP: Flt = 0.98;
/// If clipping occured, this is the time it keeps saying 'signal is clipped' /// If clipping occured, this is the time it keeps saying 'signal is clipped'
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(4); const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2);
/// If the signal level falls below this value, we indicate that the signal level is low. /// If the signal level falls below this value, we indicate that the signal level is low.
const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.; const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.;
@ -69,9 +68,11 @@ impl PPM {
*status = Default::default(); *status = Default::default();
slms.clear(); slms.clear();
}; };
let mut sleep_dur = Duration::from_millis(50);
loop { loop {
for msg in rxstream.try_iter() { if let Some(msg) = rxstream
.recv_timeout(std::time::Duration::from_millis(10))
.ok()
{
match msg { match msg {
InStreamMsg::InStreamData(d) => { InStreamMsg::InStreamData(d) => {
let mut status = status.lock(); let mut status = status.lock();
@ -120,11 +121,6 @@ impl PPM {
resetall(&mut slms); resetall(&mut slms);
} }
InStreamMsg::StreamStarted(meta) => { InStreamMsg::StreamStarted(meta) => {
// Compute 'good' sleep time, as half the approximate time between two blocks
let block_time_us =
((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64;
sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2));
// Re-initalize sound level meters // Re-initalize sound level meters
slms.clear(); slms.clear();
let mut s = status.lock(); let mut s = status.lock();
@ -136,6 +132,8 @@ impl PPM {
.freqWeighting(FreqWeighting::Z) .freqWeighting(FreqWeighting::Z)
.Lref(1.0) .Lref(1.0)
.timeWeighting(TimeWeighting::Impulse {}) .timeWeighting(TimeWeighting::Impulse {})
// .timeWeighting(TimeWeighting::CustomAsymmetric { tup: 0.01, tdown: 1.0 })
// .timeWeighting(TimeWeighting::Fast {})
.filterDescriptors([ .filterDescriptors([
StandardFilterDescriptor::Overall().unwrap() StandardFilterDescriptor::Overall().unwrap()
]) ])
@ -152,10 +150,7 @@ impl PPM {
}); });
}); });
} }
InStreamMsg::StreamStopped => { InStreamMsg::StreamStopped => {}
// Restore sleep time to sensible polling default
sleep_dur = Duration::from_millis(50);
}
} }
// Loop over any messages coming in from main thread // Loop over any messages coming in from main thread
for msg in rxmsg.try_iter() { for msg in rxmsg.try_iter() {
@ -171,7 +166,6 @@ impl PPM {
} }
} }
} }
std::thread::sleep(sleep_dur);
} }
} }
}); });

View File

@ -1,23 +1,26 @@
use std::ops::Deref;
use crate::config::*; use crate::config::*;
use std::thread::{self, JoinHandle};
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr}; use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::ps::ApsSettings; use crate::ps::ApsSettings;
use crate::ps::{AvPowerSpectra, CPSResult}; use crate::ps::{AvPowerSpectra, CPSResult};
use crate::I; use crate::I;
use anyhow::Result; use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex; use parking_lot::Mutex;
use rayon::ThreadPool; use rayon::ThreadPool;
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
type SharedRtApsStatus = Arc<Mutex<Option<RtApsResult>>>;
#[derive(Debug)] #[derive(Debug)]
enum RtApsComm { enum RtApsMessage {
CommStopThread, StopThread,
NewResult(CPSResult), ResetStatus,
NewMeta(Arc<StreamMetaData>),
} }
/// Result type coming from Real time Averaged Power Spectra computing engine /// Result type coming from Real time Averaged Power Spectra computation engine
#[derive(Debug)]
pub enum RtApsResult { pub enum RtApsResult {
/// New result /// New result
NewResult(CPSResult), NewResult(CPSResult),
@ -26,34 +29,48 @@ pub enum RtApsResult {
} }
/// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent' /// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent'
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)]
pub struct RtAps { pub struct RtAps {
/// Storage for optional last result /// Storage for optional last result
comm: Arc<Mutex<Option<RtApsComm>>>, status: SharedRtApsStatus,
/// Settings used for real time power spectra.
pub settings: ApsSettings, // For sending messages to the data processing thread
sender: Sender<RtApsMessage>,
} }
impl RtAps { impl RtAps {
/// Create new Real time power spectra computing engine. /// Create new Real time power spectra computing engine.
pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps { pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps {
// Handler needs to be created here. // Handler needs to be created here.
let handler = StreamHandler::new(mgr); let status = Arc::new(Mutex::new(None));
let last_result = Arc::new(Mutex::new(None));
let last_result2 = last_result.clone();
let settings2 = settings.clone();
let mut aps = AvPowerSpectra::new(settings); let (sender, rx) = unbounded();
let aps = AvPowerSpectra::new(settings);
Self::startThread(aps, status.clone(), mgr, rx);
let thread = std::thread::spawn(move || { RtAps { status, sender }
// println!("Thread started..."); }
let rx = handler.rx;
fn startThread(
mut aps: AvPowerSpectra,
status: SharedRtApsStatus,
smgr: &mut StreamMgr,
rxmsg: Receiver<RtApsMessage>,
) {
// Obtain messages from stream manager
let (tx, rxstream) = unbounded();
// Add queue sender part of queue to stream manager
smgr.addInQueue(tx);
rayon::spawn(move || {
// What is running on the thread // What is running on the thread
'mainloop: loop { 'mainloop: loop {
let mut last_cps: Option<CPSResult> = None; let mut last_cps: Option<CPSResult> = None;
let mut meta: Option<Arc<StreamMetaData>> = None; let mut meta: Option<Arc<StreamMetaData>> = None;
if let Some(msg) = rx.recv_timeout(std::time::Duration::from_millis(10)).ok() { if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() {
match msg { match msg {
InStreamMsg::StreamStarted(new_meta) => { InStreamMsg::StreamStarted(new_meta) => {
aps.reset(); aps.reset();
@ -74,22 +91,33 @@ impl RtAps {
} }
} }
// Communicate last result, if any. // Check for messages and act accordingly
'commscope: { if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(10)).ok() {
let mut last_result_lock = last_result.lock(); match msg {
RtApsMessage::StopThread => {
if let Some(RtApsComm::CommStopThread) = *last_result_lock { let mut status = status.lock();
*status = None;
break 'mainloop; break 'mainloop;
} }
RtApsMessage::ResetStatus => {
aps.reset();
}
}
}
// Communicate last result, if any.
'commscope: {
let mut status = status.lock();
if let Some(newmeta) = meta.take() { if let Some(newmeta) = meta.take() {
// New metadata has arrived. This is always the first // New metadata has arrived. This is always the first
// thing to push. Only when it is read, we will start // thing to push. Only when it is read, we will start
// pushing actual data. // pushing actual data.
*last_result_lock = Some(RtApsComm::NewMeta(newmeta)); *status = Some(RtApsResult::NewMeta(newmeta));
break 'commscope; break 'commscope;
} }
if let Some(RtApsComm::NewMeta(_)) = *last_result_lock { if let Some(RtApsResult::NewMeta(_)) = status.deref() {
// New metadata is not yet read by reading thread. It // New metadata is not yet read by reading thread. It
// basically means we are not yet ready to give actual // basically means we are not yet ready to give actual
// data back. // data back.
@ -97,36 +125,39 @@ impl RtAps {
} }
// Move last_cps into mutex. // Move last_cps into mutex.
if let Some(last_cps) = last_cps.take() { if let Some(last_cps) = last_cps.take() {
*last_result_lock = Some(RtApsComm::NewResult(last_cps)); *status = Some(RtApsResult::NewResult(last_cps));
} }
} } // end of commscope
} // End of loop } // End of mainloop
}); });
assert!(!thread.is_finished()); }
RtAps { /// Take last updated result.
comm: last_result2,
settings: settings2,
}
}
/// Get last computed value. When new stream metadata is
pub fn get_last(&self) -> Option<RtApsResult> { pub fn get_last(&self) -> Option<RtApsResult> {
let mut lck = self.comm.lock(); let mut lck = self.status.lock();
let res = lck.take(); lck.take()
if let Some(res) = res {
match res {
RtApsComm::CommStopThread => panic!("BUG: CommStopThread should never be set!"),
RtApsComm::NewMeta(m) => return Some(RtApsResult::NewMeta(m)),
RtApsComm::NewResult(r) => return Some(RtApsResult::NewResult(r)),
}
}
None
} }
} }
impl Drop for RtAps { impl Drop for RtAps {
fn drop(&mut self) { fn drop(&mut self) {
let mut lck = self.comm.lock(); self.sender.send(RtApsMessage::StopThread).unwrap();
*lck = Some(RtApsComm::CommStopThread); }
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RtAps {
#[new]
fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self {
RtAps::new(smgr, settings)
}
// This method does not forward the metadata. Should come from somewhere else
#[pyo3(name = "get_last")]
fn get_last_py<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyArray3<Cflt>>> {
let res = self.get_last();
if let Some(RtApsResult::NewResult(res)) = res {
return Some(res.to_pyarray_bound(py));
}
None
} }
} }

View File

@ -92,7 +92,7 @@ impl SLM {
} }
let prefiltered = self.prefilter.filter(td); let prefiltered = self.prefilter.filter(td);
let level = |a| 10. * Flt::log10(a) / self.Lrefsq; let level_fun = |a| 10. * Flt::log10(a) / self.Lrefsq;
let Lt_iter = self.channels.par_iter_mut().map(|ch| { let Lt_iter = self.channels.par_iter_mut().map(|ch| {
let mut tmp = ch.bp.filter(&prefiltered); let mut tmp = ch.bp.filter(&prefiltered);
@ -135,7 +135,7 @@ impl SLM {
rectifier_down.setToDCValue(fup); rectifier_down.setToDCValue(fup);
fup fup
} else { } else {
rectifier_up.setToDCValue(fup); rectifier_up.setToDCValue(fdown);
fdown fdown
} }
}); });
@ -155,7 +155,7 @@ impl SLM {
// Update last signal power coming from SLM // Update last signal power coming from SLM
ch.stat.Pt_last = *filtered_squared.last().unwrap(); ch.stat.Pt_last = *filtered_squared.last().unwrap();
// Convert output to levels // Convert output to levels
filtered_squared.mapv_inplace(level); filtered_squared.mapv_inplace(level_fun);
tmp tmp
}); });
if provide_output { if provide_output {

View File

@ -49,6 +49,10 @@ impl TimeWeighting {
use TimeWeighting::*; use TimeWeighting::*;
vec![Slow {}, Fast {}, Impulse {}] vec![Slow {}, Fast {}, Impulse {}]
} }
#[pyo3(name = "getLowpassPoles")]
fn getLowpassPoles_py(&self) -> (Flt, Option<Flt>) {
self.getLowpassPoles()
}
} }
impl Default for TimeWeighting { impl Default for TimeWeighting {
@ -91,7 +95,7 @@ impl TimeWeighting {
CustomAsymmetric { tup, tdown } => { CustomAsymmetric { tup, tdown } => {
assert!(*tup > 0.); assert!(*tup > 0.);
assert!(*tdown > 0.); assert!(*tdown > 0.);
(-*tup, Some(-*tdown)) (-1. / (*tup), Some(-1. / (*tdown)))
} }
} }
} }