149 lines
4.6 KiB
Python
149 lines
4.6 KiB
Python
"""
Provides class MeasurementSet, a class used to perform checks and adjustments
on a group of measurements at the same time.

"""
|
|
|
|
__all__ = ["MeasurementSet"]
|
|
import time
from typing import Dict, List, Optional

from .lasp_measurement import Measurement, MeasurementType
|
|
|
|
|
|
class MeasurementSet(list):
    """
    Group of measurements that have some correspondence to one another. Class
    is used to operate on multiple measurements at once.

    The set is a plain ``list`` subclass. Its elements are type-checked and
    ordered newest-first (descending ``time``) by the constructor only;
    elements added afterwards through the inherited list methods are neither
    checked nor re-sorted.
    """

    def __init__(self, mlist: "Optional[List[Measurement]]" = None):
        """
        Initialize a measurement set

        Arg:
            mlist: Measurements to store. May be omitted (or None) to create
                an empty set. The object passed in is left untouched; a
                sorted copy of its contents is stored.

        Raises:
            TypeError: if any element is not a Measurement instance.
        """
        # Work on a private copy, so that neither the caller's list nor a
        # shared default value gets re-ordered as a side effect.
        items = [] if mlist is None else list(mlist)

        if not all(isinstance(m, Measurement) for m in items):
            raise TypeError("Object in list should be of Measurement type")

        # Sort by time stamp (newest first), otherwise the order is simply
        # whatever order the caller happened to supply.
        items.sort(key=lambda m: m.time, reverse=True)
        super().__init__(items)

    def getNewestReferenceMeasurement(
        self, mtype: "MeasurementType"
    ) -> "Optional[Measurement]":
        """
        Get the NEWEST ref. measurement of a certain type, in the current set.

        Arg:
            mtype (MeasurementType): The type required.

        Return:
            - The newest (in time) measurement in the current list of a
              certain type. When several share the newest time stamp, the one
              that comes first in the set is returned.
            - None, in case no measurement could be found.
        """
        candidates = (m for m in self if m.measurementType() == mtype)
        # max() returns the first maximal element, i.e. ties resolve to the
        # earliest entry in the set.
        return max(candidates, key=lambda m: m.time, default=None)

    def getReferenceMeasurements(
        self, mtype: "MeasurementType"
    ) -> "List[Measurement]":
        """Get ALL ref. measurements of a certain type, in the current set.

        Arg:
            mtype (MeasurementType): The type of which to list

        Return:
            A new plain list (not a MeasurementSet) holding all measurements
            of the given type, in the order in which they occur in this set.
        """
        return [m for m in self if m.measurementType() == mtype]

    def getNewestReferenceMeasurements(
        self,
    ) -> "Dict[MeasurementType, Measurement]":
        """
        Get the NEWEST ref. measurement of all types, in the current set.

        Return:
            A dictionary mapping each measurement type that is not
            MeasurementType.NotSpecific to the newest measurement of that
            type. The dictionary is empty when no such measurement is found.
        """
        newest = {}
        for m in self:
            mtype = m.measurementType()
            if mtype == MeasurementType.NotSpecific:
                continue
            current = newest.get(mtype)
            # Strict comparison: on equal time stamps the first one seen stays.
            if current is None or m.time > current.time:
                newest[mtype] = m
        return newest

    def newestReferenceOlderThan(
        self, secs: float
    ) -> "Dict[MeasurementType, Measurement]":
        """
        Get a dictionary of reference measurements which are older than a
        specified threshold. Only one of each type is returned.

        Args:
            - secs: time threshold, in seconds

        Return:
            - a dictionary holding, per measurement type, the newest reference
              measurement, but only if that newest one is at least `secs`
              seconds old (measured against time.time() at the moment of the
              call). Types whose newest measurement is more recent are left
              out.
        """
        curtime = time.time()
        return {
            mtype: m
            for mtype, m in self.getNewestReferenceMeasurements().items()
            if curtime - m.time >= secs
        }

    def measTimeSame(self) -> bool:
        """
        Returns True if all measurements have the same measurement length and
        sample rate. Returns False for an empty set.
        """
        if not self:
            return False

        firstN = self[0].N  # samples
        firstFS = self[0].samplerate  # sample rate
        return all(firstN == meas.N for meas in self) and all(
            firstFS == meas.samplerate for meas in self
        )

    def measSimilar(self) -> bool:
        """
        Similar means: channel metadata is the same, and the measurement time
        is the same.

        Returns:
            - True if measChannelsSame() and measTimeSame()
            - False otherwise (which includes the empty set)
        """
        return self.measTimeSame() and self.measChannelsSame()

    def measChannelsSame(self) -> bool:
        """
        This method is used to check whether a set of measurements can be
        accessed in a loop, i.e. for computing power spectra or sound levels on
        a set of measurements, simultaneously. If the channel data is the same
        (name, sensitivity, ...) it returns True. Returns False for an empty
        set.
        """
        if not self:
            return False

        first = self[0].channelConfig
        return all(first == meas.channelConfig for meas in self)
|