FFT_tools: add docstrings to separate function args and body
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num):
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum:
50         num = 2**_numpy.floor(lnum)
51     return num
52
53
54 def round_pow_of_two(num):
55     "Round num to the closest exact a power of two on a log scale."
56     lnum = _numpy.log2(num)
57     if int(lnum) != lnum:
58         num = 2**_numpy.round(lnum)
59     return num
60
61
62 def ceil_pow_of_two(num):
63     "Round num up to the closest exact a power of two."
64     lnum = _numpy.log2(num)
65     if int(lnum) != lnum:
66         num = 2**_numpy.ceil(lnum)
67     return num
68
69
70 def _test_rfft(xs, Xs):
71     # Numpy's FFT algoritm returns
72     #          n-1
73     #   X[k] = SUM x[m] exp (-j 2pi km /n)
74     #          m=0
75     # (see http://www.tramy.us/numpybook.pdf)
76     j = _numpy.complex(0, 1)
77     n = len(xs)
78     Xa = []
79     for k in range(n):
80         Xa.append(sum([x * _numpy.exp(-j * 2 * _numpy.pi * k * m / n)
81                        for x,m in zip(xs, range(n))]))
82         if k < len(Xs):
83             if (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]) >= 1e-6:
84                 raise ValueError(
85                     ('rfft mismatch on element {}: {} != {}, relative error {}'
86                      ).format(
87                         k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
88     # Which should satisfy the discrete form of Parseval's theorem
89     #   n-1               n-1
90     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
91     #   m=0               k=0
92     timeSum = sum([_numpy.abs(x)**2 for x in xs])
93     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
94     if _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum >= 1e-6:
95         raise ValueError(
96             "Mismatch on Parseval's, {} != 1/{} * {}".format(
97                 timeSum, n, freqSum))
98
99
100 def _test_rfft_suite():
101     print('Test numpy rfft definition')
102     xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
103     _test_rfft(xs, _numpy.fft.rfft(xs))
104
105
106 def unitary_rfft(data, freq=1.0):
107     """Compute the Fourier transform of real data.
108
109     Unitary (preserves power [Parseval's theorem]).
110
111     If the units on your data are Volts,
112     and your sampling frequency is in Hz,
113     then freq_axis will be in Hz,
114     and trans will be in Volts.
115     """
116     nsamps = floor_pow_of_two(len(data))
117     # Which should satisfy the discrete form of Parseval's theorem
118     #   n-1               n-1
119     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
120     #   m=0               k=0
121     # However, we want our FFT to satisfy the continuous Parseval eqn
122     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
123     # which has the discrete form
124     #   n-1              n-1
125     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
126     #   m=0              k=0
127     # with X'_k = AX, this gives us
128     #   n-1                     n-1
129     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
130     #   m=0                     k=0
131     # so we see
132     #   A^2 df/dt = 1/n
133     #   A^2 = 1/n dt/df
134     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
135     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
136     # f_c in the transformed data is the Nyquist frequency (12.1.2)
137     #   f_c = 1/2dt
138     # and the points are spaced out by (12.1.5)
139     #   df = 1/ndt
140     # so
141     #   dt = 1/ndf
142     #   dt/df = 1/ndf^2
143     #   A^2 = 1/n^2df^2
144     #   A = 1/ndf = ndt/n = dt
145     # so we can convert the Numpy transformed data to match our unitary
146     # continuous transformed data with (also NR 12.1.8)
147     #   X'_k = dtX = X / <sampling freq>
148     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
149     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
150     return (freq_axis, trans)
151
152
153 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
154     # Which should satisfy the discretized integral form of Parseval's theorem
155     #   n-1              n-1
156     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
157     #   m=0              k=0
158     dt = 1.0 / freq
159     df = freqs[1] - freqs[0]
160     if df - 1 / (len(xs) * dt * df) >= 1e-6:
161         raise ValueError(
162             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
163     Xa = list(Xs)
164     for k in range(len(Xs) - 1, 1, -1):
165         Xa.append(Xa[k])
166     if len(xs) != len(Xa):
167         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
168     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
169     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
170     if _numpy.abs(lhs - rhs) / lhs >= 1e-4:
171         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
172
173
174 def _test_unitary_rfft_parsevals_suite():
175     print("Test unitary rfft on Parseval's theorem")
176     xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
177     dt = _numpy.pi
178     freqs,Xs = unitary_rfft(xs, 1.0 / dt)
179     _test_unitary_rfft_parsevals(xs, 1.0 / dt, freqs, Xs)
180
181
182 def _rect(t):
183     if _numpy.abs(t) < 0.5:
184         return 1
185     else:
186         return 0
187
188
189 def _test_unitary_rfft_rect(
190     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
191     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
192     samp_freq = _numpy.float(samp_freq)
193     a = _numpy.float(a)
194
195     x = _numpy.zeros((samples,), dtype=_numpy.float)
196     dt = 1.0 / samp_freq
197     for i in range(samples):
198         t = i * dt
199         x[i] = _rect(a * (t - time_shift))
200     freq_axis, X = unitary_rfft(x, samp_freq)
201     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
202
203     # remove the phase due to our time shift
204     j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
205     for i in range(len(freq_axis)):
206         f = freq_axis[i]
207         inverse_phase_shift = _numpy.exp(j * 2.0 * _numpy.pi * time_shift * f)
208         X[i] *= inverse_phase_shift
209
210     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
211     # normalized sinc(x) = sin(pi x)/(pi x)
212     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
213     if _numpy.sinc(0.5) != 2.0 / _numpy.pi:
214         raise ValueError('abnormal sinc()')
215     for i in range(len(freq_axis)):
216         f = freq_axis[i]
217         expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
218
219     if TEST_PLOTS:
220         figure = _pyplot.figure()
221         time_axes = figure.add_subplot(2, 1, 1)
222         time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
223         time_axes.set_title('time series')
224         freq_axes = figure.add_subplot(2, 1, 2)
225         freq_axes.plot(freq_axis, X.real, 'r.')
226         freq_axes.plot(freq_axis, X.imag, 'g.')
227         freq_axes.plot(freq_axis, expected, 'b-')
228         freq_axes.set_title('freq series')
229
230
231 def _test_unitary_rfft_rect_suite():
232     print('Test unitary FFTs on variously shaped rectangular functions')
233     _test_unitary_rfft_rect(a=0.5)
234     _test_unitary_rfft_rect(a=2.0)
235     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
236     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
237
238
239 def _gaussian(a, t):
240     return _numpy.exp(-a * t**2)
241
242
243 def _test_unitary_rfft_gaussian(
244     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
245     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
246     samp_freq = _numpy.float(samp_freq)
247     a = _numpy.float(a)
248
249     x = _numpy.zeros((samples,), dtype=_numpy.float)
250     dt = 1.0 / samp_freq
251     for i in range(samples):
252         t = i * dt
253         x[i] = _gaussian(a, (t - time_shift))
254     freq_axis, X = unitary_rfft(x, samp_freq)
255     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
256
257     # remove the phase due to our time shift
258     j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
259     for i in range(len(freq_axis)):
260         f = freq_axis[i]
261         inverse_phase_shift = _numpy.exp(j * 2.0 * _numpy.pi * time_shift * f)
262         X[i] *= inverse_phase_shift
263
264     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
265     for i in range(len(freq_axis)):
266         f = freq_axis[i]
267         # see Wikipedia, or do the integral yourself.
268         expected[i] = _numpy.sqrt(_numpy.pi / a) * _gaussian(
269             1.0 / a, _numpy.pi * f)
270
271     if TEST_PLOTS:
272         figure = _pyplot.figure()
273         time_axes = figure.add_subplot(2, 1, 1)
274         time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
275         time_axes.set_title('time series')
276         freq_axes = figure.add_subplot(2, 1, 2)
277         freq_axes.plot(freq_axis, X.real, 'r.')
278         freq_axes.plot(freq_axis, X.imag, 'g.')
279         freq_axes.plot(freq_axis, expected, 'b-')
280         freq_axes.set_title('freq series')
281
282
283 def _test_unitary_rfft_gaussian_suite():
284     print("Test unitary FFTs on variously shaped gaussian functions")
285     _test_unitary_rfft_gaussian(a=0.5)
286     _test_unitary_rfft_gaussian(a=2.0)
287     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
288     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
289
290
291 def power_spectrum(data, freq=1.0):
292     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
293
294     DATA must be real (not complex).
295     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
296     If the number of samples in data is not an integer power of two,
297     the FFT ignores some of the later points.
298     """
299     nsamps = floor_pow_of_two(len(data))
300
301     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
302     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
303     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
304     # See Numerical Recipies for a details.
305     trans = _numpy.fft.rfft(data[0:nsamps])
306     power = (trans * trans.conj()).real  # we want the square of the amplitude
307     return (freq_axis, power)
308
309
310 def unitary_power_spectrum(data, freq=1.0):
311     freq_axis,power = power_spectrum(data, freq)
312     # One sided power spectral density, so 2|H(f)|**2
313     # (see NR 2nd edition 12.0.14, p498)
314     #
315     # numpy normalizes with 1/N on the inverse transform ifft,
316     # so we should normalize the freq-space representation with 1/sqrt(N).
317     # But we're using the rfft, where N points are like N/2 complex points,
318     # so 1/sqrt(N/2)
319     # So the power gets normalized by that twice and we have 2/N
320     #
321     # On top of this, the FFT assumes a sampling freq of 1 per second,
322     # and we want to preserve area under our curves.
323     # If our total time T = len(data)/freq is smaller than 1,
324     # our df_real = freq/len(data) is bigger that the FFT expects
325     # (dt_fft = 1/len(data)),
326     # and we need to scale the powers down to conserve area.
327     # df_fft * F_fft(f) = df_real *F_real(f)
328     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
329     # So the power gets normalized by *that* twice and we have 2/N * freq**2
330
331     # power per unit time
332     # measure x(t) for time T
333     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
334     # PSD(f) = 2 |X(f)|**2 / T
335
336     # total_time = len(data)/float(freq)
337     # power *= 2.0 / float(freq)**2   /   total_time
338     # power *= 2.0 / freq**2   *   freq / len(data)
339     power *= 2.0 / (freq * _numpy.float(len(data)))
340
341     return (freq_axis, power)
342
343
344 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024):
345     x = _numpy.zeros((samples,), dtype=_numpy.float)
346     samp_freq = _numpy.float(samp_freq)
347     for i in range(samples):
348         x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
349     freq_axis, power = unitary_power_spectrum(x, samp_freq)
350     imax = _numpy.argmax(power)
351
352     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
353     df = samp_freq / _numpy.float(samples)  # df = 1/T, where T = total_time
354     i = int(sin_freq / df)
355     # average power per unit time is
356     #  P = <x(t)**2>
357     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
358     # so average value of (int sin(t)**2 dt) per unit time is 0.5
359     #  P = 0.5
360     # we spread that power over a frequency bin of width df, sp
361     #  P(f0) = 0.5/df
362     # where f0 is the sin's frequency
363     #
364     # or:
365     # FFT of sin(2*pi*t*f0) gives
366     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
367     # (area under x(t) = 0, area under X(f) = 0)
368     # so one sided power spectral density (PSD) per unit time is
369     #  P(f) = 2 |X(f)|**2 / T
370     #       = 2 * |0.5 delta(f-f0)|**2 / T
371     #       = 0.5 * |delta(f-f0)|**2 / T
372     # but we're discrete and want the integral of the 'delta' to be 1,
373     # so 'delta'*df = 1  --> 'delta' = 1/df, and
374     #  P(f) = 0.5 / (df**2 * T)
375     #       = 0.5 / df                (T = 1/df)
376     expected[i] = 0.5 / df
377
378     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
379             sin_freq, expected[i], freq_axis[imax], power[imax]))
380     Pexp = P = 0
381     for i in range(len(freq_axis)):
382         Pexp += expected[i] * df
383         P += power[i] * df
384     print('The total power should be {} ({})'.format(Pexp, P))
385
386     if TEST_PLOTS:
387         figure = _pyplot.figure()
388         time_axes = figure.add_subplot(2, 1, 1)
389         time_axes.plot(
390             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
391         time_axes.set_title('time series')
392         freq_axes = figure.add_subplot(2, 1, 2)
393         freq_axes.plot(freq_axis, power, 'r.')
394         freq_axes.plot(freq_axis, expected, 'b-')
395         freq_axes.set_title(
396             '{} samples of sin at {} Hz'.format(samples, sin_freq))
397
398
399 def _test_unitary_power_spectrum_sin_suite():
400     print('Test unitary power spectrums on variously shaped sin functions')
401     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
402     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
403     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
404     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
405     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
406     # with some irrational numbers, to check that I'm not getting lucky
407     _test_unitary_power_spectrum_sin(
408         sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
409     # test with non-integer number of periods
410     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
411
412
413 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256):
414     x = _numpy.zeros((samples,), dtype=_numpy.float)
415     samp_freq = _numpy.float(samp_freq)
416     x[0] = amp
417     freq_axis, power = unitary_power_spectrum(x, samp_freq)
418
419     # power = <x(t)**2> = (amp)**2 * dt/T
420     # we spread that power over the entire freq_axis [0,fN], so
421     #  P(f)  = (amp)**2 dt / (T fN)
422     # where
423     #  dt = 1/samp_freq        (sample period)
424     #  T  = samples/samp_freq  (total time of data aquisition)
425     #  fN = 0.5 samp_freq      (Nyquist frequency)
426     # so
427     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
428     #       = 2 amp**2 / (samp_freq*samples)
429     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
430     expected = _numpy.ones(
431         (len(freq_axis),), dtype=_numpy.float) * expected_amp
432
433     print('The power should be flat at y = {} ({})'.format(
434         expected_amp, power[0]))
435
436     if TEST_PLOTS:
437         figure = _pyplot.figure()
438         time_axes = figure.add_subplot(2, 1, 1)
439         time_axes.plot(
440             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
441         time_axes.set_title('time series')
442         freq_axes = figure.add_subplot(2, 1, 2)
443         freq_axes.plot(freq_axis, power, 'r.')
444         freq_axes.plot(freq_axis, expected, 'b-')
445         freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
446
447
448 def _test_unitary_power_spectrum_delta_suite():
449     print('Test unitary power spectrums on various delta functions')
450     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
451     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
452     # expected = 2*computed
453     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)
454     # expected = 0.5*computed
455     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)
456     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
457     _test_unitary_power_spectrum_delta(
458         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
459
460
461 def _gaussian2(area, mean, std, t):
462     "Integral over all time = area (i.e. normalized for area=1)"
463     return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
464         -0.5 * ((t-mean)/std)**2)
465
466
467 def _test_unitary_power_spectrum_gaussian(
468     area=2.5, mean=5, std=1, samp_freq=10.24, samples=512):
469     "Test unitary_power_spectrum() on the gaussian function"
470     x = _numpy.zeros((samples,), dtype=_numpy.float)
471     mean = _numpy.float(mean)
472     for i in range(samples):
473         t = i / _numpy.float(samp_freq)
474         x[i] = _gaussian2(area, mean, std, t)
475     freq_axis, power = unitary_power_spectrum(x, samp_freq)
476
477     # generate the predicted curve
478     # by comparing our _gaussian2() form to _gaussian(),
479     # we see that the Fourier transform of x(t) has parameters:
480     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
481     #                           converting to angular frequency,
482     #                           not frequency like we are)
483     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
484     #  mean' = 0               (changing the mean in the time-domain just
485     #                           changes the phase in the freq-domain)
486     # So our power spectral density per unit time is given by
487     #  P(f) = 2 |X(f)|**2 / T
488     # Where
489     #  T  = samples/samp_freq  (total time of data aquisition)
490     mean = 0.0
491     area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
492     std = 1.0 / (2.0 * _numpy.pi * std)
493     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
494     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
495     df = _numpy.float(samp_freq) / samples
496     for i in range(len(freq_axis)):
497         f = i * df
498         gaus = _gaussian2(area, mean, std, f)
499         expected[i] = 2.0 * gaus**2 * samp_freq / samples
500     print(('The power should be a half-gaussian, '
501            'with a peak at 0 Hz with amplitude {} ({})').format(
502             expected[0], power[0]))
503
504     if TEST_PLOTS:
505         figure = _pyplot.figure()
506         time_axes = figure.add_subplot(2, 1, 1)
507         time_axes.plot(
508             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
509         time_axes.set_title('time series')
510         freq_axes = figure.add_subplot(2, 1, 2)
511         freq_axes.plot(freq_axis, power, 'r.')
512         freq_axes.plot(freq_axis, expected, 'b-')
513         freq_axes.set_title('freq series')
514
515
516 def _test_unitary_power_spectrum_gaussian_suite():
517     print('Test unitary power spectrums on various gaussian functions')
518     _test_unitary_power_spectrum_gaussian(
519         area=1, std=1, samp_freq=10.0, samples=1024)
520     _test_unitary_power_spectrum_gaussian(
521         area=1, std=2, samp_freq=10.0, samples=1024)
522     _test_unitary_power_spectrum_gaussian(
523         area=1, std=1, samp_freq=10.0, samples=2048)
524     _test_unitary_power_spectrum_gaussian(
525         area=1, std=1, samp_freq=20.0, samples=2048)
526     _test_unitary_power_spectrum_gaussian(
527         area=3, std=1, samp_freq=10.0, samples=1024)
528     _test_unitary_power_spectrum_gaussian(
529         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
530         samples=1024)
531
532
533 def window_hann(length):
534     "Returns a Hann window array with length entries"
535     win = _numpy.zeros((length,), dtype=_numpy.float)
536     for i in range(length):
537         win[i] = 0.5 * (
538             1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length)))
539     # avg value of cos over a period is 0
540     # so average height of Hann window is 0.5
541     return win
542
543
544 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
545                        overlap=True, window=window_hann):
546     """Compute the avgerage power spectrum of DATA.
547
548     Taken with a sampling frequency FREQ.
549
550     DATA must be real (not complex) by breaking DATA into chunks.
551     The chunks may or may not be overlapping (by setting OVERLAP).
552     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
553     and the resulting spectra are averaged together.
554     See NR 13.4 for rational.
555
556     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
557     CHUNK_SIZE should really be a power of 2.
558     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
559     the FFT ignores some of the later points.
560     """
561     if chunk_size != floor_pow_of_two(chunk_size):
562         raise ValueError(
563             'chunk_size {} should be a power of 2'.format(chunk_size))
564
565     nchunks = len(data) // chunk_size  # integer division = implicit floor
566     if overlap:
567         chunk_step = chunk_size / 2
568     else:
569         chunk_step = chunk_size
570
571     win = window(chunk_size)  # generate a window of the appropriate size
572     freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1)
573     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
574     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
575     # See Numerical Recipies for a details.
576     power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float)
577     for i in range(nchunks):
578         starti = i * chunk_step
579         stopi = starti + chunk_size
580         fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win)
581         p_chunk = (fft_chunk * fft_chunk.conj()).real
582         power += p_chunk.astype(_numpy.float)
583     power /= _numpy.float(nchunks)
584     return (freq_axis, power)
585
586
587 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
588                                overlap=True, window=window_hann):
589     """Compute the average power spectrum, preserving normalization
590     """
591     freq_axis,power = avg_power_spectrum(
592         data, freq, chunk_size, overlap, window)
593     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
594     # see unitary_power_spectrum()
595     power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3
596     #             * 8/3  to remove power from windowing
597     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
598     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
599     # So our calulated power has and extra <w(t)**2> in it.
600     # For the Hann window,
601     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
602     # For low frequency components,
603     # where the frequency of x(t) is ~= the frequency of w(t),
604     # the normalization is not perfect. ??
605     # The normalization approaches perfection as chunk_size -> infinity.
606     return (freq_axis, power)
607
608
609 def _test_unitary_avg_power_spectrum_sin(
610     sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True,
611     window=window_hann):
612     "Test unitary_avg_power_spectrum() on the sine function"
613     x = _numpy.zeros((samples,), dtype=_numpy.float)
614     samp_freq = _numpy.float(samp_freq)
615     for i in range(samples):
616         x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
617     freq_axis, power = unitary_avg_power_spectrum(
618         x, samp_freq, chunk_size, overlap, window)
619     imax = _numpy.argmax(power)
620
621     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
622     df = samp_freq / _numpy.float(chunk_size)  # df = 1/T, where T = total_time
623     i = int(sin_freq / df)
624     expected[i] = 0.5 / df  # see _test_unitary_power_spectrum_sin()
625
626     print('The power should peak at {} Hz of {} ({}, {})'.format(
627         sin_freq, expected[i], freq_axis[imax], power[imax]))
628     Pexp = P = 0
629     for i in range(len(freq_axis)):
630         Pexp += expected[i] * df
631         P += power[i] * df
632     print('The total power should be {} ({})'.format(Pexp, P))
633
634     if TEST_PLOTS:
635         figure = _pyplot.figure()
636         time_axes = figure.add_subplot(2, 1, 1)
637         time_axes.plot(
638             _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
639         time_axes.set_title('time series')
640         freq_axes = figure.add_subplot(2, 1, 2)
641         freq_axes.plot(freq_axis, power, 'r.')
642         freq_axes.plot(freq_axis, expected, 'b-')
643         freq_axes.set_title(
644             '{} samples of sin at {} Hz'.format(samples, sin_freq))
645
646
647 def _test_unitary_avg_power_spectrum_sin_suite():
648     print('Test unitary avg power spectrums on variously shaped sin functions')
649     _test_unitary_avg_power_spectrum_sin(
650         sin_freq=5, samp_freq=512, samples=1024)
651     _test_unitary_avg_power_spectrum_sin(
652         sin_freq=5, samp_freq=512, samples=2048)
653     _test_unitary_avg_power_spectrum_sin(
654         sin_freq=5, samp_freq=512, samples=4098)
655     _test_unitary_avg_power_spectrum_sin(
656         sin_freq=17, samp_freq=512, samples=1024)
657     _test_unitary_avg_power_spectrum_sin(
658         sin_freq=5, samp_freq=1024, samples=2048)
659     # test long wavelenth sin, so be closer to window frequency
660     _test_unitary_avg_power_spectrum_sin(
661         sin_freq=1, samp_freq=1024, samples=2048)
662     # finally, with some irrational numbers, to check that I'm not
663     # getting lucky
664     _test_unitary_avg_power_spectrum_sin(
665         sin_freq=_numpy.pi,
666         samp_freq=100 * _numpy.exp(1),
667         samples=1024)
668
669
670 def test():
671     _test_rfft_suite()
672     _test_unitary_rfft_parsevals_suite()
673     _test_unitary_rfft_rect_suite()
674     _test_unitary_rfft_gaussian_suite()
675     _test_unitary_power_spectrum_sin_suite()
676     _test_unitary_power_spectrum_delta_suite()
677     _test_unitary_power_spectrum_gaussian_suite()
678     _test_unitary_avg_power_spectrum_sin_suite()
679
680
681 if __name__ == '__main__':
682     from optparse import OptionParser
683
684     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
685     p.add_option('-p', '--plot', dest='plot', action='store_true',
686                  help='Display time- and freq-space plots of test transforms.')
687
688     options,args = p.parse_args()
689
690     if options.plot:
691         import matplotlib.pyplot as _pyplot
692         TEST_PLOTS = True
693     test()
694     if options.plot:
695         _pyplot.show()