From: W. Trevor King Date: Mon, 11 May 2009 18:47:20 +0000 (-0400) Subject: Began versioning. X-Git-Tag: v0.2^0 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=4dcf7f081a9784d569047b7992012971d2f42686;p=stepper.git Began versioning. --- diff --git a/FFT_tools.py b/FFT_tools.py deleted file mode 100644 index b79d3b4..0000000 --- a/FFT_tools.py +++ /dev/null @@ -1,617 +0,0 @@ -#!/usr/bin/python -# -# Wrap Numpy's fft module to produce 1D unitary transforms and power spectra. -# -# Copyright (C) 2008 W. Trevor King -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in -# the documentation and/or other materials provided with the -# distribution. -# -# * Neither the name of the copyright holders nor the names of the -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE -# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, -# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. - -""" -Define some FFT wrappers to reduce clutter. -Provides a unitary discrete FFT and a windowed version. -Based on numpy.fft.rfft. - -Main entry functions: - unitary_rfft(data, freq=1.0) - power_spectrum(data, freq=1.0) - unitary_power_spectrum(data, freq=1.0) - avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann) - unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, overlap=True, window=window_hann) -""" - -from numpy import log2, floor, round, ceil, abs, pi, exp, cos, sin, sqrt, \ - sinc, arctan2, array, ones, arange, linspace, zeros, \ - uint16, float, concatenate, fromfile, argmax, complex -from numpy.fft import rfft - - -# print time- and freq- space plots of the test transforms if True -TEST_PLOTS = False -#TEST_PLOTS = True - -def floor_pow_of_two(num) : - "Round num down to the closest exact a power of two." - lnum = log2(num) - if int(lnum) != lnum : - num = 2**floor(lnum) - return num - -def round_pow_of_two(num) : - "Round num to the closest exact a power of two on a log scale." - lnum = log2(num) - if int(lnum) != lnum : - num = 2**round(lnum) - return num - -def ceil_pow_of_two(num) : - "Round num up to the closest exact a power of two." - lnum = log2(num) - if int(lnum) != lnum : - num = 2**ceil(lnum) - return num - -def _test_rfft(xs, Xs) : - # Numpy's FFT algoritm returns - # n-1 - # X[k] = SUM x[m] exp (-j 2pi km /n) - # m=0 - # (see http://www.tramy.us/numpybook.pdf) - j = complex(0,1) - n = len(xs) - Xa = [] - for k in range(n) : - Xa.append(sum([x*exp(-j*2*pi*k*m/n) for x,m in zip(xs,range(n))])) - if k < len(Xs): - assert (Xs[k]-Xa[k])/abs(Xa[k]) < 1e-6, \ - "rfft mismatch on element %d: %g != %g, relative error %g" \ - % (k, Xs[k], Xa[k], (Xs[k]-Xa[k])/abs(Xa[k])) - # Which should satisfy the discrete form of Parseval's theorem - # n-1 n-1 - # SUM |x_m|^2 = 1/n SUM |X_k|^2. - # m=0 k=0 - timeSum = sum([abs(x)**2 for x in xs]) - freqSum = sum([abs(X)**2 for X in Xa]) - assert abs(freqSum/float(n) - timeSum)/timeSum < 1e-6, \ - "Mismatch on Parseval's, %g != 1/%d * %g" % (timeSum, n, freqSum) - -def _test_rfft_suite() : - print "Test numpy rfft definition" - xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1] - _test_rfft(xs, rfft(xs)) - -def unitary_rfft(data, freq=1.0) : - """ - Compute the Fourier transform of real data. - Unitary (preserves power [Parseval's theorem]). - - If the units on your data are Volts, - and your sampling frequency is in Hz, - then freq_axis will be in Hz, - and trans will be in Volts. - """ - nsamps = floor_pow_of_two(len(data)) - # Which should satisfy the discrete form of Parseval's theorem - # n-1 n-1 - # SUM |x_m|^2 = 1/n SUM |X_k|^2. - # m=0 k=0 - # However, we want our FFT to satisfy the continuous Parseval eqn - # int_{-infty}^{infty} |x(t)|^2 dt = int_{-infty}^{infty} |X(f)|^2 df - # which has the discrete form - # n-1 n-1 - # SUM |x_m|^2 dt = SUM |X'_k|^2 df - # m=0 k=0 - # with X'_k = AX, this gives us - # n-1 n-1 - # SUM |x_m|^2 = A^2 df/dt SUM |X'_k|^2 - # m=0 k=0 - # so we see - # A^2 df/dt = 1/n - # A^2 = 1/n dt/df - # From Numerical Recipes (http://www.fizyka.umk.pl/nrbook/bookcpdf.html), - # Section 12.1, we see that for a sampling rate dt, the maximum frequency - # f_c in the transformed data is the Nyquist frequency (12.1.2) - # f_c = 1/2dt - # and the points are spaced out by (12.1.5) - # df = 1/ndt - # so - # dt = 1/ndf - # dt/df = 1/ndf^2 - # A^2 = 1/n^2df^2 - # A = 1/ndf = ndt/n = dt - # so we can convert the Numpy transformed data to match our unitary - # continuous transformed data with (also NR 12.1.8) - # X'_k = dtX = X / - trans = rfft(data[0:nsamps]) / float(freq) - freq_axis = linspace(0, freq/2, nsamps/2+1) - return (freq_axis, trans) - -def _test_unitary_rfft_parsevals(xs, freq, freqs, Xs): - # Which should satisfy the discretized integral form of Parseval's theorem - # n-1 n-1 - # SUM |x_m|^2 dt = SUM |X_k|^2 df - # m=0 k=0 - dt = 1.0/freq - df = freqs[1]-freqs[0] - assert (df - 1/(len(xs)*dt))/df < 1e-6, \ - "Mismatch in spacing, %g != 1/(%d*%g)" % (df, len(xs), dt) - Xa = list(Xs) - for k in range(len(Xs)-1,1,-1) : - Xa.append(Xa[k]) - assert len(xs) == len(Xa), "Length mismatch %d != %d" % (len(xs), len(Xa)) - lhs = sum([abs(x)**2 for x in xs]) * dt - rhs = sum([abs(X)**2 for X in Xa]) * df - assert abs(lhs - rhs)/lhs < 1e-4, "Mismatch on Parseval's, %g != %g" \ - % (lhs, rhs) - -def _test_unitary_rfft_parsevals_suite(): - print "Test unitary rfft on Parseval's theorem" - xs = [1,2,3,1,2,3,1,2,3,1,2,3,1,2,3,1] - dt = pi - freqs,Xs = unitary_rfft(xs, 1.0/dt) - _test_unitary_rfft_parsevals(xs, 1.0/dt, freqs, Xs) - -def _rect(t) : - if abs(t) < 0.5 : - return 1 - else : - return 0 - -def _test_unitary_rfft_rect(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) : - "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)" - samp_freq = float(samp_freq) - a = float(a) - - x = zeros((samples,), dtype=float) - dt = 1.0/samp_freq - for i in range(samples) : - t = i*dt - x[i] = _rect(a*(t-time_shift)) - freq_axis, X = unitary_rfft(x, samp_freq) - #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) - - # remove the phase due to our time shift - j = complex(0.0,1.0) # sqrt(-1) - for i in range(len(freq_axis)) : - f = freq_axis[i] - inverse_phase_shift = exp(j*2.0*pi*time_shift*f) - X[i] *= inverse_phase_shift - - expected = zeros((len(freq_axis),), dtype=float) - # normalized sinc(x) = sin(pi x)/(pi x) - # so sinc(0.5) = sin(pi/2)/(pi/2) = 2/pi - assert sinc(0.5) == 2.0/pi, "abnormal sinc()" - for i in range(len(freq_axis)) : - f = freq_axis[i] - expected[i] = 1.0/abs(a) * sinc(f/a) - - if TEST_PLOTS : - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, dt*samples, dt), x) - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, X.real, 'r.') - pylab.plot(freq_axis, X.imag, 'g.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('freq series') - -def _test_unitary_rfft_rect_suite() : - print "Test unitary FFTs on variously shaped rectangular functions" - _test_unitary_rfft_rect(a=0.5) - _test_unitary_rfft_rect(a=2.0) - _test_unitary_rfft_rect(a=0.7, samp_freq=50, samples=512) - _test_unitary_rfft_rect(a=3.0, samp_freq=60, samples=1024) - -def _gaussian(a, t) : - return exp(-a * t**2) - -def _test_unitary_rfft_gaussian(a=1.0, time_shift=5.0, samp_freq=25.6, samples=256) : - "Show fft(rect(at)) = 1/abs(a) * sinc(f/a)" - samp_freq = float(samp_freq) - a = float(a) - - x = zeros((samples,), dtype=float) - dt = 1.0/samp_freq - for i in range(samples) : - t = i*dt - x[i] = _gaussian(a, (t-time_shift)) - freq_axis, X = unitary_rfft(x, samp_freq) - #_test_unitary_rfft_parsevals(x, samp_freq, freq_axis, X) - - # remove the phase due to our time shift - j = complex(0.0,1.0) # sqrt(-1) - for i in range(len(freq_axis)) : - f = freq_axis[i] - inverse_phase_shift = exp(j*2.0*pi*time_shift*f) - X[i] *= inverse_phase_shift - - expected = zeros((len(freq_axis),), dtype=float) - for i in range(len(freq_axis)) : - f = freq_axis[i] - expected[i] = sqrt(pi/a) * _gaussian(1.0/a, pi*f) # see Wikipedia, or do the integral yourself. - - if TEST_PLOTS : - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, dt*samples, dt), x) - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, X.real, 'r.') - pylab.plot(freq_axis, X.imag, 'g.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('freq series') - -def _test_unitary_rfft_gaussian_suite() : - print "Test unitary FFTs on variously shaped gaussian functions" - _test_unitary_rfft_gaussian(a=0.5) - _test_unitary_rfft_gaussian(a=2.0) - _test_unitary_rfft_gaussian(a=0.7, samp_freq=50, samples=512) - _test_unitary_rfft_gaussian(a=3.0, samp_freq=60, samples=1024) - - - -def power_spectrum(data, freq=1.0) : - """ - Compute the power spectrum of DATA taken with a sampling frequency FREQ. - DATA must be real (not complex). - Returns a tuple of two arrays, (freq_axis, power), suitable for plotting. - If the number of samples in data is not an integer power of two, - the FFT ignores some of the later points. - """ - nsamps = floor_pow_of_two(len(data)) - - freq_axis = linspace(0, freq/2, nsamps/2+1) - # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real. - # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination. - # See Numerical Recipies for a details. - trans = rfft(data[0:nsamps]) - power = trans * trans.conj() # We want the square of the amplitude. - return (freq_axis, power) - -def unitary_power_spectrum(data, freq=1.0) : - freq_axis,power = power_spectrum(data, freq) - # One sided power spectral density, so 2|H(f)|**2 (see NR 2nd edition 12.0.14, p498) - # - # numpy normalizes with 1/N on the inverse transform ifft, - # so we should normalize the freq-space representation with 1/sqrt(N). - # But we're using the rfft, where N points are like N/2 complex points, so 1/sqrt(N/2) - # So the power gets normalized by that twice and we have 2/N - # - # On top of this, the FFT assumes a sampling freq of 1 per second, - # and we want to preserve area under our curves. - # If our total time T = len(data)/freq is smaller than 1, - # our df_real = freq/len(data) is bigger that the FFT expects (dt_fft = 1/len(data)), - # and we need to scale the powers down to conserve area. - # df_fft * F_fft(f) = df_real *F_real(f) - # F_real = F_fft(f) * (1/len)/(freq/len) = F_fft(f)*freq - # So the power gets normalized by *that* twice and we have 2/N * freq**2 - - # power per unit time - # measure x(t) for time T - # X(f) = int_0^T x(t) exp(-2 pi ift) dt - # PSD(f) = 2 |X(f)|**2 / T - - # total_time = len(data)/float(freq) - # power *= 2.0 / float(freq)**2 / total_time - # power *= 2.0 / freq**2 * freq / len(data) - power *= 2.0 / (freq * float(len(data))) - - return (freq_axis, power) - -def _test_unitary_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024) : - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) - for i in range(samples) : - x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq) - freq_axis, power = unitary_power_spectrum(x, samp_freq) - imax = argmax(power) - - expected = zeros((len(freq_axis),), dtype=float) - df = samp_freq/float(samples) # df = 1/T, where T = total_time - i = int(sin_freq/df) - # average power per unit time is - # P = - # average value of sin(t)**2 = 0.5 (b/c sin**2+cos**2 == 1) - # so average value of (int sin(t)**2 dt) per unit time is 0.5 - # P = 0.5 - # we spread that power over a frequency bin of width df, sp - # P(f0) = 0.5/df - # where f0 is the sin's frequency - # - # or : - # FFT of sin(2*pi*t*f0) gives - # X(f) = 0.5 i (delta(f-f0) - delta(f-f0)), - # (area under x(t) = 0, area under X(f) = 0) - # so one sided power spectral density (PSD) per unit time is - # P(f) = 2 |X(f)|**2 / T - # = 2 * |0.5 delta(f-f0)|**2 / T - # = 0.5 * |delta(f-f0)|**2 / T - # but we're discrete and want the integral of the 'delta' to be 1, - # so 'delta'*df = 1 --> 'delta' = 1/df, and - # P(f) = 0.5 / (df**2 * T) - # = 0.5 / df (T = 1/df) - expected[i] = 0.5 / df - - print "The power should be a peak at %g Hz of %g (%g, %g)" % \ - (sin_freq, expected[i], freq_axis[imax], power[imax]) - Pexp = 0 - P = 0 - for i in range(len(freq_axis)) : - Pexp += expected[i] *df - P += power[i] * df - print " The total power should be %g (%g)" % (Pexp, P) - - if TEST_PLOTS : - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq)) - -def _test_unitary_power_spectrum_sin_suite() : - print "Test unitary power spectrums on variously shaped sin functions" - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024) - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048) - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098) - _test_unitary_power_spectrum_sin(sin_freq=7, samp_freq=512, samples=1024) - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048) - # finally, with some irrational numbers, to check that I'm not getting lucky - _test_unitary_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024) - # test with non-integer number of periods - _test_unitary_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=256) - -def _test_unitary_power_spectrum_delta(amp=1, samp_freq=1, samples=256) : - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) - x[0] = amp - freq_axis, power = unitary_power_spectrum(x, samp_freq) - - # power = = (amp)**2 * dt/T - # we spread that power over the entire freq_axis [0,fN], so - # P(f) = (amp)**2 dt / (T fN) - # where - # dt = 1/samp_freq (sample period) - # T = samples/samp_freq (total time of data aquisition) - # fN = 0.5 samp_freq (Nyquist frequency) - # so - # P(f) = amp**2 / (samp_freq * samples/samp_freq * 0.5 samp_freq) - # = 2 amp**2 / (samp_freq*samples) - expected_amp = 2.0 * amp**2 / (samp_freq * samples) - expected = ones((len(freq_axis),), dtype=float) * expected_amp - - print "The power should be flat at y = %g (%g)" % (expected_amp, power[0]) - - if TEST_PLOTS : - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('%g samples of delta amp %g' % (samples, amp)) - -def _test_unitary_power_spectrum_delta_suite() : - print "Test unitary power spectrums on various delta functions" - _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=1024) - _test_unitary_power_spectrum_delta(amp=1, samp_freq=1.0, samples=2048) - _test_unitary_power_spectrum_delta(amp=1, samp_freq=0.5, samples=2048)# expected = 2*computed - _test_unitary_power_spectrum_delta(amp=1, samp_freq=2.0, samples=2048)# expected = 0.5*computed - _test_unitary_power_spectrum_delta(amp=3, samp_freq=1.0, samples=1024) - _test_unitary_power_spectrum_delta(amp=pi, samp_freq=exp(1), samples=1024) - -def _gaussian2(area, mean, std, t) : - "Integral over all time = area (i.e. normalized for area=1)" - return area/(std*sqrt(2.0*pi)) * exp(-0.5*((t-mean)/std)**2) - -def _test_unitary_power_spectrum_gaussian(area=2.5, mean=5, std=1, samp_freq=10.24 ,samples=512) : #1024 - x = zeros((samples,), dtype=float) - mean = float(mean) - for i in range(samples) : - t = i/float(samp_freq) - x[i] = _gaussian2(area, mean, std, t) - freq_axis, power = unitary_power_spectrum(x, samp_freq) - - # generate the predicted curve - # by comparing our _gaussian2() form to _gaussian(), - # we see that the Fourier transform of x(t) has parameters: - # std' = 1/(2 pi std) (references declaring std' = 1/std are converting to angular frequency, not frequency like we are) - # area' = area/[std sqrt(2*pi)] (plugging into FT of _gaussian() above) - # mean' = 0 (changing the mean in the time-domain just changes the phase in the freq-domain) - # So our power spectral density per unit time is given by - # P(f) = 2 |X(f)|**2 / T - # Where - # T = samples/samp_freq (total time of data aquisition) - mean = 0.0 - area = area /(std*sqrt(2.0*pi)) - std = 1.0/(2.0*pi*std) - expected = zeros((len(freq_axis),), dtype=float) - df = float(samp_freq)/samples # 1/total_time ( = freq_axis[1]-freq_axis[0] = freq_axis[1]) - for i in range(len(freq_axis)) : - f = i*df - gaus = _gaussian2(area, mean, std, f) - expected[i] = 2.0 * gaus**2 * samp_freq/samples - print "The power should be a half-gaussian, ", - print "with a peak at 0 Hz with amplitude %g (%g)" % (expected[0], power[0]) - - if TEST_PLOTS : - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('freq series') - -def _test_unitary_power_spectrum_gaussian_suite() : - print "Test unitary power spectrums on various gaussian functions" - _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=1024) - _test_unitary_power_spectrum_gaussian(area=1, std=2, samp_freq=10.0, samples=1024) - _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=10.0, samples=2048) - _test_unitary_power_spectrum_gaussian(area=1, std=1, samp_freq=20.0, samples=2048) - _test_unitary_power_spectrum_gaussian(area=3, std=1, samp_freq=10.0, samples=1024) - _test_unitary_power_spectrum_gaussian(area=pi, std=sqrt(2), samp_freq=exp(1), samples=1024) - -def window_hann(length) : - "Returns a Hann window array with length entries" - win = zeros((length,), dtype=float) - for i in range(length) : - win[i] = 0.5*(1.0-cos(2.0*pi*float(i)/(length))) - # avg value of cos over a period is 0 - # so average height of Hann window is 0.5 - return win - -def avg_power_spectrum(data, freq=1.0, chunk_size=2048, - overlap=True, window=window_hann) : - """ - Compute the avg power spectrum of DATA taken with a sampling frequency FREQ. - DATA must be real (not complex) by breaking DATA into chunks. - The chunks may or may not be overlapping (by setting OVERLAP). - The chunks are windowed by dotting with WINDOW(CHUNK_SIZE), FFTed, - and the resulting spectra are averaged together. - See NR 13.4 for rational. - - Returns a tuple of two arrays, (freq_axis, power), suitable for plotting. - CHUNK_SIZE should really be a power of 2. - If the number of samples in DATA is not an integer power of CHUNK_SIZE, - the FFT ignores some of the later points. - """ - assert chunk_size == floor_pow_of_two(chunk_size), \ - "chunk_size %d should be a power of 2" % chunk_size - - nchunks = len(data)/chunk_size # integer division = implicit floor - if overlap : - chunk_step = chunk_size/2 - else : - chunk_step = chunk_size - - win = window(chunk_size) # generate a window of the appropriate size - freq_axis = linspace(0, freq/2, chunk_size/2+1) - # nsamps/2+1 b/c zero-freq and nyqist-freq are both fully real. - # >>> help(numpy.fft.fftpack.rfft) for Numpy's explaination. - # See Numerical Recipies for a details. - power = zeros((chunk_size/2+1,), dtype=float) - for i in range(nchunks) : - starti = i*chunk_step - stopi = starti+chunk_size - fft_chunk = rfft(data[starti:stopi]*win) - p_chunk = fft_chunk * fft_chunk.conj() - power += p_chunk.astype(float) - power /= float(nchunks) - return (freq_axis, power) - -def unitary_avg_power_spectrum(data, freq=1.0, chunk_size=2048, - overlap=True, window=window_hann) : - """ - compute the average power spectrum, preserving normalization - """ - freq_axis,power = avg_power_spectrum(data, freq, chunk_size, - overlap, window) - # 2.0 / (freq * chunk_size) |rfft()|**2 --> unitary_power_spectrum - power *= 2.0 / (freq*float(chunk_size)) * 8/3 # see unitary_power_spectrum() - # * 8/3 to remove power from windowing - # <[x(t)*w(t)]**2> = ~= * - # where the ~= is because the frequency of x(t) >> the frequency of w(t). - # So our calulated power has and extra in it. - # For the Hann window, = <0.5(1 + 2cos + cos**2)> = 1/4 + 0 + 1/8 = 3/8 - # For low frequency components, where the frequency of x(t) is ~= the frequency of w(t), - # The normalization is not perfect. ?? - # The normalization approaches perfection as chunk_size -> infinity. - return (freq_axis, power) - -def _test_unitary_avg_power_spectrum_sin(sin_freq=10, samp_freq=512, samples=1024, - chunk_size=512, overlap=True, - window=window_hann) : - x = zeros((samples,), dtype=float) - samp_freq = float(samp_freq) - for i in range(samples) : - x[i] = sin(2.0 * pi * (i/samp_freq) * sin_freq) - freq_axis, power = unitary_avg_power_spectrum(x, samp_freq, chunk_size, - overlap, window) - imax = argmax(power) - - expected = zeros((len(freq_axis),), dtype=float) - df = samp_freq/float(chunk_size) # df = 1/T, where T = total_time - i = int(sin_freq/df) - expected[i] = 0.5 / df # see _test_unitary_power_spectrum_sin() - - print "The power should be a peak at %g Hz of %g (%g, %g)" % \ - (sin_freq, expected[i], freq_axis[imax], power[imax]) - Pexp = 0 - P = 0 - for i in range(len(freq_axis)) : - Pexp += expected[i] * df - P += power[i] * df - print " The total power should be %g (%g)" % (Pexp, P) - - if TEST_PLOTS : - pylab.figure() - pylab.subplot(211) - pylab.plot(arange(0, samples/samp_freq, 1.0/samp_freq), x, 'b-') - pylab.title('time series') - pylab.subplot(212) - pylab.plot(freq_axis, power, 'r.') - pylab.plot(freq_axis, expected, 'b-') - pylab.title('%g samples of sin at %g Hz' % (samples, sin_freq)) - -def _test_unitary_avg_power_spectrum_sin_suite() : - print "Test unitary avg power spectrums on variously shaped sin functions" - _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=1024) - _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=2048) - _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=512, samples=4098) - _test_unitary_avg_power_spectrum_sin(sin_freq=17, samp_freq=512, samples=1024) - _test_unitary_avg_power_spectrum_sin(sin_freq=5, samp_freq=1024, samples=2048) - # test long wavelenth sin, so be closer to window frequency - _test_unitary_avg_power_spectrum_sin(sin_freq=1, samp_freq=1024, samples=2048) - # finally, with some irrational numbers, to check that I'm not getting lucky - _test_unitary_avg_power_spectrum_sin(sin_freq=pi, samp_freq=100*exp(1), samples=1024) - - -def test() : - _test_rfft_suite() - _test_unitary_rfft_parsevals_suite() - _test_unitary_rfft_rect_suite() - _test_unitary_rfft_gaussian_suite() - _test_unitary_power_spectrum_sin_suite() - _test_unitary_power_spectrum_delta_suite() - _test_unitary_power_spectrum_gaussian_suite() - _test_unitary_avg_power_spectrum_sin_suite() - -if __name__ == "__main__" : - if TEST_PLOTS : - import pylab - test() - if TEST_PLOTS : - pylab.show() diff --git a/Makefile b/Makefile index 12f6398..ff388c0 100644 --- a/Makefile +++ b/Makefile @@ -2,18 +2,18 @@ all : dummy_py -dummy_py : setup.py FFT_tools.py +dummy_py : setup.py stepper.py python setup.py install --home=~ echo "dummy for Makefile dependencies" > $@ check : all - python FFT_tools.py + python stepper.py dist : python setup.py sdist - scp dist/FFT_tools*tar.gz einstein:public_html/code/python/ + scp dist/stepper*tar.gz einstein:public_html/code/python/ clean : python setup.py clean - rm -rf build dist FFT_tools.egg-info + rm -rf build dist stepper.egg-info rm -f dummy_py *.pyc diff --git a/README b/README index f634e53..4880358 100644 --- a/README +++ b/README @@ -1,20 +1,22 @@ -This package wraps Numpy's fft module to produce unitary transforms -and power spectra of real numbers in one dimension. See the code for -the technical details. - +This package provides Python control of stepper motors. See Jones' +"Control of Stepping Motors" for an excellent stepper overview. + http://www.cs.uiowa.edu/~jones/step/ +Supports full and half stepping of motors with one or two power lines +and 4 drains, or one or two drains and 4 power lines... Basically any +motor with 4 variable lines, and it would be easy to extend it to +other cases. == Installation == Non-Python dependencies (Debian packagename): easy_install (python-setuptools) - Numpy source (python-numpy-dev) -FFT_tools uses `setuptools' for installation. Setuptools is basically +Stepper uses `setuptools' for installation. Setuptools is basically an extension of the standard Python distutils package which supports automatic package dependency tracking. The installation procedure should be (on Debian-esque systems) # apt-get intall python-setuptools python-numpy-dev - # easy_install -f http://www.physics.drexel.edu/~wking/code/python/ FFT_tools + # easy_install -f http://www.physics.drexel.edu/~wking/code/python/ stepper There is one speedbump you might run into: * an outdated version of easy_install (see ez_setup.py section) @@ -42,12 +44,12 @@ See the tests in FFT_tools.py for simple examples. == Licence == -This project is distributed under the Python Software Foundation License. -http://www.python.org/psf/license/ +This project is distributed under the BSD License. + http://www.fsf.org/licensing/licenses/#ModifiedBSD == Author == W. Trevor King wking@drexel.edu -Copyright 2007, 2008 +Copyright 2008, 2009 diff --git a/ez_setup.py b/ez_setup.py new file mode 100644 index 0000000..14cd450 --- /dev/null +++ b/ez_setup.py @@ -0,0 +1,275 @@ +#!python +"""Bootstrap setuptools installation + +If you want to use setuptools in your package's setup.py, just include this +file in the same directory with it, and add this to the top of your setup.py:: + + from ez_setup import use_setuptools + use_setuptools() + +If you want to require a specific version of setuptools, set a download +mirror, or use an alternate download directory, you can do so by supplying +the appropriate options to ``use_setuptools()``. + +This file can also be run as a script to install or upgrade setuptools. +""" +import sys +DEFAULT_VERSION = "0.6c9" +DEFAULT_URL = "http://pypi.python.org/packages/%s/s/setuptools/" % sys.version[:3] + +md5_data = { + 'setuptools-0.6b1-py2.3.egg': '8822caf901250d848b996b7f25c6e6ca', + 'setuptools-0.6b1-py2.4.egg': 'b79a8a403e4502fbb85ee3f1941735cb', + 'setuptools-0.6b2-py2.3.egg': '5657759d8a6d8fc44070a9d07272d99b', + 'setuptools-0.6b2-py2.4.egg': '4996a8d169d2be661fa32a6e52e4f82a', + 'setuptools-0.6b3-py2.3.egg': 'bb31c0fc7399a63579975cad9f5a0618', + 'setuptools-0.6b3-py2.4.egg': '38a8c6b3d6ecd22247f179f7da669fac', + 'setuptools-0.6b4-py2.3.egg': '62045a24ed4e1ebc77fe039aa4e6f7e5', + 'setuptools-0.6b4-py2.4.egg': '4cb2a185d228dacffb2d17f103b3b1c4', + 'setuptools-0.6c1-py2.3.egg': 'b3f2b5539d65cb7f74ad79127f1a908c', + 'setuptools-0.6c1-py2.4.egg': 'b45adeda0667d2d2ffe14009364f2a4b', + 'setuptools-0.6c2-py2.3.egg': 'f0064bf6aa2b7d0f3ba0b43f20817c27', + 'setuptools-0.6c2-py2.4.egg': '616192eec35f47e8ea16cd6a122b7277', + 'setuptools-0.6c3-py2.3.egg': 'f181fa125dfe85a259c9cd6f1d7b78fa', + 'setuptools-0.6c3-py2.4.egg': 'e0ed74682c998bfb73bf803a50e7b71e', + 'setuptools-0.6c3-py2.5.egg': 'abef16fdd61955514841c7c6bd98965e', + 'setuptools-0.6c4-py2.3.egg': 'b0b9131acab32022bfac7f44c5d7971f', + 'setuptools-0.6c4-py2.4.egg': '2a1f9656d4fbf3c97bf946c0a124e6e2', + 'setuptools-0.6c4-py2.5.egg': '8f5a052e32cdb9c72bcf4b5526f28afc', + 'setuptools-0.6c5-py2.3.egg': 'ee9fd80965da04f2f3e6b3576e9d8167', + 'setuptools-0.6c5-py2.4.egg': 'afe2adf1c01701ee841761f5bcd8aa64', + 'setuptools-0.6c5-py2.5.egg': 'a8d3f61494ccaa8714dfed37bccd3d5d', + 'setuptools-0.6c6-py2.3.egg': '35686b78116a668847237b69d549ec20', + 'setuptools-0.6c6-py2.4.egg': '3c56af57be3225019260a644430065ab', + 'setuptools-0.6c6-py2.5.egg': 'b2f8a7520709a5b34f80946de5f02f53', + 'setuptools-0.6c7-py2.3.egg': '209fdf9adc3a615e5115b725658e13e2', + 'setuptools-0.6c7-py2.4.egg': '5a8f954807d46a0fb67cf1f26c55a82e', + 'setuptools-0.6c7-py2.5.egg': '45d2ad28f9750e7434111fde831e8372', + 'setuptools-0.6c8-py2.3.egg': '50759d29b349db8cfd807ba8303f1902', + 'setuptools-0.6c8-py2.4.egg': 'cba38d74f7d483c06e9daa6070cce6de', + 'setuptools-0.6c8-py2.5.egg': '1721747ee329dc150590a58b3e1ac95b', + 'setuptools-0.6c9-py2.3.egg': 'a83c4020414807b496e4cfbe08507c03', + 'setuptools-0.6c9-py2.4.egg': '260a2be2e5388d66bdaee06abec6342a', + 'setuptools-0.6c9-py2.5.egg': 'fe67c3e5a17b12c0e7c541b7ea43a8e6', +} + +import sys, os +try: from hashlib import md5 +except ImportError: from md5 import md5 + +def _validate_md5(egg_name, data): + if egg_name in md5_data: + digest = md5(data).hexdigest() + if digest != md5_data[egg_name]: + print >>sys.stderr, ( + "md5 validation of %s failed! (Possible download problem?)" + % egg_name + ) + sys.exit(2) + return data + +def use_setuptools( + version=DEFAULT_VERSION, download_base=DEFAULT_URL, to_dir=os.curdir, + download_delay=15 +): + """Automatically find/download setuptools and make it available on sys.path + + `version` should be a valid setuptools version number that is available + as an egg for download under the `download_base` URL (which should end with + a '/'). `to_dir` is the directory where setuptools will be downloaded, if + it is not already available. If `download_delay` is specified, it should + be the number of seconds that will be paused before initiating a download, + should one be required. If an older version of setuptools is installed, + this routine will print a message to ``sys.stderr`` and raise SystemExit in + an attempt to abort the calling script. + """ + was_imported = 'pkg_resources' in sys.modules or 'setuptools' in sys.modules + def do_download(): + egg = download_setuptools(version, download_base, to_dir, download_delay) + sys.path.insert(0, egg) + import setuptools; setuptools.bootstrap_install_from = egg + try: + import pkg_resources + except ImportError: + return do_download() + try: + pkg_resources.require("setuptools>="+version); return + except pkg_resources.VersionConflict, e: + if was_imported: + print >>sys.stderr, ( + "The required version of setuptools (>=%s) is not available, and\n" + "can't be installed while this script is running. Please install\n" + " a more recent version first, using 'easy_install -U setuptools'." + "\n\n(Currently using %r)" + ) % (version, e.args[0]) + sys.exit(2) + else: + del pkg_resources, sys.modules['pkg_resources'] # reload ok + return do_download() + except pkg_resources.DistributionNotFound: + return do_download() + +def download_setuptools( + version=DEFAULT_VERSION, download_base=DEFAULT_URL, to_dir=os.curdir, + delay = 15 +): + """Download setuptools from a specified location and return its filename + + `version` should be a valid setuptools version number that is available + as an egg for download under the `download_base` URL (which should end + with a '/'). `to_dir` is the directory where the egg will be downloaded. + `delay` is the number of seconds to pause before an actual download attempt. + """ + import urllib2, shutil + egg_name = "setuptools-%s-py%s.egg" % (version,sys.version[:3]) + url = download_base + egg_name + saveto = os.path.join(to_dir, egg_name) + src = dst = None + if not os.path.exists(saveto): # Avoid repeated downloads + try: + from distutils import log + if delay: + log.warn(""" +--------------------------------------------------------------------------- +This script requires setuptools version %s to run (even to display +help). I will attempt to download it for you (from +%s), but +you may need to enable firewall access for this script first. +I will start the download in %d seconds. + +(Note: if this machine does not have network access, please obtain the file + + %s + +and place it in this directory before rerunning this script.) +---------------------------------------------------------------------------""", + version, download_base, delay, url + ); from time import sleep; sleep(delay) + log.warn("Downloading %s", url) + src = urllib2.urlopen(url) + # Read/write all in one block, so we don't create a corrupt file + # if the download is interrupted. + data = _validate_md5(egg_name, src.read()) + dst = open(saveto,"wb"); dst.write(data) + finally: + if src: src.close() + if dst: dst.close() + return os.path.realpath(saveto) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +def main(argv, version=DEFAULT_VERSION): + """Install or upgrade setuptools and EasyInstall""" + try: + import setuptools + except ImportError: + egg = None + try: + egg = download_setuptools(version, delay=0) + sys.path.insert(0,egg) + from setuptools.command.easy_install import main + return main(list(argv)+[egg]) # we're done here + finally: + if egg and os.path.exists(egg): + os.unlink(egg) + else: + if setuptools.__version__ == '0.0.1': + print >>sys.stderr, ( + "You have an obsolete version of setuptools installed. Please\n" + "remove it from your system entirely before rerunning this script." + ) + sys.exit(2) + + req = "setuptools>="+version + import pkg_resources + try: + pkg_resources.require(req) + except pkg_resources.VersionConflict: + try: + from setuptools.command.easy_install import main + except ImportError: + from easy_install import main + main(list(argv)+[download_setuptools(delay=0)]) + sys.exit(0) # try to force an exit + else: + if argv: + from setuptools.command.easy_install import main + main(argv) + else: + print "Setuptools version",version,"or greater has been installed." + print '(Run "ez_setup.py -U setuptools" to reinstall or upgrade.)' + +def update_md5(filenames): + """Update our built-in md5 registry""" + + import re + + for name in filenames: + base = os.path.basename(name) + f = open(name,'rb') + md5_data[base] = md5(f.read()).hexdigest() + f.close() + + data = [" %r: %r,\n" % it for it in md5_data.items()] + data.sort() + repl = "".join(data) + + import inspect + srcfile = inspect.getsourcefile(sys.modules[__name__]) + f = open(srcfile, 'rb'); src = f.read(); f.close() + + match = re.search("\nmd5_data = {\n([^}]+)}", src) + if not match: + print >>sys.stderr, "Internal error!" + sys.exit(2) + + src = src[:match.start(1)] + repl + src[match.end(1):] + f = open(srcfile,'w') + f.write(src) + f.close() + + +if __name__=='__main__': + if len(sys.argv)>2 and sys.argv[1]=='--md5update': + update_md5(sys.argv[2:]) + else: + main(sys.argv[1:]) + + + + + + diff --git a/setup.py b/setup.py index 989bfa3..318dfb2 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,7 @@ -"""FFT_tools: unitary FFTs and power spectra for real data. +"""Stepper: Python control of stepper motors. -This pacakage mostly consists of normalizing wrappers around Numpy.fft.rttf() -and a large suite of tests and checks to verify proper function. Requires - * Numpy (http://numpy.scipy.org/) + * PyComedi () """ classifiers = """\ @@ -32,21 +30,25 @@ if version < '2.2.3': DistributionMetadata.classifiers = None DistributionMetadata.download_url = None +#from stepper import VERSION +VERSION = "0.2" # can't import VERSION without also importing comedi. + doclines = __doc__.split("\n") -setup(name="FFT_tools", - version="0.1", +setup(name="stepper", + version=VERSION, maintainer="W. Trevor King", maintainer_email="wking@drexel.edu", url = "http://www.physics.drexel.edu/~wking/code/python/", - download_url = "http://www.physics.drexel.edu/~wking/code/python/FFT_tools-0.1.tar.gz", + download_url = "http://www.physics.drexel.edu/~wking/code/python/stepper-%s.tar.gz" % (VERSION), license = "BSD", platforms = ["all"], description = doclines[0], long_description = "\n".join(doclines[2:]), classifiers = filter(None, classifiers.split("\n")), - py_modules = ['FFT_tools', 'ez_setup'], + py_modules = ['stepper', 'ez_setup'], #packages = find_packages(), + install_requires = "pycomedi >= 0.2", ) # use packages to include subdirectory packages diff --git a/stepper.py b/stepper.py new file mode 100644 index 0000000..203b202 --- /dev/null +++ b/stepper.py @@ -0,0 +1,126 @@ +#!/usr/bin/python +# +# Python control of stepper motors. +# +# Copyright (C) 2008-2009, W. Trevor King +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# +# * Neither the name of the copyright holders nor the names of the +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from pycomedi.single_dio import DO_port +write = DO_port(chan=(0,1,2,3)) + +from time import sleep # in case our DIO is too fast for the motor to keep up + +VERSION = "0.2" + +port_vals = [1, # binary ---1 + 5, # binary -1-1 + 4, # binary -1-- + 6, # binary -11- + 2, # binary --1- + 10, # binary 1-1- + 8, # binary 1--- + 9] # binary 1--1 + +class stepper_obj : + def __init__(self) : + self._curPos = 0 # Current stepper index + self.fullStep = True # Select full or half stepping + self.logic = False # Select active high (True) or active low (False) + self.delay = 1e-5 # 10 us + self._set_pos(0) + self.setExternalStepPos = None + def __del__(self) : + pass + def _get_output(self, position) : + return port_vals[position % 8] + def _set_pos(self, position) : + write(self._get_output(position)) + def get_cur_pos(self) : + return self._curPos + def single_step(self, direction) : + if self.fullStep and self._curPos % 2 == 1 : + self._curPos -= 1 # round down to a full step + if direction > 0 : + if self.fullStep : + self._curPos += 2 + else : + self._curPos += 1 + if direction < 0 : + if self.fullStep : + self._curPos -= 2 + else : + self._curPos -= 1 + write(self._get_output(self._curPos)) + if self.setExternalStepPos != None : + self.setExternalStepPos(self.get_cur_pos()) + if self.delay > 0 : + sleep(self.delay) + def step_to(self, target_position) : + if target_position != int(target_position) : + str = "target_position must be an int: ", target_position + raise Exception, str + if self.fullStep and target_position%2 == 1 : + target_position -= 1 + if target_position > self._curPos : + direction = 1 + else : + direction = -1 + while self._curPos != target_position : + self.single_step(direction) + def step_rel(self, relative_target_position) : + return self.step_to(self._curPos + relative_target_position) + +def test() : + s = stepper_obj() + s._get_output(0) + s._get_output(-100) + s._get_output(100) + s.fullStep = True + s.single_step(1) + s.single_step(1) + s.single_step(1) + s.single_step(-1) + s.single_step(-1) + s.single_step(-1) + s.fullStep = False + s.single_step(-1) + s.single_step(-1) + s.single_step(-1) + s.single_step(1) + s.single_step(1) + s.single_step(1) + s.step_to(1000) + s.step_to(-1000) + s.step_to(0) + +if __name__ == "__main__" : + test()