Default output stream working

This commit is contained in:
Anne de Jong 2024-04-18 18:38:35 +02:00
parent 18b61b02f3
commit 35d5d7f750
20 changed files with 1578 additions and 796 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ __pycache__
python/lasprs/_lasprs*
.venv
.vscode/launch.json
.vscode

View File

@ -1,6 +1,6 @@
[package]
name = "lasprs"
version = "0.2.2"
version = "0.3.0"
edition = "2021"
authors = ["J.A. de Jong <j.a.dejong@ascee.nl>"]
description = "Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)"
@ -37,7 +37,7 @@ rand = "0.8.5"
rand_distr = "0.4.3"
# Cross-platform audio lib
cpal = { version = "0.15.2", optional = true }
cpal = { version = "0.15.3", optional = true }
# Nice enumerations
strum = "0.25.0"

View File

@ -6,7 +6,8 @@ use lasprs::daq::{DaqConfig, StreamMgr};
#[derive(Parser, Debug)]
#[command(author, version, about="Generates DAQ configurations for available devices.", long_about = None)]
struct Args {
/// Name of the person to greet
/// Devices to match. Search for these substrings in device names. Only
/// configurations are output based on these names.
#[arg(short, long)]
matches: Vec<String>,
}
@ -16,18 +17,26 @@ fn main() -> Result<()> {
let write_all = args.matches.len() == 0;
let mut smgr = StreamMgr::new();
// Obtain list of devices
let devs = smgr.getDeviceInfo();
// Iterate over them
for dev in devs.iter() {
// The file name will be the device name, plus toml extension
let filename = dev.device_name.clone() + ".toml";
// If no device name strings are given, we are outputting them all to a file.
if write_all {
let daqconfig = DaqConfig::newFromDeviceInfo(&dev);
daqconfig.serialize_TOML_file(&filename.clone().into())?;
} else {
// See if we find the name in the match list.
for m in args.matches.iter() {
let needle =m.to_lowercase();
let needle = m.to_lowercase();
let dev_lower = (&dev.device_name).to_lowercase();
if dev_lower.contains(&needle) {
DaqConfig::newFromDeviceInfo(&dev).serialize_TOML_file(&filename.clone().into())?;
DaqConfig::newFromDeviceInfo(&dev)
.serialize_TOML_file(&filename.clone().into())?;
}
}
}

View File

@ -0,0 +1,66 @@
use anyhow::Result;
use crossbeam::channel::{unbounded, Receiver, TryRecvError};
use lasprs::daq::{InStreamMsg, StreamHandler, StreamMgr, StreamStatus, StreamType};
use lasprs::siggen::Siggen;
use std::io;
use std::{thread, time};
// use
fn spawn_stdin_channel() -> Receiver<String> {
let (tx, rx) = unbounded();
thread::spawn(move || 'tt: loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
if let Err(_) = tx.send(buffer) {
break 'tt;
}
});
rx
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}
fn main() -> Result<()> {
let mut smgr = StreamMgr::new();
println!("Starting stream...");
smgr.startDefaultOutputStream()?;
let stdin_channel = spawn_stdin_channel();
println!("Creating signal generator...");
let mut siggen = Siggen::newSineWave(2, 100.);
siggen.setDCOffset(&[0.1, 0.]);
// let mut siggen = Siggen::newWhiteNoise(2);
siggen.setAllGains(0.1);
// siggen.setMute(&[true, true]);
// siggen.setMute(&[true, false]);
// siggen.setAllMute(false);
smgr.setSiggen(siggen)?;
'infy: loop {
match stdin_channel.try_recv() {
Ok(_key) => break 'infy,
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
sleep(1000);
match smgr.getStatus(StreamType::Output) {
StreamStatus::NotRunning => {
println!("Stream is not running?");
break 'infy;
}
StreamStatus::Running => {
println!("Stream is running...");
}
StreamStatus::Error(e) => {
println!("Stream error: {}", e);
break 'infy;
}
}
// let stat = smgr.
}
Ok(())
}

View File

@ -48,8 +48,10 @@ fn main() -> Result<()> {
startDelay: Duration::from_secs(ops.start_delay_s as u64),
};
match ops.config_file_daq {
// No config file is given, start default input stream
None => smgr.startDefaultInputStream()?,
Some(filename) => {
// If config file is given, use that.
let file = std::fs::read_to_string(filename)?;
let cfg = DaqConfig::deserialize_TOML_str(&file)?;
smgr.startStream(StreamType::Input, &cfg)?;
@ -66,7 +68,9 @@ fn main() -> Result<()> {
println!("\nRecord error: {}", e);
break 'infy;
}
RecordStatus::Waiting => { println!("Waiting in start delay...");},
RecordStatus::Waiting => {
println!("Waiting in start delay...");
}
RecordStatus::Finished => {
println!("\nRecording finished.");
break 'infy;
@ -81,8 +85,8 @@ fn main() -> Result<()> {
Ok(_key) => {
println!("User pressed key. Manually stopping recording here.");
match _key.to_lowercase().as_str() {
"c" => r.cancel(),
_ => r.stop()
"c" => r.cancel(),
_ => r.stop(),
}
break 'infy;
}

View File

@ -1,44 +1,43 @@
#![allow(dead_code)]
use super::Stream;
use crate::config::{self, *};
use crate::daq::daqconfig::{DaqChannel, DaqConfig};
use crate::daq::deviceinfo::DeviceInfo;
use crate::daq::{self, streammsg::*, DataType};
use crate::siggen::Siggen;
use crate::daq::{self, *};
use anyhow::{bail, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize};
use crossbeam::atomic::AtomicCell;
use crossbeam::channel::{Receiver, Sender};
use itertools::Itertools;
use num::ToPrimitive;
use reinterpret::reinterpret_slice;
use std::any::{Any, TypeId};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
/// Convert datatype in CPAL sampleformat
/// Convert CPAL sampleformat datatype
impl From<DataType> for cpal::SampleFormat {
fn from(dt: DataType) -> cpal::SampleFormat {
let sf = match dt {
match dt {
DataType::F64 => SampleFormat::F64,
DataType::F32 => SampleFormat::F32,
DataType::I8 => SampleFormat::I8,
DataType::I16 => SampleFormat::I16,
DataType::I32 => SampleFormat::I32,
DataType::I64 => SampleFormat::I64,
};
sf
}
}
}
// Convert datatype to CPAL sample format
impl From<cpal::SampleFormat> for DataType {
fn from(sf: cpal::SampleFormat) -> DataType {
let dt = match sf {
match sf {
SampleFormat::F64 => DataType::F64,
SampleFormat::F32 => DataType::F32,
SampleFormat::I8 => DataType::I8,
SampleFormat::I16 => DataType::I16,
SampleFormat::I32 => DataType::I32,
SampleFormat::I64 => DataType::I64,
_ => panic!("Not implemented sample format: {}", sf),
};
dt
}
}
}
@ -48,22 +47,23 @@ pub struct CpalApi {
}
pub struct CpalStream {
stream: cpal::Stream,
md: Option<StreamMetaData>,
md: Arc<StreamMetaData>,
noutchannels: usize,
status: Arc<AtomicCell<StreamStatus>>,
}
impl Stream for CpalStream {
fn metadata(&self) -> Option<StreamMetaData> {
fn metadata(&self) -> Arc<StreamMetaData> {
self.md.clone()
}
fn ninchannels(&self) -> usize {
if let Some(md) = &self.md {
return md.nchannels();
}
0
self.md.nchannels()
}
fn noutchannels(&self) -> usize {
self.noutchannels
}
fn status(&self) -> StreamStatus {
self.status.load()
}
}
impl CpalApi {
@ -93,7 +93,7 @@ impl CpalApi {
let mut iChannelCount = 0;
let mut oChannelCount = 0;
let mut sample_rates = srs_tot.clone();
let mut avSampleRates = srs_tot.clone();
let mut avFramesPerBlock = vec![256 as usize, 512, 1024, 2048, 8192];
let mut sample_formats = vec![];
@ -105,8 +105,8 @@ impl CpalApi {
continue;
}
sample_formats.push(icfg.sample_format());
sample_rates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt);
sample_rates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt);
if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
@ -122,8 +122,8 @@ impl CpalApi {
continue;
}
sample_formats.push(thissf);
sample_rates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt);
sample_rates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt);
if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
@ -143,15 +143,15 @@ impl CpalApi {
Some(idx) => dtypes[idx],
None => dtypes[dtypes.len() - 1],
};
let prefSampleRate = *sample_rates.last().unwrap_or(&48000.);
let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.);
devs.push(DeviceInfo {
api: super::StreamApiDescr::Cpal,
device_name: dev.name()?,
avDataTypes: dtypes,
prefDataType,
avSampleRates: sample_rates,
prefSampleRate: prefSampleRate,
avSampleRates,
prefSampleRate,
avFramesPerBlock,
prefFramesPerBlock: 2048,
@ -163,7 +163,7 @@ impl CpalApi {
hasInputTrigger: false,
hasInternalOutputMonitor: false,
duplexModeForced: false,
physicalIOQty: daq::Qty::Number,
physicalIOQty: Qty::Number,
})
}
@ -171,18 +171,71 @@ impl CpalApi {
}
// Create the error function closure, that capture the send channel on which error messages from the stream are sent
fn create_errfcn(send_ch: Sender<RawStreamData>) -> impl FnMut(cpal::StreamError) {
let errfn = move |err: cpal::StreamError| match err {
cpal::StreamError::DeviceNotAvailable => send_ch
.send(RawStreamData::StreamError(StreamError::DeviceNotAvailable))
.unwrap(),
cpal::StreamError::BackendSpecific { err: _ } => send_ch
.send(RawStreamData::StreamError(StreamError::DriverError))
.unwrap(),
fn create_errfcn(
send_ch: Option<Sender<RawStreamData>>,
status: Arc<AtomicCell<StreamStatus>>,
) -> impl FnMut(cpal::StreamError) {
let errfn = move |err: cpal::StreamError| {
let serr = match err {
cpal::StreamError::DeviceNotAvailable => StreamError::DeviceNotAvailable,
cpal::StreamError::BackendSpecific { err: _ } => StreamError::DriverError,
};
if let Some(sender) = &send_ch {
sender.send(RawStreamData::StreamError(serr)).unwrap();
}
status.store(StreamStatus::Error(serr));
};
errfn
}
fn create_incallback<T>(
config: &cpal::StreamConfig,
sender: Sender<RawStreamData>,
framesPerBlock: usize,
en_inchannels: Vec<usize>,
) -> impl FnMut(&[T], &cpal::InputCallbackInfo)
where
T: 'static + Sample + ToPrimitive,
{
let tot_inch = config.channels as usize;
let mut q = VecDeque::<T>::with_capacity(2 * tot_inch * framesPerBlock);
let mut enabled_ch_data: Vec<T> =
vec![Sample::EQUILIBRIUM; en_inchannels.len() * framesPerBlock];
// The actual callback that is returned
move |input: &[T], _: &cpal::InputCallbackInfo| {
// Copy elements over in ring buffer
q.extend(input);
while q.len() > tot_inch * framesPerBlock {
// println!("q full enough: {}", q.len());
// // Loop over enabled channels
for (i, ch) in en_inchannels.iter().enumerate() {
let in_iterator = q.iter().skip(*ch).step_by(tot_inch);
let out_iterator = enabled_ch_data
.iter_mut()
.skip(i)
.step_by(en_inchannels.len());
// Copy over elements, *DEINTERLEAVED*
out_iterator.zip(in_iterator).for_each(|(o, i)| {
*o = *i;
});
}
// Drain copied elements from ring buffer
q.drain(0..framesPerBlock * tot_inch);
// Send over data
let msg = RawStreamData::from(enabled_ch_data.clone());
sender.send(msg).unwrap()
}
}
}
/// Create an input stream for a CPAL device.
///
/// # Arguments
@ -195,32 +248,21 @@ impl CpalApi {
sender: Sender<RawStreamData>,
en_inchannels: Vec<usize>,
framesPerBlock: usize,
) -> Result<cpal::Stream> {
let tot_inch = config.channels as usize;
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let sender_err = sender.clone();
let errfcn = CpalApi::create_errfcn(Some(sender.clone()), status.clone());
macro_rules! build_stream{
($($cpaltype:pat, $rtype:ty);*) => {
($($cpaltype:pat => $rtype:ty),*) => {
match sf {
$(
$cpaltype => {
let mut q = VecDeque::<$rtype>::with_capacity(2*tot_inch*framesPerBlock);
let icb = CpalApi::create_incallback::<$rtype>(&config, sender, framesPerBlock, en_inchannels);
device.build_input_stream(
&config,
move |data, _: &_| InStreamCallback::<$rtype>(
data, &sender,
// Total number of input channels. This API has to filter out
// the channels that are not enabled
tot_inch,
// Vector of channels numbers that are enabled
&en_inchannels,
// Frames per block
framesPerBlock,
// Ring buffer for storage of samples as required.
&mut q),
CpalApi::create_errfcn(sender_err),
icb,
errfcn,
None)?
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
@ -228,12 +270,106 @@ impl CpalApi {
}
}
let stream: cpal::Stream = build_stream!(
SampleFormat::I8, i8;
SampleFormat::I16, i16;
SampleFormat::I32, i32;
SampleFormat::F32, f32
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
SampleFormat::F32 => f32
);
Ok(stream)
Ok((stream, status))
}
fn create_outcallback<T>(
config: &cpal::StreamConfig,
streamstatus: Arc<AtomicCell<StreamStatus>>,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo)
where
T: 'static + Sample + Debug,
{
let tot_outch: usize = config.channels as usize;
// println!("Numer of channels: {:?}", tot_outch);
let mut callback_ctr: usize = 0;
let mut q = VecDeque::<T>::with_capacity(2 * tot_outch * framesPerBlock);
move |data, _info: &_| {
let nsamples_asked = data.len();
let status = streamstatus.load();
callback_ctr += 1;
let mut setToEquilibrium = || data.iter_mut().for_each(|v| *v = Sample::EQUILIBRIUM);
match status {
StreamStatus::NotRunning | StreamStatus::Error(_) => {
setToEquilibrium();
return;
}
_ => {}
}
if q.len() < nsamples_asked {
// Obtain new samples from the generator
for dat in receiver.try_iter() {
let slice = dat.getRef::<T>();
if let StreamStatus::Running = status {
q.extend(slice);
}
}
}
if q.len() >= nsamples_asked {
// All right, we have enough samples to send out! They are
// drained from the queue
data.iter_mut()
.zip(q.drain(..nsamples_asked))
.for_each(|(o, i)| *o = i);
} else if callback_ctr <= 2 {
// For the first two blocks, we allow dat the data is not yet
// ready, without complaining on underruns
setToEquilibrium();
} else {
// Output buffer underrun
streamstatus.store(StreamStatus::Error(StreamError::OutputUnderrunError));
setToEquilibrium();
}
}
}
fn build_output_stream(
sf: cpal::SampleFormat,
config: &cpal::StreamConfig,
device: &cpal::Device,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
// let tot_ch = config.channels as usize;
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let err_cb = CpalApi::create_errfcn(None, status.clone());
macro_rules! build_stream{
($($cpaltype:pat => $rtype:ty),*) => {
match sf {
$(
$cpaltype => {
let outcallback = CpalApi::create_outcallback::<$rtype>(config, status.clone(), receiver, framesPerBlock);
device.build_output_stream(
&config,
outcallback,
err_cb,
None)?
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
}
}
}
let stream: cpal::Stream = build_stream!(
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
SampleFormat::F32 => f32
);
Ok((stream, status))
}
/// Create CPAL specific configuration, from our specified daq config and device info
@ -285,7 +421,7 @@ impl CpalApi {
}
/// Start a stream for a device with a given configuration.
pub fn startStream(
pub fn startInputStream(
&self,
stype: StreamType,
devinfo: &DeviceInfo,
@ -316,37 +452,31 @@ impl CpalApi {
let sf = supported_config.sample_format();
let config: cpal::StreamConfig = supported_config.config();
let (stream, metadata) = match stype {
StreamType::Input => {
let meta = StreamMetaData::new(
&conf.enabledInchannelConfig(),
conf.dtype,
supported_config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let meta = StreamMetaData::new(
&conf.enabledInchannelConfig(),
conf.dtype,
supported_config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let meta = Arc::new(meta);
let stream = CpalApi::build_input_stream(
sf,
&config,
&cpaldev,
sender,
conf.enabledInchannelsList(),
framesPerBlock,
)?;
(stream, Some(meta))
}
StreamType::Output => bail!("Not implemented output stream"),
_ => unreachable!(""),
};
let (stream, status) = CpalApi::build_input_stream(
sf,
&config,
&cpaldev,
sender,
conf.enabledInchannelsList(),
framesPerBlock,
)?;
stream.play()?;
status.store(StreamStatus::Running);
let noutchannels = conf.numberEnabledOutChannels();
return Ok(Box::new(CpalStream {
stream,
md: metadata,
noutchannels,
md: meta,
noutchannels: 0,
status,
}));
}
bail!(format!(
@ -373,7 +503,7 @@ impl CpalApi {
let en_inchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
let sf = config.sample_format();
let stream = CpalApi::build_input_stream(
let (stream, status) = CpalApi::build_input_stream(
sf,
&final_config,
&device,
@ -382,11 +512,13 @@ impl CpalApi {
framesPerBlock,
)?;
stream.play()?;
status.store(StreamStatus::Running);
// Daq: default channel config
let daqchannels = Vec::from_iter((0..final_config.channels).map(|i| {
DaqChannel::defaultAudioInput(format!("Unnamed input channel {}", i))
}));
let daqchannels = Vec::from_iter(
(0..final_config.channels)
.map(|i| DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))),
);
// Specify data tape
let dtype = DataType::from(sf);
@ -398,10 +530,12 @@ impl CpalApi {
config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let md = Arc::new(md);
Ok(Box::new(CpalStream {
stream,
md: Some(md),
md,
noutchannels: 0,
status,
}))
} else {
bail!("Could not obtain default input configuration")
@ -410,72 +544,71 @@ impl CpalApi {
bail!("Could not open default input device")
}
}
pub fn startDefaultOutputStream(
&self,
receiver: Receiver<RawStreamData>,
) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_output_device() {
if let Ok(config) = device.default_output_config() {
// let framesPerBlock: usize = 256;
// let framesPerBlock: usize = 8192;
let framesPerBlock: usize = config.sample_rate().0 as usize;
// let framesPerBlock: usize = 256;
let final_config = cpal::StreamConfig {
channels: config.channels(),
sample_rate: config.sample_rate(),
buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32),
};
// let en_outchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
let sampleformat = config.sample_format();
let (stream, status) = CpalApi::build_output_stream(
sampleformat,
&final_config,
&device,
receiver,
framesPerBlock,
)?;
stream.play()?;
status.store(StreamStatus::Running);
// Daq: default channel config
let daqchannels =
Vec::from_iter((0..final_config.channels).map(|i| {
DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))
}));
// // Specify data tape
let dtype = DataType::from(sampleformat);
// // Create stream metadata
let md = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let md = Arc::new(md);
let str = Box::new(CpalStream {
stream,
md,
noutchannels: daqchannels.len(),
status,
});
Ok(str)
} else {
bail!("Could not obtain default output configuration")
} // Default output config is OK
} else {
bail!("Could not open output device")
} // Could not
}
// Create an output stream, using given signal generators for each channel.
// fn build_output_stream(
// sf: cpal::SampleFormat,
// config: cpal::StreamConfig,
// device: &cpal::Device,
// siggens: Vec<Siggen>,
// ) -> Result<cpal::Stream> {
// macro_rules! build_stream{
// ($($cpaltype:pat, $rtype:ty);*) => {
// match sf {
// $(
// $cpaltype => device.build_input_stream(
// &config,
// move |data, _: &_| InStreamCallback::<$rtype>(data, &sender),
// CpalApi::create_errfcn(sender.clone()),
// None)?
// ),*,
// _ => bail!("Unsupported sample format '{}'", sf)
// }
// }
// }
// let stream: cpal::Stream = build_stream!(
// SampleFormat::I8, i8;
// SampleFormat::I16, i16;
// SampleFormat::I32, i32;
// SampleFormat::F32, f32
// );
// Ok(stream)
// }
}
fn InStreamCallback<T>(
input: &[T],
sender: &Sender<RawStreamData>,
tot_inch: usize,
en_inchannels: &[usize],
framesPerBlock: usize,
q: &mut VecDeque<T>,
) where
T: Copy + num::ToPrimitive + 'static,
{
// Copy elements over in ring buffer
q.extend(input);
while q.len() > tot_inch * framesPerBlock {
// println!("q full enough: {}", q.len());
let mut enabled_ch_data: Vec<T> = Vec::with_capacity(en_inchannels.len() * framesPerBlock);
unsafe {
enabled_ch_data.set_len(enabled_ch_data.capacity());
}
// Loop over enabled channels
for (i, ch) in en_inchannels.iter().enumerate() {
let in_iterator = q.iter().skip(*ch).step_by(tot_inch);
let out_iterator = enabled_ch_data.iter_mut().skip(i).step_by(en_inchannels.len());
// Copy over elements, *DEINTERLEAVED*
out_iterator.zip(in_iterator).for_each(|(o, i)| {
*o = *i;
});
}
// Drain copied elements from ring buffer
q.drain(0..framesPerBlock * tot_inch);
// Send over data
let msg = RawStreamData::from(enabled_ch_data);
sender.send(msg).unwrap()
pub fn startOutputStream(&self, rx: Receiver<RawStreamData>) -> Result<Box<dyn Stream>> {
bail!("Not implemented");
}
}

View File

@ -5,24 +5,28 @@ use serde::{Deserialize, Serialize};
/// - ...
use strum::EnumMessage;
use strum_macros;
use std::sync::Arc;
use super::StreamMetaData;
use super::{streamstatus::StreamStatus, StreamMetaData};
#[cfg(feature = "cpal-api")]
pub mod api_cpal;
#[cfg(feature = "pulse_api")]
pub mod api_pulse;
/// A currently running stream
pub trait Stream {
/// Stream metadata. Only available for input streams
fn metadata(&self) -> Option<StreamMetaData>;
fn metadata(&self) -> Arc<StreamMetaData>;
/// Number of input channels in stream
fn ninchannels(&self) -> usize;
/// Number of output channels in stream
fn noutchannels(&self) -> usize;
fn status(&self) -> StreamStatus;
}
#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize)]

View File

@ -1,11 +1,11 @@
use std::{ops::Index, path::PathBuf};
use anyhow::Result;
use super::api::StreamApiDescr;
use super::datatype::DataType;
use super::deviceinfo::DeviceInfo;
use super::qty::Qty;
use crate::config::*;
use anyhow::Result;
use serde::{Deserialize, Serialize};
/// DAQ Configuration for a single channel
@ -41,10 +41,10 @@ impl Default for DaqChannel {
}
impl DaqChannel {
/// Default channel configuration for audio input from a certain channel
pub fn defaultAudioInput(name: String) -> Self {
pub fn defaultAudio(name: String) -> Self {
DaqChannel {
enabled: true,
name: name,
name,
sensitivity: 1.0,
IEPEEnabled: false,
ACCouplingMode: false,
@ -59,21 +59,27 @@ impl DaqChannel {
pub struct DaqConfig {
/// The API
pub api: StreamApiDescr,
/// Device name. Should match when starting a stream
pub device_name: String,
/// Configuration of the input channels
pub inchannel_config: Vec<DaqChannel>,
/// Configuration of the output channels
pub outchannel_config: Vec<DaqChannel>,
/// The data type to use
pub dtype: DataType,
/// Whether to apply a digital high pass on the input. <=0 means disabled. > 0 means, the value specifies the cut-on frequency for the first order high pass filter.
/// Whether to apply a digital high pass on the input. <= 0 means disabled. > 0 means, the value specifies the cut-on frequency for the first order high pass filter.
pub digitalHighPassCutOn: Flt,
/// The index to use in the list of possible sample rates
sampleRateIndex: usize,
/// The index to use in the list of possible frames per block
framesPerBlockIndex: usize,
/// Used when output channels should be monitored, i.e. reverse-looped back as input channels.
monitorOutput: bool,
}
@ -82,7 +88,6 @@ impl DaqConfig {
/// Creates a new default device configuration for a given device as specified with
/// the DeviceInfo descriptor.
pub fn newFromDeviceInfo(devinfo: &DeviceInfo) -> DaqConfig {
let inchannel_config = (0..devinfo.iChannelCount)
.map(|_| DaqChannel::default())
.collect();
@ -94,7 +99,7 @@ impl DaqConfig {
.avSampleRates
.iter()
.position(|x| x == &devinfo.prefSampleRate)
.unwrap_or(devinfo.avSampleRates.len()-1);
.unwrap_or(devinfo.avSampleRates.len() - 1);
// Choose 4096 when in list, otherwise choose the highes available value in list
let framesPerBlockIndex = devinfo
.avFramesPerBlock
@ -122,7 +127,6 @@ impl DaqConfig {
/// * writer: Output writer, can be file or string, or anything that *is* std::io::Write
///
pub fn serialize_TOML(&self, writer: &mut dyn std::io::Write) -> Result<()> {
let ser_str = toml::to_string(&self)?;
writer.write_all(ser_str.as_bytes())?;
@ -134,21 +138,23 @@ impl DaqConfig {
/// # Args
///
/// * reader: implements the Read trait, from which we read the data.
pub fn deserialize_TOML<T>(reader: &mut T) -> Result<DaqConfig> where T: std::io::Read {
pub fn deserialize_TOML<T>(reader: &mut T) -> Result<DaqConfig>
where
T: std::io::Read,
{
let mut read_str = vec![];
reader.read_to_end(&mut read_str)?;
let read_str = String::from_utf8(read_str)?;
DaqConfig::deserialize_TOML_str(&read_str)
}
/// Deserialize from TOML string
///
/// # Args
///
/// * st: string containing TOML data.
pub fn deserialize_TOML_str(st: &String) -> Result<DaqConfig> {
let res : DaqConfig = toml::from_str(&st)?;
let res: DaqConfig = toml::from_str(&st)?;
Ok(res)
}
@ -159,7 +165,6 @@ impl DaqConfig {
/// * file: Name of file to write to
///
pub fn serialize_TOML_file(&self, file: &PathBuf) -> Result<()> {
let mut file = std::fs::File::create(file)?;
self.serialize_TOML(&mut file)?;
Ok(())

View File

@ -23,7 +23,4 @@ pub enum DataType {
/// 32-bit integers
#[strum(message = "I32", detailed_message = "32-bits integers")]
I32 = 4,
/// 64-bit integers
#[strum(message = "I64", detailed_message = "64-bits integers")]
I64 = 5,
}

View File

@ -1,15 +1,15 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
#![allow(non_snake_case)]
use super::*;
use super::api::StreamApiDescr;
use super::*;
use crate::config::*;
/// Device info structure. Gives all information regarding a device, i.e. the number of input and
/// output channels, its name and available sample rates and types.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct DeviceInfo {
/// The api in use for this device
pub api: StreamApiDescr,
@ -18,21 +18,25 @@ pub struct DeviceInfo {
/// Available data types for the sample
pub avDataTypes: Vec<DataType>,
/// Preferred data type for device
pub prefDataType: DataType,
/// Available frames per block
pub avFramesPerBlock: Vec<usize>,
/// Preferred frames per block for device
pub prefFramesPerBlock: usize,
/// Available sample rates
pub avSampleRates: Vec<Flt>,
/// Preferred sample rate for device
pub prefSampleRate: Flt,
/// Number of input channels available for this device
pub iChannelCount: u8,
/// Number of output channels available for this device
pub oChannelCount: u8,

View File

@ -5,385 +5,79 @@ mod daqconfig;
mod datatype;
mod deviceinfo;
mod qty;
#[cfg(feature = "record")]
mod record;
mod streamcmd;
mod streamdata;
mod streamhandler;
mod streammgr;
mod streammsg;
mod streamstatus;
pub use daqconfig::*;
pub use datatype::*;
pub use deviceinfo::*;
pub use qty::*;
pub use streamcmd::*;
pub use streamdata::*;
pub use streamhandler::*;
pub use streammgr::*;
pub use streammsg::*;
pub use streamstatus::*;
#[cfg(feature = "record")]
pub use record::*;
#[cfg(feature = "cpal-api")]
use api::api_cpal::CpalApi;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use api::Stream;
use core::time;
use crossbeam::{
channel::{unbounded, Receiver, Sender, TrySendError},
thread,
};
use deviceinfo::DeviceInfo;
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streammsg::*;
use self::api::StreamApiDescr;
use strum_macros::Display;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Keep track of whether the stream has been created. To ensure singleton behaviour.
static smgr_created: AtomicBool = AtomicBool::new(false);
struct StreamData<T> {
streamtype: StreamType,
stream: Box<dyn Stream>,
threadhandle: JoinHandle<T>,
comm: Sender<StreamCommand>,
}
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
/// Configure and manage input / output streams.
/// Stream types that can be started
///
pub struct StreamMgr {
// List of available devices
devs: Vec<DeviceInfo>,
// Input stream can be both input and duplex
input_stream: Option<StreamData<InQueues>>,
// Output only stream
output_stream: Option<StreamData<Siggen>>,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi,
/// The storage of queues. When no streams are running, they
/// are here. When stream is running, they will become available
/// in the JoinHandle of the thread.
instreamqueues: Option<InQueues>,
// Signal generator. Stored here on the bench in case no stream is running.
// It is picked when it is configured correctly for the starting output stream
// If it is not configured correctly, when a stream that outputs data is started
// ,it is removed here.
siggen: Option<crate::siggen::Siggen>,
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(PartialEq, Clone, Copy)]
pub enum StreamType {
/// Input-only stream
Input,
/// Output-only stream
Output,
/// Input and output at the same time
Duplex,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMgr {
#[new]
/// See (StreamMgr::new())
fn new_py<'py>() -> StreamMgr {
StreamMgr::new()
}
/// Errors that happen in a stream
#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)]
pub enum StreamError {
/// Input overrun
#[strum(
message = "InputOverrun Error",
detailed_message = "Input buffer overrun"
)]
InputOverrunError,
// #[pyo3(name = "unit")]
// #[staticmethod]
// /// See: [Biquad::unit()]
// pub fn unit_py() -> Biquad {
// Biquad::unit()
// }
// #[pyo3(name = "firstOrderHighPass")]
}
impl StreamMgr {
/// Create new stream manager. A stream manager is supposed to be a singleton.
///
/// # Panics
///
/// When a StreamMgr object is already alive.
pub fn new() -> StreamMgr {
if smgr_created.load(std::sync::atomic::Ordering::Relaxed) {
panic!("BUG: Only one stream manager is supposed to be a singleton");
}
smgr_created.store(true, std::sync::atomic::Ordering::Relaxed);
let mut smgr = StreamMgr {
devs: vec![],
input_stream: None,
output_stream: None,
siggen: None,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi::new(),
instreamqueues: Some(vec![]),
};
smgr.devs = smgr.scanDeviceInfo();
smgr
}
/// Set a new signal generator. Returns an error if it is unapplicable.
/// It is unapplicable if the number of channels of output does not match the
/// number of output channels in a running stream.
pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> {
// Current signal generator. Where to place it?
if let Some(is) = &self.input_stream {
if let StreamType::Duplex = is.streamtype {
if siggen.nchannels() != is.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
assert!(self.siggen.is_none());
is.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
}
} else if let Some(os) = &self.output_stream {
assert!(self.siggen.is_none());
if siggen.nchannels() != os.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
} else {
self.siggen = Some(siggen);
return Ok(());
}
unreachable!()
}
/// Obtain a list of devices that are available for each available API
pub fn getDeviceInfo(&mut self) -> &Vec<DeviceInfo> {
&self.devs
}
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
let mut devinfo = vec![];
#[cfg(feature = "cpal-api")]
{
let cpal_devs = self.cpal_api.getDeviceInfo();
if let Ok(devs) = cpal_devs {
devinfo.extend(devs);
}
}
devinfo
}
/// Add a new queue to the lists of queues
pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) {
if let Some(is) = &self.input_stream {
is.comm.send(StreamCommand::AddInQueue(tx)).unwrap()
} else {
self.instreamqueues.as_mut().unwrap().push(tx);
}
}
fn startInputStreamThread(
&mut self,
stream: &Box<dyn Stream>,
rx: Receiver<RawStreamData>,
) -> (JoinHandle<InQueues>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
// Unwrap here, as the queues should be free to grab
let mut iqueues = self.instreamqueues.take().unwrap();
let meta = stream.metadata().unwrap();
let threadhandle = std::thread::spawn(move || {
let mut ctr: usize = 0;
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
// New queue added
StreamCommand::AddInQueue(queue) => {
match queue.send(InStreamMsg::StreamStarted(Arc::new(meta.clone()))) {
Ok(()) => iqueues.push(queue),
Err(_) => {}
}
}
// Remove queue from list
StreamCommand::RemoveInQueue(queue) => {
iqueues.retain(|q| !q.same_channel(&queue))
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped);
break 'infy;
}
StreamCommand::NewSiggen(_) => {
panic!("Error: signal generator send to input-only stream.");
}
}
}
if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
// println!("Obtained raw stream data!");
let msg = Arc::new(msg);
let msg = InStreamMsg::RawStreamData(ctr, msg);
sendMsgToAllQueues(&mut iqueues, msg);
ctr += 1;
}
}
iqueues
});
(threadhandle, commtx)
}
fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> {
for d in self.devs.iter() {
if d.device_name == cfg.device_name {
return Some(d);
}
}
None
}
/// Start a stream of certain type, using given configuration
pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
if self.input_stream.is_some() {
bail!("Input stream is already running. Please first stop existing input stream.")
}
match stype {
StreamType::Input | StreamType::Duplex => {
if cfg.numberEnabledInChannels() == 0 {
bail!("At least one input channel should be enabled for an input stream")
}
}
_ => {}
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
let stream = match cfg.api {
StreamApiDescr::Cpal => {
let devinfo = self
.match_devinfo(cfg)
.ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
self.cpal_api.startStream(stype, devinfo, cfg, tx)?
}
_ => bail!("Unimplemented api!"),
};
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata().unwrap();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(Arc::new(meta)));
let (threadhandle, commtx) = self.startInputStreamThread(&stream, rx);
self.input_stream = Some(StreamData {
streamtype: stype,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
/// Start a default input stream, using default settings on everything. This is only possible
/// when the CPAL_api is available
pub fn startDefaultInputStream(&mut self) -> Result<()> {
if self.input_stream.is_some() {
bail!("Input stream is already running. Please first stop existing input stream.")
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// Only a default input stream when CPAL feature is enabled
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
let stream = self.cpal_api.startDefaultInputStream(tx)?;
// Inform all listeners of new stream data
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata().unwrap();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(Arc::new(meta)));
let (threadhandle, commtx) = self.startInputStreamThread(&stream, rx);
self.input_stream = Some(StreamData {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
else {
bail!("Unable to start default input stream: no CPAL api available")
}
}
}
/// Stop existing input stream.
pub fn stopInputStream(&mut self) -> Result<()> {
if let Some(StreamData {
streamtype: _, // Ignored here
stream: _,
threadhandle,
comm,
}) = self.input_stream.take()
{
// println!("Stopping existing stream..");
// Send thread to stop
comm.send(StreamCommand::StopThread).unwrap();
// Store stream queues back into StreamMgr
self.instreamqueues = Some(threadhandle.join().expect("Stream thread panicked!"));
} else {
bail!("Stream is not running.")
}
Ok(())
}
/// Stop existing running stream.
///
/// Args
///
/// * st: The stream type.
pub fn stopStream(&mut self, st: StreamType) -> Result<()> {
match st {
StreamType::Input | StreamType::Duplex => self.stopInputStream(),
_ => bail!("Not implemented output stream"),
}
}
} // impl StreamMgr
impl Drop for StreamMgr {
fn drop(&mut self) {
// Kill input stream if there is one
if self.input_stream.is_some() {
self.stopStream(StreamType::Input).unwrap();
}
if self.output_stream.is_some() {
self.stopStream(StreamType::Output).unwrap();
}
// Decref the singleton
smgr_created.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
// Send to all queues, remove queues that are disconnected when found out
// on the way.
fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) {
// Loop over queues. Remove queues that error when we try to send
// to them
iqueues.retain(|q| match q.try_send(msg.clone()) {
Ok(_) => true,
Err(_e) => false,
});
}
/// Daq devices
trait Daq {}
#[cfg(test)]
mod tests {
// #[test]
/// Output underrun
#[strum(
message = "OutputUnderrunError",
detailed_message = "Output buffer underrun"
)]
OutputUnderrunError,
/// Driver specific error
#[strum(message = "DriverError", detailed_message = "Driver error")]
DriverError,
/// Device
#[strum(detailed_message = "Device not available (anymore)")]
DeviceNotAvailable,
/// Logic error (something weird happened)
#[strum(detailed_message = "Logic error")]
LogicError,
}

View File

@ -1,4 +1,5 @@
use super::*;
use crate::config::Flt;
use anyhow::{bail, Error, Result};
use clap::builder::OsStr;
use crossbeam::atomic::AtomicCell;
@ -10,8 +11,8 @@ use rayon::iter::Empty;
use serde::de::IntoDeserializer;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use strum::EnumMessage;
@ -100,7 +101,6 @@ impl Recording {
DataType::I8 => Recording::create_dataset_type::<i8>(file, meta),
DataType::I16 => Recording::create_dataset_type::<i16>(file, meta),
DataType::I32 => Recording::create_dataset_type::<i32>(file, meta),
DataType::I64 => Recording::create_dataset_type::<i64>(file, meta),
DataType::F32 => Recording::create_dataset_type::<f32>(file, meta),
DataType::F64 => Recording::create_dataset_type::<f64>(file, meta),
}
@ -152,9 +152,6 @@ impl Recording {
let arr = ndarray::ArrayView2::<f64>::from_shape((framesPerBlock, nchannels), dat)?;
ds.write_slice(arr, (ctr, .., ..))?;
}
RawStreamData::UnknownDataType => {
bail!("Unknown data type!")
}
RawStreamData::StreamError(e) => {
bail!("Stream error: {}", e)
}
@ -289,17 +286,16 @@ impl Recording {
// Early stop. User stopped it.
break 'recloop;
}
InStreamMsg::RawStreamData(incoming_ctr, dat) => {
InStreamMsg::StreamData(dat) => {
if first {
first = false;
// Initialize counter offset
ctr_offset = incoming_ctr;
} else {
if incoming_ctr != stored_ctr + ctr_offset {
println!("********** PACKAGES MISSED ***********");
bail!("Packages missed. Recording is invalid.")
}
ctr_offset = dat.ctr;
} else if dat.ctr != stored_ctr + ctr_offset {
println!("********** PACKAGES MISSED ***********");
bail!("Packages missed. Recording is invalid.")
}
if wait_block_ctr > 0 {
// We are still waiting
wait_block_ctr -= 1;
@ -316,7 +312,7 @@ impl Recording {
Recording::append_to_dset(
&ds,
stored_ctr,
dat.as_ref(),
&dat.raw,
framesPerBlock,
nchannels,
)?;

31
src/daq/streamcmd.rs Normal file
View File

@ -0,0 +1,31 @@
//! Provides stream messages that come from a running stream
use crate::config::*;
use crate::daq::Qty;
use crate::siggen::Siggen;
use anyhow::{bail, Result};
use crossbeam::channel::Sender;
use std::any::TypeId;
use std::sync::{Arc, RwLock};
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Commands that can be sent to a running stream
pub enum StreamCommand {
/// Add a new queue to a running stream
AddInQueue(SharedInQueue),
/// Remove a queue to a running stream
RemoveInQueue(SharedInQueue),
/// New signal generator config to be used
NewSiggen(Siggen),
/// Stop the thread, do not listen for data anymore.
StopThread,
}

303
src/daq/streamdata.rs Normal file
View File

@ -0,0 +1,303 @@
//! Provides stream messages that come from a running stream
use crate::config::*;
use crate::daq::Qty;
use crate::siggen::Siggen;
use anyhow::{bail, Result};
use cpal::Sample;
use crossbeam::channel::Sender;
use reinterpret::{reinterpret_slice, reinterpret_vec};
use std::any::TypeId;
use std::sync::{Arc, RwLock};
use std::u128::MAX;
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Raw stream data coming from a stream.
#[derive(Clone, Debug)]
pub enum RawStreamData {
/// 8-bits integer
Datai8(Vec<i8>),
/// 16-bits integer
Datai16(Vec<i16>),
/// 32-bits integer
Datai32(Vec<i32>),
/// 32-bits float
Dataf32(Vec<f32>),
/// 64-bits float
Dataf64(Vec<f64>),
/// A stream error occured
StreamError(StreamError),
}
impl RawStreamData {
pub fn toFloat(&self, nchannels: usize) -> Dmat {
// match &self {
// RawStreamData::Datai8(c) => {
// Dmat::zeros((2, 2));
// }
// RawStreamData::Datai16(c) => {
// Dmat::zeros((2, 2));
// }
// }
todo!()
}
}
impl RawStreamData {
/// Get reference to raw data buffer
pub fn getRef<T>(&self) -> &[T]
where
T: Sample + 'static,
{
let thetype: TypeId = TypeId::of::<T>();
match &self {
RawStreamData::Datai8(t) => {
let i8type: TypeId = TypeId::of::<i8>();
assert!(thetype == i8type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Datai16(t) => {
let i16type: TypeId = TypeId::of::<i16>();
assert!(thetype == i16type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Datai32(t) => {
let i32type: TypeId = TypeId::of::<i32>();
assert!(thetype == i32type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Dataf32(t) => {
let f32type: TypeId = TypeId::of::<f32>();
assert!(thetype == f32type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
RawStreamData::Dataf64(t) => {
let f64type: TypeId = TypeId::of::<f64>();
assert!(thetype == f64type);
let v: &[T] = unsafe { reinterpret_slice(t) };
v
}
_ => panic!("Cannot getRef from "),
}
}
}
// Create InStreamData object from
impl<T> From<&[T]> for RawStreamData
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: &[T]) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai8(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(v)
} else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai32(v)
} else if f32type == thetype {
let v: Vec<f32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf32(v)
} else if f64type == thetype {
let v: Vec<f64> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf64(v)
} else {
panic!("Not implemented sample type!")
}
}
}
// Create InStreamData object from
impl<T> From<Vec<T>> for RawStreamData
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: Vec<T>) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai8(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(v)
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(v)
} else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai32(v)
} else if f32type == thetype {
let v: Vec<f32> = unsafe { reinterpret_vec(input) };
RawStreamData::Dataf32(v)
} else if f64type == thetype {
let v: Vec<f64> = unsafe { reinterpret_vec(input) };
RawStreamData::Dataf64(v)
} else {
panic!("Not implemented sample type!")
}
}
}
/// Stream metadata. All information required for properly interpreting the raw
/// data that is coming from the stream.
#[derive(Clone, Debug)]
pub struct StreamMetaData {
/// Information for each channel in the stream
pub channelInfo: Vec<DaqChannel>,
/// The data type of the device [Number / voltage / Acoustic pressure / ...]
pub rawDatatype: DataType,
/// Sample rate in [Hz]
pub samplerate: Flt,
/// The number of frames per block of data that comes in. Multiplied by
/// channelInfo.len() we get the total number of samples that come in at
/// each callback.
pub framesPerBlock: usize,
}
impl StreamMetaData {
/// Create new metadata object.
/// ///
/// # Args
///
pub fn new(
channelInfo: &[DaqChannel],
rawdtype: DataType,
sr: Flt,
framesPerBlock: usize,
) -> Result<StreamMetaData> {
Ok(StreamMetaData {
channelInfo: channelInfo.to_vec(),
rawDatatype: rawdtype,
samplerate: sr,
framesPerBlock,
})
}
/// Returns the number of channels in the stream metadata.
pub fn nchannels(&self) -> usize {
self.channelInfo.len()
}
}
/// Stream data (audio / other) coming from a stream or to be send to a stream
#[derive(Debug)]
pub struct StreamData {
/// Package counter. Should always increase monotonically.
pub ctr: usize,
/// Stream metadata. All info required for properly interpreting the raw data.
pub meta: Arc<StreamMetaData>,
/// This is typically what is stored when recording
pub raw: RawStreamData,
// Converted to floating point format. Used for further real time
// processing. Stored in an rw-lock. The first thread that acesses this data
// will perform the conversion. All threads after that will get the data.
converted: RwLock<Option<Arc<Dmat>>>,
}
impl StreamData {
/// Create new stream data object.
pub fn new(ctr: usize, meta: Arc<StreamMetaData>, raw: RawStreamData) -> StreamData {
StreamData {
ctr,
meta,
raw,
converted: RwLock::new(None),
}
}
/// Get the data in floating point format. If already converted, uses the
/// cached float data.
pub fn getFloatData(&self) -> Arc<Dmat> {
if let Some(dat) = self.converted.read().unwrap().as_ref() {
return dat.clone();
}
// In case we reach here, the data has not yet be converted to floating
// point, so we do this.
let mut o = self.converted.write().unwrap();
// It might be that another thread was 'first', and already performed
// the conversion. In that case, we still do an early return, and we
// just openend the lock twice for writing. Not a problem.
if let Some(dat) = o.as_ref() {
return dat.clone();
}
// Perform the actual conversion
let converted_data = Arc::new(self.raw.toFloat(self.meta.nchannels()));
// Replace the option with the Some
o.replace(converted_data.clone());
converted_data
}
}
#[cfg(test)]
mod test {
use num::traits::sign;
use cpal::Sample;
use super::*;
#[test]
fn test() {
const fs: Flt = 20.;
// Number of samples per channel
const Nframes: usize = 20;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
siggen.reset(fs);
siggen.setMute(&[false, true]);
siggen.genSignal(&mut signal);
let raw: Vec<i16> = Vec::from_iter(signal.iter().map(
|o| o.to_sample::<i16>()));
let ms1 = raw.iter().step_by(2).map(|s1| {*s1 as f64 * *s1 as f64}).sum::<f64>() / Nframes as f64;
let i16maxsq = (i16::MAX as f64).powf(2.);
// println!("ms1: {} {}", ms1, i16maxsq/2.);
// println!("{:?}", raw.iter().cloned().step_by(2).collect::<Vec<i16>>());
// println!("{:?}", i16::EQUILIBRIUM);
assert!(f64::abs(ms1 - i16maxsq/2.)/i16maxsq < 1e-3);
}
}

View File

@ -1,5 +1,5 @@
use crossbeam::channel::unbounded;
use crossbeam::channel::{unbounded, Receiver};
use super::*;
/// A stream handler registers a queue in the stream manager, and keeps the other end to

589
src/daq/streammgr.rs Normal file
View File

@ -0,0 +1,589 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
use super::api::*;
use super::*;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use array_init::from_iter;
use core::time;
use cpal::Sample;
use crossbeam::{
channel::{unbounded, Receiver, Sender, TrySendError},
thread,
};
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
#[cfg(feature = "cpal-api")]
use super::api::api_cpal::CpalApi;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
} else {} }
/// Store a queue in a shared pointer, to share sending
/// and receiving part of the queue.
pub type SharedInQueue = Sender<InStreamMsg>;
/// Vector of queues for stream messages
pub type InQueues = Vec<SharedInQueue>;
struct StreamInfo<T> {
streamtype: StreamType,
stream: Box<dyn Stream>,
threadhandle: JoinHandle<T>,
comm: Sender<StreamCommand>,
}
/// Keep track of whether the stream has been created. To ensure singleton behaviour.
static smgr_created: AtomicBool = AtomicBool::new(false);
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
/// Configure and manage input / output streams.
///
pub struct StreamMgr {
// List of available devices
devs: Vec<DeviceInfo>,
// Input stream can be both input and duplex
input_stream: Option<StreamInfo<InQueues>>,
// Output only stream
output_stream: Option<StreamInfo<Siggen>>,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi,
/// The storage of queues. When no streams are running, they
/// are here. When stream is running, they will become available
/// in the JoinHandle of the thread.
instreamqueues: Option<InQueues>,
// Signal generator. Stored here on the bench in case no stream is running.
// It is picked when it is configured correctly for the starting output stream
// If it is not configured correctly, when a stream that outputs data is started
// ,it is removed here.
siggen: Option<crate::siggen::Siggen>,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMgr {
#[new]
/// See (StreamMgr::new())
fn new_py<'py>() -> StreamMgr {
StreamMgr::new()
}
// #[pyo3(name = "unit")]
// #[staticmethod]
// /// See: [Biquad::unit()]
// pub fn unit_py() -> Biquad {
// Biquad::unit()
// }
// #[pyo3(name = "firstOrderHighPass")]
}
impl StreamMgr {
/// Create new stream manager. A stream manager is supposed to be a singleton.
///
/// # Panics
///
/// When a StreamMgr object is already alive.
pub fn new() -> StreamMgr {
if smgr_created.load(std::sync::atomic::Ordering::Relaxed) {
panic!("BUG: Only one stream manager is supposed to be a singleton");
}
smgr_created.store(true, std::sync::atomic::Ordering::Relaxed);
let mut smgr = StreamMgr {
devs: vec![],
input_stream: None,
output_stream: None,
siggen: None,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi::new(),
instreamqueues: Some(vec![]),
};
smgr.devs = smgr.scanDeviceInfo();
smgr
}
/// Get stream status for given stream type.
pub fn getStatus(&self, t: StreamType) -> StreamStatus {
match t {
StreamType::Input | StreamType::Duplex => {
if let Some(s) = &self.input_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
}
}
StreamType::Output => {
if let Some(s) = &self.output_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
}
}
}
}
/// Set a new signal generator. Returns an error if it is unapplicable.
/// It is unapplicable if the number of channels of output does not match the
/// number of output channels in a running stream.
pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> {
// Current signal generator. Where to place it?
if let Some(istream) = &self.input_stream {
if let StreamType::Duplex = istream.streamtype {
if siggen.nchannels() != istream.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
assert!(self.siggen.is_none());
istream.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
}
} else if let Some(os) = &self.output_stream {
assert!(self.siggen.is_none());
if siggen.nchannels() != os.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
} else {
self.siggen = Some(siggen);
return Ok(());
}
unreachable!()
}
/// Obtain a list of devices that are available for each available API
pub fn getDeviceInfo(&mut self) -> &Vec<DeviceInfo> {
&self.devs
}
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
let mut devinfo = vec![];
#[cfg(feature = "cpal-api")]
{
let cpal_devs = self.cpal_api.getDeviceInfo();
if let Ok(devs) = cpal_devs {
devinfo.extend(devs);
}
}
devinfo
}
/// Add a new queue to the lists of queues
pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) {
if let Some(is) = &self.input_stream {
is.comm.send(StreamCommand::AddInQueue(tx)).unwrap()
} else {
self.instreamqueues.as_mut().unwrap().push(tx);
}
}
fn startInputStreamThread(
&mut self,
meta: Arc<StreamMetaData>,
rx: Receiver<RawStreamData>,
) -> (JoinHandle<InQueues>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
// Unwrap here, as the queues should be free to grab
let mut iqueues = self
.instreamqueues
.take()
.expect("No input streams queues!");
let threadhandle = std::thread::spawn(move || {
let mut ctr: usize = 0;
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
// New queue added
StreamCommand::AddInQueue(queue) => {
match queue.send(InStreamMsg::StreamStarted(meta.clone())) {
Ok(()) => iqueues.push(queue),
Err(_) => {}
}
}
// Remove queue from list
StreamCommand::RemoveInQueue(queue) => {
iqueues.retain(|q| !q.same_channel(&queue))
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped);
break 'infy;
}
StreamCommand::NewSiggen(_) => {
panic!("Error: signal generator send to input-only stream.");
}
}
}
if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) {
// println!("Obtained raw stream data!");
let streamdata = StreamData::new(ctr, meta.clone(), raw);
let streamdata = Arc::new(streamdata);
let msg = InStreamMsg::StreamData(streamdata);
sendMsgToAllQueues(&mut iqueues, msg);
ctr += 1;
}
}
iqueues
});
(threadhandle, commtx)
}
// Match device info struct on given daq config.
fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> {
for d in self.devs.iter() {
if d.device_name == cfg.device_name {
return Some(d);
}
}
None
}
fn startOuputStreamThread<T>(
&mut self,
meta: Arc<StreamMetaData>,
tx: Sender<RawStreamData>,
) -> (JoinHandle<Siggen>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
// Number of channels to output for
let nchannels = meta.nchannels();
// Obtain signal generator. Set to silence when no signal generator is
// installed.
let mut siggen = self
.siggen
.take()
.unwrap_or_else(|| Siggen::newSilence(nchannels));
if siggen.nchannels() != nchannels {
// Updating number of channels
siggen.setNChannels(nchannels);
}
siggen.reset(meta.samplerate);
let threadhandle = std::thread::spawn(move || {
let mut floatbuf: Vec<Flt> = Vec::with_capacity(nchannels * meta.framesPerBlock);
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
// New queue added
StreamCommand::AddInQueue(_) => {
panic!("Invalid message send to output thread: AddInQueue");
}
// Remove queue from list
StreamCommand::RemoveInQueue(_) => {
panic!("Invalid message send to output thread: RemoveInQueue");
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
break 'infy;
}
StreamCommand::NewSiggen(new_siggen) => {
// println!("NEW SIGNAL GENERATOR ARRIVED!");
siggen = new_siggen;
siggen.reset(meta.samplerate);
if siggen.nchannels() != nchannels {
// println!("Updating channels");
siggen.setNChannels(nchannels);
}
}
}
}
while tx.len() < 2 {
unsafe {
floatbuf.set_len(nchannels * meta.framesPerBlock);
}
// Obtain signal
siggen.genSignal(&mut floatbuf);
// println!("level: {}", floatbuf.iter().sum::<Flt>());
let msg = match meta.rawDatatype {
DataType::I8 => {
let v = Vec::<i8>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai8(v)
}
DataType::I16 => {
let v = Vec::<i16>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai16(v)
}
DataType::I32 => {
let v = Vec::<i32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai32(v)
}
DataType::F32 => {
let v = Vec::<f32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf32(v)
}
DataType::F64 => {
let v = Vec::<f64>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf64(v)
}
};
if let Err(_e) = tx.send(msg) {
// println!("Error sending raw stream data to output stream!");
break 'infy;
}
}
// }
}
siggen
});
(threadhandle, commtx)
}
/// Start a stream of certain type, using given configuration
pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
match stype {
StreamType::Input | StreamType::Duplex => {
self.startInputOrDuplexStream(stype, cfg)?;
}
StreamType::Output => {
// self.startOutputStream(cfg)?;
bail!("No output stream defined yet");
}
}
Ok(())
}
// fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
// let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// let stream = match cfg.api {
// StreamApiDescr::Cpal => {
// let devinfo = self
// .match_devinfo(cfg)
// .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
// self.cpal_api.startOutputStream(devinfo, cfg, tx)?
// }
// _ => bail!("Unimplemented api!"),
// };
// Ok(())
// }
// Start an input or duplex stream
fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
if self.input_stream.is_some() {
bail!("An input stream is already running. Please first stop existing input stream.")
}
if cfg.numberEnabledInChannels() == 0 {
bail!("At least one input channel should be enabled for an input stream")
}
if stype == StreamType::Duplex {
if cfg.numberEnabledOutChannels() == 0 {
bail!("At least one output channel should be enabled for a duplex stream")
}
if self.output_stream.is_some() {
bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
}
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
let stream = match cfg.api {
StreamApiDescr::Cpal => {
if stype == StreamType::Duplex {
bail!("Duplex mode not supported for CPAL api");
}
let devinfo = self
.match_devinfo(cfg)
.ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
self.cpal_api.startInputStream(stype, devinfo, cfg, tx)?
}
_ => bail!("Unimplemented api!"),
};
// Input queues should be available, otherwise panic bug.
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
self.input_stream = Some(StreamInfo {
streamtype: stype,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
/// Start a default input stream, using default settings on everything. This is only possible
/// when the CPAL_api is available
pub fn startDefaultInputStream(&mut self) -> Result<()> {
if self.input_stream.is_some() {
bail!("Input stream is already running. Please first stop existing input stream.")
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// Only a default input stream when CPAL feature is enabled
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
let stream = self.cpal_api.startDefaultInputStream(tx)?;
// Inform all listeners of new stream data
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
self.input_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
else {
bail!("Unable to start default input stream: no CPAL api available")
}
}
}
/// Start a default output stream. Only possible when CPAL Api is available.
pub fn startDefaultOutputStream(&mut self) -> Result<()> {
if let Some(istream) = &self.input_stream {
if istream.streamtype == StreamType::Duplex {
bail!("Duplex stream is already running");
}
}
if self.output_stream.is_some() {
bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
}
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
let (tx, rx)= unbounded();
let stream = self.cpal_api.startDefaultOutputStream(rx)?;
let meta = stream.metadata();
let (threadhandle, commtx) = self.startOuputStreamThread::<u16>(meta, tx);
// Inform all listeners of new stream data
self.output_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
} // end if cpal api available
else {
bail!("Unable to start default input stream: no CPAL api available")
}
} // end of cfg_if
}
/// Stop existing input stream.
pub fn stopInputStream(&mut self) -> Result<()> {
if let Some(StreamInfo {
streamtype: _, // Ignored here
stream: _,
threadhandle,
comm,
}) = self.input_stream.take()
{
// println!("Stopping existing stream..");
// Send thread to stop
comm.send(StreamCommand::StopThread).unwrap();
// Store stream queues back into StreamMgr
self.instreamqueues = Some(threadhandle.join().expect("Stream thread panicked!"));
} else {
bail!("Stream is not running.")
}
Ok(())
}
/// Stop existing output stream
pub fn stopOutputStream(&mut self) -> Result<()> {
if let Some(StreamInfo {
streamtype: _, // Ignored here
stream: _,
threadhandle,
comm,
}) = self.output_stream.take()
{
if let Err(_) = comm.send(StreamCommand::StopThread){
assert!(threadhandle.is_finished());
}
// println!("Wainting for threadhandle to join...");
self.siggen = Some(threadhandle.join().expect("Output thread panicked!"));
// println!("Threadhandle joined!");
} else {
bail!("Stream is not running.");
}
Ok(())
}
/// Stop existing running stream.
///
/// Args
///
/// * st: The stream type.
pub fn stopStream(&mut self, st: StreamType) -> Result<()> {
match st {
StreamType::Input | StreamType::Duplex => self.stopInputStream(),
StreamType::Output => self.stopOutputStream(),
}
}
} // impl StreamMgr
impl Drop for StreamMgr {
fn drop(&mut self) {
// Kill input stream if there is one
if self.input_stream.is_some() {
self.stopStream(StreamType::Input).unwrap();
}
if self.output_stream.is_some() {
// println!("Stopstream in Drop");
self.stopStream(StreamType::Output).unwrap();
// println!("Stopstream in Drop done");
}
// Decref the singleton
smgr_created.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
// Send to all queues, remove queues that are disconnected when found out
// on the way.
fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) {
// Loop over queues. Remove queues that error when we try to send
// to them
iqueues.retain(|q| match q.try_send(msg.clone()) {
Ok(_) => true,
Err(_e) => false,
});
}
/// Daq devices
trait Daq {}
#[cfg(test)]
mod tests {
// #[test]
}

View File

@ -6,160 +6,18 @@ use anyhow::{bail, Result};
use crossbeam::channel::Sender;
use reinterpret::{reinterpret_slice, reinterpret_vec};
use std::any::TypeId;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::u128::MAX;
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Raw stream data coming from a stream.
#[derive(Clone, Debug)]
pub enum RawStreamData {
/// 8-bits integer
Datai8(Arc<Vec<i8>>),
/// 16-bits integer
Datai16(Arc<Vec<i16>>),
/// 32-bits integer
Datai32(Arc<Vec<i32>>),
/// 32-bits float
Dataf32(Arc<Vec<f32>>),
/// 64-bits float
Dataf64(Arc<Vec<f64>>),
/// Unknown data type. We cannot do anything with it, we could instead also create an error, although this is easier to pass downstream.
UnknownDataType,
/// A stream error occured
StreamError(StreamError),
}
// Create InStreamData object from
impl<T> From<&[T]> for RawStreamData
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: &[T]) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai8(Arc::new(v))
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(Arc::new(v))
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai16(Arc::new(v))
} else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Datai32(Arc::new(v))
} else if f32type == thetype {
let v: Vec<f32> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf32(Arc::new(v))
} else if f64type == thetype {
let v: Vec<f64> = unsafe { reinterpret_slice(input).to_vec() };
RawStreamData::Dataf64(Arc::new(v))
} else {
panic!("Not implemented sample type!")
}
}
}
// Create InStreamData object from
impl<T> From<Vec<T>> for RawStreamData
where
T: num::ToPrimitive + Clone + 'static,
{
fn from(input: Vec<T>) -> RawStreamData {
// Apparently, this code does not work with a match. I have searched around and have not found the
// reason for this. So this is a bit of stupid boilerplate.
let i8type: TypeId = TypeId::of::<i8>();
let i16type: TypeId = TypeId::of::<i16>();
let i32type: TypeId = TypeId::of::<i32>();
let f32type: TypeId = TypeId::of::<f32>();
let f64type: TypeId = TypeId::of::<f64>();
let thetype: TypeId = TypeId::of::<T>();
if i8type == thetype {
let v: Vec<i8> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai8(Arc::new(v))
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(Arc::new(v))
} else if i16type == thetype {
let v: Vec<i16> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai16(Arc::new(v))
} else if i32type == thetype {
let v: Vec<i32> = unsafe { reinterpret_vec(input) };
RawStreamData::Datai32(Arc::new(v))
} else if f32type == thetype {
let v: Vec<f32> = unsafe { reinterpret_vec(input) };
RawStreamData::Dataf32(Arc::new(v))
} else if f64type == thetype {
let v: Vec<f64> = unsafe { reinterpret_vec(input) };
RawStreamData::Dataf64(Arc::new(v))
} else {
panic!("Not implemented sample type!")
}
}
}
/// Stream metadata. All information required for
#[derive(Clone, Debug)]
pub struct StreamMetaData {
/// Information for each channel in the stream
pub channelInfo: Vec<DaqChannel>,
/// The data type of the device [Number / voltage]
pub rawDatatype: DataType,
/// Sample rate in [Hz]
pub samplerate: Flt,
/// The number of frames per block send over
pub framesPerBlock: usize,
}
impl StreamMetaData {
/// Create new metadata object.
/// ///
/// # Args
///
pub fn new(
channelInfo: &[DaqChannel],
rawdtype: DataType,
sr: Flt,
framesPerBlock: usize,
) -> Result<StreamMetaData> {
Ok(StreamMetaData {
channelInfo: channelInfo.to_vec(),
rawDatatype: rawdtype,
samplerate: sr,
framesPerBlock,
})
}
/// Returns the number of channels in the stream metadata.
pub fn nchannels(&self) -> usize {
self.channelInfo.len()
}
}
/// Input stream messages, to be send to handlers.
#[derive(Clone, Debug)]
pub enum InStreamMsg {
/// Raw stream data that is coming from a device. This is interleaved data. The number of channels is correct and
/// specified in the stream metadata.
RawStreamData(usize, Arc<RawStreamData>),
StreamData(Arc<StreamData>),
/// An error has occured in the stream
StreamError(StreamError),
@ -175,25 +33,6 @@ pub enum InStreamMsg {
StreamStopped,
}
/// Store a queue in a shared pointer, to share sending
/// and receiving part of the queue.
pub type SharedInQueue = Sender<InStreamMsg>;
/// Vector of queues for stream messages
pub type InQueues = Vec<SharedInQueue>;
/// Commands that can be sent to a running stream
pub enum StreamCommand {
/// Add a new queue to a running stream
AddInQueue(SharedInQueue),
/// Remove a queue to a running stream
RemoveInQueue(SharedInQueue),
/// New signal generator config to be used
NewSiggen(Siggen),
/// Stop the thread, do not listen for data anymore.
StopThread,
}
/// Stream types that can be started
///
@ -207,28 +46,3 @@ pub enum StreamType {
/// Input and output at the same time
Duplex,
}
/// Errors that happen in a stream
#[derive(strum_macros::EnumMessage, Debug, Clone, Display)]
pub enum StreamError {
/// Input overrun
#[strum(message = "InputXRunError", detailed_message = "Input buffer overrun")]
InputXRunError,
/// Output underrun
#[strum(
message = "OutputXRunError",
detailed_message = "Output buffer overrun"
)]
OutputXRunError,
/// Driver specific error
#[strum(message = "DriverError", detailed_message = "Driver error")]
DriverError,
/// Device
#[strum(detailed_message = "Device not available")]
DeviceNotAvailable,
/// Logic error (something weird happened)
#[strum(detailed_message = "Logic error")]
LogicError,
}

25
src/daq/streamstatus.rs Normal file
View File

@ -0,0 +1,25 @@
//! Provides stream messages that come from a running stream
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Gives the stream status of a stream, either input / output or duplex.
#[derive(strum_macros::EnumMessage, Debug, Clone, Copy, Display)]
pub enum StreamStatus {
/// Stream is not running
#[strum(message = "NotRunning", detailed_message = "Stream is not running")]
NotRunning,
/// Stream is running properly
#[strum(message = "Running", detailed_message = "Stream is running")]
Running,
/// An error occured in the stream.
#[strum(message = "Error", detailed_message = "An error occured with the stream")]
Error(StreamError)
}

View File

@ -61,7 +61,7 @@ impl Biquad {
/// Create new biquad filter. See [Biquad::new()]
///
pub fn new_py<'py>(coefs: PyReadonlyArrayDyn<Flt>) -> PyResult<Self> {
Ok(Biquad::new(&coefs.as_slice()?)?)
Ok(Biquad::new(coefs.as_slice()?)?)
}
#[pyo3(name = "unit")]
#[staticmethod]
@ -146,12 +146,12 @@ impl Biquad {
Ok(Biquad::new(&coefs).unwrap())
}
fn filter_inout(&mut self, inout: &mut [Flt]) {
for sample in 0..inout.len() {
let w0 = inout[sample] - self.a1 * self.w1 - self.a2 * self.w2;
for sample in inout.iter_mut() {
let w0 = *sample - self.a1 * self.w1 - self.a2 * self.w2;
let yn = self.b0 * w0 + self.b1 * self.w1 + self.b2 * self.w2;
self.w2 = self.w1;
self.w1 = w0;
inout[sample] = yn;
*sample = yn;
}
// println!("{:?}", inout);
}
@ -238,7 +238,7 @@ impl SeriesBiquad {
biqs.push(biq);
}
if biqs.len() == 0 {
if biqs.is_empty() {
bail!("No filter coefficients given!");
}

View File

@ -18,6 +18,10 @@
use super::config::*;
use super::filter::Filter;
use dasp_sample::{FromSample, Sample};
use rayon::prelude::*;
use std::fmt::Debug;
use std::iter::ExactSizeIterator;
use std::slice::IterMut;
#[cfg(feature = "python-bindings")]
use pyo3::prelude::*;
@ -26,10 +30,12 @@ use rand::prelude::*;
use rand::rngs::ThreadRng;
use rand_distr::StandardNormal;
const twopi: Flt = 2. * pi;
/// Source for the signal generator. Implementations are sine waves, sweeps, noise.
pub trait Source: Send {
/// Generate the 'pure' source signal. Output is placed inside the `sig` argument.
fn genSignal_unscaled(&mut self, sig: &mut [Flt]);
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>);
/// Reset the source state, i.e. set phase to 0, etc
fn reset(&mut self, fs: Flt);
/// Used to make the Siggen struct cloneable
@ -41,6 +47,19 @@ impl Clone for Box<dyn Source> {
}
}
#[derive(Clone)]
struct Silence {}
impl Source for Silence {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = 0.0);
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
Box::new(self.clone())
}
}
/// White noise source
#[derive(Clone)]
struct WhiteNoise {}
@ -51,9 +70,8 @@ impl WhiteNoise {
}
}
impl Source for WhiteNoise {
fn genSignal_unscaled(&mut self, sig: &mut [Flt]) {
sig.iter_mut()
.for_each(|s| *s = thread_rng().sample(StandardNormal));
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = thread_rng().sample(StandardNormal));
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
@ -87,20 +105,18 @@ impl Sine {
}
}
impl Source for Sine {
fn genSignal_unscaled(&mut self, sig: &mut [Flt]) {
if self.fs < 0. {
sig.iter_mut().for_each(|s| {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
if self.fs <= 0. {
sig.for_each(|s| {
*s = 0.;
});
return;
}
sig.iter_mut().for_each(|s| {
sig.for_each(|s| {
*s = Flt::sin(self.phase);
self.phase += self.omg / self.fs;
self.phase %= twopi;
});
while self.phase > 2. * pi {
self.phase -= 2. * pi;
}
}
fn reset(&mut self, fs: Flt) {
self.fs = fs;
@ -123,6 +139,12 @@ pub struct Siggen {
source: Box<dyn Source>,
// Filter applied to the source signal
channels: Vec<SiggenChannelConfig>,
// Temporary source signal buffer
source_buf: Vec<Flt>,
// Output buffers (for filtered source signal)
chout_buf: Vec<Vec<Flt>>,
}
/// Multiple channel signal generator. Can use a single source (coherent) to provide multiple signals
/// that can be sent out through different EQ's
@ -134,6 +156,17 @@ impl Siggen {
self.channels.len()
}
/// Silence: create a signal generator that does not output any dynamic
/// signal at all.
pub fn newSilence(nchannels: usize) -> Siggen {
Siggen {
channels: vec![SiggenChannelConfig::new(); nchannels],
source: Box::new(Silence {}),
source_buf: vec![],
chout_buf: vec![],
}
}
/// Create a white noise signal generator.
pub fn newWhiteNoise(nchannels: usize) -> Siggen {
Siggen::new(nchannels, Box::new(WhiteNoise::new()))
@ -148,6 +181,27 @@ impl Siggen {
self.channels.iter_mut().for_each(|set| set.setGain(g))
}
/// Set the number of channels to generate a signal for. Truncates the
/// output in case the value before calling this method is too little.
/// Appends new channel configs in case to little is available.
///
/// * nch: The new required number of channels
pub fn setNChannels(&mut self, nch: usize) {
self.channels.truncate(nch);
while self.channels.len() < nch {
self.channels.push(SiggenChannelConfig::new());
}
}
/// Set the DC offset for all channels
pub fn setDCOffset(&mut self,dc: &[Flt]) {
self.channels.iter_mut().zip(dc).for_each(
|(ch, dc)| {ch.DCOffset = *dc;});
}
/// Create a sine wave signal generator
///
/// * freq: Frequency of the sine wave in \[Hz\]
@ -160,44 +214,49 @@ impl Siggen {
Siggen {
source,
channels: vec![SiggenChannelConfig::new(); nchannels],
source_buf: vec![],
chout_buf: vec![],
}
}
/// Creates *interleaved* output signal
pub fn genSignal<T>(&mut self, out: &mut [T])
where
T: Sample + FromSample<Flt>,
Flt: Sample
T: Sample + FromSample<Flt> + Debug,
Flt: Sample,
{
let nch = self.nchannels();
let nsamples: usize = out.len() / nch;
assert!(out.len() % self.nchannels() == 0);
// No initialization required here, as the data is filled in in genSignal_unscaled
// Create source signal
let mut src = Vec::with_capacity(nsamples);
unsafe {
src.set_len(nsamples);
}
self.source.genSignal_unscaled(&mut src);
// Create output temporary vector
let mut chout = Vec::with_capacity(nsamples);
unsafe {
chout.set_len(nsamples);
}
self.source_buf.resize(nsamples, 0.);
self.source
.genSignal_unscaled(&mut self.source_buf.iter_mut());
// println!("Source signal: {:?}", self.source_buf);
// Write output while casted to the correct type
// Iterate over each channel, and counter
for (ch, channel) in self.channels.iter_mut().enumerate() {
// Create output signal, overwrite chout, as it
channel.genSignal(&src, &mut chout);
self.chout_buf.resize(nch, vec![]);
let out_iterator = out.iter_mut().skip(ch).step_by(nch);
for (sampleout, samplein) in out_iterator.zip(&chout) {
*sampleout = samplein.to_sample();
}
for (channelno, (channel, chout)) in self
.channels
.iter_mut()
.zip(self.chout_buf.iter_mut())
.enumerate()
{
chout.resize(nsamples, 0.);
// Create output signal, overwrite chout
channel.genSignal(&self.source_buf, chout);
// println!("Channel: {}, {:?}", channelno, chout);
let out_iterator = out.iter_mut().skip(channelno).step_by(nch);
out_iterator
.zip(chout)
.for_each(|(out, chin)| *out = chin.to_sample());
}
// println!("{:?}", out);
}
/// Reset signal generator. Applies any kind of cleanup necessary.
@ -212,18 +271,30 @@ impl Siggen {
}
/// Mute / unmute all channels at once
pub fn setAllMute(&mut self, mute: bool) {
self.channels.iter_mut().for_each(|s| {s.muted = mute;});
self.channels.iter_mut().for_each(|s| {
s.setMute(mute);
});
}
/// Mute / unmute individual channels. Array of bools should have same size
/// as number of channels in signal generator.
pub fn setMute(&mut self, mute: &[bool]) {
assert!(mute.len() == self.nchannels());
self.channels.iter_mut().zip(mute).for_each(|(s, m)| {
s.setMute(*m);
});
}
}
/// Signal generator config for a certain channel
#[derive(Clone)]
pub struct SiggenChannelConfig {
struct SiggenChannelConfig {
muted: bool,
prefilter: Option<Box<dyn Filter>>,
gain: Flt,
DCOffset: Flt,
}
unsafe impl Send for SiggenChannelConfig {}
impl SiggenChannelConfig {
/// Set new pre-filter that filters the source signal
pub fn setPreFilter(&mut self, pref: Option<Box<dyn Filter>>) {
@ -301,17 +372,54 @@ mod test {
// This code is just to check syntax. We should really be listening to these outputs.
let mut t = [0.; 10];
Siggen::newWhiteNoise(1).genSignal(&mut t);
println!("{:?}", &t);
// println!("{:?}", &t);
}
#[test]
fn test_sine() {
// This code is just to check syntax. We should really be listening to these outputs.
let mut s = [0.; 9];
// This code is just to check syntax. We should really be listening to
// these outputs.
const N: usize = 10000;
let mut s1 = [0.; N];
let mut s2 = [0.; N];
let mut siggen = Siggen::newSineWave(1, 1.);
siggen.reset(1.);
siggen.genSignal(&mut s);
println!("{:?}", &s);
siggen.reset(10.);
siggen.setAllMute(false);
siggen.genSignal(&mut s1);
siggen.genSignal(&mut s2);
let absdiff = s1.iter().zip(s2.iter()).map(|(s1, s2)| {Flt::abs(*s1-*s2)}).sum::<Flt>();
assert!(absdiff< 1e-10);
}
#[test]
fn test_sine2() {
// Test if channels are properly separated etc. Check if RMS is correct
// for amplitude = 1.0.
const fs: Flt = 10.;
// Number of samples per channel
const Nframes: usize = 10000;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
siggen.reset(fs);
siggen.setMute(&[false, true]);
// siggen.channels[0].DCOffset = 0.1;
// Split off in two terms, see if this works properly
siggen.genSignal(&mut signal[..Nframes/2]);
siggen.genSignal(&mut signal[Nframes/2..]);
// Mean square of the signal
let ms1 = signal.iter().step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
println!("ms1: {}",ms1);
let ms2 = signal.iter().skip(1).step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
assert!(Flt::abs(ms1 - 0.5) < 1e-12);
assert_eq!(ms2 , 0.);
}
// A small test to learn a bit about sample types and conversion. This
@ -322,6 +430,5 @@ mod test {
assert_eq!(1.0f32.to_sample::<i8>(), 127);
assert_eq!(-1.0f32.to_sample::<i8>(), -127);
assert_eq!(1.0f32.to_sample::<i16>(), i16::MAX);
}
}