Create extra block holding PowerSpectrumCommand output data
[FFT-tools.git] / FFT_tools.py
1 #!/usr/bin/python
2 #
3 # Wrap Numpy's fft module to produce 1D unitary transforms and power spectra.
4 #
5 # Copyright (C) 2008  W. Trevor King
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11
12 #  * Redistributions of source code must retain the above copyright
13 #    notice, this list of conditions and the following disclaimer.
14 #
15 #  * Redistributions in binary form must reproduce the above copyright
16 #    notice, this list of conditions and the following disclaimer in
17 #    the documentation and/or other materials provided with the
18 #    distribution.
19 #
20 #  * Neither the name of the copyright holders nor the names of the
21 #    contributors may be used to endorse or promote products derived
22 #    from this software without specific prior written permission.
23
24 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
28 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
30 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
32 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
34 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35 # POSSIBILITY OF SUCH DAMAGE.
36
37 """
38 Define some FFT wrappers to reduce clutter.
39 Provides a unitary discrete FFT and a windowed version.
40 Based on :func:`numpy.fft.rfft`.
41
42 Main entry functions:
43
44 * :func:`unitary_rfft`
45 * :func:`power_spectrum`
46 * :func:`unitary_power_spectrum`
47 * :func:`avg_power_spectrum`
48 * :func:`unitary_avg_power_spectrum`
49 """
50
51 import unittest
52
53 from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \
54     sinc, arctan2, array, ones, arange, linspace, zeros, \
55     uint16, float, concatenate, fromfile, argmax, complex
56 from numpy.fft import rfft
57
58
59 TEST_PLOTS = False
60
61 def floor_pow_of_two(num):
62     """Round `num` down to the closest exact a power of two.
63
64     Examples
65     --------
66
67     >>> floor_pow_of_two(3)
68     2
69     >>> floor_pow_of_two(11)
70     8
71     >>> floor_pow_of_two(15)
72     8
73     """
74     lnum = log2(num)
75     if int(lnum) != lnum:
76         num = 2**floor(lnum)
77     return int(num)
78
79 def round_pow_of_two(num):
80     """Round `num` to the closest exact a power of two on a log scale.
81
82     Examples
83     --------
84
85     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
86     4
87     >>> round_pow_of_two(11)
88     8
89     >>> round_pow_of_two(15)
90     16
91     """
92     lnum = log2(num)
93     if int(lnum) != lnum:
94         num = 2**round(lnum)
95     return int(num)
96
97 def ceil_pow_of_two(num):
98     """Round `num` up to the closest exact a power of two.
99
100     Examples
101     --------
102
103     >>> ceil_pow_of_two(3)
104     4
105     >>> ceil_pow_of_two(11)
106     16
107     >>> ceil_pow_of_two(15)
108     16
109     """
110     lnum = log2(num)
111     if int(lnum) != lnum:
112         num = 2**ceil(lnum)
113     return int(num)
114
115 def unitary_rfft(data, freq=1.0):
116     """Compute the unitary Fourier transform of real data.
117
118     Unitary = preserves power [Parseval's theorem].
119
120     Parameters
121     ----------
122     data : iterable
123         Real (not complex) data taken with a sampling frequency `freq`.
124     freq : real
125         Sampling frequency.
126
127     Returns
128     -------
129     freq_axis,trans : numpy.ndarray
130         Arrays ready for plotting.
131
132     Notes
133     -----
134     If the units on your data are Volts,
135     and your sampling frequency is in Hz,
136     then `freq_axis` will be in Hz,
137     and `trans` will be in Volts.
138     """
139     nsamps = floor_pow_of_two(len(data))
140     # Which should satisfy the discrete form of Parseval's theorem
141     #   n-1               n-1
142     #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
143     #   m=0               k=0
144     # However, we want our FFT to satisfy the continuous Parseval eqn
145     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
146     # which has the discrete form
147     #   n-1              n-1
148     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
149     #   m=0              k=0
150     # with X'_k = AX, this gives us
151     #   n-1                     n-1
152     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
153     #   m=0                     k=0
154     # so we see
155     #   A^2 df/dt = 1/n
156     #   A^2 = 1/n dt/df
157     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
158     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
159     # f_c in the transformed data is the Nyquist frequency (12.1.2)
160     #   f_c = 1/2dt
161     # and the points are spaced out by (12.1.5)
162     #   df = 1/ndt
163     # so
164     #   dt = 1/ndf
165     #   dt/df = 1/ndf^2
166     #   A^2 = 1/n^2df^2
167     #   A = 1/ndf = ndt/n = dt
168     # so we can convert the Numpy transformed data to match our unitary
169     # continuous transformed data with (also NR 12.1.8)
170     #   X'_k = dtX = X / <sampling freq>
171     trans = rfft(data[0:nsamps]) / float(freq)
172     freq_axis = linspace(0, freq/2, nsamps/2+1)
173     return (freq_axis, trans)
174
175 def power_spectrum(data, freq=1.0):
176     """Compute the power spectrum of the time series `data`.
177
178     Parameters
179     ----------
180     data : iterable
181         Real (not complex) data taken with a sampling frequency `freq`.
182     freq : real
183         Sampling frequency.
184
185     Returns
186     -------
187     freq_axis,power : numpy.ndarray
188         Arrays ready for plotting.
189
190     Notes
191     -----
192     If the number of samples in `data` is not an integer power of two,
193     the FFT ignores some of the later points.
194
195     See Also
196     --------
197     unitary_power_spectrum,avg_power_spectrum
198     """
199     nsamps = floor_pow_of_two(len(data))
200     
201     freq_axis = linspace(0, freq/2, nsamps/2+1)
202     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
203     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
204     # See Numerical Recipies for a details.
205     trans = rfft(data[0:nsamps])
206     power = trans * trans.conj() # We want the square of the amplitude.
207     return (freq_axis, power)
208
209 def unitary_power_spectrum(data, freq=1.0):
210     """Compute the unitary power spectrum of the time series `data`.
211
212     See Also
213     --------
214     power_spectrum,unitary_avg_power_spectrum
215     """
216     freq_axis,power = power_spectrum(data, freq)
217     # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498)
218     #
219     # numpy normalizes with 1/N on the inverse transform ifft,
220     # so we should normalize the freq-space representation with 1/sqrt(N).
221     # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2)
222     # So the power gets normalized by that twice and we have 2/N
223     #
224     # On top of this, the FFT assumes a sampling freq of 1 per second,
225     # and we want to preserve area under our curves.
226     # If our total time T = len(data)/freq is smaller than 1,
227     # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), 
228     # and we need to scale the powers down to conserve area.
229     # df_fft * F_fft(f) = df_real *F_real(f)
230     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
231     # So the power gets normalized by *that* twice and we have 2/N * freq**2
232
233     # power per unit time
234     # measure x(t) for time T
235     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
236     # PSD(f) = 2 |X(f)|**2 / T
237
238     # total_time = len(data)/float(freq)
239     # power *= 2.0 / float(freq)**2   /   total_time
240     # power *= 2.0 / freq**2   *   freq / len(data)
241     power *= 2.0 / (freq * float(len(data)))
242
243     return (freq_axis, power)
244
245 def window_hann(length):
246     r"""Returns a Hann window array with length entries
247
248     Notes
249     -----
250     The Hann window with length :math:`L` is defined as
251
252     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
253     """
254     win = zeros((length,), dtype=float)
255     for i in range(length):
256         win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length)))
257     # avg value of cos over a period is 0
258     # so average height of Hann window is 0.5
259     return win
260
261 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
262                        overlap=True, window=window_hann):
263     """Compute the avgerage power spectrum of `data`.
264
265     Parameters
266     ----------
267     data : iterable
268         Real (not complex) data taken with a sampling frequency `freq`.
269     freq : real
270         Sampling frequency.
271     chunk_size : int
272         Number of samples per chunk.  Use a power of two.
273     overlap: {True,False}
274         If `True`, each chunk overlaps the previous chunk by half its
275         length.  Otherwise, the chunks are end-to-end, and not
276         overlapping.
277     window: iterable
278         Weights used to "smooth" the chunks, there is a whole science
279         behind windowing, but if you're not trying to squeeze every
280         drop of information out of your data, you'll be OK with the
281         default Hann window.
282
283     Returns
284     -------
285     freq_axis,power : numpy.ndarray
286         Arrays ready for plotting.
287
288     Notes
289     -----
290     The average power spectrum is computed by breaking `data` into
291     chunks of length `chunk_size`.  These chunks are transformed
292     individually into frequency space and then averaged together.
293
294     See Numerical Recipes 2 section 13.4 for a good introduction to
295     the theory.
296
297     If the number of samples in `data` is not a multiple of
298     `chunk_size`, we ignore the extra points.
299     """
300     assert chunk_size == floor_pow_of_two(chunk_size), \
301         "chunk_size %d should be a power of 2" % chunk_size
302
303     nchunks = len(data)/chunk_size # integer division = implicit floor
304     if overlap:
305         chunk_step = chunk_size/2
306     else:
307         chunk_step = chunk_size
308     
309     win = window(chunk_size) # generate a window of the appropriate size
310     freq_axis = linspace(0, freq/2, chunk_size/2+1)
311     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
312     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
313     # See Numerical Recipies for a details.
314     power = zeros((chunk_size/2+1,), dtype=float)
315     for i in range(nchunks):
316         starti = i*chunk_step
317         stopi = starti+chunk_size
318         fft_chunk = rfft(data[starti:stopi]*win)
319         p_chunk = fft_chunk * fft_chunk.conj() 
320         power += p_chunk.astype(float)
321     power /= float(nchunks)
322     return (freq_axis, power)
323
324 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
325                                overlap=True, window=window_hann):
326     """Compute the unitary average power spectrum of `data`.
327
328     See Also
329     --------
330     avg_power_spectrum,unitary_power_spectrum
331     """
332     freq_axis,power = avg_power_spectrum(data, freq, chunk_size,
333                                          overlap, window)
334     #        2.0 / (freq * chunk_size)          |rfft()|**2 --> unitary_power_spectrum
335     power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum()            
336     #                                       * 8/3  to remove power from windowing
337     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
338     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
339     # So our calulated power has and extra <w(t)**2> in it.
340     # For the Hann window, <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
341     # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t),
342     # The normalization is not perfect. ??
343     # The normalization approaches perfection as chunk_size -> infinity.
344     return (freq_axis, power)
345
346
347
348 class TestRFFT (unittest.TestCase):
349     r"""Ensure Numpy's FFT algorithm acts as expected.
350
351     Notes
352     -----
353     The expected return values are [#dft]_:
354
355     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
356
357     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
358     """
359     def run_rfft(self, xs, Xs):
360         i = complex(0,1)
361         n = len(xs)
362         Xa = []
363         for k in range(n):
364             Xa.append(sum([x*exp(-2*pi*i*m*k/n) for x,m in zip(xs,range(n))]))
365             if k < len(Xs):
366                 assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \
367                     "rfft mismatch on element %d: %g != %g, relative error %g" \
368                     % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k]))
369         # Which should satisfy the discrete form of Parseval's theorem
370         #   n-1               n-1
371         #   SUM |x_m|^2 = 1/n SUM |X_k|^2. 
372         #   m=0               k=0
373         timeSum = sum([abs(x)**2 for x in xs])
374         freqSum = sum([abs(X)**2 for X in Xa])
375         assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \
376             "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum)
377
378     def test_rfft(self):
379         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
380         self.run_rfft(xs, rfft(xs))
381
382 class TestUnitaryRFFT (unittest.TestCase):
383     """Verify `unitary_rfft`.
384     """
385     def run_unitary_rfft_parsevals(self, xs, freq, freqs, Xs):
386         """Check the discretized integral form of Parseval's theorem
387
388         Notes
389         -----
390         Which is:
391
392         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
393         """
394         dt = 1.0/freq
395         df = freqs[1]-freqs[0]
396         assert (df - 1/(len(xs)*dt))/df < 1e-6, \
397             "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt)
398         Xa = list(Xs)
399         for k in range(len(Xs)-1,1,-1):
400             Xa.append(Xa[k])
401         assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa))
402         lhs = sum([abs(x)**2 for x in xs]) * dt
403         rhs = sum([abs(X)**2 for X in Xa]) * df
404         assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \
405             % (lhs, rhs)
406     
407     def test_unitary_rfft_parsevals(self):
408         "Test unitary rfft on Parseval's theorem"
409         xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1]
410         dt = pi
411         freqs,Xs = unitary_rfft(xs, 1.0/dt)
412         self.run_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs)
413     
414     def rect(self, t):
415         r"""Rectangle function.
416
417         Notes
418         -----
419
420         .. math::
421
422             \rect(t) = \begin{cases}
423                1& \text{if $|t| < 0.5$}, \\
424                0& \text{if $|t| \ge 0.5$}.
425                        \end{cases}
426         """
427         if abs(t) < 0.5:
428             return 1
429         else:
430             return 0
431     
432     def run_unitary_rfft_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6,
433                               samples=256):
434         r"""Test `unitary_rttf` on known function `rect(at)`.
435
436         Notes
437         -----
438         Analytic result:
439
440         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
441         """
442         samp_freq = float(samp_freq)
443         a = float(a)
444     
445         x = zeros((samples,), dtype=float)
446         dt = 1.0/samp_freq
447         for i in range(samples):
448             t = i*dt
449             x[i] = self.rect(a*(t-time_shift))
450         freq_axis, X = unitary_rfft(x, samp_freq)
451         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
452     
453         # remove the phase due to our time shift
454         j = complex(0.0,1.0) # sqrt(-1)
455         for i in range(len(freq_axis)):
456             f = freq_axis[i]
457             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
458             X[i] *= inverse_phase_shift
459     
460         expected = zeros((len(freq_axis),), dtype=float)
461         # normalized sinc(x) = sin(pi x)/(pi x)
462         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
463         assert sinc(0.5) == 2.0/pi, "abnormal sinc()"
464         for i in range(len(freq_axis)):
465             f = freq_axis[i]
466             expected[i] = 1.0/abs(a) * sinc(f/a)
467     
468         if TEST_PLOTS:
469             pylab.figure()
470             pylab.subplot(211)
471             pylab.plot(arange(0, dt*samples, dt), x)
472             pylab.title('time series')
473             pylab.subplot(212)
474             pylab.plot(freq_axis, X.real, 'r.')
475             pylab.plot(freq_axis, X.imag, 'g.')
476             pylab.plot(freq_axis, expected, 'b-')
477             pylab.title('freq series')
478     
479     def test_unitary_rfft_rect(self):
480         "Test unitary FFTs on variously shaped rectangular functions."
481         self.run_unitary_rfft_rect(a=0.5)
482         self.run_unitary_rfft_rect(a=2.0)
483         self.run_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
484         self.run_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
485     
486     def gaussian(self, a, t):
487         r"""Gaussian function.
488
489         Notes
490         -----
491
492         .. math:: \gaussian(a,t) = \exp^{-at^2}
493         """
494         return exp(-a * t**2)
495     
496     def run_unitary_rfft_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6,
497                                   samples=256):
498         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
499
500         Notes
501         -----
502         Analytic result:
503
504         .. math::
505
506             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
507         """
508         samp_freq = float(samp_freq)
509         a = float(a)
510     
511         x = zeros((samples,), dtype=float)
512         dt = 1.0/samp_freq
513         for i in range(samples):
514             t = i*dt
515             x[i] = self.gaussian(a, (t-time_shift))
516         freq_axis, X = unitary_rfft(x, samp_freq)
517         #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X)
518     
519         # remove the phase due to our time shift
520         j = complex(0.0,1.0) # sqrt(-1)
521         for i in range(len(freq_axis)):
522             f = freq_axis[i]
523             inverse_phase_shift = exp(j*2.0*pi*time_shift*f)
524             X[i] *= inverse_phase_shift
525     
526         expected = zeros((len(freq_axis),), dtype=float)
527         for i in range(len(freq_axis)):
528             f = freq_axis[i]
529             expected[i] = sqrt(pi/a) * self.gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself.
530     
531         if TEST_PLOTS:
532             pylab.figure()
533             pylab.subplot(211)
534             pylab.plot(arange(0, dt*samples, dt), x)
535             pylab.title('time series')
536             pylab.subplot(212)
537             pylab.plot(freq_axis, X.real, 'r.')
538             pylab.plot(freq_axis, X.imag, 'g.')
539             pylab.plot(freq_axis, expected, 'b-')
540             pylab.title('freq series')
541     
542     def test_unitary_rfft_gaussian(self):
543         "Test unitary FFTs on variously shaped gaussian functions."
544         self.run_unitary_rfft_gaussian(a=0.5)
545         self.run_unitary_rfft_gaussian(a=2.0)
546         self.run_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
547         self.run_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
548
549 class TestUnitaryPowerSpectrum (unittest.TestCase):
550     def run_unitary_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
551                                        samples=1024):
552         x = zeros((samples,), dtype=float)
553         samp_freq = float(samp_freq)
554         for i in range(samples):
555             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
556         freq_axis, power = unitary_power_spectrum(x, samp_freq)
557         imax = argmax(power)
558     
559         expected = zeros((len(freq_axis),), dtype=float)
560         df = samp_freq/float(samples) # df = 1/T, where T = total_time
561         i = int(sin_freq/df)
562         # average power per unit time is 
563         #  P = <x(t)**2>
564         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
565         # so average value of (int sin(t)**2 dt) per unit time is 0.5
566         #  P = 0.5
567         # we spread that power over a frequency bin of width df, sp
568         #  P(f0) = 0.5/df
569         # where f0 is the sin's frequency
570         #
571         # or:
572         # FFT of sin(2*pi*t*f0) gives
573         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
574         # (area under x(t) = 0, area under X(f) = 0)
575         # so one sided power spectral density (PSD) per unit time is
576         #  P(f) = 2 |X(f)|**2 / T
577         #       = 2 * |0.5 delta(f-f0)|**2 / T
578         #       = 0.5 * |delta(f-f0)|**2 / T
579         # but we're discrete and want the integral of the 'delta' to be 1, 
580         # so 'delta'*df = 1  --> 'delta' = 1/df, and
581         #  P(f) = 0.5 / (df**2 * T)
582         #       = 0.5 / df                (T = 1/df)
583         expected[i] = 0.5 / df
584     
585         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
586             (sin_freq, expected[i], freq_axis[imax], power[imax])
587         Pexp = 0
588         P    = 0
589         for i in range(len(freq_axis)):
590             Pexp += expected[i] *df
591             P    += power[i] * df
592         print " The total power should be %g (%g)" % (Pexp, P)
593     
594         if TEST_PLOTS:
595             pylab.figure()
596             pylab.subplot(211)
597             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
598             pylab.title('time series')
599             pylab.subplot(212)
600             pylab.plot(freq_axis, power, 'r.')
601             pylab.plot(freq_axis, expected, 'b-')
602             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
603     
604     def test_unitary_power_spectrum_sin(self):
605         "Test unitary power spectrums on variously shaped sin functions"
606         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
607         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
608         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
609         self.run_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024)
610         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
611         # finally, with some irrational numbers, to check that I'm not getting lucky
612         self.run_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)
613         # test with non-integer number of periods
614         self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256)
615     
616     def run_unitary_power_spectrum_delta(self, amp=1, samp_freq=1,
617                                          samples=256):
618         """TODO
619         """
620         x = zeros((samples,), dtype=float)
621         samp_freq = float(samp_freq)
622         x[0] = amp
623         freq_axis, power = unitary_power_spectrum(x, samp_freq)
624     
625         # power = <x(t)**2> = (amp)**2 * dt/T
626         # we spread that power over the entire freq_axis [0,fN], so
627         #  P(f)  = (amp)**2 dt / (T fN)
628         # where
629         #  dt = 1/samp_freq        (sample period)
630         #  T  = samples/samp_freq  (total time of data aquisition)
631         #  fN = 0.5 samp_freq      (Nyquist frequency)
632         # so
633         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
634         #       = 2 amp**2 / (samp_freq*samples)
635         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
636         expected = ones((len(freq_axis),), dtype=float) * expected_amp
637     
638         print "The power should be flat at y = %g (%g)" % (expected_amp, power[0])
639         
640         if TEST_PLOTS:
641             pylab.figure()
642             pylab.subplot(211)
643             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
644             pylab.title('time series')
645             pylab.subplot(212)
646             pylab.plot(freq_axis, power, 'r.')
647             pylab.plot(freq_axis, expected, 'b-')
648             pylab.title('%g samples of delta amp %g' % (samples, amp))
649     
650     def _test_unitary_power_spectrum_delta(self):
651         "Test unitary power spectrums on various delta functions"
652         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024)
653         _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048)
654         _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed
655         _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed
656         _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024)
657         _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024)
658     
659     def gaussian(self, area, mean, std, t):
660         "Integral over all time = area (i.e. normalized for area=1)"
661         return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2)
662         
663     def run_unitary_power_spectrum_gaussian(self, area=2.5, mean=5, std=1,
664                                             samp_freq=10.24 ,samples=512):
665         """TODO.
666         """
667         x = zeros((samples,), dtype=float)
668         mean = float(mean)
669         for i in range(samples):
670             t = i/float(samp_freq)
671             x[i] = self.gaussian(area, mean, std, t)
672         freq_axis, power = unitary_power_spectrum(x, samp_freq)
673     
674         # generate the predicted curve
675         # by comparing our self.gaussian() form to _gaussian(),
676         # we see that the Fourier transform of x(t) has parameters:
677         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are converting to angular frequency, not frequency like we are)
678         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of _gaussian() above)
679         #  mean' = 0               (changing the mean in the time-domain just changes the phase in the freq-domain)
680         # So our power spectral density per unit time is given by
681         #  P(f) = 2 |X(f)|**2 / T
682         # Where
683         #  T  = samples/samp_freq  (total time of data aquisition)
684         mean = 0.0
685         area = area /(std*sqrt(2.0*pi))
686         std = 1.0/(2.0*pi*std)
687         expected = zeros((len(freq_axis),), dtype=float)
688         df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
689         for i in range(len(freq_axis)):
690             f = i*df
691             gaus = self.gaussian(area, mean, std, f)
692             expected[i] = 2.0 * gaus**2 * samp_freq/samples
693         print "The power should be a half-gaussian, ",
694         print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0])
695     
696         if TEST_PLOTS:
697             pylab.figure()
698             pylab.subplot(211)
699             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
700             pylab.title('time series')
701             pylab.subplot(212)
702             pylab.plot(freq_axis, power, 'r.')
703             pylab.plot(freq_axis, expected, 'b-')
704             pylab.title('freq series')
705     
706     def test_unitary_power_spectrum_gaussian(self):
707         "Test unitary power spectrums on various gaussian functions"
708         for area in [1,pi]:
709             for std in [1,sqrt(2)]:
710                 for samp_freq in [10.0, exp(1)]:
711                     for samples in [1024,2048]:
712                         self.run_unitary_power_spectrum_gaussian(
713                             area=area, std=std, samp_freq=samp_freq,
714                             samples=samples)
715
716 class TestUnitaryAvgPowerSpectrum (unittest.TestCase):
717     def run_unitary_avg_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
718                                            samples=1024, chunk_size=512,
719                                            overlap=True, window=window_hann):
720         """TODO
721         """
722         x = zeros((samples,), dtype=float)
723         samp_freq = float(samp_freq)
724         for i in range(samples):
725             x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq)
726         freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size,
727                                                       overlap, window)
728         imax = argmax(power)
729     
730         expected = zeros((len(freq_axis),), dtype=float)
731         df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time
732         i = int(sin_freq/df)
733         expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin()
734     
735         print "The power should be a peak at %g Hz of %g (%g, %g)" % \
736             (sin_freq, expected[i], freq_axis[imax], power[imax])
737         Pexp = 0
738         P    = 0
739         for i in range(len(freq_axis)):
740             Pexp += expected[i] * df
741             P    += power[i] * df
742         print " The total power should be %g (%g)" % (Pexp, P)
743     
744         if TEST_PLOTS:
745             pylab.figure()
746             pylab.subplot(211)
747             pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-')
748             pylab.title('time series')
749             pylab.subplot(212)
750             pylab.plot(freq_axis, power, 'r.')
751             pylab.plot(freq_axis, expected, 'b-')
752             pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq))
753     
754     def test_unitary_avg_power_spectrum_sin(self):
755         "Test unitary avg power spectrums on variously shaped sin functions."
756         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024)
757         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048)
758         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098)
759         self.run_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024)
760         self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048)
761         # test long wavelenth sin, so be closer to window frequency
762         self.run_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048)
763         # finally, with some irrational numbers, to check that I'm not getting lucky
764         self.run_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024)