123 lines
4.0 KiB
Python
123 lines
4.0 KiB
Python
"""
|
|
Provides class MeasurementSet, a class used to perform checks and adjustments
|
|
on a group of measurements at the same time.
|
|
|
|
"""
|
|
|
|
__all__ = ["MeasurementSet"]
|
|
import time
from typing import Dict, List, Optional

from .lasp_measurement import Measurement, MeasurementType
|
|
|
|
|
|
class MeasurementSet(list):
    """
    Group of measurements that have some correspondence to one another. Class
    is used to operate on multiple measurements at once.

    The set is a ``list`` subclass. On construction its items are ordered by
    their ``time`` attribute, newest first.
    """

    def __init__(self, mlist: "Optional[List[Measurement]]" = None):
        """
        Initialize a measurement set

        Args:
            mlist: Measurements to store. Any iterable is accepted; it is
                copied first, so the caller's object is never reordered.
                ``None`` (the default) results in an empty set.

        Raises:
            TypeError: If any item is not a ``Measurement`` instance.
        """
        # Work on a private copy: avoids a shared mutable default and keeps
        # the caller's list untouched by the sort below.
        items = [] if mlist is None else list(mlist)

        if any(not isinstance(i, Measurement) for i in items):
            raise TypeError("Object in list should be of Measurement type")

        # Sort by time stamp (newest first), otherwise the order depends on
        # whatever order the caller happened to supply.
        super().__init__(sorted(items, key=lambda x: x.time, reverse=True))

    def getNewestReferenceMeasurement(
        self, mtype: "MeasurementType"
    ) -> "Optional[Measurement]":
        """Return the newest (in time) measurement in the current list of a
        certain type. Returns None in case no measurement could be found.

        When several matching measurements share the newest time stamp, the
        one that comes first in the set is returned.

        Args:
            mtype (MeasurementType): The type required.
        """
        candidates = (m for m in self if m.measurementType() == mtype)
        # max() keeps the first of equally-new items and yields the default
        # for an empty sequence.
        return max(candidates, key=lambda m: m.time, default=None)

    def getReferenceMeasurements(
        self, mtype: "MeasurementType"
    ) -> "MeasurementSet":
        """Get all available reference measurements of a certain type in the
        current set.

        Args:
            mtype (MeasurementType): The type of which to list

        Returns:
            a new measurement set including all measurements of a certain
            type. It is empty when nothing matches. Being a MeasurementSet,
            it is itself a list, ordered newest first.
        """
        return MeasurementSet(
            [m for m in self if m.measurementType() == mtype]
        )

    def getNewestReferenceMeasurements(
        self,
    ) -> "Dict[MeasurementType, Measurement]":
        """Return a dictionary that maps each measurement type present in the
        set to the newest measurement of that type.

        Measurements of type ``MeasurementType.NotSpecific`` are skipped. The
        dictionary is empty in case no measurement is found.
        """
        newest = {}
        for m in self:
            mtype = m.measurementType()
            if mtype == MeasurementType.NotSpecific:
                continue
            # Keep the first one seen unless a strictly newer one shows up.
            if mtype not in newest or m.time > newest[mtype].time:
                newest[mtype] = m
        return newest

    def newestReferenceOlderThan(
        self, secs
    ) -> "Dict[MeasurementType, Measurement]":
        """Returns a dictionary of references with the newest reference, that
        is still older than `secs` seconds.

        Only the newest measurement of each type is considered; a type is
        left out when that newest measurement is younger than `secs`. Age is
        computed as ``time.time() - m.time``.

        Args:
            secs: Minimum age, in seconds, for a reference to be included.
        """
        curtime = time.time()
        return {
            key: m
            for key, m in self.getNewestReferenceMeasurements().items()
            if curtime - m.time >= secs
        }

    def measTimeSame(self) -> bool:
        """
        Returns True if all measurements have the same measurement
        time (recorded time)

        The comparison is done on the ``N`` attribute of each measurement.
        NOTE(review): presumably ``N`` is the number of recorded samples, so
        equal ``N`` only implies equal duration at equal sample rate; confirm
        against Measurement.

        Returns:
            False for an empty set, otherwise whether every ``N`` equals the
            first one.
        """
        if not self:
            return False
        first = self[0].N
        return all(first == meas.N for meas in self)

    def measSimilar(self) -> bool:
        """
        Similar means: channel metadata is the same, and the measurement time
        is the same. It means that the recorded data is, of course, different.

        Returns:
            True if measChannelsSame() and measTimeSame() else False

        """
        return self.measTimeSame() and self.measChannelsSame()

    def measChannelsSame(self) -> bool:
        """
        This method is used to check whether a set of measurements can be
        accessed in a loop, i.e. for computing power spectra or sound levels on
        a set of measurements, simultaneously. If the channel data is the same
        (name, sensitivity, ...) it returns True.

        The check compares the ``channelConfig`` attribute of every
        measurement with that of the first one.

        Returns:
            False for an empty set, otherwise whether all ``channelConfig``
            values are equal.
        """
        if not self:
            return False
        first = self[0].channelConfig
        return all(first == meas.channelConfig for meas in self)
|