FFT_tools: clean up namespace by using _numpy
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num) :
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum :
50         num = 2**_numpy.floor(lnum)
51     return num
52
53 def round_pow_of_two(num) :
54     "Round num to the closest exact a power of two on a log scale."
55     lnum = _numpy.log2(num)
56     if int(lnum) != lnum :
57         num = 2**_numpy.round(lnum)
58     return num
59
60 def ceil_pow_of_two(num) :
61     "Round num up to the closest exact a power of two."
62     lnum = _numpy.log2(num)
63     if int(lnum) != lnum :
64         num = 2**_numpy.ceil(lnum)
65     return num
66
67 def _test_rfft(xs, Xs) :
68     # Numpy's FFT algoritm returns
69     #          n-1
70     #   X[k] = SUM x[m] exp (-j 2pi km /n)
71     #          m=0
72     # (see http://www.tramy.us/numpybook.pdf)
73     j = _numpy.complex(0,1)
74     n = len(xs)
75     Xa = []
76     for k in range(n) :
77         Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n)
78                        for x,m in zip(xs,range(n))]))
79         if k < len(Xs):
80             if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6:
81                 raise ValueError(
82                     ('rfft mismatch on element {}: {} != {}, relative error {}'
83                      ).format(
84                         k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k])))
85     # Which should satisfy the discrete form of Parseval's theorem
86     #   n-1               n-1
87     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
88     #   m=0               k=0
89     timeSum = sum([_numpy.abs(x)**2 for x in xs])
90     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
91     if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6:
92         raise ValueError(
93             "Mismatch on Parseval's, {} != 1/{} * {}".format(
94                 timeSum, n, freqSum))
95
96 def _test_rfft_suite() :
97     print('Test numpy rfft definition')
98     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
99     _test_rfft(xs, _numpy.fft.rfft(xs))
100
101 def unitary_rfft(data, freq=1.0) :
102     """Compute the Fourier transform of real data.
103
104     Unitary (preserves power [Parseval's theorem]).
105
106     If the units on your data are Volts,
107     and your sampling frequency is in Hz,
108     then freq_axis will be in Hz,
109     and trans will be in Volts.
110     """
111     nsamps = floor_pow_of_two(len(data))
112     # Which should satisfy the discrete form of Parseval's theorem
113     #   n-1               n-1
114     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
115     #   m=0               k=0
116     # However, we want our FFT to satisfy the continuous Parseval eqn
117     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
118     # which has the discrete form
119     #   n-1              n-1
120     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
121     #   m=0              k=0
122     # with X'_k = AX, this gives us
123     #   n-1                     n-1
124     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
125     #   m=0                     k=0
126     # so we see
127     #   A^2 df/dt = 1/n
128     #   A^2 = 1/n dt/df
129     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
130     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
131     # f_c in the transformed data is the Nyquist frequency (12.1.2)
132     #   f_c = 1/2dt
133     # and the points are spaced out by (12.1.5)
134     #   df = 1/ndt
135     # so
136     #   dt = 1/ndf
137     #   dt/df = 1/ndf^2
138     #   A^2 = 1/n^2df^2
139     #   A = 1/ndf = ndt/n = dt
140     # so we can convert the Numpy transformed data to match our unitary
141     # continuous transformed data with (also NR 12.1.8)
142     #   X'_k = dtX = X / <sampling freq>
143     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
144     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
145     return (freq_axis, trans)
146
147 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
148     # Which should satisfy the discretized integral form of Parseval's theorem
149     #   n-1              n-1
150     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
151     #   m=0              k=0
152     dt = 1.0/freq
153     df = freqs[1]-freqs[0]
154     if df - 1/(len(xs)*dt)/df >= 1e-6:
155         raise ValueError(
156             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
157     Xa = list(Xs)
158     for k in range(len(Xs)-1,1,-1) :
159         Xa.append(Xa[k])
160     if len(xs) != len(Xa):
161         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
162     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
163     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
164     if _numpy.abs(lhs - rhs)/lhs >= 1e-4:
165         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
166
167 def _test_unitary_rfft_parsevals_suite():
168     print("Test unitary rfft on Parseval's theorem")
169     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
170     dt = _numpy.pi
171     freqs,Xs = unitary_rfft(xs, 1.0/dt)
172     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
173
174 def _rect(t) :
175     if _numpy.abs(t) < 0.5 :
176         return 1
177     else :
178         return 0
179
180 def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
181     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
182     samp_freq = _numpy.float(samp_freq)
183     a = _numpy.float(a)
184
185     x = _numpy.zeros((samples,), dtype=_numpy.float)
186     dt = 1.0/samp_freq
187     for i in range(samples) :
188         t = i*dt
189         x[i] = _rect(a*(t-time_shift))
190     freq_axis, X = unitary_rfft(x, samp_freq)
191     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
192
193     # remove the phase due to our time shift
194     j = _numpy.complex(0.0,1.0) # sqrt(-1)
195     for i in range(len(freq_axis)) :
196         f = freq_axis[i]
197         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
198         X[i] *= inverse_phase_shift
199
200     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
201     # normalized sinc(x) = sin(pi x)/(pi x)
202     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
203     if _numpy.sinc(0.5) != 2.0/_numpy.pi:
204         raise ValueError('abnormal sinc()')
205     for i in range(len(freq_axis)) :
206         f = freq_axis[i]
207         expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a)
208
209     if TEST_PLOTS :
210         pylab.figure()
211         pylab.subplot(211)
212         pylab.plot(_numpy.arange(0, dt*samples, dt), x)
213         pylab.title('time series')
214         pylab.subplot(212)
215         pylab.plot(freq_axis, X.real, 'r.')
216         pylab.plot(freq_axis, X.imag, 'g.')
217         pylab.plot(freq_axis, expected, 'b-')
218         pylab.title('freq series')
219
220 def _test_unitary_rfft_rect_suite() :
221     print('Test unitary FFTs on variously shaped rectangular functions')
222     _test_unitary_rfft_rect(a=0.5)
223     _test_unitary_rfft_rect(a=2.0)
224     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
225     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
226
227 def _gaussian(a, t) :
228     return _numpy.exp(-a * t**2)
229
230 def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
231     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
232     samp_freq = _numpy.float(samp_freq)
233     a = _numpy.float(a)
234
235     x = _numpy.zeros((samples,), dtype=_numpy.float)
236     dt = 1.0/samp_freq
237     for i in range(samples) :
238         t = i*dt
239         x[i] = _gaussian(a, (t-time_shift))
240     freq_axis, X = unitary_rfft(x, samp_freq)
241     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
242
243     # remove the phase due to our time shift
244     j = _numpy.complex(0.0,1.0) # sqrt(-1)
245     for i in range(len(freq_axis)) :
246         f = freq_axis[i]
247         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
248         X[i] *= inverse_phase_shift
249
250     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
251     for i in range(len(freq_axis)) :
252         f = freq_axis[i]
253         # see Wikipedia, or do the integral yourself.
254         expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian(
255             1.0/a, _numpy.pi*f)
256
257     if TEST_PLOTS :
258         pylab.figure()
259         pylab.subplot(211)
260         pylab.plot(_numpy.arange(0, dt*samples, dt), x)
261         pylab.title('time series')
262         pylab.subplot(212)
263         pylab.plot(freq_axis, X.real, 'r.')
264         pylab.plot(freq_axis, X.imag, 'g.')
265         pylab.plot(freq_axis, expected, 'b-')
266         pylab.title('freq series')
267
268 def _test_unitary_rfft_gaussian_suite() :
269     print("Test unitary FFTs on variously shaped gaussian functions")
270     _test_unitary_rfft_gaussian(a=0.5)
271     _test_unitary_rfft_gaussian(a=2.0)
272     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
273     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
274
275
276
277 def power_spectrum(data, freq=1.0) :
278     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
279
280     DATA must be real (not complex).
281     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
282     If the number of samples in data is not an integer power of two,
283     the FFT ignores some of the later points.
284     """
285     nsamps = floor_pow_of_two(len(data))
286
287     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
288     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
289     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
290     # See Numerical Recipies for a details.
291     trans = _numpy.fft.rfft(data[0:nsamps])
292     power = (trans * trans.conj()).real # We want the square of the amplitude.
293     return (freq_axis, power)
294
295 def unitary_power_spectrum(data, freq=1.0) :
296     freq_axis,power = power_spectrum(data, freq)
297     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
298     #
299     # numpy normalizes with 1/N on the inverse transform ifft,
300     # so we should normalize the freq-space representation with 1/sqrt(N).
301     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
302     # So the power gets normalized by that twice and we have 2/N
303     #
304     # On top of this, the FFT assumes a sampling freq of 1 per second,
305     # and we want to preserve area under our curves.
306     # If our total time T = len(data)/freq is smaller than 1,
307     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)),
308     # and we need to scale the powers down to conserve area.
309     # df_fft * F_fft(f) = df_real *F_real(f)
310     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
311     # So the power gets normalized by *that* twice and we have 2/N * freq**2
312
313     # power per unit time
314     # measure x(t) for time T
315     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
316     # PSD(f) = 2 |X(f)|**2 / T
317
318     # total_time = len(data)/float(freq)
319     # power *= 2.0 / float(freq)**2   /   total_time
320     # power *= 2.0 / freq**2   *   freq / len(data)
321     power *= 2.0 / (freq * _numpy.float(len(data)))
322
323     return (freq_axis, power)
324
325 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) :
326     x = _numpy.zeros((samples,), dtype=_numpy.float)
327     samp_freq = _numpy.float(samp_freq)
328     for i in range(samples) :
329         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
330     freq_axis, power = unitary_power_spectrum(x, samp_freq)
331     imax = _numpy.argmax(power)
332
333     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
334     df = samp_freq/_numpy.float(samples) # df = 1/T, where T = total_time
335     i = int(sin_freq/df)
336     # average power per unit time is
337     #  P = <x(t)**2>
338     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
339     # so average value of (int sin(t)**2 dt) per unit time is 0.5
340     #  P = 0.5
341     # we spread that power over a frequency bin of width df, sp
342     #  P(f0) = 0.5/df
343     # where f0 is the sin's frequency
344     #
345     # or :
346     # FFT of sin(2*pi*t*f0) gives
347     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
348     # (area under x(t) = 0, area under X(f) = 0)
349     # so one sided power spectral density (PSD) per unit time is
350     #  P(f) = 2 |X(f)|**2 / T
351     #       = 2 * |0.5 delta(f-f0)|**2 / T
352     #       = 0.5 * |delta(f-f0)|**2 / T
353     # but we're discrete and want the integral of the 'delta' to be 1,
354     # so 'delta'*df = 1  --> 'delta' = 1/df, and
355     #  P(f) = 0.5 / (df**2 * T)
356     #       = 0.5 / df                (T = 1/df)
357     expected[i] = 0.5 / df
358
359     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
360             sin_freq, expected[i], freq_axis[imax], power[imax]))
361     Pexp = 0
362     P    = 0
363     for i in range(len(freq_axis)) :
364         Pexp += expected[i] *df
365         P    += power[i] * df
366     print('The total power should be {} ({})'.format(Pexp, P))
367
368     if TEST_PLOTS :
369         pylab.figure()
370         pylab.subplot(211)
371         pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
372         pylab.title('time series')
373         pylab.subplot(212)
374         pylab.plot(freq_axis, power, 'r.')
375         pylab.plot(freq_axis, expected, 'b-')
376         pylab.title('{} samples of sin at {} Hz'.format(samples, sin_freq))
377
378 def _test_unitary_power_spectrum_sin_suite() :
379     print('Test unitary power spectrums on variously shaped sin functions')
380     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
381     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
382     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
383     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
384     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
385     # finally, with some irrational numbers, to check that I'm not getting lucky
386     _test_unitary_power_spectrum_sin(
387         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
388     # test with non-integer number of periods
389     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
390
391 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) :
392     x = _numpy.zeros((samples,), dtype=_numpy.float)
393     samp_freq = _numpy.float(samp_freq)
394     x[0] = amp
395     freq_axis, power = unitary_power_spectrum(x, samp_freq)
396
397     # power = <x(t)**2> = (amp)**2 * dt/T
398     # we spread that power over the entire freq_axis [0,fN], so
399     #  P(f)  = (amp)**2 dt / (T fN)
400     # where
401     #  dt = 1/samp_freq        (sample period)
402     #  T  = samples/samp_freq  (total time of data aquisition)
403     #  fN = 0.5 samp_freq      (Nyquist frequency)
404     # so
405     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
406     #       = 2 amp**2 / (samp_freq*samples)
407     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
408     expected = _numpy.ones(
409         (len(freq_axis),), dtype=_numpy.float) * expected_amp
410
411     print('The power should be flat at y = {} ({})'.format(
412         expected_amp, power[0]))
413
414     if TEST_PLOTS :
415         pylab.figure()
416         pylab.subplot(211)
417         pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
418         pylab.title('time series')
419         pylab.subplot(212)
420         pylab.plot(freq_axis, power, 'r.')
421         pylab.plot(freq_axis, expected, 'b-')
422         pylab.title('{} samples of delta amp {}'.format(samples, amp))
423
424 def _test_unitary_power_spectrum_delta_suite() :
425     print('Test unitary power spectrums on various delta functions')
426     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
427     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
428     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
429     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
430     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
431     _test_unitary_power_spectrum_delta(
432         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
433
434 def _gaussian2(area, mean, std, t) :
435     "Integral over all time = area (i.e. normalized for area=1)"
436     return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp(
437         -0.5*((t-mean)/std)**2)
438
439 def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024
440     x = _numpy.zeros((samples,), dtype=_numpy.float)
441     mean = _numpy.float(mean)
442     for i in range(samples) :
443         t = i/_numpy.float(samp_freq)
444         x[i] = _gaussian2(area, mean, std, t)
445     freq_axis, power = unitary_power_spectrum(x, samp_freq)
446
447     # generate the predicted curve
448     # by comparing our _gaussian2() form to _gaussian(),
449     # we see that the Fourier transform of x(t) has parameters:
450     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
451     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
452     #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
453     # So our power spectral density per unit time is given by
454     #  P(f) = 2 |X(f)|**2 / T
455     # Where
456     #  T  = samples/samp_freq  (total time of data aquisition)
457     mean = 0.0
458     area = area /(std*_numpy.sqrt(2.0*_numpy.pi))
459     std = 1.0/(2.0*_numpy.pi*std)
460     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
461     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
462     df = _numpy.float(samp_freq)/samples
463     for i in range(len(freq_axis)) :
464         f = i*df
465         gaus = _gaussian2(area, mean, std, f)
466         expected[i] = 2.0 * gaus**2 * samp_freq/samples
467     print(('The power should be a half-gaussian, '
468            'with a peak at 0 Hz with amplitude {} ({})').format(
469             expected[0], power[0]))
470
471     if TEST_PLOTS :
472         pylab.figure()
473         pylab.subplot(211)
474         pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
475         pylab.title('time series')
476         pylab.subplot(212)
477         pylab.plot(freq_axis, power, 'r.')
478         pylab.plot(freq_axis, expected, 'b-')
479         pylab.title('freq series')
480
481 def _test_unitary_power_spectrum_gaussian_suite() :
482     print('Test unitary power spectrums on various gaussian functions')
483     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
484     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
485     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
486     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
487     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
488     _test_unitary_power_spectrum_gaussian(
489         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
490         samples=1024)
491
492 def window_hann(length) :
493     "Returns a Hann window array with length entries"
494     win = _numpy.zeros((length,), dtype=_numpy.float)
495     for i in range(length) :
496         win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length)))
497     # avg value of cos over a period is 0
498     # so average height of Hann window is 0.5
499     return win
500
501 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
502                        overlap=True, window=window_hann) :
503     """Compute the avgerage power spectrum of DATA.
504
505     Taken with a sampling frequency FREQ.
506
507     DATA must be real (not complex) by breaking DATA into chunks.
508     The chunks may or may not be overlapping (by setting OVERLAP).
509     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
510     and the resulting spectra are averaged together.
511     See NR 13.4 for rational.
512
513     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
514     CHUNK_SIZE should really be a power of 2.
515     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
516     the FFT ignores some of the later points.
517     """
518     if chunk_size != floor_pow_of_two(chunk_size):
519         raise ValueError(
520             'chunk_size {} should be a power of 2'.format(chunk_size))
521
522     nchunks = len(data)/chunk_size # integer division = implicit floor
523     if overlap :
524         chunk_step = chunk_size/2
525     else :
526         chunk_step = chunk_size
527
528     win = window(chunk_size) # generate a window of the appropriate size
529     freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1)
530     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
531     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
532     # See Numerical Recipies for a details.
533     power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float)
534     for i in range(nchunks) :
535         starti = i*chunk_step
536         stopi = starti+chunk_size
537         fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win)
538         p_chunk = (fft_chunk * fft_chunk.conj()).real
539         power += p_chunk.astype(_numpy.float)
540     power /= _numpy.float(nchunks)
541     return (freq_axis, power)
542
543 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
544                                overlap=True, window=window_hann) :
545     """Compute the average power spectrum, preserving normalization
546     """
547     freq_axis,power = avg_power_spectrum(
548         data, freq, chunk_size, overlap, window)
549     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
550     # see unitary_power_spectrum()
551     power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3
552     #                                       * 8/3  to remove power from windowing
553     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
554     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
555     # So our calulated power has and extra <w(t)**2> in it.
556     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
557     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
558     # The normalization is not perfect. ??
559     # The normalization approaches perfection as chunk_size -> infinity.
560     return (freq_axis, power)
561
562 def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024,
563                                          chunk_size=512, overlap=True,
564                                          window=window_hann) :
565     x = _numpy.zeros((samples,), dtype=_numpy.float)
566     samp_freq = _numpy.float(samp_freq)
567     for i in range(samples) :
568         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
569     freq_axis, power = unitary_avg_power_spectrum(
570         x, samp_freq, chunk_size, overlap, window)
571     imax = _numpy.argmax(power)
572
573     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
574     df = samp_freq/_numpy.float(chunk_size) # df = 1/T, where T = total_time
575     i = int(sin_freq/df)
576     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
577
578     print('The power should peak at {} Hz of {} ({}, {})'.format(
579         sin_freq, expected[i], freq_axis[imax], power[imax]))
580     Pexp = 0
581     P    = 0
582     for i in range(len(freq_axis)) :
583         Pexp += expected[i] * df
584         P    += power[i] * df
585     print('The total power should be {} ({})'.format(Pexp, P))
586
587     if TEST_PLOTS :
588         pylab.figure()
589         pylab.subplot(211)
590         pylab.plot(_numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
591         pylab.title('time series')
592         pylab.subplot(212)
593         pylab.plot(freq_axis, power, 'r.')
594         pylab.plot(freq_axis, expected, 'b-')
595         pylab.title('{} samples of sin at {} Hz'.format(samples, sin_freq))
596
597 def _test_unitary_avg_power_spectrum_sin_suite() :
598     print('Test unitary avg power spectrums on variously shaped sin functions')
599     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
600     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
601     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
602     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
603     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
604     # test long wavelenth sin, so be closer to window frequency
605     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
606     # finally, with some irrational numbers, to check that I'm not getting lucky
607     _test_unitary_avg_power_spectrum_sin(
608         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
609
610
611 def test() :
612     _test_rfft_suite()
613     _test_unitary_rfft_parsevals_suite()
614     _test_unitary_rfft_rect_suite()
615     _test_unitary_rfft_gaussian_suite()
616     _test_unitary_power_spectrum_sin_suite()
617     _test_unitary_power_spectrum_delta_suite()
618     _test_unitary_power_spectrum_gaussian_suite()
619     _test_unitary_avg_power_spectrum_sin_suite()
620
621 if __name__ == '__main__':
622     from optparse import OptionParser
623
624     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
625     p.add_option('-p', '--plot', dest='plot', action='store_true',
626                  help='Display time- and freq-space plots of test transforms.')
627
628     options,args = p.parse_args()
629
630     if options.plot:
631         import pylab
632         TEST_PLOTS = True
633     test()
634     if options.plot:
635         pylab.show()