Signal generator working, even with Python bindings. startOutputStream implementend for CPAL

This commit is contained in:
Anne de Jong 2024-05-05 15:01:50 +02:00
parent 22a9c9f850
commit 158ea77c40
11 changed files with 614 additions and 271 deletions

102
src/bin/lasp_output.rs Normal file
View File

@ -0,0 +1,102 @@
use anyhow::Result;
use crossbeam::channel::{ unbounded, Receiver, TryRecvError };
use lasprs::daq::{ DaqConfig, StreamMgr, StreamStatus, StreamType };
use lasprs::siggen::Siggen;
use std::io;
use std::time::Duration;
use std::{ thread, time };
// use
/// Spawns a thread and waits for a single line, pushes it to the receiver and returns
fn stdin_channel_wait_for_return() -> Receiver<String> {
let (tx, rx) = unbounded();
thread::spawn(move || {
loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
// Do not care whether we succeed here.
let _ = tx.send(buffer);
}
});
rx
}
fn sleep(millis: u64) {
let duration = time::Duration::from_millis(millis);
thread::sleep(duration);
}
fn main() -> Result<()> {
let mut smgr = StreamMgr::new();
let stdin_channel = stdin_channel_wait_for_return();
println!("Creating signal generator...");
let mut siggen = Siggen::newSine(2, 432.0);
// Reduce all gains a bit...
siggen.setAllGains(0.1);
// Apply signal generator
smgr.setSiggen(siggen);
println!("Starting stream...");
let devs = smgr.getDeviceInfo();
for (i, dev) in devs.iter().enumerate() {
println!("No: {}, name: {}", i, dev.device_name);
}
print!("Please choose device by number [0-{}]: ", devs.len());
let dev = loop {
match stdin_channel.try_recv() {
Ok(nostr) => {
if let Ok(val) = nostr.trim().parse::<i32>() {
if (val as usize) > devs.len() - 1 {
println!(
"Invalid device number. Expected a value between 0 and {}. Please try again.",
devs.len()
);
continue;
}
break &devs[val as usize];
} else {
println!("Invalid value. Please fill in a number. ");
}
}
Err(TryRecvError::Empty) => {
continue;
}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
thread::sleep(Duration::from_millis(100));
};
let mut cfg = DaqConfig::newFromDeviceInfo(dev);
cfg.outchannel_config[0].enabled = true;
cfg.outchannel_config[1].enabled = true;
cfg.outchannel_config[2].enabled = true;
smgr.startStream(StreamType::Output, &cfg)?;
println!("Press <enter> key to quit...");
'infy: loop {
match stdin_channel.try_recv() {
Ok(_key) => {
break 'infy;
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
sleep(100);
match smgr.getStatus(StreamType::Output) {
StreamStatus::NotRunning {} => {
println!("Stream is not running?");
break 'infy;
}
StreamStatus::Running {} => {}
StreamStatus::Error { e } => {
println!("Stream error: {}", e);
break 'infy;
}
}
}
Ok(())
}

View File

@ -28,7 +28,7 @@ fn main() -> Result<()> {
let stdin_channel = stdin_channel_wait_for_return();
println!("Creating signal generator...");
let mut siggen = Siggen::newSineWave(2, 432.);
let mut siggen = Siggen::newSine(2, 432.);
// Some things that can be done
// siggen.setDCOffset(&[0.1, 0.]);
@ -36,12 +36,13 @@ fn main() -> Result<()> {
// Reduce all gains a bit...
siggen.setAllGains(0.1);
// Apply signal generator
smgr.setSiggen(siggen)?;
println!("Starting stream...");
smgr.startDefaultOutputStream()?;
// Apply signal generator
smgr.setSiggen(siggen);
println!("Press <enter> key to quit...");
'infy: loop {
match stdin_channel.try_recv() {

View File

@ -1,18 +1,22 @@
#![allow(dead_code)]
use super::Stream;
use super::StreamMetaData;
use crate::daq::{streamdata::*, StreamApiDescr};
use crate::config::{ self, * };
use crate::daq::{ self, * };
use crate::daq::{ streamdata::*, StreamApiDescr };
use anyhow::{ bail, Result };
use cpal::traits::{ DeviceTrait, HostTrait, StreamTrait };
use cpal::SampleRate;
use cpal::SupportedStreamConfig;
use cpal::{ Device, Host, Sample, SampleFormat, SupportedBufferSize };
use crossbeam::atomic::AtomicCell;
use crossbeam::channel::{ Receiver, Sender };
use itertools::Itertools;
use num::ToPrimitive;
use reinterpret::reinterpret_slice;
use std::any;
use std::any::{ Any, TypeId };
use std::collections::btree_map::OccupiedEntry;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
@ -78,19 +82,22 @@ impl CpalApi {
}
}
pub fn getDeviceInfo(&self) -> Result<Vec<DeviceInfo>> {
let srs_1 = [
1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000,
];
let srs_1 = [1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000];
let srs_2 = [11025, 22050, 44100, 88200];
let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter()));
srs_tot.sort();
let srs_tot = Vec::from_iter(srs_tot.iter().copied().map(|i| *i as Flt));
let srs_tot = Vec::from_iter(
srs_tot
.iter()
.copied()
.map(|i| *i as Flt)
);
// srs_tot.sort();
let mut devs = vec![];
for dev in self.host.devices()? {
'devloop: for dev in self.host.devices()? {
// println!("{:?}", dev.name());
let mut iChannelCount = 0;
let mut oChannelCount = 0;
@ -107,8 +114,8 @@ impl CpalApi {
continue;
}
sample_formats.push(icfg.sample_format());
avSampleRates.retain(|sr| *sr >= icfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= icfg.max_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr >= (icfg.min_sample_rate().0 as Flt));
avSampleRates.retain(|sr| *sr <= (icfg.max_sample_rate().0 as Flt));
if let SupportedBufferSize::Range { min, max } = icfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
@ -117,15 +124,15 @@ impl CpalApi {
// avFramesPerBlock.retain(|i| i >= icfg.buffer_size().)
}
}
if let Ok(ocfg) = dev.supported_input_configs() {
if let Ok(ocfg) = dev.supported_output_configs() {
for ocfg in ocfg {
let thissf = ocfg.sample_format();
if thissf.is_uint() {
continue;
}
sample_formats.push(thissf);
avSampleRates.retain(|sr| *sr >= ocfg.min_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr <= ocfg.max_sample_rate().0 as Flt);
avSampleRates.retain(|sr| *sr >= (ocfg.min_sample_rate().0 as Flt));
avSampleRates.retain(|sr| *sr <= (ocfg.max_sample_rate().0 as Flt));
if let SupportedBufferSize::Range { min, max } = ocfg.buffer_size() {
avFramesPerBlock.retain(|i| i >= &(*min as usize));
avFramesPerBlock.retain(|i| i <= &(*max as usize));
@ -138,14 +145,22 @@ impl CpalApi {
continue;
}
let dtypes: Vec<DataType> =
sample_formats.iter().dedup().map(|i| (*i).into()).collect();
let dtypes: Vec<DataType> = sample_formats
.iter()
.dedup()
.map(|i| (*i).into())
.collect();
let prefDataType = match dtypes.iter().position(|d| d == &DataType::F32) {
Some(idx) => dtypes[idx],
None => dtypes[dtypes.len() - 1],
};
let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.);
let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.0);
// Do not add device if it does not have any channels at all.
if iChannelCount == oChannelCount && oChannelCount == 0 {
break 'devloop;
}
devs.push(DeviceInfo {
api: StreamApiDescr::Cpal,
device_name: dev.name()?,
@ -166,7 +181,7 @@ impl CpalApi {
hasInternalOutputMonitor: false,
duplexModeForced: false,
physicalIOQty: Qty::Number,
})
});
}
Ok(devs)
@ -175,9 +190,8 @@ impl CpalApi {
// Create the error function closure, that capture the send channel on which error messages from the stream are sent
fn create_errfcn(
send_ch: Option<Sender<RawStreamData>>,
status: Arc<AtomicCell<StreamStatus>>,
status: Arc<AtomicCell<StreamStatus>>
) -> impl FnMut(cpal::StreamError) {
move |err: cpal::StreamError| {
let serr = match err {
cpal::StreamError::DeviceNotAvailable => StreamError::DeviceNotAvailable,
@ -194,10 +208,9 @@ impl CpalApi {
config: &cpal::StreamConfig,
sender: Sender<RawStreamData>,
framesPerBlock: usize,
en_inchannels: Vec<usize>,
en_inchannels: Vec<usize>
) -> impl FnMut(&[T], &cpal::InputCallbackInfo)
where
T: 'static + Sample + ToPrimitive,
where T: 'static + Sample + ToPrimitive
{
let tot_inch = config.channels as usize;
@ -233,7 +246,7 @@ impl CpalApi {
// Send over data
let msg = RawStreamData::from(enabled_ch_data.clone());
sender.send(msg).unwrap()
sender.send(msg).unwrap();
}
}
}
@ -249,7 +262,7 @@ impl CpalApi {
device: &cpal::Device,
sender: Sender<RawStreamData>,
en_inchannels: Vec<usize>,
framesPerBlock: usize,
framesPerBlock: usize
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {}));
@ -269,9 +282,10 @@ impl CpalApi {
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
}
};
}
}
let stream: cpal::Stream = build_stream!(
let stream: cpal::Stream =
build_stream!(
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
@ -284,22 +298,41 @@ impl CpalApi {
config: &cpal::StreamConfig,
streamstatus: Arc<AtomicCell<StreamStatus>>,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
ch_config: &[DaqChannel],
framesPerBlock: usize
) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo)
where
T: 'static + Sample + Debug,
where T: 'static + Sample + Debug
{
let tot_outch: usize = config.channels as usize;
// println!("Numer of channels: {:?}", tot_outch);
let mut callback_ctr: usize = 0;
let mut q = VecDeque::<T>::with_capacity(2 * tot_outch * framesPerBlock);
let number_total_out_channels: usize = config.channels as usize;
let number_enabled_out_channels = ch_config
.iter()
.filter(|ch| ch.enabled)
.count();
move |data, _info: &_| {
let nsamples_asked = data.len();
let disabled_ch = DaqChannel::default();
let disabled_repeater = std::iter::repeat(&disabled_ch);
let enabled_outch = ch_config.iter().chain(disabled_repeater);
// Vector of enabled output channells, with length of number_total_out_channels
let enabled_outch: Vec<bool> = (0..number_total_out_channels)
.zip(enabled_outch)
.map(|(_, b)| b.enabled)
.collect();
assert_eq!(enabled_outch.len(), number_total_out_channels);
let mut callback_ctr: usize = 0;
let mut q = VecDeque::<T>::with_capacity(2 * number_total_out_channels * framesPerBlock);
move |outdata, _info: &_| {
let nsamples_asked =
(outdata.len() / number_total_out_channels) * number_enabled_out_channels;
let status = streamstatus.load();
callback_ctr += 1;
let mut setToEquilibrium = || data.iter_mut().for_each(|v| *v = Sample::EQUILIBRIUM);
let mut setToEquilibrium = ||
outdata.iter_mut().for_each(|v| {
*v = Sample::EQUILIBRIUM;
});
match status {
StreamStatus::NotRunning {} | StreamStatus::Error { .. } => {
setToEquilibrium();
@ -321,16 +354,37 @@ impl CpalApi {
if q.len() >= nsamples_asked {
// All right, we have enough samples to send out! They are
// drained from the queue
data.iter_mut()
.zip(q.drain(..nsamples_asked))
.for_each(|(o, i)| *o = i);
let out_chunks = outdata.iter_mut().chunks(number_total_out_channels);
let siggen_chunks = q.drain(..nsamples_asked).chunks(number_enabled_out_channels);
for (och, ich) in out_chunks.into_iter().zip(siggen_chunks.into_iter()) {
let mut sig_frame_iter = ich.into_iter();
och.into_iter()
.zip(&enabled_outch)
.for_each(|(o, en)| (
if *en {
*o = sig_frame_iter.next().unwrap();
} else {
*o = Sample::EQUILIBRIUM;
}
));
}
// outdata
// .iter_mut()
// .zip(q.drain(..nsamples_asked))
// .for_each(|(o, i)| {
// *o = i;
// });
} else if callback_ctr <= 2 {
// For the first two blocks, we allow dat the data is not yet
// ready, without complaining on underruns
setToEquilibrium();
} else {
// Output buffer underrun
streamstatus.store(StreamStatus::Error{e:StreamError::OutputUnderrunError});
streamstatus.store(StreamStatus::Error {
e: StreamError::OutputUnderrunError,
});
setToEquilibrium();
}
}
@ -341,10 +395,9 @@ impl CpalApi {
config: &cpal::StreamConfig,
device: &cpal::Device,
receiver: Receiver<RawStreamData>,
framesPerBlock: usize,
ch_config: &[DaqChannel],
framesPerBlock: usize
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
// let tot_ch = config.channels as usize;
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning {}));
let err_cb = CpalApi::create_errfcn(None, status.clone());
@ -353,7 +406,7 @@ impl CpalApi {
match sf {
$(
$cpaltype => {
let outcallback = CpalApi::create_outcallback::<$rtype>(config, status.clone(), receiver, framesPerBlock);
let outcallback = CpalApi::create_outcallback::<$rtype>(config, status.clone(), receiver, ch_config, framesPerBlock);
device.build_output_stream(
&config,
outcallback,
@ -362,9 +415,10 @@ impl CpalApi {
}),*,
_ => bail!("Unsupported sample format '{}'", sf)
}
};
}
}
let stream: cpal::Stream = build_stream!(
let stream: cpal::Stream =
build_stream!(
SampleFormat::I8 => i8,
SampleFormat::I16 => i16,
SampleFormat::I32 => i32,
@ -380,10 +434,9 @@ impl CpalApi {
devinfo: &DeviceInfo,
conf: &DaqConfig,
_dev: &cpal::Device,
conf_iterator: T,
conf_iterator: T
) -> Result<cpal::SupportedStreamConfig>
where
T: Iterator<Item = cpal::SupportedStreamConfigRange>,
where T: Iterator<Item = cpal::SupportedStreamConfigRange>
{
let nchannels = match st {
StreamType::Input => devinfo.iChannelCount,
@ -393,10 +446,11 @@ impl CpalApi {
for cpalconf in conf_iterator {
if cpalconf.sample_format() == conf.dtype.into() {
// Specified sample format is available
if cpalconf.channels() == nchannels as u16 {
if cpalconf.channels() == (nchannels as u16) {
let requested_sr = conf.sampleRate(devinfo);
if cpalconf.min_sample_rate().0 as Flt <= requested_sr
&& cpalconf.max_sample_rate().0 as Flt >= requested_sr
if
(cpalconf.min_sample_rate().0 as Flt) <= requested_sr &&
(cpalconf.max_sample_rate().0 as Flt) >= requested_sr
{
// Sample rate falls within range.
let requested_fpb = conf.framesPerBlock(devinfo) as u32;
@ -409,7 +463,7 @@ impl CpalApi {
min,
max,
requested_fpb
)
);
}
}
_ => {}
@ -428,27 +482,29 @@ impl CpalApi {
stype: StreamType,
devinfo: &DeviceInfo,
conf: &DaqConfig,
sender: Sender<RawStreamData>,
sender: Sender<RawStreamData>
) -> Result<Box<dyn Stream>> {
for cpaldev in self.host.devices()? {
// See if we can create a supported stream config.
let supported_config = match stype {
let supported_config = (match stype {
StreamType::Duplex => bail!("Duplex stream not supported for CPAL"),
StreamType::Input => CpalApi::create_cpal_config(
StreamType::Input =>
CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_input_configs()?,
cpaldev.supported_input_configs()?
),
StreamType::Output => CpalApi::create_cpal_config(
StreamType::Output =>
CpalApi::create_cpal_config(
stype,
devinfo,
conf,
&cpaldev,
cpaldev.supported_output_configs()?,
cpaldev.supported_output_configs()?
),
}?;
})?;
let framesPerBlock = conf.framesPerBlock(devinfo);
let sf = supported_config.sample_format();
@ -458,7 +514,7 @@ impl CpalApi {
&conf.enabledInchannelConfig(),
conf.dtype,
supported_config.sample_rate().0 as Flt,
framesPerBlock,
framesPerBlock
)?;
let meta = Arc::new(meta);
@ -468,23 +524,27 @@ impl CpalApi {
&cpaldev,
sender,
conf.enabledInchannelsList(),
framesPerBlock,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running {});
return Ok(Box::new(CpalStream {
return Ok(
Box::new(CpalStream {
stream,
md: meta,
noutchannels: 0,
status,
}));
})
);
}
bail!(format!(
bail!(
format!(
"Error: requested device {} not found. Please make sure the device is available.",
devinfo.device_name
))
)
)
}
/// Start a default input stream.
@ -492,7 +552,7 @@ impl CpalApi {
///
pub fn startDefaultInputStream(
&mut self,
sender: Sender<RawStreamData>,
sender: Sender<RawStreamData>
) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_input_device() {
if let Ok(config) = device.default_input_config() {
@ -511,15 +571,16 @@ impl CpalApi {
&device,
sender,
en_inchannels,
framesPerBlock,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running {});
// Daq: default channel config
let daqchannels = Vec::from_iter(
(0..final_config.channels)
.map(|i| DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))),
(0..final_config.channels).map(|i|
DaqChannel::defaultAudio(format!("Unnamed input channel {}", i))
)
);
// Specify data tape
@ -530,15 +591,17 @@ impl CpalApi {
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
framesPerBlock
)?;
let md = Arc::new(md);
Ok(Box::new(CpalStream {
Ok(
Box::new(CpalStream {
stream,
md,
noutchannels: 0,
status,
}))
})
)
} else {
bail!("Could not obtain default input configuration")
}
@ -547,41 +610,47 @@ impl CpalApi {
}
}
pub fn startDefaultOutputStream(
&self,
receiver: Receiver<RawStreamData>,
) -> Result<Box<dyn Stream>> {
if let Some(device) = self.host.default_output_device() {
if let Ok(config) = device.default_output_config() {
fn getDefaultOutputConfig(&self) -> Result<(Device, cpal::StreamConfig, SampleFormat, usize)> {
if let Some(dev) = self.host.default_output_device() {
let cfg = dev.default_output_config()?;
// let framesPerBlock: usize = 256;
// let framesPerBlock: usize = 8192;
let framesPerBlock: usize = config.sample_rate().0 as usize;
let framesPerBlock: usize = cfg.sample_rate().0 as usize;
// let framesPerBlock: usize = 256;
let final_config = cpal::StreamConfig {
channels: config.channels(),
sample_rate: config.sample_rate(),
channels: cfg.channels(),
sample_rate: cfg.sample_rate(),
buffer_size: cpal::BufferSize::Fixed(framesPerBlock as u32),
};
// let en_outchannels = Vec::from_iter((0..config.channels()).map(|i| i as usize));
return Ok((dev, final_config, cfg.sample_format(), framesPerBlock));
}
bail!("Could not find default output device!");
}
let sampleformat = config.sample_format();
pub fn startDefaultOutputStream(
&self,
receiver: Receiver<RawStreamData>
) -> Result<Box<dyn Stream>> {
let (device, config, sampleformat, framesPerBlock) = self.getDefaultOutputConfig()?;
// Daq: default channel config
let daqchannels = Vec::from_iter(
(0..config.channels).map(|i|
DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))
)
);
let (stream, status) = CpalApi::build_output_stream(
sampleformat,
&final_config,
&config,
&device,
receiver,
framesPerBlock,
&daqchannels,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running {});
// Daq: default channel config
let daqchannels =
Vec::from_iter((0..final_config.channels).map(|i| {
DaqChannel::defaultAudio(format!("Unnamed output channel {}", i))
}));
// // Specify data tape
let dtype = DataType::from(sampleformat);
@ -589,8 +658,8 @@ impl CpalApi {
let md = StreamMetaData::new(
&daqchannels,
dtype,
config.sample_rate().0 as Flt,
framesPerBlock,
config.sample_rate.0 as Flt,
framesPerBlock
)?;
let md = Arc::new(md);
let str = Box::new(CpalStream {
@ -600,17 +669,88 @@ impl CpalApi {
status,
});
Ok(str)
} else {
bail!("Could not obtain default output configuration")
} // Default output config is OK
} else {
bail!("Could not open output device")
} // Could not
}
// Create an output stream, using given signal generators for each channel.
// }
pub fn startOutputStream(&self, _rx: Receiver<RawStreamData>) -> Result<Box<dyn Stream>> {
bail!("Not implemented");
fn getCPALOutputConfig(
&self,
dev: &DeviceInfo,
daqconfig: &DaqConfig
) -> Result<(Device, cpal::StreamConfig, SampleFormat, usize)> {
let samplerate = dev.avSampleRates[daqconfig.sampleRateIndex] as u32;
let framesPerBlock = dev.avFramesPerBlock[daqconfig.framesPerBlockIndex];
let highest_ch: Result<usize, anyhow::Error> = daqconfig
.highestEnabledOutChannel()
.ok_or_else(|| anyhow::anyhow!("No output channels enabled."));
let highest_ch = highest_ch? as u16;
for cpaldev in self.host.devices()? {
if cpaldev.name()? == dev.device_name {
// Check, device name matches required device name
for cpalcfg in cpaldev.supported_output_configs()? {
let sf = cpalcfg.sample_format();
if sf == daqconfig.dtype.into() {
let max_sr = cpalcfg.max_sample_rate().0;
let min_sr = cpalcfg.min_sample_rate().0;
if samplerate <= max_sr && samplerate >= min_sr {
let cfg = cpalcfg.with_sample_rate(SampleRate(samplerate as u32));
let mut cfg = cfg.config();
cfg.channels = highest_ch + 1;
// Overwrite buffer size to required buffer size
cfg.buffer_size = cpal::BufferSize::Fixed(framesPerBlock as u32);
// Return tuple of device, config, sample format and
// frames per block
return Ok((cpaldev, cfg, sf, framesPerBlock));
}
}
}
}
}
bail!("Could not find device with name '{}'", dev.device_name)
}
pub fn startOutputStream(
&self,
dev: &DeviceInfo,
cfg: &DaqConfig,
receiver: Receiver<RawStreamData>
) -> Result<Box<dyn Stream>> {
let (device, cpalconfig, sampleformat, framesPerBlock) = self.getCPALOutputConfig(
dev,
cfg
)?;
let (stream, status) = Self::build_output_stream(
sampleformat,
&cpalconfig,
&device,
receiver,
&cfg.outchannel_config,
framesPerBlock
)?;
stream.play()?;
status.store(StreamStatus::Running {});
// // Specify data tape
let dtype = DataType::from(sampleformat);
let md = StreamMetaData::new(
&cfg.enabledOutchannelConfig(),
dtype,
cpalconfig.sample_rate.0 as Flt,
framesPerBlock
)?;
let md = Arc::new(md);
let str = Box::new(CpalStream {
stream,
md,
noutchannels: cpalconfig.channels as usize,
status,
});
Ok(str)
}
}

View File

@ -33,7 +33,7 @@ pub trait Stream {
/// Stream API descriptor: type and corresponding text
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize, strum_macros::Display)]
#[allow(dead_code)]
pub enum StreamApiDescr {
/// CPAL api

View File

@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
/// DAQ Configuration for a single channel
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[cfg_attr(feature = "python-bindings", pyclass(get_all, set_all))]
pub struct DaqChannel {
/// Whether the channel is enabled
pub enabled: bool,
@ -53,6 +54,7 @@ impl DaqChannel {
/// Configuration of a device.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "python-bindings", pyclass(get_all, set_all))]
pub struct DaqConfig {
/// The API
pub api: StreamApiDescr,
@ -65,6 +67,7 @@ pub struct DaqConfig {
/// Configuration of the output channels
pub outchannel_config: Vec<DaqChannel>,
/// The data type to use
pub dtype: DataType,
@ -72,15 +75,28 @@ pub struct DaqConfig {
pub digitalHighPassCutOn: Flt,
/// The index to use in the list of possible sample rates
sampleRateIndex: usize,
pub sampleRateIndex: usize,
/// The index to use in the list of possible frames per block
framesPerBlockIndex: usize,
pub framesPerBlockIndex: usize,
/// Used when output channels should be monitored, i.e. reverse-looped back as input channels.
monitorOutput: bool,
pub monitorOutput: bool,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl DaqConfig {
#[pyo3(name = "newFromDeviceInfo")]
#[staticmethod]
fn newFromDeviceInfo_py(d: &DeviceInfo) -> PyResult<DaqConfig> {
Ok(DaqConfig::newFromDeviceInfo(d))
}
fn __repr__(&self) -> String {
format!("{:#?}", self)
}
}
impl DaqConfig {
/// Creates a new default device configuration for a given device as specified with
/// the DeviceInfo descriptor.
@ -216,4 +232,24 @@ impl DaqConfig {
.cloned()
.collect()
}
/// Returns the channel number of the highest enabled input channel, if any.
pub fn highestEnabledInChannel(&self) -> Option<usize> {
let mut highest = None;
self.inchannel_config.iter().enumerate().for_each(|(i,c)| if c.enabled {highest = Some(i);});
highest
}
/// Returns the channel number of the highest enabled output channel, if any.
pub fn highestEnabledOutChannel(&self) -> Option<usize> {
let mut highest = None;
self.outchannel_config.iter().enumerate().for_each(|(i,c)| if c.enabled {highest = Some(i);});
println!("{:?}", highest);
highest
}
}

View File

@ -60,6 +60,8 @@ pub fn add_py_classses(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<StreamError>()?;
m.add_class::<StreamStatus>()?;
m.add_class::<StreamError>()?;
m.add_class::<DaqChannel>()?;
m.add_class::<DaqConfig>()?;
Ok(())
}

View File

@ -275,7 +275,7 @@ mod test {
const Nframes: usize = 20;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
let mut siggen = Siggen::newSine(Nch, 1.);
siggen.reset(fs);
siggen.setMute(&[false, true]);

View File

@ -1,11 +1,12 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
use super::*;
use super::config::*;
use super::*;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use api::StreamApiDescr;
use array_init::from_iter;
use core::time;
use cpal::Sample;
@ -15,11 +16,9 @@ use crossbeam::{
};
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streamdata::*;
use streamcmd::StreamCommand;
use streamdata::*;
use streammsg::*;
use api::StreamApiDescr;
#[cfg(feature = "cpal-api")]
use super::api::{api_cpal::CpalApi, Stream};
@ -85,6 +84,14 @@ impl StreamMgr {
fn startDefaultOutputStream_py(&mut self) -> PyResult<()> {
Ok(self.startDefaultOutputStream()?)
}
#[pyo3(name = "startStream")]
fn startStream_py(&mut self, st: StreamType, d: &DaqConfig) -> PyResult<()> {
Ok(self.startStream(st, d)?)
}
#[pyo3(name = "stopStream")]
fn stopStream_py(&mut self, st: StreamType) -> PyResult<()> {
Ok(self.stopStream(st)?)
}
#[pyo3(name = "getDeviceInfo")]
fn getDeviceInfo_py(&mut self) -> PyResult<Vec<DeviceInfo>> {
Ok(self.getDeviceInfo())
@ -93,7 +100,10 @@ impl StreamMgr {
fn getStatus_py(&self, st: StreamType) -> StreamStatus {
self.getStatus(st)
}
#[pyo3(name = "setSiggen")]
fn setSiggen_py(&mut self, siggen: Siggen) {
self.setSiggen(siggen)
}
}
impl Default for StreamMgr {
fn default() -> Self {
@ -150,29 +160,19 @@ impl StreamMgr {
/// Set a new signal generator. Returns an error if it is unapplicable.
/// It is unapplicable if the number of channels of output does not match the
/// number of output channels in a running stream.
pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> {
pub fn setSiggen(&mut self, siggen: Siggen) {
// Current signal generator. Where to place it?
if let Some(istream) = &self.input_stream {
if let StreamType::Duplex = istream.streamtype {
if siggen.nchannels() != istream.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
assert!(self.siggen.is_none());
istream.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
}
} else if let Some(os) = &self.output_stream {
assert!(self.siggen.is_none());
if siggen.nchannels() != os.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
} else {
self.siggen = Some(siggen);
return Ok(());
}
unreachable!()
}
/// Obtain a list of devices that are available for each available API
@ -234,7 +234,10 @@ impl StreamMgr {
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
sendMsgToAllQueuesRemoveUnused(&mut iqueues, InStreamMsg::StreamStopped);
sendMsgToAllQueuesRemoveUnused(
&mut iqueues,
InStreamMsg::StreamStopped,
);
break 'infy;
}
StreamCommand::NewSiggen(_) => {
@ -259,8 +262,15 @@ impl StreamMgr {
}
// Match device info struct on given daq config.
fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> {
self.devs.iter().find(|&d| d.device_name == cfg.device_name)
fn find_device(&self, cfg: &DaqConfig) -> Result<&DeviceInfo> {
if let Some(matching_dev) = self
.devs
.iter()
.find(|&d| d.device_name == cfg.device_name && d.api == cfg.api)
{
return Ok(matching_dev);
}
bail!("Could not find device with name {}.", cfg.device_name);
}
fn startOuputStreamThread(
&mut self,
@ -359,27 +369,41 @@ impl StreamMgr {
self.startInputOrDuplexStream(stype, cfg)?;
}
StreamType::Output => {
// self.startOutputStream(cfg)?;
bail!("No output stream defined yet");
self.startOutputStream(cfg)?;
}
}
Ok(())
}
// fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
// let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// let stream = match cfg.api {
// StreamApiDescr::Cpal => {
// let devinfo = self
// .match_devinfo(cfg)
// .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
// self.cpal_api.startOutputStream(devinfo, cfg, tx)?
// }
// _ => bail!("Unimplemented api!"),
// };
/// Start a stream for output only, using only the output channel
/// configuration as given in the `cfg`.
fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
let stream = match cfg.api {
StreamApiDescr::Cpal => {
let devinfo = self.find_device(cfg)?;
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
self.cpal_api.startOutputStream(devinfo, cfg, rx)?
} else {
bail!("API {} not available", cfg.api)
}
}
}
_ => bail!("API {} not implemented!", cfg.api),
};
let meta = stream.metadata();
let (threadhandle, commtx) = self.startOuputStreamThread(meta, tx);
// Ok(())
// }
self.output_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
// Start an input or duplex stream
fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
@ -404,12 +428,16 @@ impl StreamMgr {
if stype == StreamType::Duplex {
bail!("Duplex mode not supported for CPAL api");
}
let devinfo = self
.match_devinfo(cfg)
.ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
let devinfo = self.find_device(cfg)?;
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
self.cpal_api.startInputStream(stype, devinfo, cfg, tx)?
} else {
bail!("API {} not available", cfg.api)
}
_ => bail!("Unimplemented api!"),
}
}
_ => bail!("API {} not implemented!", cfg.api),
};
// Input queues should be available, otherwise panic bug.

View File

@ -21,6 +21,7 @@ pub struct Biquad {
a1: Flt,
a2: Flt,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Biquad {
#[new]

View File

@ -34,6 +34,7 @@ fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<filter::Biquad>()?;
m.add_class::<filter::SeriesBiquad>()?;
m.add_class::<filter::BiquadBank>()?;
m.add_class::<siggen::Siggen>()?;
daq::add_py_classses(m)?;

View File

@ -27,7 +27,7 @@ use rand::prelude::*;
use rand::rngs::ThreadRng;
use rand_distr::StandardNormal;
const twopi: Flt = 2. * pi;
const twopi: Flt = 2.0 * pi;
/// Source for the signal generator. Implementations are sine waves, sweeps, noise.
pub trait Source: Send {
@ -49,7 +49,9 @@ struct Silence {}
impl Source for Silence {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = 0.0);
sig.for_each(|s| {
*s = 0.0;
});
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
@ -68,7 +70,9 @@ impl WhiteNoise {
}
impl Source for WhiteNoise {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
sig.for_each(|s| *s = thread_rng().sample(StandardNormal));
sig.for_each(|s| {
*s = thread_rng().sample(StandardNormal);
});
}
fn reset(&mut self, _fs: Flt) {}
fn clone_dyn(&self) -> Box<dyn Source> {
@ -95,17 +99,17 @@ impl Sine {
/// *
fn new(freq: Flt) -> Sine {
Sine {
fs: -1.,
phase: 0.,
omg: 2. * pi * freq,
fs: -1.0,
phase: 0.0,
omg: 2.0 * pi * freq,
}
}
}
impl Source for Sine {
fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
if self.fs <= 0. {
if self.fs <= 0.0 {
sig.for_each(|s| {
*s = 0.;
*s = 0.0;
});
return;
}
@ -117,7 +121,7 @@ impl Source for Sine {
}
fn reset(&mut self, fs: Flt) {
self.fs = fs;
self.phase = 0.;
self.phase = 0.0;
}
fn clone_dyn(&self) -> Box<dyn Source> {
Box::new(self.clone())
@ -131,6 +135,7 @@ impl Source for Sine {
/// * (Siggen::newSine)
///
#[derive(Clone)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct Siggen {
// The source dynamic signal. Noise, a sine wave, sweep, etc
source: Box<dyn Source>,
@ -143,10 +148,22 @@ pub struct Siggen {
// Output buffers (for filtered source signal)
chout_buf: Vec<Vec<Flt>>,
}
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Siggen {
#[pyo3(name = "newWhiteNoise")]
#[staticmethod]
fn newWhiteNoise_py() -> Siggen {
Siggen::newWhiteNoise(0)
}
#[pyo3(name = "newSine")]
#[staticmethod]
fn newSine_py(freq: Flt) -> Siggen {
Siggen::newSine(0, freq)
}
}
/// Multiple channel signal generator. Can use a single source (coherent) to provide multiple signals
/// that can be sent out through different EQ's
/// A struct that implements the Siggen trait is able to generate a signal.
impl Siggen {
/// Returns the number of channels this signal generator is generating for.
pub fn nchannels(&self) -> usize {
@ -193,16 +210,18 @@ impl Siggen {
/// Set the DC offset for all channels
pub fn setDCOffset(&mut self, dc: &[Flt]) {
self.channels.iter_mut().zip(dc).for_each(
|(ch, dc)| {ch.DCOffset = *dc;});
self.channels
.iter_mut()
.zip(dc)
.for_each(|(ch, dc)| {
ch.DCOffset = *dc;
});
}
/// Create a sine wave signal generator
///
/// * freq: Frequency of the sine wave in \[Hz\]
pub fn newSineWave(nchannels: usize, freq: Flt) -> Siggen {
pub fn newSine(nchannels: usize, freq: Flt) -> Siggen {
Siggen::new(nchannels, Box::new(Sine::new(freq)))
}
@ -218,40 +237,35 @@ impl Siggen {
/// Creates *interleaved* output signal
pub fn genSignal<T>(&mut self, out: &mut [T])
where
T: Sample + FromSample<Flt> + Debug,
Flt: Sample,
where T: Sample + FromSample<Flt> + Debug, Flt: Sample
{
let nch = self.nchannels();
let nsamples: usize = out.len() / nch;
assert!(out.len() % self.nchannels() == 0);
// Create source signal
self.source_buf.resize(nsamples, 0.);
self.source
.genSignal_unscaled(&mut self.source_buf.iter_mut());
self.source_buf.resize(nsamples, 0.0);
self.source.genSignal_unscaled(&mut self.source_buf.iter_mut());
// println!("Source signal: {:?}", self.source_buf);
// Write output while casted to the correct type
// Iterate over each channel, and counter
self.chout_buf.resize(nch, vec![]);
for (channelno, (channel, chout)) in self
.channels
for (channelno, (channel, chout)) in self.channels
.iter_mut()
.zip(self.chout_buf.iter_mut())
.enumerate()
{
chout.resize(nsamples, 0.);
.enumerate() {
chout.resize(nsamples, 0.0);
// Create output signal, overwrite chout
channel.genSignal(&self.source_buf, chout);
// println!("Channel: {}, {:?}", channelno, chout);
let out_iterator = out.iter_mut().skip(channelno).step_by(nch);
out_iterator
.zip(chout)
.for_each(|(out, chin)| *out = chin.to_sample());
out_iterator.zip(chout).for_each(|(out, chin)| {
*out = chin.to_sample();
});
}
// println!("{:?}", out);
}
@ -277,7 +291,10 @@ impl Siggen {
/// as number of channels in signal generator.
pub fn setMute(&mut self, mute: &[bool]) {
assert!(mute.len() == self.nchannels());
self.channels.iter_mut().zip(mute).for_each(|(s, m)| {
self.channels
.iter_mut()
.zip(mute)
.for_each(|(s, m)| {
s.setMute(*m);
});
}
@ -318,13 +335,13 @@ impl SiggenChannelConfig {
muted: false,
prefilter: None,
gain: 1.0,
DCOffset: 0.,
DCOffset: 0.0,
}
}
/// Set mute on channel. If true, only DC signal offset is outputed from (SiggenChannelConfig::transform).
pub fn setMute(&mut self, mute: bool) {
self.muted = mute
self.muted = mute;
}
/// Generate new signal data, given input source data.
///
@ -367,7 +384,7 @@ mod test {
#[test]
fn test_whitenoise() {
// This code is just to check syntax. We should really be listening to these outputs.
let mut t = [0.; 10];
let mut t = [0.0; 10];
Siggen::newWhiteNoise(1).genSignal(&mut t);
// println!("{:?}", &t);
}
@ -377,16 +394,20 @@ mod test {
// This code is just to check syntax. We should really be listening to
// these outputs.
const N: usize = 10000;
let mut s1 = [0.; N];
let mut s2 = [0.; N];
let mut siggen = Siggen::newSineWave(1, 1.);
let mut s1 = [0.0; N];
let mut s2 = [0.0; N];
let mut siggen = Siggen::newSine(1, 1.0);
siggen.reset(10.);
siggen.reset(10.0);
siggen.setAllMute(false);
siggen.genSignal(&mut s1);
siggen.genSignal(&mut s2);
let absdiff = s1.iter().zip(s2.iter()).map(|(s1, s2)| {Flt::abs(*s1-*s2)}).sum::<Flt>();
let absdiff = s1
.iter()
.zip(s2.iter())
.map(|(s1, s2)| { Flt::abs(*s1 - *s2) })
.sum::<Flt>();
assert!(absdiff < 1e-10);
}
@ -394,12 +415,12 @@ mod test {
fn test_sine2() {
// Test if channels are properly separated etc. Check if RMS is correct
// for amplitude = 1.0.
const fs: Flt = 10.;
const fs: Flt = 10.0;
// Number of samples per channel
const Nframes: usize = 10000;
const Nch: usize = 2;
let mut signal = [0.; Nch*Nframes];
let mut siggen = Siggen::newSineWave(Nch, 1.);
let mut signal = [0.0; Nch * Nframes];
let mut siggen = Siggen::newSine(Nch, 1.0);
siggen.reset(fs);
siggen.setMute(&[false, true]);
@ -410,22 +431,33 @@ mod test {
siggen.genSignal(&mut signal[Nframes / 2..]);
// Mean square of the signal
let ms1 = signal.iter().step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
let ms1 =
signal
.iter()
.step_by(2)
.map(|s1| { *s1 * *s1 })
.sum::<Flt>() / (Nframes as Flt);
println!("ms1: {}", ms1);
let ms2 = signal.iter().skip(1).step_by(2).map(|s1| {*s1 * *s1}).sum::<Flt>() / Nframes as Flt;
let ms2 =
signal
.iter()
.skip(1)
.step_by(2)
.map(|s1| { *s1 * *s1 })
.sum::<Flt>() / (Nframes as Flt);
assert!(Flt::abs(ms1 - 0.5) < 1e-12);
assert_eq!(ms2 , 0.);
assert_eq!(ms2, 0.0);
}
// A small test to learn a bit about sample types and conversion. This
// is the thing we want.
#[test]
fn test_sample() {
assert_eq!(0.5f32.to_sample::<i8>(), 64);
assert_eq!(1.0f32.to_sample::<i8>(), 127);
assert_eq!(-(1.0f32.to_sample::<i8>()), -127);
assert_eq!(1.0f32.to_sample::<i16>(), i16::MAX);
assert_eq!((0.5f32).to_sample::<i8>(), 64);
assert_eq!((1.0f32).to_sample::<i8>(), 127);
assert_eq!(-(1.0f32).to_sample::<i8>(), -127);
assert_eq!((1.0f32).to_sample::<i16>(), i16::MAX);
}
}