From ff6a0687c41d79f9eb6f70aaa8e327828ce8f4f4 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Thu, 18 Jul 2024 13:06:49 +0200 Subject: [PATCH] Bugfix in type detection for instreammsg. Refactored some overcomplicated code, added first steps for rtaps --- Cargo.toml | 3 + src/bin/lasp_inputdefault.rs | 3 +- src/daq/api/api_cpal.rs | 284 ++++++++++++------------ src/daq/api/mod.rs | 2 +- src/daq/mod.rs | 42 +--- src/daq/record.rs | 33 +-- src/daq/streamdata.rs | 404 +++++++++++++++++++---------------- src/daq/streamerror.rs | 32 +++ src/daq/streamhandler.rs | 6 + src/daq/streammetadata.rs | 60 ++++++ src/daq/streammgr.rs | 18 +- src/daq/streammsg.rs | 6 +- src/lib.rs | 1 + src/math/mod.rs | 4 + src/rt/mod.rs | 4 + src/rt/rtaps.rs | 137 ++++++++++++ 16 files changed, 641 insertions(+), 398 deletions(-) create mode 100644 src/daq/streamerror.rs create mode 100644 src/daq/streammetadata.rs create mode 100644 src/math/mod.rs create mode 100644 src/rt/mod.rs create mode 100644 src/rt/rtaps.rs diff --git a/Cargo.toml b/Cargo.toml index e086c86..aa50c30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,9 @@ clap = { version = "4.5.8", features = ["derive", "color", "help", "suggestions" # FFT's realfft = "3.3.0" +# Fast Mutex +parking_lot = "0.12.3" + [dev-dependencies] approx = "0.5.1" ndarray-rand = "0.14.0" diff --git a/src/bin/lasp_inputdefault.rs b/src/bin/lasp_inputdefault.rs index 6461729..de62152 100644 --- a/src/bin/lasp_inputdefault.rs +++ b/src/bin/lasp_inputdefault.rs @@ -38,8 +38,9 @@ fn main() -> Result<()> { // eprint!("Obtained message: {:?}", msg); match msg { InStreamMsg::StreamStarted(meta) => { - println!("Stream started: {:?}", meta); + println!("Stream started metadata: {meta:#?}"); }, + InStreamMsg::InStreamData(_) => {} _ => { println!("Other msg...");} } } diff --git a/src/daq/api/api_cpal.rs b/src/daq/api/api_cpal.rs index 66c15ec..4b14501 100644 --- a/src/daq/api/api_cpal.rs +++ b/src/daq/api/api_cpal.rs @@ -1,21 +1,21 @@ #![allow(dead_code)] use super::Stream; use super::StreamMetaData; -use crate::config::{ self, * }; -use crate::daq::{ self, * }; -use crate::daq::{ streamdata::*, StreamApiDescr }; -use anyhow::{ bail, Result }; -use cpal::traits::{ DeviceTrait, HostTrait, StreamTrait }; +use crate::config::{self, *}; +use crate::daq::{self, *}; +use crate::daq::{streamdata::*, StreamApiDescr}; +use anyhow::{bail, Result}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::SampleRate; use cpal::SupportedStreamConfig; -use cpal::{ Device, Host, Sample, SampleFormat, SupportedBufferSize }; +use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize}; use crossbeam::atomic::AtomicCell; -use crossbeam::channel::{ Receiver, Sender }; +use crossbeam::channel::{Receiver, Sender}; use itertools::Itertools; use num::ToPrimitive; use reinterpret::reinterpret_slice; use std::any; -use std::any::{ Any, TypeId }; +use std::any::{Any, TypeId}; use std::collections::btree_map::OccupiedEntry; use std::collections::VecDeque; use std::fmt::Debug; @@ -53,16 +53,16 @@ pub struct CpalApi { } pub struct CpalStream { stream: cpal::Stream, - md: Arc, + metadata: Arc, noutchannels: usize, status: Arc>, } impl Stream for CpalStream { fn metadata(&self) -> Arc { - self.md.clone() + self.metadata.clone() } fn ninchannels(&self) -> usize { - self.md.nchannels() + self.metadata.nchannels() } fn noutchannels(&self) -> usize { self.noutchannels @@ -82,17 +82,14 @@ impl CpalApi { } } pub fn getDeviceInfo(&self) -> Result> { - let srs_1 = [1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000]; + let srs_1 = [ + 1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000, + ]; let srs_2 = [11025, 22050, 44100, 88200]; let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter())); srs_tot.sort(); - let srs_tot = Vec::from_iter( - srs_tot - .iter() - .copied() - .map(|i| *i as Flt) - ); + let srs_tot = Vec::from_iter(srs_tot.iter().copied().map(|i| *i as Flt)); // srs_tot.sort(); @@ -145,11 +142,8 @@ impl CpalApi { continue; } - let dtypes: Vec = sample_formats - .iter() - .dedup() - .map(|i| (*i).into()) - .collect(); + let dtypes: Vec = + sample_formats.iter().dedup().map(|i| (*i).into()).collect(); let prefDataType = match dtypes.iter().position(|d| d == &DataType::F32) { Some(idx) => dtypes[idx], @@ -189,8 +183,8 @@ impl CpalApi { // Create the error function closure, that capture the send channel on which error messages from the stream are sent fn create_errfcn( - send_ch: Option>, - status: Arc> + send_ch: Option>, + status: Arc>, ) -> impl FnMut(cpal::StreamError) { move |err: cpal::StreamError| { let serr = match err { @@ -198,19 +192,21 @@ impl CpalApi { cpal::StreamError::BackendSpecific { err: _ } => StreamError::DriverError, }; if let Some(sender) = &send_ch { - sender.send(RawStreamData::StreamError(serr)).unwrap(); + sender.send(InStreamMsg::StreamError(serr)).unwrap(); } status.store(StreamStatus::Error { e: serr }); } } fn create_incallback( + meta: Arc, config: &cpal::StreamConfig, - sender: Sender, + sender: Sender, framesPerBlock: usize, - en_inchannels: Vec + en_inchannels: Vec, ) -> impl FnMut(&[T], &cpal::InputCallbackInfo) - where T: 'static + Sample + ToPrimitive + where + T: 'static + Sample + ToPrimitive, { let tot_inch = config.channels as usize; @@ -219,15 +215,16 @@ impl CpalApi { let mut enabled_ch_data: Vec = vec![Sample::EQUILIBRIUM; en_inchannels.len() * framesPerBlock]; + // let meta = StreamMetaData::new() + let mut ctr = 0; + // The actual callback that is returned move |input: &[T], _: &cpal::InputCallbackInfo| { // Copy elements over in ring buffer q.extend(input); while q.len() > tot_inch * framesPerBlock { - // println!("q full enough: {}", q.len()); - - // // Loop over enabled channels + // Loop over enabled channels for (i, ch) in en_inchannels.iter().enumerate() { let in_iterator = q.iter().skip(*ch).step_by(tot_inch); let out_iterator = enabled_ch_data @@ -245,8 +242,14 @@ impl CpalApi { q.drain(0..framesPerBlock * tot_inch); // Send over data - let msg = RawStreamData::from(enabled_ch_data.clone()); - sender.send(msg).unwrap(); + let streamdata = Arc::new(InStreamData::new( + ctr, + meta.clone(), + enabled_ch_data.clone(), + )); + + sender.send(InStreamMsg::InStreamData(streamdata)).unwrap(); + ctr += 1; } } } @@ -257,12 +260,13 @@ impl CpalApi { /// /// * sf: Sample format fn build_input_stream( + meta: Arc, sf: cpal::SampleFormat, config: &cpal::StreamConfig, device: &cpal::Device, - sender: Sender, + sender: Sender, en_inchannels: Vec, - framesPerBlock: usize + framesPerBlock: usize, ) -> Result<(cpal::Stream, Arc>)> { let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {})); @@ -273,7 +277,9 @@ impl CpalApi { match sf { $( $cpaltype => { - let icb = CpalApi::create_incallback::<$rtype>(&config, sender, framesPerBlock, en_inchannels); + let icb = CpalApi::create_incallback::<$rtype>( + meta, + &config, sender, framesPerBlock, en_inchannels); device.build_input_stream( &config, icb, @@ -284,8 +290,7 @@ impl CpalApi { } }; } - let stream: cpal::Stream = - build_stream!( + let stream: cpal::Stream = build_stream!( SampleFormat::I8 => i8, SampleFormat::I16 => i16, SampleFormat::I32 => i32, @@ -299,15 +304,13 @@ impl CpalApi { streamstatus: Arc>, receiver: Receiver, ch_config: &[DaqChannel], - framesPerBlock: usize + framesPerBlock: usize, ) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo) - where T: 'static + Sample + Debug + where + T: 'static + Sample + Debug, { let number_total_out_channels: usize = config.channels as usize; - let number_enabled_out_channels = ch_config - .iter() - .filter(|ch| ch.enabled) - .count(); + let number_enabled_out_channels = ch_config.iter().filter(|ch| ch.enabled).count(); let disabled_ch = DaqChannel::default(); let disabled_repeater = std::iter::repeat(&disabled_ch); @@ -329,10 +332,11 @@ impl CpalApi { let status = streamstatus.load(); callback_ctr += 1; - let mut setToEquilibrium = || + let mut setToEquilibrium = || { outdata.iter_mut().for_each(|v| { *v = Sample::EQUILIBRIUM; - }); + }) + }; match status { StreamStatus::NotRunning {} | StreamStatus::Error { .. } => { setToEquilibrium(); @@ -355,19 +359,18 @@ impl CpalApi { // All right, we have enough samples to send out! They are // drained from the queue let out_chunks = outdata.iter_mut().chunks(number_total_out_channels); - let siggen_chunks = q.drain(..nsamples_asked).chunks(number_enabled_out_channels); + let siggen_chunks = q + .drain(..nsamples_asked) + .chunks(number_enabled_out_channels); for (och, ich) in out_chunks.into_iter().zip(siggen_chunks.into_iter()) { - let mut sig_frame_iter = ich.into_iter(); - och.into_iter() - .zip(&enabled_outch) - .for_each(|(o, en)| ( - if *en { - *o = sig_frame_iter.next().unwrap(); - } else { - *o = Sample::EQUILIBRIUM; - } - )); + och.into_iter().zip(&enabled_outch).for_each(|(o, en)| { + (if *en { + *o = sig_frame_iter.next().unwrap(); + } else { + *o = Sample::EQUILIBRIUM; + }) + }); } // outdata @@ -396,7 +399,7 @@ impl CpalApi { device: &cpal::Device, receiver: Receiver, ch_config: &[DaqChannel], - framesPerBlock: usize + framesPerBlock: usize, ) -> Result<(cpal::Stream, Arc>)> { let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {})); @@ -417,8 +420,7 @@ impl CpalApi { } }; } - let stream: cpal::Stream = - build_stream!( + let stream: cpal::Stream = build_stream!( SampleFormat::I8 => i8, SampleFormat::I16 => i16, SampleFormat::I32 => i32, @@ -434,9 +436,10 @@ impl CpalApi { devinfo: &DeviceInfo, conf: &DaqConfig, _dev: &cpal::Device, - conf_iterator: T + conf_iterator: T, ) -> Result - where T: Iterator + where + T: Iterator, { let nchannels = match st { StreamType::Input => devinfo.iChannelCount, @@ -448,9 +451,8 @@ impl CpalApi { // Specified sample format is available if cpalconf.channels() == (nchannels as u16) { let requested_sr = conf.sampleRate(devinfo); - if - (cpalconf.min_sample_rate().0 as Flt) <= requested_sr && - (cpalconf.max_sample_rate().0 as Flt) >= requested_sr + if (cpalconf.min_sample_rate().0 as Flt) <= requested_sr + && (cpalconf.max_sample_rate().0 as Flt) >= requested_sr { // Sample rate falls within range. let requested_fpb = conf.framesPerBlock(devinfo) as u32; @@ -482,28 +484,26 @@ impl CpalApi { stype: StreamType, devinfo: &DeviceInfo, conf: &DaqConfig, - sender: Sender + sender: Sender, ) -> Result> { for cpaldev in self.host.devices()? { // See if we can create a supported stream config. let supported_config = (match stype { StreamType::Duplex => bail!("Duplex stream not supported for CPAL"), - StreamType::Input => - CpalApi::create_cpal_config( - stype, - devinfo, - conf, - &cpaldev, - cpaldev.supported_input_configs()? - ), - StreamType::Output => - CpalApi::create_cpal_config( - stype, - devinfo, - conf, - &cpaldev, - cpaldev.supported_output_configs()? - ), + StreamType::Input => CpalApi::create_cpal_config( + stype, + devinfo, + conf, + &cpaldev, + cpaldev.supported_input_configs()?, + ), + StreamType::Output => CpalApi::create_cpal_config( + stype, + devinfo, + conf, + &cpaldev, + cpaldev.supported_output_configs()?, + ), })?; let framesPerBlock = conf.framesPerBlock(devinfo); @@ -514,37 +514,34 @@ impl CpalApi { &conf.enabledInchannelConfig(), conf.dtype, supported_config.sample_rate().0 as Flt, - framesPerBlock - )?; + framesPerBlock, + ); let meta = Arc::new(meta); let (stream, status) = CpalApi::build_input_stream( + meta.clone(), sf, &config, &cpaldev, sender, conf.enabledInchannelsList(), - framesPerBlock + framesPerBlock, )?; stream.play()?; status.store(StreamStatus::Running {}); - return Ok( - Box::new(CpalStream { - stream, - md: meta, - noutchannels: 0, - status, - }) - ); + return Ok(Box::new(CpalStream { + stream, + metadata: meta, + noutchannels: 0, + status, + })); } - bail!( - format!( - "Error: requested device {} not found. Please make sure the device is available.", - devinfo.device_name - ) - ) + bail!(format!( + "Error: requested device {} not found. Please make sure the device is available.", + devinfo.device_name + )) } /// Start a default input stream. @@ -552,7 +549,7 @@ impl CpalApi { /// pub fn startDefaultInputStream( &mut self, - sender: Sender + sender: Sender, ) -> Result> { if let Some(device) = self.host.default_input_device() { if let Ok(config) = device.default_input_config() { @@ -565,43 +562,41 @@ impl CpalApi { let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize)); let sf = config.sample_format(); + // Specify data tape + let dtype = DataType::from(sf); + + // Daq: default channel config + let daqchannels = Vec::from_iter( + (0..final_config.channels) + .map(|i| DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))), + ); + // Create stream metadata + let metadata = StreamMetaData::new( + &daqchannels, + dtype, + config.sample_rate().0 as Flt, + framesPerBlock, + ); + let metadata = Arc::new(metadata); + let (stream, status) = CpalApi::build_input_stream( + metadata.clone(), sf, &final_config, &device, sender, en_inchannels, - framesPerBlock + framesPerBlock, )?; stream.play()?; status.store(StreamStatus::Running {}); - // Daq: default channel config - let daqchannels = Vec::from_iter( - (0..final_config.channels).map(|i| - DaqChannel::defaultAudio(format!("Unnamed input channel {}", i)) - ) - ); - - // Specify data tape - let dtype = DataType::from(sf); - - // Create stream metadata - let md = StreamMetaData::new( - &daqchannels, - dtype, - config.sample_rate().0 as Flt, - framesPerBlock - )?; - let md = Arc::new(md); - Ok( - Box::new(CpalStream { - stream, - md, - noutchannels: 0, - status, - }) - ) + Ok(Box::new(CpalStream { + stream, + metadata, + noutchannels: 0, + status, + })) } else { bail!("Could not obtain default input configuration") } @@ -629,15 +624,14 @@ impl CpalApi { pub fn startDefaultOutputStream( &self, - receiver: Receiver + receiver: Receiver, ) -> Result> { let (device, config, sampleformat, framesPerBlock) = self.getDefaultOutputConfig()?; // Daq: default channel config let daqchannels = Vec::from_iter( - (0..config.channels).map(|i| - DaqChannel::defaultAudio(format!("Unnamed output channel {}", i)) - ) + (0..config.channels) + .map(|i| DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))), ); let (stream, status) = CpalApi::build_output_stream( sampleformat, @@ -645,7 +639,7 @@ impl CpalApi { &device, receiver, &daqchannels, - framesPerBlock + framesPerBlock, )?; stream.play()?; @@ -659,12 +653,12 @@ impl CpalApi { &daqchannels, dtype, config.sample_rate.0 as Flt, - framesPerBlock - )?; + framesPerBlock, + ); let md = Arc::new(md); let str = Box::new(CpalStream { stream, - md, + metadata: md, noutchannels: daqchannels.len(), status, }); @@ -674,7 +668,7 @@ impl CpalApi { fn getCPALOutputConfig( &self, dev: &DeviceInfo, - daqconfig: &DaqConfig + daqconfig: &DaqConfig, ) -> Result<(Device, cpal::StreamConfig, SampleFormat, usize)> { let samplerate = dev.avSampleRates[daqconfig.sampleRateIndex] as u32; let framesPerBlock = dev.avFramesPerBlock[daqconfig.framesPerBlockIndex]; @@ -716,12 +710,10 @@ impl CpalApi { &self, dev: &DeviceInfo, cfg: &DaqConfig, - receiver: Receiver + receiver: Receiver, ) -> Result> { - let (device, cpalconfig, sampleformat, framesPerBlock) = self.getCPALOutputConfig( - dev, - cfg - )?; + let (device, cpalconfig, sampleformat, framesPerBlock) = + self.getCPALOutputConfig(dev, cfg)?; let (stream, status) = Self::build_output_stream( sampleformat, @@ -729,7 +721,7 @@ impl CpalApi { &device, receiver, &cfg.outchannel_config, - framesPerBlock + framesPerBlock, )?; stream.play()?; @@ -742,12 +734,12 @@ impl CpalApi { &cfg.enabledOutchannelConfig(), dtype, cpalconfig.sample_rate.0 as Flt, - framesPerBlock - )?; + framesPerBlock, + ); let md = Arc::new(md); let str = Box::new(CpalStream { stream, - md, + metadata: md, noutchannels: cpalconfig.channels as usize, status, }); diff --git a/src/daq/api/mod.rs b/src/daq/api/mod.rs index fe37a4f..f9da4e1 100644 --- a/src/daq/api/mod.rs +++ b/src/daq/api/mod.rs @@ -8,7 +8,7 @@ use strum::EnumMessage; use strum_macros; use crate::config::*; -use super::{streamdata::StreamMetaData, streamstatus::StreamStatus}; +use super::{StreamStatus, StreamMetaData}; #[cfg(feature = "cpal-api")] pub mod api_cpal; diff --git a/src/daq/mod.rs b/src/daq/mod.rs index aea8d2f..ad99305 100644 --- a/src/daq/mod.rs +++ b/src/daq/mod.rs @@ -19,6 +19,8 @@ mod qty; #[cfg(feature = "record")] mod record; +#[cfg(feature = "record")] +pub use record::*; mod streamcmd; mod streamdata; @@ -26,6 +28,8 @@ mod streamhandler; mod streammgr; mod streammsg; mod streamstatus; +mod streammetadata; +mod streamerror; // Module re-exports pub use daqconfig::{DaqChannel, DaqConfig}; @@ -36,15 +40,14 @@ pub use streamhandler::StreamHandler; pub use streammgr::*; pub use streammsg::InStreamMsg; pub use streamstatus::StreamStatus; +pub use streamdata::{RawStreamData, InStreamData}; +pub use streammetadata::StreamMetaData; +pub use streamerror::StreamError; use api::*; #[cfg(feature = "record")] -pub use record::*; - -use strum_macros::Display; use crate::config::*; -use super::*; /// Stream types that can be started /// #[cfg_attr(feature = "python-bindings", pyclass)] @@ -76,34 +79,3 @@ pub fn add_py_classses(m: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } - -/// Errors that happen in a stream -#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)] -#[cfg_attr(feature = "python-bindings", pyclass)] -pub enum StreamError { - /// Input overrun - #[strum( - message = "InputOverrun Error", - detailed_message = "Input buffer overrun" - )] - InputOverrunError, - - /// Output underrun - #[strum( - message = "OutputUnderrunError", - detailed_message = "Output buffer underrun" - )] - OutputUnderrunError, - - /// Driver specific error - #[strum(message = "DriverError", detailed_message = "Driver error")] - DriverError, - - /// Device - #[strum(detailed_message = "Device not available (anymore)")] - DeviceNotAvailable, - - /// Logic error (something weird happened) - #[strum(detailed_message = "Logic error")] - LogicError, -} diff --git a/src/daq/record.rs b/src/daq/record.rs index 86f2daf..05f4683 100644 --- a/src/daq/record.rs +++ b/src/daq/record.rs @@ -5,10 +5,11 @@ use clap::builder::OsStr; use crossbeam::atomic::AtomicCell; use hdf5::types::{VarLenArray, VarLenUnicode}; use hdf5::{dataset, datatype, Dataset, File, H5Type}; +use parking_lot::Mutex; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamdata::*; @@ -155,34 +156,35 @@ impl Recording { fn append_to_dset( ds: &Dataset, ctr: usize, - msg: &RawStreamData, + data: &InStreamData, framesPerBlock: usize, nchannels: usize, ) -> Result<()> { - match msg { + match data.getRaw() { RawStreamData::Datai8(dat) => { - let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), &dat)?; ds.write_slice(arr, (ctr, .., ..))?; } RawStreamData::Datai16(dat) => { - let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + let arr = + ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), &dat)?; ds.write_slice(arr, (ctr, .., ..))?; } RawStreamData::Datai32(dat) => { - let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + let arr = + ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), &dat)?; ds.write_slice(arr, (ctr, .., ..))?; } RawStreamData::Dataf32(dat) => { - let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + let arr = + ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), &dat)?; ds.write_slice(arr, (ctr, .., ..))?; } RawStreamData::Dataf64(dat) => { - let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + let arr = + ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), &dat)?; ds.write_slice(arr, (ctr, .., ..))?; } - RawStreamData::StreamError(e) => { - bail!("Stream error: {}", e) - } } Ok(()) } @@ -306,7 +308,6 @@ impl Recording { InStreamMsg::StreamError(e) => { bail!("Recording failed due to stream error: {}.", e) } - InStreamMsg::ConvertedStreamData(..) => {} InStreamMsg::StreamStarted(_) => { bail!("Stream started again?") } @@ -314,12 +315,12 @@ impl Recording { // Early stop. User stopped it. break 'recloop; } - InStreamMsg::StreamData(dat) => { + InStreamMsg::InStreamData(instreamdata) => { if first { first = false; // Initialize counter offset - ctr_offset = dat.ctr; - } else if dat.ctr != stored_ctr + ctr_offset { + ctr_offset = instreamdata.ctr; + } else if instreamdata.ctr != stored_ctr + ctr_offset { println!("********** PACKAGES MISSED ***********"); bail!("Packages missed. Recording is invalid.") } @@ -340,7 +341,7 @@ impl Recording { Recording::append_to_dset( &ds, stored_ctr, - &dat.raw, + &instreamdata, framesPerBlock, nchannels, )?; diff --git a/src/daq/streamdata.rs b/src/daq/streamdata.rs index c8251d2..b244f41 100644 --- a/src/daq/streamdata.rs +++ b/src/daq/streamdata.rs @@ -1,153 +1,63 @@ //! Provides stream messages that come from a running stream use crate::config::*; -use crate::daq::Qty; use crate::siggen::Siggen; use anyhow::{bail, Result}; -use cpal::Sample; -use crossbeam::channel::Sender; +use cpal::{FromSample, Sample}; +use itertools::Itertools; +use num::cast::AsPrimitive; +use num::{Bounded, FromPrimitive, Num}; + +use super::*; +use super::*; +use parking_lot::RwLock; use reinterpret::{reinterpret_slice, reinterpret_vec}; use std::any::TypeId; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::u128::MAX; use strum_macros::Display; -use super::*; - -/// Raw stream data coming from a stream. +/// Raw stream data coming from a stream or going to a stream. #[derive(Clone, Debug)] pub enum RawStreamData { - /// 8-bits integer + /// 8-bits integers Datai8(Vec), - /// 16-bits integer + /// 16-bits integers Datai16(Vec), - /// 32-bits integer + /// 32-bits integers Datai32(Vec), - /// 32-bits float + /// 32-bits floats Dataf32(Vec), - /// 64-bits float + /// 64-bits floats Dataf64(Vec), - - /// A stream error occured - StreamError(StreamError), } impl RawStreamData { - pub fn toFloat(&self, _nchannels: usize) -> Dmat { - // match &self { - // RawStreamData::Datai8(c) => { - // Dmat::zeros((2, 2)); - // } - // RawStreamData::Datai16(c) => { - // Dmat::zeros((2, 2)); - // } - // } - todo!() - } -} -impl RawStreamData { - /// Get reference to raw data buffer - pub fn getRef(&self) -> &[T] + /// Create raw stream data from slice of data, or vec of data. Copies over + /// the data. + fn new(input: T) -> RawStreamData where - T: Sample + 'static, + T: Into> + 'static, + U: num::ToPrimitive + Clone + 'static, { - let thetype: TypeId = TypeId::of::(); - match &self { - RawStreamData::Datai8(t) => { - let i8type: TypeId = TypeId::of::(); - assert!(thetype == i8type); - let v: &[T] = unsafe { reinterpret_slice(t) }; - v - } - RawStreamData::Datai16(t) => { - let i16type: TypeId = TypeId::of::(); - assert!(thetype == i16type); - let v: &[T] = unsafe { reinterpret_slice(t) }; - v - } - RawStreamData::Datai32(t) => { - let i32type: TypeId = TypeId::of::(); - assert!(thetype == i32type); - let v: &[T] = unsafe { reinterpret_slice(t) }; - v - } - RawStreamData::Dataf32(t) => { - let f32type: TypeId = TypeId::of::(); - assert!(thetype == f32type); - let v: &[T] = unsafe { reinterpret_slice(t) }; - v - } - RawStreamData::Dataf64(t) => { - let f64type: TypeId = TypeId::of::(); - assert!(thetype == f64type); - let v: &[T] = unsafe { reinterpret_slice(t) }; - v - } - _ => panic!("Cannot getRef from "), - } - } -} - -// Create InStreamData object from -impl From<&[T]> for RawStreamData -where - T: num::ToPrimitive + Clone + 'static, -{ - fn from(input: &[T]) -> RawStreamData { - // Apparently, this code does not work with a match. I have searched around and have not found the - // reason for this. So this is a bit of stupid boilerplate. + let input = input.into(); + // Apparently, this code does not work with a match. I have searched + // around and have not found the reason for this. So this is a bit of + // stupid boilerplate. let i8type: TypeId = TypeId::of::(); let i16type: TypeId = TypeId::of::(); let i32type: TypeId = TypeId::of::(); let f32type: TypeId = TypeId::of::(); let f64type: TypeId = TypeId::of::(); - let thetype: TypeId = TypeId::of::(); - if i8type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai8(v) - } else if i16type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai16(v) - } else if i16type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai16(v) - } else if i32type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Datai32(v) - } else if f32type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Dataf32(v) - } else if f64type == thetype { - let v: Vec = unsafe { reinterpret_slice(input).to_vec() }; - RawStreamData::Dataf64(v) - } else { - panic!("Not implemented sample type!") - } - } -} -// Create InStreamData object from -impl From> for RawStreamData -where - T: num::ToPrimitive + Clone + 'static, -{ - fn from(input: Vec) -> RawStreamData { - // Apparently, this code does not work with a match. I have searched around and have not found the - // reason for this. So this is a bit of stupid boilerplate. - let i8type: TypeId = TypeId::of::(); - let i16type: TypeId = TypeId::of::(); - let i32type: TypeId = TypeId::of::(); - let f32type: TypeId = TypeId::of::(); - let f64type: TypeId = TypeId::of::(); - let thetype: TypeId = TypeId::of::(); + // The type to create for + let thetype: TypeId = TypeId::of::(); + if i8type == thetype { let v: Vec = unsafe { reinterpret_vec(input) }; RawStreamData::Datai8(v) } else if i16type == thetype { let v: Vec = unsafe { reinterpret_vec(input) }; RawStreamData::Datai16(v) - } else if i16type == thetype { - let v: Vec = unsafe { reinterpret_vec(input) }; - RawStreamData::Datai16(v) } else if i32type == thetype { let v: Vec = unsafe { reinterpret_vec(input) }; RawStreamData::Datai32(v) @@ -158,57 +68,49 @@ where let v: Vec = unsafe { reinterpret_vec(input) }; RawStreamData::Dataf64(v) } else { - panic!("Not implemented sample type!") + panic!("Not implemented sample type! Type: {thetype:?}, i8 = {i8type:?}, i16 = {i16type:?}, i32 = {i32type:?}, f32 = {f32type:?}, f64 = {f64type:?}.") + } + } + /// Return a reference to the slice of data. + /// + /// # Panics + /// + /// - If the tye requested does not match the type stored. + pub fn getRef(&self) -> &[T] + where + T: Sample + 'static, + { + let type_requested = TypeId::of::(); + macro_rules! ret_ref { + ($c:expr,$t:ty) => {{ + let type_this = TypeId::of::<$t>(); + assert_eq!(type_requested, type_this, "Wrong type requested"); + unsafe { reinterpret_slice::<$t, T>(&$c) } + }}; + } + use RawStreamData::*; + match &self { + Datai8(v) => { + ret_ref!(v, i8) + } + Datai16(v) => { + ret_ref!(v, i16) + } + Datai32(v) => { + ret_ref!(v, i32) + } + Dataf32(v) => { + ret_ref!(v, f32) + } + Dataf64(v) => { + ret_ref!(v, f64) + } } } } - -/// Stream metadata. All information required for properly interpreting the raw -/// data that is coming from the stream. -#[derive(Clone, Debug)] -pub struct StreamMetaData { - /// Information for each channel in the stream - pub channelInfo: Vec, - - /// The data type of the device [Number / voltage / Acoustic pressure / ...] - pub rawDatatype: DataType, - - /// Sample rate in [Hz] - pub samplerate: Flt, - - /// The number of frames per block of data that comes in. Multiplied by - /// channelInfo.len() we get the total number of samples that come in at - /// each callback. - pub framesPerBlock: usize, -} -impl StreamMetaData { - /// Create new metadata object. - /// /// - /// # Args - /// - pub fn new( - channelInfo: &[DaqChannel], - rawdtype: DataType, - sr: Flt, - framesPerBlock: usize, - ) -> Result { - Ok(StreamMetaData { - channelInfo: channelInfo.to_vec(), - rawDatatype: rawdtype, - samplerate: sr, - framesPerBlock, - }) - } - - /// Returns the number of channels in the stream metadata. - pub fn nchannels(&self) -> usize { - self.channelInfo.len() - } -} - /// Stream data (audio / other) coming from a stream or to be send to a stream #[derive(Debug)] -pub struct StreamData { +pub struct InStreamData { /// Package counter. Should always increase monotonically. pub ctr: usize, @@ -216,45 +118,177 @@ pub struct StreamData { pub meta: Arc, /// This is typically what is stored when recording - pub raw: RawStreamData, + raw: RawStreamData, // Converted to floating point format. Used for further real time // processing. Stored in an rw-lock. The first thread that acesses this data // will perform the conversion. All threads after that will get the data. converted: RwLock>>, } -impl StreamData { + +impl InStreamData { + #[inline] + /// Return reference to underlying raw data storage + pub fn getRaw(&self) -> &RawStreamData { + return &self.raw; + } + #[inline] + /// Convenience function to return the number of channels in this instreamdata. + pub fn nchannels(&self) -> usize { + return self.meta.nchannels(); + } + /// Iterate over raw data of a certain channel. Tye should be specificied + /// and if not set correctly, this results in undefined behavior + pub fn iter_channel_raw<'a, T>(&'a self, ch: usize) -> impl Iterator + 'a + where + T: Sample + Copy + 'static, + { + let type_requested: TypeId = TypeId::of::(); + macro_rules! create_iter { + ($c:expr,$t:ty) => {{ + // Check that the type matches the type stored + let cur_type: TypeId = TypeId::of::<$t>(); + assert!( + type_requested == cur_type, + "BUG: Type mismatch on channel data iterator" + ); + let v: &'a [T] = unsafe { reinterpret_slice($c) }; + v.iter().skip(ch).step_by(self.meta.nchannels()) + }}; + }; + + match &self.raw { + RawStreamData::Datai8(c) => { + create_iter!(c, i8) + } + RawStreamData::Datai16(c) => { + create_iter!(c, i16) + } + RawStreamData::Datai32(c) => { + create_iter!(c, i32) + } + RawStreamData::Dataf32(c) => { + create_iter!(c, f32) + } + RawStreamData::Dataf64(c) => { + create_iter!(c, f64) + } + } + } + /// Iterate over all channels, deinterleaved. So first all samples from the + /// first channel, etc... + pub fn iter_deinterleaved_raw_allchannels<'a, T>( + &'a self, + ) -> Box + 'a> + where + T: Sample + Copy + 'static, + { + Box::new( + (0..self.meta.nchannels()) + .into_iter() + .flat_map(|chi| self.iter_channel_raw(chi)), + ) + } + fn iter_channel_converted<'a, T>(&'a self, ch: usize) -> impl Iterator + 'a + where + T: Sample + Copy + 'static, + Flt: FromSample, + { + self.iter_channel_raw(ch) + .copied() + .map(move |v: T| Flt::from_sample(v) / self.meta.channelInfo[ch].sensitivity) + } + + /// Iterate over data. where data is converted to floating point, and + /// corrected for sensivity values. Returns all data, in order of channel. + pub fn iter_deinterleaved_converted<'a, T>(&'a self) -> Box + 'a> + where + T: Sample + Copy + 'static, + Flt: FromSample, + { + Box::new( + (0..self.meta.nchannels()) + .into_iter() + .flat_map(move |chi| self.iter_channel_converted(chi)), + ) + } + /// Create new stream data object. - pub fn new(ctr: usize, meta: Arc, raw: RawStreamData) -> StreamData { - StreamData { + pub fn new(ctr: usize, meta: Arc, raw: T) -> InStreamData + where + T: Into> + 'static, + U: Sample + num::ToPrimitive + Clone + 'static, + { + InStreamData { ctr, meta, - raw, + raw: RawStreamData::new(raw), converted: RwLock::new(None), } } + /// Returns the number of frames in this InstreamData + pub fn nframes(&self) -> usize { + let nch = self.meta.nchannels(); + match &self.raw { + RawStreamData::Datai8(c) => { + return c.len() / nch; + } + RawStreamData::Datai16(c) => { + return c.len() / nch; + } + RawStreamData::Datai32(c) => { + return c.len() / nch; + } + RawStreamData::Dataf32(c) => { + return c.len() / nch; + } + RawStreamData::Dataf64(c) => { + return c.len() / nch; + } + } + } /// Get the data in floating point format. If already converted, uses the /// cached float data. pub fn getFloatData(&self) -> Arc { - if let Some(dat) = self.converted.read().unwrap().as_ref() { + if let Some(dat) = self.converted.read().as_ref() { return dat.clone(); } // In case we reach here, the data has not yet be converted to floating // point, so we do this. - let mut o = self.converted.write().unwrap(); + let mut write_lock = self.converted.write(); // It might be that another thread was 'first', and already performed // the conversion. In that case, we still do an early return, and we // just openend the lock twice for writing. Not a problem. - if let Some(dat) = o.as_ref() { + if let Some(dat) = write_lock.as_ref() { return dat.clone(); } + + let errmsg = "Data cannot be converted to floating point"; + + macro_rules! convert_data { + ($t:ty) => { + Dmat::from_shape_vec( + (self.nframes(), self.nchannels()).f(), + self.iter_deinterleaved_converted::<$t>().collect(), + ) + .expect(errmsg) + }; + }; + // Perform the actual conversion - let converted_data = Arc::new(self.raw.toFloat(self.meta.nchannels())); + let converted_data = match &self.raw { + RawStreamData::Datai8(_) => convert_data!(i8), + RawStreamData::Datai16(_) => convert_data!(i16), + RawStreamData::Datai32(_) => convert_data!(i32), + RawStreamData::Dataf32(_) => convert_data!(f32), + RawStreamData::Dataf64(_) => convert_data!(f64), + }; + let converted_data = Arc::new(converted_data); // Replace the option with the Some - o.replace(converted_data.clone()); + write_lock.replace(converted_data.clone()); converted_data } @@ -262,36 +296,38 @@ impl StreamData { #[cfg(test)] mod test { - use num::traits::sign; use cpal::Sample; + use num::traits::sign; use super::*; + use crate::siggen::Siggen; #[test] fn test() { - const fs: Flt = 20.; // Number of samples per channel const Nframes: usize = 20; const Nch: usize = 2; - let mut signal = [0.; Nch*Nframes]; + let mut signal = [0.; Nch * Nframes]; let mut siggen = Siggen::newSine(Nch, 1.); siggen.reset(fs); siggen.setMute(&[false, true]); siggen.genSignal(&mut signal); - let raw: Vec = Vec::from_iter(signal.iter().map( - |o| o.to_sample::())); - - let ms1 = raw.iter().step_by(2).map(|s1| {*s1 as f64 * *s1 as f64}).sum::() / Nframes as f64; + let raw: Vec = Vec::from_iter(signal.iter().map(|o| o.to_sample::())); + + let ms1 = raw + .iter() + .step_by(2) + .map(|s1| *s1 as f64 * *s1 as f64) + .sum::() + / Nframes as f64; let i16maxsq = (i16::MAX as f64).powf(2.); // println!("ms1: {} {}", ms1, i16maxsq/2.); // println!("{:?}", raw.iter().cloned().step_by(2).collect::>()); // println!("{:?}", i16::EQUILIBRIUM); - assert!(f64::abs(ms1 - i16maxsq/2.)/i16maxsq < 1e-3); - + assert!(f64::abs(ms1 - i16maxsq / 2.) / i16maxsq < 1e-3); } - -} \ No newline at end of file +} diff --git a/src/daq/streamerror.rs b/src/daq/streamerror.rs new file mode 100644 index 0000000..61de1d2 --- /dev/null +++ b/src/daq/streamerror.rs @@ -0,0 +1,32 @@ +use strum_macros::Display; + +/// Errors that happen in a stream +#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)] +#[cfg_attr(feature = "python-bindings", pyclass)] +pub enum StreamError { + /// Input overrun + #[strum( + message = "InputOverrun Error", + detailed_message = "Input buffer overrun" + )] + InputOverrunError, + + /// Output underrun + #[strum( + message = "OutputUnderrunError", + detailed_message = "Output buffer underrun" + )] + OutputUnderrunError, + + /// Driver specific error + #[strum(message = "DriverError", detailed_message = "Driver error")] + DriverError, + + /// Device + #[strum(detailed_message = "Device not available (anymore)")] + DeviceNotAvailable, + + /// Logic error (something weird happened) + #[strum(detailed_message = "Logic error")] + LogicError, +} \ No newline at end of file diff --git a/src/daq/streamhandler.rs b/src/daq/streamhandler.rs index 3c3a3bf..8a381da 100644 --- a/src/daq/streamhandler.rs +++ b/src/daq/streamhandler.rs @@ -13,7 +13,13 @@ impl StreamHandler { /// Create new stream handler. pub fn new(smgr: &mut StreamMgr) -> StreamHandler{ let (tx, rx) = unbounded(); + + // The queue there is not 'drop()' in streamhandler, as StreamMgr + // detects on its own when the stream other end of the channel is + // dropped. smgr.addInQueue(tx); StreamHandler{rx} } + } + diff --git a/src/daq/streammetadata.rs b/src/daq/streammetadata.rs new file mode 100644 index 0000000..7a95e84 --- /dev/null +++ b/src/daq/streammetadata.rs @@ -0,0 +1,60 @@ +use super::*; +use crate::config::Flt; +use anyhow::Result; +/// Stream metadata. All information required for properly interpreting the raw +/// data that is coming from the stream. +#[derive(Clone, Debug)] +pub struct StreamMetaData { + /// Information for each channel in the stream + pub channelInfo: Vec, + + /// The data type of the device [Number / voltage / Acoustic pressure / ...] + pub rawDatatype: DataType, + + /// Sample rate in [Hz] + pub samplerate: Flt, + + /// The number of frames per block of data that comes in. Multiplied by + /// channelInfo.len() we get the total number of samples that come in at + /// each callback. + pub framesPerBlock: usize, +} + +impl StreamMetaData { + /// Create new metadata object. + /// /// + /// # Args + /// + pub fn new<'a, T>( + channelInfo: T, + rawdtype: DataType, + sr: Flt, + framesPerBlock: usize, + ) -> StreamMetaData + where + T: IntoIterator, + { + let channelInfo = channelInfo + .into_iter() + .inspect(|ch| { + assert!( + ch.enabled, + "Only enabled channels should be given as input to StreamMetaData" + ); + }) + .cloned() + .collect(); + StreamMetaData { + channelInfo, + rawDatatype: rawdtype, + samplerate: sr, + framesPerBlock, + } + } + + /// Returns the number of channels in the stream metadata. + #[inline] + pub fn nchannels(&self) -> usize { + self.channelInfo.len() + } +} diff --git a/src/daq/streammgr.rs b/src/daq/streammgr.rs index a781d63..3776539 100644 --- a/src/daq/streammgr.rs +++ b/src/daq/streammgr.rs @@ -1,5 +1,4 @@ //! Data acquisition model. Provides abstract layers around DAQ devices. -use super::config::*; use super::*; use crate::{ config::*, @@ -18,6 +17,7 @@ use std::sync::{atomic::AtomicBool, Arc, Mutex}; use std::thread::{JoinHandle, Thread}; use streamcmd::StreamCommand; use streamdata::*; +use streammetadata::*; use streammsg::*; #[cfg(feature = "cpal-api")] @@ -129,7 +129,7 @@ impl StreamMgr { /// pub fn new() -> StreamMgr { if smgr_created.load(std::sync::atomic::Ordering::Relaxed) { - panic!("BUG: Only one stream manager is supposed to be a singleton"); + panic!("BUG: Stream manager is supposed to be a singleton"); } smgr_created.store(true, std::sync::atomic::Ordering::Relaxed); @@ -238,7 +238,7 @@ impl StreamMgr { fn startInputStreamThread( &mut self, meta: Arc, - rx: Receiver, + rx: Receiver, ) -> (JoinHandle, Sender) { let (commtx, commrx) = unbounded(); @@ -274,13 +274,12 @@ impl StreamMgr { } } } - if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) { + if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) { // println!("Obtained raw stream data!"); + if let InStreamMsg::StreamError(e) = msg { - let streamdata = StreamData::new(ctr, meta.clone(), raw); - let streamdata = Arc::new(streamdata); + } - let msg = InStreamMsg::StreamData(streamdata); sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg); ctr += 1; } @@ -384,7 +383,6 @@ impl StreamMgr { } } - // } } siggen }); @@ -450,7 +448,7 @@ impl StreamMgr { bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream."); } } - let (tx, rx): (Sender, Receiver) = unbounded(); + let (tx, rx): (Sender, Receiver) = unbounded(); let stream = match cfg.api { StreamApiDescr::Cpal => { @@ -495,7 +493,7 @@ impl StreamMgr { bail!("Input stream is already running. Please first stop existing input stream.") } - let (tx, rx): (Sender, Receiver) = unbounded(); + let (tx, rx): (Sender, Receiver) = unbounded(); // Only a default input stream when CPAL feature is enabled cfg_if::cfg_if! { diff --git a/src/daq/streammsg.rs b/src/daq/streammsg.rs index 9cc6058..142503a 100644 --- a/src/daq/streammsg.rs +++ b/src/daq/streammsg.rs @@ -18,15 +18,11 @@ use super::*; pub enum InStreamMsg { /// Raw stream data that is coming from a device. This is interleaved data. The number of channels is correct and /// specified in the stream metadata. - StreamData(Arc), + InStreamData(Arc), /// An error has occured in the stream StreamError(StreamError), - /// Stream data converted to floating point with sample width as - /// compiled in. - ConvertedStreamData(usize, Arc), - /// new Stream metadata enters the scene. Probably a new stream started. StreamStarted(Arc), diff --git a/src/lib.rs b/src/lib.rs index 4092b98..51581f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ pub mod filter; pub mod ps; pub mod siggen; use filter::*; +pub mod rt; /// A Python module implemented in Rust. #[cfg(feature = "python-bindings")] diff --git a/src/math/mod.rs b/src/math/mod.rs new file mode 100644 index 0000000..c0b9747 --- /dev/null +++ b/src/math/mod.rs @@ -0,0 +1,4 @@ +//! General math tools that are internally required +//! +//! + diff --git a/src/rt/mod.rs b/src/rt/mod.rs new file mode 100644 index 0000000..d00ef6c --- /dev/null +++ b/src/rt/mod.rs @@ -0,0 +1,4 @@ +//! Real time signal analysis blocks, used for visual inspection and showing +//! data 'on the fly'. Examples are real time power spectra plotting +//! (Spectrograms, Auto powers, ..., or ) +mod rtaps; \ No newline at end of file diff --git a/src/rt/rtaps.rs b/src/rt/rtaps.rs new file mode 100644 index 0000000..ad308b9 --- /dev/null +++ b/src/rt/rtaps.rs @@ -0,0 +1,137 @@ +use std::ops::Deref; +use std::thread::{self, JoinHandle}; + +use crate::daq::{InStreamMsg, StreamHandler, StreamMetaData, StreamMgr}; +use crate::ps::{AvPowerSpectra, CPSResult}; +use crate::I; +use anyhow::Result; +use parking_lot::Mutex; +use rayon::ThreadPool; +use std::sync::Arc; + +enum RtApsComm { + CommStopThread, + NewResult(CPSResult), + NewMeta(Arc), +} + +/// Real time power spectra viewer. Shows cross-power or auto-power signal 'time-dependent' +pub struct RtAps { + /// Storage for optional last result + comm: Arc>>, +} + +impl RtAps { + /// Build new Real time power spectra computing engine. + pub fn build(mgr: &mut StreamMgr) -> Result { + // Handler needs to be created here. + let handler = StreamHandler::new(mgr); + let last_result = Arc::new(Mutex::new(None)); + let last_result2 = last_result.clone(); + + let mut aps = AvPowerSpectra::build(2048, None, None, None)?; + + let thread = std::thread::spawn(move || { + println!("Thread started..."); + let rx = handler.rx; + // What is running on the thread + + let mut last_cps: Option = None; + let mut meta: Option> = None; + + 'mainloop: loop { + println!("LOOP"); + 'msgloop: for msg in &rx { + println!("Message found!"); + match msg { + InStreamMsg::StreamStarted(new_meta) => { + aps.reset(); + last_cps = None; + meta = Some(new_meta); + break 'msgloop; + } + InStreamMsg::StreamStopped | InStreamMsg::StreamError(_) => { + debug_assert!(meta.is_none()); + last_cps = None; + } + InStreamMsg::InStreamData(id) => { + debug_assert!(meta.is_none()); + let flt = id.getFloatData(); + if let Some(cpsresult) = aps.compute_last(flt.view()) { + last_cps = Some(cpsresult.clone()); + } + } + } + } + + println!("LOOP2"); + // Communicate last result, if any. + 'commscope: { + let mut last_result_lock = last_result.lock(); + + if let Some(RtApsComm::CommStopThread) = *last_result_lock { + println!("Stopping RtAps thread"); + break 'mainloop; + } + if let Some(newmeta) = meta.take() { + // New metadata has arrived. This is always the first + // thing to push. Only when it is read, we will start + // pushing actual data. + *last_result_lock = Some(RtApsComm::NewMeta(newmeta)); + break 'commscope; + } + + if let Some(RtApsComm::NewMeta(_)) = *last_result_lock { + // New metadata is not yet read by reading thread. It + // basically means we are not yet ready to give actual + // data back. + break 'commscope; + } + // Move last_cps into mutex. + if let Some(last_cps) = last_cps.take() { + *last_result_lock = Some(RtApsComm::NewResult(last_cps)); + } + } + } // End of loop + println!("Ending RtAps thread"); + }); + assert!(!thread.is_finished()); + + Ok(RtAps { comm: last_result2 }) + } + /// Get last computed value. When new stream metadata is + pub fn get_last(&self) -> Option { + let mut lck = self.comm.lock(); + let res = lck.take(); + if let Some(RtApsComm::CommStopThread) = res { + panic!("BUG: CommStopThread should never be set!") + } + return lck.take(); + } +} +impl Drop for RtAps { + fn drop(&mut self) { + println!("DROP"); + let mut lck = self.comm.lock(); + *lck = Some(RtApsComm::CommStopThread); + println!("DROP done"); + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use super::*; + use crate::daq::StreamMgr; + #[test] + fn test_rtaps1() -> Result<()> { + { + let mut smgr = StreamMgr::new(); + let rtaps = RtAps::build(&mut smgr)?; + smgr.startDefaultInputStream()?; + thread::sleep(Duration::from_secs(2)); + } + Ok(()) + } +}