Improved recording code. Now properly removes file in case of empty.

This commit is contained in:
Anne de Jong 2024-03-25 20:30:04 +01:00
parent 21703321bd
commit 18b61b02f3
5 changed files with 184 additions and 62 deletions

View File

@ -9,9 +9,10 @@ repository = "https://code.ascee.nl/ascee/lasprs"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
keywords = ["dsp", "audio", "measurement", "acoustics", "filter"] keywords = ["dsp", "audio", "measurement", "acoustics", "filter"]
categories = ["multimedia::audio", "science", "mathematics"] categories = ["multimedia::audio", "science", "mathematics"]
[lib] [lib]
name = "lasprs" name = "lasprs"
crate-type = ["cdylib", "rlib"] crate-type = ["cdylib", "rlib",]
[dependencies] [dependencies]
# Error handling # Error handling
@ -78,9 +79,9 @@ uuid = { version = "1.6.1", features = ["v4"] , optional = true}
clap = { version = "4.4.11", features = ["derive", "color", "help", "suggestions"] } clap = { version = "4.4.11", features = ["derive", "color", "help", "suggestions"] }
[features] [features]
default = ["f64", "cpal-api", "record"] # default = ["f64", "cpal-api", "record"]
# Use this for debugging extensions # Use this for debugging extensions
# default = ["f64", "python-bindings", "record", "cpal-api"] default = ["f64", "python-bindings", "record", "cpal-api"]
cpal-api = ["dep:cpal"] cpal-api = ["dep:cpal"]
record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"] record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"]

View File

@ -1,4 +1,4 @@
use anyhow::{bail, Result}; use anyhow::Result;
use clap::{arg, command, Parser}; use clap::{arg, command, Parser};
use crossbeam::channel::{unbounded, Receiver, TryRecvError}; use crossbeam::channel::{unbounded, Receiver, TryRecvError};
#[cfg(feature = "record")] #[cfg(feature = "record")]
@ -19,6 +19,11 @@ struct Cli {
#[arg(short, long = "duration", default_value_t = 0.)] #[arg(short, long = "duration", default_value_t = 0.)]
duration_s: Flt, duration_s: Flt,
/// Start delay in [s]. Rounds down to whole seconds. If not specified, no
/// start delay will be used.
#[arg(short, long = "startdelay", default_value_t = 0.)]
start_delay_s: Flt,
/// TOML configuration file for used stream /// TOML configuration file for used stream
#[arg(short, long = "config-file")] #[arg(short, long = "config-file")]
config_file_daq: Option<String>, config_file_daq: Option<String>,
@ -40,6 +45,7 @@ fn main() -> Result<()> {
let settings = RecordSettings { let settings = RecordSettings {
filename: ops.filename.into(), filename: ops.filename.into(),
duration: Duration::from_secs(ops.duration_s as u64), duration: Duration::from_secs(ops.duration_s as u64),
startDelay: Duration::from_secs(ops.start_delay_s as u64),
}; };
match ops.config_file_daq { match ops.config_file_daq {
None => smgr.startDefaultInputStream()?, None => smgr.startDefaultInputStream()?,
@ -52,7 +58,7 @@ fn main() -> Result<()> {
let mut r = Recording::new(settings, &mut smgr)?; let mut r = Recording::new(settings, &mut smgr)?;
println!("Starting to record..."); println!("Starting to record... Enter 'c' to cancel.");
'infy: loop { 'infy: loop {
match r.status() { match r.status() {
RecordStatus::Idle => println!("\nIdle"), RecordStatus::Idle => println!("\nIdle"),
@ -60,6 +66,7 @@ fn main() -> Result<()> {
println!("\nRecord error: {}", e); println!("\nRecord error: {}", e);
break 'infy; break 'infy;
} }
RecordStatus::Waiting => { println!("Waiting in start delay...");},
RecordStatus::Finished => { RecordStatus::Finished => {
println!("\nRecording finished."); println!("\nRecording finished.");
break 'infy; break 'infy;
@ -73,13 +80,17 @@ fn main() -> Result<()> {
match stdin_channel.try_recv() { match stdin_channel.try_recv() {
Ok(_key) => { Ok(_key) => {
println!("User pressed key. Manually stopping recording here."); println!("User pressed key. Manually stopping recording here.");
match _key.to_lowercase().as_str() {
"c" => r.cancel(),
_ => r.stop()
}
break 'infy; break 'infy;
} }
Err(TryRecvError::Empty) => {} Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"), Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
} }
sleep(100); sleep(500);
} }
Ok(()) Ok(())

View File

@ -23,7 +23,6 @@ pub enum DataType {
/// 32-bit integers /// 32-bit integers
#[strum(message = "I32", detailed_message = "32-bits integers")] #[strum(message = "I32", detailed_message = "32-bits integers")]
I32 = 4, I32 = 4,
/// 64-bit integers /// 64-bit integers
#[strum(message = "I64", detailed_message = "64-bits integers")] #[strum(message = "I64", detailed_message = "64-bits integers")]
I64 = 5, I64 = 5,

View File

@ -86,6 +86,23 @@ pub struct StreamMgr {
siggen: Option<crate::siggen::Siggen>, siggen: Option<crate::siggen::Siggen>,
} }
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMgr {
#[new]
/// See (StreamMgr::new())
fn new_py<'py>() -> StreamMgr {
StreamMgr::new()
}
// #[pyo3(name = "unit")]
// #[staticmethod]
// /// See: [Biquad::unit()]
// pub fn unit_py() -> Biquad {
// Biquad::unit()
// }
// #[pyo3(name = "firstOrderHighPass")]
}
impl StreamMgr { impl StreamMgr {
/// Create new stream manager. A stream manager is supposed to be a singleton. /// Create new stream manager. A stream manager is supposed to be a singleton.
/// ///

View File

@ -6,9 +6,11 @@ use hdf5::types::{VarLenArray, VarLenUnicode};
use hdf5::{dataset, datatype, Dataset, File, H5Type}; use hdf5::{dataset, datatype, Dataset, File, H5Type};
use ndarray::ArrayView2; use ndarray::ArrayView2;
use num::traits::ops::mul_add; use num::traits::ops::mul_add;
use rayon::iter::Empty;
use serde::de::IntoDeserializer; use serde::de::IntoDeserializer;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Mutex; use std::sync::Mutex;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -19,13 +21,20 @@ use strum::EnumMessage;
pub enum RecordStatus { pub enum RecordStatus {
/// Nothing to update /// Nothing to update
NoUpdate, NoUpdate,
/// Not yet started, waiting for first msg /// Not yet started, waiting for first msg
Idle, Idle,
/// Waiting for start delay to be processed.
Waiting,
/// Recording in progress /// Recording in progress
Recording(Duration), Recording(Duration),
/// Recording finished /// Recording finished
Finished, Finished,
/// An error occurred.
/// An error occurred, in any case when an error occurs, it is tried to remove the file.
Error(String), Error(String),
} }
@ -37,14 +46,33 @@ pub struct RecordSettings {
/// The recording time. Set to 0 to perform indefinite recording /// The recording time. Set to 0 to perform indefinite recording
pub duration: Duration, pub duration: Duration,
/// The delay to wait before adding data
pub startDelay: Duration,
} }
/// Create a recording /// This struct lets a recording run on a stream, waits till the first data arrives and records for a given period of time. Usage:
///
/// ```
/// use lasprs::{RecordSettings, StreamMgr, Recording};
/// use std::time::Duration;
/// let smgr = StreamMgr::new();
/// smgr.startDefaultInputStream()?;
/// let settings = RecordSettings{
/// filename: "test.h5",
/// duration: Duration::from_secs(5),
/// };
/// let rec = Recording::new(settings)?;
/// ```
pub struct Recording { pub struct Recording {
settings: RecordSettings, settings: RecordSettings,
handle: Option<JoinHandle<Result<()>>>, handle: Option<JoinHandle<Result<()>>>,
tx: Sender<InStreamMsg>,
// Stop the recording. This stops the thread
stopThread: Arc<AtomicBool>,
// Obtain status from thread.
status_from_thread: Arc<AtomicCell<RecordStatus>>, status_from_thread: Arc<AtomicCell<RecordStatus>>,
// Stores latest status from thread, if no update comes from status_from_thread
last_status: RecordStatus, last_status: RecordStatus,
} }
@ -151,6 +179,9 @@ impl Recording {
} }
}; };
let stopThread = Arc::new(AtomicBool::new(false));
let stopThread_clone = stopThread.clone();
// Fail if filename already exists // Fail if filename already exists
if settings.filename.exists() { if settings.filename.exists() {
bail!( bail!(
@ -158,10 +189,10 @@ impl Recording {
settings.filename.to_string_lossy() settings.filename.to_string_lossy()
); );
} }
let settings2 = settings.clone(); let settings_clone = settings.clone();
let status = Arc::new(AtomicCell::new(RecordStatus::Idle)); let status = Arc::new(AtomicCell::new(RecordStatus::Idle));
let status2 = status.clone(); let status_clone = status.clone();
let (tx, rx) = crossbeam::channel::unbounded(); let (tx, rx) = crossbeam::channel::unbounded();
mgr.addInQueue(tx.clone()); mgr.addInQueue(tx.clone());
@ -216,15 +247,36 @@ impl Recording {
// Create the dataset // Create the dataset
let ds = Recording::create_dataset(&file, &meta)?; let ds = Recording::create_dataset(&file, &meta)?;
// Indicate we are ready to rec!
status.store(RecordStatus::Recording(Duration::ZERO));
let mut ctr = 0;
let mut ctr_offset = 0;
let mut first = true;
let framesPerBlock = meta.framesPerBlock as usize; let framesPerBlock = meta.framesPerBlock as usize;
let mut wait_block_ctr = 0;
// Indicate we are ready to rec!
if settings.startDelay > Duration::ZERO {
status.store(RecordStatus::Waiting);
let startdelay_s = settings.startDelay.as_micros() as Flt / 1e6;
wait_block_ctr =
(meta.samplerate as Flt * startdelay_s / framesPerBlock as Flt) as u32;
} else {
status.store(RecordStatus::Recording(Duration::ZERO));
}
// Counter of stored blocks
let mut stored_ctr = 0;
// Offset in stream
let mut ctr_offset = 0;
// Flag indicating that the first RawStreamData package still has to
// be arrived
let mut first = true;
// Indicating the file is still empty (does not contain recorded data)
let mut empty_file = true;
let nchannels = meta.nchannels() as usize; let nchannels = meta.nchannels() as usize;
'recloop: loop { 'recloop: loop {
if stopThread.load(SeqCst) {
break 'recloop;
}
match rx.recv().unwrap() { match rx.recv().unwrap() {
InStreamMsg::StreamError(e) => { InStreamMsg::StreamError(e) => {
bail!("Recording failed due to stream error: {}.", e) bail!("Recording failed due to stream error: {}.", e)
@ -240,36 +292,61 @@ impl Recording {
InStreamMsg::RawStreamData(incoming_ctr, dat) => { InStreamMsg::RawStreamData(incoming_ctr, dat) => {
if first { if first {
first = false; first = false;
// Initialize counter offset
ctr_offset = incoming_ctr; ctr_offset = incoming_ctr;
} else { } else {
if incoming_ctr != ctr + ctr_offset { if incoming_ctr != stored_ctr + ctr_offset {
println!("********** PACKAGES MISSED ***********"); println!("********** PACKAGES MISSED ***********");
bail!("Packages missed. Recording invalid.") bail!("Packages missed. Recording is invalid.")
} }
} }
ds.resize((ctr + 1, framesPerBlock, nchannels))?; if wait_block_ctr > 0 {
// We are still waiting
wait_block_ctr -= 1;
if wait_block_ctr == 0 {
status.store(RecordStatus::Recording(Duration::ZERO));
}
// TODO: Is it a good idea to increase the counter
// here, as well as below?
stored_ctr += 1;
continue 'recloop;
}
ds.resize((stored_ctr + 1, framesPerBlock, nchannels))?;
Recording::append_to_dset( Recording::append_to_dset(
&ds, &ds,
ctr, stored_ctr,
dat.as_ref(), dat.as_ref(),
framesPerBlock, framesPerBlock,
nchannels, nchannels,
)?; )?;
// Once we have added to the file, this flag is swapped
// and a file should be deleted in case of an error.
empty_file = false;
// Recorded time rounded of to milliseconds.
let recorded_time = Duration::from_millis( let recorded_time = Duration::from_millis(
((1000 * (ctr + 1) * framesPerBlock) as Flt / meta.samplerate) as u64, ((1000 * (stored_ctr + 1) * framesPerBlock) as Flt / meta.samplerate)
as u64,
); );
if !settings.duration.is_zero() { if !settings.duration.is_zero() {
// Duration not equal to zero, meaning we record up to a
// certain duration.
if recorded_time >= settings.duration { if recorded_time >= settings.duration {
break 'recloop; break 'recloop;
} }
} }
// println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock); // println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock);
ctr += 1; stored_ctr += 1;
status.store(RecordStatus::Recording(recorded_time)); status.store(RecordStatus::Recording(recorded_time));
} }
} }
} // end of loop } // end of 'recloop
if empty_file {
bail!("Recording stopped before any data is stored.");
}
status.store(RecordStatus::Finished); status.store(RecordStatus::Finished);
Ok(()) Ok(())
@ -277,71 +354,88 @@ impl Recording {
}); });
Ok(Recording { Ok(Recording {
settings: settings2, settings: settings_clone,
stopThread: stopThread_clone,
handle: Some(handle), handle: Some(handle),
status_from_thread: status2,
last_status: RecordStatus::NoUpdate, last_status: RecordStatus::NoUpdate,
tx, status_from_thread: status_clone,
}) })
} }
fn cleanupThreadIfPossible(&mut self) { // Delete recording file, should be done when something went wrong (an error
// println!("CleanupIfPossible()"); // occured), or when cancel() is called, or when recording object is dropped
if let Some(h) = &self.handle { // while thread is still running.
if h.is_finished() { fn deleteFile(&self) {
// println!("Thread finished"); if let Some(_) = self.handle {
let h = self.handle.take().unwrap(); panic!("Misuse bug: cannot delete file while thread is still running");
let res = h.join().unwrap(); }
if let Err(e) = res {
self.last_status = RecordStatus::Error(format!("{}", e));
// File should not be un use anymore, as thread is joined. // File should not be un use anymore, as thread is joined.
// In case of error, we try to delete the file // In case of error, we try to delete the file
if let Err(e) = std::fs::remove_file(&self.settings.filename) { if let Err(e) = std::fs::remove_file(&self.settings.filename) {
eprintln!("Recording failed, but file removal failed as well: {}", e); eprintln!("Recording failed, but file removal failed as well: {}", e);
} }
} }
}
}
}
// Join the thread, store the last status. Please make sure it is joinable,
// otherwise this method will hang forever.
fn cleanupThread(&mut self) {
if let Some(h) = self.handle.take() {
let res = h.join().unwrap();
if let Err(e) = res {
self.last_status = RecordStatus::Error(format!("{}", e));
}
}
}
/// Get current record status /// Get current record status
pub fn status(&mut self) -> RecordStatus { pub fn status(&mut self) -> RecordStatus {
// Update status due to normal messaging
let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate); let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate);
match status_from_thread { match status_from_thread {
RecordStatus::NoUpdate => {} RecordStatus::NoUpdate => {}
_ => { _ => {
// println!("Updating status to: {:?}", status_from_thread);
self.last_status = status_from_thread; self.last_status = status_from_thread;
} }
} }
// If the thread has exited with an error, the status is overwritten
// in this method. if let Some(h) = &self.handle {
self.cleanupThreadIfPossible(); // Update the status by taking any error messages
if h.is_finished() {
self.cleanupThread();
}
}
// Return latest status // Return latest status
self.last_status.clone() self.last_status.clone()
} }
/// Stop existing recording early. At the current time, or st /// Stop existing recording early. At the current time, or st
pub fn stop(&mut self) -> Result<()> { pub fn stop(&mut self) {
if self.handle.is_none() { // Stop thread , join, update status
bail!("Recording is already stopped.") self.stopThread.store(true, SeqCst);
self.cleanupThread();
match self.status() {
RecordStatus::Finished => { // Do nothing
}
_ => {
// an error occured, we try to delete the backing file
self.deleteFile()
}
}
} }
// Stope stream, if running /// Cancel recording. Deletes the recording file
self.tx.send(InStreamMsg::StreamStopped)?; pub fn cancel(&mut self) {
self.stopThread.store(true, SeqCst);
let h = self.handle.take().unwrap(); self.cleanupThread();
let res = h.join().unwrap(); self.deleteFile();
if let Err(e) = res {
self.last_status = RecordStatus::Error(format!("{}", e));
}
Ok(())
} }
} }
impl Drop for Recording { impl Drop for Recording {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.stop(); if let Some(_) = &self.handle {
// If we enter here, stop() or cancel() has not been called. In that
// case, we cleanup here by cancelling the recording
self.cancel();
}
} }
} }