Ran ./update_copyright.py to setup copyrights in caclulus and fft.
[hooke.git] / hooke / util / fft.py
1 # Copyright (C) 2008-2010 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Wrap :mod:`numpy.fft` to produce 1D unitary transforms and power spectra.
20
21 Define some FFT wrappers to reduce clutter.
22 Provides a unitary discrete FFT and a windowed version.
23 Based on numpy.fft.rfft.
24
25 Main entry functions:
26   unitary_rfft(data, freq=1.0)
27   power_spectrum(data, freq=1.0)
28   unitary_power_spectrum(data, freq=1.0)
29   avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
30   unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
31 """
32
33 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
34     sinc, arctan2, array, ones, arange, linspace, zeros, \
35     uint16, float, concatenate, fromfile, argmax, complex
36 from numpy.fft import rfft
37
38
39 # print time- and freq- space plots of the test transforms if True
40 TEST_PLOTS = False
41 #TEST_PLOTS = True 
42
43 def floor_pow_of_two(num) :
44     "Round num down to the closest exact a power of two."
45     lnum = log2(num)
46     if int(lnum) != lnum :
47         num = 2**floor(lnum)
48     return num
49
50 def round_pow_of_two(num) :
51     "Round num to the closest exact a power of two on a log scale."
52     lnum = log2(num)
53     if int(lnum) != lnum :
54         num = 2**round(lnum)
55     return num
56
57 def ceil_pow_of_two(num) :
58     "Round num up to the closest exact a power of two."
59     lnum = log2(num)
60     if int(lnum) != lnum :
61         num = 2**ceil(lnum)
62     return num
63
64 def _test_rfft(xs, Xs) :
65     # Numpy's FFT algoritm returns
66     #          n-1
67     #   X[k] = SUM x[m] exp (-j 2pi km /n)
68     #          m=0
69     # (see http://www.tramy.us/numpybook.pdf)
70     j = complex(0,1)
71     n = len(xs)
72     Xa = []
73     for k in range(n) :
74         Xa.append(sum([x*exp(-j*2*pi*k*m/n) for x,m in zip(xs,range(n))]))
75         if k < len(Xs):
76             assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \
77                 "rfft mismatch on element %d: %g != %g, relative error %g" \
78                 % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))
79     # Which should satisfy the discrete form of Parseval's theorem
80     #   n-1               n-1
81     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
82     #   m=0               k=0
83     timeSum = sum([abs(x)**2 for x in xs])
84     freqSum = sum([abs(X)**2 for X in Xa])
85     assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \
86         "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum)
87
88 def _test_rfft_suite() :
89     print "Test numpy rfft definition"
90     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
91     _test_rfft(xs, rfft(xs))
92
93 def unitary_rfft(data, freq=1.0) :
94     """
95     Compute the Fourier transform of real data.
96     Unitary (preserves power [Parseval's theorem]).
97    
98     If the units on your data are Volts,
99     and your sampling frequency is in Hz,
100     then freq_axis will be in Hz,
101     and trans will be in Volts.
102     """
103     nsamps = floor_pow_of_two(len(data))
104     # Which should satisfy the discrete form of Parseval's theorem
105     #   n-1               n-1
106     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
107     #   m=0               k=0
108     # However, we want our FFT to satisfy the continuous Parseval eqn
109     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
110     # which has the discrete form
111     #   n-1              n-1
112     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
113     #   m=0              k=0
114     # with X'_k = AX, this gives us
115     #   n-1                     n-1
116     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
117     #   m=0                     k=0
118     # so we see
119     #   A^2 df/dt = 1/n
120     #   A^2 = 1/n dt/df
121     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
122     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
123     # f_c in the transformed data is the Nyquist frequency (12.1.2)
124     #   f_c = 1/2dt
125     # and the points are spaced out by (12.1.5)
126     #   df = 1/ndt
127     # so
128     #   dt = 1/ndf
129     #   dt/df = 1/ndf^2
130     #   A^2 = 1/n^2df^2
131     #   A = 1/ndf = ndt/n = dt
132     # so we can convert the Numpy transformed data to match our unitary
133     # continuous transformed data with (also NR 12.1.8)
134     #   X'_k = dtX = X / <sampling freq>
135     trans = rfft(data[0:nsamps]) / float(freq)
136     freq_axis = linspace(0, freq/2, nsamps/2+1)
137     return (freq_axis, trans)
138
139 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
140     # Which should satisfy the discretized integral form of Parseval's theorem
141     #   n-1              n-1
142     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
143     #   m=0              k=0
144     dt = 1.0/freq
145     df = freqs[1]-freqs[0]
146     assert (df - 1/(len(xs)*dt))/df < 1e-6, \
147         "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt)
148     Xa = list(Xs)
149     for k in range(len(Xs)-1,1,-1) :
150         Xa.append(Xa[k])
151     assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa))
152     lhs = sum([abs(x)**2 for x in xs]) * dt
153     rhs = sum([abs(X)**2 for X in Xa]) * df
154     assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \
155         % (lhs, rhs)
156
157 def _test_unitary_rfft_parsevals_suite():
158     print "Test unitary rfft on Parseval's theorem"
159     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
160     dt = pi
161     freqs,Xs = unitary_rfft(xs, 1.0/dt)
162     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
163
164 def _rect(t) :
165     if abs(t) < 0.5 :
166         return 1
167     else :
168         return 0
169
170 def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
171     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
172     samp_freq = float(samp_freq)
173     a = float(a)
174
175     x = zeros((samples,), dtype=float)
176     dt = 1.0/samp_freq
177     for i in range(samples) :
178         t = i*dt
179         x[i] = _rect(a*(t-time_shift))
180     freq_axis, X = unitary_rfft(x, samp_freq)
181     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
182
183     # remove the phase due to our time shift
184     j = complex(0.0,1.0) # sqrt(-1)
185     for i in range(len(freq_axis)) :
186         f = freq_axis[i]
187         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
188         X[i] *= inverse_phase_shift
189
190     expected = zeros((len(freq_axis),), dtype=float)
191     # normalized sinc(x) = sin(pi x)/(pi x)
192     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
193     assert sinc(0.5) == 2.0/pi, "abnormal sinc()"
194     for i in range(len(freq_axis)) :
195         f = freq_axis[i]
196         expected[i] = 1.0/abs(a) * sinc(f/a)
197
198     if TEST_PLOTS :
199         pylab.figure()
200         pylab.subplot(211)
201         pylab.plot(arange(0, dt*samples, dt), x)
202         pylab.title('time series')
203         pylab.subplot(212)
204         pylab.plot(freq_axis, X.real, 'r.')
205         pylab.plot(freq_axis, X.imag, 'g.')
206         pylab.plot(freq_axis, expected, 'b-')
207         pylab.title('freq series')
208
209 def _test_unitary_rfft_rect_suite() :
210     print "Test unitary FFTs on variously shaped rectangular functions"
211     _test_unitary_rfft_rect(a=0.5)
212     _test_unitary_rfft_rect(a=2.0)
213     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
214     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
215
216 def _gaussian(a, t) :
217     return exp(-a * t**2)
218
219 def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
220     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
221     samp_freq = float(samp_freq)
222     a = float(a)
223
224     x = zeros((samples,), dtype=float)
225     dt = 1.0/samp_freq
226     for i in range(samples) :
227         t = i*dt
228         x[i] = _gaussian(a, (t-time_shift))
229     freq_axis, X = unitary_rfft(x, samp_freq)
230     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
231
232     # remove the phase due to our time shift
233     j = complex(0.0,1.0) # sqrt(-1)
234     for i in range(len(freq_axis)) :
235         f = freq_axis[i]
236         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
237         X[i] *= inverse_phase_shift
238
239     expected = zeros((len(freq_axis),), dtype=float)
240     for i in range(len(freq_axis)) :
241         f = freq_axis[i]
242         expected[i] = sqrt(pi/a) * _gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
243
244     if TEST_PLOTS :
245         pylab.figure()
246         pylab.subplot(211)
247         pylab.plot(arange(0, dt*samples, dt), x)
248         pylab.title('time series')
249         pylab.subplot(212)
250         pylab.plot(freq_axis, X.real, 'r.')
251         pylab.plot(freq_axis, X.imag, 'g.')
252         pylab.plot(freq_axis, expected, 'b-')
253         pylab.title('freq series')
254
255 def _test_unitary_rfft_gaussian_suite() :
256     print "Test unitary FFTs on variously shaped gaussian functions"
257     _test_unitary_rfft_gaussian(a=0.5)
258     _test_unitary_rfft_gaussian(a=2.0)
259     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
260     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
261
262
263
264 def power_spectrum(data, freq=1.0) :
265     """
266     Compute the power spectrum of DATA taken with a sampling frequency FREQ.
267     DATA must be real (not complex).
268     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
269     If the number of samples in data is not an integer power of two,
270     the FFT ignores some of the later points.
271     """
272     nsamps = floor_pow_of_two(len(data))
273     
274     freq_axis = linspace(0, freq/2, nsamps/2+1)
275     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
276     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
277     # See Numerical Recipies for a details.
278     trans = rfft(data[0:nsamps])
279     power = trans * trans.conj() # We want the square of the amplitude.
280     return (freq_axis, power)
281
282 def unitary_power_spectrum(data, freq=1.0) :
283     freq_axis,power = power_spectrum(data, freq)
284     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
285     #
286     # numpy normalizes with 1/N on the inverse transform ifft,
287     # so we should normalize the freq-space representation with 1/sqrt(N).
288     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
289     # So the power gets normalized by that twice and we have 2/N
290     #
291     # On top of this, the FFT assumes a sampling freq of 1 per second,
292     # and we want to preserve area under our curves.
293     # If our total time T = len(data)/freq is smaller than 1,
294     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), 
295     # and we need to scale the powers down to conserve area.
296     # df_fft * F_fft(f) = df_real *F_real(f)
297     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
298     # So the power gets normalized by *that* twice and we have 2/N * freq**2
299
300     # power per unit time
301     # measure x(t) for time T
302     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
303     # PSD(f) = 2 |X(f)|**2 / T
304
305     # total_time = len(data)/float(freq)
306     # power *= 2.0 / float(freq)**2   /   total_time
307     # power *= 2.0 / freq**2   *   freq / len(data)
308     power *= 2.0 / (freq * float(len(data)))
309
310     return (freq_axis, power)
311
312 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) :
313     x = zeros((samples,), dtype=float)
314     samp_freq = float(samp_freq)
315     for i in range(samples) :
316         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
317     freq_axis, power = unitary_power_spectrum(x, samp_freq)
318     imax = argmax(power)
319
320     expected = zeros((len(freq_axis),), dtype=float)
321     df = samp_freq/float(samples) # df = 1/T, where T = total_time
322     i = int(sin_freq/df)
323     # average power per unit time is 
324     #  P = <x(t)**2>
325     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
326     # so average value of (int sin(t)**2 dt) per unit time is 0.5
327     #  P = 0.5
328     # we spread that power over a frequency bin of width df, sp
329     #  P(f0) = 0.5/df
330     # where f0 is the sin's frequency
331     #
332     # or :
333     # FFT of sin(2*pi*t*f0) gives
334     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
335     # (area under x(t) = 0, area under X(f) = 0)
336     # so one sided power spectral density (PSD) per unit time is
337     #  P(f) = 2 |X(f)|**2 / T
338     #       = 2 * |0.5 delta(f-f0)|**2 / T
339     #       = 0.5 * |delta(f-f0)|**2 / T
340     # but we're discrete and want the integral of the 'delta' to be 1, 
341     # so 'delta'*df = 1  --> 'delta' = 1/df, and
342     #  P(f) = 0.5 / (df**2 * T)
343     #       = 0.5 / df                (T = 1/df)
344     expected[i] = 0.5 / df
345
346     print "The power should be a peak at %g Hz of %g (%g, %g)" % \
347         (sin_freq, expected[i], freq_axis[imax], power[imax])
348     Pexp = 0
349     P    = 0
350     for i in range(len(freq_axis)) :
351         Pexp += expected[i] *df
352         P    += power[i] * df
353     print " The total power should be %g (%g)" % (Pexp, P)
354
355     if TEST_PLOTS :
356         pylab.figure()
357         pylab.subplot(211)
358         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
359         pylab.title('time series')
360         pylab.subplot(212)
361         pylab.plot(freq_axis, power, 'r.')
362         pylab.plot(freq_axis, expected, 'b-')
363         pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
364
365 def _test_unitary_power_spectrum_sin_suite() :
366     print "Test unitary power spectrums on variously shaped sin functions"
367     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
368     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
369     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
370     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
371     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
372     # finally, with some irrational numbers, to check that I'm not getting lucky
373     _test_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
374     # test with non-integer number of periods
375     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
376
377 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) :
378     x = zeros((samples,), dtype=float)
379     samp_freq = float(samp_freq)
380     x[0] = amp
381     freq_axis, power = unitary_power_spectrum(x, samp_freq)
382
383     # power = <x(t)**2> = (amp)**2 * dt/T
384     # we spread that power over the entire freq_axis [0,fN], so
385     #  P(f)  = (amp)**2 dt / (T fN)
386     # where
387     #  dt = 1/samp_freq        (sample period)
388     #  T  = samples/samp_freq  (total time of data aquisition)
389     #  fN = 0.5 samp_freq      (Nyquist frequency)
390     # so
391     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
392     #       = 2 amp**2 / (samp_freq*samples)
393     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
394     expected = ones((len(freq_axis),), dtype=float) * expected_amp
395
396     print "The power should be flat at y = %g (%g)" % (expected_amp, power[0])
397     
398     if TEST_PLOTS :
399         pylab.figure()
400         pylab.subplot(211)
401         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
402         pylab.title('time series')
403         pylab.subplot(212)
404         pylab.plot(freq_axis, power, 'r.')
405         pylab.plot(freq_axis, expected, 'b-')
406         pylab.title('%g samples of delta amp %g' % (samples, amp))
407
408 def _test_unitary_power_spectrum_delta_suite() :
409     print "Test unitary power spectrums on various delta functions"
410     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
411     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
412     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
413     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
414     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
415     _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
416
417 def _gaussian2(area, mean, std, t) :
418     "Integral over all time = area (i.e. normalized for area=1)"
419     return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
420     
421 def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024
422     x = zeros((samples,), dtype=float)
423     mean = float(mean)
424     for i in range(samples) :
425         t = i/float(samp_freq)
426         x[i] = _gaussian2(area, mean, std, t)
427     freq_axis, power = unitary_power_spectrum(x, samp_freq)
428
429     # generate the predicted curve
430     # by comparing our _gaussian2() form to _gaussian(),
431     # we see that the Fourier transform of x(t) has parameters:
432     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
433     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
434     #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
435     # So our power spectral density per unit time is given by
436     #  P(f) = 2 |X(f)|**2 / T
437     # Where
438     #  T  = samples/samp_freq  (total time of data aquisition)
439     mean = 0.0
440     area = area /(std*sqrt(2.0*pi))
441     std = 1.0/(2.0*pi*std)
442     expected = zeros((len(freq_axis),), dtype=float)
443     df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
444     for i in range(len(freq_axis)) :
445         f = i*df
446         gaus = _gaussian2(area, mean, std, f)
447         expected[i] = 2.0 * gaus**2 * samp_freq/samples
448     print "The power should be a half-gaussian, ",
449     print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0])
450
451     if TEST_PLOTS :
452         pylab.figure()
453         pylab.subplot(211)
454         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
455         pylab.title('time series')
456         pylab.subplot(212)
457         pylab.plot(freq_axis, power, 'r.')
458         pylab.plot(freq_axis, expected, 'b-')
459         pylab.title('freq series')
460
461 def _test_unitary_power_spectrum_gaussian_suite() :
462     print "Test unitary power spectrums on various gaussian functions"
463     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
464     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
465     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
466     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
467     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
468     _test_unitary_power_spectrum_gaussian(area=pi, std=sqrt(2), samp_freq=exp(1), samples=1024)
469
470 def window_hann(length) :
471     "Returns a Hann window array with length entries"
472     win = zeros((length,), dtype=float)
473     for i in range(length) :
474         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
475     # avg value of cos over a period is 0
476     # so average height of Hann window is 0.5
477     return win
478
479 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
480                        overlap=True, window=window_hann) :
481     """
482     Compute the avg power spectrum of DATA taken with a sampling frequency FREQ.
483     DATA must be real (not complex) by breaking DATA into chunks.
484     The chunks may or may not be overlapping (by setting OVERLAP).
485     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
486     and the resulting spectra are averaged together.
487     See NR 13.4 for rational.
488     
489     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
490     CHUNK_SIZE should really be a power of 2.
491     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
492     the FFT ignores some of the later points.
493     """
494     assert chunk_size == floor_pow_of_two(chunk_size), \
495         "chunk_size %d should be a power of 2" % chunk_size
496
497     nchunks = len(data)/chunk_size # integer division = implicit floor
498     if overlap :
499         chunk_step = chunk_size/2
500     else :
501         chunk_step = chunk_size
502     
503     win = window(chunk_size) # generate a window of the appropriate size
504     freq_axis = linspace(0, freq/2, chunk_size/2+1)
505     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
506     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
507     # See Numerical Recipies for a details.
508     power = zeros((chunk_size/2+1,), dtype=float)
509     for i in range(nchunks) :
510         starti = i*chunk_step
511         stopi = starti+chunk_size
512         fft_chunk = rfft(data[starti:stopi]*win)
513         p_chunk = fft_chunk * fft_chunk.conj() 
514         power += p_chunk.astype(float)
515     power /= float(nchunks)
516     return (freq_axis, power)
517
518 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
519                                overlap=True, window=window_hann) :
520     """
521     compute the average power spectrum, preserving normalization
522     """
523     freq_axis,power = avg_power_spectrum(data, freq, chunk_size,
524                                          overlap, window)
525     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
526     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()            
527     #                                       * 8/3  to remove power from windowing
528     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
529     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
530     # So our calulated power has and extra <w(t)**2> in it.
531     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
532     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
533     # The normalization is not perfect. ??
534     # The normalization approaches perfection as chunk_size -> infinity.
535     return (freq_axis, power)
536
537 def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024,
538                                          chunk_size=512, overlap=True,
539                                          window=window_hann) :
540     x = zeros((samples,), dtype=float)
541     samp_freq = float(samp_freq)
542     for i in range(samples) :
543         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
544     freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size,
545                                                   overlap, window)
546     imax = argmax(power)
547
548     expected = zeros((len(freq_axis),), dtype=float)
549     df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
550     i = int(sin_freq/df)
551     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
552
553     print "The power should be a peak at %g Hz of %g (%g, %g)" % \
554         (sin_freq, expected[i], freq_axis[imax], power[imax])
555     Pexp = 0
556     P    = 0
557     for i in range(len(freq_axis)) :
558         Pexp += expected[i] * df
559         P    += power[i] * df
560     print " The total power should be %g (%g)" % (Pexp, P)
561
562     if TEST_PLOTS :
563         pylab.figure()
564         pylab.subplot(211)
565         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
566         pylab.title('time series')
567         pylab.subplot(212)
568         pylab.plot(freq_axis, power, 'r.')
569         pylab.plot(freq_axis, expected, 'b-')
570         pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
571
572 def _test_unitary_avg_power_spectrum_sin_suite() :
573     print "Test unitary avg power spectrums on variously shaped sin functions"
574     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
575     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
576     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
577     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
578     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
579     # test long wavelenth sin, so be closer to window frequency
580     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
581     # finally, with some irrational numbers, to check that I'm not getting lucky
582     _test_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
583
584
585 def test() :
586     _test_rfft_suite()
587     _test_unitary_rfft_parsevals_suite()
588     _test_unitary_rfft_rect_suite()
589     _test_unitary_rfft_gaussian_suite()
590     _test_unitary_power_spectrum_sin_suite()
591     _test_unitary_power_spectrum_delta_suite()
592     _test_unitary_power_spectrum_gaussian_suite()
593     _test_unitary_avg_power_spectrum_sin_suite()
594
595 if __name__ == "__main__" :
596     if TEST_PLOTS :
597         import pylab
598     test()
599     if TEST_PLOTS :
600         pylab.show()