FFT_tools: add spaces around operators (PEP8) except **
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num):
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum:
50         num = 2**_numpy.floor(lnum)
51     return num
52
53
54 def round_pow_of_two(num):
55     "Round num to the closest exact a power of two on a log scale."
56     lnum = _numpy.log2(num)
57     if int(lnum) != lnum:
58         num = 2**_numpy.round(lnum)
59     return num
60
61
62 def ceil_pow_of_two(num):
63     "Round num up to the closest exact a power of two."
64     lnum = _numpy.log2(num)
65     if int(lnum) != lnum:
66         num = 2**_numpy.ceil(lnum)
67     return num
68
69
70 def _test_rfft(xs, Xs):
71     # Numpy's FFT algoritm returns
72     #          n-1
73     #   X[k] = SUM x[m] exp (-j 2pi km /n)
74     #          m=0
75     # (see http://www.tramy.us/numpybook.pdf)
76     j = _numpy.complex(0,1)
77     n = len(xs)
78     Xa = []
79     for k in range(n):
80         Xa.append(sum([x * _numpy.exp(-j * 2 * _numpy.pi * k * m / n)
81                        for x,m in zip(xs,range(n))]))
82         if k < len(Xs):
83             if (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]) >= 1e-6:
84                 raise ValueError(
85                     ('rfft mismatch on element {}: {} != {}, relative error {}'
86                      ).format(
87                         k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
88     # Which should satisfy the discrete form of Parseval's theorem
89     #   n-1               n-1
90     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
91     #   m=0               k=0
92     timeSum = sum([_numpy.abs(x)**2 for x in xs])
93     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
94     if _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum >= 1e-6:
95         raise ValueError(
96             "Mismatch on Parseval's, {} != 1/{} * {}".format(
97                 timeSum, n, freqSum))
98
99
100 def _test_rfft_suite():
101     print('Test numpy rfft definition')
102     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
103     _test_rfft(xs, _numpy.fft.rfft(xs))
104
105
106 def unitary_rfft(data, freq=1.0):
107     """Compute the Fourier transform of real data.
108
109     Unitary (preserves power [Parseval's theorem]).
110
111     If the units on your data are Volts,
112     and your sampling frequency is in Hz,
113     then freq_axis will be in Hz,
114     and trans will be in Volts.
115     """
116     nsamps = floor_pow_of_two(len(data))
117     # Which should satisfy the discrete form of Parseval's theorem
118     #   n-1               n-1
119     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
120     #   m=0               k=0
121     # However, we want our FFT to satisfy the continuous Parseval eqn
122     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
123     # which has the discrete form
124     #   n-1              n-1
125     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
126     #   m=0              k=0
127     # with X'_k = AX, this gives us
128     #   n-1                     n-1
129     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
130     #   m=0                     k=0
131     # so we see
132     #   A^2 df/dt = 1/n
133     #   A^2 = 1/n dt/df
134     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
135     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
136     # f_c in the transformed data is the Nyquist frequency (12.1.2)
137     #   f_c = 1/2dt
138     # and the points are spaced out by (12.1.5)
139     #   df = 1/ndt
140     # so
141     #   dt = 1/ndf
142     #   dt/df = 1/ndf^2
143     #   A^2 = 1/n^2df^2
144     #   A = 1/ndf = ndt/n = dt
145     # so we can convert the Numpy transformed data to match our unitary
146     # continuous transformed data with (also NR 12.1.8)
147     #   X'_k = dtX = X / <sampling freq>
148     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
149     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
150     return (freq_axis, trans)
151
152
153 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
154     # Which should satisfy the discretized integral form of Parseval's theorem
155     #   n-1              n-1
156     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
157     #   m=0              k=0
158     dt = 1.0 / freq
159     df = freqs[1] - freqs[0]
160     if df - 1 / (len(xs) * dt * df) >= 1e-6:
161         raise ValueError(
162             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
163     Xa = list(Xs)
164     for k in range(len(Xs) - 1, 1, -1):
165         Xa.append(Xa[k])
166     if len(xs) != len(Xa):
167         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
168     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
169     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
170     if _numpy.abs(lhs - rhs) / lhs >= 1e-4:
171         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
172
173
174 def _test_unitary_rfft_parsevals_suite():
175     print("Test unitary rfft on Parseval's theorem")
176     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
177     dt = _numpy.pi
178     freqs,Xs = unitary_rfft(xs, 1.0 / dt)
179     _test_unitary_rfft_parsevals(xs, 1.0 / dt, freqs, Xs)
180
181
182 def _rect(t):
183     if _numpy.abs(t) < 0.5:
184         return 1
185     else:
186         return 0
187
188
189 def _test_unitary_rfft_rect(
190     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
191     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
192     samp_freq = _numpy.float(samp_freq)
193     a = _numpy.float(a)
194
195     x = _numpy.zeros((samples,), dtype=_numpy.float)
196     dt = 1.0 / samp_freq
197     for i in range(samples):
198         t = i * dt
199         x[i] = _rect(a * (t - time_shift))
200     freq_axis, X = unitary_rfft(x, samp_freq)
201     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
202
203     # remove the phase due to our time shift
204     j = _numpy.complex(0.0,1.0)  # sqrt(-1)
205     for i in range(len(freq_axis)):
206         f = freq_axis[i]
207         inverse_phase_shift = _numpy.exp(j * 2.0 * _numpy.pi * time_shift * f)
208         X[i] *= inverse_phase_shift
209
210     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
211     # normalized sinc(x) = sin(pi x)/(pi x)
212     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
213     if _numpy.sinc(0.5) != 2.0 / _numpy.pi:
214         raise ValueError('abnormal sinc()')
215     for i in range(len(freq_axis)):
216         f = freq_axis[i]
217         expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
218
219     if TEST_PLOTS:
220         figure = _pyplot.figure()
221         time_axes = figure.add_subplot(2, 1, 1)
222         time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
223         time_axes.set_title('time series')
224         freq_axes = figure.add_subplot(2, 1, 2)
225         freq_axes.plot(freq_axis, X.real, 'r.')
226         freq_axes.plot(freq_axis, X.imag, 'g.')
227         freq_axes.plot(freq_axis, expected, 'b-')
228         freq_axes.set_title('freq series')
229
230
231 def _test_unitary_rfft_rect_suite():
232     print('Test unitary FFTs on variously shaped rectangular functions')
233     _test_unitary_rfft_rect(a=0.5)
234     _test_unitary_rfft_rect(a=2.0)
235     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
236     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
237
238
239 def _gaussian(a, t):
240     return _numpy.exp(-a * t**2)
241
242
243 def _test_unitary_rfft_gaussian(
244     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
245     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
246     samp_freq = _numpy.float(samp_freq)
247     a = _numpy.float(a)
248
249     x = _numpy.zeros((samples,), dtype=_numpy.float)
250     dt = 1.0 / samp_freq
251     for i in range(samples):
252         t = i * dt
253         x[i] = _gaussian(a, (t - time_shift))
254     freq_axis, X = unitary_rfft(x, samp_freq)
255     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
256
257     # remove the phase due to our time shift
258     j = _numpy.complex(0.0,1.0)  # sqrt(-1)
259     for i in range(len(freq_axis)):
260         f = freq_axis[i]
261         inverse_phase_shift = _numpy.exp(j * 2.0 * _numpy.pi * time_shift * f)
262         X[i] *= inverse_phase_shift
263
264     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
265     for i in range(len(freq_axis)):
266         f = freq_axis[i]
267         # see Wikipedia, or do the integral yourself.
268         expected[i] = _numpy.sqrt(_numpy.pi / a) * _gaussian(
269             1.0 / a, _numpy.pi * f)
270
271     if TEST_PLOTS:
272         figure = _pyplot.figure()
273         time_axes = figure.add_subplot(2, 1, 1)
274         time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
275         time_axes.set_title('time series')
276         freq_axes = figure.add_subplot(2, 1, 2)
277         freq_axes.plot(freq_axis, X.real, 'r.')
278         freq_axes.plot(freq_axis, X.imag, 'g.')
279         freq_axes.plot(freq_axis, expected, 'b-')
280         freq_axes.set_title('freq series')
281
282
283 def _test_unitary_rfft_gaussian_suite():
284     print("Test unitary FFTs on variously shaped gaussian functions")
285     _test_unitary_rfft_gaussian(a=0.5)
286     _test_unitary_rfft_gaussian(a=2.0)
287     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
288     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
289
290
291 def power_spectrum(data, freq=1.0):
292     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
293
294     DATA must be real (not complex).
295     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
296     If the number of samples in data is not an integer power of two,
297     the FFT ignores some of the later points.
298     """
299     nsamps = floor_pow_of_two(len(data))
300
301     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
302     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
303     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
304     # See Numerical Recipies for a details.
305     trans = _numpy.fft.rfft(data[0:nsamps])
306     power = (trans * trans.conj()).real  # we want the square of the amplitude
307     return (freq_axis, power)
308
309
310 def unitary_power_spectrum(data, freq=1.0):
311     freq_axis,power = power_spectrum(data, freq)
312     # One sided power spectral density, so 2|H(f)|**2
313     # (see NR 2nd edition 12.0.14, p498)
314     #
315     # numpy normalizes with 1/N on the inverse transform ifft,
316     # so we should normalize the freq-space representation with 1/sqrt(N).
317     # But we're using the rfft, where N points are like N/2 complex points,
318     # so 1/sqrt(N/2)
319     # So the power gets normalized by that twice and we have 2/N
320     #
321     # On top of this, the FFT assumes a sampling freq of 1 per second,
322     # and we want to preserve area under our curves.
323     # If our total time T = len(data)/freq is smaller than 1,
324     # our df_real = freq/len(data) is bigger that the FFT expects
325     # (dt_fft = 1/len(data)),
326     # and we need to scale the powers down to conserve area.
327     # df_fft * F_fft(f) = df_real *F_real(f)
328     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
329     # So the power gets normalized by *that* twice and we have 2/N * freq**2
330
331     # power per unit time
332     # measure x(t) for time T
333     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
334     # PSD(f) = 2 |X(f)|**2 / T
335
336     # total_time = len(data)/float(freq)
337     # power *= 2.0 / float(freq)**2   /   total_time
338     # power *= 2.0 / freq**2   *   freq / len(data)
339     power *= 2.0 / (freq * _numpy.float(len(data)))
340
341     return (freq_axis, power)
342
343
344 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024):
345     x = _numpy.zeros((samples,), dtype=_numpy.float)
346     samp_freq = _numpy.float(samp_freq)
347     for i in range(samples):
348         x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
349     freq_axis, power = unitary_power_spectrum(x, samp_freq)
350     imax = _numpy.argmax(power)
351
352     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
353     df = samp_freq / _numpy.float(samples)  # df = 1/T, where T = total_time
354     i = int(sin_freq / df)
355     # average power per unit time is
356     #  P = <x(t)**2>
357     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
358     # so average value of (int sin(t)**2 dt) per unit time is 0.5
359     #  P = 0.5
360     # we spread that power over a frequency bin of width df, sp
361     #  P(f0) = 0.5/df
362     # where f0 is the sin's frequency
363     #
364     # or:
365     # FFT of sin(2*pi*t*f0) gives
366     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
367     # (area under x(t) = 0, area under X(f) = 0)
368     # so one sided power spectral density (PSD) per unit time is
369     #  P(f) = 2 |X(f)|**2 / T
370     #       = 2 * |0.5 delta(f-f0)|**2 / T
371     #       = 0.5 * |delta(f-f0)|**2 / T
372     # but we're discrete and want the integral of the 'delta' to be 1,
373     # so 'delta'*df = 1  --> 'delta' = 1/df, and
374     #  P(f) = 0.5 / (df**2 * T)
375     #       = 0.5 / df                (T = 1/df)
376     expected[i] = 0.5 / df
377
378     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
379             sin_freq, expected[i], freq_axis[imax], power[imax]))
380     Pexp = P = 0
381     for i in range(len(freq_axis)):
382         Pexp += expected[i] * df
383         P += power[i] * df
384     print('The total power should be {} ({})'.format(Pexp, P))
385
386     if TEST_PLOTS:
387         figure = _pyplot.figure()
388         time_axes = figure.add_subplot(2, 1, 1)
389         time_axes.plot(
390             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
391         time_axes.set_title('time series')
392         freq_axes = figure.add_subplot(2, 1, 2)
393         freq_axes.plot(freq_axis, power, 'r.')
394         freq_axes.plot(freq_axis, expected, 'b-')
395         freq_axes.set_title(
396             '{} samples of sin at {} Hz'.format(samples, sin_freq))
397
398
399 def _test_unitary_power_spectrum_sin_suite():
400     print('Test unitary power spectrums on variously shaped sin functions')
401     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
402     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
403     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
404     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
405     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
406     # with some irrational numbers, to check that I'm not getting lucky
407     _test_unitary_power_spectrum_sin(
408         sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
409     # test with non-integer number of periods
410     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
411
412
413 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256):
414     x = _numpy.zeros((samples,), dtype=_numpy.float)
415     samp_freq = _numpy.float(samp_freq)
416     x[0] = amp
417     freq_axis, power = unitary_power_spectrum(x, samp_freq)
418
419     # power = <x(t)**2> = (amp)**2 * dt/T
420     # we spread that power over the entire freq_axis [0,fN], so
421     #  P(f)  = (amp)**2 dt / (T fN)
422     # where
423     #  dt = 1/samp_freq        (sample period)
424     #  T  = samples/samp_freq  (total time of data aquisition)
425     #  fN = 0.5 samp_freq      (Nyquist frequency)
426     # so
427     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
428     #       = 2 amp**2 / (samp_freq*samples)
429     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
430     expected = _numpy.ones(
431         (len(freq_axis),), dtype=_numpy.float) * expected_amp
432
433     print('The power should be flat at y = {} ({})'.format(
434         expected_amp, power[0]))
435
436     if TEST_PLOTS:
437         figure = _pyplot.figure()
438         time_axes = figure.add_subplot(2, 1, 1)
439         time_axes.plot(
440             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
441         time_axes.set_title('time series')
442         freq_axes = figure.add_subplot(2, 1, 2)
443         freq_axes.plot(freq_axis, power, 'r.')
444         freq_axes.plot(freq_axis, expected, 'b-')
445         freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
446
447
448 def _test_unitary_power_spectrum_delta_suite():
449     print('Test unitary power spectrums on various delta functions')
450     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
451     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
452     # expected = 2*computed
453     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)
454     # expected = 0.5*computed
455     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)
456     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
457     _test_unitary_power_spectrum_delta(
458         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
459
460
461 def _gaussian2(area, mean, std, t):
462     "Integral over all time = area (i.e. normalized for area=1)"
463     return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
464         -0.5 * ((t-mean)/std)**2)
465
466
467 def _test_unitary_power_spectrum_gaussian(
468     area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512):
469     x = _numpy.zeros((samples,), dtype=_numpy.float)
470     mean = _numpy.float(mean)
471     for i in range(samples):
472         t = i / _numpy.float(samp_freq)
473         x[i] = _gaussian2(area, mean, std, t)
474     freq_axis, power = unitary_power_spectrum(x, samp_freq)
475
476     # generate the predicted curve
477     # by comparing our _gaussian2() form to _gaussian(),
478     # we see that the Fourier transform of x(t) has parameters:
479     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
480     #                           converting to angular frequency,
481     #                           not frequency like we are)
482     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
483     #  mean' = 0               (changing the mean in the time-domain just
484     #                           changes the phase in the freq-domain)
485     # So our power spectral density per unit time is given by
486     #  P(f) = 2 |X(f)|**2 / T
487     # Where
488     #  T  = samples/samp_freq  (total time of data aquisition)
489     mean = 0.0
490     area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
491     std = 1.0 / (2.0 * _numpy.pi * std)
492     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
493     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
494     df = _numpy.float(samp_freq) / samples
495     for i in range(len(freq_axis)):
496         f = i * df
497         gaus = _gaussian2(area, mean, std, f)
498         expected[i] = 2.0 * gaus**2 * samp_freq / samples
499     print(('The power should be a half-gaussian, '
500            'with a peak at 0 Hz with amplitude {} ({})').format(
501             expected[0], power[0]))
502
503     if TEST_PLOTS:
504         figure = _pyplot.figure()
505         time_axes = figure.add_subplot(2, 1, 1)
506         time_axes.plot(
507             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
508         time_axes.set_title('time series')
509         freq_axes = figure.add_subplot(2, 1, 2)
510         freq_axes.plot(freq_axis, power, 'r.')
511         freq_axes.plot(freq_axis, expected, 'b-')
512         freq_axes.set_title('freq series')
513
514
515 def _test_unitary_power_spectrum_gaussian_suite():
516     print('Test unitary power spectrums on various gaussian functions')
517     _test_unitary_power_spectrum_gaussian(
518         area=1, std=1, samp_freq=10.0, samples=1024)
519     _test_unitary_power_spectrum_gaussian(
520         area=1, std=2, samp_freq=10.0, samples=1024)
521     _test_unitary_power_spectrum_gaussian(
522         area=1, std=1, samp_freq=10.0, samples=2048)
523     _test_unitary_power_spectrum_gaussian(
524         area=1, std=1, samp_freq=20.0, samples=2048)
525     _test_unitary_power_spectrum_gaussian(
526         area=3, std=1, samp_freq=10.0, samples=1024)
527     _test_unitary_power_spectrum_gaussian(
528         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
529         samples=1024)
530
531
532 def window_hann(length):
533     "Returns a Hann window array with length entries"
534     win = _numpy.zeros((length,), dtype=_numpy.float)
535     for i in range(length):
536         win[i] = 0.5 * (
537             1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length)))
538     # avg value of cos over a period is 0
539     # so average height of Hann window is 0.5
540     return win
541
542
543 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
544                        overlap=True, window=window_hann):
545     """Compute the avgerage power spectrum of DATA.
546
547     Taken with a sampling frequency FREQ.
548
549     DATA must be real (not complex) by breaking DATA into chunks.
550     The chunks may or may not be overlapping (by setting OVERLAP).
551     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
552     and the resulting spectra are averaged together.
553     See NR 13.4 for rational.
554
555     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
556     CHUNK_SIZE should really be a power of 2.
557     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
558     the FFT ignores some of the later points.
559     """
560     if chunk_size != floor_pow_of_two(chunk_size):
561         raise ValueError(
562             'chunk_size {} should be a power of 2'.format(chunk_size))
563
564     nchunks = len(data) // chunk_size  # integer division = implicit floor
565     if overlap:
566         chunk_step = chunk_size / 2
567     else:
568         chunk_step = chunk_size
569
570     win = window(chunk_size)  # generate a window of the appropriate size
571     freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1)
572     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
573     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
574     # See Numerical Recipies for a details.
575     power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float)
576     for i in range(nchunks):
577         starti = i * chunk_step
578         stopi = starti + chunk_size
579         fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win)
580         p_chunk = (fft_chunk * fft_chunk.conj()).real
581         power += p_chunk.astype(_numpy.float)
582     power /= _numpy.float(nchunks)
583     return (freq_axis, power)
584
585
586 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
587                                overlap=True, window=window_hann):
588     """Compute the average power spectrum, preserving normalization
589     """
590     freq_axis,power = avg_power_spectrum(
591         data, freq, chunk_size, overlap, window)
592     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
593     # see unitary_power_spectrum()
594     power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3
595     #             * 8/3  to remove power from windowing
596     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
597     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
598     # So our calulated power has and extra <w(t)**2> in it.
599     # For the Hann window,
600     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
601     # For low frequency components,
602     # where the frequency of x(t) is ~= the frequency of w(t),
603     # the normalization is not perfect. ??
604     # The normalization approaches perfection as chunk_size -> infinity.
605     return (freq_axis, power)
606
607
608 def _test_unitary_avg_power_spectrum_sin(
609     sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True,
610     window=window_hann):
611     x = _numpy.zeros((samples,), dtype=_numpy.float)
612     samp_freq = _numpy.float(samp_freq)
613     for i in range(samples):
614         x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
615     freq_axis, power = unitary_avg_power_spectrum(
616         x, samp_freq, chunk_size, overlap, window)
617     imax = _numpy.argmax(power)
618
619     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
620     df = samp_freq / _numpy.float(chunk_size)  # df = 1/T, where T = total_time
621     i = int(sin_freq / df)
622     expected[i] = 0.5 / df  # see _test_unitary_power_spectrum_sin()
623
624     print('The power should peak at {} Hz of {} ({}, {})'.format(
625         sin_freq, expected[i], freq_axis[imax], power[imax]))
626     Pexp = P = 0
627     for i in range(len(freq_axis)):
628         Pexp += expected[i] * df
629         P += power[i] * df
630     print('The total power should be {} ({})'.format(Pexp, P))
631
632     if TEST_PLOTS:
633         figure = _pyplot.figure()
634         time_axes = figure.add_subplot(2, 1, 1)
635         time_axes.plot(
636             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
637         time_axes.set_title('time series')
638         freq_axes = figure.add_subplot(2, 1, 2)
639         freq_axes.plot(freq_axis, power, 'r.')
640         freq_axes.plot(freq_axis, expected, 'b-')
641         freq_axes.set_title(
642             '{} samples of sin at {} Hz'.format(samples, sin_freq))
643
644
645 def _test_unitary_avg_power_spectrum_sin_suite():
646     print('Test unitary avg power spectrums on variously shaped sin functions')
647     _test_unitary_avg_power_spectrum_sin(
648         sin_freq=5, samp_freq=512, samples=1024)
649     _test_unitary_avg_power_spectrum_sin(
650         sin_freq=5, samp_freq=512, samples=2048)
651     _test_unitary_avg_power_spectrum_sin(
652         sin_freq=5, samp_freq=512, samples=4098)
653     _test_unitary_avg_power_spectrum_sin(
654         sin_freq=17, samp_freq=512, samples=1024)
655     _test_unitary_avg_power_spectrum_sin(
656         sin_freq=5, samp_freq=1024, samples=2048)
657     # test long wavelenth sin, so be closer to window frequency
658     _test_unitary_avg_power_spectrum_sin(
659         sin_freq=1, samp_freq=1024, samples=2048)
660     # finally, with some irrational numbers, to check that I'm not
661     # getting lucky
662     _test_unitary_avg_power_spectrum_sin(
663         sin_freq=_numpy.pi,
664         samp_freq=100 * _numpy.exp(1),
665         samples=1024)
666
667
668 def test():
669     _test_rfft_suite()
670     _test_unitary_rfft_parsevals_suite()
671     _test_unitary_rfft_rect_suite()
672     _test_unitary_rfft_gaussian_suite()
673     _test_unitary_power_spectrum_sin_suite()
674     _test_unitary_power_spectrum_delta_suite()
675     _test_unitary_power_spectrum_gaussian_suite()
676     _test_unitary_avg_power_spectrum_sin_suite()
677
678
679 if __name__ == '__main__':
680     from optparse import OptionParser
681
682     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
683     p.add_option('-p', '--plot', dest='plot', action='store_true',
684                  help='Display time- and freq-space plots of test transforms.')
685
686     options,args = p.parse_args()
687
688     if options.plot:
689         import matplotlib.pyplot as _pyplot
690         TEST_PLOTS = True
691     test()
692     if options.plot:
693         _pyplot.show()