399 lines
10 KiB
Python
399 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
||
import shelve, logging, sys, appdirs, os, platform
|
||
import numpy as np
|
||
|
||
from collections import namedtuple
|
||
from dataclasses import dataclass
|
||
from dataclasses_json import dataclass_json
|
||
from enum import Enum, unique, auto
|
||
from .lasp_cpp import DaqChannel
|
||
|
||
"""
|
||
Common definitions used throughout the code.
|
||
"""
|
||
|
||
__all__ = [
|
||
'P_REF', 'FreqWeighting', 'TimeWeighting', 'getTime', 'getFreq', 'Qty',
|
||
'SIQtys',
|
||
'lasp_shelve', 'this_lasp_shelve', 'W_REF', 'U_REF', 'I_REF', 'dBFS_REF',
|
||
'AvType'
|
||
]
|
||
|
||
# Reference sound pressure, used for the sound pressure level
P_REF = 2e-5  # 20 micropascal

# Reference power
W_REF = 1e-12  # 1 picowatt
# Reference intensity
I_REF = 1e-12  # 1 picowatt / m^2

# Reference velocity for sound velocity level
U_REF = 5e-8  # 50 nanometer / s

# Wiki: The unit dB FS or dBFS is defined in AES Standard AES17-1998,[13]
# IEC 61606,[14] and ITU-T P.38x,[15][16] such that the RMS value of a
# full-scale sine wave is designated 0 dB FS.
#
# The LASP code converts integer samples to floats, where the mapping is such
# that the maximum (POSITIVE) value that can be represented by the integer
# data is mapped to 1.0. Following this convention, a sine wave with an
# amplitude of 1.0 is 0 dB FS. Its rms value is 0.5*sqrt(2), hence that is
# the reference value specified below.
dBFS_REF = 2**0.5 / 2  # = 0.5*sqrt(2), i.e. 3.01 dB below an rms of 1.0
|
||
|
||
|
||
@unique
class AvType(Enum):
    """Type of stream data.

    Specifies the type of data, for adding callbacks to and removing
    callbacks from the stream. Each member value is a tuple of
    (integer id, short name).
    """

    audio_input = (0, 'input')      # Input stream
    audio_output = (1, 'output')    # Output stream
    audio_duplex = (2, 'duplex')    # Both input as well as output
|
||
|
||
|
||
@dataclass_json
@dataclass(eq=False)
class Qty:
    """
    Description of a physical quantity: its name, unit and level definitions.

    The dataclass-generated comparison is disabled (`eq=False`); equality is
    defined by hand in `__eq__` and only looks at `name` and `unit_name`.
    """
    # Name, i.e.: Acoustic Pressure
    name: str
    # I.e.: Pascal
    unit_name: str
    # I.e.: Pa
    unit_symb: str
    # I.e.: ('dB SPL') <== tuple of possible level units
    level_unit: object
    # Contains a tuple of possible level names, including its reference value.
    # For now, it is only able to compute for the first one. I.e. we are not
    # yet able to compute `dBPa's'
    level_ref_name: object
    level_ref_value: object
    # Matching enumeration value on the C++ side
    cpp_enum: DaqChannel.Qty

    def __str__(self):
        """Return the quantity name followed by its unit symbol in brackets."""
        return f'{self.name} [{self.unit_symb}]'

    def __eq__(self, other):
        """
        Compare two quantities by `name` and `unit_name` only.

        Comparison breaks for the other fields (level unit, level ref name,
        etc.) as these are tuples / a single string, so they are left out.

        Returns:
            True / False for two Qty objects. NotImplemented for any other
            type, so that e.g. `qty == None` evaluates to False instead of
            raising an AttributeError.
        """
        if not isinstance(other, Qty):
            return NotImplemented
        return (self.name == other.name and
                self.unit_name == other.unit_name)

    def toInt(self):
        """
        Convert quantity to its specific enum integer value
        """
        return self.cpp_enum.value
|
||
|
||
|
||
|
||
@unique
class SIQtys(Enum):
    """
    Enumeration of the available physical quantities.

    Each member value is a `Qty` instance; the static helpers return these
    `Qty` values, not the enum members themselves.
    """
    N = Qty(name='Number',
            unit_name='No unit / full scale',
            unit_symb='-',
            level_unit=('dBFS',),
            level_ref_name=('Relative to full scale sine wave',),
            level_ref_value=(dBFS_REF,),
            cpp_enum=DaqChannel.Qty.Number
            )
    AP = Qty(name='Acoustic Pressure',
             unit_name='Pascal',
             unit_symb='Pa',
             level_unit=('dB SPL', 'dBPa'),
             # P_REF is 2e-5 Pa, which is 20 (not 2) micropascal
             level_ref_name=('20 micropascal', '1 pascal',),
             level_ref_value=(P_REF, 1),
             cpp_enum=DaqChannel.Qty.AcousticPressure
             )

    V = Qty(name='Voltage',
            unit_name='Volt',
            unit_symb='V',
            level_unit=('dBV',),  # dBV
            level_ref_name=('1V',),
            level_ref_value=(1.0,),
            cpp_enum=DaqChannel.Qty.Voltage
            )

    @staticmethod
    def default():
        """Return the default quantity: the unitless 'Number'."""
        return SIQtys.N.value

    @staticmethod
    def fromCppEnum(enum):
        """
        Convert enumeration index from - say - a measurement file back into
        physical quantity information.

        Args:
            enum: Value that is compared against the `cpp_enum` of each
                  quantity

        Returns:
            The matching Qty

        Raises:
            RuntimeError: when no quantity matches
        """
        for qty in SIQtys:
            if qty.value.cpp_enum == enum:
                return qty.value
        raise RuntimeError(f'Qty corresponding to enum {enum} not found')

    @staticmethod
    def fromInt(val):
        """
        Convert integer index from - say - a measurement file back into
        physical quantity information.

        Args:
            val: Integer that is compared against `cpp_enum.value` of each
                 quantity

        Returns:
            The matching Qty

        Raises:
            RuntimeError: when no quantity matches
        """
        for qty in SIQtys:
            if qty.value.cpp_enum.value == val:
                return qty.value
        raise RuntimeError(f'Qty corresponding to integer {val} not found')
|
||
|
||
|
||
@dataclass
class CalSetting:
    """A single calibration setting."""
    # Descriptive name of the setting
    name: str
    # Calibration value expressed as a level, in dB
    cal_value_dB: float
    # Calibration value expressed linearly, in the unit of `qty`
    cal_value_linear: float
    # Physical quantity the calibration value belongs to
    qty: Qty
|
||
|
||
class CalibrationSettings:
    """
    Collection of the predefined calibration settings, plus helpers to show
    them in, and read them back from, a combobox.
    """
    one = CalSetting('94 dB SPL', 94.0, 10**(94/20)*2e-5, SIQtys.AP.value)
    two = CalSetting('1 Pa rms', 93.98, 1.0, SIQtys.AP.value)
    three = CalSetting('114 dB SPL', 114.0, 10**(114/20)*2e-5, SIQtys.AP.value)
    four = CalSetting('10 Pa rms', 113.98, 10.0, SIQtys.AP.value)
    # NOTE(review): 93.7 dB re 20 micropascal corresponds to about 0.97 Pa,
    # not 1.0 Pa - confirm whether the linear value below is intended.
    five = CalSetting('93.7 dB SPL', 93.7, 1.0, SIQtys.AP.value)

    types = (one, two, three, four, five)
    default = one
    default_index = 0

    @staticmethod
    def fillComboBox(cb):
        """
        Fill Calibration Settings to a combobox

        Each item text is the dB value of the setting, not its name.

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for ty in CalibrationSettings.types:
            cb.addItem(f'{ty.cal_value_dB}')
        cb.setCurrentIndex(CalibrationSettings.default_index)

    @staticmethod
    def getCurrent(cb):
        """
        Return the calibration setting currently selected in the combobox.

        Args:
            cb: QComboBox that was filled with fillComboBox()

        Returns:
            The selected CalSetting, or None when the current index does not
            refer to one of the settings.
        """
        idx = cb.currentIndex()
        # A combobox without a current item reports index -1. Without the
        # lower bound that would silently pick the last setting.
        if 0 <= idx < len(CalibrationSettings.types):
            return CalibrationSettings.types[idx]
        return None
|
||
|
||
# Per-user application data directory. It is created when this module is
# imported and it does not exist yet.
lasp_appdir = appdirs.user_data_dir('Lasp', 'ASCEE')

if not os.path.exists(lasp_appdir):
    try:
        os.makedirs(lasp_appdir, exist_ok=True)
    except OSError:
        # Only file system failures are handled here. A bare except would
        # also trap KeyboardInterrupt and SystemExit.
        print('Fatal error: could not create application directory')
        sys.exit(1)
|
||
|
||
|
||
|
||
class Shelve:
    """
    Reference-counted access to a `shelve` file, usable as context manager.

    This base class does not hold any state itself. A subclass has to
    provide:

    - `refcount`: readable / writable number of active users
    - `shelve`: readable / writable handle of the opened shelf, or None
    - `shelve_fn()`: method returning the file name that is opened

    The shelf is opened on the first `incref()` and closed again when the
    reference count drops back to zero.
    """

    def load(self, key, default_value):
        """
        Load the data stored under a given key.

        Args:
            key: Key to look up
            default_value: Value that is returned if the key is not found

        Returns:
            The stored value, or `default_value` if the key is not found
        """
        if key in self.shelve:
            return self.shelve[key]
        return default_value

    def __enter__(self):
        """Take a reference (opening the shelf when needed), return self."""
        self.incref()
        return self

    def store(self, key, val):
        """Store `val` under `key` in the opened shelf."""
        # Go through the `shelve` accessor like the other methods do, rather
        # than relying on a subclass attribute called `_shelve`.
        self.shelve[key] = val

    def deleteIfPresent(self, key):
        """Remove `key` from the opened shelf; do nothing if it is absent."""
        if key in self.shelve:
            del self.shelve[key]

    def printAllKeys(self):
        """Print a list of all keys in the opened shelf."""
        print(list(self.shelve.keys()))

    def incref(self):
        """Increase the reference count, opening the shelf on first use."""
        if self.shelve is None:
            assert self.refcount == 0
            self.shelve = shelve.open(self.shelve_fn())
        self.refcount += 1

    def decref(self):
        """Decrease the reference count, closing the shelf at zero."""
        self.refcount -= 1
        if self.refcount == 0:
            self.shelve.close()
            self.shelve = None

    def __exit__(self, type, value, traceback):
        """Release the reference taken in __enter__."""
        self.decref()
|
||
|
||
|
||
class lasp_shelve(Shelve):
    """
    Shelve stored in the file `config.shelve` in the application directory.

    The reference count and the shelf handle are kept on the class itself,
    so they are shared between all instances.
    """
    _refcount = 0
    _shelve = None

    @property
    def refcount(self):
        """Reference count shared by all instances."""
        return lasp_shelve._refcount

    @refcount.setter
    def refcount(self, count):
        lasp_shelve._refcount = count

    @property
    def shelve(self):
        """Shelf handle shared by all instances, None when not opened."""
        return lasp_shelve._shelve

    @shelve.setter
    def shelve(self, handle):
        lasp_shelve._shelve = handle

    def shelve_fn(self):
        """Return the file name of the shelf."""
        filename = 'config.shelve'
        return os.path.join(lasp_appdir, filename)
|
||
|
||
|
||
class this_lasp_shelve(Shelve):
    """
    Shelve stored in a file whose name starts with `platform.node()`.

    The reference count and the shelf handle are kept on the class itself,
    so they are shared between all instances.
    """
    _refcount = 0
    _shelve = None

    @property
    def refcount(self):
        """Reference count shared by all instances."""
        return this_lasp_shelve._refcount

    @refcount.setter
    def refcount(self, count):
        this_lasp_shelve._refcount = count

    @property
    def shelve(self):
        """Shelf handle shared by all instances, None when not opened."""
        return this_lasp_shelve._shelve

    @shelve.setter
    def shelve(self, handle):
        this_lasp_shelve._shelve = handle

    def shelve_fn(self):
        """Return the file name of the shelf, in the application directory."""
        node = platform.node()
        filename = f'{node}_config.shelve'
        return os.path.join(lasp_appdir, filename)
|
||
|
||
class TimeWeighting:
    """
    Time weighting types, each a tuple of (value, description), plus the
    selections of them that are offered in the different comboboxes.
    """
    # This is not actually a time weighting
    none = (0, 'Raw (no time weighting)')

    uufast = (1e-4, '0.1 ms')
    ufast = (35e-3, 'Impulse (35 ms)')
    fast = (0.125, 'Fast (0.125 s)')
    slow = (1.0, 'Slow (1.0 s)')
    tens = (10., '10 s')

    # All-averaging: compute the current average of all history. Kind of the
    # equivalent level. Only useful for power spectra. For the Sound Level
    # Meter, equivalent levels do that job.
    averaging = (-1, 'All-averaging')

    types_all = (none, uufast, ufast, fast, slow, tens, averaging)

    # Used for combobox of real time power spectra
    types_realtimeaps = (ufast, fast, slow, tens, averaging)
    # Used for combobox of real time sound level meter
    types_realtimeslm = (ufast, fast, slow, tens)

    # Used for combobox of sound level meter - time dependent
    types_slmt = (none, uufast, ufast, fast, slow, tens)

    # Used for combobox of sound level meter - Lmax statistics
    types_slmstats = (uufast, ufast, fast, slow, tens)

    default = fast

    @staticmethod
    def fillComboBox(cb, types):
        """
        Fill TimeWeightings to a combobox

        The item that equals `TimeWeighting.default`, if it is among
        `types`, is made the current item.

        Args:
            cb: QComboBox to fill
            types: The types to fill it with
        """
        cb.clear()

        logging.debug(f'{types}')
        for weighting in types:
            # Description as item text, the full tuple as item data
            cb.addItem(weighting[1], weighting)
            if TimeWeighting.default == weighting:
                # The item just added is the last one
                cb.setCurrentIndex(cb.count() - 1)

    @staticmethod
    def getCurrent(cb):
        """
        Return the time weighting whose description equals the current text
        of the combobox, or None when there is no such time weighting.

        Args:
            cb: QComboBox to read the current text from
        """
        current_text = cb.currentText()
        for weighting in TimeWeighting.types_all:
            if current_text == weighting[1]:
                return weighting
        return None
|
||
|
||
|
||
class FreqWeighting:
    """
    Frequency weighting types, each a tuple of (letter, description)
    """
    Z = ('Z', 'Z-weighting')
    A = ('A', 'A-weighting')
    C = ('C', 'C-weighting')
    types = (A, C, Z)
    # NOTE(review): `default` is Z, while `default_index` 0 refers to A in
    # `types` - confirm which of the two is the intended default.
    default = Z
    default_index = 0

    @staticmethod
    def fillComboBox(cb):
        """
        Fill FreqWeightings to a combobox and select `default_index`

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for weighting in FreqWeighting.types:
            # Description as item text, the full tuple as item data
            cb.addItem(weighting[1], weighting)
        cb.setCurrentIndex(FreqWeighting.default_index)

    @staticmethod
    def getCurrent(cb):
        """
        Return the frequency weighting at the current index of the combobox

        Args:
            cb: QComboBox that was filled with fillComboBox()
        """
        index = cb.currentIndex()
        return FreqWeighting.types[index]
|
||
|
||
|
||
def getTime(fs, N, start=0):
    """
    Return a time array for given number of points and sampling frequency.

    The array holds N equally spaced values, from `start` up to (but not
    including) `start + N/fs`.

    Args:
        fs: Sampling frequency [Hz]
        N: Number of time samples
        start: Optional start offset. NOTE(review): this used to be
               documented as an offset in number of samples, but the value
               is added to the time axis as is - confirm the intended unit.
    """
    assert N > 0 and fs > 0
    duration = N/fs
    stop = start + duration
    return np.linspace(start, stop, N, endpoint=False)
|
||
|
||
|
||
def getFreq(fs, nfft):
    """
    Return an array of frequencies for single-sided spectra

    The array runs from 0 in steps of fs/nfft and has nfft//2 + 1 values.

    Args:
        fs: Sampling frequency [Hz]
        nfft: Fft length (int)
    """
    resolution = fs/nfft  # spacing between two frequency bins
    nbins = nfft//2+1  # number of frequency bins
    highest = (nbins-1)*resolution
    return np.linspace(0, highest, nbins)
|