From 36de394a2ef69c071a1a181cf7ba2470f472ce53 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F." Date: Sat, 2 Nov 2024 00:22:07 +0100 Subject: [PATCH] Re-implemented simpleclip, debugged --- src/rt/simpleclip.rs | 110 ++++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 53 deletions(-) diff --git a/src/rt/simpleclip.rs b/src/rt/simpleclip.rs index e11fe55..dd8195e 100644 --- a/src/rt/simpleclip.rs +++ b/src/rt/simpleclip.rs @@ -5,7 +5,7 @@ use crate::math::max; use crate::math::maxabs; use crate::math::min; use crate::Flt; -use crossbeam::channel::bounded; +use crossbeam::channel::unbounded; use parking_lot::Mutex; use std::sync::atomic::Ordering::Relaxed; use std::{ @@ -15,7 +15,7 @@ use std::{ /// If signal is below / above the range times the value below, we indicate that /// the signal has clipped. -const CLIP_REL_LIMIT: Flt = 0.999; +const CLIP_REL_LIMIT: Flt = 0.99; /// Very simple clip detector. Used to detect cliping in a recording. Stores one /// clip value if just something happened between time of new and moment of drop(). @@ -32,7 +32,7 @@ impl SimpleClipDetector { /// /// - `smgr` - see [StreamMgr] pub fn new(smgr: &mut StreamMgr) -> Self { - let (tx, rx) = bounded(0); + let (tx, rx) = unbounded(); let clipstate = Arc::new(AtomicBool::new(false)); let stopThread = Arc::new(AtomicBool::new(false)); @@ -41,58 +41,62 @@ impl SimpleClipDetector { let stopThread2 = stopThread.clone(); smgr.addInQueue(tx); - rayon::spawn(move || loop { - let mut streammeta: Option> = None; - if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1500)) { - match msg { - InStreamMsg::InStreamData(dat) => { - let meta = streammeta - .expect("If we are here, stream metadata should be available"); - let flt = dat.getFloatData(); - let maxs = flt - .columns() - .into_iter() - .map(|col| max(col)) - .collect::>(); - let mins = flt - .columns() - .into_iter() - .map(|col| min(col)) - .collect::>(); + rayon::spawn( + move || { + let mut streammeta: Option> = None; + loop { + if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1500)) { + match msg { + InStreamMsg::StreamStarted(meta) => { + streammeta = Some(meta); + } + InStreamMsg::InStreamData(dat) => { + let meta = streammeta + .as_ref() + .expect("If we are here, stream metadata should be available"); + let flt = dat.getFloatData(); + let maxs = flt + .columns() + .into_iter() + .map(|col| max(col)) + .collect::>(); + let mins = flt + .columns() + .into_iter() + .map(|col| min(col)) + .collect::>(); - let mut clip = false; - - maxs.into_iter().zip(mins).zip(&meta.channelInfo).for_each( - |((max, min), ch)| { - let min_for_clip = CLIP_REL_LIMIT * ch.range.0; - let max_for_clip = CLIP_REL_LIMIT * ch.range.1; - if max >= max_for_clip { - clip = true; - } - if min <= min_for_clip { - clip = true; - } - }, - ); - - if clip { - clipstate.store(true, Relaxed); - // We do not have to do anything anymore. The signal - // has clipped so we do not have to check any new - // blocks anymore. - return; - } + maxs.into_iter().zip(mins).zip(&meta.channelInfo).for_each( + |((max, min), ch)| { + let min_for_clip = CLIP_REL_LIMIT * ch.range.0; + let max_for_clip = CLIP_REL_LIMIT * ch.range.1; + if max >= max_for_clip { + clipstate.store(true, Relaxed); + // We do not have to do anything anymore. The signal + // has clipped so we do not have to check any new + // blocks anymore. + return; + } + if min <= min_for_clip { + clipstate.store(true, Relaxed); + // We do not have to do anything anymore. The signal + // has clipped so we do not have to check any new + // blocks anymore. + return; + } + }, + ); + } + // Ignore other stream messages + _ => {} + } // end match msg + }; // end if let Ok(msg) + if stopThread.load(Relaxed) { + return; } - InStreamMsg::StreamStarted(meta) => { - streammeta = Some(meta); - } - _ => {} - } - }; - if stopThread.load(Relaxed) { - return; - } - }); + } // end of loop + }, // End of rayon closure + ); Self { clipped: clipstate2,