Compare commits

...

3 Commits

9 changed files with 470 additions and 242 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lasprs" name = "lasprs"
version = "0.6.5" version = "0.6.6"
edition = "2021" edition = "2021"
authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"] authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"]
description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)" description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)"

View File

@ -60,25 +60,27 @@ fn main() -> Result<()> {
let mut r = Recording::new(settings, &mut smgr)?; let mut r = Recording::new(settings, &mut smgr)?;
println!("Starting to record... Enter 'c' to cancel."); // println!("Starting to record... Enter 'c' to cancel.");
'infy: loop { 'infy: loop {
match r.status() { match r.status() {
RecordStatus::Idle => println!("\nIdle"), RecordStatus::Idle {} => println!("\nIdle"),
RecordStatus::Error(e) => { RecordStatus::Waiting {} => {
println!("\nRecord error: {}", e);
break 'infy;
}
RecordStatus::Waiting => {
println!("Waiting in start delay..."); println!("Waiting in start delay...");
} }
RecordStatus::Finished => { RecordStatus::Finished { clipped, error } => {
println!("\nRecording finished."); println!("\nRecording finished.");
if clipped {
println!("Recording clipped!");
}
if let Some(msg) = error {
println!("Recording failed with an error: {msg}");
}
break 'infy; break 'infy;
} }
RecordStatus::Recording(duration) => { RecordStatus::Recording { recorded, .. } => {
println!("Recording... {} ms", duration.as_millis()); println!("Recording... {:.0} ms", recorded * 1000.);
} }
RecordStatus::NoUpdate => {} RecordStatus::NoUpdate {} => {}
}; };
match stdin_channel.try_recv() { match stdin_channel.try_recv() {

View File

@ -75,6 +75,9 @@ pub fn add_py_classses(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<StreamError>()?; m.add_class::<StreamError>()?;
m.add_class::<DaqChannel>()?; m.add_class::<DaqChannel>()?;
m.add_class::<DaqConfig>()?; m.add_class::<DaqConfig>()?;
m.add_class::<RecordSettings>()?;
m.add_class::<RecordStatus>()?;
m.add_class::<Recording>()?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,7 @@
use super::*; use super::*;
use crate::config::Flt; use crate::config::Flt;
use anyhow::{bail, Error, Result}; use crate::rt::SimpleClipDetector;
use anyhow::{anyhow, bail, Error, Result};
use clap::builder::OsStr; use clap::builder::OsStr;
use crossbeam::atomic::AtomicCell; use crossbeam::atomic::AtomicCell;
use hdf5::types::{VarLenArray, VarLenUnicode}; use hdf5::types::{VarLenArray, VarLenUnicode};
@ -8,7 +9,7 @@ use hdf5::{dataset, datatype, Dataset, File, H5Type};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -17,30 +18,44 @@ use streammgr::*;
use streammsg::InStreamMsg; use streammsg::InStreamMsg;
use strum::EnumMessage; use strum::EnumMessage;
#[derive(Clone, Debug)]
/// Status of a recording /// Status of a recording
#[cfg_attr(feature = "python-bindings", pyclass(get_all, eq))]
#[derive(Clone, Debug, PartialEq)]
pub enum RecordStatus { pub enum RecordStatus {
/// Nothing to update /// Nothing to update
NoUpdate, NoUpdate {},
/// Not yet started, waiting for first msg /// Not yet started, waiting for first msg
Idle, Idle {},
/// Waiting for start delay to be processed. /// Waiting for start delay to be processed.
Waiting, Waiting {},
/// Recording in progress /// Recording in progress
Recording(Duration), Recording {
/// Whether a clip has already happened during recording
clipped: bool,
/// The amount of time that is currently recorded
recorded: Flt,
/// The percentage done [0 to 100%]. Stays at 0% for infinite duration
/// recordings
pct_done: Flt,
},
/// Recording finished /// Recording finished
Finished, Finished {
/// Whether there occured a signal clip during the recording. Can only
/// An error occurred, in any case when an error occurs, it is tried to remove the file. /// be detected when the flag 'record_clip`
Error(String), clipped: bool,
/// Possibly, the recording finished, but an error happened, due to
/// which the data is invalid.
error: Option<String>,
},
} }
/// Settings used to start a recording. /// Settings used to start a recording.
#[derive(Clone)] #[derive(Clone)]
#[cfg_attr(feature = "python-bindings", pyclass(get_all))]
pub struct RecordSettings { pub struct RecordSettings {
/// File name to record to. /// File name to record to.
pub filename: PathBuf, pub filename: PathBuf,
@ -74,6 +89,29 @@ impl RecordSettings {
} }
} }
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl RecordSettings {
#[new]
#[pyo3(signature=(filename, duration=None, startDelay=None))]
fn new_py(filename: String, duration: Option<Flt>, startDelay: Option<Flt>) -> PyResult<Self> {
let startDelay = if let Some(delay) = startDelay {
let delay = Duration::try_from_secs_f64(delay)
.map_err(|e| anyhow! {"Invalid start delay specified: {e}"})?;
Some(delay)
} else {
None
};
let duration = if let Some(duration) = duration {
Duration::try_from_secs_f64(duration)
.map_err(|e| anyhow! {"Invalid duration specified: {e}"})?
} else {
Duration::ZERO
};
Ok(RecordSettings::new(filename, duration, startDelay))
}
}
/// This struct lets a recording run on a stream, waits till the first data arrives and records for a given period of time. Usage: /// This struct lets a recording run on a stream, waits till the first data arrives and records for a given period of time. Usage:
/// ///
/// ``` /// ```
@ -94,14 +132,17 @@ impl RecordSettings {
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct Recording { pub struct Recording {
// Recording settings
settings: RecordSettings, settings: RecordSettings,
handle: Option<JoinHandle<Result<()>>>,
// Stop the recording. This stops the thread // Stop the recording. This stops the thread
stopThread: Arc<AtomicBool>, stopThread: Arc<AtomicBool>,
// Obtain status from thread. // Obtain status from thread.
status_from_thread: Arc<AtomicCell<RecordStatus>>, status_from_thread: Arc<AtomicCell<RecordStatus>>,
// Stores latest status from thread, if no update comes from status_from_thread // Stores latest status from thread, if no update comes from status_from_thread
last_status: RecordStatus, last_status: RecordStatus,
} }
@ -166,7 +207,8 @@ impl Recording {
// or interleaved. This happens to be the default for ndarray as // or interleaved. This happens to be the default for ndarray as
// well. // well.
RawStreamData::Datai8(dat) => { RawStreamData::Datai8(dat) => {
let arr = ndarray15p6::ArrayView2::<i8>::from_shape((framesPerBlock, nchannels), dat)?; let arr =
ndarray15p6::ArrayView2::<i8>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?; ds.write_slice(arr, (ctr, .., ..))?;
} }
RawStreamData::Datai16(dat) => { RawStreamData::Datai16(dat) => {
@ -210,6 +252,9 @@ impl Recording {
} }
}; };
// Detector for signal clipping
let clipdetector = SimpleClipDetector::new(mgr);
let stopThread = Arc::new(AtomicBool::new(false)); let stopThread = Arc::new(AtomicBool::new(false));
let stopThread_clone = stopThread.clone(); let stopThread_clone = stopThread.clone();
@ -222,171 +267,200 @@ impl Recording {
} }
let settings_clone = settings.clone(); let settings_clone = settings.clone();
let status = Arc::new(AtomicCell::new(RecordStatus::Idle)); let status = Arc::new(AtomicCell::new(RecordStatus::Idle {}));
let status_clone = status.clone(); let status_clone = status.clone();
let (tx, rx) = crossbeam::channel::unbounded(); let (tx, rx) = crossbeam::channel::unbounded();
mgr.addInQueue(tx.clone()); mgr.addInQueue(tx.clone());
// The thread doing the actual work // The thread doing the actual work
let handle = spawn(move || { rayon::spawn(move || {
let file = File::create(settings.filename)?; let recording_closure = || {
let file = File::create(settings.filename)?;
let firstmsg = match rx.recv() { let firstmsg = match rx.recv() {
Ok(msg) => msg, Ok(msg) => msg,
Err(_) => bail!("Queue handle error"), Err(_) => bail!("Queue handle error"),
}; };
let meta = match firstmsg { let meta = match firstmsg {
InStreamMsg::StreamStarted(meta) => meta, InStreamMsg::StreamStarted(meta) => meta,
_ => bail!("Recording failed. Missed stream metadata message."), _ => bail!("Recording failed. Missed stream metadata message."),
}; };
// Samplerate, block size, number of channels // Samplerate, block size, number of channels
Recording::write_hdf5_attr_scalar(&file, "samplerate", meta.samplerate)?; Recording::write_hdf5_attr_scalar(&file, "samplerate", meta.samplerate)?;
Recording::write_hdf5_attr_scalar(&file, "nchannels", meta.nchannels())?; Recording::write_hdf5_attr_scalar(&file, "nchannels", meta.nchannels())?;
Recording::write_hdf5_attr_scalar(&file, "blocksize", meta.framesPerBlock)?; Recording::write_hdf5_attr_scalar(&file, "blocksize", meta.framesPerBlock)?;
// Store sensitivity // Store sensitivity
let sens: Vec<Flt> = meta.channelInfo.iter().map(|ch| ch.sensitivity).collect(); let sens: Vec<Flt> = meta.channelInfo.iter().map(|ch| ch.sensitivity).collect();
Recording::write_hdf5_attr_list(&file, "sensitivity", &sens)?; Recording::write_hdf5_attr_list(&file, "sensitivity", &sens)?;
// Timestamp // Timestamp
use chrono::DateTime; use chrono::DateTime;
let now_utc = chrono::Utc::now(); let now_utc = chrono::Utc::now();
let timestamp = now_utc.timestamp(); let timestamp = now_utc.timestamp();
Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?; Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?;
// Create UUID for measurement // Create UUID for measurement
use hdf5::types::VarLenUnicode; use hdf5::types::VarLenUnicode;
let uuid = uuid::Uuid::new_v4(); let uuid = uuid::Uuid::new_v4();
let uuid_unicode: VarLenUnicode = VarLenUnicode::from_str(&uuid.to_string()).unwrap(); let uuid_unicode: VarLenUnicode =
Recording::write_hdf5_attr_scalar(&file, "UUID", uuid_unicode)?; VarLenUnicode::from_str(&uuid.to_string()).unwrap();
Recording::write_hdf5_attr_scalar(&file, "UUID", uuid_unicode)?;
// Channel names // Channel names
let chnames: Vec<VarLenUnicode> = meta let chnames: Vec<VarLenUnicode> = meta
.channelInfo .channelInfo
.iter() .iter()
.map(|ch| VarLenUnicode::from_str(&ch.name).unwrap()) .map(|ch| VarLenUnicode::from_str(&ch.name).unwrap())
.collect(); .collect();
let chname_attr = file let chname_attr = file
.new_attr::<VarLenUnicode>() .new_attr::<VarLenUnicode>()
.shape([chnames.len()]) .shape([chnames.len()])
.create("channelNames")?; .create("channelNames")?;
chname_attr.write(&chnames)?; chname_attr.write(&chnames)?;
// Create the dataset // Create the dataset
let ds = Recording::create_dataset(&file, &meta)?; let ds = Recording::create_dataset(&file, &meta)?;
let framesPerBlock = meta.framesPerBlock as usize; let framesPerBlock = meta.framesPerBlock as usize;
let mut wait_block_ctr = 0; let mut wait_block_ctr = 0;
// Indicate we are ready to rec! // Indicate we are ready to rec!
if settings.startDelay > Duration::ZERO { if settings.startDelay > Duration::ZERO {
status.store(RecordStatus::Waiting); status.store(RecordStatus::Waiting {});
let startdelay_s = settings.startDelay.as_micros() as Flt / 1e6; let startdelay_s = settings.startDelay.as_micros() as Flt / 1e6;
wait_block_ctr = wait_block_ctr =
(meta.samplerate as Flt * startdelay_s / framesPerBlock as Flt) as u32; (meta.samplerate as Flt * startdelay_s / framesPerBlock as Flt) as u32;
} else { } else {
status.store(RecordStatus::Recording(Duration::ZERO)); status.store(RecordStatus::Recording {
} clipped: clipdetector.hasClipped(),
recorded: 0.,
// Counter of stored blocks pct_done: 0.,
let mut stored_ctr = 0; });
// Offset in stream
let mut ctr_offset = 0;
// Flag indicating that the first RawStreamData package still has to
// be arrived
let mut first = true;
// Indicating the file is still empty (does not contain recorded data)
let mut empty_file = true;
let nchannels = meta.nchannels() as usize;
'recloop: loop {
if stopThread.load(SeqCst) {
break 'recloop;
} }
match rx.recv().unwrap() {
InStreamMsg::StreamError(e) => { // Counter of stored blocks
bail!("Recording failed due to stream error: {}.", e) let mut recorded_ctr = 0;
}
InStreamMsg::StreamStarted(_) => { // Offset in stream
bail!("Stream started again?") let mut incoming_ctr = 0;
}
InStreamMsg::StreamStopped => { // Flag indicating that the first RawStreamData package still has
// Early stop. User stopped it. // arrived
let mut firstblockhasarrived = false;
// Indicating the file is still empty (does not contain recorded data)
let mut empty_file = true;
let nchannels = meta.nchannels() as usize;
'recloop: loop {
if stopThread.load(Relaxed) {
break 'recloop; break 'recloop;
} }
InStreamMsg::InStreamData(instreamdata) => { match rx.recv().unwrap() {
if first { InStreamMsg::StreamError(e) => {
first = false; bail!("Recording failed due to stream error: {}.", e)
// Initialize counter offset
ctr_offset = instreamdata.ctr;
} else if instreamdata.ctr != stored_ctr + ctr_offset {
println!("********** PACKAGES MISSED ***********");
bail!("Packages missed. Recording is invalid.")
} }
InStreamMsg::StreamStarted(_) => {
if wait_block_ctr > 0 { bail!("Stream started again?")
// We are still waiting }
wait_block_ctr -= 1; InStreamMsg::StreamStopped => {
if wait_block_ctr == 0 { // Early stop. User stopped it.
status.store(RecordStatus::Recording(Duration::ZERO)); break 'recloop;
}
InStreamMsg::InStreamData(instreamdata) => {
if ! firstblockhasarrived {
// Toggle flag.
firstblockhasarrived = true;
// Initialize counter offset
incoming_ctr = instreamdata.ctr;
} else {
incoming_ctr += 1;
} }
// TODO: Is it a good idea to increase the counter if instreamdata.ctr != incoming_ctr {
// here, as well as below? eprintln!("********** PACKAGES MISSED ***********");
stored_ctr += 1; bail!("Stream data blocks missed. Recording is invalid.")
continue 'recloop;
}
ds.resize((stored_ctr + 1, framesPerBlock, nchannels))?;
Recording::append_to_dset(
&ds,
stored_ctr,
&instreamdata,
framesPerBlock,
nchannels,
)?;
// Once we have added to the file, this flag is swapped
// and a file should be deleted in case of an error.
empty_file = false;
// Recorded time rounded of to milliseconds.
let recorded_time = Duration::from_millis(
((1000 * (stored_ctr + 1) * framesPerBlock) as Flt / meta.samplerate)
as u64,
);
if !settings.duration.is_zero() {
// Duration not equal to zero, meaning we record up to a
// certain duration.
if recorded_time >= settings.duration {
break 'recloop;
} }
if wait_block_ctr > 0 {
// We are still waiting
wait_block_ctr -= 1;
if wait_block_ctr == 0 {
status.store(RecordStatus::Recording {
clipped: clipdetector.hasClipped(),
recorded: 0.,
pct_done: 0.,
});
}
continue 'recloop;
}
ds.resize((recorded_ctr + 1, framesPerBlock, nchannels))?;
Recording::append_to_dset(
&ds,
recorded_ctr,
&instreamdata,
framesPerBlock,
nchannels,
)?;
// Once we have added to the file, this flag is swapped
// and a file should only be deleted in case of an error.
empty_file = false;
// Recorded time rounded of to milliseconds.
let recorded_time = Duration::from_millis(
((1000 * (recorded_ctr + 1) * framesPerBlock) as Flt
/ meta.samplerate) as u64,
);
if !settings.duration.is_zero() {
// Duration not equal to zero, meaning we record up to a
// certain duration.
if recorded_time >= settings.duration {
break 'recloop;
}
}
// println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock);
recorded_ctr += 1;
status.store(RecordStatus::Recording {
clipped: clipdetector.hasClipped(),
recorded: recorded_time.as_secs_f64(),
pct_done: if settings.duration > Duration::ZERO {
100. * recorded_time.as_millis() as Flt
/ settings.duration.as_millis() as Flt
} else {
0.
},
});
} }
// println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock);
stored_ctr += 1;
status.store(RecordStatus::Recording(recorded_time));
} }
} // end of 'recloop
if empty_file {
bail!("Recording stopped before any data is stored.");
} }
} // end of 'recloop Ok(())
};
if empty_file { if let Err(e) = recording_closure() {
bail!("Recording stopped before any data is stored."); status.store(RecordStatus::Finished {
clipped: clipdetector.hasClipped(),
error: Some(format!("{e}")),
});
} else {
status.store(RecordStatus::Finished {
clipped: clipdetector.hasClipped(),
error: None,
});
// End of thread
} }
status.store(RecordStatus::Finished);
Ok(())
// End of thread
}); });
Ok(Recording { Ok(Recording {
settings: settings_clone, settings: settings_clone,
stopThread: stopThread_clone, stopThread: stopThread_clone,
handle: Some(handle), last_status: RecordStatus::NoUpdate {},
last_status: RecordStatus::NoUpdate,
status_from_thread: status_clone, status_from_thread: status_clone,
}) })
} }
@ -395,76 +469,75 @@ impl Recording {
// occured), or when cancel() is called, or when recording object is dropped // occured), or when cancel() is called, or when recording object is dropped
// while thread is still running. // while thread is still running.
fn deleteFile(&self) { fn deleteFile(&self) {
if let Some(_) = self.handle {
panic!("Misuse bug: cannot delete file while thread is still running");
}
// File should not be un use anymore, as thread is joined. // File should not be un use anymore, as thread is joined.
// In case of error, we try to delete the file // In case of error, we try to delete the file
if let Err(e) = std::fs::remove_file(&self.settings.filename) { if let Err(e) = std::fs::remove_file(&self.settings.filename) {
eprintln!("Recording failed, but file removal failed as well: {}", e); eprintln!("Recording failed, but file removal failed as well: {}", e);
} }
} }
}
// Join the thread, store the last status. Please make sure it is joinable, #[cfg_attr(feature = "python-bindings", pymethods)]
// otherwise this method will hang forever. impl Recording {
fn cleanupThread(&mut self) { #[cfg(feature = "python-bindings")]
if let Some(h) = self.handle.take() { #[new]
let res = h.join().unwrap(); fn new_py(settings: RecordSettings, mgr: &mut StreamMgr) -> PyResult<Recording> {
if let Err(e) = res { Ok(Recording::new(settings, mgr)?)
self.last_status = RecordStatus::Error(format!("{}", e));
}
}
} }
/// Get current record status /// Get current record status
pub fn status(&mut self) -> RecordStatus { pub fn status(&mut self) -> RecordStatus {
// Update status due to normal messaging // Update status due to normal messaging
let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate); let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate {});
match status_from_thread { match status_from_thread {
RecordStatus::NoUpdate => {} RecordStatus::NoUpdate {} => {}
_ => { _ => {
self.last_status = status_from_thread; self.last_status = status_from_thread;
} }
} }
if let Some(h) = &self.handle {
// Update the status by taking any error messages
if h.is_finished() {
self.cleanupThread();
}
}
// Return latest status // Return latest status
self.last_status.clone() self.last_status.clone()
} }
/// Stop existing recording early. At the current time, or st /// Stop existing recording early. At the current time, or st
pub fn stop(&mut self) { pub fn stop(&mut self) {
// Stop thread , join, update status // Stop thread and wait for message of finished
self.stopThread.store(true, SeqCst); self.stopThread.store(true, Relaxed);
self.cleanupThread(); self.waitForThread();
match self.status() { match self.status() {
RecordStatus::Finished => { // Do nothing RecordStatus::Finished { error, .. } => {
if error.is_some() {
// Delete file if there was an error
self.deleteFile();
}
} }
_ => { _ => {
// an error occured, we try to delete the backing file panic!("RecordStatus should be finished!");
self.deleteFile()
} }
} }
} }
/// Cancel recording. Deletes the recording file /// Cancel recording. Deletes the recording file
pub fn cancel(&mut self) { pub fn cancel(&mut self) {
self.stopThread.store(true, SeqCst); self.stopThread.store(true, Relaxed);
self.cleanupThread(); self.waitForThread();
self.deleteFile(); self.deleteFile();
} }
fn waitForThread(&mut self) {
while !matches!(self.status(), RecordStatus::Finished { .. }) {
std::thread::sleep(Duration::from_millis(10));
}
}
} }
impl Drop for Recording { impl Drop for Recording {
fn drop(&mut self) { fn drop(&mut self) {
if self.handle.is_some() { // If we enter here, stop() or cancel() has not been called. In that
// If we enter here, stop() or cancel() has not been called. In that // case, we cleanup here by cancelling the recording and deleting the file.
// case, we cleanup here by cancelling the recording if !matches!(self.status(), RecordStatus::Finished { .. }) {
self.cancel(); self.stopThread.store(true, Relaxed);
self.waitForThread();
self.deleteFile();
} }
} }
} }

View File

@ -4,6 +4,8 @@
mod ppm; mod ppm;
mod rtaps; mod rtaps;
mod rtview; mod rtview;
mod simpleclip;
pub use ppm::{PPM, ClipState}; pub use ppm::{PPM, ClipState};
pub use rtaps::{RtAps, RtApsResult}; pub use rtaps::{RtAps, RtApsResult};
pub use rtview::RtViewer; pub use rtview::RtViewer;
pub use simpleclip::SimpleClipDetector;

78
src/rt/simpleclip.rs Normal file
View File

@ -0,0 +1,78 @@
use crate::daq::InStreamMsg;
use crate::daq::StreamMgr;
use crate::math::maxabs;
use crate::Flt;
use crossbeam::channel::bounded;
use parking_lot::Mutex;
use std::sync::atomic::Ordering::Relaxed;
use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
/// If signal is above this value, we indicate that the signal has clipped.
const CLIP_STRONG_LIMIT: Flt = 0.999;
/// Very simple clip detector. Used to detect cliping in a recording. Stores one
/// clip value if just something happened between time of new and moment of drop().
///
pub struct SimpleClipDetector {
clipped: Arc<AtomicBool>,
stopThread: Arc<AtomicBool>,
}
impl SimpleClipDetector {
/// Create new clip detector
///
/// # Args
///
/// - `smgr` - see [StreamMgr]
pub fn new(smgr: &mut StreamMgr) -> Self {
let (tx, rx) = bounded(0);
let clipstate = Arc::new(AtomicBool::new(false));
let stopThread = Arc::new(AtomicBool::new(false));
let clipstate2 = clipstate.clone();
let stopThread2 = stopThread.clone();
smgr.addInQueue(tx);
rayon::spawn(move || loop {
if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1500)) {
match msg {
InStreamMsg::InStreamData(dat) => {
let flt = dat.getFloatData();
let maxabs = maxabs(flt.view());
if maxabs >= CLIP_STRONG_LIMIT {
clipstate.store(true, Relaxed);
// We do not have to do anything anymore. The signal
// has clipped so we do not have to check any new
// blocks anymore.
return;
}
}
_ => {}
}
};
if stopThread.load(Relaxed) {
return;
}
});
Self {
clipped: clipstate2,
stopThread: stopThread2,
}
}
/// Check whether a clip has happened
pub fn hasClipped(&self) -> bool {
return self.clipped.load(Relaxed);
}
}
impl Drop for SimpleClipDetector {
fn drop(&mut self) {
self.stopThread.store(true, Relaxed);
}
}

View File

@ -14,21 +14,72 @@ const PINKNOISE_ANALOG_ORDER: usize = 10;
pub struct WhiteNoise { pub struct WhiteNoise {
// SmallRng is a cheap random number generator // SmallRng is a cheap random number generator
rng: SmallRng, rng: SmallRng,
// Interruption state (whether to output, number of samples, number of samples after which a switch need to be performed)
interrupt_state: Option<InterruptState>,
} }
impl WhiteNoise { impl WhiteNoise {
pub fn new() -> Self { pub fn new(fs: Flt, interrupt_period: Option<Flt>) -> Self {
let interrupt_state = if let Some(period) = interrupt_period {
if period > 0. {
Some(InterruptState {
period,
cur_idx: 0,
max_idx: (period * fs) as usize,
silence: false,
})
} else {
None
}
} else {
None
};
WhiteNoise { WhiteNoise {
rng: SmallRng::from_entropy(), rng: SmallRng::from_entropy(),
interrupt_state,
} }
} }
} }
impl SourceImpl for WhiteNoise { impl SourceImpl for WhiteNoise {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) { fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| { let mut output = true;
*s = self.rng.sample(StandardNormal);
}); // Look at whether we should do interruption of the played noise.
if let Some(InterruptState {
cur_idx,
max_idx,
silence,
..
}) = &mut self.interrupt_state
{
if cur_idx > max_idx {
// Swap flag
*cur_idx = 0;
*silence = !*silence;
}
output = !*silence;
*cur_idx += sig.len();
}
// If output is true, send new random noise. Otherwise, just silence
if output {
sig.for_each(|s| {
*s = self.rng.sample(StandardNormal);
});
} else {
sig.for_each(|s| {
*s = 0.;
});
}
}
fn reset(&mut self, fs: Flt) {
if let Some(state) = &mut self.interrupt_state {
// Restore to first start with output
state.silence = false;
state.cur_idx = 0;
state.max_idx = (state.period * fs) as usize;
// state.period = untouched
}
} }
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn SourceImpl> { fn clone_dyn(&self) -> Box<dyn SourceImpl> {
Box::new(self.clone()) Box::new(self.clone())
} }
@ -38,14 +89,18 @@ impl SourceImpl for WhiteNoise {
pub struct ColoredNoise { pub struct ColoredNoise {
// White noise generator // White noise generator
wn: WhiteNoise, wn: WhiteNoise,
// Temporary storage for the generated signal. Needs to be able to slice,
// which is not guaranteed by the input iterator.
tmp: Vec<Flt>, tmp: Vec<Flt>,
// Analog filter used to generate the digital filter below
analogue_blueprint: ZPKModel, analogue_blueprint: ZPKModel,
// The digital filter that colors the white noise
filter: SeriesBiquad, filter: SeriesBiquad,
} }
impl ColoredNoise { impl ColoredNoise {
/// Generate a colored noise signal source that outputs pink noise (-3 dB / /// Generate a colored noise signal source that outputs pink noise (-3 dB /
/// octave ) from 20 Hz to 20 kHz. /// octave ) from 20 Hz to 20 kHz.
pub fn newPinkNoise() -> Self { pub fn newPinkNoise(fs: Flt, interrupt_period: Option<Flt>) -> Self {
let twopi = 2. * pi; let twopi = 2. * pi;
let fl = 10.; let fl = 10.;
let fu = 20e3; let fu = 20e3;
@ -72,7 +127,7 @@ impl ColoredNoise {
let analogue_blueprint = ZPKModel::new(zeros, poles, gain); let analogue_blueprint = ZPKModel::new(zeros, poles, gain);
let filter = analogue_blueprint.bilinear(480000.); let filter = analogue_blueprint.bilinear(480000.);
Self { Self {
wn: WhiteNoise::new(), wn: WhiteNoise::new(fs, interrupt_period),
tmp: vec![], tmp: vec![],
analogue_blueprint, analogue_blueprint,
filter, filter,
@ -128,3 +183,11 @@ impl SourceImpl for ColoredNoise {
Box::new(self.clone()) Box::new(self.clone())
} }
} }
#[derive(Clone, Copy, Debug)]
struct InterruptState {
period: Flt,
cur_idx: usize,
max_idx: usize,
silence: bool,
}

View File

@ -11,6 +11,10 @@ use std::fmt::Debug;
use std::iter::ExactSizeIterator; use std::iter::ExactSizeIterator;
use std::slice::IterMut; use std::slice::IterMut;
/// Dummy sampling frequency to be filled in when the sampling frequency is
/// still unknown at the point in time.
pub const DUMMY_SAMPLING_FREQ: Flt = 48000.;
/// Multiple channel signal generator. Able to create (acoustic) output signals. See above example on how to use. /// Multiple channel signal generator. Able to create (acoustic) output signals. See above example on how to use.
/// Typical signal that can be created are: /// Typical signal that can be created are:
/// ///
@ -39,30 +43,9 @@ pub struct Siggen {
#[cfg(feature = "python-bindings")] #[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)] #[cfg_attr(feature = "python-bindings", pymethods)]
impl Siggen { impl Siggen {
#[pyo3(name = "newWhiteNoise")] #[new]
#[staticmethod] fn new_py() -> Self {
fn newWhiteNoise_py(fs: Flt) -> Siggen { Siggen::new(1, Source::newSilence())
Siggen::newWhiteNoise(fs, 0)
}
#[pyo3(name = "newSine")]
#[staticmethod]
fn newSine_py(fs: Flt, freq: Flt, nchannels: usize) -> PyResult<Siggen> {
Ok(Siggen::newSine(fs, nchannels, freq)?)
}
#[pyo3(name = "newSweep")]
#[staticmethod]
fn newSweep_py(
fs: Flt,
nchannels: usize,
fl: Flt,
fu: Flt,
sweep_time: Flt,
quiet_time: Flt,
sweep_type: SweepType,
) -> Result<Self> {
Ok(Siggen::newSweep(
fs, nchannels, fl, fu, sweep_time, quiet_time, sweep_type,
)?)
} }
} }
@ -71,7 +54,7 @@ impl Siggen {
/// # Args /// # Args
/// ///
/// - `nchannels` - The number of channels to output /// - `nchannels` - The number of channels to output
/// - `source` - Source function /// - `source` - Source that generates the signal
pub fn new(nchannels: usize, source: Source) -> Siggen { pub fn new(nchannels: usize, source: Source) -> Siggen {
Siggen { Siggen {
fs: None, fs: None,
@ -131,8 +114,8 @@ impl Siggen {
/// ///
/// - `fs` - Sampling frequency \[Hz\] /// - `fs` - Sampling frequency \[Hz\]
/// - `nchannels` - The number of channels to output /// - `nchannels` - The number of channels to output
pub fn newWhiteNoise(_fs: Flt, nchannels: usize) -> Siggen { pub fn newWhiteNoise(fs: Flt, nchannels: usize, interrupt_period: Option<Flt>) -> Siggen {
Siggen::new(nchannels, Source::newWhiteNoise()) Siggen::new(nchannels, Source::newWhiteNoise(fs, interrupt_period))
} }
/// Returns the number of channels this signal generator is generating for. /// Returns the number of channels this signal generator is generating for.
@ -278,7 +261,7 @@ mod test {
fn test_whitenoise() { fn test_whitenoise() {
// This code is just to check syntax. We should really be listening to these outputs. // This code is just to check syntax. We should really be listening to these outputs.
let mut t = [0.0; 10]; let mut t = [0.0; 10];
Siggen::newWhiteNoise(1., 1).genSignal(&mut t); Siggen::newWhiteNoise(1., 1, None).genSignal(&mut t);
// println!("{:?}", &t); // println!("{:?}", &t);
} }
@ -289,7 +272,7 @@ mod test {
const N: usize = 10000; const N: usize = 10000;
let mut s1 = [0.0; N]; let mut s1 = [0.0; N];
let mut s2 = [0.0; N]; let mut s2 = [0.0; N];
let mut siggen = Siggen::newSine(1., 1, 1.0).unwrap(); let mut siggen = Siggen::newSine(10., 1, 1.0).unwrap();
siggen.reset(10.0); siggen.reset(10.0);
siggen.setAllMute(false); siggen.setAllMute(false);

View File

@ -1,6 +1,7 @@
//! All sources for a signal generator. Sine waves, sweeps, noise, etc. //! All sources for a signal generator. Sine waves, sweeps, noise, etc.
use super::sweep::{SweepParams, SweepType};
use super::noise::{ColoredNoise, WhiteNoise}; use super::noise::{ColoredNoise, WhiteNoise};
use super::siggen::DUMMY_SAMPLING_FREQ;
use super::sweep::{SweepParams, SweepType};
use crate::config::*; use crate::config::*;
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
@ -23,7 +24,10 @@ impl Source {
/// ///
/// # Args /// # Args
/// ///
/// - `fs` - Sampling frequency \[Hz\] /// - `fs`: Sampling frequency \[Hz\]. When not known at the point in time,
/// just fill in something sensible. If the [Siggen] runs in a
/// [StreamMgr], the [StreamMgr] cals [Siggen::reset] to set the right
/// sampling frequency.
/// * `freq` - Frequency of the sine wave in \[Hz\] /// * `freq` - Frequency of the sine wave in \[Hz\]
pub fn newSine(fs: Flt, freq: Flt) -> Result<Source> { pub fn newSine(fs: Flt, freq: Flt) -> Result<Source> {
Ok(Source { Ok(Source {
@ -39,15 +43,32 @@ impl Source {
} }
/// Create a white noise signal source /// Create a white noise signal source
pub fn newWhiteNoise() -> Source { ///
/// # Args
///
/// - `fs`: Sampling frequency \[Hz\]. When not known at the point in time,
/// just fill in something sensible. If the [Siggen] runs in a
/// [StreamMgr], the [StreamMgr] cals [Siggen::reset] to set the right
/// sampling frequency.
/// - `interrupt_period` - when given AND > 0, this turns on and off the
/// noise source with periods given by the value, in \[s\].
pub fn newWhiteNoise(fs: Flt, interrupt_period: Option<Flt>) -> Source {
Source { Source {
src: Box::new(WhiteNoise::new()), src: Box::new(WhiteNoise::new(fs, interrupt_period)),
} }
} }
/// Create a pink noise signal source /// Create a pink noise signal source
pub fn newPinkNoise() -> Source { /// # Args
///
/// - `fs`: Sampling frequency \[Hz\]. When not known at the point in time,
/// just fill in something sensible. If the [Siggen] runs in a
/// [StreamMgr], the [StreamMgr] cals [Siggen::reset] to set the right
/// sampling frequency.
/// - `interrupt_period` - when given AND > 0, this turns on and off the
/// noise source with periods given by the value, in \[s\].
pub fn newPinkNoise(fs: Flt, interrupt_period: Option<Flt>) -> Source {
Source { Source {
src: Box::new(ColoredNoise::newPinkNoise()), src: Box::new(ColoredNoise::newPinkNoise(fs, interrupt_period)),
} }
} }
@ -55,7 +76,10 @@ impl Source {
/// ///
/// # Args /// # Args
/// ///
/// - `fs` - Sample rate \[Hz\] /// - `fs`: Sampling frequency \[Hz\]. When not known at the point in time,
/// just fill in something sensible. If the [Siggen] runs in a
/// [StreamMgr], the [StreamMgr] cals [Siggen::reset] to set the right
/// sampling frequency.
/// - `fl` - Lower frequency \[Hz\] /// - `fl` - Lower frequency \[Hz\]
/// - `fu` - Upper frequency \[Hz\] /// - `fu` - Upper frequency \[Hz\]
/// - `sweep_time` - The duration of a single sweep \[s\] /// - `sweep_time` - The duration of a single sweep \[s\]
@ -89,14 +113,14 @@ impl Source {
Self::newSilence() Self::newSilence()
} }
#[staticmethod] #[staticmethod]
#[pyo3(name = "newWhiteNoise")] #[pyo3(name = "newWhiteNoise", signature=(interrupt_period=None))]
fn newWhiteNoise_py() -> Source { fn newWhiteNoise_py(interrupt_period: Option<Flt>) -> Source {
Self::newWhiteNoise() Self::newWhiteNoise(DUMMY_SAMPLING_FREQ, interrupt_period)
} }
#[staticmethod] #[staticmethod]
#[pyo3(name = "newPinkNoise")] #[pyo3(name = "newPinkNoise", signature=(interrupt_period=None))]
fn newPinkNoise_py() -> Source { fn newPinkNoise_py(interrupt_period: Option<Flt>) -> Source {
Self::newPinkNoise() Self::newPinkNoise(DUMMY_SAMPLING_FREQ, interrupt_period)
} }
#[staticmethod] #[staticmethod]
#[pyo3(name = "newSweep")] #[pyo3(name = "newSweep")]