Code cleanup with more use statements. Better Python wrapper code.

This commit is contained in:
Anne de Jong 2024-05-01 15:25:26 +02:00
parent b15e81409e
commit 22a9c9f850
20 changed files with 153 additions and 125 deletions

View File

@ -88,3 +88,4 @@ record = ["dep:hdf5-sys", "dep:hdf5", "dep:chrono", "dep:uuid"]
f64 = []
f32 = []
python-bindings = ["dep:pyo3", "dep:numpy"]
extension-module = []

View File

@ -1 +1 @@
from ._lasprs import *

View File

@ -51,12 +51,12 @@ fn main() -> Result<()> {
}
sleep(100);
match smgr.getStatus(StreamType::Output) {
StreamStatus::NotRunning => {
StreamStatus::NotRunning{} => {
println!("Stream is not running?");
break 'infy;
}
StreamStatus::Running => {}
StreamStatus::Error(e) => {
StreamStatus::Running{} => {}
StreamStatus::Error{e} => {
println!("Stream error: {}", e);
break 'infy;
}

View File

@ -24,19 +24,21 @@ cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
pub use numpy::ndarray::{ArrayD, ArrayViewD, ArrayViewMutD};
pub use numpy::ndarray::prelude::*;
pub use numpy::{IntoPyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn};
pub use pyo3::exceptions::PyValueError;
pub use numpy::{IntoPyArray,PyArray, PyArray1, PyArrayDyn, PyArrayLike1, PyReadonlyArrayDyn};
pub use pyo3::prelude::*;
pub use pyo3::exceptions::PyValueError;
pub use pyo3::{pymodule, types::PyModule, PyResult};
pub use pyo3::anyhow::*;
pub use pyo3;
} else {
pub use ndarray::prelude::*;
pub use ndarray::{Array1, Array2, ArrayView1};
} }
// pub use num::complex::i;
use num::complex::*;
use ndarray::OwnedRepr;
use num::complex::Complex;
/// View into 1D array of floats
pub type VdView<'a> = ArrayView1<'a, Flt>;
@ -66,3 +68,17 @@ pub type Ccol = Array1<Cflt>;
pub type Dmat = Array2<Flt>;
/// 2D array of complex floats
pub type Cmat = Array2<Cflt>;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
/// 1D array of T as returned from Rust to Numpy
pub type PyArr1<'py, T> = Bound<'py, PyArray<T, ndarray::Dim<[usize; 1]>>>;
/// 1D array Floats returned from Rust to Numpy
pub type PyArr1Flt<'py> = PyArr1<'py, Flt>;
/// 1D array of Complex returned from Rust to Numpy
pub type PyArr1Cflt<'py> = PyArr1<'py, Cflt>;
}}

View File

@ -1,7 +1,7 @@
#![allow(dead_code)]
use super::Stream;
use super::StreamMetaData;
use crate::daq::streamdata::*;
use crate::daq::{streamdata::*, StreamApiDescr};
use crate::config::{self, *};
use crate::daq::{self, *};
use anyhow::{bail, Result};
@ -147,7 +147,7 @@ impl CpalApi {
};
let prefSampleRate = *avSampleRates.last().unwrap_or(&48000.);
devs.push(DeviceInfo {
api: super::StreamApiDescr::Cpal,
api: StreamApiDescr::Cpal,
device_name: dev.name()?,
avDataTypes: dtypes,
prefDataType,
@ -186,7 +186,7 @@ impl CpalApi {
if let Some(sender) = &send_ch {
sender.send(RawStreamData::StreamError(serr)).unwrap();
}
status.store(StreamStatus::Error(serr));
status.store(StreamStatus::Error{e: serr});
}
}
@ -251,7 +251,7 @@ impl CpalApi {
en_inchannels: Vec<usize>,
framesPerBlock: usize,
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning{}));
let errfcn = CpalApi::create_errfcn(Some(sender.clone()), status.clone());
@ -301,7 +301,7 @@ impl CpalApi {
let mut setToEquilibrium = || data.iter_mut().for_each(|v| *v = Sample::EQUILIBRIUM);
match status {
StreamStatus::NotRunning | StreamStatus::Error(_) => {
StreamStatus::NotRunning{} | StreamStatus::Error{..} => {
setToEquilibrium();
return;
}
@ -312,7 +312,7 @@ impl CpalApi {
// Obtain new samples from the generator
for dat in receiver.try_iter() {
let slice = dat.getRef::<T>();
if let StreamStatus::Running = status {
if let StreamStatus::Running{} = status {
q.extend(slice);
}
}
@ -330,7 +330,7 @@ impl CpalApi {
setToEquilibrium();
} else {
// Output buffer underrun
streamstatus.store(StreamStatus::Error(StreamError::OutputUnderrunError));
streamstatus.store(StreamStatus::Error{e:StreamError::OutputUnderrunError});
setToEquilibrium();
}
}
@ -345,7 +345,7 @@ impl CpalApi {
) -> Result<(cpal::Stream, Arc<AtomicCell<StreamStatus>>)> {
// let tot_ch = config.channels as usize;
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning));
let status = Arc::new(AtomicCell::new(StreamStatus::NotRunning{}));
let err_cb = CpalApi::create_errfcn(None, status.clone());
macro_rules! build_stream{
@ -472,7 +472,7 @@ impl CpalApi {
)?;
stream.play()?;
status.store(StreamStatus::Running);
status.store(StreamStatus::Running{});
return Ok(Box::new(CpalStream {
stream,
@ -514,7 +514,7 @@ impl CpalApi {
framesPerBlock,
)?;
stream.play()?;
status.store(StreamStatus::Running);
status.store(StreamStatus::Running{});
// Daq: default channel config
let daqchannels = Vec::from_iter(
@ -574,7 +574,7 @@ impl CpalApi {
)?;
stream.play()?;
status.store(StreamStatus::Running);
status.store(StreamStatus::Running{});
// Daq: default channel config
let daqchannels =

View File

@ -1,13 +1,14 @@
use serde::{Deserialize, Serialize};
/// Daq apis that are optionally compiled in. Examples:
///
/// - CPAL (Cross-Platform Audio Library)
/// - ...
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use strum::EnumMessage;
use strum_macros;
use std::sync::Arc;
use crate::config::*;
use super::{streamstatus::StreamStatus, streamdata::StreamMetaData};
use super::{streamdata::StreamMetaData, streamstatus::StreamStatus};
#[cfg(feature = "cpal-api")]
pub mod api_cpal;
@ -26,9 +27,12 @@ pub trait Stream {
/// Number of output channels in stream
fn noutchannels(&self) -> usize;
/// Obtain stream status
fn status(&self) -> StreamStatus;
}
/// Stream API descriptor: type and corresponding text
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[allow(dead_code)]
pub enum StreamApiDescr {
@ -38,4 +42,4 @@ pub enum StreamApiDescr {
/// PulseAudio api
#[strum(message = "pulse", detailed_message = "Pulseaudio")]
Pulse = 1,
}
}

View File

@ -1,9 +1,6 @@
use std::{ops::Index, path::PathBuf};
use super::api::StreamApiDescr;
use super::datatype::DataType;
use super::deviceinfo::DeviceInfo;
use super::qty::Qty;
use super::*;
use crate::config::*;
use anyhow::Result;
use serde::{Deserialize, Serialize};

View File

@ -3,10 +3,12 @@
use strum::EnumMessage;
use strum_macros;
use serde::{Serialize, Deserialize};
use crate::config::*;
/// Data type description for samples coming from a stream
#[derive(strum_macros::EnumMessage, PartialEq, Copy, Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub enum DataType {
/// 32-bit floats
#[strum(message = "F32", detailed_message = "32-bits floating points")]

View File

@ -1,14 +1,16 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
#![allow(non_snake_case)]
use super::api::StreamApiDescr;
use super::StreamApiDescr;
use super::*;
use crate::config::*;
use crate::config::*;
/// Device info structure. Gives all information regarding a device, i.e. the number of input and
/// output channels, its name and available sample rates and types.
#[derive(Clone, Debug)]
#[allow(dead_code)]
#[cfg_attr(feature = "python-bindings", pyclass(get_all))]
pub struct DeviceInfo {
/// The api in use for this device
pub api: StreamApiDescr,
@ -17,9 +19,11 @@ pub struct DeviceInfo {
pub device_name: String,
/// Available data types for the sample
// #[pyo3(get)]
pub avDataTypes: Vec<DataType>,
/// Preferred data type for device
// #[pyo3(get)]
pub prefDataType: DataType,
/// Available frames per block
@ -65,5 +69,12 @@ pub struct DeviceInfo {
/// devices, this is typically a 'number' between +/- full scale. For some
/// devices however, the output quantity corresponds to a physical signal,
/// such a Volts.
// #[pyo3(get)]
pub physicalIOQty: Qty,
}
#[cfg_attr(feature = "python-bindings", pymethods)]
impl DeviceInfo {
fn __repr__(&self) -> String {
format!("{:?}", self)
}
}

View File

@ -22,9 +22,10 @@ pub use datatype::DataType;
pub use deviceinfo::DeviceInfo;
pub use qty::Qty;
pub use streamhandler::StreamHandler;
pub use streammgr::StreamMgr;
pub use streammgr::*;
pub use streammsg::InStreamMsg;
pub use streamstatus::StreamStatus;
use api::*;
#[cfg(feature = "record")]
pub use record::*;
@ -46,8 +47,26 @@ pub enum StreamType {
Duplex,
}
#[cfg(feature = "python-bindings")]
/// Add Python classes from stream manager
pub fn add_py_classses(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<DeviceInfo>()?;
m.add_class::<StreamMgr>()?;
m.add_class::<StreamApiDescr>()?;
m.add_class::<DataType>()?;
m.add_class::<Qty>()?;
m.add_class::<StreamType>()?;
m.add_class::<StreamError>()?;
m.add_class::<StreamStatus>()?;
m.add_class::<StreamError>()?;
Ok(())
}
/// Errors that happen in a stream
#[derive(strum_macros::EnumMessage, Debug, Clone, Display, Copy)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub enum StreamError {
/// Input overrun
#[strum(

View File

@ -1,12 +1,14 @@
//! Physical quantities that are input / output of a daq device. Provides an enumeration for these.
//!
use crate::config::*;
use strum::EnumMessage;
use strum_macros;
use serde::{Serialize, Deserialize};
/// Physical quantities that are I/O of a Daq device.
#[derive(PartialEq, Serialize, Deserialize, strum_macros::EnumMessage, Debug, Clone, Copy)]
#[cfg_attr(feature = "python-bindings", pyclass)]
#[allow(dead_code)]
pub enum Qty {
/// Number

View File

@ -5,7 +5,6 @@ use clap::builder::OsStr;
use crossbeam::atomic::AtomicCell;
use hdf5::types::{VarLenArray, VarLenUnicode};
use hdf5::{dataset, datatype, Dataset, File, H5Type};
use ndarray::ArrayView2;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

View File

@ -1,11 +1,5 @@
use crate::siggen::*;
use super::streammgr::SharedInQueue;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Commands that can be sent to a running stream
@ -13,9 +7,6 @@ pub enum StreamCommand {
/// Add a new queue to a running INPUT stream
AddInQueue(SharedInQueue),
/// Remove a queue to a running INPUT stream
RemoveInQueue(SharedInQueue),
/// New signal generator config to be used in OUTPUT stream
NewSiggen(Siggen),

View File

@ -12,12 +12,6 @@ use std::u128::MAX;
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Raw stream data coming from a stream.
#[derive(Clone, Debug)]

View File

@ -1,6 +1,6 @@
//! Data acquisition model. Provides abstract layers around DAQ devices.
use super::api::*;
use super::*;
use super::config::*;
use crate::{
config::*,
siggen::{self, Siggen},
@ -15,19 +15,14 @@ use crossbeam::{
};
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread::{JoinHandle, Thread};
use streamcmd::StreamCommand;
use streamdata::*;
use streamcmd::StreamCommand;
use streammsg::*;
use api::StreamApiDescr;
#[cfg(feature = "cpal-api")]
use super::api::api_cpal::CpalApi;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, types::PyModule, PyResult};
} else {} }
use super::api::{api_cpal::CpalApi, Stream};
/// Store a queue in a shared pointer, to share sending
/// and receiving part of the queue.
@ -45,9 +40,9 @@ struct StreamInfo<T> {
/// Keep track of whether the stream has been created. To ensure singleton behaviour.
static smgr_created: AtomicBool = AtomicBool::new(false);
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
/// Configure and manage input / output streams.
///
#[cfg_attr(feature = "python-bindings", pyclass(unsendable))]
pub struct StreamMgr {
// List of available devices
devs: Vec<DeviceInfo>,
@ -82,13 +77,23 @@ impl StreamMgr {
StreamMgr::new()
}
// #[pyo3(name = "unit")]
// #[staticmethod]
// /// See: [Biquad::unit()]
// pub fn unit_py() -> Biquad {
// Biquad::unit()
// }
// #[pyo3(name = "firstOrderHighPass")]
#[pyo3(name = "startDefaultInputStream")]
fn startDefaultInputStream_py(&mut self) -> PyResult<()> {
Ok(self.startDefaultInputStream()?)
}
#[pyo3(name = "startDefaultOutputStream")]
fn startDefaultOutputStream_py(&mut self) -> PyResult<()> {
Ok(self.startDefaultOutputStream()?)
}
#[pyo3(name = "getDeviceInfo")]
fn getDeviceInfo_py(&mut self) -> PyResult<Vec<DeviceInfo>> {
Ok(self.getDeviceInfo())
}
#[pyo3(name = "getStatus")]
fn getStatus_py(&self, st: StreamType) -> StreamStatus {
self.getStatus(st)
}
}
impl Default for StreamMgr {
fn default() -> Self {
@ -130,14 +135,14 @@ impl StreamMgr {
if let Some(s) = &self.input_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
StreamStatus::NotRunning{}
}
}
StreamType::Output => {
if let Some(s) = &self.output_stream {
s.stream.status()
} else {
StreamStatus::NotRunning
StreamStatus::NotRunning{}
}
}
}
@ -171,8 +176,8 @@ impl StreamMgr {
}
/// Obtain a list of devices that are available for each available API
pub fn getDeviceInfo(&mut self) -> &Vec<DeviceInfo> {
&self.devs
pub fn getDeviceInfo(&mut self) -> Vec<DeviceInfo> {
self.devs.clone()
}
fn scanDeviceInfo(&self) -> Vec<DeviceInfo> {
@ -227,14 +232,9 @@ impl StreamMgr {
}
}
// Remove queue from list
StreamCommand::RemoveInQueue(queue) => {
iqueues.retain(|q| !q.same_channel(&queue))
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
sendMsgToAllQueues(&mut iqueues, InStreamMsg::StreamStopped);
sendMsgToAllQueuesRemoveUnused(&mut iqueues, InStreamMsg::StreamStopped);
break 'infy;
}
StreamCommand::NewSiggen(_) => {
@ -249,7 +249,7 @@ impl StreamMgr {
let streamdata = Arc::new(streamdata);
let msg = InStreamMsg::StreamData(streamdata);
sendMsgToAllQueues(&mut iqueues, msg);
sendMsgToAllQueuesRemoveUnused(&mut iqueues, msg);
ctr += 1;
}
}
@ -295,11 +295,6 @@ impl StreamMgr {
panic!("Invalid message send to output thread: AddInQueue");
}
// Remove queue from list
StreamCommand::RemoveInQueue(_) => {
panic!("Invalid message send to output thread: RemoveInQueue");
}
// Stop this thread. Returns the queue
StreamCommand::StopThread => {
break 'infy;
@ -422,7 +417,7 @@ impl StreamMgr {
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
sendMsgToAllQueuesRemoveUnused(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
@ -453,7 +448,7 @@ impl StreamMgr {
let iqueues = self.instreamqueues.as_mut().unwrap();
let meta = stream.metadata();
sendMsgToAllQueues(iqueues, InStreamMsg::StreamStarted(meta.clone()));
sendMsgToAllQueuesRemoveUnused(iqueues, InStreamMsg::StreamStarted(meta.clone()));
let (threadhandle, commtx) = self.startInputStreamThread(meta, rx);
@ -582,7 +577,7 @@ impl Drop for StreamMgr {
// Send to all queues, remove queues that are disconnected when found out
// on the way.
fn sendMsgToAllQueues(iqueues: &mut InQueues, msg: InStreamMsg) {
fn sendMsgToAllQueuesRemoveUnused(iqueues: &mut InQueues, msg: InStreamMsg) {
// Loop over queues. Remove queues that error when we try to send
// to them
iqueues.retain(|q| match q.try_send(msg.clone()) {

View File

@ -2,24 +2,22 @@
use strum_macros::Display;
use super::*;
cfg_if::cfg_if! {
if #[cfg(feature = "python-bindings")] {
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{pymodule, pyclass, types::PyModule, PyResult};
} else {} }
/// Gives the stream status of a stream, either input / output or duplex.
/// Gives the stream status of a (possible) stream, either input / output or duplex.
#[derive(strum_macros::EnumMessage, Debug, Clone, Copy, Display)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub enum StreamStatus {
/// Stream is not running
#[strum(message = "NotRunning", detailed_message = "Stream is not running")]
NotRunning,
NotRunning{},
/// Stream is running properly
#[strum(message = "Running", detailed_message = "Stream is running")]
Running,
Running{},
/// An error occured in the stream.
#[strum(message = "Error", detailed_message = "An error occured with the stream")]
Error(StreamError)
Error{
/// In case the stream has an error: e is the field name
e: StreamError
}
}

View File

@ -1,5 +1,5 @@
use super::*;
use ndarray::prelude::*;
use crate::config::*;
use anyhow::{Result, bail};
use num::Complex;
@ -21,13 +21,12 @@ pub struct Biquad {
a1: Flt,
a2: Flt,
}
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Biquad {
#[new]
/// Create new biquad filter. See [Biquad::new()]
///
pub fn new_py<'py>(coefs: PyReadonlyArrayDyn<Flt>) -> PyResult<Self> {
pub fn new_py<'py>(coefs: PyArrayLike1<Flt>) -> PyResult<Self> {
Ok(Biquad::new(coefs.as_slice()?)?)
}
#[pyo3(name = "unit")]
@ -48,8 +47,8 @@ impl Biquad {
&mut self,
py: Python<'py>,
input: PyArrayLike1<Flt>,
) -> PyResult<&'py PyArray1<Flt>> {
Ok(self.filter(input.as_slice()?).into_pyarray(py))
) -> PyResult<PyArr1Flt<'py>> {
Ok(PyArray1::from_vec_bound(py, self.filter(input.as_slice()?)))
}
}
impl Biquad {
@ -112,6 +111,8 @@ impl Biquad {
Ok(Biquad::new(&coefs).unwrap())
}
/// Filter input signal, output by overwriting input slice.
pub fn filter_inout(&mut self, inout: &mut [Flt]) {
for sample in inout.iter_mut() {
let w0 = *sample - self.a1 * self.w1 - self.a2 * self.w2;
@ -123,6 +124,7 @@ impl Biquad {
// println!("{:?}", inout);
}
}
impl Filter for Biquad {
fn filter(&mut self, input: &[Flt]) -> Vec<Flt> {
let mut out = input.to_vec();

View File

@ -1,17 +1,20 @@
use super::*;
use super::biquad::Biquad;
use anyhow::{bail, Result};
#[derive(Clone, Debug)]
#[cfg_attr(feature = "python-bindings", pyclass)]
/// Series of biquads that filter sequentially on an input signal
///
/// # Examples
///
/// See (tests)
///
use super::*;
use super::biquad::Biquad;
use anyhow::{bail, Result};
#[derive(Clone, Debug)]
#[cfg_attr(feature = "python-bindings", pyclass)]
pub struct SeriesBiquad {
biqs: Vec<Biquad>,
}
@ -19,6 +22,8 @@ pub struct SeriesBiquad {
#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl SeriesBiquad {
#[new]
/// Create new series filter set. See [SeriesBiquad::new()]
///
@ -37,8 +42,9 @@ impl SeriesBiquad {
&mut self,
py: Python<'py>,
input: PyArrayLike1<Flt>,
) -> PyResult<&'py PyArray1<Flt>> {
Ok(self.filter(input.as_slice()?).into_pyarray(py))
) -> Result<PyArr1Flt<'py>> {
let res = self.filter(input.as_slice()?);
Ok(PyArray1::from_vec_bound(py, res))
}
#[pyo3(name = "reset")]
/// See: [SeriesBiquad::reset()]

View File

@ -20,28 +20,22 @@ pub mod daq;
pub mod siggen;
use filter::*;
use daq::*;
/// A Python module implemented in Rust.
#[cfg(feature = "python-bindings")]
#[pymodule]
#[pyo3(name="_lasprs")]
fn lasprs(py: Python, m: &PyModule) -> PyResult<()> {
fn lasprs(m: &Bound<'_, PyModule>) -> PyResult<()> {
pyo3_add_submodule_filter(py, m)?;
Ok(())
}
/// Add filter submodule to extension
#[cfg(feature = "python-bindings")]
fn pyo3_add_submodule_filter(py: Python, m: &PyModule) -> PyResult<()> {
// Add filter submodule
let filter_module = PyModule::new(py, "filter")?;
filter_module.add_class::<filter::Biquad>()?;
filter_module.add_class::<filter::SeriesBiquad>()?;
filter_module.add_class::<filter::BiquadBank>()?;
m.add_submodule(filter_module)?;
m.add_class::<filter::Biquad>()?;
m.add_class::<filter::SeriesBiquad>()?;
m.add_class::<filter::BiquadBank>()?;
daq::add_py_classses(m)?;
Ok(())
}

View File

@ -23,9 +23,6 @@ use std::fmt::Debug;
use std::iter::ExactSizeIterator;
use std::slice::IterMut;
#[cfg(feature = "python-bindings")]
use pyo3::prelude::*;
use rand::prelude::*;
use rand::rngs::ThreadRng;
use rand_distr::StandardNormal;