From b770c4d8fbbc3126b8ec43008f4ed3806fbd4710 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Thu, 28 Dec 2023 23:49:25 +0100 Subject: [PATCH] Recording is working. Partial pyo3 exposure. Now, RtAps, or first Siggen? --- Cargo.toml | 5 +- src/bin/lasp_devinfo.rs | 32 ++++++-- src/bin/lasp_inputdefault.rs | 2 +- src/bin/lasp_record.rs | 98 +++++++++++++++++++++++ src/bin/lasp_recorddefault.rs | 45 ----------- src/config.rs | 10 +++ src/daq/api/api_cpal.rs | 95 ++++++++++++++-------- src/daq/api/mod.rs | 5 +- src/daq/daqconfig.rs | 97 ++++++++++++++++++++-- src/daq/deviceinfo.rs | 5 +- src/daq/mod.rs | 92 ++++++++++++++++++--- src/daq/record.rs | 147 ++++++++++++++++++++++++++++------ src/daq/streammsg.rs | 35 +++++--- src/lib.rs | 10 ++- 14 files changed, 533 insertions(+), 145 deletions(-) create mode 100644 src/bin/lasp_record.rs delete mode 100644 src/bin/lasp_recorddefault.rs diff --git a/Cargo.toml b/Cargo.toml index 141dd15..55ebe2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,13 +73,14 @@ itertools = "0.12.0" chrono = {version = "0.4.31", optional = true} # For getting UUIDs in recording uuid = { version = "1.6.1", features = ["v4"] , optional = true} +clap = { version = "4.4.11", features = ["derive", "color", "help", "suggestions"] } [features] -default = ["f64", "cpal_api", "record"] +default = ["f64", "cpal-api", "record"] # Use this for debugging extensions # default = ["f64", "python-bindings", "record", "cpal-api"] -cpal_api = ["dep:cpal"] +cpal-api = ["dep:cpal"] record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"] f64 = [] f32 = [] diff --git a/src/bin/lasp_devinfo.rs b/src/bin/lasp_devinfo.rs index 3f5f4ab..400d200 100644 --- a/src/bin/lasp_devinfo.rs +++ b/src/bin/lasp_devinfo.rs @@ -1,14 +1,36 @@ use anyhow::Result; -use lasprs::daq::StreamMgr; +use clap::Parser; +use lasprs::daq::{DaqConfig, StreamMgr}; + +/// Simple program to greet a person +#[derive(Parser, Debug)] +#[command(author, version, about="Generates DAQ configurations for available devices.", long_about = None)] +struct Args { + /// Name of the person to greet + #[arg(short, long)] + matches: Vec, +} fn main() -> Result<()> { + let args = Args::parse(); + let write_all = args.matches.len() == 0; let mut smgr = StreamMgr::new(); let devs = smgr.getDeviceInfo(); - for dev in devs { - println!("========="); - println!("{:?}", dev); - println!("-------------"); + for dev in devs.iter() { + let filename = dev.device_name.clone() + ".toml"; + if write_all { + let daqconfig = DaqConfig::newFromDeviceInfo(&dev); + daqconfig.serialize_TOML_file(&filename.clone().into())?; + } else { + for m in args.matches.iter() { + let needle =m.to_lowercase(); + let dev_lower = (&dev.device_name).to_lowercase(); + if dev_lower.contains(&needle) { + DaqConfig::newFromDeviceInfo(&dev).serialize_TOML_file(&filename.clone().into())?; + } + } + } } Ok(()) diff --git a/src/bin/lasp_inputdefault.rs b/src/bin/lasp_inputdefault.rs index ff1dcba..6461729 100644 --- a/src/bin/lasp_inputdefault.rs +++ b/src/bin/lasp_inputdefault.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; +use crossbeam::channel::{unbounded, Receiver, TryRecvError}; use lasprs::daq::{StreamHandler, StreamMgr, InStreamMsg}; use std::io; use std::{thread, time}; diff --git a/src/bin/lasp_record.rs b/src/bin/lasp_record.rs new file mode 100644 index 0000000..393ee23 --- /dev/null +++ b/src/bin/lasp_record.rs @@ -0,0 +1,98 @@ +use anyhow::Result; +use clap::{arg, command, Parser}; +use crossbeam::channel::{unbounded, Receiver, TryRecvError}; +#[cfg(feature = "record")] +use lasprs::daq::{StreamType,RecordSettings, RecordStatus, Recording, StreamMgr}; +use lasprs::Flt; +use std::{ + io, thread, + time::{self, Duration}, +}; + +#[derive(Parser)] +#[command(author, version, about = "Record data to h5 file, according to LASP format", long_about = None)] +struct Cli { + /// File name to write recording to + filename: String, + + /// Recording duration in [s]. Rounds down to whole seconds. If not specified, records until user presses a key + #[arg(short, long = "duration", default_value_t = 0.)] + duration_s: Flt, + + /// TOML configuration file for used stream + #[arg(short, long = "config-file")] + config_file_daq: Option, +} + +#[cfg(feature = "record")] +fn main() -> Result<()> { + use lasprs::daq::DaqConfig; + + let ops = Cli::parse(); + + let mut smgr = StreamMgr::new(); + let stdin_channel = spawn_stdin_channel(); + + let settings = RecordSettings { + filename: ops.filename.into(), + duration: Duration::from_secs(ops.duration_s as u64), + }; + match ops.config_file_daq { + None => smgr.startDefaultInputStream()?, + Some(filename) => { + let file = std::fs::read_to_string(filename)?; + let cfg = DaqConfig::deserialize_TOML_str(&file)?; + smgr.startStream(StreamType::Input, &cfg)?; + } + + } + + let mut r = Recording::new(settings, &mut smgr)?; + + println!("Starting to record..."); + 'infy: loop { + match r.status() { + RecordStatus::Idle => println!("\nIdle"), + RecordStatus::Error(e) => { + println!("\nRecord error: {}", e); + break 'infy; + } + RecordStatus::Finished => { + println!("\nRecording finished."); + break 'infy; + } + RecordStatus::Recording(duration) => { + println!("Recording... {} ms", duration.as_millis()); + } + RecordStatus::NoUpdate => {} + }; + + match stdin_channel.try_recv() { + Ok(_key) => { + println!("User pressed key. Manually stopping recording here."); + break 'infy; + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => panic!("Channel disconnected"), + } + + sleep(100); + } + + Ok(()) +} + +fn sleep(millis: u64) { + let duration = time::Duration::from_millis(millis); + thread::sleep(duration); +} + +fn spawn_stdin_channel() -> Receiver { + let (tx, rx) = unbounded(); + thread::spawn(move || loop { + let mut buffer = String::new(); + io::stdin().read_line(&mut buffer).unwrap(); + tx.send(buffer).unwrap(); + }); + rx +} diff --git a/src/bin/lasp_recorddefault.rs b/src/bin/lasp_recorddefault.rs deleted file mode 100644 index 531f31a..0000000 --- a/src/bin/lasp_recorddefault.rs +++ /dev/null @@ -1,45 +0,0 @@ -use anyhow::Result; -#[cfg(feature="record")] -use lasprs::daq::{RecordSettings, RecordStatus, Recording, StreamMgr}; -use std::{thread, time::{self, Duration}}; -// use - -#[cfg(feature="record")] -fn main() -> Result<()> { - let mut smgr = StreamMgr::new(); - - let settings = RecordSettings { - filename: "test.h5".into(), - duration: Duration::from_secs(2), - }; - - smgr.startDefaultInputStream()?; - let mut r = Recording::new(settings, &mut smgr)?; - - println!("Starting to record..."); - loop { - match r.status() { - RecordStatus::Idle => println!("Idle"), - RecordStatus::Error(e) => { - println!("Record error: {}", e); - break; - - } - RecordStatus::Finished => { - println!("\nRecording finished."); - break; - } - RecordStatus::Recording(duration) => { - print!("\rRecording... {} ms", duration.as_millis()); - } - }; - sleep(10); - } - - Ok(()) -} - -fn sleep(millis: u64) { - let duration = time::Duration::from_millis(millis); - thread::sleep(duration); -} diff --git a/src/config.rs b/src/config.rs index 46f5960..459b797 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,11 +4,15 @@ cfg_if::cfg_if! { if #[cfg(feature="f64")] { + /// Floating-point value, compile time option to make it either f32, or f64 pub type Flt = f64; + /// Ratio between circumference and diameter of a circle pub const pi: Flt = std::f64::consts::PI; } else if #[cfg(feature="f32")] { + /// Floating-point value, compile time option to make it either f32, or f64 pub type Flt = f32; + /// Ratio between circumference and diameter of a circle pub const pi: Flt = std::f32::consts::PI; } else { @@ -21,11 +25,17 @@ use num::complex::*; pub type Cflt = Complex; use ndarray::{Array1, Array2}; +/// Vector of floating point values pub type Vd = Vec; +/// Vector of complex floating point values pub type Vc = Vec; +/// 1D array of floats pub type Dcol = Array1; +/// 1D array of complex floats pub type Ccol = Array1; +/// 2D array of floats pub type Dmat = Array2; +/// 2D array of complex floats pub type Cmat = Array2; diff --git a/src/daq/api/api_cpal.rs b/src/daq/api/api_cpal.rs index e21fae2..b28a7b3 100644 --- a/src/daq/api/api_cpal.rs +++ b/src/daq/api/api_cpal.rs @@ -10,6 +10,7 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize}; use crossbeam::channel::{Receiver, Sender}; use itertools::Itertools; +use std::collections::VecDeque; use std::sync::Arc; /// Convert datatype in CPAL sampleformat @@ -93,7 +94,7 @@ impl CpalApi { let mut oChannelCount = 0; let mut sample_rates = srs_tot.clone(); - let mut avFramesPerBlock = vec![256, 512, 1024, 2048, 8192]; + let mut avFramesPerBlock = vec![256 as usize, 512, 1024, 2048, 8192]; let mut sample_formats = vec![]; // Search for sample formats @@ -107,8 +108,8 @@ impl CpalApi { sample_rates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt); sample_rates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt); if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() { - avFramesPerBlock.retain(|i| i >= min); - avFramesPerBlock.retain(|i| i <= max); + avFramesPerBlock.retain(|i| i >= &(*min as usize)); + avFramesPerBlock.retain(|i| i <= &(*max as usize)); } iChannelCount = icfg.channels() as u8; // avFramesPerBlock.retain(|i| i >= icfg.buffer_size().) @@ -124,8 +125,8 @@ impl CpalApi { sample_rates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt); sample_rates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt); if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() { - avFramesPerBlock.retain(|i| i >= min); - avFramesPerBlock.retain(|i| i <= max); + avFramesPerBlock.retain(|i| i >= &(*min as usize)); + avFramesPerBlock.retain(|i| i <= &(*max as usize)); } oChannelCount = ocfg.channels() as u8; } @@ -145,7 +146,7 @@ impl CpalApi { let prefSampleRate = *sample_rates.last().unwrap_or(&48000.); devs.push(DeviceInfo { api: super::StreamApiDescr::Cpal, - name: dev.name()?, + device_name: dev.name()?, avDataTypes: dtypes, prefDataType, @@ -193,21 +194,35 @@ impl CpalApi { device: &cpal::Device, sender: Sender, en_inchannels: Vec, - framesPerBlock: u32, + framesPerBlock: usize, ) -> Result { - let tot_inch = config.channels; + let tot_inch = config.channels as usize; let sender_err = sender.clone(); + macro_rules! build_stream{ ($($cpaltype:pat, $rtype:ty);*) => { match sf { $( - $cpaltype => device.build_input_stream( + $cpaltype => { + let mut q = VecDeque::<$rtype>::with_capacity(2*tot_inch*framesPerBlock); + + device.build_input_stream( &config, - move |data, _: &_| InStreamCallback::<$rtype>(data, &sender, tot_inch, &en_inchannels, framesPerBlock), + move |data, _: &_| InStreamCallback::<$rtype>( + data, &sender, + // Total number of input channels. This API has to filter out + // the channels that are not enabled + tot_inch, + // Vector of channels numbers that are enabled + &en_inchannels, + // Frames per block + framesPerBlock, + // Ring buffer for storage of samples as required. + &mut q), CpalApi::create_errfcn(sender_err), None)? - ),*, + }),*, _ => bail!("Unsupported sample format '{}'", sf) } } @@ -226,7 +241,7 @@ impl CpalApi { st: StreamType, devinfo: &DeviceInfo, conf: &DaqConfig, - dev: &cpal::Device, + _dev: &cpal::Device, conf_iterator: T, ) -> Result where @@ -246,7 +261,7 @@ impl CpalApi { && cpalconf.max_sample_rate().0 as Flt >= requested_sr { // Sample rate falls within range. - let requested_fpb = conf.framesPerBlock(devinfo); + let requested_fpb = conf.framesPerBlock(devinfo) as u32; // Last check: check if buffer size is allowed match cpalconf.buffer_size() { SupportedBufferSize::Range { min, max } => { @@ -336,11 +351,11 @@ impl CpalApi { } bail!(format!( "Error: requested device {} not found. Please make sure the device is available.", - devinfo.name + devinfo.device_name )) } - /// Start a default input stream for a device + /// Start a default input stream. /// /// pub fn startDefaultInputStream( @@ -349,11 +364,11 @@ impl CpalApi { ) -> Result> { if let Some(device) = self.host.default_input_device() { if let Ok(config) = device.default_input_config() { - let framesPerBlock = 4096; + let framesPerBlock: usize = 4096; let final_config = cpal::StreamConfig { channels: config.channels(), sample_rate: config.sample_rate(), - buffer_size: cpal::BufferSize::Fixed(framesPerBlock), + buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32), }; let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize)); @@ -429,28 +444,38 @@ impl CpalApi { fn InStreamCallback( input: &[T], sender: &Sender, - tot_inch: u16, + tot_inch: usize, en_inchannels: &[usize], - framesPerBlock: u32, + framesPerBlock: usize, + q: &mut VecDeque, ) where T: Copy + num::ToPrimitive + 'static, { - let msg = RawStreamData::from(input); - let nen_ch = en_inchannels.len(); - let nframes = input.len() / tot_inch as usize; - let mut enabled_ch_data = Vec::with_capacity(nen_ch * nframes); - unsafe { - enabled_ch_data.set_len(enabled_ch_data.capacity()); - } - // Chops of the disabled channels and forwards the data, DEINTERLEAVED - for (chout_idx, chout) in en_inchannels.iter().enumerate() { - let in_iterator = input.iter().skip(*chout).step_by(tot_inch as usize); - let out_iterator = enabled_ch_data.iter_mut().skip(chout_idx * nframes); - for (out, in_) in out_iterator.zip(in_iterator) { - *out = *in_; + // Copy elements over in ring buffer + q.extend(input); + while q.len() > tot_inch * framesPerBlock { + // println!("q full enough: {}", q.len()); + let mut enabled_ch_data: Vec = Vec::with_capacity(en_inchannels.len() * framesPerBlock); + unsafe { + enabled_ch_data.set_len(enabled_ch_data.capacity()); } - } - let msg = RawStreamData::from(enabled_ch_data); - sender.send(msg).unwrap() + // Loop over enabled channels + for (i, ch) in en_inchannels.iter().enumerate() { + let in_iterator = q.iter().skip(*ch).step_by(tot_inch); + let out_iterator = enabled_ch_data.iter_mut().skip(i).step_by(en_inchannels.len()); + + // Copy over elements, *DEINTERLEAVED* + out_iterator.zip(in_iterator).for_each(|(o, i)| { + *o = *i; + }); + } + + // Drain copied elements from ring buffer + q.drain(0..framesPerBlock * tot_inch); + + // Send over data + let msg = RawStreamData::from(enabled_ch_data); + sender.send(msg).unwrap() + } } diff --git a/src/daq/api/mod.rs b/src/daq/api/mod.rs index a1dbc28..9ca3454 100644 --- a/src/daq/api/mod.rs +++ b/src/daq/api/mod.rs @@ -8,7 +8,7 @@ use strum_macros; use super::StreamMetaData; -#[cfg(feature = "cpal_api")] +#[cfg(feature = "cpal-api")] pub mod api_cpal; #[cfg(feature = "pulse_api")] @@ -31,4 +31,7 @@ pub enum StreamApiDescr { /// CPAL api #[strum(message = "Cpal", detailed_message = "Cross-Platform Audio Library")] Cpal = 0, + /// PulseAudio api + #[strum(message = "pulse", detailed_message = "Pulseaudio")] + Pulse = 1, } diff --git a/src/daq/daqconfig.rs b/src/daq/daqconfig.rs index bcc274f..f12a2cd 100644 --- a/src/daq/daqconfig.rs +++ b/src/daq/daqconfig.rs @@ -1,3 +1,7 @@ +use std::{ops::Index, path::PathBuf}; + +use anyhow::Result; +use hdf5::File; use super::api::StreamApiDescr; use super::datatype::DataType; use super::deviceinfo::DeviceInfo; @@ -76,6 +80,92 @@ pub struct DaqConfig { } impl DaqConfig { + /// Creates a new default device configuration for a given device as specified with + /// the DeviceInfo descriptor. + pub fn newFromDeviceInfo(devinfo: &DeviceInfo) -> DaqConfig { + + let inchannel_config = (0..devinfo.iChannelCount) + .map(|_| DaqChannel::default()) + .collect(); + let outchannel_config = (0..devinfo.oChannelCount) + .map(|_| DaqChannel::default()) + .collect(); + + let sampleRateIndex = devinfo + .avSampleRates + .iter() + .position(|x| x == &devinfo.prefSampleRate) + .unwrap_or(devinfo.avSampleRates.len()-1); + // Choose 4096 when in list, otherwise choose the highes available value in list + let framesPerBlockIndex = devinfo + .avFramesPerBlock + .iter() + .position(|x| x == &4096) + .unwrap_or(devinfo.avFramesPerBlock.len() - 1); + + DaqConfig { + api: devinfo.api.clone(), + device_name: devinfo.device_name.clone(), + inchannel_config, + outchannel_config, + dtype: devinfo.prefDataType, + digitalHighPassCutOn: -1.0, + sampleRateIndex, + framesPerBlockIndex, + monitorOutput: false, + } + } + + /// Serialize DaqConfig object to TOML. + /// + /// Args + /// + /// * writer: Output writer, can be file or string, or anything that *is* std::io::Write + /// + pub fn serialize_TOML(&self, writer: &mut dyn std::io::Write) -> Result<()> { + + let ser_str = toml::to_string(&self)?; + writer.write_all(ser_str.as_bytes())?; + + Ok(()) + } + + /// Deserialize structure from TOML data + /// + /// # Args + /// + /// * reader: implements the Read trait, from which we read the data. + pub fn deserialize_TOML(reader: &mut T) -> Result where T: std::io::Read { + let mut read_str = vec![]; + reader.read_to_end(&mut read_str)?; + let read_str = String::from_utf8(read_str)?; + DaqConfig::deserialize_TOML_str(&read_str) + } + + + /// Deserialize from TOML string + /// + /// # Args + /// + /// * st: string containing TOML data. + pub fn deserialize_TOML_str(st: &String) -> Result { + let res : DaqConfig = toml::from_str(&st)?; + Ok(res) + } + + /// Write this configuration to a TOML file. + /// + /// Args + /// + /// * file: Name of file to write to + /// + pub fn serialize_TOML_file(&self, file: &PathBuf) -> Result<()> { + + let mut file = std::fs::File::create(file)?; + self.serialize_TOML(&mut file)?; + Ok(()) + } + /// Returns a list of enabled input channel numbers as indices /// in the list of all input channels (enabled and not) pub fn enabledInchannelsList(&self) -> Vec { @@ -89,10 +179,7 @@ impl DaqConfig { /// Returns the total number of channels that appear in a running input stream. pub fn numberEnabledInChannels(&self) -> usize { - self.inchannel_config - .iter() - .filter(|ch| ch.enabled) - .count() + self.inchannel_config.iter().filter(|ch| ch.enabled).count() } /// Returns the total number of channels that appear in a running output stream. pub fn numberEnabledOutChannels(&self) -> usize { @@ -108,7 +195,7 @@ impl DaqConfig { } /// Provide samplerate, based on device and specified sample rate index - pub fn framesPerBlock(&self, dev: &DeviceInfo) -> u32 { + pub fn framesPerBlock(&self, dev: &DeviceInfo) -> usize { dev.avFramesPerBlock[self.framesPerBlockIndex] } diff --git a/src/daq/deviceinfo.rs b/src/daq/deviceinfo.rs index 22af511..ece86c6 100644 --- a/src/daq/deviceinfo.rs +++ b/src/daq/deviceinfo.rs @@ -14,7 +14,7 @@ pub struct DeviceInfo { pub api: StreamApiDescr, /// Name for the device. - pub name: String, + pub device_name: String, /// Available data types for the sample pub avDataTypes: Vec, @@ -22,7 +22,7 @@ pub struct DeviceInfo { pub prefDataType: DataType, /// Available frames per block - pub avFramesPerBlock: Vec, + pub avFramesPerBlock: Vec, /// Preferred frames per block for device pub prefFramesPerBlock: usize, @@ -63,4 +63,3 @@ pub struct DeviceInfo { /// such a Volts. pub physicalIOQty: Qty, } - diff --git a/src/daq/mod.rs b/src/daq/mod.rs index 7c23697..3dd998b 100644 --- a/src/daq/mod.rs +++ b/src/daq/mod.rs @@ -20,7 +20,7 @@ pub use streammsg::*; #[cfg(feature = "record")] pub use record::*; -#[cfg(feature = "cpal_api")] +#[cfg(feature = "cpal-api")] use api::api_cpal::CpalApi; use crate::{ @@ -39,6 +39,15 @@ use std::sync::{atomic::AtomicBool, Arc, Mutex}; use std::thread::{JoinHandle, Thread}; use streammsg::*; +use self::api::StreamApiDescr; + +cfg_if::cfg_if! { +if #[cfg(feature = "python-bindings")] { + use pyo3::exceptions::PyValueError; + use pyo3::prelude::*; + use pyo3::{pymodule, types::PyModule, PyResult}; +} else {} } + /// Keep track of whether the stream has been created. To ensure singleton behaviour. static smgr_created: AtomicBool = AtomicBool::new(false); @@ -49,16 +58,20 @@ struct StreamData { comm: Sender, } +#[cfg_attr(feature = "python-bindings", pyclass(unsendable))] /// Configure and manage input / output streams. /// pub struct StreamMgr { + // List of available devices + devs: Vec, + // Input stream can be both input and duplex input_stream: Option>, // Output only stream output_stream: Option>, - #[cfg(feature = "cpal_api")] + #[cfg(feature = "cpal-api")] cpal_api: CpalApi, /// The storage of queues. When no streams are running, they @@ -85,16 +98,19 @@ impl StreamMgr { } smgr_created.store(true, std::sync::atomic::Ordering::Relaxed); - StreamMgr { + let mut smgr = StreamMgr { + devs: vec![], input_stream: None, output_stream: None, siggen: None, - #[cfg(feature = "cpal_api")] + #[cfg(feature = "cpal-api")] cpal_api: CpalApi::new(), instreamqueues: Some(vec![]), - } + }; + smgr.devs = smgr.scanDeviceInfo(); + smgr } /// Set a new signal generator. Returns an error if it is unapplicable. /// It is unapplicable if the number of channels of output does not match the @@ -125,9 +141,13 @@ impl StreamMgr { } /// Obtain a list of devices that are available for each available API - pub fn getDeviceInfo(&mut self) -> Vec { + pub fn getDeviceInfo(&mut self) -> &Vec { + &self.devs + } + + fn scanDeviceInfo(&self) -> Vec { let mut devinfo = vec![]; - #[cfg(feature = "cpal_api")] + #[cfg(feature = "cpal-api")] { let cpal_devs = self.cpal_api.getDeviceInfo(); if let Ok(devs) = cpal_devs { @@ -136,6 +156,7 @@ impl StreamMgr { } devinfo } + /// Add a new queue to the lists of queues pub fn addInQueue(&mut self, tx: Sender) { if let Some(is) = &self.input_stream { @@ -182,7 +203,6 @@ impl StreamMgr { } StreamCommand::NewSiggen(_) => { panic!("Error: signal generator send to input-only stream."); - break 'infy; } } } @@ -191,16 +211,66 @@ impl StreamMgr { let msg = Arc::new(msg); let msg = InStreamMsg::RawStreamData(ctr, msg); sendMsgToAllQueues(&mut iqueues, msg); + ctr += 1; } - ctr += 1; } iqueues }); (threadhandle, commtx) } + fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> { + for d in self.devs.iter() { + if d.device_name == cfg.device_name { + return Some(d); + } + } + None + } + + /// Start a stream of certain type, using given configuration + pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> { + if self.input_stream.is_some() { + bail!("Input stream is already running. Please first stop existing input stream.") + } + match stype { + StreamType::Input | StreamType::Duplex => { + if cfg.numberEnabledInChannels() == 0 { + bail!("At least one input channel should be enabled for an input stream") + } + } + _ => {} + } + + let (tx, rx): (Sender, Receiver) = unbounded(); + + let stream = match cfg.api { + StreamApiDescr::Cpal => { + let devinfo = self + .match_devinfo(cfg) + .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?; + self.cpal_api.startStream(stype, devinfo, cfg, tx)? + } + _ => bail!("Unimplemented api!"), + }; + + let iqueues = self.instreamqueues.as_mut().unwrap(); + let meta = stream.metadata().unwrap(); + sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(Arc::new(meta))); + + let (threadhandle, commtx) = self.startInputStreamThread(&stream, rx); + + self.input_stream = Some(StreamData { + streamtype: stype, + stream, + threadhandle, + comm: commtx, + }); + + Ok(()) + } /// Start a default input stream, using default settings on everything. This is only possible - /// when + /// when the CPAL_api is available pub fn startDefaultInputStream(&mut self) -> Result<()> { if self.input_stream.is_some() { bail!("Input stream is already running. Please first stop existing input stream.") @@ -210,7 +280,7 @@ impl StreamMgr { // Only a default input stream when CPAL feature is enabled cfg_if::cfg_if! { - if #[cfg(feature="cpal_api")] { + if #[cfg(feature="cpal-api")] { let stream = self.cpal_api.startDefaultInputStream(tx)?; // Inform all listeners of new stream data diff --git a/src/daq/record.rs b/src/daq/record.rs index d035597..2cad991 100644 --- a/src/daq/record.rs +++ b/src/daq/record.rs @@ -1,7 +1,10 @@ use super::*; use anyhow::{bail, Error, Result}; +use clap::builder::OsStr; +use crossbeam::atomic::AtomicCell; use hdf5::types::{VarLenArray, VarLenUnicode}; use hdf5::{dataset, datatype, Dataset, File, H5Type}; +use ndarray::ArrayView2; use num::traits::ops::mul_add; use serde::de::IntoDeserializer; use std::path::{Path, PathBuf}; @@ -11,15 +14,23 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use strum::EnumMessage; -#[derive(Clone)] +#[derive(Clone, Debug)] +/// Status of a recording pub enum RecordStatus { + /// Nothing to update + NoUpdate, + /// Not yet started, waiting for first msg Idle, + /// Recording in progress Recording(Duration), + /// Recording finished Finished, + /// An error occurred. Error(String), } /// Settings used to start a recording. +#[derive(Clone)] pub struct RecordSettings { /// File name to record to. pub filename: PathBuf, @@ -30,9 +41,11 @@ pub struct RecordSettings { /// Create a recording pub struct Recording { + settings: RecordSettings, handle: Option>>, tx: Sender, - status: Arc>, + status_from_thread: Arc>, + last_status: RecordStatus, } impl Recording { @@ -82,6 +95,45 @@ impl Recording { Ok(()) } + #[inline] + fn append_to_dset( + ds: &Dataset, + ctr: usize, + msg: &RawStreamData, + framesPerBlock: usize, + nchannels: usize, + ) -> Result<()> { + match msg { + RawStreamData::Datai8(dat) => { + let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + ds.write_slice(arr, (ctr, .., ..))?; + } + RawStreamData::Datai16(dat) => { + let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + ds.write_slice(arr, (ctr, .., ..))?; + } + RawStreamData::Datai32(dat) => { + let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + ds.write_slice(arr, (ctr, .., ..))?; + } + RawStreamData::Dataf32(dat) => { + let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + ds.write_slice(arr, (ctr, .., ..))?; + } + RawStreamData::Dataf64(dat) => { + let arr = ndarray::ArrayView2::::from_shape((framesPerBlock, nchannels), dat)?; + ds.write_slice(arr, (ctr, .., ..))?; + } + RawStreamData::UnknownDataType => { + bail!("Unknown data type!") + } + RawStreamData::StreamError(e) => { + bail!("Stream error: {}", e) + } + } + Ok(()) + } + /// Start a new recording /// /// # Arguments @@ -89,8 +141,26 @@ impl Recording { /// * setttings: The settings to use for the recording /// * smgr: Stream manager to use to start the recording /// - pub fn new(settings: RecordSettings, mgr: &mut StreamMgr) -> Result { - let status = Arc::new(Mutex::new(RecordStatus::Idle)); + pub fn new(mut settings: RecordSettings, mgr: &mut StreamMgr) -> Result { + // Append extension if not yet there + match settings.filename.extension() { + Some(a) if a == OsStr::from("h5") => {} + None | Some(_) => { + settings.filename = + (settings.filename.to_string_lossy().to_string() + ".h5").into(); + } + }; + + // Fail if filename already exists + if settings.filename.exists() { + bail!( + "Filename '{}' already exists in filesystem", + settings.filename.to_string_lossy() + ); + } + let settings2 = settings.clone(); + + let status = Arc::new(AtomicCell::new(RecordStatus::Idle)); let status2 = status.clone(); let (tx, rx) = crossbeam::channel::unbounded(); @@ -102,7 +172,7 @@ impl Recording { let firstmsg = match rx.recv() { Ok(msg) => msg, - Err(e) => bail!("Queue handle error"), + Err(_) => bail!("Queue handle error"), }; let meta = match firstmsg { @@ -125,7 +195,6 @@ impl Recording { let timestamp = now_utc.timestamp(); Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?; - // Create UUID for measurement use hdf5::types::VarLenUnicode; let uuid = uuid::Uuid::new_v4(); @@ -148,15 +217,17 @@ impl Recording { let ds = Recording::create_dataset(&file, &meta)?; // Indicate we are ready to rec! - *status.lock().unwrap() = RecordStatus::Recording(Duration::ZERO); + status.store(RecordStatus::Recording(Duration::ZERO)); let mut ctr = 0; + let mut ctr_offset = 0; + let mut first = true; let framesPerBlock = meta.framesPerBlock as usize; let nchannels = meta.nchannels() as usize; 'recloop: loop { match rx.recv().unwrap() { InStreamMsg::StreamError(e) => { - bail!("Recording failed due to stream error.") + bail!("Recording failed due to stream error: {}.", e) } InStreamMsg::ConvertedStreamData(..) => {} InStreamMsg::StreamStarted(_) => { @@ -167,19 +238,24 @@ impl Recording { break 'recloop; } InStreamMsg::RawStreamData(incoming_ctr, dat) => { - // if incoming_ctr != ctr { - // bail!("Packages missed. Recording invalid.") - // } - - let tst = ndarray::Array2::::ones((framesPerBlock, nchannels)); - + if first { + first = false; + ctr_offset = incoming_ctr; + } else { + if incoming_ctr != ctr + ctr_offset { + println!("********** PACKAGES MISSED ***********"); + bail!("Packages missed. Recording invalid.") + } + } ds.resize((ctr + 1, framesPerBlock, nchannels))?; - ds.write_slice(&tst, (ctr, .., ..))?; + Recording::append_to_dset( + &ds, + ctr, + dat.as_ref(), + framesPerBlock, + nchannels, + )?; - // match dat { - // RawStreamData::Datai8(d) => ds. - - // } let recorded_time = Duration::from_millis( ((1000 * (ctr + 1) * framesPerBlock) as Flt / meta.samplerate) as u64, ); @@ -188,21 +264,23 @@ impl Recording { break 'recloop; } } - // println!("... {}", recorded_time.as_millis()); + // println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock); ctr += 1; - *status.lock().unwrap() = RecordStatus::Recording(recorded_time); + status.store(RecordStatus::Recording(recorded_time)); } } } // end of loop - *status.lock().unwrap() = RecordStatus::Finished; + status.store(RecordStatus::Finished); Ok(()) // End of thread }); Ok(Recording { + settings: settings2, handle: Some(handle), - status: status2, + status_from_thread: status2, + last_status: RecordStatus::NoUpdate, tx, }) } @@ -215,15 +293,32 @@ impl Recording { let h = self.handle.take().unwrap(); let res = h.join().unwrap(); if let Err(e) = res { - *self.status.lock().unwrap() = RecordStatus::Error(format!("{}", e)); + self.last_status = RecordStatus::Error(format!("{}", e)); + // File should not be un use anymore, as thread is joined. + // In case of error, we try to delete the file + if let Err(e) = std::fs::remove_file(&self.settings.filename) { + eprintln!("Recording failed, but file removal failed as well: {}", e); + } } } } } + /// Get current record status pub fn status(&mut self) -> RecordStatus { + let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate); + match status_from_thread { + RecordStatus::NoUpdate => {} + _ => { + // println!("Updating status to: {:?}", status_from_thread); + self.last_status = status_from_thread; + } + } + // If the thread has exited with an error, the status is overwritten + // in this method. self.cleanupThreadIfPossible(); - self.status.lock().unwrap().clone() + // Return latest status + self.last_status.clone() } /// Stop existing recording early. At the current time, or st @@ -238,7 +333,7 @@ impl Recording { let h = self.handle.take().unwrap(); let res = h.join().unwrap(); if let Err(e) = res { - *self.status.lock().unwrap() = RecordStatus::Error(format!("{}", e)); + self.last_status = RecordStatus::Error(format!("{}", e)); } Ok(()) diff --git a/src/daq/streammsg.rs b/src/daq/streammsg.rs index 416a4fe..4d7f1eb 100644 --- a/src/daq/streammsg.rs +++ b/src/daq/streammsg.rs @@ -8,8 +8,15 @@ use reinterpret::{reinterpret_slice, reinterpret_vec}; use std::any::TypeId; use std::sync::Arc; use std::u128::MAX; +use strum_macros::Display; use super::*; +cfg_if::cfg_if! { +if #[cfg(feature = "python-bindings")] { + use pyo3::exceptions::PyValueError; + use pyo3::prelude::*; + use pyo3::{pymodule, pyclass, types::PyModule, PyResult}; +} else {} } /// Raw stream data coming from a stream. #[derive(Clone, Debug)] @@ -111,7 +118,6 @@ where /// Stream metadata. All information required for #[derive(Clone, Debug)] pub struct StreamMetaData { - /// Information for each channel in the stream pub channelInfo: Vec, @@ -122,14 +128,19 @@ pub struct StreamMetaData { pub samplerate: Flt, /// The number of frames per block send over - pub framesPerBlock: u32, + pub framesPerBlock: usize, } impl StreamMetaData { /// Create new metadata object. /// /// /// # Args /// - pub fn new(channelInfo: &[DaqChannel], rawdtype: DataType, sr: Flt, framesPerBlock: u32) -> Result { + pub fn new( + channelInfo: &[DaqChannel], + rawdtype: DataType, + sr: Flt, + framesPerBlock: usize, + ) -> Result { Ok(StreamMetaData { channelInfo: channelInfo.to_vec(), rawDatatype: rawdtype, @@ -139,7 +150,9 @@ impl StreamMetaData { } /// Returns the number of channels in the stream metadata. - pub fn nchannels(&self) -> usize {self.channelInfo.len()} + pub fn nchannels(&self) -> usize { + self.channelInfo.len() + } } /// Input stream messages, to be send to handlers. #[derive(Clone, Debug)] @@ -183,8 +196,9 @@ pub enum StreamCommand { } /// Stream types that can be started -/// -#[derive(PartialEq, Clone)] +/// +#[cfg_attr(feature = "python-bindings", pyclass)] +#[derive(PartialEq, Clone, Copy)] pub enum StreamType { /// Input-only stream Input, @@ -195,13 +209,16 @@ pub enum StreamType { } /// Errors that happen in a stream -#[derive(strum_macros::EnumMessage, Debug, Clone)] +#[derive(strum_macros::EnumMessage, Debug, Clone, Display)] pub enum StreamError { /// Input overrun #[strum(message = "InputXRunError", detailed_message = "Input buffer overrun")] InputXRunError, /// Output underrun - #[strum(message = "OutputXRunError", detailed_message = "Output buffer overrun")] + #[strum( + message = "OutputXRunError", + detailed_message = "Output buffer overrun" + )] OutputXRunError, /// Driver specific error #[strum(message = "DriverError", detailed_message = "Driver error")] @@ -213,5 +230,5 @@ pub enum StreamError { /// Logic error (something weird happened) #[strum(detailed_message = "Logic error")] - LogicError + LogicError, } diff --git a/src/lib.rs b/src/lib.rs index 1f2c92c..5156d8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,8 +17,14 @@ pub mod filter; pub mod daq; pub mod siggen; -#[cfg(feature = "python-bindings")] -use pyo3::prelude::*; +pub use config::*; + +cfg_if::cfg_if! { +if #[cfg(feature = "python-bindings")] { + use pyo3::prelude::*; + use pyo3::{pymodule, PyResult}; +} else {} } + /// A Python module implemented in Rust. #[cfg(feature = "python-bindings")]