//! Data acquisition model. Provides abstract layers around DAQ devices. use super::api::*; use super::*; use crate::{ config::*, siggen::{self, Siggen}, }; use streamcmd::StreamCommand; use streammsg::*; use streamdata::*; use anyhow::{bail, Error, Result}; use array_init::from_iter; use core::time; use cpal::Sample; use crossbeam::{ channel::{unbounded, Receiver, Sender, TrySendError}, thread, }; use std::sync::{atomic::AtomicBool, Arc, Mutex}; use std::thread::{JoinHandle, Thread}; #[cfg(feature = "cpal-api")] use super::api::api_cpal::CpalApi; cfg_if::cfg_if! { if #[cfg(feature = "python-bindings")] { use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::{pymodule, types::PyModule, PyResult}; } else {} } /// Store a queue in a shared pointer, to share sending /// and receiving part of the queue. pub type SharedInQueue = Sender; /// Vector of queues for stream messages pub type InQueues = Vec; struct StreamInfo { streamtype: StreamType, stream: Box, threadhandle: JoinHandle, comm: Sender, } /// Keep track of whether the stream has been created. To ensure singleton behaviour. static smgr_created: AtomicBool = AtomicBool::new(false); #[cfg_attr(feature = "python-bindings", pyclass(unsendable))] /// Configure and manage input / output streams. /// pub struct StreamMgr { // List of available devices devs: Vec, // Input stream can be both input and duplex input_stream: Option>, // Output only stream output_stream: Option>, #[cfg(feature = "cpal-api")] cpal_api: CpalApi, /// The storage of queues. When no streams are running, they /// are here. When stream is running, they will become available /// in the JoinHandle of the thread. instreamqueues: Option, // Signal generator. Stored here on the bench in case no stream is running. // It is picked when it is configured correctly for the starting output stream // If it is not configured correctly, when a stream that outputs data is started // ,it is removed here. siggen: Option, } #[cfg(feature = "python-bindings")] #[cfg_attr(feature = "python-bindings", pymethods)] impl StreamMgr { #[new] /// See (StreamMgr::new()) fn new_py<'py>() -> StreamMgr { StreamMgr::new() } // #[pyo3(name = "unit")] // #[staticmethod] // /// See: [Biquad::unit()] // pub fn unit_py() -> Biquad { // Biquad::unit() // } // #[pyo3(name = "firstOrderHighPass")] } impl StreamMgr { /// Create new stream manager. A stream manager is supposed to be a singleton. /// /// # Panics /// /// When a StreamMgr object is already alive. pub fn new() -> StreamMgr { if smgr_created.load(std::sync::atomic::Ordering::Relaxed) { panic!("BUG: Only one stream manager is supposed to be a singleton"); } smgr_created.store(true, std::sync::atomic::Ordering::Relaxed); let mut smgr = StreamMgr { devs: vec![], input_stream: None, output_stream: None, siggen: None, #[cfg(feature = "cpal-api")] cpal_api: CpalApi::new(), instreamqueues: Some(vec![]), }; smgr.devs = smgr.scanDeviceInfo(); smgr } /// Get stream status for given stream type. pub fn getStatus(&self, t: StreamType) -> StreamStatus { match t { StreamType::Input | StreamType::Duplex => { if let Some(s) = &self.input_stream { s.stream.status() } else { StreamStatus::NotRunning } } StreamType::Output => { if let Some(s) = &self.output_stream { s.stream.status() } else { StreamStatus::NotRunning } } } } /// Set a new signal generator. Returns an error if it is unapplicable. /// It is unapplicable if the number of channels of output does not match the /// number of output channels in a running stream. pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> { // Current signal generator. Where to place it? if let Some(istream) = &self.input_stream { if let StreamType::Duplex = istream.streamtype { if siggen.nchannels() != istream.stream.noutchannels() { bail!("Invalid number of channels configured in signal generator") } assert!(self.siggen.is_none()); istream.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); return Ok(()); } } else if let Some(os) = &self.output_stream { assert!(self.siggen.is_none()); if siggen.nchannels() != os.stream.noutchannels() { bail!("Invalid number of channels configured in signal generator") } os.comm.send(StreamCommand::NewSiggen(siggen)).unwrap(); return Ok(()); } else { self.siggen = Some(siggen); return Ok(()); } unreachable!() } /// Obtain a list of devices that are available for each available API pub fn getDeviceInfo(&mut self) -> &Vec { &self.devs } fn scanDeviceInfo(&self) -> Vec { let mut devinfo = vec![]; #[cfg(feature = "cpal-api")] { let cpal_devs = self.cpal_api.getDeviceInfo(); if let Ok(devs) = cpal_devs { devinfo.extend(devs); } } devinfo } /// Add a new queue to the lists of queues pub fn addInQueue(&mut self, tx: Sender) { if let Some(is) = &self.input_stream { is.comm.send(StreamCommand::AddInQueue(tx)).unwrap() } else { self.instreamqueues.as_mut().unwrap().push(tx); } } fn startInputStreamThread( &mut self, meta: Arc, rx: Receiver, ) -> (JoinHandle, Sender) { let (commtx, commrx) = unbounded(); // Unwrap here, as the queues should be free to grab let mut iqueues = self .instreamqueues .take() .expect("No input streams queues!"); let threadhandle = std::thread::spawn(move || { let mut ctr: usize = 0; 'infy: loop { if let Ok(comm_msg) = commrx.try_recv() { match comm_msg { // New queue added StreamCommand::AddInQueue(queue) => { match queue.send(InStreamMsg::StreamStarted(meta.clone())) { Ok(()) => iqueues.push(queue), Err(_) => {} } } // Remove queue from list StreamCommand::RemoveInQueue(queue) => { iqueues.retain(|q| !q.same_channel(&queue)) } // Stop this thread. Returns the queue StreamCommand::StopThread => { sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped); break 'infy; } StreamCommand::NewSiggen(_) => { panic!("Error: signal generator send to input-only stream."); } } } if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) { // println!("Obtained raw stream data!"); let streamdata = StreamData::new(ctr, meta.clone(), raw); let streamdata = Arc::new(streamdata); let msg = InStreamMsg::StreamData(streamdata); sendMsgToAllQueues(&mut iqueues, msg); ctr += 1; } } iqueues }); (threadhandle, commtx) } // Match device info struct on given daq config. fn match_devinfo(&self, cfg: &DaqConfig) -> Option<&DeviceInfo> { for d in self.devs.iter() { if d.device_name == cfg.device_name { return Some(d); } } None } fn startOuputStreamThread( &mut self, meta: Arc, tx: Sender, ) -> (JoinHandle, Sender) { let (commtx, commrx) = unbounded(); // Number of channels to output for let nchannels = meta.nchannels(); // Obtain signal generator. Set to silence when no signal generator is // installed. let mut siggen = self .siggen .take() .unwrap_or_else(|| Siggen::newSilence(nchannels)); if siggen.nchannels() != nchannels { // Updating number of channels siggen.setNChannels(nchannels); } siggen.reset(meta.samplerate); let threadhandle = std::thread::spawn(move || { let mut floatbuf: Vec = Vec::with_capacity(nchannels * meta.framesPerBlock); 'infy: loop { if let Ok(comm_msg) = commrx.try_recv() { match comm_msg { // New queue added StreamCommand::AddInQueue(_) => { panic!("Invalid message send to output thread: AddInQueue"); } // Remove queue from list StreamCommand::RemoveInQueue(_) => { panic!("Invalid message send to output thread: RemoveInQueue"); } // Stop this thread. Returns the queue StreamCommand::StopThread => { break 'infy; } StreamCommand::NewSiggen(new_siggen) => { // println!("NEW SIGNAL GENERATOR ARRIVED!"); siggen = new_siggen; siggen.reset(meta.samplerate); if siggen.nchannels() != nchannels { // println!("Updating channels"); siggen.setNChannels(nchannels); } } } } while tx.len() < 2 { unsafe { floatbuf.set_len(nchannels * meta.framesPerBlock); } // Obtain signal siggen.genSignal(&mut floatbuf); // println!("level: {}", floatbuf.iter().sum::()); let msg = match meta.rawDatatype { DataType::I8 => { let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); RawStreamData::Datai8(v) } DataType::I16 => { let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); RawStreamData::Datai16(v) } DataType::I32 => { let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); RawStreamData::Datai32(v) } DataType::F32 => { let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); RawStreamData::Dataf32(v) } DataType::F64 => { let v = Vec::::from_iter(floatbuf.iter().map(|f| f.to_sample())); RawStreamData::Dataf64(v) } }; if let Err(_e) = tx.send(msg) { // println!("Error sending raw stream data to output stream!"); break 'infy; } } // } } siggen }); (threadhandle, commtx) } /// Start a stream of certain type, using given configuration pub fn startStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> { match stype { StreamType::Input | StreamType::Duplex => { self.startInputOrDuplexStream(stype, cfg)?; } StreamType::Output => { // self.startOutputStream(cfg)?; bail!("No output stream defined yet"); } } Ok(()) } // fn startOutputStream(&mut self, cfg: &DaqConfig) -> Result<()> { // let (tx, rx): (Sender, Receiver) = unbounded(); // let stream = match cfg.api { // StreamApiDescr::Cpal => { // let devinfo = self // .match_devinfo(cfg) // .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?; // self.cpal_api.startOutputStream(devinfo, cfg, tx)? // } // _ => bail!("Unimplemented api!"), // }; // Ok(()) // } // Start an input or duplex stream fn startInputOrDuplexStream(&mut self, stype: StreamType, cfg: &DaqConfig) -> Result<()> { if self.input_stream.is_some() { bail!("An input stream is already running. Please first stop existing input stream.") } if cfg.numberEnabledInChannels() == 0 { bail!("At least one input channel should be enabled for an input stream") } if stype == StreamType::Duplex { if cfg.numberEnabledOutChannels() == 0 { bail!("At least one output channel should be enabled for a duplex stream") } if self.output_stream.is_some() { bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream."); } } let (tx, rx): (Sender, Receiver) = unbounded(); let stream = match cfg.api { StreamApiDescr::Cpal => { if stype == StreamType::Duplex { bail!("Duplex mode not supported for CPAL api"); } let devinfo = self .match_devinfo(cfg) .ok_or(anyhow::anyhow!("Unable to find device {}", cfg.device_name))?; self.cpal_api.startInputStream(stype, devinfo, cfg, tx)? } _ => bail!("Unimplemented api!"), }; // Input queues should be available, otherwise panic bug. let iqueues = self.instreamqueues.as_mut().unwrap(); let meta = stream.metadata(); sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone())); let (threadhandle, commtx) = self.startInputStreamThread(meta, rx); self.input_stream = Some(StreamInfo { streamtype: stype, stream, threadhandle, comm: commtx, }); Ok(()) } /// Start a default input stream, using default settings on everything. This is only possible /// when the CPAL_api is available pub fn startDefaultInputStream(&mut self) -> Result<()> { if self.input_stream.is_some() { bail!("Input stream is already running. Please first stop existing input stream.") } let (tx, rx): (Sender, Receiver) = unbounded(); // Only a default input stream when CPAL feature is enabled cfg_if::cfg_if! { if #[cfg(feature="cpal-api")] { let stream = self.cpal_api.startDefaultInputStream(tx)?; // Inform all listeners of new stream data let iqueues = self.instreamqueues.as_mut().unwrap(); let meta = stream.metadata(); sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone())); let (threadhandle, commtx) = self.startInputStreamThread(meta, rx); self.input_stream = Some(StreamInfo { streamtype: StreamType::Input, stream, threadhandle, comm: commtx, }); Ok(()) } else { bail!("Unable to start default input stream: no CPAL api available") } } } /// Start a default output stream. Only possible when CPAL Api is available. pub fn startDefaultOutputStream(&mut self) -> Result<()> { if let Some(istream) = &self.input_stream { if istream.streamtype == StreamType::Duplex { bail!("Duplex stream is already running"); } } if self.output_stream.is_some() { bail!("An output stream is already running. Duplex mode stream cannot be started. Please first stop existing output stream."); } cfg_if::cfg_if! { if #[cfg(feature="cpal-api")] { let (tx, rx)= unbounded(); let stream = self.cpal_api.startDefaultOutputStream(rx)?; let meta = stream.metadata(); let (threadhandle, commtx) = self.startOuputStreamThread::(meta, tx); // Inform all listeners of new stream data self.output_stream = Some(StreamInfo { streamtype: StreamType::Input, stream, threadhandle, comm: commtx, }); Ok(()) } // end if cpal api available else { bail!("Unable to start default input stream: no CPAL api available") } } // end of cfg_if } /// Stop existing input stream. pub fn stopInputStream(&mut self) -> Result<()> { if let Some(StreamInfo { streamtype: _, // Ignored here stream: _, threadhandle, comm, }) = self.input_stream.take() { // println!("Stopping existing stream.."); // Send thread to stop comm.send(StreamCommand::StopThread).unwrap(); // Store stream queues back into StreamMgr self.instreamqueues = Some(threadhandle.join().expect("Stream thread panicked!")); } else { bail!("Stream is not running.") } Ok(()) } /// Stop existing output stream pub fn stopOutputStream(&mut self) -> Result<()> { if let Some(StreamInfo { streamtype: _, // Ignored here stream: _, threadhandle, comm, }) = self.output_stream.take() { if let Err(_) = comm.send(StreamCommand::StopThread){ assert!(threadhandle.is_finished()); } // println!("Wainting for threadhandle to join..."); self.siggen = Some(threadhandle.join().expect("Output thread panicked!")); // println!("Threadhandle joined!"); } else { bail!("Stream is not running."); } Ok(()) } /// Stop existing running stream. /// /// Args /// /// * st: The stream type. pub fn stopStream(&mut self, st: StreamType) -> Result<()> { match st { StreamType::Input | StreamType::Duplex => self.stopInputStream(), StreamType::Output => self.stopOutputStream(), } } } // impl StreamMgr impl Drop for StreamMgr { fn drop(&mut self) { // Kill input stream if there is one if self.input_stream.is_some() { self.stopStream(StreamType::Input).unwrap(); } if self.output_stream.is_some() { // println!("Stopstream in Drop"); self.stopStream(StreamType::Output).unwrap(); // println!("Stopstream in Drop done"); } // Decref the singleton smgr_created.store(false, std::sync::atomic::Ordering::Relaxed); } } // Send to all queues, remove queues that are disconnected when found out // on the way. fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) { // Loop over queues. Remove queues that error when we try to send // to them iqueues.retain(|q| match q.try_send(msg.clone()) { Ok(_) => true, Err(_e) => false, }); } /// Daq devices trait Daq {} #[cfg(test)] mod tests { // #[test] }