FFT_tools: cleanup namespace by using _pyplot
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Create some fake data:
21
22 >>> import numpy
23 >>> data = numpy.random.rand(10)
24 >>> frequency = 1
25
26 Main entry functions:
27
28 >>> rfft = unitary_rfft(data, freq=frequency)
29 >>> psd = power_spectrum(data, freq=frequency)
30 >>> upsd = unitary_power_spectrum(data, freq=frequency)
31 >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048,
32 ...     overlap=True, window=window_hann)
33 >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048,
34 ...     overlap=True, window=window_hann)
35 """
36
37 import numpy as _numpy
38
39
40 __version__ = '0.3'
41
42 # Display time- and freq-space plots of the test transforms if True
43 TEST_PLOTS = False
44
45
46 def floor_pow_of_two(num) :
47     "Round num down to the closest exact a power of two."
48     lnum = _numpy.log2(num)
49     if int(lnum) != lnum :
50         num = 2**_numpy.floor(lnum)
51     return num
52
53 def round_pow_of_two(num) :
54     "Round num to the closest exact a power of two on a log scale."
55     lnum = _numpy.log2(num)
56     if int(lnum) != lnum :
57         num = 2**_numpy.round(lnum)
58     return num
59
60 def ceil_pow_of_two(num) :
61     "Round num up to the closest exact a power of two."
62     lnum = _numpy.log2(num)
63     if int(lnum) != lnum :
64         num = 2**_numpy.ceil(lnum)
65     return num
66
67 def _test_rfft(xs, Xs) :
68     # Numpy's FFT algoritm returns
69     #          n-1
70     #   X[k] = SUM x[m] exp (-j 2pi km /n)
71     #          m=0
72     # (see http://www.tramy.us/numpybook.pdf)
73     j = _numpy.complex(0,1)
74     n = len(xs)
75     Xa = []
76     for k in range(n) :
77         Xa.append(sum([x*_numpy.exp(-j*2*_numpy.pi*k*m/n)
78                        for x,m in zip(xs,range(n))]))
79         if k < len(Xs):
80             if (Xs[k]-Xa[k])/_numpy.abs(Xa[k]) >= 1e-6:
81                 raise ValueError(
82                     ('rfft mismatch on element {}: {} != {}, relative error {}'
83                      ).format(
84                         k, Xs[k], Xa[k], (Xs[k]-Xa[k])/_numpy.abs(Xa[k])))
85     # Which should satisfy the discrete form of Parseval's theorem
86     #   n-1               n-1
87     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
88     #   m=0               k=0
89     timeSum = sum([_numpy.abs(x)**2 for x in xs])
90     freqSum = sum([_numpy.abs(X)**2 for X in Xa])
91     if _numpy.abs(freqSum/_numpy.float(n) - timeSum)/timeSum >= 1e-6:
92         raise ValueError(
93             "Mismatch on Parseval's, {} != 1/{} * {}".format(
94                 timeSum, n, freqSum))
95
96 def _test_rfft_suite() :
97     print('Test numpy rfft definition')
98     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
99     _test_rfft(xs, _numpy.fft.rfft(xs))
100
101 def unitary_rfft(data, freq=1.0) :
102     """Compute the Fourier transform of real data.
103
104     Unitary (preserves power [Parseval's theorem]).
105
106     If the units on your data are Volts,
107     and your sampling frequency is in Hz,
108     then freq_axis will be in Hz,
109     and trans will be in Volts.
110     """
111     nsamps = floor_pow_of_two(len(data))
112     # Which should satisfy the discrete form of Parseval's theorem
113     #   n-1               n-1
114     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
115     #   m=0               k=0
116     # However, we want our FFT to satisfy the continuous Parseval eqn
117     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
118     # which has the discrete form
119     #   n-1              n-1
120     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
121     #   m=0              k=0
122     # with X'_k = AX, this gives us
123     #   n-1                     n-1
124     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
125     #   m=0                     k=0
126     # so we see
127     #   A^2 df/dt = 1/n
128     #   A^2 = 1/n dt/df
129     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
130     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
131     # f_c in the transformed data is the Nyquist frequency (12.1.2)
132     #   f_c = 1/2dt
133     # and the points are spaced out by (12.1.5)
134     #   df = 1/ndt
135     # so
136     #   dt = 1/ndf
137     #   dt/df = 1/ndf^2
138     #   A^2 = 1/n^2df^2
139     #   A = 1/ndf = ndt/n = dt
140     # so we can convert the Numpy transformed data to match our unitary
141     # continuous transformed data with (also NR 12.1.8)
142     #   X'_k = dtX = X / <sampling freq>
143     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
144     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
145     return (freq_axis, trans)
146
147 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
148     # Which should satisfy the discretized integral form of Parseval's theorem
149     #   n-1              n-1
150     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
151     #   m=0              k=0
152     dt = 1.0/freq
153     df = freqs[1]-freqs[0]
154     if df - 1/(len(xs)*dt)/df >= 1e-6:
155         raise ValueError(
156             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
157     Xa = list(Xs)
158     for k in range(len(Xs)-1,1,-1) :
159         Xa.append(Xa[k])
160     if len(xs) != len(Xa):
161         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
162     lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
163     rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
164     if _numpy.abs(lhs - rhs)/lhs >= 1e-4:
165         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
166
167 def _test_unitary_rfft_parsevals_suite():
168     print("Test unitary rfft on Parseval's theorem")
169     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
170     dt = _numpy.pi
171     freqs,Xs = unitary_rfft(xs, 1.0/dt)
172     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
173
174 def _rect(t) :
175     if _numpy.abs(t) < 0.5 :
176         return 1
177     else :
178         return 0
179
180 def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
181     "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)"
182     samp_freq = _numpy.float(samp_freq)
183     a = _numpy.float(a)
184
185     x = _numpy.zeros((samples,), dtype=_numpy.float)
186     dt = 1.0/samp_freq
187     for i in range(samples) :
188         t = i*dt
189         x[i] = _rect(a*(t-time_shift))
190     freq_axis, X = unitary_rfft(x, samp_freq)
191     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
192
193     # remove the phase due to our time shift
194     j = _numpy.complex(0.0,1.0) # sqrt(-1)
195     for i in range(len(freq_axis)) :
196         f = freq_axis[i]
197         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
198         X[i] *= inverse_phase_shift
199
200     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
201     # normalized sinc(x) = sin(pi x)/(pi x)
202     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
203     if _numpy.sinc(0.5) != 2.0/_numpy.pi:
204         raise ValueError('abnormal sinc()')
205     for i in range(len(freq_axis)) :
206         f = freq_axis[i]
207         expected[i] = 1.0/_numpy.abs(a) * _numpy.sinc(f/a)
208
209     if TEST_PLOTS :
210         figure = _pyplot.figure()
211         time_axes = figure.add_subplot(2, 1, 1)
212         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
213         time_axes.set_title('time series')
214         freq_axes = figure.add_subplot(2, 1, 2)
215         freq_axes.plot(freq_axis, X.real, 'r.')
216         freq_axes.plot(freq_axis, X.imag, 'g.')
217         freq_axes.plot(freq_axis, expected, 'b-')
218         freq_axes.set_title('freq series')
219
220 def _test_unitary_rfft_rect_suite() :
221     print('Test unitary FFTs on variously shaped rectangular functions')
222     _test_unitary_rfft_rect(a=0.5)
223     _test_unitary_rfft_rect(a=2.0)
224     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
225     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
226
227 def _gaussian(a, t) :
228     return _numpy.exp(-a * t**2)
229
230 def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
231     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
232     samp_freq = _numpy.float(samp_freq)
233     a = _numpy.float(a)
234
235     x = _numpy.zeros((samples,), dtype=_numpy.float)
236     dt = 1.0/samp_freq
237     for i in range(samples) :
238         t = i*dt
239         x[i] = _gaussian(a, (t-time_shift))
240     freq_axis, X = unitary_rfft(x, samp_freq)
241     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
242
243     # remove the phase due to our time shift
244     j = _numpy.complex(0.0,1.0) # sqrt(-1)
245     for i in range(len(freq_axis)) :
246         f = freq_axis[i]
247         inverse_phase_shift = _numpy.exp(j*2.0*_numpy.pi*time_shift*f)
248         X[i] *= inverse_phase_shift
249
250     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
251     for i in range(len(freq_axis)) :
252         f = freq_axis[i]
253         # see Wikipedia, or do the integral yourself.
254         expected[i] = _numpy.sqrt(_numpy.pi/a) * _gaussian(
255             1.0/a, _numpy.pi*f)
256
257     if TEST_PLOTS :
258         figure = _pyplot.figure()
259         time_axes = figure.add_subplot(2, 1, 1)
260         time_axes.plot(_numpy.arange(0, dt*samples, dt), x)
261         time_axes.set_title('time series')
262         freq_axes = figure.add_subplot(2, 1, 2)
263         freq_axes.plot(freq_axis, X.real, 'r.')
264         freq_axes.plot(freq_axis, X.imag, 'g.')
265         freq_axes.plot(freq_axis, expected, 'b-')
266         freq_axes.set_title('freq series')
267
268 def _test_unitary_rfft_gaussian_suite() :
269     print("Test unitary FFTs on variously shaped gaussian functions")
270     _test_unitary_rfft_gaussian(a=0.5)
271     _test_unitary_rfft_gaussian(a=2.0)
272     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
273     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
274
275
276
277 def power_spectrum(data, freq=1.0) :
278     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
279
280     DATA must be real (not complex).
281     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
282     If the number of samples in data is not an integer power of two,
283     the FFT ignores some of the later points.
284     """
285     nsamps = floor_pow_of_two(len(data))
286
287     freq_axis = _numpy.linspace(0, freq/2, nsamps/2+1)
288     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
289     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
290     # See Numerical Recipies for a details.
291     trans = _numpy.fft.rfft(data[0:nsamps])
292     power = (trans * trans.conj()).real # We want the square of the amplitude.
293     return (freq_axis, power)
294
295 def unitary_power_spectrum(data, freq=1.0) :
296     freq_axis,power = power_spectrum(data, freq)
297     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
298     #
299     # numpy normalizes with 1/N on the inverse transform ifft,
300     # so we should normalize the freq-space representation with 1/sqrt(N).
301     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
302     # So the power gets normalized by that twice and we have 2/N
303     #
304     # On top of this, the FFT assumes a sampling freq of 1 per second,
305     # and we want to preserve area under our curves.
306     # If our total time T = len(data)/freq is smaller than 1,
307     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)),
308     # and we need to scale the powers down to conserve area.
309     # df_fft * F_fft(f) = df_real *F_real(f)
310     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
311     # So the power gets normalized by *that* twice and we have 2/N * freq**2
312
313     # power per unit time
314     # measure x(t) for time T
315     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
316     # PSD(f) = 2 |X(f)|**2 / T
317
318     # total_time = len(data)/float(freq)
319     # power *= 2.0 / float(freq)**2   /   total_time
320     # power *= 2.0 / freq**2   *   freq / len(data)
321     power *= 2.0 / (freq * _numpy.float(len(data)))
322
323     return (freq_axis, power)
324
325 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) :
326     x = _numpy.zeros((samples,), dtype=_numpy.float)
327     samp_freq = _numpy.float(samp_freq)
328     for i in range(samples) :
329         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
330     freq_axis, power = unitary_power_spectrum(x, samp_freq)
331     imax = _numpy.argmax(power)
332
333     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
334     df = samp_freq/_numpy.float(samples) # df = 1/T, where T = total_time
335     i = int(sin_freq/df)
336     # average power per unit time is
337     #  P = <x(t)**2>
338     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
339     # so average value of (int sin(t)**2 dt) per unit time is 0.5
340     #  P = 0.5
341     # we spread that power over a frequency bin of width df, sp
342     #  P(f0) = 0.5/df
343     # where f0 is the sin's frequency
344     #
345     # or :
346     # FFT of sin(2*pi*t*f0) gives
347     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
348     # (area under x(t) = 0, area under X(f) = 0)
349     # so one sided power spectral density (PSD) per unit time is
350     #  P(f) = 2 |X(f)|**2 / T
351     #       = 2 * |0.5 delta(f-f0)|**2 / T
352     #       = 0.5 * |delta(f-f0)|**2 / T
353     # but we're discrete and want the integral of the 'delta' to be 1,
354     # so 'delta'*df = 1  --> 'delta' = 1/df, and
355     #  P(f) = 0.5 / (df**2 * T)
356     #       = 0.5 / df                (T = 1/df)
357     expected[i] = 0.5 / df
358
359     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
360             sin_freq, expected[i], freq_axis[imax], power[imax]))
361     Pexp = 0
362     P    = 0
363     for i in range(len(freq_axis)) :
364         Pexp += expected[i] *df
365         P    += power[i] * df
366     print('The total power should be {} ({})'.format(Pexp, P))
367
368     if TEST_PLOTS :
369         figure = _pyplot.figure()
370         time_axes = figure.add_subplot(2, 1, 1)
371         time_axes.plot(
372             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
373         time_axes.set_title('time series')
374         freq_axes = figure.add_subplot(2, 1, 2)
375         freq_axes.plot(freq_axis, power, 'r.')
376         freq_axes.plot(freq_axis, expected, 'b-')
377         freq_axes.set_title(
378             '{} samples of sin at {} Hz'.format(samples, sin_freq))
379
380 def _test_unitary_power_spectrum_sin_suite() :
381     print('Test unitary power spectrums on variously shaped sin functions')
382     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
383     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
384     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
385     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
386     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
387     # finally, with some irrational numbers, to check that I'm not getting lucky
388     _test_unitary_power_spectrum_sin(
389         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
390     # test with non-integer number of periods
391     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
392
393 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) :
394     x = _numpy.zeros((samples,), dtype=_numpy.float)
395     samp_freq = _numpy.float(samp_freq)
396     x[0] = amp
397     freq_axis, power = unitary_power_spectrum(x, samp_freq)
398
399     # power = <x(t)**2> = (amp)**2 * dt/T
400     # we spread that power over the entire freq_axis [0,fN], so
401     #  P(f)  = (amp)**2 dt / (T fN)
402     # where
403     #  dt = 1/samp_freq        (sample period)
404     #  T  = samples/samp_freq  (total time of data aquisition)
405     #  fN = 0.5 samp_freq      (Nyquist frequency)
406     # so
407     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
408     #       = 2 amp**2 / (samp_freq*samples)
409     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
410     expected = _numpy.ones(
411         (len(freq_axis),), dtype=_numpy.float) * expected_amp
412
413     print('The power should be flat at y = {} ({})'.format(
414         expected_amp, power[0]))
415
416     if TEST_PLOTS :
417         figure = _pyplot.figure()
418         time_axes = figure.add_subplot(2, 1, 1)
419         time_axes.plot(
420             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
421         time_axes.set_title('time series')
422         freq_axes = figure.add_subplot(2, 1, 2)
423         freq_axes.plot(freq_axis, power, 'r.')
424         freq_axes.plot(freq_axis, expected, 'b-')
425         freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
426
427 def _test_unitary_power_spectrum_delta_suite() :
428     print('Test unitary power spectrums on various delta functions')
429     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
430     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
431     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
432     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
433     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
434     _test_unitary_power_spectrum_delta(
435         amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
436
437 def _gaussian2(area, mean, std, t) :
438     "Integral over all time = area (i.e. normalized for area=1)"
439     return area/(std*_numpy.sqrt(2.0*_numpy.pi)) * _numpy.exp(
440         -0.5*((t-mean)/std)**2)
441
442 def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024
443     x = _numpy.zeros((samples,), dtype=_numpy.float)
444     mean = _numpy.float(mean)
445     for i in range(samples) :
446         t = i/_numpy.float(samp_freq)
447         x[i] = _gaussian2(area, mean, std, t)
448     freq_axis, power = unitary_power_spectrum(x, samp_freq)
449
450     # generate the predicted curve
451     # by comparing our _gaussian2() form to _gaussian(),
452     # we see that the Fourier transform of x(t) has parameters:
453     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
454     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
455     #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
456     # So our power spectral density per unit time is given by
457     #  P(f) = 2 |X(f)|**2 / T
458     # Where
459     #  T  = samples/samp_freq  (total time of data aquisition)
460     mean = 0.0
461     area = area /(std*_numpy.sqrt(2.0*_numpy.pi))
462     std = 1.0/(2.0*_numpy.pi*std)
463     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
464     # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
465     df = _numpy.float(samp_freq)/samples
466     for i in range(len(freq_axis)) :
467         f = i*df
468         gaus = _gaussian2(area, mean, std, f)
469         expected[i] = 2.0 * gaus**2 * samp_freq/samples
470     print(('The power should be a half-gaussian, '
471            'with a peak at 0 Hz with amplitude {} ({})').format(
472             expected[0], power[0]))
473
474     if TEST_PLOTS :
475         figure = _pyplot.figure()
476         time_axes = figure.add_subplot(2, 1, 1)
477         time_axes.plot(
478             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
479         time_axes.set_title('time series')
480         freq_axes = figure.add_subplot(2, 1, 2)
481         freq_axes.plot(freq_axis, power, 'r.')
482         freq_axes.plot(freq_axis, expected, 'b-')
483         freq_axes.set_title('freq series')
484
485 def _test_unitary_power_spectrum_gaussian_suite() :
486     print('Test unitary power spectrums on various gaussian functions')
487     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
488     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
489     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
490     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
491     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
492     _test_unitary_power_spectrum_gaussian(
493         area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1),
494         samples=1024)
495
496 def window_hann(length) :
497     "Returns a Hann window array with length entries"
498     win = _numpy.zeros((length,), dtype=_numpy.float)
499     for i in range(length) :
500         win[i] = 0.5*(1.0-_numpy.cos(2.0*_numpy.pi*_numpy.float(i)/(length)))
501     # avg value of cos over a period is 0
502     # so average height of Hann window is 0.5
503     return win
504
505 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
506                        overlap=True, window=window_hann) :
507     """Compute the avgerage power spectrum of DATA.
508
509     Taken with a sampling frequency FREQ.
510
511     DATA must be real (not complex) by breaking DATA into chunks.
512     The chunks may or may not be overlapping (by setting OVERLAP).
513     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
514     and the resulting spectra are averaged together.
515     See NR 13.4 for rational.
516
517     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
518     CHUNK_SIZE should really be a power of 2.
519     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
520     the FFT ignores some of the later points.
521     """
522     if chunk_size != floor_pow_of_two(chunk_size):
523         raise ValueError(
524             'chunk_size {} should be a power of 2'.format(chunk_size))
525
526     nchunks = len(data)/chunk_size # integer division = implicit floor
527     if overlap :
528         chunk_step = chunk_size/2
529     else :
530         chunk_step = chunk_size
531
532     win = window(chunk_size) # generate a window of the appropriate size
533     freq_axis = _numpy.linspace(0, freq/2, chunk_size/2+1)
534     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
535     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
536     # See Numerical Recipies for a details.
537     power = _numpy.zeros((chunk_size/2+1,), dtype=_numpy.float)
538     for i in range(nchunks) :
539         starti = i*chunk_step
540         stopi = starti+chunk_size
541         fft_chunk = _numpy.fft.rfft(data[starti:stopi]*win)
542         p_chunk = (fft_chunk * fft_chunk.conj()).real
543         power += p_chunk.astype(_numpy.float)
544     power /= _numpy.float(nchunks)
545     return (freq_axis, power)
546
547 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
548                                overlap=True, window=window_hann) :
549     """Compute the average power spectrum, preserving normalization
550     """
551     freq_axis,power = avg_power_spectrum(
552         data, freq, chunk_size, overlap, window)
553     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
554     # see unitary_power_spectrum()
555     power *= 2.0 / (freq*_numpy.float(chunk_size)) * 8/3
556     #                                       * 8/3  to remove power from windowing
557     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
558     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
559     # So our calulated power has and extra <w(t)**2> in it.
560     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
561     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
562     # The normalization is not perfect. ??
563     # The normalization approaches perfection as chunk_size -> infinity.
564     return (freq_axis, power)
565
566 def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024,
567                                          chunk_size=512, overlap=True,
568                                          window=window_hann) :
569     x = _numpy.zeros((samples,), dtype=_numpy.float)
570     samp_freq = _numpy.float(samp_freq)
571     for i in range(samples) :
572         x[i] = _numpy.sin(2.0 * _numpy.pi * (i/samp_freq) * sin_freq)
573     freq_axis, power = unitary_avg_power_spectrum(
574         x, samp_freq, chunk_size, overlap, window)
575     imax = _numpy.argmax(power)
576
577     expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
578     df = samp_freq/_numpy.float(chunk_size) # df = 1/T, where T = total_time
579     i = int(sin_freq/df)
580     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
581
582     print('The power should peak at {} Hz of {} ({}, {})'.format(
583         sin_freq, expected[i], freq_axis[imax], power[imax]))
584     Pexp = 0
585     P    = 0
586     for i in range(len(freq_axis)) :
587         Pexp += expected[i] * df
588         P    += power[i] * df
589     print('The total power should be {} ({})'.format(Pexp, P))
590
591     if TEST_PLOTS :
592         figure = _pyplot.figure()
593         time_axes = figure.add_subplot(2, 1, 1)
594         time_axes.plot(
595             _numpy.arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
596         time_axes.set_title('time series')
597         freq_axes = figure.add_subplot(2, 1, 2)
598         freq_axes.plot(freq_axis, power, 'r.')
599         freq_axes.plot(freq_axis, expected, 'b-')
600         freq_axes.set_title(
601             '{} samples of sin at {} Hz'.format(samples, sin_freq))
602
603 def _test_unitary_avg_power_spectrum_sin_suite() :
604     print('Test unitary avg power spectrums on variously shaped sin functions')
605     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
606     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
607     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
608     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
609     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
610     # test long wavelenth sin, so be closer to window frequency
611     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
612     # finally, with some irrational numbers, to check that I'm not getting lucky
613     _test_unitary_avg_power_spectrum_sin(
614         sin_freq=_numpy.pi, samp_freq=100*_numpy.exp(1), samples=1024)
615
616
617 def test() :
618     _test_rfft_suite()
619     _test_unitary_rfft_parsevals_suite()
620     _test_unitary_rfft_rect_suite()
621     _test_unitary_rfft_gaussian_suite()
622     _test_unitary_power_spectrum_sin_suite()
623     _test_unitary_power_spectrum_delta_suite()
624     _test_unitary_power_spectrum_gaussian_suite()
625     _test_unitary_avg_power_spectrum_sin_suite()
626
627 if __name__ == '__main__':
628     from optparse import OptionParser
629
630     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
631     p.add_option('-p', '--plot', dest='plot', action='store_true',
632                  help='Display time- and freq-space plots of test transforms.')
633
634     options,args = p.parse_args()
635
636     if options.plot:
637         import matplotlib.pyplot as _pyplot
638         TEST_PLOTS = True
639     test()
640     if options.plot:
641         _pyplot.show()