From f58c6f88399fd1581773983fdf020c35f03a418d Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F." Date: Fri, 1 Nov 2024 23:54:21 +0100 Subject: [PATCH] Updated clip code to include data range from channel. Changed DaqChannel and DaqConfig struct to match expected API --- src/daq/api/api_cpal.rs | 31 ++++--- src/daq/api/mod.rs | 24 ++++- src/daq/daqconfig.rs | 187 ++++++++++++++++++++++---------------- src/daq/deviceinfo.rs | 15 ++- src/daq/qty.rs | 50 +++++++++- src/daq/streammetadata.rs | 36 ++++---- src/rt/ppm.rs | 43 +++++++-- src/rt/simpleclip.rs | 43 ++++++++- 8 files changed, 297 insertions(+), 132 deletions(-) diff --git a/src/daq/api/api_cpal.rs b/src/daq/api/api_cpal.rs index 761be49..5f60a2b 100644 --- a/src/daq/api/api_cpal.rs +++ b/src/daq/api/api_cpal.rs @@ -82,16 +82,16 @@ impl CpalApi { } } pub fn getDeviceInfo(&self) -> Result> { - let srs_1 = [ + let samplerates_set1 = [ 1000, 2000, 4000, 8000, 12000, 16000, 24000, 48000, 96000, 192000, 384000, ]; - let srs_2 = [11025, 22050, 44100, 88200]; + let samplerates_set2 = [11025, 22050, 44100, 88200]; - let mut srs_tot = Vec::from_iter(srs_1.iter().chain(srs_2.iter())); - srs_tot.sort(); - let srs_tot = Vec::from_iter(srs_tot.iter().copied().map(|i| *i as Flt)); - - // srs_tot.sort(); + let mut samplerates_set = + Vec::from_iter(samplerates_set1.iter().chain(samplerates_set2.iter())); + samplerates_set.sort(); + // Convert to floating point + let samplerates_set = Vec::from_iter(samplerates_set.iter().copied().map(|i| *i as Flt)); let mut devs = vec![]; 'devloop: for dev in self.host.devices()? { @@ -99,7 +99,7 @@ impl CpalApi { let mut iChannelCount = 0; let mut oChannelCount = 0; - let mut avSampleRates = srs_tot.clone(); + let mut avSampleRates = samplerates_set.clone(); let mut avFramesPerBlock = vec![256_usize, 512, 1024, 2048, 8192]; let mut sample_formats = vec![]; @@ -155,6 +155,9 @@ impl CpalApi { if iChannelCount == oChannelCount && oChannelCount == 0 { break 'devloop; } + if avSampleRates.len() == 0 { + break 'devloop; + } devs.push(DeviceInfo { api: StreamApiDescr::Cpal, device_name: dev.name()?, @@ -168,8 +171,10 @@ impl CpalApi { iChannelCount, oChannelCount, - + avInputRanges: vec![(-1.,1.)], + avOutputRanges: vec![(-1.,1.)], hasInputIEPE: false, + hasDuplexMode: false, hasInputACCouplingSwitch: false, hasInputTrigger: false, hasInternalOutputMonitor: false, @@ -512,10 +517,11 @@ impl CpalApi { let config: cpal::StreamConfig = supported_config.config(); let meta = StreamMetaData::new( - &conf.enabledInchannelConfig(), + &conf.enabledInChannels(), conf.dtype, supported_config.sample_rate().0 as Flt, framesPerBlock, + Qty::Number ); let meta = Arc::new(meta); @@ -578,6 +584,7 @@ impl CpalApi { dtype, config.sample_rate().0 as Flt, framesPerBlock, + Qty::Number ); let metadata = Arc::new(metadata); @@ -656,6 +663,7 @@ impl CpalApi { dtype, config.sample_rate.0 as Flt, framesPerBlock, + Qty::Number ); let md = Arc::new(md); let str = Box::new(CpalStream { @@ -733,10 +741,11 @@ impl CpalApi { let dtype = DataType::from(sampleformat); let md = StreamMetaData::new( - &cfg.enabledOutchannelConfig(), + &cfg.enabledOutChannels(), dtype, cpalconfig.sample_rate.0 as Flt, framesPerBlock, + Qty::Number ); let md = Arc::new(md); let str = Box::new(CpalStream { diff --git a/src/daq/api/mod.rs b/src/daq/api/mod.rs index 29f352b..e8291fb 100644 --- a/src/daq/api/mod.rs +++ b/src/daq/api/mod.rs @@ -1,14 +1,14 @@ +use crate::config::*; /// Daq apis that are optionally compiled in. Examples: /// /// - CPAL (Cross-Platform Audio Library) /// - ... use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{fmt::format, sync::Arc}; use strum::EnumMessage; use strum_macros; -use crate::config::*; -use super::{StreamStatus, StreamMetaData}; +use super::{StreamMetaData, StreamStatus}; #[cfg(feature = "cpal-api")] pub mod api_cpal; @@ -33,7 +33,15 @@ pub trait Stream { /// Stream API descriptor: type and corresponding text #[cfg_attr(feature = "python-bindings", pyclass(eq, eq_int))] -#[derive(strum_macros::EnumMessage, Debug, Clone, PartialEq, Serialize, Deserialize, strum_macros::Display)] +#[derive( + strum_macros::EnumMessage, + Debug, + Clone, + PartialEq, + Serialize, + Deserialize, + strum_macros::Display, +)] #[allow(dead_code)] pub enum StreamApiDescr { /// CPAL api @@ -42,4 +50,10 @@ pub enum StreamApiDescr { /// PulseAudio api #[strum(message = "pulse", detailed_message = "Pulseaudio")] Pulse = 1, -} \ No newline at end of file +} +#[cfg(feature = "python-bindings")] +impl StreamApiDescr { + fn __str__(&self) -> String { + format!("{}", self.get_detailed_message().unwrap()) + } +} diff --git a/src/daq/daqconfig.rs b/src/daq/daqconfig.rs index 9791008..391915a 100644 --- a/src/daq/daqconfig.rs +++ b/src/daq/daqconfig.rs @@ -2,7 +2,7 @@ use std::{ops::Index, path::PathBuf}; use super::*; use crate::config::*; -use anyhow::Result; +use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; /// DAQ Configuration for a single channel @@ -19,21 +19,25 @@ pub struct DaqChannel { pub IEPEEnabled: bool, /// Enabled hardware AC coupling (if) pub ACCouplingMode: bool, - /// If supporting multiple input ranges: select the right index - pub rangeIndex: usize, + /// The configured range (minumum, maximum) value + pub range: (Flt, Flt), /// Physical quantity pub qty: Qty, + /// Apply digital highpass filter to remove D.C. before processing + /// Value is in Hz. A value <= 0 means it is disabled. + pub digitalHighpassCutOn: Flt, } impl Default for DaqChannel { fn default() -> Self { DaqChannel { enabled: false, name: "".into(), - sensitivity: -1.0, + sensitivity: 1.0, IEPEEnabled: false, ACCouplingMode: false, - rangeIndex: 0, + range: (-1.0, 1.0), qty: Qty::Number, + digitalHighpassCutOn: -1., } } } @@ -46,11 +50,20 @@ impl DaqChannel { sensitivity: 1.0, IEPEEnabled: false, ACCouplingMode: false, - rangeIndex: 0, + range: (-1.0, 1.0), qty: Qty::Number, + digitalHighpassCutOn: -1., } } } +#[cfg_attr(feature = "python-bindings", pymethods)] +impl DaqChannel { + #[cfg(feature = "python-bindings")] + #[new] + fn new() -> Self { + Self::default() + } +} /// Configuration of a device. #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -64,10 +77,10 @@ pub struct DaqConfig { /// Configuration of the input channels pub inchannel_config: Vec, - + /// Configuration of the output channels pub outchannel_config: Vec, - + /// The data type to use pub dtype: DataType, @@ -84,18 +97,104 @@ pub struct DaqConfig { pub monitorOutput: bool, } -#[cfg(feature = "python-bindings")] #[cfg_attr(feature = "python-bindings", pymethods)] impl DaqConfig { + #[cfg(feature = "python-bindings")] #[pyo3(name = "newFromDeviceInfo")] #[staticmethod] fn newFromDeviceInfo_py(d: &DeviceInfo) -> PyResult { Ok(DaqConfig::newFromDeviceInfo(d)) } + + #[cfg(feature = "python-bindings")] fn __repr__(&self) -> String { format!("{:#?}", self) } + #[cfg(feature = "python-bindings")] + #[staticmethod] + fn fromTOML(toml: &str) -> PyResult { + let res = toml::from_str::(toml).map_err(|e| anyhow!(format!("{e}")))?; + Ok(res) + } + #[cfg(feature = "python-bindings")] + fn toTOML(&self) -> PyResult { + Ok(toml::to_string(&self).map_err(|e| anyhow!(format!("{e}")))?) + } + /// Returns the total number of channels that appear in a running input stream. + pub fn numberEnabledInChannels(&self) -> usize { + self.inchannel_config.iter().filter(|ch| ch.enabled).count() + } + /// Returns the total number of channels that appear in a running output stream. + pub fn numberEnabledOutChannels(&self) -> usize { + self.outchannel_config + .iter() + .filter(|ch| ch.enabled) + .count() + } + /// Provide samplerate, based on device and specified sample rate index + pub fn sampleRate(&self, dev: &DeviceInfo) -> Flt { + *dev.avSampleRates.get(self.sampleRateIndex).unwrap() + } + + /// Provide samplerate, based on device and specified sample rate index + pub fn framesPerBlock(&self, dev: &DeviceInfo) -> usize { + dev.avFramesPerBlock[self.framesPerBlockIndex] + } + /// Returns vec of channel configuration for enabled input channels only + pub fn enabledInChannels(&self) -> Vec { + self.inchannel_config + .iter() + .filter(|ch| ch.enabled) + .cloned() + .collect() + } + /// Returns a list of enabled input channel numbers as indices + /// in the list of all input channels (enabled and not) + pub fn enabledInchannelsList(&self) -> Vec { + self.inchannel_config + .iter() + .enumerate() + .filter(|(_, ch)| ch.enabled) + .map(|(i, _)| i) + .collect() + } + /// Returns vec of channel configuration for enabled output channels only + pub fn enabledOutChannels(&self) -> Vec { + self.outchannel_config + .iter() + .filter(|ch| ch.enabled) + .cloned() + .collect() + } + + /// Returns the channel number of the highest enabled input channel, if any. + pub fn highestEnabledInChannel(&self) -> Option { + let mut highest = None; + + self.inchannel_config.iter().enumerate().for_each(|(i, c)| { + if c.enabled { + highest = Some(i); + } + }); + + highest + } + /// Returns the channel number of the highest enabled output channel, if any. + pub fn highestEnabledOutChannel(&self) -> Option { + let mut highest = None; + + self.outchannel_config + .iter() + .enumerate() + .for_each(|(i, c)| { + if c.enabled { + highest = Some(i); + } + }); + + highest + } } impl DaqConfig { /// Creates a new default device configuration for a given device as specified with @@ -182,74 +281,4 @@ impl DaqConfig { self.serialize_TOML(&mut file)?; Ok(()) } - - /// Returns a list of enabled input channel numbers as indices - /// in the list of all input channels (enabled and not) - pub fn enabledInchannelsList(&self) -> Vec { - self.inchannel_config - .iter() - .enumerate() - .filter(|(_, ch)| ch.enabled) - .map(|(i, _)| i) - .collect() - } - - /// Returns the total number of channels that appear in a running input stream. - pub fn numberEnabledInChannels(&self) -> usize { - self.inchannel_config.iter().filter(|ch| ch.enabled).count() - } - /// Returns the total number of channels that appear in a running output stream. - pub fn numberEnabledOutChannels(&self) -> usize { - self.outchannel_config - .iter() - .filter(|ch| ch.enabled) - .count() - } - - /// Provide samplerate, based on device and specified sample rate index - pub fn sampleRate(&self, dev: &DeviceInfo) -> Flt { - dev.avSampleRates[self.sampleRateIndex] - } - - /// Provide samplerate, based on device and specified sample rate index - pub fn framesPerBlock(&self, dev: &DeviceInfo) -> usize { - dev.avFramesPerBlock[self.framesPerBlockIndex] - } - - /// Returns vec of channel configuration for enabled input channels only - pub fn enabledInchannelConfig(&self) -> Vec { - self.inchannel_config - .iter() - .filter(|ch| ch.enabled) - .cloned() - .collect() - } - /// Returns vec of channel configuration for enabled output channels only - pub fn enabledOutchannelConfig(&self) -> Vec { - self.outchannel_config - .iter() - .filter(|ch| ch.enabled) - .cloned() - .collect() - } - - /// Returns the channel number of the highest enabled input channel, if any. - pub fn highestEnabledInChannel(&self) -> Option { - let mut highest = None; - - self.inchannel_config.iter().enumerate().for_each(|(i,c)| if c.enabled {highest = Some(i);}); - - highest - } - /// Returns the channel number of the highest enabled output channel, if any. - pub fn highestEnabledOutChannel(&self) -> Option { - let mut highest = None; - - self.outchannel_config.iter().enumerate().for_each(|(i,c)| if c.enabled {highest = Some(i);}); - println!("{:?}", highest); - - highest - } - - } diff --git a/src/daq/deviceinfo.rs b/src/daq/deviceinfo.rs index fc63222..ac35e9d 100644 --- a/src/daq/deviceinfo.rs +++ b/src/daq/deviceinfo.rs @@ -58,18 +58,27 @@ pub struct DeviceInfo { /// Daq's that are able to run in full duplex mode. pub hasInternalOutputMonitor: bool, + /// Whether the device can run in duplex mode Y/N. Duplex mode means that a + /// stream provides both input and output at the same time. + pub hasDuplexMode: bool, + /// This flag is used to be able to indicate that the device cannot run /// input and output streams independently, without opening the device in /// duplex mode. This is for example true for the UlDaq: only one handle to /// the device can be given at the same time. pub duplexModeForced: bool, - /// The physical quantity of the output signal. For 'normal' audio + /// The physical quantity of the input / output signal. For 'normal' audio /// devices, this is typically a 'number' between +/- full scale. For some /// devices however, the output quantity corresponds to a physical signal, - /// such a Volts. - // #[pyo3(get)] + /// such a Volts. Same holds for inputs. pub physicalIOQty: Qty, + + /// Data range for input signal (minimum, maximum) + pub avInputRanges: Vec<(Flt, Flt)>, + + /// Data range for output signal (minimum, maximum) + pub avOutputRanges: Vec<(Flt, Flt)>, } #[cfg_attr(feature = "python-bindings", pymethods)] impl DeviceInfo { diff --git a/src/daq/qty.rs b/src/daq/qty.rs index efb45ac..dc23f00 100644 --- a/src/daq/qty.rs +++ b/src/daq/qty.rs @@ -2,9 +2,9 @@ //! use crate::config::*; +use serde::{Deserialize, Serialize}; use strum::EnumMessage; use strum_macros; -use serde::{Serialize, Deserialize}; /// Physical quantities that are I/O of a Daq device. #[cfg_attr(feature = "python-bindings", pyclass(eq, eq_int))] @@ -15,7 +15,10 @@ pub enum Qty { #[strum(message = "number", detailed_message = "Unitless number")] Number = 0, /// Acoustic pressure - #[strum(message = "acousticpressure", detailed_message = "Acoustic Pressure [Pa]")] + #[strum( + message = "acousticpressure", + detailed_message = "Acoustic Pressure [Pa]" + )] AcousticPressure = 1, /// Voltage #[strum(message = "voltage", detailed_message = "Voltage [V]")] @@ -24,3 +27,46 @@ pub enum Qty { /// User defined UserDefined = 3, } + +#[cfg_attr(feature = "python-bindings", pymethods)] +impl Qty { + #[cfg(feature = "python-bindings")] + #[staticmethod] + fn all() -> Vec { + use Qty::*; + vec![Number, AcousticPressure, Voltage] + } + fn __str__(&self) -> String { + self.get_detailed_message().unwrap().into() + } + /// Return a unit symbol for the current quantity + fn unit_symb(&self) -> String { + use Qty::*; + match self { + Number => "1".into(), + AcousticPressure => "Pa".into(), + Voltage => "V".into(), + UserDefined => "?".into(), + } + } + /// Reference level for computing dB's + fn level_ref_value(&self) -> Flt { + use Qty::*; + match self { + Number => 1., + AcousticPressure => 2e-5, + Voltage => 1., + UserDefined => 1., + } + } + /// Level units (dB re..) + fn level_unit(&self) -> String { + use Qty::*; + match self { + Number => "dB re FS".into(), + AcousticPressure => "dB SPL".into(), + Voltage => "dBV".into(), + UserDefined => "?".into(), + } + } +} diff --git a/src/daq/streammetadata.rs b/src/daq/streammetadata.rs index 5aa9869..643d7cb 100644 --- a/src/daq/streammetadata.rs +++ b/src/daq/streammetadata.rs @@ -4,7 +4,7 @@ use anyhow::Result; /// Stream metadata. All information required for properly interpreting the raw /// data that is coming from the stream. -#[cfg_attr(feature = "python-bindings", pyclass)] +#[cfg_attr(feature = "python-bindings", pyclass(get_all))] #[derive(Clone, Debug)] pub struct StreamMetaData { /// Information for each channel in the stream @@ -20,18 +20,27 @@ pub struct StreamMetaData { /// channelInfo.len() we get the total number of samples that come in at /// each callback. pub framesPerBlock: usize, + + /// The quantity of input / output + pub physicalIOQty: Qty, } impl StreamMetaData { /// Create new metadata object. - /// /// + /// # Args /// + /// - `channelInfo`: DaqChannel configuraion for each channel in the stream + /// - `rawdtype`: Datatype of raw stream data + /// - `samplerate`: Sampling frequency \[Hz\] + /// - `framesPerBlock`: Number of frames per callback + /// - `ioqty` - Physical quantity of i/o pub fn new<'a, T>( channelInfo: T, rawdtype: DataType, - sr: Flt, + samplerate: Flt, framesPerBlock: usize, + ioqty: Qty, ) -> StreamMetaData where T: IntoIterator, @@ -49,8 +58,9 @@ impl StreamMetaData { StreamMetaData { channelInfo, rawDatatype: rawdtype, - samplerate: sr, + samplerate, framesPerBlock, + physicalIOQty: ioqty, } } @@ -65,21 +75,7 @@ impl StreamMetaData { #[cfg(feature = "python-bindings")] #[cfg_attr(feature = "python-bindings", pymethods)] impl StreamMetaData { - #[getter] - fn channelInfo(&self) -> Vec { - self.channelInfo.clone() - } - #[getter] - fn rawDatatype(&self) -> DataType { - self.rawDatatype - } - - #[getter] - fn samplerate(&self) -> Flt { - self.samplerate - } - #[getter] - fn framesPerBlock(&self) -> usize { - self.framesPerBlock + fn __repr__(&self) -> String { + format!("{:#?}", self) } } diff --git a/src/rt/ppm.rs b/src/rt/ppm.rs index 8a52572..f4af49f 100644 --- a/src/rt/ppm.rs +++ b/src/rt/ppm.rs @@ -1,5 +1,5 @@ -use crate::daq::InStreamMsg; -use crate::math::maxabs; +use crate::daq::{InStreamMsg, StreamMetaData}; +use crate::math::{max, maxabs, min}; use crate::slm::{self, SLMSettingsBuilder, TimeWeighting, SLM}; use crate::{config::*, FreqWeighting, StandardFilterDescriptor}; use crate::{daq::StreamMgr, Dcol}; @@ -26,6 +26,10 @@ const LEVEL_THRESHOLD_FOR_HIGH_LEVEL: Flt = -10.; type SharedPPMStatus = Arc>>; +fn level(lin: Flt) -> Flt { + 20. * lin.log10() +} + /// Peak programme meter implementation, including clip detector. Effectively uses a realtime SLM on all /// input channels. Also includes a clipping detector. #[derive(Debug)] @@ -62,6 +66,7 @@ impl PPM { rayon::spawn(move || { let mut slms: Vec = vec![]; + let mut streammeta: Option> = None; let resetall = |slms: &mut Vec| { let mut status = status.lock(); @@ -77,12 +82,19 @@ impl PPM { InStreamMsg::InStreamData(d) => { let mut status = status.lock(); let floatdata = d.getFloatData(); + let meta = streammeta.as_ref().expect("Stream metadata not available"); + let channels = &meta.channelInfo; - 'channel: for (chno, (slm, ppmstatus)) in - slms.iter_mut().zip(status.iter_mut()).enumerate() + 'channel: for (chno, ((slm, ppmstatus), ch)) in slms + .iter_mut() + .zip(status.iter_mut()) + .zip(channels) + .enumerate() { let chdata = floatdata.slice(s![.., chno]); - let maxabs_new = maxabs(chdata); + let min_val = min(chdata); + let max_val = max(chdata); + let chdata = chdata .as_slice() .expect("Data not contiguous on sample axis"); @@ -104,13 +116,27 @@ impl PPM { continue 'channel; } + let clip = min_val <= ALMOST_CLIPPED_REL_AMP * ch.range.0 + || max_val >= ALMOST_CLIPPED_REL_AMP * ch.range.1; + + let abs_range = if ch.range.0.abs() > ch.range.1.abs() { + ch.range.0.abs() + } else { + ch.range.1.abs() + }; + let high_level_threshold = + level(abs_range) + LEVEL_THRESHOLD_FOR_HIGH_LEVEL; + let low_level_threshold = + level(abs_range) + LEVEL_THRESHOLD_FOR_LOW_LEVEL; + let high_level = last_level > high_level_threshold; + let low_level = last_level < low_level_threshold; // Update clip status, if we were not clipping - ppmstatus.clip = if maxabs_new > ALMOST_CLIPPED_REL_AMP { + ppmstatus.clip = if clip { ppmstatus.clip_time = Some(Instant::now()); ClipState::Clipped - } else if last_level > LEVEL_THRESHOLD_FOR_HIGH_LEVEL { + } else if high_level { ClipState::HighLevel - } else if last_level < LEVEL_THRESHOLD_FOR_LOW_LEVEL { + } else if low_level { ClipState::LowLevel } else { ClipState::LevelFine @@ -149,6 +175,7 @@ impl PPM { clip_time: None, }); }); + streammeta = Some(meta); } InStreamMsg::StreamStopped => {} } diff --git a/src/rt/simpleclip.rs b/src/rt/simpleclip.rs index 6d7853e..e11fe55 100644 --- a/src/rt/simpleclip.rs +++ b/src/rt/simpleclip.rs @@ -1,6 +1,9 @@ use crate::daq::InStreamMsg; +use crate::daq::StreamMetaData; use crate::daq::StreamMgr; +use crate::math::max; use crate::math::maxabs; +use crate::math::min; use crate::Flt; use crossbeam::channel::bounded; use parking_lot::Mutex; @@ -10,8 +13,9 @@ use std::{ time::Duration, }; -/// If signal is above this value, we indicate that the signal has clipped. -const CLIP_STRONG_LIMIT: Flt = 0.999; +/// If signal is below / above the range times the value below, we indicate that +/// the signal has clipped. +const CLIP_REL_LIMIT: Flt = 0.999; /// Very simple clip detector. Used to detect cliping in a recording. Stores one /// clip value if just something happened between time of new and moment of drop(). @@ -38,12 +42,40 @@ impl SimpleClipDetector { smgr.addInQueue(tx); rayon::spawn(move || loop { + let mut streammeta: Option> = None; if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1500)) { match msg { InStreamMsg::InStreamData(dat) => { + let meta = streammeta + .expect("If we are here, stream metadata should be available"); let flt = dat.getFloatData(); - let maxabs = maxabs(flt.view()); - if maxabs >= CLIP_STRONG_LIMIT { + let maxs = flt + .columns() + .into_iter() + .map(|col| max(col)) + .collect::>(); + let mins = flt + .columns() + .into_iter() + .map(|col| min(col)) + .collect::>(); + + let mut clip = false; + + maxs.into_iter().zip(mins).zip(&meta.channelInfo).for_each( + |((max, min), ch)| { + let min_for_clip = CLIP_REL_LIMIT * ch.range.0; + let max_for_clip = CLIP_REL_LIMIT * ch.range.1; + if max >= max_for_clip { + clip = true; + } + if min <= min_for_clip { + clip = true; + } + }, + ); + + if clip { clipstate.store(true, Relaxed); // We do not have to do anything anymore. The signal // has clipped so we do not have to check any new @@ -51,6 +83,9 @@ impl SimpleClipDetector { return; } } + InStreamMsg::StreamStarted(meta) => { + streammeta = Some(meta); + } _ => {} } };