lasprs/src/daq/streammgr.rs

601 lines
21 KiB
Rust

//! Data acquisition model. Provides abstract layers around DAQ devices.
use super::api::*;
use super::*;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use array_init::from_iter;
use core::time;
use cpal::Sample;
use crossbeam::{
channel::{unbounded, Receiver, Sender, TrySendError},
thread,
};
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streamcmd::StreamCommand;
use streamdata::*;
use streammsg::*;
#[cfg(feature = "cpal-api")]
use super::api::api_cpal::CpalApi;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
} else {} }
/// Store a queue in a shared pointer, to share sending
/// and receiving part of the queue.
pub type SharedInQueue = Sender<InStreamMsg>;
/// Vector of queues for stream messages
pub type InQueues = Vec<SharedInQueue>;
struct StreamInfo<T> {
streamtype: StreamType,
stream: Box<dyn Stream>,
threadhandle: JoinHandle<T>,
comm: Sender<StreamCommand>,
}
/// Keep track of whether the stream has been created. To ensure singleton behaviour.
static smgr_created: AtomicBool = AtomicBool::new(false);
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
/// Configure and manage input / output streams.
///
pub struct StreamMgr {
// List of available devices
devs: Vec<DeviceInfo>,
// Input stream can be both input and duplex
input_stream: Option<StreamInfo<InQueues>>,
// Output only stream
output_stream: Option<StreamInfo<Siggen>>,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi,
/// The storage of queues. When no streams are running, they
/// are here. When stream is running, they will become available
/// in the JoinHandle of the thread.
instreamqueues: Option<InQueues>,
// Signal generator. Stored here on the bench in case no stream is running.
// It is picked when it is configured correctly for the starting output stream
// If it is not configured correctly, when a stream that outputs data is started
// ,it is removed here.
siggen: Option<crate::siggen::Siggen>,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl StreamMgr {
#[new]
/// See (StreamMgr::new())
fn new_py<'py>() -> StreamMgr {
StreamMgr::new()
}
// #[pyo3(name = "unit")]
// #[staticmethod]
// /// See: [Biquad::unit()]
// pub fn unit_py() -> Biquad {
// Biquad::unit()
// }
// #[pyo3(name = "firstOrderHighPass")]
}
impl Default for StreamMgr {
fn default() -> Self {
Self::new()
}
}
impl StreamMgr {
/// Create new stream manager. A stream manager is supposed to be a singleton.
///
/// # Panics
///
/// When a StreamMgr object is already alive.
pub fn new() -> StreamMgr {
if smgr_created.load(std::sync::atomic::Ordering::Relaxed) {
panic!("BUG: Only one stream manager is supposed to be a singleton");
}
smgr_created.store(true, std::sync::atomic::Ordering::Relaxed);
let mut smgr = StreamMgr {
devs: vec![],
input_stream: None,
output_stream: None,
siggen: None,
#[cfg(feature = "cpal-api")]
cpal_api: CpalApi::new(),
instreamqueues: Some(vec![]),
};
smgr.devs = smgr.scanDeviceInfo();
smgr
}
/// Get stream status for given stream type.
pub fn getStatus(&self, t: StreamType) -> StreamStatus {
match t {
StreamType::Input | StreamType::Duplex => {
if let Some(s) = &self.input_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
}
}
StreamType::Output => {
if let Some(s) = &self.output_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
}
}
}
}
/// Set a new signal generator. Returns an error if it is unapplicable.
/// It is unapplicable if the number of channels of output does not match the
/// number of output channels in a running stream.
pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> {
// Current signal generator. Where to place it?
if let Some(istream) = &self.input_stream {
if let StreamType::Duplex = istream.streamtype {
if siggen.nchannels() != istream.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
assert!(self.siggen.is_none());
istream.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
}
} else if let Some(os) = &self.output_stream {
assert!(self.siggen.is_none());
if siggen.nchannels() != os.stream.noutchannels() {
bail!("Invalid number of channels configured in signal generator")
}
os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap();
return Ok(());
} else {
self.siggen = Some(siggen);
return Ok(());
}
unreachable!()
}
/// Obtain a list of devices that are available for each available API
pub fn getDeviceInfo(&mut self) -> &Vec<DeviceInfo> {
&self.devs
}
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
let mut devinfo = vec![];
#[cfg(feature = "cpal-api")]
{
let cpal_devs = self.cpal_api.getDeviceInfo();
if let Ok(devs) = cpal_devs {
devinfo.extend(devs);
}
}
devinfo
}
/// Add a new queue to the lists of queues. On the queue, input data is
/// added.
///
/// If the stream is unable to write data on the queue (which might
/// happen when the handler is dropped), the queue is removed from the list
/// of queues that get data from the stream.
pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) {
if let Some(is) = &self.input_stream {
is.comm.send(StreamCommand::AddInQueue(tx)).unwrap()
} else {
self.instreamqueues.as_mut().unwrap().push(tx);
}
}
fn startInputStreamThread(
&mut self,
meta: Arc<StreamMetaData>,
rx: Receiver<RawStreamData>,
) -> (JoinHandle<InQueues>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
// Unwrap here, as the queues should be free to grab
let mut iqueues = self
.instreamqueues
.take()
.expect("No input streams queues!");
let threadhandle = std::thread::spawn(move || {
let mut ctr: usize = 0;
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
// New queue added
StreamCommand::AddInQueue(queue) => {
match queue.send(InStreamMsg::StreamStarted(meta.clone())) {
Ok(()) => iqueues.push(queue),
Err(_) => {}
}
}
// Remove queue from list
StreamCommand::RemoveInQueue(queue) => {
iqueues.retain(|q| !q.same_channel(&queue))
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped);
break 'infy;
}
StreamCommand::NewSiggen(_) => {
panic!("Error: signal generator send to input-only stream.");
}
}
}
if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) {
// println!("Obtained raw stream data!");
let streamdata = StreamData::new(ctr, meta.clone(), raw);
let streamdata = Arc::new(streamdata);
let msg = InStreamMsg::StreamData(streamdata);
sendMsgToAllQueues(&mut iqueues, msg);
ctr += 1;
}
}
iqueues
});
(threadhandle, commtx)
}
// Match device info struct on given daq config.
fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> {
self.devs.iter().find(|&d| d.device_name == cfg.device_name)
}
fn startOuputStreamThread(
&mut self,
meta: Arc<StreamMetaData>,
tx: Sender<RawStreamData>,
) -> (JoinHandle<Siggen>, Sender<StreamCommand>) {
let (commtx, commrx) = unbounded();
// Number of channels to output for
let nchannels = meta.nchannels();
// Obtain signal generator. Set to silence when no signal generator is
// installed.
let mut siggen = self
.siggen
.take()
.unwrap_or_else(|| Siggen::newSilence(nchannels));
if siggen.nchannels() != nchannels {
// Updating number of channels
siggen.setNChannels(nchannels);
}
siggen.reset(meta.samplerate);
let threadhandle = std::thread::spawn(move || {
let mut floatbuf: Vec<Flt> = Vec::with_capacity(nchannels * meta.framesPerBlock);
'infy: loop {
if let Ok(comm_msg) = commrx.try_recv() {
match comm_msg {
// New queue added
StreamCommand::AddInQueue(_) => {
panic!("Invalid message send to output thread: AddInQueue");
}
// Remove queue from list
StreamCommand::RemoveInQueue(_) => {
panic!("Invalid message send to output thread: RemoveInQueue");
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
break 'infy;
}
StreamCommand::NewSiggen(new_siggen) => {
// println!("NEW SIGNAL GENERATOR ARRIVED!");
siggen = new_siggen;
siggen.reset(meta.samplerate);
if siggen.nchannels() != nchannels {
// println!("Updating channels");
siggen.setNChannels(nchannels);
}
}
}
}
while tx.len() < 2 {
unsafe {
floatbuf.set_len(nchannels * meta.framesPerBlock);
}
// Obtain signal
siggen.genSignal(&mut floatbuf);
// println!("level: {}", floatbuf.iter().sum::<Flt>());
let msg = match meta.rawDatatype {
DataType::I8 => {
let v = Vec::<i8>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai8(v)
}
DataType::I16 => {
let v = Vec::<i16>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai16(v)
}
DataType::I32 => {
let v = Vec::<i32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Datai32(v)
}
DataType::F32 => {
let v = Vec::<f32>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf32(v)
}
DataType::F64 => {
let v = Vec::<f64>::from_iter(floatbuf.iter().map(|f| f.to_sample()));
RawStreamData::Dataf64(v)
}
};
if let Err(_e) = tx.send(msg) {
// println!("Error sending raw stream data to output stream!");
break 'infy;
}
}
// }
}
siggen
});
(threadhandle, commtx)
}
/// Start a stream of certain type, using given configuration
pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
match stype {
StreamType::Input | StreamType::Duplex => {
self.startInputOrDuplexStream(stype, cfg)?;
}
StreamType::Output => {
// self.startOutputStream(cfg)?;
bail!("No output stream defined yet");
}
}
Ok(())
}
// fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> {
// let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// let stream = match cfg.api {
// StreamApiDescr::Cpal => {
// let devinfo = self
// .match_devinfo(cfg)
// .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
// self.cpal_api.startOutputStream(devinfo, cfg, tx)?
// }
// _ => bail!("Unimplemented api!"),
// };
// Ok(())
// }
// Start an input or duplex stream
fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> {
if self.input_stream.is_some() {
bail!("An input stream is already running. Please first stop existing input stream.")
}
if cfg.numberEnabledInChannels() == 0 {
bail!("At least one input channel should be enabled for an input stream")
}
if stype == StreamType::Duplex {
if cfg.numberEnabledOutChannels() == 0 {
bail!("At least one output channel should be enabled for a duplex stream")
}
if self.output_stream.is_some() {
bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
}
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
let stream = match cfg.api {
StreamApiDescr::Cpal => {
if stype == StreamType::Duplex {
bail!("Duplex mode not supported for CPAL api");
}
let devinfo = self
.match_devinfo(cfg)
.ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?;
self.cpal_api.startInputStream(stype, devinfo, cfg, tx)?
}
_ => bail!("Unimplemented api!"),
};
// Input queues should be available, otherwise panic bug.
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
self.input_stream = Some(StreamInfo {
streamtype: stype,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
/// Start a default input stream, using default settings on everything. This is only possible
/// when the CPAL_api is available
pub fn startDefaultInputStream(&mut self) -> Result<()> {
if self.input_stream.is_some() {
bail!("Input stream is already running. Please first stop existing input stream.")
}
let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
// Only a default input stream when CPAL feature is enabled
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
let stream = self.cpal_api.startDefaultInputStream(tx)?;
// Inform all listeners of new stream data
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
self.input_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
}
else {
bail!("Unable to start default input stream: no CPAL api available")
}
}
}
/// Start a default output stream. Only possible when CPAL Api is available.
pub fn startDefaultOutputStream(&mut self) -> Result<()> {
if let Some(istream) = &self.input_stream {
if istream.streamtype == StreamType::Duplex {
bail!("Duplex stream is already running");
}
}
if self.output_stream.is_some() {
bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream.");
}
cfg_if::cfg_if! {
if #[cfg(feature="cpal-api")] {
let (tx, rx)= unbounded();
let stream = self.cpal_api.startDefaultOutputStream(rx)?;
let meta = stream.metadata();
let (threadhandle, commtx) = self.startOuputStreamThread(meta, tx);
// Inform all listeners of new stream data
self.output_stream = Some(StreamInfo {
streamtype: StreamType::Input,
stream,
threadhandle,
comm: commtx,
});
Ok(())
} // end if cpal api available
else {
bail!("Unable to start default input stream: no CPAL api available")
}
} // end of cfg_if
}
/// Stop existing input stream.
pub fn stopInputStream(&mut self) -> Result<()> {
if let Some(StreamInfo {
streamtype: _, // Ignored here
stream: _,
threadhandle,
comm,
}) = self.input_stream.take()
{
// println!("Stopping existing stream..");
// Send thread to stop
comm.send(StreamCommand::StopThread).unwrap();
// Store stream queues back into StreamMgr
self.instreamqueues = Some(threadhandle.join().expect("Stream thread panicked!"));
} else {
bail!("Stream is not running.")
}
Ok(())
}
/// Stop existing output stream
pub fn stopOutputStream(&mut self) -> Result<()> {
if let Some(StreamInfo {
streamtype: _, // Ignored here
stream: _,
threadhandle,
comm,
}) = self.output_stream.take()
{
if comm.send(StreamCommand::StopThread).is_err() {
// Failed to send command over channel. This means the thread is
// already finished due to some other reason.
assert!(threadhandle.is_finished());
}
// println!("Wainting for threadhandle to join...");
self.siggen = Some(threadhandle.join().expect("Output thread panicked!"));
// println!("Threadhandle joined!");
} else {
bail!("Stream is not running.");
}
Ok(())
}
/// Stop existing running stream.
///
/// Args
///
/// * st: The stream type.
pub fn stopStream(&mut self, st: StreamType) -> Result<()> {
match st {
StreamType::Input | StreamType::Duplex => self.stopInputStream(),
StreamType::Output => self.stopOutputStream(),
}
}
} // impl StreamMgr
impl Drop for StreamMgr {
fn drop(&mut self) {
// Kill input stream if there is one
if self.input_stream.is_some() {
self.stopStream(StreamType::Input).unwrap();
}
if self.output_stream.is_some() {
// println!("Stopstream in Drop");
self.stopStream(StreamType::Output).unwrap();
// println!("Stopstream in Drop done");
}
// Decref the singleton
smgr_created.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
// Send to all queues, remove queues that are disconnected when found out
// on the way.
fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) {
// Loop over queues. Remove queues that error when we try to send
// to them
iqueues.retain(|q| match q.try_send(msg.clone()) {
Ok(_) => true,
Err(_e) => false,
});
}
/// Daq devices
trait Daq {}
#[cfg(test)]
mod tests {
// #[test]
}