lasp/src/lasp/lasp_common.py

394 lines
9.7 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import shelve, logging, sys, appdirs, os, platform
import numpy as np
from collections import namedtuple
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from enum import Enum, unique, auto
from .lasp_cpp import Window as wWindow
"""
Common definitions used throughout the code.
"""
# Names exported by a star-import of this module. Note that some
# definitions further down in this file are not listed here.
__all__ = [
    'P_REF', 'FreqWeighting', 'TimeWeighting', 'getTime', 'getFreq', 'Qty',
    'SIQtys', 'Window',
    'lasp_shelve', 'this_lasp_shelve', 'W_REF', 'U_REF', 'I_REF', 'dBFS_REF',
    'AvType'
]
# Reference sound pressure (the reference value for sound pressure levels)
P_REF = 2e-5 # 20 micropascal
# Reference power
W_REF = 1e-12 # 1 picoWatt
# Reference intensity
I_REF = 1e-12 # 1 picoWatt/ m^2
# Reference velocity for sound velocity level
U_REF = 5e-8 # 50 nano meter / s
# Wiki: The unit dB FS or dBFS is defined in AES Standard AES17-1998,[13]
# IEC 61606,[14] and ITU-T P.38x,[15][16] such that the RMS value of a
# full-scale sine wave is designated 0dB FS
# The LASP code converts integer samples to floats, where the mapping is such
# that the maximum value (POSITIVE) that can be represented by the integer data
# is mapped to the value of 1.0. Following this convention, a sine wave with
# amplitude of 1.0 is 0 dB FS, and subsequently, its rms value is 0.5*sqrt(2),
# hence this is the reference level as specified below.
dBFS_REF = 0.5*2**0.5 # approx. 0.7071, i.e. -3.01 dB relative to a value of 1.0
@unique
class AvType(Enum):
    """Specifying the type of data, for adding and removing callbacks from
    the stream.

    Each member's value is a tuple of an integer and a short lowercase name.
    """
    # Input stream
    audio_input = (0, 'input')
    # Output stream
    audio_output = (1, 'output')
    # Both input as well as output
    audio_duplex = (2, 'duplex')
    # A video type is not defined (yet); kept from the original as a reminder:
    # video = 4
@dataclass_json
@dataclass(eq=False)
class Qty:
    """
    Description of a physical quantity together with its unit and the level
    unit(s) that can be used to express it.

    The dataclass-generated ``__eq__`` is disabled (``eq=False``) in favour
    of the hand-written comparison below. As ``__eq__`` is defined without a
    matching ``__hash__``, instances are not hashable.
    """
    # Name of the quantity, e.g. 'Acoustic Pressure'
    name: str
    # Full name of the unit, e.g. 'Pascal'
    unit_name: str
    # Symbol of the unit, e.g. 'Pa'
    unit_symb: str
    # Tuple of level unit names, e.g. ('dB SPL', 'dBPa')
    level_unit: object
    # Contains a tuple of possible level names, including its reference value.
    # For now, it is only able to compute for the first one. I.e. we are not
    # yet able to compute `dBPa's'
    level_ref_name: object
    # Tuple of reference values, in the same order as `level_ref_name`
    level_ref_value: object

    def __str__(self):
        """Return the quantity name followed by the unit symbol in brackets."""
        return f'{self.name} [{self.unit_symb}]'

    def __eq__(self, other):
        """
        Compare on `name` and `unit_name` only.

        Comparison breaks for the other fields (level unit, level ref name,
        etc.) as these are tuples / a single string, so they are left out.

        Returns:
            True / False when `other` provides the compared attributes,
            `NotImplemented` otherwise, so that comparing against an
            unrelated object (e.g. None) evaluates to False instead of
            raising an AttributeError.
        """
        try:
            return (self.name == other.name and
                    self.unit_name == other.unit_name)
        except AttributeError:
            return NotImplemented
@unique
class SIQtys(Enum):
    """
    Enumeration of the quantities known to the code. The value of each member
    is a `Qty` instance.
    """
    N = Qty(name='Number',
            unit_name='No unit / full scale',
            unit_symb='-',
            level_unit=('dBFS',),
            level_ref_name=('Relative to full scale sine wave',),
            level_ref_value=(dBFS_REF,)
            )
    AP = Qty(name='Acoustic Pressure',
             unit_name='Pascal',
             unit_symb='Pa',
             level_unit=('dB SPL', 'dBPa'),
             # P_REF equals 2e-5 Pa, which is 20 micropascal (not 2)
             level_ref_name=('20 micropascal', '1 pascal',),
             level_ref_value=(P_REF, 1)
             )
    V = Qty(name='Voltage',
            unit_name='Volt',
            unit_symb='V',
            level_unit=('dBV',),  # dBV
            level_ref_name=('1V',),
            level_ref_value=(1.0,),
            )

    @staticmethod
    def fillComboBox(cb):
        """
        Fill a combobox with the unit names of all quantities, and select the
        first one.

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for ty in SIQtys:
            cb.addItem(f'{ty.value.unit_name}')
        cb.setCurrentIndex(0)

    @staticmethod
    def default():
        """Return the default quantity: the `Qty` value of member `N`."""
        return SIQtys.N.value

    @staticmethod
    def getCurrent(cb):
        """
        Return the enumeration member at the current index of the combobox.

        Note that this returns the member itself, whereas `default()` returns
        the `Qty` value of a member.

        Args:
            cb: QComboBox, as filled with `fillComboBox`
        """
        return list(SIQtys)[cb.currentIndex()]
@dataclass
class CalSetting:
    """A single calibration setting, as listed in `CalibrationSettings`."""
    # Name of the setting, e.g. '94 dB SPL'
    name: str
    # Calibration value expressed as a level in dB, e.g. 94.0
    cal_value_dB: float
    # Calibration value on a linear scale, e.g. 1.0
    cal_value_linear: float
    # Quantity the setting applies to.
    # NOTE(review): annotated as Qty, but the settings constructed in this
    # file pass the enumeration member SIQtys.AP - confirm which is intended.
    qty: Qty
class CalibrationSettings:
    """
    The available calibration settings, plus helpers to show them in a
    combobox.
    """
    one = CalSetting('94 dB SPL', 94.0, 1.0, SIQtys.AP)
    two = CalSetting('114 dB SPL', 114.0, 10.0, SIQtys.AP)
    # Order of this tuple is the order of the items in the combobox
    types = (one, two)
    default = one
    default_index = 0

    @staticmethod
    def fillComboBox(cb):
        """
        Fill Calibration Settings to a combobox. Each item shows the dB value
        of the setting; the default setting is selected afterwards.

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for ty in CalibrationSettings.types:
            cb.addItem(f'{ty.cal_value_dB}')
        cb.setCurrentIndex(CalibrationSettings.default_index)

    @staticmethod
    def getCurrent(cb):
        """
        Return the calibration setting selected in the combobox.

        Args:
            cb: QComboBox, as filled with `fillComboBox`

        Returns:
            The selected `CalSetting`, or None when the current index does
            not correspond to one of the settings.
        """
        idx = cb.currentIndex()
        # A negative index (no current item) has to be rejected explicitly,
        # as it would otherwise index the tuple from the end.
        if 0 <= idx < len(CalibrationSettings.types):
            return CalibrationSettings.types[idx]
        return None
# Per-user data directory of the application. It is created on import when it
# does not exist yet, as the configuration shelves are stored in it.
lasp_appdir = appdirs.user_data_dir('Lasp', 'ASCEE')
if not os.path.exists(lasp_appdir):
    try:
        os.makedirs(lasp_appdir, exist_ok=True)
    except OSError:
        # Only file system errors are handled here; a bare except would also
        # intercept KeyboardInterrupt and SystemExit.
        print('Fatal error: could not create application directory')
        sys.exit(1)
class Shelve:
    """
    Reference-counted access to a `shelve` database, usable as a context
    manager.

    This base class relies on the following members, which are to be provided
    by a subclass:

    - `refcount`: readable / writable number of active users
    - `shelve`: readable / writable handle of the opened shelf, or None
    - `shelve_fn()`: returns the file name of the shelf to open

    The shelf is opened on the first `incref()` and closed again once the
    reference count drops back to zero.
    """

    def load(self, key, default_value):
        """
        Load data from a given key.

        Args:
            key: Key to look up
            default_value: Value to return if the key is not found

        Returns:
            The stored value, or `default_value` if the key is not found
        """
        return self.shelve.get(key, default_value)

    def __enter__(self):
        self.incref()
        return self

    def store(self, key, val):
        """Store `val` under `key`."""
        # Go through the `shelve` accessor, like all other methods do, and
        # not through a private attribute of the subclass.
        self.shelve[key] = val

    def deleteIfPresent(self, key):
        """
        Delete the entry stored under `key`. Nothing happens when the key is
        not present, or when no shelf is opened.
        """
        db = self.shelve
        if db is None:
            return
        try:
            del db[key]
        except KeyError:
            pass

    def printAllKeys(self):
        """Print a list of all keys in the shelf."""
        print(list(self.shelve.keys()))

    def incref(self):
        """Register a user; the shelf is opened for the first one."""
        if self.shelve is None:
            assert self.refcount == 0
            self.shelve = shelve.open(self.shelve_fn())
        self.refcount += 1

    def decref(self):
        """Unregister a user; the shelf is closed after the last one."""
        self.refcount -= 1
        if self.refcount == 0:
            self.shelve.close()
            self.shelve = None

    def __exit__(self, type, value, traceback):
        self.decref()
class lasp_shelve(Shelve):
    """
    Shelf stored in the file 'config.shelve' in the application directory.

    The reference count and the shelf handle are kept on the class, hence
    they are shared by all instances.
    """
    _refcount = 0
    _shelve = None

    def shelve_fn(self):
        """Return the file name of the shelf."""
        return os.path.join(lasp_appdir, 'config.shelve')

    @property
    def refcount(self):
        """Number of active users, shared by all instances."""
        return lasp_shelve._refcount

    @refcount.setter
    def refcount(self, count):
        lasp_shelve._refcount = count

    @property
    def shelve(self):
        """Handle of the opened shelf (None when closed), shared by all
        instances."""
        return lasp_shelve._shelve

    @shelve.setter
    def shelve(self, handle):
        lasp_shelve._shelve = handle
class this_lasp_shelve(Shelve):
    """
    Shelf stored in a file in the application directory of which the name
    starts with the network name of this computer (`platform.node()`).

    The reference count and the shelf handle are kept on the class, hence
    they are shared by all instances.
    """
    _refcount = 0
    _shelve = None

    def shelve_fn(self):
        """Return the file name of the shelf: '<node>_config.shelve'."""
        filename = f'{platform.node()}_config.shelve'
        return os.path.join(lasp_appdir, filename)

    @property
    def refcount(self):
        """Number of active users, shared by all instances."""
        return this_lasp_shelve._refcount

    @refcount.setter
    def refcount(self, count):
        this_lasp_shelve._refcount = count

    @property
    def shelve(self):
        """Handle of the opened shelf (None when closed), shared by all
        instances."""
        return this_lasp_shelve._shelve

    @shelve.setter
    def shelve(self, handle):
        this_lasp_shelve._shelve = handle
@unique
class Window(Enum):
    """
    Window types. The value of each member is a tuple of the corresponding
    `wWindow` member and the name to display.
    """
    hann = (wWindow.Hann, 'Hann')
    hamming = (wWindow.Hamming, 'Hamming')
    rectangular = (wWindow.Rectangular, 'Rectangular')
    bartlett = (wWindow.Bartlett, 'Bartlett')
    blackman = (wWindow.Blackman, 'Blackman')

    @staticmethod
    def fillComboBox(cb):
        """
        Fill Windows to a combobox. The display name is used as item text,
        the enumeration member as item data. The first item is selected.

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for member in Window:
            label = member.value[1]
            cb.addItem(label, member)
        cb.setCurrentIndex(0)

    @staticmethod
    def getCurrent(cb):
        """
        Return the enumeration member at the current index of the combobox.

        Args:
            cb: QComboBox, as filled with `fillComboBox`
        """
        members = list(Window)
        return members[cb.currentIndex()]
class TimeWeighting:
    """
    Time weighting types. Each type is a tuple of a number and the text to
    display for it.
    """
    none = (-1, 'Raw (no time weighting)')
    uufast = (1e-4, '0.1 ms')
    ufast = (35e-3, 'Impulse (35 ms)')
    fast = (0.125, 'Fast (0.125 s)')
    slow = (1.0, 'Slow (1.0 s)')
    tens = (10., '10 s')
    infinite = (0, 'Infinite')

    # Selection offered for real-time use, and the complete selection
    types_realtime = (ufast, fast, slow, tens, infinite)
    types_all = (none, uufast, ufast, fast, slow, tens, infinite)

    default = fast
    # Position of the default in both selections (3 and 1, respectively)
    default_index = types_all.index(default)
    default_index_realtime = types_realtime.index(default)

    @staticmethod
    def fillComboBox(cb, realtime=False):
        """
        Fill TimeWeightings to a combobox, and select the default one.

        Args:
            cb: QComboBox to fill
            realtime: If True, only the real-time selection is added,
                otherwise all types are added
        """
        cb.clear()
        if realtime:
            entries = TimeWeighting.types_realtime
            initial = TimeWeighting.default_index_realtime
        else:
            entries = TimeWeighting.types_all
            initial = TimeWeighting.default_index
        for entry in entries:
            text = entry[1]
            cb.addItem(text, entry)
        cb.setCurrentIndex(initial)

    @staticmethod
    def getCurrent(cb):
        """
        Return the time weighting at the current index of the combobox.

        The number of items in the combobox is used to decide which of the
        two selections it has been filled with.

        Args:
            cb: QComboBox, as filled with `fillComboBox`
        """
        realtime_types = TimeWeighting.types_realtime
        if cb.count() == len(realtime_types):
            choices = realtime_types
        else:
            choices = TimeWeighting.types_all
        return choices[cb.currentIndex()]
class FreqWeighting:
    """
    Frequency weighting types. Each type is a tuple of a short identifier
    and the text to display for it.
    """
    Z = ('Z', 'Z-weighting')
    A = ('A', 'A-weighting')
    C = ('C', 'C-weighting')
    # Order of this tuple is the order of the items in the combobox
    types = (A, C, Z)
    default = Z
    # Derived from `default`, such that the item selected initially in the
    # combobox and the default weighting cannot disagree. A hard-coded index
    # of 0 selected the A-weighting, whereas the default is Z.
    default_index = types.index(default)

    @staticmethod
    def fillComboBox(cb):
        """
        Fill FreqWeightings to a combobox, and select the default one.

        Args:
            cb: QComboBox to fill
        """
        cb.clear()
        for fw in FreqWeighting.types:
            cb.addItem(fw[1], fw)
        cb.setCurrentIndex(FreqWeighting.default_index)

    @staticmethod
    def getCurrent(cb):
        """
        Return the frequency weighting at the current index of the combobox.

        Args:
            cb: QComboBox, as filled with `fillComboBox`
        """
        return FreqWeighting.types[cb.currentIndex()]
def getTime(fs, N, start=0):
    """
    Return an array of N equidistant time instances, spaced 1/fs apart.

    Args:
        fs: Sampling frequency [Hz]
        N: Number of time samples
        start: Optional start offset, which is the value of the first
            element of the result.
            NOTE(review): this used to be documented as an offset in number
            of samples, but the value is added to the time axis as is (i.e.
            it is not divided by fs) - confirm what callers pass in.

    Returns:
        Array of length N, starting at `start`
    """
    assert N > 0 and fs > 0
    duration = N/fs
    t_end = start + duration
    # The end point is excluded, such that the spacing is duration / N
    return np.linspace(start, t_end, N, endpoint=False)
def getFreq(fs, nfft):
    """
    Return an array of frequencies for single-sided spectra.

    Args:
        fs: Sampling frequency [Hz]
        nfft: Fft length (int)

    Returns:
        Array of nfft//2 + 1 equidistant frequencies, starting at 0 with a
        spacing of fs/nfft
    """
    n_bins = nfft//2+1
    resolution = fs/nfft
    highest = (n_bins-1)*resolution
    return np.linspace(0, highest, n_bins)