FFT_tools: wrap long function calls for PEP8 compliance
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num):
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum:
50         num = 2**_numpy.floor(lnum)
51     return num
52
53 def round_pow_of_two(num):
54     "Round num to the closest exact a power of two on a log scale."
55     lnum = _numpy.log2(num)
56     if int(lnum) != lnum:
57         num = 2**_numpy.round(lnum)
58     return num
59
60 def ceil_pow_of_two(num):
61     "Round num up to the closest exact a power of two."
62     lnum = _numpy.log2(num)
63     if int(lnum) != lnum:
64         num = 2**_numpy.ceil(lnum)
65     return num
66
67 def _test_rfft(xs, Xs):
68     # Numpy's FFT algoritm returns
69     #          n-1
70     #   X[k] = SUM x[m] exp (-j 2pi km /n)
71     #          m=0
72     # (see http://www.tramy.us/numpybook.pdf)
73     j = _numpy.complex(0,1)
74     n = len(xs)
75     Xa = []
76     for k in range(n):
77         Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n)
78                        for x,m in zip(xs,range(n))]))
79         if k < len(Xs):
80             if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6:
81                 raise ValueError(
82                     ('rfft mismatch on element {}: {} != {}, relative error {}'
83                      ).format(
84                         k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k])))
85     # Which should satisfy the discrete form of Parseval's theorem
86     #   n-1               n-1
87     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
88     #   m=0               k=0
89     timeSum = sum([_numpy.abs(x)**2 for x in xs])
90     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
91     if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6:
92         raise ValueError(
93             "Mismatch on Parseval's, {} != 1/{} * {}".format(
94                 timeSum, n, freqSum))
95
96 def _test_rfft_suite():
97     print('Test numpy rfft definition')
98     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
99     _test_rfft(xs, _numpy.fft.rfft(xs))
100
101 def unitary_rfft(data, freq=1.0):
102     """Compute the Fourier transform of real data.
103
104     Unitary (preserves power [Parseval's theorem]).
105
106     If the units on your data are Volts,
107     and your sampling frequency is in Hz,
108     then freq_axis will be in Hz,
109     and trans will be in Volts.
110     """
111     nsamps = floor_pow_of_two(len(data))
112     # Which should satisfy the discrete form of Parseval's theorem
113     #   n-1               n-1
114     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
115     #   m=0               k=0
116     # However, we want our FFT to satisfy the continuous Parseval eqn
117     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
118     # which has the discrete form
119     #   n-1              n-1
120     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
121     #   m=0              k=0
122     # with X'_k = AX, this gives us
123     #   n-1                     n-1
124     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
125     #   m=0                     k=0
126     # so we see
127     #   A^2 df/dt = 1/n
128     #   A^2 = 1/n dt/df
129     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
130     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
131     # f_c in the transformed data is the Nyquist frequency (12.1.2)
132     #   f_c = 1/2dt
133     # and the points are spaced out by (12.1.5)
134     #   df = 1/ndt
135     # so
136     #   dt = 1/ndf
137     #   dt/df = 1/ndf^2
138     #   A^2 = 1/n^2df^2
139     #   A = 1/ndf = ndt/n = dt
140     # so we can convert the Numpy transformed data to match our unitary
141     # continuous transformed data with (also NR 12.1.8)
142     #   X'_k = dtX = X / <sampling freq>
143     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
144     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
145     return (freq_axis, trans)
146
147 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
148     # Which should satisfy the discretized integral form of Parseval's theorem
149     #   n-1              n-1
150     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
151     #   m=0              k=0
152     dt = 1.0/freq
153     df = freqs[1]-freqs[0]
154     if df - 1/(len(xs)*dt)/df >= 1e-6:
155         raise ValueError(
156             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
157     Xa = list(Xs)
158     for k in range(len(Xs)-1,1,-1):
159         Xa.append(Xa[k])
160     if len(xs) != len(Xa):
161         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
162     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
163     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
164     if _numpy.abs(lhs - rhs)/lhs >= 1e-4:
165         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
166
167 def _test_unitary_rfft_parsevals_suite():
168     print("Test unitary rfft on Parseval's theorem")
169     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
170     dt = _numpy.pi
171     freqs,Xs = unitary_rfft(xs, 1.0/dt)
172     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
173
174 def _rect(t):
175     if _numpy.abs(t) < 0.5:
176         return 1
177     else:
178         return 0
179
180 def _test_unitary_rfft_rect(
181     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
182     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
183     samp_freq = _numpy.float(samp_freq)
184     a = _numpy.float(a)
185
186     x = _numpy.zeros((samples,), dtype=_numpy.float)
187     dt = 1.0/samp_freq
188     for i in range(samples):
189         t = i*dt
190         x[i] = _rect(a*(t-time_shift))
191     freq_axis, X = unitary_rfft(x, samp_freq)
192     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
193
194     # remove the phase due to our time shift
195     j = _numpy.complex(0.0,1.0) # sqrt(-1)
196     for i in range(len(freq_axis)):
197         f = freq_axis[i]
198         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
199         X[i] *= inverse_phase_shift
200
201     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
202     # normalized sinc(x) = sin(pi x)/(pi x)
203     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
204     if _numpy.sinc(0.5) != 2.0/_numpy.pi:
205         raise ValueError('abnormal sinc()')
206     for i in range(len(freq_axis)):
207         f = freq_axis[i]
208         expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a)
209
210     if TEST_PLOTS:
211         figure = _pyplot.figure()
212         time_axes = figure.add_subplot(2, 1, 1)
213         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
214         time_axes.set_title('time series')
215         freq_axes = figure.add_subplot(2, 1, 2)
216         freq_axes.plot(freq_axis, X.real, 'r.')
217         freq_axes.plot(freq_axis, X.imag, 'g.')
218         freq_axes.plot(freq_axis, expected, 'b-')
219         freq_axes.set_title('freq series')
220
221 def _test_unitary_rfft_rect_suite():
222     print('Test unitary FFTs on variously shaped rectangular functions')
223     _test_unitary_rfft_rect(a=0.5)
224     _test_unitary_rfft_rect(a=2.0)
225     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
226     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
227
228 def _gaussian(a, t):
229     return _numpy.exp(-a * t**2)
230
231 def _test_unitary_rfft_gaussian(
232     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
233     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
234     samp_freq = _numpy.float(samp_freq)
235     a = _numpy.float(a)
236
237     x = _numpy.zeros((samples,), dtype=_numpy.float)
238     dt = 1.0/samp_freq
239     for i in range(samples):
240         t = i*dt
241         x[i] = _gaussian(a, (t-time_shift))
242     freq_axis, X = unitary_rfft(x, samp_freq)
243     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
244
245     # remove the phase due to our time shift
246     j = _numpy.complex(0.0,1.0) # sqrt(-1)
247     for i in range(len(freq_axis)):
248         f = freq_axis[i]
249         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
250         X[i] *= inverse_phase_shift
251
252     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
253     for i in range(len(freq_axis)):
254         f = freq_axis[i]
255         # see Wikipedia, or do the integral yourself.
256         expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian(
257             1.0/a, _numpy.pi*f)
258
259     if TEST_PLOTS:
260         figure = _pyplot.figure()
261         time_axes = figure.add_subplot(2, 1, 1)
262         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
263         time_axes.set_title('time series')
264         freq_axes = figure.add_subplot(2, 1, 2)
265         freq_axes.plot(freq_axis, X.real, 'r.')
266         freq_axes.plot(freq_axis, X.imag, 'g.')
267         freq_axes.plot(freq_axis, expected, 'b-')
268         freq_axes.set_title('freq series')
269
270 def _test_unitary_rfft_gaussian_suite():
271     print("Test unitary FFTs on variously shaped gaussian functions")
272     _test_unitary_rfft_gaussian(a=0.5)
273     _test_unitary_rfft_gaussian(a=2.0)
274     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
275     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
276
277
278
279 def power_spectrum(data, freq=1.0):
280     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
281
282     DATA must be real (not complex).
283     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
284     If the number of samples in data is not an integer power of two,
285     the FFT ignores some of the later points.
286     """
287     nsamps = floor_pow_of_two(len(data))
288
289     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
290     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
291     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
292     # See Numerical Recipies for a details.
293     trans = _numpy.fft.rfft(data[0:nsamps])
294     power = (trans * trans.conj()).real # We want the square of the amplitude.
295     return (freq_axis, power)
296
297 def unitary_power_spectrum(data, freq=1.0):
298     freq_axis,power = power_spectrum(data, freq)
299     # One sided power spectral density, so 2|H(f)|**2
300     # (see NR 2nd edition 12.0.14, p498)
301     #
302     # numpy normalizes with 1/N on the inverse transform ifft,
303     # so we should normalize the freq-space representation with 1/sqrt(N).
304     # But we're using the rfft, where N points are like N/2 complex points,
305     # so 1/sqrt(N/2)
306     # So the power gets normalized by that twice and we have 2/N
307     #
308     # On top of this, the FFT assumes a sampling freq of 1 per second,
309     # and we want to preserve area under our curves.
310     # If our total time T = len(data)/freq is smaller than 1,
311     # our df_real = freq/len(data) is bigger that the FFT expects
312     # (dt_fft = 1/len(data)),
313     # and we need to scale the powers down to conserve area.
314     # df_fft * F_fft(f) = df_real *F_real(f)
315     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
316     # So the power gets normalized by *that* twice and we have 2/N * freq**2
317
318     # power per unit time
319     # measure x(t) for time T
320     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
321     # PSD(f) = 2 |X(f)|**2 / T
322
323     # total_time = len(data)/float(freq)
324     # power *= 2.0 / float(freq)**2   /   total_time
325     # power *= 2.0 / freq**2   *   freq / len(data)
326     power *= 2.0 / (freq * _numpy.float(len(data)))
327
328     return (freq_axis, power)
329
330 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024):
331     x = _numpy.zeros((samples,), dtype=_numpy.float)
332     samp_freq = _numpy.float(samp_freq)
333     for i in range(samples):
334         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
335     freq_axis, power = unitary_power_spectrum(x, samp_freq)
336     imax = _numpy.argmax(power)
337
338     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
339     df = samp_freq/_numpy.float(samples) # df = 1/T, where T = total_time
340     i = int(sin_freq/df)
341     # average power per unit time is
342     #  P = <x(t)**2>
343     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
344     # so average value of (int sin(t)**2 dt) per unit time is 0.5
345     #  P = 0.5
346     # we spread that power over a frequency bin of width df, sp
347     #  P(f0) = 0.5/df
348     # where f0 is the sin's frequency
349     #
350     # or:
351     # FFT of sin(2*pi*t*f0) gives
352     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
353     # (area under x(t) = 0, area under X(f) = 0)
354     # so one sided power spectral density (PSD) per unit time is
355     #  P(f) = 2 |X(f)|**2 / T
356     #       = 2 * |0.5 delta(f-f0)|**2 / T
357     #       = 0.5 * |delta(f-f0)|**2 / T
358     # but we're discrete and want the integral of the 'delta' to be 1,
359     # so 'delta'*df = 1  --> 'delta' = 1/df, and
360     #  P(f) = 0.5 / (df**2 * T)
361     #       = 0.5 / df                (T = 1/df)
362     expected[i] = 0.5 / df
363
364     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
365             sin_freq, expected[i], freq_axis[imax], power[imax]))
366     Pexp = 0
367     P    = 0
368     for i in range(len(freq_axis)):
369         Pexp += expected[i] *df
370         P    += power[i] * df
371     print('The total power should be {} ({})'.format(Pexp, P))
372
373     if TEST_PLOTS:
374         figure = _pyplot.figure()
375         time_axes = figure.add_subplot(2, 1, 1)
376         time_axes.plot(
377             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
378         time_axes.set_title('time series')
379         freq_axes = figure.add_subplot(2, 1, 2)
380         freq_axes.plot(freq_axis, power, 'r.')
381         freq_axes.plot(freq_axis, expected, 'b-')
382         freq_axes.set_title(
383             '{} samples of sin at {} Hz'.format(samples, sin_freq))
384
385 def _test_unitary_power_spectrum_sin_suite():
386     print('Test unitary power spectrums on variously shaped sin functions')
387     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
388     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
389     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
390     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
391     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
392     # with some irrational numbers, to check that I'm not getting lucky
393     _test_unitary_power_spectrum_sin(
394         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
395     # test with non-integer number of periods
396     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
397
398 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256):
399     x = _numpy.zeros((samples,), dtype=_numpy.float)
400     samp_freq = _numpy.float(samp_freq)
401     x[0] = amp
402     freq_axis, power = unitary_power_spectrum(x, samp_freq)
403
404     # power = <x(t)**2> = (amp)**2 * dt/T
405     # we spread that power over the entire freq_axis [0,fN], so
406     #  P(f)  = (amp)**2 dt / (T fN)
407     # where
408     #  dt = 1/samp_freq        (sample period)
409     #  T  = samples/samp_freq  (total time of data aquisition)
410     #  fN = 0.5 samp_freq      (Nyquist frequency)
411     # so
412     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
413     #       = 2 amp**2 / (samp_freq*samples)
414     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
415     expected = _numpy.ones(
416         (len(freq_axis),), dtype=_numpy.float) * expected_amp
417
418     print('The power should be flat at y = {} ({})'.format(
419         expected_amp, power[0]))
420
421     if TEST_PLOTS:
422         figure = _pyplot.figure()
423         time_axes = figure.add_subplot(2, 1, 1)
424         time_axes.plot(
425             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
426         time_axes.set_title('time series')
427         freq_axes = figure.add_subplot(2, 1, 2)
428         freq_axes.plot(freq_axis, power, 'r.')
429         freq_axes.plot(freq_axis, expected, 'b-')
430         freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
431
432 def _test_unitary_power_spectrum_delta_suite():
433     print('Test unitary power spectrums on various delta functions')
434     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
435     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
436     # expected = 2*computed
437     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)
438     # expected = 0.5*computed
439     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)
440     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
441     _test_unitary_power_spectrum_delta(
442         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
443
444 def _gaussian2(area, mean, std, t):
445     "Integral over all time = area (i.e. normalized for area=1)"
446     return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp(
447         -0.5*((t-mean)/std)**2)
448
449 def _test_unitary_power_spectrum_gaussian(
450     area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512):
451     x = _numpy.zeros((samples,), dtype=_numpy.float)
452     mean = _numpy.float(mean)
453     for i in range(samples):
454         t = i/_numpy.float(samp_freq)
455         x[i] = _gaussian2(area, mean, std, t)
456     freq_axis, power = unitary_power_spectrum(x, samp_freq)
457
458     # generate the predicted curve
459     # by comparing our _gaussian2() form to _gaussian(),
460     # we see that the Fourier transform of x(t) has parameters:
461     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
462     #                           converting to angular frequency,
463     #                           not frequency like we are)
464     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
465     #  mean' = 0               (changing the mean in the time-domain just
466     #                           changes the phase in the freq-domain)
467     # So our power spectral density per unit time is given by
468     #  P(f) = 2 |X(f)|**2 / T
469     # Where
470     #  T  = samples/samp_freq  (total time of data aquisition)
471     mean = 0.0
472     area = area /(std*_numpy.sqrt(2.0*_numpy.pi))
473     std = 1.0/(2.0*_numpy.pi*std)
474     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
475     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
476     df = _numpy.float(samp_freq)/samples
477     for i in range(len(freq_axis)):
478         f = i*df
479         gaus = _gaussian2(area, mean, std, f)
480         expected[i] = 2.0 * gaus**2 * samp_freq/samples
481     print(('The power should be a half-gaussian, '
482            'with a peak at 0 Hz with amplitude {} ({})').format(
483             expected[0], power[0]))
484
485     if TEST_PLOTS:
486         figure = _pyplot.figure()
487         time_axes = figure.add_subplot(2, 1, 1)
488         time_axes.plot(
489             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
490         time_axes.set_title('time series')
491         freq_axes = figure.add_subplot(2, 1, 2)
492         freq_axes.plot(freq_axis, power, 'r.')
493         freq_axes.plot(freq_axis, expected, 'b-')
494         freq_axes.set_title('freq series')
495
496 def _test_unitary_power_spectrum_gaussian_suite():
497     print('Test unitary power spectrums on various gaussian functions')
498     _test_unitary_power_spectrum_gaussian(
499         area=1, std=1, samp_freq=10.0, samples=1024)
500     _test_unitary_power_spectrum_gaussian(
501         area=1, std=2, samp_freq=10.0, samples=1024)
502     _test_unitary_power_spectrum_gaussian(
503         area=1, std=1, samp_freq=10.0, samples=2048)
504     _test_unitary_power_spectrum_gaussian(
505         area=1, std=1, samp_freq=20.0, samples=2048)
506     _test_unitary_power_spectrum_gaussian(
507         area=3, std=1, samp_freq=10.0, samples=1024)
508     _test_unitary_power_spectrum_gaussian(
509         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
510         samples=1024)
511
512 def window_hann(length):
513     "Returns a Hann window array with length entries"
514     win = _numpy.zeros((length,), dtype=_numpy.float)
515     for i in range(length):
516         win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length)))
517     # avg value of cos over a period is 0
518     # so average height of Hann window is 0.5
519     return win
520
521 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
522                        overlap=True, window=window_hann):
523     """Compute the avgerage power spectrum of DATA.
524
525     Taken with a sampling frequency FREQ.
526
527     DATA must be real (not complex) by breaking DATA into chunks.
528     The chunks may or may not be overlapping (by setting OVERLAP).
529     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
530     and the resulting spectra are averaged together.
531     See NR 13.4 for rational.
532
533     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
534     CHUNK_SIZE should really be a power of 2.
535     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
536     the FFT ignores some of the later points.
537     """
538     if chunk_size != floor_pow_of_two(chunk_size):
539         raise ValueError(
540             'chunk_size {} should be a power of 2'.format(chunk_size))
541
542     nchunks = len(data)/chunk_size # integer division = implicit floor
543     if overlap:
544         chunk_step = chunk_size/2
545     else:
546         chunk_step = chunk_size
547
548     win = window(chunk_size) # generate a window of the appropriate size
549     freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1)
550     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
551     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
552     # See Numerical Recipies for a details.
553     power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float)
554     for i in range(nchunks):
555         starti = i*chunk_step
556         stopi = starti+chunk_size
557         fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win)
558         p_chunk = (fft_chunk * fft_chunk.conj()).real
559         power += p_chunk.astype(_numpy.float)
560     power /= _numpy.float(nchunks)
561     return (freq_axis, power)
562
563 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
564                                overlap=True, window=window_hann):
565     """Compute the average power spectrum, preserving normalization
566     """
567     freq_axis,power = avg_power_spectrum(
568         data, freq, chunk_size, overlap, window)
569     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
570     # see unitary_power_spectrum()
571     power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3
572     #             * 8/3  to remove power from windowing
573     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
574     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
575     # So our calulated power has and extra <w(t)**2> in it.
576     # For the Hann window,
577     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
578     # For low frequency components,
579     # where the frequency of x(t) is ~= the frequency of w(t),
580     # the normalization is not perfect. ??
581     # The normalization approaches perfection as chunk_size -> infinity.
582     return (freq_axis, power)
583
584 def _test_unitary_avg_power_spectrum_sin(
585     sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True,
586     window=window_hann):
587     x = _numpy.zeros((samples,), dtype=_numpy.float)
588     samp_freq = _numpy.float(samp_freq)
589     for i in range(samples):
590         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
591     freq_axis, power = unitary_avg_power_spectrum(
592         x, samp_freq, chunk_size, overlap, window)
593     imax = _numpy.argmax(power)
594
595     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
596     df = samp_freq/_numpy.float(chunk_size) # df = 1/T, where T = total_time
597     i = int(sin_freq/df)
598     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
599
600     print('The power should peak at {} Hz of {} ({}, {})'.format(
601         sin_freq, expected[i], freq_axis[imax], power[imax]))
602     Pexp = 0
603     P    = 0
604     for i in range(len(freq_axis)):
605         Pexp += expected[i] * df
606         P    += power[i] * df
607     print('The total power should be {} ({})'.format(Pexp, P))
608
609     if TEST_PLOTS:
610         figure = _pyplot.figure()
611         time_axes = figure.add_subplot(2, 1, 1)
612         time_axes.plot(
613             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
614         time_axes.set_title('time series')
615         freq_axes = figure.add_subplot(2, 1, 2)
616         freq_axes.plot(freq_axis, power, 'r.')
617         freq_axes.plot(freq_axis, expected, 'b-')
618         freq_axes.set_title(
619             '{} samples of sin at {} Hz'.format(samples, sin_freq))
620
621 def _test_unitary_avg_power_spectrum_sin_suite():
622     print('Test unitary avg power spectrums on variously shaped sin functions')
623     _test_unitary_avg_power_spectrum_sin(
624         sin_freq=5, samp_freq=512, samples=1024)
625     _test_unitary_avg_power_spectrum_sin(
626         sin_freq=5, samp_freq=512, samples=2048)
627     _test_unitary_avg_power_spectrum_sin(
628         sin_freq=5, samp_freq=512, samples=4098)
629     _test_unitary_avg_power_spectrum_sin(
630         sin_freq=17, samp_freq=512, samples=1024)
631     _test_unitary_avg_power_spectrum_sin(
632         sin_freq=5, samp_freq=1024, samples=2048)
633     # test long wavelenth sin, so be closer to window frequency
634     _test_unitary_avg_power_spectrum_sin(
635         sin_freq=1, samp_freq=1024, samples=2048)
636     # finally, with some irrational numbers, to check that I'm not
637     # getting lucky
638     _test_unitary_avg_power_spectrum_sin(
639         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
640
641
642 def test():
643     _test_rfft_suite()
644     _test_unitary_rfft_parsevals_suite()
645     _test_unitary_rfft_rect_suite()
646     _test_unitary_rfft_gaussian_suite()
647     _test_unitary_power_spectrum_sin_suite()
648     _test_unitary_power_spectrum_delta_suite()
649     _test_unitary_power_spectrum_gaussian_suite()
650     _test_unitary_avg_power_spectrum_sin_suite()
651
652 if __name__ == '__main__':
653     from optparse import OptionParser
654
655     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
656     p.add_option('-p', '--plot', dest='plot', action='store_true',
657                  help='Display time- and freq-space plots of test transforms.')
658
659     options,args = p.parse_args()
660
661     if options.plot:
662         import matplotlib.pyplot as _pyplot
663         TEST_PLOTS = True
664     test()
665     if options.plot:
666         _pyplot.show()