Compare commits

...

5 Commits

6 changed files with 325 additions and 112 deletions

View File

@ -24,10 +24,7 @@ typedef struct Siggen {
SignalType signaltype; SignalType signaltype;
d fs; // Sampling frequency [Hz] d fs; // Sampling frequency [Hz]
d level_amp; d level_amp;
void* private;
char private_data[PRIVATE_SIZE]; char private_data[PRIVATE_SIZE];
} Siggen; } Siggen;
typedef struct { typedef struct {
@ -35,6 +32,16 @@ typedef struct {
d omg; d omg;
} SinewaveSpecific; } SinewaveSpecific;
typedef struct {
d fl;
d fu;
d Ts;
d phase;
d tau;
bool pos;
us flags;
} SweepSpecific;
typedef struct { typedef struct {
d V1, V2, S; d V1, V2, S;
int phase; int phase;
@ -51,7 +58,6 @@ Siggen* Siggen_create(SignalType type, const d fs,const d level_dB) {
Siggen* siggen = a_malloc(sizeof(Siggen)); Siggen* siggen = a_malloc(sizeof(Siggen));
siggen->signaltype = type; siggen->signaltype = type;
siggen->fs = fs; siggen->fs = fs;
siggen->private = NULL;
siggen->level_amp = level_amp(level_dB); siggen->level_amp = level_amp(level_dB);
feTRACE(15); feTRACE(15);
@ -61,23 +67,24 @@ Siggen* Siggen_create(SignalType type, const d fs,const d level_dB) {
Siggen* Siggen_Sinewave_create(const d fs, const d freq,const d level_dB) { Siggen* Siggen_Sinewave_create(const d fs, const d freq,const d level_dB) {
fsTRACE(15); fsTRACE(15);
Siggen* sinesiggen = Siggen_create(SINEWAVE, fs, level_dB); Siggen* sine = Siggen_create(SINEWAVE, fs, level_dB);
sinesiggen->private = sinesiggen->private_data; dbgassert(sizeof(SinewaveSpecific) <= sizeof(sine->private_data),
SinewaveSpecific* sp = (SinewaveSpecific*) sinesiggen->private; "Allocated memory too small");
SinewaveSpecific* sp = (SinewaveSpecific*) sine->private_data;
sp->curtime = 0; sp->curtime = 0;
sp->omg = 2*number_pi*freq; sp->omg = 2*number_pi*freq;
feTRACE(15); feTRACE(15);
return sinesiggen; return sine;
} }
Siggen* Siggen_Whitenoise_create(const d fs, const d level_dB) { Siggen* Siggen_Whitenoise_create(const d fs, const d level_dB) {
fsTRACE(15); fsTRACE(15);
Siggen* whitenoise = Siggen_create(WHITENOISE, fs, level_dB); Siggen* whitenoise = Siggen_create(WHITENOISE, fs, level_dB);
whitenoise->private = whitenoise->private_data; dbgassert(sizeof(WhitenoiseSpecific) <= sizeof(whitenoise->private_data),
dbgassert(sizeof(WhitenoiseSpecific) <= sizeof(whitenoise->private_data), "Allocated memory too small"); "Allocated memory too small");
WhitenoiseSpecific* wn = whitenoise->private; WhitenoiseSpecific* wn = (WhitenoiseSpecific*) whitenoise->private_data;
wn->phase = 0; wn->phase = 0;
wn->V1 = 0; wn->V1 = 0;
wn->V2 = 0; wn->V2 = 0;
@ -87,12 +94,50 @@ Siggen* Siggen_Whitenoise_create(const d fs, const d level_dB) {
return whitenoise; return whitenoise;
} }
Siggen* Siggen_Sweep_create(const d fs,const d fl,const d fu,
const d Ts, const us flags, const d level_dB) {
fsTRACE(15);
Siggen* sweep = Siggen_create(SWEEP, fs, level_dB);
dbgassert(sizeof(SweepSpecific) <= sizeof(sweep->private_data),
"Allocated memory too small");
// Set pointer to inplace data storage
SweepSpecific* sp = (SweepSpecific*) sweep->private_data;
if(fl < 0 || fu < 0 || Ts <= 0) {
return NULL;
}
sp->flags = flags;
sp->fl = fl;
sp->fu = fu;
sp->Ts = Ts;
sp->phase = 0;
sp->pos = flags & SWEEP_FLAG_BACKWARD ? false: true;
if(flags & SWEEP_FLAG_BACKWARD) {
sp->tau = Ts;
} else {
sp->tau = 0;
}
/* sp->pos = false; */
/* sp->tau = Ts/2; */
feTRACE(15);
return sweep;
}
void Siggen_free(Siggen* siggen) { void Siggen_free(Siggen* siggen) {
fsTRACE(15); fsTRACE(15);
assertvalidptr(siggen); assertvalidptr(siggen);
if(siggen->signaltype == SWEEP) { switch(siggen->signaltype) {
case SWEEP:
/* Sweep specific stuff here */ /* Sweep specific stuff here */
break;
case SINEWAVE:
/* Sweep specific stuff here */
break;
} }
a_free(siggen); a_free(siggen);
@ -114,6 +159,81 @@ static void Sinewave_genSignal(Siggen* siggen, SinewaveSpecific* sine, vd* sampl
feTRACE(15); feTRACE(15);
} }
static void Sweep_genSignal(Siggen* siggen, SweepSpecific* sweep,
vd* samples) {
fsTRACE(15);
assertvalidptr(sweep);
const d fl = sweep->fl;
const d fu = sweep->fu;
const d deltat = 1/siggen->fs;
const d Ts = sweep->Ts;
const d Thalf = Ts/2;
dVARTRACE(20, deltat);
// Load state
d tau = sweep->tau;
bool pos = sweep->pos;
// Obtain flags and expand
us flags = sweep->flags;
bool forward_sweep = flags & SWEEP_FLAG_FORWARD;
bool backward_sweep = flags & SWEEP_FLAG_BACKWARD;
dbgassert(!(forward_sweep && backward_sweep), "Both forward and backward flag set");
d k, Treverse;
if(forward_sweep || backward_sweep) {
k = (fu - fl)/Ts;
Treverse = Ts;
}
else {
k = (fu - fl)/Thalf;
Treverse = Ts/2;
}
/* const d k = 0; */
d phase = sweep->phase;
d curfreq;
for(us i =0; i< samples->n_rows; i++) {
curfreq = fl + k*tau;
phase = phase + 2*number_pi*curfreq*deltat;
// Subtract some to avoid possible overflow. Don't know whether such a
// thing really happens
if(phase > 2*number_pi)
phase = phase - 2*number_pi;
if(pos) {
tau = tau + deltat;
if(tau >= Treverse) {
if(forward_sweep) { tau = 0; }
else if(backward_sweep) { dbgassert(false, "BUG"); }
else { pos = false; }
}
} else {
/* dbgassert(false, "cannot get here"); */
tau = tau - deltat;
if(tau <= 0) {
if(backward_sweep) { tau = Treverse; }
else if(forward_sweep) { dbgassert(false, "BUG"); }
else { pos = true; }
}
}
setvecval(samples, i, siggen->level_amp*d_sin(phase));
}
// Store state
sweep->phase = phase;
sweep->pos = pos;
sweep->tau = tau;
feTRACE(15);
}
static void Whitenoise_genSignal(Siggen* siggen, WhitenoiseSpecific* wn, vd* samples) { static void Whitenoise_genSignal(Siggen* siggen, WhitenoiseSpecific* wn, vd* samples) {
fsTRACE(15); fsTRACE(15);
d X; d X;
@ -155,19 +275,23 @@ void Siggen_genSignal(Siggen* siggen,vd* samples) {
fsTRACE(15); fsTRACE(15);
assertvalidptr(siggen); assertvalidptr(siggen);
assert_vx(samples); assert_vx(samples);
d fs = siggen->fs;
switch(siggen->signaltype) { switch(siggen->signaltype) {
case SINEWAVE: case SINEWAVE:
Sinewave_genSignal(siggen, (SinewaveSpecific*) siggen->private, Sinewave_genSignal(siggen,
(SinewaveSpecific*) siggen->private_data,
samples); samples);
break; break;
case WHITENOISE: case WHITENOISE:
Whitenoise_genSignal(siggen, (WhitenoiseSpecific*) siggen->private, Whitenoise_genSignal(siggen,
(WhitenoiseSpecific*) siggen->private_data,
samples); samples);
break; break;
case SWEEP: case SWEEP:
Sweep_genSignal(siggen,
(SweepSpecific*) siggen->private_data,
samples);
break; break;
default: default:
dbgassert(false, "Not implementend signal type"); dbgassert(false, "Not implementend signal type");

View File

@ -37,8 +37,30 @@ Siggen* Siggen_Whitenoise_create(const d fs, const d level_dB);
*/ */
Siggen* Siggen_Pinknoise_create(const us fs,const d level_dB); Siggen* Siggen_Pinknoise_create(const us fs,const d level_dB);
/* Siggen* Siggen_ForwardSweep_create(const d fs,; */ // Define this flag to repeat a forward sweep only, or backward only. If not
/* Siggen* Siggen_(const d fs,; */ // set, we do a continuous sweep
#define SWEEP_FLAG_FORWARD 1
#define SWEEP_FLAG_BACKWARD 2
// Types of sweeps
#define SWEEP_FLAG_LINEAR 4
#define SWEEP_FLAG_EXPONENTIAL 8
#define SWEEP_FLAG_HYPERBOLIC 16
/**
* Create a forward sweep
*
* @param[in] fs: Sampling frequency [Hz]
* @param[in] fl: Lower frequency [Hz]
* @param[in] fl: Upper frequency [Hz]
* @param[in] Ts: Sweep period [s]
* @param[in] sweep_flags: Sweep period [s]
* @param[in] level: Relative level in [dB], should be between -inf and 0
* @return Siggen* handle
*/
Siggen* Siggen_Sweep_create(const d fs,const d fl,const d fu,
const d Ts, const us sweep_flags,
const d level);
/** /**
* Obtain a new piece of signal * Obtain a new piece of signal

View File

@ -1,4 +1,4 @@
#define TRACERPLUS 10 #define TRACERPLUS (-5)
#include "lasp_sosfilterbank.h" #include "lasp_sosfilterbank.h"

View File

@ -52,7 +52,6 @@ class FilterBankDesigner:
Returns: Returns:
True if filter is norm-compliant, False if not True if filter is norm-compliant, False if not
""" """
# Skip zero-frequency # Skip zero-frequency
if np.isclose(freq[0], 0): if np.isclose(freq[0], 0):
@ -62,7 +61,8 @@ class FilterBankDesigner:
# Interpolate limites to frequency array as given # Interpolate limites to frequency array as given
llim_full = np.interp(freq, freqlim, llim, left=-np.inf, right=-np.inf) llim_full = np.interp(freq, freqlim, llim, left=-np.inf, right=-np.inf)
ulim_full = np.interp(freq, freqlim, ulim, left=ulim[0], right=ulim[-1]) ulim_full = np.interp(freq, freqlim, ulim,
left=ulim[0], right=ulim[-1])
return bool(np.all(llim_full <= h_dB) and return bool(np.all(llim_full <= h_dB) and
np.all(ulim_full >= h_dB)) np.all(ulim_full >= h_dB))
@ -161,6 +161,128 @@ class FilterBankDesigner:
raise ValueError( raise ValueError(
f'Could not find an x-value corresponding to {nom_txt}.') f'Could not find an x-value corresponding to {nom_txt}.')
def getxs(self, nom_txt_start, nom_txt_end):
"""Returns a list of all filter designators, for given start end end
nominal frequencies.
Args:
nom_txt_start: Start frequency band, i.e. '31.5'
nom_txt_end: End frequency band, i.e. '10k'
Returns:
[x0, x1, ..]
"""
xstart = self.nominal_txt_tox(nom_txt_start)
xend = self.nominal_txt_tox(nom_txt_end)
return list(range(xstart, xend+1))
def getNarrowBandFromOctaveBand(self, xl, xu,
levels_in_bands, npoints=500,
method='flat',
scale='lin'):
"""Create a narrow band spectrum based on a spectrum in (fractional)
octave bands. The result is create such that the total energy in each
frequency band is constant. The latter can be checked by working it
back to (fractional) octave bands, which is doen using the function
`getOctaveBandsFromNarrowBand`. Note that the resulting narrow band has
units of *power*, not power spectral density. Input should be levels in
**deciBells**.
Args:
xl: Band designator of lowest band
xu: Band designator of highest band
levels_in_bands: levels in dB for each band, should have length
(xu+1 - xl)
npoints: Number of discrete frequency points in linear frequency
array.
method: 'flat' to give the same power for all frequencies in a
certain band.
scale: 'lin' for a linear frequency distribution. 'log' for a
logspace. Use 'log' with caution and only if you really know what
you are doing!'
Returns:
freq, levels_dB. Where levels_dB is an array of narrow band levels
"""
# Lowest frequency of all frequencies
fll = self.fl(xl)
# Highest frequency of all frequencies
fuu = self.fu(xu)
if scale == 'lin':
freq = np.linspace(fll, fuu, npoints)
elif scale == 'log':
freq = np.logspace(np.log10(fll), np.log10(fuu), npoints)
else:
raise ValueError(f'Invalid scale parameter: {scale}')
levels_narrow = np.empty_like(freq)
if method == 'flat':
for i, x in enumerate(range(xl, xu + 1)):
fl = self.fl(x)
fu = self.fu(x)
# Find the indices in the frequency array which correspond to the
# frequency band x
if x != xu:
indices_cur = np.where((freq >= fl) & (freq < fu))
else:
indices_cur = np.where((freq >= fl) & (freq <= fu))
power_cur = 10**(levels_in_bands[i] / 10)
power_narrow = power_cur / indices_cur[0].size
level_narrow = 10*np.log10(power_narrow)
levels_narrow[indices_cur] = level_narrow
return freq, levels_narrow
else:
raise ValueError('Unimplemented interpolation method')
def getOctaveBandFromNarrowBand(self, freq, levels_narrow):
"""Put all level results in a certain frequency band (`binning`), by
summing up the power.
Args:
freq: Narrow-band frequency array
levels_narrow: Narrow band power levels, should be power, not *PSD*
Returns:
(fm, levels_binned, nom_txt)
"""
# Find lower frequency xl
for x in self.xs:
xl = x
fl = self.fl(x)
if self.fl(x+1) > freq[0]:
break
# Find upper frequency xu
for x in self.xs:
xu = x
fu = self.fu(xu)
if fu >= freq[-1]:
break
freq_in_bands = []
levels_in_bands = []
nom_txt = []
for x in range(xl, xu+1):
fl = self.fl(x)
fu = self.fu(x)
if x != xu:
indices_cur = np.where((freq >= fl) & (freq < fu))
else:
indices_cur = np.where((freq >= fl) & (freq <= fu))
power_cur = np.sum(10**(levels_narrow[indices_cur] / 10))
levels_in_bands.append(10*np.log10(power_cur))
nom_txt.append(self.nominal_txt(x))
freq_in_bands.append(self.fm(x))
return np.array(freq_in_bands), np.array(levels_in_bands), nom_txt
class OctaveBankDesigner(FilterBankDesigner): class OctaveBankDesigner(FilterBankDesigner):
"""Octave band filter designer.""" """Octave band filter designer."""
@ -204,9 +326,11 @@ class OctaveBankDesigner(FilterBankDesigner):
mininf = -1e300 mininf = -1e300
if filter_class == 1: if filter_class == 1:
lower_limits_pos = [-0.3, -0.4, -0.6, -1.3, -5.0, -5.0] + 4*[mininf] lower_limits_pos = [-0.3, -0.4, -
0.6, -1.3, -5.0, -5.0] + 4*[mininf]
elif filter_class == 0: elif filter_class == 0:
lower_limits_pos = [-0.15, -0.2, -0.4, -1.1, -4.5, -4.5] + 4*[mininf] lower_limits_pos = [-0.15, -0.2, -
0.4, -1.1, -4.5, -4.5] + 4*[mininf]
lower_limits_neg = lower_limits_pos[:] lower_limits_neg = lower_limits_pos[:]
lower_limits_neg.reverse() lower_limits_neg.reverse()
lower_limits = np.asarray(lower_limits_neg[:-1] + lower_limits_pos) lower_limits = np.asarray(lower_limits_neg[:-1] + lower_limits_pos)
@ -223,7 +347,6 @@ class OctaveBankDesigner(FilterBankDesigner):
return freqs, lower_limits, upper_limits return freqs, lower_limits, upper_limits
def nominal_txt(self, x): def nominal_txt(self, x):
"""Returns textual repressentation of corresponding to the nominal """Returns textual repressentation of corresponding to the nominal
frequency.""" frequency."""

View File

@ -1,86 +0,0 @@
#!/usr/bin/python
"""
Bin narrow band power in octave/third octave band data
"""
from lasp.filter.bandpass_fir import (OctaveBankDesigner,
ThirdOctaveBankDesigner)
import numpy as np
import warnings
def binPower(freq, narrow_power, band=3, start_band=-16, stop_band=13):
"""
Apply binning to narrow band frequency domain power results
Args:
freq: Array of frequency indices
narrow_power: narrow-band power values in units of [W] or [Pa^2]
band: 1, or 3
Returns:
( ['25', '31.5', '40', '50', ... ],
[float(power_25), float(power_31p5), ...]) putting NAN values where
inaccurate.
"""
if band == 3:
designer = ThirdOctaveBankDesigner()
elif band == 1:
designer = OctaveBankDesigner()
else:
raise ValueError("Parameter 'Band' should be either '1', or '3'")
freq = np.copy(freq)
narrow_power = np.copy(narrow_power)
# Exact midband, lower and upper frequency limit of each band
fm = [designer.fm(x) for x in range(start_band, stop_band+1)]
fl = [designer.fl(x) for x in range(start_band, stop_band+1)]
fu = [designer.fu(x) for x in range(start_band, stop_band+1)]
fex = [designer.nominal_txt(x) for x in range(start_band, stop_band+1)]
# print(fl)
binned_power = np.zeros(len(fm), dtype=float)
## Start: linear interpolation between bins while Parseval is conserved
# current frequency resolution
df_old = freq[1]-freq[0]
# preferred new frequency resolution
df_new = .1
# ratio of resolutions
ratio = int(df_old/df_new)
# calculate new frequency bins
freq_new = np.linspace(freq[0],freq[-1],(len(freq)-1)*ratio+1)
# calculate the new bin data
interp_power = np.interp(freq_new, freq, narrow_power)/ratio
# adapt first and last bin values so that Parseval still holds
interp_power[0] = binned_power[0]*(1.+1./ratio)/2.
interp_power[-1] = binned_power[-1]*(1.+1./ratio)/2.
# check if Parseval still holds
# print(np.sum(y, axis=0))
# print(np.sum(y_new, axis=0))
## Stop: linear interpolation between bins while Parseval is conserved
binned_power = np.zeros(len(fm), dtype=float)
for k in range(len(fm)):
# print(k)
# find the bins which are in the corresponding band
bins = (fl[k] <= freq_new) & (freq_new < fu[k])
# print(bins)
# sum the output values of these bins to obtain the band value
binned_power[k] = np.sum(interp_power[bins], axis=0)
# if no frequency bin falls in a certain band, skip previous bands
# if not any(bins):
# binned_power[0:k+1] = np.nan
# check if data is valid
if(np.isnan(binned_power).all()):
warnings.warn('Invalid frequency array, we cannot bin these values')
return fm, fex, binned_power

View File

@ -65,7 +65,9 @@ cdef extern from "numpy/arrayobject.h":
cdef extern from "lasp_python.h": cdef extern from "lasp_python.h":
object dmat_to_ndarray(dmat*,bint transfer_ownership) object dmat_to_ndarray(dmat*,bint transfer_ownership)
__all__ = ['AvPowerSpectra', 'SosFilterBank', 'FilterBank'] __all__ = ['AvPowerSpectra', 'SosFilterBank', 'FilterBank', 'Siggen',
'sweep_flag_forward', 'sweep_flag_backward', 'sweep_flag_linear',
'sweep_flag_exponential', 'sweep_flag_hyperbolic']
setTracerLevel(15) setTracerLevel(15)
cdef extern from "cblas.h": cdef extern from "cblas.h":
@ -523,13 +525,27 @@ cdef class SPLowpass:
cdef extern from "lasp_siggen.h": cdef extern from "lasp_siggen.h":
ctypedef struct c_Siggen "Siggen" ctypedef struct c_Siggen "Siggen"
us SWEEP_FLAG_FORWARD, SWEEP_FLAG_BACKWARD, SWEEP_FLAG_LINEAR
us SWEEP_FLAG_EXPONENTIAL,SWEEP_FLAG_HYPERBOLIC
c_Siggen* Siggen_Whitenoise_create(d fs, d level_dB) c_Siggen* Siggen_Whitenoise_create(d fs, d level_dB)
c_Siggen* Siggen_Sinewave_create(d fs, d freq, d level_dB) c_Siggen* Siggen_Sinewave_create(d fs, d freq, d level_dB)
c_Siggen* Siggen_Sweep_create(d fs, d fl,
d fu, d Ts,us sweep_flags,
d level_dB)
void Siggen_genSignal(c_Siggen*, vd* samples) nogil void Siggen_genSignal(c_Siggen*, vd* samples) nogil
void Siggen_free(c_Siggen*) void Siggen_free(c_Siggen*)
# Sweep flags
sweep_flag_forward = SWEEP_FLAG_FORWARD
sweep_flag_backward = SWEEP_FLAG_BACKWARD
sweep_flag_linear = SWEEP_FLAG_LINEAR
sweep_flag_exponential = SWEEP_FLAG_EXPONENTIAL
sweep_flag_hyperbolic = SWEEP_FLAG_HYPERBOLIC
cdef class Siggen: cdef class Siggen:
cdef c_Siggen *_siggen cdef c_Siggen *_siggen
def __cinit__(self): def __cinit__(self):
@ -570,3 +586,17 @@ cdef class Siggen:
siggen = Siggen() siggen = Siggen()
siggen._siggen = c_siggen siggen._siggen = c_siggen
return siggen return siggen
@staticmethod
def sweep(d fs, d fl, d fu, d Ts,us sweep_flags, d level_dB):
cdef c_Siggen* c_siggen = Siggen_Sweep_create(fs,
fl,
fu,
Ts,
sweep_flags,
level_dB)
if c_siggen == NULL:
raise ValueError('Failed creating signal generator')
siggen = Siggen()
siggen._siggen = c_siggen
return siggen