Compare commits
3 Commits
94e478d372
...
172356055c
Author | SHA1 | Date | |
---|---|---|---|
172356055c | |||
3cc3ff54a3 | |||
2f95c1a95d |
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lasprs"
|
||||
version = "0.6.2"
|
||||
version = "0.6.3"
|
||||
edition = "2021"
|
||||
authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"]
|
||||
description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)"
|
||||
|
@ -41,7 +41,7 @@ fn main() -> Result<()> {
|
||||
println!("Stream started metadata: {meta:#?}");
|
||||
},
|
||||
InStreamMsg::InStreamData(_) => {}
|
||||
_ => { println!("Other msg...");}
|
||||
_ => { println!("Instreamdata arrived...");}
|
||||
}
|
||||
}
|
||||
Err(e) => match e {
|
||||
|
@ -1,8 +1,10 @@
|
||||
use super::*;
|
||||
use crate::config::Flt;
|
||||
use anyhow::Result;
|
||||
|
||||
/// Stream metadata. All information required for properly interpreting the raw
|
||||
/// data that is coming from the stream.
|
||||
#[cfg_attr(feature = "python-bindings", pyclass)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct StreamMetaData {
|
||||
/// Information for each channel in the stream
|
||||
@ -58,3 +60,26 @@ impl StreamMetaData {
|
||||
self.channelInfo.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple getters for all sub-attributes of the stream metadata.
|
||||
#[cfg(feature = "python-bindings")]
|
||||
#[cfg_attr(feature = "python-bindings", pymethods)]
|
||||
impl StreamMetaData {
|
||||
#[getter]
|
||||
pub fn channelInfo(&self) -> Vec<DaqChannel> {
|
||||
self.channelInfo.clone()
|
||||
}
|
||||
#[getter]
|
||||
fn rawDatatype(&self) -> DataType {
|
||||
self.rawDatatype
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn samplerate(&self) -> Flt {
|
||||
self.samplerate
|
||||
}
|
||||
#[getter]
|
||||
fn framesPerBlock(&self) -> usize {
|
||||
self.framesPerBlock
|
||||
}
|
||||
}
|
||||
|
@ -111,6 +111,12 @@ impl StreamMgr {
|
||||
fn setSiggen_py(&mut self, siggen: Siggen) {
|
||||
self.setSiggen(siggen)
|
||||
}
|
||||
#[pyo3(name = "getStreamMetaData")]
|
||||
fn getStreamMetaData_py(&self, st: StreamType) -> Option<StreamMetaData> {
|
||||
// Unfortunately (but not really, only cosmetically), the underlying
|
||||
// value (not the Arc) has to be cloned.
|
||||
self.getStreamMetaData(st).map(|b| (*b).clone())
|
||||
}
|
||||
}
|
||||
impl Default for StreamMgr {
|
||||
fn default() -> Self {
|
||||
@ -274,7 +280,6 @@ impl StreamMgr {
|
||||
}
|
||||
}
|
||||
if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
|
||||
|
||||
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
|
||||
}
|
||||
}
|
||||
|
13
src/lib.rs
13
src/lib.rs
@ -55,22 +55,31 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<filter::Biquad>()?;
|
||||
m.add_class::<filter::SeriesBiquad>()?;
|
||||
m.add_class::<filter::BiquadBank>()?;
|
||||
m.add_class::<siggen::Siggen>()?;
|
||||
m.add_class::<filter::FilterSpec>()?;
|
||||
m.add_class::<filter::ZPKModel>()?;
|
||||
m.add_class::<filter::StandardFilterDescriptor>()?;
|
||||
|
||||
// Signal generator
|
||||
m.add_class::<siggen::Siggen>()?;
|
||||
|
||||
// SLM
|
||||
m.add_class::<slm::TimeWeighting>()?;
|
||||
m.add_class::<ps::FreqWeighting>()?;
|
||||
m.add_class::<slm::SLMSettings>()?;
|
||||
m.add_class::<slm::SLM>()?;
|
||||
m.add_class::<slm::TimeWeighting>()?;
|
||||
|
||||
// Power spectra classes
|
||||
m.add_class::<ps::FreqWeighting>()?;
|
||||
m.add_class::<ps::WindowType>()?;
|
||||
m.add_class::<ps::Overlap>()?;
|
||||
m.add_class::<ps::ApsMode>()?;
|
||||
m.add_class::<ps::ApsSettings>()?;
|
||||
m.add_class::<ps::AvPowerSpectra>()?;
|
||||
|
||||
// Real time classes
|
||||
m.add_class::<rt::PPM>()?;
|
||||
m.add_class::<rt::ClipState>()?;
|
||||
m.add_class::<rt::RtAps>()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ use crate::{config::*, FreqWeighting, StandardFilterDescriptor};
|
||||
use crate::{daq::StreamMgr, Dcol};
|
||||
use crossbeam::channel::{unbounded, Receiver, Sender};
|
||||
use crossbeam::utils::Backoff;
|
||||
use ndarray_stats::QuantileExt;
|
||||
use parking_lot::Mutex;
|
||||
use std::default;
|
||||
use std::ops::DerefMut;
|
||||
@ -14,10 +13,10 @@ use std::time::{Duration, Instant};
|
||||
|
||||
/// When the level reaches ALMOST_CLIPPED_REL_AMP in amplitude, w.r.t. to the
|
||||
/// full scale, we mark a channel as almost clipped.
|
||||
const ALMOST_CLIPPED_REL_AMP: Flt = 0.95;
|
||||
const ALMOST_CLIPPED_REL_AMP: Flt = 0.98;
|
||||
|
||||
/// If clipping occured, this is the time it keeps saying 'signal is clipped'
|
||||
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(4);
|
||||
const CLIP_INDICATOR_WAIT_S: Duration = Duration::from_secs(2);
|
||||
|
||||
/// If the signal level falls below this value, we indicate that the signal level is low.
|
||||
const LEVEL_THRESHOLD_FOR_LOW_LEVEL: Flt = -50.;
|
||||
@ -69,9 +68,11 @@ impl PPM {
|
||||
*status = Default::default();
|
||||
slms.clear();
|
||||
};
|
||||
let mut sleep_dur = Duration::from_millis(50);
|
||||
loop {
|
||||
for msg in rxstream.try_iter() {
|
||||
if let Some(msg) = rxstream
|
||||
.recv_timeout(std::time::Duration::from_millis(10))
|
||||
.ok()
|
||||
{
|
||||
match msg {
|
||||
InStreamMsg::InStreamData(d) => {
|
||||
let mut status = status.lock();
|
||||
@ -120,11 +121,6 @@ impl PPM {
|
||||
resetall(&mut slms);
|
||||
}
|
||||
InStreamMsg::StreamStarted(meta) => {
|
||||
// Compute 'good' sleep time, as half the approximate time between two blocks
|
||||
let block_time_us =
|
||||
((1e6 * meta.framesPerBlock as Flt) / meta.samplerate) as u64;
|
||||
sleep_dur = Duration::from_micros(std::cmp::max(1, block_time_us / 2));
|
||||
|
||||
// Re-initalize sound level meters
|
||||
slms.clear();
|
||||
let mut s = status.lock();
|
||||
@ -136,6 +132,8 @@ impl PPM {
|
||||
.freqWeighting(FreqWeighting::Z)
|
||||
.Lref(1.0)
|
||||
.timeWeighting(TimeWeighting::Impulse {})
|
||||
// .timeWeighting(TimeWeighting::CustomAsymmetric { tup: 0.01, tdown: 1.0 })
|
||||
// .timeWeighting(TimeWeighting::Fast {})
|
||||
.filterDescriptors([
|
||||
StandardFilterDescriptor::Overall().unwrap()
|
||||
])
|
||||
@ -152,10 +150,7 @@ impl PPM {
|
||||
});
|
||||
});
|
||||
}
|
||||
InStreamMsg::StreamStopped => {
|
||||
// Restore sleep time to sensible polling default
|
||||
sleep_dur = Duration::from_millis(50);
|
||||
}
|
||||
InStreamMsg::StreamStopped => {}
|
||||
}
|
||||
// Loop over any messages coming in from main thread
|
||||
for msg in rxmsg.try_iter() {
|
||||
@ -171,7 +166,6 @@ impl PPM {
|
||||
}
|
||||
}
|
||||
}
|
||||
std::thread::sleep(sleep_dur);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
129
src/rt/rtaps.rs
129
src/rt/rtaps.rs
@ -1,23 +1,26 @@
|
||||
use std::ops::Deref;
|
||||
use crate::config::*;
|
||||
use std::thread::{self, JoinHandle};
|
||||
|
||||
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
|
||||
use crate::ps::ApsSettings;
|
||||
use crate::ps::{AvPowerSpectra, CPSResult};
|
||||
use crate::I;
|
||||
use anyhow::Result;
|
||||
use crossbeam::channel::{unbounded, Receiver, Sender};
|
||||
use parking_lot::Mutex;
|
||||
use rayon::ThreadPool;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::time::Duration;
|
||||
|
||||
type SharedRtApsStatus = Arc<Mutex<Option<RtApsResult>>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum RtApsComm {
|
||||
CommStopThread,
|
||||
NewResult(CPSResult),
|
||||
NewMeta(Arc<StreamMetaData>),
|
||||
enum RtApsMessage {
|
||||
StopThread,
|
||||
ResetStatus,
|
||||
}
|
||||
/// Result type coming from Real time Averaged Power Spectra computing engine
|
||||
/// Result type coming from Real time Averaged Power Spectra computation engine
|
||||
#[derive(Debug)]
|
||||
pub enum RtApsResult {
|
||||
/// New result
|
||||
NewResult(CPSResult),
|
||||
@ -26,34 +29,48 @@ pub enum RtApsResult {
|
||||
}
|
||||
|
||||
/// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent'
|
||||
#[cfg_attr(feature = "python-bindings", pyclass)]
|
||||
#[derive(Debug)]
|
||||
pub struct RtAps {
|
||||
/// Storage for optional last result
|
||||
comm: Arc<Mutex<Option<RtApsComm>>>,
|
||||
/// Settings used for real time power spectra.
|
||||
pub settings: ApsSettings,
|
||||
status: SharedRtApsStatus,
|
||||
|
||||
// For sending messages to the data processing thread
|
||||
sender: Sender<RtApsMessage>,
|
||||
}
|
||||
|
||||
impl RtAps {
|
||||
/// Create new Real time power spectra computing engine.
|
||||
pub fn new(mgr: &mut StreamMgr, settings: ApsSettings) -> RtAps {
|
||||
// Handler needs to be created here.
|
||||
let handler = StreamHandler::new(mgr);
|
||||
let last_result = Arc::new(Mutex::new(None));
|
||||
let last_result2 = last_result.clone();
|
||||
let settings2 = settings.clone();
|
||||
let status = Arc::new(Mutex::new(None));
|
||||
|
||||
let mut aps = AvPowerSpectra::new(settings);
|
||||
let (sender, rx) = unbounded();
|
||||
let aps = AvPowerSpectra::new(settings);
|
||||
Self::startThread(aps, status.clone(), mgr, rx);
|
||||
|
||||
let thread = std::thread::spawn(move || {
|
||||
// println!("Thread started...");
|
||||
let rx = handler.rx;
|
||||
RtAps { status, sender }
|
||||
}
|
||||
|
||||
fn startThread(
|
||||
mut aps: AvPowerSpectra,
|
||||
status: SharedRtApsStatus,
|
||||
smgr: &mut StreamMgr,
|
||||
rxmsg: Receiver<RtApsMessage>,
|
||||
) {
|
||||
// Obtain messages from stream manager
|
||||
let (tx, rxstream) = unbounded();
|
||||
|
||||
// Add queue sender part of queue to stream manager
|
||||
smgr.addInQueue(tx);
|
||||
|
||||
rayon::spawn(move || {
|
||||
// What is running on the thread
|
||||
|
||||
'mainloop: loop {
|
||||
let mut last_cps: Option<CPSResult> = None;
|
||||
let mut meta: Option<Arc<StreamMetaData>> = None;
|
||||
|
||||
if let Some(msg) = rx.recv_timeout(std::time::Duration::from_millis(10)).ok() {
|
||||
if let Some(msg) = rxstream.recv_timeout(Duration::from_millis(10)).ok() {
|
||||
match msg {
|
||||
InStreamMsg::StreamStarted(new_meta) => {
|
||||
aps.reset();
|
||||
@ -74,22 +91,33 @@ impl RtAps {
|
||||
}
|
||||
}
|
||||
|
||||
// Check for messages and act accordingly
|
||||
if let Some(msg) = rxmsg.recv_timeout(Duration::from_millis(10)).ok() {
|
||||
match msg {
|
||||
RtApsMessage::StopThread => {
|
||||
let mut status = status.lock();
|
||||
*status = None;
|
||||
break 'mainloop;
|
||||
}
|
||||
RtApsMessage::ResetStatus => {
|
||||
aps.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Communicate last result, if any.
|
||||
'commscope: {
|
||||
let mut last_result_lock = last_result.lock();
|
||||
let mut status = status.lock();
|
||||
|
||||
if let Some(RtApsComm::CommStopThread) = *last_result_lock {
|
||||
break 'mainloop;
|
||||
}
|
||||
if let Some(newmeta) = meta.take() {
|
||||
// New metadata has arrived. This is always the first
|
||||
// thing to push. Only when it is read, we will start
|
||||
// pushing actual data.
|
||||
*last_result_lock = Some(RtApsComm::NewMeta(newmeta));
|
||||
*status = Some(RtApsResult::NewMeta(newmeta));
|
||||
break 'commscope;
|
||||
}
|
||||
|
||||
if let Some(RtApsComm::NewMeta(_)) = *last_result_lock {
|
||||
if let Some(RtApsResult::NewMeta(_)) = status.deref() {
|
||||
// New metadata is not yet read by reading thread. It
|
||||
// basically means we are not yet ready to give actual
|
||||
// data back.
|
||||
@ -97,36 +125,39 @@ impl RtAps {
|
||||
}
|
||||
// Move last_cps into mutex.
|
||||
if let Some(last_cps) = last_cps.take() {
|
||||
*last_result_lock = Some(RtApsComm::NewResult(last_cps));
|
||||
*status = Some(RtApsResult::NewResult(last_cps));
|
||||
}
|
||||
}
|
||||
} // End of loop
|
||||
} // end of commscope
|
||||
} // End of mainloop
|
||||
});
|
||||
assert!(!thread.is_finished());
|
||||
|
||||
RtAps {
|
||||
comm: last_result2,
|
||||
settings: settings2,
|
||||
}
|
||||
}
|
||||
/// Get last computed value. When new stream metadata is
|
||||
|
||||
/// Take last updated result.
|
||||
pub fn get_last(&self) -> Option<RtApsResult> {
|
||||
let mut lck = self.comm.lock();
|
||||
let res = lck.take();
|
||||
if let Some(res) = res {
|
||||
match res {
|
||||
RtApsComm::CommStopThread => panic!("BUG: CommStopThread should never be set!"),
|
||||
RtApsComm::NewMeta(m) => return Some(RtApsResult::NewMeta(m)),
|
||||
RtApsComm::NewResult(r) => return Some(RtApsResult::NewResult(r)),
|
||||
}
|
||||
}
|
||||
None
|
||||
let mut lck = self.status.lock();
|
||||
lck.take()
|
||||
}
|
||||
}
|
||||
impl Drop for RtAps {
|
||||
fn drop(&mut self) {
|
||||
let mut lck = self.comm.lock();
|
||||
*lck = Some(RtApsComm::CommStopThread);
|
||||
self.sender.send(RtApsMessage::StopThread).unwrap();
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "python-bindings")]
|
||||
#[cfg_attr(feature = "python-bindings", pymethods)]
|
||||
impl RtAps {
|
||||
#[new]
|
||||
fn new_py(smgr: &mut StreamMgr, settings: ApsSettings) -> Self {
|
||||
RtAps::new(smgr, settings)
|
||||
}
|
||||
// This method does not forward the metadata. Should come from somewhere else
|
||||
#[pyo3(name = "get_last")]
|
||||
fn get_last_py<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyArray3<Cflt>>> {
|
||||
let res = self.get_last();
|
||||
if let Some(RtApsResult::NewResult(res)) = res {
|
||||
return Some(res.to_pyarray_bound(py));
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -92,7 +92,7 @@ impl SLM {
|
||||
}
|
||||
let prefiltered = self.prefilter.filter(td);
|
||||
|
||||
let level = |a| 10. * Flt::log10(a) / self.Lrefsq;
|
||||
let level_fun = |a| 10. * Flt::log10(a) / self.Lrefsq;
|
||||
|
||||
let Lt_iter = self.channels.par_iter_mut().map(|ch| {
|
||||
let mut tmp = ch.bp.filter(&prefiltered);
|
||||
@ -135,7 +135,7 @@ impl SLM {
|
||||
rectifier_down.setToDCValue(fup);
|
||||
fup
|
||||
} else {
|
||||
rectifier_up.setToDCValue(fup);
|
||||
rectifier_up.setToDCValue(fdown);
|
||||
fdown
|
||||
}
|
||||
});
|
||||
@ -155,7 +155,7 @@ impl SLM {
|
||||
// Update last signal power coming from SLM
|
||||
ch.stat.Pt_last = *filtered_squared.last().unwrap();
|
||||
// Convert output to levels
|
||||
filtered_squared.mapv_inplace(level);
|
||||
filtered_squared.mapv_inplace(level_fun);
|
||||
tmp
|
||||
});
|
||||
if provide_output {
|
||||
|
@ -49,6 +49,10 @@ impl TimeWeighting {
|
||||
use TimeWeighting::*;
|
||||
vec![Slow {}, Fast {}, Impulse {}]
|
||||
}
|
||||
#[pyo3(name = "getLowpassPoles")]
|
||||
fn getLowpassPoles_py(&self) -> (Flt, Option<Flt>) {
|
||||
self.getLowpassPoles()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TimeWeighting {
|
||||
@ -91,7 +95,7 @@ impl TimeWeighting {
|
||||
CustomAsymmetric { tup, tdown } => {
|
||||
assert!(*tup > 0.);
|
||||
assert!(*tdown > 0.);
|
||||
(-*tup, Some(-*tdown))
|
||||
(-1. / (*tup), Some(-1. / (*tdown)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user