FFT_tools: wrap long comments for PEP8 compliance
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num):
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum:
50         num = 2**_numpy.floor(lnum)
51     return num
52
53 def round_pow_of_two(num):
54     "Round num to the closest exact a power of two on a log scale."
55     lnum = _numpy.log2(num)
56     if int(lnum) != lnum:
57         num = 2**_numpy.round(lnum)
58     return num
59
60 def ceil_pow_of_two(num):
61     "Round num up to the closest exact a power of two."
62     lnum = _numpy.log2(num)
63     if int(lnum) != lnum:
64         num = 2**_numpy.ceil(lnum)
65     return num
66
67 def _test_rfft(xs, Xs):
68     # Numpy's FFT algoritm returns
69     #          n-1
70     #   X[k] = SUM x[m] exp (-j 2pi km /n)
71     #          m=0
72     # (see http://www.tramy.us/numpybook.pdf)
73     j = _numpy.complex(0,1)
74     n = len(xs)
75     Xa = []
76     for k in range(n):
77         Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n)
78                        for x,m in zip(xs,range(n))]))
79         if k < len(Xs):
80             if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6:
81                 raise ValueError(
82                     ('rfft mismatch on element {}: {} != {}, relative error {}'
83                      ).format(
84                         k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k])))
85     # Which should satisfy the discrete form of Parseval's theorem
86     #   n-1               n-1
87     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
88     #   m=0               k=0
89     timeSum = sum([_numpy.abs(x)**2 for x in xs])
90     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
91     if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6:
92         raise ValueError(
93             "Mismatch on Parseval's, {} != 1/{} * {}".format(
94                 timeSum, n, freqSum))
95
96 def _test_rfft_suite():
97     print('Test numpy rfft definition')
98     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
99     _test_rfft(xs, _numpy.fft.rfft(xs))
100
101 def unitary_rfft(data, freq=1.0):
102     """Compute the Fourier transform of real data.
103
104     Unitary (preserves power [Parseval's theorem]).
105
106     If the units on your data are Volts,
107     and your sampling frequency is in Hz,
108     then freq_axis will be in Hz,
109     and trans will be in Volts.
110     """
111     nsamps = floor_pow_of_two(len(data))
112     # Which should satisfy the discrete form of Parseval's theorem
113     #   n-1               n-1
114     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
115     #   m=0               k=0
116     # However, we want our FFT to satisfy the continuous Parseval eqn
117     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
118     # which has the discrete form
119     #   n-1              n-1
120     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
121     #   m=0              k=0
122     # with X'_k = AX, this gives us
123     #   n-1                     n-1
124     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
125     #   m=0                     k=0
126     # so we see
127     #   A^2 df/dt = 1/n
128     #   A^2 = 1/n dt/df
129     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
130     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
131     # f_c in the transformed data is the Nyquist frequency (12.1.2)
132     #   f_c = 1/2dt
133     # and the points are spaced out by (12.1.5)
134     #   df = 1/ndt
135     # so
136     #   dt = 1/ndf
137     #   dt/df = 1/ndf^2
138     #   A^2 = 1/n^2df^2
139     #   A = 1/ndf = ndt/n = dt
140     # so we can convert the Numpy transformed data to match our unitary
141     # continuous transformed data with (also NR 12.1.8)
142     #   X'_k = dtX = X / <sampling freq>
143     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
144     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
145     return (freq_axis, trans)
146
147 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
148     # Which should satisfy the discretized integral form of Parseval's theorem
149     #   n-1              n-1
150     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
151     #   m=0              k=0
152     dt = 1.0/freq
153     df = freqs[1]-freqs[0]
154     if df - 1/(len(xs)*dt)/df >= 1e-6:
155         raise ValueError(
156             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
157     Xa = list(Xs)
158     for k in range(len(Xs)-1,1,-1):
159         Xa.append(Xa[k])
160     if len(xs) != len(Xa):
161         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
162     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
163     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
164     if _numpy.abs(lhs - rhs)/lhs >= 1e-4:
165         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
166
167 def _test_unitary_rfft_parsevals_suite():
168     print("Test unitary rfft on Parseval's theorem")
169     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
170     dt = _numpy.pi
171     freqs,Xs = unitary_rfft(xs, 1.0/dt)
172     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
173
174 def _rect(t):
175     if _numpy.abs(t) < 0.5:
176         return 1
177     else:
178         return 0
179
180 def _test_unitary_rfft_rect(
181     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
182     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
183     samp_freq = _numpy.float(samp_freq)
184     a = _numpy.float(a)
185
186     x = _numpy.zeros((samples,), dtype=_numpy.float)
187     dt = 1.0/samp_freq
188     for i in range(samples):
189         t = i*dt
190         x[i] = _rect(a*(t-time_shift))
191     freq_axis, X = unitary_rfft(x, samp_freq)
192     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
193
194     # remove the phase due to our time shift
195     j = _numpy.complex(0.0,1.0) # sqrt(-1)
196     for i in range(len(freq_axis)):
197         f = freq_axis[i]
198         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
199         X[i] *= inverse_phase_shift
200
201     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
202     # normalized sinc(x) = sin(pi x)/(pi x)
203     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
204     if _numpy.sinc(0.5) != 2.0/_numpy.pi:
205         raise ValueError('abnormal sinc()')
206     for i in range(len(freq_axis)):
207         f = freq_axis[i]
208         expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a)
209
210     if TEST_PLOTS:
211         figure = _pyplot.figure()
212         time_axes = figure.add_subplot(2, 1, 1)
213         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
214         time_axes.set_title('time series')
215         freq_axes = figure.add_subplot(2, 1, 2)
216         freq_axes.plot(freq_axis, X.real, 'r.')
217         freq_axes.plot(freq_axis, X.imag, 'g.')
218         freq_axes.plot(freq_axis, expected, 'b-')
219         freq_axes.set_title('freq series')
220
221 def _test_unitary_rfft_rect_suite():
222     print('Test unitary FFTs on variously shaped rectangular functions')
223     _test_unitary_rfft_rect(a=0.5)
224     _test_unitary_rfft_rect(a=2.0)
225     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
226     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
227
228 def _gaussian(a, t):
229     return _numpy.exp(-a * t**2)
230
231 def _test_unitary_rfft_gaussian(
232     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
233     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
234     samp_freq = _numpy.float(samp_freq)
235     a = _numpy.float(a)
236
237     x = _numpy.zeros((samples,), dtype=_numpy.float)
238     dt = 1.0/samp_freq
239     for i in range(samples):
240         t = i*dt
241         x[i] = _gaussian(a, (t-time_shift))
242     freq_axis, X = unitary_rfft(x, samp_freq)
243     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
244
245     # remove the phase due to our time shift
246     j = _numpy.complex(0.0,1.0) # sqrt(-1)
247     for i in range(len(freq_axis)):
248         f = freq_axis[i]
249         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
250         X[i] *= inverse_phase_shift
251
252     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
253     for i in range(len(freq_axis)):
254         f = freq_axis[i]
255         # see Wikipedia, or do the integral yourself.
256         expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian(
257             1.0/a, _numpy.pi*f)
258
259     if TEST_PLOTS:
260         figure = _pyplot.figure()
261         time_axes = figure.add_subplot(2, 1, 1)
262         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
263         time_axes.set_title('time series')
264         freq_axes = figure.add_subplot(2, 1, 2)
265         freq_axes.plot(freq_axis, X.real, 'r.')
266         freq_axes.plot(freq_axis, X.imag, 'g.')
267         freq_axes.plot(freq_axis, expected, 'b-')
268         freq_axes.set_title('freq series')
269
270 def _test_unitary_rfft_gaussian_suite():
271     print("Test unitary FFTs on variously shaped gaussian functions")
272     _test_unitary_rfft_gaussian(a=0.5)
273     _test_unitary_rfft_gaussian(a=2.0)
274     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
275     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
276
277
278
279 def power_spectrum(data, freq=1.0):
280     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
281
282     DATA must be real (not complex).
283     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
284     If the number of samples in data is not an integer power of two,
285     the FFT ignores some of the later points.
286     """
287     nsamps = floor_pow_of_two(len(data))
288
289     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
290     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
291     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
292     # See Numerical Recipies for a details.
293     trans = _numpy.fft.rfft(data[0:nsamps])
294     power = (trans * trans.conj()).real # We want the square of the amplitude.
295     return (freq_axis, power)
296
297 def unitary_power_spectrum(data, freq=1.0):
298     freq_axis,power = power_spectrum(data, freq)
299     # One sided power spectral density, so 2|H(f)|**2
300     # (see NR 2nd edition 12.0.14, p498)
301     #
302     # numpy normalizes with 1/N on the inverse transform ifft,
303     # so we should normalize the freq-space representation with 1/sqrt(N).
304     # But we're using the rfft, where N points are like N/2 complex points,
305     # so 1/sqrt(N/2)
306     # So the power gets normalized by that twice and we have 2/N
307     #
308     # On top of this, the FFT assumes a sampling freq of 1 per second,
309     # and we want to preserve area under our curves.
310     # If our total time T = len(data)/freq is smaller than 1,
311     # our df_real = freq/len(data) is bigger that the FFT expects
312     # (dt_fft = 1/len(data)),
313     # and we need to scale the powers down to conserve area.
314     # df_fft * F_fft(f) = df_real *F_real(f)
315     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
316     # So the power gets normalized by *that* twice and we have 2/N * freq**2
317
318     # power per unit time
319     # measure x(t) for time T
320     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
321     # PSD(f) = 2 |X(f)|**2 / T
322
323     # total_time = len(data)/float(freq)
324     # power *= 2.0 / float(freq)**2   /   total_time
325     # power *= 2.0 / freq**2   *   freq / len(data)
326     power *= 2.0 / (freq * _numpy.float(len(data)))
327
328     return (freq_axis, power)
329
330 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024):
331     x = _numpy.zeros((samples,), dtype=_numpy.float)
332     samp_freq = _numpy.float(samp_freq)
333     for i in range(samples):
334         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
335     freq_axis, power = unitary_power_spectrum(x, samp_freq)
336     imax = _numpy.argmax(power)
337
338     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
339     df = samp_freq/_numpy.float(samples) # df = 1/T, where T = total_time
340     i = int(sin_freq/df)
341     # average power per unit time is
342     #  P = <x(t)**2>
343     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
344     # so average value of (int sin(t)**2 dt) per unit time is 0.5
345     #  P = 0.5
346     # we spread that power over a frequency bin of width df, sp
347     #  P(f0) = 0.5/df
348     # where f0 is the sin's frequency
349     #
350     # or:
351     # FFT of sin(2*pi*t*f0) gives
352     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
353     # (area under x(t) = 0, area under X(f) = 0)
354     # so one sided power spectral density (PSD) per unit time is
355     #  P(f) = 2 |X(f)|**2 / T
356     #       = 2 * |0.5 delta(f-f0)|**2 / T
357     #       = 0.5 * |delta(f-f0)|**2 / T
358     # but we're discrete and want the integral of the 'delta' to be 1,
359     # so 'delta'*df = 1  --> 'delta' = 1/df, and
360     #  P(f) = 0.5 / (df**2 * T)
361     #       = 0.5 / df                (T = 1/df)
362     expected[i] = 0.5 / df
363
364     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
365             sin_freq, expected[i], freq_axis[imax], power[imax]))
366     Pexp = 0
367     P    = 0
368     for i in range(len(freq_axis)):
369         Pexp += expected[i] *df
370         P    += power[i] * df
371     print('The total power should be {} ({})'.format(Pexp, P))
372
373     if TEST_PLOTS:
374         figure = _pyplot.figure()
375         time_axes = figure.add_subplot(2, 1, 1)
376         time_axes.plot(
377             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
378         time_axes.set_title('time series')
379         freq_axes = figure.add_subplot(2, 1, 2)
380         freq_axes.plot(freq_axis, power, 'r.')
381         freq_axes.plot(freq_axis, expected, 'b-')
382         freq_axes.set_title(
383             '{} samples of sin at {} Hz'.format(samples, sin_freq))
384
385 def _test_unitary_power_spectrum_sin_suite():
386     print('Test unitary power spectrums on variously shaped sin functions')
387     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
388     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
389     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
390     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
391     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
392     # with some irrational numbers, to check that I'm not getting lucky
393     _test_unitary_power_spectrum_sin(
394         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
395     # test with non-integer number of periods
396     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
397
398 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256):
399     x = _numpy.zeros((samples,), dtype=_numpy.float)
400     samp_freq = _numpy.float(samp_freq)
401     x[0] = amp
402     freq_axis, power = unitary_power_spectrum(x, samp_freq)
403
404     # power = <x(t)**2> = (amp)**2 * dt/T
405     # we spread that power over the entire freq_axis [0,fN], so
406     #  P(f)  = (amp)**2 dt / (T fN)
407     # where
408     #  dt = 1/samp_freq        (sample period)
409     #  T  = samples/samp_freq  (total time of data aquisition)
410     #  fN = 0.5 samp_freq      (Nyquist frequency)
411     # so
412     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
413     #       = 2 amp**2 / (samp_freq*samples)
414     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
415     expected = _numpy.ones(
416         (len(freq_axis),), dtype=_numpy.float) * expected_amp
417
418     print('The power should be flat at y = {} ({})'.format(
419         expected_amp, power[0]))
420
421     if TEST_PLOTS:
422         figure = _pyplot.figure()
423         time_axes = figure.add_subplot(2, 1, 1)
424         time_axes.plot(
425             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
426         time_axes.set_title('time series')
427         freq_axes = figure.add_subplot(2, 1, 2)
428         freq_axes.plot(freq_axis, power, 'r.')
429         freq_axes.plot(freq_axis, expected, 'b-')
430         freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
431
432 def _test_unitary_power_spectrum_delta_suite():
433     print('Test unitary power spectrums on various delta functions')
434     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
435     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
436     # expected = 2*computed
437     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)
438     # expected = 0.5*computed
439     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)
440     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
441     _test_unitary_power_spectrum_delta(
442         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
443
444 def _gaussian2(area, mean, std, t):
445     "Integral over all time = area (i.e. normalized for area=1)"
446     return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp(
447         -0.5*((t-mean)/std)**2)
448
449 def _test_unitary_power_spectrum_gaussian(
450     area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512):
451     x = _numpy.zeros((samples,), dtype=_numpy.float)
452     mean = _numpy.float(mean)
453     for i in range(samples):
454         t = i/_numpy.float(samp_freq)
455         x[i] = _gaussian2(area, mean, std, t)
456     freq_axis, power = unitary_power_spectrum(x, samp_freq)
457
458     # generate the predicted curve
459     # by comparing our _gaussian2() form to _gaussian(),
460     # we see that the Fourier transform of x(t) has parameters:
461     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
462     #                           converting to angular frequency,
463     #                           not frequency like we are)
464     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
465     #  mean' = 0               (changing the mean in the time-domain just
466     #                           changes the phase in the freq-domain)
467     # So our power spectral density per unit time is given by
468     #  P(f) = 2 |X(f)|**2 / T
469     # Where
470     #  T  = samples/samp_freq  (total time of data aquisition)
471     mean = 0.0
472     area = area /(std*_numpy.sqrt(2.0*_numpy.pi))
473     std = 1.0/(2.0*_numpy.pi*std)
474     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
475     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
476     df = _numpy.float(samp_freq)/samples
477     for i in range(len(freq_axis)):
478         f = i*df
479         gaus = _gaussian2(area, mean, std, f)
480         expected[i] = 2.0 * gaus**2 * samp_freq/samples
481     print(('The power should be a half-gaussian, '
482            'with a peak at 0 Hz with amplitude {} ({})').format(
483             expected[0], power[0]))
484
485     if TEST_PLOTS:
486         figure = _pyplot.figure()
487         time_axes = figure.add_subplot(2, 1, 1)
488         time_axes.plot(
489             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
490         time_axes.set_title('time series')
491         freq_axes = figure.add_subplot(2, 1, 2)
492         freq_axes.plot(freq_axis, power, 'r.')
493         freq_axes.plot(freq_axis, expected, 'b-')
494         freq_axes.set_title('freq series')
495
496 def _test_unitary_power_spectrum_gaussian_suite():
497     print('Test unitary power spectrums on various gaussian functions')
498     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
499     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
500     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
501     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
502     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
503     _test_unitary_power_spectrum_gaussian(
504         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
505         samples=1024)
506
507 def window_hann(length):
508     "Returns a Hann window array with length entries"
509     win = _numpy.zeros((length,), dtype=_numpy.float)
510     for i in range(length):
511         win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length)))
512     # avg value of cos over a period is 0
513     # so average height of Hann window is 0.5
514     return win
515
516 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
517                        overlap=True, window=window_hann):
518     """Compute the avgerage power spectrum of DATA.
519
520     Taken with a sampling frequency FREQ.
521
522     DATA must be real (not complex) by breaking DATA into chunks.
523     The chunks may or may not be overlapping (by setting OVERLAP).
524     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
525     and the resulting spectra are averaged together.
526     See NR 13.4 for rational.
527
528     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
529     CHUNK_SIZE should really be a power of 2.
530     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
531     the FFT ignores some of the later points.
532     """
533     if chunk_size != floor_pow_of_two(chunk_size):
534         raise ValueError(
535             'chunk_size {} should be a power of 2'.format(chunk_size))
536
537     nchunks = len(data)/chunk_size # integer division = implicit floor
538     if overlap:
539         chunk_step = chunk_size/2
540     else:
541         chunk_step = chunk_size
542
543     win = window(chunk_size) # generate a window of the appropriate size
544     freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1)
545     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
546     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
547     # See Numerical Recipies for a details.
548     power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float)
549     for i in range(nchunks):
550         starti = i*chunk_step
551         stopi = starti+chunk_size
552         fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win)
553         p_chunk = (fft_chunk * fft_chunk.conj()).real
554         power += p_chunk.astype(_numpy.float)
555     power /= _numpy.float(nchunks)
556     return (freq_axis, power)
557
558 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
559                                overlap=True, window=window_hann):
560     """Compute the average power spectrum, preserving normalization
561     """
562     freq_axis,power = avg_power_spectrum(
563         data, freq, chunk_size, overlap, window)
564     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
565     # see unitary_power_spectrum()
566     power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3
567     #             * 8/3  to remove power from windowing
568     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
569     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
570     # So our calulated power has and extra <w(t)**2> in it.
571     # For the Hann window,
572     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
573     # For low frequency components,
574     # where the frequency of x(t) is ~= the frequency of w(t),
575     # the normalization is not perfect. ??
576     # The normalization approaches perfection as chunk_size -> infinity.
577     return (freq_axis, power)
578
579 def _test_unitary_avg_power_spectrum_sin(
580     sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True,
581     window=window_hann):
582     x = _numpy.zeros((samples,), dtype=_numpy.float)
583     samp_freq = _numpy.float(samp_freq)
584     for i in range(samples):
585         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
586     freq_axis, power = unitary_avg_power_spectrum(
587         x, samp_freq, chunk_size, overlap, window)
588     imax = _numpy.argmax(power)
589
590     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
591     df = samp_freq/_numpy.float(chunk_size) # df = 1/T, where T = total_time
592     i = int(sin_freq/df)
593     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
594
595     print('The power should peak at {} Hz of {} ({}, {})'.format(
596         sin_freq, expected[i], freq_axis[imax], power[imax]))
597     Pexp = 0
598     P    = 0
599     for i in range(len(freq_axis)):
600         Pexp += expected[i] * df
601         P    += power[i] * df
602     print('The total power should be {} ({})'.format(Pexp, P))
603
604     if TEST_PLOTS:
605         figure = _pyplot.figure()
606         time_axes = figure.add_subplot(2, 1, 1)
607         time_axes.plot(
608             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
609         time_axes.set_title('time series')
610         freq_axes = figure.add_subplot(2, 1, 2)
611         freq_axes.plot(freq_axis, power, 'r.')
612         freq_axes.plot(freq_axis, expected, 'b-')
613         freq_axes.set_title(
614             '{} samples of sin at {} Hz'.format(samples, sin_freq))
615
616 def _test_unitary_avg_power_spectrum_sin_suite():
617     print('Test unitary avg power spectrums on variously shaped sin functions')
618     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
619     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
620     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
621     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
622     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
623     # test long wavelenth sin, so be closer to window frequency
624     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
625     # finally, with some irrational numbers, to check that I'm not
626     # getting lucky
627     _test_unitary_avg_power_spectrum_sin(
628         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
629
630
631 def test():
632     _test_rfft_suite()
633     _test_unitary_rfft_parsevals_suite()
634     _test_unitary_rfft_rect_suite()
635     _test_unitary_rfft_gaussian_suite()
636     _test_unitary_power_spectrum_sin_suite()
637     _test_unitary_power_spectrum_delta_suite()
638     _test_unitary_power_spectrum_gaussian_suite()
639     _test_unitary_avg_power_spectrum_sin_suite()
640
641 if __name__ == '__main__':
642     from optparse import OptionParser
643
644     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
645     p.add_option('-p', '--plot', dest='plot', action='store_true',
646                  help='Display time- and freq-space plots of test transforms.')
647
648     options,args = p.parse_args()
649
650     if options.plot:
651         import matplotlib.pyplot as _pyplot
652         TEST_PLOTS = True
653     test()
654     if options.plot:
655         _pyplot.show()