b79d3b44922df92dd084318189df0499bb7d25aa
[FFT-tools.git] / FFT_tools.py
1 #!/usr/bin/python
2 #
3 # Wrap Numpy's fft module to produce 1D unitary transforms and power spectra.
4 #
5 # Copyright (C) 2008  W. Trevor King
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11
12 #  * Redistributions of source code must retain the above copyright
13 #    notice, this list of conditions and the following disclaimer.
14 #
15 #  * Redistributions in binary form must reproduce the above copyright
16 #    notice, this list of conditions and the following disclaimer in
17 #    the documentation and/or other materials provided with the
18 #    distribution.
19 #
20 #  * Neither the name of the copyright holders nor the names of the
21 #    contributors may be used to endorse or promote products derived
22 #    from this software without specific prior written permission.
23
24 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
28 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
30 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
32 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
34 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35 # POSSIBILITY OF SUCH DAMAGE.
36
37 """
38 Define some FFT wrappers to reduce clutter.
39 Provides a unitary discrete FFT and a windowed version.
40 Based on numpy.fft.rfft.
41
42 Main entry functions:
43   unitary_rfft(data, freq=1.0)
44   power_spectrum(data, freq=1.0)
45   unitary_power_spectrum(data, freq=1.0)
46   avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
47   unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
48 """
49
50 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
51     sinc, arctan2, array, ones, arange, linspace, zeros, \
52     uint16, float, concatenate, fromfile, argmax, complex
53 from numpy.fft import rfft
54
55
56 # print time- and freq- space plots of the test transforms if True
57 TEST_PLOTS = False
58 #TEST_PLOTS = True 
59
60 def floor_pow_of_two(num) :
61     "Round num down to the closest exact a power of two."
62     lnum = log2(num)
63     if int(lnum) != lnum :
64         num = 2**floor(lnum)
65     return num
66
67 def round_pow_of_two(num) :
68     "Round num to the closest exact a power of two on a log scale."
69     lnum = log2(num)
70     if int(lnum) != lnum :
71         num = 2**round(lnum)
72     return num
73
74 def ceil_pow_of_two(num) :
75     "Round num up to the closest exact a power of two."
76     lnum = log2(num)
77     if int(lnum) != lnum :
78         num = 2**ceil(lnum)
79     return num
80
81 def _test_rfft(xs, Xs) :
82     # Numpy's FFT algoritm returns
83     #          n-1
84     #   X[k] = SUM x[m] exp (-j 2pi km /n)
85     #          m=0
86     # (see http://www.tramy.us/numpybook.pdf)
87     j = complex(0,1)
88     n = len(xs)
89     Xa = []
90     for k in range(n) :
91         Xa.append(sum([x*exp(-j*2*pi*k*m/n) for x,m in zip(xs,range(n))]))
92         if k < len(Xs):
93             assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \
94                 "rfft mismatch on element %d: %g != %g, relative error %g" \
95                 % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))
96     # Which should satisfy the discrete form of Parseval's theorem
97     #   n-1               n-1
98     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
99     #   m=0               k=0
100     timeSum = sum([abs(x)**2 for x in xs])
101     freqSum = sum([abs(X)**2 for X in Xa])
102     assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \
103         "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum)
104
105 def _test_rfft_suite() :
106     print "Test numpy rfft definition"
107     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
108     _test_rfft(xs, rfft(xs))
109
110 def unitary_rfft(data, freq=1.0) :
111     """
112     Compute the Fourier transform of real data.
113     Unitary (preserves power [Parseval's theorem]).
114    
115     If the units on your data are Volts,
116     and your sampling frequency is in Hz,
117     then freq_axis will be in Hz,
118     and trans will be in Volts.
119     """
120     nsamps = floor_pow_of_two(len(data))
121     # Which should satisfy the discrete form of Parseval's theorem
122     #   n-1               n-1
123     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
124     #   m=0               k=0
125     # However, we want our FFT to satisfy the continuous Parseval eqn
126     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
127     # which has the discrete form
128     #   n-1              n-1
129     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
130     #   m=0              k=0
131     # with X'_k = AX, this gives us
132     #   n-1                     n-1
133     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
134     #   m=0                     k=0
135     # so we see
136     #   A^2 df/dt = 1/n
137     #   A^2 = 1/n dt/df
138     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
139     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
140     # f_c in the transformed data is the Nyquist frequency (12.1.2)
141     #   f_c = 1/2dt
142     # and the points are spaced out by (12.1.5)
143     #   df = 1/ndt
144     # so
145     #   dt = 1/ndf
146     #   dt/df = 1/ndf^2
147     #   A^2 = 1/n^2df^2
148     #   A = 1/ndf = ndt/n = dt
149     # so we can convert the Numpy transformed data to match our unitary
150     # continuous transformed data with (also NR 12.1.8)
151     #   X'_k = dtX = X / <sampling freq>
152     trans = rfft(data[0:nsamps]) / float(freq)
153     freq_axis = linspace(0, freq/2, nsamps/2+1)
154     return (freq_axis, trans)
155
156 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
157     # Which should satisfy the discretized integral form of Parseval's theorem
158     #   n-1              n-1
159     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
160     #   m=0              k=0
161     dt = 1.0/freq
162     df = freqs[1]-freqs[0]
163     assert (df - 1/(len(xs)*dt))/df < 1e-6, \
164         "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt)
165     Xa = list(Xs)
166     for k in range(len(Xs)-1,1,-1) :
167         Xa.append(Xa[k])
168     assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa))
169     lhs = sum([abs(x)**2 for x in xs]) * dt
170     rhs = sum([abs(X)**2 for X in Xa]) * df
171     assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \
172         % (lhs, rhs)
173
174 def _test_unitary_rfft_parsevals_suite():
175     print "Test unitary rfft on Parseval's theorem"
176     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
177     dt = pi
178     freqs,Xs = unitary_rfft(xs, 1.0/dt)
179     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
180
181 def _rect(t) :
182     if abs(t) < 0.5 :
183         return 1
184     else :
185         return 0
186
187 def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
188     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
189     samp_freq = float(samp_freq)
190     a = float(a)
191
192     x = zeros((samples,), dtype=float)
193     dt = 1.0/samp_freq
194     for i in range(samples) :
195         t = i*dt
196         x[i] = _rect(a*(t-time_shift))
197     freq_axis, X = unitary_rfft(x, samp_freq)
198     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
199
200     # remove the phase due to our time shift
201     j = complex(0.0,1.0) # sqrt(-1)
202     for i in range(len(freq_axis)) :
203         f = freq_axis[i]
204         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
205         X[i] *= inverse_phase_shift
206
207     expected = zeros((len(freq_axis),), dtype=float)
208     # normalized sinc(x) = sin(pi x)/(pi x)
209     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
210     assert sinc(0.5) == 2.0/pi, "abnormal sinc()"
211     for i in range(len(freq_axis)) :
212         f = freq_axis[i]
213         expected[i] = 1.0/abs(a) * sinc(f/a)
214
215     if TEST_PLOTS :
216         pylab.figure()
217         pylab.subplot(211)
218         pylab.plot(arange(0, dt*samples, dt), x)
219         pylab.title('time series')
220         pylab.subplot(212)
221         pylab.plot(freq_axis, X.real, 'r.')
222         pylab.plot(freq_axis, X.imag, 'g.')
223         pylab.plot(freq_axis, expected, 'b-')
224         pylab.title('freq series')
225
226 def _test_unitary_rfft_rect_suite() :
227     print "Test unitary FFTs on variously shaped rectangular functions"
228     _test_unitary_rfft_rect(a=0.5)
229     _test_unitary_rfft_rect(a=2.0)
230     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
231     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
232
233 def _gaussian(a, t) :
234     return exp(-a * t**2)
235
236 def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
237     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
238     samp_freq = float(samp_freq)
239     a = float(a)
240
241     x = zeros((samples,), dtype=float)
242     dt = 1.0/samp_freq
243     for i in range(samples) :
244         t = i*dt
245         x[i] = _gaussian(a, (t-time_shift))
246     freq_axis, X = unitary_rfft(x, samp_freq)
247     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
248
249     # remove the phase due to our time shift
250     j = complex(0.0,1.0) # sqrt(-1)
251     for i in range(len(freq_axis)) :
252         f = freq_axis[i]
253         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
254         X[i] *= inverse_phase_shift
255
256     expected = zeros((len(freq_axis),), dtype=float)
257     for i in range(len(freq_axis)) :
258         f = freq_axis[i]
259         expected[i] = sqrt(pi/a) * _gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
260
261     if TEST_PLOTS :
262         pylab.figure()
263         pylab.subplot(211)
264         pylab.plot(arange(0, dt*samples, dt), x)
265         pylab.title('time series')
266         pylab.subplot(212)
267         pylab.plot(freq_axis, X.real, 'r.')
268         pylab.plot(freq_axis, X.imag, 'g.')
269         pylab.plot(freq_axis, expected, 'b-')
270         pylab.title('freq series')
271
272 def _test_unitary_rfft_gaussian_suite() :
273     print "Test unitary FFTs on variously shaped gaussian functions"
274     _test_unitary_rfft_gaussian(a=0.5)
275     _test_unitary_rfft_gaussian(a=2.0)
276     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
277     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
278
279
280
281 def power_spectrum(data, freq=1.0) :
282     """
283     Compute the power spectrum of DATA taken with a sampling frequency FREQ.
284     DATA must be real (not complex).
285     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
286     If the number of samples in data is not an integer power of two,
287     the FFT ignores some of the later points.
288     """
289     nsamps = floor_pow_of_two(len(data))
290     
291     freq_axis = linspace(0, freq/2, nsamps/2+1)
292     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
293     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
294     # See Numerical Recipies for a details.
295     trans = rfft(data[0:nsamps])
296     power = trans * trans.conj() # We want the square of the amplitude.
297     return (freq_axis, power)
298
299 def unitary_power_spectrum(data, freq=1.0) :
300     freq_axis,power = power_spectrum(data, freq)
301     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
302     #
303     # numpy normalizes with 1/N on the inverse transform ifft,
304     # so we should normalize the freq-space representation with 1/sqrt(N).
305     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
306     # So the power gets normalized by that twice and we have 2/N
307     #
308     # On top of this, the FFT assumes a sampling freq of 1 per second,
309     # and we want to preserve area under our curves.
310     # If our total time T = len(data)/freq is smaller than 1,
311     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), 
312     # and we need to scale the powers down to conserve area.
313     # df_fft * F_fft(f) = df_real *F_real(f)
314     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
315     # So the power gets normalized by *that* twice and we have 2/N * freq**2
316
317     # power per unit time
318     # measure x(t) for time T
319     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
320     # PSD(f) = 2 |X(f)|**2 / T
321
322     # total_time = len(data)/float(freq)
323     # power *= 2.0 / float(freq)**2   /   total_time
324     # power *= 2.0 / freq**2   *   freq / len(data)
325     power *= 2.0 / (freq * float(len(data)))
326
327     return (freq_axis, power)
328
329 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) :
330     x = zeros((samples,), dtype=float)
331     samp_freq = float(samp_freq)
332     for i in range(samples) :
333         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
334     freq_axis, power = unitary_power_spectrum(x, samp_freq)
335     imax = argmax(power)
336
337     expected = zeros((len(freq_axis),), dtype=float)
338     df = samp_freq/float(samples) # df = 1/T, where T = total_time
339     i = int(sin_freq/df)
340     # average power per unit time is 
341     #  P = <x(t)**2>
342     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
343     # so average value of (int sin(t)**2 dt) per unit time is 0.5
344     #  P = 0.5
345     # we spread that power over a frequency bin of width df, sp
346     #  P(f0) = 0.5/df
347     # where f0 is the sin's frequency
348     #
349     # or :
350     # FFT of sin(2*pi*t*f0) gives
351     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
352     # (area under x(t) = 0, area under X(f) = 0)
353     # so one sided power spectral density (PSD) per unit time is
354     #  P(f) = 2 |X(f)|**2 / T
355     #       = 2 * |0.5 delta(f-f0)|**2 / T
356     #       = 0.5 * |delta(f-f0)|**2 / T
357     # but we're discrete and want the integral of the 'delta' to be 1, 
358     # so 'delta'*df = 1  --> 'delta' = 1/df, and
359     #  P(f) = 0.5 / (df**2 * T)
360     #       = 0.5 / df                (T = 1/df)
361     expected[i] = 0.5 / df
362
363     print "The power should be a peak at %g Hz of %g (%g, %g)" % \
364         (sin_freq, expected[i], freq_axis[imax], power[imax])
365     Pexp = 0
366     P    = 0
367     for i in range(len(freq_axis)) :
368         Pexp += expected[i] *df
369         P    += power[i] * df
370     print " The total power should be %g (%g)" % (Pexp, P)
371
372     if TEST_PLOTS :
373         pylab.figure()
374         pylab.subplot(211)
375         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
376         pylab.title('time series')
377         pylab.subplot(212)
378         pylab.plot(freq_axis, power, 'r.')
379         pylab.plot(freq_axis, expected, 'b-')
380         pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
381
382 def _test_unitary_power_spectrum_sin_suite() :
383     print "Test unitary power spectrums on variously shaped sin functions"
384     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
385     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
386     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
387     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
388     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
389     # finally, with some irrational numbers, to check that I'm not getting lucky
390     _test_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
391     # test with non-integer number of periods
392     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
393
394 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) :
395     x = zeros((samples,), dtype=float)
396     samp_freq = float(samp_freq)
397     x[0] = amp
398     freq_axis, power = unitary_power_spectrum(x, samp_freq)
399
400     # power = <x(t)**2> = (amp)**2 * dt/T
401     # we spread that power over the entire freq_axis [0,fN], so
402     #  P(f)  = (amp)**2 dt / (T fN)
403     # where
404     #  dt = 1/samp_freq        (sample period)
405     #  T  = samples/samp_freq  (total time of data aquisition)
406     #  fN = 0.5 samp_freq      (Nyquist frequency)
407     # so
408     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
409     #       = 2 amp**2 / (samp_freq*samples)
410     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
411     expected = ones((len(freq_axis),), dtype=float) * expected_amp
412
413     print "The power should be flat at y = %g (%g)" % (expected_amp, power[0])
414     
415     if TEST_PLOTS :
416         pylab.figure()
417         pylab.subplot(211)
418         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
419         pylab.title('time series')
420         pylab.subplot(212)
421         pylab.plot(freq_axis, power, 'r.')
422         pylab.plot(freq_axis, expected, 'b-')
423         pylab.title('%g samples of delta amp %g' % (samples, amp))
424
425 def _test_unitary_power_spectrum_delta_suite() :
426     print "Test unitary power spectrums on various delta functions"
427     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
428     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
429     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
430     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
431     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
432     _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
433
434 def _gaussian2(area, mean, std, t) :
435     "Integral over all time = area (i.e. normalized for area=1)"
436     return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
437     
438 def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024
439     x = zeros((samples,), dtype=float)
440     mean = float(mean)
441     for i in range(samples) :
442         t = i/float(samp_freq)
443         x[i] = _gaussian2(area, mean, std, t)
444     freq_axis, power = unitary_power_spectrum(x, samp_freq)
445
446     # generate the predicted curve
447     # by comparing our _gaussian2() form to _gaussian(),
448     # we see that the Fourier transform of x(t) has parameters:
449     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
450     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
451     #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
452     # So our power spectral density per unit time is given by
453     #  P(f) = 2 |X(f)|**2 / T
454     # Where
455     #  T  = samples/samp_freq  (total time of data aquisition)
456     mean = 0.0
457     area = area /(std*sqrt(2.0*pi))
458     std = 1.0/(2.0*pi*std)
459     expected = zeros((len(freq_axis),), dtype=float)
460     df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
461     for i in range(len(freq_axis)) :
462         f = i*df
463         gaus = _gaussian2(area, mean, std, f)
464         expected[i] = 2.0 * gaus**2 * samp_freq/samples
465     print "The power should be a half-gaussian, ",
466     print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0])
467
468     if TEST_PLOTS :
469         pylab.figure()
470         pylab.subplot(211)
471         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
472         pylab.title('time series')
473         pylab.subplot(212)
474         pylab.plot(freq_axis, power, 'r.')
475         pylab.plot(freq_axis, expected, 'b-')
476         pylab.title('freq series')
477
478 def _test_unitary_power_spectrum_gaussian_suite() :
479     print "Test unitary power spectrums on various gaussian functions"
480     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
481     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
482     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
483     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
484     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
485     _test_unitary_power_spectrum_gaussian(area=pi, std=sqrt(2), samp_freq=exp(1), samples=1024)
486
487 def window_hann(length) :
488     "Returns a Hann window array with length entries"
489     win = zeros((length,), dtype=float)
490     for i in range(length) :
491         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
492     # avg value of cos over a period is 0
493     # so average height of Hann window is 0.5
494     return win
495
496 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
497                        overlap=True, window=window_hann) :
498     """
499     Compute the avg power spectrum of DATA taken with a sampling frequency FREQ.
500     DATA must be real (not complex) by breaking DATA into chunks.
501     The chunks may or may not be overlapping (by setting OVERLAP).
502     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
503     and the resulting spectra are averaged together.
504     See NR 13.4 for rational.
505     
506     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
507     CHUNK_SIZE should really be a power of 2.
508     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
509     the FFT ignores some of the later points.
510     """
511     assert chunk_size == floor_pow_of_two(chunk_size), \
512         "chunk_size %d should be a power of 2" % chunk_size
513
514     nchunks = len(data)/chunk_size # integer division = implicit floor
515     if overlap :
516         chunk_step = chunk_size/2
517     else :
518         chunk_step = chunk_size
519     
520     win = window(chunk_size) # generate a window of the appropriate size
521     freq_axis = linspace(0, freq/2, chunk_size/2+1)
522     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
523     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
524     # See Numerical Recipies for a details.
525     power = zeros((chunk_size/2+1,), dtype=float)
526     for i in range(nchunks) :
527         starti = i*chunk_step
528         stopi = starti+chunk_size
529         fft_chunk = rfft(data[starti:stopi]*win)
530         p_chunk = fft_chunk * fft_chunk.conj() 
531         power += p_chunk.astype(float)
532     power /= float(nchunks)
533     return (freq_axis, power)
534
535 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
536                                overlap=True, window=window_hann) :
537     """
538     compute the average power spectrum, preserving normalization
539     """
540     freq_axis,power = avg_power_spectrum(data, freq, chunk_size,
541                                          overlap, window)
542     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
543     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()            
544     #                                       * 8/3  to remove power from windowing
545     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
546     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
547     # So our calulated power has and extra <w(t)**2> in it.
548     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
549     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
550     # The normalization is not perfect. ??
551     # The normalization approaches perfection as chunk_size -> infinity.
552     return (freq_axis, power)
553
554 def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024,
555                                          chunk_size=512, overlap=True,
556                                          window=window_hann) :
557     x = zeros((samples,), dtype=float)
558     samp_freq = float(samp_freq)
559     for i in range(samples) :
560         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
561     freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size,
562                                                   overlap, window)
563     imax = argmax(power)
564
565     expected = zeros((len(freq_axis),), dtype=float)
566     df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
567     i = int(sin_freq/df)
568     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
569
570     print "The power should be a peak at %g Hz of %g (%g, %g)" % \
571         (sin_freq, expected[i], freq_axis[imax], power[imax])
572     Pexp = 0
573     P    = 0
574     for i in range(len(freq_axis)) :
575         Pexp += expected[i] * df
576         P    += power[i] * df
577     print " The total power should be %g (%g)" % (Pexp, P)
578
579     if TEST_PLOTS :
580         pylab.figure()
581         pylab.subplot(211)
582         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
583         pylab.title('time series')
584         pylab.subplot(212)
585         pylab.plot(freq_axis, power, 'r.')
586         pylab.plot(freq_axis, expected, 'b-')
587         pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
588
589 def _test_unitary_avg_power_spectrum_sin_suite() :
590     print "Test unitary avg power spectrums on variously shaped sin functions"
591     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
592     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
593     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
594     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
595     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
596     # test long wavelenth sin, so be closer to window frequency
597     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
598     # finally, with some irrational numbers, to check that I'm not getting lucky
599     _test_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
600
601
602 def test() :
603     _test_rfft_suite()
604     _test_unitary_rfft_parsevals_suite()
605     _test_unitary_rfft_rect_suite()
606     _test_unitary_rfft_gaussian_suite()
607     _test_unitary_power_spectrum_sin_suite()
608     _test_unitary_power_spectrum_delta_suite()
609     _test_unitary_power_spectrum_gaussian_suite()
610     _test_unitary_avg_power_spectrum_sin_suite()
611
612 if __name__ == "__main__" :
613     if TEST_PLOTS :
614         import pylab
615     test()
616     if TEST_PLOTS :
617         pylab.show()