Ran update_copyright.py
[hooke.git] / hooke / util / fft.py
1 # Copyright (C) 2008-2010 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Wrap :mod:`numpy.fft` to produce 1D unitary transforms and power spectra.
20
21 Define some FFT wrappers to reduce clutter.
22 Provides a unitary discrete FFT and a windowed version.
23 Based on :func:`numpy.fft.rfft`.
24
25 Main entry functions:
26
27 * :func:`unitary_rfft`
28 * :func:`power_spectrum`
29 * :func:`unitary_power_spectrum`
30 * :func:`avg_power_spectrum`
31 * :func:`unitary_avg_power_spectrum`
32 """
33
34 import unittest
35
36 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
37     sinc, arctan2, array, ones, arange, linspace, zeros, \
38     uint16, float, concatenate, fromfile, argmax, complex
39 from numpy.fft import rfft
40
41
42 TEST_PLOTS = False
43
44 def floor_pow_of_two(num):
45     """Round `num` down to the closest exact a power of two.
46
47     Examples
48     --------
49
50     >>> floor_pow_of_two(3)
51     2
52     >>> floor_pow_of_two(11)
53     8
54     >>> floor_pow_of_two(15)
55     8
56     """
57     lnum = log2(num)
58     if int(lnum) != lnum:
59         num = 2**floor(lnum)
60     return int(num)
61
62 def round_pow_of_two(num):
63     """Round `num` to the closest exact a power of two on a log scale.
64
65     Examples
66     --------
67
68     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
69     4
70     >>> round_pow_of_two(11)
71     8
72     >>> round_pow_of_two(15)
73     16
74     """
75     lnum = log2(num)
76     if int(lnum) != lnum:
77         num = 2**round(lnum)
78     return int(num)
79
80 def ceil_pow_of_two(num):
81     """Round `num` up to the closest exact a power of two.
82
83     Examples
84     --------
85
86     >>> ceil_pow_of_two(3)
87     4
88     >>> ceil_pow_of_two(11)
89     16
90     >>> ceil_pow_of_two(15)
91     16
92     """
93     lnum = log2(num)
94     if int(lnum) != lnum:
95         num = 2**ceil(lnum)
96     return int(num)
97
98 def unitary_rfft(data, freq=1.0):
99     """Compute the unitary Fourier transform of real data.
100
101     Unitary = preserves power [Parseval's theorem].
102
103     Parameters
104     ----------
105     data : iterable
106         Real (not complex) data taken with a sampling frequency `freq`.
107     freq : real
108         Sampling frequency.
109
110     Returns
111     -------
112     freq_axis,trans : numpy.ndarray
113         Arrays ready for plotting.
114
115     Notes
116     -----
117     
118     If the units on your data are Volts,
119     and your sampling frequency is in Hz,
120     then `freq_axis` will be in Hz,
121     and `trans` will be in Volts.
122     """
123     nsamps = floor_pow_of_two(len(data))
124     # Which should satisfy the discrete form of Parseval's theorem
125     #   n-1               n-1
126     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
127     #   m=0               k=0
128     # However, we want our FFT to satisfy the continuous Parseval eqn
129     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
130     # which has the discrete form
131     #   n-1              n-1
132     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
133     #   m=0              k=0
134     # with X'_k = AX, this gives us
135     #   n-1                     n-1
136     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
137     #   m=0                     k=0
138     # so we see
139     #   A^2 df/dt = 1/n
140     #   A^2 = 1/n dt/df
141     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
142     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
143     # f_c in the transformed data is the Nyquist frequency (12.1.2)
144     #   f_c = 1/2dt
145     # and the points are spaced out by (12.1.5)
146     #   df = 1/ndt
147     # so
148     #   dt = 1/ndf
149     #   dt/df = 1/ndf^2
150     #   A^2 = 1/n^2df^2
151     #   A = 1/ndf = ndt/n = dt
152     # so we can convert the Numpy transformed data to match our unitary
153     # continuous transformed data with (also NR 12.1.8)
154     #   X'_k = dtX = X / <sampling freq>
155     trans = rfft(data[0:nsamps]) / float(freq)
156     freq_axis = linspace(0, freq/2, nsamps/2+1)
157     return (freq_axis, trans)
158
159 def power_spectrum(data, freq=1.0):
160     """Compute the power spectrum of the time series `data`.
161
162     Parameters
163     ----------
164     data : iterable
165         Real (not complex) data taken with a sampling frequency `freq`.
166     freq : real
167         Sampling frequency.
168
169     Returns
170     -------
171     freq_axis,power : numpy.ndarray
172         Arrays ready for plotting.
173
174     Notes
175     -----
176     If the number of samples in `data` is not an integer power of two,
177     the FFT ignores some of the later points.
178
179     See Also
180     --------
181     unitary_power_spectrum,avg_power_spectrum
182     """
183     nsamps = floor_pow_of_two(len(data))
184     
185     freq_axis = linspace(0, freq/2, nsamps/2+1)
186     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
187     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
188     # See Numerical Recipies for a details.
189     trans = rfft(data[0:nsamps])
190     power = trans * trans.conj() # We want the square of the amplitude.
191     return (freq_axis, power)
192
193 def unitary_power_spectrum(data, freq=1.0):
194     """Compute the unitary power spectrum of the time series `data`.
195
196     See Also
197     --------
198     power_spectrum,unitary_avg_power_spectrum
199     """
200     freq_axis,power = power_spectrum(data, freq)
201     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
202     #
203     # numpy normalizes with 1/N on the inverse transform ifft,
204     # so we should normalize the freq-space representation with 1/sqrt(N).
205     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
206     # So the power gets normalized by that twice and we have 2/N
207     #
208     # On top of this, the FFT assumes a sampling freq of 1 per second,
209     # and we want to preserve area under our curves.
210     # If our total time T = len(data)/freq is smaller than 1,
211     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), 
212     # and we need to scale the powers down to conserve area.
213     # df_fft * F_fft(f) = df_real *F_real(f)
214     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
215     # So the power gets normalized by *that* twice and we have 2/N * freq**2
216
217     # power per unit time
218     # measure x(t) for time T
219     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
220     # PSD(f) = 2 |X(f)|**2 / T
221
222     # total_time = len(data)/float(freq)
223     # power *= 2.0 / float(freq)**2   /   total_time
224     # power *= 2.0 / freq**2   *   freq / len(data)
225     power *= 2.0 / (freq * float(len(data)))
226
227     return (freq_axis, power)
228
229 def window_hann(length):
230     r"""Returns a Hann window array with length entries
231
232     Notes
233     -----
234     The Hann window with length :math:`L` is defined as
235
236     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
237     """
238     win = zeros((length,), dtype=float)
239     for i in range(length):
240         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
241     # avg value of cos over a period is 0
242     # so average height of Hann window is 0.5
243     return win
244
245 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
246                        overlap=True, window=window_hann):
247     """Compute the avgerage power spectrum of `data`.
248
249     Parameters
250     ----------
251     data : iterable
252         Real (not complex) data taken with a sampling frequency `freq`.
253     freq : real
254         Sampling frequency.
255     chunk_size : int
256         Number of samples per chunk.  Use a power of two.
257     overlap: {True,False}
258         If `True`, each chunk overlaps the previous chunk by half its
259         length.  Otherwise, the chunks are end-to-end, and not
260         overlapping.
261     window: iterable
262         Weights used to "smooth" the chunks, there is a whole science
263         behind windowing, but if you're not trying to squeeze every
264         drop of information out of your data, you'll be OK with the
265         default Hann window.
266
267     Returns
268     -------
269     freq_axis,power : numpy.ndarray
270         Arrays ready for plotting.
271
272     Notes
273     -----
274     The average power spectrum is computed by breaking `data` into
275     chunks of length `chunk_size`.  These chunks are transformed
276     individually into frequency space and then averaged together.
277
278     See Numerical Recipes 2 section 13.4 for a good introduction to
279     the theory.
280
281     If the number of samples in `data` is not a multiple of
282     `chunk_size`, we ignore the extra points.
283     """
284     assert chunk_size == floor_pow_of_two(chunk_size), \
285         "chunk_size %d should be a power of 2" % chunk_size
286
287     nchunks = len(data)/chunk_size # integer division = implicit floor
288     if overlap:
289         chunk_step = chunk_size/2
290     else:
291         chunk_step = chunk_size
292     
293     win = window(chunk_size) # generate a window of the appropriate size
294     freq_axis = linspace(0, freq/2, chunk_size/2+1)
295     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
296     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
297     # See Numerical Recipies for a details.
298     power = zeros((chunk_size/2+1,), dtype=float)
299     for i in range(nchunks):
300         starti = i*chunk_step
301         stopi = starti+chunk_size
302         fft_chunk = rfft(data[starti:stopi]*win)
303         p_chunk = fft_chunk * fft_chunk.conj() 
304         power += p_chunk.astype(float)
305     power /= float(nchunks)
306     return (freq_axis, power)
307
308 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
309                                overlap=True, window=window_hann):
310     """Compute the unitary avgerage power spectrum of `data`.
311
312     See Also
313     --------
314     avg_power_spectrum,unitary_power_spectrum
315     """
316     freq_axis,power = avg_power_spectrum(data, freq, chunk_size,
317                                          overlap, window)
318     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
319     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()            
320     #                                       * 8/3  to remove power from windowing
321     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
322     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
323     # So our calulated power has and extra <w(t)**2> in it.
324     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
325     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
326     # The normalization is not perfect. ??
327     # The normalization approaches perfection as chunk_size -> infinity.
328     return (freq_axis, power)
329
330
331
332 class TestRFFT (unittest.TestCase):
333     r"""Ensure Numpy's FFT algorithm acts as expected.
334
335     Notes
336     -----
337
338     The expected return values are [#dft]_:
339
340     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
341
342     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
343     """
344     def run_rfft(self, xs, Xs):
345         i = complex(0,1)
346         n = len(xs)
347         Xa = []
348         for k in range(n):
349             Xa.append(sum([x*exp(-2*pi*i*m*k/n) for x,m in zip(xs,range(n))]))
350             if k < len(Xs):
351                 assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \
352                     "rfft mismatch on element %d: %g != %g, relative error %g" \
353                     % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))
354         # Which should satisfy the discrete form of Parseval's theorem
355         #   n-1               n-1
356         #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
357         #   m=0               k=0
358         timeSum = sum([abs(x)**2 for x in xs])
359         freqSum = sum([abs(X)**2 for X in Xa])
360         assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \
361             "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum)
362
363     def test_rfft(self):
364         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
365         self.run_rfft(xs, rfft(xs))
366
367 class TestUnitaryRFFT (unittest.TestCase):
368     """Verify `unitary_rfft`.
369     """
370     def run_unitary_rfft_parsevals(self, xs, freq, freqs, Xs):
371         """Check the discretized integral form of Parseval's theorem
372
373         Notes
374         -----
375
376         Which is:
377
378         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
379         """
380         dt = 1.0/freq
381         df = freqs[1]-freqs[0]
382         assert (df - 1/(len(xs)*dt))/df < 1e-6, \
383             "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt)
384         Xa = list(Xs)
385         for k in range(len(Xs)-1,1,-1):
386             Xa.append(Xa[k])
387         assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa))
388         lhs = sum([abs(x)**2 for x in xs]) * dt
389         rhs = sum([abs(X)**2 for X in Xa]) * df
390         assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \
391             % (lhs, rhs)
392     
393     def test_unitary_rfft_parsevals(self):
394         "Test unitary rfft on Parseval's theorem"
395         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
396         dt = pi
397         freqs,Xs = unitary_rfft(xs, 1.0/dt)
398         self.run_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
399     
400     def rect(self, t):
401         r"""Rectangle function.
402
403         Notes
404         -----
405
406         .. math::
407
408             \rect(t) = \begin{cases}
409                1& \text{if $|t| < 0.5$}, \\
410                0& \text{if $|t| \ge 0.5$}.
411                        \end{cases}
412         """
413         if abs(t) < 0.5:
414             return 1
415         else:
416             return 0
417     
418     def run_unitary_rfft_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6,
419                               samples=256):
420         r"""Test `unitary_rttf` on known function `rect(at)`.
421
422         Notes
423         -----
424
425         Analytic result:
426
427         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
428         """
429         samp_freq = float(samp_freq)
430         a = float(a)
431     
432         x = zeros((samples,), dtype=float)
433         dt = 1.0/samp_freq
434         for i in range(samples):
435             t = i*dt
436             x[i] = self.rect(a*(t-time_shift))
437         freq_axis, X = unitary_rfft(x, samp_freq)
438         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
439     
440         # remove the phase due to our time shift
441         j = complex(0.0,1.0) # sqrt(-1)
442         for i in range(len(freq_axis)):
443             f = freq_axis[i]
444             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
445             X[i] *= inverse_phase_shift
446     
447         expected = zeros((len(freq_axis),), dtype=float)
448         # normalized sinc(x) = sin(pi x)/(pi x)
449         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
450         assert sinc(0.5) == 2.0/pi, "abnormal sinc()"
451         for i in range(len(freq_axis)):
452             f = freq_axis[i]
453             expected[i] = 1.0/abs(a) * sinc(f/a)
454     
455         if TEST_PLOTS:
456             pylab.figure()
457             pylab.subplot(211)
458             pylab.plot(arange(0, dt*samples, dt), x)
459             pylab.title('time series')
460             pylab.subplot(212)
461             pylab.plot(freq_axis, X.real, 'r.')
462             pylab.plot(freq_axis, X.imag, 'g.')
463             pylab.plot(freq_axis, expected, 'b-')
464             pylab.title('freq series')
465     
466     def test_unitary_rfft_rect(self):
467         "Test unitary FFTs on variously shaped rectangular functions."
468         self.run_unitary_rfft_rect(a=0.5)
469         self.run_unitary_rfft_rect(a=2.0)
470         self.run_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
471         self.run_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
472     
473     def gaussian(self, a, t):
474         r"""Gaussian function.
475
476         Notes
477         -----
478
479         .. math:: \gaussian(a,t) = \exp^{-at^2}
480         """
481         return exp(-a * t**2)
482     
483     def run_unitary_rfft_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6,
484                                   samples=256):
485         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
486
487         Notes
488         -----
489
490         Analytic result:
491
492         .. math::
493
494             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
495         """
496         samp_freq = float(samp_freq)
497         a = float(a)
498     
499         x = zeros((samples,), dtype=float)
500         dt = 1.0/samp_freq
501         for i in range(samples):
502             t = i*dt
503             x[i] = self.gaussian(a, (t-time_shift))
504         freq_axis, X = unitary_rfft(x, samp_freq)
505         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
506     
507         # remove the phase due to our time shift
508         j = complex(0.0,1.0) # sqrt(-1)
509         for i in range(len(freq_axis)):
510             f = freq_axis[i]
511             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
512             X[i] *= inverse_phase_shift
513     
514         expected = zeros((len(freq_axis),), dtype=float)
515         for i in range(len(freq_axis)):
516             f = freq_axis[i]
517             expected[i] = sqrt(pi/a) * self.gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
518     
519         if TEST_PLOTS:
520             pylab.figure()
521             pylab.subplot(211)
522             pylab.plot(arange(0, dt*samples, dt), x)
523             pylab.title('time series')
524             pylab.subplot(212)
525             pylab.plot(freq_axis, X.real, 'r.')
526             pylab.plot(freq_axis, X.imag, 'g.')
527             pylab.plot(freq_axis, expected, 'b-')
528             pylab.title('freq series')
529     
530     def test_unitary_rfft_gaussian(self):
531         "Test unitary FFTs on variously shaped gaussian functions."
532         self.run_unitary_rfft_gaussian(a=0.5)
533         self.run_unitary_rfft_gaussian(a=2.0)
534         self.run_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
535         self.run_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
536
537 class TestUnitaryPowerSpectrum (unittest.TestCase):
538     def run_unitary_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
539                                        samples=1024):
540         x = zeros((samples,), dtype=float)
541         samp_freq = float(samp_freq)
542         for i in range(samples):
543             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
544         freq_axis, power = unitary_power_spectrum(x, samp_freq)
545         imax = argmax(power)
546     
547         expected = zeros((len(freq_axis),), dtype=float)
548         df = samp_freq/float(samples) # df = 1/T, where T = total_time
549         i = int(sin_freq/df)
550         # average power per unit time is 
551         #  P = <x(t)**2>
552         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
553         # so average value of (int sin(t)**2 dt) per unit time is 0.5
554         #  P = 0.5
555         # we spread that power over a frequency bin of width df, sp
556         #  P(f0) = 0.5/df
557         # where f0 is the sin's frequency
558         #
559         # or:
560         # FFT of sin(2*pi*t*f0) gives
561         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
562         # (area under x(t) = 0, area under X(f) = 0)
563         # so one sided power spectral density (PSD) per unit time is
564         #  P(f) = 2 |X(f)|**2 / T
565         #       = 2 * |0.5 delta(f-f0)|**2 / T
566         #       = 0.5 * |delta(f-f0)|**2 / T
567         # but we're discrete and want the integral of the 'delta' to be 1, 
568         # so 'delta'*df = 1  --> 'delta' = 1/df, and
569         #  P(f) = 0.5 / (df**2 * T)
570         #       = 0.5 / df                (T = 1/df)
571         expected[i] = 0.5 / df
572     
573         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
574             (sin_freq, expected[i], freq_axis[imax], power[imax])
575         Pexp = 0
576         P    = 0
577         for i in range(len(freq_axis)):
578             Pexp += expected[i] *df
579             P    += power[i] * df
580         print " The total power should be %g (%g)" % (Pexp, P)
581     
582         if TEST_PLOTS:
583             pylab.figure()
584             pylab.subplot(211)
585             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
586             pylab.title('time series')
587             pylab.subplot(212)
588             pylab.plot(freq_axis, power, 'r.')
589             pylab.plot(freq_axis, expected, 'b-')
590             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
591     
592     def test_unitary_power_spectrum_sin(self):
593         "Test unitary power spectrums on variously shaped sin functions"
594         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
595         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
596         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
597         self.run_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
598         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
599         # finally, with some irrational numbers, to check that I'm not getting lucky
600         self.run_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
601         # test with non-integer number of periods
602         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
603     
604     def run_unitary_power_spectrum_delta(self, amp=1, samp_freq=1,
605                                          samples=256):
606         """TODO
607         """
608         x = zeros((samples,), dtype=float)
609         samp_freq = float(samp_freq)
610         x[0] = amp
611         freq_axis, power = unitary_power_spectrum(x, samp_freq)
612     
613         # power = <x(t)**2> = (amp)**2 * dt/T
614         # we spread that power over the entire freq_axis [0,fN], so
615         #  P(f)  = (amp)**2 dt / (T fN)
616         # where
617         #  dt = 1/samp_freq        (sample period)
618         #  T  = samples/samp_freq  (total time of data aquisition)
619         #  fN = 0.5 samp_freq      (Nyquist frequency)
620         # so
621         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
622         #       = 2 amp**2 / (samp_freq*samples)
623         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
624         expected = ones((len(freq_axis),), dtype=float) * expected_amp
625     
626         print "The power should be flat at y = %g (%g)" % (expected_amp, power[0])
627         
628         if TEST_PLOTS:
629             pylab.figure()
630             pylab.subplot(211)
631             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
632             pylab.title('time series')
633             pylab.subplot(212)
634             pylab.plot(freq_axis, power, 'r.')
635             pylab.plot(freq_axis, expected, 'b-')
636             pylab.title('%g samples of delta amp %g' % (samples, amp))
637     
638     def _test_unitary_power_spectrum_delta(self):
639         "Test unitary power spectrums on various delta functions"
640         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
641         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
642         _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
643         _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
644         _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
645         _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
646     
647     def gaussian(self, area, mean, std, t):
648         "Integral over all time = area (i.e. normalized for area=1)"
649         return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
650         
651     def run_unitary_power_spectrum_gaussian(self, area=2.5, mean=5, std=1,
652                                             samp_freq=10.24 ,samples=512):
653         """TODO.
654         """
655         x = zeros((samples,), dtype=float)
656         mean = float(mean)
657         for i in range(samples):
658             t = i/float(samp_freq)
659             x[i] = self.gaussian(area, mean, std, t)
660         freq_axis, power = unitary_power_spectrum(x, samp_freq)
661     
662         # generate the predicted curve
663         # by comparing our self.gaussian() form to _gaussian(),
664         # we see that the Fourier transform of x(t) has parameters:
665         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
666         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
667         #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
668         # So our power spectral density per unit time is given by
669         #  P(f) = 2 |X(f)|**2 / T
670         # Where
671         #  T  = samples/samp_freq  (total time of data aquisition)
672         mean = 0.0
673         area = area /(std*sqrt(2.0*pi))
674         std = 1.0/(2.0*pi*std)
675         expected = zeros((len(freq_axis),), dtype=float)
676         df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
677         for i in range(len(freq_axis)):
678             f = i*df
679             gaus = self.gaussian(area, mean, std, f)
680             expected[i] = 2.0 * gaus**2 * samp_freq/samples
681         print "The power should be a half-gaussian, ",
682         print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0])
683     
684         if TEST_PLOTS:
685             pylab.figure()
686             pylab.subplot(211)
687             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
688             pylab.title('time series')
689             pylab.subplot(212)
690             pylab.plot(freq_axis, power, 'r.')
691             pylab.plot(freq_axis, expected, 'b-')
692             pylab.title('freq series')
693     
694     def test_unitary_power_spectrum_gaussian(self):
695         "Test unitary power spectrums on various gaussian functions"
696         for area in [1,pi]:
697             for std in [1,sqrt(2)]:
698                 for samp_freq in [10.0, exp(1)]:
699                     for samples in [1024,2048]:
700                         self.run_unitary_power_spectrum_gaussian(
701                             area=area, std=std, samp_freq=samp_freq,
702                             samples=samples)
703
704 class TestUnitaryAvgPowerSpectrum (unittest.TestCase):
705     def run_unitary_avg_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
706                                            samples=1024, chunk_size=512,
707                                            overlap=True, window=window_hann):
708         """TODO
709         """
710         x = zeros((samples,), dtype=float)
711         samp_freq = float(samp_freq)
712         for i in range(samples):
713             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
714         freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size,
715                                                       overlap, window)
716         imax = argmax(power)
717     
718         expected = zeros((len(freq_axis),), dtype=float)
719         df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
720         i = int(sin_freq/df)
721         expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
722     
723         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
724             (sin_freq, expected[i], freq_axis[imax], power[imax])
725         Pexp = 0
726         P    = 0
727         for i in range(len(freq_axis)):
728             Pexp += expected[i] * df
729             P    += power[i] * df
730         print " The total power should be %g (%g)" % (Pexp, P)
731     
732         if TEST_PLOTS:
733             pylab.figure()
734             pylab.subplot(211)
735             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
736             pylab.title('time series')
737             pylab.subplot(212)
738             pylab.plot(freq_axis, power, 'r.')
739             pylab.plot(freq_axis, expected, 'b-')
740             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
741     
742     def test_unitary_avg_power_spectrum_sin(self):
743         "Test unitary avg power spectrums on variously shaped sin functions."
744         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
745         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
746         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
747         self.run_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
748         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
749         # test long wavelenth sin, so be closer to window frequency
750         self.run_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
751         # finally, with some irrational numbers, to check that I'm not getting lucky
752         self.run_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)