Compare commits

...

2 Commits

21 changed files with 747 additions and 376 deletions

View File

@ -88,3 +88,4 @@ record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"]
f64 = []
f32 = []
python-bindings = ["dep:pyo3", "dep:numpy"]
extension-module = []

View File

@ -1 +1 @@
from ._lasprs import *

102
src/bin/lasp_output.rs Normal file
View File

@ -0,0 +1,102 @@
use anyhow::Result;
use crossbeam::channel::{ unbounded, Receiver, TryRecvError };
use lasprs::daq::{ DaqConfig, StreamMgr, StreamStatus, StreamType };
use lasprs::siggen::Siggen;
use std::io;
use std::time::Duration;
use std::{ thread, time };
// use
/// Spawns a thread and waits for a single line, pushes it to the receiver and returns
fn stdin_channel_wait_for_return() -> Receiver<String> {
let (tx, rx) = unbounded();
thread::spawn(move || {
loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
// Do not care whether we succeed here.
let _ = tx.send(buffer);
}
});
rx
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}
fn main() -> Result<()> {
let mut smgr = StreamMgr::new();
let stdin_channel = stdin_channel_wait_for_return();
println!("Creating signal generator...");
let mut siggen = Siggen::newSine(2, 432.0);
// Reduce all gains a bit...
siggen.setAllGains(0.1);
// Apply signal generator
smgr.setSiggen(siggen);
println!("Starting stream...");
let devs = smgr.getDeviceInfo();
for (i, dev) in devs.iter().enumerate() {
println!("No: {}, name: {}", i, dev.device_name);
}
print!("Please choose device by number [0-{}]: ", devs.len());
let dev = loop {
match stdin_channel.try_recv() {
Ok(nostr) => {
if let Ok(val) = nostr.trim().parse::<i32>() {
if (val as usize) > devs.len() - 1 {
println!(
"Invalid device number. Expected a value between 0 and {}. Please try again.",
devs.len()
);
continue;
}
break &devs[val as usize];
} else {
println!("Invalid value. Please fill in a number. ");
}
}
Err(TryRecvError::Empty) => {
continue;
}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
thread::sleep(Duration::from_millis(100));
};
let mut cfg = DaqConfig::newFromDeviceInfo(dev);
cfg.outchannel_config[0].enabled = true;
cfg.outchannel_config[1].enabled = true;
cfg.outchannel_config[2].enabled = true;
smgr.startStream(StreamType::Output, &cfg)?;
println!("Press <enter> key to quit...");
'infy: loop {
match stdin_channel.try_recv() {
Ok(_key) => {
break 'infy;
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
sleep(100);
match smgr.getStatus(StreamType::Output) {
StreamStatus::NotRunning {} => {
println!("Stream is not running?");
break 'infy;
}
StreamStatus::Running {} => {}
StreamStatus::Error { e } => {
println!("Stream error: {}", e);
break 'infy;
}
}
}
Ok(())
}

View File

@ -28,7 +28,7 @@ fn main() -> Result<()> {
let stdin_channel = stdin_channel_wait_for_return();
println!("Creating signal generator...");
let mut siggen = Siggen::newSineWave(2, 432.);
let mut siggen = Siggen::newSine(2, 432.);
// Some things that can be done
// siggen.setDCOffset(&[0.1, 0.]);
@ -36,11 +36,12 @@ fn main() -> Result<()> {
// Reduce all gains a bit...
siggen.setAllGains(0.1);
// Apply signal generator
smgr.setSiggen(siggen)?;
println!("Starting stream...");
smgr.startDefaultOutputStream()?;
// Apply signal generator
smgr.setSiggen(siggen);
println!("Press <enter> key to quit...");
'infy: loop {
@ -51,12 +52,12 @@ fn main() -> Result<()> {
}
sleep(100);
match smgr.getStatus(StreamType::Output) {
StreamStatus::NotRunning => {
StreamStatus::NotRunning{} => {
println!("Stream is not running?");
break 'infy;
}
StreamStatus::Running => {}
StreamStatus::Error(e) => {
StreamStatus::Running{} => {}
StreamStatus::Error{e} => {
println!("Stream error: {}", e);
break 'infy;
}

View File

@ -24,19 +24,21 @@ cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
pub use numpy::ndarray::{ArrayD, ArrayViewD, ArrayViewMutD};
pub use numpy::ndarray::prelude::*;
pub use numpy::{IntoPyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn};
pub use pyo3::exceptions::PyValueError;
pub use numpy::{IntoPyArray,PyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn};
pub use pyo3::prelude::*;
pub use pyo3::exceptions::PyValueError;
pub use pyo3::{pymodule, types::PyModule, PyResult};
pub use pyo3::anyhow::*;
pub use pyo3;
} else {
pub use ndarray::prelude::*;
pub use ndarray::{Array1, Array2, ArrayView1};
} }
// pub use num::complex::i;
use num::complex::*;
use ndarray::OwnedRepr;
use num::complex::Complex;
/// View into 1D array of floats
pub type VdView<'a> = ArrayView1<'a, Flt>;
@ -66,3 +68,17 @@ pub type Ccol = Array1<Cflt>;
pub type Dmat = Array2<Flt>;
/// 2D array of complex floats
pub type Cmat = Array2<Cflt>;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
/// 1D array of T as returned from Rust to Numpy
pub type PyArr1<'py, T> = Bound<'py, PyArray<T, ndarray::Dim<[usize; 1]>>>;
/// 1D array Floats returned from Rust to Numpy
pub type PyArr1Flt<'py> = PyArr1<'py, Flt>;
/// 1D array of Complex returned from Rust to Numpy
pub type PyArr1Cflt<'py> = PyArr1<'py, Cflt>;
}}

View File

@ -1,18 +1,22 @@
#![allow(dead_code)]
use super::Stream;
use super::StreamMetaData;
use crate::daq::streamdata::*;
use crate::config::{self, *};
use crate::daq::{self, *};
use anyhow::{bail, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Device, Host, Sample, SampleFormat, SupportedBufferSize};
use crate::config::{ self, * };
use crate::daq::{ self, * };
use crate::daq::{ streamdata::*, StreamApiDescr };
use anyhow::{ bail, Result };
use cpal::traits::{ DeviceTrait, HostTrait, StreamTrait };
use cpal::SampleRate;
use cpal::SupportedStreamConfig;
use cpal::{ Device, Host, Sample, SampleFormat, SupportedBufferSize };
use crossbeam::atomic::AtomicCell;
use crossbeam::channel::{Receiver, Sender};
use crossbeam::channel::{ Receiver, Sender };
use itertools::Itertools;
use num::ToPrimitive;
use reinterpret::reinterpret_slice;
use std::any::{Any, TypeId};
use std::any;
use std::any::{ Any, TypeId };
use std::collections::btree_map::OccupiedEntry;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
@ -78,19 +82,22 @@ impl CpalApi {
}
}
pub fn getDeviceInfo(&self) -> Result<Vec<DeviceInfo>> {
let srs_1 = [
1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000,
];
let srs_1 = [1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000];
let srs_2 = [11025, 22050, 44100, 88200];
let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter()));
srs_tot.sort();
let srs_tot = Vec::from_iter(srs_tot.iter().copied().map(|i| *i as Flt));
let srs_tot = Vec::from_iter(
srs_tot
.iter()
.copied()
.map(|i| *i as Flt)
);
// srs_tot.sort();
let mut devs = vec![];
for dev in self.host.devices()? {
'devloop: for dev in self.host.devices()? {
// println!("{:?}", dev.name());
let mut iChannelCount = 0;
let mut oChannelCount = 0;
@ -107,8 +114,8 @@ impl CpalApi {
continue;
}
sample_formats.push(icfg.sample_format());
avSampleRates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr >= (icfg.min_sample_rate().0 as Flt));
avSampleRates.retain(|sr| *sr <= (icfg.max_sample_rate().0 as Flt));
if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
@ -117,15 +124,15 @@ impl CpalApi {
// avFramesPerBlock.retain(|i| i >= icfg.buffer_size().)
}
}
if let Ok(ocfg) = dev.supported_input_configs() {
if let Ok(ocfg) = dev.supported_output_configs() {
for ocfg in ocfg {
let thissf = ocfg.sample_format();
if thissf.is_uint() {
continue;
}
sample_formats.push(thissf);
avSampleRates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr >= (ocfg.min_sample_rate().0 as Flt));
avSampleRates.retain(|sr| *sr <= (ocfg.max_sample_rate().0 as Flt));
if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
@ -138,16 +145,24 @@ impl CpalApi {
continue;
}
let dtypes: Vec<DataType> =
sample_formats.iter().dedup().map(|i| (*i).into()).collect();
let dtypes: Vec<DataType> = sample_formats
.iter()
.dedup()
.map(|i| (*i).into())
.collect();
let prefDataType = match dtypes.iter().position(|d| d == &DataType::F32) {
Some(idx) => dtypes[idx],
None => dtypes[dtypes.len() - 1],
};
let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.);
let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.0);
// Do not add device if it does not have any channels at all.
if iChannelCount == oChannelCount && oChannelCount == 0 {
break 'devloop;
}
devs.push(DeviceInfo {
api: super::StreamApiDescr::Cpal,
api: StreamApiDescr::Cpal,
device_name: dev.name()?,
avDataTypes: dtypes,
prefDataType,
@ -166,7 +181,7 @@ impl CpalApi {
hasInternalOutputMonitor: false,
duplexModeForced: false,
physicalIOQty: Qty::Number,
})
});
}
Ok(devs)
@ -175,9 +190,8 @@ impl CpalApi {
// Create the error function closure, that capture the send channel on which error messages from the stream are sent
fn create_errfcn(
send_ch: Option<Sender<RawStreamData>>,
status: Arc<AtomicCell<StreamStatus>>,
status: Arc<AtomicCell<StreamStatus>>
) -> impl FnMut(cpal::StreamError) {
move |err: cpal::StreamError| {
let serr = match err {
cpal::StreamError::DeviceNotAvailable => StreamError::DeviceNotAvailable,
@ -186,7 +200,7 @@ impl CpalApi {
if let Some(sender) = &send_ch {
sender.send(RawStreamData::StreamError(serr)).unwrap();
}
status.store(StreamStatus::Error(serr));
status.store(StreamStatus::Error { e: serr });
}
}
@ -194,10 +208,9 @@ impl CpalApi {
config: &cpal::StreamConfig,
sender: Sender<RawStreamData>,
framesPerBlock: usize,
en_inchannels: Vec<usize>,
en_inchannels: Vec<usize>
) -> impl FnMut(&[T], &cpal::InputCallbackInfo)
where
T: 'static + Sample + ToPrimitive,
where T: 'static + Sample + ToPrimitive
{
let tot_inch = config.channels as usize;
@ -233,7 +246,7 @@ impl CpalApi {
// Send over data
let msg = RawStreamData::from(enabled_ch_data.clone());
sender.send(msg).unwrap()
sender.send(msg).unwrap();
}
}
}
@ -249,13 +262,13 @@ impl CpalApi {
device: &cpal::Device,
sender: Sender<RawStreamData>,
en_inchannels: Vec<usize>,
framesPerBlock: usize,
framesPerBlock: usize
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {}));
let errfcn = CpalApi::create_errfcn(Some(sender.clone()), status.clone());
macro_rules! build_stream{
macro_rules! build_stream {
($($cpaltype:pat => $rtype:ty),*) => {
match sf {
$(
@ -269,9 +282,10 @@ impl CpalApi {
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
}
}
};
}
let stream: cpal::Stream = build_stream!(
let stream: cpal::Stream =
build_stream!(
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
@ -284,24 +298,43 @@ impl CpalApi {
config: &cpal::StreamConfig,
streamstatus: Arc<AtomicCell<StreamStatus>>,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
ch_config: &[DaqChannel],
framesPerBlock: usize
) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo)
where
T: 'static + Sample + Debug,
where T: 'static + Sample + Debug
{
let tot_outch: usize = config.channels as usize;
// println!("Numer of channels: {:?}", tot_outch);
let mut callback_ctr: usize = 0;
let mut q = VecDeque::<T>::with_capacity(2 * tot_outch * framesPerBlock);
let number_total_out_channels: usize = config.channels as usize;
let number_enabled_out_channels = ch_config
.iter()
.filter(|ch| ch.enabled)
.count();
move |data, _info: &_| {
let nsamples_asked = data.len();
let disabled_ch = DaqChannel::default();
let disabled_repeater = std::iter::repeat(&disabled_ch);
let enabled_outch = ch_config.iter().chain(disabled_repeater);
// Vector of enabled output channells, with length of number_total_out_channels
let enabled_outch: Vec<bool> = (0..number_total_out_channels)
.zip(enabled_outch)
.map(|(_, b)| b.enabled)
.collect();
assert_eq!(enabled_outch.len(), number_total_out_channels);
let mut callback_ctr: usize = 0;
let mut q = VecDeque::<T>::with_capacity(2 * number_total_out_channels * framesPerBlock);
move |outdata, _info: &_| {
let nsamples_asked =
(outdata.len() / number_total_out_channels) * number_enabled_out_channels;
let status = streamstatus.load();
callback_ctr += 1;
let mut setToEquilibrium = || data.iter_mut().for_each(|v| *v = Sample::EQUILIBRIUM);
let mut setToEquilibrium = ||
outdata.iter_mut().for_each(|v| {
*v = Sample::EQUILIBRIUM;
});
match status {
StreamStatus::NotRunning | StreamStatus::Error(_) => {
StreamStatus::NotRunning {} | StreamStatus::Error { .. } => {
setToEquilibrium();
return;
}
@ -312,7 +345,7 @@ impl CpalApi {
// Obtain new samples from the generator
for dat in receiver.try_iter() {
let slice = dat.getRef::<T>();
if let StreamStatus::Running = status {
if let StreamStatus::Running {} = status {
q.extend(slice);
}
}
@ -321,16 +354,37 @@ impl CpalApi {
if q.len() >= nsamples_asked {
// All right, we have enough samples to send out! They are
// drained from the queue
data.iter_mut()
.zip(q.drain(..nsamples_asked))
.for_each(|(o, i)| *o = i);
let out_chunks = outdata.iter_mut().chunks(number_total_out_channels);
let siggen_chunks = q.drain(..nsamples_asked).chunks(number_enabled_out_channels);
for (och, ich) in out_chunks.into_iter().zip(siggen_chunks.into_iter()) {
let mut sig_frame_iter = ich.into_iter();
och.into_iter()
.zip(&enabled_outch)
.for_each(|(o, en)| (
if *en {
*o = sig_frame_iter.next().unwrap();
} else {
*o = Sample::EQUILIBRIUM;
}
));
}
// outdata
// .iter_mut()
// .zip(q.drain(..nsamples_asked))
// .for_each(|(o, i)| {
// *o = i;
// });
} else if callback_ctr <= 2 {
// For the first two blocks, we allow dat the data is not yet
// ready, without complaining on underruns
setToEquilibrium();
} else {
// Output buffer underrun
streamstatus.store(StreamStatus::Error(StreamError::OutputUnderrunError));
streamstatus.store(StreamStatus::Error {
e: StreamError::OutputUnderrunError,
});
setToEquilibrium();
}
}
@ -341,19 +395,18 @@ impl CpalApi {
config: &cpal::StreamConfig,
device: &cpal::Device,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
ch_config: &[DaqChannel],
framesPerBlock: usize
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
// let tot_ch = config.channels as usize;
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {}));
let err_cb = CpalApi::create_errfcn(None, status.clone());
macro_rules! build_stream{
macro_rules! build_stream {
($($cpaltype:pat => $rtype:ty),*) => {
match sf {
$(
$cpaltype => {
let outcallback = CpalApi::create_outcallback::<$rtype>(config, status.clone(), receiver, framesPerBlock);
let outcallback = CpalApi::create_outcallback::<$rtype>(config, status.clone(), receiver, ch_config, framesPerBlock);
device.build_output_stream(
&config,
outcallback,
@ -362,9 +415,10 @@ impl CpalApi {
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
}
}
};
}
let stream: cpal::Stream = build_stream!(
let stream: cpal::Stream =
build_stream!(
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
@ -380,10 +434,9 @@ impl CpalApi {
devinfo: &DeviceInfo,
conf: &DaqConfig,
_dev: &cpal::Device,
conf_iterator: T,
conf_iterator: T
) -> Result<cpal::SupportedStreamConfig>
where
T: Iterator<Item = cpal::SupportedStreamConfigRange>,
where T: Iterator<Item = cpal::SupportedStreamConfigRange>
{
let nchannels = match st {
StreamType::Input => devinfo.iChannelCount,
@ -393,10 +446,11 @@ impl CpalApi {
for cpalconf in conf_iterator {
if cpalconf.sample_format() == conf.dtype.into() {
// Specified sample format is available
if cpalconf.channels() == nchannels as u16 {
if cpalconf.channels() == (nchannels as u16) {
let requested_sr = conf.sampleRate(devinfo);
if cpalconf.min_sample_rate().0 as Flt <= requested_sr
&& cpalconf.max_sample_rate().0 as Flt >= requested_sr
if
(cpalconf.min_sample_rate().0 as Flt) <= requested_sr &&
(cpalconf.max_sample_rate().0 as Flt) >= requested_sr
{
// Sample rate falls within range.
let requested_fpb = conf.framesPerBlock(devinfo) as u32;
@ -409,7 +463,7 @@ impl CpalApi {
min,
max,
requested_fpb
)
);
}
}
_ => {}
@ -428,27 +482,29 @@ impl CpalApi {
stype: StreamType,
devinfo: &DeviceInfo,
conf: &DaqConfig,
sender: Sender<RawStreamData>,
sender: Sender<RawStreamData>
) -> Result<Box<dyn Stream>> {
for cpaldev in self.host.devices()? {
// See if we can create a supported stream config.
let supported_config = match stype {
let supported_config = (match stype {
StreamType::Duplex => bail!("Duplex stream not supported for CPAL"),
StreamType::Input => CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_input_configs()?,
),
StreamType::Output => CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_output_configs()?,
),
}?;
StreamType::Input =>
CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_input_configs()?
),
StreamType::Output =>
CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_output_configs()?
),
})?;
let framesPerBlock = conf.framesPerBlock(devinfo);
let sf = supported_config.sample_format();
@ -458,7 +514,7 @@ impl CpalApi {
&conf.enabledInchannelConfig(),
conf.dtype,
supported_config.sample_rate().0 as Flt,
framesPerBlock,
framesPerBlock
)?;
let meta = Arc::new(meta);
@ -468,23 +524,27 @@ impl CpalApi {
&cpaldev,
sender,
conf.enabledInchannelsList(),
framesPerBlock,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running);
status.store(StreamStatus::Running {});
return Ok(Box::new(CpalStream {
stream,
md: meta,
noutchannels: 0,
status,
}));
return Ok(
Box::new(CpalStream {
stream,
md: meta,
noutchannels: 0,
status,
})
);
}
bail!(format!(
"Error: requested device {} not found. Please make sure the device is available.",
devinfo.device_name
))
bail!(
format!(
"Error: requested device {} not found. Please make sure the device is available.",
devinfo.device_name
)
)
}
/// Start a default input stream.
@ -492,7 +552,7 @@ impl CpalApi {
///
pub fn startDefaultInputStream(
&mut self,
sender: Sender<RawStreamData>,
sender: Sender<RawStreamData>
) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_input_device() {
if let Ok(config) = device.default_input_config() {
@ -511,15 +571,16 @@ impl CpalApi {
&device,
sender,
en_inchannels,
framesPerBlock,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running);
status.store(StreamStatus::Running {});
// Daq: default channel config
let daqchannels = Vec::from_iter(
(0..final_config.channels)
.map(|i| DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))),
(0..final_config.channels).map(|i|
DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))
)
);
// Specify data tape
@ -530,15 +591,17 @@ impl CpalApi {
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
framesPerBlock
)?;
let md = Arc::new(md);
Ok(Box::new(CpalStream {
stream,
md,
noutchannels: 0,
status,
}))
Ok(
Box::new(CpalStream {
stream,
md,
noutchannels: 0,
status,
})
)
} else {
bail!("Could not obtain default input configuration")
}
@ -547,70 +610,147 @@ impl CpalApi {
}
}
fn getDefaultOutputConfig(&self) -> Result<(Device, cpal::StreamConfig, SampleFormat, usize)> {
if let Some(dev) = self.host.default_output_device() {
let cfg = dev.default_output_config()?;
// let framesPerBlock: usize = 256;
// let framesPerBlock: usize = 8192;
let framesPerBlock: usize = cfg.sample_rate().0 as usize;
// let framesPerBlock: usize = 256;
let final_config = cpal::StreamConfig {
channels: cfg.channels(),
sample_rate: cfg.sample_rate(),
buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32),
};
return Ok((dev, final_config, cfg.sample_format(), framesPerBlock));
}
bail!("Could not find default output device!");
}
pub fn startDefaultOutputStream(
&self,
receiver: Receiver<RawStreamData>,
receiver: Receiver<RawStreamData>
) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_output_device() {
if let Ok(config) = device.default_output_config() {
// let framesPerBlock: usize = 256;
// let framesPerBlock: usize = 8192;
let framesPerBlock: usize = config.sample_rate().0 as usize;
// let framesPerBlock: usize = 256;
let final_config = cpal::StreamConfig {
channels: config.channels(),
sample_rate: config.sample_rate(),
buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32),
};
// let en_outchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
let (device, config, sampleformat, framesPerBlock) = self.getDefaultOutputConfig()?;
let sampleformat = config.sample_format();
let (stream, status) = CpalApi::build_output_stream(
sampleformat,
&final_config,
&device,
receiver,
framesPerBlock,
)?;
// Daq: default channel config
let daqchannels = Vec::from_iter(
(0..config.channels).map(|i|
DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))
)
);
let (stream, status) = CpalApi::build_output_stream(
sampleformat,
&config,
&device,
receiver,
&daqchannels,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running);
stream.play()?;
status.store(StreamStatus::Running {});
// Daq: default channel config
let daqchannels =
Vec::from_iter((0..final_config.channels).map(|i| {
DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))
}));
// // Specify data tape
let dtype = DataType::from(sampleformat);
// // Specify data tape
let dtype = DataType::from(sampleformat);
// // Create stream metadata
let md = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
)?;
let md = Arc::new(md);
let str = Box::new(CpalStream {
stream,
md,
noutchannels: daqchannels.len(),
status,
});
Ok(str)
} else {
bail!("Could not obtain default output configuration")
} // Default output config is OK
} else {
bail!("Could not open output device")
} // Could not
// // Create stream metadata
let md = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate.0 as Flt,
framesPerBlock
)?;
let md = Arc::new(md);
let str = Box::new(CpalStream {
stream,
md,
noutchannels: daqchannels.len(),
status,
});
Ok(str)
}
// Create an output stream, using given signal generators for each channel.
// }
pub fn startOutputStream(&self, _rx: Receiver<RawStreamData>) -> Result<Box<dyn Stream>> {
bail!("Not implemented");
fn getCPALOutputConfig(
&self,
dev: &DeviceInfo,
daqconfig: &DaqConfig
) -> Result<(Device, cpal::StreamConfig, SampleFormat, usize)> {
let samplerate = dev.avSampleRates[daqconfig.sampleRateIndex] as u32;
let framesPerBlock = dev.avFramesPerBlock[daqconfig.framesPerBlockIndex];
let highest_ch: Result<usize, anyhow::Error> = daqconfig
.highestEnabledOutChannel()
.ok_or_else(|| anyhow::anyhow!("No output channels enabled."));
let highest_ch = highest_ch? as u16;
for cpaldev in self.host.devices()? {
if cpaldev.name()? == dev.device_name {
// Check, device name matches required device name
for cpalcfg in cpaldev.supported_output_configs()? {
let sf = cpalcfg.sample_format();
if sf == daqconfig.dtype.into() {
let max_sr = cpalcfg.max_sample_rate().0;
let min_sr = cpalcfg.min_sample_rate().0;
if samplerate <= max_sr && samplerate >= min_sr {
let cfg = cpalcfg.with_sample_rate(SampleRate(samplerate as u32));
let mut cfg = cfg.config();
cfg.channels = highest_ch + 1;
// Overwrite buffer size to required buffer size
cfg.buffer_size = cpal::BufferSize::Fixed(framesPerBlock as u32);
// Return tuple of device, config, sample format and
// frames per block
return Ok((cpaldev, cfg, sf, framesPerBlock));
}
}
}
}
}
bail!("Could not find device with name '{}'", dev.device_name)
}
pub fn startOutputStream(
&self,
dev: &DeviceInfo,
cfg: &DaqConfig,
receiver: Receiver<RawStreamData>
) -> Result<Box<dyn Stream>> {
let (device, cpalconfig, sampleformat, framesPerBlock) = self.getCPALOutputConfig(
dev,
cfg
)?;
let (stream, status) = Self::build_output_stream(
sampleformat,
&cpalconfig,
&device,
receiver,
&cfg.outchannel_config,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running {});
// // Specify data tape
let dtype = DataType::from(sampleformat);
let md = StreamMetaData::new(
&cfg.enabledOutchannelConfig(),
dtype,
cpalconfig.sample_rate.0 as Flt,
framesPerBlock
)?;
let md = Arc::new(md);
let str = Box::new(CpalStream {
stream,
md,
noutchannels: cpalconfig.channels as usize,
status,
});
Ok(str)
}
}

View File

@ -1,13 +1,14 @@
use serde::{Deserialize, Serialize};
/// Daq apis that are optionally compiled in. Examples:
///
/// - CPAL (Cross-Platform Audio Library)
/// - ...
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use strum::EnumMessage;
use strum_macros;
use std::sync::Arc;
use crate::config::*;
use super::{streamstatus::StreamStatus, streamdata::StreamMetaData};
use super::{streamdata::StreamMetaData, streamstatus::StreamStatus};
#[cfg(feature = "cpal-api")]
pub mod api_cpal;
@ -26,10 +27,13 @@ pub trait Stream {
/// Number of output channels in stream
fn noutchannels(&self) -> usize;
/// Obtain stream status
fn status(&self) -> StreamStatus;
}
#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize)]
/// Stream API descriptor: type and corresponding text
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize, strum_macros::Display)]
#[allow(dead_code)]
pub enum StreamApiDescr {
/// CPAL api
@ -38,4 +42,4 @@ pub enum StreamApiDescr {
/// PulseAudio api
#[strum(message = "pulse", detailed_message = "Pulseaudio")]
Pulse = 1,
}
}

View File

@ -1,15 +1,13 @@
use std::{ops::Index, path::PathBuf};
use super::api::StreamApiDescr;
use super::datatype::DataType;
use super::deviceinfo::DeviceInfo;
use super::qty::Qty;
use super::*;
use crate::config::*;
use anyhow::Result;
use serde::{Deserialize, Serialize};
/// DAQ Configuration for a single channel
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[cfg_attr(feature = "python-bindings", pyclass(get_all, set_all))]
pub struct DaqChannel {
/// Whether the channel is enabled
pub enabled: bool,
@ -56,6 +54,7 @@ impl DaqChannel {
/// Configuration of a device.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "python-bindings", pyclass(get_all, set_all))]
pub struct DaqConfig {
/// The API
pub api: StreamApiDescr,
@ -68,6 +67,7 @@ pub struct DaqConfig {
/// Configuration of the output channels
pub outchannel_config: Vec<DaqChannel>,
/// The data type to use
pub dtype: DataType,
@ -75,15 +75,28 @@ pub struct DaqConfig {
pub digitalHighPassCutOn: Flt,
/// The index to use in the list of possible sample rates
sampleRateIndex: usize,
pub sampleRateIndex: usize,
/// The index to use in the list of possible frames per block
framesPerBlockIndex: usize,
pub framesPerBlockIndex: usize,
/// Used when output channels should be monitored, i.e. reverse-looped back as input channels.
monitorOutput: bool,
pub monitorOutput: bool,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl DaqConfig {
#[pyo3(name = "newFromDeviceInfo")]
#[staticmethod]
fn newFromDeviceInfo_py(d: &DeviceInfo) -> PyResult<DaqConfig> {
Ok(DaqConfig::newFromDeviceInfo(d))
}
fn __repr__(&self) -> String {
format!("{:#?}", self)
}
}
impl DaqConfig {
/// Creates a new default device configuration for a given device as specified with
/// the DeviceInfo descriptor.
@ -219,4 +232,24 @@ impl DaqConfig {
.cloned()
.collect()
}
/// Returns the channel number of the highest enabled input channel, if any.
pub fn highestEnabledInChannel(&self) -> Option<usize> {
let mut highest = None;
self.inchannel_config.iter().enumerate().for_each(|(i,c)| if c.enabled {highest = Some(i);});
highest
}
/// Returns the channel number of the highest enabled output channel, if any.
pub fn highestEnabledOutChannel(&self) -> Option<usize> {
let mut highest = None;
self.outchannel_config.iter().enumerate().for_each(|(i,c)| if c.enabled {highest = Some(i);});
println!("{:?}", highest);
highest
}
}

View File

@ -3,10 +3,12 @@
use strum::EnumMessage;
use strum_macros;
use serde::{Serialize, Deserialize};
use crate::config::*;
/// Data type description for samples coming from a stream
#[derive(strum_macros::EnumMessage, PartialEq, Copy, Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub enum DataType {
/// 32-bit floats
#[strum(message = "F32", detailed_message = "32-bits floating points")]

View File

@ -1,14 +1,16 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
#![allow(non_snake_case)]
use super::api::StreamApiDescr;
use super::StreamApiDescr;
use super::*;
use crate::config::*;
use crate::config::*;
/// Device info structure. Gives all information regarding a device, i.e. the number of input and
/// output channels, its name and available sample rates and types.
#[derive(Clone, Debug)]
#[allow(dead_code)]
#[cfg_attr(feature = "python-bindings", pyclass(get_all))]
pub struct DeviceInfo {
/// The api in use for this device
pub api: StreamApiDescr,
@ -17,9 +19,11 @@ pub struct DeviceInfo {
pub device_name: String,
/// Available data types for the sample
// #[pyo3(get)]
pub avDataTypes: Vec<DataType>,
/// Preferred data type for device
// #[pyo3(get)]
pub prefDataType: DataType,
/// Available frames per block
@ -65,5 +69,12 @@ pub struct DeviceInfo {
/// devices, this is typically a 'number' between +/- full scale. For some
/// devices however, the output quantity corresponds to a physical signal,
/// such a Volts.
// #[pyo3(get)]
pub physicalIOQty: Qty,
}
#[cfg_attr(feature = "python-bindings", pymethods)]
impl DeviceInfo {
fn __repr__(&self) -> String {
format!("{:?}", self)
}
}

View File

@ -22,9 +22,10 @@ pub use datatype::DataType;
pub use deviceinfo::DeviceInfo;
pub use qty::Qty;
pub use streamhandler::StreamHandler;
pub use streammgr::StreamMgr;
pub use streammgr::*;
pub use streammsg::InStreamMsg;
pub use streamstatus::StreamStatus;
use api::*;
#[cfg(feature = "record")]
pub use record::*;
@ -46,8 +47,28 @@ pub enum StreamType {
Duplex,
}
#[cfg(feature = "python-bindings")]
/// Add Python classes from stream manager
pub fn add_py_classses(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<DeviceInfo>()?;
m.add_class::<StreamMgr>()?;
m.add_class::<StreamApiDescr>()?;
m.add_class::<DataType>()?;
m.add_class::<Qty>()?;
m.add_class::<StreamType>()?;
m.add_class::<StreamError>()?;
m.add_class::<StreamStatus>()?;
m.add_class::<StreamError>()?;
m.add_class::<DaqChannel>()?;
m.add_class::<DaqConfig>()?;
Ok(())
}
/// Errors that happen in a stream
#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub enum StreamError {
/// Input overrun
#[strum(

View File

@ -1,12 +1,14 @@
//! Physical quantities that are input / output of a daq device. Provides an enumeration for these.
//!
use crate::config::*;
use strum::EnumMessage;
use strum_macros;
use serde::{Serialize, Deserialize};
/// Physical quantities that are I/O of a Daq device.
#[derive(PartialEq, Serialize, Deserialize, strum_macros::EnumMessage, Debug, Clone, Copy)]
#[cfg_attr(feature = "python-bindings", pyclass)]
#[allow(dead_code)]
pub enum Qty {
/// Number

View File

@ -5,7 +5,6 @@ use clap::builder::OsStr;
use crossbeam::atomic::AtomicCell;
use hdf5::types::{VarLenArray, VarLenUnicode};
use hdf5::{dataset, datatype, Dataset, File, H5Type};
use ndarray::ArrayView2;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

View File

@ -1,11 +1,5 @@
use crate::siggen::*;
use super::streammgr::SharedInQueue;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Commands that can be sent to a running stream
@ -13,9 +7,6 @@ pub enum StreamCommand {
/// Add a new queue to a running INPUT stream
AddInQueue(SharedInQueue),
/// Remove a queue to a running INPUT stream
RemoveInQueue(SharedInQueue),
/// New signal generator config to be used in OUTPUT stream
NewSiggen(Siggen),

View File

@ -12,12 +12,6 @@ use std::u128::MAX;
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Raw stream data coming from a stream.
#[derive(Clone, Debug)]
@ -281,7 +275,7 @@ mod test {
const Nframes: usize = 20;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
let mut siggen = Siggen::newSine(Nch, 1.);
siggen.reset(fs);
siggen.setMute(&[false, true]);

View File

@ -1,11 +1,12 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
use super::api::*;
use super::config::*;
use super::*;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use api::StreamApiDescr;
use array_init::from_iter;
use core::time;
use cpal::Sample;
@ -20,14 +21,7 @@ use streamdata::*;
use streammsg::*;
#[cfg(feature = "cpal-api")]
use super::api::api_cpal::CpalApi;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
} else {} }
use super::api::{api_cpal::CpalApi, Stream};
/// Store a queue in a shared pointer, to share sending
/// and receiving part of the queue.
@ -45,9 +39,9 @@ struct StreamInfo<T> {
/// Keep track of whether the stream has been created. To ensure singleton behaviour.
static smgr_created: AtomicBool = AtomicBool::new(false);
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
/// Configure and manage input / output streams.
///
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
pub struct StreamMgr {
// List of available devices
devs: Vec<DeviceInfo>,
@ -73,7 +67,7 @@ pub struct StreamMgr {
siggen: Option<crate::siggen::Siggen>,
}
#[cfg(feature = "python-bindings")]
#[cfg(feature="python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMgr {
#[new]
@ -82,13 +76,34 @@ impl StreamMgr {
StreamMgr::new()
}
// #[pyo3(name = "unit")]
// #[staticmethod]
// /// See: [Biquad::unit()]
// pub fn unit_py() -> Biquad {
// Biquad::unit()
// }
// #[pyo3(name = "firstOrderHighPass")]
#[pyo3(name = "startDefaultInputStream")]
fn startDefaultInputStream_py(&mut self) -> PyResult<()> {
Ok(self.startDefaultInputStream()?)
}
#[pyo3(name = "startDefaultOutputStream")]
fn startDefaultOutputStream_py(&mut self) -> PyResult<()> {
Ok(self.startDefaultOutputStream()?)
}
#[pyo3(name = "startStream")]
fn startStream_py(&mut self, st: StreamType, d: &DaqConfig) -> PyResult<()> {
Ok(self.startStream(st, d)?)
}
#[pyo3(name = "stopStream")]
fn stopStream_py(&mut self, st: StreamType) -> PyResult<()> {
Ok(self.stopStream(st)?)
}
#[pyo3(name = "getDeviceInfo")]
fn getDeviceInfo_py(&mut self) -> PyResult<Vec<DeviceInfo>> {
Ok(self.getDeviceInfo())
}
#[pyo3(name = "getStatus")]
fn getStatus_py(&self, st: StreamType) -> StreamStatus {
self.getStatus(st)
}
#[pyo3(name = "setSiggen")]
fn setSiggen_py(&mut self, siggen: Siggen) {
self.setSiggen(siggen)
}
}
impl Default for StreamMgr {
fn default() -> Self {
@ -130,14 +145,14 @@ impl StreamMgr {
if let Some(s) = &self.input_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
StreamStatus::NotRunning {}
}
}
StreamType::Output => {
if let Some(s) = &self.output_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
StreamStatus::NotRunning {}
}
}
}
@ -145,34 +160,24 @@ impl StreamMgr {
/// Set a new signal generator. Returns an error if it is unapplicable.
/// It is unapplicable if the number of channels of output does not match the
/// number of output channels in a running stream.
pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> {
pub fn setSiggen(&mut self, siggen: Siggen) {
// Current signal generator. Where to place it?
if let Some(istream) = &self.input_stream {
if let StreamType::Duplex = istream.streamtype {
if siggen.nchannels() != istream.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
assert!(self.siggen.is_none());
istream.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
}
} else if let Some(os) = &self.output_stream {
assert!(self.siggen.is_none());
if siggen.nchannels() != os.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
} else {
self.siggen = Some(siggen);
return Ok(());
}
unreachable!()
}
/// Obtain a list of devices that are available for each available API
pub fn getDeviceInfo(&mut self) -> &Vec<DeviceInfo> {
&self.devs
pub fn getDeviceInfo(&mut self) -> Vec<DeviceInfo> {
self.devs.clone()
}
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
@ -189,7 +194,7 @@ impl StreamMgr {
/// Add a new queue to the lists of queues. On the queue, input data is
/// added.
///
///
/// If the stream is unable to write data on the queue (which might
/// happen when the handler is dropped), the queue is removed from the list
/// of queues that get data from the stream.
@ -227,14 +232,12 @@ impl StreamMgr {
}
}
// Remove queue from list
StreamCommand::RemoveInQueue(queue) => {
iqueues.retain(|q| !q.same_channel(&queue))
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped);
sendMsgToAllQueuesRemoveUnused(
&mut iqueues,
InStreamMsg::StreamStopped,
);
break 'infy;
}
StreamCommand::NewSiggen(_) => {
@ -249,7 +252,7 @@ impl StreamMgr {
let streamdata = Arc::new(streamdata);
let msg = InStreamMsg::StreamData(streamdata);
sendMsgToAllQueues(&mut iqueues, msg);
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
ctr += 1;
}
}
@ -259,8 +262,15 @@ impl StreamMgr {
}
// Match device info struct on given daq config.
fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> {
self.devs.iter().find(|&d| d.device_name == cfg.device_name)
fn find_device(&self, cfg: &DaqConfig) -> Result<&DeviceInfo> {
if let Some(matching_dev) = self
.devs
.iter()
.find(|&d| d.device_name == cfg.device_name && d.api == cfg.api)
{
return Ok(matching_dev);
}
bail!("Could not find device with name {}.", cfg.device_name);
}
fn startOuputStreamThread(
&mut self,
@ -295,11 +305,6 @@ impl StreamMgr {
panic!("Invalid message send to output thread: AddInQueue");
}
// Remove queue from list
StreamCommand::RemoveInQueue(_) => {
panic!("Invalid message send to output thread: RemoveInQueue");
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
break 'infy;
@ -364,27 +369,41 @@ impl StreamMgr {
self.startInputOrDuplexStream(stype, cfg)?;
}
StreamType::Output => {
// self.startOutputStream(cfg)?;
bail!("No output stream defined yet");
self.startOutputStream(cfg)?;
}
}
Ok(())
}
// fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
// let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// let stream = match cfg.api {
// StreamApiDescr::Cpal => {
// let devinfo = self
// .match_devinfo(cfg)
// .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
// self.cpal_api.startOutputStream(devinfo, cfg, tx)?
// }
// _ => bail!("Unimplemented api!"),
// };
/// Start a stream for output only, using only the output channel
/// configuration as given in the `cfg`.
fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
let stream = match cfg.api {
StreamApiDescr::Cpal => {
let devinfo = self.find_device(cfg)?;
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
self.cpal_api.startOutputStream(devinfo, cfg, rx)?
} else {
bail!("API {} not available", cfg.api)
}
}
}
_ => bail!("API {} not implemented!", cfg.api),
};
let meta = stream.metadata();
let (threadhandle, commtx) = self.startOuputStreamThread(meta, tx);
// Ok(())
// }
self.output_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
// Start an input or duplex stream
fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
@ -409,12 +428,16 @@ impl StreamMgr {
if stype == StreamType::Duplex {
bail!("Duplex mode not supported for CPAL api");
}
let devinfo = self
.match_devinfo(cfg)
.ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
self.cpal_api.startInputStream(stype, devinfo, cfg, tx)?
let devinfo = self.find_device(cfg)?;
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
self.cpal_api.startInputStream(stype, devinfo, cfg, tx)?
} else {
bail!("API {} not available", cfg.api)
}
}
}
_ => bail!("Unimplemented api!"),
_ => bail!("API {} not implemented!", cfg.api),
};
// Input queues should be available, otherwise panic bug.
@ -422,7 +445,7 @@ impl StreamMgr {
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
sendMsgToAllQueuesRemoveUnused(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
@ -453,7 +476,7 @@ impl StreamMgr {
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
sendMsgToAllQueuesRemoveUnused(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
@ -582,7 +605,7 @@ impl Drop for StreamMgr {
// Send to all queues, remove queues that are disconnected when found out
// on the way.
fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) {
fn sendMsgToAllQueuesRemoveUnused(iqueues: &mut InQueues, msg: InStreamMsg) {
// Loop over queues. Remove queues that error when we try to send
// to them
iqueues.retain(|q| match q.try_send(msg.clone()) {

View File

@ -2,24 +2,22 @@
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Gives the stream status of a stream, either input / output or duplex.
/// Gives the stream status of a (possible) stream, either input / output or duplex.
#[derive(strum_macros::EnumMessage, Debug, Clone, Copy, Display)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub enum StreamStatus {
/// Stream is not running
#[strum(message = "NotRunning", detailed_message = "Stream is not running")]
NotRunning,
NotRunning{},
/// Stream is running properly
#[strum(message = "Running", detailed_message = "Stream is running")]
Running,
Running{},
/// An error occured in the stream.
#[strum(message = "Error", detailed_message = "An error occured with the stream")]
Error(StreamError)
Error{
/// In case the stream has an error: e is the field name
e: StreamError
}
}

View File

@ -1,5 +1,5 @@
use super::*;
use ndarray::prelude::*;
use crate::config::*;
use anyhow::{Result, bail};
use num::Complex;
@ -27,7 +27,7 @@ impl Biquad {
#[new]
/// Create new biquad filter. See [Biquad::new()]
///
pub fn new_py<'py>(coefs: PyReadonlyArrayDyn<Flt>) -> PyResult<Self> {
pub fn new_py<'py>(coefs: PyArrayLike1<Flt>) -> PyResult<Self> {
Ok(Biquad::new(coefs.as_slice()?)?)
}
#[pyo3(name = "unit")]
@ -48,8 +48,8 @@ impl Biquad {
&mut self,
py: Python<'py>,
input: PyArrayLike1<Flt>,
) -> PyResult<&'py PyArray1<Flt>> {
Ok(self.filter(input.as_slice()?).into_pyarray(py))
) -> PyResult<PyArr1Flt<'py>> {
Ok(PyArray1::from_vec_bound(py, self.filter(input.as_slice()?)))
}
}
impl Biquad {
@ -112,6 +112,8 @@ impl Biquad {
Ok(Biquad::new(&coefs).unwrap())
}
/// Filter input signal, output by overwriting input slice.
pub fn filter_inout(&mut self, inout: &mut [Flt]) {
for sample in inout.iter_mut() {
let w0 = *sample - self.a1 * self.w1 - self.a2 * self.w2;
@ -123,6 +125,7 @@ impl Biquad {
// println!("{:?}", inout);
}
}
impl Filter for Biquad {
fn filter(&mut self, input: &[Flt]) -> Vec<Flt> {
let mut out = input.to_vec();

View File

@ -1,17 +1,20 @@
use super::*;
use super::biquad::Biquad;
use anyhow::{bail, Result};
#[derive(Clone, Debug)]
#[cfg_attr(feature = "python-bindings", pyclass)]
/// Series of biquads that filter sequentially on an input signal
///
/// # Examples
///
/// See (tests)
///
use super::*;
use super::biquad::Biquad;
use anyhow::{bail, Result};
#[derive(Clone, Debug)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct SeriesBiquad {
biqs: Vec<Biquad>,
}
@ -19,6 +22,8 @@ pub struct SeriesBiquad {
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl SeriesBiquad {
#[new]
/// Create new series filter set. See [SeriesBiquad::new()]
///
@ -37,8 +42,9 @@ impl SeriesBiquad {
&mut self,
py: Python<'py>,
input: PyArrayLike1<Flt>,
) -> PyResult<&'py PyArray1<Flt>> {
Ok(self.filter(input.as_slice()?).into_pyarray(py))
) -> Result<PyArr1Flt<'py>> {
let res = self.filter(input.as_slice()?);
Ok(PyArray1::from_vec_bound(py, res))
}
#[pyo3(name = "reset")]
/// See: [SeriesBiquad::reset()]

View File

@ -20,28 +20,23 @@ pub mod daq;
pub mod siggen;
use filter::*;
use daq::*;
/// A Python module implemented in Rust.
#[cfg(feature = "python-bindings")]
#[pymodule]
#[pyo3(name="_lasprs")]
fn lasprs(py: Python, m: &PyModule) -> PyResult<()> {
fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
pyo3_add_submodule_filter(py, m)?;
Ok(())
}
/// Add filter submodule to extension
#[cfg(feature = "python-bindings")]
fn pyo3_add_submodule_filter(py: Python, m: &PyModule) -> PyResult<()> {
// Add filter submodule
let filter_module = PyModule::new(py, "filter")?;
filter_module.add_class::<filter::Biquad>()?;
filter_module.add_class::<filter::SeriesBiquad>()?;
filter_module.add_class::<filter::BiquadBank>()?;
m.add_submodule(filter_module)?;
m.add_class::<filter::Biquad>()?;
m.add_class::<filter::SeriesBiquad>()?;
m.add_class::<filter::BiquadBank>()?;
m.add_class::<siggen::Siggen>()?;
daq::add_py_classses(m)?;
Ok(())
}

View File

@ -17,20 +17,17 @@
//! ```
use super::config::*;
use super::filter::Filter;
use dasp_sample::{FromSample, Sample};
use dasp_sample::{ FromSample, Sample };
use rayon::prelude::*;
use std::fmt::Debug;
use std::iter::ExactSizeIterator;
use std::slice::IterMut;
#[cfg(feature = "python-bindings")]
use pyo3::prelude::*;
use rand::prelude::*;
use rand::rngs::ThreadRng;
use rand_distr::StandardNormal;
const twopi: Flt = 2. * pi;
const twopi: Flt = 2.0 * pi;
/// Source for the signal generator. Implementations are sine waves, sweeps, noise.
pub trait Source: Send {
@ -52,7 +49,9 @@ struct Silence {}
impl Source for Silence {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = 0.0);
sig.for_each(|s| {
*s = 0.0;
});
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
@ -71,7 +70,9 @@ impl WhiteNoise {
}
impl Source for WhiteNoise {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = thread_rng().sample(StandardNormal));
sig.for_each(|s| {
*s = thread_rng().sample(StandardNormal);
});
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
@ -98,17 +99,17 @@ impl Sine {
/// *
fn new(freq: Flt) -> Sine {
Sine {
fs: -1.,
phase: 0.,
omg: 2. * pi * freq,
fs: -1.0,
phase: 0.0,
omg: 2.0 * pi * freq,
}
}
}
impl Source for Sine {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
if self.fs <= 0. {
if self.fs <= 0.0 {
sig.for_each(|s| {
*s = 0.;
*s = 0.0;
});
return;
}
@ -120,7 +121,7 @@ impl Source for Sine {
}
fn reset(&mut self, fs: Flt) {
self.fs = fs;
self.phase = 0.;
self.phase = 0.0;
}
fn clone_dyn(&self) -> Box<dyn Source> {
Box::new(self.clone())
@ -134,6 +135,7 @@ impl Source for Sine {
/// * (Siggen::newSine)
///
#[derive(Clone)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct Siggen {
// The source dynamic signal. Noise, a sine wave, sweep, etc
source: Box<dyn Source>,
@ -146,10 +148,22 @@ pub struct Siggen {
// Output buffers (for filtered source signal)
chout_buf: Vec<Vec<Flt>>,
}
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Siggen {
#[pyo3(name = "newWhiteNoise")]
#[staticmethod]
fn newWhiteNoise_py() -> Siggen {
Siggen::newWhiteNoise(0)
}
#[pyo3(name = "newSine")]
#[staticmethod]
fn newSine_py(freq: Flt) -> Siggen {
Siggen::newSine(0, freq)
}
}
/// Multiple channel signal generator. Can use a single source (coherent) to provide multiple signals
/// that can be sent out through different EQ's
/// A struct that implements the Siggen trait is able to generate a signal.
impl Siggen {
/// Returns the number of channels this signal generator is generating for.
pub fn nchannels(&self) -> usize {
@ -195,17 +209,19 @@ impl Siggen {
}
/// Set the DC offset for all channels
pub fn setDCOffset(&mut self,dc: &[Flt]) {
self.channels.iter_mut().zip(dc).for_each(
|(ch, dc)| {ch.DCOffset = *dc;});
pub fn setDCOffset(&mut self, dc: &[Flt]) {
self.channels
.iter_mut()
.zip(dc)
.for_each(|(ch, dc)| {
ch.DCOffset = *dc;
});
}
/// Create a sine wave signal generator
///
/// * freq: Frequency of the sine wave in \[Hz\]
pub fn newSineWave(nchannels: usize, freq: Flt) -> Siggen {
pub fn newSine(nchannels: usize, freq: Flt) -> Siggen {
Siggen::new(nchannels, Box::new(Sine::new(freq)))
}
@ -221,40 +237,35 @@ impl Siggen {
/// Creates *interleaved* output signal
pub fn genSignal<T>(&mut self, out: &mut [T])
where
T: Sample + FromSample<Flt> + Debug,
Flt: Sample,
where T: Sample + FromSample<Flt> + Debug, Flt: Sample
{
let nch = self.nchannels();
let nsamples: usize = out.len() / nch;
assert!(out.len() % self.nchannels() == 0);
// Create source signal
self.source_buf.resize(nsamples, 0.);
self.source
.genSignal_unscaled(&mut self.source_buf.iter_mut());
self.source_buf.resize(nsamples, 0.0);
self.source.genSignal_unscaled(&mut self.source_buf.iter_mut());
// println!("Source signal: {:?}", self.source_buf);
// Write output while casted to the correct type
// Iterate over each channel, and counter
self.chout_buf.resize(nch, vec![]);
for (channelno, (channel, chout)) in self
.channels
for (channelno, (channel, chout)) in self.channels
.iter_mut()
.zip(self.chout_buf.iter_mut())
.enumerate()
{
chout.resize(nsamples, 0.);
.enumerate() {
chout.resize(nsamples, 0.0);
// Create output signal, overwrite chout
channel.genSignal(&self.source_buf, chout);
// println!("Channel: {}, {:?}", channelno, chout);
let out_iterator = out.iter_mut().skip(channelno).step_by(nch);
out_iterator
.zip(chout)
.for_each(|(out, chin)| *out = chin.to_sample());
out_iterator.zip(chout).for_each(|(out, chin)| {
*out = chin.to_sample();
});
}
// println!("{:?}", out);
}
@ -280,9 +291,12 @@ impl Siggen {
/// as number of channels in signal generator.
pub fn setMute(&mut self, mute: &[bool]) {
assert!(mute.len() == self.nchannels());
self.channels.iter_mut().zip(mute).for_each(|(s, m)| {
s.setMute(*m);
});
self.channels
.iter_mut()
.zip(mute)
.for_each(|(s, m)| {
s.setMute(*m);
});
}
}
@ -321,13 +335,13 @@ impl SiggenChannelConfig {
muted: false,
prefilter: None,
gain: 1.0,
DCOffset: 0.,
DCOffset: 0.0,
}
}
/// Set mute on channel. If true, only DC signal offset is outputed from (SiggenChannelConfig::transform).
pub fn setMute(&mut self, mute: bool) {
self.muted = mute
self.muted = mute;
}
/// Generate new signal data, given input source data.
///
@ -370,7 +384,7 @@ mod test {
#[test]
fn test_whitenoise() {
// This code is just to check syntax. We should really be listening to these outputs.
let mut t = [0.; 10];
let mut t = [0.0; 10];
Siggen::newWhiteNoise(1).genSignal(&mut t);
// println!("{:?}", &t);
}
@ -380,55 +394,70 @@ mod test {
// This code is just to check syntax. We should really be listening to
// these outputs.
const N: usize = 10000;
let mut s1 = [0.; N];
let mut s2 = [0.; N];
let mut siggen = Siggen::newSineWave(1, 1.);
let mut s1 = [0.0; N];
let mut s2 = [0.0; N];
let mut siggen = Siggen::newSine(1, 1.0);
siggen.reset(10.);
siggen.reset(10.0);
siggen.setAllMute(false);
siggen.genSignal(&mut s1);
siggen.genSignal(&mut s2);
let absdiff = s1.iter().zip(s2.iter()).map(|(s1, s2)| {Flt::abs(*s1-*s2)}).sum::<Flt>();
assert!(absdiff< 1e-10);
let absdiff = s1
.iter()
.zip(s2.iter())
.map(|(s1, s2)| { Flt::abs(*s1 - *s2) })
.sum::<Flt>();
assert!(absdiff < 1e-10);
}
#[test]
fn test_sine2() {
// Test if channels are properly separated etc. Check if RMS is correct
// for amplitude = 1.0.
const fs: Flt = 10.;
const fs: Flt = 10.0;
// Number of samples per channel
const Nframes: usize = 10000;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
let mut signal = [0.0; Nch * Nframes];
let mut siggen = Siggen::newSine(Nch, 1.0);
siggen.reset(fs);
siggen.setMute(&[false, true]);
// siggen.channels[0].DCOffset = 0.1;
// Split off in two terms, see if this works properly
siggen.genSignal(&mut signal[..Nframes/2]);
siggen.genSignal(&mut signal[Nframes/2..]);
siggen.genSignal(&mut signal[..Nframes / 2]);
siggen.genSignal(&mut signal[Nframes / 2..]);
// Mean square of the signal
let ms1 = signal.iter().step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
println!("ms1: {}",ms1);
let ms1 =
signal
.iter()
.step_by(2)
.map(|s1| { *s1 * *s1 })
.sum::<Flt>() / (Nframes as Flt);
println!("ms1: {}", ms1);
let ms2 = signal.iter().skip(1).step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
let ms2 =
signal
.iter()
.skip(1)
.step_by(2)
.map(|s1| { *s1 * *s1 })
.sum::<Flt>() / (Nframes as Flt);
assert!(Flt::abs(ms1 - 0.5) < 1e-12);
assert_eq!(ms2 , 0.);
assert_eq!(ms2, 0.0);
}
// A small test to learn a bit about sample types and conversion. This
// is the thing we want.
#[test]
fn test_sample() {
assert_eq!(0.5f32.to_sample::<i8>(), 64);
assert_eq!(1.0f32.to_sample::<i8>(), 127);
assert_eq!(-(1.0f32.to_sample::<i8>()), -127);
assert_eq!(1.0f32.to_sample::<i16>(), i16::MAX);
assert_eq!((0.5f32).to_sample::<i8>(), 64);
assert_eq!((1.0f32).to_sample::<i8>(), 127);
assert_eq!(-(1.0f32).to_sample::<i8>(), -127);
assert_eq!((1.0f32).to_sample::<i16>(), i16::MAX);
}
}