From: W. Trevor King Date: Tue, 19 Apr 2011 13:21:32 +0000 (-0400) Subject: Clean up and convert to my Cython-based pycomedi implementation. X-Git-Tag: 0.5~16 X-Git-Url: http://git.tremily.us/?p=pypiezo.git;a=commitdiff_plain;h=072e3e03624adea5457cc41e4f68c4154833035b Clean up and convert to my Cython-based pycomedi implementation. --- diff --git a/README b/README index 8370012..300ae9e 100644 --- a/README +++ b/README @@ -17,18 +17,18 @@ Packages Gentoo ~~~~~~ -I've packaged FFT-tools for Gentoo. You need layman_ and my `wtk +I've packaged pypiezo for Gentoo. You need layman_ and my `wtk overlay`_. Install with:: # emerge -av app-portage/layman # layman --add wtk - # emerge -av sci-libs/pycomedi + # emerge -av sci-libs/pypiezo Dependencies ------------ -If you're installing by hand or packaging pycomedi for another +If you're installing by hand or packaging pypiezo for another distribution, you'll need the following dependencies: =========== ================= ===================== diff --git a/pypiezo/__init__.py b/pypiezo/__init__.py index 38b75af..f65ab37 100644 --- a/pypiezo/__init__.py +++ b/pypiezo/__init__.py @@ -16,6 +16,7 @@ # along with pypiezo. If not, see . import logging as _logging +import logging.handlers as _logging_handlers __version__ = '0.4' @@ -24,11 +25,40 @@ __version__ = '0.4' LOG = _logging.getLogger('pypiezo') "Pypiezo logger" -LOG.setLevel(_logging.DEBUG) -h = _logging.StreamHandler() -#h.setLevel(_logging.WARN) -h.setLevel(_logging.DEBUG) -f = _logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -h.setFormatter(f) -LOG.addHandler(h) -del h, f +LOG.setLevel(_logging.WARN) +_formatter = _logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + +_stream_handler = _logging.StreamHandler() +_stream_handler.setLevel(_logging.DEBUG) +_stream_handler.setFormatter(_formatter) +LOG.addHandler(_stream_handler) + +_syslog_handler = None + + +from .config import _BaseConfig +from .config import find_base_config as _find_base_config + + +def setup_base_config(config): + global base_config, _syslog_handler + base_config = config + + LOG.setLevel(base_config['log-level']) + + if base_config['syslog']: + if not _syslog_handler: + _syslog_handler = _logging_handlers.SysLogHandler() + _syslog_handler.setLevel(_logging.DEBUG) + LOG.handlers = [_syslog_handler] + else: + LOG.handlers = [_stream_handler] + + LOG.info('setup base_config:\n%s' % config.dump()) + +def clear_base_config(): + setup_base_config(_BaseConfig()) + +base_config = _find_base_config() +setup_base_config(base_config) diff --git a/pypiezo/afm.py b/pypiezo/afm.py new file mode 100644 index 0000000..d4beb13 --- /dev/null +++ b/pypiezo/afm.py @@ -0,0 +1,415 @@ +# Copyright (C) 2008-2011 W. Trevor King +# +# This file is part of pypiezo. +# +# pypiezo is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# pypiezo is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with pypiezo. If not, see . + +"Control of a piezo-based atomic force microscope." + +import numpy as _numpy + +try: + import matplotlib as _matplotlib + import matplotlib.pyplot as _matplotlib_pyplot + import time as _time # for timestamping lines on plots +except ImportError, e: + _matplotlib = None + _matplotlib_import_error = e + +from curses_check_for_keypress import CheckForKeypress as _CheckForKeypress + +from . import LOG as _LOG +from . import base_config as _base_config +from . import base as _base +from . import surface as _surface + + +class AFMPiezo (_base.Piezo): + """A piezo-controlled atomic force microscope. + + This particular class expects a single input channel for measuring + deflection. Other subclasses provide support for multi-segment + deflection measurements. + + >>> from pprint import pprint + >>> from pycomedi.device import Device + >>> from pycomedi.subdevice import StreamingSubdevice + >>> from pycomedi.channel import AnalogChannel + >>> from pycomedi.constant import AREF, SUBDEVICE_TYPE, UNIT + >>> from . import config + >>> from . import surface + + >>> d = Device('/dev/comedi0') + >>> d.open() + + >>> s_in = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai, + ... factory=StreamingSubdevice) + >>> s_out = d.find_subdevice_by_type(SUBDEVICE_TYPE.ao, + ... factory=StreamingSubdevice) + + >>> axis_channel = s_out.channel( + ... 0, factory=AnalogChannel, aref=AREF.ground) + >>> input_channel = s_in.channel(0, factory=AnalogChannel, aref=AREF.diff) + >>> for chan in [axis_channel, input_channel]: + ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10) + + We set the minimum voltage for the `z` axis to -9 (a volt above + the minimum possible voltage) to help with testing + `.get_surface_position`. Without this minimum voltage, small + calibration errors could lead to a railed -10 V input for the + first few surface approaching steps, which could lead to an + `EdgeKink` error instead of a `FlatFit` error. + + >>> axis_config = config._AxisConfig() + >>> axis_config.update( + ... {'gain':20, 'sensitivity':8e-9, 'minimum':-9}) + >>> axis_channel_config = config._ChannelConfig() + >>> input_channel_config = config._ChannelConfig() + + >>> a = _base.PiezoAxis(axis_config=axis_config, + ... axis_channel_config=axis_channel_config, + ... axis_channel=axis_channel, name='z') + >>> a.setup_config() + + >>> c = _base.InputChannel( + ... channel_config=input_channel_config, channel=input_channel, + ... name='deflection') + >>> c.setup_config() + + >>> p = AFMPiezo(axes=[a], input_channels=[c]) + + >>> deflection = p.read_deflection() + >>> deflection # doctest: +SKIP + 34494L + >>> p.deflection_dtype() + + + We need to know where we are before we can move somewhere + smoothly. + + >>> pos = _base.convert_volts_to_bits(p.axes[0].axis_channel_config, 0) + >>> p.jump('z', pos) + + Usually `.move_to_pos_or_def` is used to approach the surface, but + for testing we assume the z output channel is connected directly + into the deflection input channel. + + >>> target_pos = _base.convert_volts_to_bits( + ... p.axes[0].axis_channel_config, 2) + >>> step = int((target_pos - pos)/5) + >>> target_def = _base.convert_volts_to_bits( + ... p.input_channels[0].channel_config, 3) + >>> data = p.move_to_pos_or_def('z', target_pos, target_def, step=step, + ... return_data=True) + >>> p.last_output == {'z': int(target_pos)} + True + >>> pprint(data) # doctest: +SKIP + {'deflection': + array([32655, 33967, 35280, 36593, 37905, 39218, 39222], dtype=uint16), + 'z': + array([32767, 34077, 35387, 36697, 38007, 39317, 39321], dtype=uint16)} + + That was a working position-limited approach. Now move back to + the center and try a deflection-limited approach. + + >>> p.jump('z', pos) + >>> target_def = _base.convert_volts_to_bits( + ... p.input_channels[0].channel_config, 1) + >>> data = p.move_to_pos_or_def('z', target_pos, target_def, step=step, + ... return_data=True) + >>> print (p.last_output['z'] < int(target_pos)) + True + >>> pprint(data) # doctest: +SKIP + {'deflection': array([32655, 33968, 35281, 36593], dtype=uint16), + 'z': array([32767, 34077, 35387, 36697], dtype=uint16)} + + >>> p.wiggle_for_interference('z', offset=p.last_output['z'], + ... laser_wavelength=650e-9, keypress_test_mode=True) + Press any key to continue + + >>> try: + ... p.get_surface_position('z', max_deflection=target_def) + ... except surface.FlatFit, e: + ... print 'got FlatFit' + got FlatFit + >>> print e # doctest: +SKIP + slopes not sufficiently different: 1.0021 and 1.0021 + >>> abs(e.right_slope-1) < 0.1 + True + >>> abs(e.left_slope-1) < 0.1 + True + + >>> d.close() + """ + def _deflection_channel(self): + return self.channel_by_name(name='deflection', direction='input') + + def read_deflection(self): + """Return sensor deflection in bits. + + TODO: explain how bit <-> volt conversion will work for this + "virtual" channel. + """ + return self._deflection_channel().data_read() + + def deflection_dtype(self): + "Return a Numpy dtype suitable for deflection bit values." + return self._deflection_channel().subdevice.get_dtype() + + def move_to_pos_or_def(self, axis_name, position, deflection, step, + return_data=False, pre_move_steps=0): + """TODO + + pre_move_steps : int + number of 'null' steps to take before moving (confirming a + stable input deflection). + """ + if return_data or _base_config['matplotlib']: + aquire_data = True + else: + aquire_data = False + + if step == 0: + raise ValueError('must have non-zero step size') + elif step < 0 and position > self.last_output[axis_name]: + step = -step + elif step > 0 and position < self.last_output[axis_name]: + step = -step + + log_string = ( + 'move to position %d or deflection %g on axis %s in steps of %d' + % (position, deflection, axis_name, step)) + _LOG.debug(log_string) + current_deflection = self.read_deflection() + log_string = 'current position %d and deflection %g' % ( + self.last_output[axis_name], current_deflection) + _LOG.debug(log_string) + + if aquire_data: + def_array=[current_deflection] + pos_array=[self.last_output[axis_name]] + for i in range(pre_move_steps): + self.jump(axis_name, piezo.last_output[axis_name]) + delection = self.read_deflection() + if aquire_data: + def_array.append(current_deflection) + pos_array.append(self.last_output[axis_name]) + # step in until we hit our target position or exceed our target deflection + while (self.last_output[axis_name] != position and + current_deflection < deflection): + dist_to = position - self.last_output[axis_name] + if abs(dist_to) < abs(step): + jump_to = position + else: + jump_to = self.last_output[axis_name] + step + self.jump(axis_name, jump_to) + current_deflection = self.read_deflection() + log_string = ( + 'current z piezo position %6d, current deflection %6d' + % (current_deflection, self.last_output[axis_name])) + _LOG.debug(log_string) + if aquire_data: + def_array.append(current_deflection) + pos_array.append(self.last_output[axis_name]) + + log_string = ( + 'move to position %d or deflection %g on axis %s complete' + % (position, deflection, axis_name)) + _LOG.debug(log_string) + log_string = 'current position %d and deflection %g' % ( + self.last_output[axis_name], current_deflection) + _LOG.debug(log_string) + if _base_config['matplotlib']: + if not _matplotlib: + raise _matplotlib_import_error + figure = _matplotlib_pyplot.figure() + axes = figure.add_subplot(1, 1, 1) + axes.hold(True) + timestamp = _time.strftime('%H%M%S') + axes.set_title('step approach %s' % timestamp) + axes.plot(pos_array, def_array, '.', label=timestamp) + #_pylab.legend(loc='best') + figure.show() + + if return_data: + data = { + axis_name:_numpy.array( + pos_array, dtype=self.channel_dtype( + axis_name, direction='output')), + 'deflection':_numpy.array( + def_array, dtype=self.deflection_dtype()), + } + return data + + def wiggle_for_interference( + self, axis_name, wiggle_frequency=2, n_samples=1024, amplitude=None, + offset=None, laser_wavelength=None, plot=True, + keypress_test_mode=False): + """Output a sine wave and measure interference. + + With a poorly focused or aligned laser, leaked laser light + reflecting off the surface may interfere with the light + reflected off the cantilever, causing distance-dependent + interference with a period roughly half the laser's + wavelength. This method wiggles the cantilever near the + surface and monitors the magnitude of deflection oscillation, + allowing the operator to adjust the laser alignment in order + to minimize the interference. + + Modern commercial AFMs with computer-aligned lasers must do + something like this automatically. + """ + if _base_config['matplotlib']: + plot = True + if laser_wavelength and amplitude: + log_string = \ + 'use either laser_wavelength or amplitude, but not both' + _LOG.warn(log_string) + + if None in (amplitude, offset): + output_axis = self.axis_by_name(axis_name) + maxdata = output_axis.axis_channel.get_maxdata() + midpoint = int(maxdata/2) + if offset == None: + offset = midpoint + log_string = ( + 'generated offset for interference wiggle: %g' % offset) + _LOG.debug(log_string) + if amplitude == None: + if offset <= midpoint: + max_amplitude = int(offset) + else: + max_amplitude = int(maxdata-offset) + offset_meters = _base.convert_bits_to_meters( + output_axis.axis_channel_config, output_axis.axis_config, + offset) + bit_wavelength = _base.convert_meters_to_bits( + output_axis.axis_channel_config, output_axis.axis_config, + offset_meters + laser_wavelength) - offset + amplitude = 2*bit_wavelength + log_string = ( + 'generated amplitude for interference wiggle: %g' + % amplitude) + _LOG.debug(log_string) + if amplitude > max_amplitude: + raise ValueError('no room for a two wavelength wiggle') + + scan_frequency = wiggle_frequency * n_samples + out = (amplitude * _numpy.sin( + _numpy.arange(n_samples) * 4 * _numpy.pi / float(n_samples)) + + offset) + # 4 for 2 periods, so you can judge precision + out = out.reshape((len(out), 1)).astype( + self.channel_dtype(axis_name, direction='output')) + + _LOG.debug('oscillate for interference wiggle') + log_string = ( + 'amplitude: %d bits, offset: %d bits, scan frequency: %g Hz' + % (amplitude, offset, scan_frequency)) + _LOG.debug(log_string) + + if plot: + if not _matplotlib: + raise _matplotlib_import_error + figure = _matplotlib_pyplot.figure() + axes = figure.add_subplot(1, 1, 1) + axes.hold(False) + timestamp = _time.strftime('%H%M%S') + axes.set_title('wiggle for interference %s' % timestamp) + plot_p = axes.plot(out, out, 'b.-') + figure.show() + c = _CheckForKeypress(test_mode=keypress_test_mode) + while c.input() == None: + # input will need processing for multi-segment AFMs... + data = self.ramp(out, scan_frequency, output_names=[axis_name], + input_names=['deflection']) + _LOG.debug('completed a wiggle round') + if plot: + plot_p[0].set_ydata(data[:,0]) + axes.set_ylim([data.min(), data.max()]) + #_flush_plot() + self.last_output[axis_name] = out[-1,0] + _LOG.debug('interference wiggle complete') + + get_surface_position = _surface.get_surface_position + + +#def ramp +# if USE_ABCD_DEFLECTION : +# for i in range(4) : # i is the photodiode element (0->A, 1->B, ...) +# self.curIn[i] = out["Deflection segment"][i][-1] +# else : +# self.curIn[self.chan_info.def_ind] = out["deflection"][-1] + + +#class FourSegmentAFM (AFM): +# def read_deflection(self): +# "Return sensor deflection in bits." +# A = int(self.curIn[self.chan_info.def_ind[0]]) +# B = int(self.curIn[self.chan_info.def_ind[1]]) +# C = int(self.curIn[self.chan_info.def_ind[2]]) +# D = int(self.curIn[self.chan_info.def_ind[3]]) +# df = float((A+B)-(C+D))/(A+B+C+D) +# dfout = int(df * 2**15) + 2**15 +# if TEXT_VERBOSE : +# print "Current deflection %d (%d, %d, %d, %d)" \ +# % (dfout, A, B, C, D) +# return dfout + + +#def test_smoothness(zp, plotVerbose=True): +# posA = 20000 +# posB = 50000 +# setpoint = zp.def_V2in(3) +# steps = 200 +# outfreq = 1e5 +# outarray = linspace(posB, posA, 1000) +# indata=[] +# outdata=[] +# curVals = zp.jumpToPos(posA) +# zp.pCurVals(curVals) +# _sleep(1) # let jitters die down +# for i in range(10): +# print "ramp %d to %d" % (zp.curPos(), posB) +# curVals, data = moveToPosOrDef(zp, posB, setpoint, step=steps, +# return_data = True) +# indata.append(data) +# out = zp.ramp(outarray, outfreq) +# outdata.append(out) +# if plotVerbose: +# from pylab import figure, plot, title, legend, hold, subplot +# if PYLAB_VERBOSE or plotVerbose: +# _import_pylab() +# _pylab.figure(BASE_FIG_NUM+4) +# for i in range(10): +# _pylab.plot(indata[i]['z'], +# indata[i]['deflection'], '+--', label='in') +# _pylab.plot(outdata[i]['z'], +# outdata[i]['deflection'], '.-', label='out') +# _pylab.title('test smoothness (step in, ramp out)') +# #_pylab.legend(loc='best') +# +#def test(): +# import z_piezo +# zp = z_piezo.z_piezo() +# curVals = zp.moveToPosOrDef(zp.pos_nm2out(600), defl=zp.curDef()+6000, step=(zp.pos_nm2out(10)-zp.pos_nm2out(0))) +# if TEXT_VERBOSE: +# zp.pCurVals(curVals) +# pos = zp.getSurfPos(maxDefl=zp.curDef()+6000) +# if TEXT_VERBOSE: +# print "Surface at %g nm", pos +# print "success" +# if PYLAB_VERBOSE and _final_flush_plot != None: +# _final_flush_plot() + diff --git a/pypiezo/base.py b/pypiezo/base.py new file mode 100644 index 0000000..76bab25 --- /dev/null +++ b/pypiezo/base.py @@ -0,0 +1,655 @@ +# Copyright (C) 2008-2011 W. Trevor King +# +# This file is part of pypiezo. +# +# pypiezo is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# pypiezo is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with pypiezo. If not, see . + +"Basic piezo control." + +import math as _math +from time import sleep as _sleep + +import numpy as _numpy +from scipy.stats import linregress as _linregress + +try: + import matplotlib as _matplotlib + import matplotlib.pyplot as _matplotlib_pyplot + import time as _time # for timestamping lines on plots +except ImportError, e: + _matplotlib = None + _matplotlib_import_error = e + +from pycomedi.constant import TRIG_SRC, SDF +from pycomedi.utility import inttrig_insn, Reader, Writer + +from . import LOG as _LOG +from . import base_config as _base_config +from . import config as _config + + +def convert_bits_to_volts(config, data): + """Convert bit-valued data to volts. + + >>> config = _config._ChannelConfig() + >>> config['conversion-coefficients'] = [1, 2, 3] + >>> config['conversion-origin'] = -1 + >>> convert_bits_to_volts(config, -1) + 1 + >>> convert_bits_to_volts( + ... config, _numpy.array([-1, 0, 1, 2], dtype=_numpy.float)) + array([ 1., 6., 17., 34.]) + """ + coefficients = config['conversion-coefficients'] + origin = config['conversion-origin'] + return _numpy.polyval(list(reversed(coefficients)), data-origin) + +def convert_volts_to_bits(config, data): + """Convert bit-valued data to volts. + + >>> config = _config._ChannelConfig() + >>> config['inverse-conversion-coefficients'] = [1, 2, 3] + >>> config['inverse-conversion-origin'] = -1 + >>> convert_volts_to_bits(config, -1) + 1 + >>> convert_volts_to_bits( + ... config, _numpy.array([-1, 0, 1, 2], dtype=_numpy.float)) + array([ 1., 6., 17., 34.]) + + Note that the inverse coeffiecient and offset are difficult to + derive from the forward coefficient and offset. The current + Comedilib conversion functions, `comedi_to_physical()` and + `comedi_from_physical()` take `comedi_polynomial_t` conversion + arguments. `comedi_polynomial_t` is defined in `comedilib.h`_, + and holds a polynomial of length + `COMEDI_MAX_NUM_POLYNOMIAL_COEFFICIENTS`, currently set to 4. The + inverse of this cubic polynomial might be another polynomial, but + it might also have a more complicated form. A general inversion + solution is considered too complicated, so when you're setting up + your configuration, you should use Comedilib to save both the + forward and inverse coefficients and offsets. + + .. _comedilib.h: http://www.comedi.org/git?p=comedi/comedilib.git;a=blob;f=include/comedilib.h;hb=HEAD + """ + origin = config['inverse-conversion-origin'] + inverse_coefficients = config['inverse-conversion-coefficients'] + if len(inverse_coefficients) == 0: + raise NotImplementedError('cubic polynomial inversion') + return _numpy.polyval(list(reversed(inverse_coefficients)), data-origin) + +def convert_volts_to_meters(config, data): + """Convert volt-valued data to meters. + + >>> config = _config._AxisConfig() + >>> config['gain'] = 20.0 + >>> config['sensitivity'] = 8e-9 + >>> convert_volts_to_meters(config, 1) + 1.6e-07 + >>> convert_volts_to_meters( + ... config, _numpy.array([1, 6, 17, 34], dtype=_numpy.float)) + ... # doctest: +ELLIPSIS + array([ 1.6...e-07, 9.6...e-07, 2.7...e-06, + 5.4...e-06]) + """ + return data * config['gain'] * config['sensitivity'] + +def convert_meters_to_volts(config, data): + """Convert bit-valued data to volts. + + >>> config = _config._AxisConfig() + >>> config['gain'] = 20.0 + >>> config['sensitivity'] = 8e-9 + >>> convert_meters_to_volts(config, 1.6e-7) + 1.0 + >>> convert_meters_to_volts( + ... config, _numpy.array([1.6e-7, 9.6e-7, 2.72e-6, 5.44e-6], + ... dtype=_numpy.float)) + array([ 1., 6., 17., 34.]) + """ + return data / (config['gain'] * config['sensitivity']) + +def convert_bits_to_meters(channel_config, axis_config, data): + """Convert bit-valued data to meters. + + >>> channel_config = _config._ChannelConfig() + >>> channel_config['conversion-coefficients'] = [1, 2, 3] + >>> channel_config['conversion-origin'] = -1 + >>> axis_config = _config._AxisConfig() + >>> axis_config['gain'] = 20.0 + >>> axis_config['sensitivity'] = 8e-9 + >>> convert_bits_to_meters(channel_config, axis_config, 1) + ... # doctest: +ELLIPSIS + 2.7...e-06 + >>> convert_bits_to_meters( + ... channel_config, axis_config, + ... _numpy.array([-1, 0, 1, 2], dtype=_numpy.float)) + ... # doctest: +ELLIPSIS + array([ 1.6...e-07, 9.6...e-07, 2.7...e-06, + 5.4...e-06]) + """ + data = convert_bits_to_volts(channel_config, data) + return convert_volts_to_meters(axis_config, data) + +def convert_meters_to_bits(channel_config, axis_config, data): + """Convert meter-valued data to volts. + + >>> channel_config = _config._ChannelConfig() + >>> channel_config['inverse-conversion-coefficients'] = [1, 2, 3] + >>> channel_config['inverse-conversion-origin'] = -1 + >>> axis_config = _config._AxisConfig() + >>> axis_config['gain'] = 20.0 + >>> axis_config['sensitivity'] = 8e-9 + >>> convert_meters_to_bits(channel_config, axis_config, 1.6e-7) + 17.0 + >>> convert_meters_to_bits( + ... channel_config, axis_config, + ... _numpy.array([1.6e-7, 9.6e-7, 2.72e-6, 5.44e-6], + ... dtype=_numpy.float)) + array([ 17., 162., 1009., 3746.]) + """ + data = convert_meters_to_volts(axis_config, data) + return convert_volts_to_bits(channel_config, data) + +def _setup_channel_config(config, channel): + """Initialize the `ChannelConfig` `config` using the + `AnalogChannel` `channel`. + """ + config['device'] = channel.subdevice.device.filename + config['subdevice'] = channel.subdevice.index + config['channel'] = channel.index + config['maxdata'] = channel.get_maxdata() + config['range'] = channel.range.value + converter = channel.get_converter() + config['conversion-origin' + ] = converter.get_to_physical_expansion_origin() + config['conversion-coefficients' + ] = converter.get_to_physical_coefficients() + config['inverse-conversion-origin' + ] = converter.get_from_physical_expansion_origin() + config['inverse-conversion-coefficients' + ] = converter.get_from_physical_coefficients() + + +class PiezoAxis (object): + """A one-dimensional piezoelectric actuator. + + If used, the montoring channel must (as of now) be on the same + device as the controlling channel. + + >>> from pprint import pprint + >>> from pycomedi.device import Device + >>> from pycomedi.subdevice import StreamingSubdevice + >>> from pycomedi.channel import AnalogChannel + >>> from pycomedi.constant import (AREF, SUBDEVICE_TYPE, UNIT) + + >>> d = Device('/dev/comedi0') + >>> d.open() + + >>> s_in = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai, + ... factory=StreamingSubdevice) + >>> s_out = d.find_subdevice_by_type(SUBDEVICE_TYPE.ao, + ... factory=StreamingSubdevice) + + >>> axis_channel = s_out.channel( + ... 0, factory=AnalogChannel, aref=AREF.ground) + >>> monitor_channel = s_in.channel( + ... 0, factory=AnalogChannel, aref=AREF.diff) + >>> for chan in [axis_channel, monitor_channel]: + ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10) + + >>> axis_config = _config._AxisConfig() + >>> axis_config.update({'gain':20, 'sensitivity':8e-9}) + >>> axis_channel_config = _config._ChannelConfig() + >>> monitor_channel_config = _config._ChannelConfig() + >>> monitor_channel_config['device'] = '/dev/comediX' + + >>> p = PiezoAxis(axis_config=axis_config, + ... axis_channel_config=axis_channel_config, + ... monitor_channel_config=monitor_channel_config) + ... # doctest: +NORMALIZE_WHITESPACE + Traceback (most recent call last): + ... + NotImplementedError: piezo axis control and monitor on different devices + (/dev/comedi0 and /dev/comediX) + + >>> monitor_channel_config['device'] = axis_channel_config['device'] + >>> p = PiezoAxis(axis_config=axis_config, + ... axis_channel_config=axis_channel_config, + ... monitor_channel_config=monitor_channel_config, + ... axis_channel=axis_channel, monitor_channel=monitor_channel) + + >>> p.setup_config() + >>> pprint(axis_channel_config) + {'channel': 0, + 'conversion-coefficients': array([ -1.00000000e+01, 3.05180438e-04]), + 'conversion-origin': 0.0, + 'device': '/dev/comedi0', + 'inverse-conversion-coefficients': array([ 0. , 3276.75]), + 'inverse-conversion-origin': -10.0, + 'maxdata': 65535L, + 'range': 0, + 'subdevice': 1} + >>> pprint(monitor_channel_config) + {'channel': 0, + 'conversion-coefficients': array([ -1.00000000e+01, 3.05180438e-04]), + 'conversion-origin': 0.0, + 'device': '/dev/comedi0', + 'inverse-conversion-coefficients': array([ 0. , 3276.75]), + 'inverse-conversion-origin': -10.0, + 'maxdata': 65535L, + 'range': 0, + 'subdevice': 0} + + >>> convert_bits_to_meters(p.axis_channel_config, p.axis_config, 0) + ... # doctest: +ELLIPSIS + -1.6...e-06 + + >>> d.close() + """ + def __init__(self, axis_config, axis_channel_config, + monitor_channel_config=None, + axis_channel=None, monitor_channel=None, + name = None): + self.axis_config = axis_config + self.axis_channel_config = axis_channel_config + self.monitor_channel_config = monitor_channel_config + if (monitor_channel_config and + axis_channel_config['device'] != monitor_channel_config['device']): + raise NotImplementedError( + ('piezo axis control and monitor on different devices ' + '(%s and %s)') % ( + axis_channel_config['device'], + monitor_channel_config['device'])) + if not axis_channel: + raise NotImplementedError( + 'pypiezo not yet capable of opening its own axis channel') + #axis_channel = pycomedi... + self.axis_channel = axis_channel + if monitor_channel_config and not monitor_channel: + raise NotImplementedError( + 'pypiezo not yet capable of opening its own monitor channel') + #monitor_channel = pycomedi... + self.monitor_channel = monitor_channel + self.name = name + + def setup_config(self): + "Initialize the axis (and monitor) configs." + _setup_channel_config(self.axis_channel_config, self.axis_channel) + if self.monitor_channel: + _setup_channel_config( + self.monitor_channel_config, self.monitor_channel) + if self.axis_config['minimum'] is None: + self.axis_config['minimum'] = convert_bits_to_volts( + self.axis_channel_config, 0) + if self.axis_config['maximum'] is None: + self.axis_config['maximum'] = convert_bits_to_volts( + self.axis_channel_config, self.axis_channel.get_maxdata()) + + +class InputChannel(object): + """An input channel monitoring some interesting parameter. + + >>> from pprint import pprint + >>> from pycomedi.device import Device + >>> from pycomedi.subdevice import StreamingSubdevice + >>> from pycomedi.channel import AnalogChannel + >>> from pycomedi.constant import AREF, SUBDEVICE_TYPE, UNIT + + >>> d = Device('/dev/comedi0') + >>> d.open() + + >>> s = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai, + ... factory=StreamingSubdevice) + + >>> channel = s.channel(0, factory=AnalogChannel, aref=AREF.diff) + >>> channel.range = channel.find_range(unit=UNIT.volt, min=-10, max=10) + + >>> channel_config = _config._ChannelConfig() + + >>> c = InputChannel(channel_config=channel_config, channel=channel) + >>> c.setup_config() + >>> pprint(channel_config) + {'channel': 0, + 'conversion-coefficients': array([ -1.00000000e+01, 3.05180438e-04]), + 'conversion-origin': 0.0, + 'device': '/dev/comedi0', + 'inverse-conversion-coefficients': array([ 0. , 3276.75]), + 'inverse-conversion-origin': -10.0, + 'maxdata': 65535L, + 'range': 0, + 'subdevice': 0} + + >>> convert_bits_to_volts(c.channel_config, 0) + -10.0 + + >>> d.close() + """ + def __init__(self, channel_config, channel=None, name=None): + self.channel_config = channel_config + if not channel: + raise NotImplementedError( + 'pypiezo not yet capable of opening its own channel') + #channel = pycomedi... + self.channel = channel + self.name = name + + def setup_config(self): + _setup_channel_config(self.channel_config, self.channel) + + +class Piezo (object): + """A piezo actuator-controlled experiment. + + >>> from pprint import pprint + >>> from pycomedi.device import Device + >>> from pycomedi.subdevice import StreamingSubdevice + >>> from pycomedi.channel import AnalogChannel + >>> from pycomedi.constant import AREF, SUBDEVICE_TYPE, UNIT + + >>> d = Device('/dev/comedi0') + >>> d.open() + + >>> s_in = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai, + ... factory=StreamingSubdevice) + >>> s_out = d.find_subdevice_by_type(SUBDEVICE_TYPE.ao, + ... factory=StreamingSubdevice) + + >>> axis_channel = s_out.channel( + ... 0, factory=AnalogChannel, aref=AREF.ground) + >>> monitor_channel = s_in.channel( + ... 0, factory=AnalogChannel, aref=AREF.diff) + >>> input_channel = s_in.channel(1, factory=AnalogChannel, aref=AREF.diff) + >>> for chan in [axis_channel, monitor_channel, input_channel]: + ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10) + + >>> axis_config = _config._AxisConfig() + >>> axis_config.update({'gain':20, 'sensitivity':8e-9}) + >>> axis_channel_config = _config._ChannelConfig() + >>> monitor_channel_config = _config._ChannelConfig() + >>> input_channel_config = _config._ChannelConfig() + + >>> a = PiezoAxis(axis_config=axis_config, + ... axis_channel_config=axis_channel_config, + ... monitor_channel_config=monitor_channel_config, + ... axis_channel=axis_channel, monitor_channel=monitor_channel, + ... name='z') + >>> a.setup_config() + + >>> c = InputChannel( + ... channel_config=input_channel_config, channel=input_channel, + ... name='some-input') + >>> c.setup_config() + + >>> p = Piezo(axes=[a], input_channels=[c]) + >>> inputs = p.read_inputs() + >>> pprint(inputs) # doctest: +SKIP + {'some-input': 34494L, 'z-monitor': 32669L} + + >>> pos = convert_volts_to_bits(p.axes[0].axis_channel_config, 0) + >>> pos + 32767.5 + >>> p.jump('z', pos) + >>> p.last_output == {'z': int(pos)} + True + + :meth:`ramp` raises an error if passed an invalid `data` `dtype`. + + >>> output_data = _numpy.arange(0, int(pos), step=int(pos/10), + ... dtype=_numpy.float) + >>> output_data = output_data.reshape((len(output_data), 1)) + >>> input_data = p.ramp(data=output_data, frequency=10, + ... output_names=['z'], input_names=['z-monitor', 'some-input']) + Traceback (most recent call last): + ... + ValueError: output dtype float64 does not match expected + >>> output_data = _numpy.arange(0, int(pos), step=int(pos/10), + ... dtype=p.channel_dtype('z', direction='output')) + >>> output_data = output_data.reshape((len(output_data), 1)) + >>> input_data = p.ramp(data=output_data, frequency=10, + ... output_names=['z'], input_names=['z-monitor', 'some-input']) + >>> input_data # doctest: +SKIP + array([[ 0, 25219], + [ 3101, 23553], + [ 6384, 22341], + [ 9664, 21465], + [12949, 20896], + [16232, 20614], + [19516, 20588], + [22799, 20801], + [26081, 21233], + [29366, 21870], + [32646, 22686]], dtype=uint16) + + >>> p.last_output == {'z': output_data[-1]} + True + + >>> data = p.named_ramp(data=output_data, frequency=10, + ... output_names=['z'], input_names=['z-monitor', 'some-input']) + >>> pprint(data) # doctest: +ELLIPSIS, +SKIP + {'some-input': array([21666, 20566, ..., 22395], dtype=uint16), + 'z': array([ 0, 3276, ..., 32760], dtype=uint16), + 'z-monitor': array([ 3102, 6384, ..., 32647], dtype=uint16)} + + >>> d.close() + """ + def __init__(self, axes, input_channels, base_config=None): + self.axes = axes + self.input_channels = input_channels + self.last_output = {} + + def axis_by_name(self, name): + "Get an axis by its name." + for axis in self.axes: + if axis.name == name: + return axis + raise ValueError(name) + + def channels(self, direction=None): + """Iterate through all `(name, channel)` tuples. + + =========== =================== + `direction` Returned channels + =========== =================== + 'input' all input channels + 'output' all output channels + None all channels + =========== =================== + """ + if direction not in ('input', 'output', None): + raise ValueError(direction) + for a in self.axes: + if direction != 'input': + yield (a.name, a.axis_channel) + if a.monitor_channel and direction != 'output': + yield ('%s-monitor' % a.name, a.monitor_channel) + if direction != 'output': + for c in self.input_channels: + yield (c.name, c.channel) + + def channel_by_name(self, name, direction=None): + """Get a channel by its name. + + Setting `direction` (see :meth:`channels`) may allow a more + efficient search. + """ + for n,channel in self.channels(direction=direction): + if n == name: + return channel + raise ValueError(name) + + def channel_dtype(self, channel_name, direction=None): + """Get a channel's data type by name. + + Setting `direction` (see :meth:`channels`) may allow a more + efficient search. + """ + channel = self.channel_by_name(name=channel_name, direction=direction) + return channel.subdevice.get_dtype() + + def read_inputs(self): + "Read all inputs and return a `name`->`value` dictionary." + # There is no multi-channel read instruction, so preform reads + # sequentially. + ret = dict([(n, c.data_read()) for n,c in self.channels('input')]) + _LOG.debug('current position: %s' % ret) + return ret + + def jump(self, axis_name, position): + "Move the output named `axis_name` to `position`." + _LOG.debug('jump %s to %s' % (axis_name, position)) + position = int(position) + channel = self.channel_by_name(name=axis_name) + channel.data_write(position) + self.last_output[axis_name] = position + + def ramp(self, data, frequency, output_names, input_names=()): + """Synchronized IO ramp writing `data` and reading `in_names`. + + Parameters + ---------- + data : numpy array-like + Row for each cycle, column for each output channel. + frequency : float + Target cycle frequency in Hz. + output_names : list of strings + Names of output channels in the same order as the columns + of `data`. + input_names : list of strings + Names of input channels to monitor in the same order as + the columns of the returned array. + """ + if len(data.shape) != 2: + raise ValueError( + 'ramp data must be two dimensional, not %d' % len(data.shape)) + if data.shape[1] != len(output_names): + raise ValueError( + 'ramp data should have on column for each input, ' + 'but has %d columns for %d inputs' + % (data.shape[1], len(output_names))) + n_samps = data.shape[0] + log_string = 'ramp %d samples at %g Hz. out: %s, in: %s' % ( + n_samps, frequency, output_names, input_names) + _LOG.debug(log_string) # _LOG on one line for easy commenting-out + # TODO: check range? + output_channels = [self.channel_by_name(name=n, direction='output') + for n in output_names] + input_channels = [self.channel_by_name(name=n, direction='input') + for n in input_names] + + ao_subdevice = output_channels[0].subdevice + ai_subdevice = input_channels[0].subdevice + device = ao_subdevice.device + + output_dtype = ao_subdevice.get_dtype() + if data.dtype != output_dtype: + raise ValueError('output dtype %s does not match expected %s' + % (data.dtype, output_dtype)) + input_data = _numpy.ndarray( + (n_samps, len(input_channels)), dtype=ai_subdevice.get_dtype()) + + _LOG.debug('setup ramp commands') + scan_period_ns = int(1e9 / frequency) + ai_cmd = ai_subdevice.get_cmd_generic_timed( + len(input_channels), scan_period_ns) + ao_cmd = ao_subdevice.get_cmd_generic_timed( + len(output_channels), scan_period_ns) + + ai_cmd.start_src = TRIG_SRC.int + ai_cmd.start_arg = 0 + ai_cmd.stop_src = TRIG_SRC.count + ai_cmd.stop_arg = n_samps + ai_cmd.chanlist = input_channels + #ao_cmd.start_src = TRIG_SRC.ext + #ao_cmd.start_arg = 18 # NI card AI_START1 internal AI start signal + ao_cmd.start_src = TRIG_SRC.int + ao_cmd.start_arg = 0 + ao_cmd.stop_src = TRIG_SRC.count + ao_cmd.stop_arg = n_samps-1 + ao_cmd.chanlist = output_channels + + ai_subdevice.cmd = ai_cmd + ao_subdevice.cmd = ao_cmd + for i in range(3): + rc = ai_subdevice.command_test() + if rc is None: break + _LOG.debug('analog input test %d: %s' % (i, rc)) + for i in range(3): + rc = ao_subdevice.command_test() + if rc is None: break + _LOG.debug('analog output test %d: %s' % (i, rc)) + + _LOG.debug('lock subdevices for ramp') + ao_subdevice.lock() + try: + ai_subdevice.lock() + try: + _LOG.debug('load ramp commands') + ao_subdevice.command() + ai_subdevice.command() + + writer = Writer(ao_subdevice, data) + writer.start() + reader = Reader(ai_subdevice, input_data) + reader.start() + + _LOG.debug('arm analog output') + device.do_insn(inttrig_insn(ao_subdevice)) + _LOG.debug('trigger ramp (via analog input)') + device.do_insn(inttrig_insn(ai_subdevice)) + _LOG.debug('ramp running') + + writer.join() + reader.join() + _LOG.debug('ramp complete') + finally: + #_LOG.debug('AI flags: %s' % ai_subdevice.get_flags()) + ai_subdevice.cancel() + ai_subdevice.unlock() + finally: + # release busy flag, which seems to not be cleared + # automatically. See + # http://groups.google.com/group/comedi_list/browse_thread/thread/4c7040989197abad/ + #_LOG.debug('AO flags: %s' % ao_subdevice.get_flags()) + ao_subdevice.cancel() + ao_subdevice.unlock() + _LOG.debug('unlocked subdevices after ramp') + + for i,name in enumerate(output_names): + self.last_output[name] = data[-1,i] + + if _base_config['matplotlib']: + if not _matplotlib: + raise _matplotlib_import_error + figure = _matplotlib_pyplot.figure() + axes = figure.add_subplot(1, 1, 1) + axes.hold(True) + timestamp = _time.strftime('%H%M%S') + axes.set_title('piezo ramp %s' % timestamp) + for d,names in [(data, output_names), + (input_data, input_names)]: + for i,name in enumerate(names): + axes.plot(d[:,i], label=name) + figure.show() + return input_data + + def named_ramp(self, data, frequency, output_names, input_names=()): + input_data = self.ramp( + data=data, frequency=frequency, output_names=output_names, + input_names=input_names) + ret = {} + for i,name in enumerate(output_names): + ret[name] = data[:,i] + for i,name in enumerate(input_names): + ret[name] = input_data[:,i] + return ret diff --git a/pypiezo/config.py b/pypiezo/config.py new file mode 100644 index 0000000..4b78d10 --- /dev/null +++ b/pypiezo/config.py @@ -0,0 +1,708 @@ +"""Piezo configuration + +Broken out from the main modules to make it easy to override if you +wish to use a different configuration file format. + +The HDF5- and YAML-backed config file classes are created dynamically: + +>>> print '\\n'.join([obj for obj in sorted(locals().keys()) +... if obj.endswith('Config') +... and not obj.startswith('_')]) +HDF5_AxisConfig +HDF5_BaseConfig +HDF5_ChannelConfig +HDF5_InputChannelConfig +HDF5_OutputChannelConfig +YAML_AxisConfig +YAML_BaseConfig +YAML_ChannelConfig +YAML_InputChannelConfig +YAML_OutputChannelConfig + +The first time you use them, the file they create will probably be +empty or not exist. + +>>> import os +>>> import tempfile +>>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='pypiezo-') +>>> os.close(fd) + +>>> c = HDF5_BaseConfig(filename=filename, group='/base') +>>> c.load() + +Loading will create a stub group group if it hadn't existed before. + +>>> pprint_HDF5(filename) +/ + /base +>>> print c.dump(from_file=True) + + +Saving fills in all the config values. + +>>> c['syslog'] = True +>>> c.save() +>>> pprint_HDF5(filename) # doctest: +REPORT_UDIFF +/ + /base + + warn + + no + + yes +>>> print c.dump(from_file=True) +log-level: warn +matplotlib: no +syslog: yes + +If you want more details, you can dump with help strings. + +>>> print c.dump(help=True, from_file=True) # doctest: +NORMALIZE_WHITESPACE +log-level: warn\t(Module logging level. Default: warn. Choices: + \t critical, error, warn, info, debug) +matplotlib: no\t(Plot piezo motion using `matplotlib`. Default: no. + \t Choices: yes, no) +syslog: yes\t(Log to syslog (otherwise log to stderr). Default: no. + \t Choices: yes, no) + +Settings also support `None`, even if they have numeric types. + +>>> c = HDF5_AxisConfig(filename=filename, group='/z-axis') +>>> c.load() +>>> c.save() +>>> c.load() +>>> print c.dump(from_file=True) +gain: 1.0 +maximum: None +minimum: None +sensitivity: 1.0 +>>> print (c['minimum'] == None) +True + +Cleanup our temporary config file. + +>>> os.remove(filename) +""" + +import logging as _logging +import os.path as _os_path +import sys as _sys + +import h5py as _h5py +import yaml as _yaml + +from . import LOG as _LOG + + +class _Setting (object): + "A named setting with arbitrart text values." + def __init__(self, name, help='', default=None): + self.name = name + self._help = help + self.default = default + + def __str__(self): + return '<%s %s>' % (self.__class__.__name__, self.name) + + def __repr__(self): + return self.__str__() + + def help(self): + ret = '%s Default: %s.' % ( + self._help, self.convert_to_text(self.default)) + return ret.strip() + + def convert_from_text(self, value): + return value + + def convert_to_text(self, value): + return value + + +class _ChoiceSetting (_Setting): + """A named setting with a limited number of possible values. + + `choices` should be a list of `(config_file_value, Python value)` + pairs. For example + + >>> s = _ChoiceSetting(name='bool', + ... choices=[('yes', True), ('no', False)]) + >>> s.convert_from_text('yes') + True + >>> s.convert_to_text(True) + 'yes' + >>> s.convert_to_text('invalid') + Traceback (most recent call last): + ... + ValueError: invalid + >>> s.help() + 'Default: yes. Choices: yes, no' + """ + def __init__(self, choices=None, **kwargs): + if 'default' not in kwargs: + if None not in [keyval[1] for keyval in choices]: + kwargs['default'] = choices[0][1] + super(_ChoiceSetting, self).__init__(**kwargs) + if choices == None: + choices = [] + self.choices = choices + + def help(self): + ret = '%s Choices: %s' % ( + super(_ChoiceSetting, self).help(), + ', '.join([key for key,value in self.choices])) + return ret.strip() + + def convert_from_text(self, value): + return dict(self.choices)[value] + + def convert_to_text(self, value): + for keyval in self.choices: + key,val = keyval + if val == value: + return key + raise ValueError(value) + + +class _BooleanSetting (_ChoiceSetting): + """A named settubg that can be either true or false. + + >>> s = _BooleanSetting(name='bool') + + >>> s.convert_from_text('yes') + True + >>> s.convert_to_text(True) + 'yes' + >>> s.convert_to_text('invalid') + Traceback (most recent call last): + ... + ValueError: invalid + >>> s.help() + 'Default: no. Choices: yes, no' + """ + def __init__(self, **kwargs): + assert 'choices' not in kwargs + if 'default' not in kwargs: + kwargs['default'] = False + super(_BooleanSetting, self).__init__( + choices=[('yes', True), ('no', False)], **kwargs) + + +class _NumericSetting (_Setting): + """A named setting with numeric values. + + >>> s = _NumericSetting(name='float') + >>> s.default + 0 + >>> s.convert_to_text(13) + '13' + """ + _default_value = 0 + + def __init__(self, **kwargs): + if 'default' not in kwargs: + kwargs['default'] = self._default_value + super(_NumericSetting, self).__init__(**kwargs) + + def convert_to_text(self, value): + return str(value) + + def convert_from_text(self, value): + if value in [None, 'None']: + return None + return self._convert_from_text(value) + + def _convert_from_text(self, value): + raise NotImplementedError() + + +class _IntegerSetting (_NumericSetting): + """A named setting with integer values. + + >>> s = _IntegerSetting(name='int') + >>> s.default + 1 + >>> s.convert_from_text('8') + 8 + """ + _default_value = 1 + + def _convert_from_text(self, value): + return int(value) + + +class _FloatSetting (_NumericSetting): + """A named setting with floating point values. + + >>> s = _FloatSetting(name='float') + >>> s.default + 1.0 + >>> s.convert_from_text('8') + 8.0 + >>> s.convert_from_text('invalid') + Traceback (most recent call last): + ... + ValueError: invalid literal for float(): invalid + """ + _default_value = 1.0 + + def _convert_from_text(self, value): + return float(value) + + +class _FloatListSetting (_Setting): + """A named setting with a list of floating point values. + + >>> s = _FloatListSetting(name='floatlist') + >>> s.default + [] + >>> s.convert_to_text([1, 2.3]) + '1, 2.3' + >>> s.convert_from_text('4.5, -6.7') # doctest: +ELLIPSIS + [4.5, -6.700...] + >>> s.convert_to_text([]) + '' + >>> s.convert_from_text('') + [] + """ + def __init__(self, **kwargs): + if 'default' not in kwargs: + kwargs['default'] = [] + super(_FloatListSetting, self).__init__(**kwargs) + + def _convert_from_text(self, value): + if value is None: + return value + return float(value) + + def convert_from_text(self, value): + if value is None: + return None + elif value == '': + return [] + return [self._convert_from_text(x) for x in value.split(',')] + + def convert_to_text(self, value): + if value is None: + return None + return ', '.join([str(x) for x in value]) + + +class _Config (dict): + "A class with a list `._keys` of `_Setting`\s." + settings = [] + + def __init__(self): + for s in self.settings: + self[s.name] = s.default + + def dump(self, help=False): + """Return all settings and their values as a string + + >>> b = _BaseConfig() + >>> print b.dump() + syslog: no + matplotlib: no + log-level: warn + >>> print b.dump(help=True) # doctest: +NORMALIZE_WHITESPACE + syslog: no (Log to syslog (otherwise log to stderr). + Default: no. Choices: yes, no) + matplotlib: no (Plot piezo motion using `matplotlib`. + Default: no. Choices: yes, no) + log-level: warn (Module logging level. Default: warn. + Choices: critical, error, warn, info, debug) + """ + lines = [] + settings = dict([(s.name, s) for s in self.settings]) + for key,value in self.iteritems(): + if key in settings: + setting = settings[key] + value_string = setting.convert_to_text(value) + if help: + help_string = '\t(%s)' % setting.help() + else: + help_string = '' + lines.append('%s: %s%s' % (key, value_string, help_string)) + return '\n'.join(lines) + + +class _BackedConfig (_Config): + "A `_Config` instance with some kind of storage interface" + def load(self): + raise NotImplementedError() + + def save(self): + raise NotImplementedError() + + +class _BaseConfig (_Config): + """Configure `pypiezo` module operation + + >>> b = _BaseConfig() + >>> b.settings # doctest: +NORMALIZE_WHITESPACE + [<_ChoiceSetting log-level>, <_BooleanSetting syslog>, + <_BooleanSetting matplotlib>] + >>> print b['log-level'] == _logging.WARN + True + """ + settings = [ + _ChoiceSetting( + name='log-level', + help='Module logging level.', + default=_logging.WARN, + choices=[ + ('critical', _logging.CRITICAL), + ('error', _logging.ERROR), + ('warn', _logging.WARN), + ('info', _logging.INFO), + ('debug', _logging.DEBUG), + ]), + _BooleanSetting( + name='syslog', + help='Log to syslog (otherwise log to stderr).', + default=False), + _BooleanSetting( + name='matplotlib', + help='Plot piezo motion using `matplotlib`.', + default=False), + ] + + +class _AxisConfig (_Config): + "Configure a single piezo axis" + settings = [ + _FloatSetting( + name='gain', + help=( + 'Volts applied at piezo per volt output from the DAQ card ' + '(e.g. if your DAQ output is amplified before driving the ' + 'piezo),')), + _FloatSetting( + name='sensitivity', + help='Meters of piezo deflection per volt applied to the piezo.'), + _FloatSetting( + name='minimum', + help='Set a lower limit on allowed output voltage', + default=None), + _FloatSetting( + name='maximum', + help='Set an upper limit on allowed output voltage', + default=None), + ] + + +class _ChannelConfig (_Config): + settings = [ + _Setting( + name='device', + help='Comedi device.', + default='/dev/comedi0'), + _IntegerSetting( + name='subdevice', + help='Comedi subdevice index. -1 for automatic detection.', + default=-1), + _IntegerSetting( + name='channel', + help='Subdevice channel index.', + default=0), + _IntegerSetting( + name='maxdata', + help="Channel's maximum bit value."), + _IntegerSetting( + name='range', + help="Channel's selected range index."), + _FloatListSetting( + name='conversion-coefficients', + help=('Bit to physical unit conversion coefficients starting with ' + 'the constant coefficient.')), + _FloatSetting( + name='conversion-origin', + help=('Origin (bit offset) of bit to physical polynomial ' + 'expansion.')), + _FloatListSetting( + name='inverse-conversion-coefficients', + help=('Physical unit to bit conversion coefficients starting with ' + 'the constant coefficient.')), + _FloatSetting( + name='inverse-conversion-origin', + help=('Origin (physical unit offset) of physical to bit ' + 'polynomial expansion.')), + ] + + +class _OutputChannelConfig (_ChannelConfig): + pass + + +class _InputChannelConfig (_ChannelConfig): + pass + + +def pprint_HDF5(*args, **kwargs): + print pformat_HDF5(*args, **kwargs) + +def pformat_HDF5(filename, group='/'): + f = _h5py.File(filename, 'r') + cwg = f[group] + return '\n'.join(_pformat_hdf5(cwg)) + +def _pformat_hdf5(cwg, depth=0): + lines = [] + lines.append(' '*depth + cwg.name) + depth += 1 + for key,value in cwg.iteritems(): + if isinstance(value, _h5py.Group): + lines.extend(_pformat_hdf5(value, depth)) + elif isinstance(value, _h5py.Dataset): + lines.append(' '*depth + str(value)) + lines.append(' '*(depth+1) + str(value.value)) + else: + lines.append(' '*depth + str(value)) + return lines + + +class _HDF5Config (_BackedConfig): + """Mixin to back a `_Config` class with an HDF5 file. + + TODO: Special handling for Choice (enums), FloatList (arrays), etc.? + + The `.save` and `.load` methods have an optional `group` argument + that allows you to save and load settings from an externally + opened HDF5 file. This can make it easier to stash several + related `_Config` classes in a single file. For example + + >>> import os + >>> import tempfile + >>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='pypiezo-') + >>> os.close(fd) + + >>> f = _h5py.File(filename, 'a') + >>> c = HDF5_BaseConfig(filename='untouched_file.h5', + ... group='/untouched/group') + >>> c['syslog'] = True + >>> group = f.create_group('base') + >>> c.save(group) + >>> pprint_HDF5(filename) + / + /base + + warn + + no + + yes + >>> d = HDF5_BaseConfig(filename='untouched_file.h5', + ... group='/untouched/group') + >>> d.load(group) + >>> d['syslog'] + True + + >>> f.close() + >>> os.remove(filename) + """ + def __init__(self, filename, group='/', **kwargs): + super(_HDF5Config, self).__init__(**kwargs) + self.filename = filename + assert group.startswith('/'), group + if not group.endswith('/'): + group += '/' + self.group = group + self._file_checked = False + + def _check_file(self): + if self._file_checked: + return + self._setup_file() + self._file_checked = True + + def _setup_file(self): + f = _h5py.File(self.filename, 'a') + cwg = f # current working group + + # Create the group where the settings are stored (if necessary) + gpath = [''] + for group in self.group.strip('/').split('/'): + gpath.append(group) + if group not in cwg.keys(): + _LOG.debug('creating group %s in %s' + % ('/'.join(gpath), self.filename)) + cwg.create_group(group) + cwg = cwg[group] + f.close() + + def dump(self, help=False, from_file=False): + """Return the relevant group in `self.filename` as a string + + Extends the base :meth:`dump` by adding the `from_file` + option. If `from_file` is true, dump all entries that + currently exist in the relevant group, rather than listing all + settings defined in the instance dictionary. + """ + if from_file: + self._check_file() + f = _h5py.File(self.filename, 'r') + cwg = f[self.group] + lines = [] + settings = dict([(s.name, s) for s in self.settings]) + for key,value in cwg.iteritems(): + if help and key in settings: + help_string = '\t(%s)' % settings[key].help() + else: + help_string = '' + lines.append('%s: %s%s' % (key, value.value, help_string)) + return '\n'.join(lines) + return super(_HDF5Config, self).dump(help=help) + + def load(self, group=None): + if group is None: + self._check_file() + f = _h5py.File(self.filename, 'r') + group = f[self.group] + else: + f = None + for s in self.settings: + if s.name not in group.keys(): + continue + self[s.name] = s.convert_from_text(group[s.name].value) + if f: + f.close() + + def save(self, group=None): + if group is None: + self._check_file() + f = _h5py.File(self.filename, 'a') + group = f[self.group] + else: + f = None + for s in self.settings: + group[s.name] = s.convert_to_text(self[s.name]) + if f: + f.close() + + +class _YAMLDumper (_yaml.SafeDumper): + def represent_bool(self, data): + "Use yes/no instead of the default true/false" + if data: + value = u'yes' + else: + value = u'no' + return self.represent_scalar(u'tag:yaml.org,2002:bool', value) + + +_YAMLDumper.add_representer(bool, _YAMLDumper.represent_bool) + + +class _YAMLConfig (_BackedConfig): + """Mixin to back a `_Config` class with a YAML file. + + TODO: Special handling for Choice (enums), FloatList (arrays), etc.? + + >>> import os + >>> import os.path + >>> import tempfile + >>> fd,filename = tempfile.mkstemp(suffix='.yaml', prefix='pypiezo-') + >>> os.close(fd) + + >>> c = YAML_BaseConfig(filename=filename) + >>> c.load() + + Saving writes all the config values to disk. + + >>> c['syslog'] = True + >>> c.save() + >>> print open(c.filename, 'r').read() + log-level: warn + matplotlib: no + syslog: yes + + + Loading reads the config files from disk. + + >>> c = YAML_BaseConfig(filename=filename) + >>> c.load() + >>> print c.dump() + syslog: yes + matplotlib: no + log-level: warn + + Cleanup our temporary config file. + + >>> os.remove(filename) + """ + dumper = _YAMLDumper + + def __init__(self, filename, **kwargs): + super(_YAMLConfig, self).__init__(**kwargs) + self.filename = filename + + def load(self): + if not _os_path.exists(self.filename): + open(self.filename, 'a').close() + with open(self.filename, 'r') as f: + data = _yaml.safe_load(f) + if data == None: + return # empty file + settings = dict([(s.name, s) for s in self.settings]) + for key,value in data.iteritems(): + setting = settings[key] + if isinstance(setting, _BooleanSetting): + v = value + else: + v = setting.convert_from_text(value) + self[key] = v + + def save(self): + data = {} + settings = dict([(s.name, s) for s in self.settings]) + for key,value in self.iteritems(): + if key in settings: + setting = settings[key] + if isinstance(setting, _BooleanSetting): + v = value + else: + v = setting.convert_to_text(value) + data[key] = v + with open(self.filename, 'w') as f: + _yaml.dump(data, stream=f, Dumper=self.dumper, + default_flow_style=False) + + +# Define HDF5- and YAML-backed subclasses of the basic _Config types. +for name,obj in locals().items(): + if (obj != _Config and + type(obj) == type and + issubclass(obj, _Config) and + not issubclass(obj, _BackedConfig)): + for prefix,base in [('HDF5', _HDF5Config), ('YAML', _YAMLConfig)]: + _name = '%s%s' % (prefix, name) + _bases = (base, obj) + _dict = {} + _class = type(_name, _bases, _dict) + setattr(_sys.modules[__name__], _name, _class) + +del name, obj, prefix, base, _name, _bases, _dict, _class + + +def find_base_config(): + "Return the best `_BaseConfig` match after scanning the filesystem" + _LOG.info('looking for base_config file') + user_basepath = _os_path.join(_os_path.expanduser('~'), '.pypiezorc') + system_basepath = _os_path.join('/etc', 'pypiezo', 'config') + distributed_basepath = _os_path.join('/usr', 'share', 'pypiezo', 'config') + for basepath in [user_basepath, system_basepath, distributed_basepath]: + for (extension, config) in [('.h5', HDF5_BaseConfig), + ('.yaml', YAML_BaseConfig)]: + filename = basepath + extension + if _os_path.exists(filename): + _LOG.info('base_config file found at %s' % filename) + base_config = config(filename) + base_config.load() + return base_config + else: + _LOG.debug('no base_config file at %s' % filename) + _LOG.info('new base_config file at %s' % filename) + basepath = user_basepath + filename = basepath + extension + return config(filename) diff --git a/pypiezo/surface.py b/pypiezo/surface.py new file mode 100644 index 0000000..79033c2 --- /dev/null +++ b/pypiezo/surface.py @@ -0,0 +1,258 @@ +# Copyright (C) 2008-2011 W. Trevor King +# +# This file is part of pypiezo. +# +# pypiezo is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# pypiezo is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with pypiezo. If not, see . + +"""Utilities detecting the position of the sample surface. + +TODO: doctests +""" + +SLEEP_DURING_SURF_POS_AQUISITION = False # doesn't help + +#from time import sleep as _sleep +import numpy as _numpy +from scipy.optimize import leastsq as _leastsq + +try: + import matplotlib as _matplotlib + import matplotlib.pyplot as _matplotlib_pyplot + import time as _time # for timestamping lines on plots +except ImportError, e: + _matplotlib = None + _matplotlib_import_error = e + +from . import LOG as _LOG +from . import base_config as _base_config +from . import base as _base + + +class SurfaceError (Exception): + pass + + +class PoorFit (SurfaceError): + pass + + +class PoorGuess (PoorFit): + pass + + +class FlatFit (PoorFit): + "Raised for slope in the non-contact region, or no slope in contact region" + def __init__(self, left_slope, right_slope): + self.left_slope = left_slope + self.right_slope = right_slope + msg = 'slopes not sufficiently different: %g and %g' % ( + left_slope, right_slope) + super(FlatFit, self).__init__(msg) + +class EdgeKink (PoorFit): + def __init__(self, kink, edge, window): + self.kink = kink + self.edge = edge + self.window = window + msg = 'no kink (kink %d not within %d of %d)' % (kink, window, edge) + super(EdgeKink, self).__init__(self, msg) + + +def _linspace(*args, **kwargs): + dtype = kwargs.pop('dtype') + out = _numpy.linspace(*args, **kwargs) + return out.reshape((len(out), 1)).astype(dtype) + +def _get_min_max_positions(piezo, axis_name, min_position=None, + max_position=None): + output_axis = piezo.axis_by_name(axis_name) + if min_position is None: + min_position = _base.convert_volts_to_bits( + output_axis.axis_channel_config, + output_axis.axis_config['minimum']) + if max_position is None: + max_position = _base.convert_volts_to_bits( + output_axis.axis_channel_config, + output_axis.axis_config['maximum']) + return (min_position, max_position) + +def get_surface_position_data(piezo, axis_name, max_deflection, steps=2000, + frequency=10e3, min_position=None, + max_position=None): + "Measure the distance to the surface" + _LOG.debug('get surface position') + orig_position = piezo.last_output[axis_name] + # fully retract the piezo + min_position,max_position = _get_min_max_positions( + piezo, axis_name, min_position=min_position, max_position=max_position) + _LOG.debug('retract the piezo to %d' % min_position) + dtype = piezo.channel_dtype(axis_name, direction='output') + out = _linspace(orig_position, min_position, steps, dtype=dtype) + out = out.reshape((len(out), 1)).astype( + piezo.channel_dtype(axis_name, direction='output')) + ramp_kwargs = { + 'frequency': frequency, + 'output_names': [axis_name], + 'input_names': ['deflection'], + } + ret1 = piezo.named_ramp(data=out, **ramp_kwargs) + # locate high deflection position + _LOG.debug('approach until there is dangerous deflection (> %d)' + % max_deflection) + if SLEEP_DURING_SURF_POS_AQUISITION == True: + _sleep(.2) # sleeping briefly seems to reduce bounce? + mtpod = piezo.move_to_pos_or_def( + axis_name=axis_name, position=max_position, deflection=max_deflection, + step=(max_position-min_position)/steps, return_data=True) + high_contact_pos = piezo.last_output[axis_name] + # fully retract the piezo again + _LOG.debug('retract the piezo to %d again' % min_position) + if SLEEP_DURING_SURF_POS_AQUISITION == True: + _sleep(.2) + out = _linspace(high_contact_pos, min_position, steps, dtype=dtype) + ret2 = piezo.named_ramp(data=out, **ramp_kwargs) + # scan to the high contact position + _LOG.debug('ramp in to the deflected position %d' % high_contact_pos) + if SLEEP_DURING_SURF_POS_AQUISITION == True: + _sleep(.2) + out = _linspace(min_position, high_contact_pos, steps, dtype=dtype) + data = piezo.named_ramp(data=out, **ramp_kwargs) + if SLEEP_DURING_SURF_POS_AQUISITION == True: + _sleep(.2) + # return to the original position + _LOG.debug('return to the original position %d' % orig_position) + out = _linspace(high_contact_pos, orig_position, steps, dtype=dtype) + ret3 = piezo.named_ramp(data=out, **ramp_kwargs) + return {'ret1':ret1, 'mtpod':mtpod, 'ret2':ret2, + 'approach':data, 'ret3':ret3} + +def bilinear(x, params): + """bilinear fit for surface bumps. Model has two linear regimes + which meet at x=kink_position and have independent slopes. + + `x` should be a `numpy.ndarray`. + """ + left_offset,left_slope,kink_position,right_slope = params + left_mask = x < kink_position + right_mask = x >= kink_position # = not left_mask + left_y = left_offset + left_slope*x + right_y = (left_offset + left_slope*kink_position + + right_slope*(x-kink_position)) + return left_mask * left_y + right_mask * right_y + +def analyze_surface_position_data( + ddict, min_slope_ratio=10.0, kink_window=None, return_all_params=False): + """ + + min_slope_ratio : float + Minimum ratio between the non-contact "left" slope and the + contact "right" slope. + kink_window : int (in bits) or None + Raise `EdgeKink` if the kink is within `kink_window` of the + minimum or maximum `z` position during the approach. If + `None`, a default value of 2% of the approach range is used. + """ + # ususes ddict["approach"] for analysis + # the others are just along to be plotted + _LOG.debug('snalyze surface position data') + + data = ddict['approach'] + # analyze data, using bilinear model + # y = p0 + p1 x for x <= p2 + # = p0 + p1 p2 + p3 (x-p2) for x >= p2 + dump_before_index = 0 # 25 # HACK!! + # Generate a reasonable guess... + start_pos = int(data['z'].min()) + final_pos = int(data['z'].max()) + start_def = int(data['deflection'].min()) + final_def = int(data['deflection'].max()) + # start_def and start_pos are probably for 2 different points + _LOG.debug('min deflection %d, max deflection %d' + % (start_def, final_def)) + _LOG.debug('min position %d, max position %d' + % (start_pos, final_pos)) + + left_offset = start_def + left_slope = 0 + kink_position = (final_pos+start_pos)/2.0 + right_slope = 2.0*(final_def-start_def)/(final_pos-start_pos) + pstart = [left_offset, left_slope, kink_position, right_slope] + _LOG.debug('guessed params: %s' % pstart) + + offset_scale = (final_pos - start_pos)/100 + left_slope_scale = right_slope/10 + kink_scale = (final_pos-start_pos)/100 + right_slope_scale = right_slope + scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale] + _LOG.debug('guessed scale: %s' % scale) + + def residual(p, y, x): + Y = bilinear(x, p) + return Y-y + params,cov,info,mesg,ier = _leastsq( + residual, pstart, + args=(data["deflection"][dump_before_index:], + data["z"][dump_before_index:]), + full_output=True, maxfev=10000) + _LOG.debug('best fit parameters: %s' % (params,)) + + if _base_config['matplotlib']: + if not _matplotlib: + raise _matplotlib_import_error + figure = _matplotlib_pyplot.figure() + axes = figure.add_subplot(1, 1, 1) + axes.hold(True) + timestamp = _time.strftime('%H%M%S') + axes.set_title('surf_pos %5g %5g %5g %5g' % tuple(params)) + def plot_dict(d, label): + _pylab.plot(d["z"], d["deflection"], '.',label=label) + for n,name in [('ret1', 'first retract'), + ('mtpod', 'single step in'), + ('ret2', 'second retract'), + ('approach', 'main approach'), + ('ret3', 'return to start')]: + axes.plot(ddict[n]['z'], ddict[n]['deflection'], label=name) + def fit_fn(x, params): + if x <= params[2]: + return params[0] + params[1]*x + else: + return (params[0] + params[1]*params[2] + + params[3]*(x-params[2])) + axes.plot([start_pos, params[2], final_pos], + [fit_fn(start_pos, params), fit_fn(params[2], params), + fit_fn(final_pos, params)], '-',label='fit') + #_pylab.legend(loc='best') + figure.show() + + # check that the fit is reasonable + # params[1] is slope in non-contact region + # params[2] is kink position + # params[3] is slope in contact region + if kink_window is None: + kink_window = int(0.02*(final_pos-start_pos)) + + if abs(params[1]*min_slope_ratio) > abs(params[3]): + raise FlatFit(left_slope=params[1], right_slope=params[3]) + if params[2] < start_pos+kink_window: + raise EdgeKink(kink=params[2], edge=start_pos, window=kink_window) + if params[2] > final_pos-kink_window: + raise EdgeKink(kink=params[2], edge=final_pos, window=kink_window) + _LOG.debug('surface position %s' % params[2]) + if return_all_parameters: + return params + return params[2] + +def get_surface_position(piezo, axis_name, max_deflection, **kwargs): + ddict = get_surface_position_data(piezo, axis_name, max_deflection) + return analyze_surface_position_data(ddict, **kwargs) diff --git a/pypiezo/x_piezo.py b/pypiezo/x_piezo.py deleted file mode 100644 index 7dddaee..0000000 --- a/pypiezo/x_piezo.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright (C) 2008-2011 W. Trevor King -# -# This file is part of pypiezo. -# -# pypiezo is free software; you can redistribute it and/or modify it -# under the terms of the GNU General Public License as published by the -# Free Software Foundation, either version 3 of the License, or (at your -# option) any later version. -# -# pypiezo is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with pypiezo. If not, see . - -from pycomedi.single_aio import AO - -class x_piezoError (Exception) : - pass - -class x_piezo : - def __init__(self, xp_chan=1) : - self.verbose = False - self.xpSensitivity = 34.0 # nm/Volt - self.gain = 20 # Vpiezo / Voutput - - self.AO = AO(chan=(xp_chan,)) - self.AO.close() - #self.AO = sngAO.sngAOobj([xp_chan], -10, 10) - - self.curOut = [self.nm2out(0)]*1 - self.xpMax = self.nm2out(2000) # limits at 2 microns - self.xpMin = self.nm2out(-2000) # and -2 microns - - self.wanderStep = self.nm2out(5) - self.nm2out(0) - # define default value changed callbacks - self.setExternalXPiezo = None - def curPos(self) : - return self.curOut[0] - def nm2out(self, nm) : - # nm / (nm/Vp * Vp/Vout) = Vout - # where Vout is Volts output by the DAQ card - # and Vp is Volts applied across the piezo - return self.phys_to_out(nm / (self.xpSensitivity * self.gain)) - def out2nm(self, out) : - return self.out_to_phys(out) * self.xpSensitivity * self.gain - def out_to_phys(self, output) : - return self.AO.comedi_to_phys(0, output) - def phys_to_out(self, physical) : - return self.AO.phys_to_comedi(0, physical) - def pCurVals(self) : - print "X piezo output : %6d" % self.curPos() - def jumpToPos(self, pos) : - self._check_range(pos) - if self.verbose : - print "Jump X piezo to %d (%g nm)" % (pos, self.out2nm(pos)) - self.curOut[0] = pos - self.AO.open() - #self.AO.reserve() - self.AO.write(self.curOut) - self.AO.close() - #self.AO.unreserve() - if self.setExternalXPiezo != None : - self.setExternalXPiezo(self.out2nm(self.curPos())) - return self.curPos() - def wander(self) : - """ - steps in one direction until it hits the boundary, - then reverses and walks back the other way. - """ - newPos = self.curPos() + self.wanderStep - if self.verbose : - print "wander from %d to %d" % (self.curPos(), newPos) - try : - self._check_range(newPos) - except Exception : - # reached one limit, so turn around - self.wanderStep = -self.wanderStep - newPos += 2*self.wanderStep - return self.jumpToPos(newPos) - def _check_range(self, pos) : - if pos > self.xpMax : - raise x_piezoError, "X piezo pos = %d > %d = max" % (pos, self.xpMax) - if pos < self.xpMin : - raise x_piezoError, "X piezo pos = %d < %d = min" % (pos, self.xpMin) - -def test() : - print "test x_piezo()" - xp = x_piezo() - xp.verbose = True - xp.jumpToPos(xp.nm2out(0)) - xp.wander() - xp.wander() - xp.wander() - xp.pCurVals() - curPos = xp.curPos() - curPosPhys = xp.out_to_phys(curPos) - curPos2 = xp.phys_to_out(curPosPhys) - if curPos != curPos2 : - raise x_piezoError, "Conversion %d -> %g -> %d not round trip" % (curPos, curPosPhys, curPos2) - print "success" - -if __name__ == "__main__" : - test() diff --git a/pypiezo/z_piezo.py b/pypiezo/z_piezo.py deleted file mode 100644 index a85ebab..0000000 --- a/pypiezo/z_piezo.py +++ /dev/null @@ -1,440 +0,0 @@ -# Copyright (C) 2008-2011 W. Trevor King -# -# This file is part of pypiezo. -# -# pypiezo is free software; you can redistribute it and/or modify it -# under the terms of the GNU General Public License as published by the -# Free Software Foundation, either version 3 of the License, or (at your -# option) any later version. -# -# pypiezo is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with pypiezo. If not, see . - -"""The z_piezo module controls the one-dimensional displacement of a -piezoelectric actuator, while allowing simultaneous data aquisition on -two channels. The module is called z_piezo, because for force spectroscopy, -the axis that needs simultaneous aquisition is the Z axis, although this -module could be used to provide control in any "fast" axis. - -There are a few globals controlling the behavior of the entire module. - - USE_ABCD_DEFLECTION (default False) - selects between using a preprocessed vertical deflection signal - and using the 'raw' 4 photodiode output segments. - TEXT_VERBOSE (default False) - print text messages to stderr displaying actions taken - PYLAB_INTERACTIVE_VERBOSE (default False) - display pylab graphs of any ramps taken - BASE_FIG_NUM (default 50) - the figure number for z_piezo pylab figures will be this number + {0,1,..} - LOG_RAMPS (default False) - save the input and output of any ramps to the LOG_DIR directory - LOG_DIR = '${DEFAULT}/z_piezo' - where the ramp logs are saved to - DEFAULT_ZP_CHAN (default 0) - output channel controlling the z piezo - DEFAULT_ZP_MON_CHAN (default 1) - input channel monitoring the z piezo signal - DEFAULT_DEF_CHAN (default 0, or in ABCD mode (2,3,4,5)) - input channel(s) monitoring deflection (or some other feedback signal) -""" - -USE_ABCD_DEFLECTION = False -TEXT_VERBOSE = False -PYLAB_INTERACTIVE_VERBOSE = False -BASE_FIG_NUM = 50 -LOG_RAMPS = False -LOG_DIR = '${DEFAULT}/z_piezo' - -# Hackish defaults for the calibcant.config module -DEFAULT_GAIN = 20 # Vpiezo / Voutput -DEFAULT_SENSITIVITY = 6.790 # nm_surface/Volt_piezo (for S/N 4546EV piezo from 2009/01/09 calibration on 200nm deep pits, 10 um pitch Digital Instruments P/N 498-000-026) -DEFAULT_ZERO_PHOTODIODE_BITS = 2**15 -DEFAULT_VPHOTO_IN_2_VOLTS = lambda vbits : (vbits-DEFAULT_ZERO_PHOTODIODE_BITS)/3276.8 -DEFAULT_VZP_OUT_2_VOLTS = lambda vbits : (vbits-DEFAULT_ZERO_PHOTODIODE_BITS)/3276.8 - -DEFAULT_ZP_CHAN = 0 -DEFAULT_ZP_MON_CHAN = 1 -DEFAULT_DEF_CHAN = 0 -if USE_ABCD_DEFLECTION : - DEFAULT_DEF_CHAN = (2,3,4,5) - # The 4 photodiode segments are - # A B - # C D - # looking in along the laser - -class NoComedi (ImportError): - "Missing comedi. Don't initialize a z_piezo instance." - pass - -try: - import pycomedi.single_aio as single_aio - import pycomedi.simult_aio as simult_aio - HAS_COMEDI = True -except ImportError, e: - HAS_COMEDI = False - -from numpy import array, arange, ones, zeros, linspace, uint16, float, sin, pi -from scipy.stats import linregress -import data_logger - -if PYLAB_INTERACTIVE_VERBOSE : - from pylab import figure, plot, title, legend, hold, subplot - import time # for timestamping lines on plots - -class error (Exception) : - pass -class outOfRange (error) : - pass - -# frontend: - -class z_piezo : - def __init__(self, zp_chan=DEFAULT_ZP_CHAN, - zp_mon_chan=DEFAULT_ZP_MON_CHAN, - def_chan=DEFAULT_DEF_CHAN) : - if HAS_COMEDI is not True: - raise NoComedi - self.sensitivity = DEFAULT_SENSITIVITY - self.gain = DEFAULT_GAIN - self.nm2Vout = self.sensitivity * self.gain # nm_surface / V_output - self.chan_info = _chan_info(zp_chan, zp_mon_chan, def_chan) - - self.curIn = array([0]*self.chan_info.numIn, dtype=uint16) - self.curOut = array([0]*self.chan_info.numOut, dtype=uint16) - self._jump = _z_piezo_jump(self.chan_info) - self._ramp = _z_piezo_ramp(self.chan_info) - self.zpMax = int(self.pos_nm2out(1000)) # limits at 1 micron - self.zpMin = int(self.pos_nm2out(-1000)) # and -1 micron - # define default value changed callbacks - self.setExternalZPiezo = None - self.setExternalZPiezoMon = None - self.setExternalDeflection = None - def __del__(self) : - pass - def curPos(self) : - return int(self.curOut[self.chan_info.zp_ind]) # cast as int so arithmetic is signed - def curPosMon(self) : - return int(self.curIn[self.chan_info.zp_mon_ind]) - def curDef(self) : - if USE_ABCD_DEFLECTION : - A = int(self.curIn[self.chan_info.def_ind[0]]) - B = int(self.curIn[self.chan_info.def_ind[1]]) - C = int(self.curIn[self.chan_info.def_ind[2]]) - D = int(self.curIn[self.chan_info.def_ind[3]]) - df = float((A+B)-(C+D))/(A+B+C+D) - dfout = int(df * 2**15) + 2**15 - if TEXT_VERBOSE : - print "Current deflection %d (%d, %d, %d, %d)" \ - % (dfout, A, B, C, D) - return dfout - else : # reading the deflection voltage directly - return int(self.curIn[self.chan_info.def_ind]) - def pos_out2V(self, out) : - return self._jump.out_to_phys(out) - def pos_V2out(self, V) : - # Vout is Volts output by the DAQ card - # and Vpiezo is Volts applied across the piezo - return self._jump.phys_to_out(V) - def pos_nm2out(self, nm) : - # Vout is Volts output by the DAQ card - # and Vpiezo is Volts applied across the piezo - return self.pos_V2out(nm / self.nm2Vout) - def pos_out2nm(self, out) : - return self.pos_out2V(out) * self.nm2Vout - def posMon_nm2out(self, nm) : - return self._jump.phys_to_out(nm / self.nm2Vout) - def posMon_out2nm(self, out) : - return self._jump.out_to_phys(out) * self.nm2Vout - def def_in2V(self, input) : - return self._jump.out_to_phys(input) - def def_V2in(self, physical) : - print physical, type(physical) # temporary debugging printout - return self._jump.phys_to_out(physical) - def curVals(self) : - return {'Z piezo output': self.curPos(), - 'Deflection input':self.curDef(), - 'Z piezo input':self.curPosMon()} - def pCurVals(self, curVals=None) : - if curVals == None : - curVals = self.curVals() - print "Z piezo output : %6d" % curVals["Z piezo output"] - print "Z piezo input : %6d" % curVals["Z piezo input"] - print "Deflection input: %6d" % curVals["Deflection input"] - def updateInputs(self) : - self.curIn = self._jump.updateInputs() - if self.setExternalZPiezoMon != None : - self.setExternalZPiezoMon(self.posMon_out2nm(self.curPosMon())) - if self.setExternalDeflection != None : - self.setExternalDeflection(self.def_in2V(self.curDef())) - def jumpToPos(self, pos) : - self._check_range(pos) - self.curOut[self.chan_info.zp_ind] = pos - self.curIn = self._jump.jumpToPos(self.curOut) - if self.setExternalZPiezo != None : - self.setExternalZPiezo(self.pos_out2nm(self.curPos())) - if TEXT_VERBOSE : - print "Jumped to" - self.pCurVals() - return self.curVals() - def ramp(self, out, freq) : - for pos in out : - self._check_range(pos) - out = self._ramp.ramp(out, freq) - self.curOut[self.chan_info.zp_ind] = out["Z piezo output"][-1] - self.curIn[self.chan_info.zp_mon_ind] = out["Z piezo input"][-1] - if USE_ABCD_DEFLECTION : - for i in range(4) : # i is the photodiode element (0->A, 1->B, ...) - self.curIn[i] = out["Deflection segment"][i][-1] - else : - self.curIn[self.chan_info.def_ind] = out["Deflection input"][-1] - - if self.setExternalZPiezo != None : - self.setExternalZPiezo(self.pos_out2nm(self.curPos())) - if self.setExternalZPiezoMon != None : - self.setExternalZPiezoMon(self.posMon_out2nm(self.curPosMon())) - if self.setExternalDeflection != None : - self.setExternalDeflection(self.def_in2V(self.curDef())) - if LOG_RAMPS : - log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False, - log_name="ramp") - if USE_ABCD_DEFLECTION : - v = out.pop("Deflection segment") - log.write_dict_of_arrays(out) - out["Deflection segment"] = v - else : - log.write_dict_of_arrays(out) - if TEXT_VERBOSE : - print "Ramped from (%g, %g) to (%g, %g) in %d points" \ - % (out["Z piezo output"][0], out["Deflection input"][0], - out["Z piezo output"][-1], out["Deflection input"][-1], - len(out["Z piezo output"])) - if PYLAB_INTERACTIVE_VERBOSE : - figure(BASE_FIG_NUM) - subplot(211) - title('ramp (def_in vs zp_out)') - timestamp = time.strftime('%H%M%S') - plot(out["Z piezo output"], out["Deflection input"], '.', label=timestamp) - subplot(212) - title('zp_in vs zp_out') - plot(out["Z piezo output"], out["Z piezo input"], '.', label=timestamp) - legend(loc='best') - return out - def _check_range(self, pos) : - if pos > self.zpMax : - raise outOfRange, "Z piezo pos = %d > %d = max" % (pos, self.zpMax) - if pos < self.zpMin : - raise outOfRange, "Z piezo pos = %d < %d = min" % (pos, self.zpMin) - -# backends: - -class _z_piezo_jump : - def __init__(self, chan_info) : - self.chan_info = chan_info - self.reserved = False - self.AI = single_aio.AI(chan=self.chan_info.inChan) - self.AI.close() - self.AO = single_aio.AO(chan=self.chan_info.outChan) - self.AO.close() - def reserve(self) : - self.AI.open() - self.AO.open() - self.reserved = True - def release(self) : - self.AI.close() - self.AO.close() - self.reserved = False - def updateInputs(self) : - if self.reserved == False : - self.AI.open() - #self.reserve() - self.reserved = False - In = self.AI.read() - if self.reserved == False : - self.AI.close() - #self.release() - return In - def jumpToPos(self, out) : - if self.reserved == False : - self.reserve() - self.reserved = False # set to true inside reserve(), which we don't want - self.AO.write(out) - if self.reserved == False : - self.release() - return self.updateInputs() - def out_to_phys(self, output) : - return self.AO.comedi_to_phys(0, output) - def phys_to_out(self, physical) : - return self.AO.phys_to_comedi(0, physical) - -class _z_piezo_ramp : - def __init__(self, chan_info) : - self.chan_info = chan_info - self.attempts=0 - self.failures=0 - self.verbose = True - def ramp(self, outArray, freq) : - # allocate and run the task - npoints = int(len(outArray)/self.chan_info.numOut) - in_data = array([0]*npoints*self.chan_info.numIn, dtype=uint16) - if type(outArray) != type(in_data) or outArray.dtype != uint16 : - out_data = array([0]*npoints*self.chan_info.numOut, dtype=uint16) - for i in range(npoints) : - out_data[i] = outArray[i] - else : - out_data = outArray - if simult_aio.DONT_OUTPUT_LAST_SAMPLE_HACK == True: - # duplicate the last output point - npoints += 1 - out_hack = array([0]*npoints*self.chan_info.numOut, dtype=uint16) - for i in range(npoints-1) : - out_hack[i] = out_data[i] - out_hack[-1] = out_data[-1] - out_data = out_hack - in_data = array([0]*npoints*self.chan_info.numIn, dtype=uint16) - - correlated = False - ramp={} - while not correlated : - AIO = simult_aio.AIO(in_chan=self.chan_info.inChan, - out_chan=self.chan_info.outChan) - if self.verbose : - print "setup AIO" - AIO.setup(freq=freq, out_buffer=out_data) - if self.verbose : - print "arm AIO" - AIO.arm() - if self.verbose : - print "read AIO" - AIO.start_read(in_data) - if self.verbose : - print "close AIO" - AIO.close() - if self.verbose : - print "finished AIO" - self.attempts += 1 - ramp["Z piezo output"] = out_data[self.chan_info.zp_ind::self.chan_info.numOut] - ramp["Z piezo input"] = in_data[self.chan_info.zp_mon_ind::self.chan_info.numIn] - ramp["Deflection input"] = in_data[self.chan_info.def_ind::self.chan_info.numIn] - failed = False - gradient, intercept, r_value, p_value, std_err = linregress(ramp["Z piezo output"], - ramp["Z piezo input"]) - rnge = ramp["Z piezo output"].max()-ramp["Z piezo output"].min() - if gradient < .8 and rnge > 100 : - if PYLAB_INTERACTIVE_VERBOSE : - figure(BASE_FIG_NUM+3) - subplot(211) - title('ramp (def_in vs zp_out)') - timestamp = time.strftime('%H%M%S') - plot(ramp["Z piezo output"], ramp["Deflection input"], '.', label=timestamp) - subplot(212) - title('zp_in vs zp_out') - plot(ramp["Z piezo output"], ramp["Z piezo input"], '.', label=timestamp) - legend(loc='best') - print "ramp failed slope (%g, %g), try again" % (gradient, rnge) - failed = True - if not failed : - if simult_aio.DONT_OUTPUT_LAST_SAMPLE_HACK == True: - # remove the duplicated last output point - out_data = out_data[:-self.chan_info.numOut] - in_data = in_data[:-self.chan_info.numIn] - ramp["Z piezo output"] = out_data[self.chan_info.zp_ind::self.chan_info.numOut] - ramp["Z piezo input"] = in_data[self.chan_info.zp_mon_ind::self.chan_info.numIn] - ramp["Deflection input"] = in_data[self.chan_info.def_ind::self.chan_info.numIn] - - if USE_ABCD_DEFLECTION : - ramp["Deflection segment"] = [] - for c_ind in self.chan_info.def_ind : - ramp["Deflection segment"].append(in_data[c_ind::self.chan_info.numIn]) - A = ramp["Deflection segment"][0].astype(float) - B = ramp["Deflection segment"][1].astype(float) - C = ramp["Deflection segment"][2].astype(float) - D = ramp["Deflection segment"][3].astype(float) - ramp["Deflection input"] = (((A+B)-(C+D))/(A+B+C+D) * 2**15).astype(uint16) - ramp["Deflection input"] += 2**15 # HACK, comedi uses unsigned ints... - else : - ramp["Deflection input"] = in_data[self.chan_info.def_ind::self.chan_info.numIn] - correlated = True - if PYLAB_INTERACTIVE_VERBOSE : - figure(BASE_FIG_NUM+1) - timestamp = time.strftime('%H%M%S') - plot(ramp["Z piezo output"], ramp["Z piezo input"], '.', label=timestamp) - title('goods') - legend(loc='best') - else : - self.failures += 1 - if PYLAB_INTERACTIVE_VERBOSE : - figure(BASE_FIG_NUM+2) - timestamp = time.strftime('%H%M%S') - plot(ramp["Z piezo output"], ramp["Z piezo input"], '.', label=timestamp) - legend(loc='best') - title('bads') - - #AIO = simAIO.simAIOobj(npoints, - # freq, - # self.chan_info.inChan, - # self.chan_info.outChan) - #err = AIO.configure() - #err = AIO.commit() - #err = AIO.write(outArray) - #err = AIO.run() - #(err, inArray) = AIO.read() - #if err != 0 : - # raise error, "Error in simAIO" - #if len(inArray)*self.chan_info.numOut != len(outArray)*self.chan_info.numIn : - # raise error, "Input array of wrong length" - # - #ramp={} - #ramp["Z piezo output"] = outArray[self.chan_info.zp_ind::self.chan_info.numOut] - #ramp["Z piezo input"] = inArray[self.chan_info.zp_mon_ind::self.chan_info.numIn] - #ramp["Deflection input"] = inArray[self.chan_info.def_ind::self.chan_info.numIn] - - # task automatically freed as AIO goes out of scope - return ramp - -class _chan_info : - def __init__(self, zp_chan, zp_mon_chan, def_chan) : - self.inChan = [] - if USE_ABCD_DEFLECTION : - self.def_ind = [] # index in inChan array - i=0 - for chan in def_chan : - self.inChan.append(chan) - self.def_ind.append(i) - i += 1 - else : - self.inChan.append(def_chan) - self.def_ind = 0 # index in inChan array - self.inChan.append(zp_mon_chan) - self.zp_mon_ind = zp_mon_chan - self.numIn = len(self.inChan) - - self.outChan = [zp_chan] - self.zp_ind = 0 # index in outChan array - self.numOut = len(self.outChan) - - -def test() : - print "test z_piezo()" - zp = z_piezo() - zp.verbose = True - zp.jumpToPos(zp.pos_nm2out(0)) - if zp.verbose : - zp.pCurVals() - curPos = zp.curPos() - curPosNm = zp.pos_out2nm(curPos) - curPos2 = zp.pos_nm2out(curPosNm) - if curPos != curPos2 : - raise error, "Conversion %d -> %g -> %d not round trip" % (curPos, curPosNm, curPos2) - ramp_positions = linspace(zp.curPos(), zp.pos_nm2out(100), 20) - out = zp.ramp(ramp_positions, 10000) - ramp_positions = linspace(zp.curPos(), zp.pos_nm2out(0), 20) - out2 = zp.ramp(ramp_positions, 10000) - -if __name__ == "__main__" : - test() diff --git a/pypiezo/z_piezo_utils.py b/pypiezo/z_piezo_utils.py deleted file mode 100644 index 11f8b7c..0000000 --- a/pypiezo/z_piezo_utils.py +++ /dev/null @@ -1,397 +0,0 @@ -# Copyright (C) 2008-2011 W. Trevor King -# -# This file is part of pypiezo. -# -# pypiezo is free software; you can redistribute it and/or modify it -# under the terms of the GNU General Public License as published by the -# Free Software Foundation, either version 3 of the License, or (at your -# option) any later version. -# -# pypiezo is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with pypiezo. If not, see . - -TEXT_VERBOSE = False -PYLAB_INTERACTIVE = True -PYLAB_VERBOSE = False -BASE_FIG_NUM = 60 -LOG_DATA = True -LOG_DIR = '${DEFAULT}/z_piezo_utils' - -NULL_STEPS_BEFORE_SINGLE_PT_SWEEP = False -SLEEP_DURING_SURF_POS_AQUISITION = False # doesn't help - -from numpy import array, arange, ones, zeros, linspace, uint16, float, sin, cos, pi -from scipy.stats import linregress -import curses_check_for_keypress -import scipy.optimize -import data_logger -import time - -_flush_plot = None -_final_flush_plot = None -_pylab = None -def _dummy_fn(): pass - -def _import_pylab(): - """Import pylab plotting functions for when we need to plot. This - function can be called multiple times, and ensures that the pylab - setup is only imported once. It defines the functions - _flush_plot() to be called after incremental changes, - _final_flush_plot(), to be called at the end of any graphical processing, and - _pylab, a reference to the pylab module.""" - global _pylab - global _flush_plot - global _final_flush_plot - if _pylab == None : - import pylab as _pylab - if _flush_plot == None : - if PYLAB_INTERACTIVE : - _flush_plot = _pylab.draw - else : - _flush_plot = _pylab.show - if _final_flush_plot == None : - if PYLAB_INTERACTIVE : - _final_flush_plot = _pylab.draw - else : - _final_flush_plot = _dummy_fn - -class error (Exception) : - pass -class poorFit (error) : - pass -class tooClose (error) : - pass - -def moveToPosOrDef(zpiezo, pos, defl, step, return_data=False) : - if return_data or LOG_DATA or PYLAB_VERBOSE : - aquire_data = True - else : - aquire_data = False - zpiezo._check_range(pos) - if step == 0 : - raise error, "Must have non-zero step size" - elif step < 0 and pos > zpiezo.curPos() : - step = -step - elif step > 0 and pos < zpiezo.curPos() : - step = -step - - zpiezo._jump.reserve() - zpiezo.updateInputs() - if TEXT_VERBOSE : - print "current z_piezo position %6d, current deflection %6d =< %6d" % (zpiezo.curPos(), zpiezo.curDef(), defl) - if aquire_data : - def_array.append(zpiezo.curDef()) - pos_array.append(zpiezo.curPos()) - zpiezo._jump.release() - - if zpiezo.setExternalZPiezo != None : - zpiezo.setExternalZPiezo(zpiezo.pos_out2nm(zpiezo.curPos())) - if PYLAB_VERBOSE : - _import_pylab() - _pylab.figure(BASE_FIG_NUM) - timestamp = time.strftime('%H%M%S') - _pylab.plot(pos_array, def_array, '.', label=timestamp) - _pylab.title('step approach') - _pylab.legend(loc='best') - _flush_plot() - if LOG_DATA : - log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False, - log_name="moveToPosOrDef") - data = {"Z piezo output":array(pos_array), - "Deflection input":array(def_array)} - log.write_dict_of_arrays(data) - if return_data : - data = {"Z piezo output":array(pos_array), - "Deflection input":array(def_array)} - return (zpiezo.curVals(), data) - else : - return zpiezo.curVals() - - -def wiggleForInterferenceMin(zpiezo, wig_amp=20000, wig_freq=100, - plotVerbose=True) : - "Output sin to measure interference" - if PYLAB_VERBOSE or plotVerbose: - _import_pylab() - npoints = 1000 - scanfreq = wig_freq*npoints - out = zeros((npoints,), dtype=uint16) - for i in range(npoints) : - out[i] = int(sin(4*pi*i/npoints)*wig_amp+32000) - # 4 for 2 periods - if PYLAB_VERBOSE or plotVerbose: - _pylab.figure(BASE_FIG_NUM+1) - a = _pylab.axes() - _pylab.title('wiggle for interference') - _pylab.hold(False) - p = _pylab.plot(out, out, 'b.-') - if LOG_DATA : - log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False, - log_name="wiggle") - c = curses_check_for_keypress.check_for_keypress(test_mode=False) - while c.input() == None : - data = zpiezo.ramp(out, scanfreq) - dmin = data["Deflection input"].min() - dmax = data["Deflection input"].max() - if PYLAB_VERBOSE or plotVerbose: - p[0].set_ydata(data["Deflection input"]) - a.set_ylim([dmin,dmax]) - _flush_plot() - if LOG_DATA : - log.write_dict_of_arrays(data) - if PYLAB_VERBOSE or plotVerbose: - _pylab.hold(True) - -def getSurfPosData(zpiezo, maxDefl, textVerbose=False, plotVerbose=False) : - "Measure the distance to the surface" - if plotVerbose and not PYLAB_VERBOSE : - _import_pylab() - origPos = zpiezo.curPos() - # fully retract the piezo - if TEXT_VERBOSE or textVerbose : - print "\tRetract the piezo" - out = linspace(zpiezo.curPos(), zpiezo.zpMin, 2000) - ret1 = zpiezo.ramp(out, 10e3) - del(out) # start with a clean output vector, seems to help reduce bounce? - zpiezo.updateInputs() # opening output seems to cause bounce? - # locate high deflection position - if TEXT_VERBOSE or textVerbose : - print "\tApproach until there is dangerous deflection (> %d)" % maxDefl - if SLEEP_DURING_SURF_POS_AQUISITION == True : - time.sleep(.2) # sleeping briefly seems to reduce bounce? - cp, mtpod = moveToPosOrDef(zpiezo, zpiezo.zpMax, maxDefl, - (zpiezo.zpMax-zpiezo.zpMin)/2000, - return_data=True) - highContactPos = zpiezo.curPos() - if highContactPos <= origPos : # started out too close, we need some room to move in... - if TEXT_VERBOSE or textVerbose : - print "\tToo close, return to the original position: ", origPos - out = linspace(zpiezo.curPos(), origPos, 2000) - ret3=zpiezo.ramp(out, 10e3) - del(out) - zpiezo.updateInputs() - if PYLAB_VERBOSE or plotVerbose : - _pylab.figure(BASE_FIG_NUM+2) - def plot_dict(d, label) : - print d.keys() - _pylab.plot(d["Z piezo output"], d["Deflection input"], '.',label=label) - plot_dict(ret1, 'First retract') - plot_dict(mtpod, 'Single step in') - plot_dict(ret3, 'Return to start') - _pylab.legend(loc='best') - _pylab.title('Too close') - raise tooClose, "Way too close!\n(highContactPos %d <= origPos %d)" % (highContactPos, origPos) - zpiezo.updateInputs() - # fully retract the piezo again - if TEXT_VERBOSE or textVerbose : - print "\tRetract the piezo again" - if SLEEP_DURING_SURF_POS_AQUISITION == True : - time.sleep(.2) - out = linspace(highContactPos, zpiezo.zpMin, 2000) - ret2 = zpiezo.ramp(out, 10e3) - del(out) - zpiezo.updateInputs() - if zpiezo.curPos() != zpiezo.zpMin : - raise Exception, "bad code" - # scan to the high contact position - if TEXT_VERBOSE or textVerbose : - print "\tRamp in to the deflected position: ", highContactPos - if SLEEP_DURING_SURF_POS_AQUISITION == True : - time.sleep(.2) - out = linspace(zpiezo.curPos(), highContactPos, 4000) - data = zpiezo.ramp(out, 10e3) - del(out) - zpiezo.updateInputs() - if LOG_DATA : - log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False, - log_name="getSurfPos") - log.write_dict_of_arrays(data) - if SLEEP_DURING_SURF_POS_AQUISITION == True : - time.sleep(.2) - # return to the original position - if TEXT_VERBOSE or textVerbose : - print "\tReturn to the original position: ", origPos - out = linspace(highContactPos, origPos, 2000) - ret3=zpiezo.ramp(out, 10e3) - del(out) - zpiezo.updateInputs() - return {"ret1":ret1, "mtpod":mtpod, "ret2":ret2, - "approach":data, "ret3":ret3} - -def bilinear(x, params): - """bilinear fit for surface bumps. Model has two linear regimes - which meet at x=kink_position and have independent slopes. - """ - assert type(x) == type(array([0,1])), "Invalid x array: %s" % type(x) - left_offset,left_slope,kink_position,right_slope = params - left_mask = x < kink_position - right_mask = x >= kink_position # = not left_mask - left_y = left_offset + left_slope*x - right_y = left_offset + left_slope*kink_position \ - + right_slope*(x-kink_position) - return left_mask * left_y + right_mask * right_y - -def analyzeSurfPosData(ddict, textVerbose=False, plotVerbose=False, retAllParams=False) : - # ususes ddict["approach"] for analysis - # the others are just along to be plotted - - data = ddict["approach"] - # analyze data, using bilinear model - # y = p0 + p1 x for x <= p2 - # = p0 + p1 p2 + p3 (x-p2) for x >= p2 - if TEXT_VERBOSE or textVerbose : - print "\tAnalyze the data" - dump_before_index = 0 # 25 # HACK!! - # Generate a reasonable guess... - startPos = int(data["Z piezo output"].min()) - finalPos = int(data["Z piezo output"].max()) - startDef = int(data["Deflection input"].min()) - finalDef = int(data["Deflection input"].max()) - # startDef and startPos are probably for 2 different points - - left_offset = startDef - left_slope = 0 - kink_position = (finalPos+startPos)/2.0 - right_slope = 2.0*(finalDef-startDef)/(finalPos-startPos) - pstart = [left_offset, left_slope, kink_position, right_slope] - - offset_scale = (finalPos - startPos)/100 - left_slope_scale = right_slope/10 - kink_scale = (finalPos-startPos)/100 - right_slope_scale = right_slope - scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale] - if TEXT_VERBOSE or textVerbose : - print "start Def %d, final def %d, start pos %d, final pos %d" % (startDef, finalDef, startPos, finalPos) - print "Guessed params ", pstart - def residual(p, y, x): - Y = bilinear(x, p) - return Y-y - leastsq = scipy.optimize.leastsq - params,cov,info,mesg,ier = leastsq( - residual, pstart, - args=(data["Deflection input"][dump_before_index:], - data["Z piezo output"][dump_before_index:]), - full_output=True, maxfev=10000) - if TEXT_VERBOSE or textVerbose : - print "Best fit parameters: ", params - - if PYLAB_VERBOSE or plotVerbose : - _import_pylab() - _pylab.figure(BASE_FIG_NUM+3) - def plot_dict(d, label) : - _pylab.plot(d["Z piezo output"], d["Deflection input"], '.',label=label) - try : - plot_dict(ddict['ret1'], 'First retract') - plot_dict(ddict['mtpod'], 'Single step in') - plot_dict(ddict['ret2'], 'Second retract') - except KeyError : - pass # if we don't have the surrounding data, don't worry - plot_dict(data, 'Main approach') - try : - plot_dict(ddict['ret3'], 'Return to start') - except KeyError : - pass # if we don't have the surrounding data, don't worry - def fit_fn(x, params) : - if x <= params[2] : - return params[0] + params[1]*x - else : - return params[0] + params[1]*params[2] + params[3]*(x-params[2]) - _pylab.plot([startPos, params[2], finalPos], - [fit_fn(startPos, params), fit_fn(params[2], params), fit_fn(finalPos, params)], - '-',label='fit') - _pylab.title('surf_pos %5g %5g %5g %5g' % (params[0], params[1], params[2], params[3])) - _pylab.legend(loc='best') - _flush_plot() - - # check that the fit is reasonable - # params[1] is slope in non-contact region - # params[3] is slope in contact region - if abs(params[1]*10) > abs(params[3]) : - raise poorFit, "Slope in non-contact region, or no slope in contact" - if params[2] < startPos+0.02*(finalPos-startPos) : # params[2] is kink position - raise poorFit, "No kink (kink %g less than %g, need more space to left)" % \ - (params[2], startPos+0.02*(finalPos-startPos)) - if params[2] > finalPos-0.02*(finalPos-startPos) : - raise poorFit, "No kink (kink %g more than %g, need more space to right)" % \ - (params[2], finalPos-0.02*(finalPos-startPos)) - if params[3] < 0.5 * right_slope : # params[3] is slope in contact region - raise poorFit, "Too far" - if TEXT_VERBOSE or textVerbose : - print "\tSurface position: ", params[2] - if retAllParams : - return params - return params[2] - -def getSurfPos(zpiezo, maxDefl, textVerbose=False, plotVerbose=False) : - ddict = getSurfPosData(zpiezo, maxDefl, textVerbose, plotVerbose) - return analyzeSurfPosData(ddict, textVerbose, plotVerbose) - -def test_smoothness(zp, plotVerbose=True) : - posA = 20000 - posB = 50000 - setpoint = zp.def_V2in(3) - steps = 200 - outfreq = 1e5 - outarray = linspace(posB, posA, 1000) - indata=[] - outdata=[] - curVals = zp.jumpToPos(posA) - zp.pCurVals(curVals) - time.sleep(1) # let jitters die down - for i in range(10) : - print "ramp %d to %d" % (zp.curPos(), posB) - curVals, data = moveToPosOrDef(zp, posB, setpoint, step=steps, - return_data = True) - indata.append(data) - out = zp.ramp(outarray, outfreq) - outdata.append(out) - if plotVerbose : - from pylab import figure, plot, title, legend, hold, subplot - if PYLAB_VERBOSE or plotVerbose : - _import_pylab() - _pylab.figure(BASE_FIG_NUM+4) - for i in range(10) : - _pylab.plot(indata[i]['Z piezo output'], - indata[i]['Deflection input'], '+--', label='in') - _pylab.plot(outdata[i]['Z piezo output'], - outdata[i]['Deflection input'], '.-', label='out') - _pylab.title('test smoothness (step in, ramp out)') - #_pylab.legend(loc='best') - -def test() : - import z_piezo - zp = z_piezo.z_piezo() - curVals = zp.moveToPosOrDef(zp.pos_nm2out(600), defl=zp.curDef()+6000, step=(zp.pos_nm2out(10)-zp.pos_nm2out(0))) - if TEXT_VERBOSE : - zp.pCurVals(curVals) - pos = zp.getSurfPos(maxDefl=zp.curDef()+6000) - if TEXT_VERBOSE : - print "Surface at %g nm", pos - print "success" - if PYLAB_VERBOSE and _final_flush_plot != None: - _final_flush_plot() diff --git a/setup.py b/setup.py index 9acf353..9e57456 100644 --- a/setup.py +++ b/setup.py @@ -36,7 +36,7 @@ Topic :: Scientific/Engineering Topic :: Software Development :: Libraries :: Python Modules """ -def find_packages(root='calibcant'): +def find_packages(root='pypiezo'): packages = [] prefix = '.'+os.path.sep for dirpath,dirnames,filenames in walk(root): @@ -49,18 +49,18 @@ def find_packages(root='calibcant'): _this_dir = os.path.dirname(__file__) packages = find_packages() -setup(name="pypiezo", +setup(name='pypiezo', version=__version__, - maintainer="W. Trevor King", - maintainer_email="wking@drexel.edu", - url = "http://www.physics.drexel.edu/~wking/unfolding-disasters/pypiezo/", - download_url = "http://www.physics.drexel.edu/~wking/code/python/pypiezo-%s.tar.gz" % __version__, - license = "GNU General Public License (GPL)", - platforms = ["all"], - description = __doc__, - long_description = open(os.path.join(_this_dir, 'README'), 'r').read(), - classifiers = filter(None, classifiers.split("\n")), - packages = packages, - provides = ['pypiezo (%s)' % __version__], - requires = ['pycomedi (>= 0.3)'], + maintainer='W. Trevor King', + maintainer_email='wking@drexel.edu', + url='http://www.physics.drexel.edu/~wking/unfolding-disasters/pypiezo/', + download_url='http://www.physics.drexel.edu/~wking/code/python/pypiezo-%s.tar.gz' % __version__, + license='GNU General Public License (GPL)', + platforms=['all'], + description=__doc__, + long_description=open(os.path.join(_this_dir, 'README'), 'r').read(), + classifiers=filter(None, classifiers.split('\n')), + packages=packages, + provides=['pypiezo (%s)' % __version__], + requires=['pycomedi (>= 0.3)'], )