Recording is working. Partial pyo3 exposure. Now, RtAps, or first Siggen?

This commit is contained in:
Anne de Jong 2023-12-28 23:49:25 +01:00
parent 87f8b05eea
commit b770c4d8fb
14 changed files with 533 additions and 145 deletions

View File

@ -73,13 +73,14 @@ itertools = "0.12.0"
chrono = {version = "0.4.31", optional = true} chrono = {version = "0.4.31", optional = true}
# For getting UUIDs in recording # For getting UUIDs in recording
uuid = { version = "1.6.1", features = ["v4"] , optional = true} uuid = { version = "1.6.1", features = ["v4"] , optional = true}
clap = { version = "4.4.11", features = ["derive", "color", "help", "suggestions"] }
[features] [features]
default = ["f64", "cpal_api", "record"] default = ["f64", "cpal-api", "record"]
# Use this for debugging extensions # Use this for debugging extensions
# default = ["f64", "python-bindings", "record", "cpal-api"] # default = ["f64", "python-bindings", "record", "cpal-api"]
cpal_api = ["dep:cpal"] cpal-api = ["dep:cpal"]
record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"] record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"]
f64 = [] f64 = []
f32 = [] f32 = []

View File

@ -1,14 +1,36 @@
use anyhow::Result; use anyhow::Result;
use lasprs::daq::StreamMgr; use clap::Parser;
use lasprs::daq::{DaqConfig, StreamMgr};
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(author, version, about="Generates DAQ configurations for available devices.", long_about = None)]
struct Args {
/// Name of the person to greet
#[arg(short, long)]
matches: Vec<String>,
}
fn main() -> Result<()> { fn main() -> Result<()> {
let args = Args::parse();
let write_all = args.matches.len() == 0;
let mut smgr = StreamMgr::new(); let mut smgr = StreamMgr::new();
let devs = smgr.getDeviceInfo(); let devs = smgr.getDeviceInfo();
for dev in devs { for dev in devs.iter() {
println!("========="); let filename = dev.device_name.clone() + ".toml";
println!("{:?}", dev); if write_all {
println!("-------------"); let daqconfig = DaqConfig::newFromDeviceInfo(&dev);
daqconfig.serialize_TOML_file(&filename.clone().into())?;
} else {
for m in args.matches.iter() {
let needle =m.to_lowercase();
let dev_lower = (&dev.device_name).to_lowercase();
if dev_lower.contains(&needle) {
DaqConfig::newFromDeviceInfo(&dev).serialize_TOML_file(&filename.clone().into())?;
}
}
}
} }
Ok(()) Ok(())

View File

@ -1,5 +1,5 @@
use anyhow::Result; use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use crossbeam::channel::{unbounded, Receiver, TryRecvError};
use lasprs::daq::{StreamHandler, StreamMgr, InStreamMsg}; use lasprs::daq::{StreamHandler, StreamMgr, InStreamMsg};
use std::io; use std::io;
use std::{thread, time}; use std::{thread, time};

98
src/bin/lasp_record.rs Normal file
View File

@ -0,0 +1,98 @@
use anyhow::Result;
use clap::{arg, command, Parser};
use crossbeam::channel::{unbounded, Receiver, TryRecvError};
#[cfg(feature = "record")]
use lasprs::daq::{StreamType,RecordSettings, RecordStatus, Recording, StreamMgr};
use lasprs::Flt;
use std::{
io, thread,
time::{self, Duration},
};
#[derive(Parser)]
#[command(author, version, about = "Record data to h5 file, according to LASP format", long_about = None)]
struct Cli {
/// File name to write recording to
filename: String,
/// Recording duration in [s]. Rounds down to whole seconds. If not specified, records until user presses a key
#[arg(short, long = "duration", default_value_t = 0.)]
duration_s: Flt,
/// TOML configuration file for used stream
#[arg(short, long = "config-file")]
config_file_daq: Option<String>,
}
#[cfg(feature = "record")]
fn main() -> Result<()> {
use lasprs::daq::DaqConfig;
let ops = Cli::parse();
let mut smgr = StreamMgr::new();
let stdin_channel = spawn_stdin_channel();
let settings = RecordSettings {
filename: ops.filename.into(),
duration: Duration::from_secs(ops.duration_s as u64),
};
match ops.config_file_daq {
None => smgr.startDefaultInputStream()?,
Some(filename) => {
let file = std::fs::read_to_string(filename)?;
let cfg = DaqConfig::deserialize_TOML_str(&file)?;
smgr.startStream(StreamType::Input, &cfg)?;
}
}
let mut r = Recording::new(settings, &mut smgr)?;
println!("Starting to record...");
'infy: loop {
match r.status() {
RecordStatus::Idle => println!("\nIdle"),
RecordStatus::Error(e) => {
println!("\nRecord error: {}", e);
break 'infy;
}
RecordStatus::Finished => {
println!("\nRecording finished.");
break 'infy;
}
RecordStatus::Recording(duration) => {
println!("Recording... {} ms", duration.as_millis());
}
RecordStatus::NoUpdate => {}
};
match stdin_channel.try_recv() {
Ok(_key) => {
println!("User pressed key. Manually stopping recording here.");
break 'infy;
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
sleep(100);
}
Ok(())
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}
fn spawn_stdin_channel() -> Receiver<String> {
let (tx, rx) = unbounded();
thread::spawn(move || loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
tx.send(buffer).unwrap();
});
rx
}

View File

@ -1,45 +0,0 @@
use anyhow::Result;
#[cfg(feature="record")]
use lasprs::daq::{RecordSettings, RecordStatus, Recording, StreamMgr};
use std::{thread, time::{self, Duration}};
// use
#[cfg(feature="record")]
fn main() -> Result<()> {
let mut smgr = StreamMgr::new();
let settings = RecordSettings {
filename: "test.h5".into(),
duration: Duration::from_secs(2),
};
smgr.startDefaultInputStream()?;
let mut r = Recording::new(settings, &mut smgr)?;
println!("Starting to record...");
loop {
match r.status() {
RecordStatus::Idle => println!("Idle"),
RecordStatus::Error(e) => {
println!("Record error: {}", e);
break;
}
RecordStatus::Finished => {
println!("\nRecording finished.");
break;
}
RecordStatus::Recording(duration) => {
print!("\rRecording... {} ms", duration.as_millis());
}
};
sleep(10);
}
Ok(())
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}

View File

@ -4,11 +4,15 @@
cfg_if::cfg_if! { cfg_if::cfg_if! {
if #[cfg(feature="f64")] { if #[cfg(feature="f64")] {
/// Floating-point value, compile time option to make it either f32, or f64
pub type Flt = f64; pub type Flt = f64;
/// Ratio between circumference and diameter of a circle
pub const pi: Flt = std::f64::consts::PI; pub const pi: Flt = std::f64::consts::PI;
} }
else if #[cfg(feature="f32")] { else if #[cfg(feature="f32")] {
/// Floating-point value, compile time option to make it either f32, or f64
pub type Flt = f32; pub type Flt = f32;
/// Ratio between circumference and diameter of a circle
pub const pi: Flt = std::f32::consts::PI; pub const pi: Flt = std::f32::consts::PI;
} }
else { else {
@ -21,11 +25,17 @@ use num::complex::*;
pub type Cflt = Complex<Flt>; pub type Cflt = Complex<Flt>;
use ndarray::{Array1, Array2}; use ndarray::{Array1, Array2};
/// Vector of floating point values
pub type Vd = Vec<Flt>; pub type Vd = Vec<Flt>;
/// Vector of complex floating point values
pub type Vc = Vec<Cflt>; pub type Vc = Vec<Cflt>;
/// 1D array of floats
pub type Dcol = Array1<Flt>; pub type Dcol = Array1<Flt>;
/// 1D array of complex floats
pub type Ccol = Array1<Cflt>; pub type Ccol = Array1<Cflt>;
/// 2D array of floats
pub type Dmat = Array2<Flt>; pub type Dmat = Array2<Flt>;
/// 2D array of complex floats
pub type Cmat = Array2<Cflt>; pub type Cmat = Array2<Cflt>;

View File

@ -10,6 +10,7 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize}; use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize};
use crossbeam::channel::{Receiver, Sender}; use crossbeam::channel::{Receiver, Sender};
use itertools::Itertools; use itertools::Itertools;
use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
/// Convert datatype in CPAL sampleformat /// Convert datatype in CPAL sampleformat
@ -93,7 +94,7 @@ impl CpalApi {
let mut oChannelCount = 0; let mut oChannelCount = 0;
let mut sample_rates = srs_tot.clone(); let mut sample_rates = srs_tot.clone();
let mut avFramesPerBlock = vec![256, 512, 1024, 2048, 8192]; let mut avFramesPerBlock = vec![256 as usize, 512, 1024, 2048, 8192];
let mut sample_formats = vec![]; let mut sample_formats = vec![];
// Search for sample formats // Search for sample formats
@ -107,8 +108,8 @@ impl CpalApi {
sample_rates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt); sample_rates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt);
sample_rates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt); sample_rates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt);
if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() { if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= min); avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= max); avFramesPerBlock.retain(|i| i <= &(*max as usize));
} }
iChannelCount = icfg.channels() as u8; iChannelCount = icfg.channels() as u8;
// avFramesPerBlock.retain(|i| i >= icfg.buffer_size().) // avFramesPerBlock.retain(|i| i >= icfg.buffer_size().)
@ -124,8 +125,8 @@ impl CpalApi {
sample_rates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt); sample_rates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt);
sample_rates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt); sample_rates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt);
if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() { if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= min); avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= max); avFramesPerBlock.retain(|i| i <= &(*max as usize));
} }
oChannelCount = ocfg.channels() as u8; oChannelCount = ocfg.channels() as u8;
} }
@ -145,7 +146,7 @@ impl CpalApi {
let prefSampleRate = *sample_rates.last().unwrap_or(&48000.); let prefSampleRate = *sample_rates.last().unwrap_or(&48000.);
devs.push(DeviceInfo { devs.push(DeviceInfo {
api: super::StreamApiDescr::Cpal, api: super::StreamApiDescr::Cpal,
name: dev.name()?, device_name: dev.name()?,
avDataTypes: dtypes, avDataTypes: dtypes,
prefDataType, prefDataType,
@ -193,21 +194,35 @@ impl CpalApi {
device: &cpal::Device, device: &cpal::Device,
sender: Sender<RawStreamData>, sender: Sender<RawStreamData>,
en_inchannels: Vec<usize>, en_inchannels: Vec<usize>,
framesPerBlock: u32, framesPerBlock: usize,
) -> Result<cpal::Stream> { ) -> Result<cpal::Stream> {
let tot_inch = config.channels; let tot_inch = config.channels as usize;
let sender_err = sender.clone(); let sender_err = sender.clone();
macro_rules! build_stream{ macro_rules! build_stream{
($($cpaltype:pat, $rtype:ty);*) => { ($($cpaltype:pat, $rtype:ty);*) => {
match sf { match sf {
$( $(
$cpaltype => device.build_input_stream( $cpaltype => {
let mut q = VecDeque::<$rtype>::with_capacity(2*tot_inch*framesPerBlock);
device.build_input_stream(
&config, &config,
move |data, _: &_| InStreamCallback::<$rtype>(data, &sender, tot_inch, &en_inchannels, framesPerBlock), move |data, _: &_| InStreamCallback::<$rtype>(
data, &sender,
// Total number of input channels. This API has to filter out
// the channels that are not enabled
tot_inch,
// Vector of channels numbers that are enabled
&en_inchannels,
// Frames per block
framesPerBlock,
// Ring buffer for storage of samples as required.
&mut q),
CpalApi::create_errfcn(sender_err), CpalApi::create_errfcn(sender_err),
None)? None)?
),*, }),*,
_ => bail!("Unsupported sample format '{}'", sf) _ => bail!("Unsupported sample format '{}'", sf)
} }
} }
@ -226,7 +241,7 @@ impl CpalApi {
st: StreamType, st: StreamType,
devinfo: &DeviceInfo, devinfo: &DeviceInfo,
conf: &DaqConfig, conf: &DaqConfig,
dev: &cpal::Device, _dev: &cpal::Device,
conf_iterator: T, conf_iterator: T,
) -> Result<cpal::SupportedStreamConfig> ) -> Result<cpal::SupportedStreamConfig>
where where
@ -246,7 +261,7 @@ impl CpalApi {
&& cpalconf.max_sample_rate().0 as Flt >= requested_sr && cpalconf.max_sample_rate().0 as Flt >= requested_sr
{ {
// Sample rate falls within range. // Sample rate falls within range.
let requested_fpb = conf.framesPerBlock(devinfo); let requested_fpb = conf.framesPerBlock(devinfo) as u32;
// Last check: check if buffer size is allowed // Last check: check if buffer size is allowed
match cpalconf.buffer_size() { match cpalconf.buffer_size() {
SupportedBufferSize::Range { min, max } => { SupportedBufferSize::Range { min, max } => {
@ -336,11 +351,11 @@ impl CpalApi {
} }
bail!(format!( bail!(format!(
"Error: requested device {} not found. Please make sure the device is available.", "Error: requested device {} not found. Please make sure the device is available.",
devinfo.name devinfo.device_name
)) ))
} }
/// Start a default input stream for a device /// Start a default input stream.
/// ///
/// ///
pub fn startDefaultInputStream( pub fn startDefaultInputStream(
@ -349,11 +364,11 @@ impl CpalApi {
) -> Result<Box<dyn Stream>> { ) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_input_device() { if let Some(device) = self.host.default_input_device() {
if let Ok(config) = device.default_input_config() { if let Ok(config) = device.default_input_config() {
let framesPerBlock = 4096; let framesPerBlock: usize = 4096;
let final_config = cpal::StreamConfig { let final_config = cpal::StreamConfig {
channels: config.channels(), channels: config.channels(),
sample_rate: config.sample_rate(), sample_rate: config.sample_rate(),
buffer_size: cpal::BufferSize::Fixed(framesPerBlock), buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32),
}; };
let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize)); let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
@ -429,28 +444,38 @@ impl CpalApi {
fn InStreamCallback<T>( fn InStreamCallback<T>(
input: &[T], input: &[T],
sender: &Sender<RawStreamData>, sender: &Sender<RawStreamData>,
tot_inch: u16, tot_inch: usize,
en_inchannels: &[usize], en_inchannels: &[usize],
framesPerBlock: u32, framesPerBlock: usize,
q: &mut VecDeque<T>,
) where ) where
T: Copy + num::ToPrimitive + 'static, T: Copy + num::ToPrimitive + 'static,
{ {
let msg = RawStreamData::from(input); // Copy elements over in ring buffer
let nen_ch = en_inchannels.len(); q.extend(input);
let nframes = input.len() / tot_inch as usize; while q.len() > tot_inch * framesPerBlock {
let mut enabled_ch_data = Vec::with_capacity(nen_ch * nframes); // println!("q full enough: {}", q.len());
unsafe { let mut enabled_ch_data: Vec<T> = Vec::with_capacity(en_inchannels.len() * framesPerBlock);
enabled_ch_data.set_len(enabled_ch_data.capacity()); unsafe {
} enabled_ch_data.set_len(enabled_ch_data.capacity());
// Chops of the disabled channels and forwards the data, DEINTERLEAVED
for (chout_idx, chout) in en_inchannels.iter().enumerate() {
let in_iterator = input.iter().skip(*chout).step_by(tot_inch as usize);
let out_iterator = enabled_ch_data.iter_mut().skip(chout_idx * nframes);
for (out, in_) in out_iterator.zip(in_iterator) {
*out = *in_;
} }
}
let msg = RawStreamData::from(enabled_ch_data); // Loop over enabled channels
sender.send(msg).unwrap() for (i, ch) in en_inchannels.iter().enumerate() {
let in_iterator = q.iter().skip(*ch).step_by(tot_inch);
let out_iterator = enabled_ch_data.iter_mut().skip(i).step_by(en_inchannels.len());
// Copy over elements, *DEINTERLEAVED*
out_iterator.zip(in_iterator).for_each(|(o, i)| {
*o = *i;
});
}
// Drain copied elements from ring buffer
q.drain(0..framesPerBlock * tot_inch);
// Send over data
let msg = RawStreamData::from(enabled_ch_data);
sender.send(msg).unwrap()
}
} }

View File

@ -8,7 +8,7 @@ use strum_macros;
use super::StreamMetaData; use super::StreamMetaData;
#[cfg(feature = "cpal_api")] #[cfg(feature = "cpal-api")]
pub mod api_cpal; pub mod api_cpal;
#[cfg(feature = "pulse_api")] #[cfg(feature = "pulse_api")]
@ -31,4 +31,7 @@ pub enum StreamApiDescr {
/// CPAL api /// CPAL api
#[strum(message = "Cpal", detailed_message = "Cross-Platform Audio Library")] #[strum(message = "Cpal", detailed_message = "Cross-Platform Audio Library")]
Cpal = 0, Cpal = 0,
/// PulseAudio api
#[strum(message = "pulse", detailed_message = "Pulseaudio")]
Pulse = 1,
} }

View File

@ -1,3 +1,7 @@
use std::{ops::Index, path::PathBuf};
use anyhow::Result;
use hdf5::File;
use super::api::StreamApiDescr; use super::api::StreamApiDescr;
use super::datatype::DataType; use super::datatype::DataType;
use super::deviceinfo::DeviceInfo; use super::deviceinfo::DeviceInfo;
@ -76,6 +80,92 @@ pub struct DaqConfig {
} }
impl DaqConfig { impl DaqConfig {
/// Creates a new default device configuration for a given device as specified with
/// the DeviceInfo descriptor.
pub fn newFromDeviceInfo(devinfo: &DeviceInfo) -> DaqConfig {
let inchannel_config = (0..devinfo.iChannelCount)
.map(|_| DaqChannel::default())
.collect();
let outchannel_config = (0..devinfo.oChannelCount)
.map(|_| DaqChannel::default())
.collect();
let sampleRateIndex = devinfo
.avSampleRates
.iter()
.position(|x| x == &devinfo.prefSampleRate)
.unwrap_or(devinfo.avSampleRates.len()-1);
// Choose 4096 when in list, otherwise choose the highes available value in list
let framesPerBlockIndex = devinfo
.avFramesPerBlock
.iter()
.position(|x| x == &4096)
.unwrap_or(devinfo.avFramesPerBlock.len() - 1);
DaqConfig {
api: devinfo.api.clone(),
device_name: devinfo.device_name.clone(),
inchannel_config,
outchannel_config,
dtype: devinfo.prefDataType,
digitalHighPassCutOn: -1.0,
sampleRateIndex,
framesPerBlockIndex,
monitorOutput: false,
}
}
/// Serialize DaqConfig object to TOML.
///
/// Args
///
/// * writer: Output writer, can be file or string, or anything that *is* std::io::Write
///
pub fn serialize_TOML(&self, writer: &mut dyn std::io::Write) -> Result<()> {
let ser_str = toml::to_string(&self)?;
writer.write_all(ser_str.as_bytes())?;
Ok(())
}
/// Deserialize structure from TOML data
///
/// # Args
///
/// * reader: implements the Read trait, from which we read the data.
pub fn deserialize_TOML<T>(reader: &mut T) -> Result<DaqConfig> where T: std::io::Read {
let mut read_str = vec![];
reader.read_to_end(&mut read_str)?;
let read_str = String::from_utf8(read_str)?;
DaqConfig::deserialize_TOML_str(&read_str)
}
/// Deserialize from TOML string
///
/// # Args
///
/// * st: string containing TOML data.
pub fn deserialize_TOML_str(st: &String) -> Result<DaqConfig> {
let res : DaqConfig = toml::from_str(&st)?;
Ok(res)
}
/// Write this configuration to a TOML file.
///
/// Args
///
/// * file: Name of file to write to
///
pub fn serialize_TOML_file(&self, file: &PathBuf) -> Result<()> {
let mut file = std::fs::File::create(file)?;
self.serialize_TOML(&mut file)?;
Ok(())
}
/// Returns a list of enabled input channel numbers as indices /// Returns a list of enabled input channel numbers as indices
/// in the list of all input channels (enabled and not) /// in the list of all input channels (enabled and not)
pub fn enabledInchannelsList(&self) -> Vec<usize> { pub fn enabledInchannelsList(&self) -> Vec<usize> {
@ -89,10 +179,7 @@ impl DaqConfig {
/// Returns the total number of channels that appear in a running input stream. /// Returns the total number of channels that appear in a running input stream.
pub fn numberEnabledInChannels(&self) -> usize { pub fn numberEnabledInChannels(&self) -> usize {
self.inchannel_config self.inchannel_config.iter().filter(|ch| ch.enabled).count()
.iter()
.filter(|ch| ch.enabled)
.count()
} }
/// Returns the total number of channels that appear in a running output stream. /// Returns the total number of channels that appear in a running output stream.
pub fn numberEnabledOutChannels(&self) -> usize { pub fn numberEnabledOutChannels(&self) -> usize {
@ -108,7 +195,7 @@ impl DaqConfig {
} }
/// Provide samplerate, based on device and specified sample rate index /// Provide samplerate, based on device and specified sample rate index
pub fn framesPerBlock(&self, dev: &DeviceInfo) -> u32 { pub fn framesPerBlock(&self, dev: &DeviceInfo) -> usize {
dev.avFramesPerBlock[self.framesPerBlockIndex] dev.avFramesPerBlock[self.framesPerBlockIndex]
} }

View File

@ -14,7 +14,7 @@ pub struct DeviceInfo {
pub api: StreamApiDescr, pub api: StreamApiDescr,
/// Name for the device. /// Name for the device.
pub name: String, pub device_name: String,
/// Available data types for the sample /// Available data types for the sample
pub avDataTypes: Vec<DataType>, pub avDataTypes: Vec<DataType>,
@ -22,7 +22,7 @@ pub struct DeviceInfo {
pub prefDataType: DataType, pub prefDataType: DataType,
/// Available frames per block /// Available frames per block
pub avFramesPerBlock: Vec<u32>, pub avFramesPerBlock: Vec<usize>,
/// Preferred frames per block for device /// Preferred frames per block for device
pub prefFramesPerBlock: usize, pub prefFramesPerBlock: usize,
@ -63,4 +63,3 @@ pub struct DeviceInfo {
/// such a Volts. /// such a Volts.
pub physicalIOQty: Qty, pub physicalIOQty: Qty,
} }

View File

@ -20,7 +20,7 @@ pub use streammsg::*;
#[cfg(feature = "record")] #[cfg(feature = "record")]
pub use record::*; pub use record::*;
#[cfg(feature = "cpal_api")] #[cfg(feature = "cpal-api")]
use api::api_cpal::CpalApi; use api::api_cpal::CpalApi;
use crate::{ use crate::{
@ -39,6 +39,15 @@ use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread}; use std::thread::{JoinHandle, Thread};
use streammsg::*; use streammsg::*;
use self::api::StreamApiDescr;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
} else {} }
/// Keep track of whether the stream has been created. To ensure singleton behaviour. /// Keep track of whether the stream has been created. To ensure singleton behaviour.
static smgr_created: AtomicBool = AtomicBool::new(false); static smgr_created: AtomicBool = AtomicBool::new(false);
@ -49,16 +58,20 @@ struct StreamData<T> {
comm: Sender<StreamCommand>, comm: Sender<StreamCommand>,
} }
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
/// Configure and manage input / output streams. /// Configure and manage input / output streams.
/// ///
pub struct StreamMgr { pub struct StreamMgr {
// List of available devices
devs: Vec<DeviceInfo>,
// Input stream can be both input and duplex // Input stream can be both input and duplex
input_stream: Option<StreamData<InQueues>>, input_stream: Option<StreamData<InQueues>>,
// Output only stream // Output only stream
output_stream: Option<StreamData<Siggen>>, output_stream: Option<StreamData<Siggen>>,
#[cfg(feature = "cpal_api")] #[cfg(feature = "cpal-api")]
cpal_api: CpalApi, cpal_api: CpalApi,
/// The storage of queues. When no streams are running, they /// The storage of queues. When no streams are running, they
@ -85,16 +98,19 @@ impl StreamMgr {
} }
smgr_created.store(true, std::sync::atomic::Ordering::Relaxed); smgr_created.store(true, std::sync::atomic::Ordering::Relaxed);
StreamMgr { let mut smgr = StreamMgr {
devs: vec![],
input_stream: None, input_stream: None,
output_stream: None, output_stream: None,
siggen: None, siggen: None,
#[cfg(feature = "cpal_api")] #[cfg(feature = "cpal-api")]
cpal_api: CpalApi::new(), cpal_api: CpalApi::new(),
instreamqueues: Some(vec![]), instreamqueues: Some(vec![]),
} };
smgr.devs = smgr.scanDeviceInfo();
smgr
} }
/// Set a new signal generator. Returns an error if it is unapplicable. /// Set a new signal generator. Returns an error if it is unapplicable.
/// It is unapplicable if the number of channels of output does not match the /// It is unapplicable if the number of channels of output does not match the
@ -125,9 +141,13 @@ impl StreamMgr {
} }
/// Obtain a list of devices that are available for each available API /// Obtain a list of devices that are available for each available API
pub fn getDeviceInfo(&mut self) -> Vec<DeviceInfo> { pub fn getDeviceInfo(&mut self) -> &Vec<DeviceInfo> {
&self.devs
}
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
let mut devinfo = vec![]; let mut devinfo = vec![];
#[cfg(feature = "cpal_api")] #[cfg(feature = "cpal-api")]
{ {
let cpal_devs = self.cpal_api.getDeviceInfo(); let cpal_devs = self.cpal_api.getDeviceInfo();
if let Ok(devs) = cpal_devs { if let Ok(devs) = cpal_devs {
@ -136,6 +156,7 @@ impl StreamMgr {
} }
devinfo devinfo
} }
/// Add a new queue to the lists of queues /// Add a new queue to the lists of queues
pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) { pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) {
if let Some(is) = &self.input_stream { if let Some(is) = &self.input_stream {
@ -182,7 +203,6 @@ impl StreamMgr {
} }
StreamCommand::NewSiggen(_) => { StreamCommand::NewSiggen(_) => {
panic!("Error: signal generator send to input-only stream."); panic!("Error: signal generator send to input-only stream.");
break 'infy;
} }
} }
} }
@ -191,16 +211,66 @@ impl StreamMgr {
let msg = Arc::new(msg); let msg = Arc::new(msg);
let msg = InStreamMsg::RawStreamData(ctr, msg); let msg = InStreamMsg::RawStreamData(ctr, msg);
sendMsgToAllQueues(&mut iqueues, msg); sendMsgToAllQueues(&mut iqueues, msg);
ctr += 1;
} }
ctr += 1;
} }
iqueues iqueues
}); });
(threadhandle, commtx) (threadhandle, commtx)
} }
fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> {
for d in self.devs.iter() {
if d.device_name == cfg.device_name {
return Some(d);
}
}
None
}
/// Start a stream of certain type, using given configuration
pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
if self.input_stream.is_some() {
bail!("Input stream is already running. Please first stop existing input stream.")
}
match stype {
StreamType::Input | StreamType::Duplex => {
if cfg.numberEnabledInChannels() == 0 {
bail!("At least one input channel should be enabled for an input stream")
}
}
_ => {}
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
let stream = match cfg.api {
StreamApiDescr::Cpal => {
let devinfo = self
.match_devinfo(cfg)
.ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
self.cpal_api.startStream(stype, devinfo, cfg, tx)?
}
_ => bail!("Unimplemented api!"),
};
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata().unwrap();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(Arc::new(meta)));
let (threadhandle, commtx) = self.startInputStreamThread(&stream, rx);
self.input_stream = Some(StreamData {
streamtype: stype,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
/// Start a default input stream, using default settings on everything. This is only possible /// Start a default input stream, using default settings on everything. This is only possible
/// when /// when the CPAL_api is available
pub fn startDefaultInputStream(&mut self) -> Result<()> { pub fn startDefaultInputStream(&mut self) -> Result<()> {
if self.input_stream.is_some() { if self.input_stream.is_some() {
bail!("Input stream is already running. Please first stop existing input stream.") bail!("Input stream is already running. Please first stop existing input stream.")
@ -210,7 +280,7 @@ impl StreamMgr {
// Only a default input stream when CPAL feature is enabled // Only a default input stream when CPAL feature is enabled
cfg_if::cfg_if! { cfg_if::cfg_if! {
if #[cfg(feature="cpal_api")] { if #[cfg(feature="cpal-api")] {
let stream = self.cpal_api.startDefaultInputStream(tx)?; let stream = self.cpal_api.startDefaultInputStream(tx)?;
// Inform all listeners of new stream data // Inform all listeners of new stream data

View File

@ -1,7 +1,10 @@
use super::*; use super::*;
use anyhow::{bail, Error, Result}; use anyhow::{bail, Error, Result};
use clap::builder::OsStr;
use crossbeam::atomic::AtomicCell;
use hdf5::types::{VarLenArray, VarLenUnicode}; use hdf5::types::{VarLenArray, VarLenUnicode};
use hdf5::{dataset, datatype, Dataset, File, H5Type}; use hdf5::{dataset, datatype, Dataset, File, H5Type};
use ndarray::ArrayView2;
use num::traits::ops::mul_add; use num::traits::ops::mul_add;
use serde::de::IntoDeserializer; use serde::de::IntoDeserializer;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -11,15 +14,23 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use strum::EnumMessage; use strum::EnumMessage;
#[derive(Clone)] #[derive(Clone, Debug)]
/// Status of a recording
pub enum RecordStatus { pub enum RecordStatus {
/// Nothing to update
NoUpdate,
/// Not yet started, waiting for first msg
Idle, Idle,
/// Recording in progress
Recording(Duration), Recording(Duration),
/// Recording finished
Finished, Finished,
/// An error occurred.
Error(String), Error(String),
} }
/// Settings used to start a recording. /// Settings used to start a recording.
#[derive(Clone)]
pub struct RecordSettings { pub struct RecordSettings {
/// File name to record to. /// File name to record to.
pub filename: PathBuf, pub filename: PathBuf,
@ -30,9 +41,11 @@ pub struct RecordSettings {
/// Create a recording /// Create a recording
pub struct Recording { pub struct Recording {
settings: RecordSettings,
handle: Option<JoinHandle<Result<()>>>, handle: Option<JoinHandle<Result<()>>>,
tx: Sender<InStreamMsg>, tx: Sender<InStreamMsg>,
status: Arc<Mutex<RecordStatus>>, status_from_thread: Arc<AtomicCell<RecordStatus>>,
last_status: RecordStatus,
} }
impl Recording { impl Recording {
@ -82,6 +95,45 @@ impl Recording {
Ok(()) Ok(())
} }
#[inline]
fn append_to_dset(
ds: &Dataset,
ctr: usize,
msg: &RawStreamData,
framesPerBlock: usize,
nchannels: usize,
) -> Result<()> {
match msg {
RawStreamData::Datai8(dat) => {
let arr = ndarray::ArrayView2::<i8>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Datai16(dat) => {
let arr = ndarray::ArrayView2::<i16>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Datai32(dat) => {
let arr = ndarray::ArrayView2::<i32>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Dataf32(dat) => {
let arr = ndarray::ArrayView2::<f32>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::Dataf64(dat) => {
let arr = ndarray::ArrayView2::<f64>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::UnknownDataType => {
bail!("Unknown data type!")
}
RawStreamData::StreamError(e) => {
bail!("Stream error: {}", e)
}
}
Ok(())
}
/// Start a new recording /// Start a new recording
/// ///
/// # Arguments /// # Arguments
@ -89,8 +141,26 @@ impl Recording {
/// * setttings: The settings to use for the recording /// * setttings: The settings to use for the recording
/// * smgr: Stream manager to use to start the recording /// * smgr: Stream manager to use to start the recording
/// ///
pub fn new(settings: RecordSettings, mgr: &mut StreamMgr) -> Result<Recording> { pub fn new(mut settings: RecordSettings, mgr: &mut StreamMgr) -> Result<Recording> {
let status = Arc::new(Mutex::new(RecordStatus::Idle)); // Append extension if not yet there
match settings.filename.extension() {
Some(a) if a == OsStr::from("h5") => {}
None | Some(_) => {
settings.filename =
(settings.filename.to_string_lossy().to_string() + ".h5").into();
}
};
// Fail if filename already exists
if settings.filename.exists() {
bail!(
"Filename '{}' already exists in filesystem",
settings.filename.to_string_lossy()
);
}
let settings2 = settings.clone();
let status = Arc::new(AtomicCell::new(RecordStatus::Idle));
let status2 = status.clone(); let status2 = status.clone();
let (tx, rx) = crossbeam::channel::unbounded(); let (tx, rx) = crossbeam::channel::unbounded();
@ -102,7 +172,7 @@ impl Recording {
let firstmsg = match rx.recv() { let firstmsg = match rx.recv() {
Ok(msg) => msg, Ok(msg) => msg,
Err(e) => bail!("Queue handle error"), Err(_) => bail!("Queue handle error"),
}; };
let meta = match firstmsg { let meta = match firstmsg {
@ -125,7 +195,6 @@ impl Recording {
let timestamp = now_utc.timestamp(); let timestamp = now_utc.timestamp();
Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?; Recording::write_hdf5_attr_scalar(&file, "time", timestamp)?;
// Create UUID for measurement // Create UUID for measurement
use hdf5::types::VarLenUnicode; use hdf5::types::VarLenUnicode;
let uuid = uuid::Uuid::new_v4(); let uuid = uuid::Uuid::new_v4();
@ -148,15 +217,17 @@ impl Recording {
let ds = Recording::create_dataset(&file, &meta)?; let ds = Recording::create_dataset(&file, &meta)?;
// Indicate we are ready to rec! // Indicate we are ready to rec!
*status.lock().unwrap() = RecordStatus::Recording(Duration::ZERO); status.store(RecordStatus::Recording(Duration::ZERO));
let mut ctr = 0; let mut ctr = 0;
let mut ctr_offset = 0;
let mut first = true;
let framesPerBlock = meta.framesPerBlock as usize; let framesPerBlock = meta.framesPerBlock as usize;
let nchannels = meta.nchannels() as usize; let nchannels = meta.nchannels() as usize;
'recloop: loop { 'recloop: loop {
match rx.recv().unwrap() { match rx.recv().unwrap() {
InStreamMsg::StreamError(e) => { InStreamMsg::StreamError(e) => {
bail!("Recording failed due to stream error.") bail!("Recording failed due to stream error: {}.", e)
} }
InStreamMsg::ConvertedStreamData(..) => {} InStreamMsg::ConvertedStreamData(..) => {}
InStreamMsg::StreamStarted(_) => { InStreamMsg::StreamStarted(_) => {
@ -167,19 +238,24 @@ impl Recording {
break 'recloop; break 'recloop;
} }
InStreamMsg::RawStreamData(incoming_ctr, dat) => { InStreamMsg::RawStreamData(incoming_ctr, dat) => {
// if incoming_ctr != ctr { if first {
// bail!("Packages missed. Recording invalid.") first = false;
// } ctr_offset = incoming_ctr;
} else {
let tst = ndarray::Array2::<f32>::ones((framesPerBlock, nchannels)); if incoming_ctr != ctr + ctr_offset {
println!("********** PACKAGES MISSED ***********");
bail!("Packages missed. Recording invalid.")
}
}
ds.resize((ctr + 1, framesPerBlock, nchannels))?; ds.resize((ctr + 1, framesPerBlock, nchannels))?;
ds.write_slice(&tst, (ctr, .., ..))?; Recording::append_to_dset(
&ds,
ctr,
dat.as_ref(),
framesPerBlock,
nchannels,
)?;
// match dat {
// RawStreamData::Datai8(d) => ds.
// }
let recorded_time = Duration::from_millis( let recorded_time = Duration::from_millis(
((1000 * (ctr + 1) * framesPerBlock) as Flt / meta.samplerate) as u64, ((1000 * (ctr + 1) * framesPerBlock) as Flt / meta.samplerate) as u64,
); );
@ -188,21 +264,23 @@ impl Recording {
break 'recloop; break 'recloop;
} }
} }
// println!("... {}", recorded_time.as_millis()); // println!("\n... {} {} {}", recorded_time.as_millis(), meta.samplerate, framesPerBlock);
ctr += 1; ctr += 1;
*status.lock().unwrap() = RecordStatus::Recording(recorded_time); status.store(RecordStatus::Recording(recorded_time));
} }
} }
} // end of loop } // end of loop
*status.lock().unwrap() = RecordStatus::Finished; status.store(RecordStatus::Finished);
Ok(()) Ok(())
// End of thread // End of thread
}); });
Ok(Recording { Ok(Recording {
settings: settings2,
handle: Some(handle), handle: Some(handle),
status: status2, status_from_thread: status2,
last_status: RecordStatus::NoUpdate,
tx, tx,
}) })
} }
@ -215,15 +293,32 @@ impl Recording {
let h = self.handle.take().unwrap(); let h = self.handle.take().unwrap();
let res = h.join().unwrap(); let res = h.join().unwrap();
if let Err(e) = res { if let Err(e) = res {
*self.status.lock().unwrap() = RecordStatus::Error(format!("{}", e)); self.last_status = RecordStatus::Error(format!("{}", e));
// File should not be un use anymore, as thread is joined.
// In case of error, we try to delete the file
if let Err(e) = std::fs::remove_file(&self.settings.filename) {
eprintln!("Recording failed, but file removal failed as well: {}", e);
}
} }
} }
} }
} }
/// Get current record status
pub fn status(&mut self) -> RecordStatus { pub fn status(&mut self) -> RecordStatus {
let status_from_thread = self.status_from_thread.swap(RecordStatus::NoUpdate);
match status_from_thread {
RecordStatus::NoUpdate => {}
_ => {
// println!("Updating status to: {:?}", status_from_thread);
self.last_status = status_from_thread;
}
}
// If the thread has exited with an error, the status is overwritten
// in this method.
self.cleanupThreadIfPossible(); self.cleanupThreadIfPossible();
self.status.lock().unwrap().clone() // Return latest status
self.last_status.clone()
} }
/// Stop existing recording early. At the current time, or st /// Stop existing recording early. At the current time, or st
@ -238,7 +333,7 @@ impl Recording {
let h = self.handle.take().unwrap(); let h = self.handle.take().unwrap();
let res = h.join().unwrap(); let res = h.join().unwrap();
if let Err(e) = res { if let Err(e) = res {
*self.status.lock().unwrap() = RecordStatus::Error(format!("{}", e)); self.last_status = RecordStatus::Error(format!("{}", e));
} }
Ok(()) Ok(())

View File

@ -8,8 +8,15 @@ use reinterpret::{reinterpret_slice, reinterpret_vec};
use std::any::TypeId; use std::any::TypeId;
use std::sync::Arc; use std::sync::Arc;
use std::u128::MAX; use std::u128::MAX;
use strum_macros::Display;
use super::*; use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Raw stream data coming from a stream. /// Raw stream data coming from a stream.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -111,7 +118,6 @@ where
/// Stream metadata. All information required for /// Stream metadata. All information required for
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct StreamMetaData { pub struct StreamMetaData {
/// Information for each channel in the stream /// Information for each channel in the stream
pub channelInfo: Vec<DaqChannel>, pub channelInfo: Vec<DaqChannel>,
@ -122,14 +128,19 @@ pub struct StreamMetaData {
pub samplerate: Flt, pub samplerate: Flt,
/// The number of frames per block send over /// The number of frames per block send over
pub framesPerBlock: u32, pub framesPerBlock: usize,
} }
impl StreamMetaData { impl StreamMetaData {
/// Create new metadata object. /// Create new metadata object.
/// /// /// ///
/// # Args /// # Args
/// ///
pub fn new(channelInfo: &[DaqChannel], rawdtype: DataType, sr: Flt, framesPerBlock: u32) -> Result<StreamMetaData> { pub fn new(
channelInfo: &[DaqChannel],
rawdtype: DataType,
sr: Flt,
framesPerBlock: usize,
) -> Result<StreamMetaData> {
Ok(StreamMetaData { Ok(StreamMetaData {
channelInfo: channelInfo.to_vec(), channelInfo: channelInfo.to_vec(),
rawDatatype: rawdtype, rawDatatype: rawdtype,
@ -139,7 +150,9 @@ impl StreamMetaData {
} }
/// Returns the number of channels in the stream metadata. /// Returns the number of channels in the stream metadata.
pub fn nchannels(&self) -> usize {self.channelInfo.len()} pub fn nchannels(&self) -> usize {
self.channelInfo.len()
}
} }
/// Input stream messages, to be send to handlers. /// Input stream messages, to be send to handlers.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -184,7 +197,8 @@ pub enum StreamCommand {
/// Stream types that can be started /// Stream types that can be started
/// ///
#[derive(PartialEq, Clone)] #[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(PartialEq, Clone, Copy)]
pub enum StreamType { pub enum StreamType {
/// Input-only stream /// Input-only stream
Input, Input,
@ -195,13 +209,16 @@ pub enum StreamType {
} }
/// Errors that happen in a stream /// Errors that happen in a stream
#[derive(strum_macros::EnumMessage, Debug, Clone)] #[derive(strum_macros::EnumMessage, Debug, Clone, Display)]
pub enum StreamError { pub enum StreamError {
/// Input overrun /// Input overrun
#[strum(message = "InputXRunError", detailed_message = "Input buffer overrun")] #[strum(message = "InputXRunError", detailed_message = "Input buffer overrun")]
InputXRunError, InputXRunError,
/// Output underrun /// Output underrun
#[strum(message = "OutputXRunError", detailed_message = "Output buffer overrun")] #[strum(
message = "OutputXRunError",
detailed_message = "Output buffer overrun"
)]
OutputXRunError, OutputXRunError,
/// Driver specific error /// Driver specific error
#[strum(message = "DriverError", detailed_message = "Driver error")] #[strum(message = "DriverError", detailed_message = "Driver error")]
@ -213,5 +230,5 @@ pub enum StreamError {
/// Logic error (something weird happened) /// Logic error (something weird happened)
#[strum(detailed_message = "Logic error")] #[strum(detailed_message = "Logic error")]
LogicError LogicError,
} }

View File

@ -17,8 +17,14 @@ pub mod filter;
pub mod daq; pub mod daq;
pub mod siggen; pub mod siggen;
#[cfg(feature = "python-bindings")] pub use config::*;
use pyo3::prelude::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::prelude::*;
use pyo3::{pymodule, PyResult};
} else {} }
/// A Python module implemented in Rust. /// A Python module implemented in Rust.
#[cfg(feature = "python-bindings")] #[cfg(feature = "python-bindings")]