FFT_tools: wrap long function definition lines for PEP 8 compliance
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num):
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum:
50         num = 2**_numpy.floor(lnum)
51     return num
52
53 def round_pow_of_two(num):
54     "Round num to the closest exact a power of two on a log scale."
55     lnum = _numpy.log2(num)
56     if int(lnum) != lnum:
57         num = 2**_numpy.round(lnum)
58     return num
59
60 def ceil_pow_of_two(num):
61     "Round num up to the closest exact a power of two."
62     lnum = _numpy.log2(num)
63     if int(lnum) != lnum:
64         num = 2**_numpy.ceil(lnum)
65     return num
66
67 def _test_rfft(xs, Xs):
68     # Numpy's FFT algoritm returns
69     #          n-1
70     #   X[k] = SUM x[m] exp (-j 2pi km /n)
71     #          m=0
72     # (see http://www.tramy.us/numpybook.pdf)
73     j = _numpy.complex(0,1)
74     n = len(xs)
75     Xa = []
76     for k in range(n):
77         Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n)
78                        for x,m in zip(xs,range(n))]))
79         if k < len(Xs):
80             if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6:
81                 raise ValueError(
82                     ('rfft mismatch on element {}: {} != {}, relative error {}'
83                      ).format(
84                         k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k])))
85     # Which should satisfy the discrete form of Parseval's theorem
86     #   n-1               n-1
87     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
88     #   m=0               k=0
89     timeSum = sum([_numpy.abs(x)**2 for x in xs])
90     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
91     if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6:
92         raise ValueError(
93             "Mismatch on Parseval's, {} != 1/{} * {}".format(
94                 timeSum, n, freqSum))
95
96 def _test_rfft_suite():
97     print('Test numpy rfft definition')
98     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
99     _test_rfft(xs, _numpy.fft.rfft(xs))
100
101 def unitary_rfft(data, freq=1.0):
102     """Compute the Fourier transform of real data.
103
104     Unitary (preserves power [Parseval's theorem]).
105
106     If the units on your data are Volts,
107     and your sampling frequency is in Hz,
108     then freq_axis will be in Hz,
109     and trans will be in Volts.
110     """
111     nsamps = floor_pow_of_two(len(data))
112     # Which should satisfy the discrete form of Parseval's theorem
113     #   n-1               n-1
114     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
115     #   m=0               k=0
116     # However, we want our FFT to satisfy the continuous Parseval eqn
117     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
118     # which has the discrete form
119     #   n-1              n-1
120     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
121     #   m=0              k=0
122     # with X'_k = AX, this gives us
123     #   n-1                     n-1
124     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
125     #   m=0                     k=0
126     # so we see
127     #   A^2 df/dt = 1/n
128     #   A^2 = 1/n dt/df
129     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
130     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
131     # f_c in the transformed data is the Nyquist frequency (12.1.2)
132     #   f_c = 1/2dt
133     # and the points are spaced out by (12.1.5)
134     #   df = 1/ndt
135     # so
136     #   dt = 1/ndf
137     #   dt/df = 1/ndf^2
138     #   A^2 = 1/n^2df^2
139     #   A = 1/ndf = ndt/n = dt
140     # so we can convert the Numpy transformed data to match our unitary
141     # continuous transformed data with (also NR 12.1.8)
142     #   X'_k = dtX = X / <sampling freq>
143     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
144     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
145     return (freq_axis, trans)
146
147 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
148     # Which should satisfy the discretized integral form of Parseval's theorem
149     #   n-1              n-1
150     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
151     #   m=0              k=0
152     dt = 1.0/freq
153     df = freqs[1]-freqs[0]
154     if df - 1/(len(xs)*dt)/df >= 1e-6:
155         raise ValueError(
156             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
157     Xa = list(Xs)
158     for k in range(len(Xs)-1,1,-1):
159         Xa.append(Xa[k])
160     if len(xs) != len(Xa):
161         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
162     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
163     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
164     if _numpy.abs(lhs - rhs)/lhs >= 1e-4:
165         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
166
167 def _test_unitary_rfft_parsevals_suite():
168     print("Test unitary rfft on Parseval's theorem")
169     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
170     dt = _numpy.pi
171     freqs,Xs = unitary_rfft(xs, 1.0/dt)
172     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
173
174 def _rect(t):
175     if _numpy.abs(t) < 0.5:
176         return 1
177     else:
178         return 0
179
180 def _test_unitary_rfft_rect(
181     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
182     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
183     samp_freq = _numpy.float(samp_freq)
184     a = _numpy.float(a)
185
186     x = _numpy.zeros((samples,), dtype=_numpy.float)
187     dt = 1.0/samp_freq
188     for i in range(samples):
189         t = i*dt
190         x[i] = _rect(a*(t-time_shift))
191     freq_axis, X = unitary_rfft(x, samp_freq)
192     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
193
194     # remove the phase due to our time shift
195     j = _numpy.complex(0.0,1.0) # sqrt(-1)
196     for i in range(len(freq_axis)):
197         f = freq_axis[i]
198         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
199         X[i] *= inverse_phase_shift
200
201     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
202     # normalized sinc(x) = sin(pi x)/(pi x)
203     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
204     if _numpy.sinc(0.5) != 2.0/_numpy.pi:
205         raise ValueError('abnormal sinc()')
206     for i in range(len(freq_axis)):
207         f = freq_axis[i]
208         expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a)
209
210     if TEST_PLOTS:
211         figure = _pyplot.figure()
212         time_axes = figure.add_subplot(2, 1, 1)
213         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
214         time_axes.set_title('time series')
215         freq_axes = figure.add_subplot(2, 1, 2)
216         freq_axes.plot(freq_axis, X.real, 'r.')
217         freq_axes.plot(freq_axis, X.imag, 'g.')
218         freq_axes.plot(freq_axis, expected, 'b-')
219         freq_axes.set_title('freq series')
220
221 def _test_unitary_rfft_rect_suite():
222     print('Test unitary FFTs on variously shaped rectangular functions')
223     _test_unitary_rfft_rect(a=0.5)
224     _test_unitary_rfft_rect(a=2.0)
225     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
226     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
227
228 def _gaussian(a, t):
229     return _numpy.exp(-a * t**2)
230
231 def _test_unitary_rfft_gaussian(
232     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
233     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
234     samp_freq = _numpy.float(samp_freq)
235     a = _numpy.float(a)
236
237     x = _numpy.zeros((samples,), dtype=_numpy.float)
238     dt = 1.0/samp_freq
239     for i in range(samples):
240         t = i*dt
241         x[i] = _gaussian(a, (t-time_shift))
242     freq_axis, X = unitary_rfft(x, samp_freq)
243     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
244
245     # remove the phase due to our time shift
246     j = _numpy.complex(0.0,1.0) # sqrt(-1)
247     for i in range(len(freq_axis)):
248         f = freq_axis[i]
249         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
250         X[i] *= inverse_phase_shift
251
252     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
253     for i in range(len(freq_axis)):
254         f = freq_axis[i]
255         # see Wikipedia, or do the integral yourself.
256         expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian(
257             1.0/a, _numpy.pi*f)
258
259     if TEST_PLOTS:
260         figure = _pyplot.figure()
261         time_axes = figure.add_subplot(2, 1, 1)
262         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
263         time_axes.set_title('time series')
264         freq_axes = figure.add_subplot(2, 1, 2)
265         freq_axes.plot(freq_axis, X.real, 'r.')
266         freq_axes.plot(freq_axis, X.imag, 'g.')
267         freq_axes.plot(freq_axis, expected, 'b-')
268         freq_axes.set_title('freq series')
269
270 def _test_unitary_rfft_gaussian_suite():
271     print("Test unitary FFTs on variously shaped gaussian functions")
272     _test_unitary_rfft_gaussian(a=0.5)
273     _test_unitary_rfft_gaussian(a=2.0)
274     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
275     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
276
277
278
279 def power_spectrum(data, freq=1.0):
280     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
281
282     DATA must be real (not complex).
283     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
284     If the number of samples in data is not an integer power of two,
285     the FFT ignores some of the later points.
286     """
287     nsamps = floor_pow_of_two(len(data))
288
289     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
290     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
291     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
292     # See Numerical Recipies for a details.
293     trans = _numpy.fft.rfft(data[0:nsamps])
294     power = (trans * trans.conj()).real # We want the square of the amplitude.
295     return (freq_axis, power)
296
297 def unitary_power_spectrum(data, freq=1.0):
298     freq_axis,power = power_spectrum(data, freq)
299     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
300     #
301     # numpy normalizes with 1/N on the inverse transform ifft,
302     # so we should normalize the freq-space representation with 1/sqrt(N).
303     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
304     # So the power gets normalized by that twice and we have 2/N
305     #
306     # On top of this, the FFT assumes a sampling freq of 1 per second,
307     # and we want to preserve area under our curves.
308     # If our total time T = len(data)/freq is smaller than 1,
309     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)),
310     # and we need to scale the powers down to conserve area.
311     # df_fft * F_fft(f) = df_real *F_real(f)
312     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
313     # So the power gets normalized by *that* twice and we have 2/N * freq**2
314
315     # power per unit time
316     # measure x(t) for time T
317     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
318     # PSD(f) = 2 |X(f)|**2 / T
319
320     # total_time = len(data)/float(freq)
321     # power *= 2.0 / float(freq)**2   /   total_time
322     # power *= 2.0 / freq**2   *   freq / len(data)
323     power *= 2.0 / (freq * _numpy.float(len(data)))
324
325     return (freq_axis, power)
326
327 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024):
328     x = _numpy.zeros((samples,), dtype=_numpy.float)
329     samp_freq = _numpy.float(samp_freq)
330     for i in range(samples):
331         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
332     freq_axis, power = unitary_power_spectrum(x, samp_freq)
333     imax = _numpy.argmax(power)
334
335     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
336     df = samp_freq/_numpy.float(samples) # df = 1/T, where T = total_time
337     i = int(sin_freq/df)
338     # average power per unit time is
339     #  P = <x(t)**2>
340     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
341     # so average value of (int sin(t)**2 dt) per unit time is 0.5
342     #  P = 0.5
343     # we spread that power over a frequency bin of width df, sp
344     #  P(f0) = 0.5/df
345     # where f0 is the sin's frequency
346     #
347     # or:
348     # FFT of sin(2*pi*t*f0) gives
349     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
350     # (area under x(t) = 0, area under X(f) = 0)
351     # so one sided power spectral density (PSD) per unit time is
352     #  P(f) = 2 |X(f)|**2 / T
353     #       = 2 * |0.5 delta(f-f0)|**2 / T
354     #       = 0.5 * |delta(f-f0)|**2 / T
355     # but we're discrete and want the integral of the 'delta' to be 1,
356     # so 'delta'*df = 1  --> 'delta' = 1/df, and
357     #  P(f) = 0.5 / (df**2 * T)
358     #       = 0.5 / df                (T = 1/df)
359     expected[i] = 0.5 / df
360
361     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
362             sin_freq, expected[i], freq_axis[imax], power[imax]))
363     Pexp = 0
364     P    = 0
365     for i in range(len(freq_axis)):
366         Pexp += expected[i] *df
367         P    += power[i] * df
368     print('The total power should be {} ({})'.format(Pexp, P))
369
370     if TEST_PLOTS:
371         figure = _pyplot.figure()
372         time_axes = figure.add_subplot(2, 1, 1)
373         time_axes.plot(
374             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
375         time_axes.set_title('time series')
376         freq_axes = figure.add_subplot(2, 1, 2)
377         freq_axes.plot(freq_axis, power, 'r.')
378         freq_axes.plot(freq_axis, expected, 'b-')
379         freq_axes.set_title(
380             '{} samples of sin at {} Hz'.format(samples, sin_freq))
381
382 def _test_unitary_power_spectrum_sin_suite():
383     print('Test unitary power spectrums on variously shaped sin functions')
384     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
385     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
386     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
387     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
388     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
389     # finally, with some irrational numbers, to check that I'm not getting lucky
390     _test_unitary_power_spectrum_sin(
391         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
392     # test with non-integer number of periods
393     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
394
395 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256):
396     x = _numpy.zeros((samples,), dtype=_numpy.float)
397     samp_freq = _numpy.float(samp_freq)
398     x[0] = amp
399     freq_axis, power = unitary_power_spectrum(x, samp_freq)
400
401     # power = <x(t)**2> = (amp)**2 * dt/T
402     # we spread that power over the entire freq_axis [0,fN], so
403     #  P(f)  = (amp)**2 dt / (T fN)
404     # where
405     #  dt = 1/samp_freq        (sample period)
406     #  T  = samples/samp_freq  (total time of data aquisition)
407     #  fN = 0.5 samp_freq      (Nyquist frequency)
408     # so
409     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
410     #       = 2 amp**2 / (samp_freq*samples)
411     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
412     expected = _numpy.ones(
413         (len(freq_axis),), dtype=_numpy.float) * expected_amp
414
415     print('The power should be flat at y = {} ({})'.format(
416         expected_amp, power[0]))
417
418     if TEST_PLOTS:
419         figure = _pyplot.figure()
420         time_axes = figure.add_subplot(2, 1, 1)
421         time_axes.plot(
422             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
423         time_axes.set_title('time series')
424         freq_axes = figure.add_subplot(2, 1, 2)
425         freq_axes.plot(freq_axis, power, 'r.')
426         freq_axes.plot(freq_axis, expected, 'b-')
427         freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
428
429 def _test_unitary_power_spectrum_delta_suite():
430     print('Test unitary power spectrums on various delta functions')
431     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
432     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
433     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
434     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
435     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
436     _test_unitary_power_spectrum_delta(
437         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
438
439 def _gaussian2(area, mean, std, t):
440     "Integral over all time = area (i.e. normalized for area=1)"
441     return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp(
442         -0.5*((t-mean)/std)**2)
443
444 def _test_unitary_power_spectrum_gaussian(
445     area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512):
446     x = _numpy.zeros((samples,), dtype=_numpy.float)
447     mean = _numpy.float(mean)
448     for i in range(samples):
449         t = i/_numpy.float(samp_freq)
450         x[i] = _gaussian2(area, mean, std, t)
451     freq_axis, power = unitary_power_spectrum(x, samp_freq)
452
453     # generate the predicted curve
454     # by comparing our _gaussian2() form to _gaussian(),
455     # we see that the Fourier transform of x(t) has parameters:
456     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
457     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
458     #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
459     # So our power spectral density per unit time is given by
460     #  P(f) = 2 |X(f)|**2 / T
461     # Where
462     #  T  = samples/samp_freq  (total time of data aquisition)
463     mean = 0.0
464     area = area /(std*_numpy.sqrt(2.0*_numpy.pi))
465     std = 1.0/(2.0*_numpy.pi*std)
466     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
467     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
468     df = _numpy.float(samp_freq)/samples
469     for i in range(len(freq_axis)):
470         f = i*df
471         gaus = _gaussian2(area, mean, std, f)
472         expected[i] = 2.0 * gaus**2 * samp_freq/samples
473     print(('The power should be a half-gaussian, '
474            'with a peak at 0 Hz with amplitude {} ({})').format(
475             expected[0], power[0]))
476
477     if TEST_PLOTS:
478         figure = _pyplot.figure()
479         time_axes = figure.add_subplot(2, 1, 1)
480         time_axes.plot(
481             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
482         time_axes.set_title('time series')
483         freq_axes = figure.add_subplot(2, 1, 2)
484         freq_axes.plot(freq_axis, power, 'r.')
485         freq_axes.plot(freq_axis, expected, 'b-')
486         freq_axes.set_title('freq series')
487
488 def _test_unitary_power_spectrum_gaussian_suite():
489     print('Test unitary power spectrums on various gaussian functions')
490     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
491     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
492     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
493     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
494     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
495     _test_unitary_power_spectrum_gaussian(
496         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
497         samples=1024)
498
499 def window_hann(length):
500     "Returns a Hann window array with length entries"
501     win = _numpy.zeros((length,), dtype=_numpy.float)
502     for i in range(length):
503         win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length)))
504     # avg value of cos over a period is 0
505     # so average height of Hann window is 0.5
506     return win
507
508 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
509                        overlap=True, window=window_hann):
510     """Compute the avgerage power spectrum of DATA.
511
512     Taken with a sampling frequency FREQ.
513
514     DATA must be real (not complex) by breaking DATA into chunks.
515     The chunks may or may not be overlapping (by setting OVERLAP).
516     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
517     and the resulting spectra are averaged together.
518     See NR 13.4 for rational.
519
520     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
521     CHUNK_SIZE should really be a power of 2.
522     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
523     the FFT ignores some of the later points.
524     """
525     if chunk_size != floor_pow_of_two(chunk_size):
526         raise ValueError(
527             'chunk_size {} should be a power of 2'.format(chunk_size))
528
529     nchunks = len(data)/chunk_size # integer division = implicit floor
530     if overlap:
531         chunk_step = chunk_size/2
532     else:
533         chunk_step = chunk_size
534
535     win = window(chunk_size) # generate a window of the appropriate size
536     freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1)
537     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
538     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
539     # See Numerical Recipies for a details.
540     power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float)
541     for i in range(nchunks):
542         starti = i*chunk_step
543         stopi = starti+chunk_size
544         fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win)
545         p_chunk = (fft_chunk * fft_chunk.conj()).real
546         power += p_chunk.astype(_numpy.float)
547     power /= _numpy.float(nchunks)
548     return (freq_axis, power)
549
550 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
551                                overlap=True, window=window_hann):
552     """Compute the average power spectrum, preserving normalization
553     """
554     freq_axis,power = avg_power_spectrum(
555         data, freq, chunk_size, overlap, window)
556     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
557     # see unitary_power_spectrum()
558     power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3
559     #                                       * 8/3  to remove power from windowing
560     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
561     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
562     # So our calulated power has and extra <w(t)**2> in it.
563     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
564     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
565     # The normalization is not perfect. ??
566     # The normalization approaches perfection as chunk_size -> infinity.
567     return (freq_axis, power)
568
569 def _test_unitary_avg_power_spectrum_sin(
570     sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True,
571     window=window_hann):
572     x = _numpy.zeros((samples,), dtype=_numpy.float)
573     samp_freq = _numpy.float(samp_freq)
574     for i in range(samples):
575         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
576     freq_axis, power = unitary_avg_power_spectrum(
577         x, samp_freq, chunk_size, overlap, window)
578     imax = _numpy.argmax(power)
579
580     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
581     df = samp_freq/_numpy.float(chunk_size) # df = 1/T, where T = total_time
582     i = int(sin_freq/df)
583     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
584
585     print('The power should peak at {} Hz of {} ({}, {})'.format(
586         sin_freq, expected[i], freq_axis[imax], power[imax]))
587     Pexp = 0
588     P    = 0
589     for i in range(len(freq_axis)):
590         Pexp += expected[i] * df
591         P    += power[i] * df
592     print('The total power should be {} ({})'.format(Pexp, P))
593
594     if TEST_PLOTS:
595         figure = _pyplot.figure()
596         time_axes = figure.add_subplot(2, 1, 1)
597         time_axes.plot(
598             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
599         time_axes.set_title('time series')
600         freq_axes = figure.add_subplot(2, 1, 2)
601         freq_axes.plot(freq_axis, power, 'r.')
602         freq_axes.plot(freq_axis, expected, 'b-')
603         freq_axes.set_title(
604             '{} samples of sin at {} Hz'.format(samples, sin_freq))
605
606 def _test_unitary_avg_power_spectrum_sin_suite():
607     print('Test unitary avg power spectrums on variously shaped sin functions')
608     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
609     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
610     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
611     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
612     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
613     # test long wavelenth sin, so be closer to window frequency
614     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
615     # finally, with some irrational numbers, to check that I'm not getting lucky
616     _test_unitary_avg_power_spectrum_sin(
617         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
618
619
620 def test():
621     _test_rfft_suite()
622     _test_unitary_rfft_parsevals_suite()
623     _test_unitary_rfft_rect_suite()
624     _test_unitary_rfft_gaussian_suite()
625     _test_unitary_power_spectrum_sin_suite()
626     _test_unitary_power_spectrum_delta_suite()
627     _test_unitary_power_spectrum_gaussian_suite()
628     _test_unitary_avg_power_spectrum_sin_suite()
629
630 if __name__ == '__main__':
631     from optparse import OptionParser
632
633     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
634     p.add_option('-p', '--plot', dest='plot', action='store_true',
635                  help='Display time- and freq-space plots of test transforms.')
636
637     options,args = p.parse_args()
638
639     if options.plot:
640         import matplotlib.pyplot as _pyplot
641         TEST_PLOTS = True
642     test()
643     if options.plot:
644         _pyplot.show()