From 18b61b02f384aedf174189cc4ab16614d3f90682 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Mon, 25 Mar 2024 20:30:04 +0100 Subject: [PATCH] Improved recording code. Now properly removes file in case of empty. --- Cargo.toml | 7 +- src/bin/lasp_record.rs | 17 +++- src/daq/datatype.rs | 1 - src/daq/mod.rs | 17 ++++ src/daq/record.rs | 204 ++++++++++++++++++++++++++++++----------- 5 files changed, 184 insertions(+), 62 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 03543e1..55bcb2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,10 @@ repository = "https://code.ascee.nl/ascee/lasprs" license = "MIT OR Apache-2.0" keywords = ["dsp", "audio", "measurement", "acoustics", "filter"] categories = ["multimedia::audio", "science", "mathematics"] + [lib] name = "lasprs" -crate-type = ["cdylib", "rlib"] +crate-type = ["cdylib", "rlib",] [dependencies] # Error handling @@ -78,9 +79,9 @@ uuid = { version = "1.6.1", features = ["v4"] , optional = true} clap = { version = "4.4.11", features = ["derive", "color", "help", "suggestions"] } [features] -default = ["f64", "cpal-api", "record"] +# default = ["f64", "cpal-api", "record"] # Use this for debugging extensions -# default = ["f64", "python-bindings", "record", "cpal-api"] +default = ["f64", "python-bindings", "record", "cpal-api"] cpal-api = ["dep:cpal"] record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"] diff --git a/src/bin/lasp_record.rs b/src/bin/lasp_record.rs index a56cbb1..7c57a44 100644 --- a/src/bin/lasp_record.rs +++ b/src/bin/lasp_record.rs @@ -1,4 +1,4 @@ -use anyhow::{bail, Result}; +use anyhow::Result; use clap::{arg, command, Parser}; use crossbeam::channel::{unbounded, Receiver, TryRecvError}; #[cfg(feature = "record")] @@ -19,6 +19,11 @@ struct Cli { #[arg(short, long = "duration", default_value_t = 0.)] duration_s: Flt, + /// Start delay in [s]. Rounds down to whole seconds. If not specified, no + /// start delay will be used. + #[arg(short, long = "startdelay", default_value_t = 0.)] + start_delay_s: Flt, + /// TOML configuration file for used stream #[arg(short, long = "config-file")] config_file_daq: Option, @@ -40,6 +45,7 @@ fn main() -> Result<()> { let settings = RecordSettings { filename: ops.filename.into(), duration: Duration::from_secs(ops.duration_s as u64), + startDelay: Duration::from_secs(ops.start_delay_s as u64), }; match ops.config_file_daq { None => smgr.startDefaultInputStream()?, @@ -52,7 +58,7 @@ fn main() -> Result<()> { let mut r = Recording::new(settings, &mut smgr)?; - println!("Starting to record..."); + println!("Starting to record... Enter 'c' to cancel."); 'infy: loop { match r.status() { RecordStatus::Idle => println!("\nIdle"), @@ -60,6 +66,7 @@ fn main() -> Result<()> { println!("\nRecord error: {}", e); break 'infy; } + RecordStatus::Waiting => { println!("Waiting in start delay...");}, RecordStatus::Finished => { println!("\nRecording finished."); break 'infy; @@ -73,13 +80,17 @@ fn main() -> Result<()> { match stdin_channel.try_recv() { Ok(_key) => { println!("User pressed key. Manually stopping recording here."); + match _key.to_lowercase().as_str() { + "c" => r.cancel(), + _ => r.stop() + } break 'infy; } Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => panic!("Channel disconnected"), } - sleep(100); + sleep(500); } Ok(()) diff --git a/src/daq/datatype.rs b/src/daq/datatype.rs index 9fc3804..705d231 100644 --- a/src/daq/datatype.rs +++ b/src/daq/datatype.rs @@ -23,7 +23,6 @@ pub enum DataType { /// 32-bit integers #[strum(message = "I32", detailed_message = "32-bits integers")] I32 = 4, - /// 64-bit integers #[strum(message = "I64", detailed_message = "64-bits integers")] I64 = 5, diff --git a/src/daq/mod.rs b/src/daq/mod.rs index 3dd998b..77feb3d 100644 --- a/src/daq/mod.rs +++ b/src/daq/mod.rs @@ -86,6 +86,23 @@ pub struct StreamMgr { siggen: Option, } +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl StreamMgr { + #[new] + /// See (StreamMgr::new()) + fn new_py<'py>() -> StreamMgr { + StreamMgr::new() + } + + // #[pyo3(name = "unit")] + // #[staticmethod] + // /// See: [Biquad::unit()] + // pub fn unit_py() -> Biquad { + // Biquad::unit() + // } + // #[pyo3(name = "firstOrderHighPass")] +} impl StreamMgr { /// Create new stream manager. A stream manager is supposed to be a singleton. /// diff --git a/src/daq/record.rs b/src/daq/record.rs index 2cad991..4747d69 100644 --- a/src/daq/record.rs +++ b/src/daq/record.rs @@ -6,9 +6,11 @@ use hdf5::types::{VarLenArray, VarLenUnicode}; use hdf5::{dataset, datatype, Dataset, File, H5Type}; use ndarray::ArrayView2; use num::traits::ops::mul_add; +use rayon::iter::Empty; use serde::de::IntoDeserializer; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::atomic::Ordering::SeqCst; use std::sync::Mutex; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -19,13 +21,20 @@ use strum::EnumMessage; pub enum RecordStatus { /// Nothing to update NoUpdate, + /// Not yet started, waiting for first msg Idle, + + /// Waiting for start delay to be processed. + Waiting, + /// Recording in progress Recording(Duration), + /// Recording finished Finished, - /// An error occurred. + + /// An error occurred, in any case when an error occurs, it is tried to remove the file. Error(String), } @@ -37,14 +46,33 @@ pub struct RecordSettings { /// The recording time. Set to 0 to perform indefinite recording pub duration: Duration, + + /// The delay to wait before adding data + pub startDelay: Duration, } -/// Create a recording +/// This struct lets a recording run on a stream, waits till the first data arrives and records for a given period of time. Usage: +/// +/// ``` +/// use lasprs::{RecordSettings, StreamMgr, Recording}; +/// use std::time::Duration; +/// let smgr = StreamMgr::new(); +/// smgr.startDefaultInputStream()?; +/// let settings = RecordSettings{ +/// filename: "test.h5", +/// duration: Duration::from_secs(5), +/// }; +/// let rec = Recording::new(settings)?; +/// ``` pub struct Recording { settings: RecordSettings, handle: Option>>, - tx: Sender, + + // Stop the recording. This stops the thread + stopThread: Arc, + // Obtain status from thread. status_from_thread: Arc>, + // Stores latest status from thread, if no update comes from status_from_thread last_status: RecordStatus, } @@ -151,6 +179,9 @@ impl Recording { } }; + let stopThread = Arc::new(AtomicBool::new(false)); + let stopThread_clone = stopThread.clone(); + // Fail if filename already exists if settings.filename.exists() { bail!( @@ -158,10 +189,10 @@ impl Recording { settings.filename.to_string_lossy() ); } - let settings2 = settings.clone(); + let settings_clone = settings.clone(); let status = Arc::new(AtomicCell::new(RecordStatus::Idle)); - let status2 = status.clone(); + let status_clone = status.clone(); let (tx, rx) = crossbeam::channel::unbounded(); mgr.addInQueue(tx.clone()); @@ -216,15 +247,36 @@ impl Recording { // Create the dataset let ds = Recording::create_dataset(&file, &meta)?; - // Indicate we are ready to rec! - status.store(RecordStatus::Recording(Duration::ZERO)); - - let mut ctr = 0; - let mut ctr_offset = 0; - let mut first = true; let framesPerBlock = meta.framesPerBlock as usize; + let mut wait_block_ctr = 0; + // Indicate we are ready to rec! + if settings.startDelay > Duration::ZERO { + status.store(RecordStatus::Waiting); + let startdelay_s = settings.startDelay.as_micros() as Flt / 1e6; + wait_block_ctr = + (meta.samplerate as Flt * startdelay_s / framesPerBlock as Flt) as u32; + } else { + status.store(RecordStatus::Recording(Duration::ZERO)); + } + + // Counter of stored blocks + let mut stored_ctr = 0; + + // Offset in stream + let mut ctr_offset = 0; + + // Flag indicating that the first RawStreamData package still has to + // be arrived + let mut first = true; + + // Indicating the file is still empty (does not contain recorded data) + let mut empty_file = true; + let nchannels = meta.nchannels() as usize; 'recloop: loop { + if stopThread.load(SeqCst) { + break 'recloop; + } match rx.recv().unwrap() { InStreamMsg::StreamError(e) => { bail!("Recording failed due to stream error: {}.", e) @@ -240,36 +292,61 @@ impl Recording { InStreamMsg::RawStreamData(incoming_ctr, dat) => { if first { first = false; + // Initialize counter offset ctr_offset = incoming_ctr; } else { - if incoming_ctr != ctr + ctr_offset { + if incoming_ctr != stored_ctr + ctr_offset { println!("********** PACKAGES MISSED ***********"); - bail!("Packages missed. Recording invalid.") + bail!("Packages missed. Recording is invalid.") } } - ds.resize((ctr + 1, framesPerBlock, nchannels))?; + if wait_block_ctr > 0 { + // We are still waiting + wait_block_ctr -= 1; + if wait_block_ctr == 0 { + status.store(RecordStatus::Recording(Duration::ZERO)); + } + // TODO: Is it a good idea to increase the counter + // here, as well as below? + stored_ctr += 1; + continue 'recloop; + } + + ds.resize((stored_ctr + 1, framesPerBlock, nchannels))?; Recording::append_to_dset( &ds, - ctr, + stored_ctr, dat.as_ref(), framesPerBlock, nchannels, )?; + // Once we have added to the file, this flag is swapped + // and a file should be deleted in case of an error. + empty_file = false; + // Recorded time rounded of to milliseconds. let recorded_time = Duration::from_millis( - ((1000 * (ctr + 1) * framesPerBlock) as Flt / meta.samplerate) as u64, + ((1000 * (stored_ctr + 1) * framesPerBlock) as Flt / meta.samplerate) + as u64, ); + if !settings.duration.is_zero() { + // Duration not equal to zero, meaning we record up to a + // certain duration. if recorded_time >= settings.duration { break 'recloop; } } // println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock); - ctr += 1; + stored_ctr += 1; status.store(RecordStatus::Recording(recorded_time)); } } - } // end of loop + } // end of 'recloop + + if empty_file { + bail!("Recording stopped before any data is stored."); + } status.store(RecordStatus::Finished); Ok(()) @@ -277,71 +354,88 @@ impl Recording { }); Ok(Recording { - settings: settings2, + settings: settings_clone, + stopThread: stopThread_clone, handle: Some(handle), - status_from_thread: status2, last_status: RecordStatus::NoUpdate, - tx, + status_from_thread: status_clone, }) } - fn cleanupThreadIfPossible(&mut self) { - // println!("CleanupIfPossible()"); - if let Some(h) = &self.handle { - if h.is_finished() { - // println!("Thread finished"); - let h = self.handle.take().unwrap(); - let res = h.join().unwrap(); - if let Err(e) = res { - self.last_status = RecordStatus::Error(format!("{}", e)); - // File should not be un use anymore, as thread is joined. - // In case of error, we try to delete the file - if let Err(e) = std::fs::remove_file(&self.settings.filename) { - eprintln!("Recording failed, but file removal failed as well: {}", e); - } - } - } + // Delete recording file, should be done when something went wrong (an error + // occured), or when cancel() is called, or when recording object is dropped + // while thread is still running. + fn deleteFile(&self) { + if let Some(_) = self.handle { + panic!("Misuse bug: cannot delete file while thread is still running"); + } + // File should not be un use anymore, as thread is joined. + // In case of error, we try to delete the file + if let Err(e) = std::fs::remove_file(&self.settings.filename) { + eprintln!("Recording failed, but file removal failed as well: {}", e); } } + // Join the thread, store the last status. Please make sure it is joinable, + // otherwise this method will hang forever. + fn cleanupThread(&mut self) { + if let Some(h) = self.handle.take() { + let res = h.join().unwrap(); + if let Err(e) = res { + self.last_status = RecordStatus::Error(format!("{}", e)); + } + } + } /// Get current record status pub fn status(&mut self) -> RecordStatus { + // Update status due to normal messaging let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate); match status_from_thread { RecordStatus::NoUpdate => {} _ => { - // println!("Updating status to: {:?}", status_from_thread); self.last_status = status_from_thread; } } - // If the thread has exited with an error, the status is overwritten - // in this method. - self.cleanupThreadIfPossible(); + + if let Some(h) = &self.handle { + // Update the status by taking any error messages + if h.is_finished() { + self.cleanupThread(); + } + } // Return latest status self.last_status.clone() } /// Stop existing recording early. At the current time, or st - pub fn stop(&mut self) -> Result<()> { - if self.handle.is_none() { - bail!("Recording is already stopped.") + pub fn stop(&mut self) { + // Stop thread , join, update status + self.stopThread.store(true, SeqCst); + self.cleanupThread(); + match self.status() { + RecordStatus::Finished => { // Do nothing + } + _ => { + // an error occured, we try to delete the backing file + self.deleteFile() + } } + } - // Stope stream, if running - self.tx.send(InStreamMsg::StreamStopped)?; - - let h = self.handle.take().unwrap(); - let res = h.join().unwrap(); - if let Err(e) = res { - self.last_status = RecordStatus::Error(format!("{}", e)); - } - - Ok(()) + /// Cancel recording. Deletes the recording file + pub fn cancel(&mut self) { + self.stopThread.store(true, SeqCst); + self.cleanupThread(); + self.deleteFile(); } } impl Drop for Recording { fn drop(&mut self) { - let _ = self.stop(); + if let Some(_) = &self.handle { + // If we enter here, stop() or cancel() has not been called. In that + // case, we cleanup here by cancelling the recording + self.cancel(); + } } }