Ran update_copyright.py.
[hooke.git] / hooke / util / fft.py
1 # Copyright (C) 2010-2012 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Wrap :mod:`numpy.fft` to produce 1D unitary transforms and power spectra.
20
21 Define some FFT wrappers to reduce clutter.
22 Provides a unitary discrete FFT and a windowed version.
23 Based on :func:`numpy.fft.rfft`.
24
25 Main entry functions:
26
27 * :func:`unitary_rfft`
28 * :func:`power_spectrum`
29 * :func:`unitary_power_spectrum`
30 * :func:`avg_power_spectrum`
31 * :func:`unitary_avg_power_spectrum`
32 """
33
34 import unittest
35
36 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
37     sinc, arctan2, array, ones, arange, linspace, zeros, \
38     uint16, float, concatenate, fromfile, argmax, complex
39 from numpy.fft import rfft
40
41
42 TEST_PLOTS = False
43
44 def floor_pow_of_two(num):
45     """Round `num` down to the closest exact a power of two.
46
47     Examples
48     --------
49
50     >>> floor_pow_of_two(3)
51     2
52     >>> floor_pow_of_two(11)
53     8
54     >>> floor_pow_of_two(15)
55     8
56     """
57     lnum = log2(num)
58     if int(lnum) != lnum:
59         num = 2**floor(lnum)
60     return int(num)
61
62 def round_pow_of_two(num):
63     """Round `num` to the closest exact a power of two on a log scale.
64
65     Examples
66     --------
67
68     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
69     4
70     >>> round_pow_of_two(11)
71     8
72     >>> round_pow_of_two(15)
73     16
74     """
75     lnum = log2(num)
76     if int(lnum) != lnum:
77         num = 2**round(lnum)
78     return int(num)
79
80 def ceil_pow_of_two(num):
81     """Round `num` up to the closest exact a power of two.
82
83     Examples
84     --------
85
86     >>> ceil_pow_of_two(3)
87     4
88     >>> ceil_pow_of_two(11)
89     16
90     >>> ceil_pow_of_two(15)
91     16
92     """
93     lnum = log2(num)
94     if int(lnum) != lnum:
95         num = 2**ceil(lnum)
96     return int(num)
97
98 def unitary_rfft(data, freq=1.0):
99     """Compute the unitary Fourier transform of real data.
100
101     Unitary = preserves power [Parseval's theorem].
102
103     Parameters
104     ----------
105     data : iterable
106         Real (not complex) data taken with a sampling frequency `freq`.
107     freq : real
108         Sampling frequency.
109
110     Returns
111     -------
112     freq_axis,trans : numpy.ndarray
113         Arrays ready for plotting.
114
115     Notes
116     -----
117     If the units on your data are Volts,
118     and your sampling frequency is in Hz,
119     then `freq_axis` will be in Hz,
120     and `trans` will be in Volts.
121     """
122     nsamps = floor_pow_of_two(len(data))
123     # Which should satisfy the discrete form of Parseval's theorem
124     #   n-1               n-1
125     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
126     #   m=0               k=0
127     # However, we want our FFT to satisfy the continuous Parseval eqn
128     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
129     # which has the discrete form
130     #   n-1              n-1
131     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
132     #   m=0              k=0
133     # with X'_k = AX, this gives us
134     #   n-1                     n-1
135     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
136     #   m=0                     k=0
137     # so we see
138     #   A^2 df/dt = 1/n
139     #   A^2 = 1/n dt/df
140     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
141     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
142     # f_c in the transformed data is the Nyquist frequency (12.1.2)
143     #   f_c = 1/2dt
144     # and the points are spaced out by (12.1.5)
145     #   df = 1/ndt
146     # so
147     #   dt = 1/ndf
148     #   dt/df = 1/ndf^2
149     #   A^2 = 1/n^2df^2
150     #   A = 1/ndf = ndt/n = dt
151     # so we can convert the Numpy transformed data to match our unitary
152     # continuous transformed data with (also NR 12.1.8)
153     #   X'_k = dtX = X / <sampling freq>
154     trans = rfft(data[0:nsamps]) / float(freq)
155     freq_axis = linspace(0, freq/2, nsamps/2+1)
156     return (freq_axis, trans)
157
158 def power_spectrum(data, freq=1.0):
159     """Compute the power spectrum of the time series `data`.
160
161     Parameters
162     ----------
163     data : iterable
164         Real (not complex) data taken with a sampling frequency `freq`.
165     freq : real
166         Sampling frequency.
167
168     Returns
169     -------
170     freq_axis,power : numpy.ndarray
171         Arrays ready for plotting.
172
173     Notes
174     -----
175     If the number of samples in `data` is not an integer power of two,
176     the FFT ignores some of the later points.
177
178     See Also
179     --------
180     unitary_power_spectrum,avg_power_spectrum
181     """
182     nsamps = floor_pow_of_two(len(data))
183     
184     freq_axis = linspace(0, freq/2, nsamps/2+1)
185     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
186     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
187     # See Numerical Recipies for a details.
188     trans = rfft(data[0:nsamps])
189     power = trans * trans.conj() # We want the square of the amplitude.
190     return (freq_axis, power)
191
192 def unitary_power_spectrum(data, freq=1.0):
193     """Compute the unitary power spectrum of the time series `data`.
194
195     See Also
196     --------
197     power_spectrum,unitary_avg_power_spectrum
198     """
199     freq_axis,power = power_spectrum(data, freq)
200     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
201     #
202     # numpy normalizes with 1/N on the inverse transform ifft,
203     # so we should normalize the freq-space representation with 1/sqrt(N).
204     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
205     # So the power gets normalized by that twice and we have 2/N
206     #
207     # On top of this, the FFT assumes a sampling freq of 1 per second,
208     # and we want to preserve area under our curves.
209     # If our total time T = len(data)/freq is smaller than 1,
210     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), 
211     # and we need to scale the powers down to conserve area.
212     # df_fft * F_fft(f) = df_real *F_real(f)
213     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
214     # So the power gets normalized by *that* twice and we have 2/N * freq**2
215
216     # power per unit time
217     # measure x(t) for time T
218     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
219     # PSD(f) = 2 |X(f)|**2 / T
220
221     # total_time = len(data)/float(freq)
222     # power *= 2.0 / float(freq)**2   /   total_time
223     # power *= 2.0 / freq**2   *   freq / len(data)
224     power *= 2.0 / (freq * float(len(data)))
225
226     return (freq_axis, power)
227
228 def window_hann(length):
229     r"""Returns a Hann window array with length entries
230
231     Notes
232     -----
233     The Hann window with length :math:`L` is defined as
234
235     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
236     """
237     win = zeros((length,), dtype=float)
238     for i in range(length):
239         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
240     # avg value of cos over a period is 0
241     # so average height of Hann window is 0.5
242     return win
243
244 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
245                        overlap=True, window=window_hann):
246     """Compute the avgerage power spectrum of `data`.
247
248     Parameters
249     ----------
250     data : iterable
251         Real (not complex) data taken with a sampling frequency `freq`.
252     freq : real
253         Sampling frequency.
254     chunk_size : int
255         Number of samples per chunk.  Use a power of two.
256     overlap: {True,False}
257         If `True`, each chunk overlaps the previous chunk by half its
258         length.  Otherwise, the chunks are end-to-end, and not
259         overlapping.
260     window: iterable
261         Weights used to "smooth" the chunks, there is a whole science
262         behind windowing, but if you're not trying to squeeze every
263         drop of information out of your data, you'll be OK with the
264         default Hann window.
265
266     Returns
267     -------
268     freq_axis,power : numpy.ndarray
269         Arrays ready for plotting.
270
271     Notes
272     -----
273     The average power spectrum is computed by breaking `data` into
274     chunks of length `chunk_size`.  These chunks are transformed
275     individually into frequency space and then averaged together.
276
277     See Numerical Recipes 2 section 13.4 for a good introduction to
278     the theory.
279
280     If the number of samples in `data` is not a multiple of
281     `chunk_size`, we ignore the extra points.
282     """
283     assert chunk_size == floor_pow_of_two(chunk_size), \
284         "chunk_size %d should be a power of 2" % chunk_size
285
286     nchunks = len(data)/chunk_size # integer division = implicit floor
287     if overlap:
288         chunk_step = chunk_size/2
289     else:
290         chunk_step = chunk_size
291     
292     win = window(chunk_size) # generate a window of the appropriate size
293     freq_axis = linspace(0, freq/2, chunk_size/2+1)
294     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
295     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
296     # See Numerical Recipies for a details.
297     power = zeros((chunk_size/2+1,), dtype=float)
298     for i in range(nchunks):
299         starti = i*chunk_step
300         stopi = starti+chunk_size
301         fft_chunk = rfft(data[starti:stopi]*win)
302         p_chunk = fft_chunk * fft_chunk.conj() 
303         power += p_chunk.astype(float)
304     power /= float(nchunks)
305     return (freq_axis, power)
306
307 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
308                                overlap=True, window=window_hann):
309     """Compute the unitary average power spectrum of `data`.
310
311     See Also
312     --------
313     avg_power_spectrum,unitary_power_spectrum
314     """
315     freq_axis,power = avg_power_spectrum(data, freq, chunk_size,
316                                          overlap, window)
317     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
318     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()            
319     #                                       * 8/3  to remove power from windowing
320     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
321     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
322     # So our calulated power has and extra <w(t)**2> in it.
323     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
324     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
325     # The normalization is not perfect. ??
326     # The normalization approaches perfection as chunk_size -> infinity.
327     return (freq_axis, power)
328
329
330
331 class TestRFFT (unittest.TestCase):
332     r"""Ensure Numpy's FFT algorithm acts as expected.
333
334     Notes
335     -----
336     The expected return values are [#dft]_:
337
338     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
339
340     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
341     """
342     def run_rfft(self, xs, Xs):
343         i = complex(0,1)
344         n = len(xs)
345         Xa = []
346         for k in range(n):
347             Xa.append(sum([x*exp(-2*pi*i*m*k/n) for x,m in zip(xs,range(n))]))
348             if k < len(Xs):
349                 assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \
350                     "rfft mismatch on element %d: %g != %g, relative error %g" \
351                     % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))
352         # Which should satisfy the discrete form of Parseval's theorem
353         #   n-1               n-1
354         #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
355         #   m=0               k=0
356         timeSum = sum([abs(x)**2 for x in xs])
357         freqSum = sum([abs(X)**2 for X in Xa])
358         assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \
359             "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum)
360
361     def test_rfft(self):
362         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
363         self.run_rfft(xs, rfft(xs))
364
365 class TestUnitaryRFFT (unittest.TestCase):
366     """Verify `unitary_rfft`.
367     """
368     def run_unitary_rfft_parsevals(self, xs, freq, freqs, Xs):
369         """Check the discretized integral form of Parseval's theorem
370
371         Notes
372         -----
373         Which is:
374
375         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
376         """
377         dt = 1.0/freq
378         df = freqs[1]-freqs[0]
379         assert (df - 1/(len(xs)*dt))/df < 1e-6, \
380             "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt)
381         Xa = list(Xs)
382         for k in range(len(Xs)-1,1,-1):
383             Xa.append(Xa[k])
384         assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa))
385         lhs = sum([abs(x)**2 for x in xs]) * dt
386         rhs = sum([abs(X)**2 for X in Xa]) * df
387         assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \
388             % (lhs, rhs)
389     
390     def test_unitary_rfft_parsevals(self):
391         "Test unitary rfft on Parseval's theorem"
392         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
393         dt = pi
394         freqs,Xs = unitary_rfft(xs, 1.0/dt)
395         self.run_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
396     
397     def rect(self, t):
398         r"""Rectangle function.
399
400         Notes
401         -----
402
403         .. math::
404
405             \rect(t) = \begin{cases}
406                1& \text{if $|t| < 0.5$}, \\
407                0& \text{if $|t| \ge 0.5$}.
408                        \end{cases}
409         """
410         if abs(t) < 0.5:
411             return 1
412         else:
413             return 0
414     
415     def run_unitary_rfft_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6,
416                               samples=256):
417         r"""Test `unitary_rttf` on known function `rect(at)`.
418
419         Notes
420         -----
421         Analytic result:
422
423         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
424         """
425         samp_freq = float(samp_freq)
426         a = float(a)
427     
428         x = zeros((samples,), dtype=float)
429         dt = 1.0/samp_freq
430         for i in range(samples):
431             t = i*dt
432             x[i] = self.rect(a*(t-time_shift))
433         freq_axis, X = unitary_rfft(x, samp_freq)
434         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
435     
436         # remove the phase due to our time shift
437         j = complex(0.0,1.0) # sqrt(-1)
438         for i in range(len(freq_axis)):
439             f = freq_axis[i]
440             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
441             X[i] *= inverse_phase_shift
442     
443         expected = zeros((len(freq_axis),), dtype=float)
444         # normalized sinc(x) = sin(pi x)/(pi x)
445         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
446         assert sinc(0.5) == 2.0/pi, "abnormal sinc()"
447         for i in range(len(freq_axis)):
448             f = freq_axis[i]
449             expected[i] = 1.0/abs(a) * sinc(f/a)
450     
451         if TEST_PLOTS:
452             pylab.figure()
453             pylab.subplot(211)
454             pylab.plot(arange(0, dt*samples, dt), x)
455             pylab.title('time series')
456             pylab.subplot(212)
457             pylab.plot(freq_axis, X.real, 'r.')
458             pylab.plot(freq_axis, X.imag, 'g.')
459             pylab.plot(freq_axis, expected, 'b-')
460             pylab.title('freq series')
461     
462     def test_unitary_rfft_rect(self):
463         "Test unitary FFTs on variously shaped rectangular functions."
464         self.run_unitary_rfft_rect(a=0.5)
465         self.run_unitary_rfft_rect(a=2.0)
466         self.run_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
467         self.run_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
468     
469     def gaussian(self, a, t):
470         r"""Gaussian function.
471
472         Notes
473         -----
474
475         .. math:: \gaussian(a,t) = \exp^{-at^2}
476         """
477         return exp(-a * t**2)
478     
479     def run_unitary_rfft_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6,
480                                   samples=256):
481         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
482
483         Notes
484         -----
485         Analytic result:
486
487         .. math::
488
489             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
490         """
491         samp_freq = float(samp_freq)
492         a = float(a)
493     
494         x = zeros((samples,), dtype=float)
495         dt = 1.0/samp_freq
496         for i in range(samples):
497             t = i*dt
498             x[i] = self.gaussian(a, (t-time_shift))
499         freq_axis, X = unitary_rfft(x, samp_freq)
500         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
501     
502         # remove the phase due to our time shift
503         j = complex(0.0,1.0) # sqrt(-1)
504         for i in range(len(freq_axis)):
505             f = freq_axis[i]
506             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
507             X[i] *= inverse_phase_shift
508     
509         expected = zeros((len(freq_axis),), dtype=float)
510         for i in range(len(freq_axis)):
511             f = freq_axis[i]
512             expected[i] = sqrt(pi/a) * self.gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
513     
514         if TEST_PLOTS:
515             pylab.figure()
516             pylab.subplot(211)
517             pylab.plot(arange(0, dt*samples, dt), x)
518             pylab.title('time series')
519             pylab.subplot(212)
520             pylab.plot(freq_axis, X.real, 'r.')
521             pylab.plot(freq_axis, X.imag, 'g.')
522             pylab.plot(freq_axis, expected, 'b-')
523             pylab.title('freq series')
524     
525     def test_unitary_rfft_gaussian(self):
526         "Test unitary FFTs on variously shaped gaussian functions."
527         self.run_unitary_rfft_gaussian(a=0.5)
528         self.run_unitary_rfft_gaussian(a=2.0)
529         self.run_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
530         self.run_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
531
532 class TestUnitaryPowerSpectrum (unittest.TestCase):
533     def run_unitary_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
534                                        samples=1024):
535         x = zeros((samples,), dtype=float)
536         samp_freq = float(samp_freq)
537         for i in range(samples):
538             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
539         freq_axis, power = unitary_power_spectrum(x, samp_freq)
540         imax = argmax(power)
541     
542         expected = zeros((len(freq_axis),), dtype=float)
543         df = samp_freq/float(samples) # df = 1/T, where T = total_time
544         i = int(sin_freq/df)
545         # average power per unit time is 
546         #  P = <x(t)**2>
547         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
548         # so average value of (int sin(t)**2 dt) per unit time is 0.5
549         #  P = 0.5
550         # we spread that power over a frequency bin of width df, sp
551         #  P(f0) = 0.5/df
552         # where f0 is the sin's frequency
553         #
554         # or:
555         # FFT of sin(2*pi*t*f0) gives
556         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
557         # (area under x(t) = 0, area under X(f) = 0)
558         # so one sided power spectral density (PSD) per unit time is
559         #  P(f) = 2 |X(f)|**2 / T
560         #       = 2 * |0.5 delta(f-f0)|**2 / T
561         #       = 0.5 * |delta(f-f0)|**2 / T
562         # but we're discrete and want the integral of the 'delta' to be 1, 
563         # so 'delta'*df = 1  --> 'delta' = 1/df, and
564         #  P(f) = 0.5 / (df**2 * T)
565         #       = 0.5 / df                (T = 1/df)
566         expected[i] = 0.5 / df
567     
568         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
569             (sin_freq, expected[i], freq_axis[imax], power[imax])
570         Pexp = 0
571         P    = 0
572         for i in range(len(freq_axis)):
573             Pexp += expected[i] *df
574             P    += power[i] * df
575         print " The total power should be %g (%g)" % (Pexp, P)
576     
577         if TEST_PLOTS:
578             pylab.figure()
579             pylab.subplot(211)
580             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
581             pylab.title('time series')
582             pylab.subplot(212)
583             pylab.plot(freq_axis, power, 'r.')
584             pylab.plot(freq_axis, expected, 'b-')
585             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
586     
587     def test_unitary_power_spectrum_sin(self):
588         "Test unitary power spectrums on variously shaped sin functions"
589         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
590         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
591         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
592         self.run_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
593         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
594         # finally, with some irrational numbers, to check that I'm not getting lucky
595         self.run_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
596         # test with non-integer number of periods
597         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
598     
599     def run_unitary_power_spectrum_delta(self, amp=1, samp_freq=1,
600                                          samples=256):
601         """TODO
602         """
603         x = zeros((samples,), dtype=float)
604         samp_freq = float(samp_freq)
605         x[0] = amp
606         freq_axis, power = unitary_power_spectrum(x, samp_freq)
607     
608         # power = <x(t)**2> = (amp)**2 * dt/T
609         # we spread that power over the entire freq_axis [0,fN], so
610         #  P(f)  = (amp)**2 dt / (T fN)
611         # where
612         #  dt = 1/samp_freq        (sample period)
613         #  T  = samples/samp_freq  (total time of data aquisition)
614         #  fN = 0.5 samp_freq      (Nyquist frequency)
615         # so
616         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
617         #       = 2 amp**2 / (samp_freq*samples)
618         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
619         expected = ones((len(freq_axis),), dtype=float) * expected_amp
620     
621         print "The power should be flat at y = %g (%g)" % (expected_amp, power[0])
622         
623         if TEST_PLOTS:
624             pylab.figure()
625             pylab.subplot(211)
626             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
627             pylab.title('time series')
628             pylab.subplot(212)
629             pylab.plot(freq_axis, power, 'r.')
630             pylab.plot(freq_axis, expected, 'b-')
631             pylab.title('%g samples of delta amp %g' % (samples, amp))
632     
633     def _test_unitary_power_spectrum_delta(self):
634         "Test unitary power spectrums on various delta functions"
635         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
636         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
637         _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
638         _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
639         _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
640         _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
641     
642     def gaussian(self, area, mean, std, t):
643         "Integral over all time = area (i.e. normalized for area=1)"
644         return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
645         
646     def run_unitary_power_spectrum_gaussian(self, area=2.5, mean=5, std=1,
647                                             samp_freq=10.24 ,samples=512):
648         """TODO.
649         """
650         x = zeros((samples,), dtype=float)
651         mean = float(mean)
652         for i in range(samples):
653             t = i/float(samp_freq)
654             x[i] = self.gaussian(area, mean, std, t)
655         freq_axis, power = unitary_power_spectrum(x, samp_freq)
656     
657         # generate the predicted curve
658         # by comparing our self.gaussian() form to _gaussian(),
659         # we see that the Fourier transform of x(t) has parameters:
660         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
661         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
662         #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
663         # So our power spectral density per unit time is given by
664         #  P(f) = 2 |X(f)|**2 / T
665         # Where
666         #  T  = samples/samp_freq  (total time of data aquisition)
667         mean = 0.0
668         area = area /(std*sqrt(2.0*pi))
669         std = 1.0/(2.0*pi*std)
670         expected = zeros((len(freq_axis),), dtype=float)
671         df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
672         for i in range(len(freq_axis)):
673             f = i*df
674             gaus = self.gaussian(area, mean, std, f)
675             expected[i] = 2.0 * gaus**2 * samp_freq/samples
676         print "The power should be a half-gaussian, ",
677         print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0])
678     
679         if TEST_PLOTS:
680             pylab.figure()
681             pylab.subplot(211)
682             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
683             pylab.title('time series')
684             pylab.subplot(212)
685             pylab.plot(freq_axis, power, 'r.')
686             pylab.plot(freq_axis, expected, 'b-')
687             pylab.title('freq series')
688     
689     def test_unitary_power_spectrum_gaussian(self):
690         "Test unitary power spectrums on various gaussian functions"
691         for area in [1,pi]:
692             for std in [1,sqrt(2)]:
693                 for samp_freq in [10.0, exp(1)]:
694                     for samples in [1024,2048]:
695                         self.run_unitary_power_spectrum_gaussian(
696                             area=area, std=std, samp_freq=samp_freq,
697                             samples=samples)
698
699 class TestUnitaryAvgPowerSpectrum (unittest.TestCase):
700     def run_unitary_avg_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
701                                            samples=1024, chunk_size=512,
702                                            overlap=True, window=window_hann):
703         """TODO
704         """
705         x = zeros((samples,), dtype=float)
706         samp_freq = float(samp_freq)
707         for i in range(samples):
708             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
709         freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size,
710                                                       overlap, window)
711         imax = argmax(power)
712     
713         expected = zeros((len(freq_axis),), dtype=float)
714         df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
715         i = int(sin_freq/df)
716         expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
717     
718         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
719             (sin_freq, expected[i], freq_axis[imax], power[imax])
720         Pexp = 0
721         P    = 0
722         for i in range(len(freq_axis)):
723             Pexp += expected[i] * df
724             P    += power[i] * df
725         print " The total power should be %g (%g)" % (Pexp, P)
726     
727         if TEST_PLOTS:
728             pylab.figure()
729             pylab.subplot(211)
730             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
731             pylab.title('time series')
732             pylab.subplot(212)
733             pylab.plot(freq_axis, power, 'r.')
734             pylab.plot(freq_axis, expected, 'b-')
735             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
736     
737     def test_unitary_avg_power_spectrum_sin(self):
738         "Test unitary avg power spectrums on variously shaped sin functions."
739         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
740         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
741         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
742         self.run_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
743         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
744         # test long wavelenth sin, so be closer to window frequency
745         self.run_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
746         # finally, with some irrational numbers, to check that I'm not getting lucky
747         self.run_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)