lasp/python_src/lasp/lasp_measurementset.py

123 lines
4.0 KiB
Python

"""
Provides class MeasurementSet, a class used to perform checks and adjustments
on a group of measurements at the same time.
"""
__all__ = ["MeasurementSet"]
from .lasp_measurement import Measurement, MeasurementType
from typing import List
import time
class MeasurementSet(list):
"""
Group of measurements that have some correspondence to one another. Class
is used to operate on multiple measurements at once.
"""
def __init__(self, mlist: List[Measurement] = []):
"""
Initialize a measurement set
Args:
mlist: Measurement list
"""
if any([not isinstance(i, Measurement) for i in mlist]):
raise TypeError("Object in list should be of Measurement type")
# Sort by time stamp, otherwise the order is random
mlist.sort(key=lambda x: x.time, reverse=True)
super().__init__(mlist)
def getNewestReferenceMeasurement(self, mtype: MeasurementType):
"""Return the newest (in time) measurement in the current list of a certain type. Returns None in case no measurement could be found.
Args:
mtype (MeasurementType): The type required.
"""
mnewest = None
for m in self:
if m.measurementType() == mtype:
if mnewest is None:
mnewest = m
else:
if mnewest.time < m.time:
mnewest = m
return mnewest
def getReferenceMeasurements(self, mtype: MeasurementType):
"""Get all available reference measurements of a certain type in the
current set.
Args:
mtype (MeasurementType): The type of which to list
Returns:
a new measurement set including all measurements of a certain type
"""
return [m for m in self if m.measurementType() == mtype]
def getNewestReferenceMeasurements(self):
"""Returns a dictionary with newest measurement of each type that is not specific returns None in case no measurement is found."""
newest = {}
for m in self:
mtype = m.measurementType()
if mtype == MeasurementType.NotSpecific:
continue
if not mtype in newest:
newest[mtype] = m
else:
if m.time > newest[mtype].time:
newest[mtype] = m
return newest
def newestReferenceOlderThan(self, secs):
"""Returns a dictionary of references with the newest reference, that is still
older than `secs` seconds."""
curtime = time.time()
newest = self.getNewestReferenceMeasurements()
newest_older_than = {}
for key, m in newest.items():
if curtime - m.time >= secs:
newest_older_than[key] = m
return newest_older_than
def measTimeSame(self):
"""
Returns True if all measurements have the same measurement
time (recorded time)
"""
if len(self) > 0:
first = self[0].N
return all([first == meas.N for meas in self])
else:
return False
def measSimilar(self):
"""
Similar means: channel metadata is the same, and the measurement time
is the same. It means that the recorded data is, of course, different.
Returns:
True if measChannelsSame() and measTimeSame() else False
"""
return self.measTimeSame() and self.measChannelsSame()
def measChannelsSame(self):
"""
This method is used to check whether a set of measurements can be
accessed in a loop, i.e. for computing power spectra or sound levels on
a set of measurements, simultaneously. If the channel data is the same
(name, sensitivity, ...) it returns True.
"""
if len(self) > 0:
first = self[0].channelConfig
return all([first == meas.channelConfig for meas in self])
else:
return False