Compare commits

...

3 Commits

9 changed files with 141 additions and 73 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "lasprs"
version = "0.6.2"
version = "0.6.3"
edition = "2021"
authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"]
description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)"

View File

@ -41,7 +41,7 @@ fn main() -> Result<()> {
println!("Stream started metadata: {meta:#?}");
},
InStreamMsg::InStreamData(_) => {}
_ => { println!("Other msg...");}
_ => { println!("Instreamdata arrived...");}
}
}
Err(e) => match e {

View File

@ -1,8 +1,10 @@
use super::*;
use crate::config::Flt;
use anyhow::Result;
/// Stream metadata. All information required for properly interpreting the raw
/// data that is coming from the stream.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone, Debug)]
pub struct StreamMetaData {
/// Information for each channel in the stream
@ -58,3 +60,26 @@ impl StreamMetaData {
self.channelInfo.len()
}
}
/// Simple getters for all sub-attributes of the stream metadata.
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMetaData {
#[getter]
pub fn channelInfo(&self) -> Vec<DaqChannel> {
self.channelInfo.clone()
}
#[getter]
fn rawDatatype(&self) -> DataType {
self.rawDatatype
}
#[getter]
fn samplerate(&self) -> Flt {
self.samplerate
}
#[getter]
fn framesPerBlock(&self) -> usize {
self.framesPerBlock
}
}

View File

@ -111,6 +111,12 @@ impl StreamMgr {
fn setSiggen_py(&mut self, siggen: Siggen) {
self.setSiggen(siggen)
}
#[pyo3(name = "getStreamMetaData")]
fn getStreamMetaData_py(&self, st: StreamType) -> Option<StreamMetaData> {
// Unfortunately (but not really, only cosmetically), the underlying
// value (not the Arc) has to be cloned.
self.getStreamMetaData(st).map(|b| (*b).clone())
}
}
impl Default for StreamMgr {
fn default() -> Self {
@ -274,7 +280,6 @@ impl StreamMgr {
}
}
if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
}
}

View File

@ -55,22 +55,31 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<filter::Biquad>()?;
m.add_class::<filter::SeriesBiquad>()?;
m.add_class::<filter::BiquadBank>()?;
m.add_class::<siggen::Siggen>()?;
m.add_class::<filter::FilterSpec>()?;
m.add_class::<filter::ZPKModel>()?;
m.add_class::<filter::StandardFilterDescriptor>()?;
// Signal generator
m.add_class::<siggen::Siggen>()?;
// SLM
m.add_class::<slm::TimeWeighting>()?;
m.add_class::<ps::FreqWeighting>()?;
m.add_class::<slm::SLMSettings>()?;
m.add_class::<slm::SLM>()?;
m.add_class::<slm::TimeWeighting>()?;
// Power spectra classes
m.add_class::<ps::FreqWeighting>()?;
m.add_class::<ps::WindowType>()?;
m.add_class::<ps::Overlap>()?;
m.add_class::<ps::ApsMode>()?;
m.add_class::<ps::ApsSettings>()?;
m.add_class::<ps::AvPowerSpectra>()?;
// Real time classes
m.add_class::<rt::PPM>()?;
m.add_class::<rt::ClipState>()?;
m.add_class::<rt::RtAps>()?;
Ok(())
}

View File

@ -5,7 +5,6 @@ use crate::{config::*, FreqWeighting, StandardFilterDescriptor};
use crate::{daq::StreamMgr, Dcol};
use crossbeam::channel::{unbounded, Receiver, Sender};
use crossbeam::utils::Backoff;
use ndarray_stats::QuantileExt;
use parking_lot::Mutex;
use std::default;
use std::ops::DerefMut;
@ -14,10 +13,10 @@ use std::time::{Duration, Instant};
/// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the
/// full scale, we mark a channel as almost clipped.
const ALMOST_CLIPPED_REL_AMP: Flt = 0.95;
const ALMOST_CLIPPED_REL_AMP: Flt = 0.98;
/// If clipping occured, this is the time it keeps saying 'signal is clipped'
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(4);
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2);
/// If the signal level falls below this value, we indicate that the signal level is low.
const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.;
@ -69,9 +68,11 @@ impl PPM {
*status = Default::default();
slms.clear();
};
let mut sleep_dur = Duration::from_millis(50);
loop {
for msg in rxstream.try_iter() {
if let Some(msg) = rxstream
.recv_timeout(std::time::Duration::from_millis(10))
.ok()
{
match msg {
InStreamMsg::InStreamData(d) => {
let mut status = status.lock();
@ -120,11 +121,6 @@ impl PPM {
resetall(&mut slms);
}
InStreamMsg::StreamStarted(meta) => {
// Compute 'good' sleep time, as half the approximate time between two blocks
let block_time_us =
((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64;
sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2));
// Re-initalize sound level meters
slms.clear();
let mut s = status.lock();
@ -136,6 +132,8 @@ impl PPM {
.freqWeighting(FreqWeighting::Z)
.Lref(1.0)
.timeWeighting(TimeWeighting::Impulse {})
// .timeWeighting(TimeWeighting::CustomAsymmetric { tup: 0.01, tdown: 1.0 })
// .timeWeighting(TimeWeighting::Fast {})
.filterDescriptors([
StandardFilterDescriptor::Overall().unwrap()
])
@ -152,10 +150,7 @@ impl PPM {
});
});
}
InStreamMsg::StreamStopped => {
// Restore sleep time to sensible polling default
sleep_dur = Duration::from_millis(50);
}
InStreamMsg::StreamStopped => {}
}
// Loop over any messages coming in from main thread
for msg in rxmsg.try_iter() {
@ -171,7 +166,6 @@ impl PPM {
}
}
}
std::thread::sleep(sleep_dur);
}
}
});

View File

@ -1,23 +1,26 @@
use std::ops::Deref;
use crate::config::*;
use std::thread::{self, JoinHandle};
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::ps::ApsSettings;
use crate::ps::{AvPowerSpectra, CPSResult};
use crate::I;
use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use rayon::ThreadPool;
use std::ops::Deref;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
type SharedRtApsStatus = Arc<Mutex<Option<RtApsResult>>>;
#[derive(Debug)]
enum RtApsComm {
CommStopThread,
NewResult(CPSResult),
NewMeta(Arc<StreamMetaData>),
enum RtApsMessage {
StopThread,
ResetStatus,
}
/// Result type coming from Real time Averaged Power Spectra computing engine
/// Result type coming from Real time Averaged Power Spectra computation engine
#[derive(Debug)]
pub enum RtApsResult {
/// New result
NewResult(CPSResult),
@ -26,34 +29,48 @@ pub enum RtApsResult {
}
/// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent'
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug)]
pub struct RtAps {
/// Storage for optional last result
comm: Arc<Mutex<Option<RtApsComm>>>,
/// Settings used for real time power spectra.
pub settings: ApsSettings,
status: SharedRtApsStatus,
// For sending messages to the data processing thread
sender: Sender<RtApsMessage>,
}
impl RtAps {
/// Create new Real time power spectra computing engine.
pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps {
// Handler needs to be created here.
let handler = StreamHandler::new(mgr);
let last_result = Arc::new(Mutex::new(None));
let last_result2 = last_result.clone();
let settings2 = settings.clone();
let status = Arc::new(Mutex::new(None));
let mut aps = AvPowerSpectra::new(settings);
let (sender, rx) = unbounded();
let aps = AvPowerSpectra::new(settings);
Self::startThread(aps, status.clone(), mgr, rx);
let thread = std::thread::spawn(move || {
// println!("Thread started...");
let rx = handler.rx;
RtAps { status, sender }
}
fn startThread(
mut aps: AvPowerSpectra,
status: SharedRtApsStatus,
smgr: &mut StreamMgr,
rxmsg: Receiver<RtApsMessage>,
) {
// Obtain messages from stream manager
let (tx, rxstream) = unbounded();
// Add queue sender part of queue to stream manager
smgr.addInQueue(tx);
rayon::spawn(move || {
// What is running on the thread
'mainloop: loop {
let mut last_cps: Option<CPSResult> = None;
let mut meta: Option<Arc<StreamMetaData>> = None;
if let Some(msg) = rx.recv_timeout(std::time::Duration::from_millis(10)).ok() {
if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() {
match msg {
InStreamMsg::StreamStarted(new_meta) => {
aps.reset();
@ -74,22 +91,33 @@ impl RtAps {
}
}
// Check for messages and act accordingly
if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(10)).ok() {
match msg {
RtApsMessage::StopThread => {
let mut status = status.lock();
*status = None;
break 'mainloop;
}
RtApsMessage::ResetStatus => {
aps.reset();
}
}
}
// Communicate last result, if any.
'commscope: {
let mut last_result_lock = last_result.lock();
let mut status = status.lock();
if let Some(RtApsComm::CommStopThread) = *last_result_lock {
break 'mainloop;
}
if let Some(newmeta) = meta.take() {
// New metadata has arrived. This is always the first
// thing to push. Only when it is read, we will start
// pushing actual data.
*last_result_lock = Some(RtApsComm::NewMeta(newmeta));
*status = Some(RtApsResult::NewMeta(newmeta));
break 'commscope;
}
if let Some(RtApsComm::NewMeta(_)) = *last_result_lock {
if let Some(RtApsResult::NewMeta(_)) = status.deref() {
// New metadata is not yet read by reading thread. It
// basically means we are not yet ready to give actual
// data back.
@ -97,36 +125,39 @@ impl RtAps {
}
// Move last_cps into mutex.
if let Some(last_cps) = last_cps.take() {
*last_result_lock = Some(RtApsComm::NewResult(last_cps));
*status = Some(RtApsResult::NewResult(last_cps));
}
}
} // End of loop
} // end of commscope
} // End of mainloop
});
assert!(!thread.is_finished());
RtAps {
comm: last_result2,
settings: settings2,
}
}
/// Get last computed value. When new stream metadata is
/// Take last updated result.
pub fn get_last(&self) -> Option<RtApsResult> {
let mut lck = self.comm.lock();
let res = lck.take();
if let Some(res) = res {
match res {
RtApsComm::CommStopThread => panic!("BUG: CommStopThread should never be set!"),
RtApsComm::NewMeta(m) => return Some(RtApsResult::NewMeta(m)),
RtApsComm::NewResult(r) => return Some(RtApsResult::NewResult(r)),
}
}
None
let mut lck = self.status.lock();
lck.take()
}
}
impl Drop for RtAps {
fn drop(&mut self) {
let mut lck = self.comm.lock();
*lck = Some(RtApsComm::CommStopThread);
self.sender.send(RtApsMessage::StopThread).unwrap();
}
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RtAps {
#[new]
fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self {
RtAps::new(smgr, settings)
}
// This method does not forward the metadata. Should come from somewhere else
#[pyo3(name = "get_last")]
fn get_last_py<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyArray3<Cflt>>> {
let res = self.get_last();
if let Some(RtApsResult::NewResult(res)) = res {
return Some(res.to_pyarray_bound(py));
}
None
}
}

View File

@ -92,7 +92,7 @@ impl SLM {
}
let prefiltered = self.prefilter.filter(td);
let level = |a| 10. * Flt::log10(a) / self.Lrefsq;
let level_fun = |a| 10. * Flt::log10(a) / self.Lrefsq;
let Lt_iter = self.channels.par_iter_mut().map(|ch| {
let mut tmp = ch.bp.filter(&prefiltered);
@ -135,7 +135,7 @@ impl SLM {
rectifier_down.setToDCValue(fup);
fup
} else {
rectifier_up.setToDCValue(fup);
rectifier_up.setToDCValue(fdown);
fdown
}
});
@ -155,7 +155,7 @@ impl SLM {
// Update last signal power coming from SLM
ch.stat.Pt_last = *filtered_squared.last().unwrap();
// Convert output to levels
filtered_squared.mapv_inplace(level);
filtered_squared.mapv_inplace(level_fun);
tmp
});
if provide_output {

View File

@ -49,6 +49,10 @@ impl TimeWeighting {
use TimeWeighting::*;
vec![Slow {}, Fast {}, Impulse {}]
}
#[pyo3(name = "getLowpassPoles")]
fn getLowpassPoles_py(&self) -> (Flt, Option<Flt>) {
self.getLowpassPoles()
}
}
impl Default for TimeWeighting {
@ -91,7 +95,7 @@ impl TimeWeighting {
CustomAsymmetric { tup, tdown } => {
assert!(*tup > 0.);
assert!(*tdown > 0.);
(-*tup, Some(-*tdown))
(-1. / (*tup), Some(-1. / (*tdown)))
}
}
}