FFT_tools: Catch RuntimeErrors when importing matplotlib
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's :py:mod:`~numpy.fft` module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 :py:func:`numpy.fft.rfft`.
19
20 Main entry functions:
21
22 * :py:func:`unitary_rfft`
23 * :py:func:`power_spectrum`
24 * :py:func:`unitary_power_spectrum`
25 * :py:func:`avg_power_spectrum`
26 * :py:func:`unitary_avg_power_spectrum`
27 """
28
29 import logging as _logging
30 import unittest as _unittest
31
32 import numpy as _numpy
33 try:
34     import matplotlib.pyplot as _pyplot
35 except (ImportError, RuntimeError) as e:
36     _pyplot = None
37     _pyplot_import_error = e
38
39
40 __version__ = '0.5'
41
42
43 LOG = _logging.getLogger('FFT-tools')
44 LOG.addHandler(_logging.StreamHandler())
45 LOG.setLevel(_logging.ERROR)
46
47
48 # Display time- and freq-space plots of the test transforms if True
49 TEST_PLOTS = False
50
51
52 def floor_pow_of_two(num):
53     """Round `num` down to the closest exact a power of two.
54
55     Examples
56     --------
57
58     >>> floor_pow_of_two(3)
59     2
60     >>> floor_pow_of_two(11)
61     8
62     >>> floor_pow_of_two(15)
63     8
64     """
65     lnum = _numpy.log2(num)
66     if int(lnum) != lnum:
67         num = 2**_numpy.floor(lnum)
68     return int(num)
69
70
71 def round_pow_of_two(num):
72     """Round `num` to the closest exact a power of two on a log scale.
73
74     Examples
75     --------
76
77     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
78     4
79     >>> round_pow_of_two(11)
80     8
81     >>> round_pow_of_two(15)
82     16
83     """
84     lnum = _numpy.log2(num)
85     if int(lnum) != lnum:
86         num = 2**_numpy.round(lnum)
87     return int(num)
88
89
90 def ceil_pow_of_two(num):
91     """Round `num` up to the closest exact a power of two.
92
93     Examples
94     --------
95
96     >>> ceil_pow_of_two(3)
97     4
98     >>> ceil_pow_of_two(11)
99     16
100     >>> ceil_pow_of_two(15)
101     16
102     """
103     lnum = _numpy.log2(num)
104     if int(lnum) != lnum:
105         num = 2**_numpy.ceil(lnum)
106     return int(num)
107
108
109 def unitary_rfft(data, freq=1.0):
110     """Compute the unitary Fourier transform of real data.
111
112     Unitary = preserves power [Parseval's theorem].
113
114     Parameters
115     ----------
116     data : iterable
117         Real (not complex) data taken with a sampling frequency `freq`.
118     freq : real
119         Sampling frequency.
120
121     Returns
122     -------
123     freq_axis,trans : numpy.ndarray
124         Arrays ready for plotting.
125
126     Notes
127     -----
128     If the units on your data are Volts,
129     and your sampling frequency is in Hz,
130     then `freq_axis` will be in Hz,
131     and `trans` will be in Volts.
132     """
133     nsamps = floor_pow_of_two(len(data))
134     # Which should satisfy the discrete form of Parseval's theorem
135     #   n-1               n-1
136     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
137     #   m=0               k=0
138     # However, we want our FFT to satisfy the continuous Parseval eqn
139     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
140     # which has the discrete form
141     #   n-1              n-1
142     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
143     #   m=0              k=0
144     # with X'_k = AX, this gives us
145     #   n-1                     n-1
146     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
147     #   m=0                     k=0
148     # so we see
149     #   A^2 df/dt = 1/n
150     #   A^2 = 1/n dt/df
151     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
152     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
153     # f_c in the transformed data is the Nyquist frequency (12.1.2)
154     #   f_c = 1/2dt
155     # and the points are spaced out by (12.1.5)
156     #   df = 1/ndt
157     # so
158     #   dt = 1/ndf
159     #   dt/df = 1/ndf^2
160     #   A^2 = 1/n^2df^2
161     #   A = 1/ndf = ndt/n = dt
162     # so we can convert the Numpy transformed data to match our unitary
163     # continuous transformed data with (also NR 12.1.8)
164     #   X'_k = dtX = X / <sampling freq>
165     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
166     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
167     return (freq_axis, trans)
168
169
170 def power_spectrum(data, freq=1.0):
171     """Compute the power spectrum of the time series `data`.
172
173     Parameters
174     ----------
175     data : iterable
176         Real (not complex) data taken with a sampling frequency `freq`.
177     freq : real
178         Sampling frequency.
179
180     Returns
181     -------
182     freq_axis,power : numpy.ndarray
183         Arrays ready for plotting.
184
185     Notes
186     -----
187     If the number of samples in `data` is not an integer power of two,
188     the FFT ignores some of the later points.
189
190     See Also
191     --------
192     unitary_power_spectrum,avg_power_spectrum
193     """
194     nsamps = floor_pow_of_two(len(data))
195
196     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
197     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
198     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
199     # See Numerical Recipies for a details.
200     trans = _numpy.fft.rfft(data[0:nsamps])
201     power = (trans * trans.conj()).real  # we want the square of the amplitude
202     return (freq_axis, power)
203
204
205 def unitary_power_spectrum(data, freq=1.0):
206     """Compute the unitary power spectrum of the time series `data`.
207
208     See Also
209     --------
210     power_spectrum,unitary_avg_power_spectrum
211     """
212     freq_axis,power = power_spectrum(data, freq)
213     # One sided power spectral density, so 2|H(f)|**2
214     # (see NR 2nd edition 12.0.14, p498)
215     #
216     # numpy normalizes with 1/N on the inverse transform ifft,
217     # so we should normalize the freq-space representation with 1/sqrt(N).
218     # But we're using the rfft, where N points are like N/2 complex points,
219     # so 1/sqrt(N/2)
220     # So the power gets normalized by that twice and we have 2/N
221     #
222     # On top of this, the FFT assumes a sampling freq of 1 per second,
223     # and we want to preserve area under our curves.
224     # If our total time T = len(data)/freq is smaller than 1,
225     # our df_real = freq/len(data) is bigger that the FFT expects
226     # (dt_fft = 1/len(data)),
227     # and we need to scale the powers down to conserve area.
228     # df_fft * F_fft(f) = df_real *F_real(f)
229     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
230     # So the power gets normalized by *that* twice and we have 2/N * freq**2
231
232     # power per unit time
233     # measure x(t) for time T
234     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
235     # PSD(f) = 2 |X(f)|**2 / T
236
237     # total_time = len(data)/float(freq)
238     # power *= 2.0 / float(freq)**2   /   total_time
239     # power *= 2.0 / freq**2   *   freq / len(data)
240     power *= 2.0 / (freq * _numpy.float(len(data)))
241
242     return (freq_axis, power)
243
244
245 def window_hann(length):
246     r"""Returns a Hann window array with length entries
247
248     Notes
249     -----
250     The Hann window with length :math:`L` is defined as
251
252     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
253     """
254     win = _numpy.zeros((length,), dtype=_numpy.float)
255     for i in range(length):
256         win[i] = 0.5 * (
257             1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length)))
258     # avg value of cos over a period is 0
259     # so average height of Hann window is 0.5
260     return win
261
262
263 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
264                        overlap=True, window=window_hann):
265     """Compute the avgerage power spectrum of `data`.
266
267     Parameters
268     ----------
269     data : iterable
270         Real (not complex) data taken with a sampling frequency `freq`.
271     freq : real
272         Sampling frequency.
273     chunk_size : int
274         Number of samples per chunk.  Use a power of two.
275     overlap: {True,False}
276         If `True`, each chunk overlaps the previous chunk by half its
277         length.  Otherwise, the chunks are end-to-end, and not
278         overlapping.
279     window: iterable
280         Weights used to "smooth" the chunks, there is a whole science
281         behind windowing, but if you're not trying to squeeze every
282         drop of information out of your data, you'll be OK with the
283         default Hann window.
284
285     Returns
286     -------
287     freq_axis,power : numpy.ndarray
288         Arrays ready for plotting.
289
290     Notes
291     -----
292     The average power spectrum is computed by breaking `data` into
293     chunks of length `chunk_size`.  These chunks are transformed
294     individually into frequency space and then averaged together.
295
296     See Numerical Recipes 2 section 13.4 for a good introduction to
297     the theory.
298
299     If the number of samples in `data` is not a multiple of
300     `chunk_size`, we ignore the extra points.
301     """
302     if chunk_size != floor_pow_of_two(chunk_size):
303         raise ValueError(
304             'chunk_size {} should be a power of 2'.format(chunk_size))
305
306     nchunks = len(data) // chunk_size  # integer division = implicit floor
307     if overlap:
308         chunk_step = chunk_size / 2
309     else:
310         chunk_step = chunk_size
311
312     win = window(chunk_size)  # generate a window of the appropriate size
313     freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1)
314     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
315     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
316     # See Numerical Recipies for a details.
317     power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float)
318     for i in range(nchunks):
319         starti = i * chunk_step
320         stopi = starti + chunk_size
321         fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win)
322         p_chunk = (fft_chunk * fft_chunk.conj()).real
323         power += p_chunk.astype(_numpy.float)
324     power /= _numpy.float(nchunks)
325     return (freq_axis, power)
326
327
328 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
329                                overlap=True, window=window_hann):
330     """Compute the unitary average power spectrum of `data`.
331
332     See Also
333     --------
334     avg_power_spectrum,unitary_power_spectrum
335     """
336     freq_axis,power = avg_power_spectrum(
337         data, freq, chunk_size, overlap, window)
338     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
339     # see unitary_power_spectrum()
340     power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3
341     #             * 8/3  to remove power from windowing
342     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
343     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
344     # So our calulated power has and extra <w(t)**2> in it.
345     # For the Hann window,
346     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
347     # For low frequency components,
348     # where the frequency of x(t) is ~= the frequency of w(t),
349     # the normalization is not perfect. ??
350     # The normalization approaches perfection as chunk_size -> infinity.
351     return (freq_axis, power)
352
353
354 class TestRFFT (_unittest.TestCase):
355     r"""Ensure Numpy's FFT algorithm acts as expected.
356
357     Notes
358     -----
359     The expected return values are [#dft]_:
360
361     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
362
363     .. [#dft] See the *Background information* section of
364        :py:mod:`numpy.fft`.
365     """
366     def run_rfft(self, xs, Xs):
367         i = _numpy.complex(0, 1)
368         n = len(xs)
369         Xa = []
370         for k in range(n):
371             Xa.append(sum([x * _numpy.exp(-2 * _numpy.pi * i * m * k / n)
372                            for x,m in zip(xs, range(n))]))
373             if k < len(Xs):
374                 self.assertAlmostEqual(
375                     (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]), 0, 6,
376                     ('rfft mismatch on element {}: {} != {}, '
377                      'relative error {}').format(
378                         k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
379         # Which should satisfy the discrete form of Parseval's theorem
380         #   n-1               n-1
381         #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
382         #   m=0               k=0
383         timeSum = sum([_numpy.abs(x)**2 for x in xs])
384         freqSum = sum([_numpy.abs(X)**2 for X in Xa])
385         self.assertAlmostEqual(
386             _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum, 0, 6,
387             "Mismatch on Parseval's, {} != 1/{} * {}".format(
388                 timeSum, n, freqSum))
389
390     def test_rfft(self):
391         "Test NumPy's builtin :py:func:`numpy.fft.rfft`"
392         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
393         self.run_rfft(xs, _numpy.fft.rfft(xs))
394
395
396 class TestUnitaryRFFT (_unittest.TestCase):
397     """Verify :py:func:`unitary_rfft`.
398     """
399     def run_parsevals(self, xs, freq, freqs, Xs):
400         """Check the discretized integral form of Parseval's theorem
401
402         Notes
403         -----
404         Which is:
405
406         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
407         """
408         dt = 1.0 / freq
409         df = freqs[1] - freqs[0]
410         self.assertAlmostEqual(
411             (df - 1 / (len(xs) * dt)) / df, 0, 6,
412             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
413         Xa = list(Xs)
414         for k in range(len(Xs) - 1, 1, -1):
415             Xa.append(Xa[k])
416         self.assertEqual(len(xs), len(Xa))
417         lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
418         rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
419         self.assertAlmostEqual(
420             _numpy.abs(lhs - rhs) / lhs, 0, 3,
421             "Mismatch on Parseval's, {} != {}".format(lhs, rhs))
422
423     def test_parsevals(self):
424         "Test unitary rfft on Parseval's theorem"
425         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
426         dt = _numpy.pi
427         freqs,Xs = unitary_rfft(xs, 1.0 / dt)
428         self.run_parsevals(xs, 1.0 / dt, freqs, Xs)
429
430     def rect(self, t):
431         r"""Rectangle function.
432
433         Notes
434         -----
435
436         .. math::
437
438             \rect(t) = \begin{cases}
439                1& \text{if $|t| < 0.5$}, \\
440                0& \text{if $|t| \ge 0.5$}.
441                              \end{cases}
442         """
443         if _numpy.abs(t) < 0.5:
444             return 1
445         else:
446             return 0
447
448     def run_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
449         r"""Test :py:func:`unitary_rfft` on known function :py:meth:`rect`.
450
451         Notes
452         -----
453         Analytic result:
454
455         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
456         """
457         samp_freq = _numpy.float(samp_freq)
458         a = _numpy.float(a)
459
460         x = _numpy.zeros((samples,), dtype=_numpy.float)
461         dt = 1.0 / samp_freq
462         for i in range(samples):
463             t = i * dt
464             x[i] = self.rect(a * (t - time_shift))
465         freq_axis,X = unitary_rfft(x, samp_freq)
466
467         # remove the phase due to our time shift
468         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
469         for i in range(len(freq_axis)):
470             f = freq_axis[i]
471             inverse_phase_shift = _numpy.exp(
472                 j * 2.0 * _numpy.pi * time_shift * f)
473             X[i] *= inverse_phase_shift
474
475         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
476         # normalized sinc(x) = sin(pi x)/(pi x)
477         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
478         self.assertEqual(_numpy.sinc(0.5), 2.0 / _numpy.pi)
479         for i in range(len(freq_axis)):
480             f = freq_axis[i]
481             expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
482
483         if TEST_PLOTS:
484             if _pyplot is None:
485                 raise _pyplot_import_error
486             figure = _pyplot.figure()
487             time_axes = figure.add_subplot(2, 1, 1)
488             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
489             time_axes.set_title('time series')
490             freq_axes = figure.add_subplot(2, 1, 2)
491             freq_axes.plot(freq_axis, X.real, 'r.')
492             freq_axes.plot(freq_axis, X.imag, 'g.')
493             freq_axes.plot(freq_axis, expected, 'b-')
494             freq_axes.set_title('freq series')
495
496     def test_rect(self):
497         "Test unitary FFTs on variously shaped rectangular functions."
498         self.run_rect(a=0.5)
499         self.run_rect(a=2.0)
500         self.run_rect(a=0.7, samp_freq=50, samples=512)
501         self.run_rect(a=3.0, samp_freq=60, samples=1024)
502
503     def gaussian(self, a, t):
504         r"""Gaussian function.
505
506         Notes
507         -----
508
509         .. math:: \gaussian(a,t) = \exp^{-at^2}
510         """
511         return _numpy.exp(-a * t**2)
512
513     def run_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
514         r"""Test :py:func:`unitary_rttf` on known function :py:meth:`gaussian`.
515
516         Notes
517         -----
518         Analytic result:
519
520         .. math::
521
522             \rfft(\gaussian(a,t))
523               = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
524         """
525         samp_freq = _numpy.float(samp_freq)
526         a = _numpy.float(a)
527
528         x = _numpy.zeros((samples,), dtype=_numpy.float)
529         dt = 1.0 / samp_freq
530         for i in range(samples):
531             t = i * dt
532             x[i] = self.gaussian(a, (t - time_shift))
533         freq_axis,X = unitary_rfft(x, samp_freq)
534
535         # remove the phase due to our time shift
536         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
537         for i in range(len(freq_axis)):
538             f = freq_axis[i]
539             inverse_phase_shift = _numpy.exp(
540                 j * 2.0 * _numpy.pi * time_shift * f)
541             X[i] *= inverse_phase_shift
542
543         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
544         for i in range(len(freq_axis)):
545             f = freq_axis[i]
546             # see Wikipedia, or do the integral yourself.
547             expected[i] = _numpy.sqrt(_numpy.pi / a) * self.gaussian(
548                 1.0 / a, _numpy.pi * f)
549
550         if TEST_PLOTS:
551             if _pyplot is None:
552                 raise _pyplot_import_error
553             figure = _pyplot.figure()
554             time_axes = figure.add_subplot(2, 1, 1)
555             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
556             time_axes.set_title('time series')
557             freq_axes = figure.add_subplot(2, 1, 2)
558             freq_axes.plot(freq_axis, X.real, 'r.')
559             freq_axes.plot(freq_axis, X.imag, 'g.')
560             freq_axes.plot(freq_axis, expected, 'b-')
561             freq_axes.set_title('freq series')
562
563     def test_gaussian(self):
564         "Test unitary FFTs on variously shaped gaussian functions."
565         self.run_gaussian(a=0.5)
566         self.run_gaussian(a=2.0)
567         self.run_gaussian(a=0.7, samp_freq=50, samples=512)
568         self.run_gaussian(a=3.0, samp_freq=60, samples=1024)
569
570
571 class TestUnitaryPowerSpectrum (_unittest.TestCase):
572     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024):
573         x = _numpy.zeros((samples,), dtype=_numpy.float)
574         samp_freq = _numpy.float(samp_freq)
575         for i in range(samples):
576             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
577         freq_axis,power = unitary_power_spectrum(x, samp_freq)
578         imax = _numpy.argmax(power)
579
580         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
581         # df = 1/T, where T = total_time
582         df = samp_freq / _numpy.float(samples)
583         i = int(sin_freq / df)
584         # average power per unit time is
585         #  P = <x(t)**2>
586         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
587         # so average value of (int sin(t)**2 dt) per unit time is 0.5
588         #  P = 0.5
589         # we spread that power over a frequency bin of width df, sp
590         #  P(f0) = 0.5/df
591         # where f0 is the sin's frequency
592         #
593         # or:
594         # FFT of sin(2*pi*t*f0) gives
595         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
596         # (area under x(t) = 0, area under X(f) = 0)
597         # so one sided power spectral density (PSD) per unit time is
598         #  P(f) = 2 |X(f)|**2 / T
599         #       = 2 * |0.5 delta(f-f0)|**2 / T
600         #       = 0.5 * |delta(f-f0)|**2 / T
601         # but we're discrete and want the integral of the 'delta' to be 1,
602         # so 'delta'*df = 1  --> 'delta' = 1/df, and
603         #  P(f) = 0.5 / (df**2 * T)
604         #       = 0.5 / df                (T = 1/df)
605         expected[i] = 0.5 / df
606
607         LOG.debug('The power should be a peak at {} Hz of {} ({}, {})'.format(
608                 sin_freq, expected[i], freq_axis[imax], power[imax]))
609         Pexp = P = 0
610         for i in range(len(freq_axis)):
611             Pexp += expected[i] * df
612             P += power[i] * df
613         self.assertAlmostEqual(
614             _numpy.abs((P - Pexp) / Pexp), 0, 1,
615             'The total power should be {} ({})'.format(Pexp, P))
616
617         if TEST_PLOTS:
618             if _pyplot is None:
619                 raise _pyplot_import_error
620             figure = _pyplot.figure()
621             time_axes = figure.add_subplot(2, 1, 1)
622             time_axes.plot(
623                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
624             time_axes.set_title('time series')
625             freq_axes = figure.add_subplot(2, 1, 2)
626             freq_axes.plot(freq_axis, power, 'r.')
627             freq_axes.plot(freq_axis, expected, 'b-')
628             freq_axes.set_title(
629                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
630
631     def test_sin(self):
632         "Test unitary power spectrums on variously shaped sin functions"
633         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
634         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
635         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
636         self.run_sin(sin_freq=7, samp_freq=512, samples=1024)
637         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
638         # finally, with some irrational numbers, to check that I'm not
639         # getting lucky
640         self.run_sin(
641             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
642         # test with non-integer number of periods
643         self.run_sin(sin_freq=5, samp_freq=512, samples=256)
644
645     def run_delta(self, amp=1, samp_freq=1, samples=256):
646         """TODO
647         """
648         x = _numpy.zeros((samples,), dtype=_numpy.float)
649         samp_freq = _numpy.float(samp_freq)
650         x[0] = amp
651         freq_axis,power = unitary_power_spectrum(x, samp_freq)
652
653         # power = <x(t)**2> = (amp)**2 * dt/T
654         # we spread that power over the entire freq_axis [0,fN], so
655         #  P(f)  = (amp)**2 dt / (T fN)
656         # where
657         #  dt = 1/samp_freq        (sample period)
658         #  T  = samples/samp_freq  (total time of data aquisition)
659         #  fN = 0.5 samp_freq      (Nyquist frequency)
660         # so
661         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
662         #       = 2 amp**2 / (samp_freq*samples)
663         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
664         expected = _numpy.ones(
665             (len(freq_axis),), dtype=_numpy.float) * expected_amp
666
667         self.assertAlmostEqual(
668             expected_amp, power[0], 4,
669             'The power should be flat at y = {} ({})'.format(
670                 expected_amp, power[0]))
671
672         if TEST_PLOTS:
673             if _pyplot is None:
674                 raise _pyplot_import_error
675             figure = _pyplot.figure()
676             time_axes = figure.add_subplot(2, 1, 1)
677             time_axes.plot(
678                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
679             time_axes.set_title('time series')
680             freq_axes = figure.add_subplot(2, 1, 2)
681             freq_axes.plot(freq_axis, power, 'r.')
682             freq_axes.plot(freq_axis, expected, 'b-')
683             freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
684
685     def test_delta(self):
686         "Test unitary power spectrums on various delta functions"
687         self.run_delta(amp=1, samp_freq=1.0, samples=1024)
688         self.run_delta(amp=1, samp_freq=1.0, samples=2048)
689         # expected = 2*computed
690         self.run_delta(amp=1, samp_freq=0.5, samples=2048)
691         # expected = 0.5*computed
692         self.run_delta(amp=1, samp_freq=2.0, samples=2048)
693         self.run_delta(amp=3, samp_freq=1.0, samples=1024)
694         self.run_delta(amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
695
696     def gaussian(self, area, mean, std, t):
697         "Integral over all time = area (i.e. normalized for area=1)"
698         return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
699             -0.5 * ((t-mean)/std)**2)
700
701     def run_gaussian(self, area=2.5, mean=5, std=1, samp_freq=10.24,
702                      samples=512):
703         """TODO.
704         """
705         x = _numpy.zeros((samples,), dtype=_numpy.float)
706         mean = _numpy.float(mean)
707         for i in range(samples):
708             t = i / _numpy.float(samp_freq)
709             x[i] = self.gaussian(area, mean, std, t)
710         freq_axis,power = unitary_power_spectrum(x, samp_freq)
711
712         # generate the predicted curve by comparing our
713         # TestUnitaryPowerSpectrum.gaussian() form to
714         # TestUnitaryRFFT.gaussian(),
715         # we see that the Fourier transform of x(t) has parameters:
716         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
717         #                           converting to angular frequency,
718         #                           not frequency like we are)
719         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of
720         #                                   TestUnitaryRFFT.gaussian() above)
721         #  mean' = 0               (changing the mean in the time-domain just
722         #                           changes the phase in the freq-domain)
723         # So our power spectral density per unit time is given by
724         #  P(f) = 2 |X(f)|**2 / T
725         # Where
726         #  T  = samples/samp_freq  (total time of data aquisition)
727         mean = 0.0
728         area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
729         std = 1.0 / (2.0 * _numpy.pi * std)
730         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
731         # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
732         df = _numpy.float(samp_freq) / samples
733         for i in range(len(freq_axis)):
734             f = i * df
735             gaus = self.gaussian(area, mean, std, f)
736             expected[i] = 2.0 * gaus**2 * samp_freq / samples
737         self.assertAlmostEqual(
738             expected[0], power[0], 3,
739             ('The power should be a half-gaussian, '
740              'with a peak at 0 Hz with amplitude {} ({})').format(
741                 expected[0], power[0]))
742
743         if TEST_PLOTS:
744             if _pyplot is None:
745                 raise _pyplot_import_error
746             figure = _pyplot.figure()
747             time_axes = figure.add_subplot(2, 1, 1)
748             time_axes.plot(
749                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
750                 x, 'b-')
751             time_axes.set_title('time series')
752             freq_axes = figure.add_subplot(2, 1, 2)
753             freq_axes.plot(freq_axis, power, 'r.')
754             freq_axes.plot(freq_axis, expected, 'b-')
755             freq_axes.set_title('freq series')
756
757     def test_gaussian(self):
758         "Test unitary power spectrums on various gaussian functions"
759         for area in [1, _numpy.pi]:
760             for std in [1, _numpy.sqrt(2)]:
761                 for samp_freq in [10.0, _numpy.exp(1)]:
762                     for samples in [1024, 2048]:
763                         self.run_gaussian(
764                             area=area, std=std, samp_freq=samp_freq,
765                             samples=samples)
766
767
768 class TestUnitaryAvgPowerSpectrum (_unittest.TestCase):
769     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024, chunk_size=512,
770                 overlap=True, window=window_hann, places=3):
771         """TODO
772         """
773         x = _numpy.zeros((samples,), dtype=_numpy.float)
774         samp_freq = _numpy.float(samp_freq)
775         for i in range(samples):
776             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
777         freq_axis,power = unitary_avg_power_spectrum(
778             x, samp_freq, chunk_size, overlap, window)
779         imax = _numpy.argmax(power)
780
781         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
782         # df = 1/T, where T = total_time
783         df = samp_freq / _numpy.float(chunk_size)
784         i = int(sin_freq / df)
785         # see TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin()
786         expected[i] = 0.5 / df
787
788         LOG.debug('The power should peak at {} Hz of {} ({}, {})'.format(
789                 sin_freq, expected[i], freq_axis[imax], power[imax]))
790         Pexp = P = 0
791         for i in range(len(freq_axis)):
792             Pexp += expected[i] * df
793             P += power[i] * df
794         self.assertAlmostEqual(
795             Pexp, P, places,
796             'The total power should be {} ({})'.format(Pexp, P))
797
798         if TEST_PLOTS:
799             if _pyplot is None:
800                 raise _pyplot_import_error
801             figure = _pyplot.figure()
802             time_axes = figure.add_subplot(2, 1, 1)
803             time_axes.plot(
804                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
805                 x, 'b-')
806             time_axes.set_title('time series')
807             freq_axes = figure.add_subplot(2, 1, 2)
808             freq_axes.plot(freq_axis, power, 'r.')
809             freq_axes.plot(freq_axis, expected, 'b-')
810             freq_axes.set_title(
811                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
812
813     def test_sin(self):
814         "Test unitary avg power spectrums on variously shaped sin functions."
815         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
816         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
817         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
818         self.run_sin(sin_freq=17, samp_freq=512, samples=1024)
819         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
820         # test long wavelenth sin, so be closer to window frequency
821         self.run_sin(sin_freq=1, samp_freq=1024, samples=2048, places=0)
822         # finally, with some irrational numbers, to check that I'm not
823         # getting lucky
824         self.run_sin(
825             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)