lasp/src/lasp/lasp_common.py

399 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import shelve, logging, sys, appdirs, os, platform
import numpy as np
from collections import namedtuple
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from enum import Enum, unique, auto
from .lasp_cpp import DaqChannel
"""
Common definitions used throughout the code.
"""
# Names made available by a star-import of this module.
__all__ = [
    'P_REF',
    'FreqWeighting',
    'TimeWeighting',
    'getTime',
    'getFreq',
    'Qty',
    'SIQtys',
    'lasp_shelve',
    'this_lasp_shelve',
    'W_REF',
    'U_REF',
    'I_REF',
    'dBFS_REF',
    'AvType',
]
# Reference sound pressure, used for sound pressure levels
P_REF = 2e-5  # 20 micropascal

# Reference power
W_REF = 1e-12  # 1 picoWatt

# Reference intensity
I_REF = 1e-12  # 1 picoWatt / m^2

# Reference velocity for sound velocity level
U_REF = 5e-8  # 50 nano meter / s

# Wiki: The unit dB FS or dBFS is defined in AES Standard AES17-1998,[13]
# IEC 61606,[14] and ITU-T P.38x,[15][16] such that the RMS value of a
# full-scale sine wave is designated 0 dB FS.
#
# The LASP code converts integer samples to floats, where the mapping is such
# that the maximum (POSITIVE) value that can be represented by the integer
# data is mapped to the value of 1.0. Following this convention, a sine wave
# with an amplitude of 1.0 is 0 dB FS. Its rms value is 0.5*sqrt(2), hence
# that rms value is the reference level specified below.
dBFS_REF = 0.5*2**0.5  # This rms value is -3.01 dB relative to 1.0
@unique
class AvType(Enum):
    """
    Specifies the type of data, for adding callbacks to and removing
    callbacks from the stream.

    Each member value is a tuple of (integer index, short name).
    """

    # Input stream
    audio_input = (0, 'input')

    # Output stream
    audio_output = (1, 'output')

    # Both input as well as output
    audio_duplex = (2, 'duplex')
@dataclass_json
@dataclass(eq=False)
class Qty:
    """
    Description of a physical quantity: its name, its unit, the level units
    it can be expressed in, and the matching C++ enumeration value.
    """
    # Name, i.e.: Acoustic Pressure
    name: str
    # I.e.: Pascal
    unit_name: str
    # I.e.: Pa
    unit_symb: str
    # I.e.: ('dB SPL') <== tuple of possible level units
    level_unit: object
    # Contains a tuple of possible level names, including its reference value.
    # For now, it is only able to compute for the first one. I.e. we are not
    # yet able to compute `dBPa's'
    level_ref_name: object
    level_ref_value: object
    # Corresponding enumeration value on the C++ side
    cpp_enum: DaqChannel.Qty

    def __str__(self):
        """Return the quantity as 'name [unit symbol]'."""
        return f'{self.name} [{self.unit_symb}]'

    def __eq__(self, other):
        """
        Compare on `name` and `unit_name` only.

        Comparison breaks for the other fields (level unit, level ref name,
        etc.) as these are tuples / a single string, so they are left out.

        Returns:
            True / False for objects that carry `name` and `unit_name`,
            NotImplemented for anything else (so that e.g. `qty == None`
            evaluates to False instead of raising AttributeError).
        """
        try:
            return (self.name == other.name and
                    self.unit_name == other.unit_name)
        except AttributeError:
            # `other` is not Qty-like: let Python fall back to its default
            # handling of the comparison.
            return NotImplemented

    def toInt(self):
        """
        Convert quantity to its specific enum integer value
        """
        return self.cpp_enum.value
@unique
class SIQtys(Enum):
    """
    Enumeration of the physical quantities known to the code. The value of
    each member is a `Qty` instance.
    """
    N = Qty(name='Number',
            unit_name='No unit / full scale',
            unit_symb='-',
            level_unit=('dBFS',),
            level_ref_name=('Relative to full scale sine wave',),
            level_ref_value=(dBFS_REF,),
            cpp_enum=DaqChannel.Qty.Number
            )
    AP = Qty(name='Acoustic Pressure',
             unit_name='Pascal',
             unit_symb='Pa',
             level_unit=('dB SPL', 'dBPa'),
             # P_REF is 2e-5 Pa, which is 20 (not 2) micropascal
             level_ref_name=('20 micropascal', '1 pascal',),
             level_ref_value=(P_REF, 1),
             cpp_enum=DaqChannel.Qty.AcousticPressure
             )
    V = Qty(name='Voltage',
            unit_name='Volt',
            unit_symb='V',
            level_unit=('dBV',),  # dBV
            level_ref_name=('1V',),
            level_ref_value=(1.0,),
            cpp_enum=DaqChannel.Qty.Voltage
            )

    @staticmethod
    def default():
        """Return the default quantity, which is `N` (Number)."""
        return SIQtys.N.value

    @staticmethod
    def fromCppEnum(enum):
        """
        Convert enumeration index from - say - a measurement file back into
        physical quantity information.

        Args:
            enum: Value to compare against the `cpp_enum` of each quantity

        Returns:
            The matching `Qty`

        Raises:
            RuntimeError: if no quantity has this `cpp_enum`
        """
        for qty in SIQtys:
            if qty.value.cpp_enum == enum:
                return qty.value
        raise RuntimeError(f'Qty corresponding to enum {enum} not found')

    @staticmethod
    def fromInt(val):
        """
        Convert integer index from - say - a measurement file back into
        physical quantity information.

        Args:
            val: Integer to compare against `cpp_enum.value` of each quantity

        Returns:
            The matching `Qty`

        Raises:
            RuntimeError: if no quantity has this integer value
        """
        for qty in SIQtys:
            if qty.value.cpp_enum.value == val:
                return qty.value
        raise RuntimeError(f'Qty corresponding to integer {val} not found')
@dataclass
class CalSetting:
    """
    A single calibration setting: a named calibration value, stored both in
    dB and in linear units, together with the quantity it applies to.
    """

    # Text describing the setting, e.g. '94 dB SPL'
    name: str

    # Calibration value expressed in dB
    cal_value_dB: float

    # Calibration value expressed in linear units
    cal_value_linear: float

    # Physical quantity this calibration value belongs to
    qty: Qty
class CalibrationSettings:
    """
    Collection of predefined calibration settings, plus helpers to show them
    in a combobox and to read the selected one back.
    """
    one = CalSetting('94 dB SPL', 94.0, 10**(94/20)*2e-5, SIQtys.AP.value)
    two = CalSetting('1 Pa rms', 93.98, 1.0, SIQtys.AP.value)
    three = CalSetting('114 dB SPL', 114.0, 10**(114/20)*2e-5, SIQtys.AP.value)
    four = CalSetting('10 Pa rms', 113.98, 10.0, SIQtys.AP.value)
    # NOTE(review): the linear value 1.0 does not match 93.7 dB re 2e-5
    # (10**(93.7/20)*2e-5 is approximately 0.968). Left unchanged - confirm
    # which of the two numbers is intended.
    five = CalSetting('93.7 dB SPL', 93.7, 1.0, SIQtys.AP.value)

    # Order of this tuple defines the combobox item order
    types = (one, two, three, four, five)
    default = one
    default_index = 0

    @staticmethod
    def fillComboBox(cb):
        """
        Fill Calibration Settings to a combobox. Each item text is the dB
        value of the setting.

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for ty in CalibrationSettings.types:
            cb.addItem(f'{ty.cal_value_dB}')
        cb.setCurrentIndex(CalibrationSettings.default_index)

    @staticmethod
    def getCurrent(cb):
        """
        Return the calibration setting that is selected in the combobox.

        Args:
            cb: QComboBox that was filled using fillComboBox()

        Returns:
            The selected CalSetting, or None when the current index does not
            refer to one of the settings. That includes a negative index (no
            item selected), which would otherwise silently pick a setting
            from the end of the tuple.
        """
        idx = cb.currentIndex()
        if 0 <= idx < len(CalibrationSettings.types):
            return CalibrationSettings.types[idx]
        return None
# Per-user data directory of the application. It is created on import when it
# does not exist yet; the process exits if that fails.
lasp_appdir = appdirs.user_data_dir('Lasp', 'ASCEE')
if not os.path.exists(lasp_appdir):
    try:
        os.makedirs(lasp_appdir, exist_ok=True)
    except OSError:
        # Only file system errors are handled here. A bare except would also
        # intercept KeyboardInterrupt and SystemExit.
        print('Fatal error: could not create application directory')
        sys.exit(1)
class Shelve:
    """
    Reference-counted access to a `shelve` database, usable as a context
    manager.

    This base class does not hold state itself. It relies on the following
    being provided (the subclasses in this file do so):
        - `shelve`: attribute holding the opened shelf, or None when closed
        - `refcount`: attribute holding the number of active users
        - `shelve_fn()`: method returning the file name of the shelf

    The shelf is opened on the first incref() and closed when decref() brings
    the reference count back to zero.
    """

    def load(self, key, default_value):
        """
        Load data from a given key.

        Args:
            key: Key to look up
            default_value: Value to return when the key is not found

        Returns:
            The stored value, or default_value if the key is not found
        """
        if key in self.shelve:
            return self.shelve[key]
        return default_value

    def __enter__(self):
        """Take a reference (opening the shelf if required), return self."""
        self.incref()
        return self

    def store(self, key, val):
        """
        Store a value under the given key.

        Args:
            key: Key to store under
            val: Value to store
        """
        # Go through the `shelve` attribute, like all other methods do
        self.shelve[key] = val

    def deleteIfPresent(self, key):
        """
        Delete the entry with the given key. A missing key is not an error.

        Args:
            key: Key of the entry to delete
        """
        try:
            del self.shelve[key]
        except KeyError:
            # Key is not present: nothing to delete
            pass

    def printAllKeys(self):
        """Print a list of all keys in the shelf."""
        print(list(self.shelve.keys()))

    def incref(self):
        """
        Increase the reference count. Opens the shelf when it is not open.
        """
        if self.shelve is None:
            assert self.refcount == 0
            self.shelve = shelve.open(self.shelve_fn())
        self.refcount += 1

    def decref(self):
        """
        Decrease the reference count. Closes the shelf when the count reaches
        zero.
        """
        self.refcount -= 1
        if self.refcount == 0:
            self.shelve.close()
            self.shelve = None

    def __exit__(self, type, value, traceback):
        """Release the reference taken in __enter__()."""
        self.decref()
class lasp_shelve(Shelve):
    """
    Shelf stored in 'config.shelve' in the application directory.

    The opened shelf and its reference count are kept on the class, so they
    are shared by all instances of this class.
    """

    # Class-wide state: number of active users and the opened shelf (if any)
    _refcount = 0
    _shelve = None

    @property
    def refcount(self):
        """Reference count shared by all instances."""
        return lasp_shelve._refcount

    @refcount.setter
    def refcount(self, count):
        lasp_shelve._refcount = count

    @property
    def shelve(self):
        """Opened shelf shared by all instances, None when closed."""
        return lasp_shelve._shelve

    @shelve.setter
    def shelve(self, handle):
        lasp_shelve._shelve = handle

    def shelve_fn(self):
        """Return the file name of the shelf."""
        filename = 'config.shelve'
        return os.path.join(lasp_appdir, filename)
class this_lasp_shelve(Shelve):
    """
    Shelf stored in the application directory, in a file whose name starts
    with the network name of this computer (as given by platform.node()).

    The opened shelf and its reference count are kept on the class, so they
    are shared by all instances of this class.
    """

    # Class-wide state: number of active users and the opened shelf (if any)
    _refcount = 0
    _shelve = None

    @property
    def refcount(self):
        """Reference count shared by all instances."""
        return this_lasp_shelve._refcount

    @refcount.setter
    def refcount(self, count):
        this_lasp_shelve._refcount = count

    @property
    def shelve(self):
        """Opened shelf shared by all instances, None when closed."""
        return this_lasp_shelve._shelve

    @shelve.setter
    def shelve(self, handle):
        this_lasp_shelve._shelve = handle

    def shelve_fn(self):
        """Return the file name of the shelf, which includes the node name."""
        return os.path.join(lasp_appdir, f'{platform.node()}_config.shelve')
class TimeWeighting:
    """
    Time weighting types. Each one is a tuple of (number, description text).
    The description text is what is shown in a combobox.
    """
    # This is not actually a time weighting
    none = (0, 'Raw (no time weighting)')
    uufast = (1e-4, '0.1 ms')
    ufast = (35e-3, 'Impulse (35 ms)')
    fast = (0.125, 'Fast (0.125 s)')
    slow = (1.0, 'Slow (1.0 s)')
    tens = (10., '10 s')

    # All-averaging: compute the current average of all history. Kind of the
    # equivalent level. Only useful for power spectra. For Sound Level Meter,
    # equivalent levels does that thing.
    averaging = (-1, 'All-averaging')

    types_all = (none, uufast, ufast, fast, slow, tens, averaging)

    # Used for combobox of real time power spectra
    types_realtimeaps = (ufast, fast, slow, tens, averaging)

    # Used for combobox of real time sound level meter
    types_realtimeslm = (ufast, fast, slow, tens)

    # Used for combobox of sound level meter - time dependent
    types_slmt = (none, uufast, ufast, fast, slow, tens)

    # Used for combobox of sound level meter - Lmax statistics
    types_slmstats = (uufast, ufast, fast, slow, tens)

    default = fast

    @staticmethod
    def fillComboBox(cb, types):
        """
        Fill TimeWeightings to a combobox. The default time weighting is
        selected when it is among the given types.

        Args:
            cb: QComboBox to fill
            types: The types to fill it with
        """
        cb.clear()
        logging.debug(f'{types}')
        for weighting in types:
            label = weighting[1]
            cb.addItem(label, weighting)
            if TimeWeighting.default == weighting:
                # The item just added is the last one
                cb.setCurrentIndex(cb.count()-1)

    @staticmethod
    def getCurrent(cb):
        """
        Return the time weighting of which the description equals the current
        text of the combobox.

        Args:
            cb: QComboBox to read the current text from

        Returns:
            The matching time weighting tuple, or None if there is no match
        """
        matches = (weighting for weighting in TimeWeighting.types_all
                   if cb.currentText() == weighting[1])
        return next(matches, None)
class FreqWeighting:
    """
    Frequency weighting types. Each one is a tuple of (short name,
    description text).
    """
    Z = ('Z', 'Z-weighting')
    A = ('A', 'A-weighting')
    C = ('C', 'C-weighting')

    # Order of this tuple defines the combobox item order
    types = (A, C, Z)
    default = Z
    # Position of `default` in `types`. It is derived so that the item
    # preselected in the combobox always agrees with `default`.
    default_index = types.index(default)

    @staticmethod
    def fillComboBox(cb):
        """
        Fill FreqWeightings to a combobox and select the default one.

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for fw in FreqWeighting.types:
            cb.addItem(fw[1], fw)
        cb.setCurrentIndex(FreqWeighting.default_index)

    @staticmethod
    def getCurrent(cb):
        """
        Return the frequency weighting selected in the combobox.

        Args:
            cb: QComboBox that was filled using fillComboBox()

        Returns:
            The frequency weighting tuple at the current index. For a
            negative index (no item selected) the default is returned.

        Raises:
            IndexError: if the current index is beyond the known types
        """
        idx = cb.currentIndex()
        if idx < 0:
            return FreqWeighting.default
        return FreqWeighting.types[idx]
def getTime(fs, N, start=0):
    """
    Return a time array for given number of points and sampling frequency.

    Args:
        fs: Sampling frequency [Hz]
        N: Number of time samples
        start: Optional start offset. It is used directly as the value of the
            first time point, i.e. it is not divided by fs.

    Returns:
        Array of N values, starting at `start`, spaced 1/fs apart
    """
    assert N > 0 and fs > 0
    duration = N/fs
    stop = start + duration
    # The end point is left out, such that the spacing is exactly duration/N
    return np.linspace(start, stop, N, endpoint=False)
def getFreq(fs, nfft):
    """
    Return an array of frequencies for single-sided spectra.

    Args:
        fs: Sampling frequency [Hz]
        nfft: Fft length (int)

    Returns:
        Array of nfft//2 + 1 frequencies, starting at 0, spaced fs/nfft apart
    """
    bin_width = fs/nfft  # frequency resolution
    n_bins = nfft//2+1  # number of frequency bins
    highest = (n_bins-1)*bin_width
    return np.linspace(0, highest, n_bins)