From 90d598aa5d643cdc99654f190047f0be6a5b677d Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F." Date: Thu, 31 Oct 2024 21:46:08 +0100 Subject: [PATCH] Improved recording code (again). Added simple clip detector that detects whether a clip happened between its instanteation and its drop moment --- src/bin/lasp_record.rs | 24 +-- src/daq/mod.rs | 3 + src/daq/record.rs | 435 ++++++++++++++++++++++++----------------- src/rt/mod.rs | 2 + src/rt/simpleclip.rs | 78 ++++++++ 5 files changed, 350 insertions(+), 192 deletions(-) create mode 100644 src/rt/simpleclip.rs diff --git a/src/bin/lasp_record.rs b/src/bin/lasp_record.rs index 7234cc2..0072b00 100644 --- a/src/bin/lasp_record.rs +++ b/src/bin/lasp_record.rs @@ -60,25 +60,27 @@ fn main() -> Result<()> { let mut r = Recording::new(settings, &mut smgr)?; - println!("Starting to record... Enter 'c' to cancel."); + // println!("Starting to record... Enter 'c' to cancel."); 'infy: loop { match r.status() { - RecordStatus::Idle => println!("\nIdle"), - RecordStatus::Error(e) => { - println!("\nRecord error: {}", e); - break 'infy; - } - RecordStatus::Waiting => { + RecordStatus::Idle {} => println!("\nIdle"), + RecordStatus::Waiting {} => { println!("Waiting in start delay..."); } - RecordStatus::Finished => { + RecordStatus::Finished { clipped, error } => { println!("\nRecording finished."); + if clipped { + println!("Recording clipped!"); + } + if let Some(msg) = error { + println!("Recording failed with an error: {msg}"); + } break 'infy; } - RecordStatus::Recording(duration) => { - println!("Recording... {} ms", duration.as_millis()); + RecordStatus::Recording { recorded, .. } => { + println!("Recording... {:.0} ms", recorded * 1000.); } - RecordStatus::NoUpdate => {} + RecordStatus::NoUpdate {} => {} }; match stdin_channel.try_recv() { diff --git a/src/daq/mod.rs b/src/daq/mod.rs index 408463a..ec57f7b 100644 --- a/src/daq/mod.rs +++ b/src/daq/mod.rs @@ -75,6 +75,9 @@ pub fn add_py_classses(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/daq/record.rs b/src/daq/record.rs index 3e81ca5..ed1af1f 100644 --- a/src/daq/record.rs +++ b/src/daq/record.rs @@ -1,6 +1,7 @@ use super::*; use crate::config::Flt; -use anyhow::{bail, Error, Result}; +use crate::rt::SimpleClipDetector; +use anyhow::{anyhow, bail, Error, Result}; use clap::builder::OsStr; use crossbeam::atomic::AtomicCell; use hdf5::types::{VarLenArray, VarLenUnicode}; @@ -8,7 +9,7 @@ use hdf5::{dataset, datatype, Dataset, File, H5Type}; use parking_lot::Mutex; use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::Arc; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -17,30 +18,44 @@ use streammgr::*; use streammsg::InStreamMsg; use strum::EnumMessage; -#[derive(Clone, Debug)] /// Status of a recording +#[cfg_attr(feature = "python-bindings", pyclass(get_all, eq))] +#[derive(Clone, Debug, PartialEq)] pub enum RecordStatus { /// Nothing to update - NoUpdate, + NoUpdate {}, /// Not yet started, waiting for first msg - Idle, + Idle {}, /// Waiting for start delay to be processed. - Waiting, + Waiting {}, /// Recording in progress - Recording(Duration), + Recording { + /// Whether a clip has already happened during recording + clipped: bool, + /// The amount of time that is currently recorded + recorded: Flt, + /// The percentage done [0 to 100%]. Stays at 0% for infinite duration + /// recordings + pct_done: Flt, + }, /// Recording finished - Finished, - - /// An error occurred, in any case when an error occurs, it is tried to remove the file. - Error(String), + Finished { + /// Whether there occured a signal clip during the recording. Can only + /// be detected when the flag 'record_clip` + clipped: bool, + /// Possibly, the recording finished, but an error happened, due to + /// which the data is invalid. + error: Option, + }, } /// Settings used to start a recording. #[derive(Clone)] +#[cfg_attr(feature = "python-bindings", pyclass(get_all))] pub struct RecordSettings { /// File name to record to. pub filename: PathBuf, @@ -74,6 +89,29 @@ impl RecordSettings { } } +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl RecordSettings { + #[new] + #[pyo3(signature=(filename, duration=None, startDelay=None))] + fn new_py(filename: String, duration: Option, startDelay: Option) -> PyResult { + let startDelay = if let Some(delay) = startDelay { + let delay = Duration::try_from_secs_f64(delay) + .map_err(|e| anyhow! {"Invalid start delay specified: {e}"})?; + Some(delay) + } else { + None + }; + let duration = if let Some(duration) = duration { + Duration::try_from_secs_f64(duration) + .map_err(|e| anyhow! {"Invalid duration specified: {e}"})? + } else { + Duration::ZERO + }; + Ok(RecordSettings::new(filename, duration, startDelay)) + } +} + /// This struct lets a recording run on a stream, waits till the first data arrives and records for a given period of time. Usage: /// /// ``` @@ -94,14 +132,17 @@ impl RecordSettings { /// Ok(()) /// } /// ``` +#[cfg_attr(feature = "python-bindings", pyclass)] pub struct Recording { + // Recording settings settings: RecordSettings, - handle: Option>>, // Stop the recording. This stops the thread stopThread: Arc, + // Obtain status from thread. status_from_thread: Arc>, + // Stores latest status from thread, if no update comes from status_from_thread last_status: RecordStatus, } @@ -166,7 +207,8 @@ impl Recording { // or interleaved. This happens to be the default for ndarray as // well. RawStreamData::Datai8(dat) => { - let arr = ndarray15p6::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + let arr = + ndarray15p6::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; ds.write_slice(arr, (ctr, .., ..))?; } RawStreamData::Datai16(dat) => { @@ -210,6 +252,9 @@ impl Recording { } }; + // Detector for signal clipping + let clipdetector = SimpleClipDetector::new(mgr); + let stopThread = Arc::new(AtomicBool::new(false)); let stopThread_clone = stopThread.clone(); @@ -222,171 +267,200 @@ impl Recording { } let settings_clone = settings.clone(); - let status = Arc::new(AtomicCell::new(RecordStatus::Idle)); + let status = Arc::new(AtomicCell::new(RecordStatus::Idle {})); let status_clone = status.clone(); let (tx, rx) = crossbeam::channel::unbounded(); mgr.addInQueue(tx.clone()); // The thread doing the actual work - let handle = spawn(move || { - let file = File::create(settings.filename)?; + rayon::spawn(move || { + let recording_closure = || { + let file = File::create(settings.filename)?; - let firstmsg = match rx.recv() { - Ok(msg) => msg, - Err(_) => bail!("Queue handle error"), - }; + let firstmsg = match rx.recv() { + Ok(msg) => msg, + Err(_) => bail!("Queue handle error"), + }; - let meta = match firstmsg { - InStreamMsg::StreamStarted(meta) => meta, - _ => bail!("Recording failed. Missed stream metadata message."), - }; + let meta = match firstmsg { + InStreamMsg::StreamStarted(meta) => meta, + _ => bail!("Recording failed. Missed stream metadata message."), + }; - // Samplerate, block size, number of channels - Recording::write_hdf5_attr_scalar(&file, "samplerate", meta.samplerate)?; - Recording::write_hdf5_attr_scalar(&file, "nchannels", meta.nchannels())?; - Recording::write_hdf5_attr_scalar(&file, "blocksize", meta.framesPerBlock)?; + // Samplerate, block size, number of channels + Recording::write_hdf5_attr_scalar(&file, "samplerate", meta.samplerate)?; + Recording::write_hdf5_attr_scalar(&file, "nchannels", meta.nchannels())?; + Recording::write_hdf5_attr_scalar(&file, "blocksize", meta.framesPerBlock)?; - // Store sensitivity - let sens: Vec = meta.channelInfo.iter().map(|ch| ch.sensitivity).collect(); - Recording::write_hdf5_attr_list(&file, "sensitivity", &sens)?; + // Store sensitivity + let sens: Vec = meta.channelInfo.iter().map(|ch| ch.sensitivity).collect(); + Recording::write_hdf5_attr_list(&file, "sensitivity", &sens)?; - // Timestamp - use chrono::DateTime; - let now_utc = chrono::Utc::now(); - let timestamp = now_utc.timestamp(); - Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?; + // Timestamp + use chrono::DateTime; + let now_utc = chrono::Utc::now(); + let timestamp = now_utc.timestamp(); + Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?; - // Create UUID for measurement - use hdf5::types::VarLenUnicode; - let uuid = uuid::Uuid::new_v4(); - let uuid_unicode: VarLenUnicode = VarLenUnicode::from_str(&uuid.to_string()).unwrap(); - Recording::write_hdf5_attr_scalar(&file, "UUID", uuid_unicode)?; + // Create UUID for measurement + use hdf5::types::VarLenUnicode; + let uuid = uuid::Uuid::new_v4(); + let uuid_unicode: VarLenUnicode = + VarLenUnicode::from_str(&uuid.to_string()).unwrap(); + Recording::write_hdf5_attr_scalar(&file, "UUID", uuid_unicode)?; - // Channel names - let chnames: Vec = meta - .channelInfo - .iter() - .map(|ch| VarLenUnicode::from_str(&ch.name).unwrap()) - .collect(); - let chname_attr = file - .new_attr::() - .shape([chnames.len()]) - .create("channelNames")?; - chname_attr.write(&chnames)?; + // Channel names + let chnames: Vec = meta + .channelInfo + .iter() + .map(|ch| VarLenUnicode::from_str(&ch.name).unwrap()) + .collect(); + let chname_attr = file + .new_attr::() + .shape([chnames.len()]) + .create("channelNames")?; + chname_attr.write(&chnames)?; - // Create the dataset - let ds = Recording::create_dataset(&file, &meta)?; + // Create the dataset + let ds = Recording::create_dataset(&file, &meta)?; - let framesPerBlock = meta.framesPerBlock as usize; - let mut wait_block_ctr = 0; - // Indicate we are ready to rec! - if settings.startDelay > Duration::ZERO { - status.store(RecordStatus::Waiting); - let startdelay_s = settings.startDelay.as_micros() as Flt / 1e6; - wait_block_ctr = - (meta.samplerate as Flt * startdelay_s / framesPerBlock as Flt) as u32; - } else { - status.store(RecordStatus::Recording(Duration::ZERO)); - } - - // Counter of stored blocks - let mut stored_ctr = 0; - - // Offset in stream - let mut ctr_offset = 0; - - // Flag indicating that the first RawStreamData package still has to - // be arrived - let mut first = true; - - // Indicating the file is still empty (does not contain recorded data) - let mut empty_file = true; - - let nchannels = meta.nchannels() as usize; - 'recloop: loop { - if stopThread.load(SeqCst) { - break 'recloop; + let framesPerBlock = meta.framesPerBlock as usize; + let mut wait_block_ctr = 0; + // Indicate we are ready to rec! + if settings.startDelay > Duration::ZERO { + status.store(RecordStatus::Waiting {}); + let startdelay_s = settings.startDelay.as_micros() as Flt / 1e6; + wait_block_ctr = + (meta.samplerate as Flt * startdelay_s / framesPerBlock as Flt) as u32; + } else { + status.store(RecordStatus::Recording { + clipped: clipdetector.hasClipped(), + recorded: 0., + pct_done: 0., + }); } - match rx.recv().unwrap() { - InStreamMsg::StreamError(e) => { - bail!("Recording failed due to stream error: {}.", e) - } - InStreamMsg::StreamStarted(_) => { - bail!("Stream started again?") - } - InStreamMsg::StreamStopped => { - // Early stop. User stopped it. + + // Counter of stored blocks + let mut recorded_ctr = 0; + + // Offset in stream + let mut incoming_ctr = 0; + + // Flag indicating that the first RawStreamData package still has + // arrived + let mut firstblockhasarrived = false; + + // Indicating the file is still empty (does not contain recorded data) + let mut empty_file = true; + + let nchannels = meta.nchannels() as usize; + 'recloop: loop { + if stopThread.load(Relaxed) { break 'recloop; } - InStreamMsg::InStreamData(instreamdata) => { - if first { - first = false; - // Initialize counter offset - ctr_offset = instreamdata.ctr; - } else if instreamdata.ctr != stored_ctr + ctr_offset { - println!("********** PACKAGES MISSED ***********"); - bail!("Packages missed. Recording is invalid.") + match rx.recv().unwrap() { + InStreamMsg::StreamError(e) => { + bail!("Recording failed due to stream error: {}.", e) } - - if wait_block_ctr > 0 { - // We are still waiting - wait_block_ctr -= 1; - if wait_block_ctr == 0 { - status.store(RecordStatus::Recording(Duration::ZERO)); + InStreamMsg::StreamStarted(_) => { + bail!("Stream started again?") + } + InStreamMsg::StreamStopped => { + // Early stop. User stopped it. + break 'recloop; + } + InStreamMsg::InStreamData(instreamdata) => { + if ! firstblockhasarrived { + // Toggle flag. + firstblockhasarrived = true; + // Initialize counter offset + incoming_ctr = instreamdata.ctr; + } else { + incoming_ctr += 1; } - // TODO: Is it a good idea to increase the counter - // here, as well as below? - stored_ctr += 1; - continue 'recloop; - } - - ds.resize((stored_ctr + 1, framesPerBlock, nchannels))?; - Recording::append_to_dset( - &ds, - stored_ctr, - &instreamdata, - framesPerBlock, - nchannels, - )?; - // Once we have added to the file, this flag is swapped - // and a file should be deleted in case of an error. - empty_file = false; - - // Recorded time rounded of to milliseconds. - let recorded_time = Duration::from_millis( - ((1000 * (stored_ctr + 1) * framesPerBlock) as Flt / meta.samplerate) - as u64, - ); - - if !settings.duration.is_zero() { - // Duration not equal to zero, meaning we record up to a - // certain duration. - if recorded_time >= settings.duration { - break 'recloop; + if instreamdata.ctr != incoming_ctr { + eprintln!("********** PACKAGES MISSED ***********"); + bail!("Stream data blocks missed. Recording is invalid.") } + + if wait_block_ctr > 0 { + // We are still waiting + wait_block_ctr -= 1; + if wait_block_ctr == 0 { + status.store(RecordStatus::Recording { + clipped: clipdetector.hasClipped(), + recorded: 0., + pct_done: 0., + }); + } + continue 'recloop; + } + + ds.resize((recorded_ctr + 1, framesPerBlock, nchannels))?; + Recording::append_to_dset( + &ds, + recorded_ctr, + &instreamdata, + framesPerBlock, + nchannels, + )?; + // Once we have added to the file, this flag is swapped + // and a file should only be deleted in case of an error. + empty_file = false; + + // Recorded time rounded of to milliseconds. + let recorded_time = Duration::from_millis( + ((1000 * (recorded_ctr + 1) * framesPerBlock) as Flt + / meta.samplerate) as u64, + ); + + if !settings.duration.is_zero() { + // Duration not equal to zero, meaning we record up to a + // certain duration. + if recorded_time >= settings.duration { + break 'recloop; + } + } + // println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock); + recorded_ctr += 1; + status.store(RecordStatus::Recording { + clipped: clipdetector.hasClipped(), + recorded: recorded_time.as_secs_f64(), + pct_done: if settings.duration > Duration::ZERO { + 100. * recorded_time.as_millis() as Flt + / settings.duration.as_millis() as Flt + } else { + 0. + }, + }); } - // println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock); - stored_ctr += 1; - status.store(RecordStatus::Recording(recorded_time)); } + } // end of 'recloop + + if empty_file { + bail!("Recording stopped before any data is stored."); } - } // end of 'recloop - - if empty_file { - bail!("Recording stopped before any data is stored."); + Ok(()) + }; + if let Err(e) = recording_closure() { + status.store(RecordStatus::Finished { + clipped: clipdetector.hasClipped(), + error: Some(format!("{e}")), + }); + } else { + status.store(RecordStatus::Finished { + clipped: clipdetector.hasClipped(), + error: None, + }); + // End of thread } - - status.store(RecordStatus::Finished); - Ok(()) - // End of thread }); Ok(Recording { settings: settings_clone, stopThread: stopThread_clone, - handle: Some(handle), - last_status: RecordStatus::NoUpdate, + last_status: RecordStatus::NoUpdate {}, status_from_thread: status_clone, }) } @@ -395,76 +469,75 @@ impl Recording { // occured), or when cancel() is called, or when recording object is dropped // while thread is still running. fn deleteFile(&self) { - if let Some(_) = self.handle { - panic!("Misuse bug: cannot delete file while thread is still running"); - } // File should not be un use anymore, as thread is joined. // In case of error, we try to delete the file if let Err(e) = std::fs::remove_file(&self.settings.filename) { eprintln!("Recording failed, but file removal failed as well: {}", e); } } - - // Join the thread, store the last status. Please make sure it is joinable, - // otherwise this method will hang forever. - fn cleanupThread(&mut self) { - if let Some(h) = self.handle.take() { - let res = h.join().unwrap(); - if let Err(e) = res { - self.last_status = RecordStatus::Error(format!("{}", e)); - } - } +} +#[cfg_attr(feature = "python-bindings", pymethods)] +impl Recording { + #[cfg(feature = "python-bindings")] + #[new] + fn new_py(settings: RecordSettings, mgr: &mut StreamMgr) -> PyResult { + Ok(Recording::new(settings, mgr)?) } + /// Get current record status pub fn status(&mut self) -> RecordStatus { // Update status due to normal messaging - let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate); + let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate {}); match status_from_thread { - RecordStatus::NoUpdate => {} + RecordStatus::NoUpdate {} => {} _ => { self.last_status = status_from_thread; } } - if let Some(h) = &self.handle { - // Update the status by taking any error messages - if h.is_finished() { - self.cleanupThread(); - } - } // Return latest status self.last_status.clone() } /// Stop existing recording early. At the current time, or st pub fn stop(&mut self) { - // Stop thread , join, update status - self.stopThread.store(true, SeqCst); - self.cleanupThread(); + // Stop thread and wait for message of finished + self.stopThread.store(true, Relaxed); + self.waitForThread(); match self.status() { - RecordStatus::Finished => { // Do nothing + RecordStatus::Finished { error, .. } => { + if error.is_some() { + // Delete file if there was an error + self.deleteFile(); + } } _ => { - // an error occured, we try to delete the backing file - self.deleteFile() + panic!("RecordStatus should be finished!"); } } } /// Cancel recording. Deletes the recording file pub fn cancel(&mut self) { - self.stopThread.store(true, SeqCst); - self.cleanupThread(); + self.stopThread.store(true, Relaxed); + self.waitForThread(); self.deleteFile(); } + fn waitForThread(&mut self) { + while !matches!(self.status(), RecordStatus::Finished { .. }) { + std::thread::sleep(Duration::from_millis(10)); + } + } } impl Drop for Recording { fn drop(&mut self) { - if self.handle.is_some() { - // If we enter here, stop() or cancel() has not been called. In that - // case, we cleanup here by cancelling the recording - self.cancel(); + // If we enter here, stop() or cancel() has not been called. In that + // case, we cleanup here by cancelling the recording and deleting the file. + if !matches!(self.status(), RecordStatus::Finished { .. }) { + self.stopThread.store(true, Relaxed); + self.waitForThread(); + self.deleteFile(); } } } diff --git a/src/rt/mod.rs b/src/rt/mod.rs index fb97e3e..d46b712 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -4,6 +4,8 @@ mod ppm; mod rtaps; mod rtview; +mod simpleclip; pub use ppm::{PPM, ClipState}; pub use rtaps::{RtAps, RtApsResult}; pub use rtview::RtViewer; +pub use simpleclip::SimpleClipDetector; diff --git a/src/rt/simpleclip.rs b/src/rt/simpleclip.rs new file mode 100644 index 0000000..6d7853e --- /dev/null +++ b/src/rt/simpleclip.rs @@ -0,0 +1,78 @@ +use crate::daq::InStreamMsg; +use crate::daq::StreamMgr; +use crate::math::maxabs; +use crate::Flt; +use crossbeam::channel::bounded; +use parking_lot::Mutex; +use std::sync::atomic::Ordering::Relaxed; +use std::{ + sync::{atomic::AtomicBool, Arc}, + time::Duration, +}; + +/// If signal is above this value, we indicate that the signal has clipped. +const CLIP_STRONG_LIMIT: Flt = 0.999; + +/// Very simple clip detector. Used to detect cliping in a recording. Stores one +/// clip value if just something happened between time of new and moment of drop(). +/// +pub struct SimpleClipDetector { + clipped: Arc, + stopThread: Arc, +} + +impl SimpleClipDetector { + /// Create new clip detector + /// + /// # Args + /// + /// - `smgr` - see [StreamMgr] + pub fn new(smgr: &mut StreamMgr) -> Self { + let (tx, rx) = bounded(0); + + let clipstate = Arc::new(AtomicBool::new(false)); + let stopThread = Arc::new(AtomicBool::new(false)); + + let clipstate2 = clipstate.clone(); + let stopThread2 = stopThread.clone(); + + smgr.addInQueue(tx); + rayon::spawn(move || loop { + if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1500)) { + match msg { + InStreamMsg::InStreamData(dat) => { + let flt = dat.getFloatData(); + let maxabs = maxabs(flt.view()); + if maxabs >= CLIP_STRONG_LIMIT { + clipstate.store(true, Relaxed); + // We do not have to do anything anymore. The signal + // has clipped so we do not have to check any new + // blocks anymore. + return; + } + } + _ => {} + } + }; + if stopThread.load(Relaxed) { + return; + } + }); + + Self { + clipped: clipstate2, + stopThread: stopThread2, + } + } + + /// Check whether a clip has happened + pub fn hasClipped(&self) -> bool { + return self.clipped.load(Relaxed); + } +} + +impl Drop for SimpleClipDetector { + fn drop(&mut self) { + self.stopThread.store(true, Relaxed); + } +}