lasprs/src/daq/mod.rs

228 lines
6.7 KiB
Rust

//! Data acquisition model. Provides abstract layers around DAQ devices.
mod api;
mod daqconfig;
mod datatype;
mod deviceinfo;
mod qty;
mod streammsg;
pub use datatype::*;
pub use deviceinfo::*;
pub use qty::*;
pub use streammsg::*;
#[cfg(feature = "cpal_api")]
use api::api_cpal::CpalApi;
use crate::config::*;
use anyhow::{bail, Error, Result};
use api::Stream;
use core::time;
use crossbeam::{
channel::{unbounded, Receiver, Sender},
thread,
};
use deviceinfo::DeviceInfo;
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streammsg::*;
/// Keeps track of whether a `StreamMgr` is currently alive, to ensure singleton
/// behaviour. It is checked and set to `true` in `StreamMgr::new()` and reset to
/// `false` when the `StreamMgr` is dropped.
static smgr_created: AtomicBool = AtomicBool::new(false);
/// Bookkeeping for a running input stream, as stored in `StreamMgr::input_stream`.
struct InputStream {
/// Type of the stream. `startDefaultInputStream` sets this to `StreamType::Input`.
streamtype: StreamType,
/// The stream object handed out by the API when the stream was started.
stream: Box<dyn Stream>,
/// Handle of the thread that forwards incoming stream data to the input
/// queues. Joining the thread yields the queues (`InQueues`) back.
threadhandle: JoinHandle<InQueues>,
/// Sending end of the command channel (`StreamCommand`) towards that thread.
comm: Sender<StreamCommand>,
}
/// Configure and manage input / output streams.
///
/// Intended to be used as a singleton: `StreamMgr::new()` panics when another
/// `StreamMgr` is still alive.
pub struct StreamMgr {
/// The running input stream, if any. An input stream can be both input and
/// duplex.
input_stream: Option<InputStream>,
/// Output only stream, if any.
/// NOTE(review): nothing in this file assigns a value to this field yet.
output_stream: Option<Box<dyn Stream>>,
/// Signal generator.
/// NOTE(review): initialised to `None` and not used elsewhere in this file.
siggen: Option<crate::siggen::Siggen>,
/// Handle to the CPAL backend. Only present when the crate is built with the
/// `cpal_api` feature.
#[cfg(feature = "cpal_api")]
cpal_api: CpalApi,
/// The storage of queues. When no streams are running, they
/// are here. When stream is running, they will become available
/// in the JoinHandle of the thread.
instreamqueues: Option<InQueues>,
}
impl StreamMgr {
    /// Create new stream manager. A stream manager is supposed to be a singleton.
    ///
    /// # Panics
    ///
    /// When a StreamMgr object is already alive.
    pub fn new() -> StreamMgr {
        // Claim the singleton flag in ONE atomic step. A separate load followed
        // by a store would allow two threads to both observe `false` and both
        // construct a manager.
        if smgr_created.swap(true, std::sync::atomic::Ordering::SeqCst) {
            panic!("BUG: Only one stream manager is supposed to be a singleton");
        }
        StreamMgr {
            input_stream: None,
            output_stream: None,
            siggen: None,
            #[cfg(feature = "cpal_api")]
            cpal_api: CpalApi::new(),
            instreamqueues: Some(vec![]),
        }
    }

    /// Obtain a list of devices that are available for each available API.
    ///
    /// Returns an empty list when the crate is built without any API feature.
    fn getDeviceInfo(&mut self) -> Vec<DeviceInfo> {
        // Only mutated when at least one API feature is enabled.
        #[allow(unused_mut)]
        let mut devinfo = vec![];
        #[cfg(feature = "cpal_api")]
        devinfo.extend(self.cpal_api.getDeviceInfo());
        devinfo
    }

    /// Start a default input stream, using default settings on everything. This is only possible
    /// when the crate is built with the `cpal_api` feature.
    ///
    /// A background thread is spawned that forwards every block of raw stream
    /// data to all registered input queues, tagged with a running block counter.
    ///
    /// # Errors
    ///
    /// * When an input stream is already running.
    /// * When the crate is built without the `cpal_api` feature.
    /// * When the API fails to start the stream.
    pub fn startDefaultInputStream(&mut self) -> Result<()> {
        // Without the `cpal_api` feature, everything after the `bail!` in the
        // `cfg_if` below is unreachable.
        #![allow(unreachable_code)]
        if self.input_stream.is_some() {
            bail!("Input stream is already running. Please first stop existing input stream.")
        }
        let (tx, rx): (Sender<RawStreamData>, Receiver<RawStreamData>) = unbounded();
        cfg_if::cfg_if! {
            if #[cfg(feature="cpal_api")] {
                let stream = self.cpal_api.startDefaultInputStream(tx)?;
            }
            else {
                bail!("Unable to start default input stream: no CPAL api available")
            }
        }
        // The queues are stored in the manager whenever no input stream is
        // running, which was verified above.
        let mut iqueues = self
            .instreamqueues
            .take()
            .expect("BUG: input queues are missing while no input stream is running");
        let (commtx, commrx) = unbounded();
        // TODO: construct the stream metadata and send it to newly added queues.
        let threadhandle = std::thread::spawn(move || {
            // Sequence number of the next data block that is forwarded.
            let mut ctr: usize = 0;
            'infy: loop {
                // Handle at most one pending command per iteration.
                if let Ok(comm_msg) = commrx.try_recv() {
                    match comm_msg {
                        // New queue added
                        StreamCommand::AddInQueue(queue) => iqueues.push(queue),
                        // Remove queue from list
                        StreamCommand::RemoveInQueue(queue) => {
                            iqueues.retain(|q| !Arc::ptr_eq(q, &queue))
                        }
                        // Stop this thread. Returns the queues.
                        StreamCommand::StopThread => {
                            // A failed send means the listener is gone; drop
                            // its queue instead of panicking during shutdown.
                            iqueues.retain(|q| q.send(InStreamMsg::StreamStopped).is_ok());
                            break 'infy;
                        }
                    }
                }
                // The timeout keeps the thread responsive to commands when no
                // data is coming in.
                if let Ok(msg) = rx.recv_timeout(time::Duration::from_millis(10)) {
                    let msg = Arc::new(msg);
                    // Forward to all listeners, removing those that hung up so
                    // that one dead listener cannot take down the whole thread.
                    iqueues.retain(|q| {
                        q.send(InStreamMsg::RawStreamData(ctr, msg.clone()))
                            .is_ok()
                    });
                    // Count forwarded blocks only, not idle timeouts.
                    ctr += 1;
                }
            }
            iqueues
        });
        cfg_if::cfg_if! {
            if #[cfg(feature="cpal_api")] {
                self.input_stream = Some(InputStream {
                    streamtype: StreamType::Input,
                    stream,
                    threadhandle,
                    comm: commtx,
                });
            } else {}
        }
        Ok(())
    }

    /// Stop existing input stream.
    ///
    /// Asks the forwarding thread to stop, waits for it to finish and stores the
    /// input queues back into the manager.
    ///
    /// # Errors
    ///
    /// * When no input stream is running.
    /// * When the forwarding thread panicked. The queues it owned are lost in
    ///   that case; the manager continues with an empty set of queues so a new
    ///   stream can still be started.
    pub fn stopInputStream(&mut self) -> Result<()> {
        match self.input_stream.take() {
            Some(InputStream {
                streamtype: _, // Ignored here
                stream: _,     // Stays alive until the thread has been joined
                threadhandle,
                comm,
            }) => {
                // A failed send means the thread is already gone. The join
                // below reports that case, so the send result can be ignored.
                let _ = comm.send(StreamCommand::StopThread);
                match threadhandle.join() {
                    Ok(queues) => {
                        // Store stream queues back into StreamMgr
                        self.instreamqueues = Some(queues);
                        Ok(())
                    }
                    Err(_) => {
                        self.instreamqueues = Some(vec![]);
                        bail!("Stream thread panicked!")
                    }
                }
            }
            None => bail!("Stream is not running."),
        }
    }

    /// Stop existing running stream.
    ///
    /// # Arguments
    ///
    /// * st: The stream type.
    ///
    /// # Errors
    ///
    /// * When stopping the input stream fails, see `stopInputStream`.
    /// * For every stream type other than input or duplex, as stopping those is
    ///   not implemented.
    pub fn stopStream(&mut self, st: StreamType) -> Result<()> {
        match st {
            StreamType::Input | StreamType::Duplex => self.stopInputStream(),
            _ => bail!("Not implemented output stream"),
        }
    }
} // impl StreamMgr
impl Drop for StreamMgr {
    /// Stops any stream that is still registered and releases the singleton
    /// flag, such that a new `StreamMgr` can be created afterwards.
    ///
    /// Errors that occur while stopping are reported on stderr instead of being
    /// unwrapped: an error cannot be returned from `drop`, and a panic here
    /// would skip the release of the singleton flag (and abort the process when
    /// it happens during unwinding).
    fn drop(&mut self) {
        // Kill input stream if there is one
        if self.input_stream.is_some() {
            if let Err(e) = self.stopStream(StreamType::Input) {
                eprintln!("StreamMgr: unable to stop input stream: {}", e);
            }
        }
        // Stopping an output stream can fail as well, so this must not be
        // unwrapped. The stream object itself is released together with the
        // field.
        if self.output_stream.is_some() {
            if let Err(e) = self.stopStream(StreamType::Output) {
                eprintln!("StreamMgr: unable to stop output stream: {}", e);
            }
        }
        // Decref the singleton
        smgr_created.store(false, std::sync::atomic::Ordering::SeqCst);
    }
}
/// Daq devices.
///
/// NOTE(review): this private trait has no methods and no implementors in this
/// file; presumably a placeholder for a device abstraction — confirm.
trait Daq {}
#[cfg(test)]
mod tests {
// Placeholder: no unit tests have been written for this module yet.
// #[test]
}