From 8c92ac9ef86aef228103734aa6150d5b3968815e Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Sun, 18 Nov 2012 16:24:47 -0500 Subject: [PATCH] FFT_tools: clean up namespace by using _numpy --- FFT_tools.py | 182 +++++++++++++++++++++++++++------------------------ 1 file changed, 96 insertions(+), 86 deletions(-) diff --git a/FFT_tools.py b/FFT_tools.py index 9b9aee4..3383c08 100644 --- a/FFT_tools.py +++ b/FFT_tools.py @@ -34,10 +34,7 @@ Main entry functions: ... overlap=True, window=window_hann) """ -from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \ - sinc, arctan2, array, ones, arange, linspace, zeros, \ - uint16, float, concatenate, fromfile, argmax, complex -from numpy.fft import rfft +import numpy as _numpy __version__ = '0.3' @@ -48,23 +45,23 @@ TEST_PLOTS = False def floor_pow_of_two(num) : "Round num down to the closest exact a power of two." - lnum = log2(num) + lnum = _numpy.log2(num) if int(lnum) != lnum : - num = 2**floor(lnum) + num = 2**_numpy.floor(lnum) return num def round_pow_of_two(num) : "Round num to the closest exact a power of two on a log scale." - lnum = log2(num) + lnum = _numpy.log2(num) if int(lnum) != lnum : - num = 2**round(lnum) + num = 2**_numpy.round(lnum) return num def ceil_pow_of_two(num) : "Round num up to the closest exact a power of two." - lnum = log2(num) + lnum = _numpy.log2(num) if int(lnum) != lnum : - num = 2**ceil(lnum) + num = 2**_numpy.ceil(lnum) return num def _test_rfft(xs, Xs) : @@ -73,23 +70,25 @@ def _test_rfft(xs, Xs) : # X[k] = SUM x[m] exp (-j 2pi km /n) # m=0 # (see http://www.tramy.us/numpybook.pdf) - j = complex(0,1) + j = _numpy.complex(0,1) n = len(xs) Xa = [] for k in range(n) : - Xa.append(sum([x*exp(-j*2*pi*k*m/n) for x,m in zip(xs,range(n))])) + Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n) + for x,m in zip(xs,range(n))])) if k < len(Xs): - if (Xs[k]-Xa[k])/abs(Xa[k]) >= 1e-6: + if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6: raise ValueError( ('rfft mismatch on element {}: {} != {}, relative error {}' - ).format(k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))) + ).format( + k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k]))) # Which should satisfy the discrete form of Parseval's theorem # n-1 n-1 # SUM |x_m|^2 = 1/n SUM |X_k|^2. # m=0 k=0 - timeSum = sum([abs(x)**2 for x in xs]) - freqSum = sum([abs(X)**2 for X in Xa]) - if abs(freqSum/float(n) - timeSum)/timeSum >= 1e-6: + timeSum = sum([_numpy.abs(x)**2 for x in xs]) + freqSum = sum([_numpy.abs(X)**2 for X in Xa]) + if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6: raise ValueError( "Mismatch on Parseval's, {} != 1/{} * {}".format( timeSum, n, freqSum)) @@ -97,7 +96,7 @@ def _test_rfft(xs, Xs) : def _test_rfft_suite() : print('Test numpy rfft definition') xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1] - _test_rfft(xs, rfft(xs)) + _test_rfft(xs, _numpy.fft.rfft(xs)) def unitary_rfft(data, freq=1.0) : """Compute the Fourier transform of real data. @@ -141,8 +140,8 @@ def unitary_rfft(data, freq=1.0) : # so we can convert the Numpy transformed data to match our unitary # continuous transformed data with (also NR 12.1.8) # X'_k = dtX = X / - trans = rfft(data[0:nsamps]) / float(freq) - freq_axis = linspace(0, freq/2, nsamps/2+1) + trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq) + freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1) return (freq_axis, trans) def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs): @@ -160,30 +159,30 @@ def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs): Xa.append(Xa[k]) if len(xs) != len(Xa): raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa))) - lhs = sum([abs(x)**2 for x in xs]) * dt - rhs = sum([abs(X)**2 for X in Xa]) * df - if abs(lhs - rhs)/lhs >= 1e-4: + lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt + rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df + if _numpy.abs(lhs - rhs)/lhs >= 1e-4: raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs)) def _test_unitary_rfft_parsevals_suite(): print("Test unitary rfft on Parseval's theorem") xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1] - dt = pi + dt = _numpy.pi freqs,Xs = unitary_rfft(xs, 1.0/dt) _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs) def _rect(t) : - if abs(t) < 0.5 : + if _numpy.abs(t) < 0.5 : return 1 else : return 0 def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) : - "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)" - samp_freq = float(samp_freq) - a = float(a) + "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)" + samp_freq = _numpy.float(samp_freq) + a = _numpy.float(a) - x = zeros((samples,), dtype=float) + x = _numpy.zeros((samples,), dtype=_numpy.float) dt = 1.0/samp_freq for i in range(samples) : t = i*dt @@ -192,25 +191,25 @@ def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) # remove the phase due to our time shift - j = complex(0.0,1.0) # sqrt(-1) + j = _numpy.complex(0.0,1.0) # sqrt(-1) for i in range(len(freq_axis)) : f = freq_axis[i] - inverse_phase_shift = exp(j*2.0*pi*time_shift*f) + inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f) X[i] *= inverse_phase_shift - expected = zeros((len(freq_axis),), dtype=float) + expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) # normalized sinc(x) = sin(pi x)/(pi x) # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi - if sinc(0.5) != 2.0/pi: + if _numpy.sinc(0.5) != 2.0/_numpy.pi: raise ValueError('abnormal sinc()') for i in range(len(freq_axis)) : f = freq_axis[i] - expected[i] = 1.0/abs(a) * sinc(f/a) + expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a) if TEST_PLOTS : pylab.figure() pylab.subplot(211) - pylab.plot(arange(0, dt*samples, dt), x) + pylab.plot(_numpy.arange(0, dt*samples, dt), x) pylab.title('time series') pylab.subplot(212) pylab.plot(freq_axis, X.real, 'r.') @@ -226,14 +225,14 @@ def _test_unitary_rfft_rect_suite() : _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024) def _gaussian(a, t) : - return exp(-a * t**2) + return _numpy.exp(-a * t**2) def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) : "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)" - samp_freq = float(samp_freq) - a = float(a) + samp_freq = _numpy.float(samp_freq) + a = _numpy.float(a) - x = zeros((samples,), dtype=float) + x = _numpy.zeros((samples,), dtype=_numpy.float) dt = 1.0/samp_freq for i in range(samples) : t = i*dt @@ -242,21 +241,23 @@ def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=2 #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) # remove the phase due to our time shift - j = complex(0.0,1.0) # sqrt(-1) + j = _numpy.complex(0.0,1.0) # sqrt(-1) for i in range(len(freq_axis)) : f = freq_axis[i] - inverse_phase_shift = exp(j*2.0*pi*time_shift*f) + inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f) X[i] *= inverse_phase_shift - expected = zeros((len(freq_axis),), dtype=float) + expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) for i in range(len(freq_axis)) : f = freq_axis[i] - expected[i] = sqrt(pi/a) * _gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself. + # see Wikipedia, or do the integral yourself. + expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian( + 1.0/a, _numpy.pi*f) if TEST_PLOTS : pylab.figure() pylab.subplot(211) - pylab.plot(arange(0, dt*samples, dt), x) + pylab.plot(_numpy.arange(0, dt*samples, dt), x) pylab.title('time series') pylab.subplot(212) pylab.plot(freq_axis, X.real, 'r.') @@ -283,11 +284,11 @@ def power_spectrum(data, freq=1.0) : """ nsamps = floor_pow_of_two(len(data)) - freq_axis = linspace(0, freq/2, nsamps/2+1) + freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1) # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real. # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination. # See Numerical Recipies for a details. - trans = rfft(data[0:nsamps]) + trans = _numpy.fft.rfft(data[0:nsamps]) power = (trans * trans.conj()).real # We want the square of the amplitude. return (freq_axis, power) @@ -317,20 +318,20 @@ def unitary_power_spectrum(data, freq=1.0) : # total_time = len(data)/float(freq) # power *= 2.0 / float(freq)**2 / total_time # power *= 2.0 / freq**2 * freq / len(data) - power *= 2.0 / (freq * float(len(data))) + power *= 2.0 / (freq * _numpy.float(len(data))) return (freq_axis, power) def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) : - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) + x = _numpy.zeros((samples,), dtype=_numpy.float) + samp_freq = _numpy.float(samp_freq) for i in range(samples) : - x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq) + x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq) freq_axis, power = unitary_power_spectrum(x, samp_freq) - imax = argmax(power) + imax = _numpy.argmax(power) - expected = zeros((len(freq_axis),), dtype=float) - df = samp_freq/float(samples) # df = 1/T, where T = total_time + expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) + df = samp_freq/_numpy.float(samples) # df = 1/T, where T = total_time i = int(sin_freq/df) # average power per unit time is # P = @@ -367,7 +368,7 @@ def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) : if TEST_PLOTS : pylab.figure() pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') + pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') pylab.title('time series') pylab.subplot(212) pylab.plot(freq_axis, power, 'r.') @@ -382,13 +383,14 @@ def _test_unitary_power_spectrum_sin_suite() : _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024) _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048) # finally, with some irrational numbers, to check that I'm not getting lucky - _test_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024) + _test_unitary_power_spectrum_sin( + sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024) # test with non-integer number of periods _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256) def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) : - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) + x = _numpy.zeros((samples,), dtype=_numpy.float) + samp_freq = _numpy.float(samp_freq) x[0] = amp freq_axis, power = unitary_power_spectrum(x, samp_freq) @@ -403,7 +405,8 @@ def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) : # P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq) # = 2 amp**2 / (samp_freq*samples) expected_amp = 2.0 * amp**2 / (samp_freq * samples) - expected = ones((len(freq_axis),), dtype=float) * expected_amp + expected = _numpy.ones( + (len(freq_axis),), dtype=_numpy.float) * expected_amp print('The power should be flat at y = {} ({})'.format( expected_amp, power[0])) @@ -411,7 +414,7 @@ def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) : if TEST_PLOTS : pylab.figure() pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') + pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') pylab.title('time series') pylab.subplot(212) pylab.plot(freq_axis, power, 'r.') @@ -425,17 +428,19 @@ def _test_unitary_power_spectrum_delta_suite() : _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024) - _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024) + _test_unitary_power_spectrum_delta( + amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024) def _gaussian2(area, mean, std, t) : "Integral over all time = area (i.e. normalized for area=1)" - return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2) + return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp( + -0.5*((t-mean)/std)**2) def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024 - x = zeros((samples,), dtype=float) - mean = float(mean) + x = _numpy.zeros((samples,), dtype=_numpy.float) + mean = _numpy.float(mean) for i in range(samples) : - t = i/float(samp_freq) + t = i/_numpy.float(samp_freq) x[i] = _gaussian2(area, mean, std, t) freq_axis, power = unitary_power_spectrum(x, samp_freq) @@ -450,10 +455,11 @@ def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10. # Where # T = samples/samp_freq (total time of data aquisition) mean = 0.0 - area = area /(std*sqrt(2.0*pi)) - std = 1.0/(2.0*pi*std) - expected = zeros((len(freq_axis),), dtype=float) - df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1]) + area = area /(std*_numpy.sqrt(2.0*_numpy.pi)) + std = 1.0/(2.0*_numpy.pi*std) + expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) + # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1]) + df = _numpy.float(samp_freq)/samples for i in range(len(freq_axis)) : f = i*df gaus = _gaussian2(area, mean, std, f) @@ -465,7 +471,7 @@ def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10. if TEST_PLOTS : pylab.figure() pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') + pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') pylab.title('time series') pylab.subplot(212) pylab.plot(freq_axis, power, 'r.') @@ -479,13 +485,15 @@ def _test_unitary_power_spectrum_gaussian_suite() : _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048) _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048) _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024) - _test_unitary_power_spectrum_gaussian(area=pi, std=sqrt(2), samp_freq=exp(1), samples=1024) + _test_unitary_power_spectrum_gaussian( + area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1), + samples=1024) def window_hann(length) : "Returns a Hann window array with length entries" - win = zeros((length,), dtype=float) + win = _numpy.zeros((length,), dtype=_numpy.float) for i in range(length) : - win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length))) + win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length))) # avg value of cos over a period is 0 # so average height of Hann window is 0.5 return win @@ -518,18 +526,18 @@ def avg_power_spectrum(data, freq=1.0, chunk_size=2048, chunk_step = chunk_size win = window(chunk_size) # generate a window of the appropriate size - freq_axis = linspace(0, freq/2, chunk_size/2+1) + freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1) # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real. # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination. # See Numerical Recipies for a details. - power = zeros((chunk_size/2+1,), dtype=float) + power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float) for i in range(nchunks) : starti = i*chunk_step stopi = starti+chunk_size - fft_chunk = rfft(data[starti:stopi]*win) + fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win) p_chunk = (fft_chunk * fft_chunk.conj()).real - power += p_chunk.astype(float) - power /= float(nchunks) + power += p_chunk.astype(_numpy.float) + power /= _numpy.float(nchunks) return (freq_axis, power) def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, @@ -539,7 +547,8 @@ def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, freq_axis,power = avg_power_spectrum( data, freq, chunk_size, overlap, window) # 2.0 / (freq * chunk_size) |rfft()|**2 --> unitary_power_spectrum - power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum() + # see unitary_power_spectrum() + power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3 # * 8/3 to remove power from windowing # <[x(t)*w(t)]**2> = ~= * # where the ~= is because the frequency of x(t) >> the frequency of w(t). @@ -553,16 +562,16 @@ def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True, window=window_hann) : - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) + x = _numpy.zeros((samples,), dtype=_numpy.float) + samp_freq = _numpy.float(samp_freq) for i in range(samples) : - x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq) + x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq) freq_axis, power = unitary_avg_power_spectrum( x, samp_freq, chunk_size, overlap, window) - imax = argmax(power) + imax = _numpy.argmax(power) - expected = zeros((len(freq_axis),), dtype=float) - df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time + expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) + df = samp_freq/_numpy.float(chunk_size) # df = 1/T, where T = total_time i = int(sin_freq/df) expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin() @@ -578,7 +587,7 @@ def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=102 if TEST_PLOTS : pylab.figure() pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') + pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') pylab.title('time series') pylab.subplot(212) pylab.plot(freq_axis, power, 'r.') @@ -595,7 +604,8 @@ def _test_unitary_avg_power_spectrum_sin_suite() : # test long wavelenth sin, so be closer to window frequency _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048) # finally, with some irrational numbers, to check that I'm not getting lucky - _test_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024) + _test_unitary_avg_power_spectrum_sin( + sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024) def test() : -- 2.26.2