87dbf11cc3af8e5ea977bac91107b47f91fb736a
[hooke.git] / hooke / util / fft.py
1 # Copyright (C) 2010-2012 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU Lesser General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option) any
8 # later version.
9 #
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
13 # details.
14 #
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with Hooke.  If not, see <http://www.gnu.org/licenses/>.
17
18 """Wrap :mod:`numpy.fft` to produce 1D unitary transforms and power spectra.
19
20 Define some FFT wrappers to reduce clutter.
21 Provides a unitary discrete FFT and a windowed version.
22 Based on :func:`numpy.fft.rfft`.
23
24 Main entry functions:
25
26 * :func:`unitary_rfft`
27 * :func:`power_spectrum`
28 * :func:`unitary_power_spectrum`
29 * :func:`avg_power_spectrum`
30 * :func:`unitary_avg_power_spectrum`
31 """
32
33 import unittest
34
35 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
36     sinc, arctan2, array, ones, arange, linspace, zeros, \
37     uint16, float, concatenate, fromfile, argmax, complex
38 from numpy.fft import rfft
39
40
41 TEST_PLOTS = False
42
43 def floor_pow_of_two(num):
44     """Round `num` down to the closest exact a power of two.
45
46     Examples
47     --------
48
49     >>> floor_pow_of_two(3)
50     2
51     >>> floor_pow_of_two(11)
52     8
53     >>> floor_pow_of_two(15)
54     8
55     """
56     lnum = log2(num)
57     if int(lnum) != lnum:
58         num = 2**floor(lnum)
59     return int(num)
60
61 def round_pow_of_two(num):
62     """Round `num` to the closest exact a power of two on a log scale.
63
64     Examples
65     --------
66
67     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
68     4
69     >>> round_pow_of_two(11)
70     8
71     >>> round_pow_of_two(15)
72     16
73     """
74     lnum = log2(num)
75     if int(lnum) != lnum:
76         num = 2**round(lnum)
77     return int(num)
78
79 def ceil_pow_of_two(num):
80     """Round `num` up to the closest exact a power of two.
81
82     Examples
83     --------
84
85     >>> ceil_pow_of_two(3)
86     4
87     >>> ceil_pow_of_two(11)
88     16
89     >>> ceil_pow_of_two(15)
90     16
91     """
92     lnum = log2(num)
93     if int(lnum) != lnum:
94         num = 2**ceil(lnum)
95     return int(num)
96
97 def unitary_rfft(data, freq=1.0):
98     """Compute the unitary Fourier transform of real data.
99
100     Unitary = preserves power [Parseval's theorem].
101
102     Parameters
103     ----------
104     data : iterable
105         Real (not complex) data taken with a sampling frequency `freq`.
106     freq : real
107         Sampling frequency.
108
109     Returns
110     -------
111     freq_axis,trans : numpy.ndarray
112         Arrays ready for plotting.
113
114     Notes
115     -----
116     If the units on your data are Volts,
117     and your sampling frequency is in Hz,
118     then `freq_axis` will be in Hz,
119     and `trans` will be in Volts.
120     """
121     nsamps = floor_pow_of_two(len(data))
122     # Which should satisfy the discrete form of Parseval's theorem
123     #   n-1               n-1
124     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
125     #   m=0               k=0
126     # However, we want our FFT to satisfy the continuous Parseval eqn
127     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
128     # which has the discrete form
129     #   n-1              n-1
130     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
131     #   m=0              k=0
132     # with X'_k = AX, this gives us
133     #   n-1                     n-1
134     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
135     #   m=0                     k=0
136     # so we see
137     #   A^2 df/dt = 1/n
138     #   A^2 = 1/n dt/df
139     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
140     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
141     # f_c in the transformed data is the Nyquist frequency (12.1.2)
142     #   f_c = 1/2dt
143     # and the points are spaced out by (12.1.5)
144     #   df = 1/ndt
145     # so
146     #   dt = 1/ndf
147     #   dt/df = 1/ndf^2
148     #   A^2 = 1/n^2df^2
149     #   A = 1/ndf = ndt/n = dt
150     # so we can convert the Numpy transformed data to match our unitary
151     # continuous transformed data with (also NR 12.1.8)
152     #   X'_k = dtX = X / <sampling freq>
153     trans = rfft(data[0:nsamps]) / float(freq)
154     freq_axis = linspace(0, freq/2, nsamps/2+1)
155     return (freq_axis, trans)
156
157 def power_spectrum(data, freq=1.0):
158     """Compute the power spectrum of the time series `data`.
159
160     Parameters
161     ----------
162     data : iterable
163         Real (not complex) data taken with a sampling frequency `freq`.
164     freq : real
165         Sampling frequency.
166
167     Returns
168     -------
169     freq_axis,power : numpy.ndarray
170         Arrays ready for plotting.
171
172     Notes
173     -----
174     If the number of samples in `data` is not an integer power of two,
175     the FFT ignores some of the later points.
176
177     See Also
178     --------
179     unitary_power_spectrum,avg_power_spectrum
180     """
181     nsamps = floor_pow_of_two(len(data))
182     
183     freq_axis = linspace(0, freq/2, nsamps/2+1)
184     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
185     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
186     # See Numerical Recipies for a details.
187     trans = rfft(data[0:nsamps])
188     power = trans * trans.conj() # We want the square of the amplitude.
189     return (freq_axis, power)
190
191 def unitary_power_spectrum(data, freq=1.0):
192     """Compute the unitary power spectrum of the time series `data`.
193
194     See Also
195     --------
196     power_spectrum,unitary_avg_power_spectrum
197     """
198     freq_axis,power = power_spectrum(data, freq)
199     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
200     #
201     # numpy normalizes with 1/N on the inverse transform ifft,
202     # so we should normalize the freq-space representation with 1/sqrt(N).
203     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
204     # So the power gets normalized by that twice and we have 2/N
205     #
206     # On top of this, the FFT assumes a sampling freq of 1 per second,
207     # and we want to preserve area under our curves.
208     # If our total time T = len(data)/freq is smaller than 1,
209     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), 
210     # and we need to scale the powers down to conserve area.
211     # df_fft * F_fft(f) = df_real *F_real(f)
212     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
213     # So the power gets normalized by *that* twice and we have 2/N * freq**2
214
215     # power per unit time
216     # measure x(t) for time T
217     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
218     # PSD(f) = 2 |X(f)|**2 / T
219
220     # total_time = len(data)/float(freq)
221     # power *= 2.0 / float(freq)**2   /   total_time
222     # power *= 2.0 / freq**2   *   freq / len(data)
223     power *= 2.0 / (freq * float(len(data)))
224
225     return (freq_axis, power)
226
227 def window_hann(length):
228     r"""Returns a Hann window array with length entries
229
230     Notes
231     -----
232     The Hann window with length :math:`L` is defined as
233
234     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
235     """
236     win = zeros((length,), dtype=float)
237     for i in range(length):
238         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
239     # avg value of cos over a period is 0
240     # so average height of Hann window is 0.5
241     return win
242
243 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
244                        overlap=True, window=window_hann):
245     """Compute the avgerage power spectrum of `data`.
246
247     Parameters
248     ----------
249     data : iterable
250         Real (not complex) data taken with a sampling frequency `freq`.
251     freq : real
252         Sampling frequency.
253     chunk_size : int
254         Number of samples per chunk.  Use a power of two.
255     overlap: {True,False}
256         If `True`, each chunk overlaps the previous chunk by half its
257         length.  Otherwise, the chunks are end-to-end, and not
258         overlapping.
259     window: iterable
260         Weights used to "smooth" the chunks, there is a whole science
261         behind windowing, but if you're not trying to squeeze every
262         drop of information out of your data, you'll be OK with the
263         default Hann window.
264
265     Returns
266     -------
267     freq_axis,power : numpy.ndarray
268         Arrays ready for plotting.
269
270     Notes
271     -----
272     The average power spectrum is computed by breaking `data` into
273     chunks of length `chunk_size`.  These chunks are transformed
274     individually into frequency space and then averaged together.
275
276     See Numerical Recipes 2 section 13.4 for a good introduction to
277     the theory.
278
279     If the number of samples in `data` is not a multiple of
280     `chunk_size`, we ignore the extra points.
281     """
282     assert chunk_size == floor_pow_of_two(chunk_size), \
283         "chunk_size %d should be a power of 2" % chunk_size
284
285     nchunks = len(data)/chunk_size # integer division = implicit floor
286     if overlap:
287         chunk_step = chunk_size/2
288     else:
289         chunk_step = chunk_size
290     
291     win = window(chunk_size) # generate a window of the appropriate size
292     freq_axis = linspace(0, freq/2, chunk_size/2+1)
293     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
294     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
295     # See Numerical Recipies for a details.
296     power = zeros((chunk_size/2+1,), dtype=float)
297     for i in range(nchunks):
298         starti = i*chunk_step
299         stopi = starti+chunk_size
300         fft_chunk = rfft(data[starti:stopi]*win)
301         p_chunk = fft_chunk * fft_chunk.conj() 
302         power += p_chunk.astype(float)
303     power /= float(nchunks)
304     return (freq_axis, power)
305
306 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
307                                overlap=True, window=window_hann):
308     """Compute the unitary average power spectrum of `data`.
309
310     See Also
311     --------
312     avg_power_spectrum,unitary_power_spectrum
313     """
314     freq_axis,power = avg_power_spectrum(data, freq, chunk_size,
315                                          overlap, window)
316     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
317     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()            
318     #                                       * 8/3  to remove power from windowing
319     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
320     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
321     # So our calulated power has and extra <w(t)**2> in it.
322     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
323     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
324     # The normalization is not perfect. ??
325     # The normalization approaches perfection as chunk_size -> infinity.
326     return (freq_axis, power)
327
328
329
330 class TestRFFT (unittest.TestCase):
331     r"""Ensure Numpy's FFT algorithm acts as expected.
332
333     Notes
334     -----
335     The expected return values are [#dft]_:
336
337     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
338
339     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
340     """
341     def run_rfft(self, xs, Xs):
342         i = complex(0,1)
343         n = len(xs)
344         Xa = []
345         for k in range(n):
346             Xa.append(sum([x*exp(-2*pi*i*m*k/n) for x,m in zip(xs,range(n))]))
347             if k < len(Xs):
348                 assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \
349                     "rfft mismatch on element %d: %g != %g, relative error %g" \
350                     % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))
351         # Which should satisfy the discrete form of Parseval's theorem
352         #   n-1               n-1
353         #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
354         #   m=0               k=0
355         timeSum = sum([abs(x)**2 for x in xs])
356         freqSum = sum([abs(X)**2 for X in Xa])
357         assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \
358             "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum)
359
360     def test_rfft(self):
361         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
362         self.run_rfft(xs, rfft(xs))
363
364 class TestUnitaryRFFT (unittest.TestCase):
365     """Verify `unitary_rfft`.
366     """
367     def run_unitary_rfft_parsevals(self, xs, freq, freqs, Xs):
368         """Check the discretized integral form of Parseval's theorem
369
370         Notes
371         -----
372         Which is:
373
374         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
375         """
376         dt = 1.0/freq
377         df = freqs[1]-freqs[0]
378         assert (df - 1/(len(xs)*dt))/df < 1e-6, \
379             "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt)
380         Xa = list(Xs)
381         for k in range(len(Xs)-1,1,-1):
382             Xa.append(Xa[k])
383         assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa))
384         lhs = sum([abs(x)**2 for x in xs]) * dt
385         rhs = sum([abs(X)**2 for X in Xa]) * df
386         assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \
387             % (lhs, rhs)
388     
389     def test_unitary_rfft_parsevals(self):
390         "Test unitary rfft on Parseval's theorem"
391         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
392         dt = pi
393         freqs,Xs = unitary_rfft(xs, 1.0/dt)
394         self.run_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
395     
396     def rect(self, t):
397         r"""Rectangle function.
398
399         Notes
400         -----
401
402         .. math::
403
404             \rect(t) = \begin{cases}
405                1& \text{if $|t| < 0.5$}, \\
406                0& \text{if $|t| \ge 0.5$}.
407                        \end{cases}
408         """
409         if abs(t) < 0.5:
410             return 1
411         else:
412             return 0
413     
414     def run_unitary_rfft_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6,
415                               samples=256):
416         r"""Test `unitary_rttf` on known function `rect(at)`.
417
418         Notes
419         -----
420         Analytic result:
421
422         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
423         """
424         samp_freq = float(samp_freq)
425         a = float(a)
426     
427         x = zeros((samples,), dtype=float)
428         dt = 1.0/samp_freq
429         for i in range(samples):
430             t = i*dt
431             x[i] = self.rect(a*(t-time_shift))
432         freq_axis, X = unitary_rfft(x, samp_freq)
433         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
434     
435         # remove the phase due to our time shift
436         j = complex(0.0,1.0) # sqrt(-1)
437         for i in range(len(freq_axis)):
438             f = freq_axis[i]
439             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
440             X[i] *= inverse_phase_shift
441     
442         expected = zeros((len(freq_axis),), dtype=float)
443         # normalized sinc(x) = sin(pi x)/(pi x)
444         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
445         assert sinc(0.5) == 2.0/pi, "abnormal sinc()"
446         for i in range(len(freq_axis)):
447             f = freq_axis[i]
448             expected[i] = 1.0/abs(a) * sinc(f/a)
449     
450         if TEST_PLOTS:
451             pylab.figure()
452             pylab.subplot(211)
453             pylab.plot(arange(0, dt*samples, dt), x)
454             pylab.title('time series')
455             pylab.subplot(212)
456             pylab.plot(freq_axis, X.real, 'r.')
457             pylab.plot(freq_axis, X.imag, 'g.')
458             pylab.plot(freq_axis, expected, 'b-')
459             pylab.title('freq series')
460     
461     def test_unitary_rfft_rect(self):
462         "Test unitary FFTs on variously shaped rectangular functions."
463         self.run_unitary_rfft_rect(a=0.5)
464         self.run_unitary_rfft_rect(a=2.0)
465         self.run_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
466         self.run_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
467     
468     def gaussian(self, a, t):
469         r"""Gaussian function.
470
471         Notes
472         -----
473
474         .. math:: \gaussian(a,t) = \exp^{-at^2}
475         """
476         return exp(-a * t**2)
477     
478     def run_unitary_rfft_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6,
479                                   samples=256):
480         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
481
482         Notes
483         -----
484         Analytic result:
485
486         .. math::
487
488             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
489         """
490         samp_freq = float(samp_freq)
491         a = float(a)
492     
493         x = zeros((samples,), dtype=float)
494         dt = 1.0/samp_freq
495         for i in range(samples):
496             t = i*dt
497             x[i] = self.gaussian(a, (t-time_shift))
498         freq_axis, X = unitary_rfft(x, samp_freq)
499         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
500     
501         # remove the phase due to our time shift
502         j = complex(0.0,1.0) # sqrt(-1)
503         for i in range(len(freq_axis)):
504             f = freq_axis[i]
505             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
506             X[i] *= inverse_phase_shift
507     
508         expected = zeros((len(freq_axis),), dtype=float)
509         for i in range(len(freq_axis)):
510             f = freq_axis[i]
511             expected[i] = sqrt(pi/a) * self.gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
512     
513         if TEST_PLOTS:
514             pylab.figure()
515             pylab.subplot(211)
516             pylab.plot(arange(0, dt*samples, dt), x)
517             pylab.title('time series')
518             pylab.subplot(212)
519             pylab.plot(freq_axis, X.real, 'r.')
520             pylab.plot(freq_axis, X.imag, 'g.')
521             pylab.plot(freq_axis, expected, 'b-')
522             pylab.title('freq series')
523     
524     def test_unitary_rfft_gaussian(self):
525         "Test unitary FFTs on variously shaped gaussian functions."
526         self.run_unitary_rfft_gaussian(a=0.5)
527         self.run_unitary_rfft_gaussian(a=2.0)
528         self.run_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
529         self.run_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
530
531 class TestUnitaryPowerSpectrum (unittest.TestCase):
532     def run_unitary_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
533                                        samples=1024):
534         x = zeros((samples,), dtype=float)
535         samp_freq = float(samp_freq)
536         for i in range(samples):
537             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
538         freq_axis, power = unitary_power_spectrum(x, samp_freq)
539         imax = argmax(power)
540     
541         expected = zeros((len(freq_axis),), dtype=float)
542         df = samp_freq/float(samples) # df = 1/T, where T = total_time
543         i = int(sin_freq/df)
544         # average power per unit time is 
545         #  P = <x(t)**2>
546         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
547         # so average value of (int sin(t)**2 dt) per unit time is 0.5
548         #  P = 0.5
549         # we spread that power over a frequency bin of width df, sp
550         #  P(f0) = 0.5/df
551         # where f0 is the sin's frequency
552         #
553         # or:
554         # FFT of sin(2*pi*t*f0) gives
555         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
556         # (area under x(t) = 0, area under X(f) = 0)
557         # so one sided power spectral density (PSD) per unit time is
558         #  P(f) = 2 |X(f)|**2 / T
559         #       = 2 * |0.5 delta(f-f0)|**2 / T
560         #       = 0.5 * |delta(f-f0)|**2 / T
561         # but we're discrete and want the integral of the 'delta' to be 1, 
562         # so 'delta'*df = 1  --> 'delta' = 1/df, and
563         #  P(f) = 0.5 / (df**2 * T)
564         #       = 0.5 / df                (T = 1/df)
565         expected[i] = 0.5 / df
566     
567         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
568             (sin_freq, expected[i], freq_axis[imax], power[imax])
569         Pexp = 0
570         P    = 0
571         for i in range(len(freq_axis)):
572             Pexp += expected[i] *df
573             P    += power[i] * df
574         print " The total power should be %g (%g)" % (Pexp, P)
575     
576         if TEST_PLOTS:
577             pylab.figure()
578             pylab.subplot(211)
579             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
580             pylab.title('time series')
581             pylab.subplot(212)
582             pylab.plot(freq_axis, power, 'r.')
583             pylab.plot(freq_axis, expected, 'b-')
584             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
585     
586     def test_unitary_power_spectrum_sin(self):
587         "Test unitary power spectrums on variously shaped sin functions"
588         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
589         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
590         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
591         self.run_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
592         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
593         # finally, with some irrational numbers, to check that I'm not getting lucky
594         self.run_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
595         # test with non-integer number of periods
596         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
597     
598     def run_unitary_power_spectrum_delta(self, amp=1, samp_freq=1,
599                                          samples=256):
600         """TODO
601         """
602         x = zeros((samples,), dtype=float)
603         samp_freq = float(samp_freq)
604         x[0] = amp
605         freq_axis, power = unitary_power_spectrum(x, samp_freq)
606     
607         # power = <x(t)**2> = (amp)**2 * dt/T
608         # we spread that power over the entire freq_axis [0,fN], so
609         #  P(f)  = (amp)**2 dt / (T fN)
610         # where
611         #  dt = 1/samp_freq        (sample period)
612         #  T  = samples/samp_freq  (total time of data aquisition)
613         #  fN = 0.5 samp_freq      (Nyquist frequency)
614         # so
615         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
616         #       = 2 amp**2 / (samp_freq*samples)
617         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
618         expected = ones((len(freq_axis),), dtype=float) * expected_amp
619     
620         print "The power should be flat at y = %g (%g)" % (expected_amp, power[0])
621         
622         if TEST_PLOTS:
623             pylab.figure()
624             pylab.subplot(211)
625             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
626             pylab.title('time series')
627             pylab.subplot(212)
628             pylab.plot(freq_axis, power, 'r.')
629             pylab.plot(freq_axis, expected, 'b-')
630             pylab.title('%g samples of delta amp %g' % (samples, amp))
631     
632     def _test_unitary_power_spectrum_delta(self):
633         "Test unitary power spectrums on various delta functions"
634         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
635         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
636         _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
637         _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
638         _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
639         _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
640     
641     def gaussian(self, area, mean, std, t):
642         "Integral over all time = area (i.e. normalized for area=1)"
643         return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
644         
645     def run_unitary_power_spectrum_gaussian(self, area=2.5, mean=5, std=1,
646                                             samp_freq=10.24 ,samples=512):
647         """TODO.
648         """
649         x = zeros((samples,), dtype=float)
650         mean = float(mean)
651         for i in range(samples):
652             t = i/float(samp_freq)
653             x[i] = self.gaussian(area, mean, std, t)
654         freq_axis, power = unitary_power_spectrum(x, samp_freq)
655     
656         # generate the predicted curve
657         # by comparing our self.gaussian() form to _gaussian(),
658         # we see that the Fourier transform of x(t) has parameters:
659         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
660         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
661         #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
662         # So our power spectral density per unit time is given by
663         #  P(f) = 2 |X(f)|**2 / T
664         # Where
665         #  T  = samples/samp_freq  (total time of data aquisition)
666         mean = 0.0
667         area = area /(std*sqrt(2.0*pi))
668         std = 1.0/(2.0*pi*std)
669         expected = zeros((len(freq_axis),), dtype=float)
670         df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
671         for i in range(len(freq_axis)):
672             f = i*df
673             gaus = self.gaussian(area, mean, std, f)
674             expected[i] = 2.0 * gaus**2 * samp_freq/samples
675         print "The power should be a half-gaussian, ",
676         print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0])
677     
678         if TEST_PLOTS:
679             pylab.figure()
680             pylab.subplot(211)
681             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
682             pylab.title('time series')
683             pylab.subplot(212)
684             pylab.plot(freq_axis, power, 'r.')
685             pylab.plot(freq_axis, expected, 'b-')
686             pylab.title('freq series')
687     
688     def test_unitary_power_spectrum_gaussian(self):
689         "Test unitary power spectrums on various gaussian functions"
690         for area in [1,pi]:
691             for std in [1,sqrt(2)]:
692                 for samp_freq in [10.0, exp(1)]:
693                     for samples in [1024,2048]:
694                         self.run_unitary_power_spectrum_gaussian(
695                             area=area, std=std, samp_freq=samp_freq,
696                             samples=samples)
697
698 class TestUnitaryAvgPowerSpectrum (unittest.TestCase):
699     def run_unitary_avg_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
700                                            samples=1024, chunk_size=512,
701                                            overlap=True, window=window_hann):
702         """TODO
703         """
704         x = zeros((samples,), dtype=float)
705         samp_freq = float(samp_freq)
706         for i in range(samples):
707             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
708         freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size,
709                                                       overlap, window)
710         imax = argmax(power)
711     
712         expected = zeros((len(freq_axis),), dtype=float)
713         df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
714         i = int(sin_freq/df)
715         expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
716     
717         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
718             (sin_freq, expected[i], freq_axis[imax], power[imax])
719         Pexp = 0
720         P    = 0
721         for i in range(len(freq_axis)):
722             Pexp += expected[i] * df
723             P    += power[i] * df
724         print " The total power should be %g (%g)" % (Pexp, P)
725     
726         if TEST_PLOTS:
727             pylab.figure()
728             pylab.subplot(211)
729             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
730             pylab.title('time series')
731             pylab.subplot(212)
732             pylab.plot(freq_axis, power, 'r.')
733             pylab.plot(freq_axis, expected, 'b-')
734             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
735     
736     def test_unitary_avg_power_spectrum_sin(self):
737         "Test unitary avg power spectrums on variously shaped sin functions."
738         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
739         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
740         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
741         self.run_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
742         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
743         # test long wavelenth sin, so be closer to window frequency
744         self.run_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
745         # finally, with some irrational numbers, to check that I'm not getting lucky
746         self.run_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)