+class TestRFFT (_unittest.TestCase):
+ r"""Ensure Numpy's FFT algorithm acts as expected.
+
+ Notes
+ -----
+ The expected return values are [#dft]_:
+
+ .. math:: X_k = \sum_{m=0}^{n-1} x_m \exp^{-2\pi imk/n}
+
+ .. [#dft] See the *Background information* section of
+ :py:mod:`numpy.fft`.
+ """
+ def run_rfft(self, xs, Xs):
+ i = _numpy.complex(0, 1)
+ n = len(xs)
+ Xa = []
+ for k in range(n):
+ Xa.append(sum([x * _numpy.exp(-2 * _numpy.pi * i * m * k / n)
+ for x,m in zip(xs, range(n))]))
+ if k < len(Xs):
+ self.assertAlmostEqual(
+ (Xs[k] - Xa[k]) / _numpy.abs(Xa[k]), 0, 6,
+ ('rfft mismatch on element {}: {} != {}, '
+ 'relative error {}').format(
+ k, Xs[k], Xa[k], (Xs[k] - Xa[k]) / _numpy.abs(Xa[k])))
+ # Which should satisfy the discrete form of Parseval's theorem
+ # n-1 n-1
+ # SUM |x_m|^2 = 1/n SUM |X_k|^2.
+ # m=0 k=0
+ timeSum = sum([_numpy.abs(x)**2 for x in xs])
+ freqSum = sum([_numpy.abs(X)**2 for X in Xa])
+ self.assertAlmostEqual(
+ _numpy.abs(freqSum / _numpy.float(n) - timeSum) / timeSum, 0, 6,
+ "Mismatch on Parseval's, {} != 1/{} * {}".format(
+ timeSum, n, freqSum))
+
+ def test_rfft(self):
+ "Test NumPy's builtin :py:func:`numpy.fft.rfft`"
+ xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
+ self.run_rfft(xs, _numpy.fft.rfft(xs))
+
+
+class TestUnitaryRFFT (_unittest.TestCase):
+ """Verify :py:func:`unitary_rfft`.
+ """
+ def run_parsevals(self, xs, freq, freqs, Xs):
+ """Check the discretized integral form of Parseval's theorem
+
+ Notes
+ -----
+ Which is:
+
+ .. math:: \sum_{m=0}^{n-1} |x_m|^2 dt = \sum_{k=0}^{n-1} |X_k|^2 df
+ """
+ dt = 1.0 / freq
+ df = freqs[1] - freqs[0]
+ self.assertAlmostEqual(
+ (df - 1 / (len(xs) * dt)) / df, 0, 6,
+ 'Mismatch in spacing, {} != 1/({}*{})'.format(df, len(xs), dt))
+ Xa = list(Xs)
+ for k in range(len(Xs) - 1, 1, -1):
+ Xa.append(Xa[k])
+ self.assertEqual(len(xs), len(Xa))
+ lhs = sum([_numpy.abs(x)**2 for x in xs]) * dt
+ rhs = sum([_numpy.abs(X)**2 for X in Xa]) * df
+ self.assertAlmostEqual(
+ _numpy.abs(lhs - rhs) / lhs, 0, 3,
+ "Mismatch on Parseval's, {} != {}".format(lhs, rhs))
+
+ def test_parsevals(self):
+ "Test unitary rfft on Parseval's theorem"
+ xs = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
+ dt = _numpy.pi
+ freqs,Xs = unitary_rfft(xs, 1.0 / dt)
+ self.run_parsevals(xs, 1.0 / dt, freqs, Xs)
+
+ def rect(self, t):
+ r"""Rectangle function.
+
+ Notes
+ -----
+
+ .. math::
+
+ \rect(t) = \begin{cases}
+ 1& \text{if $|t| < 0.5$}, \\
+ 0& \text{if $|t| \ge 0.5$}.
+ \end{cases}
+ """
+ if _numpy.abs(t) < 0.5:
+ return 1
+ else:
+ return 0
+
+ def run_rect(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
+ r"""Test :py:func:`unitary_rfft` on known function :py:meth:`rect`.
+
+ Notes
+ -----
+ Analytic result:
+
+ .. math:: \rfft(\rect(at)) = 1/|a|\cdot\sinc(f/a)
+ """
+ samp_freq = _numpy.float(samp_freq)
+ a = _numpy.float(a)
+
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ dt = 1.0 / samp_freq
+ for i in range(samples):
+ t = i * dt
+ x[i] = self.rect(a * (t - time_shift))
+ freq_axis,X = unitary_rfft(x, samp_freq)
+
+ # remove the phase due to our time shift
+ j = _numpy.complex(0.0, 1.0) # sqrt(-1)
+ for i in range(len(freq_axis)):
+ f = freq_axis[i]
+ inverse_phase_shift = _numpy.exp(
+ j * 2.0 * _numpy.pi * time_shift * f)
+ X[i] *= inverse_phase_shift
+
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ # normalized sinc(x) = sin(pi x)/(pi x)
+ # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi
+ self.assertEqual(_numpy.sinc(0.5), 2.0 / _numpy.pi)
+ for i in range(len(freq_axis)):
+ f = freq_axis[i]
+ expected[i] = 1.0 / _numpy.abs(a) * _numpy.sinc(f / a)
+
+ if TEST_PLOTS:
+ if _pyplot is None:
+ raise _pyplot_import_error
+ figure = _pyplot.figure()
+ time_axes = figure.add_subplot(2, 1, 1)
+ time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
+ time_axes.set_title('time series')
+ freq_axes = figure.add_subplot(2, 1, 2)
+ freq_axes.plot(freq_axis, X.real, 'r.')
+ freq_axes.plot(freq_axis, X.imag, 'g.')
+ freq_axes.plot(freq_axis, expected, 'b-')
+ freq_axes.set_title('freq series')
+
+ def test_rect(self):
+ "Test unitary FFTs on variously shaped rectangular functions."
+ self.run_rect(a=0.5)
+ self.run_rect(a=2.0)
+ self.run_rect(a=0.7, samp_freq=50, samples=512)
+ self.run_rect(a=3.0, samp_freq=60, samples=1024)
+
+ def gaussian(self, a, t):
+ r"""Gaussian function.
+
+ Notes
+ -----
+
+ .. math:: \gaussian(a,t) = \exp^{-at^2}
+ """
+ return _numpy.exp(-a * t**2)
+
+ def run_gaussian(self, a=1.0, time_shift=5.0, samp_freq=25.6, samples=256):
+ r"""Test :py:func:`unitary_rttf` on known function :py:meth:`gaussian`.
+
+ Notes
+ -----
+ Analytic result:
+
+ .. math::
+
+ \rfft(\gaussian(a,t))
+ = \sqrt{\pi/a} \cdot \gaussian(1/a,\pi f)
+ """
+ samp_freq = _numpy.float(samp_freq)
+ a = _numpy.float(a)
+
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ dt = 1.0 / samp_freq
+ for i in range(samples):
+ t = i * dt
+ x[i] = self.gaussian(a, (t - time_shift))
+ freq_axis,X = unitary_rfft(x, samp_freq)
+
+ # remove the phase due to our time shift
+ j = _numpy.complex(0.0, 1.0) # sqrt(-1)
+ for i in range(len(freq_axis)):
+ f = freq_axis[i]
+ inverse_phase_shift = _numpy.exp(
+ j * 2.0 * _numpy.pi * time_shift * f)
+ X[i] *= inverse_phase_shift
+
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ for i in range(len(freq_axis)):
+ f = freq_axis[i]
+ # see Wikipedia, or do the integral yourself.
+ expected[i] = _numpy.sqrt(_numpy.pi / a) * self.gaussian(
+ 1.0 / a, _numpy.pi * f)
+
+ if TEST_PLOTS:
+ if _pyplot is None:
+ raise _pyplot_import_error
+ figure = _pyplot.figure()
+ time_axes = figure.add_subplot(2, 1, 1)
+ time_axes.plot(_numpy.arange(0, dt * samples, dt), x)
+ time_axes.set_title('time series')
+ freq_axes = figure.add_subplot(2, 1, 2)
+ freq_axes.plot(freq_axis, X.real, 'r.')
+ freq_axes.plot(freq_axis, X.imag, 'g.')
+ freq_axes.plot(freq_axis, expected, 'b-')
+ freq_axes.set_title('freq series')
+
+ def test_gaussian(self):
+ "Test unitary FFTs on variously shaped gaussian functions."
+ self.run_gaussian(a=0.5)
+ self.run_gaussian(a=2.0)
+ self.run_gaussian(a=0.7, samp_freq=50, samples=512)
+ self.run_gaussian(a=3.0, samp_freq=60, samples=1024)
+
+
+class TestUnitaryPowerSpectrum (_unittest.TestCase):
+ def run_sin(self, sin_freq=10, samp_freq=512, samples=1024):
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ samp_freq = _numpy.float(samp_freq)
+ for i in range(samples):
+ x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
+ freq_axis,power = unitary_power_spectrum(x, samp_freq)
+ imax = _numpy.argmax(power)
+
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ # df = 1/T, where T = total_time
+ df = samp_freq / _numpy.float(samples)
+ i = int(sin_freq / df)
+ # average power per unit time is
+ # P = <x(t)**2>
+ # average value of sin(t)**2 = 0.5 (b/c sin**2+cos**2 == 1)
+ # so average value of (int sin(t)**2 dt) per unit time is 0.5
+ # P = 0.5
+ # we spread that power over a frequency bin of width df, sp
+ # P(f0) = 0.5/df
+ # where f0 is the sin's frequency
+ #
+ # or:
+ # FFT of sin(2*pi*t*f0) gives
+ # X(f) = 0.5 i (delta(f-f0) - delta(f-f0)),
+ # (area under x(t) = 0, area under X(f) = 0)
+ # so one sided power spectral density (PSD) per unit time is
+ # P(f) = 2 |X(f)|**2 / T
+ # = 2 * |0.5 delta(f-f0)|**2 / T
+ # = 0.5 * |delta(f-f0)|**2 / T
+ # but we're discrete and want the integral of the 'delta' to be 1,
+ # so 'delta'*df = 1 --> 'delta' = 1/df, and
+ # P(f) = 0.5 / (df**2 * T)
+ # = 0.5 / df (T = 1/df)
+ expected[i] = 0.5 / df
+
+ LOG.debug('The power should be a peak at {} Hz of {} ({}, {})'.format(
+ sin_freq, expected[i], freq_axis[imax], power[imax]))
+ Pexp = P = 0
+ for i in range(len(freq_axis)):
+ Pexp += expected[i] * df
+ P += power[i] * df
+ self.assertAlmostEqual(
+ _numpy.abs((P - Pexp) / Pexp), 0, 1,
+ 'The total power should be {} ({})'.format(Pexp, P))
+
+ if TEST_PLOTS:
+ if _pyplot is None:
+ raise _pyplot_import_error
+ figure = _pyplot.figure()
+ time_axes = figure.add_subplot(2, 1, 1)
+ time_axes.plot(
+ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
+ time_axes.set_title('time series')
+ freq_axes = figure.add_subplot(2, 1, 2)
+ freq_axes.plot(freq_axis, power, 'r.')
+ freq_axes.plot(freq_axis, expected, 'b-')
+ freq_axes.set_title(
+ '{} samples of sin at {} Hz'.format(samples, sin_freq))
+
+ def test_sin(self):
+ "Test unitary power spectrums on variously shaped sin functions"
+ self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
+ self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
+ self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
+ self.run_sin(sin_freq=7, samp_freq=512, samples=1024)
+ self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
+ # finally, with some irrational numbers, to check that I'm not
+ # getting lucky
+ self.run_sin(
+ sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)
+ # test with non-integer number of periods
+ self.run_sin(sin_freq=5, samp_freq=512, samples=256)
+
+ def run_delta(self, amp=1, samp_freq=1, samples=256):
+ """TODO
+ """
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ samp_freq = _numpy.float(samp_freq)
+ x[0] = amp
+ freq_axis,power = unitary_power_spectrum(x, samp_freq)
+
+ # power = <x(t)**2> = (amp)**2 * dt/T
+ # we spread that power over the entire freq_axis [0,fN], so
+ # P(f) = (amp)**2 dt / (T fN)
+ # where
+ # dt = 1/samp_freq (sample period)
+ # T = samples/samp_freq (total time of data aquisition)
+ # fN = 0.5 samp_freq (Nyquist frequency)
+ # so
+ # P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq)
+ # = 2 amp**2 / (samp_freq*samples)
+ expected_amp = 2.0 * amp**2 / (samp_freq * samples)
+ expected = _numpy.ones(
+ (len(freq_axis),), dtype=_numpy.float) * expected_amp
+
+ self.assertAlmostEqual(
+ expected_amp, power[0], 4,
+ 'The power should be flat at y = {} ({})'.format(
+ expected_amp, power[0]))
+
+ if TEST_PLOTS:
+ if _pyplot is None:
+ raise _pyplot_import_error
+ figure = _pyplot.figure()
+ time_axes = figure.add_subplot(2, 1, 1)
+ time_axes.plot(
+ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq), x, 'b-')
+ time_axes.set_title('time series')
+ freq_axes = figure.add_subplot(2, 1, 2)
+ freq_axes.plot(freq_axis, power, 'r.')
+ freq_axes.plot(freq_axis, expected, 'b-')
+ freq_axes.set_title('{} samples of delta amp {}'.format(samples, amp))
+
+ def test_delta(self):
+ "Test unitary power spectrums on various delta functions"
+ self.run_delta(amp=1, samp_freq=1.0, samples=1024)
+ self.run_delta(amp=1, samp_freq=1.0, samples=2048)
+ # expected = 2*computed
+ self.run_delta(amp=1, samp_freq=0.5, samples=2048)
+ # expected = 0.5*computed
+ self.run_delta(amp=1, samp_freq=2.0, samples=2048)
+ self.run_delta(amp=3, samp_freq=1.0, samples=1024)
+ self.run_delta(amp=_numpy.pi, samp_freq=_numpy.exp(1), samples=1024)
+
+ def gaussian(self, area, mean, std, t):
+ "Integral over all time = area (i.e. normalized for area=1)"
+ return area / (std * _numpy.sqrt(2.0 * _numpy.pi)) * _numpy.exp(
+ -0.5 * ((t-mean)/std)**2)
+
+ def run_gaussian(self, area=2.5, mean=5, std=1, samp_freq=10.24,
+ samples=512):
+ """TODO.
+ """
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ mean = _numpy.float(mean)
+ for i in range(samples):
+ t = i / _numpy.float(samp_freq)
+ x[i] = self.gaussian(area, mean, std, t)
+ freq_axis,power = unitary_power_spectrum(x, samp_freq)
+
+ # generate the predicted curve by comparing our
+ # TestUnitaryPowerSpectrum.gaussian() form to
+ # TestUnitaryRFFT.gaussian(),
+ # we see that the Fourier transform of x(t) has parameters:
+ # std' = 1/(2 pi std) (references declaring std' = 1/std are
+ # converting to angular frequency,
+ # not frequency like we are)
+ # area' = area/[std sqrt(2*pi)] (plugging into FT of
+ # TestUnitaryRFFT.gaussian() above)
+ # mean' = 0 (changing the mean in the time-domain just
+ # changes the phase in the freq-domain)
+ # So our power spectral density per unit time is given by
+ # P(f) = 2 |X(f)|**2 / T
+ # Where
+ # T = samples/samp_freq (total time of data aquisition)
+ mean = 0.0
+ area = area / (std * _numpy.sqrt(2.0 * _numpy.pi))
+ std = 1.0 / (2.0 * _numpy.pi * std)
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1])
+ df = _numpy.float(samp_freq) / samples
+ for i in range(len(freq_axis)):
+ f = i * df
+ gaus = self.gaussian(area, mean, std, f)
+ expected[i] = 2.0 * gaus**2 * samp_freq / samples
+ self.assertAlmostEqual(
+ expected[0], power[0], 3,
+ ('The power should be a half-gaussian, '
+ 'with a peak at 0 Hz with amplitude {} ({})').format(
+ expected[0], power[0]))
+
+ if TEST_PLOTS:
+ if _pyplot is None:
+ raise _pyplot_import_error
+ figure = _pyplot.figure()
+ time_axes = figure.add_subplot(2, 1, 1)
+ time_axes.plot(
+ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
+ x, 'b-')
+ time_axes.set_title('time series')
+ freq_axes = figure.add_subplot(2, 1, 2)
+ freq_axes.plot(freq_axis, power, 'r.')
+ freq_axes.plot(freq_axis, expected, 'b-')
+ freq_axes.set_title('freq series')
+
+ def test_gaussian(self):
+ "Test unitary power spectrums on various gaussian functions"
+ for area in [1, _numpy.pi]:
+ for std in [1, _numpy.sqrt(2)]:
+ for samp_freq in [10.0, _numpy.exp(1)]:
+ for samples in [1024, 2048]:
+ self.run_gaussian(
+ area=area, std=std, samp_freq=samp_freq,
+ samples=samples)
+
+
+class TestUnitaryAvgPowerSpectrum (_unittest.TestCase):
+ def run_sin(self, sin_freq=10, samp_freq=512, samples=1024, chunk_size=512,
+ overlap=True, window=window_hann, places=3):
+ """TODO
+ """
+ x = _numpy.zeros((samples,), dtype=_numpy.float)
+ samp_freq = _numpy.float(samp_freq)
+ for i in range(samples):
+ x[i] = _numpy.sin(2.0 * _numpy.pi * (i / samp_freq) * sin_freq)
+ freq_axis,power = unitary_avg_power_spectrum(
+ x, samp_freq, chunk_size, overlap, window)
+ imax = _numpy.argmax(power)
+
+ expected = _numpy.zeros((len(freq_axis),), dtype=_numpy.float)
+ # df = 1/T, where T = total_time
+ df = samp_freq / _numpy.float(chunk_size)
+ i = int(sin_freq / df)
+ # see TestUnitaryPowerSpectrum.run_unitary_power_spectrum_sin()
+ expected[i] = 0.5 / df
+
+ LOG.debug('The power should peak at {} Hz of {} ({}, {})'.format(
+ sin_freq, expected[i], freq_axis[imax], power[imax]))
+ Pexp = P = 0
+ for i in range(len(freq_axis)):
+ Pexp += expected[i] * df
+ P += power[i] * df
+ self.assertAlmostEqual(
+ Pexp, P, places,
+ 'The total power should be {} ({})'.format(Pexp, P))
+
+ if TEST_PLOTS:
+ if _pyplot is None:
+ raise _pyplot_import_error
+ figure = _pyplot.figure()
+ time_axes = figure.add_subplot(2, 1, 1)
+ time_axes.plot(
+ _numpy.arange(0, samples / samp_freq, 1.0 / samp_freq),
+ x, 'b-')
+ time_axes.set_title('time series')
+ freq_axes = figure.add_subplot(2, 1, 2)
+ freq_axes.plot(freq_axis, power, 'r.')
+ freq_axes.plot(freq_axis, expected, 'b-')
+ freq_axes.set_title(
+ '{} samples of sin at {} Hz'.format(samples, sin_freq))
+
+ def test_sin(self):
+ "Test unitary avg power spectrums on variously shaped sin functions."
+ self.run_sin(sin_freq=5, samp_freq=512, samples=1024)
+ self.run_sin(sin_freq=5, samp_freq=512, samples=2048)
+ self.run_sin(sin_freq=5, samp_freq=512, samples=4098)
+ self.run_sin(sin_freq=17, samp_freq=512, samples=1024)
+ self.run_sin(sin_freq=5, samp_freq=1024, samples=2048)
+ # test long wavelenth sin, so be closer to window frequency
+ self.run_sin(sin_freq=1, samp_freq=1024, samples=2048, places=0)
+ # finally, with some irrational numbers, to check that I'm not
+ # getting lucky
+ self.run_sin(
+ sin_freq=_numpy.pi, samp_freq=100 * _numpy.exp(1), samples=1024)