From 87f8b05eea72b8c49d0055798cae995d6b8defd6 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F." Date: Wed, 20 Dec 2023 21:20:10 +0100 Subject: [PATCH] Intermediate commit. Still work to fix on the InstreamMsg etc. --- Cargo.toml | 37 +++- src/bin/lasp_devinfo.rs | 15 ++ src/bin/lasp_inputdefault.rs | 56 +++++ src/bin/lasp_recorddefault.rs | 45 ++++ src/bin/test_input.rs | 14 -- src/config.rs | 31 +-- src/daq/api/api_cpal.rs | 403 +++++++++++++++++++++++++++++++--- src/daq/api/api_pulse.rs | 0 src/daq/api/mod.rs | 24 +- src/daq/daqconfig.rs | 81 ++++++- src/daq/datatype.rs | 10 +- src/daq/deviceinfo.rs | 11 +- src/daq/mod.rs | 187 +++++++++++----- src/daq/record.rs | 252 +++++++++++++++++++++ src/daq/streamhandler.rs | 18 ++ src/daq/streammsg.rs | 98 ++++++--- src/filter.rs | 31 +-- src/lib.rs | 6 +- src/siggen.rs | 218 ++++++++++++------ 19 files changed, 1282 insertions(+), 255 deletions(-) create mode 100644 src/bin/lasp_devinfo.rs create mode 100644 src/bin/lasp_inputdefault.rs create mode 100644 src/bin/lasp_recorddefault.rs delete mode 100644 src/bin/test_input.rs create mode 100644 src/daq/api/api_pulse.rs create mode 100644 src/daq/record.rs create mode 100644 src/daq/streamhandler.rs diff --git a/Cargo.toml b/Cargo.toml index 5642d06..141dd15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,7 @@ readme = "README.md" repository = "https://code.ascee.nl/ascee/lasprs" license = "MIT OR Apache-2.0" keywords = ["dsp", "audio", "measurement", "acoustics", "filter"] -categories = [ - "multimedia::audio", - "science", - "mathematics"] +categories = ["multimedia::audio", "science", "mathematics"] [lib] name = "lasprs" crate-type = ["cdylib", "rlib"] @@ -31,15 +28,15 @@ num = "0.4.1" rayon = "1.8.0" # Python bindings -pyo3 = { version = "0.20", features=["anyhow", "extension-module"], optional=true } -numpy = { version = "0.20" } +pyo3 = { version = "0.20", optional = true, features = ["extension-module", "anyhow"]} +numpy = { version = "0.20", optional = true} # White noise etc rand = "0.8.5" rand_distr = "0.4.3" # Cross-platform audio lib -cpal = { version = "0.15.2", optional=true } +cpal = { version = "0.15.2", optional = true } # Nice enumerations strum = "0.25.0" @@ -62,12 +59,28 @@ toml = "0.8.8" # Initialize array for non-copy type array-init = "2.1.0" +# Types of a sample +dasp_sample = "0.11.0" + +# Required for recording and looking into measurements +hdf5-sys = { version = "0.8.1", features = ["static"], optional = true } +hdf5 = { version = "0.8.1", optional = true } + +# Useful iterator stuff +itertools = "0.12.0" + +# For getting timestamps. Only useful when recording. +chrono = {version = "0.4.31", optional = true} +# For getting UUIDs in recording +uuid = { version = "1.6.1", features = ["v4"] , optional = true} + [features] -default = ["f64", "cpal_api"] -# Use this for debugging extension -# default = ["f64", "extension-module", "pyo3/extension-module"] +default = ["f64", "cpal_api", "record"] +# Use this for debugging extensions +# default = ["f64", "python-bindings", "record", "cpal-api"] + cpal_api = ["dep:cpal"] -# default = ["f64", "cpal_api"] +record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"] f64 = [] f32 = [] -extension-module = ["dep:pyo3", "pyo3/extension-module"] +python-bindings = ["dep:pyo3", "dep:numpy"] diff --git a/src/bin/lasp_devinfo.rs b/src/bin/lasp_devinfo.rs new file mode 100644 index 0000000..3f5f4ab --- /dev/null +++ b/src/bin/lasp_devinfo.rs @@ -0,0 +1,15 @@ +use anyhow::Result; +use lasprs::daq::StreamMgr; + +fn main() -> Result<()> { + let mut smgr = StreamMgr::new(); + + let devs = smgr.getDeviceInfo(); + for dev in devs { + println!("========="); + println!("{:?}", dev); + println!("-------------"); + } + + Ok(()) +} diff --git a/src/bin/lasp_inputdefault.rs b/src/bin/lasp_inputdefault.rs new file mode 100644 index 0000000..ff1dcba --- /dev/null +++ b/src/bin/lasp_inputdefault.rs @@ -0,0 +1,56 @@ +use anyhow::Result; +use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; +use lasprs::daq::{StreamHandler, StreamMgr, InStreamMsg}; +use std::io; +use std::{thread, time}; +// use + +fn spawn_stdin_channel() -> Receiver { + let (tx, rx) = unbounded(); + thread::spawn(move || loop { + let mut buffer = String::new(); + io::stdin().read_line(&mut buffer).unwrap(); + tx.send(buffer).unwrap(); + }); + rx +} +fn sleep(millis: u64) { + let duration = time::Duration::from_millis(millis); + thread::sleep(duration); +} +fn main() -> Result<()> { + let mut smgr = StreamMgr::new(); + + smgr.startDefaultInputStream()?; + let stdin_channel = spawn_stdin_channel(); + + let sh = StreamHandler::new(&mut smgr); + + 'infy: loop { + match stdin_channel.try_recv() { + Ok(_key) => break 'infy, + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => panic!("Channel disconnected"), + } + sleep(100); + match sh.rx.try_recv() { + Ok(msg) => { + // eprint!("Obtained message: {:?}", msg); + match msg { + InStreamMsg::StreamStarted(meta) => { + println!("Stream started: {:?}", meta); + }, + _ => { println!("Other msg...");} + } + } + Err(e) => match e { + TryRecvError::Disconnected => { + break 'infy; + } + TryRecvError::Empty => {} + }, + } + } + + Ok(()) +} diff --git a/src/bin/lasp_recorddefault.rs b/src/bin/lasp_recorddefault.rs new file mode 100644 index 0000000..531f31a --- /dev/null +++ b/src/bin/lasp_recorddefault.rs @@ -0,0 +1,45 @@ +use anyhow::Result; +#[cfg(feature="record")] +use lasprs::daq::{RecordSettings, RecordStatus, Recording, StreamMgr}; +use std::{thread, time::{self, Duration}}; +// use + +#[cfg(feature="record")] +fn main() -> Result<()> { + let mut smgr = StreamMgr::new(); + + let settings = RecordSettings { + filename: "test.h5".into(), + duration: Duration::from_secs(2), + }; + + smgr.startDefaultInputStream()?; + let mut r = Recording::new(settings, &mut smgr)?; + + println!("Starting to record..."); + loop { + match r.status() { + RecordStatus::Idle => println!("Idle"), + RecordStatus::Error(e) => { + println!("Record error: {}", e); + break; + + } + RecordStatus::Finished => { + println!("\nRecording finished."); + break; + } + RecordStatus::Recording(duration) => { + print!("\rRecording... {} ms", duration.as_millis()); + } + }; + sleep(10); + } + + Ok(()) +} + +fn sleep(millis: u64) { + let duration = time::Duration::from_millis(millis); + thread::sleep(duration); +} diff --git a/src/bin/test_input.rs b/src/bin/test_input.rs deleted file mode 100644 index be7d5bc..0000000 --- a/src/bin/test_input.rs +++ /dev/null @@ -1,14 +0,0 @@ -use lasprs::daq::StreamMgr; -use anyhow::Result; -use std::io; - -fn main() -> Result<()> { - - let mut smgr = StreamMgr::new(); - - smgr.startDefaultInputStream()?; - - let _ = io::stdin().read_line(&mut (String::new())); - - Ok(()) -} diff --git a/src/config.rs b/src/config.rs index 533e19b..46f5960 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,22 +1,26 @@ -// #![ -#[cfg(feature = "f32")] -pub type Flt = f32; -#[cfg(feature = "f32")] -pub const pi: Flt = std::f32::consts::PI; +//! Configuration of module. Here, we can choose to compile for 32-bits or 64-bit floating point values +//! as basic data storage and computation size. Default is f64. +//! -#[cfg(feature = "f64")] -pub type Flt = f64; -#[cfg(feature = "f64")] -pub const pi: Flt = std::f64::consts::PI; - -/// The maximum number of input channels allowed. Compile time constant to make some structs Copy. -pub const MAX_INPUT_CHANNELS: usize = 128; +cfg_if::cfg_if! { + if #[cfg(feature="f64")] { + pub type Flt = f64; + pub const pi: Flt = std::f64::consts::PI; + } + else if #[cfg(feature="f32")] { + pub type Flt = f32; + pub const pi: Flt = std::f32::consts::PI; + } + else { + std::compile_error!("feature should be f32 or f64"); + } +} use num::complex::*; /// Complex number floating point pub type Cflt = Complex; -use numpy::ndarray::{Array1, Array2}; +use ndarray::{Array1, Array2}; pub type Vd = Vec; pub type Vc = Vec; @@ -25,4 +29,3 @@ pub type Ccol = Array1; pub type Dmat = Array2; pub type Cmat = Array2; - diff --git a/src/daq/api/api_cpal.rs b/src/daq/api/api_cpal.rs index 7099423..e21fae2 100644 --- a/src/daq/api/api_cpal.rs +++ b/src/daq/api/api_cpal.rs @@ -1,58 +1,211 @@ +#![allow(dead_code)] use super::Stream; +use crate::config::{self, *}; +use crate::daq::daqconfig::{DaqChannel, DaqConfig}; use crate::daq::deviceinfo::DeviceInfo; -use crate::daq::streammsg::*; +use crate::daq::{self, streammsg::*, DataType}; +use crate::siggen::Siggen; use anyhow::{bail, Result}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{Device, Host, Sample, SampleFormat}; -use crossbeam::channel::Sender; +use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize}; +use crossbeam::channel::{Receiver, Sender}; +use itertools::Itertools; use std::sync::Arc; +/// Convert datatype in CPAL sampleformat +impl From for cpal::SampleFormat { + fn from(dt: DataType) -> cpal::SampleFormat { + let sf = match dt { + DataType::F64 => SampleFormat::F64, + DataType::F32 => SampleFormat::F32, + DataType::I8 => SampleFormat::I8, + DataType::I16 => SampleFormat::I16, + DataType::I32 => SampleFormat::I32, + DataType::I64 => SampleFormat::I64, + }; + sf + } +} +impl From for DataType { + fn from(sf: cpal::SampleFormat) -> DataType { + let dt = match sf { + SampleFormat::F64 => DataType::F64, + SampleFormat::F32 => DataType::F32, + SampleFormat::I8 => DataType::I8, + SampleFormat::I16 => DataType::I16, + SampleFormat::I32 => DataType::I32, + SampleFormat::I64 => DataType::I64, + _ => panic!("Not implemented sample format: {}", sf), + }; + dt + } +} + /// Cpal api pub struct CpalApi { host: cpal::Host, } -impl Stream for cpal::Stream {} +pub struct CpalStream { + stream: cpal::Stream, + md: Option, + noutchannels: usize, +} +impl Stream for CpalStream { + fn metadata(&self) -> Option { + self.md.clone() + } + fn ninchannels(&self) -> usize { + if let Some(md) = &self.md { + return md.nchannels(); + } + 0 + } + fn noutchannels(&self) -> usize { + self.noutchannels + } +} impl CpalApi { pub fn new() -> CpalApi { + // for h in cpal::platform::available_hosts() { + // println!("h: {:?}", h); + // } CpalApi { host: cpal::default_host(), } } pub fn getDeviceInfo(&self) -> Result> { - let devs = vec![]; - for dev in self.host.devices()? { + let srs_1 = [ + 1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000, + ]; + let srs_2 = [11025, 22050, 44100, 88200]; + let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter())); + srs_tot.sort(); + let srs_tot = Vec::from_iter(srs_tot.iter().copied().map(|i| *i as Flt)); + + // srs_tot.sort(); + + let mut devs = vec![]; + for dev in self.host.devices()? { + // println!("{:?}", dev.name()); + let mut iChannelCount = 0; + let mut oChannelCount = 0; + + let mut sample_rates = srs_tot.clone(); + let mut avFramesPerBlock = vec![256, 512, 1024, 2048, 8192]; + + let mut sample_formats = vec![]; + // Search for sample formats + if let Ok(icfg) = dev.supported_input_configs() { + for icfg in icfg { + let thissf = icfg.sample_format(); + if thissf.is_uint() { + continue; + } + sample_formats.push(icfg.sample_format()); + sample_rates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt); + sample_rates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt); + if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() { + avFramesPerBlock.retain(|i| i >= min); + avFramesPerBlock.retain(|i| i <= max); + } + iChannelCount = icfg.channels() as u8; + // avFramesPerBlock.retain(|i| i >= icfg.buffer_size().) + } + } + if let Ok(ocfg) = dev.supported_input_configs() { + for ocfg in ocfg { + let thissf = ocfg.sample_format(); + if thissf.is_uint() { + continue; + } + sample_formats.push(thissf); + sample_rates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt); + sample_rates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt); + if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() { + avFramesPerBlock.retain(|i| i >= min); + avFramesPerBlock.retain(|i| i <= max); + } + oChannelCount = ocfg.channels() as u8; + } + } + sample_formats.dedup(); + if sample_formats.len() == 0 { + continue; + } + + let dtypes: Vec = + sample_formats.iter().dedup().map(|i| (*i).into()).collect(); + + let prefDataType = match dtypes.iter().position(|d| d == &DataType::F32) { + Some(idx) => dtypes[idx], + None => dtypes[dtypes.len() - 1], + }; + let prefSampleRate = *sample_rates.last().unwrap_or(&48000.); + devs.push(DeviceInfo { + api: super::StreamApiDescr::Cpal, + name: dev.name()?, + avDataTypes: dtypes, + prefDataType, + + avSampleRates: sample_rates, + prefSampleRate: prefSampleRate, + avFramesPerBlock, + prefFramesPerBlock: 2048, + + iChannelCount, + oChannelCount, + + hasInputIEPE: false, + hasInputACCouplingSwitch: false, + hasInputTrigger: false, + hasInternalOutputMonitor: false, + duplexModeForced: false, + physicalIOQty: daq::Qty::Number, + }) } + Ok(devs) } - fn build_input_stream( - sf: cpal::SampleFormat, - config: cpal::StreamConfig, - device: &cpal::Device, - sender: Sender, - ) -> Result { - - let sender_errcallback = sender.clone(); - + // Create the error function closure, that capture the send channel on which error messages from the stream are sent + fn create_errfcn(send_ch: Sender) -> impl FnMut(cpal::StreamError) { let errfn = move |err: cpal::StreamError| match err { - cpal::StreamError::DeviceNotAvailable => sender_errcallback + cpal::StreamError::DeviceNotAvailable => send_ch .send(RawStreamData::StreamError(StreamError::DeviceNotAvailable)) .unwrap(), - cpal::StreamError::BackendSpecific { err: _ } => sender_errcallback + cpal::StreamError::BackendSpecific { err: _ } => send_ch .send(RawStreamData::StreamError(StreamError::DriverError)) .unwrap(), }; + errfn + } + /// Create an input stream for a CPAL device. + /// + /// # Arguments + /// + /// * sf: Sample format + fn build_input_stream( + sf: cpal::SampleFormat, + config: &cpal::StreamConfig, + device: &cpal::Device, + sender: Sender, + en_inchannels: Vec, + framesPerBlock: u32, + ) -> Result { + let tot_inch = config.channels; + + let sender_err = sender.clone(); macro_rules! build_stream{ ($($cpaltype:pat, $rtype:ty);*) => { match sf { $( $cpaltype => device.build_input_stream( &config, - move |data, _: &_| InStreamCallback::<$rtype>(data, &sender), - errfn, + move |data, _: &_| InStreamCallback::<$rtype>(data, &sender, tot_inch, &en_inchannels, framesPerBlock), + CpalApi::create_errfcn(sender_err), None)? ),*, _ => bail!("Unsupported sample format '{}'", sf) @@ -67,7 +220,127 @@ impl CpalApi { ); Ok(stream) } - /// Start a default input stream + + /// Create CPAL specific configuration, from our specified daq config and device info + fn create_cpal_config( + st: StreamType, + devinfo: &DeviceInfo, + conf: &DaqConfig, + dev: &cpal::Device, + conf_iterator: T, + ) -> Result + where + T: Iterator, + { + let nchannels = match st { + StreamType::Input => devinfo.iChannelCount, + StreamType::Output => devinfo.oChannelCount, + _ => unreachable!(), + }; + for cpalconf in conf_iterator { + if cpalconf.sample_format() == conf.dtype.into() { + // Specified sample format is available + if cpalconf.channels() == nchannels as u16 { + let requested_sr = conf.sampleRate(devinfo); + if cpalconf.min_sample_rate().0 as Flt <= requested_sr + && cpalconf.max_sample_rate().0 as Flt >= requested_sr + { + // Sample rate falls within range. + let requested_fpb = conf.framesPerBlock(devinfo); + // Last check: check if buffer size is allowed + match cpalconf.buffer_size() { + SupportedBufferSize::Range { min, max } => { + if min >= &requested_fpb || max <= &requested_fpb { + bail!( + "Frames per block should be >= {} and <= {}. Requested {}.", + min, + max, + requested_fpb + ) + } + } + _ => {} + } + return Ok(cpalconf.with_sample_rate(cpal::SampleRate(requested_sr as u32))); + } + } + } + } + bail!("API error: specified DAQ configuration is not available for device") + } + + /// Start a stream for a device with a given configuration. + pub fn startStream( + &self, + stype: StreamType, + devinfo: &DeviceInfo, + conf: &DaqConfig, + sender: Sender, + ) -> Result> { + for cpaldev in self.host.devices()? { + // See if we can create a supported stream config. + let supported_config = match stype { + StreamType::Duplex => bail!("Duplex stream not supported for CPAL"), + StreamType::Input => CpalApi::create_cpal_config( + stype.clone(), + devinfo, + conf, + &cpaldev, + cpaldev.supported_input_configs()?, + ), + StreamType::Output => CpalApi::create_cpal_config( + stype.clone(), + devinfo, + conf, + &cpaldev, + cpaldev.supported_output_configs()?, + ), + }?; + let framesPerBlock = conf.framesPerBlock(devinfo); + + let sf = supported_config.sample_format(); + let config: cpal::StreamConfig = supported_config.config(); + + let (stream, metadata) = match stype { + StreamType::Input => { + let meta = StreamMetaData::new( + &conf.enabledInchannelConfig(), + conf.dtype, + supported_config.sample_rate().0 as Flt, + framesPerBlock, + )?; + + let stream = CpalApi::build_input_stream( + sf, + &config, + &cpaldev, + sender, + conf.enabledInchannelsList(), + framesPerBlock, + )?; + (stream, Some(meta)) + } + + StreamType::Output => bail!("Not implemented output stream"), + _ => unreachable!(""), + }; + + stream.play()?; + + let noutchannels = conf.numberEnabledOutChannels(); + return Ok(Box::new(CpalStream { + stream, + md: metadata, + noutchannels, + })); + } + bail!(format!( + "Error: requested device {} not found. Please make sure the device is available.", + devinfo.name + )) + } + + /// Start a default input stream for a device /// /// pub fn startDefaultInputStream( @@ -76,18 +349,45 @@ impl CpalApi { ) -> Result> { if let Some(device) = self.host.default_input_device() { if let Ok(config) = device.default_input_config() { + let framesPerBlock = 4096; let final_config = cpal::StreamConfig { channels: config.channels(), sample_rate: config.sample_rate(), - buffer_size: cpal::BufferSize::Fixed(4096), + buffer_size: cpal::BufferSize::Fixed(framesPerBlock), }; + let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize)); let sf = config.sample_format(); - let stream = CpalApi::build_input_stream(sf, final_config, &device, sender)?; + let stream = CpalApi::build_input_stream( + sf, + &final_config, + &device, + sender, + en_inchannels, + framesPerBlock, + )?; stream.play()?; - println!("Stream started with sample format {:?}", sf); - Ok(Box::new(stream)) + // Daq: default channel config + let daqchannels = Vec::from_iter((0..final_config.channels).map(|i| { + DaqChannel::defaultAudioInput(format!("Unnamed input channel {}", i)) + })); + + // Specify data tape + let dtype = DataType::from(sf); + + // Create stream metadata + let md = StreamMetaData::new( + &daqchannels, + dtype, + config.sample_rate().0 as Flt, + framesPerBlock, + )?; + Ok(Box::new(CpalStream { + stream, + md: Some(md), + noutchannels: 0, + })) } else { bail!("Could not obtain default input configuration") } @@ -95,15 +395,62 @@ impl CpalApi { bail!("Could not open default input device") } } - // pub fn getDeviceInfo(&self) -> Result> { - + // Create an output stream, using given signal generators for each channel. + // fn build_output_stream( + // sf: cpal::SampleFormat, + // config: cpal::StreamConfig, + // device: &cpal::Device, + // siggens: Vec, + // ) -> Result { + // macro_rules! build_stream{ + // ($($cpaltype:pat, $rtype:ty);*) => { + // match sf { + // $( + // $cpaltype => device.build_input_stream( + // &config, + // move |data, _: &_| InStreamCallback::<$rtype>(data, &sender), + // CpalApi::create_errfcn(sender.clone()), + // None)? + // ),*, + // _ => bail!("Unsupported sample format '{}'", sf) + // } + // } + // } + // let stream: cpal::Stream = build_stream!( + // SampleFormat::I8, i8; + // SampleFormat::I16, i16; + // SampleFormat::I32, i32; + // SampleFormat::F32, f32 + // ); + // Ok(stream) // } } -fn InStreamCallback(input: &[T], sender: &Sender) -where +fn InStreamCallback( + input: &[T], + sender: &Sender, + tot_inch: u16, + en_inchannels: &[usize], + framesPerBlock: u32, +) where T: Copy + num::ToPrimitive + 'static, { let msg = RawStreamData::from(input); + let nen_ch = en_inchannels.len(); + let nframes = input.len() / tot_inch as usize; + let mut enabled_ch_data = Vec::with_capacity(nen_ch * nframes); + unsafe { + enabled_ch_data.set_len(enabled_ch_data.capacity()); + } + // Chops of the disabled channels and forwards the data, DEINTERLEAVED + for (chout_idx, chout) in en_inchannels.iter().enumerate() { + let in_iterator = input.iter().skip(*chout).step_by(tot_inch as usize); + let out_iterator = enabled_ch_data.iter_mut().skip(chout_idx * nframes); + for (out, in_) in out_iterator.zip(in_iterator) { + *out = *in_; + } + } + + let msg = RawStreamData::from(enabled_ch_data); sender.send(msg).unwrap() } diff --git a/src/daq/api/api_pulse.rs b/src/daq/api/api_pulse.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/daq/api/mod.rs b/src/daq/api/mod.rs index d2cddd5..a1dbc28 100644 --- a/src/daq/api/mod.rs +++ b/src/daq/api/mod.rs @@ -1,19 +1,29 @@ +use serde::{Deserialize, Serialize}; /// Daq apis that are optionally compiled in. Examples: /// /// - CPAL (Cross-Platform Audio Library) /// - ... use strum::EnumMessage; use strum_macros; -use serde::{Serialize, Deserialize}; -cfg_if::cfg_if! { - if #[cfg(feature="cpal_api")] { - pub mod api_cpal; - } else { } -} +use super::StreamMetaData; +#[cfg(feature = "cpal_api")] +pub mod api_cpal; + +#[cfg(feature = "pulse_api")] +pub mod api_pulse; /// A currently running stream -pub trait Stream { } +pub trait Stream { + /// Stream metadata. Only available for input streams + fn metadata(&self) -> Option; + + /// Number of input channels in stream + fn ninchannels(&self) -> usize; + + /// Number of output channels in stream + fn noutchannels(&self) -> usize; +} #[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize)] #[allow(dead_code)] diff --git a/src/daq/daqconfig.rs b/src/daq/daqconfig.rs index 3f4e948..bcc274f 100644 --- a/src/daq/daqconfig.rs +++ b/src/daq/daqconfig.rs @@ -1,5 +1,6 @@ use super::api::StreamApiDescr; use super::datatype::DataType; +use super::deviceinfo::DeviceInfo; use super::qty::Qty; use crate::config::*; use serde::{Deserialize, Serialize}; @@ -35,18 +36,96 @@ impl Default for DaqChannel { } } } +impl DaqChannel { + /// Default channel configuration for audio input from a certain channel + pub fn defaultAudioInput(name: String) -> Self { + DaqChannel { + enabled: true, + name: name, + sensitivity: 1.0, + IEPEEnabled: false, + ACCouplingMode: false, + rangeIndex: 0, + qty: Qty::Number, + } + } +} /// Configuration of a device. #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct DaqConfig { + /// The API pub api: StreamApiDescr, + /// Device name. Should match when starting a stream pub device_name: String, + /// Configuration of the input channels pub inchannel_config: Vec, + /// Configuration of the output channels pub outchannel_config: Vec, + /// The data type to use pub dtype: DataType, + /// Whether to apply a digital high pass on the input. <=0 means disabled. > 0 means, the value specifies the cut-on frequency for the first order high pass filter. pub digitalHighPassCutOn: Flt, + /// The index to use in the list of possible sample rates sampleRateIndex: usize, - framesPerBlocIndex: usize, + /// The index to use in the list of possible frames per block + framesPerBlockIndex: usize, + /// Used when output channels should be monitored, i.e. reverse-looped back as input channels. monitorOutput: bool, } + +impl DaqConfig { + /// Returns a list of enabled input channel numbers as indices + /// in the list of all input channels (enabled and not) + pub fn enabledInchannelsList(&self) -> Vec { + self.inchannel_config + .iter() + .enumerate() + .filter(|(_, ch)| ch.enabled) + .map(|(i, _)| i) + .collect() + } + + /// Returns the total number of channels that appear in a running input stream. + pub fn numberEnabledInChannels(&self) -> usize { + self.inchannel_config + .iter() + .filter(|ch| ch.enabled) + .count() + } + /// Returns the total number of channels that appear in a running output stream. + pub fn numberEnabledOutChannels(&self) -> usize { + self.outchannel_config + .iter() + .filter(|ch| ch.enabled) + .count() + } + + /// Provide samplerate, based on device and specified sample rate index + pub fn sampleRate(&self, dev: &DeviceInfo) -> Flt { + dev.avSampleRates[self.sampleRateIndex] + } + + /// Provide samplerate, based on device and specified sample rate index + pub fn framesPerBlock(&self, dev: &DeviceInfo) -> u32 { + dev.avFramesPerBlock[self.framesPerBlockIndex] + } + + /// Returns vec of channel configuration for enabled input channels only + pub fn enabledInchannelConfig(&self) -> Vec { + self.inchannel_config + .iter() + .filter(|ch| ch.enabled) + .cloned() + .collect() + } + /// Returns vec of channel configuration for enabled output channels only + pub fn enabledOutchannelConfig(&self) -> Vec { + self.outchannel_config + .iter() + .filter(|ch| ch.enabled) + .cloned() + .collect() + } +} diff --git a/src/daq/datatype.rs b/src/daq/datatype.rs index bd8fe7a..9fc3804 100644 --- a/src/daq/datatype.rs +++ b/src/daq/datatype.rs @@ -15,12 +15,16 @@ pub enum DataType { #[strum(message = "F64", detailed_message = "64-bits floating points")] F64 = 1, /// 8-bit integers - #[strum(message = "F8", detailed_message = "8-bits integers")] + #[strum(message = "I8", detailed_message = "8-bits integers")] I8 = 2, /// 16-bit integers - #[strum(message = "F16", detailed_message = "16-bits integers")] + #[strum(message = "I16", detailed_message = "16-bits integers")] I16 = 3, /// 32-bit integers - #[strum(message = "F32", detailed_message = "32-bits integers")] + #[strum(message = "I32", detailed_message = "32-bits integers")] I32 = 4, + + /// 64-bit integers + #[strum(message = "I64", detailed_message = "64-bits integers")] + I64 = 5, } diff --git a/src/daq/deviceinfo.rs b/src/daq/deviceinfo.rs index a8e86e1..22af511 100644 --- a/src/daq/deviceinfo.rs +++ b/src/daq/deviceinfo.rs @@ -1,8 +1,7 @@ //! Data acquisition model. Provides abstract layers around DAQ devices. #![allow(non_snake_case)] -use super::datatype::DataType; -use super::qty::Qty; +use super::*; use super::api::StreamApiDescr; /// Device info structure. Gives all information regarding a device, i.e. the number of input and @@ -23,14 +22,14 @@ pub struct DeviceInfo { pub prefDataType: DataType, /// Available frames per block - pub avFramesPerBlock: Vec, + pub avFramesPerBlock: Vec, /// Preferred frames per block for device - pub prefFramesPerBlock: u16, + pub prefFramesPerBlock: usize, /// Available sample rates - pub avSampleRates: Vec, + pub avSampleRates: Vec, /// Preferred sample rate for device - pub prefSampleRate: u16, + pub prefSampleRate: Flt, /// Number of input channels available for this device pub iChannelCount: u8, diff --git a/src/daq/mod.rs b/src/daq/mod.rs index fbcdaf3..7c23697 100644 --- a/src/daq/mod.rs +++ b/src/daq/mod.rs @@ -5,22 +5,33 @@ mod daqconfig; mod datatype; mod deviceinfo; mod qty; +#[cfg(feature = "record")] +mod record; +mod streamhandler; mod streammsg; +pub use daqconfig::*; pub use datatype::*; pub use deviceinfo::*; pub use qty::*; +pub use streamhandler::*; pub use streammsg::*; +#[cfg(feature = "record")] +pub use record::*; + #[cfg(feature = "cpal_api")] use api::api_cpal::CpalApi; -use crate::config::*; +use crate::{ + config::*, + siggen::{self, Siggen}, +}; use anyhow::{bail, Error, Result}; use api::Stream; use core::time; use crossbeam::{ - channel::{unbounded, Receiver, Sender}, + channel::{unbounded, Receiver, Sender, TrySendError}, thread, }; use deviceinfo::DeviceInfo; @@ -31,10 +42,10 @@ use streammsg::*; /// Keep track of whether the stream has been created. To ensure singleton behaviour. static smgr_created: AtomicBool = AtomicBool::new(false); -struct InputStream { +struct StreamData { streamtype: StreamType, stream: Box, - threadhandle: JoinHandle, + threadhandle: JoinHandle, comm: Sender, } @@ -42,13 +53,10 @@ struct InputStream { /// pub struct StreamMgr { // Input stream can be both input and duplex - input_stream: Option, + input_stream: Option>, // Output only stream - output_stream: Option>, - - // Signal generator - siggen: Option, + output_stream: Option>, #[cfg(feature = "cpal_api")] cpal_api: CpalApi, @@ -57,6 +65,12 @@ pub struct StreamMgr { /// are here. When stream is running, they will become available /// in the JoinHandle of the thread. instreamqueues: Option, + + // Signal generator. Stored here on the bench in case no stream is running. + // It is picked when it is configured correctly for the starting output stream + // If it is not configured correctly, when a stream that outputs data is started + // ,it is removed here. + siggen: Option, } impl StreamMgr { @@ -82,41 +96,67 @@ impl StreamMgr { instreamqueues: Some(vec![]), } } - /// Obtain a list of devices that are available for each available API - fn getDeviceInfo(&mut self) -> Vec { - let mut devinfo = vec![]; - #[cfg(feature="cpal_api")] - devinfo.extend(self.cpal_api.getDeviceInfo()); - devinfo + /// Set a new signal generator. Returns an error if it is unapplicable. + /// It is unapplicable if the number of channels of output does not match the + /// number of output channels in a running stream. + pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> { + // Current signal generator. Where to place it? + if let Some(is) = &self.input_stream { + if let StreamType::Duplex = is.streamtype { + if siggen.nchannels() != is.stream.noutchannels() { + bail!("Invalid number of channels configured in signal generator") + } + assert!(self.siggen.is_none()); + is.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); + return Ok(()); + } + } else if let Some(os) = &self.output_stream { + assert!(self.siggen.is_none()); + if siggen.nchannels() != os.stream.noutchannels() { + bail!("Invalid number of channels configured in signal generator") + } + os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); + return Ok(()); + } else { + self.siggen = Some(siggen); + return Ok(()); + } + unreachable!() } - /// Start a default input stream, using default settings on everything. This is only possible - /// when - pub fn startDefaultInputStream(&mut self) -> Result<()> { - #![allow(unreachable_code)] - if !self.input_stream.is_none() { - bail!("Input stream is already running. Please first stop existing input stream.") - } - - let (tx, rx): (Sender, Receiver) = unbounded(); - - cfg_if::cfg_if! { - if #[cfg(feature="cpal_api")] { - let stream = self.cpal_api.startDefaultInputStream(tx)?; - } - else { - bail!("Unable to start default input stream: no CPAL api available") + /// Obtain a list of devices that are available for each available API + pub fn getDeviceInfo(&mut self) -> Vec { + let mut devinfo = vec![]; + #[cfg(feature = "cpal_api")] + { + let cpal_devs = self.cpal_api.getDeviceInfo(); + if let Ok(devs) = cpal_devs { + devinfo.extend(devs); } } + devinfo + } + /// Add a new queue to the lists of queues + pub fn addInQueue(&mut self, tx: Sender) { + if let Some(is) = &self.input_stream { + is.comm.send(StreamCommand::AddInQueue(tx)).unwrap() + } else { + self.instreamqueues.as_mut().unwrap().push(tx); + } + } + + fn startInputStreamThread( + &mut self, + stream: &Box, + rx: Receiver, + ) -> (JoinHandle, Sender) { + let (commtx, commrx) = unbounded(); // Unwrap here, as the queues should be free to grab let mut iqueues = self.instreamqueues.take().unwrap(); - let (commtx, commrx) = unbounded(); + let meta = stream.metadata().unwrap(); - // let metadata = StreamMetaData::new( - // nchannels: - // ).unwrap(); let threadhandle = std::thread::spawn(move || { let mut ctr: usize = 0; 'infy: loop { @@ -124,20 +164,24 @@ impl StreamMgr { match comm_msg { // New queue added StreamCommand::AddInQueue(queue) => { - iqueues.push(queue); - // queue.send(streammsg::StreamMetaData(md)) + match queue.send(InStreamMsg::StreamStarted(Arc::new(meta.clone()))) { + Ok(()) => iqueues.push(queue), + Err(_) => {} + } } // Remove queue from list StreamCommand::RemoveInQueue(queue) => { - iqueues.retain(|q| !Arc::ptr_eq(q, &queue)) + iqueues.retain(|q| !q.same_channel(&queue)) } // Stop this thread. Returns the queue StreamCommand::StopThread => { - for q in iqueues.iter() { - q.send(InStreamMsg::StreamStopped).unwrap(); - } + sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped); + break 'infy; + } + StreamCommand::NewSiggen(_) => { + panic!("Error: signal generator send to input-only stream."); break 'infy; } } @@ -145,33 +189,55 @@ impl StreamMgr { if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) { // println!("Obtained raw stream data!"); let msg = Arc::new(msg); - for q in iqueues.iter() { - q.send(InStreamMsg::RawStreamData(ctr, msg.clone())) - .unwrap(); - } + let msg = InStreamMsg::RawStreamData(ctr, msg); + sendMsgToAllQueues(&mut iqueues, msg); } ctr += 1; } iqueues }); + (threadhandle, commtx) + } - cfg_if::cfg_if! { - if #[cfg(feature="cpal_api")] { - self.input_stream = Some(InputStream { - streamtype: StreamType::Input, - stream, - threadhandle, - comm: commtx, - }); - } else {} + /// Start a default input stream, using default settings on everything. This is only possible + /// when + pub fn startDefaultInputStream(&mut self) -> Result<()> { + if self.input_stream.is_some() { + bail!("Input stream is already running. Please first stop existing input stream.") } + let (tx, rx): (Sender, Receiver) = unbounded(); + + // Only a default input stream when CPAL feature is enabled + cfg_if::cfg_if! { + if #[cfg(feature="cpal_api")] { + let stream = self.cpal_api.startDefaultInputStream(tx)?; + // Inform all listeners of new stream data + + let iqueues = self.instreamqueues.as_mut().unwrap(); + let meta = stream.metadata().unwrap(); + sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(Arc::new(meta))); + + let (threadhandle, commtx) = self.startInputStreamThread(&stream, rx); + + self.input_stream = Some(StreamData { + streamtype: StreamType::Input, + stream, + threadhandle, + comm: commtx, + }); Ok(()) + + } + else { + bail!("Unable to start default input stream: no CPAL api available") + } + } } /// Stop existing input stream. pub fn stopInputStream(&mut self) -> Result<()> { - if let Some(InputStream { + if let Some(StreamData { streamtype: _, // Ignored here stream: _, threadhandle, @@ -200,7 +266,6 @@ impl StreamMgr { _ => bail!("Not implemented output stream"), } } - } // impl StreamMgr impl Drop for StreamMgr { fn drop(&mut self) { @@ -217,6 +282,16 @@ impl Drop for StreamMgr { } } +// Send to all queues, remove queues that are disconnected when found out +// on the way. +fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) { + // Loop over queues. Remove queues that error when we try to send + // to them + iqueues.retain(|q| match q.try_send(msg.clone()) { + Ok(_) => true, + Err(_e) => false, + }); +} /// Daq devices trait Daq {} diff --git a/src/daq/record.rs b/src/daq/record.rs new file mode 100644 index 0000000..d035597 --- /dev/null +++ b/src/daq/record.rs @@ -0,0 +1,252 @@ +use super::*; +use anyhow::{bail, Error, Result}; +use hdf5::types::{VarLenArray, VarLenUnicode}; +use hdf5::{dataset, datatype, Dataset, File, H5Type}; +use num::traits::ops::mul_add; +use serde::de::IntoDeserializer; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::sync::Mutex; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use strum::EnumMessage; + +#[derive(Clone)] +pub enum RecordStatus { + Idle, + Recording(Duration), + Finished, + Error(String), +} + +/// Settings used to start a recording. +pub struct RecordSettings { + /// File name to record to. + pub filename: PathBuf, + + /// The recording time. Set to 0 to perform indefinite recording + pub duration: Duration, +} + +/// Create a recording +pub struct Recording { + handle: Option>>, + tx: Sender, + status: Arc>, +} + +impl Recording { + fn create_dataset_type(file: &File, meta: &StreamMetaData) -> Result + where + T: H5Type, + { + let bs = meta.framesPerBlock as usize; + let nch = meta.nchannels(); + match file + .new_dataset::() + .chunk((1, bs, nch)) + .shape((1.., bs, nch)) + // .deflate(3) + .create("audio") + { + Ok(f) => Ok(f), + Err(e) => bail!("{}", e), + } + } + + fn create_dataset(file: &File, meta: &StreamMetaData) -> Result { + match meta.rawDatatype { + DataType::I8 => Recording::create_dataset_type::(file, meta), + DataType::I16 => Recording::create_dataset_type::(file, meta), + DataType::I32 => Recording::create_dataset_type::(file, meta), + DataType::I64 => Recording::create_dataset_type::(file, meta), + DataType::F32 => Recording::create_dataset_type::(file, meta), + DataType::F64 => Recording::create_dataset_type::(file, meta), + } + } + + fn write_hdf5_attr_scalar(file: &File, name: &str, val: T) -> Result<()> + where + T: H5Type, + { + let attr = file.new_attr::().create(name)?; + attr.write_scalar(&val)?; + Ok(()) + } + fn write_hdf5_attr_list(file: &File, name: &str, val: &[T]) -> Result<()> + where + T: H5Type, + { + let attr = file.new_attr::().shape([val.len()]).create(name)?; + attr.write(&val)?; + Ok(()) + } + + /// Start a new recording + /// + /// # Arguments + /// + /// * setttings: The settings to use for the recording + /// * smgr: Stream manager to use to start the recording + /// + pub fn new(settings: RecordSettings, mgr: &mut StreamMgr) -> Result { + let status = Arc::new(Mutex::new(RecordStatus::Idle)); + let status2 = status.clone(); + + let (tx, rx) = crossbeam::channel::unbounded(); + mgr.addInQueue(tx.clone()); + + // The thread doing the actual work + let handle = spawn(move || { + let file = File::create(settings.filename)?; + + let firstmsg = match rx.recv() { + Ok(msg) => msg, + Err(e) => bail!("Queue handle error"), + }; + + let meta = match firstmsg { + InStreamMsg::StreamStarted(meta) => meta, + _ => bail!("Recording failed. Missed stream metadata message."), + }; + + // Samplerate, block size, number of channels + Recording::write_hdf5_attr_scalar(&file, "samplerate", meta.samplerate)?; + Recording::write_hdf5_attr_scalar(&file, "nchannels", meta.nchannels())?; + Recording::write_hdf5_attr_scalar(&file, "blocksize", meta.framesPerBlock)?; + + // Store sensitivity + let sens: Vec = meta.channelInfo.iter().map(|ch| ch.sensitivity).collect(); + Recording::write_hdf5_attr_list(&file, "sensitivity", &sens)?; + + // Timestamp + use chrono::DateTime; + let now_utc = chrono::Utc::now(); + let timestamp = now_utc.timestamp(); + Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?; + + + // Create UUID for measurement + use hdf5::types::VarLenUnicode; + let uuid = uuid::Uuid::new_v4(); + let uuid_unicode: VarLenUnicode = VarLenUnicode::from_str(&uuid.to_string()).unwrap(); + Recording::write_hdf5_attr_scalar(&file, "UUID", uuid_unicode)?; + + // Channel names + let chnames: Vec = meta + .channelInfo + .iter() + .map(|ch| VarLenUnicode::from_str(&ch.name).unwrap()) + .collect(); + let chname_attr = file + .new_attr::() + .shape([chnames.len()]) + .create("channelNames")?; + chname_attr.write(&chnames)?; + + // Create the dataset + let ds = Recording::create_dataset(&file, &meta)?; + + // Indicate we are ready to rec! + *status.lock().unwrap() = RecordStatus::Recording(Duration::ZERO); + + let mut ctr = 0; + let framesPerBlock = meta.framesPerBlock as usize; + let nchannels = meta.nchannels() as usize; + 'recloop: loop { + match rx.recv().unwrap() { + InStreamMsg::StreamError(e) => { + bail!("Recording failed due to stream error.") + } + InStreamMsg::ConvertedStreamData(..) => {} + InStreamMsg::StreamStarted(_) => { + bail!("Stream started again?") + } + InStreamMsg::StreamStopped => { + // Early stop. User stopped it. + break 'recloop; + } + InStreamMsg::RawStreamData(incoming_ctr, dat) => { + // if incoming_ctr != ctr { + // bail!("Packages missed. Recording invalid.") + // } + + let tst = ndarray::Array2::::ones((framesPerBlock, nchannels)); + + ds.resize((ctr + 1, framesPerBlock, nchannels))?; + ds.write_slice(&tst, (ctr, .., ..))?; + + // match dat { + // RawStreamData::Datai8(d) => ds. + + // } + let recorded_time = Duration::from_millis( + ((1000 * (ctr + 1) * framesPerBlock) as Flt / meta.samplerate) as u64, + ); + if !settings.duration.is_zero() { + if recorded_time >= settings.duration { + break 'recloop; + } + } + // println!("... {}", recorded_time.as_millis()); + ctr += 1; + *status.lock().unwrap() = RecordStatus::Recording(recorded_time); + } + } + } // end of loop + + *status.lock().unwrap() = RecordStatus::Finished; + Ok(()) + // End of thread + }); + + Ok(Recording { + handle: Some(handle), + status: status2, + tx, + }) + } + + fn cleanupThreadIfPossible(&mut self) { + // println!("CleanupIfPossible()"); + if let Some(h) = &self.handle { + if h.is_finished() { + // println!("Thread finished"); + let h = self.handle.take().unwrap(); + let res = h.join().unwrap(); + if let Err(e) = res { + *self.status.lock().unwrap() = RecordStatus::Error(format!("{}", e)); + } + } + } + } + + pub fn status(&mut self) -> RecordStatus { + self.cleanupThreadIfPossible(); + self.status.lock().unwrap().clone() + } + + /// Stop existing recording early. At the current time, or st + pub fn stop(&mut self) -> Result<()> { + if self.handle.is_none() { + bail!("Recording is already stopped.") + } + + // Stope stream, if running + self.tx.send(InStreamMsg::StreamStopped)?; + + let h = self.handle.take().unwrap(); + let res = h.join().unwrap(); + if let Err(e) = res { + *self.status.lock().unwrap() = RecordStatus::Error(format!("{}", e)); + } + + Ok(()) + } +} + +impl Drop for Recording { + fn drop(&mut self) { + let _ = self.stop(); + } +} diff --git a/src/daq/streamhandler.rs b/src/daq/streamhandler.rs new file mode 100644 index 0000000..98a3c68 --- /dev/null +++ b/src/daq/streamhandler.rs @@ -0,0 +1,18 @@ + +use crossbeam::channel::unbounded; +use super::*; + +/// A stream handler registers a queue in the stream manager, and keeps the other end to +/// get InStreamData from a running input stream. +pub struct StreamHandler { + /// The receiving part of the channel on which (InStreamData) is received.. + pub rx: Receiver +} +impl StreamHandler { + /// Create new stream handler. + pub fn new(smgr: &mut StreamMgr) -> StreamHandler{ + let (tx, rx) = unbounded(); + smgr.addInQueue(tx); + StreamHandler{rx} + } +} \ No newline at end of file diff --git a/src/daq/streammsg.rs b/src/daq/streammsg.rs index b6c0ae1..416a4fe 100644 --- a/src/daq/streammsg.rs +++ b/src/daq/streammsg.rs @@ -1,18 +1,18 @@ //! Provides stream messages that come from a running stream use crate::config::*; -use crate::daq::DataType; use crate::daq::Qty; +use crate::siggen::Siggen; use anyhow::{bail, Result}; use crossbeam::channel::Sender; -use reinterpret::reinterpret_slice; +use reinterpret::{reinterpret_slice, reinterpret_vec}; use std::any::TypeId; use std::sync::Arc; use std::u128::MAX; -use super::daqconfig::DaqChannel; +use super::*; /// Raw stream data coming from a stream. -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum RawStreamData { /// 8-bits integer Datai8(Arc>), @@ -70,53 +70,79 @@ where } } +// Create InStreamData object from +impl From> for RawStreamData +where + T: num::ToPrimitive + Clone + 'static, +{ + fn from(input: Vec) -> RawStreamData { + // Apparently, this code does not work with a match. I have searched around and have not found the + // reason for this. So this is a bit of stupid boilerplate. + let i8type: TypeId = TypeId::of::(); + let i16type: TypeId = TypeId::of::(); + let i32type: TypeId = TypeId::of::(); + let f32type: TypeId = TypeId::of::(); + let f64type: TypeId = TypeId::of::(); + let thetype: TypeId = TypeId::of::(); + if i8type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai8(Arc::new(v)) + } else if i16type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai16(Arc::new(v)) + } else if i16type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai16(Arc::new(v)) + } else if i32type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai32(Arc::new(v)) + } else if f32type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Dataf32(Arc::new(v)) + } else if f64type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Dataf64(Arc::new(v)) + } else { + panic!("Not implemented sample type!") + } + } +} + /// Stream metadata. All information required for +#[derive(Clone, Debug)] pub struct StreamMetaData { - /// The number of channels. Should be <= MAX_INPUT_CHANNELS - pub nchannels: usize, /// Information for each channel in the stream - pub channelInfo: [DaqChannel; MAX_INPUT_CHANNELS], + pub channelInfo: Vec, /// The data type of the device [Number / voltage] pub rawDatatype: DataType, /// Sample rate in [Hz] pub samplerate: Flt, + + /// The number of frames per block send over + pub framesPerBlock: u32, } impl StreamMetaData { - /// Create new metadata object. Throws an error if the number of channels for the sensitivity and quantities does - /// not match. - /// + /// Create new metadata object. + /// /// /// # Args /// - /// * nchannels: The number of channels that are send - /// * sens: Sensitivity values for each channel - /// * rawdtype: The data type of the raw stream data. For sound cards this is Number, for DAQ's, this might be a voltage. - /// * qtys_: The physical quantities for each channel - /// * sr: The sample rate in \[Hz\] - /// - /// # Panics - /// - /// If the number of channels > MAX_INPUT_CHANNELS - pub fn new(channel_data: &[DaqChannel], rawdtype: DataType, sr: Flt) -> Result { - if channel_data.len() > MAX_INPUT_CHANNELS { - bail!("Too many channels provided.") - } - let nchannels = channel_data.len(); - let channelInfo: [DaqChannel; MAX_INPUT_CHANNELS] = - array_init::array_init(|_i: usize| DaqChannel::default()); - + pub fn new(channelInfo: &[DaqChannel], rawdtype: DataType, sr: Flt, framesPerBlock: u32) -> Result { Ok(StreamMetaData { - nchannels, - channelInfo, + channelInfo: channelInfo.to_vec(), rawDatatype: rawdtype, samplerate: sr, + framesPerBlock, }) } + + /// Returns the number of channels in the stream metadata. + pub fn nchannels(&self) -> usize {self.channelInfo.len()} } /// Input stream messages, to be send to handlers. -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum InStreamMsg { /// Raw stream data that is coming from a device. This is interleaved data. The number of channels is correct and /// specified in the stream metadata. @@ -138,7 +164,7 @@ pub enum InStreamMsg { /// Store a queue in a shared pointer, to share sending /// and receiving part of the queue. -pub type SharedInQueue = Arc>; +pub type SharedInQueue = Sender; /// Vector of queues for stream messages pub type InQueues = Vec; @@ -149,12 +175,16 @@ pub enum StreamCommand { /// Remove a queue to a running stream RemoveInQueue(SharedInQueue), + /// New signal generator config to be used + NewSiggen(Siggen), + /// Stop the thread, do not listen for data anymore. StopThread, } /// Stream types that can be started /// +#[derive(PartialEq, Clone)] pub enum StreamType { /// Input-only stream Input, @@ -179,5 +209,9 @@ pub enum StreamError { /// Device #[strum(detailed_message = "Device not available")] - DeviceNotAvailable + DeviceNotAvailable, + + /// Logic error (something weird happened) + #[strum(detailed_message = "Logic error")] + LogicError } diff --git a/src/filter.rs b/src/filter.rs index 3380cc9..226b193 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -6,15 +6,16 @@ use super::config::*; use anyhow::{bail, Result}; use cfg_if::cfg_if; -use numpy::ndarray::{ArrayD, ArrayViewD, ArrayViewMutD}; -use numpy::{IntoPyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn}; + use rayon::prelude::*; cfg_if! { -if #[cfg(feature = "extension-module")] { -use pyo3::exceptions::PyValueError; -use pyo3::prelude::*; -use pyo3::{pymodule, types::PyModule, PyResult}; +if #[cfg(feature = "python-bindings")] { + use numpy::ndarray::{ArrayD, ArrayViewD, ArrayViewMutD}; + use numpy::{IntoPyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn}; + use pyo3::exceptions::PyValueError; + use pyo3::prelude::*; + use pyo3::{pymodule, types::PyModule, PyResult}; } else {} } pub trait Filter: Send { @@ -38,7 +39,7 @@ impl Clone for Box { } /// # A biquad is a second order recursive filter structure. -#[cfg_attr(feature = "extension-module", pyclass)] +#[cfg_attr(feature = "python-bindings", pyclass)] #[derive(Clone, Copy, Debug)] pub struct Biquad { // State parameters @@ -53,8 +54,8 @@ pub struct Biquad { a1: Flt, a2: Flt, } -#[cfg(feature = "extension-module")] -#[cfg_attr(feature = "extension-module", pymethods)] +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] impl Biquad { #[new] /// Create new biquad filter. See [Biquad::new()] @@ -178,13 +179,13 @@ impl Filter for Biquad { /// See (tests) /// ``` #[derive(Clone, Debug)] -#[cfg_attr(feature = "extension-module", pyclass)] +#[cfg_attr(feature = "python-bindings", pyclass)] pub struct SeriesBiquad { biqs: Vec, } -#[cfg(feature = "extension-module")] -#[cfg_attr(feature = "extension-module", pymethods)] +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] impl SeriesBiquad { #[new] /// Create new series filter set. See [SeriesBiquad::new()] @@ -273,7 +274,7 @@ impl Filter for SeriesBiquad { } } -#[cfg_attr(feature = "extension-module", pyclass)] +#[cfg_attr(feature = "python-bindings", pyclass)] #[derive(Clone)] /// Multiple biquad filter that operate in parallel on a signal, and can apply a gain value to each /// of the returned values. The BiquadBank can be used to decompose a signal by running it through @@ -310,8 +311,8 @@ pub struct BiquadBank { gains: Vec, } -#[cfg(feature = "extension-module")] -#[cfg_attr(feature = "extension-module", pymethods)] +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] /// Methods to wrap it in Python impl BiquadBank { #[new] diff --git a/src/lib.rs b/src/lib.rs index dea7cff..1f2c92c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,11 +17,11 @@ pub mod filter; pub mod daq; pub mod siggen; -#[cfg(feature = "extension-module")] +#[cfg(feature = "python-bindings")] use pyo3::prelude::*; /// A Python module implemented in Rust. -#[cfg(feature = "extension-module")] +#[cfg(feature = "python-bindings")] #[pymodule] #[pyo3(name="_lasprs")] fn lasprs(py: Python, m: &PyModule) -> PyResult<()> { @@ -31,7 +31,7 @@ fn lasprs(py: Python, m: &PyModule) -> PyResult<()> { } /// Add filter submodule to extension -#[cfg(feature = "extension-module")] +#[cfg(feature = "python-bindings")] fn pyo3_add_submodule_filter(py: Python, m: &PyModule) -> PyResult<()> { // Add filter submodule let filter_module = PyModule::new(py, "filter")?; diff --git a/src/siggen.rs b/src/siggen.rs index d0bbf85..57c270a 100644 --- a/src/siggen.rs +++ b/src/siggen.rs @@ -1,4 +1,4 @@ -//! This module provide signal generators. +//! This module provide signal generators. //! //! # Examples //! @@ -6,9 +6,10 @@ //! //! ``` //! use lasprs::siggen::Siggen; -//! let mut wn = Siggen::newWhiteNoise(); -//! wn.setGain(0.1); -//! wn.setMute(false); +//! let mut wn = Siggen::newWhiteNoise(1); +//! // Set gains for all channels +//! wn.setAllGains(0.1); +//! wn.setAllMute(false); //! let mut sig = [0. ; 1024]; //! wn.genSignal(&mut sig); //! println!("{:?}", &sig); @@ -16,8 +17,11 @@ //! ``` use super::config::*; use super::filter::Filter; -#[cfg(feature="extension-module")] +use dasp_sample::{FromSample, Sample}; + +#[cfg(feature = "python-bindings")] use pyo3::prelude::*; + use rand::prelude::*; use rand::rngs::ThreadRng; use rand_distr::StandardNormal; @@ -107,91 +111,93 @@ impl Source for Sine { } } - -#[derive(Clone)] /// Signal generator. Able to create acoustic output signals. See above example on how to use. /// Typical signal that can be created are: /// /// * (Siggen::newWhiteNoise) /// * (Siggen::newSine) /// +#[derive(Clone)] pub struct Siggen { // The source dynamic signal. Noise, a sine wave, sweep, etc source: Box, // Filter applied to the source signal - prefilter: Option>, - // If set, no dynamic signal source output is given - muted: bool, - gain: Flt, - DCOffset: Flt, + channels: Vec, } +/// Multiple channel signal generator. Can use a single source (coherent) to provide multiple signals +/// that can be sent out through different EQ's /// A struct that implements the Siggen trait is able to generate a signal. impl Siggen { - /// Set new pre-filter that filters the source signal - pub fn setPreFilter(&mut self, pref: Option>) { - self.prefilter = pref.clone(); - } - /// Set the gain applied to the source signal - /// - /// * g: Gain value. Can be any float. If set to 0.0, the source is effectively muted. Only - /// using (setMute) is a more efficient way to do this. - pub fn setGain(&mut self, g: Flt) { - self.gain = g; + /// Returns the number of channels this signal generator is generating for. + pub fn nchannels(&self) -> usize { + self.channels.len() } + /// Create a white noise signal generator. - pub fn newWhiteNoise() -> Siggen { - Siggen::new(Box::new(WhiteNoise::new())) + pub fn newWhiteNoise(nchannels: usize) -> Siggen { + Siggen::new(nchannels, Box::new(WhiteNoise::new())) } + + /// Set gains of all channels in signal generator to the same value + /// + /// # Args + /// + /// * g: New gain value + pub fn setAllGains(&mut self, g: Flt) { + self.channels.iter_mut().for_each(|set| set.setGain(g)) + } + /// Create a sine wave signal generator /// /// * freq: Frequency of the sine wave in \[Hz\] - pub fn newSineWave(freq: Flt) -> Siggen { - Siggen::new(Box::new(Sine::new(freq))) + pub fn newSineWave(nchannels: usize, freq: Flt) -> Siggen { + Siggen::new(nchannels, Box::new(Sine::new(freq))) } /// Create a new signal generator wiht an arbitrary source. - pub fn new(source: Box) -> Siggen { + pub fn new(nchannels: usize, source: Box) -> Siggen { Siggen { source, - prefilter: None, - muted: false, - gain: 1.0, - DCOffset: 0.0, + channels: vec![SiggenChannelConfig::new(); nchannels], } } - /// Generate new signal data. - /// - /// # Args - /// - /// sig: Reference of array of float values to be filled with signal data. - /// - /// # Details - /// - /// - When muted, the DC offset is still applied - /// - The order of the generation is: - /// - First, the source is generated. - /// - If a prefilter is installed, this pre-filter is applied to the source signal. - /// - Gain is applied. - /// - Offset is applied (thus, no gain is applied to the DC offset). - /// - pub fn genSignal(&mut self, sig: &mut [Flt]) { - if self.muted { - sig.iter_mut().for_each(|x| { - *x = 0.0; - }); - } else { - self.source.genSignal_unscaled(sig); - if let Some(f) = &mut self.prefilter { - f.filter(sig); + /// Creates *interleaved* output signal + pub fn genSignal(&mut self, out: &mut [T]) + where + T: Sample + FromSample, + Flt: Sample + { + let nch = self.nchannels(); + let nsamples: usize = out.len() / nch; + assert!(out.len() % self.nchannels() == 0); + + // No initialization required here, as the data is filled in in genSignal_unscaled + // Create source signal + let mut src = Vec::with_capacity(nsamples); + unsafe { + src.set_len(nsamples); + } + self.source.genSignal_unscaled(&mut src); + + // Create output temporary vector + let mut chout = Vec::with_capacity(nsamples); + unsafe { + chout.set_len(nsamples); + } + + // Write output while casted to the correct type + // Iterate over each channel, and counter + for (ch, channel) in self.channels.iter_mut().enumerate() { + // Create output signal, overwrite chout, as it + channel.genSignal(&src, &mut chout); + + let out_iterator = out.iter_mut().skip(ch).step_by(nch); + for (sampleout, samplein) in out_iterator.zip(&chout) { + *sampleout = samplein.to_sample(); } } - sig.iter_mut().for_each(|x| { - // First apply gain, then offset - *x *= self.gain; - *x += self.DCOffset; - }); } /// Reset signal generator. Applies any kind of cleanup necessary. @@ -202,15 +208,88 @@ impl Siggen { /// pub fn reset(&mut self, fs: Flt) { self.source.reset(fs); + self.channels.iter_mut().for_each(|x| x.reset(fs)) + } + /// Mute / unmute all channels at once + pub fn setAllMute(&mut self, mute: bool) { + self.channels.iter_mut().for_each(|s| {s.muted = mute;}); + } +} + +/// Signal generator config for a certain channel +#[derive(Clone)] +pub struct SiggenChannelConfig { + muted: bool, + prefilter: Option>, + gain: Flt, + DCOffset: Flt, +} +impl SiggenChannelConfig { + /// Set new pre-filter that filters the source signal + pub fn setPreFilter(&mut self, pref: Option>) { + self.prefilter = pref; + } + /// Set the gain applied to the source signal + /// + /// * g: Gain value. Can be any float. If set to 0.0, the source is effectively muted. Only + /// using (setMute) is a more efficient way to do this. + pub fn setGain(&mut self, g: Flt) { + self.gain = g; + } + + /// Reset signal channel config. Only resets the prefilter state + pub fn reset(&mut self, _fs: Flt) { if let Some(f) = &mut self.prefilter { - f.reset(); + f.reset() + } + } + /// Generate new channel configuration using 'arbitrary' initial config: muted false, gain 1.0, DC offset 0. + /// and no prefilter + pub fn new() -> SiggenChannelConfig { + SiggenChannelConfig { + muted: false, + prefilter: None, + gain: 1.0, + DCOffset: 0., } } - /// Set mut on signal generator. If true, only DC signal offset is outputed from (Sigen::genSignal). + /// Set mute on channel. If true, only DC signal offset is outputed from (SiggenChannelConfig::transform). pub fn setMute(&mut self, mute: bool) { self.muted = mute } + /// Generate new signal data, given input source data. + /// + /// # Args + /// + /// source: Input source signal. + /// result: Reference of array of float values to be filled with signal data. + /// + /// # Details + /// + /// - When muted, the DC offset is still applied + /// - The order of the generation is: + /// - If a prefilter is installed, this pre-filter is applied to the source signal. + /// - Gain is applied. + /// - Offset is applied (thus, no gain is applied to the DC offset). + /// + pub fn genSignal(&mut self, source: &[Flt], result: &mut [Flt]) { + if self.muted { + result.iter_mut().for_each(|x| { + *x = 0.0; + }); + } else { + result.copy_from_slice(source); + if let Some(f) = &mut self.prefilter { + f.filter(&result); + } + } + result.iter_mut().for_each(|x| { + // First apply gain, then offset + *x *= self.gain; + *x += self.DCOffset; + }); + } } #[cfg(test)] @@ -221,7 +300,7 @@ mod test { fn test_whitenoise() { // This code is just to check syntax. We should really be listening to these outputs. let mut t = [0.; 10]; - Siggen::newWhiteNoise().genSignal(&mut t); + Siggen::newWhiteNoise(1).genSignal(&mut t); println!("{:?}", &t); } @@ -229,9 +308,20 @@ mod test { fn test_sine() { // This code is just to check syntax. We should really be listening to these outputs. let mut s = [0.; 9]; - let mut siggen = Siggen::newSineWave(1.); + let mut siggen = Siggen::newSineWave(1, 1.); siggen.reset(1.); siggen.genSignal(&mut s); println!("{:?}", &s); } + + // A small test to learn a bit about sample types and conversion. This + // is the thing we want. + #[test] + fn test_sample() { + assert_eq!(0.5f32.to_sample::(), 64); + assert_eq!(1.0f32.to_sample::(), 127); + assert_eq!(-1.0f32.to_sample::(), -127); + assert_eq!(1.0f32.to_sample::(), i16::MAX); + + } }