Bugfix in type detection for InStreamMsg. Refactored some overcomplicated code, added first steps for RtAps

Anne de Jong 2024-07-18 13:06:49 +02:00
parent a905e58023
commit ff6a0687c4
16 changed files with 641 additions and 398 deletions


@@ -81,6 +81,9 @@ clap = { version = "4.5.8", features = ["derive", "color", "help", "suggestions"
 # FFT's
 realfft = "3.3.0"
+# Fast Mutex
+parking_lot = "0.12.3"
 [dev-dependencies]
 approx = "0.5.1"
 ndarray-rand = "0.14.0"


@@ -38,8 +38,9 @@ fn main() -> Result<()> {
         // eprint!("Obtained message: {:?}", msg);
         match msg {
             InStreamMsg::StreamStarted(meta) => {
-                println!("Stream started: {:?}", meta);
+                println!("Stream started metadata: {meta:#?}");
             },
+            InStreamMsg::InStreamData(_) => {}
             _ => { println!("Other msg...");}
         }
     }
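For reference, a minimal consumer loop over the renamed message variants could look like the sketch below (crate-internal; a StreamHandler named handler is assumed, as in the RtAps code added later in this commit). The InStreamData payload carries both the raw samples and the shared metadata.

    // Sketch only: drain messages from a StreamHandler until its stream ends.
    fn print_messages(handler: StreamHandler) {
        for msg in handler.rx.iter() {
            match msg {
                InStreamMsg::StreamStarted(meta) => println!("Stream started: {meta:#?}"),
                // The data variant now wraps Arc<InStreamData> instead of the old StreamData.
                InStreamMsg::InStreamData(data) => println!("Got a block of {} frames", data.nframes()),
                InStreamMsg::StreamError(e) => eprintln!("Stream error: {e}"),
                _ => println!("Other msg..."),
            }
        }
    }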


@ -1,21 +1,21 @@
#![allow(dead_code)] #![allow(dead_code)]
use super::Stream; use super::Stream;
use super::StreamMetaData; use super::StreamMetaData;
use crate::config::{ self, * }; use crate::config::{self, *};
use crate::daq::{ self, * }; use crate::daq::{self, *};
use crate::daq::{ streamdata::*, StreamApiDescr }; use crate::daq::{streamdata::*, StreamApiDescr};
use anyhow::{ bail, Result }; use anyhow::{bail, Result};
use cpal::traits::{ DeviceTrait, HostTrait, StreamTrait }; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::SampleRate; use cpal::SampleRate;
use cpal::SupportedStreamConfig; use cpal::SupportedStreamConfig;
use cpal::{ Device, Host, Sample, SampleFormat, SupportedBufferSize }; use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize};
use crossbeam::atomic::AtomicCell; use crossbeam::atomic::AtomicCell;
use crossbeam::channel::{ Receiver, Sender }; use crossbeam::channel::{Receiver, Sender};
use itertools::Itertools; use itertools::Itertools;
use num::ToPrimitive; use num::ToPrimitive;
use reinterpret::reinterpret_slice; use reinterpret::reinterpret_slice;
use std::any; use std::any;
use std::any::{ Any, TypeId }; use std::any::{Any, TypeId};
use std::collections::btree_map::OccupiedEntry; use std::collections::btree_map::OccupiedEntry;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt::Debug; use std::fmt::Debug;
@ -53,16 +53,16 @@ pub struct CpalApi {
} }
pub struct CpalStream { pub struct CpalStream {
stream: cpal::Stream, stream: cpal::Stream,
md: Arc<StreamMetaData>, metadata: Arc<StreamMetaData>,
noutchannels: usize, noutchannels: usize,
status: Arc<AtomicCell<StreamStatus>>, status: Arc<AtomicCell<StreamStatus>>,
} }
impl Stream for CpalStream { impl Stream for CpalStream {
fn metadata(&self) -> Arc<StreamMetaData> { fn metadata(&self) -> Arc<StreamMetaData> {
self.md.clone() self.metadata.clone()
} }
fn ninchannels(&self) -> usize { fn ninchannels(&self) -> usize {
self.md.nchannels() self.metadata.nchannels()
} }
fn noutchannels(&self) -> usize { fn noutchannels(&self) -> usize {
self.noutchannels self.noutchannels
@ -82,17 +82,14 @@ impl CpalApi {
} }
} }
pub fn getDeviceInfo(&self) -> Result<Vec<DeviceInfo>> { pub fn getDeviceInfo(&self) -> Result<Vec<DeviceInfo>> {
let srs_1 = [1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000]; let srs_1 = [
1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000,
];
let srs_2 = [11025, 22050, 44100, 88200]; let srs_2 = [11025, 22050, 44100, 88200];
let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter())); let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter()));
srs_tot.sort(); srs_tot.sort();
let srs_tot = Vec::from_iter( let srs_tot = Vec::from_iter(srs_tot.iter().copied().map(|i| *i as Flt));
srs_tot
.iter()
.copied()
.map(|i| *i as Flt)
);
// srs_tot.sort(); // srs_tot.sort();
@ -145,11 +142,8 @@ impl CpalApi {
continue; continue;
} }
let dtypes: Vec<DataType> = sample_formats let dtypes: Vec<DataType> =
.iter() sample_formats.iter().dedup().map(|i| (*i).into()).collect();
.dedup()
.map(|i| (*i).into())
.collect();
let prefDataType = match dtypes.iter().position(|d| d == &DataType::F32) { let prefDataType = match dtypes.iter().position(|d| d == &DataType::F32) {
Some(idx) => dtypes[idx], Some(idx) => dtypes[idx],
@ -189,8 +183,8 @@ impl CpalApi {
// Create the error function closure, that capture the send channel on which error messages from the stream are sent // Create the error function closure, that capture the send channel on which error messages from the stream are sent
fn create_errfcn( fn create_errfcn(
send_ch: Option<Sender<RawStreamData>>, send_ch: Option<Sender<InStreamMsg>>,
status: Arc<AtomicCell<StreamStatus>> status: Arc<AtomicCell<StreamStatus>>,
) -> impl FnMut(cpal::StreamError) { ) -> impl FnMut(cpal::StreamError) {
move |err: cpal::StreamError| { move |err: cpal::StreamError| {
let serr = match err { let serr = match err {
@ -198,19 +192,21 @@ impl CpalApi {
cpal::StreamError::BackendSpecific { err: _ } => StreamError::DriverError, cpal::StreamError::BackendSpecific { err: _ } => StreamError::DriverError,
}; };
if let Some(sender) = &send_ch { if let Some(sender) = &send_ch {
sender.send(RawStreamData::StreamError(serr)).unwrap(); sender.send(InStreamMsg::StreamError(serr)).unwrap();
} }
status.store(StreamStatus::Error { e: serr }); status.store(StreamStatus::Error { e: serr });
} }
} }
fn create_incallback<T>( fn create_incallback<T>(
meta: Arc<StreamMetaData>,
config: &cpal::StreamConfig, config: &cpal::StreamConfig,
sender: Sender<RawStreamData>, sender: Sender<InStreamMsg>,
framesPerBlock: usize, framesPerBlock: usize,
en_inchannels: Vec<usize> en_inchannels: Vec<usize>,
) -> impl FnMut(&[T], &cpal::InputCallbackInfo) ) -> impl FnMut(&[T], &cpal::InputCallbackInfo)
where T: 'static + Sample + ToPrimitive where
T: 'static + Sample + ToPrimitive,
{ {
let tot_inch = config.channels as usize; let tot_inch = config.channels as usize;
@ -219,15 +215,16 @@ impl CpalApi {
let mut enabled_ch_data: Vec<T> = let mut enabled_ch_data: Vec<T> =
vec![Sample::EQUILIBRIUM; en_inchannels.len() * framesPerBlock]; vec![Sample::EQUILIBRIUM; en_inchannels.len() * framesPerBlock];
// let meta = StreamMetaData::new()
let mut ctr = 0;
// The actual callback that is returned // The actual callback that is returned
move |input: &[T], _: &cpal::InputCallbackInfo| { move |input: &[T], _: &cpal::InputCallbackInfo| {
// Copy elements over in ring buffer // Copy elements over in ring buffer
q.extend(input); q.extend(input);
while q.len() > tot_inch * framesPerBlock { while q.len() > tot_inch * framesPerBlock {
// println!("q full enough: {}", q.len()); // Loop over enabled channels
// // Loop over enabled channels
for (i, ch) in en_inchannels.iter().enumerate() { for (i, ch) in en_inchannels.iter().enumerate() {
let in_iterator = q.iter().skip(*ch).step_by(tot_inch); let in_iterator = q.iter().skip(*ch).step_by(tot_inch);
let out_iterator = enabled_ch_data let out_iterator = enabled_ch_data
@ -245,8 +242,14 @@ impl CpalApi {
q.drain(0..framesPerBlock * tot_inch); q.drain(0..framesPerBlock * tot_inch);
// Send over data // Send over data
let msg = RawStreamData::from(enabled_ch_data.clone()); let streamdata = Arc::new(InStreamData::new(
sender.send(msg).unwrap(); ctr,
meta.clone(),
enabled_ch_data.clone(),
));
sender.send(InStreamMsg::InStreamData(streamdata)).unwrap();
ctr += 1;
} }
} }
} }
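The input callback above buffers interleaved frames in a VecDeque and, once a full block is available, copies out only the enabled channels before draining the queue. Below is a standalone sketch of that selection step (names are illustrative, and the enabled-channel output is assumed to stay interleaved, which matches how the recorder reshapes blocks later).

    // Sketch: pull one block of `frames` frames out of an interleaved capture
    // buffer, keeping only the enabled channels, still interleaved.
    fn select_enabled<T: Copy>(
        q: &[T],           // interleaved samples, `tot_inch` channels per frame
        tot_inch: usize,   // total number of hardware input channels
        frames: usize,     // frames per block
        enabled: &[usize], // indices of the enabled channels
        out: &mut [T],     // length = enabled.len() * frames
    ) {
        let nen = enabled.len();
        for (i, ch) in enabled.iter().enumerate() {
            let src = q.iter().skip(*ch).step_by(tot_inch).take(frames);
            let dst = out.iter_mut().skip(i).step_by(nen);
            for (o, s) in dst.zip(src) {
                *o = *s;
            }
        }
    }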
@ -257,12 +260,13 @@ impl CpalApi {
/// ///
/// * sf: Sample format /// * sf: Sample format
fn build_input_stream( fn build_input_stream(
meta: Arc<StreamMetaData>,
sf: cpal::SampleFormat, sf: cpal::SampleFormat,
config: &cpal::StreamConfig, config: &cpal::StreamConfig,
device: &cpal::Device, device: &cpal::Device,
sender: Sender<RawStreamData>, sender: Sender<InStreamMsg>,
en_inchannels: Vec<usize>, en_inchannels: Vec<usize>,
framesPerBlock: usize framesPerBlock: usize,
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> { ) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {})); let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {}));
@ -273,7 +277,9 @@ impl CpalApi {
match sf { match sf {
$( $(
$cpaltype => { $cpaltype => {
let icb = CpalApi::create_incallback::<$rtype>(&config, sender, framesPerBlock, en_inchannels); let icb = CpalApi::create_incallback::<$rtype>(
meta,
&config, sender, framesPerBlock, en_inchannels);
device.build_input_stream( device.build_input_stream(
&config, &config,
icb, icb,
@ -284,8 +290,7 @@ impl CpalApi {
} }
}; };
} }
let stream: cpal::Stream = let stream: cpal::Stream = build_stream!(
build_stream!(
SampleFormat::I8 => i8, SampleFormat::I8 => i8,
SampleFormat::I16 => i16, SampleFormat::I16 => i16,
SampleFormat::I32 => i32, SampleFormat::I32 => i32,
@ -299,15 +304,13 @@ impl CpalApi {
streamstatus: Arc<AtomicCell<StreamStatus>>, streamstatus: Arc<AtomicCell<StreamStatus>>,
receiver: Receiver<RawStreamData>, receiver: Receiver<RawStreamData>,
ch_config: &[DaqChannel], ch_config: &[DaqChannel],
framesPerBlock: usize framesPerBlock: usize,
) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo) ) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo)
where T: 'static + Sample + Debug where
T: 'static + Sample + Debug,
{ {
let number_total_out_channels: usize = config.channels as usize; let number_total_out_channels: usize = config.channels as usize;
let number_enabled_out_channels = ch_config let number_enabled_out_channels = ch_config.iter().filter(|ch| ch.enabled).count();
.iter()
.filter(|ch| ch.enabled)
.count();
let disabled_ch = DaqChannel::default(); let disabled_ch = DaqChannel::default();
let disabled_repeater = std::iter::repeat(&disabled_ch); let disabled_repeater = std::iter::repeat(&disabled_ch);
@ -329,10 +332,11 @@ impl CpalApi {
let status = streamstatus.load(); let status = streamstatus.load();
callback_ctr += 1; callback_ctr += 1;
let mut setToEquilibrium = || let mut setToEquilibrium = || {
outdata.iter_mut().for_each(|v| { outdata.iter_mut().for_each(|v| {
*v = Sample::EQUILIBRIUM; *v = Sample::EQUILIBRIUM;
}); })
};
match status { match status {
StreamStatus::NotRunning {} | StreamStatus::Error { .. } => { StreamStatus::NotRunning {} | StreamStatus::Error { .. } => {
setToEquilibrium(); setToEquilibrium();
@ -355,19 +359,18 @@ impl CpalApi {
// All right, we have enough samples to send out! They are // All right, we have enough samples to send out! They are
// drained from the queue // drained from the queue
let out_chunks = outdata.iter_mut().chunks(number_total_out_channels); let out_chunks = outdata.iter_mut().chunks(number_total_out_channels);
let siggen_chunks = q.drain(..nsamples_asked).chunks(number_enabled_out_channels); let siggen_chunks = q
.drain(..nsamples_asked)
.chunks(number_enabled_out_channels);
for (och, ich) in out_chunks.into_iter().zip(siggen_chunks.into_iter()) { for (och, ich) in out_chunks.into_iter().zip(siggen_chunks.into_iter()) {
let mut sig_frame_iter = ich.into_iter(); let mut sig_frame_iter = ich.into_iter();
och.into_iter() och.into_iter().zip(&enabled_outch).for_each(|(o, en)| {
.zip(&enabled_outch) (if *en {
.for_each(|(o, en)| (
if *en {
*o = sig_frame_iter.next().unwrap(); *o = sig_frame_iter.next().unwrap();
} else { } else {
*o = Sample::EQUILIBRIUM; *o = Sample::EQUILIBRIUM;
} })
)); });
} }
// outdata // outdata
@ -396,7 +399,7 @@ impl CpalApi {
device: &cpal::Device, device: &cpal::Device,
receiver: Receiver<RawStreamData>, receiver: Receiver<RawStreamData>,
ch_config: &[DaqChannel], ch_config: &[DaqChannel],
framesPerBlock: usize framesPerBlock: usize,
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> { ) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {})); let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {}));
@ -417,8 +420,7 @@ impl CpalApi {
} }
}; };
} }
let stream: cpal::Stream = let stream: cpal::Stream = build_stream!(
build_stream!(
SampleFormat::I8 => i8, SampleFormat::I8 => i8,
SampleFormat::I16 => i16, SampleFormat::I16 => i16,
SampleFormat::I32 => i32, SampleFormat::I32 => i32,
@ -434,9 +436,10 @@ impl CpalApi {
devinfo: &DeviceInfo, devinfo: &DeviceInfo,
conf: &DaqConfig, conf: &DaqConfig,
_dev: &cpal::Device, _dev: &cpal::Device,
conf_iterator: T conf_iterator: T,
) -> Result<cpal::SupportedStreamConfig> ) -> Result<cpal::SupportedStreamConfig>
where T: Iterator<Item = cpal::SupportedStreamConfigRange> where
T: Iterator<Item = cpal::SupportedStreamConfigRange>,
{ {
let nchannels = match st { let nchannels = match st {
StreamType::Input => devinfo.iChannelCount, StreamType::Input => devinfo.iChannelCount,
@ -448,9 +451,8 @@ impl CpalApi {
// Specified sample format is available // Specified sample format is available
if cpalconf.channels() == (nchannels as u16) { if cpalconf.channels() == (nchannels as u16) {
let requested_sr = conf.sampleRate(devinfo); let requested_sr = conf.sampleRate(devinfo);
if if (cpalconf.min_sample_rate().0 as Flt) <= requested_sr
(cpalconf.min_sample_rate().0 as Flt) <= requested_sr && && (cpalconf.max_sample_rate().0 as Flt) >= requested_sr
(cpalconf.max_sample_rate().0 as Flt) >= requested_sr
{ {
// Sample rate falls within range. // Sample rate falls within range.
let requested_fpb = conf.framesPerBlock(devinfo) as u32; let requested_fpb = conf.framesPerBlock(devinfo) as u32;
@ -482,27 +484,25 @@ impl CpalApi {
stype: StreamType, stype: StreamType,
devinfo: &DeviceInfo, devinfo: &DeviceInfo,
conf: &DaqConfig, conf: &DaqConfig,
sender: Sender<RawStreamData> sender: Sender<InStreamMsg>,
) -> Result<Box<dyn Stream>> { ) -> Result<Box<dyn Stream>> {
for cpaldev in self.host.devices()? { for cpaldev in self.host.devices()? {
// See if we can create a supported stream config. // See if we can create a supported stream config.
let supported_config = (match stype { let supported_config = (match stype {
StreamType::Duplex => bail!("Duplex stream not supported for CPAL"), StreamType::Duplex => bail!("Duplex stream not supported for CPAL"),
StreamType::Input => StreamType::Input => CpalApi::create_cpal_config(
CpalApi::create_cpal_config(
stype, stype,
devinfo, devinfo,
conf, conf,
&cpaldev, &cpaldev,
cpaldev.supported_input_configs()? cpaldev.supported_input_configs()?,
), ),
StreamType::Output => StreamType::Output => CpalApi::create_cpal_config(
CpalApi::create_cpal_config(
stype, stype,
devinfo, devinfo,
conf, conf,
&cpaldev, &cpaldev,
cpaldev.supported_output_configs()? cpaldev.supported_output_configs()?,
), ),
})?; })?;
let framesPerBlock = conf.framesPerBlock(devinfo); let framesPerBlock = conf.framesPerBlock(devinfo);
@ -514,37 +514,34 @@ impl CpalApi {
&conf.enabledInchannelConfig(), &conf.enabledInchannelConfig(),
conf.dtype, conf.dtype,
supported_config.sample_rate().0 as Flt, supported_config.sample_rate().0 as Flt,
framesPerBlock framesPerBlock,
)?; );
let meta = Arc::new(meta); let meta = Arc::new(meta);
let (stream, status) = CpalApi::build_input_stream( let (stream, status) = CpalApi::build_input_stream(
meta.clone(),
sf, sf,
&config, &config,
&cpaldev, &cpaldev,
sender, sender,
conf.enabledInchannelsList(), conf.enabledInchannelsList(),
framesPerBlock framesPerBlock,
)?; )?;
stream.play()?; stream.play()?;
status.store(StreamStatus::Running {}); status.store(StreamStatus::Running {});
return Ok( return Ok(Box::new(CpalStream {
Box::new(CpalStream {
stream, stream,
md: meta, metadata: meta,
noutchannels: 0, noutchannels: 0,
status, status,
}) }));
);
} }
bail!( bail!(format!(
format!(
"Error: requested device {} not found. Please make sure the device is available.", "Error: requested device {} not found. Please make sure the device is available.",
devinfo.device_name devinfo.device_name
) ))
)
} }
/// Start a default input stream. /// Start a default input stream.
@ -552,7 +549,7 @@ impl CpalApi {
/// ///
pub fn startDefaultInputStream( pub fn startDefaultInputStream(
&mut self, &mut self,
sender: Sender<RawStreamData> sender: Sender<InStreamMsg>,
) -> Result<Box<dyn Stream>> { ) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_input_device() { if let Some(device) = self.host.default_input_device() {
if let Ok(config) = device.default_input_config() { if let Ok(config) = device.default_input_config() {
@ -565,43 +562,41 @@ impl CpalApi {
let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize)); let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
let sf = config.sample_format(); let sf = config.sample_format();
// Specify data type
let dtype = DataType::from(sf);
// Daq: default channel config
let daqchannels = Vec::from_iter(
(0..final_config.channels)
.map(|i| DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))),
);
// Create stream metadata
let metadata = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
);
let metadata = Arc::new(metadata);
let (stream, status) = CpalApi::build_input_stream( let (stream, status) = CpalApi::build_input_stream(
metadata.clone(),
sf, sf,
&final_config, &final_config,
&device, &device,
sender, sender,
en_inchannels, en_inchannels,
framesPerBlock framesPerBlock,
)?; )?;
stream.play()?; stream.play()?;
status.store(StreamStatus::Running {}); status.store(StreamStatus::Running {});
// Daq: default channel config Ok(Box::new(CpalStream {
let daqchannels = Vec::from_iter(
(0..final_config.channels).map(|i|
DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))
)
);
// Specify data type
let dtype = DataType::from(sf);
// Create stream metadata
let md = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock
)?;
let md = Arc::new(md);
Ok(
Box::new(CpalStream {
stream, stream,
md, metadata,
noutchannels: 0, noutchannels: 0,
status, status,
}) }))
)
} else { } else {
bail!("Could not obtain default input configuration") bail!("Could not obtain default input configuration")
} }
@ -629,15 +624,14 @@ impl CpalApi {
pub fn startDefaultOutputStream( pub fn startDefaultOutputStream(
&self, &self,
receiver: Receiver<RawStreamData> receiver: Receiver<RawStreamData>,
) -> Result<Box<dyn Stream>> { ) -> Result<Box<dyn Stream>> {
let (device, config, sampleformat, framesPerBlock) = self.getDefaultOutputConfig()?; let (device, config, sampleformat, framesPerBlock) = self.getDefaultOutputConfig()?;
// Daq: default channel config // Daq: default channel config
let daqchannels = Vec::from_iter( let daqchannels = Vec::from_iter(
(0..config.channels).map(|i| (0..config.channels)
DaqChannel::defaultAudio(format!("Unnamed output channel {}", i)) .map(|i| DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))),
)
); );
let (stream, status) = CpalApi::build_output_stream( let (stream, status) = CpalApi::build_output_stream(
sampleformat, sampleformat,
@ -645,7 +639,7 @@ impl CpalApi {
&device, &device,
receiver, receiver,
&daqchannels, &daqchannels,
framesPerBlock framesPerBlock,
)?; )?;
stream.play()?; stream.play()?;
@ -659,12 +653,12 @@ impl CpalApi {
&daqchannels, &daqchannels,
dtype, dtype,
config.sample_rate.0 as Flt, config.sample_rate.0 as Flt,
framesPerBlock framesPerBlock,
)?; );
let md = Arc::new(md); let md = Arc::new(md);
let str = Box::new(CpalStream { let str = Box::new(CpalStream {
stream, stream,
md, metadata: md,
noutchannels: daqchannels.len(), noutchannels: daqchannels.len(),
status, status,
}); });
@ -674,7 +668,7 @@ impl CpalApi {
fn getCPALOutputConfig( fn getCPALOutputConfig(
&self, &self,
dev: &DeviceInfo, dev: &DeviceInfo,
daqconfig: &DaqConfig daqconfig: &DaqConfig,
) -> Result<(Device, cpal::StreamConfig, SampleFormat, usize)> { ) -> Result<(Device, cpal::StreamConfig, SampleFormat, usize)> {
let samplerate = dev.avSampleRates[daqconfig.sampleRateIndex] as u32; let samplerate = dev.avSampleRates[daqconfig.sampleRateIndex] as u32;
let framesPerBlock = dev.avFramesPerBlock[daqconfig.framesPerBlockIndex]; let framesPerBlock = dev.avFramesPerBlock[daqconfig.framesPerBlockIndex];
@ -716,12 +710,10 @@ impl CpalApi {
&self, &self,
dev: &DeviceInfo, dev: &DeviceInfo,
cfg: &DaqConfig, cfg: &DaqConfig,
receiver: Receiver<RawStreamData> receiver: Receiver<RawStreamData>,
) -> Result<Box<dyn Stream>> { ) -> Result<Box<dyn Stream>> {
let (device, cpalconfig, sampleformat, framesPerBlock) = self.getCPALOutputConfig( let (device, cpalconfig, sampleformat, framesPerBlock) =
dev, self.getCPALOutputConfig(dev, cfg)?;
cfg
)?;
let (stream, status) = Self::build_output_stream( let (stream, status) = Self::build_output_stream(
sampleformat, sampleformat,
@ -729,7 +721,7 @@ impl CpalApi {
&device, &device,
receiver, receiver,
&cfg.outchannel_config, &cfg.outchannel_config,
framesPerBlock framesPerBlock,
)?; )?;
stream.play()?; stream.play()?;
@ -742,12 +734,12 @@ impl CpalApi {
&cfg.enabledOutchannelConfig(), &cfg.enabledOutchannelConfig(),
dtype, dtype,
cpalconfig.sample_rate.0 as Flt, cpalconfig.sample_rate.0 as Flt,
framesPerBlock framesPerBlock,
)?; );
let md = Arc::new(md); let md = Arc::new(md);
let str = Box::new(CpalStream { let str = Box::new(CpalStream {
stream, stream,
md, metadata: md,
noutchannels: cpalconfig.channels as usize, noutchannels: cpalconfig.channels as usize,
status, status,
}); });


@@ -8,7 +8,7 @@ use strum::EnumMessage;
 use strum_macros;
 use crate::config::*;
-use super::{streamdata::StreamMetaData, streamstatus::StreamStatus};
+use super::{StreamStatus, StreamMetaData};
 #[cfg(feature = "cpal-api")]
 pub mod api_cpal;


@@ -19,6 +19,8 @@ mod qty;
 #[cfg(feature = "record")]
 mod record;
+#[cfg(feature = "record")]
+pub use record::*;
 mod streamcmd;
 mod streamdata;
@@ -26,6 +28,8 @@ mod streamhandler;
 mod streammgr;
 mod streammsg;
 mod streamstatus;
+mod streammetadata;
+mod streamerror;
 // Module re-exports
 pub use daqconfig::{DaqChannel, DaqConfig};
@@ -36,15 +40,14 @@ pub use streamhandler::StreamHandler;
 pub use streammgr::*;
 pub use streammsg::InStreamMsg;
 pub use streamstatus::StreamStatus;
+pub use streamdata::{RawStreamData, InStreamData};
+pub use streammetadata::StreamMetaData;
+pub use streamerror::StreamError;
 use api::*;
 #[cfg(feature = "record")]
-pub use record::*;
-use strum_macros::Display;
 use crate::config::*;
-use super::*;
 /// Stream types that can be started
 ///
 #[cfg_attr(feature = "python-bindings", pyclass)]
@@ -76,34 +79,3 @@ pub fn add_py_classses(m: &Bound<'_, PyModule>) -> PyResult<()> {
     Ok(())
 }
-/// Errors that happen in a stream
-#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)]
-#[cfg_attr(feature = "python-bindings", pyclass)]
-pub enum StreamError {
-    /// Input overrun
-    #[strum(
-        message = "InputOverrun Error",
-        detailed_message = "Input buffer overrun"
-    )]
-    InputOverrunError,
-    /// Output underrun
-    #[strum(
-        message = "OutputUnderrunError",
-        detailed_message = "Output buffer underrun"
-    )]
-    OutputUnderrunError,
-    /// Driver specific error
-    #[strum(message = "DriverError", detailed_message = "Driver error")]
-    DriverError,
-    /// Device
-    #[strum(detailed_message = "Device not available (anymore)")]
-    DeviceNotAvailable,
-    /// Logic error (something weird happened)
-    #[strum(detailed_message = "Logic error")]
-    LogicError,
-}


@ -5,10 +5,11 @@ use clap::builder::OsStr;
use crossbeam::atomic::AtomicCell; use crossbeam::atomic::AtomicCell;
use hdf5::types::{VarLenArray, VarLenUnicode}; use hdf5::types::{VarLenArray, VarLenUnicode};
use hdf5::{dataset, datatype, Dataset, File, H5Type}; use hdf5::{dataset, datatype, Dataset, File, H5Type};
use parking_lot::Mutex;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamdata::*; use streamdata::*;
@ -155,34 +156,35 @@ impl Recording {
fn append_to_dset( fn append_to_dset(
ds: &Dataset, ds: &Dataset,
ctr: usize, ctr: usize,
msg: &RawStreamData, data: &InStreamData,
framesPerBlock: usize, framesPerBlock: usize,
nchannels: usize, nchannels: usize,
) -> Result<()> { ) -> Result<()> {
match msg { match data.getRaw() {
RawStreamData::Datai8(dat) => { RawStreamData::Datai8(dat) => {
let arr = ndarray::ArrayView2::<i8>::from_shape((framesPerBlock, nchannels), dat)?; let arr = ndarray::ArrayView2::<i8>::from_shape((framesPerBlock, nchannels), &dat)?;
ds.write_slice(arr, (ctr, .., ..))?; ds.write_slice(arr, (ctr, .., ..))?;
} }
RawStreamData::Datai16(dat) => { RawStreamData::Datai16(dat) => {
let arr = ndarray::ArrayView2::<i16>::from_shape((framesPerBlock, nchannels), dat)?; let arr =
ndarray::ArrayView2::<i16>::from_shape((framesPerBlock, nchannels), &dat)?;
ds.write_slice(arr, (ctr, .., ..))?; ds.write_slice(arr, (ctr, .., ..))?;
} }
RawStreamData::Datai32(dat) => { RawStreamData::Datai32(dat) => {
let arr = ndarray::ArrayView2::<i32>::from_shape((framesPerBlock, nchannels), dat)?; let arr =
ndarray::ArrayView2::<i32>::from_shape((framesPerBlock, nchannels), &dat)?;
ds.write_slice(arr, (ctr, .., ..))?; ds.write_slice(arr, (ctr, .., ..))?;
} }
RawStreamData::Dataf32(dat) => { RawStreamData::Dataf32(dat) => {
let arr = ndarray::ArrayView2::<f32>::from_shape((framesPerBlock, nchannels), dat)?; let arr =
ndarray::ArrayView2::<f32>::from_shape((framesPerBlock, nchannels), &dat)?;
ds.write_slice(arr, (ctr, .., ..))?; ds.write_slice(arr, (ctr, .., ..))?;
} }
RawStreamData::Dataf64(dat) => { RawStreamData::Dataf64(dat) => {
let arr = ndarray::ArrayView2::<f64>::from_shape((framesPerBlock, nchannels), dat)?; let arr =
ndarray::ArrayView2::<f64>::from_shape((framesPerBlock, nchannels), &dat)?;
ds.write_slice(arr, (ctr, .., ..))?; ds.write_slice(arr, (ctr, .., ..))?;
} }
RawStreamData::StreamError(e) => {
bail!("Stream error: {}", e)
}
} }
Ok(()) Ok(())
} }
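The recorder now reshapes each interleaved InStreamData block into a (framesPerBlock, nchannels) view before writing it at offset ctr in the chunked dataset. A standalone sketch of that reshape step (illustrative; i16 stands in for any of the supported sample types):

    // Sketch: view one interleaved block as (frames, channels) without copying.
    // Interleaved, frame-major data means each frame becomes one row.
    use ndarray::ArrayView2;

    fn block_view(dat: &[i16], frames_per_block: usize, nchannels: usize) -> ArrayView2<'_, i16> {
        ArrayView2::from_shape((frames_per_block, nchannels), dat)
            .expect("block length must equal frames_per_block * nchannels")
    }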
@ -306,7 +308,6 @@ impl Recording {
InStreamMsg::StreamError(e) => { InStreamMsg::StreamError(e) => {
bail!("Recording failed due to stream error: {}.", e) bail!("Recording failed due to stream error: {}.", e)
} }
InStreamMsg::ConvertedStreamData(..) => {}
InStreamMsg::StreamStarted(_) => { InStreamMsg::StreamStarted(_) => {
bail!("Stream started again?") bail!("Stream started again?")
} }
@ -314,12 +315,12 @@ impl Recording {
// Early stop. User stopped it. // Early stop. User stopped it.
break 'recloop; break 'recloop;
} }
InStreamMsg::StreamData(dat) => { InStreamMsg::InStreamData(instreamdata) => {
if first { if first {
first = false; first = false;
// Initialize counter offset // Initialize counter offset
ctr_offset = dat.ctr; ctr_offset = instreamdata.ctr;
} else if dat.ctr != stored_ctr + ctr_offset { } else if instreamdata.ctr != stored_ctr + ctr_offset {
println!("********** PACKAGES MISSED ***********"); println!("********** PACKAGES MISSED ***********");
bail!("Packages missed. Recording is invalid.") bail!("Packages missed. Recording is invalid.")
} }
@ -340,7 +341,7 @@ impl Recording {
Recording::append_to_dset( Recording::append_to_dset(
&ds, &ds,
stored_ctr, stored_ctr,
&dat.raw, &instreamdata,
framesPerBlock, framesPerBlock,
nchannels, nchannels,
)?; )?;


@ -1,153 +1,63 @@
//! Provides stream messages that come from a running stream //! Provides stream messages that come from a running stream
use crate::config::*; use crate::config::*;
use crate::daq::Qty;
use crate::siggen::Siggen; use crate::siggen::Siggen;
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use cpal::Sample; use cpal::{FromSample, Sample};
use crossbeam::channel::Sender; use itertools::Itertools;
use num::cast::AsPrimitive;
use num::{Bounded, FromPrimitive, Num};
use super::*;
use super::*;
use parking_lot::RwLock;
use reinterpret::{reinterpret_slice, reinterpret_vec}; use reinterpret::{reinterpret_slice, reinterpret_vec};
use std::any::TypeId; use std::any::TypeId;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use std::u128::MAX; use std::u128::MAX;
use strum_macros::Display; use strum_macros::Display;
use super::*; /// Raw stream data coming from a stream or going to a stream.
/// Raw stream data coming from a stream.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum RawStreamData { pub enum RawStreamData {
/// 8-bits integer /// 8-bits integers
Datai8(Vec<i8>), Datai8(Vec<i8>),
/// 16-bits integer /// 16-bits integers
Datai16(Vec<i16>), Datai16(Vec<i16>),
/// 32-bits integer /// 32-bits integers
Datai32(Vec<i32>), Datai32(Vec<i32>),
/// 32-bits float /// 32-bits floats
Dataf32(Vec<f32>), Dataf32(Vec<f32>),
/// 64-bits float /// 64-bits floats
Dataf64(Vec<f64>), Dataf64(Vec<f64>),
/// A stream error occured
StreamError(StreamError),
} }
impl RawStreamData { impl RawStreamData {
pub fn toFloat(&self, _nchannels: usize) -> Dmat { /// Create raw stream data from slice of data, or vec of data. Copies over
// match &self { /// the data.
// RawStreamData::Datai8(c) => { fn new<T, U>(input: T) -> RawStreamData
// Dmat::zeros((2, 2));
// }
// RawStreamData::Datai16(c) => {
// Dmat::zeros((2, 2));
// }
// }
todo!()
}
}
impl RawStreamData {
/// Get reference to raw data buffer
pub fn getRef<T>(&self) -> &[T]
where where
T: Sample + 'static, T: Into<Vec<U>> + 'static,
U: num::ToPrimitive + Clone + 'static,
{ {
let thetype: TypeId = TypeId::of::<T>(); let input = input.into();
match &self { // Apparently, this code does not work with a match. I have searched
RawStreamData::Datai8(t) => { // around and have not found the reason for this. So this is a bit of
// stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>(); let i8type: TypeId = TypeId::of::<i8>();
assert!(thetype == i8type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Datai16(t) => {
let i16type: TypeId = TypeId::of::<i16>(); let i16type: TypeId = TypeId::of::<i16>();
assert!(thetype == i16type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Datai32(t) => {
let i32type: TypeId = TypeId::of::<i32>(); let i32type: TypeId = TypeId::of::<i32>();
assert!(thetype == i32type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Dataf32(t) => {
let f32type: TypeId = TypeId::of::<f32>(); let f32type: TypeId = TypeId::of::<f32>();
assert!(thetype == f32type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Dataf64(t) => {
let f64type: TypeId = TypeId::of::<f64>(); let f64type: TypeId = TypeId::of::<f64>();
assert!(thetype == f64type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
_ => panic!("Cannot getRef from "),
}
}
}
// Create InStreamData object from // The type to create for
impl<T> From<&[T]> for RawStreamData let thetype: TypeId = TypeId::of::<U>();
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: &[T]) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai8(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(v)
} else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai32(v)
} else if f32type == thetype {
let v: Vec<f32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf32(v)
} else if f64type == thetype {
let v: Vec<f64> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf64(v)
} else {
panic!("Not implemented sample type!")
}
}
}
// Create InStreamData object from
impl<T> From<Vec<T>> for RawStreamData
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: Vec<T>) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype { if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_vec(input) }; let v: Vec<i8> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai8(v) RawStreamData::Datai8(v)
} else if i16type == thetype { } else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) }; let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(v) RawStreamData::Datai16(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(v)
} else if i32type == thetype { } else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_vec(input) }; let v: Vec<i32> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai32(v) RawStreamData::Datai32(v)
@ -158,57 +68,49 @@ where
let v: Vec<f64> = unsafe { reinterpret_vec(input) }; let v: Vec<f64> = unsafe { reinterpret_vec(input) };
RawStreamData::Dataf64(v) RawStreamData::Dataf64(v)
} else { } else {
panic!("Not implemented sample type!") panic!("Not implemented sample type! Type: {thetype:?}, i8 = {i8type:?}, i16 = {i16type:?}, i32 = {i32type:?}, f32 = {f32type:?}, f64 = {f64type:?}.")
} }
} }
} /// Return a reference to the slice of data.
/// Stream metadata. All information required for properly interpreting the raw
/// data that is coming from the stream.
#[derive(Clone, Debug)]
pub struct StreamMetaData {
/// Information for each channel in the stream
pub channelInfo: Vec<DaqChannel>,
/// The data type of the device [Number / voltage / Acoustic pressure / ...]
pub rawDatatype: DataType,
/// Sample rate in [Hz]
pub samplerate: Flt,
/// The number of frames per block of data that comes in. Multiplied by
/// channelInfo.len() we get the total number of samples that come in at
/// each callback.
pub framesPerBlock: usize,
}
impl StreamMetaData {
/// Create new metadata object.
/// ///
/// # Args
/// ///
pub fn new( /// # Panics
channelInfo: &[DaqChannel], ///
rawdtype: DataType, /// - If the tye requested does not match the type stored.
sr: Flt, pub fn getRef<T>(&self) -> &[T]
framesPerBlock: usize, where
) -> Result<StreamMetaData> { T: Sample + 'static,
Ok(StreamMetaData { {
channelInfo: channelInfo.to_vec(), let type_requested = TypeId::of::<T>();
rawDatatype: rawdtype, macro_rules! ret_ref {
samplerate: sr, ($c:expr,$t:ty) => {{
framesPerBlock, let type_this = TypeId::of::<$t>();
}) assert_eq!(type_requested, type_this, "Wrong type requested");
unsafe { reinterpret_slice::<$t, T>(&$c) }
}};
}
use RawStreamData::*;
match &self {
Datai8(v) => {
ret_ref!(v, i8)
}
Datai16(v) => {
ret_ref!(v, i16)
}
Datai32(v) => {
ret_ref!(v, i32)
}
Dataf32(v) => {
ret_ref!(v, f32)
}
Dataf64(v) => {
ret_ref!(v, f64)
}
} }
/// Returns the number of channels in the stream metadata.
pub fn nchannels(&self) -> usize {
self.channelInfo.len()
} }
} }
/// Stream data (audio / other) coming from a stream or to be send to a stream /// Stream data (audio / other) coming from a stream or to be send to a stream
#[derive(Debug)] #[derive(Debug)]
pub struct StreamData { pub struct InStreamData {
/// Package counter. Should always increase monotonically. /// Package counter. Should always increase monotonically.
pub ctr: usize, pub ctr: usize,
@ -216,45 +118,177 @@ pub struct StreamData {
pub meta: Arc<StreamMetaData>, pub meta: Arc<StreamMetaData>,
/// This is typically what is stored when recording /// This is typically what is stored when recording
pub raw: RawStreamData, raw: RawStreamData,
// Converted to floating point format. Used for further real time // Converted to floating point format. Used for further real time
// processing. Stored in an rw-lock. The first thread that acesses this data // processing. Stored in an rw-lock. The first thread that acesses this data
// will perform the conversion. All threads after that will get the data. // will perform the conversion. All threads after that will get the data.
converted: RwLock<Option<Arc<Dmat>>>, converted: RwLock<Option<Arc<Dmat>>>,
} }
impl StreamData {
impl InStreamData {
#[inline]
/// Return reference to underlying raw data storage
pub fn getRaw(&self) -> &RawStreamData {
return &self.raw;
}
#[inline]
/// Convenience function to return the number of channels in this instreamdata.
pub fn nchannels(&self) -> usize {
return self.meta.nchannels();
}
/// Iterate over raw data of a certain channel. The type should be specified
/// explicitly; a mismatch with the stored type results in a panic.
pub fn iter_channel_raw<'a, T>(&'a self, ch: usize) -> impl Iterator<Item = &'a T> + 'a
where
T: Sample + Copy + 'static,
{
let type_requested: TypeId = TypeId::of::<T>();
macro_rules! create_iter {
($c:expr,$t:ty) => {{
// Check that the type matches the type stored
let cur_type: TypeId = TypeId::of::<$t>();
assert!(
type_requested == cur_type,
"BUG: Type mismatch on channel data iterator"
);
let v: &'a [T] = unsafe { reinterpret_slice($c) };
v.iter().skip(ch).step_by(self.meta.nchannels())
}};
};
match &self.raw {
RawStreamData::Datai8(c) => {
create_iter!(c, i8)
}
RawStreamData::Datai16(c) => {
create_iter!(c, i16)
}
RawStreamData::Datai32(c) => {
create_iter!(c, i32)
}
RawStreamData::Dataf32(c) => {
create_iter!(c, f32)
}
RawStreamData::Dataf64(c) => {
create_iter!(c, f64)
}
}
}
/// Iterate over all channels, deinterleaved. So first all samples from the
/// first channel, etc...
pub fn iter_deinterleaved_raw_allchannels<'a, T>(
&'a self,
) -> Box<dyn Iterator<Item = &'a T> + 'a>
where
T: Sample + Copy + 'static,
{
Box::new(
(0..self.meta.nchannels())
.into_iter()
.flat_map(|chi| self.iter_channel_raw(chi)),
)
}
fn iter_channel_converted<'a, T>(&'a self, ch: usize) -> impl Iterator<Item = Flt> + 'a
where
T: Sample + Copy + 'static,
Flt: FromSample<T>,
{
self.iter_channel_raw(ch)
.copied()
.map(move |v: T| Flt::from_sample(v) / self.meta.channelInfo[ch].sensitivity)
}
/// Iterate over the data, converted to floating point and corrected for the
/// channel sensitivity values. Returns all data, deinterleaved, in order of channel.
pub fn iter_deinterleaved_converted<'a, T>(&'a self) -> Box<dyn Iterator<Item = Flt> + 'a>
where
T: Sample + Copy + 'static,
Flt: FromSample<T>,
{
Box::new(
(0..self.meta.nchannels())
.into_iter()
.flat_map(move |chi| self.iter_channel_converted(chi)),
)
}
/// Create new stream data object. /// Create new stream data object.
pub fn new(ctr: usize, meta: Arc<StreamMetaData>, raw: RawStreamData) -> StreamData { pub fn new<T, U>(ctr: usize, meta: Arc<StreamMetaData>, raw: T) -> InStreamData
StreamData { where
T: Into<Vec<U>> + 'static,
U: Sample + num::ToPrimitive + Clone + 'static,
{
InStreamData {
ctr, ctr,
meta, meta,
raw, raw: RawStreamData::new(raw),
converted: RwLock::new(None), converted: RwLock::new(None),
} }
} }
/// Returns the number of frames in this InstreamData
pub fn nframes(&self) -> usize {
let nch = self.meta.nchannels();
match &self.raw {
RawStreamData::Datai8(c) => {
return c.len() / nch;
}
RawStreamData::Datai16(c) => {
return c.len() / nch;
}
RawStreamData::Datai32(c) => {
return c.len() / nch;
}
RawStreamData::Dataf32(c) => {
return c.len() / nch;
}
RawStreamData::Dataf64(c) => {
return c.len() / nch;
}
}
}
/// Get the data in floating point format. If already converted, uses the /// Get the data in floating point format. If already converted, uses the
/// cached float data. /// cached float data.
pub fn getFloatData(&self) -> Arc<Dmat> { pub fn getFloatData(&self) -> Arc<Dmat> {
if let Some(dat) = self.converted.read().unwrap().as_ref() { if let Some(dat) = self.converted.read().as_ref() {
return dat.clone(); return dat.clone();
} }
// In case we reach here, the data has not yet be converted to floating // In case we reach here, the data has not yet be converted to floating
// point, so we do this. // point, so we do this.
let mut o = self.converted.write().unwrap(); let mut write_lock = self.converted.write();
// It might be that another thread was 'first', and already performed // It might be that another thread was 'first', and already performed
// the conversion. In that case, we still do an early return, and we // the conversion. In that case, we still do an early return, and we
// just openend the lock twice for writing. Not a problem. // just openend the lock twice for writing. Not a problem.
if let Some(dat) = o.as_ref() { if let Some(dat) = write_lock.as_ref() {
return dat.clone(); return dat.clone();
} }
let errmsg = "Data cannot be converted to floating point";
macro_rules! convert_data {
($t:ty) => {
Dmat::from_shape_vec(
(self.nframes(), self.nchannels()).f(),
self.iter_deinterleaved_converted::<$t>().collect(),
)
.expect(errmsg)
};
};
// Perform the actual conversion // Perform the actual conversion
let converted_data = Arc::new(self.raw.toFloat(self.meta.nchannels())); let converted_data = match &self.raw {
RawStreamData::Datai8(_) => convert_data!(i8),
RawStreamData::Datai16(_) => convert_data!(i16),
RawStreamData::Datai32(_) => convert_data!(i32),
RawStreamData::Dataf32(_) => convert_data!(f32),
RawStreamData::Dataf64(_) => convert_data!(f64),
};
let converted_data = Arc::new(converted_data);
// Replace the option with the Some // Replace the option with the Some
o.replace(converted_data.clone()); write_lock.replace(converted_data.clone());
converted_data converted_data
} }
@ -262,36 +296,38 @@ impl StreamData {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use num::traits::sign;
use cpal::Sample; use cpal::Sample;
use num::traits::sign;
use super::*; use super::*;
use crate::siggen::Siggen;
#[test] #[test]
fn test() { fn test() {
const fs: Flt = 20.; const fs: Flt = 20.;
// Number of samples per channel // Number of samples per channel
const Nframes: usize = 20; const Nframes: usize = 20;
const Nch: usize = 2; const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes]; let mut signal = [0.; Nch * Nframes];
let mut siggen = Siggen::newSine(Nch, 1.); let mut siggen = Siggen::newSine(Nch, 1.);
siggen.reset(fs); siggen.reset(fs);
siggen.setMute(&[false, true]); siggen.setMute(&[false, true]);
siggen.genSignal(&mut signal); siggen.genSignal(&mut signal);
let raw: Vec<i16> = Vec::from_iter(signal.iter().map( let raw: Vec<i16> = Vec::from_iter(signal.iter().map(|o| o.to_sample::<i16>()));
|o| o.to_sample::<i16>()));
let ms1 = raw.iter().step_by(2).map(|s1| {*s1 as f64 * *s1 as f64}).sum::<f64>() / Nframes as f64; let ms1 = raw
.iter()
.step_by(2)
.map(|s1| *s1 as f64 * *s1 as f64)
.sum::<f64>()
/ Nframes as f64;
let i16maxsq = (i16::MAX as f64).powf(2.); let i16maxsq = (i16::MAX as f64).powf(2.);
// println!("ms1: {} {}", ms1, i16maxsq/2.); // println!("ms1: {} {}", ms1, i16maxsq/2.);
// println!("{:?}", raw.iter().cloned().step_by(2).collect::<Vec<i16>>()); // println!("{:?}", raw.iter().cloned().step_by(2).collect::<Vec<i16>>());
// println!("{:?}", i16::EQUILIBRIUM); // println!("{:?}", i16::EQUILIBRIUM);
assert!(f64::abs(ms1 - i16maxsq/2.)/i16maxsq < 1e-3); assert!(f64::abs(ms1 - i16maxsq / 2.) / i16maxsq < 1e-3);
} }
} }
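InStreamData::getFloatData caches the converted block behind a parking_lot RwLock: readers try the read lock first, and the first caller that finds nothing takes the write lock, converts, and stores the Arc. A minimal standalone sketch of that read-then-write caching pattern, with a plain Vec<f64> standing in for Dmat:

    use parking_lot::RwLock;
    use std::sync::Arc;

    struct Cached {
        raw: Vec<i16>,
        converted: RwLock<Option<Arc<Vec<f64>>>>,
    }

    impl Cached {
        fn float_data(&self) -> Arc<Vec<f64>> {
            // Fast path: another thread already did the conversion.
            if let Some(d) = self.converted.read().as_ref() {
                return d.clone();
            }
            let mut w = self.converted.write();
            // Re-check: someone may have converted while we waited for the write lock.
            if let Some(d) = w.as_ref() {
                return d.clone();
            }
            let conv: Arc<Vec<f64>> =
                Arc::new(self.raw.iter().map(|&v| v as f64 / i16::MAX as f64).collect());
            w.replace(conv.clone());
            conv
        }
    }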

src/daq/streamerror.rs (new file, 32 lines)

@ -0,0 +1,32 @@
use strum_macros::Display;
/// Errors that happen in a stream
#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub enum StreamError {
/// Input overrun
#[strum(
message = "InputOverrun Error",
detailed_message = "Input buffer overrun"
)]
InputOverrunError,
/// Output underrun
#[strum(
message = "OutputUnderrunError",
detailed_message = "Output buffer underrun"
)]
OutputUnderrunError,
/// Driver specific error
#[strum(message = "DriverError", detailed_message = "Driver error")]
DriverError,
/// Device
#[strum(detailed_message = "Device not available (anymore)")]
DeviceNotAvailable,
/// Logic error (something weird happened)
#[strum(detailed_message = "Logic error")]
LogicError,
}
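The relocated StreamError keeps its strum derives, so the attached messages can be read back through strum's EnumMessage trait. A small crate-internal usage sketch (function name is illustrative):

    use strum::EnumMessage;

    fn describe(e: &StreamError) -> &'static str {
        e.get_detailed_message()
            .or_else(|| e.get_message())
            .unwrap_or("Unknown stream error")
    }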


@@ -13,7 +13,13 @@ impl StreamHandler {
     /// Create new stream handler.
     pub fn new(smgr: &mut StreamMgr) -> StreamHandler{
         let (tx, rx) = unbounded();
+        // The sender side of the queue is not dropped here in StreamHandler;
+        // StreamMgr detects on its own when the other end of the channel has
+        // been dropped.
         smgr.addInQueue(tx);
         StreamHandler{rx}
     }
 }

src/daq/streammetadata.rs (new file, 60 lines)

@ -0,0 +1,60 @@
use super::*;
use crate::config::Flt;
use anyhow::Result;
/// Stream metadata. All information required for properly interpreting the raw
/// data that is coming from the stream.
#[derive(Clone, Debug)]
pub struct StreamMetaData {
/// Information for each channel in the stream
pub channelInfo: Vec<DaqChannel>,
/// The data type of the device [Number / voltage / Acoustic pressure / ...]
pub rawDatatype: DataType,
/// Sample rate in [Hz]
pub samplerate: Flt,
/// The number of frames per block of data that comes in. Multiplied by
/// channelInfo.len() we get the total number of samples that come in at
/// each callback.
pub framesPerBlock: usize,
}
impl StreamMetaData {
/// Create new metadata object.
///
/// # Args
///
pub fn new<'a, T>(
channelInfo: T,
rawdtype: DataType,
sr: Flt,
framesPerBlock: usize,
) -> StreamMetaData
where
T: IntoIterator<Item = &'a DaqChannel>,
{
let channelInfo = channelInfo
.into_iter()
.inspect(|ch| {
assert!(
ch.enabled,
"Only enabled channels should be given as input to StreamMetaData"
);
})
.cloned()
.collect();
StreamMetaData {
channelInfo,
rawDatatype: rawdtype,
samplerate: sr,
framesPerBlock,
}
}
/// Returns the number of channels in the stream metadata.
#[inline]
pub fn nchannels(&self) -> usize {
self.channelInfo.len()
}
}
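StreamMetaData::new now takes any iterator over enabled DaqChannel references and no longer returns a Result. A usage sketch (crate-internal paths and all parameter values are illustrative; DaqChannel::defaultAudio is assumed to produce an enabled channel, as in the default-stream code above):

    // Sketch: metadata for a two-channel, 48 kHz input stream.
    fn example_meta() -> StreamMetaData {
        let channels: Vec<DaqChannel> = (0..2)
            .map(|i| DaqChannel::defaultAudio(format!("Mic {i}")))
            .collect();
        StreamMetaData::new(
            &channels,     // any IntoIterator over &DaqChannel; all must be enabled
            DataType::F32, // raw sample type coming from the device
            48000.,        // sample rate [Hz]
            2048,          // frames per block
        )
    }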


@ -1,5 +1,4 @@
//! Data acquisition model. Provides abstract layers around DAQ devices. //! Data acquisition model. Provides abstract layers around DAQ devices.
use super::config::*;
use super::*; use super::*;
use crate::{ use crate::{
config::*, config::*,
@ -18,6 +17,7 @@ use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread}; use std::thread::{JoinHandle, Thread};
use streamcmd::StreamCommand; use streamcmd::StreamCommand;
use streamdata::*; use streamdata::*;
use streammetadata::*;
use streammsg::*; use streammsg::*;
#[cfg(feature = "cpal-api")] #[cfg(feature = "cpal-api")]
@ -129,7 +129,7 @@ impl StreamMgr {
/// ///
pub fn new() -> StreamMgr { pub fn new() -> StreamMgr {
if smgr_created.load(std::sync::atomic::Ordering::Relaxed) { if smgr_created.load(std::sync::atomic::Ordering::Relaxed) {
panic!("BUG: Only one stream manager is supposed to be a singleton"); panic!("BUG: Stream manager is supposed to be a singleton");
} }
smgr_created.store(true, std::sync::atomic::Ordering::Relaxed); smgr_created.store(true, std::sync::atomic::Ordering::Relaxed);
@ -238,7 +238,7 @@ impl StreamMgr {
fn startInputStreamThread( fn startInputStreamThread(
&mut self, &mut self,
meta: Arc<StreamMetaData>, meta: Arc<StreamMetaData>,
rx: Receiver<RawStreamData>, rx: Receiver<InStreamMsg>,
) -> (JoinHandle<InQueues>, Sender<StreamCommand>) { ) -> (JoinHandle<InQueues>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded(); let (commtx, commrx) = unbounded();
@ -274,13 +274,12 @@ impl StreamMgr {
} }
} }
} }
if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) { if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
// println!("Obtained raw stream data!"); // println!("Obtained raw stream data!");
if let InStreamMsg::StreamError(e) = msg {
let streamdata = StreamData::new(ctr, meta.clone(), raw); }
let streamdata = Arc::new(streamdata);
let msg = InStreamMsg::StreamData(streamdata);
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg); sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
ctr += 1; ctr += 1;
} }
@ -384,7 +383,6 @@ impl StreamMgr {
} }
} }
// }
} }
siggen siggen
}); });
@ -450,7 +448,7 @@ impl StreamMgr {
bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream."); bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
} }
} }
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded(); let (tx, rx): (Sender<InStreamMsg>, Receiver<InStreamMsg>) = unbounded();
let stream = match cfg.api { let stream = match cfg.api {
StreamApiDescr::Cpal => { StreamApiDescr::Cpal => {
@ -495,7 +493,7 @@ impl StreamMgr {
bail!("Input stream is already running. Please first stop existing input stream.") bail!("Input stream is already running. Please first stop existing input stream.")
} }
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded(); let (tx, rx): (Sender<InStreamMsg>, Receiver<InStreamMsg>) = unbounded();
// Only a default input stream when CPAL feature is enabled // Only a default input stream when CPAL feature is enabled
cfg_if::cfg_if! { cfg_if::cfg_if! {
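The input-stream thread now forwards every InStreamMsg to all registered listener queues and prunes the ones whose StreamHandler has been dropped (the sendMsgToAllQueuesRemoveUnused call). A sketch of that fan-out-and-prune pattern, not the crate's actual helper:

    use crossbeam::channel::Sender;

    // Try to send the message to every registered queue; drop queues whose
    // receiving end has disconnected.
    fn fan_out<T: Clone>(queues: &mut Vec<Sender<T>>, msg: T) {
        queues.retain(|q| q.send(msg.clone()).is_ok());
    }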


@@ -18,15 +18,11 @@ use super::*;
 pub enum InStreamMsg {
     /// Raw stream data that is coming from a device. This is interleaved data. The number of channels is correct and
     /// specified in the stream metadata.
-    StreamData(Arc<StreamData>),
+    InStreamData(Arc<InStreamData>),
     /// An error has occured in the stream
     StreamError(StreamError),
-    /// Stream data converted to floating point with sample width as
-    /// compiled in.
-    ConvertedStreamData(usize, Arc<crate::config::Dmat>),
     /// new Stream metadata enters the scene. Probably a new stream started.
     StreamStarted(Arc<StreamMetaData>),


@@ -39,6 +39,7 @@ pub mod filter;
 pub mod ps;
 pub mod siggen;
 use filter::*;
+pub mod rt;
 /// A Python module implemented in Rust.
 #[cfg(feature = "python-bindings")]

src/math/mod.rs (new file, 4 lines)

@ -0,0 +1,4 @@
//! General math tools that are internally required
//!
//!

src/rt/mod.rs (new file, 4 lines)

@ -0,0 +1,4 @@
//! Real time signal analysis blocks, used for visual inspection and showing
//! data 'on the fly'. Examples are real time power spectra plotting
//! (spectrograms, auto-power spectra, ...).
mod rtaps;

src/rt/rtaps.rs (new file, 137 lines)

@ -0,0 +1,137 @@
use std::ops::Deref;
use std::thread::{self, JoinHandle};
use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr};
use crate::ps::{AvPowerSpectra, CPSResult};
use crate::I;
use anyhow::Result;
use parking_lot::Mutex;
use rayon::ThreadPool;
use std::sync::Arc;
enum RtApsComm {
CommStopThread,
NewResult(CPSResult),
NewMeta(Arc<StreamMetaData>),
}
/// Real time power spectra viewer. Shows cross-power or auto-power spectra of a signal as they evolve over time.
pub struct RtAps {
/// Storage for optional last result
comm: Arc<Mutex<Option<RtApsComm>>>,
}
impl RtAps {
/// Build new Real time power spectra computing engine.
pub fn build(mgr: &mut StreamMgr) -> Result<RtAps> {
// Handler needs to be created here.
let handler = StreamHandler::new(mgr);
let last_result = Arc::new(Mutex::new(None));
let last_result2 = last_result.clone();
let mut aps = AvPowerSpectra::build(2048, None, None, None)?;
let thread = std::thread::spawn(move || {
println!("Thread started...");
let rx = handler.rx;
// What is running on the thread
let mut last_cps: Option<CPSResult> = None;
let mut meta: Option<Arc<StreamMetaData>> = None;
'mainloop: loop {
println!("LOOP");
'msgloop: for msg in &rx {
println!("Message found!");
match msg {
InStreamMsg::StreamStarted(new_meta) => {
aps.reset();
last_cps = None;
meta = Some(new_meta);
break 'msgloop;
}
InStreamMsg::StreamStopped | InStreamMsg::StreamError(_) => {
debug_assert!(meta.is_none());
last_cps = None;
}
InStreamMsg::InStreamData(id) => {
debug_assert!(meta.is_none());
let flt = id.getFloatData();
if let Some(cpsresult) = aps.compute_last(flt.view()) {
last_cps = Some(cpsresult.clone());
}
}
}
}
println!("LOOP2");
// Communicate last result, if any.
'commscope: {
let mut last_result_lock = last_result.lock();
if let Some(RtApsComm::CommStopThread) = *last_result_lock {
println!("Stopping RtAps thread");
break 'mainloop;
}
if let Some(newmeta) = meta.take() {
// New metadata has arrived. This is always the first
// thing to push. Only when it is read, we will start
// pushing actual data.
*last_result_lock = Some(RtApsComm::NewMeta(newmeta));
break 'commscope;
}
if let Some(RtApsComm::NewMeta(_)) = *last_result_lock {
// New metadata is not yet read by reading thread. It
// basically means we are not yet ready to give actual
// data back.
break 'commscope;
}
// Move last_cps into mutex.
if let Some(last_cps) = last_cps.take() {
*last_result_lock = Some(RtApsComm::NewResult(last_cps));
}
}
} // End of loop
println!("Ending RtAps thread");
});
assert!(!thread.is_finished());
Ok(RtAps { comm: last_result2 })
}
/// Get last computed value, if any. When new stream metadata has arrived, the
/// first value handed out is an RtApsComm::NewMeta; results follow only after it
/// has been read.
pub fn get_last(&self) -> Option<RtApsComm> {
let mut lck = self.comm.lock();
let res = lck.take();
if let Some(RtApsComm::CommStopThread) = res {
panic!("BUG: CommStopThread should never be set!")
}
// `res` already holds the taken value; calling take() again would always return None.
return res;
}
}
impl Drop for RtAps {
fn drop(&mut self) {
println!("DROP");
let mut lck = self.comm.lock();
*lck = Some(RtApsComm::CommStopThread);
println!("DROP done");
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use super::*;
use crate::daq::StreamMgr;
#[test]
fn test_rtaps1() -> Result<()> {
{
let mut smgr = StreamMgr::new();
let rtaps = RtAps::build(&mut smgr)?;
smgr.startDefaultInputStream()?;
thread::sleep(Duration::from_secs(2));
}
Ok(())
}
}
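RtAps hands results back through a single Mutex<Option<RtApsComm>> slot that the worker thread overwrites and the reader drains with get_last(). Since RtApsComm is module-private in this commit, a polling loop would live inside the rt module; a sketch under that assumption:

    // Sketch: poll the RtAps result slot from a GUI / plotting loop.
    fn poll_once(rtaps: &RtAps) {
        match rtaps.get_last() {
            Some(RtApsComm::NewMeta(meta)) => {
                // A new stream started: reconfigure plots for meta.nchannels() channels.
                println!("New stream: {} channels", meta.nchannels());
            }
            Some(RtApsComm::NewResult(cps)) => {
                // Fresh averaged (cross-)power spectra, ready to draw.
                let _ = cps;
            }
            Some(RtApsComm::CommStopThread) | None => {
                // Nothing new yet (get_last never hands out CommStopThread).
            }
        }
    }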