diff --git a/CMakeLists.txt b/CMakeLists.txt index ee6f910..2f229c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -67,7 +67,7 @@ endif(LASP_FLOAT STREQUAL "double") # ##################### END Cmake variables converted to a macro -set(Python_ADDITIONAL_VERSIONS "3.8 3.9") +set(Python_ADDITIONAL_VERSIONS "3.8") # #################### Setting definitions and debug-specific compilation flags # General make flags diff --git a/lasp/filter/biquad.py b/lasp/filter/biquad.py new file mode 100644 index 0000000..2eb7891 --- /dev/null +++ b/lasp/filter/biquad.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""! +Author: J.A. de Jong - ASCEE V.O.F. + +Description: Filter design implementation of common biquad filters that are +often used in parametric equalizers. + +Major source is Audio EQ Cookbook: + https://archive.is/20121220231853/http://www.musicdsp.org/ + files/Audio-EQ-Cookbook.txt + +The definition of the BiQuad filter coefficients as coming out of these +functions defines the filter as: + +y[n] = 1/ba[3] * ( ba[0] * x[n] + ba[1] * x[n-1] + ba[2] * x[n-2] + + + ba[4] * y[n-1] + ba[5] * y[n-2] + ) + +*Note that all filters are normalized such that ba[3] is by definition equal to +1.0!* + + +""" +__all__ = ['peaking', 'biquadTF', 'notch', 'lowpass', 'highpass', + 'highshelve', 'lowshelve'] + +from numpy import array, cos, pi, sin, sqrt +from scipy.interpolate import interp1d +from scipy.signal import sosfreqz + + +def peaking(fs, f0, Q, gain): + """ + Design of peaking biquad filter + + Args: + fs: Sampling frequency [Hz] + f0: Center frequency + Q: Quality factor (~ inverse of bandwidth) + gain: Increase in level at the center frequency [dB] + """ + A = sqrt(10**(gain/20)) + omg0 = 2*pi*f0/fs + alpha = sin(omg0)/Q/2 + b0 = 1+alpha*A + b1 = -2*cos(omg0) + b2 = 1-alpha*A + a0 = 1 + alpha/A + a1 = -2*cos(omg0) + a2 = 1-alpha/A + + return array([b0/a0, b1/a0, b2/a0, a0/a0, a1/a0, a2/a0]) + + +def notch(fs, f0, Q): + """ + Notch filter + + Args: + fs: Sampling frequency [Hz] + f0: Center frequency [Hz] + Q: Quality factor (~ inverse of bandwidth) + """ + omg0 = 2*pi*f0/fs + alpha = sin(omg0)/Q/2 + b0 = 1 + b1 = -2*cos(omg0) + b2 = 1 + a0 = 1 + alpha + a1 = -2*cos(omg0) + a2 = 1 - alpha + return array([b0/a0, b1/a0, b2/a0, a0/a0, a1/a0, a2/a0]) + + +def lowpass(fs, f0, Q): + """ + Second order low pass filter + + Args: + fs: Sampling frequency [Hz] + f0: Cut-off frequency [Hz] + Q: Quality factor (~ inverse of bandwidth) + """ + w0 = 2*pi*f0/fs + alpha = sin(w0)/Q/2 + b0 = (1 - cos(w0))/2 + b1 = 1 - cos(w0) + b2 = (1 - cos(w0))/2 + a0 = 1 + alpha + a1 = -2*cos(w0) + a2 = 1 - alpha + return array([b0/a0, b1/a0, b2/a0, a0/a0, a1/a0, a2/a0]) + + +def highpass(fs, f0, Q): + """ + Second order high pass filter + + Args: + fs: Sampling frequency [Hz] + f0: Cut-on frequency [Hz] + Q: Quality factor (~ inverse of bandwidth) + """ + w0 = 2*pi*f0/fs + alpha = sin(w0)/Q/2 + + b0 = (1 + cos(w0))/2 + b1 = -(1 + cos(w0)) + b2 = (1 + cos(w0))/2 + a0 = 1 + alpha + a1 = -2*cos(w0) + a2 = 1 - alpha + return array([b0/a0, b1/a0, b2/a0, a0/a0, a1/a0, a2/a0]) + + +def highshelve(fs, f0, Q, gain): + """ + High shelving filter + + Args: + fs: Sampling frequency [Hz] + f0: Cut-on frequency [Hz] + Q: Quality factor (~ inverse of bandwidth) + gain: Increase in level w.r.t. "wire" [dB] + """ + w0 = 2*pi*f0/fs + alpha = sin(w0)/Q/2 + A = 10**(gain/40) + b0 = A*((A+1) + (A-1)*cos(w0) + 2*sqrt(A)*alpha) + b1 = -2*A*((A-1) + (A+1)*cos(w0)) + b2 = A*((A+1) + (A-1)*cos(w0) - 2*sqrt(A)*alpha) + a0 = (A+1) - (A-1)*cos(w0) + 2*sqrt(A)*alpha + a1 = 2*((A-1) - (A+1)*cos(w0)) + a2 = (A+1) - (A-1)*cos(w0) - 2*sqrt(A)*alpha + return array([b0/a0, b1/a0, b2/a0, a0/a0, a1/a0, a2/a0]) + + +def lowshelve(fs, f0, Q, gain): + """ + Low shelving filter + + Args: + fs: Sampling frequency [Hz] + f0: Cut-on frequency [Hz] + Q: Quality factor (~ inverse of bandwidth) + gain: Increase in level w.r.t. "wire" [dB] + """ + w0 = 2*pi*f0/fs + alpha = sin(w0)/Q/2 + A = 10**(gain/40) + b0 = A*((A+1) - (A-1)*cos(w0) + 2*sqrt(A)*alpha) + b1 = 2*A*((A-1) - (A+1)*cos(w0)) + b2 = A*((A+1) - (A-1)*cos(w0) - 2*sqrt(A)*alpha) + a0 = (A+1) + (A-1)*cos(w0) + 2*sqrt(A)*alpha + a1 = -2*((A-1) + (A+1)*cos(w0)) + a2 = (A+1) + (A-1)*cos(w0) - 2*sqrt(A)*alpha + return array([b0/a0, b1/a0, b2/a0, a0/a0, a1/a0, a2/a0]) + + +def biquadTF(fs, freq, ba): + """ + Computes the transfer function of the biquad. + + Interpolates the frequency response to `freq` + + Args: + fs: Sampling frequency [Hz] + freq: Frequency array to compute the + ba: Biquad filter coefficients in common form. + + TODO: This code is not yet tested + """ + freq2, h = sosfreqz(ba, worN=freq, fs=fs) + interpolator = interp1d(freq2, h, kind='quadratic') + return interpolator(freq) diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 08ba3c1..70bfc87 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -21,7 +21,6 @@ __all__ = ['AvStream'] video_x, video_y = 640, 480 - class AvStream: """Audio and video data stream, to which callbacks can be added for processing the data.""" diff --git a/lasp/lasp_measurement.py b/lasp/lasp_measurement.py index 4858afb..1e86a24 100644 --- a/lasp/lasp_measurement.py +++ b/lasp/lasp_measurement.py @@ -96,6 +96,9 @@ class IterRawData: Args: f: Audio dataset in the h5 file, accessed as f['audio'] + channels: list of channel indices to use + istart: index of first sample + istop: index of last sample (not including istop) """ assert isinstance(channels, list) @@ -173,22 +176,6 @@ class IterData(IterRawData): return scaleBlockSens(nextraw, self.sens) - -def exportAsWave(fn, fs, data, force=False): - if '.wav' not in fn[-4:]: - fn += '.wav' - - nchannels = data.shape[1] - sampwidth = getSampWidth(data.dtype) - - if os.path.exists(fn) and not force: - raise RuntimeError('File already exists: %s', fn) - - with wave.open(fn, 'w') as wf: - wf.setparams((nchannels, sampwidth, fs, 0, 'NONE', 'NONE')) - wf.writeframes(np.asfortranarray(data).tobytes()) - - class Measurement: """Provides access to measurement data stored in the h5 measurement file format.""" @@ -593,7 +580,8 @@ class Measurement: return False - def exportAsWave(self, fn=None, force=False, newsampwidth=None, normalize=True): + def exportAsWave(self, fn=None, force=False, newsampwidth=None, + normalize=True, **kwargs): """Export measurement file as wave. In case the measurement data is stored as floats, the values are scaled to the proper integer (PCM) data format. @@ -621,15 +609,25 @@ class Measurement: if os.path.exists(fn) and not force: raise RuntimeError(f'File already exists: {fn}') - data = self.rawData() + + if not np.isclose(self.samplerate%1,0): + raise RuntimeError(f'Sample rates should be approximately integer for exporting to Wave to work') + + # TODO: With VERY large measurment files, this is not possible! Is this + # a theoretical case? + data = self.rawData(**kwargs) + if np.issubdtype(data.dtype, np.floating) and newsampwidth is None: + raise ValueError('Newsampwidth parameter should be given for floating point raw data') if normalize: - maxabs = np.max(np.abs(data), axis=0) - data /= maxabs[np.newaxis, :] + # Scale back to maximum of absolute value + maxabs = np.max(np.abs(data)) + data /= maxabs if newsampwidth is not None: # Convert to floats, then to new sample width - data = scaleBlockSens(data, self.sensitivity**0) + sensone = np.ones_like(self.sensitivity) + data = scaleBlockSens(data, sensone) if newsampwidth == 2: newtype = np.int16 @@ -638,11 +636,12 @@ class Measurement: else: raise ValueError('Invalid sample width, should be 2 or 4') - scalefac = 2**(8*(newsampwidth-1))-1 + scalefac = 2**(8*newsampwidth-1)-1 + # Scale data to integer range, and then convert to integers data = (data*scalefac).astype(newtype) - wavfile.write(fn, self.samplerate, data) + wavfile.write(fn, int(self.samplerate), data) @staticmethod def fromtxt(fn,