lasprs/src/daq/mod.rs

303 lines
9.7 KiB
Rust

//! Data acquisition model. Provides abstract layers around DAQ devices.
mod api;
mod daqconfig;
mod datatype;
mod deviceinfo;
mod qty;
#[cfg(feature = "record")]
mod record;
mod streamhandler;
mod streammsg;
pub use daqconfig::*;
pub use datatype::*;
pub use deviceinfo::*;
pub use qty::*;
pub use streamhandler::*;
pub use streammsg::*;
#[cfg(feature = "record")]
pub use record::*;
#[cfg(feature = "cpal_api")]
use api::api_cpal::CpalApi;
use crate::{
config::*,
siggen::{self, Siggen},
};
use anyhow::{bail, Error, Result};
use api::Stream;
use core::time;
use crossbeam::{
channel::{unbounded, Receiver, Sender, TrySendError},
thread,
};
use deviceinfo::DeviceInfo;
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streammsg::*;
/// Tracks whether a `StreamMgr` instance is currently alive, to enforce
/// singleton behaviour. The flag is set in `StreamMgr::new()` and cleared
/// again when the `StreamMgr` is dropped.
static smgr_created: AtomicBool = AtomicBool::new(false);
/// Bookkeeping for a single running stream: the stream object itself plus the
/// join handle and command channel of the thread that services it.
///
/// `T` is the value the servicing thread hands back when it is joined.
struct StreamData<T> {
/// Kind of stream that is running (e.g. `StreamType::Input`).
streamtype: StreamType,
/// The running stream object.
stream: Box<dyn Stream>,
/// Join handle of the thread that services this stream.
threadhandle: JoinHandle<T>,
/// Channel over which `StreamCommand`s are sent to that thread.
comm: Sender<StreamCommand>,
}
/// Configure and manage input / output streams.
///
/// Intended to be used as a singleton: `StreamMgr::new()` panics while
/// another instance is still alive.
pub struct StreamMgr {
// Running input stream, if any. This slot holds input-only streams as well
// as duplex streams.
input_stream: Option<StreamData<InQueues>>,
// Running output-only stream, if any.
output_stream: Option<StreamData<Siggen>>,
// Handle to the CPAL backend. Only present when the `cpal_api` feature is
// enabled.
#[cfg(feature = "cpal_api")]
cpal_api: CpalApi,
/// Storage for the input queues. While no input stream is running, the
/// queues are kept here. While an input stream is running, they are owned
/// by the stream thread and come back through its `JoinHandle` when the
/// thread is joined.
instreamqueues: Option<InQueues>,
// Signal generator. Parked here while no stream that produces output is
// running.
// NOTE(review): the original comment states that it is picked up when an
// output-capable stream starts and the generator is configured correctly
// for it, and that it is removed otherwise. That hand-over is not visible
// in this part of the file - confirm.
siggen: Option<crate::siggen::Siggen>,
}
impl StreamMgr {
/// Create new stream manager. A stream manager is supposed to be a singleton.
///
/// The new manager has no running streams, no signal generator and an empty
/// list of input queues.
///
/// # Panics
///
/// When a StreamMgr object is already alive.
pub fn new() -> StreamMgr {
    // Test-and-set in one atomic operation. A separate load followed by a
    // store would allow two threads to both observe `false` and both go on
    // to construct a manager.
    if smgr_created.swap(true, std::sync::atomic::Ordering::SeqCst) {
        panic!("BUG: Only one stream manager is supposed to be a singleton");
    }
    StreamMgr {
        input_stream: None,
        output_stream: None,
        siggen: None,
        #[cfg(feature = "cpal_api")]
        cpal_api: CpalApi::new(),
        instreamqueues: Some(vec![]),
    }
}
/// Set a new signal generator.
///
/// If a stream that produces output is running (a duplex stream, or
/// otherwise an output-only stream), the generator is sent to the thread
/// servicing that stream. If no such stream is running - including the case
/// where only an input-only stream is running - the generator is stored in
/// the manager.
///
/// # Errors
///
/// - When the number of channels of the signal generator does not match the
///   number of output channels of the running stream.
/// - When the generator could not be sent because the stream thread is no
///   longer receiving commands.
///
/// # Panics
///
/// When a signal generator is still stored in the manager while a stream
/// that produces output is running.
pub fn setSiggen(&mut self, siggen: Siggen) -> Result<()> {
    // Find the running stream that produces output, if there is one. An
    // input-only stream does not count: it used to make this function run
    // into `unreachable!()`.
    let (stream, comm) = match &self.input_stream {
        Some(is) if matches!(is.streamtype, StreamType::Duplex) => (&is.stream, &is.comm),
        _ => match &self.output_stream {
            Some(os) => (&os.stream, &os.comm),
            None => {
                // Nothing to hand the generator to; keep it for later.
                self.siggen = Some(siggen);
                return Ok(());
            }
        },
    };

    if siggen.nchannels() != stream.noutchannels() {
        bail!("Invalid number of channels configured in signal generator")
    }
    assert!(self.siggen.is_none());

    // Sending only fails when the receiving side of the command channel has
    // been dropped.
    if comm.send(StreamCommand::NewSiggen(siggen)).is_err() {
        bail!("Unable to send signal generator to stream thread: command channel is closed")
    }
    Ok(())
}
/// Collect the devices reported by every API backend that is compiled in.
///
/// Backends that fail to enumerate their devices are silently skipped, so
/// the returned list may be empty.
pub fn getDeviceInfo(&mut self) -> Vec<DeviceInfo> {
    let mut all_devices: Vec<DeviceInfo> = Vec::new();

    #[cfg(feature = "cpal_api")]
    {
        if let Ok(found) = self.cpal_api.getDeviceInfo() {
            all_devices.extend(found);
        }
    }

    all_devices
}
/// Register an additional queue for input stream messages.
///
/// While an input stream is running, the queue is handed to the stream
/// thread through its command channel. Otherwise it is appended to the
/// queues stored in the manager.
///
/// # Panics
///
/// When the command cannot be sent to the stream thread, or when no input
/// stream is running and the stored queue list is absent.
pub fn addInQueue(&mut self, tx: Sender<InStreamMsg>) {
    match self.input_stream.as_ref() {
        Some(running) => running.comm.send(StreamCommand::AddInQueue(tx)).unwrap(),
        None => self.instreamqueues.as_mut().unwrap().push(tx),
    }
}
/// Spawn the thread that services a running input stream.
///
/// The thread takes over the input queues stored in the manager. In a loop it
/// handles at most one pending `StreamCommand` and then waits up to 10 ms for
/// a raw data block on `rx`, which it forwards to all queues together with a
/// running block index.
///
/// Returns the join handle of the thread and the sending end of its command
/// channel. On `StreamCommand::StopThread` the thread sends
/// `InStreamMsg::StreamStopped` to all queues and finishes; joining the
/// handle then yields the queue list.
///
/// # Panics
///
/// When the queues are not stored in the manager, or when the stream
/// metadata cannot be obtained. The spawned thread itself panics when it
/// receives `StreamCommand::NewSiggen`.
fn startInputStreamThread(
    &mut self,
    stream: &Box<dyn Stream>,
    rx: Receiver<RawStreamData>,
) -> (JoinHandle<InQueues>, Sender<StreamCommand>) {
    let (commtx, commrx) = unbounded();

    // Unwrap here, as the queues should be free to grab
    let mut iqueues = self.instreamqueues.take().unwrap();

    let meta = stream.metadata().unwrap();

    let threadhandle = std::thread::spawn(move || {
        // Index of the next data block that is forwarded to the queues.
        let mut ctr: usize = 0;
        loop {
            if let Ok(comm_msg) = commrx.try_recv() {
                match comm_msg {
                    // New queue added. It is first told that a stream is
                    // running; it is only kept when that message got through.
                    StreamCommand::AddInQueue(queue) => {
                        let started = InStreamMsg::StreamStarted(Arc::new(meta.clone()));
                        if queue.send(started).is_ok() {
                            iqueues.push(queue);
                        }
                    }

                    // Remove queue from list
                    StreamCommand::RemoveInQueue(queue) => {
                        iqueues.retain(|q| !q.same_channel(&queue))
                    }

                    // Stop this thread. Returns the queues
                    StreamCommand::StopThread => {
                        sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped);
                        break;
                    }

                    StreamCommand::NewSiggen(_) => {
                        panic!("Error: signal generator send to input-only stream.");
                    }
                }
            }

            // Wait with a timeout, so that commands keep being handled while
            // no data comes in.
            if let Ok(raw) = rx.recv_timeout(time::Duration::from_millis(10)) {
                let msg = InStreamMsg::RawStreamData(ctr, Arc::new(raw));
                sendMsgToAllQueues(&mut iqueues, msg);
                // Only count blocks that were actually received. Counting
                // every loop pass made the index jump on each timeout.
                ctr += 1;
            }
        }
        iqueues
    });

    (threadhandle, commtx)
}
/// Start a default input stream, using default settings on everything. This
/// is only possible when the crate is built with the `cpal_api` feature.
///
/// All registered input queues are informed that a stream has started, after
/// which the thread servicing the stream is spawned.
///
/// # Errors
///
/// - When an input stream is already running.
/// - When the CPAL backend fails to start its default input stream.
/// - When the `cpal_api` feature is not enabled.
///
/// # Panics
///
/// When the stored queue list is absent, or when the metadata of the newly
/// started stream cannot be obtained.
pub fn startDefaultInputStream(&mut self) -> Result<()> {
    if self.input_stream.is_some() {
        bail!("Input stream is already running. Please first stop existing input stream.")
    }

    let (tx, rx) = unbounded::<RawStreamData>();

    // A default input stream only exists for the CPAL backend.
    cfg_if::cfg_if! {
        if #[cfg(feature="cpal_api")] {
            let stream = self.cpal_api.startDefaultInputStream(tx)?;

            // Tell every listener about the new stream before data arrives.
            sendMsgToAllQueues(
                self.instreamqueues.as_mut().unwrap(),
                InStreamMsg::StreamStarted(Arc::new(stream.metadata().unwrap())),
            );

            let (worker, cmd_tx) = self.startInputStreamThread(&stream, rx);

            self.input_stream = Some(StreamData {
                streamtype: StreamType::Input,
                stream,
                threadhandle: worker,
                comm: cmd_tx,
            });
            Ok(())
        }
        else {
            bail!("Unable to start default input stream: no CPAL api available")
        }
    }
}
/// Stop existing input stream.
///
/// Asks the stream thread to stop, waits for it to finish and stores the
/// input queues it hands back in the manager again.
///
/// # Errors
///
/// - When no input stream is running.
/// - When the stream thread has panicked. The queues owned by that thread
///   are lost in this case; the manager continues with an empty queue list.
pub fn stopInputStream(&mut self) -> Result<()> {
    let StreamData {
        streamtype: _, // Ignored here
        // Bound to a name (not `_`) so that the stream object stays alive
        // until the end of this function, i.e. until the thread was joined.
        stream: _stream,
        threadhandle,
        comm,
    } = match self.input_stream.take() {
        Some(streamdata) => streamdata,
        None => bail!("Stream is not running."),
    };

    // Ask the thread to stop. Sending only fails when the thread has already
    // dropped its end of the command channel; the join below reports why.
    let _ = comm.send(StreamCommand::StopThread);

    match threadhandle.join() {
        Ok(queues) => {
            // Store stream queues back into StreamMgr
            self.instreamqueues = Some(queues);
            Ok(())
        }
        Err(_) => {
            // Keep the manager usable: other methods expect the queue list
            // to be present whenever no input stream is running.
            self.instreamqueues = Some(vec![]);
            bail!("Stream thread panicked!")
        }
    }
}
/// Stop the running stream of the given type.
///
/// # Args
///
/// * st: The stream type. `StreamType::Input` and `StreamType::Duplex` both
///   stop the input stream.
///
/// # Errors
///
/// When stopping the input stream fails, or when `st` is any other stream
/// type: stopping those is not implemented.
pub fn stopStream(&mut self, st: StreamType) -> Result<()> {
    if matches!(st, StreamType::Input | StreamType::Duplex) {
        return self.stopInputStream();
    }
    bail!("Not implemented output stream")
}
} // impl StreamMgr
impl Drop for StreamMgr {
    /// Stop any stream that is still running and release the singleton flag,
    /// so that a new `StreamMgr` can be created afterwards.
    ///
    /// Failures to stop a stream are reported on stderr instead of panicking:
    /// a panic raised in `drop` while the thread is already unwinding aborts
    /// the process, and it would also skip the reset of the singleton flag.
    fn drop(&mut self) {
        // Kill input stream if there is one
        if self.input_stream.is_some() {
            if let Err(e) = self.stopStream(StreamType::Input) {
                eprintln!("StreamMgr: error stopping input stream on drop: {}", e);
            }
        }
        if self.output_stream.is_some() {
            if let Err(e) = self.stopStream(StreamType::Output) {
                eprintln!("StreamMgr: error stopping output stream on drop: {}", e);
            }
        }

        // Release the singleton
        smgr_created.store(false, std::sync::atomic::Ordering::SeqCst);
    }
}
// Offer a clone of `msg` to every queue without blocking. A queue for which
// the send attempt fails, for whatever reason, is dropped from the list, so
// only queues that accepted the message remain afterwards.
fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) {
    iqueues.retain(|queue| queue.try_send(msg.clone()).is_ok());
}
/// Trait for DAQ devices. It currently declares no methods.
trait Daq {}
#[cfg(test)]
mod tests {
// Placeholder: this module does not contain any unit tests yet.
// #[test]
}