diff --git a/.gitignore b/.gitignore index 155d90f..d326b89 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__ python/lasprs/_lasprs* .venv .vscode/launch.json +.vscode diff --git a/Cargo.toml b/Cargo.toml index 55bcb2a..710e13f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lasprs" -version = "0.2.2" +version = "0.3.0" edition = "2021" authors = ["J.A. de Jong "] description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)" @@ -37,7 +37,7 @@ rand = "0.8.5" rand_distr = "0.4.3" # Cross-platform audio lib -cpal = { version = "0.15.2", optional = true } +cpal = { version = "0.15.3", optional = true } # Nice enumerations strum = "0.25.0" diff --git a/src/bin/lasp_devinfo.rs b/src/bin/lasp_devinfo.rs index 400d200..cac67d9 100644 --- a/src/bin/lasp_devinfo.rs +++ b/src/bin/lasp_devinfo.rs @@ -6,7 +6,8 @@ use lasprs::daq::{DaqConfig, StreamMgr}; #[derive(Parser, Debug)] #[command(author, version, about="Generates DAQ configurations for available devices.", long_about = None)] struct Args { - /// Name of the person to greet + /// Devices to match. Search for these substrings in device names. Only + /// configurations are output based on these names. #[arg(short, long)] matches: Vec, } @@ -16,18 +17,26 @@ fn main() -> Result<()> { let write_all = args.matches.len() == 0; let mut smgr = StreamMgr::new(); + // Obtain list of devices let devs = smgr.getDeviceInfo(); + + // Iterate over them for dev in devs.iter() { + // The file name will be the device name, plus toml extension let filename = dev.device_name.clone() + ".toml"; + + // If no device name strings are given, we are outputting them all to a file. if write_all { let daqconfig = DaqConfig::newFromDeviceInfo(&dev); daqconfig.serialize_TOML_file(&filename.clone().into())?; } else { + // See if we find the name in the match list. for m in args.matches.iter() { - let needle =m.to_lowercase(); + let needle = m.to_lowercase(); let dev_lower = (&dev.device_name).to_lowercase(); if dev_lower.contains(&needle) { - DaqConfig::newFromDeviceInfo(&dev).serialize_TOML_file(&filename.clone().into())?; + DaqConfig::newFromDeviceInfo(&dev) + .serialize_TOML_file(&filename.clone().into())?; } } } diff --git a/src/bin/lasp_outputdefault.rs b/src/bin/lasp_outputdefault.rs new file mode 100644 index 0000000..7745358 --- /dev/null +++ b/src/bin/lasp_outputdefault.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use crossbeam::channel::{unbounded, Receiver, TryRecvError}; +use lasprs::daq::{InStreamMsg, StreamHandler, StreamMgr, StreamStatus, StreamType}; +use lasprs::siggen::Siggen; +use std::io; +use std::{thread, time}; +// use + +fn spawn_stdin_channel() -> Receiver { + let (tx, rx) = unbounded(); + thread::spawn(move || 'tt: loop { + let mut buffer = String::new(); + io::stdin().read_line(&mut buffer).unwrap(); + if let Err(_) = tx.send(buffer) { + break 'tt; + } + }); + rx +} +fn sleep(millis: u64) { + let duration = time::Duration::from_millis(millis); + thread::sleep(duration); +} +fn main() -> Result<()> { + let mut smgr = StreamMgr::new(); + + println!("Starting stream..."); + smgr.startDefaultOutputStream()?; + let stdin_channel = spawn_stdin_channel(); + + println!("Creating signal generator..."); + let mut siggen = Siggen::newSineWave(2, 100.); + siggen.setDCOffset(&[0.1, 0.]); + // let mut siggen = Siggen::newWhiteNoise(2); + siggen.setAllGains(0.1); + // siggen.setMute(&[true, true]); + // siggen.setMute(&[true, false]); + // siggen.setAllMute(false); + smgr.setSiggen(siggen)?; + + 'infy: loop { + match stdin_channel.try_recv() { + Ok(_key) => break 'infy, + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => panic!("Channel disconnected"), + } + sleep(1000); + match smgr.getStatus(StreamType::Output) { + StreamStatus::NotRunning => { + println!("Stream is not running?"); + break 'infy; + } + StreamStatus::Running => { + println!("Stream is running..."); + } + StreamStatus::Error(e) => { + println!("Stream error: {}", e); + break 'infy; + } + } + + // let stat = smgr. + } + + Ok(()) +} diff --git a/src/bin/lasp_record.rs b/src/bin/lasp_record.rs index 7c57a44..dae556a 100644 --- a/src/bin/lasp_record.rs +++ b/src/bin/lasp_record.rs @@ -48,8 +48,10 @@ fn main() -> Result<()> { startDelay: Duration::from_secs(ops.start_delay_s as u64), }; match ops.config_file_daq { + // No config file is given, start default input stream None => smgr.startDefaultInputStream()?, Some(filename) => { + // If config file is given, use that. let file = std::fs::read_to_string(filename)?; let cfg = DaqConfig::deserialize_TOML_str(&file)?; smgr.startStream(StreamType::Input, &cfg)?; @@ -66,7 +68,9 @@ fn main() -> Result<()> { println!("\nRecord error: {}", e); break 'infy; } - RecordStatus::Waiting => { println!("Waiting in start delay...");}, + RecordStatus::Waiting => { + println!("Waiting in start delay..."); + } RecordStatus::Finished => { println!("\nRecording finished."); break 'infy; @@ -81,8 +85,8 @@ fn main() -> Result<()> { Ok(_key) => { println!("User pressed key. Manually stopping recording here."); match _key.to_lowercase().as_str() { - "c" => r.cancel(), - _ => r.stop() + "c" => r.cancel(), + _ => r.stop(), } break 'infy; } diff --git a/src/daq/api/api_cpal.rs b/src/daq/api/api_cpal.rs index b28a7b3..9b84767 100644 --- a/src/daq/api/api_cpal.rs +++ b/src/daq/api/api_cpal.rs @@ -1,44 +1,43 @@ #![allow(dead_code)] use super::Stream; use crate::config::{self, *}; -use crate::daq::daqconfig::{DaqChannel, DaqConfig}; -use crate::daq::deviceinfo::DeviceInfo; -use crate::daq::{self, streammsg::*, DataType}; -use crate::siggen::Siggen; +use crate::daq::{self, *}; use anyhow::{bail, Result}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize}; +use crossbeam::atomic::AtomicCell; use crossbeam::channel::{Receiver, Sender}; use itertools::Itertools; +use num::ToPrimitive; +use reinterpret::reinterpret_slice; +use std::any::{Any, TypeId}; use std::collections::VecDeque; +use std::fmt::Debug; use std::sync::Arc; -/// Convert datatype in CPAL sampleformat +/// Convert CPAL sampleformat datatype impl From for cpal::SampleFormat { fn from(dt: DataType) -> cpal::SampleFormat { - let sf = match dt { + match dt { DataType::F64 => SampleFormat::F64, DataType::F32 => SampleFormat::F32, DataType::I8 => SampleFormat::I8, DataType::I16 => SampleFormat::I16, DataType::I32 => SampleFormat::I32, - DataType::I64 => SampleFormat::I64, - }; - sf + } } } +// Convert datatype to CPAL sample format impl From for DataType { fn from(sf: cpal::SampleFormat) -> DataType { - let dt = match sf { + match sf { SampleFormat::F64 => DataType::F64, SampleFormat::F32 => DataType::F32, SampleFormat::I8 => DataType::I8, SampleFormat::I16 => DataType::I16, SampleFormat::I32 => DataType::I32, - SampleFormat::I64 => DataType::I64, _ => panic!("Not implemented sample format: {}", sf), - }; - dt + } } } @@ -48,22 +47,23 @@ pub struct CpalApi { } pub struct CpalStream { stream: cpal::Stream, - md: Option, + md: Arc, noutchannels: usize, + status: Arc>, } impl Stream for CpalStream { - fn metadata(&self) -> Option { + fn metadata(&self) -> Arc { self.md.clone() } fn ninchannels(&self) -> usize { - if let Some(md) = &self.md { - return md.nchannels(); - } - 0 + self.md.nchannels() } fn noutchannels(&self) -> usize { self.noutchannels } + fn status(&self) -> StreamStatus { + self.status.load() + } } impl CpalApi { @@ -93,7 +93,7 @@ impl CpalApi { let mut iChannelCount = 0; let mut oChannelCount = 0; - let mut sample_rates = srs_tot.clone(); + let mut avSampleRates = srs_tot.clone(); let mut avFramesPerBlock = vec![256 as usize, 512, 1024, 2048, 8192]; let mut sample_formats = vec![]; @@ -105,8 +105,8 @@ impl CpalApi { continue; } sample_formats.push(icfg.sample_format()); - sample_rates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt); - sample_rates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt); + avSampleRates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt); + avSampleRates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt); if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() { avFramesPerBlock.retain(|i| i >= &(*min as usize)); avFramesPerBlock.retain(|i| i <= &(*max as usize)); @@ -122,8 +122,8 @@ impl CpalApi { continue; } sample_formats.push(thissf); - sample_rates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt); - sample_rates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt); + avSampleRates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt); + avSampleRates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt); if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() { avFramesPerBlock.retain(|i| i >= &(*min as usize)); avFramesPerBlock.retain(|i| i <= &(*max as usize)); @@ -143,15 +143,15 @@ impl CpalApi { Some(idx) => dtypes[idx], None => dtypes[dtypes.len() - 1], }; - let prefSampleRate = *sample_rates.last().unwrap_or(&48000.); + let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.); devs.push(DeviceInfo { api: super::StreamApiDescr::Cpal, device_name: dev.name()?, avDataTypes: dtypes, prefDataType, - avSampleRates: sample_rates, - prefSampleRate: prefSampleRate, + avSampleRates, + prefSampleRate, avFramesPerBlock, prefFramesPerBlock: 2048, @@ -163,7 +163,7 @@ impl CpalApi { hasInputTrigger: false, hasInternalOutputMonitor: false, duplexModeForced: false, - physicalIOQty: daq::Qty::Number, + physicalIOQty: Qty::Number, }) } @@ -171,18 +171,71 @@ impl CpalApi { } // Create the error function closure, that capture the send channel on which error messages from the stream are sent - fn create_errfcn(send_ch: Sender) -> impl FnMut(cpal::StreamError) { - let errfn = move |err: cpal::StreamError| match err { - cpal::StreamError::DeviceNotAvailable => send_ch - .send(RawStreamData::StreamError(StreamError::DeviceNotAvailable)) - .unwrap(), - cpal::StreamError::BackendSpecific { err: _ } => send_ch - .send(RawStreamData::StreamError(StreamError::DriverError)) - .unwrap(), + fn create_errfcn( + send_ch: Option>, + status: Arc>, + ) -> impl FnMut(cpal::StreamError) { + let errfn = move |err: cpal::StreamError| { + let serr = match err { + cpal::StreamError::DeviceNotAvailable => StreamError::DeviceNotAvailable, + cpal::StreamError::BackendSpecific { err: _ } => StreamError::DriverError, + }; + if let Some(sender) = &send_ch { + sender.send(RawStreamData::StreamError(serr)).unwrap(); + } + status.store(StreamStatus::Error(serr)); }; errfn } + fn create_incallback( + config: &cpal::StreamConfig, + sender: Sender, + framesPerBlock: usize, + en_inchannels: Vec, + ) -> impl FnMut(&[T], &cpal::InputCallbackInfo) + where + T: 'static + Sample + ToPrimitive, + { + let tot_inch = config.channels as usize; + + let mut q = VecDeque::::with_capacity(2 * tot_inch * framesPerBlock); + + let mut enabled_ch_data: Vec = + vec![Sample::EQUILIBRIUM; en_inchannels.len() * framesPerBlock]; + + // The actual callback that is returned + move |input: &[T], _: &cpal::InputCallbackInfo| { + // Copy elements over in ring buffer + q.extend(input); + + while q.len() > tot_inch * framesPerBlock { + // println!("q full enough: {}", q.len()); + + // // Loop over enabled channels + for (i, ch) in en_inchannels.iter().enumerate() { + let in_iterator = q.iter().skip(*ch).step_by(tot_inch); + let out_iterator = enabled_ch_data + .iter_mut() + .skip(i) + .step_by(en_inchannels.len()); + + // Copy over elements, *DEINTERLEAVED* + out_iterator.zip(in_iterator).for_each(|(o, i)| { + *o = *i; + }); + } + + // Drain copied elements from ring buffer + q.drain(0..framesPerBlock * tot_inch); + + // Send over data + let msg = RawStreamData::from(enabled_ch_data.clone()); + sender.send(msg).unwrap() + } + } + } + /// Create an input stream for a CPAL device. /// /// # Arguments @@ -195,32 +248,21 @@ impl CpalApi { sender: Sender, en_inchannels: Vec, framesPerBlock: usize, - ) -> Result { - let tot_inch = config.channels as usize; + ) -> Result<(cpal::Stream, Arc>)> { + let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning)); - let sender_err = sender.clone(); + let errfcn = CpalApi::create_errfcn(Some(sender.clone()), status.clone()); macro_rules! build_stream{ - ($($cpaltype:pat, $rtype:ty);*) => { + ($($cpaltype:pat => $rtype:ty),*) => { match sf { $( $cpaltype => { - let mut q = VecDeque::<$rtype>::with_capacity(2*tot_inch*framesPerBlock); - + let icb = CpalApi::create_incallback::<$rtype>(&config, sender, framesPerBlock, en_inchannels); device.build_input_stream( &config, - move |data, _: &_| InStreamCallback::<$rtype>( - data, &sender, - // Total number of input channels. This API has to filter out - // the channels that are not enabled - tot_inch, - // Vector of channels numbers that are enabled - &en_inchannels, - // Frames per block - framesPerBlock, - // Ring buffer for storage of samples as required. - &mut q), - CpalApi::create_errfcn(sender_err), + icb, + errfcn, None)? }),*, _ => bail!("Unsupported sample format '{}'", sf) @@ -228,12 +270,106 @@ impl CpalApi { } } let stream: cpal::Stream = build_stream!( - SampleFormat::I8, i8; - SampleFormat::I16, i16; - SampleFormat::I32, i32; - SampleFormat::F32, f32 + SampleFormat::I8 => i8, + SampleFormat::I16 => i16, + SampleFormat::I32 => i32, + SampleFormat::F32 => f32 ); - Ok(stream) + Ok((stream, status)) + } + + fn create_outcallback( + config: &cpal::StreamConfig, + streamstatus: Arc>, + receiver: Receiver, + framesPerBlock: usize, + ) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo) + where + T: 'static + Sample + Debug, + { + let tot_outch: usize = config.channels as usize; + // println!("Numer of channels: {:?}", tot_outch); + let mut callback_ctr: usize = 0; + let mut q = VecDeque::::with_capacity(2 * tot_outch * framesPerBlock); + + move |data, _info: &_| { + let nsamples_asked = data.len(); + let status = streamstatus.load(); + callback_ctr += 1; + + let mut setToEquilibrium = || data.iter_mut().for_each(|v| *v = Sample::EQUILIBRIUM); + match status { + StreamStatus::NotRunning | StreamStatus::Error(_) => { + setToEquilibrium(); + return; + } + _ => {} + } + + if q.len() < nsamples_asked { + // Obtain new samples from the generator + for dat in receiver.try_iter() { + let slice = dat.getRef::(); + if let StreamStatus::Running = status { + q.extend(slice); + } + } + } + + if q.len() >= nsamples_asked { + // All right, we have enough samples to send out! They are + // drained from the queue + data.iter_mut() + .zip(q.drain(..nsamples_asked)) + .for_each(|(o, i)| *o = i); + } else if callback_ctr <= 2 { + // For the first two blocks, we allow dat the data is not yet + // ready, without complaining on underruns + setToEquilibrium(); + } else { + // Output buffer underrun + streamstatus.store(StreamStatus::Error(StreamError::OutputUnderrunError)); + setToEquilibrium(); + } + } + } + + fn build_output_stream( + sf: cpal::SampleFormat, + config: &cpal::StreamConfig, + device: &cpal::Device, + receiver: Receiver, + framesPerBlock: usize, + ) -> Result<(cpal::Stream, Arc>)> { + // let tot_ch = config.channels as usize; + + let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning)); + + let err_cb = CpalApi::create_errfcn(None, status.clone()); + macro_rules! build_stream{ + ($($cpaltype:pat => $rtype:ty),*) => { + match sf { + $( + $cpaltype => { + let outcallback = CpalApi::create_outcallback::<$rtype>(config, status.clone(), receiver, framesPerBlock); + device.build_output_stream( + &config, + outcallback, + err_cb, + None)? + }),*, + _ => bail!("Unsupported sample format '{}'", sf) + } + } + } + let stream: cpal::Stream = build_stream!( + SampleFormat::I8 => i8, + SampleFormat::I16 => i16, + SampleFormat::I32 => i32, + SampleFormat::F32 => f32 + ); + + Ok((stream, status)) } /// Create CPAL specific configuration, from our specified daq config and device info @@ -285,7 +421,7 @@ impl CpalApi { } /// Start a stream for a device with a given configuration. - pub fn startStream( + pub fn startInputStream( &self, stype: StreamType, devinfo: &DeviceInfo, @@ -316,37 +452,31 @@ impl CpalApi { let sf = supported_config.sample_format(); let config: cpal::StreamConfig = supported_config.config(); - let (stream, metadata) = match stype { - StreamType::Input => { - let meta = StreamMetaData::new( - &conf.enabledInchannelConfig(), - conf.dtype, - supported_config.sample_rate().0 as Flt, - framesPerBlock, - )?; + let meta = StreamMetaData::new( + &conf.enabledInchannelConfig(), + conf.dtype, + supported_config.sample_rate().0 as Flt, + framesPerBlock, + )?; + let meta = Arc::new(meta); - let stream = CpalApi::build_input_stream( - sf, - &config, - &cpaldev, - sender, - conf.enabledInchannelsList(), - framesPerBlock, - )?; - (stream, Some(meta)) - } - - StreamType::Output => bail!("Not implemented output stream"), - _ => unreachable!(""), - }; + let (stream, status) = CpalApi::build_input_stream( + sf, + &config, + &cpaldev, + sender, + conf.enabledInchannelsList(), + framesPerBlock, + )?; stream.play()?; + status.store(StreamStatus::Running); - let noutchannels = conf.numberEnabledOutChannels(); return Ok(Box::new(CpalStream { stream, - md: metadata, - noutchannels, + md: meta, + noutchannels: 0, + status, })); } bail!(format!( @@ -373,7 +503,7 @@ impl CpalApi { let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize)); let sf = config.sample_format(); - let stream = CpalApi::build_input_stream( + let (stream, status) = CpalApi::build_input_stream( sf, &final_config, &device, @@ -382,11 +512,13 @@ impl CpalApi { framesPerBlock, )?; stream.play()?; + status.store(StreamStatus::Running); // Daq: default channel config - let daqchannels = Vec::from_iter((0..final_config.channels).map(|i| { - DaqChannel::defaultAudioInput(format!("Unnamed input channel {}", i)) - })); + let daqchannels = Vec::from_iter( + (0..final_config.channels) + .map(|i| DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))), + ); // Specify data tape let dtype = DataType::from(sf); @@ -398,10 +530,12 @@ impl CpalApi { config.sample_rate().0 as Flt, framesPerBlock, )?; + let md = Arc::new(md); Ok(Box::new(CpalStream { stream, - md: Some(md), + md, noutchannels: 0, + status, })) } else { bail!("Could not obtain default input configuration") @@ -410,72 +544,71 @@ impl CpalApi { bail!("Could not open default input device") } } + + pub fn startDefaultOutputStream( + &self, + receiver: Receiver, + ) -> Result> { + if let Some(device) = self.host.default_output_device() { + if let Ok(config) = device.default_output_config() { + // let framesPerBlock: usize = 256; + // let framesPerBlock: usize = 8192; + let framesPerBlock: usize = config.sample_rate().0 as usize; + // let framesPerBlock: usize = 256; + let final_config = cpal::StreamConfig { + channels: config.channels(), + sample_rate: config.sample_rate(), + buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32), + }; + // let en_outchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize)); + + let sampleformat = config.sample_format(); + let (stream, status) = CpalApi::build_output_stream( + sampleformat, + &final_config, + &device, + receiver, + framesPerBlock, + )?; + + stream.play()?; + status.store(StreamStatus::Running); + + // Daq: default channel config + let daqchannels = + Vec::from_iter((0..final_config.channels).map(|i| { + DaqChannel::defaultAudio(format!("Unnamed output channel {}", i)) + })); + + // // Specify data tape + let dtype = DataType::from(sampleformat); + + // // Create stream metadata + let md = StreamMetaData::new( + &daqchannels, + dtype, + config.sample_rate().0 as Flt, + framesPerBlock, + )?; + let md = Arc::new(md); + let str = Box::new(CpalStream { + stream, + md, + noutchannels: daqchannels.len(), + status, + }); + Ok(str) + } else { + bail!("Could not obtain default output configuration") + } // Default output config is OK + } else { + bail!("Could not open output device") + } // Could not + } // Create an output stream, using given signal generators for each channel. - // fn build_output_stream( - // sf: cpal::SampleFormat, - // config: cpal::StreamConfig, - // device: &cpal::Device, - // siggens: Vec, - // ) -> Result { - // macro_rules! build_stream{ - // ($($cpaltype:pat, $rtype:ty);*) => { - // match sf { - // $( - // $cpaltype => device.build_input_stream( - // &config, - // move |data, _: &_| InStreamCallback::<$rtype>(data, &sender), - // CpalApi::create_errfcn(sender.clone()), - // None)? - // ),*, - // _ => bail!("Unsupported sample format '{}'", sf) - // } - // } - // } - // let stream: cpal::Stream = build_stream!( - // SampleFormat::I8, i8; - // SampleFormat::I16, i16; - // SampleFormat::I32, i32; - // SampleFormat::F32, f32 - // ); - // Ok(stream) // } -} -fn InStreamCallback( - input: &[T], - sender: &Sender, - tot_inch: usize, - en_inchannels: &[usize], - framesPerBlock: usize, - q: &mut VecDeque, -) where - T: Copy + num::ToPrimitive + 'static, -{ - // Copy elements over in ring buffer - q.extend(input); - while q.len() > tot_inch * framesPerBlock { - // println!("q full enough: {}", q.len()); - let mut enabled_ch_data: Vec = Vec::with_capacity(en_inchannels.len() * framesPerBlock); - unsafe { - enabled_ch_data.set_len(enabled_ch_data.capacity()); - } - - // Loop over enabled channels - for (i, ch) in en_inchannels.iter().enumerate() { - let in_iterator = q.iter().skip(*ch).step_by(tot_inch); - let out_iterator = enabled_ch_data.iter_mut().skip(i).step_by(en_inchannels.len()); - - // Copy over elements, *DEINTERLEAVED* - out_iterator.zip(in_iterator).for_each(|(o, i)| { - *o = *i; - }); - } - - // Drain copied elements from ring buffer - q.drain(0..framesPerBlock * tot_inch); - - // Send over data - let msg = RawStreamData::from(enabled_ch_data); - sender.send(msg).unwrap() + pub fn startOutputStream(&self, rx: Receiver) -> Result> { + bail!("Not implemented"); } } diff --git a/src/daq/api/mod.rs b/src/daq/api/mod.rs index 9ca3454..5a0290d 100644 --- a/src/daq/api/mod.rs +++ b/src/daq/api/mod.rs @@ -5,24 +5,28 @@ use serde::{Deserialize, Serialize}; /// - ... use strum::EnumMessage; use strum_macros; +use std::sync::Arc; -use super::StreamMetaData; +use super::{streamstatus::StreamStatus, StreamMetaData}; #[cfg(feature = "cpal-api")] pub mod api_cpal; #[cfg(feature = "pulse_api")] pub mod api_pulse; + /// A currently running stream pub trait Stream { /// Stream metadata. Only available for input streams - fn metadata(&self) -> Option; + fn metadata(&self) -> Arc; /// Number of input channels in stream fn ninchannels(&self) -> usize; /// Number of output channels in stream fn noutchannels(&self) -> usize; + + fn status(&self) -> StreamStatus; } #[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize)] diff --git a/src/daq/daqconfig.rs b/src/daq/daqconfig.rs index b507c7a..f345eb6 100644 --- a/src/daq/daqconfig.rs +++ b/src/daq/daqconfig.rs @@ -1,11 +1,11 @@ use std::{ops::Index, path::PathBuf}; -use anyhow::Result; use super::api::StreamApiDescr; use super::datatype::DataType; use super::deviceinfo::DeviceInfo; use super::qty::Qty; use crate::config::*; +use anyhow::Result; use serde::{Deserialize, Serialize}; /// DAQ Configuration for a single channel @@ -41,10 +41,10 @@ impl Default for DaqChannel { } impl DaqChannel { /// Default channel configuration for audio input from a certain channel - pub fn defaultAudioInput(name: String) -> Self { + pub fn defaultAudio(name: String) -> Self { DaqChannel { enabled: true, - name: name, + name, sensitivity: 1.0, IEPEEnabled: false, ACCouplingMode: false, @@ -59,21 +59,27 @@ impl DaqChannel { pub struct DaqConfig { /// The API pub api: StreamApiDescr, + /// Device name. Should match when starting a stream pub device_name: String, /// Configuration of the input channels pub inchannel_config: Vec, + /// Configuration of the output channels pub outchannel_config: Vec, /// The data type to use pub dtype: DataType, - /// Whether to apply a digital high pass on the input. <=0 means disabled. > 0 means, the value specifies the cut-on frequency for the first order high pass filter. + + /// Whether to apply a digital high pass on the input. <= 0 means disabled. > 0 means, the value specifies the cut-on frequency for the first order high pass filter. pub digitalHighPassCutOn: Flt, + /// The index to use in the list of possible sample rates sampleRateIndex: usize, + /// The index to use in the list of possible frames per block framesPerBlockIndex: usize, + /// Used when output channels should be monitored, i.e. reverse-looped back as input channels. monitorOutput: bool, } @@ -82,7 +88,6 @@ impl DaqConfig { /// Creates a new default device configuration for a given device as specified with /// the DeviceInfo descriptor. pub fn newFromDeviceInfo(devinfo: &DeviceInfo) -> DaqConfig { - let inchannel_config = (0..devinfo.iChannelCount) .map(|_| DaqChannel::default()) .collect(); @@ -94,7 +99,7 @@ impl DaqConfig { .avSampleRates .iter() .position(|x| x == &devinfo.prefSampleRate) - .unwrap_or(devinfo.avSampleRates.len()-1); + .unwrap_or(devinfo.avSampleRates.len() - 1); // Choose 4096 when in list, otherwise choose the highes available value in list let framesPerBlockIndex = devinfo .avFramesPerBlock @@ -116,13 +121,12 @@ impl DaqConfig { } /// Serialize DaqConfig object to TOML. - /// + /// /// Args - /// + /// /// * writer: Output writer, can be file or string, or anything that *is* std::io::Write - /// + /// pub fn serialize_TOML(&self, writer: &mut dyn std::io::Write) -> Result<()> { - let ser_str = toml::to_string(&self)?; writer.write_all(ser_str.as_bytes())?; @@ -130,36 +134,37 @@ impl DaqConfig { } /// Deserialize structure from TOML data - /// + /// /// # Args - /// + /// /// * reader: implements the Read trait, from which we read the data. - pub fn deserialize_TOML(reader: &mut T) -> Result where T: std::io::Read { + pub fn deserialize_TOML(reader: &mut T) -> Result + where + T: std::io::Read, + { let mut read_str = vec![]; reader.read_to_end(&mut read_str)?; let read_str = String::from_utf8(read_str)?; DaqConfig::deserialize_TOML_str(&read_str) } - /// Deserialize from TOML string - /// + /// /// # Args - /// + /// /// * st: string containing TOML data. pub fn deserialize_TOML_str(st: &String) -> Result { - let res : DaqConfig = toml::from_str(&st)?; + let res: DaqConfig = toml::from_str(&st)?; Ok(res) } /// Write this configuration to a TOML file. - /// + /// /// Args - /// + /// /// * file: Name of file to write to - /// + /// pub fn serialize_TOML_file(&self, file: &PathBuf) -> Result<()> { - let mut file = std::fs::File::create(file)?; self.serialize_TOML(&mut file)?; Ok(()) diff --git a/src/daq/datatype.rs b/src/daq/datatype.rs index 705d231..ef0745b 100644 --- a/src/daq/datatype.rs +++ b/src/daq/datatype.rs @@ -23,7 +23,4 @@ pub enum DataType { /// 32-bit integers #[strum(message = "I32", detailed_message = "32-bits integers")] I32 = 4, - /// 64-bit integers - #[strum(message = "I64", detailed_message = "64-bits integers")] - I64 = 5, } diff --git a/src/daq/deviceinfo.rs b/src/daq/deviceinfo.rs index ece86c6..3731854 100644 --- a/src/daq/deviceinfo.rs +++ b/src/daq/deviceinfo.rs @@ -1,15 +1,15 @@ //! Data acquisition model. Provides abstract layers around DAQ devices. #![allow(non_snake_case)] -use super::*; use super::api::StreamApiDescr; +use super::*; +use crate::config::*; /// Device info structure. Gives all information regarding a device, i.e. the number of input and /// output channels, its name and available sample rates and types. #[derive(Clone, Debug)] #[allow(dead_code)] pub struct DeviceInfo { - /// The api in use for this device pub api: StreamApiDescr, @@ -18,21 +18,25 @@ pub struct DeviceInfo { /// Available data types for the sample pub avDataTypes: Vec, + /// Preferred data type for device pub prefDataType: DataType, /// Available frames per block pub avFramesPerBlock: Vec, + /// Preferred frames per block for device pub prefFramesPerBlock: usize, /// Available sample rates pub avSampleRates: Vec, + /// Preferred sample rate for device pub prefSampleRate: Flt, /// Number of input channels available for this device pub iChannelCount: u8, + /// Number of output channels available for this device pub oChannelCount: u8, diff --git a/src/daq/mod.rs b/src/daq/mod.rs index 77feb3d..4b07d5b 100644 --- a/src/daq/mod.rs +++ b/src/daq/mod.rs @@ -5,385 +5,79 @@ mod daqconfig; mod datatype; mod deviceinfo; mod qty; + #[cfg(feature = "record")] mod record; + +mod streamcmd; +mod streamdata; mod streamhandler; +mod streammgr; mod streammsg; +mod streamstatus; pub use daqconfig::*; pub use datatype::*; pub use deviceinfo::*; pub use qty::*; +pub use streamcmd::*; +pub use streamdata::*; pub use streamhandler::*; +pub use streammgr::*; pub use streammsg::*; +pub use streamstatus::*; #[cfg(feature = "record")] pub use record::*; -#[cfg(feature = "cpal-api")] -use api::api_cpal::CpalApi; - -use crate::{ - config::*, - siggen::{self, Siggen}, -}; -use anyhow::{bail, Error, Result}; -use api::Stream; -use core::time; -use crossbeam::{ - channel::{unbounded, Receiver, Sender, TrySendError}, - thread, -}; -use deviceinfo::DeviceInfo; -use std::sync::{atomic::AtomicBool, Arc, Mutex}; -use std::thread::{JoinHandle, Thread}; -use streammsg::*; - -use self::api::StreamApiDescr; +use strum_macros::Display; cfg_if::cfg_if! { if #[cfg(feature = "python-bindings")] { use pyo3::exceptions::PyValueError; use pyo3::prelude::*; - use pyo3::{pymodule, types::PyModule, PyResult}; + use pyo3::{pymodule, pyclass, types::PyModule, PyResult}; } else {} } -/// Keep track of whether the stream has been created. To ensure singleton behaviour. -static smgr_created: AtomicBool = AtomicBool::new(false); - -struct StreamData { - streamtype: StreamType, - stream: Box, - threadhandle: JoinHandle, - comm: Sender, -} - -#[cfg_attr(feature = "python-bindings", pyclass(unsendable))] -/// Configure and manage input / output streams. +/// Stream types that can be started /// -pub struct StreamMgr { - // List of available devices - devs: Vec, - - // Input stream can be both input and duplex - input_stream: Option>, - - // Output only stream - output_stream: Option>, - - #[cfg(feature = "cpal-api")] - cpal_api: CpalApi, - - /// The storage of queues. When no streams are running, they - /// are here. When stream is running, they will become available - /// in the JoinHandle of the thread. - instreamqueues: Option, - - // Signal generator. Stored here on the bench in case no stream is running. - // It is picked when it is configured correctly for the starting output stream - // If it is not configured correctly, when a stream that outputs data is started - // ,it is removed here. - siggen: Option, +#[cfg_attr(feature = "python-bindings", pyclass)] +#[derive(PartialEq, Clone, Copy)] +pub enum StreamType { + /// Input-only stream + Input, + /// Output-only stream + Output, + /// Input and output at the same time + Duplex, } -#[cfg(feature = "python-bindings")] -#[cfg_attr(feature = "python-bindings", pymethods)] -impl StreamMgr { - #[new] - /// See (StreamMgr::new()) - fn new_py<'py>() -> StreamMgr { - StreamMgr::new() - } +/// Errors that happen in a stream +#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)] +pub enum StreamError { + /// Input overrun + #[strum( + message = "InputOverrun Error", + detailed_message = "Input buffer overrun" + )] + InputOverrunError, + + /// Output underrun + #[strum( + message = "OutputUnderrunError", + detailed_message = "Output buffer underrun" + )] + OutputUnderrunError, - // #[pyo3(name = "unit")] - // #[staticmethod] - // /// See: [Biquad::unit()] - // pub fn unit_py() -> Biquad { - // Biquad::unit() - // } - // #[pyo3(name = "firstOrderHighPass")] -} -impl StreamMgr { - /// Create new stream manager. A stream manager is supposed to be a singleton. - /// - /// # Panics - /// - /// When a StreamMgr object is already alive. - pub fn new() -> StreamMgr { - if smgr_created.load(std::sync::atomic::Ordering::Relaxed) { - panic!("BUG: Only one stream manager is supposed to be a singleton"); - } - smgr_created.store(true, std::sync::atomic::Ordering::Relaxed); - - let mut smgr = StreamMgr { - devs: vec![], - input_stream: None, - output_stream: None, - siggen: None, - - #[cfg(feature = "cpal-api")] - cpal_api: CpalApi::new(), - - instreamqueues: Some(vec![]), - }; - smgr.devs = smgr.scanDeviceInfo(); - smgr - } - /// Set a new signal generator. Returns an error if it is unapplicable. - /// It is unapplicable if the number of channels of output does not match the - /// number of output channels in a running stream. - pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> { - // Current signal generator. Where to place it? - if let Some(is) = &self.input_stream { - if let StreamType::Duplex = is.streamtype { - if siggen.nchannels() != is.stream.noutchannels() { - bail!("Invalid number of channels configured in signal generator") - } - assert!(self.siggen.is_none()); - is.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); - return Ok(()); - } - } else if let Some(os) = &self.output_stream { - assert!(self.siggen.is_none()); - if siggen.nchannels() != os.stream.noutchannels() { - bail!("Invalid number of channels configured in signal generator") - } - os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); - return Ok(()); - } else { - self.siggen = Some(siggen); - return Ok(()); - } - unreachable!() - } - - /// Obtain a list of devices that are available for each available API - pub fn getDeviceInfo(&mut self) -> &Vec { - &self.devs - } - - fn scanDeviceInfo(&self) -> Vec { - let mut devinfo = vec![]; - #[cfg(feature = "cpal-api")] - { - let cpal_devs = self.cpal_api.getDeviceInfo(); - if let Ok(devs) = cpal_devs { - devinfo.extend(devs); - } - } - devinfo - } - - /// Add a new queue to the lists of queues - pub fn addInQueue(&mut self, tx: Sender) { - if let Some(is) = &self.input_stream { - is.comm.send(StreamCommand::AddInQueue(tx)).unwrap() - } else { - self.instreamqueues.as_mut().unwrap().push(tx); - } - } - - fn startInputStreamThread( - &mut self, - stream: &Box, - rx: Receiver, - ) -> (JoinHandle, Sender) { - let (commtx, commrx) = unbounded(); - - // Unwrap here, as the queues should be free to grab - let mut iqueues = self.instreamqueues.take().unwrap(); - - let meta = stream.metadata().unwrap(); - - let threadhandle = std::thread::spawn(move || { - let mut ctr: usize = 0; - 'infy: loop { - if let Ok(comm_msg) = commrx.try_recv() { - match comm_msg { - // New queue added - StreamCommand::AddInQueue(queue) => { - match queue.send(InStreamMsg::StreamStarted(Arc::new(meta.clone()))) { - Ok(()) => iqueues.push(queue), - Err(_) => {} - } - } - - // Remove queue from list - StreamCommand::RemoveInQueue(queue) => { - iqueues.retain(|q| !q.same_channel(&queue)) - } - - // Stop this thread. Returns the queue - StreamCommand::StopThread => { - sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped); - break 'infy; - } - StreamCommand::NewSiggen(_) => { - panic!("Error: signal generator send to input-only stream."); - } - } - } - if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) { - // println!("Obtained raw stream data!"); - let msg = Arc::new(msg); - let msg = InStreamMsg::RawStreamData(ctr, msg); - sendMsgToAllQueues(&mut iqueues, msg); - ctr += 1; - } - } - iqueues - }); - (threadhandle, commtx) - } - fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> { - for d in self.devs.iter() { - if d.device_name == cfg.device_name { - return Some(d); - } - } - None - } - - /// Start a stream of certain type, using given configuration - pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> { - if self.input_stream.is_some() { - bail!("Input stream is already running. Please first stop existing input stream.") - } - match stype { - StreamType::Input | StreamType::Duplex => { - if cfg.numberEnabledInChannels() == 0 { - bail!("At least one input channel should be enabled for an input stream") - } - } - _ => {} - } - - let (tx, rx): (Sender, Receiver) = unbounded(); - - let stream = match cfg.api { - StreamApiDescr::Cpal => { - let devinfo = self - .match_devinfo(cfg) - .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?; - self.cpal_api.startStream(stype, devinfo, cfg, tx)? - } - _ => bail!("Unimplemented api!"), - }; - - let iqueues = self.instreamqueues.as_mut().unwrap(); - let meta = stream.metadata().unwrap(); - sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(Arc::new(meta))); - - let (threadhandle, commtx) = self.startInputStreamThread(&stream, rx); - - self.input_stream = Some(StreamData { - streamtype: stype, - stream, - threadhandle, - comm: commtx, - }); - - Ok(()) - } - - /// Start a default input stream, using default settings on everything. This is only possible - /// when the CPAL_api is available - pub fn startDefaultInputStream(&mut self) -> Result<()> { - if self.input_stream.is_some() { - bail!("Input stream is already running. Please first stop existing input stream.") - } - - let (tx, rx): (Sender, Receiver) = unbounded(); - - // Only a default input stream when CPAL feature is enabled - cfg_if::cfg_if! { - if #[cfg(feature="cpal-api")] { - let stream = self.cpal_api.startDefaultInputStream(tx)?; - // Inform all listeners of new stream data - - let iqueues = self.instreamqueues.as_mut().unwrap(); - let meta = stream.metadata().unwrap(); - sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(Arc::new(meta))); - - let (threadhandle, commtx) = self.startInputStreamThread(&stream, rx); - - self.input_stream = Some(StreamData { - streamtype: StreamType::Input, - stream, - threadhandle, - comm: commtx, - }); - Ok(()) - - } - else { - bail!("Unable to start default input stream: no CPAL api available") - } - } - } - - /// Stop existing input stream. - pub fn stopInputStream(&mut self) -> Result<()> { - if let Some(StreamData { - streamtype: _, // Ignored here - stream: _, - threadhandle, - comm, - }) = self.input_stream.take() - { - // println!("Stopping existing stream.."); - // Send thread to stop - comm.send(StreamCommand::StopThread).unwrap(); - - // Store stream queues back into StreamMgr - self.instreamqueues = Some(threadhandle.join().expect("Stream thread panicked!")); - } else { - bail!("Stream is not running.") - } - Ok(()) - } - /// Stop existing running stream. - /// - /// Args - /// - /// * st: The stream type. - pub fn stopStream(&mut self, st: StreamType) -> Result<()> { - match st { - StreamType::Input | StreamType::Duplex => self.stopInputStream(), - _ => bail!("Not implemented output stream"), - } - } -} // impl StreamMgr -impl Drop for StreamMgr { - fn drop(&mut self) { - // Kill input stream if there is one - if self.input_stream.is_some() { - self.stopStream(StreamType::Input).unwrap(); - } - if self.output_stream.is_some() { - self.stopStream(StreamType::Output).unwrap(); - } - - // Decref the singleton - smgr_created.store(false, std::sync::atomic::Ordering::Relaxed); - } -} - -// Send to all queues, remove queues that are disconnected when found out -// on the way. -fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) { - // Loop over queues. Remove queues that error when we try to send - // to them - iqueues.retain(|q| match q.try_send(msg.clone()) { - Ok(_) => true, - Err(_e) => false, - }); -} -/// Daq devices -trait Daq {} - -#[cfg(test)] -mod tests { - - // #[test] + /// Driver specific error + #[strum(message = "DriverError", detailed_message = "Driver error")] + DriverError, + + /// Device + #[strum(detailed_message = "Device not available (anymore)")] + DeviceNotAvailable, + + /// Logic error (something weird happened) + #[strum(detailed_message = "Logic error")] + LogicError, } diff --git a/src/daq/record.rs b/src/daq/record.rs index 4747d69..4dafc6c 100644 --- a/src/daq/record.rs +++ b/src/daq/record.rs @@ -1,4 +1,5 @@ use super::*; +use crate::config::Flt; use anyhow::{bail, Error, Result}; use clap::builder::OsStr; use crossbeam::atomic::AtomicCell; @@ -10,8 +11,8 @@ use rayon::iter::Empty; use serde::de::IntoDeserializer; use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use strum::EnumMessage; @@ -100,7 +101,6 @@ impl Recording { DataType::I8 => Recording::create_dataset_type::(file, meta), DataType::I16 => Recording::create_dataset_type::(file, meta), DataType::I32 => Recording::create_dataset_type::(file, meta), - DataType::I64 => Recording::create_dataset_type::(file, meta), DataType::F32 => Recording::create_dataset_type::(file, meta), DataType::F64 => Recording::create_dataset_type::(file, meta), } @@ -152,9 +152,6 @@ impl Recording { let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; ds.write_slice(arr, (ctr, .., ..))?; } - RawStreamData::UnknownDataType => { - bail!("Unknown data type!") - } RawStreamData::StreamError(e) => { bail!("Stream error: {}", e) } @@ -289,17 +286,16 @@ impl Recording { // Early stop. User stopped it. break 'recloop; } - InStreamMsg::RawStreamData(incoming_ctr, dat) => { + InStreamMsg::StreamData(dat) => { if first { first = false; // Initialize counter offset - ctr_offset = incoming_ctr; - } else { - if incoming_ctr != stored_ctr + ctr_offset { - println!("********** PACKAGES MISSED ***********"); - bail!("Packages missed. Recording is invalid.") - } + ctr_offset = dat.ctr; + } else if dat.ctr != stored_ctr + ctr_offset { + println!("********** PACKAGES MISSED ***********"); + bail!("Packages missed. Recording is invalid.") } + if wait_block_ctr > 0 { // We are still waiting wait_block_ctr -= 1; @@ -316,7 +312,7 @@ impl Recording { Recording::append_to_dset( &ds, stored_ctr, - dat.as_ref(), + &dat.raw, framesPerBlock, nchannels, )?; diff --git a/src/daq/streamcmd.rs b/src/daq/streamcmd.rs new file mode 100644 index 0000000..6f7fe04 --- /dev/null +++ b/src/daq/streamcmd.rs @@ -0,0 +1,31 @@ +//! Provides stream messages that come from a running stream +use crate::config::*; +use crate::daq::Qty; +use crate::siggen::Siggen; +use anyhow::{bail, Result}; +use crossbeam::channel::Sender; +use std::any::TypeId; +use std::sync::{Arc, RwLock}; + +use super::*; +cfg_if::cfg_if! { +if #[cfg(feature = "python-bindings")] { + use pyo3::exceptions::PyValueError; + use pyo3::prelude::*; + use pyo3::{pymodule, pyclass, types::PyModule, PyResult}; +} else {} } + +/// Commands that can be sent to a running stream +pub enum StreamCommand { + /// Add a new queue to a running stream + AddInQueue(SharedInQueue), + + /// Remove a queue to a running stream + RemoveInQueue(SharedInQueue), + + /// New signal generator config to be used + NewSiggen(Siggen), + + /// Stop the thread, do not listen for data anymore. + StopThread, +} diff --git a/src/daq/streamdata.rs b/src/daq/streamdata.rs new file mode 100644 index 0000000..a348ff4 --- /dev/null +++ b/src/daq/streamdata.rs @@ -0,0 +1,303 @@ +//! Provides stream messages that come from a running stream +use crate::config::*; +use crate::daq::Qty; +use crate::siggen::Siggen; +use anyhow::{bail, Result}; +use cpal::Sample; +use crossbeam::channel::Sender; +use reinterpret::{reinterpret_slice, reinterpret_vec}; +use std::any::TypeId; +use std::sync::{Arc, RwLock}; +use std::u128::MAX; +use strum_macros::Display; + +use super::*; +cfg_if::cfg_if! { +if #[cfg(feature = "python-bindings")] { + use pyo3::exceptions::PyValueError; + use pyo3::prelude::*; + use pyo3::{pymodule, pyclass, types::PyModule, PyResult}; +} else {} } + +/// Raw stream data coming from a stream. +#[derive(Clone, Debug)] +pub enum RawStreamData { + /// 8-bits integer + Datai8(Vec), + /// 16-bits integer + Datai16(Vec), + /// 32-bits integer + Datai32(Vec), + /// 32-bits float + Dataf32(Vec), + /// 64-bits float + Dataf64(Vec), + + /// A stream error occured + StreamError(StreamError), +} + +impl RawStreamData { + pub fn toFloat(&self, nchannels: usize) -> Dmat { + // match &self { + // RawStreamData::Datai8(c) => { + // Dmat::zeros((2, 2)); + // } + // RawStreamData::Datai16(c) => { + // Dmat::zeros((2, 2)); + // } + // } + todo!() + } +} +impl RawStreamData { + /// Get reference to raw data buffer + pub fn getRef(&self) -> &[T] + where + T: Sample + 'static, + { + let thetype: TypeId = TypeId::of::(); + match &self { + RawStreamData::Datai8(t) => { + let i8type: TypeId = TypeId::of::(); + assert!(thetype == i8type); + let v: &[T] = unsafe { reinterpret_slice(t) }; + v + } + RawStreamData::Datai16(t) => { + let i16type: TypeId = TypeId::of::(); + assert!(thetype == i16type); + let v: &[T] = unsafe { reinterpret_slice(t) }; + v + } + RawStreamData::Datai32(t) => { + let i32type: TypeId = TypeId::of::(); + assert!(thetype == i32type); + let v: &[T] = unsafe { reinterpret_slice(t) }; + v + } + RawStreamData::Dataf32(t) => { + let f32type: TypeId = TypeId::of::(); + assert!(thetype == f32type); + let v: &[T] = unsafe { reinterpret_slice(t) }; + v + } + RawStreamData::Dataf64(t) => { + let f64type: TypeId = TypeId::of::(); + assert!(thetype == f64type); + let v: &[T] = unsafe { reinterpret_slice(t) }; + v + } + _ => panic!("Cannot getRef from "), + } + } +} + +// Create InStreamData object from +impl From<&[T]> for RawStreamData +where + T: num::ToPrimitive + Clone + 'static, +{ + fn from(input: &[T]) -> RawStreamData { + // Apparently, this code does not work with a match. I have searched around and have not found the + // reason for this. So this is a bit of stupid boilerplate. + let i8type: TypeId = TypeId::of::(); + let i16type: TypeId = TypeId::of::(); + let i32type: TypeId = TypeId::of::(); + let f32type: TypeId = TypeId::of::(); + let f64type: TypeId = TypeId::of::(); + let thetype: TypeId = TypeId::of::(); + if i8type == thetype { + let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; + RawStreamData::Datai8(v) + } else if i16type == thetype { + let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; + RawStreamData::Datai16(v) + } else if i16type == thetype { + let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; + RawStreamData::Datai16(v) + } else if i32type == thetype { + let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; + RawStreamData::Datai32(v) + } else if f32type == thetype { + let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; + RawStreamData::Dataf32(v) + } else if f64type == thetype { + let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; + RawStreamData::Dataf64(v) + } else { + panic!("Not implemented sample type!") + } + } +} + +// Create InStreamData object from +impl From> for RawStreamData +where + T: num::ToPrimitive + Clone + 'static, +{ + fn from(input: Vec) -> RawStreamData { + // Apparently, this code does not work with a match. I have searched around and have not found the + // reason for this. So this is a bit of stupid boilerplate. + let i8type: TypeId = TypeId::of::(); + let i16type: TypeId = TypeId::of::(); + let i32type: TypeId = TypeId::of::(); + let f32type: TypeId = TypeId::of::(); + let f64type: TypeId = TypeId::of::(); + let thetype: TypeId = TypeId::of::(); + if i8type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai8(v) + } else if i16type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai16(v) + } else if i16type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai16(v) + } else if i32type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Datai32(v) + } else if f32type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Dataf32(v) + } else if f64type == thetype { + let v: Vec = unsafe { reinterpret_vec(input) }; + RawStreamData::Dataf64(v) + } else { + panic!("Not implemented sample type!") + } + } +} + +/// Stream metadata. All information required for properly interpreting the raw +/// data that is coming from the stream. +#[derive(Clone, Debug)] +pub struct StreamMetaData { + /// Information for each channel in the stream + pub channelInfo: Vec, + + /// The data type of the device [Number / voltage / Acoustic pressure / ...] + pub rawDatatype: DataType, + + /// Sample rate in [Hz] + pub samplerate: Flt, + + /// The number of frames per block of data that comes in. Multiplied by + /// channelInfo.len() we get the total number of samples that come in at + /// each callback. + pub framesPerBlock: usize, +} +impl StreamMetaData { + /// Create new metadata object. + /// /// + /// # Args + /// + pub fn new( + channelInfo: &[DaqChannel], + rawdtype: DataType, + sr: Flt, + framesPerBlock: usize, + ) -> Result { + Ok(StreamMetaData { + channelInfo: channelInfo.to_vec(), + rawDatatype: rawdtype, + samplerate: sr, + framesPerBlock, + }) + } + + /// Returns the number of channels in the stream metadata. + pub fn nchannels(&self) -> usize { + self.channelInfo.len() + } +} + +/// Stream data (audio / other) coming from a stream or to be send to a stream +#[derive(Debug)] +pub struct StreamData { + /// Package counter. Should always increase monotonically. + pub ctr: usize, + + /// Stream metadata. All info required for properly interpreting the raw data. + pub meta: Arc, + + /// This is typically what is stored when recording + pub raw: RawStreamData, + + // Converted to floating point format. Used for further real time + // processing. Stored in an rw-lock. The first thread that acesses this data + // will perform the conversion. All threads after that will get the data. + converted: RwLock>>, +} +impl StreamData { + /// Create new stream data object. + pub fn new(ctr: usize, meta: Arc, raw: RawStreamData) -> StreamData { + StreamData { + ctr, + meta, + raw, + converted: RwLock::new(None), + } + } + + /// Get the data in floating point format. If already converted, uses the + /// cached float data. + pub fn getFloatData(&self) -> Arc { + if let Some(dat) = self.converted.read().unwrap().as_ref() { + return dat.clone(); + } + + // In case we reach here, the data has not yet be converted to floating + // point, so we do this. + let mut o = self.converted.write().unwrap(); + + // It might be that another thread was 'first', and already performed + // the conversion. In that case, we still do an early return, and we + // just openend the lock twice for writing. Not a problem. + if let Some(dat) = o.as_ref() { + return dat.clone(); + } + // Perform the actual conversion + let converted_data = Arc::new(self.raw.toFloat(self.meta.nchannels())); + // Replace the option with the Some + o.replace(converted_data.clone()); + + converted_data + } +} + +#[cfg(test)] +mod test { + use num::traits::sign; + use cpal::Sample; + + use super::*; + + #[test] + fn test() { + + const fs: Flt = 20.; + // Number of samples per channel + const Nframes: usize = 20; + const Nch: usize = 2; + let mut signal = [0.; Nch*Nframes]; + let mut siggen = Siggen::newSineWave(Nch, 1.); + + siggen.reset(fs); + siggen.setMute(&[false, true]); + siggen.genSignal(&mut signal); + + let raw: Vec = Vec::from_iter(signal.iter().map( + |o| o.to_sample::())); + + let ms1 = raw.iter().step_by(2).map(|s1| {*s1 as f64 * *s1 as f64}).sum::() / Nframes as f64; + + let i16maxsq = (i16::MAX as f64).powf(2.); + // println!("ms1: {} {}", ms1, i16maxsq/2.); + // println!("{:?}", raw.iter().cloned().step_by(2).collect::>()); + // println!("{:?}", i16::EQUILIBRIUM); + assert!(f64::abs(ms1 - i16maxsq/2.)/i16maxsq < 1e-3); + + } + +} \ No newline at end of file diff --git a/src/daq/streamhandler.rs b/src/daq/streamhandler.rs index 98a3c68..cf3fd58 100644 --- a/src/daq/streamhandler.rs +++ b/src/daq/streamhandler.rs @@ -1,5 +1,5 @@ -use crossbeam::channel::unbounded; +use crossbeam::channel::{unbounded, Receiver}; use super::*; /// A stream handler registers a queue in the stream manager, and keeps the other end to diff --git a/src/daq/streammgr.rs b/src/daq/streammgr.rs new file mode 100644 index 0000000..6805e82 --- /dev/null +++ b/src/daq/streammgr.rs @@ -0,0 +1,589 @@ +//! Data acquisition model. Provides abstract layers around DAQ devices. +use super::api::*; +use super::*; +use crate::{ + config::*, + siggen::{self, Siggen}, +}; +use anyhow::{bail, Error, Result}; +use array_init::from_iter; +use core::time; +use cpal::Sample; +use crossbeam::{ + channel::{unbounded, Receiver, Sender, TrySendError}, + thread, +}; +use std::sync::{atomic::AtomicBool, Arc, Mutex}; +use std::thread::{JoinHandle, Thread}; + +#[cfg(feature = "cpal-api")] +use super::api::api_cpal::CpalApi; + +cfg_if::cfg_if! { +if #[cfg(feature = "python-bindings")] { + use pyo3::exceptions::PyValueError; + use pyo3::prelude::*; + use pyo3::{pymodule, types::PyModule, PyResult}; +} else {} } + +/// Store a queue in a shared pointer, to share sending +/// and receiving part of the queue. +pub type SharedInQueue = Sender; +/// Vector of queues for stream messages +pub type InQueues = Vec; + +struct StreamInfo { + streamtype: StreamType, + stream: Box, + threadhandle: JoinHandle, + comm: Sender, +} + +/// Keep track of whether the stream has been created. To ensure singleton behaviour. +static smgr_created: AtomicBool = AtomicBool::new(false); + +#[cfg_attr(feature = "python-bindings", pyclass(unsendable))] +/// Configure and manage input / output streams. +/// +pub struct StreamMgr { + // List of available devices + devs: Vec, + + // Input stream can be both input and duplex + input_stream: Option>, + + // Output only stream + output_stream: Option>, + + #[cfg(feature = "cpal-api")] + cpal_api: CpalApi, + + /// The storage of queues. When no streams are running, they + /// are here. When stream is running, they will become available + /// in the JoinHandle of the thread. + instreamqueues: Option, + + // Signal generator. Stored here on the bench in case no stream is running. + // It is picked when it is configured correctly for the starting output stream + // If it is not configured correctly, when a stream that outputs data is started + // ,it is removed here. + siggen: Option, +} + +#[cfg(feature = "python-bindings")] +#[cfg_attr(feature = "python-bindings", pymethods)] +impl StreamMgr { + #[new] + /// See (StreamMgr::new()) + fn new_py<'py>() -> StreamMgr { + StreamMgr::new() + } + + // #[pyo3(name = "unit")] + // #[staticmethod] + // /// See: [Biquad::unit()] + // pub fn unit_py() -> Biquad { + // Biquad::unit() + // } + // #[pyo3(name = "firstOrderHighPass")] +} +impl StreamMgr { + /// Create new stream manager. A stream manager is supposed to be a singleton. + /// + /// # Panics + /// + /// When a StreamMgr object is already alive. + pub fn new() -> StreamMgr { + if smgr_created.load(std::sync::atomic::Ordering::Relaxed) { + panic!("BUG: Only one stream manager is supposed to be a singleton"); + } + smgr_created.store(true, std::sync::atomic::Ordering::Relaxed); + + let mut smgr = StreamMgr { + devs: vec![], + input_stream: None, + output_stream: None, + siggen: None, + + #[cfg(feature = "cpal-api")] + cpal_api: CpalApi::new(), + + instreamqueues: Some(vec![]), + }; + smgr.devs = smgr.scanDeviceInfo(); + smgr + } + + /// Get stream status for given stream type. + pub fn getStatus(&self, t: StreamType) -> StreamStatus { + match t { + StreamType::Input | StreamType::Duplex => { + if let Some(s) = &self.input_stream { + s.stream.status() + } else { + StreamStatus::NotRunning + } + } + StreamType::Output => { + if let Some(s) = &self.output_stream { + s.stream.status() + } else { + StreamStatus::NotRunning + } + } + } + } + /// Set a new signal generator. Returns an error if it is unapplicable. + /// It is unapplicable if the number of channels of output does not match the + /// number of output channels in a running stream. + pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> { + // Current signal generator. Where to place it? + if let Some(istream) = &self.input_stream { + if let StreamType::Duplex = istream.streamtype { + if siggen.nchannels() != istream.stream.noutchannels() { + bail!("Invalid number of channels configured in signal generator") + } + assert!(self.siggen.is_none()); + istream.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); + return Ok(()); + } + } else if let Some(os) = &self.output_stream { + assert!(self.siggen.is_none()); + if siggen.nchannels() != os.stream.noutchannels() { + bail!("Invalid number of channels configured in signal generator") + } + os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); + return Ok(()); + } else { + self.siggen = Some(siggen); + return Ok(()); + } + unreachable!() + } + + /// Obtain a list of devices that are available for each available API + pub fn getDeviceInfo(&mut self) -> &Vec { + &self.devs + } + + fn scanDeviceInfo(&self) -> Vec { + let mut devinfo = vec![]; + #[cfg(feature = "cpal-api")] + { + let cpal_devs = self.cpal_api.getDeviceInfo(); + if let Ok(devs) = cpal_devs { + devinfo.extend(devs); + } + } + devinfo + } + + /// Add a new queue to the lists of queues + pub fn addInQueue(&mut self, tx: Sender) { + if let Some(is) = &self.input_stream { + is.comm.send(StreamCommand::AddInQueue(tx)).unwrap() + } else { + self.instreamqueues.as_mut().unwrap().push(tx); + } + } + + fn startInputStreamThread( + &mut self, + meta: Arc, + rx: Receiver, + ) -> (JoinHandle, Sender) { + let (commtx, commrx) = unbounded(); + + // Unwrap here, as the queues should be free to grab + let mut iqueues = self + .instreamqueues + .take() + .expect("No input streams queues!"); + + let threadhandle = std::thread::spawn(move || { + let mut ctr: usize = 0; + 'infy: loop { + if let Ok(comm_msg) = commrx.try_recv() { + match comm_msg { + // New queue added + StreamCommand::AddInQueue(queue) => { + match queue.send(InStreamMsg::StreamStarted(meta.clone())) { + Ok(()) => iqueues.push(queue), + Err(_) => {} + } + } + + // Remove queue from list + StreamCommand::RemoveInQueue(queue) => { + iqueues.retain(|q| !q.same_channel(&queue)) + } + + // Stop this thread. Returns the queue + StreamCommand::StopThread => { + sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped); + break 'infy; + } + StreamCommand::NewSiggen(_) => { + panic!("Error: signal generator send to input-only stream."); + } + } + } + if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) { + // println!("Obtained raw stream data!"); + + let streamdata = StreamData::new(ctr, meta.clone(), raw); + let streamdata = Arc::new(streamdata); + + let msg = InStreamMsg::StreamData(streamdata); + sendMsgToAllQueues(&mut iqueues, msg); + ctr += 1; + } + } + iqueues + }); + (threadhandle, commtx) + } + + // Match device info struct on given daq config. + fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> { + for d in self.devs.iter() { + if d.device_name == cfg.device_name { + return Some(d); + } + } + None + } + fn startOuputStreamThread( + &mut self, + meta: Arc, + tx: Sender, + ) -> (JoinHandle, Sender) { + let (commtx, commrx) = unbounded(); + + // Number of channels to output for + let nchannels = meta.nchannels(); + + // Obtain signal generator. Set to silence when no signal generator is + // installed. + let mut siggen = self + .siggen + .take() + .unwrap_or_else(|| Siggen::newSilence(nchannels)); + + if siggen.nchannels() != nchannels { + // Updating number of channels + siggen.setNChannels(nchannels); + } + siggen.reset(meta.samplerate); + + let threadhandle = std::thread::spawn(move || { + let mut floatbuf: Vec = Vec::with_capacity(nchannels * meta.framesPerBlock); + 'infy: loop { + if let Ok(comm_msg) = commrx.try_recv() { + match comm_msg { + // New queue added + StreamCommand::AddInQueue(_) => { + panic!("Invalid message send to output thread: AddInQueue"); + } + + // Remove queue from list + StreamCommand::RemoveInQueue(_) => { + panic!("Invalid message send to output thread: RemoveInQueue"); + } + + // Stop this thread. Returns the queue + StreamCommand::StopThread => { + break 'infy; + } + StreamCommand::NewSiggen(new_siggen) => { + // println!("NEW SIGNAL GENERATOR ARRIVED!"); + siggen = new_siggen; + siggen.reset(meta.samplerate); + if siggen.nchannels() != nchannels { + // println!("Updating channels"); + siggen.setNChannels(nchannels); + } + } + } + } + while tx.len() < 2 { + unsafe { + floatbuf.set_len(nchannels * meta.framesPerBlock); + } + // Obtain signal + siggen.genSignal(&mut floatbuf); + // println!("level: {}", floatbuf.iter().sum::()); + let msg = match meta.rawDatatype { + DataType::I8 => { + let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); + RawStreamData::Datai8(v) + } + DataType::I16 => { + let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); + RawStreamData::Datai16(v) + } + DataType::I32 => { + let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); + RawStreamData::Datai32(v) + } + DataType::F32 => { + let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); + RawStreamData::Dataf32(v) + } + DataType::F64 => { + let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); + RawStreamData::Dataf64(v) + } + }; + if let Err(_e) = tx.send(msg) { + // println!("Error sending raw stream data to output stream!"); + break 'infy; + } + } + + // } + } + siggen + }); + (threadhandle, commtx) + } + + /// Start a stream of certain type, using given configuration + pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> { + match stype { + StreamType::Input | StreamType::Duplex => { + self.startInputOrDuplexStream(stype, cfg)?; + } + StreamType::Output => { + // self.startOutputStream(cfg)?; + bail!("No output stream defined yet"); + } + } + Ok(()) + } + + // fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> { + // let (tx, rx): (Sender, Receiver) = unbounded(); + // let stream = match cfg.api { + // StreamApiDescr::Cpal => { + // let devinfo = self + // .match_devinfo(cfg) + // .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?; + // self.cpal_api.startOutputStream(devinfo, cfg, tx)? + // } + // _ => bail!("Unimplemented api!"), + // }; + + // Ok(()) + // } + + // Start an input or duplex stream + fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> { + if self.input_stream.is_some() { + bail!("An input stream is already running. Please first stop existing input stream.") + } + if cfg.numberEnabledInChannels() == 0 { + bail!("At least one input channel should be enabled for an input stream") + } + if stype == StreamType::Duplex { + if cfg.numberEnabledOutChannels() == 0 { + bail!("At least one output channel should be enabled for a duplex stream") + } + if self.output_stream.is_some() { + bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream."); + } + } + let (tx, rx): (Sender, Receiver) = unbounded(); + + let stream = match cfg.api { + StreamApiDescr::Cpal => { + if stype == StreamType::Duplex { + bail!("Duplex mode not supported for CPAL api"); + } + let devinfo = self + .match_devinfo(cfg) + .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?; + self.cpal_api.startInputStream(stype, devinfo, cfg, tx)? + } + _ => bail!("Unimplemented api!"), + }; + + // Input queues should be available, otherwise panic bug. + let iqueues = self.instreamqueues.as_mut().unwrap(); + + let meta = stream.metadata(); + + sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone())); + + let (threadhandle, commtx) = self.startInputStreamThread(meta, rx); + + self.input_stream = Some(StreamInfo { + streamtype: stype, + stream, + threadhandle, + comm: commtx, + }); + + Ok(()) + } + + /// Start a default input stream, using default settings on everything. This is only possible + /// when the CPAL_api is available + pub fn startDefaultInputStream(&mut self) -> Result<()> { + if self.input_stream.is_some() { + bail!("Input stream is already running. Please first stop existing input stream.") + } + + let (tx, rx): (Sender, Receiver) = unbounded(); + + // Only a default input stream when CPAL feature is enabled + cfg_if::cfg_if! { + if #[cfg(feature="cpal-api")] { + let stream = self.cpal_api.startDefaultInputStream(tx)?; + // Inform all listeners of new stream data + + let iqueues = self.instreamqueues.as_mut().unwrap(); + let meta = stream.metadata(); + sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone())); + + let (threadhandle, commtx) = self.startInputStreamThread(meta, rx); + + self.input_stream = Some(StreamInfo { + streamtype: StreamType::Input, + stream, + threadhandle, + comm: commtx, + }); + Ok(()) + + } + else { + bail!("Unable to start default input stream: no CPAL api available") + } + } + } + + /// Start a default output stream. Only possible when CPAL Api is available. + pub fn startDefaultOutputStream(&mut self) -> Result<()> { + if let Some(istream) = &self.input_stream { + if istream.streamtype == StreamType::Duplex { + bail!("Duplex stream is already running"); + } + } + if self.output_stream.is_some() { + bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream."); + } + + cfg_if::cfg_if! { + if #[cfg(feature="cpal-api")] { + + let (tx, rx)= unbounded(); + let stream = self.cpal_api.startDefaultOutputStream(rx)?; + let meta = stream.metadata(); + let (threadhandle, commtx) = self.startOuputStreamThread::(meta, tx); + // Inform all listeners of new stream data + + + self.output_stream = Some(StreamInfo { + streamtype: StreamType::Input, + stream, + threadhandle, + comm: commtx, + }); + Ok(()) + + } // end if cpal api available + else { + bail!("Unable to start default input stream: no CPAL api available") + } + + } // end of cfg_if + } + + /// Stop existing input stream. + pub fn stopInputStream(&mut self) -> Result<()> { + if let Some(StreamInfo { + streamtype: _, // Ignored here + stream: _, + threadhandle, + comm, + }) = self.input_stream.take() + { + // println!("Stopping existing stream.."); + // Send thread to stop + comm.send(StreamCommand::StopThread).unwrap(); + + // Store stream queues back into StreamMgr + self.instreamqueues = Some(threadhandle.join().expect("Stream thread panicked!")); + } else { + bail!("Stream is not running.") + } + Ok(()) + } + /// Stop existing output stream + pub fn stopOutputStream(&mut self) -> Result<()> { + if let Some(StreamInfo { + streamtype: _, // Ignored here + stream: _, + threadhandle, + comm, + }) = self.output_stream.take() + { + if let Err(_) = comm.send(StreamCommand::StopThread){ + assert!(threadhandle.is_finished()); + } + // println!("Wainting for threadhandle to join..."); + self.siggen = Some(threadhandle.join().expect("Output thread panicked!")); + // println!("Threadhandle joined!"); + } else { + bail!("Stream is not running."); + } + Ok(()) + } + /// Stop existing running stream. + /// + /// Args + /// + /// * st: The stream type. + pub fn stopStream(&mut self, st: StreamType) -> Result<()> { + match st { + StreamType::Input | StreamType::Duplex => self.stopInputStream(), + StreamType::Output => self.stopOutputStream(), + } + } +} // impl StreamMgr +impl Drop for StreamMgr { + fn drop(&mut self) { + // Kill input stream if there is one + if self.input_stream.is_some() { + self.stopStream(StreamType::Input).unwrap(); + } + if self.output_stream.is_some() { + // println!("Stopstream in Drop"); + self.stopStream(StreamType::Output).unwrap(); + // println!("Stopstream in Drop done"); + } + + // Decref the singleton + smgr_created.store(false, std::sync::atomic::Ordering::Relaxed); + } +} + +// Send to all queues, remove queues that are disconnected when found out +// on the way. +fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) { + // Loop over queues. Remove queues that error when we try to send + // to them + iqueues.retain(|q| match q.try_send(msg.clone()) { + Ok(_) => true, + Err(_e) => false, + }); +} +/// Daq devices +trait Daq {} + +#[cfg(test)] +mod tests { + + // #[test] +} diff --git a/src/daq/streammsg.rs b/src/daq/streammsg.rs index 4d7f1eb..f393509 100644 --- a/src/daq/streammsg.rs +++ b/src/daq/streammsg.rs @@ -6,160 +6,18 @@ use anyhow::{bail, Result}; use crossbeam::channel::Sender; use reinterpret::{reinterpret_slice, reinterpret_vec}; use std::any::TypeId; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::u128::MAX; use strum_macros::Display; use super::*; -cfg_if::cfg_if! { -if #[cfg(feature = "python-bindings")] { - use pyo3::exceptions::PyValueError; - use pyo3::prelude::*; - use pyo3::{pymodule, pyclass, types::PyModule, PyResult}; -} else {} } -/// Raw stream data coming from a stream. -#[derive(Clone, Debug)] -pub enum RawStreamData { - /// 8-bits integer - Datai8(Arc>), - /// 16-bits integer - Datai16(Arc>), - /// 32-bits integer - Datai32(Arc>), - /// 32-bits float - Dataf32(Arc>), - /// 64-bits float - Dataf64(Arc>), - - /// Unknown data type. We cannot do anything with it, we could instead also create an error, although this is easier to pass downstream. - UnknownDataType, - - /// A stream error occured - StreamError(StreamError), -} - -// Create InStreamData object from -impl From<&[T]> for RawStreamData -where - T: num::ToPrimitive + Clone + 'static, -{ - fn from(input: &[T]) -> RawStreamData { - // Apparently, this code does not work with a match. I have searched around and have not found the - // reason for this. So this is a bit of stupid boilerplate. - let i8type: TypeId = TypeId::of::(); - let i16type: TypeId = TypeId::of::(); - let i32type: TypeId = TypeId::of::(); - let f32type: TypeId = TypeId::of::(); - let f64type: TypeId = TypeId::of::(); - let thetype: TypeId = TypeId::of::(); - if i8type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai8(Arc::new(v)) - } else if i16type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai16(Arc::new(v)) - } else if i16type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai16(Arc::new(v)) - } else if i32type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai32(Arc::new(v)) - } else if f32type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Dataf32(Arc::new(v)) - } else if f64type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Dataf64(Arc::new(v)) - } else { - panic!("Not implemented sample type!") - } - } -} - -// Create InStreamData object from -impl From> for RawStreamData -where - T: num::ToPrimitive + Clone + 'static, -{ - fn from(input: Vec) -> RawStreamData { - // Apparently, this code does not work with a match. I have searched around and have not found the - // reason for this. So this is a bit of stupid boilerplate. - let i8type: TypeId = TypeId::of::(); - let i16type: TypeId = TypeId::of::(); - let i32type: TypeId = TypeId::of::(); - let f32type: TypeId = TypeId::of::(); - let f64type: TypeId = TypeId::of::(); - let thetype: TypeId = TypeId::of::(); - if i8type == thetype { - let v: Vec = unsafe { reinterpret_vec(input) }; - RawStreamData::Datai8(Arc::new(v)) - } else if i16type == thetype { - let v: Vec = unsafe { reinterpret_vec(input) }; - RawStreamData::Datai16(Arc::new(v)) - } else if i16type == thetype { - let v: Vec = unsafe { reinterpret_vec(input) }; - RawStreamData::Datai16(Arc::new(v)) - } else if i32type == thetype { - let v: Vec = unsafe { reinterpret_vec(input) }; - RawStreamData::Datai32(Arc::new(v)) - } else if f32type == thetype { - let v: Vec = unsafe { reinterpret_vec(input) }; - RawStreamData::Dataf32(Arc::new(v)) - } else if f64type == thetype { - let v: Vec = unsafe { reinterpret_vec(input) }; - RawStreamData::Dataf64(Arc::new(v)) - } else { - panic!("Not implemented sample type!") - } - } -} - -/// Stream metadata. All information required for -#[derive(Clone, Debug)] -pub struct StreamMetaData { - /// Information for each channel in the stream - pub channelInfo: Vec, - - /// The data type of the device [Number / voltage] - pub rawDatatype: DataType, - - /// Sample rate in [Hz] - pub samplerate: Flt, - - /// The number of frames per block send over - pub framesPerBlock: usize, -} -impl StreamMetaData { - /// Create new metadata object. - /// /// - /// # Args - /// - pub fn new( - channelInfo: &[DaqChannel], - rawdtype: DataType, - sr: Flt, - framesPerBlock: usize, - ) -> Result { - Ok(StreamMetaData { - channelInfo: channelInfo.to_vec(), - rawDatatype: rawdtype, - samplerate: sr, - framesPerBlock, - }) - } - - /// Returns the number of channels in the stream metadata. - pub fn nchannels(&self) -> usize { - self.channelInfo.len() - } -} /// Input stream messages, to be send to handlers. #[derive(Clone, Debug)] pub enum InStreamMsg { /// Raw stream data that is coming from a device. This is interleaved data. The number of channels is correct and /// specified in the stream metadata. - RawStreamData(usize, Arc), + StreamData(Arc), /// An error has occured in the stream StreamError(StreamError), @@ -175,25 +33,6 @@ pub enum InStreamMsg { StreamStopped, } -/// Store a queue in a shared pointer, to share sending -/// and receiving part of the queue. -pub type SharedInQueue = Sender; -/// Vector of queues for stream messages -pub type InQueues = Vec; - -/// Commands that can be sent to a running stream -pub enum StreamCommand { - /// Add a new queue to a running stream - AddInQueue(SharedInQueue), - /// Remove a queue to a running stream - RemoveInQueue(SharedInQueue), - - /// New signal generator config to be used - NewSiggen(Siggen), - - /// Stop the thread, do not listen for data anymore. - StopThread, -} /// Stream types that can be started /// @@ -207,28 +46,3 @@ pub enum StreamType { /// Input and output at the same time Duplex, } - -/// Errors that happen in a stream -#[derive(strum_macros::EnumMessage, Debug, Clone, Display)] -pub enum StreamError { - /// Input overrun - #[strum(message = "InputXRunError", detailed_message = "Input buffer overrun")] - InputXRunError, - /// Output underrun - #[strum( - message = "OutputXRunError", - detailed_message = "Output buffer overrun" - )] - OutputXRunError, - /// Driver specific error - #[strum(message = "DriverError", detailed_message = "Driver error")] - DriverError, - - /// Device - #[strum(detailed_message = "Device not available")] - DeviceNotAvailable, - - /// Logic error (something weird happened) - #[strum(detailed_message = "Logic error")] - LogicError, -} diff --git a/src/daq/streamstatus.rs b/src/daq/streamstatus.rs new file mode 100644 index 0000000..a93cf25 --- /dev/null +++ b/src/daq/streamstatus.rs @@ -0,0 +1,25 @@ +//! Provides stream messages that come from a running stream +use strum_macros::Display; + +use super::*; +cfg_if::cfg_if! { +if #[cfg(feature = "python-bindings")] { + use pyo3::exceptions::PyValueError; + use pyo3::prelude::*; + use pyo3::{pymodule, pyclass, types::PyModule, PyResult}; +} else {} } + +/// Gives the stream status of a stream, either input / output or duplex. +#[derive(strum_macros::EnumMessage, Debug, Clone, Copy, Display)] +pub enum StreamStatus { + /// Stream is not running + #[strum(message = "NotRunning", detailed_message = "Stream is not running")] + NotRunning, + /// Stream is running properly + #[strum(message = "Running", detailed_message = "Stream is running")] + Running, + + /// An error occured in the stream. + #[strum(message = "Error", detailed_message = "An error occured with the stream")] + Error(StreamError) +} diff --git a/src/filter.rs b/src/filter.rs index 226b193..8e89d60 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -61,7 +61,7 @@ impl Biquad { /// Create new biquad filter. See [Biquad::new()] /// pub fn new_py<'py>(coefs: PyReadonlyArrayDyn) -> PyResult { - Ok(Biquad::new(&coefs.as_slice()?)?) + Ok(Biquad::new(coefs.as_slice()?)?) } #[pyo3(name = "unit")] #[staticmethod] @@ -146,12 +146,12 @@ impl Biquad { Ok(Biquad::new(&coefs).unwrap()) } fn filter_inout(&mut self, inout: &mut [Flt]) { - for sample in 0..inout.len() { - let w0 = inout[sample] - self.a1 * self.w1 - self.a2 * self.w2; + for sample in inout.iter_mut() { + let w0 = *sample - self.a1 * self.w1 - self.a2 * self.w2; let yn = self.b0 * w0 + self.b1 * self.w1 + self.b2 * self.w2; self.w2 = self.w1; self.w1 = w0; - inout[sample] = yn; + *sample = yn; } // println!("{:?}", inout); } @@ -238,7 +238,7 @@ impl SeriesBiquad { biqs.push(biq); } - if biqs.len() == 0 { + if biqs.is_empty() { bail!("No filter coefficients given!"); } diff --git a/src/siggen.rs b/src/siggen.rs index 57c270a..bdd6ca7 100644 --- a/src/siggen.rs +++ b/src/siggen.rs @@ -18,6 +18,10 @@ use super::config::*; use super::filter::Filter; use dasp_sample::{FromSample, Sample}; +use rayon::prelude::*; +use std::fmt::Debug; +use std::iter::ExactSizeIterator; +use std::slice::IterMut; #[cfg(feature = "python-bindings")] use pyo3::prelude::*; @@ -26,10 +30,12 @@ use rand::prelude::*; use rand::rngs::ThreadRng; use rand_distr::StandardNormal; +const twopi: Flt = 2. * pi; + /// Source for the signal generator. Implementations are sine waves, sweeps, noise. pub trait Source: Send { /// Generate the 'pure' source signal. Output is placed inside the `sig` argument. - fn genSignal_unscaled(&mut self, sig: &mut [Flt]); + fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator); /// Reset the source state, i.e. set phase to 0, etc fn reset(&mut self, fs: Flt); /// Used to make the Siggen struct cloneable @@ -41,6 +47,19 @@ impl Clone for Box { } } +#[derive(Clone)] +struct Silence {} + +impl Source for Silence { + fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator) { + sig.for_each(|s| *s = 0.0); + } + fn reset(&mut self, _fs: Flt) {} + fn clone_dyn(&self) -> Box { + Box::new(self.clone()) + } +} + /// White noise source #[derive(Clone)] struct WhiteNoise {} @@ -51,9 +70,8 @@ impl WhiteNoise { } } impl Source for WhiteNoise { - fn genSignal_unscaled(&mut self, sig: &mut [Flt]) { - sig.iter_mut() - .for_each(|s| *s = thread_rng().sample(StandardNormal)); + fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator) { + sig.for_each(|s| *s = thread_rng().sample(StandardNormal)); } fn reset(&mut self, _fs: Flt) {} fn clone_dyn(&self) -> Box { @@ -87,20 +105,18 @@ impl Sine { } } impl Source for Sine { - fn genSignal_unscaled(&mut self, sig: &mut [Flt]) { - if self.fs < 0. { - sig.iter_mut().for_each(|s| { + fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator) { + if self.fs <= 0. { + sig.for_each(|s| { *s = 0.; }); return; } - sig.iter_mut().for_each(|s| { + sig.for_each(|s| { *s = Flt::sin(self.phase); self.phase += self.omg / self.fs; + self.phase %= twopi; }); - while self.phase > 2. * pi { - self.phase -= 2. * pi; - } } fn reset(&mut self, fs: Flt) { self.fs = fs; @@ -123,6 +139,12 @@ pub struct Siggen { source: Box, // Filter applied to the source signal channels: Vec, + + // Temporary source signal buffer + source_buf: Vec, + + // Output buffers (for filtered source signal) + chout_buf: Vec>, } /// Multiple channel signal generator. Can use a single source (coherent) to provide multiple signals /// that can be sent out through different EQ's @@ -134,20 +156,52 @@ impl Siggen { self.channels.len() } + /// Silence: create a signal generator that does not output any dynamic + /// signal at all. + pub fn newSilence(nchannels: usize) -> Siggen { + Siggen { + channels: vec![SiggenChannelConfig::new(); nchannels], + source: Box::new(Silence {}), + source_buf: vec![], + chout_buf: vec![], + } + } + /// Create a white noise signal generator. pub fn newWhiteNoise(nchannels: usize) -> Siggen { Siggen::new(nchannels, Box::new(WhiteNoise::new())) } /// Set gains of all channels in signal generator to the same value - /// + /// /// # Args - /// + /// /// * g: New gain value pub fn setAllGains(&mut self, g: Flt) { self.channels.iter_mut().for_each(|set| set.setGain(g)) } + /// Set the number of channels to generate a signal for. Truncates the + /// output in case the value before calling this method is too little. + /// Appends new channel configs in case to little is available. + /// + /// * nch: The new required number of channels + pub fn setNChannels(&mut self, nch: usize) { + self.channels.truncate(nch); + + while self.channels.len() < nch { + self.channels.push(SiggenChannelConfig::new()); + } + } + + /// Set the DC offset for all channels + pub fn setDCOffset(&mut self,dc: &[Flt]) { + self.channels.iter_mut().zip(dc).for_each( + |(ch, dc)| {ch.DCOffset = *dc;}); + + + } + /// Create a sine wave signal generator /// /// * freq: Frequency of the sine wave in \[Hz\] @@ -160,44 +214,49 @@ impl Siggen { Siggen { source, channels: vec![SiggenChannelConfig::new(); nchannels], + source_buf: vec![], + chout_buf: vec![], } } /// Creates *interleaved* output signal pub fn genSignal(&mut self, out: &mut [T]) where - T: Sample + FromSample, - Flt: Sample + T: Sample + FromSample + Debug, + Flt: Sample, { let nch = self.nchannels(); let nsamples: usize = out.len() / nch; assert!(out.len() % self.nchannels() == 0); - // No initialization required here, as the data is filled in in genSignal_unscaled // Create source signal - let mut src = Vec::with_capacity(nsamples); - unsafe { - src.set_len(nsamples); - } - self.source.genSignal_unscaled(&mut src); - - // Create output temporary vector - let mut chout = Vec::with_capacity(nsamples); - unsafe { - chout.set_len(nsamples); - } + self.source_buf.resize(nsamples, 0.); + self.source + .genSignal_unscaled(&mut self.source_buf.iter_mut()); + // println!("Source signal: {:?}", self.source_buf); // Write output while casted to the correct type // Iterate over each channel, and counter - for (ch, channel) in self.channels.iter_mut().enumerate() { - // Create output signal, overwrite chout, as it - channel.genSignal(&src, &mut chout); + self.chout_buf.resize(nch, vec![]); - let out_iterator = out.iter_mut().skip(ch).step_by(nch); - for (sampleout, samplein) in out_iterator.zip(&chout) { - *sampleout = samplein.to_sample(); - } + for (channelno, (channel, chout)) in self + .channels + .iter_mut() + .zip(self.chout_buf.iter_mut()) + .enumerate() + { + chout.resize(nsamples, 0.); + + // Create output signal, overwrite chout + channel.genSignal(&self.source_buf, chout); + // println!("Channel: {}, {:?}", channelno, chout); + + let out_iterator = out.iter_mut().skip(channelno).step_by(nch); + out_iterator + .zip(chout) + .for_each(|(out, chin)| *out = chin.to_sample()); } + // println!("{:?}", out); } /// Reset signal generator. Applies any kind of cleanup necessary. @@ -212,18 +271,30 @@ impl Siggen { } /// Mute / unmute all channels at once pub fn setAllMute(&mut self, mute: bool) { - self.channels.iter_mut().for_each(|s| {s.muted = mute;}); + self.channels.iter_mut().for_each(|s| { + s.setMute(mute); + }); + } + + /// Mute / unmute individual channels. Array of bools should have same size + /// as number of channels in signal generator. + pub fn setMute(&mut self, mute: &[bool]) { + assert!(mute.len() == self.nchannels()); + self.channels.iter_mut().zip(mute).for_each(|(s, m)| { + s.setMute(*m); + }); } } /// Signal generator config for a certain channel #[derive(Clone)] -pub struct SiggenChannelConfig { +struct SiggenChannelConfig { muted: bool, prefilter: Option>, gain: Flt, DCOffset: Flt, } +unsafe impl Send for SiggenChannelConfig {} impl SiggenChannelConfig { /// Set new pre-filter that filters the source signal pub fn setPreFilter(&mut self, pref: Option>) { @@ -301,17 +372,54 @@ mod test { // This code is just to check syntax. We should really be listening to these outputs. let mut t = [0.; 10]; Siggen::newWhiteNoise(1).genSignal(&mut t); - println!("{:?}", &t); + // println!("{:?}", &t); } #[test] fn test_sine() { - // This code is just to check syntax. We should really be listening to these outputs. - let mut s = [0.; 9]; + // This code is just to check syntax. We should really be listening to + // these outputs. + const N: usize = 10000; + let mut s1 = [0.; N]; + let mut s2 = [0.; N]; let mut siggen = Siggen::newSineWave(1, 1.); - siggen.reset(1.); - siggen.genSignal(&mut s); - println!("{:?}", &s); + + siggen.reset(10.); + siggen.setAllMute(false); + siggen.genSignal(&mut s1); + siggen.genSignal(&mut s2); + + let absdiff = s1.iter().zip(s2.iter()).map(|(s1, s2)| {Flt::abs(*s1-*s2)}).sum::(); + assert!(absdiff< 1e-10); + } + + #[test] + fn test_sine2() { + // Test if channels are properly separated etc. Check if RMS is correct + // for amplitude = 1.0. + const fs: Flt = 10.; + // Number of samples per channel + const Nframes: usize = 10000; + const Nch: usize = 2; + let mut signal = [0.; Nch*Nframes]; + let mut siggen = Siggen::newSineWave(Nch, 1.); + + siggen.reset(fs); + siggen.setMute(&[false, true]); + // siggen.channels[0].DCOffset = 0.1; + + // Split off in two terms, see if this works properly + siggen.genSignal(&mut signal[..Nframes/2]); + siggen.genSignal(&mut signal[Nframes/2..]); + + // Mean square of the signal + let ms1 = signal.iter().step_by(2).map(|s1| {*s1 * *s1}).sum::() / Nframes as Flt; + println!("ms1: {}",ms1); + + let ms2 = signal.iter().skip(1).step_by(2).map(|s1| {*s1 * *s1}).sum::() / Nframes as Flt; + + assert!(Flt::abs(ms1 - 0.5) < 1e-12); + assert_eq!(ms2 , 0.); } // A small test to learn a bit about sample types and conversion. This @@ -322,6 +430,5 @@ mod test { assert_eq!(1.0f32.to_sample::(), 127); assert_eq!(-1.0f32.to_sample::(), -127); assert_eq!(1.0f32.to_sample::(), i16::MAX); - } }