... overlap=True, window=window_hann)
"""
-from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
- sinc, arctan2, array, ones, arange, linspace, zeros, \
- uint16, float, concatenate, fromfile, argmax, complex
-from numpy.fft import rfft
+import numpy as _numpy
__version__ = '0.3'
def floor_pow_of_two(num) :
"Round num down to the closest exact a power of two."
- lnum = log2(num)
+ lnum = _numpy.log2(num)
if int(lnum) != lnum :
- num = 2**floor(lnum)
+ num = 2**_numpy.floor(lnum)
return num
def round_pow_of_two(num) :
"Round num to the closest exact a power of two on a log scale."
- lnum = log2(num)
+ lnum = _numpy.log2(num)
if int(lnum) != lnum :
- num = 2**round(lnum)
+ num = 2**_numpy.round(lnum)
return num
def ceil_pow_of_two(num) :
"Round num up to the closest exact a power of two."
- lnum = log2(num)
+ lnum = _numpy.log2(num)
if int(lnum) != lnum :
- num = 2**ceil(lnum)
+ num = 2**_numpy.ceil(lnum)
return num
def _test_rfft(xs, Xs) :
# X[k] = SUM x[m] exp (-j 2pi km /n)
# m=0
# (see http://www.tramy.us/numpybook.pdf)
- j = complex(0,1)
+ j = _numpy.complex(0,1)
n = len(xs)
Xa = []
for k in range(n) :
- Xa.append(sum([x*exp(-j*2*pi*k*m/n) for x,m in zip(xs,range(n))]))
+ Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n)
+ for x,m in zip(xs,range(n))]))
if k < len(Xs):
- if (Xs[k]-Xa[k])/abs(Xa[k]) >= 1e-6:
+ if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6:
raise ValueError(
('rfft mismatch on element {}: {} != {}, relative error {}'
- ).format(k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k])))
+ ).format(
+ k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k])))
# Which should satisfy the discrete form of Parseval's theorem
# n-1 n-1
# SUM |x_m|^2 = 1/n SUM |X_k|^2.
# m=0 k=0
- timeSum = sum([abs(x)**2 for x in xs])
- freqSum = sum([abs(X)**2 for X in Xa])
- if abs(freqSum/float(n) - timeSum)/timeSum >= 1e-6:
+ timeSum = sum([_numpy.abs(x)**2 for x in xs])
+ freqSum = sum([_numpy.abs(X)**2 for X in Xa])
+ if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6:
raise ValueError(
"Mismatch on Parseval's, {} != 1/{} * {}".format(
timeSum, n, freqSum))
def _test_rfft_suite() :
print('Test numpy rfft definition')
xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
- _test_rfft(xs, rfft(xs))
+ _test_rfft(xs, _numpy.fft.rfft(xs))
def unitary_rfft(data, freq=1.0) :
"""Compute the Fourier transform of real data.
# so we can convert the Numpy transformed data to match our unitary
# continuous transformed data with (also NR 12.1.8)
# X'_k = dtX = X / <sampling freq>
- trans = rfft(data[0:nsamps]) / float(freq)
- freq_axis = linspace(0, freq/2, nsamps/2+1)
+ trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
+ freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
return (freq_axis, trans)
def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
Xa.append(Xa[k])
if len(xs) != len(Xa):
raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
- lhs = sum([abs(x)**2 for x in xs]) * dt
- rhs = sum([abs(X)**2 for X in Xa]) * df
- if abs(lhs - rhs)/lhs >= 1e-4:
+ lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
+ rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
+ if _numpy.abs(lhs - rhs)/lhs >= 1e-4:
raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
def _test_unitary_rfft_parsevals_suite():
print("Test unitary rfft on Parseval's theorem")
xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
- dt = pi
+ dt = _numpy.pi
freqs,Xs = unitary_rfft(xs, 1.0/dt)
_test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
def _rect(t) :
- if abs(t) < 0.5 :
+ if _numpy.abs(t) < 0.5 :
return 1
else :
return 0
def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
- "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
- samp_freq = float(samp_freq)
- a = float(a)
+ "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
+ samp_freq = _numpy.float(samp_freq)
+ a = _numpy.float(a)
- x = zeros((samples,), dtype=float)
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
dt = 1.0/samp_freq
for i in range(samples) :
t = i*dt
#_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
# remove the phase due to our time shift
- j = complex(0.0,1.0) # sqrt(-1)
+ j = _numpy.complex(0.0,1.0) # sqrt(-1)
for i in range(len(freq_axis)) :
f = freq_axis[i]
- inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
+ inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
X[i] *= inverse_phase_shift
- expected = zeros((len(freq_axis),), dtype=float)
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
# normalized sinc(x) = sin(pi x)/(pi x)
# so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
- if sinc(0.5) != 2.0/pi:
+ if _numpy.sinc(0.5) != 2.0/_numpy.pi:
raise ValueError('abnormal sinc()')
for i in range(len(freq_axis)) :
f = freq_axis[i]
- expected[i] = 1.0/abs(a) * sinc(f/a)
+ expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a)
if TEST_PLOTS :
pylab.figure()
pylab.subplot(211)
- pylab.plot(arange(0, dt*samples, dt), x)
+ pylab.plot(_numpy.arange(0, dt*samples, dt), x)
pylab.title('time series')
pylab.subplot(212)
pylab.plot(freq_axis, X.real, 'r.')
_test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
def _gaussian(a, t) :
- return exp(-a * t**2)
+ return _numpy.exp(-a * t**2)
def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
"Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
- samp_freq = float(samp_freq)
- a = float(a)
+ samp_freq = _numpy.float(samp_freq)
+ a = _numpy.float(a)
- x = zeros((samples,), dtype=float)
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
dt = 1.0/samp_freq
for i in range(samples) :
t = i*dt
#_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
# remove the phase due to our time shift
- j = complex(0.0,1.0) # sqrt(-1)
+ j = _numpy.complex(0.0,1.0) # sqrt(-1)
for i in range(len(freq_axis)) :
f = freq_axis[i]
- inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
+ inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
X[i] *= inverse_phase_shift
- expected = zeros((len(freq_axis),), dtype=float)
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
for i in range(len(freq_axis)) :
f = freq_axis[i]
- expected[i] = sqrt(pi/a) * _gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
+ # see Wikipedia, or do the integral yourself.
+ expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian(
+ 1.0/a, _numpy.pi*f)
if TEST_PLOTS :
pylab.figure()
pylab.subplot(211)
- pylab.plot(arange(0, dt*samples, dt), x)
+ pylab.plot(_numpy.arange(0, dt*samples, dt), x)
pylab.title('time series')
pylab.subplot(212)
pylab.plot(freq_axis, X.real, 'r.')
"""
nsamps = floor_pow_of_two(len(data))
- freq_axis = linspace(0, freq/2, nsamps/2+1)
+ freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
# nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
# >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
# See Numerical Recipies for a details.
- trans = rfft(data[0:nsamps])
+ trans = _numpy.fft.rfft(data[0:nsamps])
power = (trans * trans.conj()).real # We want the square of the amplitude.
return (freq_axis, power)
# total_time = len(data)/float(freq)
# power *= 2.0 / float(freq)**2 / total_time
# power *= 2.0 / freq**2 * freq / len(data)
- power *= 2.0 / (freq * float(len(data)))
+ power *= 2.0 / (freq * _numpy.float(len(data)))
return (freq_axis, power)
def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) :
- x = zeros((samples,), dtype=float)
- samp_freq = float(samp_freq)
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ samp_freq = _numpy.float(samp_freq)
for i in range(samples) :
- x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
+ x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
freq_axis, power = unitary_power_spectrum(x, samp_freq)
- imax = argmax(power)
+ imax = _numpy.argmax(power)
- expected = zeros((len(freq_axis),), dtype=float)
- df = samp_freq/float(samples) # df = 1/T, where T = total_time
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ df = samp_freq/_numpy.float(samples) # df = 1/T, where T = total_time
i = int(sin_freq/df)
# average power per unit time is
# P = <x(t)**2>
if TEST_PLOTS :
pylab.figure()
pylab.subplot(211)
- pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
+ pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
pylab.title('time series')
pylab.subplot(212)
pylab.plot(freq_axis, power, 'r.')
_test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
_test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
# finally, with some irrational numbers, to check that I'm not getting lucky
- _test_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
+ _test_unitary_power_spectrum_sin(
+ sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
# test with non-integer number of periods
_test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) :
- x = zeros((samples,), dtype=float)
- samp_freq = float(samp_freq)
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ samp_freq = _numpy.float(samp_freq)
x[0] = amp
freq_axis, power = unitary_power_spectrum(x, samp_freq)
# P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
# = 2 amp**2 / (samp_freq*samples)
expected_amp = 2.0 * amp**2 / (samp_freq * samples)
- expected = ones((len(freq_axis),), dtype=float) * expected_amp
+ expected = _numpy.ones(
+ (len(freq_axis),), dtype=_numpy.float) * expected_amp
print('The power should be flat at y = {} ({})'.format(
expected_amp, power[0]))
if TEST_PLOTS :
pylab.figure()
pylab.subplot(211)
- pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
+ pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
pylab.title('time series')
pylab.subplot(212)
pylab.plot(freq_axis, power, 'r.')
_test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
_test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
_test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
- _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
+ _test_unitary_power_spectrum_delta(
+ amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
def _gaussian2(area, mean, std, t) :
"Integral over all time = area (i.e. normalized for area=1)"
- return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
+ return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp(
+ -0.5*((t-mean)/std)**2)
def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024
- x = zeros((samples,), dtype=float)
- mean = float(mean)
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ mean = _numpy.float(mean)
for i in range(samples) :
- t = i/float(samp_freq)
+ t = i/_numpy.float(samp_freq)
x[i] = _gaussian2(area, mean, std, t)
freq_axis, power = unitary_power_spectrum(x, samp_freq)
# Where
# T = samples/samp_freq (total time of data aquisition)
mean = 0.0
- area = area /(std*sqrt(2.0*pi))
- std = 1.0/(2.0*pi*std)
- expected = zeros((len(freq_axis),), dtype=float)
- df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
+ area = area /(std*_numpy.sqrt(2.0*_numpy.pi))
+ std = 1.0/(2.0*_numpy.pi*std)
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
+ df = _numpy.float(samp_freq)/samples
for i in range(len(freq_axis)) :
f = i*df
gaus = _gaussian2(area, mean, std, f)
if TEST_PLOTS :
pylab.figure()
pylab.subplot(211)
- pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
+ pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
pylab.title('time series')
pylab.subplot(212)
pylab.plot(freq_axis, power, 'r.')
_test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
_test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
_test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
- _test_unitary_power_spectrum_gaussian(area=pi, std=sqrt(2), samp_freq=exp(1), samples=1024)
+ _test_unitary_power_spectrum_gaussian(
+ area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
+ samples=1024)
def window_hann(length) :
"Returns a Hann window array with length entries"
- win = zeros((length,), dtype=float)
+ win = _numpy.zeros((length,), dtype=_numpy.float)
for i in range(length) :
- win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
+ win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length)))
# avg value of cos over a period is 0
# so average height of Hann window is 0.5
return win
chunk_step = chunk_size
win = window(chunk_size) # generate a window of the appropriate size
- freq_axis = linspace(0, freq/2, chunk_size/2+1)
+ freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1)
# nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
# >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
# See Numerical Recipies for a details.
- power = zeros((chunk_size/2+1,), dtype=float)
+ power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float)
for i in range(nchunks) :
starti = i*chunk_step
stopi = starti+chunk_size
- fft_chunk = rfft(data[starti:stopi]*win)
+ fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win)
p_chunk = (fft_chunk * fft_chunk.conj()).real
- power += p_chunk.astype(float)
- power /= float(nchunks)
+ power += p_chunk.astype(_numpy.float)
+ power /= _numpy.float(nchunks)
return (freq_axis, power)
def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
freq_axis,power = avg_power_spectrum(
data, freq, chunk_size, overlap, window)
# 2.0 / (freq * chunk_size) |rfft()|**2 --> unitary_power_spectrum
- power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()
+ # see unitary_power_spectrum()
+ power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3
# * 8/3 to remove power from windowing
# <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
# where the ~= is because the frequency of x(t) >> the frequency of w(t).
def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024,
chunk_size=512, overlap=True,
window=window_hann) :
- x = zeros((samples,), dtype=float)
- samp_freq = float(samp_freq)
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ samp_freq = _numpy.float(samp_freq)
for i in range(samples) :
- x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
+ x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
freq_axis, power = unitary_avg_power_spectrum(
x, samp_freq, chunk_size, overlap, window)
- imax = argmax(power)
+ imax = _numpy.argmax(power)
- expected = zeros((len(freq_axis),), dtype=float)
- df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ df = samp_freq/_numpy.float(chunk_size) # df = 1/T, where T = total_time
i = int(sin_freq/df)
expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
if TEST_PLOTS :
pylab.figure()
pylab.subplot(211)
- pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
+ pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
pylab.title('time series')
pylab.subplot(212)
pylab.plot(freq_axis, power, 'r.')
# test long wavelenth sin, so be closer to window frequency
_test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
# finally, with some irrational numbers, to check that I'm not getting lucky
- _test_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
+ _test_unitary_avg_power_spectrum_sin(
+ sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
def test() :