75a281903c06b84f63e4cc09ef4ffd67372a7485
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2011  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 numpy.fft.rfft.
19
20 Main entry functions:
21   unitary_rfft(data, freq=1.0)
22   power_spectrum(data, freq=1.0)
23   unitary_power_spectrum(data, freq=1.0)
24   avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
25   unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann)
26 """
27
28 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
29     sinc, arctan2, array, ones, arange, linspace, zeros, \
30     uint16, float, concatenate, fromfile, argmax, complex
31 from numpy.fft import rfft
32
33
34 __version__ = '0.3'
35
36 # Display time- and freq-space plots of the test transforms if True
37 TEST_PLOTS = False
38
39
40 def floor_pow_of_two(num) :
41     "Round num down to the closest exact a power of two."
42     lnum = log2(num)
43     if int(lnum) != lnum :
44         num = 2**floor(lnum)
45     return num
46
47 def round_pow_of_two(num) :
48     "Round num to the closest exact a power of two on a log scale."
49     lnum = log2(num)
50     if int(lnum) != lnum :
51         num = 2**round(lnum)
52     return num
53
54 def ceil_pow_of_two(num) :
55     "Round num up to the closest exact a power of two."
56     lnum = log2(num)
57     if int(lnum) != lnum :
58         num = 2**ceil(lnum)
59     return num
60
61 def _test_rfft(xs, Xs) :
62     # Numpy's FFT algoritm returns
63     #          n-1
64     #   X[k] = SUM x[m] exp (-j 2pi km /n)
65     #          m=0
66     # (see http://www.tramy.us/numpybook.pdf)
67     j = complex(0,1)
68     n = len(xs)
69     Xa = []
70     for k in range(n) :
71         Xa.append(sum([x*exp(-j*2*pi*k*m/n) for x,m in zip(xs,range(n))]))
72         if k < len(Xs):
73             if (Xs[k]-Xa[k])/abs(Xa[k]) >= 1e-6:
74                 raise ValueError(
75                     ('rfft mismatch on element {}: {} != {}, relative error {}'
76                      ).format(k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k])))
77     # Which should satisfy the discrete form of Parseval's theorem
78     #   n-1               n-1
79     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
80     #   m=0               k=0
81     timeSum = sum([abs(x)**2 for x in xs])
82     freqSum = sum([abs(X)**2 for X in Xa])
83     if abs(freqSum/float(n) - timeSum)/timeSum >= 1e-6:
84         raise ValueError(
85             "Mismatch on Parseval's, {} != 1/{} * {}".format(
86                 timeSum, n, freqSum))
87
88 def _test_rfft_suite() :
89     print('Test numpy rfft definition')
90     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
91     _test_rfft(xs, rfft(xs))
92
93 def unitary_rfft(data, freq=1.0) :
94     """Compute the Fourier transform of real data.
95
96     Unitary (preserves power [Parseval's theorem]).
97
98     If the units on your data are Volts,
99     and your sampling frequency is in Hz,
100     then freq_axis will be in Hz,
101     and trans will be in Volts.
102     """
103     nsamps = floor_pow_of_two(len(data))
104     # Which should satisfy the discrete form of Parseval's theorem
105     #   n-1               n-1
106     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
107     #   m=0               k=0
108     # However, we want our FFT to satisfy the continuous Parseval eqn
109     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
110     # which has the discrete form
111     #   n-1              n-1
112     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
113     #   m=0              k=0
114     # with X'_k = AX, this gives us
115     #   n-1                     n-1
116     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
117     #   m=0                     k=0
118     # so we see
119     #   A^2 df/dt = 1/n
120     #   A^2 = 1/n dt/df
121     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
122     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
123     # f_c in the transformed data is the Nyquist frequency (12.1.2)
124     #   f_c = 1/2dt
125     # and the points are spaced out by (12.1.5)
126     #   df = 1/ndt
127     # so
128     #   dt = 1/ndf
129     #   dt/df = 1/ndf^2
130     #   A^2 = 1/n^2df^2
131     #   A = 1/ndf = ndt/n = dt
132     # so we can convert the Numpy transformed data to match our unitary
133     # continuous transformed data with (also NR 12.1.8)
134     #   X'_k = dtX = X / <sampling freq>
135     trans = rfft(data[0:nsamps]) / float(freq)
136     freq_axis = linspace(0, freq/2, nsamps/2+1)
137     return (freq_axis, trans)
138
139 def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs):
140     # Which should satisfy the discretized integral form of Parseval's theorem
141     #   n-1              n-1
142     #   SUM |x_m|^2 dt = SUM |X_k|^2 df
143     #   m=0              k=0
144     dt = 1.0/freq
145     df = freqs[1]-freqs[0]
146     if df - 1/(len(xs)*dt)/df >= 1e-6:
147         raise ValueError(
148             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
149     Xa = list(Xs)
150     for k in range(len(Xs)-1,1,-1) :
151         Xa.append(Xa[k])
152     if len(xs) != len(Xa):
153         raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa)))
154     lhs = sum([abs(x)**2 for x in xs]) * dt
155     rhs = sum([abs(X)**2 for X in Xa]) * df
156     if abs(lhs - rhs)/lhs >= 1e-4:
157         raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs))
158
159 def _test_unitary_rfft_parsevals_suite():
160     print("Test unitary rfft on Parseval's theorem")
161     xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
162     dt = pi
163     freqs,Xs = unitary_rfft(xs, 1.0/dt)
164     _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
165
166 def _rect(t) :
167     if abs(t) < 0.5 :
168         return 1
169     else :
170         return 0
171
172 def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
173     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
174     samp_freq = float(samp_freq)
175     a = float(a)
176
177     x = zeros((samples,), dtype=float)
178     dt = 1.0/samp_freq
179     for i in range(samples) :
180         t = i*dt
181         x[i] = _rect(a*(t-time_shift))
182     freq_axis, X = unitary_rfft(x, samp_freq)
183     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
184
185     # remove the phase due to our time shift
186     j = complex(0.0,1.0) # sqrt(-1)
187     for i in range(len(freq_axis)) :
188         f = freq_axis[i]
189         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
190         X[i] *= inverse_phase_shift
191
192     expected = zeros((len(freq_axis),), dtype=float)
193     # normalized sinc(x) = sin(pi x)/(pi x)
194     # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
195     if sinc(0.5) != 2.0/pi:
196         raise ValueError('abnormal sinc()')
197     for i in range(len(freq_axis)) :
198         f = freq_axis[i]
199         expected[i] = 1.0/abs(a) * sinc(f/a)
200
201     if TEST_PLOTS :
202         pylab.figure()
203         pylab.subplot(211)
204         pylab.plot(arange(0, dt*samples, dt), x)
205         pylab.title('time series')
206         pylab.subplot(212)
207         pylab.plot(freq_axis, X.real, 'r.')
208         pylab.plot(freq_axis, X.imag, 'g.')
209         pylab.plot(freq_axis, expected, 'b-')
210         pylab.title('freq series')
211
212 def _test_unitary_rfft_rect_suite() :
213     print('Test unitary FFTs on variously shaped rectangular functions')
214     _test_unitary_rfft_rect(a=0.5)
215     _test_unitary_rfft_rect(a=2.0)
216     _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
217     _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
218
219 def _gaussian(a, t) :
220     return exp(-a * t**2)
221
222 def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) :
223     "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)"
224     samp_freq = float(samp_freq)
225     a = float(a)
226
227     x = zeros((samples,), dtype=float)
228     dt = 1.0/samp_freq
229     for i in range(samples) :
230         t = i*dt
231         x[i] = _gaussian(a, (t-time_shift))
232     freq_axis, X = unitary_rfft(x, samp_freq)
233     #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
234
235     # remove the phase due to our time shift
236     j = complex(0.0,1.0) # sqrt(-1)
237     for i in range(len(freq_axis)) :
238         f = freq_axis[i]
239         inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
240         X[i] *= inverse_phase_shift
241
242     expected = zeros((len(freq_axis),), dtype=float)
243     for i in range(len(freq_axis)) :
244         f = freq_axis[i]
245         expected[i] = sqrt(pi/a) * _gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
246
247     if TEST_PLOTS :
248         pylab.figure()
249         pylab.subplot(211)
250         pylab.plot(arange(0, dt*samples, dt), x)
251         pylab.title('time series')
252         pylab.subplot(212)
253         pylab.plot(freq_axis, X.real, 'r.')
254         pylab.plot(freq_axis, X.imag, 'g.')
255         pylab.plot(freq_axis, expected, 'b-')
256         pylab.title('freq series')
257
258 def _test_unitary_rfft_gaussian_suite() :
259     print("Test unitary FFTs on variously shaped gaussian functions")
260     _test_unitary_rfft_gaussian(a=0.5)
261     _test_unitary_rfft_gaussian(a=2.0)
262     _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
263     _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
264
265
266
267 def power_spectrum(data, freq=1.0) :
268     """Compute the power spectrum of DATA taken with a sampling frequency FREQ.
269
270     DATA must be real (not complex).
271     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
272     If the number of samples in data is not an integer power of two,
273     the FFT ignores some of the later points.
274     """
275     nsamps = floor_pow_of_two(len(data))
276
277     freq_axis = linspace(0, freq/2, nsamps/2+1)
278     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
279     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
280     # See Numerical Recipies for a details.
281     trans = rfft(data[0:nsamps])
282     power = (trans * trans.conj()).real # We want the square of the amplitude.
283     return (freq_axis, power)
284
285 def unitary_power_spectrum(data, freq=1.0) :
286     freq_axis,power = power_spectrum(data, freq)
287     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
288     #
289     # numpy normalizes with 1/N on the inverse transform ifft,
290     # so we should normalize the freq-space representation with 1/sqrt(N).
291     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
292     # So the power gets normalized by that twice and we have 2/N
293     #
294     # On top of this, the FFT assumes a sampling freq of 1 per second,
295     # and we want to preserve area under our curves.
296     # If our total time T = len(data)/freq is smaller than 1,
297     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)),
298     # and we need to scale the powers down to conserve area.
299     # df_fft * F_fft(f) = df_real *F_real(f)
300     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
301     # So the power gets normalized by *that* twice and we have 2/N * freq**2
302
303     # power per unit time
304     # measure x(t) for time T
305     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
306     # PSD(f) = 2 |X(f)|**2 / T
307
308     # total_time = len(data)/float(freq)
309     # power *= 2.0 / float(freq)**2   /   total_time
310     # power *= 2.0 / freq**2   *   freq / len(data)
311     power *= 2.0 / (freq * float(len(data)))
312
313     return (freq_axis, power)
314
315 def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) :
316     x = zeros((samples,), dtype=float)
317     samp_freq = float(samp_freq)
318     for i in range(samples) :
319         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
320     freq_axis, power = unitary_power_spectrum(x, samp_freq)
321     imax = argmax(power)
322
323     expected = zeros((len(freq_axis),), dtype=float)
324     df = samp_freq/float(samples) # df = 1/T, where T = total_time
325     i = int(sin_freq/df)
326     # average power per unit time is
327     #  P = <x(t)**2>
328     # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
329     # so average value of (int sin(t)**2 dt) per unit time is 0.5
330     #  P = 0.5
331     # we spread that power over a frequency bin of width df, sp
332     #  P(f0) = 0.5/df
333     # where f0 is the sin's frequency
334     #
335     # or :
336     # FFT of sin(2*pi*t*f0) gives
337     #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
338     # (area under x(t) = 0, area under X(f) = 0)
339     # so one sided power spectral density (PSD) per unit time is
340     #  P(f) = 2 |X(f)|**2 / T
341     #       = 2 * |0.5 delta(f-f0)|**2 / T
342     #       = 0.5 * |delta(f-f0)|**2 / T
343     # but we're discrete and want the integral of the 'delta' to be 1,
344     # so 'delta'*df = 1  --> 'delta' = 1/df, and
345     #  P(f) = 0.5 / (df**2 * T)
346     #       = 0.5 / df                (T = 1/df)
347     expected[i] = 0.5 / df
348
349     print('The power should be a peak at {} Hz of {} ({}, {})'.format(
350             sin_freq, expected[i], freq_axis[imax], power[imax]))
351     Pexp = 0
352     P    = 0
353     for i in range(len(freq_axis)) :
354         Pexp += expected[i] *df
355         P    += power[i] * df
356     print('The total power should be {} ({})'.format(Pexp, P))
357
358     if TEST_PLOTS :
359         pylab.figure()
360         pylab.subplot(211)
361         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
362         pylab.title('time series')
363         pylab.subplot(212)
364         pylab.plot(freq_axis, power, 'r.')
365         pylab.plot(freq_axis, expected, 'b-')
366         pylab.title('{} samples of sin at {} Hz'.format(samples, sin_freq))
367
368 def _test_unitary_power_spectrum_sin_suite() :
369     print('Test unitary power spectrums on variously shaped sin functions')
370     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
371     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
372     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
373     _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
374     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
375     # finally, with some irrational numbers, to check that I'm not getting lucky
376     _test_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
377     # test with non-integer number of periods
378     _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
379
380 def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) :
381     x = zeros((samples,), dtype=float)
382     samp_freq = float(samp_freq)
383     x[0] = amp
384     freq_axis, power = unitary_power_spectrum(x, samp_freq)
385
386     # power = <x(t)**2> = (amp)**2 * dt/T
387     # we spread that power over the entire freq_axis [0,fN], so
388     #  P(f)  = (amp)**2 dt / (T fN)
389     # where
390     #  dt = 1/samp_freq        (sample period)
391     #  T  = samples/samp_freq  (total time of data aquisition)
392     #  fN = 0.5 samp_freq      (Nyquist frequency)
393     # so
394     #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
395     #       = 2 amp**2 / (samp_freq*samples)
396     expected_amp = 2.0 * amp**2 / (samp_freq * samples)
397     expected = ones((len(freq_axis),), dtype=float) * expected_amp
398
399     print('The power should be flat at y = {} ({})'.format(
400         expected_amp, power[0]))
401
402     if TEST_PLOTS :
403         pylab.figure()
404         pylab.subplot(211)
405         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
406         pylab.title('time series')
407         pylab.subplot(212)
408         pylab.plot(freq_axis, power, 'r.')
409         pylab.plot(freq_axis, expected, 'b-')
410         pylab.title('{} samples of delta amp {}'.format(samples, amp))
411
412 def _test_unitary_power_spectrum_delta_suite() :
413     print('Test unitary power spectrums on various delta functions')
414     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
415     _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
416     _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
417     _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
418     _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
419     _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
420
421 def _gaussian2(area, mean, std, t) :
422     "Integral over all time = area (i.e. normalized for area=1)"
423     return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
424
425 def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024
426     x = zeros((samples,), dtype=float)
427     mean = float(mean)
428     for i in range(samples) :
429         t = i/float(samp_freq)
430         x[i] = _gaussian2(area, mean, std, t)
431     freq_axis, power = unitary_power_spectrum(x, samp_freq)
432
433     # generate the predicted curve
434     # by comparing our _gaussian2() form to _gaussian(),
435     # we see that the Fourier transform of x(t) has parameters:
436     #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
437     #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
438     #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
439     # So our power spectral density per unit time is given by
440     #  P(f) = 2 |X(f)|**2 / T
441     # Where
442     #  T  = samples/samp_freq  (total time of data aquisition)
443     mean = 0.0
444     area = area /(std*sqrt(2.0*pi))
445     std = 1.0/(2.0*pi*std)
446     expected = zeros((len(freq_axis),), dtype=float)
447     df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
448     for i in range(len(freq_axis)) :
449         f = i*df
450         gaus = _gaussian2(area, mean, std, f)
451         expected[i] = 2.0 * gaus**2 * samp_freq/samples
452     print(('The power should be a half-gaussian, '
453            'with a peak at 0 Hz with amplitude {} ({})').format(
454             expected[0], power[0]))
455
456     if TEST_PLOTS :
457         pylab.figure()
458         pylab.subplot(211)
459         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
460         pylab.title('time series')
461         pylab.subplot(212)
462         pylab.plot(freq_axis, power, 'r.')
463         pylab.plot(freq_axis, expected, 'b-')
464         pylab.title('freq series')
465
466 def _test_unitary_power_spectrum_gaussian_suite() :
467     print('Test unitary power spectrums on various gaussian functions')
468     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024)
469     _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024)
470     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048)
471     _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048)
472     _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024)
473     _test_unitary_power_spectrum_gaussian(area=pi, std=sqrt(2), samp_freq=exp(1), samples=1024)
474
475 def window_hann(length) :
476     "Returns a Hann window array with length entries"
477     win = zeros((length,), dtype=float)
478     for i in range(length) :
479         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
480     # avg value of cos over a period is 0
481     # so average height of Hann window is 0.5
482     return win
483
484 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
485                        overlap=True, window=window_hann) :
486     """Compute the avgerage power spectrum of DATA.
487
488     Taken with a sampling frequency FREQ.
489
490     DATA must be real (not complex) by breaking DATA into chunks.
491     The chunks may or may not be overlapping (by setting OVERLAP).
492     The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed,
493     and the resulting spectra are averaged together.
494     See NR 13.4 for rational.
495
496     Returns a tuple of two arrays, (freq_axis, power), suitable for plotting.
497     CHUNK_SIZE should really be a power of 2.
498     If the number of samples in DATA is not an integer power of CHUNK_SIZE,
499     the FFT ignores some of the later points.
500     """
501     if chunk_size != floor_pow_of_two(chunk_size):
502         raise ValueError(
503             'chunk_size {} should be a power of 2'.format(chunk_size))
504
505     nchunks = len(data)/chunk_size # integer division = implicit floor
506     if overlap :
507         chunk_step = chunk_size/2
508     else :
509         chunk_step = chunk_size
510
511     win = window(chunk_size) # generate a window of the appropriate size
512     freq_axis = linspace(0, freq/2, chunk_size/2+1)
513     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
514     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
515     # See Numerical Recipies for a details.
516     power = zeros((chunk_size/2+1,), dtype=float)
517     for i in range(nchunks) :
518         starti = i*chunk_step
519         stopi = starti+chunk_size
520         fft_chunk = rfft(data[starti:stopi]*win)
521         p_chunk = (fft_chunk * fft_chunk.conj()).real
522         power += p_chunk.astype(float)
523     power /= float(nchunks)
524     return (freq_axis, power)
525
526 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
527                                overlap=True, window=window_hann) :
528     """Compute the average power spectrum, preserving normalization
529     """
530     freq_axis,power = avg_power_spectrum(
531         data, freq, chunk_size, overlap, window)
532     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
533     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()
534     #                                       * 8/3  to remove power from windowing
535     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
536     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
537     # So our calulated power has and extra <w(t)**2> in it.
538     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
539     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
540     # The normalization is not perfect. ??
541     # The normalization approaches perfection as chunk_size -> infinity.
542     return (freq_axis, power)
543
544 def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024,
545                                          chunk_size=512, overlap=True,
546                                          window=window_hann) :
547     x = zeros((samples,), dtype=float)
548     samp_freq = float(samp_freq)
549     for i in range(samples) :
550         x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
551     freq_axis, power = unitary_avg_power_spectrum(
552         x, samp_freq, chunk_size, overlap, window)
553     imax = argmax(power)
554
555     expected = zeros((len(freq_axis),), dtype=float)
556     df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
557     i = int(sin_freq/df)
558     expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
559
560     print('The power should peak at {} Hz of {} ({}, {})'.format(
561         sin_freq, expected[i], freq_axis[imax], power[imax]))
562     Pexp = 0
563     P    = 0
564     for i in range(len(freq_axis)) :
565         Pexp += expected[i] * df
566         P    += power[i] * df
567     print('The total power should be {} ({})'.format(Pexp, P))
568
569     if TEST_PLOTS :
570         pylab.figure()
571         pylab.subplot(211)
572         pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
573         pylab.title('time series')
574         pylab.subplot(212)
575         pylab.plot(freq_axis, power, 'r.')
576         pylab.plot(freq_axis, expected, 'b-')
577         pylab.title('{} samples of sin at {} Hz'.format(samples, sin_freq))
578
579 def _test_unitary_avg_power_spectrum_sin_suite() :
580     print('Test unitary avg power spectrums on variously shaped sin functions')
581     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
582     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
583     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
584     _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
585     _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
586     # test long wavelenth sin, so be closer to window frequency
587     _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
588     # finally, with some irrational numbers, to check that I'm not getting lucky
589     _test_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
590
591
592 def test() :
593     _test_rfft_suite()
594     _test_unitary_rfft_parsevals_suite()
595     _test_unitary_rfft_rect_suite()
596     _test_unitary_rfft_gaussian_suite()
597     _test_unitary_power_spectrum_sin_suite()
598     _test_unitary_power_spectrum_delta_suite()
599     _test_unitary_power_spectrum_gaussian_suite()
600     _test_unitary_avg_power_spectrum_sin_suite()
601
602 if __name__ == '__main__':
603     from optparse import OptionParser
604
605     p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.')
606     p.add_option('-p', '--plot', dest='plot', action='store_true',
607                  help='Display time- and freq-space plots of test transforms.')
608
609     options,args = p.parse_args()
610
611     if options.plot:
612         import pylab
613         TEST_PLOTS = True
614     test()
615     if options.plot:
616         pylab.show()