From: W. Trevor King Date: Mon, 19 Nov 2012 06:13:55 +0000 (-0500) Subject: Merge changes from hooke.util.fft X-Git-Tag: 0.5~10 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=70aa1b8bc8ef02873265455ac9fc9d35279758af;p=FFT-tools.git Merge changes from hooke.util.fft I'd bundled FFT_tools into Hooke back in 2010, when I was optimistic about getting my Hooke branch accepted upstream. Now that that seems less likely, I'm went to tear the module back out of Hooke, and I noticed some useful restructuring. Here it is, merged back into the FFT-tools. Changes: * Fancy Sphinx/numpydoc-style docstrings * unittest-based test suite. Conflicts: FFT_tools.py The bulk of the conflics were due to parallel style cleanups, and had trivial fixes. I also copied the test code relatively unchanged from the FFT-tools branch into the Hooke unittest framework. Here is the mapping: Old FFT-tools function New unittest method ------------------------------------------- --------------------------------------------------------------- _test_rfft TestRFFT.run_rfft _test_rfft_suite TestRFFT.test_rfft _test_unitary_rfft_parsevals TestUnitaryRFFT.run_unitary_rfft_parsevals _test_unitary_rfft_parsevals_suite TestUnitaryRFFT.test_unitary_rfft_parsevals _rect TestUnitaryRFFT.rect _test_unitary_rfft_rect TestUnitaryRFFT.run_unitary_rfft_rect _test_unitary_rfft_rect_suite TestUnitaryRFFT.test_unitary_rfft_rect _gaussian TestUnitaryRFFT.gaussian _test_unitary_rfft_gaussian TestUnitaryRFFT.run_unitary_rfft_gaussian _test_unitary_power_spectrum_sin TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin _test_unitary_power_spectrum_sin_suite TestUnitaryPowerSpectrum.test_unitary_power_spectrum_sin _test_unitary_power_spectrum_delta TestUnitaryPowerSpectrum.run_unitary_power_spectrum_delta _test_unitary_power_spectrum_delta_suite TestUnitaryPowerSpectrum.test_unitary_power_spectrum_delta _gaussian2 TestUnitaryPowerSpectrum.gaussian _test_unitary_power_spectrum_gaussian TestUnitaryPowerSpectrum.run_unitary_power_spectrum_gaussian _test_unitary_power_spectrum_gaussian_suite TestUnitaryPowerSpectrum.test_unitary_power_spectrum_gaussian _test_unitary_avg_power_spectrum_sin TestUnitaryAvgPowerSpectrum.run_unitary_avg_power_spectrum_sin _test_unitary_avg_power_spectrum_sin_suite TestUnitaryAvgPowerSpectrum.test_unitary_avg_power_spectrum_sin --- 70aa1b8bc8ef02873265455ac9fc9d35279758af diff --cc FFT_tools.py index 6205489,0daec76..933e064 --- a/FFT_tools.py +++ b/FFT_tools.py @@@ -1,117 -1,140 +1,122 @@@ -#!/usr/bin/python +# Copyright (C) 2008-2012 W. Trevor King +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. # -# Wrap Numpy's fft module to produce 1D unitary transforms and power spectra. +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. # -# Copyright (C) 2008 W. Trevor King -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in -# the documentation and/or other materials provided with the -# distribution. -# -# * Neither the name of the copyright holders nor the names of the -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE -# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, -# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . -""" -Define some FFT wrappers to reduce clutter. -Provides a unitary discrete FFT and a windowed version. -Based on :func:`numpy.fft.rfft`. +"""Wrap Numpy's fft module to reduce clutter. + +Provides a unitary discrete FFT and a windowed version based on - numpy.fft.rfft. - - Create some fake data: - - >>> import numpy - >>> data = numpy.random.rand(10) - >>> frequency = 1 ++:func:`numpy.fft.rfft`. Main entry functions: - >>> rfft = unitary_rfft(data, freq=frequency) - >>> psd = power_spectrum(data, freq=frequency) - >>> upsd = unitary_power_spectrum(data, freq=frequency) - >>> apsd = avg_power_spectrum(data, freq=frequency, chunk_size=2048, - ... overlap=True, window=window_hann) - >>> aupsd = unitary_avg_power_spectrum(data, freq=frequency, chunk_size=2048, - ... overlap=True, window=window_hann) + * :func:`unitary_rfft` + * :func:`power_spectrum` + * :func:`unitary_power_spectrum` + * :func:`avg_power_spectrum` + * :func:`unitary_avg_power_spectrum` """ -import unittest ++import unittest as _unittest ++ +import numpy as _numpy -from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \ - sinc, arctan2, array, ones, arange, linspace, zeros, \ - uint16, float, concatenate, fromfile, argmax, complex -from numpy.fft import rfft +__version__ = '0.4' +# Display time- and freq-space plots of the test transforms if True TEST_PLOTS = False + def floor_pow_of_two(num): - "Round num down to the closest exact a power of two." + """Round `num` down to the closest exact a power of two. + + Examples + -------- + + >>> floor_pow_of_two(3) + 2 + >>> floor_pow_of_two(11) + 8 + >>> floor_pow_of_two(15) + 8 + """ - lnum = log2(num) + lnum = _numpy.log2(num) if int(lnum) != lnum: - num = 2**floor(lnum) + num = 2**_numpy.floor(lnum) - return num + return int(num) + def round_pow_of_two(num): - "Round num to the closest exact a power of two on a log scale." + """Round `num` to the closest exact a power of two on a log scale. + + Examples + -------- + + >>> round_pow_of_two(2.9) # Note rounding on *log scale* + 4 + >>> round_pow_of_two(11) + 8 + >>> round_pow_of_two(15) + 16 + """ - lnum = log2(num) + lnum = _numpy.log2(num) if int(lnum) != lnum: - num = 2**round(lnum) + num = 2**_numpy.round(lnum) - return num + return int(num) + def ceil_pow_of_two(num): - "Round num up to the closest exact a power of two." + """Round `num` up to the closest exact a power of two. + + Examples + -------- + + >>> ceil_pow_of_two(3) + 4 + >>> ceil_pow_of_two(11) + 16 + >>> ceil_pow_of_two(15) + 16 + """ - lnum = log2(num) + lnum = _numpy.log2(num) if int(lnum) != lnum: - num = 2**ceil(lnum) + num = 2**_numpy.ceil(lnum) - return num - - - def _test_rfft(xs, Xs): - # Numpy's FFT algoritm returns - # n-1 - # X[k] = SUM x[m] exp (-j 2pi km /n) - # m=0 - # (see http://www.tramy.us/numpybook.pdf) - j = _numpy.complex(0, 1) - n = len(xs) - Xa = [] - for k in range(n): - Xa.append(sum([x * _numpy.exp(-j * 2 * _numpy.pi * k * m / n) - for x,m in zip(xs, range(n))])) - if k < len(Xs): - if (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]) >= 1e-6: - raise ValueError( - ('rfft mismatch on element {}: {} != {}, relative error {}' - ).format( - k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]))) - # Which should satisfy the discrete form of Parseval's theorem - # n-1 n-1 - # SUM |x_m|^2 = 1/n SUM |X_k|^2. - # m=0 k=0 - timeSum = sum([_numpy.abs(x)**2 for x in xs]) - freqSum = sum([_numpy.abs(X)**2 for X in Xa]) - if _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum >= 1e-6: - raise ValueError( - "Mismatch on Parseval's, {} != 1/{} * {}".format( - timeSum, n, freqSum)) + return int(num) + - def _test_rfft_suite(): - print('Test numpy rfft definition') - xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1] - _test_rfft(xs, _numpy.fft.rfft(xs)) + def unitary_rfft(data, freq=1.0): + """Compute the unitary Fourier transform of real data. + Unitary = preserves power [Parseval's theorem]. - def unitary_rfft(data, freq=1.0): - """Compute the Fourier transform of real data. + Parameters + ---------- + data : iterable + Real (not complex) data taken with a sampling frequency `freq`. + freq : real + Sampling frequency. - Unitary (preserves power [Parseval's theorem]). + Returns + ------- + freq_axis,trans : numpy.ndarray + Arrays ready for plotting. + Notes + ----- If the units on your data are Volts, and your sampling frequency is in Hz, - then freq_axis will be in Hz, - and trans will be in Volts. + then `freq_axis` will be in Hz, + and `trans` will be in Volts. """ nsamps = floor_pow_of_two(len(data)) # Which should satisfy the discrete form of Parseval's theorem @@@ -145,172 -168,53 +150,56 @@@ # so we can convert the Numpy transformed data to match our unitary # continuous transformed data with (also NR 12.1.8) # X'_k = dtX = X / - trans = rfft(data[0:nsamps]) / float(freq) - freq_axis = linspace(0, freq/2, nsamps/2+1) + trans = _numpy.fft.rfft(data[0:nsamps]) / _numpy.float(freq) + freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1) return (freq_axis, trans) + - def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs): - # Which should satisfy the discretized integral form of Parseval's theorem - # n-1 n-1 - # SUM |x_m|^2 dt = SUM |X_k|^2 df - # m=0 k=0 - dt = 1.0 / freq - df = freqs[1] - freqs[0] - if df - 1 / (len(xs) * dt * df) >= 1e-6: - raise ValueError( - 'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt)) - Xa = list(Xs) - for k in range(len(Xs) - 1, 1, -1): - Xa.append(Xa[k]) - if len(xs) != len(Xa): - raise ValueError('Length mismatch {} != {}'.format(len(xs), len(Xa))) - lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt - rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df - if _numpy.abs(lhs - rhs) / lhs >= 1e-4: - raise ValueError("Mismatch on Parseval's, {} != {}".format(lhs, rhs)) - - - def _test_unitary_rfft_parsevals_suite(): - print("Test unitary rfft on Parseval's theorem") - xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1] - dt = _numpy.pi - freqs,Xs = unitary_rfft(xs, 1.0 / dt) - _test_unitary_rfft_parsevals(xs, 1.0 / dt, freqs, Xs) - - - def _rect(t): - if _numpy.abs(t) < 0.5: - return 1 - else: - return 0 - - - def _test_unitary_rfft_rect( - a=1.0, time_shift=5.0, samp_freq=25.6, samples=256): - "Show fft(rect(at)) = 1/abs(a) * _numpy.sinc(f/a)" - samp_freq = _numpy.float(samp_freq) - a = _numpy.float(a) - - x = _numpy.zeros((samples,), dtype=_numpy.float) - dt = 1.0 / samp_freq - for i in range(samples): - t = i * dt - x[i] = _rect(a * (t - time_shift)) - freq_axis, X = unitary_rfft(x, samp_freq) - #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) - - # remove the phase due to our time shift - j = _numpy.complex(0.0, 1.0) # sqrt(-1) - for i in range(len(freq_axis)): - f = freq_axis[i] - inverse_phase_shift = _numpy.exp(j * 2.0 * _numpy.pi * time_shift * f) - X[i] *= inverse_phase_shift - - expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) - # normalized sinc(x) = sin(pi x)/(pi x) - # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi - if _numpy.sinc(0.5) != 2.0 / _numpy.pi: - raise ValueError('abnormal sinc()') - for i in range(len(freq_axis)): - f = freq_axis[i] - expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a) - - if TEST_PLOTS: - figure = _pyplot.figure() - time_axes = figure.add_subplot(2, 1, 1) - time_axes.plot(_numpy.arange(0, dt * samples, dt), x) - time_axes.set_title('time series') - freq_axes = figure.add_subplot(2, 1, 2) - freq_axes.plot(freq_axis, X.real, 'r.') - freq_axes.plot(freq_axis, X.imag, 'g.') - freq_axes.plot(freq_axis, expected, 'b-') - freq_axes.set_title('freq series') - - - def _test_unitary_rfft_rect_suite(): - print('Test unitary FFTs on variously shaped rectangular functions') - _test_unitary_rfft_rect(a=0.5) - _test_unitary_rfft_rect(a=2.0) - _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512) - _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024) - - - def _gaussian(a, t): - return _numpy.exp(-a * t**2) - - - def _test_unitary_rfft_gaussian( - a=1.0, time_shift=5.0, samp_freq=25.6, samples=256): - "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)" - samp_freq = _numpy.float(samp_freq) - a = _numpy.float(a) - - x = _numpy.zeros((samples,), dtype=_numpy.float) - dt = 1.0 / samp_freq - for i in range(samples): - t = i * dt - x[i] = _gaussian(a, (t - time_shift)) - freq_axis, X = unitary_rfft(x, samp_freq) - #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) - - # remove the phase due to our time shift - j = _numpy.complex(0.0, 1.0) # sqrt(-1) - for i in range(len(freq_axis)): - f = freq_axis[i] - inverse_phase_shift = _numpy.exp(j * 2.0 * _numpy.pi * time_shift * f) - X[i] *= inverse_phase_shift - - expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) - for i in range(len(freq_axis)): - f = freq_axis[i] - # see Wikipedia, or do the integral yourself. - expected[i] = _numpy.sqrt(_numpy.pi / a) * _gaussian( - 1.0 / a, _numpy.pi * f) - - if TEST_PLOTS: - figure = _pyplot.figure() - time_axes = figure.add_subplot(2, 1, 1) - time_axes.plot(_numpy.arange(0, dt * samples, dt), x) - time_axes.set_title('time series') - freq_axes = figure.add_subplot(2, 1, 2) - freq_axes.plot(freq_axis, X.real, 'r.') - freq_axes.plot(freq_axis, X.imag, 'g.') - freq_axes.plot(freq_axis, expected, 'b-') - freq_axes.set_title('freq series') - - - def _test_unitary_rfft_gaussian_suite(): - print("Test unitary FFTs on variously shaped gaussian functions") - _test_unitary_rfft_gaussian(a=0.5) - _test_unitary_rfft_gaussian(a=2.0) - _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512) - _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024) - - def power_spectrum(data, freq=1.0): - """Compute the power spectrum of DATA taken with a sampling frequency FREQ. - - DATA must be real (not complex). - Returns a tuple of two arrays, (freq_axis, power), suitable for plotting. - If the number of samples in data is not an integer power of two, + """Compute the power spectrum of the time series `data`. + + Parameters + ---------- + data : iterable + Real (not complex) data taken with a sampling frequency `freq`. + freq : real + Sampling frequency. + + Returns + ------- + freq_axis,power : numpy.ndarray + Arrays ready for plotting. + + Notes + ----- + If the number of samples in `data` is not an integer power of two, the FFT ignores some of the later points. + + See Also + -------- + unitary_power_spectrum,avg_power_spectrum """ nsamps = floor_pow_of_two(len(data)) - - freq_axis = linspace(0, freq/2, nsamps/2+1) + + freq_axis = _numpy.linspace(0, freq / 2, nsamps / 2 + 1) # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real. # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination. # See Numerical Recipies for a details. - trans = rfft(data[0:nsamps]) - power = trans * trans.conj() # We want the square of the amplitude. + trans = _numpy.fft.rfft(data[0:nsamps]) + power = (trans * trans.conj()).real # we want the square of the amplitude return (freq_axis, power) + def unitary_power_spectrum(data, freq=1.0): + """Compute the unitary power spectrum of the time series `data`. + + See Also + -------- + power_spectrum,unitary_avg_power_spectrum + """ freq_axis,power = power_spectrum(data, freq) - # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498) + # One sided power spectral density, so 2|H(f)|**2 + # (see NR 2nd edition 12.0.14, p498) # # numpy normalizes with 1/N on the inverse transform ifft, # so we should normalize the freq-space representation with 1/sqrt(N). @@@ -340,260 -242,98 +229,104 @@@ return (freq_axis, power) + - def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024): - x = _numpy.zeros((samples,), dtype=_numpy.float) - samp_freq = _numpy.float(samp_freq) - for i in range(samples): - x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq) - freq_axis, power = unitary_power_spectrum(x, samp_freq) - imax = _numpy.argmax(power) - - expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) - df = samp_freq / _numpy.float(samples) # df = 1/T, where T = total_time - i = int(sin_freq / df) - # average power per unit time is - # P = - # average value of sin(t)**2 = 0.5 (b/c sin**2+cos**2 == 1) - # so average value of (int sin(t)**2 dt) per unit time is 0.5 - # P = 0.5 - # we spread that power over a frequency bin of width df, sp - # P(f0) = 0.5/df - # where f0 is the sin's frequency - # - # or: - # FFT of sin(2*pi*t*f0) gives - # X(f) = 0.5 i (delta(f-f0) - delta(f-f0)), - # (area under x(t) = 0, area under X(f) = 0) - # so one sided power spectral density (PSD) per unit time is - # P(f) = 2 |X(f)|**2 / T - # = 2 * |0.5 delta(f-f0)|**2 / T - # = 0.5 * |delta(f-f0)|**2 / T - # but we're discrete and want the integral of the 'delta' to be 1, - # so 'delta'*df = 1 --> 'delta' = 1/df, and - # P(f) = 0.5 / (df**2 * T) - # = 0.5 / df (T = 1/df) - expected[i] = 0.5 / df - - print('The power should be a peak at {} Hz of {} ({}, {})'.format( - sin_freq, expected[i], freq_axis[imax], power[imax])) - Pexp = P = 0 - for i in range(len(freq_axis)): - Pexp += expected[i] * df - P += power[i] * df - print('The total power should be {} ({})'.format(Pexp, P)) - - if TEST_PLOTS: - figure = _pyplot.figure() - time_axes = figure.add_subplot(2, 1, 1) - time_axes.plot( - _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-') - time_axes.set_title('time series') - freq_axes = figure.add_subplot(2, 1, 2) - freq_axes.plot(freq_axis, power, 'r.') - freq_axes.plot(freq_axis, expected, 'b-') - freq_axes.set_title( - '{} samples of sin at {} Hz'.format(samples, sin_freq)) - - - def _test_unitary_power_spectrum_sin_suite(): - print('Test unitary power spectrums on variously shaped sin functions') - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024) - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048) - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098) - _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024) - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048) - # with some irrational numbers, to check that I'm not getting lucky - _test_unitary_power_spectrum_sin( - sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024) - # test with non-integer number of periods - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256) - - - def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256): - x = _numpy.zeros((samples,), dtype=_numpy.float) - samp_freq = _numpy.float(samp_freq) - x[0] = amp - freq_axis, power = unitary_power_spectrum(x, samp_freq) - - # power = = (amp)**2 * dt/T - # we spread that power over the entire freq_axis [0,fN], so - # P(f) = (amp)**2 dt / (T fN) - # where - # dt = 1/samp_freq (sample period) - # T = samples/samp_freq (total time of data aquisition) - # fN = 0.5 samp_freq (Nyquist frequency) - # so - # P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq) - # = 2 amp**2 / (samp_freq*samples) - expected_amp = 2.0 * amp**2 / (samp_freq * samples) - expected = _numpy.ones( - (len(freq_axis),), dtype=_numpy.float) * expected_amp - - print('The power should be flat at y = {} ({})'.format( - expected_amp, power[0])) - - if TEST_PLOTS: - figure = _pyplot.figure() - time_axes = figure.add_subplot(2, 1, 1) - time_axes.plot( - _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-') - time_axes.set_title('time series') - freq_axes = figure.add_subplot(2, 1, 2) - freq_axes.plot(freq_axis, power, 'r.') - freq_axes.plot(freq_axis, expected, 'b-') - freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp)) - - - def _test_unitary_power_spectrum_delta_suite(): - print('Test unitary power spectrums on various delta functions') - _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024) - _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048) - # expected = 2*computed - _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048) - # expected = 0.5*computed - _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048) - _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024) - _test_unitary_power_spectrum_delta( - amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024) - - - def _gaussian2(area, mean, std, t): - "Integral over all time = area (i.e. normalized for area=1)" - return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp( - -0.5 * ((t-mean)/std)**2) - - - def _test_unitary_power_spectrum_gaussian( - area=2.5, mean=5, std=1, samp_freq=10.24, samples=512): - "Test unitary_power_spectrum() on the gaussian function" - x = _numpy.zeros((samples,), dtype=_numpy.float) - mean = _numpy.float(mean) - for i in range(samples): - t = i / _numpy.float(samp_freq) - x[i] = _gaussian2(area, mean, std, t) - freq_axis, power = unitary_power_spectrum(x, samp_freq) - - # generate the predicted curve - # by comparing our _gaussian2() form to _gaussian(), - # we see that the Fourier transform of x(t) has parameters: - # std' = 1/(2 pi std) (references declaring std' = 1/std are - # converting to angular frequency, - # not frequency like we are) - # area' = area/[std sqrt(2*pi)] (plugging into FT of _gaussian() above) - # mean' = 0 (changing the mean in the time-domain just - # changes the phase in the freq-domain) - # So our power spectral density per unit time is given by - # P(f) = 2 |X(f)|**2 / T - # Where - # T = samples/samp_freq (total time of data aquisition) - mean = 0.0 - area = area / (std * _numpy.sqrt(2.0 * _numpy.pi)) - std = 1.0 / (2.0 * _numpy.pi * std) - expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) - # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1]) - df = _numpy.float(samp_freq) / samples - for i in range(len(freq_axis)): - f = i * df - gaus = _gaussian2(area, mean, std, f) - expected[i] = 2.0 * gaus**2 * samp_freq / samples - print(('The power should be a half-gaussian, ' - 'with a peak at 0 Hz with amplitude {} ({})').format( - expected[0], power[0])) - - if TEST_PLOTS: - figure = _pyplot.figure() - time_axes = figure.add_subplot(2, 1, 1) - time_axes.plot( - _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-') - time_axes.set_title('time series') - freq_axes = figure.add_subplot(2, 1, 2) - freq_axes.plot(freq_axis, power, 'r.') - freq_axes.plot(freq_axis, expected, 'b-') - freq_axes.set_title('freq series') - - - def _test_unitary_power_spectrum_gaussian_suite(): - print('Test unitary power spectrums on various gaussian functions') - _test_unitary_power_spectrum_gaussian( - area=1, std=1, samp_freq=10.0, samples=1024) - _test_unitary_power_spectrum_gaussian( - area=1, std=2, samp_freq=10.0, samples=1024) - _test_unitary_power_spectrum_gaussian( - area=1, std=1, samp_freq=10.0, samples=2048) - _test_unitary_power_spectrum_gaussian( - area=1, std=1, samp_freq=20.0, samples=2048) - _test_unitary_power_spectrum_gaussian( - area=3, std=1, samp_freq=10.0, samples=1024) - _test_unitary_power_spectrum_gaussian( - area=_numpy.pi, std=_numpy.sqrt(2), samp_freq=_numpy.exp(1), - samples=1024) + def window_hann(length): + r"""Returns a Hann window array with length entries + Notes + ----- + The Hann window with length :math:`L` is defined as - def window_hann(length): - "Returns a Hann window array with length entries" + .. math:: w_i = \frac{1}{2} (1-\cos(2\pi i/L)) + """ - win = zeros((length,), dtype=float) + win = _numpy.zeros((length,), dtype=_numpy.float) for i in range(length): - win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length))) + win[i] = 0.5 * ( + 1.0 - _numpy.cos(2.0 * _numpy.pi * _numpy.float(i) / (length))) # avg value of cos over a period is 0 # so average height of Hann window is 0.5 return win + def avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann): - """Compute the avgerage power spectrum of DATA. - - Taken with a sampling frequency FREQ. - - DATA must be real (not complex) by breaking DATA into chunks. - The chunks may or may not be overlapping (by setting OVERLAP). - The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed, - and the resulting spectra are averaged together. - See NR 13.4 for rational. - - Returns a tuple of two arrays, (freq_axis, power), suitable for plotting. - CHUNK_SIZE should really be a power of 2. - If the number of samples in DATA is not an integer power of CHUNK_SIZE, - the FFT ignores some of the later points. + """Compute the avgerage power spectrum of `data`. + + Parameters + ---------- + data : iterable + Real (not complex) data taken with a sampling frequency `freq`. + freq : real + Sampling frequency. + chunk_size : int + Number of samples per chunk. Use a power of two. + overlap: {True,False} + If `True`, each chunk overlaps the previous chunk by half its + length. Otherwise, the chunks are end-to-end, and not + overlapping. + window: iterable + Weights used to "smooth" the chunks, there is a whole science + behind windowing, but if you're not trying to squeeze every + drop of information out of your data, you'll be OK with the + default Hann window. + + Returns + ------- + freq_axis,power : numpy.ndarray + Arrays ready for plotting. + + Notes + ----- + The average power spectrum is computed by breaking `data` into + chunks of length `chunk_size`. These chunks are transformed + individually into frequency space and then averaged together. + + See Numerical Recipes 2 section 13.4 for a good introduction to + the theory. + + If the number of samples in `data` is not a multiple of + `chunk_size`, we ignore the extra points. """ - assert chunk_size == floor_pow_of_two(chunk_size), \ - "chunk_size %d should be a power of 2" % chunk_size + if chunk_size != floor_pow_of_two(chunk_size): + raise ValueError( + 'chunk_size {} should be a power of 2'.format(chunk_size)) - nchunks = len(data)/chunk_size # integer division = implicit floor + nchunks = len(data) // chunk_size # integer division = implicit floor if overlap: - chunk_step = chunk_size/2 + chunk_step = chunk_size / 2 else: chunk_step = chunk_size - - win = window(chunk_size) # generate a window of the appropriate size - freq_axis = linspace(0, freq/2, chunk_size/2+1) + + win = window(chunk_size) # generate a window of the appropriate size + freq_axis = _numpy.linspace(0, freq / 2, chunk_size / 2 + 1) # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real. # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination. # See Numerical Recipies for a details. - power = zeros((chunk_size/2+1,), dtype=float) + power = _numpy.zeros((chunk_size / 2 + 1, ), dtype=_numpy.float) for i in range(nchunks): - starti = i*chunk_step - stopi = starti+chunk_size - fft_chunk = rfft(data[starti:stopi]*win) - p_chunk = fft_chunk * fft_chunk.conj() - power += p_chunk.astype(float) - power /= float(nchunks) + starti = i * chunk_step + stopi = starti + chunk_size + fft_chunk = _numpy.fft.rfft(data[starti:stopi] * win) + p_chunk = (fft_chunk * fft_chunk.conj()).real + power += p_chunk.astype(_numpy.float) + power /= _numpy.float(nchunks) return (freq_axis, power) + def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann): - """Compute the average power spectrum, preserving normalization + """Compute the unitary average power spectrum of `data`. + + See Also + -------- + avg_power_spectrum,unitary_power_spectrum """ - freq_axis,power = avg_power_spectrum(data, freq, chunk_size, - overlap, window) - # 2.0 / (freq * chunk_size) |rfft()|**2 --> unitary_power_spectrum - power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum() - # * 8/3 to remove power from windowing + freq_axis,power = avg_power_spectrum( + data, freq, chunk_size, overlap, window) + # 2.0 / (freq * chunk_size) |rfft()|**2 --> unitary_power_spectrum + # see unitary_power_spectrum() + power *= 2.0 / (freq * _numpy.float(chunk_size)) * 8.0 / 3 + # * 8/3 to remove power from windowing # <[x(t)*w(t)]**2> = ~= * # where the ~= is because the frequency of x(t) >> the frequency of w(t). # So our calulated power has and extra in it. @@@ -606,90 -344,421 +339,480 @@@ return (freq_axis, power) - def _test_unitary_avg_power_spectrum_sin( - sin_freq=10, samp_freq=512, samples=1024, chunk_size=512, overlap=True, - window=window_hann): - "Test unitary_avg_power_spectrum() on the sine function" - x = _numpy.zeros((samples,), dtype=_numpy.float) - samp_freq = _numpy.float(samp_freq) - for i in range(samples): - x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq) - freq_axis, power = unitary_avg_power_spectrum( - x, samp_freq, chunk_size, overlap, window) - imax = _numpy.argmax(power) - - expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) - df = samp_freq / _numpy.float(chunk_size) # df = 1/T, where T = total_time - i = int(sin_freq / df) - expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin() - - print('The power should peak at {} Hz of {} ({}, {})'.format( - sin_freq, expected[i], freq_axis[imax], power[imax])) - Pexp = P = 0 - for i in range(len(freq_axis)): - Pexp += expected[i] * df - P += power[i] * df - print('The total power should be {} ({})'.format(Pexp, P)) - - if TEST_PLOTS: - figure = _pyplot.figure() - time_axes = figure.add_subplot(2, 1, 1) - time_axes.plot( - _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-') - time_axes.set_title('time series') - freq_axes = figure.add_subplot(2, 1, 2) - freq_axes.plot(freq_axis, power, 'r.') - freq_axes.plot(freq_axis, expected, 'b-') - freq_axes.set_title( - '{} samples of sin at {} Hz'.format(samples, sin_freq)) - - - def _test_unitary_avg_power_spectrum_sin_suite(): - print('Test unitary avg power spectrums on variously shaped sin functions') - _test_unitary_avg_power_spectrum_sin( - sin_freq=5, samp_freq=512, samples=1024) - _test_unitary_avg_power_spectrum_sin( - sin_freq=5, samp_freq=512, samples=2048) - _test_unitary_avg_power_spectrum_sin( - sin_freq=5, samp_freq=512, samples=4098) - _test_unitary_avg_power_spectrum_sin( - sin_freq=17, samp_freq=512, samples=1024) - _test_unitary_avg_power_spectrum_sin( - sin_freq=5, samp_freq=1024, samples=2048) - # test long wavelenth sin, so be closer to window frequency - _test_unitary_avg_power_spectrum_sin( - sin_freq=1, samp_freq=1024, samples=2048) - # finally, with some irrational numbers, to check that I'm not - # getting lucky - _test_unitary_avg_power_spectrum_sin( - sin_freq=_numpy.pi, - samp_freq=100 * _numpy.exp(1), - samples=1024) - - - def test(): - _test_rfft_suite() - _test_unitary_rfft_parsevals_suite() - _test_unitary_rfft_rect_suite() - _test_unitary_rfft_gaussian_suite() - _test_unitary_power_spectrum_sin_suite() - _test_unitary_power_spectrum_delta_suite() - _test_unitary_power_spectrum_gaussian_suite() - _test_unitary_avg_power_spectrum_sin_suite() - - - if __name__ == '__main__': - from optparse import OptionParser - - p = OptionParser('%prog [options]', epilog='Run FFT-tools unit tests.') - p.add_option('-p', '--plot', dest='plot', action='store_true', - help='Display time- and freq-space plots of test transforms.') - - options,args = p.parse_args() -- - if options.plot: - import matplotlib.pyplot as _pyplot - TEST_PLOTS = True - test() - if options.plot: - _pyplot.show() -class TestRFFT (unittest.TestCase): ++class TestRFFT (_unittest.TestCase): + r"""Ensure Numpy's FFT algorithm acts as expected. + + Notes + ----- + The expected return values are [#dft]_: + + .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n} + + .. [#dft] See the *Background information* section of :mod:`numpy.fft`. + """ + def run_rfft(self, xs, Xs): - i = complex(0,1) ++ i = _numpy.complex(0, 1) + n = len(xs) + Xa = [] + for k in range(n): - Xa.append(sum([x*exp(-2*pi*i*m*k/n) for x,m in zip(xs,range(n))])) ++ Xa.append(sum([x * _numpy.exp(-2 * _numpy.pi * i * m * k / n) ++ for x,m in zip(xs, range(n))])) + if k < len(Xs): - assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \ - "rfft mismatch on element %d: %g != %g, relative error %g" \ - % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k])) ++ if (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]) >= 1e-6: ++ raise ValueError( ++ ('rfft mismatch on element {}: {} != {}, ' ++ 'relative error {}').format( ++ k, Xs[k], Xa[k], ++ (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]))) + # Which should satisfy the discrete form of Parseval's theorem + # n-1 n-1 - # SUM |x_m|^2 = 1/n SUM |X_k|^2. ++ # SUM |x_m|^2 = 1/n SUM |X_k|^2. + # m=0 k=0 - timeSum = sum([abs(x)**2 for x in xs]) - freqSum = sum([abs(X)**2 for X in Xa]) - assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \ - "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum) ++ timeSum = sum([_numpy.abs(x)**2 for x in xs]) ++ freqSum = sum([_numpy.abs(X)**2 for X in Xa]) ++ if _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum >= 1e-6: ++ raise ValueError( ++ "Mismatch on Parseval's, {} != 1/{} * {}".format( ++ timeSum, n, freqSum)) + + def test_rfft(self): - xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1] - self.run_rfft(xs, rfft(xs)) ++ xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1] ++ self.run_rfft(xs, _numpy.fft.rfft(xs)) + -class TestUnitaryRFFT (unittest.TestCase): ++ ++class TestUnitaryRFFT (_unittest.TestCase): + """Verify `unitary_rfft`. + """ + def run_unitary_rfft_parsevals(self, xs, freq, freqs, Xs): + """Check the discretized integral form of Parseval's theorem + + Notes + ----- + Which is: + + .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df + """ - dt = 1.0/freq - df = freqs[1]-freqs[0] - assert (df - 1/(len(xs)*dt))/df < 1e-6, \ - "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt) ++ dt = 1.0 / freq ++ df = freqs[1] - freqs[0] ++ if (df - 1 / (len(xs) * dt)) / df >= 1e-6: ++ raise ValueError( ++ 'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt)) + Xa = list(Xs) - for k in range(len(Xs)-1,1,-1): ++ for k in range(len(Xs) - 1, 1, -1): + Xa.append(Xa[k]) - assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa)) - lhs = sum([abs(x)**2 for x in xs]) * dt - rhs = sum([abs(X)**2 for X in Xa]) * df - assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \ - % (lhs, rhs) - ++ if len(xs) != len(Xa): ++ raise ValueError( ++ 'Length mismatch {} != {}'.format(len(xs), len(Xa))) ++ lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt ++ rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df ++ if _numpy.abs(lhs - rhs) / lhs >= 1e-4: ++ raise ValueError( ++ "Mismatch on Parseval's, {} != {}".format(lhs, rhs)) ++ + def test_unitary_rfft_parsevals(self): + "Test unitary rfft on Parseval's theorem" - xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1] - dt = pi - freqs,Xs = unitary_rfft(xs, 1.0/dt) - self.run_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs) - ++ xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1] ++ dt = _numpy.pi ++ freqs,Xs = unitary_rfft(xs, 1.0 / dt) ++ self.run_unitary_rfft_parsevals(xs, 1.0 / dt, freqs, Xs) ++ + def rect(self, t): + r"""Rectangle function. + + Notes + ----- + + .. math:: + + \rect(t) = \begin{cases} + 1& \text{if $|t| < 0.5$}, \\ + 0& \text{if $|t| \ge 0.5$}. + \end{cases} + """ - if abs(t) < 0.5: ++ if _numpy.abs(t) < 0.5: + return 1 + else: + return 0 - ++ + def run_unitary_rfft_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6, + samples=256): + r"""Test `unitary_rttf` on known function `rect(at)`. + + Notes + ----- + Analytic result: + + .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a) + """ - samp_freq = float(samp_freq) - a = float(a) - - x = zeros((samples,), dtype=float) - dt = 1.0/samp_freq ++ samp_freq = _numpy.float(samp_freq) ++ a = _numpy.float(a) ++ ++ x = _numpy.zeros((samples,), dtype=_numpy.float) ++ dt = 1.0 / samp_freq + for i in range(samples): - t = i*dt - x[i] = self.rect(a*(t-time_shift)) - freq_axis, X = unitary_rfft(x, samp_freq) - #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) - ++ t = i * dt ++ x[i] = self.rect(a * (t - time_shift)) ++ freq_axis,X = unitary_rfft(x, samp_freq) ++ + # remove the phase due to our time shift - j = complex(0.0,1.0) # sqrt(-1) ++ j = _numpy.complex(0.0, 1.0) # sqrt(-1) + for i in range(len(freq_axis)): + f = freq_axis[i] - inverse_phase_shift = exp(j*2.0*pi*time_shift*f) ++ inverse_phase_shift = _numpy.exp( ++ j * 2.0 * _numpy.pi * time_shift * f) + X[i] *= inverse_phase_shift - - expected = zeros((len(freq_axis),), dtype=float) ++ ++ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) + # normalized sinc(x) = sin(pi x)/(pi x) + # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi - assert sinc(0.5) == 2.0/pi, "abnormal sinc()" ++ if _numpy.sinc(0.5) != 2.0 / _numpy.pi: ++ raise ValueError('abnormal sinc()') + for i in range(len(freq_axis)): + f = freq_axis[i] - expected[i] = 1.0/abs(a) * sinc(f/a) - ++ expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a) ++ + if TEST_PLOTS: - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, dt*samples, dt), x) - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, X.real, 'r.') - pylab.plot(freq_axis, X.imag, 'g.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('freq series') - ++ figure = _pyplot.figure() ++ time_axes = figure.add_subplot(2, 1, 1) ++ time_axes.plot(_numpy.arange(0, dt * samples, dt), x) ++ time_axes.set_title('time series') ++ freq_axes = figure.add_subplot(2, 1, 2) ++ freq_axes.plot(freq_axis, X.real, 'r.') ++ freq_axes.plot(freq_axis, X.imag, 'g.') ++ freq_axes.plot(freq_axis, expected, 'b-') ++ freq_axes.set_title('freq series') ++ + def test_unitary_rfft_rect(self): + "Test unitary FFTs on variously shaped rectangular functions." + self.run_unitary_rfft_rect(a=0.5) + self.run_unitary_rfft_rect(a=2.0) + self.run_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512) + self.run_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024) - ++ + def gaussian(self, a, t): + r"""Gaussian function. + + Notes + ----- + + .. math:: \gaussian(a,t) = \exp^{-at^2} + """ - return exp(-a * t**2) - ++ return _numpy.exp(-a * t**2) ++ + def run_unitary_rfft_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6, + samples=256): + r"""Test `unitary_rttf` on known function `gaussian(a,t)`. + + Notes + ----- + Analytic result: + + .. math:: + + \rfft(\gaussian(a,t)) = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f) + """ - samp_freq = float(samp_freq) - a = float(a) - - x = zeros((samples,), dtype=float) - dt = 1.0/samp_freq ++ samp_freq = _numpy.float(samp_freq) ++ a = _numpy.float(a) ++ ++ x = _numpy.zeros((samples,), dtype=_numpy.float) ++ dt = 1.0 / samp_freq + for i in range(samples): - t = i*dt - x[i] = self.gaussian(a, (t-time_shift)) - freq_axis, X = unitary_rfft(x, samp_freq) - #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) - ++ t = i * dt ++ x[i] = self.gaussian(a, (t - time_shift)) ++ freq_axis,X = unitary_rfft(x, samp_freq) ++ + # remove the phase due to our time shift - j = complex(0.0,1.0) # sqrt(-1) ++ j = _numpy.complex(0.0, 1.0) # sqrt(-1) + for i in range(len(freq_axis)): + f = freq_axis[i] - inverse_phase_shift = exp(j*2.0*pi*time_shift*f) ++ inverse_phase_shift = _numpy.exp( ++ j * 2.0 * _numpy.pi * time_shift * f) + X[i] *= inverse_phase_shift - - expected = zeros((len(freq_axis),), dtype=float) ++ ++ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) + for i in range(len(freq_axis)): + f = freq_axis[i] - expected[i] = sqrt(pi/a) * self.gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself. - ++ # see Wikipedia, or do the integral yourself. ++ expected[i] = _numpy.sqrt(_numpy.pi / a) * self.gaussian( ++ 1.0 / a, _numpy.pi * f) ++ + if TEST_PLOTS: - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, dt*samples, dt), x) - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, X.real, 'r.') - pylab.plot(freq_axis, X.imag, 'g.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('freq series') - ++ figure = _pyplot.figure() ++ time_axes = figure.add_subplot(2, 1, 1) ++ time_axes.plot(_numpy.arange(0, dt * samples, dt), x) ++ time_axes.set_title('time series') ++ freq_axes = figure.add_subplot(2, 1, 2) ++ freq_axes.plot(freq_axis, X.real, 'r.') ++ freq_axes.plot(freq_axis, X.imag, 'g.') ++ freq_axes.plot(freq_axis, expected, 'b-') ++ freq_axes.set_title('freq series') ++ + def test_unitary_rfft_gaussian(self): + "Test unitary FFTs on variously shaped gaussian functions." + self.run_unitary_rfft_gaussian(a=0.5) + self.run_unitary_rfft_gaussian(a=2.0) + self.run_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512) + self.run_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024) + -class TestUnitaryPowerSpectrum (unittest.TestCase): ++ ++class TestUnitaryPowerSpectrum (_unittest.TestCase): + def run_unitary_power_spectrum_sin(self, sin_freq=10, samp_freq=512, + samples=1024): - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) ++ x = _numpy.zeros((samples,), dtype=_numpy.float) ++ samp_freq = _numpy.float(samp_freq) + for i in range(samples): - x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq) - freq_axis, power = unitary_power_spectrum(x, samp_freq) - imax = argmax(power) - - expected = zeros((len(freq_axis),), dtype=float) - df = samp_freq/float(samples) # df = 1/T, where T = total_time - i = int(sin_freq/df) - # average power per unit time is ++ x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq) ++ freq_axis,power = unitary_power_spectrum(x, samp_freq) ++ imax = _numpy.argmax(power) ++ ++ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) ++ # df = 1/T, where T = total_time ++ df = samp_freq / _numpy.float(samples) ++ i = int(sin_freq / df) ++ # average power per unit time is + # P = + # average value of sin(t)**2 = 0.5 (b/c sin**2+cos**2 == 1) + # so average value of (int sin(t)**2 dt) per unit time is 0.5 + # P = 0.5 + # we spread that power over a frequency bin of width df, sp + # P(f0) = 0.5/df + # where f0 is the sin's frequency + # + # or: + # FFT of sin(2*pi*t*f0) gives + # X(f) = 0.5 i (delta(f-f0) - delta(f-f0)), + # (area under x(t) = 0, area under X(f) = 0) + # so one sided power spectral density (PSD) per unit time is + # P(f) = 2 |X(f)|**2 / T + # = 2 * |0.5 delta(f-f0)|**2 / T + # = 0.5 * |delta(f-f0)|**2 / T - # but we're discrete and want the integral of the 'delta' to be 1, ++ # but we're discrete and want the integral of the 'delta' to be 1, + # so 'delta'*df = 1 --> 'delta' = 1/df, and + # P(f) = 0.5 / (df**2 * T) + # = 0.5 / df (T = 1/df) + expected[i] = 0.5 / df - - print "The power should be a peak at %g Hz of %g (%g, %g)" % \ - (sin_freq, expected[i], freq_axis[imax], power[imax]) - Pexp = 0 - P = 0 ++ ++ print('The power should be a peak at {} Hz of {} ({}, {})'.format( ++ sin_freq, expected[i], freq_axis[imax], power[imax])) ++ Pexp = P = 0 + for i in range(len(freq_axis)): - Pexp += expected[i] *df - P += power[i] * df - print " The total power should be %g (%g)" % (Pexp, P) - ++ Pexp += expected[i] * df ++ P += power[i] * df ++ print('The total power should be {} ({})'.format(Pexp, P)) ++ + if TEST_PLOTS: - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq)) - ++ figure = _pyplot.figure() ++ time_axes = figure.add_subplot(2, 1, 1) ++ time_axes.plot( ++ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-') ++ time_axes.set_title('time series') ++ freq_axes = figure.add_subplot(2, 1, 2) ++ freq_axes.plot(freq_axis, power, 'r.') ++ freq_axes.plot(freq_axis, expected, 'b-') ++ freq_axes.set_title( ++ '{} samples of sin at {} Hz'.format(samples, sin_freq)) ++ ++ + def test_unitary_power_spectrum_sin(self): + "Test unitary power spectrums on variously shaped sin functions" - self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024) - self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048) - self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098) - self.run_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024) - self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048) - # finally, with some irrational numbers, to check that I'm not getting lucky - self.run_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024) ++ self.run_unitary_power_spectrum_sin( ++ sin_freq=5, samp_freq=512, samples=1024) ++ self.run_unitary_power_spectrum_sin( ++ sin_freq=5, samp_freq=512, samples=2048) ++ self.run_unitary_power_spectrum_sin( ++ sin_freq=5, samp_freq=512, samples=4098) ++ self.run_unitary_power_spectrum_sin( ++ sin_freq=7, samp_freq=512, samples=1024) ++ self.run_unitary_power_spectrum_sin( ++ sin_freq=5, samp_freq=1024, samples=2048) ++ # finally, with some irrational numbers, to check that I'm not ++ # getting lucky ++ self.run_unitary_power_spectrum_sin( ++ sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024) + # test with non-integer number of periods - self.run_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256) - ++ self.run_unitary_power_spectrum_sin( ++ sin_freq=5, samp_freq=512, samples=256) ++ + def run_unitary_power_spectrum_delta(self, amp=1, samp_freq=1, + samples=256): + """TODO + """ - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) ++ x = _numpy.zeros((samples,), dtype=_numpy.float) ++ samp_freq = _numpy.float(samp_freq) + x[0] = amp - freq_axis, power = unitary_power_spectrum(x, samp_freq) - ++ freq_axis,power = unitary_power_spectrum(x, samp_freq) ++ + # power = = (amp)**2 * dt/T + # we spread that power over the entire freq_axis [0,fN], so + # P(f) = (amp)**2 dt / (T fN) + # where + # dt = 1/samp_freq (sample period) + # T = samples/samp_freq (total time of data aquisition) + # fN = 0.5 samp_freq (Nyquist frequency) + # so + # P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq) + # = 2 amp**2 / (samp_freq*samples) + expected_amp = 2.0 * amp**2 / (samp_freq * samples) - expected = ones((len(freq_axis),), dtype=float) * expected_amp - - print "The power should be flat at y = %g (%g)" % (expected_amp, power[0]) - ++ expected = _numpy.ones( ++ (len(freq_axis),), dtype=_numpy.float) * expected_amp ++ ++ print('The power should be flat at y = {} ({})'.format( ++ expected_amp, power[0])) ++ + if TEST_PLOTS: - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('%g samples of delta amp %g' % (samples, amp)) - - def _test_unitary_power_spectrum_delta(self): ++ figure = _pyplot.figure() ++ time_axes = figure.add_subplot(2, 1, 1) ++ time_axes.plot( ++ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-') ++ time_axes.set_title('time series') ++ freq_axes = figure.add_subplot(2, 1, 2) ++ freq_axes.plot(freq_axis, power, 'r.') ++ freq_axes.plot(freq_axis, expected, 'b-') ++ freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp)) ++ ++ def test_unitary_power_spectrum_delta(self): + "Test unitary power spectrums on various delta functions" - _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024) - _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048) - _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed - _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed - _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024) - _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024) - ++ self.run_unitary_power_spectrum_delta( ++ amp=1, samp_freq=1.0, samples=1024) ++ self.run_unitary_power_spectrum_delta( ++ amp=1, samp_freq=1.0, samples=2048) ++ # expected = 2*computed ++ self.run_unitary_power_spectrum_delta( ++ amp=1, samp_freq=0.5, samples=2048) ++ # expected = 0.5*computed ++ self.run_unitary_power_spectrum_delta( ++ amp=1, samp_freq=2.0, samples=2048) ++ self.run_unitary_power_spectrum_delta( ++ amp=3, samp_freq=1.0, samples=1024) ++ self.run_unitary_power_spectrum_delta( ++ amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024) ++ + def gaussian(self, area, mean, std, t): + "Integral over all time = area (i.e. normalized for area=1)" - return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2) - ++ return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp( ++ -0.5 * ((t-mean)/std)**2) ++ + def run_unitary_power_spectrum_gaussian(self, area=2.5, mean=5, std=1, + samp_freq=10.24 ,samples=512): + """TODO. + """ - x = zeros((samples,), dtype=float) - mean = float(mean) ++ x = _numpy.zeros((samples,), dtype=_numpy.float) ++ mean = _numpy.float(mean) + for i in range(samples): - t = i/float(samp_freq) ++ t = i / _numpy.float(samp_freq) + x[i] = self.gaussian(area, mean, std, t) - freq_axis, power = unitary_power_spectrum(x, samp_freq) - - # generate the predicted curve - # by comparing our self.gaussian() form to _gaussian(), ++ freq_axis,power = unitary_power_spectrum(x, samp_freq) ++ ++ # generate the predicted curve by comparing our ++ # TestUnitaryPowerSpectrum.gaussian() form to ++ # TestUnitaryRFFT.gaussian(), + # we see that the Fourier transform of x(t) has parameters: - # std' = 1/(2 pi std) (references declaring std' = 1/std are converting to angular frequency, not frequency like we are) - # area' = area/[std sqrt(2*pi)] (plugging into FT of _gaussian() above) - # mean' = 0 (changing the mean in the time-domain just changes the phase in the freq-domain) ++ # std' = 1/(2 pi std) (references declaring std' = 1/std are ++ # converting to angular frequency, ++ # not frequency like we are) ++ # area' = area/[std sqrt(2*pi)] (plugging into FT of ++ # TestUnitaryRFFT.gaussian() above) ++ # mean' = 0 (changing the mean in the time-domain just ++ # changes the phase in the freq-domain) + # So our power spectral density per unit time is given by + # P(f) = 2 |X(f)|**2 / T + # Where + # T = samples/samp_freq (total time of data aquisition) + mean = 0.0 - area = area /(std*sqrt(2.0*pi)) - std = 1.0/(2.0*pi*std) - expected = zeros((len(freq_axis),), dtype=float) - df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1]) ++ area = area / (std * _numpy.sqrt(2.0 * _numpy.pi)) ++ std = 1.0 / (2.0 * _numpy.pi * std) ++ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) ++ # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1]) ++ df = _numpy.float(samp_freq) / samples + for i in range(len(freq_axis)): - f = i*df ++ f = i * df + gaus = self.gaussian(area, mean, std, f) - expected[i] = 2.0 * gaus**2 * samp_freq/samples - print "The power should be a half-gaussian, ", - print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0]) - ++ expected[i] = 2.0 * gaus**2 * samp_freq / samples ++ print(('The power should be a half-gaussian, ' ++ 'with a peak at 0 Hz with amplitude {} ({})').format( ++ expected[0], power[0])) ++ + if TEST_PLOTS: - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('freq series') - ++ figure = _pyplot.figure() ++ time_axes = figure.add_subplot(2, 1, 1) ++ time_axes.plot( ++ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), ++ x, 'b-') ++ time_axes.set_title('time series') ++ freq_axes = figure.add_subplot(2, 1, 2) ++ freq_axes.plot(freq_axis, power, 'r.') ++ freq_axes.plot(freq_axis, expected, 'b-') ++ freq_axes.set_title('freq series') ++ + def test_unitary_power_spectrum_gaussian(self): + "Test unitary power spectrums on various gaussian functions" - for area in [1,pi]: - for std in [1,sqrt(2)]: - for samp_freq in [10.0, exp(1)]: - for samples in [1024,2048]: ++ for area in [1, _numpy.pi]: ++ for std in [1, _numpy.sqrt(2)]: ++ for samp_freq in [10.0, _numpy.exp(1)]: ++ for samples in [1024, 2048]: + self.run_unitary_power_spectrum_gaussian( + area=area, std=std, samp_freq=samp_freq, + samples=samples) + -class TestUnitaryAvgPowerSpectrum (unittest.TestCase): ++ ++class TestUnitaryAvgPowerSpectrum (_unittest.TestCase): + def run_unitary_avg_power_spectrum_sin(self, sin_freq=10, samp_freq=512, + samples=1024, chunk_size=512, + overlap=True, window=window_hann): + """TODO + """ - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) ++ x = _numpy.zeros((samples,), dtype=_numpy.float) ++ samp_freq = _numpy.float(samp_freq) + for i in range(samples): - x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq) - freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size, - overlap, window) - imax = argmax(power) - - expected = zeros((len(freq_axis),), dtype=float) - df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time - i = int(sin_freq/df) - expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin() - - print "The power should be a peak at %g Hz of %g (%g, %g)" % \ - (sin_freq, expected[i], freq_axis[imax], power[imax]) - Pexp = 0 - P = 0 ++ x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq) ++ freq_axis,power = unitary_avg_power_spectrum( ++ x, samp_freq, chunk_size, overlap, window) ++ imax = _numpy.argmax(power) ++ ++ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float) ++ # df = 1/T, where T = total_time ++ df = samp_freq / _numpy.float(chunk_size) ++ i = int(sin_freq / df) ++ # see TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin() ++ expected[i] = 0.5 / df ++ ++ print('The power should peak at {} Hz of {} ({}, {})'.format( ++ sin_freq, expected[i], freq_axis[imax], power[imax])) ++ Pexp = P = 0 + for i in range(len(freq_axis)): + Pexp += expected[i] * df - P += power[i] * df - print " The total power should be %g (%g)" % (Pexp, P) - ++ P += power[i] * df ++ print('The total power should be {} ({})'.format(Pexp, P)) ++ + if TEST_PLOTS: - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq)) - ++ figure = _pyplot.figure() ++ time_axes = figure.add_subplot(2, 1, 1) ++ time_axes.plot( ++ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), ++ x, 'b-') ++ time_axes.set_title('time series') ++ freq_axes = figure.add_subplot(2, 1, 2) ++ freq_axes.plot(freq_axis, power, 'r.') ++ freq_axes.plot(freq_axis, expected, 'b-') ++ freq_axes.set_title( ++ '{} samples of sin at {} Hz'.format(samples, sin_freq)) ++ + def test_unitary_avg_power_spectrum_sin(self): + "Test unitary avg power spectrums on variously shaped sin functions." - self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024) - self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048) - self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098) - self.run_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024) - self.run_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048) ++ self.run_unitary_avg_power_spectrum_sin( ++ sin_freq=5, samp_freq=512, samples=1024) ++ self.run_unitary_avg_power_spectrum_sin( ++ sin_freq=5, samp_freq=512, samples=2048) ++ self.run_unitary_avg_power_spectrum_sin( ++ sin_freq=5, samp_freq=512, samples=4098) ++ self.run_unitary_avg_power_spectrum_sin( ++ sin_freq=17, samp_freq=512, samples=1024) ++ self.run_unitary_avg_power_spectrum_sin( ++ sin_freq=5, samp_freq=1024, samples=2048) + # test long wavelenth sin, so be closer to window frequency - self.run_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048) - # finally, with some irrational numbers, to check that I'm not getting lucky - self.run_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024) ++ self.run_unitary_avg_power_spectrum_sin( ++ sin_freq=1, samp_freq=1024, samples=2048) ++ # finally, with some irrational numbers, to check that I'm not ++ # getting lucky ++ self.run_unitary_avg_power_spectrum_sin( ++ sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)