FFT_tools: add docstring to TestRFFT.test_rfft
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 :func:`numpy.fft.rfft`.
19
20 Main entry functions:
21
22 * :func:`unitary_rfft`
23 * :func:`power_spectrum`
24 * :func:`unitary_power_spectrum`
25 * :func:`avg_power_spectrum`
26 * :func:`unitary_avg_power_spectrum`
27 """
28
29 import logging as _logging
30 import unittest as _unittest
31
32 import numpy as _numpy
33
34
35 __version__ = '0.4'
36
37
38 LOG = _logging.getLogger('FFT-tools')
39 LOG.addHandler(_logging.StreamHandler())
40 LOG.setLevel(_logging.ERROR)
41
42
43 # Display time- and freq-space plots of the test transforms if True
44 TEST_PLOTS = False
45
46
47 def floor_pow_of_two(num):
48     """Round `num` down to the closest exact a power of two.
49
50     Examples
51     --------
52
53     >>> floor_pow_of_two(3)
54     2
55     >>> floor_pow_of_two(11)
56     8
57     >>> floor_pow_of_two(15)
58     8
59     """
60     lnum = _numpy.log2(num)
61     if int(lnum) != lnum:
62         num = 2**_numpy.floor(lnum)
63     return int(num)
64
65
66 def round_pow_of_two(num):
67     """Round `num` to the closest exact a power of two on a log scale.
68
69     Examples
70     --------
71
72     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
73     4
74     >>> round_pow_of_two(11)
75     8
76     >>> round_pow_of_two(15)
77     16
78     """
79     lnum = _numpy.log2(num)
80     if int(lnum) != lnum:
81         num = 2**_numpy.round(lnum)
82     return int(num)
83
84
85 def ceil_pow_of_two(num):
86     """Round `num` up to the closest exact a power of two.
87
88     Examples
89     --------
90
91     >>> ceil_pow_of_two(3)
92     4
93     >>> ceil_pow_of_two(11)
94     16
95     >>> ceil_pow_of_two(15)
96     16
97     """
98     lnum = _numpy.log2(num)
99     if int(lnum) != lnum:
100         num = 2**_numpy.ceil(lnum)
101     return int(num)
102
103
104 def unitary_rfft(data, freq=1.0):
105     """Compute the unitary Fourier transform of real data.
106
107     Unitary = preserves power [Parseval's theorem].
108
109     Parameters
110     ----------
111     data : iterable
112         Real (not complex) data taken with a sampling frequency `freq`.
113     freq : real
114         Sampling frequency.
115
116     Returns
117     -------
118     freq_axis,trans : numpy.ndarray
119         Arrays ready for plotting.
120
121     Notes
122     -----
123     If the units on your data are Volts,
124     and your sampling frequency is in Hz,
125     then `freq_axis` will be in Hz,
126     and `trans` will be in Volts.
127     """
128     nsamps = floor_pow_of_two(len(data))
129     # Which should satisfy the discrete form of Parseval's theorem
130     #   n-1               n-1
131     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
132     #   m=0               k=0
133     # However, we want our FFT to satisfy the continuous Parseval eqn
134     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
135     # which has the discrete form
136     #   n-1              n-1
137     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
138     #   m=0              k=0
139     # with X'_k = AX, this gives us
140     #   n-1                     n-1
141     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
142     #   m=0                     k=0
143     # so we see
144     #   A^2 df/dt = 1/n
145     #   A^2 = 1/n dt/df
146     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
147     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
148     # f_c in the transformed data is the Nyquist frequency (12.1.2)
149     #   f_c = 1/2dt
150     # and the points are spaced out by (12.1.5)
151     #   df = 1/ndt
152     # so
153     #   dt = 1/ndf
154     #   dt/df = 1/ndf^2
155     #   A^2 = 1/n^2df^2
156     #   A = 1/ndf = ndt/n = dt
157     # so we can convert the Numpy transformed data to match our unitary
158     # continuous transformed data with (also NR 12.1.8)
159     #   X'_k = dtX = X / <sampling freq>
160     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
161     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
162     return (freq_axis, trans)
163
164
165 def power_spectrum(data, freq=1.0):
166     """Compute the power spectrum of the time series `data`.
167
168     Parameters
169     ----------
170     data : iterable
171         Real (not complex) data taken with a sampling frequency `freq`.
172     freq : real
173         Sampling frequency.
174
175     Returns
176     -------
177     freq_axis,power : numpy.ndarray
178         Arrays ready for plotting.
179
180     Notes
181     -----
182     If the number of samples in `data` is not an integer power of two,
183     the FFT ignores some of the later points.
184
185     See Also
186     --------
187     unitary_power_spectrum,avg_power_spectrum
188     """
189     nsamps = floor_pow_of_two(len(data))
190
191     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
192     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
193     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
194     # See Numerical Recipies for a details.
195     trans = _numpy.fft.rfft(data[0:nsamps])
196     power = (trans * trans.conj()).real  # we want the square of the amplitude
197     return (freq_axis, power)
198
199
200 def unitary_power_spectrum(data, freq=1.0):
201     """Compute the unitary power spectrum of the time series `data`.
202
203     See Also
204     --------
205     power_spectrum,unitary_avg_power_spectrum
206     """
207     freq_axis,power = power_spectrum(data, freq)
208     # One sided power spectral density, so 2|H(f)|**2
209     # (see NR 2nd edition 12.0.14, p498)
210     #
211     # numpy normalizes with 1/N on the inverse transform ifft,
212     # so we should normalize the freq-space representation with 1/sqrt(N).
213     # But we're using the rfft, where N points are like N/2 complex points,
214     # so 1/sqrt(N/2)
215     # So the power gets normalized by that twice and we have 2/N
216     #
217     # On top of this, the FFT assumes a sampling freq of 1 per second,
218     # and we want to preserve area under our curves.
219     # If our total time T = len(data)/freq is smaller than 1,
220     # our df_real = freq/len(data) is bigger that the FFT expects
221     # (dt_fft = 1/len(data)),
222     # and we need to scale the powers down to conserve area.
223     # df_fft * F_fft(f) = df_real *F_real(f)
224     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
225     # So the power gets normalized by *that* twice and we have 2/N * freq**2
226
227     # power per unit time
228     # measure x(t) for time T
229     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
230     # PSD(f) = 2 |X(f)|**2 / T
231
232     # total_time = len(data)/float(freq)
233     # power *= 2.0 / float(freq)**2   /   total_time
234     # power *= 2.0 / freq**2   *   freq / len(data)
235     power *= 2.0 / (freq * _numpy.float(len(data)))
236
237     return (freq_axis, power)
238
239
240 def window_hann(length):
241     r"""Returns a Hann window array with length entries
242
243     Notes
244     -----
245     The Hann window with length :math:`L` is defined as
246
247     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
248     """
249     win = _numpy.zeros((length,), dtype=_numpy.float)
250     for i in range(length):
251         win[i] = 0.5 * (
252             1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length)))
253     # avg value of cos over a period is 0
254     # so average height of Hann window is 0.5
255     return win
256
257
258 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
259                        overlap=True, window=window_hann):
260     """Compute the avgerage power spectrum of `data`.
261
262     Parameters
263     ----------
264     data : iterable
265         Real (not complex) data taken with a sampling frequency `freq`.
266     freq : real
267         Sampling frequency.
268     chunk_size : int
269         Number of samples per chunk.  Use a power of two.
270     overlap: {True,False}
271         If `True`, each chunk overlaps the previous chunk by half its
272         length.  Otherwise, the chunks are end-to-end, and not
273         overlapping.
274     window: iterable
275         Weights used to "smooth" the chunks, there is a whole science
276         behind windowing, but if you're not trying to squeeze every
277         drop of information out of your data, you'll be OK with the
278         default Hann window.
279
280     Returns
281     -------
282     freq_axis,power : numpy.ndarray
283         Arrays ready for plotting.
284
285     Notes
286     -----
287     The average power spectrum is computed by breaking `data` into
288     chunks of length `chunk_size`.  These chunks are transformed
289     individually into frequency space and then averaged together.
290
291     See Numerical Recipes 2 section 13.4 for a good introduction to
292     the theory.
293
294     If the number of samples in `data` is not a multiple of
295     `chunk_size`, we ignore the extra points.
296     """
297     if chunk_size != floor_pow_of_two(chunk_size):
298         raise ValueError(
299             'chunk_size {} should be a power of 2'.format(chunk_size))
300
301     nchunks = len(data) // chunk_size  # integer division = implicit floor
302     if overlap:
303         chunk_step = chunk_size / 2
304     else:
305         chunk_step = chunk_size
306
307     win = window(chunk_size)  # generate a window of the appropriate size
308     freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1)
309     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
310     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
311     # See Numerical Recipies for a details.
312     power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float)
313     for i in range(nchunks):
314         starti = i * chunk_step
315         stopi = starti + chunk_size
316         fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win)
317         p_chunk = (fft_chunk * fft_chunk.conj()).real
318         power += p_chunk.astype(_numpy.float)
319     power /= _numpy.float(nchunks)
320     return (freq_axis, power)
321
322
323 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
324                                overlap=True, window=window_hann):
325     """Compute the unitary average power spectrum of `data`.
326
327     See Also
328     --------
329     avg_power_spectrum,unitary_power_spectrum
330     """
331     freq_axis,power = avg_power_spectrum(
332         data, freq, chunk_size, overlap, window)
333     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
334     # see unitary_power_spectrum()
335     power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3
336     #             * 8/3  to remove power from windowing
337     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
338     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
339     # So our calulated power has and extra <w(t)**2> in it.
340     # For the Hann window,
341     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
342     # For low frequency components,
343     # where the frequency of x(t) is ~= the frequency of w(t),
344     # the normalization is not perfect. ??
345     # The normalization approaches perfection as chunk_size -> infinity.
346     return (freq_axis, power)
347
348
349 class TestRFFT (_unittest.TestCase):
350     r"""Ensure Numpy's FFT algorithm acts as expected.
351
352     Notes
353     -----
354     The expected return values are [#dft]_:
355
356     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
357
358     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
359     """
360     def run_rfft(self, xs, Xs):
361         i = _numpy.complex(0, 1)
362         n = len(xs)
363         Xa = []
364         for k in range(n):
365             Xa.append(sum([x * _numpy.exp(-2 * _numpy.pi * i * m * k / n)
366                            for x,m in zip(xs, range(n))]))
367             if k < len(Xs):
368                 self.assertAlmostEqual(
369                     (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]), 0, 6,
370                     ('rfft mismatch on element {}: {} != {}, '
371                      'relative error {}').format(
372                         k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
373         # Which should satisfy the discrete form of Parseval's theorem
374         #   n-1               n-1
375         #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
376         #   m=0               k=0
377         timeSum = sum([_numpy.abs(x)**2 for x in xs])
378         freqSum = sum([_numpy.abs(X)**2 for X in Xa])
379         self.assertAlmostEqual(
380             _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum, 0, 6,
381             "Mismatch on Parseval's, {} != 1/{} * {}".format(
382                 timeSum, n, freqSum))
383
384     def test_rfft(self):
385         "Test NumPy's builtin :func:`numpy.fft.rfft`"
386         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
387         self.run_rfft(xs, _numpy.fft.rfft(xs))
388
389
390 class TestUnitaryRFFT (_unittest.TestCase):
391     """Verify `unitary_rfft`.
392     """
393     def run_parsevals(self, xs, freq, freqs, Xs):
394         """Check the discretized integral form of Parseval's theorem
395
396         Notes
397         -----
398         Which is:
399
400         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
401         """
402         dt = 1.0 / freq
403         df = freqs[1] - freqs[0]
404         self.assertAlmostEqual(
405             (df - 1 / (len(xs) * dt)) / df, 0, 6,
406             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
407         Xa = list(Xs)
408         for k in range(len(Xs) - 1, 1, -1):
409             Xa.append(Xa[k])
410         self.assertEqual(len(xs), len(Xa))
411         lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
412         rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
413         self.assertAlmostEqual(
414             _numpy.abs(lhs - rhs) / lhs, 0, 3,
415             "Mismatch on Parseval's, {} != {}".format(lhs, rhs))
416
417     def test_parsevals(self):
418         "Test unitary rfft on Parseval's theorem"
419         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
420         dt = _numpy.pi
421         freqs,Xs = unitary_rfft(xs, 1.0 / dt)
422         self.run_parsevals(xs, 1.0 / dt, freqs, Xs)
423
424     def rect(self, t):
425         r"""Rectangle function.
426
427         Notes
428         -----
429
430         .. math::
431
432             \rect(t) = \begin{cases}
433                1& \text{if $|t| < 0.5$}, \\
434                0& \text{if $|t| \ge 0.5$}.
435                        \end{cases}
436         """
437         if _numpy.abs(t) < 0.5:
438             return 1
439         else:
440             return 0
441
442     def run_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
443         r"""Test `unitary_rttf` on known function `rect(at)`.
444
445         Notes
446         -----
447         Analytic result:
448
449         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
450         """
451         samp_freq = _numpy.float(samp_freq)
452         a = _numpy.float(a)
453
454         x = _numpy.zeros((samples,), dtype=_numpy.float)
455         dt = 1.0 / samp_freq
456         for i in range(samples):
457             t = i * dt
458             x[i] = self.rect(a * (t - time_shift))
459         freq_axis,X = unitary_rfft(x, samp_freq)
460
461         # remove the phase due to our time shift
462         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
463         for i in range(len(freq_axis)):
464             f = freq_axis[i]
465             inverse_phase_shift = _numpy.exp(
466                 j * 2.0 * _numpy.pi * time_shift * f)
467             X[i] *= inverse_phase_shift
468
469         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
470         # normalized sinc(x) = sin(pi x)/(pi x)
471         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
472         self.assertEqual(_numpy.sinc(0.5), 2.0 / _numpy.pi)
473         for i in range(len(freq_axis)):
474             f = freq_axis[i]
475             expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
476
477         if TEST_PLOTS:
478             figure = _pyplot.figure()
479             time_axes = figure.add_subplot(2, 1, 1)
480             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
481             time_axes.set_title('time series')
482             freq_axes = figure.add_subplot(2, 1, 2)
483             freq_axes.plot(freq_axis, X.real, 'r.')
484             freq_axes.plot(freq_axis, X.imag, 'g.')
485             freq_axes.plot(freq_axis, expected, 'b-')
486             freq_axes.set_title('freq series')
487
488     def test_rect(self):
489         "Test unitary FFTs on variously shaped rectangular functions."
490         self.run_rect(a=0.5)
491         self.run_rect(a=2.0)
492         self.run_rect(a=0.7, samp_freq=50, samples=512)
493         self.run_rect(a=3.0, samp_freq=60, samples=1024)
494
495     def gaussian(self, a, t):
496         r"""Gaussian function.
497
498         Notes
499         -----
500
501         .. math:: \gaussian(a,t) = \exp^{-at^2}
502         """
503         return _numpy.exp(-a * t**2)
504
505     def run_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
506         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
507
508         Notes
509         -----
510         Analytic result:
511
512         .. math::
513
514             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
515         """
516         samp_freq = _numpy.float(samp_freq)
517         a = _numpy.float(a)
518
519         x = _numpy.zeros((samples,), dtype=_numpy.float)
520         dt = 1.0 / samp_freq
521         for i in range(samples):
522             t = i * dt
523             x[i] = self.gaussian(a, (t - time_shift))
524         freq_axis,X = unitary_rfft(x, samp_freq)
525
526         # remove the phase due to our time shift
527         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
528         for i in range(len(freq_axis)):
529             f = freq_axis[i]
530             inverse_phase_shift = _numpy.exp(
531                 j * 2.0 * _numpy.pi * time_shift * f)
532             X[i] *= inverse_phase_shift
533
534         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
535         for i in range(len(freq_axis)):
536             f = freq_axis[i]
537             # see Wikipedia, or do the integral yourself.
538             expected[i] = _numpy.sqrt(_numpy.pi / a) * self.gaussian(
539                 1.0 / a, _numpy.pi * f)
540
541         if TEST_PLOTS:
542             figure = _pyplot.figure()
543             time_axes = figure.add_subplot(2, 1, 1)
544             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
545             time_axes.set_title('time series')
546             freq_axes = figure.add_subplot(2, 1, 2)
547             freq_axes.plot(freq_axis, X.real, 'r.')
548             freq_axes.plot(freq_axis, X.imag, 'g.')
549             freq_axes.plot(freq_axis, expected, 'b-')
550             freq_axes.set_title('freq series')
551
552     def test_gaussian(self):
553         "Test unitary FFTs on variously shaped gaussian functions."
554         self.run_gaussian(a=0.5)
555         self.run_gaussian(a=2.0)
556         self.run_gaussian(a=0.7, samp_freq=50, samples=512)
557         self.run_gaussian(a=3.0, samp_freq=60, samples=1024)
558
559
560 class TestUnitaryPowerSpectrum (_unittest.TestCase):
561     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024):
562         x = _numpy.zeros((samples,), dtype=_numpy.float)
563         samp_freq = _numpy.float(samp_freq)
564         for i in range(samples):
565             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
566         freq_axis,power = unitary_power_spectrum(x, samp_freq)
567         imax = _numpy.argmax(power)
568
569         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
570         # df = 1/T, where T = total_time
571         df = samp_freq / _numpy.float(samples)
572         i = int(sin_freq / df)
573         # average power per unit time is
574         #  P = <x(t)**2>
575         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
576         # so average value of (int sin(t)**2 dt) per unit time is 0.5
577         #  P = 0.5
578         # we spread that power over a frequency bin of width df, sp
579         #  P(f0) = 0.5/df
580         # where f0 is the sin's frequency
581         #
582         # or:
583         # FFT of sin(2*pi*t*f0) gives
584         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
585         # (area under x(t) = 0, area under X(f) = 0)
586         # so one sided power spectral density (PSD) per unit time is
587         #  P(f) = 2 |X(f)|**2 / T
588         #       = 2 * |0.5 delta(f-f0)|**2 / T
589         #       = 0.5 * |delta(f-f0)|**2 / T
590         # but we're discrete and want the integral of the 'delta' to be 1,
591         # so 'delta'*df = 1  --> 'delta' = 1/df, and
592         #  P(f) = 0.5 / (df**2 * T)
593         #       = 0.5 / df                (T = 1/df)
594         expected[i] = 0.5 / df
595
596         LOG.debug('The power should be a peak at {} Hz of {} ({}, {})'.format(
597                 sin_freq, expected[i], freq_axis[imax], power[imax]))
598         Pexp = P = 0
599         for i in range(len(freq_axis)):
600             Pexp += expected[i] * df
601             P += power[i] * df
602         self.assertAlmostEqual(
603             _numpy.abs((P - Pexp) / Pexp), 0, 1,
604             'The total power should be {} ({})'.format(Pexp, P))
605
606         if TEST_PLOTS:
607             figure = _pyplot.figure()
608             time_axes = figure.add_subplot(2, 1, 1)
609             time_axes.plot(
610                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
611             time_axes.set_title('time series')
612             freq_axes = figure.add_subplot(2, 1, 2)
613             freq_axes.plot(freq_axis, power, 'r.')
614             freq_axes.plot(freq_axis, expected, 'b-')
615             freq_axes.set_title(
616                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
617
618     def test_sin(self):
619         "Test unitary power spectrums on variously shaped sin functions"
620         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
621         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
622         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
623         self.run_sin(sin_freq=7, samp_freq=512, samples=1024)
624         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
625         # finally, with some irrational numbers, to check that I'm not
626         # getting lucky
627         self.run_sin(
628             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
629         # test with non-integer number of periods
630         self.run_sin(sin_freq=5, samp_freq=512, samples=256)
631
632     def run_delta(self, amp=1, samp_freq=1, samples=256):
633         """TODO
634         """
635         x = _numpy.zeros((samples,), dtype=_numpy.float)
636         samp_freq = _numpy.float(samp_freq)
637         x[0] = amp
638         freq_axis,power = unitary_power_spectrum(x, samp_freq)
639
640         # power = <x(t)**2> = (amp)**2 * dt/T
641         # we spread that power over the entire freq_axis [0,fN], so
642         #  P(f)  = (amp)**2 dt / (T fN)
643         # where
644         #  dt = 1/samp_freq        (sample period)
645         #  T  = samples/samp_freq  (total time of data aquisition)
646         #  fN = 0.5 samp_freq      (Nyquist frequency)
647         # so
648         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
649         #       = 2 amp**2 / (samp_freq*samples)
650         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
651         expected = _numpy.ones(
652             (len(freq_axis),), dtype=_numpy.float) * expected_amp
653
654         self.assertAlmostEqual(
655             expected_amp, power[0], 4,
656             'The power should be flat at y = {} ({})'.format(
657                 expected_amp, power[0]))
658
659         if TEST_PLOTS:
660             figure = _pyplot.figure()
661             time_axes = figure.add_subplot(2, 1, 1)
662             time_axes.plot(
663                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
664             time_axes.set_title('time series')
665             freq_axes = figure.add_subplot(2, 1, 2)
666             freq_axes.plot(freq_axis, power, 'r.')
667             freq_axes.plot(freq_axis, expected, 'b-')
668             freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
669
670     def test_delta(self):
671         "Test unitary power spectrums on various delta functions"
672         self.run_delta(amp=1, samp_freq=1.0, samples=1024)
673         self.run_delta(amp=1, samp_freq=1.0, samples=2048)
674         # expected = 2*computed
675         self.run_delta(amp=1, samp_freq=0.5, samples=2048)
676         # expected = 0.5*computed
677         self.run_delta(amp=1, samp_freq=2.0, samples=2048)
678         self.run_delta(amp=3, samp_freq=1.0, samples=1024)
679         self.run_delta(amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
680
681     def gaussian(self, area, mean, std, t):
682         "Integral over all time = area (i.e. normalized for area=1)"
683         return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
684             -0.5 * ((t-mean)/std)**2)
685
686     def run_gaussian(self, area=2.5, mean=5, std=1, samp_freq=10.24,
687                      samples=512):
688         """TODO.
689         """
690         x = _numpy.zeros((samples,), dtype=_numpy.float)
691         mean = _numpy.float(mean)
692         for i in range(samples):
693             t = i / _numpy.float(samp_freq)
694             x[i] = self.gaussian(area, mean, std, t)
695         freq_axis,power = unitary_power_spectrum(x, samp_freq)
696
697         # generate the predicted curve by comparing our
698         # TestUnitaryPowerSpectrum.gaussian() form to
699         # TestUnitaryRFFT.gaussian(),
700         # we see that the Fourier transform of x(t) has parameters:
701         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
702         #                           converting to angular frequency,
703         #                           not frequency like we are)
704         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of
705         #                                   TestUnitaryRFFT.gaussian() above)
706         #  mean' = 0               (changing the mean in the time-domain just
707         #                           changes the phase in the freq-domain)
708         # So our power spectral density per unit time is given by
709         #  P(f) = 2 |X(f)|**2 / T
710         # Where
711         #  T  = samples/samp_freq  (total time of data aquisition)
712         mean = 0.0
713         area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
714         std = 1.0 / (2.0 * _numpy.pi * std)
715         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
716         # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
717         df = _numpy.float(samp_freq) / samples
718         for i in range(len(freq_axis)):
719             f = i * df
720             gaus = self.gaussian(area, mean, std, f)
721             expected[i] = 2.0 * gaus**2 * samp_freq / samples
722         self.assertAlmostEqual(
723             expected[0], power[0], 3,
724             ('The power should be a half-gaussian, '
725              'with a peak at 0 Hz with amplitude {} ({})').format(
726                 expected[0], power[0]))
727
728         if TEST_PLOTS:
729             figure = _pyplot.figure()
730             time_axes = figure.add_subplot(2, 1, 1)
731             time_axes.plot(
732                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
733                 x, 'b-')
734             time_axes.set_title('time series')
735             freq_axes = figure.add_subplot(2, 1, 2)
736             freq_axes.plot(freq_axis, power, 'r.')
737             freq_axes.plot(freq_axis, expected, 'b-')
738             freq_axes.set_title('freq series')
739
740     def test_gaussian(self):
741         "Test unitary power spectrums on various gaussian functions"
742         for area in [1, _numpy.pi]:
743             for std in [1, _numpy.sqrt(2)]:
744                 for samp_freq in [10.0, _numpy.exp(1)]:
745                     for samples in [1024, 2048]:
746                         self.run_gaussian(
747                             area=area, std=std, samp_freq=samp_freq,
748                             samples=samples)
749
750
751 class TestUnitaryAvgPowerSpectrum (_unittest.TestCase):
752     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024, chunk_size=512,
753                 overlap=True, window=window_hann, places=3):
754         """TODO
755         """
756         x = _numpy.zeros((samples,), dtype=_numpy.float)
757         samp_freq = _numpy.float(samp_freq)
758         for i in range(samples):
759             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
760         freq_axis,power = unitary_avg_power_spectrum(
761             x, samp_freq, chunk_size, overlap, window)
762         imax = _numpy.argmax(power)
763
764         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
765         # df = 1/T, where T = total_time
766         df = samp_freq / _numpy.float(chunk_size)
767         i = int(sin_freq / df)
768         # see TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin()
769         expected[i] = 0.5 / df
770
771         LOG.debug('The power should peak at {} Hz of {} ({}, {})'.format(
772                 sin_freq, expected[i], freq_axis[imax], power[imax]))
773         Pexp = P = 0
774         for i in range(len(freq_axis)):
775             Pexp += expected[i] * df
776             P += power[i] * df
777         self.assertAlmostEqual(
778             Pexp, P, places,
779             'The total power should be {} ({})'.format(Pexp, P))
780
781         if TEST_PLOTS:
782             figure = _pyplot.figure()
783             time_axes = figure.add_subplot(2, 1, 1)
784             time_axes.plot(
785                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
786                 x, 'b-')
787             time_axes.set_title('time series')
788             freq_axes = figure.add_subplot(2, 1, 2)
789             freq_axes.plot(freq_axis, power, 'r.')
790             freq_axes.plot(freq_axis, expected, 'b-')
791             freq_axes.set_title(
792                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
793
794     def test_sin(self):
795         "Test unitary avg power spectrums on variously shaped sin functions."
796         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
797         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
798         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
799         self.run_sin(sin_freq=17, samp_freq=512, samples=1024)
800         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
801         # test long wavelenth sin, so be closer to window frequency
802         self.run_sin(sin_freq=1, samp_freq=1024, samples=2048, places=0)
803         # finally, with some irrational numbers, to check that I'm not
804         # getting lucky
805         self.run_sin(
806             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)