FFT_tools: two spaces before inline comments (PEP8)
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num):
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum:
50         num = 2**_numpy.floor(lnum)
51     return num
52
53
54 def round_pow_of_two(num):
55     "Round num to the closest exact a power of two on a log scale."
56     lnum = _numpy.log2(num)
57     if int(lnum) != lnum:
58         num = 2**_numpy.round(lnum)
59     return num
60
61
62 def ceil_pow_of_two(num):
63     "Round num up to the closest exact a power of two."
64     lnum = _numpy.log2(num)
65     if int(lnum) != lnum:
66         num = 2**_numpy.ceil(lnum)
67     return num
68
69
70 def _test_rfft(xs, Xs):
71     # Numpy's FFT algoritm returns
72     #          n-1
73     #   X[k] = SUM x[m] exp (-j 2pi km /n)
74     #          m=0
75     # (see http://www.tramy.us/numpybook.pdf)
76     j = _numpy.complex(0,1)
77     n = len(xs)
78     Xa = []
79     for k in range(n):
80         Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n)
81                        for x,m in zip(xs,range(n))]))
82         if k < len(Xs):
83             if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6:
84                 raise ValueError(
85                     ('rfft mismatch on element {}: {} != {}, relative error {}'
86                      ).format(
87                         k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k])))
88     # Which should satisfy the discrete form of Parseval's theorem
89     #   n-1               n-1
90     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
91     #   m=0               k=0
92     timeSum = sum([_numpy.abs(x)**2 for x in xs])
93     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
94     if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6:
95         raise ValueError(
96             "Mismatch on Parseval's, {} != 1/{} * {}".format(
97                 timeSum, n, freqSum))
98
99
100 def _test_rfft_suite():
101     print('Test numpy rfft definition')
102     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
103     _test_rfft(xs, _numpy.fft.rfft(xs))
104
105
106 def unitary_rfft(data, freq=1.0):
107     """Compute the Fourier transform of real data.
108
109     Unitary (preserves power [Parseval's theorem]).
110
111     If the units on your data are Volts,
112     and your sampling frequency is in Hz,
113     then freq_axis will be in Hz,
114     and trans will be in Volts.
115     """
116     nsamps = floor_pow_of_two(len(data))
117     # Which should satisfy the discrete form of Parseval's theorem
118     #   n-1               n-1
119     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
120     #   m=0               k=0
121     # However, we want our FFT to satisfy the continuous Parseval eqn
122     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
123     # which has the discrete form
124     #   n-1              n-1
125     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
126     #   m=0              k=0
127     # with X'_k = AX, this gives us
128     #   n-1                     n-1
129     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
130     #   m=0                     k=0
131     # so we see
132     #   A^2 df/dt = 1/n
133     #   A^2 = 1/n dt/df
134     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
135     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
136     # f_c in the transformed data is the Nyquist frequency (12.1.2)
137     #   f_c = 1/2dt
138     # and the points are spaced out by (12.1.5)
139     #   df = 1/ndt
140     # so
141     #   dt = 1/ndf
142     #   dt/df = 1/ndf^2
143     #   A^2 = 1/n^2df^2
144     #   A = 1/ndf = ndt/n = dt
145     # so we can convert the Numpy transformed data to match our unitary
146     # continuous transformed data with (also NR 12.1.8)
147     #   X'_k = dtX = X / <sampling freq>
148     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
149     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
150     return (freq_axis, trans)
151
152
153 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
154     # Which should satisfy the discretized integral form of Parseval's theorem
155     #   n-1              n-1
156     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
157     #   m=0              k=0
158     dt = 1.0/freq
159     df = freqs[1]-freqs[0]
160     if df - 1/(len(xs)*dt)/df >= 1e-6:
161         raise ValueError(
162             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
163     Xa = list(Xs)
164     for k in range(len(Xs)-1,1,-1):
165         Xa.append(Xa[k])
166     if len(xs) != len(Xa):
167         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
168     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
169     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
170     if _numpy.abs(lhs - rhs)/lhs >= 1e-4:
171         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
172
173
174 def _test_unitary_rfft_parsevals_suite():
175     print("Test unitary rfft on Parseval's theorem")
176     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
177     dt = _numpy.pi
178     freqs,Xs = unitary_rfft(xs, 1.0/dt)
179     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
180
181
182 def _rect(t):
183     if _numpy.abs(t) < 0.5:
184         return 1
185     else:
186         return 0
187
188
189 def _test_unitary_rfft_rect(
190     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
191     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
192     samp_freq = _numpy.float(samp_freq)
193     a = _numpy.float(a)
194
195     x = _numpy.zeros((samples,), dtype=_numpy.float)
196     dt = 1.0/samp_freq
197     for i in range(samples):
198         t = i*dt
199         x[i] = _rect(a*(t-time_shift))
200     freq_axis, X = unitary_rfft(x, samp_freq)
201     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
202
203     # remove the phase due to our time shift
204     j = _numpy.complex(0.0,1.0)  # sqrt(-1)
205     for i in range(len(freq_axis)):
206         f = freq_axis[i]
207         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
208         X[i] *= inverse_phase_shift
209
210     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
211     # normalized sinc(x) = sin(pi x)/(pi x)
212     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
213     if _numpy.sinc(0.5) != 2.0/_numpy.pi:
214         raise ValueError('abnormal sinc()')
215     for i in range(len(freq_axis)):
216         f = freq_axis[i]
217         expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a)
218
219     if TEST_PLOTS:
220         figure = _pyplot.figure()
221         time_axes = figure.add_subplot(2, 1, 1)
222         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
223         time_axes.set_title('time series')
224         freq_axes = figure.add_subplot(2, 1, 2)
225         freq_axes.plot(freq_axis, X.real, 'r.')
226         freq_axes.plot(freq_axis, X.imag, 'g.')
227         freq_axes.plot(freq_axis, expected, 'b-')
228         freq_axes.set_title('freq series')
229
230
231 def _test_unitary_rfft_rect_suite():
232     print('Test unitary FFTs on variously shaped rectangular functions')
233     _test_unitary_rfft_rect(a=0.5)
234     _test_unitary_rfft_rect(a=2.0)
235     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
236     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
237
238
239 def _gaussian(a, t):
240     return _numpy.exp(-a * t**2)
241
242
243 def _test_unitary_rfft_gaussian(
244     a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
245     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
246     samp_freq = _numpy.float(samp_freq)
247     a = _numpy.float(a)
248
249     x = _numpy.zeros((samples,), dtype=_numpy.float)
250     dt = 1.0/samp_freq
251     for i in range(samples):
252         t = i*dt
253         x[i] = _gaussian(a, (t-time_shift))
254     freq_axis, X = unitary_rfft(x, samp_freq)
255     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
256
257     # remove the phase due to our time shift
258     j = _numpy.complex(0.0,1.0)  # sqrt(-1)
259     for i in range(len(freq_axis)):
260         f = freq_axis[i]
261         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
262         X[i] *= inverse_phase_shift
263
264     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
265     for i in range(len(freq_axis)):
266         f = freq_axis[i]
267         # see Wikipedia, or do the integral yourself.
268         expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian(
269             1.0/a, _numpy.pi*f)
270
271     if TEST_PLOTS:
272         figure = _pyplot.figure()
273         time_axes = figure.add_subplot(2, 1, 1)
274         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
275         time_axes.set_title('time series')
276         freq_axes = figure.add_subplot(2, 1, 2)
277         freq_axes.plot(freq_axis, X.real, 'r.')
278         freq_axes.plot(freq_axis, X.imag, 'g.')
279         freq_axes.plot(freq_axis, expected, 'b-')
280         freq_axes.set_title('freq series')
281
282
283 def _test_unitary_rfft_gaussian_suite():
284     print("Test unitary FFTs on variously shaped gaussian functions")
285     _test_unitary_rfft_gaussian(a=0.5)
286     _test_unitary_rfft_gaussian(a=2.0)
287     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
288     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
289
290
291 def power_spectrum(data, freq=1.0):
292     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
293
294     DATA must be real (not complex).
295     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
296     If the number of samples in data is not an integer power of two,
297     the FFT ignores some of the later points.
298     """
299     nsamps = floor_pow_of_two(len(data))
300
301     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
302     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
303     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
304     # See Numerical Recipies for a details.
305     trans = _numpy.fft.rfft(data[0:nsamps])
306     power = (trans * trans.conj()).real  # we want the square of the amplitude
307     return (freq_axis, power)
308
309
310 def unitary_power_spectrum(data, freq=1.0):
311     freq_axis,power = power_spectrum(data, freq)
312     # One sided power spectral density, so 2|H(f)|**2
313     # (see NR 2nd edition 12.0.14, p498)
314     #
315     # numpy normalizes with 1/N on the inverse transform ifft,
316     # so we should normalize the freq-space representation with 1/sqrt(N).
317     # But we're using the rfft, where N points are like N/2 complex points,
318     # so 1/sqrt(N/2)
319     # So the power gets normalized by that twice and we have 2/N
320     #
321     # On top of this, the FFT assumes a sampling freq of 1 per second,
322     # and we want to preserve area under our curves.
323     # If our total time T = len(data)/freq is smaller than 1,
324     # our df_real = freq/len(data) is bigger that the FFT expects
325     # (dt_fft = 1/len(data)),
326     # and we need to scale the powers down to conserve area.
327     # df_fft * F_fft(f) = df_real *F_real(f)
328     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
329     # So the power gets normalized by *that* twice and we have 2/N * freq**2
330
331     # power per unit time
332     # measure x(t) for time T
333     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
334     # PSD(f) = 2 |X(f)|**2 / T
335
336     # total_time = len(data)/float(freq)
337     # power *= 2.0 / float(freq)**2   /   total_time
338     # power *= 2.0 / freq**2   *   freq / len(data)
339     power *= 2.0 / (freq * _numpy.float(len(data)))
340
341     return (freq_axis, power)
342
343
344 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024):
345     x = _numpy.zeros((samples,), dtype=_numpy.float)
346     samp_freq = _numpy.float(samp_freq)
347     for i in range(samples):
348         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
349     freq_axis, power = unitary_power_spectrum(x, samp_freq)
350     imax = _numpy.argmax(power)
351
352     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
353     df = samp_freq/_numpy.float(samples)  # df = 1/T, where T = total_time
354     i = int(sin_freq/df)
355     # average power per unit time is
356     #  P = <x(t)**2>
357     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
358     # so average value of (int sin(t)**2 dt) per unit time is 0.5
359     #  P = 0.5
360     # we spread that power over a frequency bin of width df, sp
361     #  P(f0) = 0.5/df
362     # where f0 is the sin's frequency
363     #
364     # or:
365     # FFT of sin(2*pi*t*f0) gives
366     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
367     # (area under x(t) = 0, area under X(f) = 0)
368     # so one sided power spectral density (PSD) per unit time is
369     #  P(f) = 2 |X(f)|**2 / T
370     #       = 2 * |0.5 delta(f-f0)|**2 / T
371     #       = 0.5 * |delta(f-f0)|**2 / T
372     # but we're discrete and want the integral of the 'delta' to be 1,
373     # so 'delta'*df = 1  --> 'delta' = 1/df, and
374     #  P(f) = 0.5 / (df**2 * T)
375     #       = 0.5 / df                (T = 1/df)
376     expected[i] = 0.5 / df
377
378     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
379             sin_freq, expected[i], freq_axis[imax], power[imax]))
380     Pexp = 0
381     P    = 0
382     for i in range(len(freq_axis)):
383         Pexp += expected[i] *df
384         P    += power[i] * df
385     print('The total power should be {} ({})'.format(Pexp, P))
386
387     if TEST_PLOTS:
388         figure = _pyplot.figure()
389         time_axes = figure.add_subplot(2, 1, 1)
390         time_axes.plot(
391             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
392         time_axes.set_title('time series')
393         freq_axes = figure.add_subplot(2, 1, 2)
394         freq_axes.plot(freq_axis, power, 'r.')
395         freq_axes.plot(freq_axis, expected, 'b-')
396         freq_axes.set_title(
397             '{} samples of sin at {} Hz'.format(samples, sin_freq))
398
399
400 def _test_unitary_power_spectrum_sin_suite():
401     print('Test unitary power spectrums on variously shaped sin functions')
402     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
403     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
404     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
405     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
406     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
407     # with some irrational numbers, to check that I'm not getting lucky
408     _test_unitary_power_spectrum_sin(
409         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
410     # test with non-integer number of periods
411     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
412
413
414 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256):
415     x = _numpy.zeros((samples,), dtype=_numpy.float)
416     samp_freq = _numpy.float(samp_freq)
417     x[0] = amp
418     freq_axis, power = unitary_power_spectrum(x, samp_freq)
419
420     # power = <x(t)**2> = (amp)**2 * dt/T
421     # we spread that power over the entire freq_axis [0,fN], so
422     #  P(f)  = (amp)**2 dt / (T fN)
423     # where
424     #  dt = 1/samp_freq        (sample period)
425     #  T  = samples/samp_freq  (total time of data aquisition)
426     #  fN = 0.5 samp_freq      (Nyquist frequency)
427     # so
428     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
429     #       = 2 amp**2 / (samp_freq*samples)
430     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
431     expected = _numpy.ones(
432         (len(freq_axis),), dtype=_numpy.float) * expected_amp
433
434     print('The power should be flat at y = {} ({})'.format(
435         expected_amp, power[0]))
436
437     if TEST_PLOTS:
438         figure = _pyplot.figure()
439         time_axes = figure.add_subplot(2, 1, 1)
440         time_axes.plot(
441             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
442         time_axes.set_title('time series')
443         freq_axes = figure.add_subplot(2, 1, 2)
444         freq_axes.plot(freq_axis, power, 'r.')
445         freq_axes.plot(freq_axis, expected, 'b-')
446         freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
447
448
449 def _test_unitary_power_spectrum_delta_suite():
450     print('Test unitary power spectrums on various delta functions')
451     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
452     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
453     # expected = 2*computed
454     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)
455     # expected = 0.5*computed
456     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)
457     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
458     _test_unitary_power_spectrum_delta(
459         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
460
461
462 def _gaussian2(area, mean, std, t):
463     "Integral over all time = area (i.e. normalized for area=1)"
464     return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp(
465         -0.5*((t-mean)/std)**2)
466
467
468 def _test_unitary_power_spectrum_gaussian(
469     area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512):
470     x = _numpy.zeros((samples,), dtype=_numpy.float)
471     mean = _numpy.float(mean)
472     for i in range(samples):
473         t = i/_numpy.float(samp_freq)
474         x[i] = _gaussian2(area, mean, std, t)
475     freq_axis, power = unitary_power_spectrum(x, samp_freq)
476
477     # generate the predicted curve
478     # by comparing our _gaussian2() form to _gaussian(),
479     # we see that the Fourier transform of x(t) has parameters:
480     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
481     #                           converting to angular frequency,
482     #                           not frequency like we are)
483     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
484     #  mean' = 0               (changing the mean in the time-domain just
485     #                           changes the phase in the freq-domain)
486     # So our power spectral density per unit time is given by
487     #  P(f) = 2 |X(f)|**2 / T
488     # Where
489     #  T  = samples/samp_freq  (total time of data aquisition)
490     mean = 0.0
491     area = area /(std*_numpy.sqrt(2.0*_numpy.pi))
492     std = 1.0/(2.0*_numpy.pi*std)
493     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
494     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
495     df = _numpy.float(samp_freq)/samples
496     for i in range(len(freq_axis)):
497         f = i*df
498         gaus = _gaussian2(area, mean, std, f)
499         expected[i] = 2.0 * gaus**2 * samp_freq/samples
500     print(('The power should be a half-gaussian, '
501            'with a peak at 0 Hz with amplitude {} ({})').format(
502             expected[0], power[0]))
503
504     if TEST_PLOTS:
505         figure = _pyplot.figure()
506         time_axes = figure.add_subplot(2, 1, 1)
507         time_axes.plot(
508             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
509         time_axes.set_title('time series')
510         freq_axes = figure.add_subplot(2, 1, 2)
511         freq_axes.plot(freq_axis, power, 'r.')
512         freq_axes.plot(freq_axis, expected, 'b-')
513         freq_axes.set_title('freq series')
514
515
516 def _test_unitary_power_spectrum_gaussian_suite():
517     print('Test unitary power spectrums on various gaussian functions')
518     _test_unitary_power_spectrum_gaussian(
519         area=1, std=1, samp_freq=10.0, samples=1024)
520     _test_unitary_power_spectrum_gaussian(
521         area=1, std=2, samp_freq=10.0, samples=1024)
522     _test_unitary_power_spectrum_gaussian(
523         area=1, std=1, samp_freq=10.0, samples=2048)
524     _test_unitary_power_spectrum_gaussian(
525         area=1, std=1, samp_freq=20.0, samples=2048)
526     _test_unitary_power_spectrum_gaussian(
527         area=3, std=1, samp_freq=10.0, samples=1024)
528     _test_unitary_power_spectrum_gaussian(
529         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
530         samples=1024)
531
532
533 def window_hann(length):
534     "Returns a Hann window array with length entries"
535     win = _numpy.zeros((length,), dtype=_numpy.float)
536     for i in range(length):
537         win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length)))
538     # avg value of cos over a period is 0
539     # so average height of Hann window is 0.5
540     return win
541
542
543 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
544                        overlap=True, window=window_hann):
545     """Compute the avgerage power spectrum of DATA.
546
547     Taken with a sampling frequency FREQ.
548
549     DATA must be real (not complex) by breaking DATA into chunks.
550     The chunks may or may not be overlapping (by setting OVERLAP).
551     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
552     and the resulting spectra are averaged together.
553     See NR 13.4 for rational.
554
555     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
556     CHUNK_SIZE should really be a power of 2.
557     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
558     the FFT ignores some of the later points.
559     """
560     if chunk_size != floor_pow_of_two(chunk_size):
561         raise ValueError(
562             'chunk_size {} should be a power of 2'.format(chunk_size))
563
564     nchunks = len(data)/chunk_size  # integer division = implicit floor
565     if overlap:
566         chunk_step = chunk_size/2
567     else:
568         chunk_step = chunk_size
569
570     win = window(chunk_size)  # generate a window of the appropriate size
571     freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1)
572     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
573     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
574     # See Numerical Recipies for a details.
575     power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float)
576     for i in range(nchunks):
577         starti = i*chunk_step
578         stopi = starti+chunk_size
579         fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win)
580         p_chunk = (fft_chunk * fft_chunk.conj()).real
581         power += p_chunk.astype(_numpy.float)
582     power /= _numpy.float(nchunks)
583     return (freq_axis, power)
584
585
586 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
587                                overlap=True, window=window_hann):
588     """Compute the average power spectrum, preserving normalization
589     """
590     freq_axis,power = avg_power_spectrum(
591         data, freq, chunk_size, overlap, window)
592     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
593     # see unitary_power_spectrum()
594     power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3
595     #             * 8/3  to remove power from windowing
596     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
597     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
598     # So our calulated power has and extra <w(t)**2> in it.
599     # For the Hann window,
600     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
601     # For low frequency components,
602     # where the frequency of x(t) is ~= the frequency of w(t),
603     # the normalization is not perfect. ??
604     # The normalization approaches perfection as chunk_size -> infinity.
605     return (freq_axis, power)
606
607
608 def _test_unitary_avg_power_spectrum_sin(
609     sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True,
610     window=window_hann):
611     x = _numpy.zeros((samples,), dtype=_numpy.float)
612     samp_freq = _numpy.float(samp_freq)
613     for i in range(samples):
614         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
615     freq_axis, power = unitary_avg_power_spectrum(
616         x, samp_freq, chunk_size, overlap, window)
617     imax = _numpy.argmax(power)
618
619     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
620     df = samp_freq/_numpy.float(chunk_size)  # df = 1/T, where T = total_time
621     i = int(sin_freq/df)
622     expected[i] = 0.5 / df  # see _test_unitary_power_spectrum_sin()
623
624     print('The power should peak at {} Hz of {} ({}, {})'.format(
625         sin_freq, expected[i], freq_axis[imax], power[imax]))
626     Pexp = 0
627     P    = 0
628     for i in range(len(freq_axis)):
629         Pexp += expected[i] * df
630         P    += power[i] * df
631     print('The total power should be {} ({})'.format(Pexp, P))
632
633     if TEST_PLOTS:
634         figure = _pyplot.figure()
635         time_axes = figure.add_subplot(2, 1, 1)
636         time_axes.plot(
637             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
638         time_axes.set_title('time series')
639         freq_axes = figure.add_subplot(2, 1, 2)
640         freq_axes.plot(freq_axis, power, 'r.')
641         freq_axes.plot(freq_axis, expected, 'b-')
642         freq_axes.set_title(
643             '{} samples of sin at {} Hz'.format(samples, sin_freq))
644
645
646 def _test_unitary_avg_power_spectrum_sin_suite():
647     print('Test unitary avg power spectrums on variously shaped sin functions')
648     _test_unitary_avg_power_spectrum_sin(
649         sin_freq=5, samp_freq=512, samples=1024)
650     _test_unitary_avg_power_spectrum_sin(
651         sin_freq=5, samp_freq=512, samples=2048)
652     _test_unitary_avg_power_spectrum_sin(
653         sin_freq=5, samp_freq=512, samples=4098)
654     _test_unitary_avg_power_spectrum_sin(
655         sin_freq=17, samp_freq=512, samples=1024)
656     _test_unitary_avg_power_spectrum_sin(
657         sin_freq=5, samp_freq=1024, samples=2048)
658     # test long wavelenth sin, so be closer to window frequency
659     _test_unitary_avg_power_spectrum_sin(
660         sin_freq=1, samp_freq=1024, samples=2048)
661     # finally, with some irrational numbers, to check that I'm not
662     # getting lucky
663     _test_unitary_avg_power_spectrum_sin(
664         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
665
666
667 def test():
668     _test_rfft_suite()
669     _test_unitary_rfft_parsevals_suite()
670     _test_unitary_rfft_rect_suite()
671     _test_unitary_rfft_gaussian_suite()
672     _test_unitary_power_spectrum_sin_suite()
673     _test_unitary_power_spectrum_delta_suite()
674     _test_unitary_power_spectrum_gaussian_suite()
675     _test_unitary_avg_power_spectrum_sin_suite()
676
677
678 if __name__ == '__main__':
679     from optparse import OptionParser
680
681     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
682     p.add_option('-p', '--plot', dest='plot', action='store_true',
683                  help='Display time- and freq-space plots of test transforms.')
684
685     options,args = p.parse_args()
686
687     if options.plot:
688         import matplotlib.pyplot as _pyplot
689         TEST_PLOTS = True
690     test()
691     if options.plot:
692         _pyplot.show()