FFT_tools: try to import pyplot up front
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 :func:`numpy.fft.rfft`.
19
20 Main entry functions:
21
22 * :func:`unitary_rfft`
23 * :func:`power_spectrum`
24 * :func:`unitary_power_spectrum`
25 * :func:`avg_power_spectrum`
26 * :func:`unitary_avg_power_spectrum`
27 """
28
29 import logging as _logging
30 import unittest as _unittest
31
32 import numpy as _numpy
33 try:
34     import matplotlib.pyplot as _pyplot
35 except ImportError as e:
36     _pyplot = None
37     _pyplot_import_error = e
38
39
40 __version__ = '0.4'
41
42
43 LOG = _logging.getLogger('FFT-tools')
44 LOG.addHandler(_logging.StreamHandler())
45 LOG.setLevel(_logging.ERROR)
46
47
48 # Display time- and freq-space plots of the test transforms if True
49 TEST_PLOTS = False
50
51
52 def floor_pow_of_two(num):
53     """Round `num` down to the closest exact a power of two.
54
55     Examples
56     --------
57
58     >>> floor_pow_of_two(3)
59     2
60     >>> floor_pow_of_two(11)
61     8
62     >>> floor_pow_of_two(15)
63     8
64     """
65     lnum = _numpy.log2(num)
66     if int(lnum) != lnum:
67         num = 2**_numpy.floor(lnum)
68     return int(num)
69
70
71 def round_pow_of_two(num):
72     """Round `num` to the closest exact a power of two on a log scale.
73
74     Examples
75     --------
76
77     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
78     4
79     >>> round_pow_of_two(11)
80     8
81     >>> round_pow_of_two(15)
82     16
83     """
84     lnum = _numpy.log2(num)
85     if int(lnum) != lnum:
86         num = 2**_numpy.round(lnum)
87     return int(num)
88
89
90 def ceil_pow_of_two(num):
91     """Round `num` up to the closest exact a power of two.
92
93     Examples
94     --------
95
96     >>> ceil_pow_of_two(3)
97     4
98     >>> ceil_pow_of_two(11)
99     16
100     >>> ceil_pow_of_two(15)
101     16
102     """
103     lnum = _numpy.log2(num)
104     if int(lnum) != lnum:
105         num = 2**_numpy.ceil(lnum)
106     return int(num)
107
108
109 def unitary_rfft(data, freq=1.0):
110     """Compute the unitary Fourier transform of real data.
111
112     Unitary = preserves power [Parseval's theorem].
113
114     Parameters
115     ----------
116     data : iterable
117         Real (not complex) data taken with a sampling frequency `freq`.
118     freq : real
119         Sampling frequency.
120
121     Returns
122     -------
123     freq_axis,trans : numpy.ndarray
124         Arrays ready for plotting.
125
126     Notes
127     -----
128     If the units on your data are Volts,
129     and your sampling frequency is in Hz,
130     then `freq_axis` will be in Hz,
131     and `trans` will be in Volts.
132     """
133     nsamps = floor_pow_of_two(len(data))
134     # Which should satisfy the discrete form of Parseval's theorem
135     #   n-1               n-1
136     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
137     #   m=0               k=0
138     # However, we want our FFT to satisfy the continuous Parseval eqn
139     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
140     # which has the discrete form
141     #   n-1              n-1
142     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
143     #   m=0              k=0
144     # with X'_k = AX, this gives us
145     #   n-1                     n-1
146     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
147     #   m=0                     k=0
148     # so we see
149     #   A^2 df/dt = 1/n
150     #   A^2 = 1/n dt/df
151     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
152     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
153     # f_c in the transformed data is the Nyquist frequency (12.1.2)
154     #   f_c = 1/2dt
155     # and the points are spaced out by (12.1.5)
156     #   df = 1/ndt
157     # so
158     #   dt = 1/ndf
159     #   dt/df = 1/ndf^2
160     #   A^2 = 1/n^2df^2
161     #   A = 1/ndf = ndt/n = dt
162     # so we can convert the Numpy transformed data to match our unitary
163     # continuous transformed data with (also NR 12.1.8)
164     #   X'_k = dtX = X / <sampling freq>
165     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
166     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
167     return (freq_axis, trans)
168
169
170 def power_spectrum(data, freq=1.0):
171     """Compute the power spectrum of the time series `data`.
172
173     Parameters
174     ----------
175     data : iterable
176         Real (not complex) data taken with a sampling frequency `freq`.
177     freq : real
178         Sampling frequency.
179
180     Returns
181     -------
182     freq_axis,power : numpy.ndarray
183         Arrays ready for plotting.
184
185     Notes
186     -----
187     If the number of samples in `data` is not an integer power of two,
188     the FFT ignores some of the later points.
189
190     See Also
191     --------
192     unitary_power_spectrum,avg_power_spectrum
193     """
194     nsamps = floor_pow_of_two(len(data))
195
196     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
197     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
198     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
199     # See Numerical Recipies for a details.
200     trans = _numpy.fft.rfft(data[0:nsamps])
201     power = (trans * trans.conj()).real  # we want the square of the amplitude
202     return (freq_axis, power)
203
204
205 def unitary_power_spectrum(data, freq=1.0):
206     """Compute the unitary power spectrum of the time series `data`.
207
208     See Also
209     --------
210     power_spectrum,unitary_avg_power_spectrum
211     """
212     freq_axis,power = power_spectrum(data, freq)
213     # One sided power spectral density, so 2|H(f)|**2
214     # (see NR 2nd edition 12.0.14, p498)
215     #
216     # numpy normalizes with 1/N on the inverse transform ifft,
217     # so we should normalize the freq-space representation with 1/sqrt(N).
218     # But we're using the rfft, where N points are like N/2 complex points,
219     # so 1/sqrt(N/2)
220     # So the power gets normalized by that twice and we have 2/N
221     #
222     # On top of this, the FFT assumes a sampling freq of 1 per second,
223     # and we want to preserve area under our curves.
224     # If our total time T = len(data)/freq is smaller than 1,
225     # our df_real = freq/len(data) is bigger that the FFT expects
226     # (dt_fft = 1/len(data)),
227     # and we need to scale the powers down to conserve area.
228     # df_fft * F_fft(f) = df_real *F_real(f)
229     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
230     # So the power gets normalized by *that* twice and we have 2/N * freq**2
231
232     # power per unit time
233     # measure x(t) for time T
234     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
235     # PSD(f) = 2 |X(f)|**2 / T
236
237     # total_time = len(data)/float(freq)
238     # power *= 2.0 / float(freq)**2   /   total_time
239     # power *= 2.0 / freq**2   *   freq / len(data)
240     power *= 2.0 / (freq * _numpy.float(len(data)))
241
242     return (freq_axis, power)
243
244
245 def window_hann(length):
246     r"""Returns a Hann window array with length entries
247
248     Notes
249     -----
250     The Hann window with length :math:`L` is defined as
251
252     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
253     """
254     win = _numpy.zeros((length,), dtype=_numpy.float)
255     for i in range(length):
256         win[i] = 0.5 * (
257             1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length)))
258     # avg value of cos over a period is 0
259     # so average height of Hann window is 0.5
260     return win
261
262
263 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
264                        overlap=True, window=window_hann):
265     """Compute the avgerage power spectrum of `data`.
266
267     Parameters
268     ----------
269     data : iterable
270         Real (not complex) data taken with a sampling frequency `freq`.
271     freq : real
272         Sampling frequency.
273     chunk_size : int
274         Number of samples per chunk.  Use a power of two.
275     overlap: {True,False}
276         If `True`, each chunk overlaps the previous chunk by half its
277         length.  Otherwise, the chunks are end-to-end, and not
278         overlapping.
279     window: iterable
280         Weights used to "smooth" the chunks, there is a whole science
281         behind windowing, but if you're not trying to squeeze every
282         drop of information out of your data, you'll be OK with the
283         default Hann window.
284
285     Returns
286     -------
287     freq_axis,power : numpy.ndarray
288         Arrays ready for plotting.
289
290     Notes
291     -----
292     The average power spectrum is computed by breaking `data` into
293     chunks of length `chunk_size`.  These chunks are transformed
294     individually into frequency space and then averaged together.
295
296     See Numerical Recipes 2 section 13.4 for a good introduction to
297     the theory.
298
299     If the number of samples in `data` is not a multiple of
300     `chunk_size`, we ignore the extra points.
301     """
302     if chunk_size != floor_pow_of_two(chunk_size):
303         raise ValueError(
304             'chunk_size {} should be a power of 2'.format(chunk_size))
305
306     nchunks = len(data) // chunk_size  # integer division = implicit floor
307     if overlap:
308         chunk_step = chunk_size / 2
309     else:
310         chunk_step = chunk_size
311
312     win = window(chunk_size)  # generate a window of the appropriate size
313     freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1)
314     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
315     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
316     # See Numerical Recipies for a details.
317     power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float)
318     for i in range(nchunks):
319         starti = i * chunk_step
320         stopi = starti + chunk_size
321         fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win)
322         p_chunk = (fft_chunk * fft_chunk.conj()).real
323         power += p_chunk.astype(_numpy.float)
324     power /= _numpy.float(nchunks)
325     return (freq_axis, power)
326
327
328 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
329                                overlap=True, window=window_hann):
330     """Compute the unitary average power spectrum of `data`.
331
332     See Also
333     --------
334     avg_power_spectrum,unitary_power_spectrum
335     """
336     freq_axis,power = avg_power_spectrum(
337         data, freq, chunk_size, overlap, window)
338     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
339     # see unitary_power_spectrum()
340     power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3
341     #             * 8/3  to remove power from windowing
342     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
343     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
344     # So our calulated power has and extra <w(t)**2> in it.
345     # For the Hann window,
346     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
347     # For low frequency components,
348     # where the frequency of x(t) is ~= the frequency of w(t),
349     # the normalization is not perfect. ??
350     # The normalization approaches perfection as chunk_size -> infinity.
351     return (freq_axis, power)
352
353
354 class TestRFFT (_unittest.TestCase):
355     r"""Ensure Numpy's FFT algorithm acts as expected.
356
357     Notes
358     -----
359     The expected return values are [#dft]_:
360
361     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
362
363     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
364     """
365     def run_rfft(self, xs, Xs):
366         i = _numpy.complex(0, 1)
367         n = len(xs)
368         Xa = []
369         for k in range(n):
370             Xa.append(sum([x * _numpy.exp(-2 * _numpy.pi * i * m * k / n)
371                            for x,m in zip(xs, range(n))]))
372             if k < len(Xs):
373                 self.assertAlmostEqual(
374                     (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]), 0, 6,
375                     ('rfft mismatch on element {}: {} != {}, '
376                      'relative error {}').format(
377                         k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
378         # Which should satisfy the discrete form of Parseval's theorem
379         #   n-1               n-1
380         #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
381         #   m=0               k=0
382         timeSum = sum([_numpy.abs(x)**2 for x in xs])
383         freqSum = sum([_numpy.abs(X)**2 for X in Xa])
384         self.assertAlmostEqual(
385             _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum, 0, 6,
386             "Mismatch on Parseval's, {} != 1/{} * {}".format(
387                 timeSum, n, freqSum))
388
389     def test_rfft(self):
390         "Test NumPy's builtin :func:`numpy.fft.rfft`"
391         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
392         self.run_rfft(xs, _numpy.fft.rfft(xs))
393
394
395 class TestUnitaryRFFT (_unittest.TestCase):
396     """Verify `unitary_rfft`.
397     """
398     def run_parsevals(self, xs, freq, freqs, Xs):
399         """Check the discretized integral form of Parseval's theorem
400
401         Notes
402         -----
403         Which is:
404
405         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
406         """
407         dt = 1.0 / freq
408         df = freqs[1] - freqs[0]
409         self.assertAlmostEqual(
410             (df - 1 / (len(xs) * dt)) / df, 0, 6,
411             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
412         Xa = list(Xs)
413         for k in range(len(Xs) - 1, 1, -1):
414             Xa.append(Xa[k])
415         self.assertEqual(len(xs), len(Xa))
416         lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
417         rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
418         self.assertAlmostEqual(
419             _numpy.abs(lhs - rhs) / lhs, 0, 3,
420             "Mismatch on Parseval's, {} != {}".format(lhs, rhs))
421
422     def test_parsevals(self):
423         "Test unitary rfft on Parseval's theorem"
424         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
425         dt = _numpy.pi
426         freqs,Xs = unitary_rfft(xs, 1.0 / dt)
427         self.run_parsevals(xs, 1.0 / dt, freqs, Xs)
428
429     def rect(self, t):
430         r"""Rectangle function.
431
432         Notes
433         -----
434
435         .. math::
436
437             \rect(t) = \begin{cases}
438                1& \text{if $|t| < 0.5$}, \\
439                0& \text{if $|t| \ge 0.5$}.
440                        \end{cases}
441         """
442         if _numpy.abs(t) < 0.5:
443             return 1
444         else:
445             return 0
446
447     def run_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
448         r"""Test `unitary_rttf` on known function `rect(at)`.
449
450         Notes
451         -----
452         Analytic result:
453
454         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
455         """
456         samp_freq = _numpy.float(samp_freq)
457         a = _numpy.float(a)
458
459         x = _numpy.zeros((samples,), dtype=_numpy.float)
460         dt = 1.0 / samp_freq
461         for i in range(samples):
462             t = i * dt
463             x[i] = self.rect(a * (t - time_shift))
464         freq_axis,X = unitary_rfft(x, samp_freq)
465
466         # remove the phase due to our time shift
467         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
468         for i in range(len(freq_axis)):
469             f = freq_axis[i]
470             inverse_phase_shift = _numpy.exp(
471                 j * 2.0 * _numpy.pi * time_shift * f)
472             X[i] *= inverse_phase_shift
473
474         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
475         # normalized sinc(x) = sin(pi x)/(pi x)
476         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
477         self.assertEqual(_numpy.sinc(0.5), 2.0 / _numpy.pi)
478         for i in range(len(freq_axis)):
479             f = freq_axis[i]
480             expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
481
482         if TEST_PLOTS:
483             if _pyplot is None:
484                 raise _pyplot_import_error
485             figure = _pyplot.figure()
486             time_axes = figure.add_subplot(2, 1, 1)
487             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
488             time_axes.set_title('time series')
489             freq_axes = figure.add_subplot(2, 1, 2)
490             freq_axes.plot(freq_axis, X.real, 'r.')
491             freq_axes.plot(freq_axis, X.imag, 'g.')
492             freq_axes.plot(freq_axis, expected, 'b-')
493             freq_axes.set_title('freq series')
494
495     def test_rect(self):
496         "Test unitary FFTs on variously shaped rectangular functions."
497         self.run_rect(a=0.5)
498         self.run_rect(a=2.0)
499         self.run_rect(a=0.7, samp_freq=50, samples=512)
500         self.run_rect(a=3.0, samp_freq=60, samples=1024)
501
502     def gaussian(self, a, t):
503         r"""Gaussian function.
504
505         Notes
506         -----
507
508         .. math:: \gaussian(a,t) = \exp^{-at^2}
509         """
510         return _numpy.exp(-a * t**2)
511
512     def run_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
513         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
514
515         Notes
516         -----
517         Analytic result:
518
519         .. math::
520
521             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
522         """
523         samp_freq = _numpy.float(samp_freq)
524         a = _numpy.float(a)
525
526         x = _numpy.zeros((samples,), dtype=_numpy.float)
527         dt = 1.0 / samp_freq
528         for i in range(samples):
529             t = i * dt
530             x[i] = self.gaussian(a, (t - time_shift))
531         freq_axis,X = unitary_rfft(x, samp_freq)
532
533         # remove the phase due to our time shift
534         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
535         for i in range(len(freq_axis)):
536             f = freq_axis[i]
537             inverse_phase_shift = _numpy.exp(
538                 j * 2.0 * _numpy.pi * time_shift * f)
539             X[i] *= inverse_phase_shift
540
541         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
542         for i in range(len(freq_axis)):
543             f = freq_axis[i]
544             # see Wikipedia, or do the integral yourself.
545             expected[i] = _numpy.sqrt(_numpy.pi / a) * self.gaussian(
546                 1.0 / a, _numpy.pi * f)
547
548         if TEST_PLOTS:
549             if _pyplot is None:
550                 raise _pyplot_import_error
551             figure = _pyplot.figure()
552             time_axes = figure.add_subplot(2, 1, 1)
553             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
554             time_axes.set_title('time series')
555             freq_axes = figure.add_subplot(2, 1, 2)
556             freq_axes.plot(freq_axis, X.real, 'r.')
557             freq_axes.plot(freq_axis, X.imag, 'g.')
558             freq_axes.plot(freq_axis, expected, 'b-')
559             freq_axes.set_title('freq series')
560
561     def test_gaussian(self):
562         "Test unitary FFTs on variously shaped gaussian functions."
563         self.run_gaussian(a=0.5)
564         self.run_gaussian(a=2.0)
565         self.run_gaussian(a=0.7, samp_freq=50, samples=512)
566         self.run_gaussian(a=3.0, samp_freq=60, samples=1024)
567
568
569 class TestUnitaryPowerSpectrum (_unittest.TestCase):
570     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024):
571         x = _numpy.zeros((samples,), dtype=_numpy.float)
572         samp_freq = _numpy.float(samp_freq)
573         for i in range(samples):
574             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
575         freq_axis,power = unitary_power_spectrum(x, samp_freq)
576         imax = _numpy.argmax(power)
577
578         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
579         # df = 1/T, where T = total_time
580         df = samp_freq / _numpy.float(samples)
581         i = int(sin_freq / df)
582         # average power per unit time is
583         #  P = <x(t)**2>
584         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
585         # so average value of (int sin(t)**2 dt) per unit time is 0.5
586         #  P = 0.5
587         # we spread that power over a frequency bin of width df, sp
588         #  P(f0) = 0.5/df
589         # where f0 is the sin's frequency
590         #
591         # or:
592         # FFT of sin(2*pi*t*f0) gives
593         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
594         # (area under x(t) = 0, area under X(f) = 0)
595         # so one sided power spectral density (PSD) per unit time is
596         #  P(f) = 2 |X(f)|**2 / T
597         #       = 2 * |0.5 delta(f-f0)|**2 / T
598         #       = 0.5 * |delta(f-f0)|**2 / T
599         # but we're discrete and want the integral of the 'delta' to be 1,
600         # so 'delta'*df = 1  --> 'delta' = 1/df, and
601         #  P(f) = 0.5 / (df**2 * T)
602         #       = 0.5 / df                (T = 1/df)
603         expected[i] = 0.5 / df
604
605         LOG.debug('The power should be a peak at {} Hz of {} ({}, {})'.format(
606                 sin_freq, expected[i], freq_axis[imax], power[imax]))
607         Pexp = P = 0
608         for i in range(len(freq_axis)):
609             Pexp += expected[i] * df
610             P += power[i] * df
611         self.assertAlmostEqual(
612             _numpy.abs((P - Pexp) / Pexp), 0, 1,
613             'The total power should be {} ({})'.format(Pexp, P))
614
615         if TEST_PLOTS:
616             if _pyplot is None:
617                 raise _pyplot_import_error
618             figure = _pyplot.figure()
619             time_axes = figure.add_subplot(2, 1, 1)
620             time_axes.plot(
621                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
622             time_axes.set_title('time series')
623             freq_axes = figure.add_subplot(2, 1, 2)
624             freq_axes.plot(freq_axis, power, 'r.')
625             freq_axes.plot(freq_axis, expected, 'b-')
626             freq_axes.set_title(
627                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
628
629     def test_sin(self):
630         "Test unitary power spectrums on variously shaped sin functions"
631         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
632         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
633         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
634         self.run_sin(sin_freq=7, samp_freq=512, samples=1024)
635         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
636         # finally, with some irrational numbers, to check that I'm not
637         # getting lucky
638         self.run_sin(
639             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
640         # test with non-integer number of periods
641         self.run_sin(sin_freq=5, samp_freq=512, samples=256)
642
643     def run_delta(self, amp=1, samp_freq=1, samples=256):
644         """TODO
645         """
646         x = _numpy.zeros((samples,), dtype=_numpy.float)
647         samp_freq = _numpy.float(samp_freq)
648         x[0] = amp
649         freq_axis,power = unitary_power_spectrum(x, samp_freq)
650
651         # power = <x(t)**2> = (amp)**2 * dt/T
652         # we spread that power over the entire freq_axis [0,fN], so
653         #  P(f)  = (amp)**2 dt / (T fN)
654         # where
655         #  dt = 1/samp_freq        (sample period)
656         #  T  = samples/samp_freq  (total time of data aquisition)
657         #  fN = 0.5 samp_freq      (Nyquist frequency)
658         # so
659         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
660         #       = 2 amp**2 / (samp_freq*samples)
661         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
662         expected = _numpy.ones(
663             (len(freq_axis),), dtype=_numpy.float) * expected_amp
664
665         self.assertAlmostEqual(
666             expected_amp, power[0], 4,
667             'The power should be flat at y = {} ({})'.format(
668                 expected_amp, power[0]))
669
670         if TEST_PLOTS:
671             if _pyplot is None:
672                 raise _pyplot_import_error
673             figure = _pyplot.figure()
674             time_axes = figure.add_subplot(2, 1, 1)
675             time_axes.plot(
676                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
677             time_axes.set_title('time series')
678             freq_axes = figure.add_subplot(2, 1, 2)
679             freq_axes.plot(freq_axis, power, 'r.')
680             freq_axes.plot(freq_axis, expected, 'b-')
681             freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
682
683     def test_delta(self):
684         "Test unitary power spectrums on various delta functions"
685         self.run_delta(amp=1, samp_freq=1.0, samples=1024)
686         self.run_delta(amp=1, samp_freq=1.0, samples=2048)
687         # expected = 2*computed
688         self.run_delta(amp=1, samp_freq=0.5, samples=2048)
689         # expected = 0.5*computed
690         self.run_delta(amp=1, samp_freq=2.0, samples=2048)
691         self.run_delta(amp=3, samp_freq=1.0, samples=1024)
692         self.run_delta(amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
693
694     def gaussian(self, area, mean, std, t):
695         "Integral over all time = area (i.e. normalized for area=1)"
696         return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
697             -0.5 * ((t-mean)/std)**2)
698
699     def run_gaussian(self, area=2.5, mean=5, std=1, samp_freq=10.24,
700                      samples=512):
701         """TODO.
702         """
703         x = _numpy.zeros((samples,), dtype=_numpy.float)
704         mean = _numpy.float(mean)
705         for i in range(samples):
706             t = i / _numpy.float(samp_freq)
707             x[i] = self.gaussian(area, mean, std, t)
708         freq_axis,power = unitary_power_spectrum(x, samp_freq)
709
710         # generate the predicted curve by comparing our
711         # TestUnitaryPowerSpectrum.gaussian() form to
712         # TestUnitaryRFFT.gaussian(),
713         # we see that the Fourier transform of x(t) has parameters:
714         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
715         #                           converting to angular frequency,
716         #                           not frequency like we are)
717         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of
718         #                                   TestUnitaryRFFT.gaussian() above)
719         #  mean' = 0               (changing the mean in the time-domain just
720         #                           changes the phase in the freq-domain)
721         # So our power spectral density per unit time is given by
722         #  P(f) = 2 |X(f)|**2 / T
723         # Where
724         #  T  = samples/samp_freq  (total time of data aquisition)
725         mean = 0.0
726         area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
727         std = 1.0 / (2.0 * _numpy.pi * std)
728         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
729         # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
730         df = _numpy.float(samp_freq) / samples
731         for i in range(len(freq_axis)):
732             f = i * df
733             gaus = self.gaussian(area, mean, std, f)
734             expected[i] = 2.0 * gaus**2 * samp_freq / samples
735         self.assertAlmostEqual(
736             expected[0], power[0], 3,
737             ('The power should be a half-gaussian, '
738              'with a peak at 0 Hz with amplitude {} ({})').format(
739                 expected[0], power[0]))
740
741         if TEST_PLOTS:
742             if _pyplot is None:
743                 raise _pyplot_import_error
744             figure = _pyplot.figure()
745             time_axes = figure.add_subplot(2, 1, 1)
746             time_axes.plot(
747                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
748                 x, 'b-')
749             time_axes.set_title('time series')
750             freq_axes = figure.add_subplot(2, 1, 2)
751             freq_axes.plot(freq_axis, power, 'r.')
752             freq_axes.plot(freq_axis, expected, 'b-')
753             freq_axes.set_title('freq series')
754
755     def test_gaussian(self):
756         "Test unitary power spectrums on various gaussian functions"
757         for area in [1, _numpy.pi]:
758             for std in [1, _numpy.sqrt(2)]:
759                 for samp_freq in [10.0, _numpy.exp(1)]:
760                     for samples in [1024, 2048]:
761                         self.run_gaussian(
762                             area=area, std=std, samp_freq=samp_freq,
763                             samples=samples)
764
765
766 class TestUnitaryAvgPowerSpectrum (_unittest.TestCase):
767     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024, chunk_size=512,
768                 overlap=True, window=window_hann, places=3):
769         """TODO
770         """
771         x = _numpy.zeros((samples,), dtype=_numpy.float)
772         samp_freq = _numpy.float(samp_freq)
773         for i in range(samples):
774             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
775         freq_axis,power = unitary_avg_power_spectrum(
776             x, samp_freq, chunk_size, overlap, window)
777         imax = _numpy.argmax(power)
778
779         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
780         # df = 1/T, where T = total_time
781         df = samp_freq / _numpy.float(chunk_size)
782         i = int(sin_freq / df)
783         # see TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin()
784         expected[i] = 0.5 / df
785
786         LOG.debug('The power should peak at {} Hz of {} ({}, {})'.format(
787                 sin_freq, expected[i], freq_axis[imax], power[imax]))
788         Pexp = P = 0
789         for i in range(len(freq_axis)):
790             Pexp += expected[i] * df
791             P += power[i] * df
792         self.assertAlmostEqual(
793             Pexp, P, places,
794             'The total power should be {} ({})'.format(Pexp, P))
795
796         if TEST_PLOTS:
797             if _pyplot is None:
798                 raise _pyplot_import_error
799             figure = _pyplot.figure()
800             time_axes = figure.add_subplot(2, 1, 1)
801             time_axes.plot(
802                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
803                 x, 'b-')
804             time_axes.set_title('time series')
805             freq_axes = figure.add_subplot(2, 1, 2)
806             freq_axes.plot(freq_axis, power, 'r.')
807             freq_axes.plot(freq_axis, expected, 'b-')
808             freq_axes.set_title(
809                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
810
811     def test_sin(self):
812         "Test unitary avg power spectrums on variously shaped sin functions."
813         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
814         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
815         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
816         self.run_sin(sin_freq=17, samp_freq=512, samples=1024)
817         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
818         # test long wavelenth sin, so be closer to window frequency
819         self.run_sin(sin_freq=1, samp_freq=1024, samples=2048, places=0)
820         # finally, with some irrational numbers, to check that I'm not
821         # getting lucky
822         self.run_sin(
823             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)