Explicitly convert powers to reals (they should have no imaginary portion.)
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2011  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Main entry functions:
21   unitary_rfft(data, freq=1.0)
22   power_spectrum(data, freq=1.0)
23   unitary_power_spectrum(data, freq=1.0)
24   avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
25   unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
26 """
27
28 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
29     sinc, arctan2, array, ones, arange, linspace, zeros, \
30     uint16, float, concatenate, fromfile, argmax, complex
31 from numpy.fft import rfft
32
33
34 __version__ = '0.2'
35
36 # Display time- and freq-space plots of the test transforms if True
37 TEST_PLOTS = False
38
39
40 def floor_pow_of_two(num) :
41     "Round num down to the closest exact a power of two."
42     lnum = log2(num)
43     if int(lnum) != lnum :
44         num = 2**floor(lnum)
45     return num
46
47 def round_pow_of_two(num) :
48     "Round num to the closest exact a power of two on a log scale."
49     lnum = log2(num)
50     if int(lnum) != lnum :
51         num = 2**round(lnum)
52     return num
53
54 def ceil_pow_of_two(num) :
55     "Round num up to the closest exact a power of two."
56     lnum = log2(num)
57     if int(lnum) != lnum :
58         num = 2**ceil(lnum)
59     return num
60
61 def _test_rfft(xs, Xs) :
62     # Numpy's FFT algoritm returns
63     #          n-1
64     #   X[k] = SUM x[m] exp (-j 2pi km /n)
65     #          m=0
66     # (see http://www.tramy.us/numpybook.pdf)
67     j = complex(0,1)
68     n = len(xs)
69     Xa = []
70     for k in range(n) :
71         Xa.append(sum([x*exp(-j*2*pi*k*m/n) for x,m in zip(xs,range(n))]))
72         if k < len(Xs):
73             assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \
74                 "rfft mismatch on element %d: %g != %g, relative error %g" \
75                 % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))
76     # Which should satisfy the discrete form of Parseval's theorem
77     #   n-1               n-1
78     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
79     #   m=0               k=0
80     timeSum = sum([abs(x)**2 for x in xs])
81     freqSum = sum([abs(X)**2 for X in Xa])
82     assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \
83         "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum)
84
85 def _test_rfft_suite() :
86     print "Test numpy rfft definition"
87     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
88     _test_rfft(xs, rfft(xs))
89
90 def unitary_rfft(data, freq=1.0) :
91     """
92     Compute the Fourier transform of real data.
93     Unitary (preserves power [Parseval's theorem]).
94    
95     If the units on your data are Volts,
96     and your sampling frequency is in Hz,
97     then freq_axis will be in Hz,
98     and trans will be in Volts.
99     """
100     nsamps = floor_pow_of_two(len(data))
101     # Which should satisfy the discrete form of Parseval's theorem
102     #   n-1               n-1
103     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
104     #   m=0               k=0
105     # However, we want our FFT to satisfy the continuous Parseval eqn
106     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
107     # which has the discrete form
108     #   n-1              n-1
109     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
110     #   m=0              k=0
111     # with X'_k = AX, this gives us
112     #   n-1                     n-1
113     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
114     #   m=0                     k=0
115     # so we see
116     #   A^2 df/dt = 1/n
117     #   A^2 = 1/n dt/df
118     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
119     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
120     # f_c in the transformed data is the Nyquist frequency (12.1.2)
121     #   f_c = 1/2dt
122     # and the points are spaced out by (12.1.5)
123     #   df = 1/ndt
124     # so
125     #   dt = 1/ndf
126     #   dt/df = 1/ndf^2
127     #   A^2 = 1/n^2df^2
128     #   A = 1/ndf = ndt/n = dt
129     # so we can convert the Numpy transformed data to match our unitary
130     # continuous transformed data with (also NR 12.1.8)
131     #   X'_k = dtX = X / <sampling freq>
132     trans = rfft(data[0:nsamps]) / float(freq)
133     freq_axis = linspace(0, freq/2, nsamps/2+1)
134     return (freq_axis, trans)
135
136 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
137     # Which should satisfy the discretized integral form of Parseval's theorem
138     #   n-1              n-1
139     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
140     #   m=0              k=0
141     dt = 1.0/freq
142     df = freqs[1]-freqs[0]
143     assert (df - 1/(len(xs)*dt))/df < 1e-6, \
144         "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt)
145     Xa = list(Xs)
146     for k in range(len(Xs)-1,1,-1) :
147         Xa.append(Xa[k])
148     assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa))
149     lhs = sum([abs(x)**2 for x in xs]) * dt
150     rhs = sum([abs(X)**2 for X in Xa]) * df
151     assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \
152         % (lhs, rhs)
153
154 def _test_unitary_rfft_parsevals_suite():
155     print "Test unitary rfft on Parseval's theorem"
156     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
157     dt = pi
158     freqs,Xs = unitary_rfft(xs, 1.0/dt)
159     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
160
161 def _rect(t) :
162     if abs(t) < 0.5 :
163         return 1
164     else :
165         return 0
166
167 def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
168     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
169     samp_freq = float(samp_freq)
170     a = float(a)
171
172     x = zeros((samples,), dtype=float)
173     dt = 1.0/samp_freq
174     for i in range(samples) :
175         t = i*dt
176         x[i] = _rect(a*(t-time_shift))
177     freq_axis, X = unitary_rfft(x, samp_freq)
178     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
179
180     # remove the phase due to our time shift
181     j = complex(0.0,1.0) # sqrt(-1)
182     for i in range(len(freq_axis)) :
183         f = freq_axis[i]
184         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
185         X[i] *= inverse_phase_shift
186
187     expected = zeros((len(freq_axis),), dtype=float)
188     # normalized sinc(x) = sin(pi x)/(pi x)
189     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
190     assert sinc(0.5) == 2.0/pi, "abnormal sinc()"
191     for i in range(len(freq_axis)) :
192         f = freq_axis[i]
193         expected[i] = 1.0/abs(a) * sinc(f/a)
194
195     if TEST_PLOTS :
196         pylab.figure()
197         pylab.subplot(211)
198         pylab.plot(arange(0, dt*samples, dt), x)
199         pylab.title('time series')
200         pylab.subplot(212)
201         pylab.plot(freq_axis, X.real, 'r.')
202         pylab.plot(freq_axis, X.imag, 'g.')
203         pylab.plot(freq_axis, expected, 'b-')
204         pylab.title('freq series')
205
206 def _test_unitary_rfft_rect_suite() :
207     print "Test unitary FFTs on variously shaped rectangular functions"
208     _test_unitary_rfft_rect(a=0.5)
209     _test_unitary_rfft_rect(a=2.0)
210     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
211     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
212
213 def _gaussian(a, t) :
214     return exp(-a * t**2)
215
216 def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
217     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
218     samp_freq = float(samp_freq)
219     a = float(a)
220
221     x = zeros((samples,), dtype=float)
222     dt = 1.0/samp_freq
223     for i in range(samples) :
224         t = i*dt
225         x[i] = _gaussian(a, (t-time_shift))
226     freq_axis, X = unitary_rfft(x, samp_freq)
227     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
228
229     # remove the phase due to our time shift
230     j = complex(0.0,1.0) # sqrt(-1)
231     for i in range(len(freq_axis)) :
232         f = freq_axis[i]
233         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
234         X[i] *= inverse_phase_shift
235
236     expected = zeros((len(freq_axis),), dtype=float)
237     for i in range(len(freq_axis)) :
238         f = freq_axis[i]
239         expected[i] = sqrt(pi/a) * _gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
240
241     if TEST_PLOTS :
242         pylab.figure()
243         pylab.subplot(211)
244         pylab.plot(arange(0, dt*samples, dt), x)
245         pylab.title('time series')
246         pylab.subplot(212)
247         pylab.plot(freq_axis, X.real, 'r.')
248         pylab.plot(freq_axis, X.imag, 'g.')
249         pylab.plot(freq_axis, expected, 'b-')
250         pylab.title('freq series')
251
252 def _test_unitary_rfft_gaussian_suite() :
253     print "Test unitary FFTs on variously shaped gaussian functions"
254     _test_unitary_rfft_gaussian(a=0.5)
255     _test_unitary_rfft_gaussian(a=2.0)
256     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
257     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
258
259
260
261 def power_spectrum(data, freq=1.0) :
262     """
263     Compute the power spectrum of DATA taken with a sampling frequency FREQ.
264     DATA must be real (not complex).
265     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
266     If the number of samples in data is not an integer power of two,
267     the FFT ignores some of the later points.
268     """
269     nsamps = floor_pow_of_two(len(data))
270     
271     freq_axis = linspace(0, freq/2, nsamps/2+1)
272     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
273     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
274     # See Numerical Recipies for a details.
275     trans = rfft(data[0:nsamps])
276     power = (trans * trans.conj()).real # We want the square of the amplitude.
277     return (freq_axis, power)
278
279 def unitary_power_spectrum(data, freq=1.0) :
280     freq_axis,power = power_spectrum(data, freq)
281     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
282     #
283     # numpy normalizes with 1/N on the inverse transform ifft,
284     # so we should normalize the freq-space representation with 1/sqrt(N).
285     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
286     # So the power gets normalized by that twice and we have 2/N
287     #
288     # On top of this, the FFT assumes a sampling freq of 1 per second,
289     # and we want to preserve area under our curves.
290     # If our total time T = len(data)/freq is smaller than 1,
291     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), 
292     # and we need to scale the powers down to conserve area.
293     # df_fft * F_fft(f) = df_real *F_real(f)
294     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
295     # So the power gets normalized by *that* twice and we have 2/N * freq**2
296
297     # power per unit time
298     # measure x(t) for time T
299     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
300     # PSD(f) = 2 |X(f)|**2 / T
301
302     # total_time = len(data)/float(freq)
303     # power *= 2.0 / float(freq)**2   /   total_time
304     # power *= 2.0 / freq**2   *   freq / len(data)
305     power *= 2.0 / (freq * float(len(data)))
306
307     return (freq_axis, power)
308
309 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) :
310     x = zeros((samples,), dtype=float)
311     samp_freq = float(samp_freq)
312     for i in range(samples) :
313         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
314     freq_axis, power = unitary_power_spectrum(x, samp_freq)
315     imax = argmax(power)
316
317     expected = zeros((len(freq_axis),), dtype=float)
318     df = samp_freq/float(samples) # df = 1/T, where T = total_time
319     i = int(sin_freq/df)
320     # average power per unit time is 
321     #  P = <x(t)**2>
322     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
323     # so average value of (int sin(t)**2 dt) per unit time is 0.5
324     #  P = 0.5
325     # we spread that power over a frequency bin of width df, sp
326     #  P(f0) = 0.5/df
327     # where f0 is the sin's frequency
328     #
329     # or :
330     # FFT of sin(2*pi*t*f0) gives
331     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
332     # (area under x(t) = 0, area under X(f) = 0)
333     # so one sided power spectral density (PSD) per unit time is
334     #  P(f) = 2 |X(f)|**2 / T
335     #       = 2 * |0.5 delta(f-f0)|**2 / T
336     #       = 0.5 * |delta(f-f0)|**2 / T
337     # but we're discrete and want the integral of the 'delta' to be 1, 
338     # so 'delta'*df = 1  --> 'delta' = 1/df, and
339     #  P(f) = 0.5 / (df**2 * T)
340     #       = 0.5 / df                (T = 1/df)
341     expected[i] = 0.5 / df
342
343     print "The power should be a peak at %g Hz of %g (%g, %g)" % \
344         (sin_freq, expected[i], freq_axis[imax], power[imax])
345     Pexp = 0
346     P    = 0
347     for i in range(len(freq_axis)) :
348         Pexp += expected[i] *df
349         P    += power[i] * df
350     print " The total power should be %g (%g)" % (Pexp, P)
351
352     if TEST_PLOTS :
353         pylab.figure()
354         pylab.subplot(211)
355         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
356         pylab.title('time series')
357         pylab.subplot(212)
358         pylab.plot(freq_axis, power, 'r.')
359         pylab.plot(freq_axis, expected, 'b-')
360         pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
361
362 def _test_unitary_power_spectrum_sin_suite() :
363     print "Test unitary power spectrums on variously shaped sin functions"
364     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
365     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
366     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
367     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
368     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
369     # finally, with some irrational numbers, to check that I'm not getting lucky
370     _test_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
371     # test with non-integer number of periods
372     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
373
374 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) :
375     x = zeros((samples,), dtype=float)
376     samp_freq = float(samp_freq)
377     x[0] = amp
378     freq_axis, power = unitary_power_spectrum(x, samp_freq)
379
380     # power = <x(t)**2> = (amp)**2 * dt/T
381     # we spread that power over the entire freq_axis [0,fN], so
382     #  P(f)  = (amp)**2 dt / (T fN)
383     # where
384     #  dt = 1/samp_freq        (sample period)
385     #  T  = samples/samp_freq  (total time of data aquisition)
386     #  fN = 0.5 samp_freq      (Nyquist frequency)
387     # so
388     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
389     #       = 2 amp**2 / (samp_freq*samples)
390     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
391     expected = ones((len(freq_axis),), dtype=float) * expected_amp
392
393     print "The power should be flat at y = %g (%g)" % (expected_amp, power[0])
394     
395     if TEST_PLOTS :
396         pylab.figure()
397         pylab.subplot(211)
398         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
399         pylab.title('time series')
400         pylab.subplot(212)
401         pylab.plot(freq_axis, power, 'r.')
402         pylab.plot(freq_axis, expected, 'b-')
403         pylab.title('%g samples of delta amp %g' % (samples, amp))
404
405 def _test_unitary_power_spectrum_delta_suite() :
406     print "Test unitary power spectrums on various delta functions"
407     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
408     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
409     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
410     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
411     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
412     _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
413
414 def _gaussian2(area, mean, std, t) :
415     "Integral over all time = area (i.e. normalized for area=1)"
416     return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
417     
418 def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024
419     x = zeros((samples,), dtype=float)
420     mean = float(mean)
421     for i in range(samples) :
422         t = i/float(samp_freq)
423         x[i] = _gaussian2(area, mean, std, t)
424     freq_axis, power = unitary_power_spectrum(x, samp_freq)
425
426     # generate the predicted curve
427     # by comparing our _gaussian2() form to _gaussian(),
428     # we see that the Fourier transform of x(t) has parameters:
429     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
430     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
431     #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
432     # So our power spectral density per unit time is given by
433     #  P(f) = 2 |X(f)|**2 / T
434     # Where
435     #  T  = samples/samp_freq  (total time of data aquisition)
436     mean = 0.0
437     area = area /(std*sqrt(2.0*pi))
438     std = 1.0/(2.0*pi*std)
439     expected = zeros((len(freq_axis),), dtype=float)
440     df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
441     for i in range(len(freq_axis)) :
442         f = i*df
443         gaus = _gaussian2(area, mean, std, f)
444         expected[i] = 2.0 * gaus**2 * samp_freq/samples
445     print "The power should be a half-gaussian, ",
446     print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0])
447
448     if TEST_PLOTS :
449         pylab.figure()
450         pylab.subplot(211)
451         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
452         pylab.title('time series')
453         pylab.subplot(212)
454         pylab.plot(freq_axis, power, 'r.')
455         pylab.plot(freq_axis, expected, 'b-')
456         pylab.title('freq series')
457
458 def _test_unitary_power_spectrum_gaussian_suite() :
459     print "Test unitary power spectrums on various gaussian functions"
460     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
461     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
462     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
463     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
464     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
465     _test_unitary_power_spectrum_gaussian(area=pi, std=sqrt(2), samp_freq=exp(1), samples=1024)
466
467 def window_hann(length) :
468     "Returns a Hann window array with length entries"
469     win = zeros((length,), dtype=float)
470     for i in range(length) :
471         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
472     # avg value of cos over a period is 0
473     # so average height of Hann window is 0.5
474     return win
475
476 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
477                        overlap=True, window=window_hann) :
478     """
479     Compute the avg power spectrum of DATA taken with a sampling frequency FREQ.
480     DATA must be real (not complex) by breaking DATA into chunks.
481     The chunks may or may not be overlapping (by setting OVERLAP).
482     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
483     and the resulting spectra are averaged together.
484     See NR 13.4 for rational.
485     
486     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
487     CHUNK_SIZE should really be a power of 2.
488     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
489     the FFT ignores some of the later points.
490     """
491     assert chunk_size == floor_pow_of_two(chunk_size), \
492         "chunk_size %d should be a power of 2" % chunk_size
493
494     nchunks = len(data)/chunk_size # integer division = implicit floor
495     if overlap :
496         chunk_step = chunk_size/2
497     else :
498         chunk_step = chunk_size
499     
500     win = window(chunk_size) # generate a window of the appropriate size
501     freq_axis = linspace(0, freq/2, chunk_size/2+1)
502     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
503     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
504     # See Numerical Recipies for a details.
505     power = zeros((chunk_size/2+1,), dtype=float)
506     for i in range(nchunks) :
507         starti = i*chunk_step
508         stopi = starti+chunk_size
509         fft_chunk = rfft(data[starti:stopi]*win)
510         p_chunk = (fft_chunk * fft_chunk.conj()).real
511         power += p_chunk.astype(float)
512     power /= float(nchunks)
513     return (freq_axis, power)
514
515 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
516                                overlap=True, window=window_hann) :
517     """
518     compute the average power spectrum, preserving normalization
519     """
520     freq_axis,power = avg_power_spectrum(data, freq, chunk_size,
521                                          overlap, window)
522     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
523     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()            
524     #                                       * 8/3  to remove power from windowing
525     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
526     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
527     # So our calulated power has and extra <w(t)**2> in it.
528     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
529     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
530     # The normalization is not perfect. ??
531     # The normalization approaches perfection as chunk_size -> infinity.
532     return (freq_axis, power)
533
534 def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024,
535                                          chunk_size=512, overlap=True,
536                                          window=window_hann) :
537     x = zeros((samples,), dtype=float)
538     samp_freq = float(samp_freq)
539     for i in range(samples) :
540         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
541     freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size,
542                                                   overlap, window)
543     imax = argmax(power)
544
545     expected = zeros((len(freq_axis),), dtype=float)
546     df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
547     i = int(sin_freq/df)
548     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
549
550     print "The power should be a peak at %g Hz of %g (%g, %g)" % \
551         (sin_freq, expected[i], freq_axis[imax], power[imax])
552     Pexp = 0
553     P    = 0
554     for i in range(len(freq_axis)) :
555         Pexp += expected[i] * df
556         P    += power[i] * df
557     print " The total power should be %g (%g)" % (Pexp, P)
558
559     if TEST_PLOTS :
560         pylab.figure()
561         pylab.subplot(211)
562         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
563         pylab.title('time series')
564         pylab.subplot(212)
565         pylab.plot(freq_axis, power, 'r.')
566         pylab.plot(freq_axis, expected, 'b-')
567         pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
568
569 def _test_unitary_avg_power_spectrum_sin_suite() :
570     print "Test unitary avg power spectrums on variously shaped sin functions"
571     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
572     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
573     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
574     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
575     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
576     # test long wavelenth sin, so be closer to window frequency
577     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
578     # finally, with some irrational numbers, to check that I'm not getting lucky
579     _test_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
580
581
582 def test() :
583     _test_rfft_suite()
584     _test_unitary_rfft_parsevals_suite()
585     _test_unitary_rfft_rect_suite()
586     _test_unitary_rfft_gaussian_suite()
587     _test_unitary_power_spectrum_sin_suite()
588     _test_unitary_power_spectrum_delta_suite()
589     _test_unitary_power_spectrum_gaussian_suite()
590     _test_unitary_avg_power_spectrum_sin_suite()
591
592 if __name__ == "__main__" :
593     from optparse import OptionParser
594
595     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
596     p.add_option('-p', '--plot', dest='plot', action='store_true', 
597                  help='Display time- and freq-space plots of test transforms.')
598
599     options,args = p.parse_args()
600
601     if options.plot:
602         import pylab
603         TEST_PLOTS = True
604     test()
605     if options.plot:
606         pylab.show()