FFT_tools: add LOG and rework tests to use self.assert...().
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 :func:`numpy.fft.rfft`.
19
20 Main entry functions:
21
22 * :func:`unitary_rfft`
23 * :func:`power_spectrum`
24 * :func:`unitary_power_spectrum`
25 * :func:`avg_power_spectrum`
26 * :func:`unitary_avg_power_spectrum`
27 """
28
29 import logging as _logging
30 import unittest as _unittest
31
32 import numpy as _numpy
33
34
35 __version__ = '0.4'
36
37
38 LOG = _logging.getLogger('FFT-tools')
39 LOG.addHandler(_logging.StreamHandler())
40 LOG.setLevel(_logging.ERROR)
41
42
43 # Display time- and freq-space plots of the test transforms if True
44 TEST_PLOTS = False
45
46
47 def floor_pow_of_two(num):
48     """Round `num` down to the closest exact a power of two.
49
50     Examples
51     --------
52
53     >>> floor_pow_of_two(3)
54     2
55     >>> floor_pow_of_two(11)
56     8
57     >>> floor_pow_of_two(15)
58     8
59     """
60     lnum = _numpy.log2(num)
61     if int(lnum) != lnum:
62         num = 2**_numpy.floor(lnum)
63     return int(num)
64
65
66 def round_pow_of_two(num):
67     """Round `num` to the closest exact a power of two on a log scale.
68
69     Examples
70     --------
71
72     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
73     4
74     >>> round_pow_of_two(11)
75     8
76     >>> round_pow_of_two(15)
77     16
78     """
79     lnum = _numpy.log2(num)
80     if int(lnum) != lnum:
81         num = 2**_numpy.round(lnum)
82     return int(num)
83
84
85 def ceil_pow_of_two(num):
86     """Round `num` up to the closest exact a power of two.
87
88     Examples
89     --------
90
91     >>> ceil_pow_of_two(3)
92     4
93     >>> ceil_pow_of_two(11)
94     16
95     >>> ceil_pow_of_two(15)
96     16
97     """
98     lnum = _numpy.log2(num)
99     if int(lnum) != lnum:
100         num = 2**_numpy.ceil(lnum)
101     return int(num)
102
103
104 def unitary_rfft(data, freq=1.0):
105     """Compute the unitary Fourier transform of real data.
106
107     Unitary = preserves power [Parseval's theorem].
108
109     Parameters
110     ----------
111     data : iterable
112         Real (not complex) data taken with a sampling frequency `freq`.
113     freq : real
114         Sampling frequency.
115
116     Returns
117     -------
118     freq_axis,trans : numpy.ndarray
119         Arrays ready for plotting.
120
121     Notes
122     -----
123     If the units on your data are Volts,
124     and your sampling frequency is in Hz,
125     then `freq_axis` will be in Hz,
126     and `trans` will be in Volts.
127     """
128     nsamps = floor_pow_of_two(len(data))
129     # Which should satisfy the discrete form of Parseval's theorem
130     #   n-1               n-1
131     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
132     #   m=0               k=0
133     # However, we want our FFT to satisfy the continuous Parseval eqn
134     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
135     # which has the discrete form
136     #   n-1              n-1
137     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
138     #   m=0              k=0
139     # with X'_k = AX, this gives us
140     #   n-1                     n-1
141     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
142     #   m=0                     k=0
143     # so we see
144     #   A^2 df/dt = 1/n
145     #   A^2 = 1/n dt/df
146     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
147     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
148     # f_c in the transformed data is the Nyquist frequency (12.1.2)
149     #   f_c = 1/2dt
150     # and the points are spaced out by (12.1.5)
151     #   df = 1/ndt
152     # so
153     #   dt = 1/ndf
154     #   dt/df = 1/ndf^2
155     #   A^2 = 1/n^2df^2
156     #   A = 1/ndf = ndt/n = dt
157     # so we can convert the Numpy transformed data to match our unitary
158     # continuous transformed data with (also NR 12.1.8)
159     #   X'_k = dtX = X / <sampling freq>
160     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
161     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
162     return (freq_axis, trans)
163
164
165 def power_spectrum(data, freq=1.0):
166     """Compute the power spectrum of the time series `data`.
167
168     Parameters
169     ----------
170     data : iterable
171         Real (not complex) data taken with a sampling frequency `freq`.
172     freq : real
173         Sampling frequency.
174
175     Returns
176     -------
177     freq_axis,power : numpy.ndarray
178         Arrays ready for plotting.
179
180     Notes
181     -----
182     If the number of samples in `data` is not an integer power of two,
183     the FFT ignores some of the later points.
184
185     See Also
186     --------
187     unitary_power_spectrum,avg_power_spectrum
188     """
189     nsamps = floor_pow_of_two(len(data))
190
191     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
192     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
193     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
194     # See Numerical Recipies for a details.
195     trans = _numpy.fft.rfft(data[0:nsamps])
196     power = (trans * trans.conj()).real  # we want the square of the amplitude
197     return (freq_axis, power)
198
199
200 def unitary_power_spectrum(data, freq=1.0):
201     """Compute the unitary power spectrum of the time series `data`.
202
203     See Also
204     --------
205     power_spectrum,unitary_avg_power_spectrum
206     """
207     freq_axis,power = power_spectrum(data, freq)
208     # One sided power spectral density, so 2|H(f)|**2
209     # (see NR 2nd edition 12.0.14, p498)
210     #
211     # numpy normalizes with 1/N on the inverse transform ifft,
212     # so we should normalize the freq-space representation with 1/sqrt(N).
213     # But we're using the rfft, where N points are like N/2 complex points,
214     # so 1/sqrt(N/2)
215     # So the power gets normalized by that twice and we have 2/N
216     #
217     # On top of this, the FFT assumes a sampling freq of 1 per second,
218     # and we want to preserve area under our curves.
219     # If our total time T = len(data)/freq is smaller than 1,
220     # our df_real = freq/len(data) is bigger that the FFT expects
221     # (dt_fft = 1/len(data)),
222     # and we need to scale the powers down to conserve area.
223     # df_fft * F_fft(f) = df_real *F_real(f)
224     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
225     # So the power gets normalized by *that* twice and we have 2/N * freq**2
226
227     # power per unit time
228     # measure x(t) for time T
229     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
230     # PSD(f) = 2 |X(f)|**2 / T
231
232     # total_time = len(data)/float(freq)
233     # power *= 2.0 / float(freq)**2   /   total_time
234     # power *= 2.0 / freq**2   *   freq / len(data)
235     power *= 2.0 / (freq * _numpy.float(len(data)))
236
237     return (freq_axis, power)
238
239
240 def window_hann(length):
241     r"""Returns a Hann window array with length entries
242
243     Notes
244     -----
245     The Hann window with length :math:`L` is defined as
246
247     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
248     """
249     win = _numpy.zeros((length,), dtype=_numpy.float)
250     for i in range(length):
251         win[i] = 0.5 * (
252             1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length)))
253     # avg value of cos over a period is 0
254     # so average height of Hann window is 0.5
255     return win
256
257
258 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
259                        overlap=True, window=window_hann):
260     """Compute the avgerage power spectrum of `data`.
261
262     Parameters
263     ----------
264     data : iterable
265         Real (not complex) data taken with a sampling frequency `freq`.
266     freq : real
267         Sampling frequency.
268     chunk_size : int
269         Number of samples per chunk.  Use a power of two.
270     overlap: {True,False}
271         If `True`, each chunk overlaps the previous chunk by half its
272         length.  Otherwise, the chunks are end-to-end, and not
273         overlapping.
274     window: iterable
275         Weights used to "smooth" the chunks, there is a whole science
276         behind windowing, but if you're not trying to squeeze every
277         drop of information out of your data, you'll be OK with the
278         default Hann window.
279
280     Returns
281     -------
282     freq_axis,power : numpy.ndarray
283         Arrays ready for plotting.
284
285     Notes
286     -----
287     The average power spectrum is computed by breaking `data` into
288     chunks of length `chunk_size`.  These chunks are transformed
289     individually into frequency space and then averaged together.
290
291     See Numerical Recipes 2 section 13.4 for a good introduction to
292     the theory.
293
294     If the number of samples in `data` is not a multiple of
295     `chunk_size`, we ignore the extra points.
296     """
297     if chunk_size != floor_pow_of_two(chunk_size):
298         raise ValueError(
299             'chunk_size {} should be a power of 2'.format(chunk_size))
300
301     nchunks = len(data) // chunk_size  # integer division = implicit floor
302     if overlap:
303         chunk_step = chunk_size / 2
304     else:
305         chunk_step = chunk_size
306
307     win = window(chunk_size)  # generate a window of the appropriate size
308     freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1)
309     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
310     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
311     # See Numerical Recipies for a details.
312     power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float)
313     for i in range(nchunks):
314         starti = i * chunk_step
315         stopi = starti + chunk_size
316         fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win)
317         p_chunk = (fft_chunk * fft_chunk.conj()).real
318         power += p_chunk.astype(_numpy.float)
319     power /= _numpy.float(nchunks)
320     return (freq_axis, power)
321
322
323 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
324                                overlap=True, window=window_hann):
325     """Compute the unitary average power spectrum of `data`.
326
327     See Also
328     --------
329     avg_power_spectrum,unitary_power_spectrum
330     """
331     freq_axis,power = avg_power_spectrum(
332         data, freq, chunk_size, overlap, window)
333     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
334     # see unitary_power_spectrum()
335     power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3
336     #             * 8/3  to remove power from windowing
337     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
338     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
339     # So our calulated power has and extra <w(t)**2> in it.
340     # For the Hann window,
341     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
342     # For low frequency components,
343     # where the frequency of x(t) is ~= the frequency of w(t),
344     # the normalization is not perfect. ??
345     # The normalization approaches perfection as chunk_size -> infinity.
346     return (freq_axis, power)
347
348
349 class TestRFFT (_unittest.TestCase):
350     r"""Ensure Numpy's FFT algorithm acts as expected.
351
352     Notes
353     -----
354     The expected return values are [#dft]_:
355
356     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
357
358     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
359     """
360     def run_rfft(self, xs, Xs):
361         i = _numpy.complex(0, 1)
362         n = len(xs)
363         Xa = []
364         for k in range(n):
365             Xa.append(sum([x * _numpy.exp(-2 * _numpy.pi * i * m * k / n)
366                            for x,m in zip(xs, range(n))]))
367             if k < len(Xs):
368                 self.assertAlmostEqual(
369                     (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]), 0, 6,
370                     ('rfft mismatch on element {}: {} != {}, '
371                      'relative error {}').format(
372                         k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
373         # Which should satisfy the discrete form of Parseval's theorem
374         #   n-1               n-1
375         #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
376         #   m=0               k=0
377         timeSum = sum([_numpy.abs(x)**2 for x in xs])
378         freqSum = sum([_numpy.abs(X)**2 for X in Xa])
379         self.assertAlmostEqual(
380             _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum, 0, 6,
381             "Mismatch on Parseval's, {} != 1/{} * {}".format(
382                 timeSum, n, freqSum))
383
384     def test_rfft(self):
385         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
386         self.run_rfft(xs, _numpy.fft.rfft(xs))
387
388
389 class TestUnitaryRFFT (_unittest.TestCase):
390     """Verify `unitary_rfft`.
391     """
392     def run_parsevals(self, xs, freq, freqs, Xs):
393         """Check the discretized integral form of Parseval's theorem
394
395         Notes
396         -----
397         Which is:
398
399         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
400         """
401         dt = 1.0 / freq
402         df = freqs[1] - freqs[0]
403         self.assertAlmostEqual(
404             (df - 1 / (len(xs) * dt)) / df, 0, 6,
405             'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
406         Xa = list(Xs)
407         for k in range(len(Xs) - 1, 1, -1):
408             Xa.append(Xa[k])
409         self.assertEqual(len(xs), len(Xa))
410         lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
411         rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
412         self.assertAlmostEqual(
413             _numpy.abs(lhs - rhs) / lhs, 0, 3,
414             "Mismatch on Parseval's, {} != {}".format(lhs, rhs))
415
416     def test_parsevals(self):
417         "Test unitary rfft on Parseval's theorem"
418         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
419         dt = _numpy.pi
420         freqs,Xs = unitary_rfft(xs, 1.0 / dt)
421         self.run_parsevals(xs, 1.0 / dt, freqs, Xs)
422
423     def rect(self, t):
424         r"""Rectangle function.
425
426         Notes
427         -----
428
429         .. math::
430
431             \rect(t) = \begin{cases}
432                1& \text{if $|t| < 0.5$}, \\
433                0& \text{if $|t| \ge 0.5$}.
434                        \end{cases}
435         """
436         if _numpy.abs(t) < 0.5:
437             return 1
438         else:
439             return 0
440
441     def run_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
442         r"""Test `unitary_rttf` on known function `rect(at)`.
443
444         Notes
445         -----
446         Analytic result:
447
448         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
449         """
450         samp_freq = _numpy.float(samp_freq)
451         a = _numpy.float(a)
452
453         x = _numpy.zeros((samples,), dtype=_numpy.float)
454         dt = 1.0 / samp_freq
455         for i in range(samples):
456             t = i * dt
457             x[i] = self.rect(a * (t - time_shift))
458         freq_axis,X = unitary_rfft(x, samp_freq)
459
460         # remove the phase due to our time shift
461         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
462         for i in range(len(freq_axis)):
463             f = freq_axis[i]
464             inverse_phase_shift = _numpy.exp(
465                 j * 2.0 * _numpy.pi * time_shift * f)
466             X[i] *= inverse_phase_shift
467
468         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
469         # normalized sinc(x) = sin(pi x)/(pi x)
470         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
471         self.assertEqual(_numpy.sinc(0.5), 2.0 / _numpy.pi)
472         for i in range(len(freq_axis)):
473             f = freq_axis[i]
474             expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
475
476         if TEST_PLOTS:
477             figure = _pyplot.figure()
478             time_axes = figure.add_subplot(2, 1, 1)
479             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
480             time_axes.set_title('time series')
481             freq_axes = figure.add_subplot(2, 1, 2)
482             freq_axes.plot(freq_axis, X.real, 'r.')
483             freq_axes.plot(freq_axis, X.imag, 'g.')
484             freq_axes.plot(freq_axis, expected, 'b-')
485             freq_axes.set_title('freq series')
486
487     def test_rect(self):
488         "Test unitary FFTs on variously shaped rectangular functions."
489         self.run_rect(a=0.5)
490         self.run_rect(a=2.0)
491         self.run_rect(a=0.7, samp_freq=50, samples=512)
492         self.run_rect(a=3.0, samp_freq=60, samples=1024)
493
494     def gaussian(self, a, t):
495         r"""Gaussian function.
496
497         Notes
498         -----
499
500         .. math:: \gaussian(a,t) = \exp^{-at^2}
501         """
502         return _numpy.exp(-a * t**2)
503
504     def run_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
505         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
506
507         Notes
508         -----
509         Analytic result:
510
511         .. math::
512
513             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
514         """
515         samp_freq = _numpy.float(samp_freq)
516         a = _numpy.float(a)
517
518         x = _numpy.zeros((samples,), dtype=_numpy.float)
519         dt = 1.0 / samp_freq
520         for i in range(samples):
521             t = i * dt
522             x[i] = self.gaussian(a, (t - time_shift))
523         freq_axis,X = unitary_rfft(x, samp_freq)
524
525         # remove the phase due to our time shift
526         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
527         for i in range(len(freq_axis)):
528             f = freq_axis[i]
529             inverse_phase_shift = _numpy.exp(
530                 j * 2.0 * _numpy.pi * time_shift * f)
531             X[i] *= inverse_phase_shift
532
533         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
534         for i in range(len(freq_axis)):
535             f = freq_axis[i]
536             # see Wikipedia, or do the integral yourself.
537             expected[i] = _numpy.sqrt(_numpy.pi / a) * self.gaussian(
538                 1.0 / a, _numpy.pi * f)
539
540         if TEST_PLOTS:
541             figure = _pyplot.figure()
542             time_axes = figure.add_subplot(2, 1, 1)
543             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
544             time_axes.set_title('time series')
545             freq_axes = figure.add_subplot(2, 1, 2)
546             freq_axes.plot(freq_axis, X.real, 'r.')
547             freq_axes.plot(freq_axis, X.imag, 'g.')
548             freq_axes.plot(freq_axis, expected, 'b-')
549             freq_axes.set_title('freq series')
550
551     def test_gaussian(self):
552         "Test unitary FFTs on variously shaped gaussian functions."
553         self.run_gaussian(a=0.5)
554         self.run_gaussian(a=2.0)
555         self.run_gaussian(a=0.7, samp_freq=50, samples=512)
556         self.run_gaussian(a=3.0, samp_freq=60, samples=1024)
557
558
559 class TestUnitaryPowerSpectrum (_unittest.TestCase):
560     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024):
561         x = _numpy.zeros((samples,), dtype=_numpy.float)
562         samp_freq = _numpy.float(samp_freq)
563         for i in range(samples):
564             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
565         freq_axis,power = unitary_power_spectrum(x, samp_freq)
566         imax = _numpy.argmax(power)
567
568         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
569         # df = 1/T, where T = total_time
570         df = samp_freq / _numpy.float(samples)
571         i = int(sin_freq / df)
572         # average power per unit time is
573         #  P = <x(t)**2>
574         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
575         # so average value of (int sin(t)**2 dt) per unit time is 0.5
576         #  P = 0.5
577         # we spread that power over a frequency bin of width df, sp
578         #  P(f0) = 0.5/df
579         # where f0 is the sin's frequency
580         #
581         # or:
582         # FFT of sin(2*pi*t*f0) gives
583         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
584         # (area under x(t) = 0, area under X(f) = 0)
585         # so one sided power spectral density (PSD) per unit time is
586         #  P(f) = 2 |X(f)|**2 / T
587         #       = 2 * |0.5 delta(f-f0)|**2 / T
588         #       = 0.5 * |delta(f-f0)|**2 / T
589         # but we're discrete and want the integral of the 'delta' to be 1,
590         # so 'delta'*df = 1  --> 'delta' = 1/df, and
591         #  P(f) = 0.5 / (df**2 * T)
592         #       = 0.5 / df                (T = 1/df)
593         expected[i] = 0.5 / df
594
595         LOG.debug('The power should be a peak at {} Hz of {} ({}, {})'.format(
596                 sin_freq, expected[i], freq_axis[imax], power[imax]))
597         Pexp = P = 0
598         for i in range(len(freq_axis)):
599             Pexp += expected[i] * df
600             P += power[i] * df
601         self.assertAlmostEqual(
602             _numpy.abs((P - Pexp) / Pexp), 0, 1,
603             'The total power should be {} ({})'.format(Pexp, P))
604
605         if TEST_PLOTS:
606             figure = _pyplot.figure()
607             time_axes = figure.add_subplot(2, 1, 1)
608             time_axes.plot(
609                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
610             time_axes.set_title('time series')
611             freq_axes = figure.add_subplot(2, 1, 2)
612             freq_axes.plot(freq_axis, power, 'r.')
613             freq_axes.plot(freq_axis, expected, 'b-')
614             freq_axes.set_title(
615                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
616
617     def test_sin(self):
618         "Test unitary power spectrums on variously shaped sin functions"
619         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
620         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
621         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
622         self.run_sin(sin_freq=7, samp_freq=512, samples=1024)
623         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
624         # finally, with some irrational numbers, to check that I'm not
625         # getting lucky
626         self.run_sin(
627             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
628         # test with non-integer number of periods
629         self.run_sin(sin_freq=5, samp_freq=512, samples=256)
630
631     def run_delta(self, amp=1, samp_freq=1, samples=256):
632         """TODO
633         """
634         x = _numpy.zeros((samples,), dtype=_numpy.float)
635         samp_freq = _numpy.float(samp_freq)
636         x[0] = amp
637         freq_axis,power = unitary_power_spectrum(x, samp_freq)
638
639         # power = <x(t)**2> = (amp)**2 * dt/T
640         # we spread that power over the entire freq_axis [0,fN], so
641         #  P(f)  = (amp)**2 dt / (T fN)
642         # where
643         #  dt = 1/samp_freq        (sample period)
644         #  T  = samples/samp_freq  (total time of data aquisition)
645         #  fN = 0.5 samp_freq      (Nyquist frequency)
646         # so
647         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
648         #       = 2 amp**2 / (samp_freq*samples)
649         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
650         expected = _numpy.ones(
651             (len(freq_axis),), dtype=_numpy.float) * expected_amp
652
653         self.assertAlmostEqual(
654             expected_amp, power[0], 4,
655             'The power should be flat at y = {} ({})'.format(
656                 expected_amp, power[0]))
657
658         if TEST_PLOTS:
659             figure = _pyplot.figure()
660             time_axes = figure.add_subplot(2, 1, 1)
661             time_axes.plot(
662                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
663             time_axes.set_title('time series')
664             freq_axes = figure.add_subplot(2, 1, 2)
665             freq_axes.plot(freq_axis, power, 'r.')
666             freq_axes.plot(freq_axis, expected, 'b-')
667             freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
668
669     def test_delta(self):
670         "Test unitary power spectrums on various delta functions"
671         self.run_delta(amp=1, samp_freq=1.0, samples=1024)
672         self.run_delta(amp=1, samp_freq=1.0, samples=2048)
673         # expected = 2*computed
674         self.run_delta(amp=1, samp_freq=0.5, samples=2048)
675         # expected = 0.5*computed
676         self.run_delta(amp=1, samp_freq=2.0, samples=2048)
677         self.run_delta(amp=3, samp_freq=1.0, samples=1024)
678         self.run_delta(amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
679
680     def gaussian(self, area, mean, std, t):
681         "Integral over all time = area (i.e. normalized for area=1)"
682         return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
683             -0.5 * ((t-mean)/std)**2)
684
685     def run_gaussian(self, area=2.5, mean=5, std=1, samp_freq=10.24,
686                      samples=512):
687         """TODO.
688         """
689         x = _numpy.zeros((samples,), dtype=_numpy.float)
690         mean = _numpy.float(mean)
691         for i in range(samples):
692             t = i / _numpy.float(samp_freq)
693             x[i] = self.gaussian(area, mean, std, t)
694         freq_axis,power = unitary_power_spectrum(x, samp_freq)
695
696         # generate the predicted curve by comparing our
697         # TestUnitaryPowerSpectrum.gaussian() form to
698         # TestUnitaryRFFT.gaussian(),
699         # we see that the Fourier transform of x(t) has parameters:
700         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
701         #                           converting to angular frequency,
702         #                           not frequency like we are)
703         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of
704         #                                   TestUnitaryRFFT.gaussian() above)
705         #  mean' = 0               (changing the mean in the time-domain just
706         #                           changes the phase in the freq-domain)
707         # So our power spectral density per unit time is given by
708         #  P(f) = 2 |X(f)|**2 / T
709         # Where
710         #  T  = samples/samp_freq  (total time of data aquisition)
711         mean = 0.0
712         area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
713         std = 1.0 / (2.0 * _numpy.pi * std)
714         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
715         # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
716         df = _numpy.float(samp_freq) / samples
717         for i in range(len(freq_axis)):
718             f = i * df
719             gaus = self.gaussian(area, mean, std, f)
720             expected[i] = 2.0 * gaus**2 * samp_freq / samples
721         self.assertAlmostEqual(
722             expected[0], power[0], 3,
723             ('The power should be a half-gaussian, '
724              'with a peak at 0 Hz with amplitude {} ({})').format(
725                 expected[0], power[0]))
726
727         if TEST_PLOTS:
728             figure = _pyplot.figure()
729             time_axes = figure.add_subplot(2, 1, 1)
730             time_axes.plot(
731                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
732                 x, 'b-')
733             time_axes.set_title('time series')
734             freq_axes = figure.add_subplot(2, 1, 2)
735             freq_axes.plot(freq_axis, power, 'r.')
736             freq_axes.plot(freq_axis, expected, 'b-')
737             freq_axes.set_title('freq series')
738
739     def test_gaussian(self):
740         "Test unitary power spectrums on various gaussian functions"
741         for area in [1, _numpy.pi]:
742             for std in [1, _numpy.sqrt(2)]:
743                 for samp_freq in [10.0, _numpy.exp(1)]:
744                     for samples in [1024, 2048]:
745                         self.run_gaussian(
746                             area=area, std=std, samp_freq=samp_freq,
747                             samples=samples)
748
749
750 class TestUnitaryAvgPowerSpectrum (_unittest.TestCase):
751     def run_sin(self, sin_freq=10, samp_freq=512, samples=1024, chunk_size=512,
752                 overlap=True, window=window_hann, places=3):
753         """TODO
754         """
755         x = _numpy.zeros((samples,), dtype=_numpy.float)
756         samp_freq = _numpy.float(samp_freq)
757         for i in range(samples):
758             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
759         freq_axis,power = unitary_avg_power_spectrum(
760             x, samp_freq, chunk_size, overlap, window)
761         imax = _numpy.argmax(power)
762
763         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
764         # df = 1/T, where T = total_time
765         df = samp_freq / _numpy.float(chunk_size)
766         i = int(sin_freq / df)
767         # see TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin()
768         expected[i] = 0.5 / df
769
770         LOG.debug('The power should peak at {} Hz of {} ({}, {})'.format(
771                 sin_freq, expected[i], freq_axis[imax], power[imax]))
772         Pexp = P = 0
773         for i in range(len(freq_axis)):
774             Pexp += expected[i] * df
775             P += power[i] * df
776         self.assertAlmostEqual(
777             Pexp, P, places,
778             'The total power should be {} ({})'.format(Pexp, P))
779
780         if TEST_PLOTS:
781             figure = _pyplot.figure()
782             time_axes = figure.add_subplot(2, 1, 1)
783             time_axes.plot(
784                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
785                 x, 'b-')
786             time_axes.set_title('time series')
787             freq_axes = figure.add_subplot(2, 1, 2)
788             freq_axes.plot(freq_axis, power, 'r.')
789             freq_axes.plot(freq_axis, expected, 'b-')
790             freq_axes.set_title(
791                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
792
793     def test_sin(self):
794         "Test unitary avg power spectrums on variously shaped sin functions."
795         self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
796         self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
797         self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
798         self.run_sin(sin_freq=17, samp_freq=512, samples=1024)
799         self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
800         # test long wavelenth sin, so be closer to window frequency
801         self.run_sin(sin_freq=1, samp_freq=1024, samples=2048, places=0)
802         # finally, with some irrational numbers, to check that I'm not
803         # getting lucky
804         self.run_sin(
805             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)