Compare commits

...

14 Commits

25 changed files with 3435 additions and 68 deletions

2
.gitignore vendored
View File

@ -3,3 +3,5 @@
__pycache__
python/lasprs/_lasprs*
.venv
.vscode/launch.json
.vscode

View File

@ -1,6 +1,6 @@
[package]
name = "lasprs"
version = "0.2.2"
version = "0.3.0"
edition = "2021"
authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"]
description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)"
@ -8,30 +8,83 @@ readme = "README.md"
repository = "https://code.ascee.nl/ascee/lasprs"
license = "MIT OR Apache-2.0"
keywords = ["dsp", "audio", "measurement", "acoustics", "filter"]
categories = [
"multimedia::audio",
"science",
"mathematics"]
categories = ["multimedia::audio", "science", "mathematics"]
[lib]
name = "lasprs"
crate-type = ["cdylib", "lib"]
crate-type = ["cdylib", "rlib",]
[dependencies]
# Error handling
anyhow = "1.0.75"
pyo3 = { version = "0.20", features=["anyhow", "extension-module"]}
# Numerics
# Optional future feature for ndarray: blas
ndarray = { version = "0.15.3", features = ["rayon"] }
num = "0.4.1"
rayon = "1.8.0"
numpy = { version = "0.20" }
# blas-src = { version = "0.8", features = ["openblas"] }
# openblas-src = { version = "0.10", features = ["cblas", "system"] }
# Parallel iterators
rayon = "1.8.0"
# Python bindings
pyo3 = { version = "0.20", optional = true, features = ["extension-module", "anyhow"]}
numpy = { version = "0.20", optional = true}
# White noise etc
rand = "0.8.5"
rand_distr = "0.4.3"
# Cross-platform audio lib
cpal = { version = "0.15.3", optional = true }
# Nice enumerations
strum = "0.25.0"
strum_macros = "0.25.3"
# Conditional compilation enhancements
cfg-if = "1.0.0"
# Reinterpret buffers. This is a workaround for the #[feature(specialize)] not
# being available in stable rust.
reinterpret = "0.2.1"
# Faster channels for multithreaded communication
crossbeam = "0.8.2"
# Serialization
serde = { version = "1.0.193", features = ["derive"] }
toml = "0.8.8"
# Initialize array for non-copy type
array-init = "2.1.0"
# Types of a sample
dasp_sample = "0.11.0"
# Required for recording and looking into measurements
hdf5-sys = { version = "0.8.1", features = ["static"], optional = true }
hdf5 = { version = "0.8.1", optional = true }
# Useful iterator stuff
itertools = "0.12.0"
# For getting timestamps. Only useful when recording.
chrono = {version = "0.4.31", optional = true}
# For getting UUIDs in recording
uuid = { version = "1.6.1", features = ["v4"] , optional = true}
# Command line argument parser, for CLI apps
clap = { version = "4.4.11", features = ["derive", "color", "help", "suggestions"] }
[features]
# default = ["f64"]
# Use this for debugging extension
default = ["f64", "extension-module", "pyo3/extension-module"]
# default = ["f64", "cpal-api", "record"]
# Use this for debugging extensions
default = ["f64", "python-bindings", "record", "cpal-api"]
cpal-api = ["dep:cpal"]
record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"]
f64 = []
f32 = []
extension-module = ["pyo3/extension-module"]
python-bindings = ["dep:pyo3", "dep:numpy"]

46
src/bin/lasp_devinfo.rs Normal file
View File

@ -0,0 +1,46 @@
use anyhow::Result;
use clap::Parser;
use lasprs::daq::{DaqConfig, StreamMgr};
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(author, version, about="Generates DAQ configurations for available devices.", long_about = None)]
struct Args {
/// Devices to match. Search for these substrings in device names. Only
/// configurations are output based on these names.
#[arg(short, long)]
matches: Vec<String>,
}
fn main() -> Result<()> {
let args = Args::parse();
let write_all = args.matches.is_empty();
let mut smgr = StreamMgr::new();
// Obtain list of devices
let devs = smgr.getDeviceInfo();
// Iterate over them
for dev in devs.iter() {
// The file name will be the device name, plus toml extension
let filename = dev.device_name.clone() + ".toml";
// If no device name strings are given, we are outputting them all to a file.
if write_all {
let daqconfig = DaqConfig::newFromDeviceInfo(dev);
daqconfig.serialize_TOML_file(&filename.clone().into())?;
} else {
// See if we find the name in the match list.
for m in args.matches.iter() {
let needle = m.to_lowercase();
let dev_lower = dev.device_name.to_lowercase();
if dev_lower.contains(&needle) {
DaqConfig::newFromDeviceInfo(dev)
.serialize_TOML_file(&filename.clone().into())?;
}
}
}
}
Ok(())
}

View File

@ -0,0 +1,56 @@
use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, TryRecvError};
use lasprs::daq::{StreamHandler, StreamMgr, InStreamMsg};
use std::io;
use std::{thread, time};
// use
fn spawn_stdin_channel() -> Receiver<String> {
let (tx, rx) = unbounded();
thread::spawn(move || loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
tx.send(buffer).unwrap();
});
rx
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}
fn main() -> Result<()> {
let mut smgr = StreamMgr::new();
smgr.startDefaultInputStream()?;
let stdin_channel = spawn_stdin_channel();
let sh = StreamHandler::new(&mut smgr);
'infy: loop {
match stdin_channel.try_recv() {
Ok(_key) => break 'infy,
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
sleep(100);
match sh.rx.try_recv() {
Ok(msg) => {
// eprint!("Obtained message: {:?}", msg);
match msg {
InStreamMsg::StreamStarted(meta) => {
println!("Stream started: {:?}", meta);
},
_ => { println!("Other msg...");}
}
}
Err(e) => match e {
TryRecvError::Disconnected => {
break 'infy;
}
TryRecvError::Empty => {}
},
}
}
Ok(())
}

View File

@ -0,0 +1,67 @@
use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, TryRecvError};
use lasprs::daq::{StreamMgr, StreamStatus, StreamType};
use lasprs::siggen::Siggen;
use std::io;
use std::{thread, time};
// use
/// Spawns a thread and waits for a single line, pushes it to the receiver and returns
fn stdin_channel_wait_for_return() -> Receiver<String> {
let (tx, rx) = unbounded();
thread::spawn(move || {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
// Do not care whether we succeed here.
let _ = tx.send(buffer);
});
rx
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}
fn main() -> Result<()> {
let mut smgr = StreamMgr::new();
let stdin_channel = stdin_channel_wait_for_return();
println!("Creating signal generator...");
let mut siggen = Siggen::newSineWave(2, 432.);
// Some things that can be done
// siggen.setDCOffset(&[0.1, 0.]);
// Reduce all gains a bit...
siggen.setAllGains(0.1);
// Apply signal generator
smgr.setSiggen(siggen)?;
println!("Starting stream...");
smgr.startDefaultOutputStream()?;
println!("Press <enter> key to quit...");
'infy: loop {
match stdin_channel.try_recv() {
Ok(_key) => break 'infy,
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
sleep(100);
match smgr.getStatus(StreamType::Output) {
StreamStatus::NotRunning => {
println!("Stream is not running?");
break 'infy;
}
StreamStatus::Running => {}
StreamStatus::Error(e) => {
println!("Stream error: {}", e);
break 'infy;
}
}
}
Ok(())
}

116
src/bin/lasp_record.rs Normal file
View File

@ -0,0 +1,116 @@
use anyhow::Result;
use clap::{arg, command, Parser};
use crossbeam::channel::{unbounded, Receiver, TryRecvError};
#[cfg(feature = "record")]
use lasprs::daq::{RecordSettings, RecordStatus, Recording, StreamMgr, StreamType};
use lasprs::Flt;
use std::{
io, thread,
time::{self, Duration},
};
#[derive(Parser)]
#[command(author, version, about = "Record data to h5 file, according to LASP format", long_about = None)]
struct Cli {
/// File name to write recording to
filename: String,
/// Recording duration in [s]. Rounds down to whole seconds. If not specified, records until user presses a key
#[arg(short, long = "duration", default_value_t = 0.)]
duration_s: Flt,
/// Start delay in [s]. Rounds down to whole seconds. If not specified, no
/// start delay will be used.
#[arg(short, long = "startdelay", default_value_t = 0.)]
start_delay_s: Flt,
/// TOML configuration file for used stream
#[arg(short, long = "config-file")]
config_file_daq: Option<String>,
}
#[cfg(not(feature = "record"))]
fn main() -> Result<()> {
bail!("Record feature not enabled. This executable is not working");
}
#[cfg(feature = "record")]
fn main() -> Result<()> {
use lasprs::daq::DaqConfig;
let ops = Cli::parse();
let mut smgr = StreamMgr::new();
let stdin_channel = spawn_stdin_channel();
let settings = RecordSettings {
filename: ops.filename.into(),
duration: Duration::from_secs(ops.duration_s as u64),
startDelay: Duration::from_secs(ops.start_delay_s as u64),
};
match ops.config_file_daq {
// No config file is given, start default input stream
None => smgr.startDefaultInputStream()?,
Some(filename) => {
// If config file is given, use that.
let file = std::fs::read_to_string(filename)?;
let cfg = DaqConfig::deserialize_TOML_str(&file)?;
smgr.startStream(StreamType::Input, &cfg)?;
}
}
let mut r = Recording::new(settings, &mut smgr)?;
println!("Starting to record... Enter 'c' to cancel.");
'infy: loop {
match r.status() {
RecordStatus::Idle => println!("\nIdle"),
RecordStatus::Error(e) => {
println!("\nRecord error: {}", e);
break 'infy;
}
RecordStatus::Waiting => {
println!("Waiting in start delay...");
}
RecordStatus::Finished => {
println!("\nRecording finished.");
break 'infy;
}
RecordStatus::Recording(duration) => {
println!("Recording... {} ms", duration.as_millis());
}
RecordStatus::NoUpdate => {}
};
match stdin_channel.try_recv() {
Ok(_key) => {
println!("User pressed key. Manually stopping recording here.");
match _key.to_lowercase().as_str() {
"c" => r.cancel(),
_ => r.stop(),
}
break 'infy;
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
sleep(500);
}
Ok(())
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}
fn spawn_stdin_channel() -> Receiver<String> {
let (tx, rx) = unbounded();
thread::spawn(move || loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
tx.send(buffer).unwrap();
});
rx
}

View File

@ -1,21 +1,41 @@
// #![
#[cfg(feature = "f32")]
pub type Flt = f32;
#[cfg(feature = "f32")]
pub const pi: Flt = std::f32::consts::PI;
//! Configuration of module. Here, we can choose to compile for 32-bits or 64-bit floating point values
//! as basic data storage and computation size. Default is f64.
//!
#[cfg(feature = "f64")]
pub type Flt = f64;
#[cfg(feature = "f64")]
pub const pi: Flt = std::f64::consts::PI;
cfg_if::cfg_if! {
if #[cfg(feature="f64")] {
/// Floating-point value, compile time option to make it either f32, or f64
pub type Flt = f64;
/// Ratio between circumference and diameter of a circle
pub const pi: Flt = std::f64::consts::PI;
}
else if #[cfg(feature="f32")] {
/// Floating-point value, compile time option to make it either f32, or f64
pub type Flt = f32;
/// Ratio between circumference and diameter of a circle
pub const pi: Flt = std::f32::consts::PI;
}
else {
std::compile_error!("feature should be f32 or f64");
}
}
use num::complex::*;
/// Complex number floating point
pub type Cflt = Complex<Flt>;
use numpy::ndarray::{Array1, Array2};
use ndarray::{Array1, Array2};
/// Vector of floating point values
pub type Vd = Vec<Flt>;
/// Vector of complex floating point values
pub type Vc = Vec<Cflt>;
pub type Dmat = Array2<Flt>;
pub type Cmat = Array2<Cflt>;
/// 1D array of floats
pub type Dcol = Array1<Flt>;
/// 1D array of complex floats
pub type Ccol = Array1<Cflt>;
/// 2D array of floats
pub type Dmat = Array2<Flt>;
/// 2D array of complex floats
pub type Cmat = Array2<Cflt>;

616
src/daq/api/api_cpal.rs Normal file
View File

@ -0,0 +1,616 @@
#![allow(dead_code)]
use super::Stream;
use super::StreamMetaData;
use crate::daq::streamdata::*;
use crate::config::{self, *};
use crate::daq::{self, *};
use anyhow::{bail, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize};
use crossbeam::atomic::AtomicCell;
use crossbeam::channel::{Receiver, Sender};
use itertools::Itertools;
use num::ToPrimitive;
use reinterpret::reinterpret_slice;
use std::any::{Any, TypeId};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
/// Convert CPAL sampleformat datatype
impl From<DataType> for cpal::SampleFormat {
fn from(dt: DataType) -> cpal::SampleFormat {
match dt {
DataType::F64 => SampleFormat::F64,
DataType::F32 => SampleFormat::F32,
DataType::I8 => SampleFormat::I8,
DataType::I16 => SampleFormat::I16,
DataType::I32 => SampleFormat::I32,
}
}
}
// Convert datatype to CPAL sample format
impl From<cpal::SampleFormat> for DataType {
fn from(sf: cpal::SampleFormat) -> DataType {
match sf {
SampleFormat::F64 => DataType::F64,
SampleFormat::F32 => DataType::F32,
SampleFormat::I8 => DataType::I8,
SampleFormat::I16 => DataType::I16,
SampleFormat::I32 => DataType::I32,
_ => panic!("Not implemented sample format: {}", sf),
}
}
}
/// Cpal api
pub struct CpalApi {
host: cpal::Host,
}
pub struct CpalStream {
stream: cpal::Stream,
md: Arc<StreamMetaData>,
noutchannels: usize,
status: Arc<AtomicCell<StreamStatus>>,
}
impl Stream for CpalStream {
fn metadata(&self) -> Arc<StreamMetaData> {
self.md.clone()
}
fn ninchannels(&self) -> usize {
self.md.nchannels()
}
fn noutchannels(&self) -> usize {
self.noutchannels
}
fn status(&self) -> StreamStatus {
self.status.load()
}
}
impl CpalApi {
pub fn new() -> CpalApi {
// for h in cpal::platform::available_hosts() {
// println!("h: {:?}", h);
// }
CpalApi {
host: cpal::default_host(),
}
}
pub fn getDeviceInfo(&self) -> Result<Vec<DeviceInfo>> {
let srs_1 = [
1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000,
];
let srs_2 = [11025, 22050, 44100, 88200];
let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter()));
srs_tot.sort();
let srs_tot = Vec::from_iter(srs_tot.iter().copied().map(|i| *i as Flt));
// srs_tot.sort();
let mut devs = vec![];
for dev in self.host.devices()? {
// println!("{:?}", dev.name());
let mut iChannelCount = 0;
let mut oChannelCount = 0;
let mut avSampleRates = srs_tot.clone();
let mut avFramesPerBlock = vec![256_usize, 512, 1024, 2048, 8192];
let mut sample_formats = vec![];
// Search for sample formats
if let Ok(icfg) = dev.supported_input_configs() {
for icfg in icfg {
let thissf = icfg.sample_format();
if thissf.is_uint() {
continue;
}
sample_formats.push(icfg.sample_format());
avSampleRates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt);
if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
}
iChannelCount = icfg.channels() as u8;
// avFramesPerBlock.retain(|i| i >= icfg.buffer_size().)
}
}
if let Ok(ocfg) = dev.supported_input_configs() {
for ocfg in ocfg {
let thissf = ocfg.sample_format();
if thissf.is_uint() {
continue;
}
sample_formats.push(thissf);
avSampleRates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt);
if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
}
oChannelCount = ocfg.channels() as u8;
}
}
sample_formats.dedup();
if sample_formats.is_empty() {
continue;
}
let dtypes: Vec<DataType> =
sample_formats.iter().dedup().map(|i| (*i).into()).collect();
let prefDataType = match dtypes.iter().position(|d| d == &DataType::F32) {
Some(idx) => dtypes[idx],
None => dtypes[dtypes.len() - 1],
};
let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.);
devs.push(DeviceInfo {
api: super::StreamApiDescr::Cpal,
device_name: dev.name()?,
avDataTypes: dtypes,
prefDataType,
avSampleRates,
prefSampleRate,
avFramesPerBlock,
prefFramesPerBlock: 2048,
iChannelCount,
oChannelCount,
hasInputIEPE: false,
hasInputACCouplingSwitch: false,
hasInputTrigger: false,
hasInternalOutputMonitor: false,
duplexModeForced: false,
physicalIOQty: Qty::Number,
})
}
Ok(devs)
}
// Create the error function closure, that capture the send channel on which error messages from the stream are sent
fn create_errfcn(
send_ch: Option<Sender<RawStreamData>>,
status: Arc<AtomicCell<StreamStatus>>,
) -> impl FnMut(cpal::StreamError) {
move |err: cpal::StreamError| {
let serr = match err {
cpal::StreamError::DeviceNotAvailable => StreamError::DeviceNotAvailable,
cpal::StreamError::BackendSpecific { err: _ } => StreamError::DriverError,
};
if let Some(sender) = &send_ch {
sender.send(RawStreamData::StreamError(serr)).unwrap();
}
status.store(StreamStatus::Error(serr));
}
}
fn create_incallback<T>(
config: &cpal::StreamConfig,
sender: Sender<RawStreamData>,
framesPerBlock: usize,
en_inchannels: Vec<usize>,
) -> impl FnMut(&[T], &cpal::InputCallbackInfo)
where
T: 'static + Sample + ToPrimitive,
{
let tot_inch = config.channels as usize;
let mut q = VecDeque::<T>::with_capacity(2 * tot_inch * framesPerBlock);
let mut enabled_ch_data: Vec<T> =
vec![Sample::EQUILIBRIUM; en_inchannels.len() * framesPerBlock];
// The actual callback that is returned
move |input: &[T], _: &cpal::InputCallbackInfo| {
// Copy elements over in ring buffer
q.extend(input);
while q.len() > tot_inch * framesPerBlock {
// println!("q full enough: {}", q.len());
// // Loop over enabled channels
for (i, ch) in en_inchannels.iter().enumerate() {
let in_iterator = q.iter().skip(*ch).step_by(tot_inch);
let out_iterator = enabled_ch_data
.iter_mut()
.skip(i)
.step_by(en_inchannels.len());
// Copy over elements, *DEINTERLEAVED*
out_iterator.zip(in_iterator).for_each(|(o, i)| {
*o = *i;
});
}
// Drain copied elements from ring buffer
q.drain(0..framesPerBlock * tot_inch);
// Send over data
let msg = RawStreamData::from(enabled_ch_data.clone());
sender.send(msg).unwrap()
}
}
}
/// Create an input stream for a CPAL device.
///
/// # Arguments
///
/// * sf: Sample format
fn build_input_stream(
sf: cpal::SampleFormat,
config: &cpal::StreamConfig,
device: &cpal::Device,
sender: Sender<RawStreamData>,
en_inchannels: Vec<usize>,
framesPerBlock: usize,
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let errfcn = CpalApi::create_errfcn(Some(sender.clone()), status.clone());
macro_rules! build_stream{
($($cpaltype:pat => $rtype:ty),*) => {
match sf {
$(
$cpaltype => {
let icb = CpalApi::create_incallback::<$rtype>(&config, sender, framesPerBlock, en_inchannels);
device.build_input_stream(
&config,
icb,
errfcn,
None)?
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
}
}
}
let stream: cpal::Stream = build_stream!(
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
SampleFormat::F32 => f32
);
Ok((stream, status))
}
fn create_outcallback<T>(
config: &cpal::StreamConfig,
streamstatus: Arc<AtomicCell<StreamStatus>>,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo)
where
T: 'static + Sample + Debug,
{
let tot_outch: usize = config.channels as usize;
// println!("Numer of channels: {:?}", tot_outch);
let mut callback_ctr: usize = 0;
let mut q = VecDeque::<T>::with_capacity(2 * tot_outch * framesPerBlock);
move |data, _info: &_| {
let nsamples_asked = data.len();
let status = streamstatus.load();
callback_ctr += 1;
let mut setToEquilibrium = || data.iter_mut().for_each(|v| *v = Sample::EQUILIBRIUM);
match status {
StreamStatus::NotRunning | StreamStatus::Error(_) => {
setToEquilibrium();
return;
}
_ => {}
}
if q.len() < nsamples_asked {
// Obtain new samples from the generator
for dat in receiver.try_iter() {
let slice = dat.getRef::<T>();
if let StreamStatus::Running = status {
q.extend(slice);
}
}
}
if q.len() >= nsamples_asked {
// All right, we have enough samples to send out! They are
// drained from the queue
data.iter_mut()
.zip(q.drain(..nsamples_asked))
.for_each(|(o, i)| *o = i);
} else if callback_ctr <= 2 {
// For the first two blocks, we allow dat the data is not yet
// ready, without complaining on underruns
setToEquilibrium();
} else {
// Output buffer underrun
streamstatus.store(StreamStatus::Error(StreamError::OutputUnderrunError));
setToEquilibrium();
}
}
}
fn build_output_stream(
sf: cpal::SampleFormat,
config: &cpal::StreamConfig,
device: &cpal::Device,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
// let tot_ch = config.channels as usize;
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let err_cb = CpalApi::create_errfcn(None, status.clone());
macro_rules! build_stream{
($($cpaltype:pat => $rtype:ty),*) => {
match sf {
$(
$cpaltype => {
let outcallback = CpalApi::create_outcallback::<$rtype>(config, status.clone(), receiver, framesPerBlock);
device.build_output_stream(
&config,
outcallback,
err_cb,
None)?
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
}
}
}
let stream: cpal::Stream = build_stream!(
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
SampleFormat::F32 => f32
);
Ok((stream, status))
}
/// Create CPAL specific configuration, from our specified daq config and device info
fn create_cpal_config<T>(
st: StreamType,
devinfo: &DeviceInfo,
conf: &DaqConfig,
_dev: &cpal::Device,
conf_iterator: T,
) -> Result<cpal::SupportedStreamConfig>
where
T: Iterator<Item = cpal::SupportedStreamConfigRange>,
{
let nchannels = match st {
StreamType::Input => devinfo.iChannelCount,
StreamType::Output => devinfo.oChannelCount,
_ => unreachable!(),
};
for cpalconf in conf_iterator {
if cpalconf.sample_format() == conf.dtype.into() {
// Specified sample format is available
if cpalconf.channels() == nchannels as u16 {
let requested_sr = conf.sampleRate(devinfo);
if cpalconf.min_sample_rate().0 as Flt <= requested_sr
&& cpalconf.max_sample_rate().0 as Flt >= requested_sr
{
// Sample rate falls within range.
let requested_fpb = conf.framesPerBlock(devinfo) as u32;
// Last check: check if buffer size is allowed
match cpalconf.buffer_size() {
SupportedBufferSize::Range { min, max } => {
if min >= &requested_fpb || max <= &requested_fpb {
bail!(
"Frames per block should be >= {} and <= {}. Requested {}.",
min,
max,
requested_fpb
)
}
}
_ => {}
}
return Ok(cpalconf.with_sample_rate(cpal::SampleRate(requested_sr as u32)));
}
}
}
}
bail!("API error: specified DAQ configuration is not available for device")
}
/// Start a stream for a device with a given configuration.
pub fn startInputStream(
&self,
stype: StreamType,
devinfo: &DeviceInfo,
conf: &DaqConfig,
sender: Sender<RawStreamData>,
) -> Result<Box<dyn Stream>> {
for cpaldev in self.host.devices()? {
// See if we can create a supported stream config.
let supported_config = match stype {
StreamType::Duplex => bail!("Duplex stream not supported for CPAL"),
StreamType::Input => CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_input_configs()?,
),
StreamType::Output => CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_output_configs()?,
),
}?;
let framesPerBlock = conf.framesPerBlock(devinfo);
let sf = supported_config.sample_format();
let config: cpal::StreamConfig = supported_config.config();
let meta = StreamMetaData::new(
&conf.enabledInchannelConfig(),
conf.dtype,
supported_config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let meta = Arc::new(meta);
let (stream, status) = CpalApi::build_input_stream(
sf,
&config,
&cpaldev,
sender,
conf.enabledInchannelsList(),
framesPerBlock,
)?;
stream.play()?;
status.store(StreamStatus::Running);
return Ok(Box::new(CpalStream {
stream,
md: meta,
noutchannels: 0,
status,
}));
}
bail!(format!(
"Error: requested device {} not found. Please make sure the device is available.",
devinfo.device_name
))
}
/// Start a default input stream.
///
///
pub fn startDefaultInputStream(
&mut self,
sender: Sender<RawStreamData>,
) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_input_device() {
if let Ok(config) = device.default_input_config() {
let framesPerBlock: usize = 4096;
let final_config = cpal::StreamConfig {
channels: config.channels(),
sample_rate: config.sample_rate(),
buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32),
};
let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
let sf = config.sample_format();
let (stream, status) = CpalApi::build_input_stream(
sf,
&final_config,
&device,
sender,
en_inchannels,
framesPerBlock,
)?;
stream.play()?;
status.store(StreamStatus::Running);
// Daq: default channel config
let daqchannels = Vec::from_iter(
(0..final_config.channels)
.map(|i| DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))),
);
// Specify data tape
let dtype = DataType::from(sf);
// Create stream metadata
let md = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let md = Arc::new(md);
Ok(Box::new(CpalStream {
stream,
md,
noutchannels: 0,
status,
}))
} else {
bail!("Could not obtain default input configuration")
}
} else {
bail!("Could not open default input device")
}
}
pub fn startDefaultOutputStream(
&self,
receiver: Receiver<RawStreamData>,
) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_output_device() {
if let Ok(config) = device.default_output_config() {
// let framesPerBlock: usize = 256;
// let framesPerBlock: usize = 8192;
let framesPerBlock: usize = config.sample_rate().0 as usize;
// let framesPerBlock: usize = 256;
let final_config = cpal::StreamConfig {
channels: config.channels(),
sample_rate: config.sample_rate(),
buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32),
};
// let en_outchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
let sampleformat = config.sample_format();
let (stream, status) = CpalApi::build_output_stream(
sampleformat,
&final_config,
&device,
receiver,
framesPerBlock,
)?;
stream.play()?;
status.store(StreamStatus::Running);
// Daq: default channel config
let daqchannels =
Vec::from_iter((0..final_config.channels).map(|i| {
DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))
}));
// // Specify data tape
let dtype = DataType::from(sampleformat);
// // Create stream metadata
let md = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let md = Arc::new(md);
let str = Box::new(CpalStream {
stream,
md,
noutchannels: daqchannels.len(),
status,
});
Ok(str)
} else {
bail!("Could not obtain default output configuration")
} // Default output config is OK
} else {
bail!("Could not open output device")
} // Could not
}
// Create an output stream, using given signal generators for each channel.
// }
pub fn startOutputStream(&self, _rx: Receiver<RawStreamData>) -> Result<Box<dyn Stream>> {
bail!("Not implemented");
}
}

0
src/daq/api/api_pulse.rs Normal file
View File

41
src/daq/api/mod.rs Normal file
View File

@ -0,0 +1,41 @@
use serde::{Deserialize, Serialize};
/// Daq apis that are optionally compiled in. Examples:
///
/// - CPAL (Cross-Platform Audio Library)
/// - ...
use strum::EnumMessage;
use strum_macros;
use std::sync::Arc;
use super::{streamstatus::StreamStatus, streamdata::StreamMetaData};
#[cfg(feature = "cpal-api")]
pub mod api_cpal;
#[cfg(feature = "pulse_api")]
pub mod api_pulse;
/// A currently running stream
pub trait Stream {
/// Stream metadata. Only available for input streams
fn metadata(&self) -> Arc<StreamMetaData>;
/// Number of input channels in stream
fn ninchannels(&self) -> usize;
/// Number of output channels in stream
fn noutchannels(&self) -> usize;
fn status(&self) -> StreamStatus;
}
#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[allow(dead_code)]
pub enum StreamApiDescr {
/// CPAL api
#[strum(message = "Cpal", detailed_message = "Cross-Platform Audio Library")]
Cpal = 0,
/// PulseAudio api
#[strum(message = "pulse", detailed_message = "Pulseaudio")]
Pulse = 1,
}

222
src/daq/daqconfig.rs Normal file
View File

@ -0,0 +1,222 @@
use std::{ops::Index, path::PathBuf};
use super::api::StreamApiDescr;
use super::datatype::DataType;
use super::deviceinfo::DeviceInfo;
use super::qty::Qty;
use crate::config::*;
use anyhow::Result;
use serde::{Deserialize, Serialize};
/// DAQ Configuration for a single channel
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct DaqChannel {
/// Whether the channel is enabled
pub enabled: bool,
/// Readable name for channel
pub name: String,
/// To convert to physical units. Divide values by this to obtain it.
pub sensitivity: Flt,
/// Enabled constant current power supply for sensor (if device supports it)
pub IEPEEnabled: bool,
/// Enabled hardware AC coupling (if)
pub ACCouplingMode: bool,
/// If supporting multiple input ranges: select the right index
pub rangeIndex: usize,
/// Physical quantity
pub qty: Qty,
}
impl Default for DaqChannel {
fn default() -> Self {
DaqChannel {
enabled: false,
name: "".into(),
sensitivity: -1.0,
IEPEEnabled: false,
ACCouplingMode: false,
rangeIndex: 0,
qty: Qty::Number,
}
}
}
impl DaqChannel {
/// Default channel configuration for audio input from a certain channel
pub fn defaultAudio(name: String) -> Self {
DaqChannel {
enabled: true,
name,
sensitivity: 1.0,
IEPEEnabled: false,
ACCouplingMode: false,
rangeIndex: 0,
qty: Qty::Number,
}
}
}
/// Configuration of a device.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct DaqConfig {
/// The API
pub api: StreamApiDescr,
/// Device name. Should match when starting a stream
pub device_name: String,
/// Configuration of the input channels
pub inchannel_config: Vec<DaqChannel>,
/// Configuration of the output channels
pub outchannel_config: Vec<DaqChannel>,
/// The data type to use
pub dtype: DataType,
/// Whether to apply a digital high pass on the input. <= 0 means disabled. > 0 means, the value specifies the cut-on frequency for the first order high pass filter.
pub digitalHighPassCutOn: Flt,
/// The index to use in the list of possible sample rates
sampleRateIndex: usize,
/// The index to use in the list of possible frames per block
framesPerBlockIndex: usize,
/// Used when output channels should be monitored, i.e. reverse-looped back as input channels.
monitorOutput: bool,
}
impl DaqConfig {
/// Creates a new default device configuration for a given device as specified with
/// the DeviceInfo descriptor.
pub fn newFromDeviceInfo(devinfo: &DeviceInfo) -> DaqConfig {
let inchannel_config = (0..devinfo.iChannelCount)
.map(|_| DaqChannel::default())
.collect();
let outchannel_config = (0..devinfo.oChannelCount)
.map(|_| DaqChannel::default())
.collect();
let sampleRateIndex = devinfo
.avSampleRates
.iter()
.position(|x| x == &devinfo.prefSampleRate)
.unwrap_or(devinfo.avSampleRates.len() - 1);
// Choose 4096 when in list, otherwise choose the highes available value in list
let framesPerBlockIndex = devinfo
.avFramesPerBlock
.iter()
.position(|x| x == &4096)
.unwrap_or(devinfo.avFramesPerBlock.len() - 1);
DaqConfig {
api: devinfo.api.clone(),
device_name: devinfo.device_name.clone(),
inchannel_config,
outchannel_config,
dtype: devinfo.prefDataType,
digitalHighPassCutOn: -1.0,
sampleRateIndex,
framesPerBlockIndex,
monitorOutput: false,
}
}
/// Serialize DaqConfig object to TOML.
///
/// Args
///
/// * writer: Output writer, can be file or string, or anything that *is* std::io::Write
///
pub fn serialize_TOML(&self, writer: &mut dyn std::io::Write) -> Result<()> {
let ser_str = toml::to_string(&self)?;
writer.write_all(ser_str.as_bytes())?;
Ok(())
}
/// Deserialize structure from TOML data
///
/// # Args
///
/// * reader: implements the Read trait, from which we read the data.
pub fn deserialize_TOML<T>(reader: &mut T) -> Result<DaqConfig>
where
T: std::io::Read,
{
let mut read_str = vec![];
reader.read_to_end(&mut read_str)?;
let read_str = String::from_utf8(read_str)?;
DaqConfig::deserialize_TOML_str(&read_str)
}
/// Deserialize from TOML string
///
/// # Args
///
/// * st: string containing TOML data.
pub fn deserialize_TOML_str(st: &String) -> Result<DaqConfig> {
let res: DaqConfig = toml::from_str(st)?;
Ok(res)
}
/// Write this configuration to a TOML file.
///
/// Args
///
/// * file: Name of file to write to
///
pub fn serialize_TOML_file(&self, file: &PathBuf) -> Result<()> {
let mut file = std::fs::File::create(file)?;
self.serialize_TOML(&mut file)?;
Ok(())
}
/// Returns a list of enabled input channel numbers as indices
/// in the list of all input channels (enabled and not)
pub fn enabledInchannelsList(&self) -> Vec<usize> {
self.inchannel_config
.iter()
.enumerate()
.filter(|(_, ch)| ch.enabled)
.map(|(i, _)| i)
.collect()
}
/// Returns the total number of channels that appear in a running input stream.
pub fn numberEnabledInChannels(&self) -> usize {
self.inchannel_config.iter().filter(|ch| ch.enabled).count()
}
/// Returns the total number of channels that appear in a running output stream.
pub fn numberEnabledOutChannels(&self) -> usize {
self.outchannel_config
.iter()
.filter(|ch| ch.enabled)
.count()
}
/// Provide samplerate, based on device and specified sample rate index
pub fn sampleRate(&self, dev: &DeviceInfo) -> Flt {
dev.avSampleRates[self.sampleRateIndex]
}
/// Provide samplerate, based on device and specified sample rate index
pub fn framesPerBlock(&self, dev: &DeviceInfo) -> usize {
dev.avFramesPerBlock[self.framesPerBlockIndex]
}
/// Returns vec of channel configuration for enabled input channels only
pub fn enabledInchannelConfig(&self) -> Vec<DaqChannel> {
self.inchannel_config
.iter()
.filter(|ch| ch.enabled)
.cloned()
.collect()
}
/// Returns vec of channel configuration for enabled output channels only
pub fn enabledOutchannelConfig(&self) -> Vec<DaqChannel> {
self.outchannel_config
.iter()
.filter(|ch| ch.enabled)
.cloned()
.collect()
}
}

26
src/daq/datatype.rs Normal file
View File

@ -0,0 +1,26 @@
//! Data types (sample formats) that can come from a DAQ device, or have to be sent as output to a
//! DAQ device.
use strum::EnumMessage;
use strum_macros;
use serde::{Serialize, Deserialize};
/// Data type description for samples coming from a stream
#[derive(strum_macros::EnumMessage, PartialEq, Copy, Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
pub enum DataType {
/// 32-bit floats
#[strum(message = "F32", detailed_message = "32-bits floating points")]
F32 = 0,
/// 64-bit floats
#[strum(message = "F64", detailed_message = "64-bits floating points")]
F64 = 1,
/// 8-bit integers
#[strum(message = "I8", detailed_message = "8-bits integers")]
I8 = 2,
/// 16-bit integers
#[strum(message = "I16", detailed_message = "16-bits integers")]
I16 = 3,
/// 32-bit integers
#[strum(message = "I32", detailed_message = "32-bits integers")]
I32 = 4,
}

69
src/daq/deviceinfo.rs Normal file
View File

@ -0,0 +1,69 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
#![allow(non_snake_case)]
use super::api::StreamApiDescr;
use super::*;
use crate::config::*;
/// Device info structure. Gives all information regarding a device, i.e. the number of input and
/// output channels, its name and available sample rates and types.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct DeviceInfo {
/// The api in use for this device
pub api: StreamApiDescr,
/// Name for the device.
pub device_name: String,
/// Available data types for the sample
pub avDataTypes: Vec<DataType>,
/// Preferred data type for device
pub prefDataType: DataType,
/// Available frames per block
pub avFramesPerBlock: Vec<usize>,
/// Preferred frames per block for device
pub prefFramesPerBlock: usize,
/// Available sample rates
pub avSampleRates: Vec<Flt>,
/// Preferred sample rate for device
pub prefSampleRate: Flt,
/// Number of input channels available for this device
pub iChannelCount: u8,
/// Number of output channels available for this device
pub oChannelCount: u8,
/// Whether the device is capable to provide IEPE constant current power supply.
pub hasInputIEPE: bool,
/// Whether the device is capable of enabling a hardware AC-coupling
pub hasInputACCouplingSwitch: bool,
///Whether the device is able to trigger on input
pub hasInputTrigger: bool,
/// Whether the device has an internal monitor of the output signal. If
/// true, the device is able to monitor output signals internally and able to
/// present output signals as virtual input signals. This only works together
/// Daq's that are able to run in full duplex mode.
pub hasInternalOutputMonitor: bool,
/// This flag is used to be able to indicate that the device cannot run
/// input and output streams independently, without opening the device in
/// duplex mode. This is for example true for the UlDaq: only one handle to
/// the device can be given at the same time.
pub duplexModeForced: bool,
/// The physical quantity of the output signal. For 'normal' audio
/// devices, this is typically a 'number' between +/- full scale. For some
/// devices however, the output quantity corresponds to a physical signal,
/// such a Volts.
pub physicalIOQty: Qty,
}

82
src/daq/mod.rs Normal file
View File

@ -0,0 +1,82 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
mod api;
mod daqconfig;
mod datatype;
mod deviceinfo;
mod qty;
#[cfg(feature = "record")]
mod record;
mod streamcmd;
mod streamdata;
mod streamhandler;
mod streammgr;
mod streammsg;
mod streamstatus;
// Module re-exports
pub use daqconfig::{DaqChannel, DaqConfig};
pub use datatype::*;
pub use deviceinfo::DeviceInfo;
pub use qty::Qty;
pub use streamhandler::StreamHandler;
pub use streammgr::StreamMgr;
pub use streammsg::InStreamMsg;
pub use streamstatus::StreamStatus;
#[cfg(feature = "record")]
pub use record::*;
use strum_macros::Display;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Stream types that can be started
///
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(PartialEq, Clone, Copy)]
pub enum StreamType {
/// Input-only stream
Input,
/// Output-only stream
Output,
/// Input and output at the same time
Duplex,
}
/// Errors that happen in a stream
#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)]
pub enum StreamError {
/// Input overrun
#[strum(
message = "InputOverrun Error",
detailed_message = "Input buffer overrun"
)]
InputOverrunError,
/// Output underrun
#[strum(
message = "OutputUnderrunError",
detailed_message = "Output buffer underrun"
)]
OutputUnderrunError,
/// Driver specific error
#[strum(message = "DriverError", detailed_message = "Driver error")]
DriverError,
/// Device
#[strum(detailed_message = "Device not available (anymore)")]
DeviceNotAvailable,
/// Logic error (something weird happened)
#[strum(detailed_message = "Logic error")]
LogicError,
}

24
src/daq/qty.rs Normal file
View File

@ -0,0 +1,24 @@
//! Physical quantities that are input / output of a daq device. Provides an enumeration for these.
//!
use strum::EnumMessage;
use strum_macros;
use serde::{Serialize, Deserialize};
/// Physical quantities that are I/O of a Daq device.
#[derive(PartialEq, Serialize, Deserialize, strum_macros::EnumMessage, Debug, Clone, Copy)]
#[allow(dead_code)]
pub enum Qty {
/// Number
#[strum(message = "number", detailed_message = "Unitless number")]
Number = 0,
/// Acoustic pressure
#[strum(message = "acousticpressure", detailed_message = "Acoustic Pressure [Pa]")]
AcousticPressure = 1,
/// Voltage
#[strum(message = "voltage", detailed_message = "Voltage [V]")]
Voltage = 2,
#[strum(message = "userdefined", detailed_message = "User defined [#]")]
/// User defined
UserDefined = 3,
}

466
src/daq/record.rs Normal file
View File

@ -0,0 +1,466 @@
use super::*;
use crate::config::Flt;
use anyhow::{bail, Error, Result};
use clap::builder::OsStr;
use crossbeam::atomic::AtomicCell;
use hdf5::types::{VarLenArray, VarLenUnicode};
use hdf5::{dataset, datatype, Dataset, File, H5Type};
use ndarray::ArrayView2;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamdata::*;
use streammgr::*;
use streammsg::InStreamMsg;
use strum::EnumMessage;
#[derive(Clone, Debug)]
/// Status of a recording
pub enum RecordStatus {
/// Nothing to update
NoUpdate,
/// Not yet started, waiting for first msg
Idle,
/// Waiting for start delay to be processed.
Waiting,
/// Recording in progress
Recording(Duration),
/// Recording finished
Finished,
/// An error occurred, in any case when an error occurs, it is tried to remove the file.
Error(String),
}
/// Settings used to start a recording.
#[derive(Clone)]
pub struct RecordSettings {
/// File name to record to.
pub filename: PathBuf,
/// The recording time. Set to 0 to perform indefinite recording
pub duration: Duration,
/// The delay to wait before adding data
pub startDelay: Duration,
}
impl RecordSettings {
/// Create new record settings. Convenience wrapper to fill in fields in
/// right form. Start delay is optional
///
/// * args:
/// filename: Name of file to record to
/// duration: How long recording should be. Zero means record indefinitely.
/// startDelay: Optional start delay.
pub fn new<T, U>(filename: T, duration: U, startDelay: Option<U>) -> RecordSettings
where
T: Into<PathBuf>,
U: Into<Duration> + Default,
{
RecordSettings {
filename: filename.into(),
duration: duration.into(),
startDelay: startDelay
.map(|s| s.into())
.unwrap_or_else(|| Duration::ZERO),
}
}
}
/// This struct lets a recording run on a stream, waits till the first data arrives and records for a given period of time. Usage:
///
/// ```
/// use lasprs::{RecordSettings, StreamMgr, Recording};
/// use std::time::Duration;
///
/// fn main() -> anyhow::Result<()> {
/// let mut smgr = StreamMgr::new();
/// smgr.startDefaultInputStream()?;
///
/// // Create record settings
/// let settings = RecordSettings::new(
/// "test.h5",
/// Duration::from_millis(100),
/// None,
/// );
/// let rec = Recording::new(settings, &mut smgr)?;
/// Ok(())
/// }
/// ```
pub struct Recording {
settings: RecordSettings,
handle: Option<JoinHandle<Result<()>>>,
// Stop the recording. This stops the thread
stopThread: Arc<AtomicBool>,
// Obtain status from thread.
status_from_thread: Arc<AtomicCell<RecordStatus>>,
// Stores latest status from thread, if no update comes from status_from_thread
last_status: RecordStatus,
}
impl Recording {
fn create_dataset_type<T>(file: &File, meta: &StreamMetaData) -> Result<Dataset>
where
T: H5Type,
{
let bs = meta.framesPerBlock;
let nch = meta.nchannels();
match file
.new_dataset::<T>()
.chunk((1, bs, nch))
.shape((1.., bs, nch))
// .deflate(3)
.create("audio")
{
Ok(f) => Ok(f),
Err(e) => bail!("{}", e),
}
}
fn create_dataset(file: &File, meta: &StreamMetaData) -> Result<Dataset> {
match meta.rawDatatype {
DataType::I8 => Recording::create_dataset_type::<i8>(file, meta),
DataType::I16 => Recording::create_dataset_type::<i16>(file, meta),
DataType::I32 => Recording::create_dataset_type::<i32>(file, meta),
DataType::F32 => Recording::create_dataset_type::<f32>(file, meta),
DataType::F64 => Recording::create_dataset_type::<f64>(file, meta),
}
}
fn write_hdf5_attr_scalar<T>(file: &File, name: &str, val: T) -> Result<()>
where
T: H5Type,
{
let attr = file.new_attr::<T>().create(name)?;
attr.write_scalar(&val)?;
Ok(())
}
fn write_hdf5_attr_list<T>(file: &File, name: &str, val: &[T]) -> Result<()>
where
T: H5Type,
{
let attr = file.new_attr::<T>().shape([val.len()]).create(name)?;
attr.write(&val)?;
Ok(())
}
#[inline]
fn append_to_dset(
ds: &Dataset,
ctr: usize,
msg: &RawStreamData,
framesPerBlock: usize,
nchannels: usize,
) -> Result<()> {
match msg {
RawStreamData::Datai8(dat) => {
let arr = ndarray::ArrayView2::<i8>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Datai16(dat) => {
let arr = ndarray::ArrayView2::<i16>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Datai32(dat) => {
let arr = ndarray::ArrayView2::<i32>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Dataf32(dat) => {
let arr = ndarray::ArrayView2::<f32>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Dataf64(dat) => {
let arr = ndarray::ArrayView2::<f64>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::StreamError(e) => {
bail!("Stream error: {}", e)
}
}
Ok(())
}
/// Start a new recording
///
/// # Arguments
///
/// * setttings: The settings to use for the recording
/// * smgr: Stream manager to use to start the recording
///
pub fn new(mut settings: RecordSettings, mgr: &mut StreamMgr) -> Result<Recording> {
// Append extension if not yet there
match settings.filename.extension() {
Some(a) if a == "h5" => {}
None | Some(_) => {
settings.filename =
(settings.filename.to_string_lossy().to_string() + ".h5").into();
}
};
let stopThread = Arc::new(AtomicBool::new(false));
let stopThread_clone = stopThread.clone();
// Fail if filename already exists
if settings.filename.exists() {
bail!(
"Filename '{}' already exists in filesystem",
settings.filename.to_string_lossy()
);
}
let settings_clone = settings.clone();
let status = Arc::new(AtomicCell::new(RecordStatus::Idle));
let status_clone = status.clone();
let (tx, rx) = crossbeam::channel::unbounded();
mgr.addInQueue(tx.clone());
// The thread doing the actual work
let handle = spawn(move || {
let file = File::create(settings.filename)?;
let firstmsg = match rx.recv() {
Ok(msg) => msg,
Err(_) => bail!("Queue handle error"),
};
let meta = match firstmsg {
InStreamMsg::StreamStarted(meta) => meta,
_ => bail!("Recording failed. Missed stream metadata message."),
};
// Samplerate, block size, number of channels
Recording::write_hdf5_attr_scalar(&file, "samplerate", meta.samplerate)?;
Recording::write_hdf5_attr_scalar(&file, "nchannels", meta.nchannels())?;
Recording::write_hdf5_attr_scalar(&file, "blocksize", meta.framesPerBlock)?;
// Store sensitivity
let sens: Vec<Flt> = meta.channelInfo.iter().map(|ch| ch.sensitivity).collect();
Recording::write_hdf5_attr_list(&file, "sensitivity", &sens)?;
// Timestamp
use chrono::DateTime;
let now_utc = chrono::Utc::now();
let timestamp = now_utc.timestamp();
Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?;
// Create UUID for measurement
use hdf5::types::VarLenUnicode;
let uuid = uuid::Uuid::new_v4();
let uuid_unicode: VarLenUnicode = VarLenUnicode::from_str(&uuid.to_string()).unwrap();
Recording::write_hdf5_attr_scalar(&file, "UUID", uuid_unicode)?;
// Channel names
let chnames: Vec<VarLenUnicode> = meta
.channelInfo
.iter()
.map(|ch| VarLenUnicode::from_str(&ch.name).unwrap())
.collect();
let chname_attr = file
.new_attr::<VarLenUnicode>()
.shape([chnames.len()])
.create("channelNames")?;
chname_attr.write(&chnames)?;
// Create the dataset
let ds = Recording::create_dataset(&file, &meta)?;
let framesPerBlock = meta.framesPerBlock as usize;
let mut wait_block_ctr = 0;
// Indicate we are ready to rec!
if settings.startDelay > Duration::ZERO {
status.store(RecordStatus::Waiting);
let startdelay_s = settings.startDelay.as_micros() as Flt / 1e6;
wait_block_ctr =
(meta.samplerate as Flt * startdelay_s / framesPerBlock as Flt) as u32;
} else {
status.store(RecordStatus::Recording(Duration::ZERO));
}
// Counter of stored blocks
let mut stored_ctr = 0;
// Offset in stream
let mut ctr_offset = 0;
// Flag indicating that the first RawStreamData package still has to
// be arrived
let mut first = true;
// Indicating the file is still empty (does not contain recorded data)
let mut empty_file = true;
let nchannels = meta.nchannels() as usize;
'recloop: loop {
if stopThread.load(SeqCst) {
break 'recloop;
}
match rx.recv().unwrap() {
InStreamMsg::StreamError(e) => {
bail!("Recording failed due to stream error: {}.", e)
}
InStreamMsg::ConvertedStreamData(..) => {}
InStreamMsg::StreamStarted(_) => {
bail!("Stream started again?")
}
InStreamMsg::StreamStopped => {
// Early stop. User stopped it.
break 'recloop;
}
InStreamMsg::StreamData(dat) => {
if first {
first = false;
// Initialize counter offset
ctr_offset = dat.ctr;
} else if dat.ctr != stored_ctr + ctr_offset {
println!("********** PACKAGES MISSED ***********");
bail!("Packages missed. Recording is invalid.")
}
if wait_block_ctr > 0 {
// We are still waiting
wait_block_ctr -= 1;
if wait_block_ctr == 0 {
status.store(RecordStatus::Recording(Duration::ZERO));
}
// TODO: Is it a good idea to increase the counter
// here, as well as below?
stored_ctr += 1;
continue 'recloop;
}
ds.resize((stored_ctr + 1, framesPerBlock, nchannels))?;
Recording::append_to_dset(
&ds,
stored_ctr,
&dat.raw,
framesPerBlock,
nchannels,
)?;
// Once we have added to the file, this flag is swapped
// and a file should be deleted in case of an error.
empty_file = false;
// Recorded time rounded of to milliseconds.
let recorded_time = Duration::from_millis(
((1000 * (stored_ctr + 1) * framesPerBlock) as Flt / meta.samplerate)
as u64,
);
if !settings.duration.is_zero() {
// Duration not equal to zero, meaning we record up to a
// certain duration.
if recorded_time >= settings.duration {
break 'recloop;
}
}
// println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock);
stored_ctr += 1;
status.store(RecordStatus::Recording(recorded_time));
}
}
} // end of 'recloop
if empty_file {
bail!("Recording stopped before any data is stored.");
}
status.store(RecordStatus::Finished);
Ok(())
// End of thread
});
Ok(Recording {
settings: settings_clone,
stopThread: stopThread_clone,
handle: Some(handle),
last_status: RecordStatus::NoUpdate,
status_from_thread: status_clone,
})
}
// Delete recording file, should be done when something went wrong (an error
// occured), or when cancel() is called, or when recording object is dropped
// while thread is still running.
fn deleteFile(&self) {
if let Some(_) = self.handle {
panic!("Misuse bug: cannot delete file while thread is still running");
}
// File should not be un use anymore, as thread is joined.
// In case of error, we try to delete the file
if let Err(e) = std::fs::remove_file(&self.settings.filename) {
eprintln!("Recording failed, but file removal failed as well: {}", e);
}
}
// Join the thread, store the last status. Please make sure it is joinable,
// otherwise this method will hang forever.
fn cleanupThread(&mut self) {
if let Some(h) = self.handle.take() {
let res = h.join().unwrap();
if let Err(e) = res {
self.last_status = RecordStatus::Error(format!("{}", e));
}
}
}
/// Get current record status
pub fn status(&mut self) -> RecordStatus {
// Update status due to normal messaging
let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate);
match status_from_thread {
RecordStatus::NoUpdate => {}
_ => {
self.last_status = status_from_thread;
}
}
if let Some(h) = &self.handle {
// Update the status by taking any error messages
if h.is_finished() {
self.cleanupThread();
}
}
// Return latest status
self.last_status.clone()
}
/// Stop existing recording early. At the current time, or st
pub fn stop(&mut self) {
// Stop thread , join, update status
self.stopThread.store(true, SeqCst);
self.cleanupThread();
match self.status() {
RecordStatus::Finished => { // Do nothing
}
_ => {
// an error occured, we try to delete the backing file
self.deleteFile()
}
}
}
/// Cancel recording. Deletes the recording file
pub fn cancel(&mut self) {
self.stopThread.store(true, SeqCst);
self.cleanupThread();
self.deleteFile();
}
}
impl Drop for Recording {
fn drop(&mut self) {
if self.handle.is_some() {
// If we enter here, stop() or cancel() has not been called. In that
// case, we cleanup here by cancelling the recording
self.cancel();
}
}
}

27
src/daq/streamcmd.rs Normal file
View File

@ -0,0 +1,27 @@
use crate::siggen::*;
use super::streammgr::SharedInQueue;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Commands that can be sent to a running stream
pub enum StreamCommand {
/// Add a new queue to a running INPUT stream
AddInQueue(SharedInQueue),
/// Remove a queue to a running INPUT stream
RemoveInQueue(SharedInQueue),
/// New signal generator config to be used in OUTPUT stream
NewSiggen(Siggen),
/// Stop the thread, do not listen for data anymore.
StopThread,
// New signal generator source
// NewSiggenSource(Source)
}

303
src/daq/streamdata.rs Normal file
View File

@ -0,0 +1,303 @@
//! Provides stream messages that come from a running stream
use crate::config::*;
use crate::daq::Qty;
use crate::siggen::Siggen;
use anyhow::{bail, Result};
use cpal::Sample;
use crossbeam::channel::Sender;
use reinterpret::{reinterpret_slice, reinterpret_vec};
use std::any::TypeId;
use std::sync::{Arc, RwLock};
use std::u128::MAX;
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Raw stream data coming from a stream.
#[derive(Clone, Debug)]
pub enum RawStreamData {
/// 8-bits integer
Datai8(Vec<i8>),
/// 16-bits integer
Datai16(Vec<i16>),
/// 32-bits integer
Datai32(Vec<i32>),
/// 32-bits float
Dataf32(Vec<f32>),
/// 64-bits float
Dataf64(Vec<f64>),
/// A stream error occured
StreamError(StreamError),
}
impl RawStreamData {
pub fn toFloat(&self, _nchannels: usize) -> Dmat {
// match &self {
// RawStreamData::Datai8(c) => {
// Dmat::zeros((2, 2));
// }
// RawStreamData::Datai16(c) => {
// Dmat::zeros((2, 2));
// }
// }
todo!()
}
}
impl RawStreamData {
/// Get reference to raw data buffer
pub fn getRef<T>(&self) -> &[T]
where
T: Sample + 'static,
{
let thetype: TypeId = TypeId::of::<T>();
match &self {
RawStreamData::Datai8(t) => {
let i8type: TypeId = TypeId::of::<i8>();
assert!(thetype == i8type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Datai16(t) => {
let i16type: TypeId = TypeId::of::<i16>();
assert!(thetype == i16type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Datai32(t) => {
let i32type: TypeId = TypeId::of::<i32>();
assert!(thetype == i32type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Dataf32(t) => {
let f32type: TypeId = TypeId::of::<f32>();
assert!(thetype == f32type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Dataf64(t) => {
let f64type: TypeId = TypeId::of::<f64>();
assert!(thetype == f64type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
_ => panic!("Cannot getRef from "),
}
}
}
// Create InStreamData object from
impl<T> From<&[T]> for RawStreamData
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: &[T]) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai8(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(v)
} else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai32(v)
} else if f32type == thetype {
let v: Vec<f32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf32(v)
} else if f64type == thetype {
let v: Vec<f64> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf64(v)
} else {
panic!("Not implemented sample type!")
}
}
}
// Create InStreamData object from
impl<T> From<Vec<T>> for RawStreamData
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: Vec<T>) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai8(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(v)
} else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai32(v)
} else if f32type == thetype {
let v: Vec<f32> = unsafe { reinterpret_vec(input) };
RawStreamData::Dataf32(v)
} else if f64type == thetype {
let v: Vec<f64> = unsafe { reinterpret_vec(input) };
RawStreamData::Dataf64(v)
} else {
panic!("Not implemented sample type!")
}
}
}
/// Stream metadata. All information required for properly interpreting the raw
/// data that is coming from the stream.
#[derive(Clone, Debug)]
pub struct StreamMetaData {
/// Information for each channel in the stream
pub channelInfo: Vec<DaqChannel>,
/// The data type of the device [Number / voltage / Acoustic pressure / ...]
pub rawDatatype: DataType,
/// Sample rate in [Hz]
pub samplerate: Flt,
/// The number of frames per block of data that comes in. Multiplied by
/// channelInfo.len() we get the total number of samples that come in at
/// each callback.
pub framesPerBlock: usize,
}
impl StreamMetaData {
/// Create new metadata object.
/// ///
/// # Args
///
pub fn new(
channelInfo: &[DaqChannel],
rawdtype: DataType,
sr: Flt,
framesPerBlock: usize,
) -> Result<StreamMetaData> {
Ok(StreamMetaData {
channelInfo: channelInfo.to_vec(),
rawDatatype: rawdtype,
samplerate: sr,
framesPerBlock,
})
}
/// Returns the number of channels in the stream metadata.
pub fn nchannels(&self) -> usize {
self.channelInfo.len()
}
}
/// Stream data (audio / other) coming from a stream or to be send to a stream
#[derive(Debug)]
pub struct StreamData {
/// Package counter. Should always increase monotonically.
pub ctr: usize,
/// Stream metadata. All info required for properly interpreting the raw data.
pub meta: Arc<StreamMetaData>,
/// This is typically what is stored when recording
pub raw: RawStreamData,
// Converted to floating point format. Used for further real time
// processing. Stored in an rw-lock. The first thread that acesses this data
// will perform the conversion. All threads after that will get the data.
converted: RwLock<Option<Arc<Dmat>>>,
}
impl StreamData {
/// Create new stream data object.
pub fn new(ctr: usize, meta: Arc<StreamMetaData>, raw: RawStreamData) -> StreamData {
StreamData {
ctr,
meta,
raw,
converted: RwLock::new(None),
}
}
/// Get the data in floating point format. If already converted, uses the
/// cached float data.
pub fn getFloatData(&self) -> Arc<Dmat> {
if let Some(dat) = self.converted.read().unwrap().as_ref() {
return dat.clone();
}
// In case we reach here, the data has not yet be converted to floating
// point, so we do this.
let mut o = self.converted.write().unwrap();
// It might be that another thread was 'first', and already performed
// the conversion. In that case, we still do an early return, and we
// just openend the lock twice for writing. Not a problem.
if let Some(dat) = o.as_ref() {
return dat.clone();
}
// Perform the actual conversion
let converted_data = Arc::new(self.raw.toFloat(self.meta.nchannels()));
// Replace the option with the Some
o.replace(converted_data.clone());
converted_data
}
}
#[cfg(test)]
mod test {
use num::traits::sign;
use cpal::Sample;
use super::*;
#[test]
fn test() {
const fs: Flt = 20.;
// Number of samples per channel
const Nframes: usize = 20;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
siggen.reset(fs);
siggen.setMute(&[false, true]);
siggen.genSignal(&mut signal);
let raw: Vec<i16> = Vec::from_iter(signal.iter().map(
|o| o.to_sample::<i16>()));
let ms1 = raw.iter().step_by(2).map(|s1| {*s1 as f64 * *s1 as f64}).sum::<f64>() / Nframes as f64;
let i16maxsq = (i16::MAX as f64).powf(2.);
// println!("ms1: {} {}", ms1, i16maxsq/2.);
// println!("{:?}", raw.iter().cloned().step_by(2).collect::<Vec<i16>>());
// println!("{:?}", i16::EQUILIBRIUM);
assert!(f64::abs(ms1 - i16maxsq/2.)/i16maxsq < 1e-3);
}
}

19
src/daq/streamhandler.rs Normal file
View File

@ -0,0 +1,19 @@
use crossbeam::channel::{unbounded, Receiver};
use super::*;
use streammsg::InStreamMsg;
/// A stream handler registers a queue in the stream manager, and keeps the other end to
/// get InStreamData from a running input stream.
pub struct StreamHandler {
/// The receiving part of the channel on which (InStreamData) is received..
pub rx: Receiver<InStreamMsg>
}
impl StreamHandler {
/// Create new stream handler.
pub fn new(smgr: &mut StreamMgr) -> StreamHandler{
let (tx, rx) = unbounded();
smgr.addInQueue(tx);
StreamHandler{rx}
}
}

600
src/daq/streammgr.rs Normal file
View File

@ -0,0 +1,600 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
use super::api::*;
use super::*;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use array_init::from_iter;
use core::time;
use cpal::Sample;
use crossbeam::{
channel::{unbounded, Receiver, Sender, TrySendError},
thread,
};
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streamcmd::StreamCommand;
use streamdata::*;
use streammsg::*;
#[cfg(feature = "cpal-api")]
use super::api::api_cpal::CpalApi;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
} else {} }
/// Store a queue in a shared pointer, to share sending
/// and receiving part of the queue.
pub type SharedInQueue = Sender<InStreamMsg>;
/// Vector of queues for stream messages
pub type InQueues = Vec<SharedInQueue>;
struct StreamInfo<T> {
streamtype: StreamType,
stream: Box<dyn Stream>,
threadhandle: JoinHandle<T>,
comm: Sender<StreamCommand>,
}
/// Keep track of whether the stream has been created. To ensure singleton behaviour.
static smgr_created: AtomicBool = AtomicBool::new(false);
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
/// Configure and manage input / output streams.
///
pub struct StreamMgr {
// List of available devices
devs: Vec<DeviceInfo>,
// Input stream can be both input and duplex
input_stream: Option<StreamInfo<InQueues>>,
// Output only stream
output_stream: Option<StreamInfo<Siggen>>,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi,
/// The storage of queues. When no streams are running, they
/// are here. When stream is running, they will become available
/// in the JoinHandle of the thread.
instreamqueues: Option<InQueues>,
// Signal generator. Stored here on the bench in case no stream is running.
// It is picked when it is configured correctly for the starting output stream
// If it is not configured correctly, when a stream that outputs data is started
// ,it is removed here.
siggen: Option<crate::siggen::Siggen>,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMgr {
#[new]
/// See (StreamMgr::new())
fn new_py<'py>() -> StreamMgr {
StreamMgr::new()
}
// #[pyo3(name = "unit")]
// #[staticmethod]
// /// See: [Biquad::unit()]
// pub fn unit_py() -> Biquad {
// Biquad::unit()
// }
// #[pyo3(name = "firstOrderHighPass")]
}
impl Default for StreamMgr {
fn default() -> Self {
Self::new()
}
}
impl StreamMgr {
/// Create new stream manager. A stream manager is supposed to be a singleton.
///
/// # Panics
///
/// When a StreamMgr object is already alive.
pub fn new() -> StreamMgr {
if smgr_created.load(std::sync::atomic::Ordering::Relaxed) {
panic!("BUG: Only one stream manager is supposed to be a singleton");
}
smgr_created.store(true, std::sync::atomic::Ordering::Relaxed);
let mut smgr = StreamMgr {
devs: vec![],
input_stream: None,
output_stream: None,
siggen: None,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi::new(),
instreamqueues: Some(vec![]),
};
smgr.devs = smgr.scanDeviceInfo();
smgr
}
/// Get stream status for given stream type.
pub fn getStatus(&self, t: StreamType) -> StreamStatus {
match t {
StreamType::Input | StreamType::Duplex => {
if let Some(s) = &self.input_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
}
}
StreamType::Output => {
if let Some(s) = &self.output_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
}
}
}
}
/// Set a new signal generator. Returns an error if it is unapplicable.
/// It is unapplicable if the number of channels of output does not match the
/// number of output channels in a running stream.
pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> {
// Current signal generator. Where to place it?
if let Some(istream) = &self.input_stream {
if let StreamType::Duplex = istream.streamtype {
if siggen.nchannels() != istream.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
assert!(self.siggen.is_none());
istream.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
}
} else if let Some(os) = &self.output_stream {
assert!(self.siggen.is_none());
if siggen.nchannels() != os.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
} else {
self.siggen = Some(siggen);
return Ok(());
}
unreachable!()
}
/// Obtain a list of devices that are available for each available API
pub fn getDeviceInfo(&mut self) -> &Vec<DeviceInfo> {
&self.devs
}
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
let mut devinfo = vec![];
#[cfg(feature = "cpal-api")]
{
let cpal_devs = self.cpal_api.getDeviceInfo();
if let Ok(devs) = cpal_devs {
devinfo.extend(devs);
}
}
devinfo
}
/// Add a new queue to the lists of queues. On the queue, input data is
/// added.
///
/// If the stream is unable to write data on the queue (which might
/// happen when the handler is dropped), the queue is removed from the list
/// of queues that get data from the stream.
pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) {
if let Some(is) = &self.input_stream {
is.comm.send(StreamCommand::AddInQueue(tx)).unwrap()
} else {
self.instreamqueues.as_mut().unwrap().push(tx);
}
}
fn startInputStreamThread(
&mut self,
meta: Arc<StreamMetaData>,
rx: Receiver<RawStreamData>,
) -> (JoinHandle<InQueues>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
// Unwrap here, as the queues should be free to grab
let mut iqueues = self
.instreamqueues
.take()
.expect("No input streams queues!");
let threadhandle = std::thread::spawn(move || {
let mut ctr: usize = 0;
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
// New queue added
StreamCommand::AddInQueue(queue) => {
match queue.send(InStreamMsg::StreamStarted(meta.clone())) {
Ok(()) => iqueues.push(queue),
Err(_) => {}
}
}
// Remove queue from list
StreamCommand::RemoveInQueue(queue) => {
iqueues.retain(|q| !q.same_channel(&queue))
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped);
break 'infy;
}
StreamCommand::NewSiggen(_) => {
panic!("Error: signal generator send to input-only stream.");
}
}
}
if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) {
// println!("Obtained raw stream data!");
let streamdata = StreamData::new(ctr, meta.clone(), raw);
let streamdata = Arc::new(streamdata);
let msg = InStreamMsg::StreamData(streamdata);
sendMsgToAllQueues(&mut iqueues, msg);
ctr += 1;
}
}
iqueues
});
(threadhandle, commtx)
}
// Match device info struct on given daq config.
fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> {
self.devs.iter().find(|&d| d.device_name == cfg.device_name)
}
fn startOuputStreamThread(
&mut self,
meta: Arc<StreamMetaData>,
tx: Sender<RawStreamData>,
) -> (JoinHandle<Siggen>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
// Number of channels to output for
let nchannels = meta.nchannels();
// Obtain signal generator. Set to silence when no signal generator is
// installed.
let mut siggen = self
.siggen
.take()
.unwrap_or_else(|| Siggen::newSilence(nchannels));
if siggen.nchannels() != nchannels {
// Updating number of channels
siggen.setNChannels(nchannels);
}
siggen.reset(meta.samplerate);
let threadhandle = std::thread::spawn(move || {
let mut floatbuf: Vec<Flt> = Vec::with_capacity(nchannels * meta.framesPerBlock);
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
// New queue added
StreamCommand::AddInQueue(_) => {
panic!("Invalid message send to output thread: AddInQueue");
}
// Remove queue from list
StreamCommand::RemoveInQueue(_) => {
panic!("Invalid message send to output thread: RemoveInQueue");
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
break 'infy;
}
StreamCommand::NewSiggen(new_siggen) => {
// println!("NEW SIGNAL GENERATOR ARRIVED!");
siggen = new_siggen;
siggen.reset(meta.samplerate);
if siggen.nchannels() != nchannels {
// println!("Updating channels");
siggen.setNChannels(nchannels);
}
}
}
}
while tx.len() < 2 {
unsafe {
floatbuf.set_len(nchannels * meta.framesPerBlock);
}
// Obtain signal
siggen.genSignal(&mut floatbuf);
// println!("level: {}", floatbuf.iter().sum::<Flt>());
let msg = match meta.rawDatatype {
DataType::I8 => {
let v = Vec::<i8>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai8(v)
}
DataType::I16 => {
let v = Vec::<i16>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai16(v)
}
DataType::I32 => {
let v = Vec::<i32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai32(v)
}
DataType::F32 => {
let v = Vec::<f32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf32(v)
}
DataType::F64 => {
let v = Vec::<f64>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf64(v)
}
};
if let Err(_e) = tx.send(msg) {
// println!("Error sending raw stream data to output stream!");
break 'infy;
}
}
// }
}
siggen
});
(threadhandle, commtx)
}
/// Start a stream of certain type, using given configuration
pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
match stype {
StreamType::Input | StreamType::Duplex => {
self.startInputOrDuplexStream(stype, cfg)?;
}
StreamType::Output => {
// self.startOutputStream(cfg)?;
bail!("No output stream defined yet");
}
}
Ok(())
}
// fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
// let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// let stream = match cfg.api {
// StreamApiDescr::Cpal => {
// let devinfo = self
// .match_devinfo(cfg)
// .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
// self.cpal_api.startOutputStream(devinfo, cfg, tx)?
// }
// _ => bail!("Unimplemented api!"),
// };
// Ok(())
// }
// Start an input or duplex stream
fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
if self.input_stream.is_some() {
bail!("An input stream is already running. Please first stop existing input stream.")
}
if cfg.numberEnabledInChannels() == 0 {
bail!("At least one input channel should be enabled for an input stream")
}
if stype == StreamType::Duplex {
if cfg.numberEnabledOutChannels() == 0 {
bail!("At least one output channel should be enabled for a duplex stream")
}
if self.output_stream.is_some() {
bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
}
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
let stream = match cfg.api {
StreamApiDescr::Cpal => {
if stype == StreamType::Duplex {
bail!("Duplex mode not supported for CPAL api");
}
let devinfo = self
.match_devinfo(cfg)
.ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
self.cpal_api.startInputStream(stype, devinfo, cfg, tx)?
}
_ => bail!("Unimplemented api!"),
};
// Input queues should be available, otherwise panic bug.
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
self.input_stream = Some(StreamInfo {
streamtype: stype,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
/// Start a default input stream, using default settings on everything. This is only possible
/// when the CPAL_api is available
pub fn startDefaultInputStream(&mut self) -> Result<()> {
if self.input_stream.is_some() {
bail!("Input stream is already running. Please first stop existing input stream.")
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// Only a default input stream when CPAL feature is enabled
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
let stream = self.cpal_api.startDefaultInputStream(tx)?;
// Inform all listeners of new stream data
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
self.input_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
else {
bail!("Unable to start default input stream: no CPAL api available")
}
}
}
/// Start a default output stream. Only possible when CPAL Api is available.
pub fn startDefaultOutputStream(&mut self) -> Result<()> {
if let Some(istream) = &self.input_stream {
if istream.streamtype == StreamType::Duplex {
bail!("Duplex stream is already running");
}
}
if self.output_stream.is_some() {
bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
}
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
let (tx, rx)= unbounded();
let stream = self.cpal_api.startDefaultOutputStream(rx)?;
let meta = stream.metadata();
let (threadhandle, commtx) = self.startOuputStreamThread(meta, tx);
// Inform all listeners of new stream data
self.output_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
} // end if cpal api available
else {
bail!("Unable to start default input stream: no CPAL api available")
}
} // end of cfg_if
}
/// Stop existing input stream.
pub fn stopInputStream(&mut self) -> Result<()> {
if let Some(StreamInfo {
streamtype: _, // Ignored here
stream: _,
threadhandle,
comm,
}) = self.input_stream.take()
{
// println!("Stopping existing stream..");
// Send thread to stop
comm.send(StreamCommand::StopThread).unwrap();
// Store stream queues back into StreamMgr
self.instreamqueues = Some(threadhandle.join().expect("Stream thread panicked!"));
} else {
bail!("Stream is not running.")
}
Ok(())
}
/// Stop existing output stream
pub fn stopOutputStream(&mut self) -> Result<()> {
if let Some(StreamInfo {
streamtype: _, // Ignored here
stream: _,
threadhandle,
comm,
}) = self.output_stream.take()
{
if comm.send(StreamCommand::StopThread).is_err() {
// Failed to send command over channel. This means the thread is
// already finished due to some other reason.
assert!(threadhandle.is_finished());
}
// println!("Wainting for threadhandle to join...");
self.siggen = Some(threadhandle.join().expect("Output thread panicked!"));
// println!("Threadhandle joined!");
} else {
bail!("Stream is not running.");
}
Ok(())
}
/// Stop existing running stream.
///
/// Args
///
/// * st: The stream type.
pub fn stopStream(&mut self, st: StreamType) -> Result<()> {
match st {
StreamType::Input | StreamType::Duplex => self.stopInputStream(),
StreamType::Output => self.stopOutputStream(),
}
}
} // impl StreamMgr
impl Drop for StreamMgr {
fn drop(&mut self) {
// Kill input stream if there is one
if self.input_stream.is_some() {
self.stopStream(StreamType::Input).unwrap();
}
if self.output_stream.is_some() {
// println!("Stopstream in Drop");
self.stopStream(StreamType::Output).unwrap();
// println!("Stopstream in Drop done");
}
// Decref the singleton
smgr_created.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
// Send to all queues, remove queues that are disconnected when found out
// on the way.
fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) {
// Loop over queues. Remove queues that error when we try to send
// to them
iqueues.retain(|q| match q.try_send(msg.clone()) {
Ok(_) => true,
Err(_e) => false,
});
}
/// Daq devices
trait Daq {}
#[cfg(test)]
mod tests {
// #[test]
}

36
src/daq/streammsg.rs Normal file
View File

@ -0,0 +1,36 @@
//! Provides stream messages that come from a running stream
use crate::config::*;
use crate::daq::Qty;
use crate::siggen::Siggen;
use anyhow::{bail, Result};
use streamdata::*;
use crossbeam::channel::Sender;
use reinterpret::{reinterpret_slice, reinterpret_vec};
use std::any::TypeId;
use std::sync::{Arc, RwLock};
use std::u128::MAX;
use strum_macros::Display;
use super::*;
/// Input stream messages, to be send to handlers.
#[derive(Clone, Debug)]
pub enum InStreamMsg {
/// Raw stream data that is coming from a device. This is interleaved data. The number of channels is correct and
/// specified in the stream metadata.
StreamData(Arc<StreamData>),
/// An error has occured in the stream
StreamError(StreamError),
/// Stream data converted to floating point with sample width as
/// compiled in.
ConvertedStreamData(usize, Arc<crate::config::Dmat>),
/// new Stream metadata enters the scene. Probably a new stream started.
StreamStarted(Arc<StreamMetaData>),
/// An existing stream stopped.
StreamStopped,
}

25
src/daq/streamstatus.rs Normal file
View File

@ -0,0 +1,25 @@
//! Provides stream messages that come from a running stream
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Gives the stream status of a stream, either input / output or duplex.
#[derive(strum_macros::EnumMessage, Debug, Clone, Copy, Display)]
pub enum StreamStatus {
/// Stream is not running
#[strum(message = "NotRunning", detailed_message = "Stream is not running")]
NotRunning,
/// Stream is running properly
#[strum(message = "Running", detailed_message = "Stream is running")]
Running,
/// An error occured in the stream.
#[strum(message = "Error", detailed_message = "An error occured with the stream")]
Error(StreamError)
}

View File

@ -5,13 +5,19 @@
#![allow(non_snake_case)]
use super::config::*;
use anyhow::{bail, Result};
use numpy::ndarray::{ArrayD, ArrayViewD, ArrayViewMutD};
use numpy::{IntoPyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
use cfg_if::cfg_if;
use rayon::prelude::*;
cfg_if! {
if #[cfg(feature = "python-bindings")] {
use numpy::ndarray::{ArrayD, ArrayViewD, ArrayViewMutD};
use numpy::{IntoPyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
} else {} }
pub trait Filter: Send {
//! The filter trait is implemented by Biquad, SeriesBiquad, and BiquadBank
@ -33,7 +39,7 @@ impl Clone for Box<dyn Filter> {
}
/// # A biquad is a second order recursive filter structure.
#[cfg_attr(feature = "extension-module", pyclass)]
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone, Copy, Debug)]
pub struct Biquad {
// State parameters
@ -48,14 +54,14 @@ pub struct Biquad {
a1: Flt,
a2: Flt,
}
#[cfg(feature = "extension-module")]
#[cfg_attr(feature = "extension-module", pymethods)]
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Biquad {
#[new]
/// Create new biquad filter. See [Biquad::new()]
///
pub fn new_py<'py>(coefs: PyReadonlyArrayDyn<Flt>) -> PyResult<Self> {
Ok(Biquad::new(&coefs.as_slice()?)?)
Ok(Biquad::new(coefs.as_slice()?)?)
}
#[pyo3(name = "unit")]
#[staticmethod]
@ -140,12 +146,12 @@ impl Biquad {
Ok(Biquad::new(&coefs).unwrap())
}
fn filter_inout(&mut self, inout: &mut [Flt]) {
for sample in 0..inout.len() {
let w0 = inout[sample] - self.a1 * self.w1 - self.a2 * self.w2;
for sample in inout.iter_mut() {
let w0 = *sample - self.a1 * self.w1 - self.a2 * self.w2;
let yn = self.b0 * w0 + self.b1 * self.w1 + self.b2 * self.w2;
self.w2 = self.w1;
self.w1 = w0;
inout[sample] = yn;
*sample = yn;
}
// println!("{:?}", inout);
}
@ -162,7 +168,7 @@ impl Filter for Biquad {
self.w2 = 0.;
}
fn clone_dyn(&self) -> Box<dyn Filter> {
Box::new(self.clone())
Box::new(*self)
}
}
@ -170,22 +176,22 @@ impl Filter for Biquad {
///
/// # Examples
///
/// See [tests]
/// See (tests)
/// ```
#[derive(Clone, Debug)]
#[cfg_attr(feature = "extension-module", pyclass)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct SeriesBiquad {
biqs: Vec<Biquad>,
}
#[cfg(feature = "extension-module")]
#[cfg_attr(feature = "extension-module", pymethods)]
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl SeriesBiquad {
#[new]
/// Create new series filter set. See [SeriesBiquad::new()]
///
pub fn new_py<'py>(coefs: PyReadonlyArrayDyn<Flt>) -> PyResult<Self> {
Ok(SeriesBiquad::new(&coefs.as_slice()?)?)
Ok(SeriesBiquad::new(coefs.as_slice()?)?)
}
#[pyo3(name = "unit")]
#[staticmethod]
@ -232,7 +238,7 @@ impl SeriesBiquad {
biqs.push(biq);
}
if biqs.len() == 0 {
if biqs.is_empty() {
bail!("No filter coefficients given!");
}
@ -268,7 +274,7 @@ impl Filter for SeriesBiquad {
}
}
#[cfg_attr(feature = "extension-module", pyclass)]
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone)]
/// Multiple biquad filter that operate in parallel on a signal, and can apply a gain value to each
/// of the returned values. The BiquadBank can be used to decompose a signal by running it through
@ -305,8 +311,8 @@ pub struct BiquadBank {
gains: Vec<Flt>,
}
#[cfg(feature = "extension-module")]
#[cfg_attr(feature = "extension-module", pymethods)]
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
/// Methods to wrap it in Python
impl BiquadBank {
#[new]
@ -344,7 +350,7 @@ impl BiquadBank {
#[pyo3(name = "set_gains")]
/// See: [BiquadBank::set_gains()]
pub fn set_gains_py<'py>(&mut self, py: Python<'py>, gains: PyArrayLike1<Flt>) -> PyResult<()> {
pub fn set_gains_py<'py>(&mut self, gains: PyArrayLike1<Flt>) -> PyResult<()> {
if gains.len() != self.len() {
return Err(PyValueError::new_err("Invalid number of provided gains"));
}
@ -353,13 +359,12 @@ impl BiquadBank {
}
#[pyo3(name = "set_gains_dB")]
/// See: [BiquadBank::set_gains_dB()]
pub fn set_gains_dB_py<'py>(&mut self, py: Python<'py>, gains_dB: PyArrayLike1<Flt>) -> PyResult<()> {
pub fn set_gains_dB_py<'py>(&mut self, gains_dB: PyArrayLike1<Flt>) -> PyResult<()> {
if gains_dB.len() != self.len() {
return Err(PyValueError::new_err("Invalid number of provided gains"));
}
self.set_gains_dB(gains_dB.as_slice()?);
Ok(())
}
#[pyo3(name = "len")]
/// See: [BiquadBank::len()]

View File

@ -3,30 +3,42 @@
//! This crate contains structures and functions to perform acoustic measurements, interact with
//! data acquisition devices and apply common acoustic analysis operations on them.
#![warn(missing_docs)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(unused_imports)]
#![warn(missing_docs)]
#![allow(non_snake_case)]
mod config;
pub mod filter;
#![allow(non_upper_case_globals)]
#![allow(unused_imports)]
extern crate pyo3;
#[cfg(feature = "extension-module")]
use pyo3::prelude::*;
mod config;
pub mod filter;
/// A Python module implemented in Rust.
#[cfg(feature = "extension-module")]
#[pymodule]
#[pyo3(name="_lasprs")]
fn lasprs(py: Python, m: &PyModule) -> PyResult<()> {
// pub mod window;
// pub mod ps;
pub mod daq;
pub mod siggen;
pyo3_add_submodule_filter(py, &m)?;
Ok(())
}
pub use config::*;
pub use daq::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::prelude::*;
use pyo3::{pymodule, PyResult};
} else {} }
/// A Python module implemented in Rust.
#[cfg(feature = "python-bindings")]
#[pymodule]
#[pyo3(name="_lasprs")]
fn lasprs(py: Python, m: &PyModule) -> PyResult<()> {
pyo3_add_submodule_filter(py, m)?;
Ok(())
}
/// Add filter submodule to extension
#[cfg(feature = "extension-module")]
#[cfg(feature = "python-bindings")]
fn pyo3_add_submodule_filter(py: Python, m: &PyModule) -> PyResult<()> {
// Add filter submodule
let filter_module = PyModule::new(py, "filter")?;

434
src/siggen.rs Normal file
View File

@ -0,0 +1,434 @@
//! This module provide signal generators.
//!
//! # Examples
//!
//! ## Create some white noise and print it.
//!
//! ```
//! use lasprs::siggen::Siggen;
//! let mut wn = Siggen::newWhiteNoise(1);
//! // Set gains for all channels
//! wn.setAllGains(0.1);
//! wn.setAllMute(false);
//! let mut sig = [0. ; 1024];
//! wn.genSignal(&mut sig);
//! println!("{:?}", &sig);
//!
//! ```
use super::config::*;
use super::filter::Filter;
use dasp_sample::{FromSample, Sample};
use rayon::prelude::*;
use std::fmt::Debug;
use std::iter::ExactSizeIterator;
use std::slice::IterMut;
#[cfg(feature = "python-bindings")]
use pyo3::prelude::*;
use rand::prelude::*;
use rand::rngs::ThreadRng;
use rand_distr::StandardNormal;
const twopi: Flt = 2. * pi;
/// Source for the signal generator. Implementations are sine waves, sweeps, noise.
pub trait Source: Send {
/// Generate the 'pure' source signal. Output is placed inside the `sig` argument.
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>);
/// Reset the source state, i.e. set phase to 0, etc
fn reset(&mut self, fs: Flt);
/// Used to make the Siggen struct cloneable
fn clone_dyn(&self) -> Box<dyn Source>;
}
impl Clone for Box<dyn Source> {
fn clone(&self) -> Self {
self.clone_dyn()
}
}
#[derive(Clone)]
struct Silence {}
impl Source for Silence {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = 0.0);
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
Box::new(self.clone())
}
}
/// White noise source
#[derive(Clone)]
struct WhiteNoise {}
impl WhiteNoise {
/// Generate new WhiteNoise generator
fn new() -> WhiteNoise {
WhiteNoise {}
}
}
impl Source for WhiteNoise {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = thread_rng().sample(StandardNormal));
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
Box::new(self.clone())
}
}
/// Sine wave, with configurable frequency
#[derive(Clone)]
struct Sine {
// Sampling freq [Hz]
fs: Flt,
// current stored phase
phase: Flt,
// Signal frequency [rad/s]
omg: Flt,
}
impl Sine {
/// Create new sine source signal
///
/// Args:
///
/// * fs: Sampling freq [Hz]
/// *
fn new(freq: Flt) -> Sine {
Sine {
fs: -1.,
phase: 0.,
omg: 2. * pi * freq,
}
}
}
impl Source for Sine {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
if self.fs <= 0. {
sig.for_each(|s| {
*s = 0.;
});
return;
}
sig.for_each(|s| {
*s = Flt::sin(self.phase);
self.phase += self.omg / self.fs;
self.phase %= twopi;
});
}
fn reset(&mut self, fs: Flt) {
self.fs = fs;
self.phase = 0.;
}
fn clone_dyn(&self) -> Box<dyn Source> {
Box::new(self.clone())
}
}
/// Signal generator. Able to create acoustic output signals. See above example on how to use.
/// Typical signal that can be created are:
///
/// * (Siggen::newWhiteNoise)
/// * (Siggen::newSine)
///
#[derive(Clone)]
pub struct Siggen {
// The source dynamic signal. Noise, a sine wave, sweep, etc
source: Box<dyn Source>,
// Filter applied to the source signal
channels: Vec<SiggenChannelConfig>,
// Temporary source signal buffer
source_buf: Vec<Flt>,
// Output buffers (for filtered source signal)
chout_buf: Vec<Vec<Flt>>,
}
/// Multiple channel signal generator. Can use a single source (coherent) to provide multiple signals
/// that can be sent out through different EQ's
/// A struct that implements the Siggen trait is able to generate a signal.
impl Siggen {
/// Returns the number of channels this signal generator is generating for.
pub fn nchannels(&self) -> usize {
self.channels.len()
}
/// Silence: create a signal generator that does not output any dynamic
/// signal at all.
pub fn newSilence(nchannels: usize) -> Siggen {
Siggen {
channels: vec![SiggenChannelConfig::new(); nchannels],
source: Box::new(Silence {}),
source_buf: vec![],
chout_buf: vec![],
}
}
/// Create a white noise signal generator.
pub fn newWhiteNoise(nchannels: usize) -> Siggen {
Siggen::new(nchannels, Box::new(WhiteNoise::new()))
}
/// Set gains of all channels in signal generator to the same value
///
/// # Args
///
/// * g: New gain value
pub fn setAllGains(&mut self, g: Flt) {
self.channels.iter_mut().for_each(|set| set.setGain(g))
}
/// Set the number of channels to generate a signal for. Truncates the
/// output in case the value before calling this method is too little.
/// Appends new channel configs in case to little is available.
///
/// * nch: The new required number of channels
pub fn setNChannels(&mut self, nch: usize) {
self.channels.truncate(nch);
while self.channels.len() < nch {
self.channels.push(SiggenChannelConfig::new());
}
}
/// Set the DC offset for all channels
pub fn setDCOffset(&mut self,dc: &[Flt]) {
self.channels.iter_mut().zip(dc).for_each(
|(ch, dc)| {ch.DCOffset = *dc;});
}
/// Create a sine wave signal generator
///
/// * freq: Frequency of the sine wave in \[Hz\]
pub fn newSineWave(nchannels: usize, freq: Flt) -> Siggen {
Siggen::new(nchannels, Box::new(Sine::new(freq)))
}
/// Create a new signal generator wiht an arbitrary source.
pub fn new(nchannels: usize, source: Box<dyn Source>) -> Siggen {
Siggen {
source,
channels: vec![SiggenChannelConfig::new(); nchannels],
source_buf: vec![],
chout_buf: vec![],
}
}
/// Creates *interleaved* output signal
pub fn genSignal<T>(&mut self, out: &mut [T])
where
T: Sample + FromSample<Flt> + Debug,
Flt: Sample,
{
let nch = self.nchannels();
let nsamples: usize = out.len() / nch;
assert!(out.len() % self.nchannels() == 0);
// Create source signal
self.source_buf.resize(nsamples, 0.);
self.source
.genSignal_unscaled(&mut self.source_buf.iter_mut());
// println!("Source signal: {:?}", self.source_buf);
// Write output while casted to the correct type
// Iterate over each channel, and counter
self.chout_buf.resize(nch, vec![]);
for (channelno, (channel, chout)) in self
.channels
.iter_mut()
.zip(self.chout_buf.iter_mut())
.enumerate()
{
chout.resize(nsamples, 0.);
// Create output signal, overwrite chout
channel.genSignal(&self.source_buf, chout);
// println!("Channel: {}, {:?}", channelno, chout);
let out_iterator = out.iter_mut().skip(channelno).step_by(nch);
out_iterator
.zip(chout)
.for_each(|(out, chin)| *out = chin.to_sample());
}
// println!("{:?}", out);
}
/// Reset signal generator. Applies any kind of cleanup necessary.
///
/// Args
///
/// * fs: (New) Sampling frequency \[Hz\]
///
pub fn reset(&mut self, fs: Flt) {
self.source.reset(fs);
self.channels.iter_mut().for_each(|x| x.reset(fs))
}
/// Mute / unmute all channels at once
pub fn setAllMute(&mut self, mute: bool) {
self.channels.iter_mut().for_each(|s| {
s.setMute(mute);
});
}
/// Mute / unmute individual channels. Array of bools should have same size
/// as number of channels in signal generator.
pub fn setMute(&mut self, mute: &[bool]) {
assert!(mute.len() == self.nchannels());
self.channels.iter_mut().zip(mute).for_each(|(s, m)| {
s.setMute(*m);
});
}
}
/// Signal generator config for a certain channel
#[derive(Clone)]
struct SiggenChannelConfig {
muted: bool,
prefilter: Option<Box<dyn Filter>>,
gain: Flt,
DCOffset: Flt,
}
unsafe impl Send for SiggenChannelConfig {}
impl SiggenChannelConfig {
/// Set new pre-filter that filters the source signal
pub fn setPreFilter(&mut self, pref: Option<Box<dyn Filter>>) {
self.prefilter = pref;
}
/// Set the gain applied to the source signal
///
/// * g: Gain value. Can be any float. If set to 0.0, the source is effectively muted. Only
/// using (setMute) is a more efficient way to do this.
pub fn setGain(&mut self, g: Flt) {
self.gain = g;
}
/// Reset signal channel config. Only resets the prefilter state
pub fn reset(&mut self, _fs: Flt) {
if let Some(f) = &mut self.prefilter {
f.reset()
}
}
/// Generate new channel configuration using 'arbitrary' initial config: muted false, gain 1.0, DC offset 0.
/// and no prefilter
pub fn new() -> SiggenChannelConfig {
SiggenChannelConfig {
muted: false,
prefilter: None,
gain: 1.0,
DCOffset: 0.,
}
}
/// Set mute on channel. If true, only DC signal offset is outputed from (SiggenChannelConfig::transform).
pub fn setMute(&mut self, mute: bool) {
self.muted = mute
}
/// Generate new signal data, given input source data.
///
/// # Args
///
/// source: Input source signal.
/// result: Reference of array of float values to be filled with signal data.
///
/// # Details
///
/// - When muted, the DC offset is still applied
/// - The order of the generation is:
/// - If a prefilter is installed, this pre-filter is applied to the source signal.
/// - Gain is applied.
/// - Offset is applied (thus, no gain is applied to the DC offset).
///
pub fn genSignal(&mut self, source: &[Flt], result: &mut [Flt]) {
if self.muted {
result.iter_mut().for_each(|x| {
*x = 0.0;
});
} else {
result.copy_from_slice(source);
if let Some(f) = &mut self.prefilter {
f.filter(result);
}
}
result.iter_mut().for_each(|x| {
// First apply gain, then offset
*x *= self.gain;
*x += self.DCOffset;
});
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_whitenoise() {
// This code is just to check syntax. We should really be listening to these outputs.
let mut t = [0.; 10];
Siggen::newWhiteNoise(1).genSignal(&mut t);
// println!("{:?}", &t);
}
#[test]
fn test_sine() {
// This code is just to check syntax. We should really be listening to
// these outputs.
const N: usize = 10000;
let mut s1 = [0.; N];
let mut s2 = [0.; N];
let mut siggen = Siggen::newSineWave(1, 1.);
siggen.reset(10.);
siggen.setAllMute(false);
siggen.genSignal(&mut s1);
siggen.genSignal(&mut s2);
let absdiff = s1.iter().zip(s2.iter()).map(|(s1, s2)| {Flt::abs(*s1-*s2)}).sum::<Flt>();
assert!(absdiff< 1e-10);
}
#[test]
fn test_sine2() {
// Test if channels are properly separated etc. Check if RMS is correct
// for amplitude = 1.0.
const fs: Flt = 10.;
// Number of samples per channel
const Nframes: usize = 10000;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
siggen.reset(fs);
siggen.setMute(&[false, true]);
// siggen.channels[0].DCOffset = 0.1;
// Split off in two terms, see if this works properly
siggen.genSignal(&mut signal[..Nframes/2]);
siggen.genSignal(&mut signal[Nframes/2..]);
// Mean square of the signal
let ms1 = signal.iter().step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
println!("ms1: {}",ms1);
let ms2 = signal.iter().skip(1).step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
assert!(Flt::abs(ms1 - 0.5) < 1e-12);
assert_eq!(ms2 , 0.);
}
// A small test to learn a bit about sample types and conversion. This
// is the thing we want.
#[test]
fn test_sample() {
assert_eq!(0.5f32.to_sample::<i8>(), 64);
assert_eq!(1.0f32.to_sample::<i8>(), 127);
assert_eq!(-(1.0f32.to_sample::<i8>()), -127);
assert_eq!(1.0f32.to_sample::<i16>(), i16::MAX);
}
}