Merge changes from hooke.util.fft
[FFT-tools.git] / FFT_tools.py
1 # Copyright (C) 2008-2012  W. Trevor King
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU General Public License
13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
14
15 """Wrap Numpy's fft module to reduce clutter.
16
17 Provides a unitary discrete FFT and a windowed version based on
18 :func:`numpy.fft.rfft`.
19
20 Main entry functions:
21
22 * :func:`unitary_rfft`
23 * :func:`power_spectrum`
24 * :func:`unitary_power_spectrum`
25 * :func:`avg_power_spectrum`
26 * :func:`unitary_avg_power_spectrum`
27 """
28
29 import unittest as _unittest
30
31 import numpy as _numpy
32
33
34 __version__ = '0.4'
35
36 # Display time- and freq-space plots of the test transforms if True
37 TEST_PLOTS = False
38
39
40 def floor_pow_of_two(num):
41     """Round `num` down to the closest exact a power of two.
42
43     Examples
44     --------
45
46     >>> floor_pow_of_two(3)
47     2
48     >>> floor_pow_of_two(11)
49     8
50     >>> floor_pow_of_two(15)
51     8
52     """
53     lnum = _numpy.log2(num)
54     if int(lnum) != lnum:
55         num = 2**_numpy.floor(lnum)
56     return int(num)
57
58
59 def round_pow_of_two(num):
60     """Round `num` to the closest exact a power of two on a log scale.
61
62     Examples
63     --------
64
65     >>> round_pow_of_two(2.9) # Note rounding on *log scale*
66     4
67     >>> round_pow_of_two(11)
68     8
69     >>> round_pow_of_two(15)
70     16
71     """
72     lnum = _numpy.log2(num)
73     if int(lnum) != lnum:
74         num = 2**_numpy.round(lnum)
75     return int(num)
76
77
78 def ceil_pow_of_two(num):
79     """Round `num` up to the closest exact a power of two.
80
81     Examples
82     --------
83
84     >>> ceil_pow_of_two(3)
85     4
86     >>> ceil_pow_of_two(11)
87     16
88     >>> ceil_pow_of_two(15)
89     16
90     """
91     lnum = _numpy.log2(num)
92     if int(lnum) != lnum:
93         num = 2**_numpy.ceil(lnum)
94     return int(num)
95
96
97 def unitary_rfft(data, freq=1.0):
98     """Compute the unitary Fourier transform of real data.
99
100     Unitary = preserves power [Parseval's theorem].
101
102     Parameters
103     ----------
104     data : iterable
105         Real (not complex) data taken with a sampling frequency `freq`.
106     freq : real
107         Sampling frequency.
108
109     Returns
110     -------
111     freq_axis,trans : numpy.ndarray
112         Arrays ready for plotting.
113
114     Notes
115     -----
116     If the units on your data are Volts,
117     and your sampling frequency is in Hz,
118     then `freq_axis` will be in Hz,
119     and `trans` will be in Volts.
120     """
121     nsamps = floor_pow_of_two(len(data))
122     # Which should satisfy the discrete form of Parseval's theorem
123     #   n-1               n-1
124     #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
125     #   m=0               k=0
126     # However, we want our FFT to satisfy the continuous Parseval eqn
127     #   int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df
128     # which has the discrete form
129     #   n-1              n-1
130     #   SUM |x_m|^2 dt = SUM |X'_k|^2 df
131     #   m=0              k=0
132     # with X'_k = AX, this gives us
133     #   n-1                     n-1
134     #   SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2
135     #   m=0                     k=0
136     # so we see
137     #   A^2 df/dt = 1/n
138     #   A^2 = 1/n dt/df
139     # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html),
140     # Section 12.1, we see that for a sampling rate dt, the maximum frequency
141     # f_c in the transformed data is the Nyquist frequency (12.1.2)
142     #   f_c = 1/2dt
143     # and the points are spaced out by (12.1.5)
144     #   df = 1/ndt
145     # so
146     #   dt = 1/ndf
147     #   dt/df = 1/ndf^2
148     #   A^2 = 1/n^2df^2
149     #   A = 1/ndf = ndt/n = dt
150     # so we can convert the Numpy transformed data to match our unitary
151     # continuous transformed data with (also NR 12.1.8)
152     #   X'_k = dtX = X / <sampling freq>
153     trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq)
154     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
155     return (freq_axis, trans)
156
157
158 def power_spectrum(data, freq=1.0):
159     """Compute the power spectrum of the time series `data`.
160
161     Parameters
162     ----------
163     data : iterable
164         Real (not complex) data taken with a sampling frequency `freq`.
165     freq : real
166         Sampling frequency.
167
168     Returns
169     -------
170     freq_axis,power : numpy.ndarray
171         Arrays ready for plotting.
172
173     Notes
174     -----
175     If the number of samples in `data` is not an integer power of two,
176     the FFT ignores some of the later points.
177
178     See Also
179     --------
180     unitary_power_spectrum,avg_power_spectrum
181     """
182     nsamps = floor_pow_of_two(len(data))
183
184     freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1)
185     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
186     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
187     # See Numerical Recipies for a details.
188     trans = _numpy.fft.rfft(data[0:nsamps])
189     power = (trans * trans.conj()).real  # we want the square of the amplitude
190     return (freq_axis, power)
191
192
193 def unitary_power_spectrum(data, freq=1.0):
194     """Compute the unitary power spectrum of the time series `data`.
195
196     See Also
197     --------
198     power_spectrum,unitary_avg_power_spectrum
199     """
200     freq_axis,power = power_spectrum(data, freq)
201     # One sided power spectral density, so 2|H(f)|**2
202     # (see NR 2nd edition 12.0.14, p498)
203     #
204     # numpy normalizes with 1/N on the inverse transform ifft,
205     # so we should normalize the freq-space representation with 1/sqrt(N).
206     # But we're using the rfft, where N points are like N/2 complex points,
207     # so 1/sqrt(N/2)
208     # So the power gets normalized by that twice and we have 2/N
209     #
210     # On top of this, the FFT assumes a sampling freq of 1 per second,
211     # and we want to preserve area under our curves.
212     # If our total time T = len(data)/freq is smaller than 1,
213     # our df_real = freq/len(data) is bigger that the FFT expects
214     # (dt_fft = 1/len(data)),
215     # and we need to scale the powers down to conserve area.
216     # df_fft * F_fft(f) = df_real *F_real(f)
217     # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq
218     # So the power gets normalized by *that* twice and we have 2/N * freq**2
219
220     # power per unit time
221     # measure x(t) for time T
222     # X(f)   = int_0^T x(t) exp(-2 pi ift) dt
223     # PSD(f) = 2 |X(f)|**2 / T
224
225     # total_time = len(data)/float(freq)
226     # power *= 2.0 / float(freq)**2   /   total_time
227     # power *= 2.0 / freq**2   *   freq / len(data)
228     power *= 2.0 / (freq * _numpy.float(len(data)))
229
230     return (freq_axis, power)
231
232
233 def window_hann(length):
234     r"""Returns a Hann window array with length entries
235
236     Notes
237     -----
238     The Hann window with length :math:`L` is defined as
239
240     .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L))
241     """
242     win = _numpy.zeros((length,), dtype=_numpy.float)
243     for i in range(length):
244         win[i] = 0.5 * (
245             1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length)))
246     # avg value of cos over a period is 0
247     # so average height of Hann window is 0.5
248     return win
249
250
251 def avg_power_spectrum(data, freq=1.0, chunk_size=2048,
252                        overlap=True, window=window_hann):
253     """Compute the avgerage power spectrum of `data`.
254
255     Parameters
256     ----------
257     data : iterable
258         Real (not complex) data taken with a sampling frequency `freq`.
259     freq : real
260         Sampling frequency.
261     chunk_size : int
262         Number of samples per chunk.  Use a power of two.
263     overlap: {True,False}
264         If `True`, each chunk overlaps the previous chunk by half its
265         length.  Otherwise, the chunks are end-to-end, and not
266         overlapping.
267     window: iterable
268         Weights used to "smooth" the chunks, there is a whole science
269         behind windowing, but if you're not trying to squeeze every
270         drop of information out of your data, you'll be OK with the
271         default Hann window.
272
273     Returns
274     -------
275     freq_axis,power : numpy.ndarray
276         Arrays ready for plotting.
277
278     Notes
279     -----
280     The average power spectrum is computed by breaking `data` into
281     chunks of length `chunk_size`.  These chunks are transformed
282     individually into frequency space and then averaged together.
283
284     See Numerical Recipes 2 section 13.4 for a good introduction to
285     the theory.
286
287     If the number of samples in `data` is not a multiple of
288     `chunk_size`, we ignore the extra points.
289     """
290     if chunk_size != floor_pow_of_two(chunk_size):
291         raise ValueError(
292             'chunk_size {} should be a power of 2'.format(chunk_size))
293
294     nchunks = len(data) // chunk_size  # integer division = implicit floor
295     if overlap:
296         chunk_step = chunk_size / 2
297     else:
298         chunk_step = chunk_size
299
300     win = window(chunk_size)  # generate a window of the appropriate size
301     freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1)
302     # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real.
303     # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination.
304     # See Numerical Recipies for a details.
305     power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float)
306     for i in range(nchunks):
307         starti = i * chunk_step
308         stopi = starti + chunk_size
309         fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win)
310         p_chunk = (fft_chunk * fft_chunk.conj()).real
311         power += p_chunk.astype(_numpy.float)
312     power /= _numpy.float(nchunks)
313     return (freq_axis, power)
314
315
316 def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048,
317                                overlap=True, window=window_hann):
318     """Compute the unitary average power spectrum of `data`.
319
320     See Also
321     --------
322     avg_power_spectrum,unitary_power_spectrum
323     """
324     freq_axis,power = avg_power_spectrum(
325         data, freq, chunk_size, overlap, window)
326     #   2.0 / (freq * chunk_size)       |rfft()|**2 --> unitary_power_spectrum
327     # see unitary_power_spectrum()
328     power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3
329     #             * 8/3  to remove power from windowing
330     #  <[x(t)*w(t)]**2> = <x(t)**2 * w(t)**2> ~= <x(t)**2> * <w(t)**2>
331     # where the ~= is because the frequency of x(t) >> the frequency of w(t).
332     # So our calulated power has and extra <w(t)**2> in it.
333     # For the Hann window,
334     #   <w(t)**2> = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8
335     # For low frequency components,
336     # where the frequency of x(t) is ~= the frequency of w(t),
337     # the normalization is not perfect. ??
338     # The normalization approaches perfection as chunk_size -> infinity.
339     return (freq_axis, power)
340
341
342 class TestRFFT (_unittest.TestCase):
343     r"""Ensure Numpy's FFT algorithm acts as expected.
344
345     Notes
346     -----
347     The expected return values are [#dft]_:
348
349     .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
350
351     .. [#dft] See the *Background information* section of :mod:`numpy.fft`.
352     """
353     def run_rfft(self, xs, Xs):
354         i = _numpy.complex(0, 1)
355         n = len(xs)
356         Xa = []
357         for k in range(n):
358             Xa.append(sum([x * _numpy.exp(-2 * _numpy.pi * i * m * k / n)
359                            for x,m in zip(xs, range(n))]))
360             if k < len(Xs):
361                 if (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]) >= 1e-6:
362                     raise ValueError(
363                         ('rfft mismatch on element {}: {} != {}, '
364                          'relative error {}').format(
365                             k, Xs[k], Xa[k],
366                             (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
367         # Which should satisfy the discrete form of Parseval's theorem
368         #   n-1               n-1
369         #   SUM |x_m|^2 = 1/n SUM |X_k|^2.
370         #   m=0               k=0
371         timeSum = sum([_numpy.abs(x)**2 for x in xs])
372         freqSum = sum([_numpy.abs(X)**2 for X in Xa])
373         if _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum >= 1e-6:
374             raise ValueError(
375                 "Mismatch on Parseval's, {} != 1/{} * {}".format(
376                     timeSum, n, freqSum))
377
378     def test_rfft(self):
379         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
380         self.run_rfft(xs, _numpy.fft.rfft(xs))
381
382
383 class TestUnitaryRFFT (_unittest.TestCase):
384     """Verify `unitary_rfft`.
385     """
386     def run_unitary_rfft_parsevals(self, xs, freq, freqs, Xs):
387         """Check the discretized integral form of Parseval's theorem
388
389         Notes
390         -----
391         Which is:
392
393         .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
394         """
395         dt = 1.0 / freq
396         df = freqs[1] - freqs[0]
397         if (df - 1 / (len(xs) * dt)) / df >= 1e-6:
398             raise ValueError(
399                 'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
400         Xa = list(Xs)
401         for k in range(len(Xs) - 1, 1, -1):
402             Xa.append(Xa[k])
403         if len(xs) != len(Xa):
404             raise ValueError(
405                 'Length mismatch {} != {}'.format(len(xs), len(Xa)))
406         lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
407         rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
408         if _numpy.abs(lhs - rhs) / lhs >= 1e-4:
409             raise ValueError(
410                 "Mismatch on Parseval's, {} != {}".format(lhs, rhs))
411
412     def test_unitary_rfft_parsevals(self):
413         "Test unitary rfft on Parseval's theorem"
414         xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
415         dt = _numpy.pi
416         freqs,Xs = unitary_rfft(xs, 1.0 / dt)
417         self.run_unitary_rfft_parsevals(xs, 1.0 / dt, freqs, Xs)
418
419     def rect(self, t):
420         r"""Rectangle function.
421
422         Notes
423         -----
424
425         .. math::
426
427             \rect(t) = \begin{cases}
428                1& \text{if $|t| < 0.5$}, \\
429                0& \text{if $|t| \ge 0.5$}.
430                        \end{cases}
431         """
432         if _numpy.abs(t) < 0.5:
433             return 1
434         else:
435             return 0
436
437     def run_unitary_rfft_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6,
438                               samples=256):
439         r"""Test `unitary_rttf` on known function `rect(at)`.
440
441         Notes
442         -----
443         Analytic result:
444
445         .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
446         """
447         samp_freq = _numpy.float(samp_freq)
448         a = _numpy.float(a)
449
450         x = _numpy.zeros((samples,), dtype=_numpy.float)
451         dt = 1.0 / samp_freq
452         for i in range(samples):
453             t = i * dt
454             x[i] = self.rect(a * (t - time_shift))
455         freq_axis,X = unitary_rfft(x, samp_freq)
456
457         # remove the phase due to our time shift
458         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
459         for i in range(len(freq_axis)):
460             f = freq_axis[i]
461             inverse_phase_shift = _numpy.exp(
462                 j * 2.0 * _numpy.pi * time_shift * f)
463             X[i] *= inverse_phase_shift
464
465         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
466         # normalized sinc(x) = sin(pi x)/(pi x)
467         # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
468         if _numpy.sinc(0.5) != 2.0 / _numpy.pi:
469             raise ValueError('abnormal sinc()')
470         for i in range(len(freq_axis)):
471             f = freq_axis[i]
472             expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
473
474         if TEST_PLOTS:
475             figure = _pyplot.figure()
476             time_axes = figure.add_subplot(2, 1, 1)
477             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
478             time_axes.set_title('time series')
479             freq_axes = figure.add_subplot(2, 1, 2)
480             freq_axes.plot(freq_axis, X.real, 'r.')
481             freq_axes.plot(freq_axis, X.imag, 'g.')
482             freq_axes.plot(freq_axis, expected, 'b-')
483             freq_axes.set_title('freq series')
484
485     def test_unitary_rfft_rect(self):
486         "Test unitary FFTs on variously shaped rectangular functions."
487         self.run_unitary_rfft_rect(a=0.5)
488         self.run_unitary_rfft_rect(a=2.0)
489         self.run_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512)
490         self.run_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024)
491
492     def gaussian(self, a, t):
493         r"""Gaussian function.
494
495         Notes
496         -----
497
498         .. math:: \gaussian(a,t) = \exp^{-at^2}
499         """
500         return _numpy.exp(-a * t**2)
501
502     def run_unitary_rfft_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6,
503                                   samples=256):
504         r"""Test `unitary_rttf` on known function `gaussian(a,t)`.
505
506         Notes
507         -----
508         Analytic result:
509
510         .. math::
511
512             \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
513         """
514         samp_freq = _numpy.float(samp_freq)
515         a = _numpy.float(a)
516
517         x = _numpy.zeros((samples,), dtype=_numpy.float)
518         dt = 1.0 / samp_freq
519         for i in range(samples):
520             t = i * dt
521             x[i] = self.gaussian(a, (t - time_shift))
522         freq_axis,X = unitary_rfft(x, samp_freq)
523
524         # remove the phase due to our time shift
525         j = _numpy.complex(0.0, 1.0)  # sqrt(-1)
526         for i in range(len(freq_axis)):
527             f = freq_axis[i]
528             inverse_phase_shift = _numpy.exp(
529                 j * 2.0 * _numpy.pi * time_shift * f)
530             X[i] *= inverse_phase_shift
531
532         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
533         for i in range(len(freq_axis)):
534             f = freq_axis[i]
535             # see Wikipedia, or do the integral yourself.
536             expected[i] = _numpy.sqrt(_numpy.pi / a) * self.gaussian(
537                 1.0 / a, _numpy.pi * f)
538
539         if TEST_PLOTS:
540             figure = _pyplot.figure()
541             time_axes = figure.add_subplot(2, 1, 1)
542             time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
543             time_axes.set_title('time series')
544             freq_axes = figure.add_subplot(2, 1, 2)
545             freq_axes.plot(freq_axis, X.real, 'r.')
546             freq_axes.plot(freq_axis, X.imag, 'g.')
547             freq_axes.plot(freq_axis, expected, 'b-')
548             freq_axes.set_title('freq series')
549
550     def test_unitary_rfft_gaussian(self):
551         "Test unitary FFTs on variously shaped gaussian functions."
552         self.run_unitary_rfft_gaussian(a=0.5)
553         self.run_unitary_rfft_gaussian(a=2.0)
554         self.run_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512)
555         self.run_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024)
556
557
558 class TestUnitaryPowerSpectrum (_unittest.TestCase):
559     def run_unitary_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
560                                        samples=1024):
561         x = _numpy.zeros((samples,), dtype=_numpy.float)
562         samp_freq = _numpy.float(samp_freq)
563         for i in range(samples):
564             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
565         freq_axis,power = unitary_power_spectrum(x, samp_freq)
566         imax = _numpy.argmax(power)
567
568         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
569         # df = 1/T, where T = total_time
570         df = samp_freq / _numpy.float(samples)
571         i = int(sin_freq / df)
572         # average power per unit time is
573         #  P = <x(t)**2>
574         # average value of sin(t)**2 = 0.5    (b/c sin**2+cos**2 == 1)
575         # so average value of (int sin(t)**2 dt) per unit time is 0.5
576         #  P = 0.5
577         # we spread that power over a frequency bin of width df, sp
578         #  P(f0) = 0.5/df
579         # where f0 is the sin's frequency
580         #
581         # or:
582         # FFT of sin(2*pi*t*f0) gives
583         #  X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
584         # (area under x(t) = 0, area under X(f) = 0)
585         # so one sided power spectral density (PSD) per unit time is
586         #  P(f) = 2 |X(f)|**2 / T
587         #       = 2 * |0.5 delta(f-f0)|**2 / T
588         #       = 0.5 * |delta(f-f0)|**2 / T
589         # but we're discrete and want the integral of the 'delta' to be 1,
590         # so 'delta'*df = 1  --> 'delta' = 1/df, and
591         #  P(f) = 0.5 / (df**2 * T)
592         #       = 0.5 / df                (T = 1/df)
593         expected[i] = 0.5 / df
594
595         print('The power should be a peak at {} Hz of {} ({}, {})'.format(
596                 sin_freq, expected[i], freq_axis[imax], power[imax]))
597         Pexp = P = 0
598         for i in range(len(freq_axis)):
599             Pexp += expected[i] * df
600             P += power[i] * df
601         print('The total power should be {} ({})'.format(Pexp, P))
602
603         if TEST_PLOTS:
604             figure = _pyplot.figure()
605             time_axes = figure.add_subplot(2, 1, 1)
606             time_axes.plot(
607                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
608             time_axes.set_title('time series')
609             freq_axes = figure.add_subplot(2, 1, 2)
610             freq_axes.plot(freq_axis, power, 'r.')
611             freq_axes.plot(freq_axis, expected, 'b-')
612             freq_axes.set_title(
613                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
614
615
616     def test_unitary_power_spectrum_sin(self):
617         "Test unitary power spectrums on variously shaped sin functions"
618         self.run_unitary_power_spectrum_sin(
619             sin_freq=5, samp_freq=512, samples=1024)
620         self.run_unitary_power_spectrum_sin(
621             sin_freq=5, samp_freq=512, samples=2048)
622         self.run_unitary_power_spectrum_sin(
623             sin_freq=5, samp_freq=512, samples=4098)
624         self.run_unitary_power_spectrum_sin(
625             sin_freq=7, samp_freq=512, samples=1024)
626         self.run_unitary_power_spectrum_sin(
627             sin_freq=5, samp_freq=1024, samples=2048)
628         # finally, with some irrational numbers, to check that I'm not
629         # getting lucky
630         self.run_unitary_power_spectrum_sin(
631             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
632         # test with non-integer number of periods
633         self.run_unitary_power_spectrum_sin(
634             sin_freq=5, samp_freq=512, samples=256)
635
636     def run_unitary_power_spectrum_delta(self, amp=1, samp_freq=1,
637                                          samples=256):
638         """TODO
639         """
640         x = _numpy.zeros((samples,), dtype=_numpy.float)
641         samp_freq = _numpy.float(samp_freq)
642         x[0] = amp
643         freq_axis,power = unitary_power_spectrum(x, samp_freq)
644
645         # power = <x(t)**2> = (amp)**2 * dt/T
646         # we spread that power over the entire freq_axis [0,fN], so
647         #  P(f)  = (amp)**2 dt / (T fN)
648         # where
649         #  dt = 1/samp_freq        (sample period)
650         #  T  = samples/samp_freq  (total time of data aquisition)
651         #  fN = 0.5 samp_freq      (Nyquist frequency)
652         # so
653         #  P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
654         #       = 2 amp**2 / (samp_freq*samples)
655         expected_amp = 2.0 * amp**2 / (samp_freq * samples)
656         expected = _numpy.ones(
657             (len(freq_axis),), dtype=_numpy.float) * expected_amp
658
659         print('The power should be flat at y = {} ({})'.format(
660             expected_amp, power[0]))
661
662         if TEST_PLOTS:
663             figure = _pyplot.figure()
664             time_axes = figure.add_subplot(2, 1, 1)
665             time_axes.plot(
666                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
667             time_axes.set_title('time series')
668             freq_axes = figure.add_subplot(2, 1, 2)
669             freq_axes.plot(freq_axis, power, 'r.')
670             freq_axes.plot(freq_axis, expected, 'b-')
671             freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
672
673     def test_unitary_power_spectrum_delta(self):
674         "Test unitary power spectrums on various delta functions"
675         self.run_unitary_power_spectrum_delta(
676             amp=1, samp_freq=1.0, samples=1024)
677         self.run_unitary_power_spectrum_delta(
678             amp=1, samp_freq=1.0, samples=2048)
679         # expected = 2*computed
680         self.run_unitary_power_spectrum_delta(
681             amp=1, samp_freq=0.5, samples=2048)
682         # expected = 0.5*computed
683         self.run_unitary_power_spectrum_delta(
684             amp=1, samp_freq=2.0, samples=2048)
685         self.run_unitary_power_spectrum_delta(
686             amp=3, samp_freq=1.0, samples=1024)
687         self.run_unitary_power_spectrum_delta(
688             amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
689
690     def gaussian(self, area, mean, std, t):
691         "Integral over all time = area (i.e. normalized for area=1)"
692         return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
693             -0.5 * ((t-mean)/std)**2)
694
695     def run_unitary_power_spectrum_gaussian(self, area=2.5, mean=5, std=1,
696                                             samp_freq=10.24 ,samples=512):
697         """TODO.
698         """
699         x = _numpy.zeros((samples,), dtype=_numpy.float)
700         mean = _numpy.float(mean)
701         for i in range(samples):
702             t = i / _numpy.float(samp_freq)
703             x[i] = self.gaussian(area, mean, std, t)
704         freq_axis,power = unitary_power_spectrum(x, samp_freq)
705
706         # generate the predicted curve by comparing our
707         # TestUnitaryPowerSpectrum.gaussian() form to
708         # TestUnitaryRFFT.gaussian(),
709         # we see that the Fourier transform of x(t) has parameters:
710         #  std'  = 1/(2 pi std)    (references declaring std' = 1/std are
711         #                           converting to angular frequency,
712         #                           not frequency like we are)
713         #  area' = area/[std sqrt(2*pi)]   (plugging into FT of
714         #                                   TestUnitaryRFFT.gaussian() above)
715         #  mean' = 0               (changing the mean in the time-domain just
716         #                           changes the phase in the freq-domain)
717         # So our power spectral density per unit time is given by
718         #  P(f) = 2 |X(f)|**2 / T
719         # Where
720         #  T  = samples/samp_freq  (total time of data aquisition)
721         mean = 0.0
722         area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
723         std = 1.0 / (2.0 * _numpy.pi * std)
724         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
725         # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
726         df = _numpy.float(samp_freq) / samples
727         for i in range(len(freq_axis)):
728             f = i * df
729             gaus = self.gaussian(area, mean, std, f)
730             expected[i] = 2.0 * gaus**2 * samp_freq / samples
731         print(('The power should be a half-gaussian, '
732                'with a peak at 0 Hz with amplitude {} ({})').format(
733                 expected[0], power[0]))
734
735         if TEST_PLOTS:
736             figure = _pyplot.figure()
737             time_axes = figure.add_subplot(2, 1, 1)
738             time_axes.plot(
739                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
740                 x, 'b-')
741             time_axes.set_title('time series')
742             freq_axes = figure.add_subplot(2, 1, 2)
743             freq_axes.plot(freq_axis, power, 'r.')
744             freq_axes.plot(freq_axis, expected, 'b-')
745             freq_axes.set_title('freq series')
746
747     def test_unitary_power_spectrum_gaussian(self):
748         "Test unitary power spectrums on various gaussian functions"
749         for area in [1, _numpy.pi]:
750             for std in [1, _numpy.sqrt(2)]:
751                 for samp_freq in [10.0, _numpy.exp(1)]:
752                     for samples in [1024, 2048]:
753                         self.run_unitary_power_spectrum_gaussian(
754                             area=area, std=std, samp_freq=samp_freq,
755                             samples=samples)
756
757
758 class TestUnitaryAvgPowerSpectrum (_unittest.TestCase):
759     def run_unitary_avg_power_spectrum_sin(self, sin_freq=10, samp_freq=512,
760                                            samples=1024, chunk_size=512,
761                                            overlap=True, window=window_hann):
762         """TODO
763         """
764         x = _numpy.zeros((samples,), dtype=_numpy.float)
765         samp_freq = _numpy.float(samp_freq)
766         for i in range(samples):
767             x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
768         freq_axis,power = unitary_avg_power_spectrum(
769             x, samp_freq, chunk_size, overlap, window)
770         imax = _numpy.argmax(power)
771
772         expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
773         # df = 1/T, where T = total_time
774         df = samp_freq / _numpy.float(chunk_size)
775         i = int(sin_freq / df)
776         # see TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin()
777         expected[i] = 0.5 / df
778
779         print('The power should peak at {} Hz of {} ({}, {})'.format(
780             sin_freq, expected[i], freq_axis[imax], power[imax]))
781         Pexp = P = 0
782         for i in range(len(freq_axis)):
783             Pexp += expected[i] * df
784             P += power[i] * df
785         print('The total power should be {} ({})'.format(Pexp, P))
786
787         if TEST_PLOTS:
788             figure = _pyplot.figure()
789             time_axes = figure.add_subplot(2, 1, 1)
790             time_axes.plot(
791                 _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
792                 x, 'b-')
793             time_axes.set_title('time series')
794             freq_axes = figure.add_subplot(2, 1, 2)
795             freq_axes.plot(freq_axis, power, 'r.')
796             freq_axes.plot(freq_axis, expected, 'b-')
797             freq_axes.set_title(
798                 '{} samples of sin at {} Hz'.format(samples, sin_freq))
799
800     def test_unitary_avg_power_spectrum_sin(self):
801         "Test unitary avg power spectrums on variously shaped sin functions."
802         self.run_unitary_avg_power_spectrum_sin(
803             sin_freq=5, samp_freq=512, samples=1024)
804         self.run_unitary_avg_power_spectrum_sin(
805             sin_freq=5, samp_freq=512, samples=2048)
806         self.run_unitary_avg_power_spectrum_sin(
807             sin_freq=5, samp_freq=512, samples=4098)
808         self.run_unitary_avg_power_spectrum_sin(
809             sin_freq=17, samp_freq=512, samples=1024)
810         self.run_unitary_avg_power_spectrum_sin(
811             sin_freq=5, samp_freq=1024, samples=2048)
812         # test long wavelenth sin, so be closer to window frequency
813         self.run_unitary_avg_power_spectrum_sin(
814             sin_freq=1, samp_freq=1024, samples=2048)
815         # finally, with some irrational numbers, to check that I'm not
816         # getting lucky
817         self.run_unitary_avg_power_spectrum_sin(
818             sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)