Gentoo
~~~~~~
-I've packaged FFT-tools for Gentoo. You need layman_ and my `wtk
+I've packaged pypiezo for Gentoo. You need layman_ and my `wtk
overlay`_. Install with::
# emerge -av app-portage/layman
# layman --add wtk
- # emerge -av sci-libs/pycomedi
+ # emerge -av sci-libs/pypiezo
Dependencies
------------
-If you're installing by hand or packaging pycomedi for another
+If you're installing by hand or packaging pypiezo for another
distribution, you'll need the following dependencies:
=========== ================= =====================
# along with pypiezo. If not, see <http://www.gnu.org/licenses/>.
import logging as _logging
+import logging.handlers as _logging_handlers
__version__ = '0.4'
LOG = _logging.getLogger('pypiezo')
"Pypiezo logger"
-LOG.setLevel(_logging.DEBUG)
-h = _logging.StreamHandler()
-#h.setLevel(_logging.WARN)
-h.setLevel(_logging.DEBUG)
-f = _logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
-h.setFormatter(f)
-LOG.addHandler(h)
-del h, f
+LOG.setLevel(_logging.WARN)
+_formatter = _logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+_stream_handler = _logging.StreamHandler()
+_stream_handler.setLevel(_logging.DEBUG)
+_stream_handler.setFormatter(_formatter)
+LOG.addHandler(_stream_handler)
+
+_syslog_handler = None
+
+
+from .config import _BaseConfig
+from .config import find_base_config as _find_base_config
+
+
+def setup_base_config(config):
+ global base_config, _syslog_handler
+ base_config = config
+
+ LOG.setLevel(base_config['log-level'])
+
+ if base_config['syslog']:
+ if not _syslog_handler:
+ _syslog_handler = _logging_handlers.SysLogHandler()
+ _syslog_handler.setLevel(_logging.DEBUG)
+ LOG.handlers = [_syslog_handler]
+ else:
+ LOG.handlers = [_stream_handler]
+
+ LOG.info('setup base_config:\n%s' % config.dump())
+
+def clear_base_config():
+ setup_base_config(_BaseConfig())
+
+base_config = _find_base_config()
+setup_base_config(base_config)
--- /dev/null
+# Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of pypiezo.
+#
+# pypiezo is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# pypiezo is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with pypiezo. If not, see <http://www.gnu.org/licenses/>.
+
+"Control of a piezo-based atomic force microscope."
+
+import numpy as _numpy
+
+try:
+ import matplotlib as _matplotlib
+ import matplotlib.pyplot as _matplotlib_pyplot
+ import time as _time # for timestamping lines on plots
+except ImportError, e:
+ _matplotlib = None
+ _matplotlib_import_error = e
+
+from curses_check_for_keypress import CheckForKeypress as _CheckForKeypress
+
+from . import LOG as _LOG
+from . import base_config as _base_config
+from . import base as _base
+from . import surface as _surface
+
+
+class AFMPiezo (_base.Piezo):
+ """A piezo-controlled atomic force microscope.
+
+ This particular class expects a single input channel for measuring
+ deflection. Other subclasses provide support for multi-segment
+ deflection measurements.
+
+ >>> from pprint import pprint
+ >>> from pycomedi.device import Device
+ >>> from pycomedi.subdevice import StreamingSubdevice
+ >>> from pycomedi.channel import AnalogChannel
+ >>> from pycomedi.constant import AREF, SUBDEVICE_TYPE, UNIT
+ >>> from . import config
+ >>> from . import surface
+
+ >>> d = Device('/dev/comedi0')
+ >>> d.open()
+
+ >>> s_in = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai,
+ ... factory=StreamingSubdevice)
+ >>> s_out = d.find_subdevice_by_type(SUBDEVICE_TYPE.ao,
+ ... factory=StreamingSubdevice)
+
+ >>> axis_channel = s_out.channel(
+ ... 0, factory=AnalogChannel, aref=AREF.ground)
+ >>> input_channel = s_in.channel(0, factory=AnalogChannel, aref=AREF.diff)
+ >>> for chan in [axis_channel, input_channel]:
+ ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10)
+
+ We set the minimum voltage for the `z` axis to -9 (a volt above
+ the minimum possible voltage) to help with testing
+ `.get_surface_position`. Without this minimum voltage, small
+ calibration errors could lead to a railed -10 V input for the
+ first few surface approaching steps, which could lead to an
+ `EdgeKink` error instead of a `FlatFit` error.
+
+ >>> axis_config = config._AxisConfig()
+ >>> axis_config.update(
+ ... {'gain':20, 'sensitivity':8e-9, 'minimum':-9})
+ >>> axis_channel_config = config._ChannelConfig()
+ >>> input_channel_config = config._ChannelConfig()
+
+ >>> a = _base.PiezoAxis(axis_config=axis_config,
+ ... axis_channel_config=axis_channel_config,
+ ... axis_channel=axis_channel, name='z')
+ >>> a.setup_config()
+
+ >>> c = _base.InputChannel(
+ ... channel_config=input_channel_config, channel=input_channel,
+ ... name='deflection')
+ >>> c.setup_config()
+
+ >>> p = AFMPiezo(axes=[a], input_channels=[c])
+
+ >>> deflection = p.read_deflection()
+ >>> deflection # doctest: +SKIP
+ 34494L
+ >>> p.deflection_dtype()
+ <type 'numpy.uint16'>
+
+ We need to know where we are before we can move somewhere
+ smoothly.
+
+ >>> pos = _base.convert_volts_to_bits(p.axes[0].axis_channel_config, 0)
+ >>> p.jump('z', pos)
+
+ Usually `.move_to_pos_or_def` is used to approach the surface, but
+ for testing we assume the z output channel is connected directly
+ into the deflection input channel.
+
+ >>> target_pos = _base.convert_volts_to_bits(
+ ... p.axes[0].axis_channel_config, 2)
+ >>> step = int((target_pos - pos)/5)
+ >>> target_def = _base.convert_volts_to_bits(
+ ... p.input_channels[0].channel_config, 3)
+ >>> data = p.move_to_pos_or_def('z', target_pos, target_def, step=step,
+ ... return_data=True)
+ >>> p.last_output == {'z': int(target_pos)}
+ True
+ >>> pprint(data) # doctest: +SKIP
+ {'deflection':
+ array([32655, 33967, 35280, 36593, 37905, 39218, 39222], dtype=uint16),
+ 'z':
+ array([32767, 34077, 35387, 36697, 38007, 39317, 39321], dtype=uint16)}
+
+ That was a working position-limited approach. Now move back to
+ the center and try a deflection-limited approach.
+
+ >>> p.jump('z', pos)
+ >>> target_def = _base.convert_volts_to_bits(
+ ... p.input_channels[0].channel_config, 1)
+ >>> data = p.move_to_pos_or_def('z', target_pos, target_def, step=step,
+ ... return_data=True)
+ >>> print (p.last_output['z'] < int(target_pos))
+ True
+ >>> pprint(data) # doctest: +SKIP
+ {'deflection': array([32655, 33968, 35281, 36593], dtype=uint16),
+ 'z': array([32767, 34077, 35387, 36697], dtype=uint16)}
+
+ >>> p.wiggle_for_interference('z', offset=p.last_output['z'],
+ ... laser_wavelength=650e-9, keypress_test_mode=True)
+ Press any key to continue
+
+ >>> try:
+ ... p.get_surface_position('z', max_deflection=target_def)
+ ... except surface.FlatFit, e:
+ ... print 'got FlatFit'
+ got FlatFit
+ >>> print e # doctest: +SKIP
+ slopes not sufficiently different: 1.0021 and 1.0021
+ >>> abs(e.right_slope-1) < 0.1
+ True
+ >>> abs(e.left_slope-1) < 0.1
+ True
+
+ >>> d.close()
+ """
+ def _deflection_channel(self):
+ return self.channel_by_name(name='deflection', direction='input')
+
+ def read_deflection(self):
+ """Return sensor deflection in bits.
+
+ TODO: explain how bit <-> volt conversion will work for this
+ "virtual" channel.
+ """
+ return self._deflection_channel().data_read()
+
+ def deflection_dtype(self):
+ "Return a Numpy dtype suitable for deflection bit values."
+ return self._deflection_channel().subdevice.get_dtype()
+
+ def move_to_pos_or_def(self, axis_name, position, deflection, step,
+ return_data=False, pre_move_steps=0):
+ """TODO
+
+ pre_move_steps : int
+ number of 'null' steps to take before moving (confirming a
+ stable input deflection).
+ """
+ if return_data or _base_config['matplotlib']:
+ aquire_data = True
+ else:
+ aquire_data = False
+
+ if step == 0:
+ raise ValueError('must have non-zero step size')
+ elif step < 0 and position > self.last_output[axis_name]:
+ step = -step
+ elif step > 0 and position < self.last_output[axis_name]:
+ step = -step
+
+ log_string = (
+ 'move to position %d or deflection %g on axis %s in steps of %d'
+ % (position, deflection, axis_name, step))
+ _LOG.debug(log_string)
+ current_deflection = self.read_deflection()
+ log_string = 'current position %d and deflection %g' % (
+ self.last_output[axis_name], current_deflection)
+ _LOG.debug(log_string)
+
+ if aquire_data:
+ def_array=[current_deflection]
+ pos_array=[self.last_output[axis_name]]
+ for i in range(pre_move_steps):
+ self.jump(axis_name, piezo.last_output[axis_name])
+ delection = self.read_deflection()
+ if aquire_data:
+ def_array.append(current_deflection)
+ pos_array.append(self.last_output[axis_name])
+ # step in until we hit our target position or exceed our target deflection
+ while (self.last_output[axis_name] != position and
+ current_deflection < deflection):
+ dist_to = position - self.last_output[axis_name]
+ if abs(dist_to) < abs(step):
+ jump_to = position
+ else:
+ jump_to = self.last_output[axis_name] + step
+ self.jump(axis_name, jump_to)
+ current_deflection = self.read_deflection()
+ log_string = (
+ 'current z piezo position %6d, current deflection %6d'
+ % (current_deflection, self.last_output[axis_name]))
+ _LOG.debug(log_string)
+ if aquire_data:
+ def_array.append(current_deflection)
+ pos_array.append(self.last_output[axis_name])
+
+ log_string = (
+ 'move to position %d or deflection %g on axis %s complete'
+ % (position, deflection, axis_name))
+ _LOG.debug(log_string)
+ log_string = 'current position %d and deflection %g' % (
+ self.last_output[axis_name], current_deflection)
+ _LOG.debug(log_string)
+ if _base_config['matplotlib']:
+ if not _matplotlib:
+ raise _matplotlib_import_error
+ figure = _matplotlib_pyplot.figure()
+ axes = figure.add_subplot(1, 1, 1)
+ axes.hold(True)
+ timestamp = _time.strftime('%H%M%S')
+ axes.set_title('step approach %s' % timestamp)
+ axes.plot(pos_array, def_array, '.', label=timestamp)
+ #_pylab.legend(loc='best')
+ figure.show()
+
+ if return_data:
+ data = {
+ axis_name:_numpy.array(
+ pos_array, dtype=self.channel_dtype(
+ axis_name, direction='output')),
+ 'deflection':_numpy.array(
+ def_array, dtype=self.deflection_dtype()),
+ }
+ return data
+
+ def wiggle_for_interference(
+ self, axis_name, wiggle_frequency=2, n_samples=1024, amplitude=None,
+ offset=None, laser_wavelength=None, plot=True,
+ keypress_test_mode=False):
+ """Output a sine wave and measure interference.
+
+ With a poorly focused or aligned laser, leaked laser light
+ reflecting off the surface may interfere with the light
+ reflected off the cantilever, causing distance-dependent
+ interference with a period roughly half the laser's
+ wavelength. This method wiggles the cantilever near the
+ surface and monitors the magnitude of deflection oscillation,
+ allowing the operator to adjust the laser alignment in order
+ to minimize the interference.
+
+ Modern commercial AFMs with computer-aligned lasers must do
+ something like this automatically.
+ """
+ if _base_config['matplotlib']:
+ plot = True
+ if laser_wavelength and amplitude:
+ log_string = \
+ 'use either laser_wavelength or amplitude, but not both'
+ _LOG.warn(log_string)
+
+ if None in (amplitude, offset):
+ output_axis = self.axis_by_name(axis_name)
+ maxdata = output_axis.axis_channel.get_maxdata()
+ midpoint = int(maxdata/2)
+ if offset == None:
+ offset = midpoint
+ log_string = (
+ 'generated offset for interference wiggle: %g' % offset)
+ _LOG.debug(log_string)
+ if amplitude == None:
+ if offset <= midpoint:
+ max_amplitude = int(offset)
+ else:
+ max_amplitude = int(maxdata-offset)
+ offset_meters = _base.convert_bits_to_meters(
+ output_axis.axis_channel_config, output_axis.axis_config,
+ offset)
+ bit_wavelength = _base.convert_meters_to_bits(
+ output_axis.axis_channel_config, output_axis.axis_config,
+ offset_meters + laser_wavelength) - offset
+ amplitude = 2*bit_wavelength
+ log_string = (
+ 'generated amplitude for interference wiggle: %g'
+ % amplitude)
+ _LOG.debug(log_string)
+ if amplitude > max_amplitude:
+ raise ValueError('no room for a two wavelength wiggle')
+
+ scan_frequency = wiggle_frequency * n_samples
+ out = (amplitude * _numpy.sin(
+ _numpy.arange(n_samples) * 4 * _numpy.pi / float(n_samples))
+ + offset)
+ # 4 for 2 periods, so you can judge precision
+ out = out.reshape((len(out), 1)).astype(
+ self.channel_dtype(axis_name, direction='output'))
+
+ _LOG.debug('oscillate for interference wiggle')
+ log_string = (
+ 'amplitude: %d bits, offset: %d bits, scan frequency: %g Hz'
+ % (amplitude, offset, scan_frequency))
+ _LOG.debug(log_string)
+
+ if plot:
+ if not _matplotlib:
+ raise _matplotlib_import_error
+ figure = _matplotlib_pyplot.figure()
+ axes = figure.add_subplot(1, 1, 1)
+ axes.hold(False)
+ timestamp = _time.strftime('%H%M%S')
+ axes.set_title('wiggle for interference %s' % timestamp)
+ plot_p = axes.plot(out, out, 'b.-')
+ figure.show()
+ c = _CheckForKeypress(test_mode=keypress_test_mode)
+ while c.input() == None:
+ # input will need processing for multi-segment AFMs...
+ data = self.ramp(out, scan_frequency, output_names=[axis_name],
+ input_names=['deflection'])
+ _LOG.debug('completed a wiggle round')
+ if plot:
+ plot_p[0].set_ydata(data[:,0])
+ axes.set_ylim([data.min(), data.max()])
+ #_flush_plot()
+ self.last_output[axis_name] = out[-1,0]
+ _LOG.debug('interference wiggle complete')
+
+ get_surface_position = _surface.get_surface_position
+
+
+#def ramp
+# if USE_ABCD_DEFLECTION :
+# for i in range(4) : # i is the photodiode element (0->A, 1->B, ...)
+# self.curIn[i] = out["Deflection segment"][i][-1]
+# else :
+# self.curIn[self.chan_info.def_ind] = out["deflection"][-1]
+
+
+#class FourSegmentAFM (AFM):
+# def read_deflection(self):
+# "Return sensor deflection in bits."
+# A = int(self.curIn[self.chan_info.def_ind[0]])
+# B = int(self.curIn[self.chan_info.def_ind[1]])
+# C = int(self.curIn[self.chan_info.def_ind[2]])
+# D = int(self.curIn[self.chan_info.def_ind[3]])
+# df = float((A+B)-(C+D))/(A+B+C+D)
+# dfout = int(df * 2**15) + 2**15
+# if TEXT_VERBOSE :
+# print "Current deflection %d (%d, %d, %d, %d)" \
+# % (dfout, A, B, C, D)
+# return dfout
+
+
+#def test_smoothness(zp, plotVerbose=True):
+# posA = 20000
+# posB = 50000
+# setpoint = zp.def_V2in(3)
+# steps = 200
+# outfreq = 1e5
+# outarray = linspace(posB, posA, 1000)
+# indata=[]
+# outdata=[]
+# curVals = zp.jumpToPos(posA)
+# zp.pCurVals(curVals)
+# _sleep(1) # let jitters die down
+# for i in range(10):
+# print "ramp %d to %d" % (zp.curPos(), posB)
+# curVals, data = moveToPosOrDef(zp, posB, setpoint, step=steps,
+# return_data = True)
+# indata.append(data)
+# out = zp.ramp(outarray, outfreq)
+# outdata.append(out)
+# if plotVerbose:
+# from pylab import figure, plot, title, legend, hold, subplot
+# if PYLAB_VERBOSE or plotVerbose:
+# _import_pylab()
+# _pylab.figure(BASE_FIG_NUM+4)
+# for i in range(10):
+# _pylab.plot(indata[i]['z'],
+# indata[i]['deflection'], '+--', label='in')
+# _pylab.plot(outdata[i]['z'],
+# outdata[i]['deflection'], '.-', label='out')
+# _pylab.title('test smoothness (step in, ramp out)')
+# #_pylab.legend(loc='best')
+#
+#def test():
+# import z_piezo
+# zp = z_piezo.z_piezo()
+# curVals = zp.moveToPosOrDef(zp.pos_nm2out(600), defl=zp.curDef()+6000, step=(zp.pos_nm2out(10)-zp.pos_nm2out(0)))
+# if TEXT_VERBOSE:
+# zp.pCurVals(curVals)
+# pos = zp.getSurfPos(maxDefl=zp.curDef()+6000)
+# if TEXT_VERBOSE:
+# print "Surface at %g nm", pos
+# print "success"
+# if PYLAB_VERBOSE and _final_flush_plot != None:
+# _final_flush_plot()
+
--- /dev/null
+# Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of pypiezo.
+#
+# pypiezo is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# pypiezo is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with pypiezo. If not, see <http://www.gnu.org/licenses/>.
+
+"Basic piezo control."
+
+import math as _math
+from time import sleep as _sleep
+
+import numpy as _numpy
+from scipy.stats import linregress as _linregress
+
+try:
+ import matplotlib as _matplotlib
+ import matplotlib.pyplot as _matplotlib_pyplot
+ import time as _time # for timestamping lines on plots
+except ImportError, e:
+ _matplotlib = None
+ _matplotlib_import_error = e
+
+from pycomedi.constant import TRIG_SRC, SDF
+from pycomedi.utility import inttrig_insn, Reader, Writer
+
+from . import LOG as _LOG
+from . import base_config as _base_config
+from . import config as _config
+
+
+def convert_bits_to_volts(config, data):
+ """Convert bit-valued data to volts.
+
+ >>> config = _config._ChannelConfig()
+ >>> config['conversion-coefficients'] = [1, 2, 3]
+ >>> config['conversion-origin'] = -1
+ >>> convert_bits_to_volts(config, -1)
+ 1
+ >>> convert_bits_to_volts(
+ ... config, _numpy.array([-1, 0, 1, 2], dtype=_numpy.float))
+ array([ 1., 6., 17., 34.])
+ """
+ coefficients = config['conversion-coefficients']
+ origin = config['conversion-origin']
+ return _numpy.polyval(list(reversed(coefficients)), data-origin)
+
+def convert_volts_to_bits(config, data):
+ """Convert bit-valued data to volts.
+
+ >>> config = _config._ChannelConfig()
+ >>> config['inverse-conversion-coefficients'] = [1, 2, 3]
+ >>> config['inverse-conversion-origin'] = -1
+ >>> convert_volts_to_bits(config, -1)
+ 1
+ >>> convert_volts_to_bits(
+ ... config, _numpy.array([-1, 0, 1, 2], dtype=_numpy.float))
+ array([ 1., 6., 17., 34.])
+
+ Note that the inverse coeffiecient and offset are difficult to
+ derive from the forward coefficient and offset. The current
+ Comedilib conversion functions, `comedi_to_physical()` and
+ `comedi_from_physical()` take `comedi_polynomial_t` conversion
+ arguments. `comedi_polynomial_t` is defined in `comedilib.h`_,
+ and holds a polynomial of length
+ `COMEDI_MAX_NUM_POLYNOMIAL_COEFFICIENTS`, currently set to 4. The
+ inverse of this cubic polynomial might be another polynomial, but
+ it might also have a more complicated form. A general inversion
+ solution is considered too complicated, so when you're setting up
+ your configuration, you should use Comedilib to save both the
+ forward and inverse coefficients and offsets.
+
+ .. _comedilib.h: http://www.comedi.org/git?p=comedi/comedilib.git;a=blob;f=include/comedilib.h;hb=HEAD
+ """
+ origin = config['inverse-conversion-origin']
+ inverse_coefficients = config['inverse-conversion-coefficients']
+ if len(inverse_coefficients) == 0:
+ raise NotImplementedError('cubic polynomial inversion')
+ return _numpy.polyval(list(reversed(inverse_coefficients)), data-origin)
+
+def convert_volts_to_meters(config, data):
+ """Convert volt-valued data to meters.
+
+ >>> config = _config._AxisConfig()
+ >>> config['gain'] = 20.0
+ >>> config['sensitivity'] = 8e-9
+ >>> convert_volts_to_meters(config, 1)
+ 1.6e-07
+ >>> convert_volts_to_meters(
+ ... config, _numpy.array([1, 6, 17, 34], dtype=_numpy.float))
+ ... # doctest: +ELLIPSIS
+ array([ 1.6...e-07, 9.6...e-07, 2.7...e-06,
+ 5.4...e-06])
+ """
+ return data * config['gain'] * config['sensitivity']
+
+def convert_meters_to_volts(config, data):
+ """Convert bit-valued data to volts.
+
+ >>> config = _config._AxisConfig()
+ >>> config['gain'] = 20.0
+ >>> config['sensitivity'] = 8e-9
+ >>> convert_meters_to_volts(config, 1.6e-7)
+ 1.0
+ >>> convert_meters_to_volts(
+ ... config, _numpy.array([1.6e-7, 9.6e-7, 2.72e-6, 5.44e-6],
+ ... dtype=_numpy.float))
+ array([ 1., 6., 17., 34.])
+ """
+ return data / (config['gain'] * config['sensitivity'])
+
+def convert_bits_to_meters(channel_config, axis_config, data):
+ """Convert bit-valued data to meters.
+
+ >>> channel_config = _config._ChannelConfig()
+ >>> channel_config['conversion-coefficients'] = [1, 2, 3]
+ >>> channel_config['conversion-origin'] = -1
+ >>> axis_config = _config._AxisConfig()
+ >>> axis_config['gain'] = 20.0
+ >>> axis_config['sensitivity'] = 8e-9
+ >>> convert_bits_to_meters(channel_config, axis_config, 1)
+ ... # doctest: +ELLIPSIS
+ 2.7...e-06
+ >>> convert_bits_to_meters(
+ ... channel_config, axis_config,
+ ... _numpy.array([-1, 0, 1, 2], dtype=_numpy.float))
+ ... # doctest: +ELLIPSIS
+ array([ 1.6...e-07, 9.6...e-07, 2.7...e-06,
+ 5.4...e-06])
+ """
+ data = convert_bits_to_volts(channel_config, data)
+ return convert_volts_to_meters(axis_config, data)
+
+def convert_meters_to_bits(channel_config, axis_config, data):
+ """Convert meter-valued data to volts.
+
+ >>> channel_config = _config._ChannelConfig()
+ >>> channel_config['inverse-conversion-coefficients'] = [1, 2, 3]
+ >>> channel_config['inverse-conversion-origin'] = -1
+ >>> axis_config = _config._AxisConfig()
+ >>> axis_config['gain'] = 20.0
+ >>> axis_config['sensitivity'] = 8e-9
+ >>> convert_meters_to_bits(channel_config, axis_config, 1.6e-7)
+ 17.0
+ >>> convert_meters_to_bits(
+ ... channel_config, axis_config,
+ ... _numpy.array([1.6e-7, 9.6e-7, 2.72e-6, 5.44e-6],
+ ... dtype=_numpy.float))
+ array([ 17., 162., 1009., 3746.])
+ """
+ data = convert_meters_to_volts(axis_config, data)
+ return convert_volts_to_bits(channel_config, data)
+
+def _setup_channel_config(config, channel):
+ """Initialize the `ChannelConfig` `config` using the
+ `AnalogChannel` `channel`.
+ """
+ config['device'] = channel.subdevice.device.filename
+ config['subdevice'] = channel.subdevice.index
+ config['channel'] = channel.index
+ config['maxdata'] = channel.get_maxdata()
+ config['range'] = channel.range.value
+ converter = channel.get_converter()
+ config['conversion-origin'
+ ] = converter.get_to_physical_expansion_origin()
+ config['conversion-coefficients'
+ ] = converter.get_to_physical_coefficients()
+ config['inverse-conversion-origin'
+ ] = converter.get_from_physical_expansion_origin()
+ config['inverse-conversion-coefficients'
+ ] = converter.get_from_physical_coefficients()
+
+
+class PiezoAxis (object):
+ """A one-dimensional piezoelectric actuator.
+
+ If used, the montoring channel must (as of now) be on the same
+ device as the controlling channel.
+
+ >>> from pprint import pprint
+ >>> from pycomedi.device import Device
+ >>> from pycomedi.subdevice import StreamingSubdevice
+ >>> from pycomedi.channel import AnalogChannel
+ >>> from pycomedi.constant import (AREF, SUBDEVICE_TYPE, UNIT)
+
+ >>> d = Device('/dev/comedi0')
+ >>> d.open()
+
+ >>> s_in = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai,
+ ... factory=StreamingSubdevice)
+ >>> s_out = d.find_subdevice_by_type(SUBDEVICE_TYPE.ao,
+ ... factory=StreamingSubdevice)
+
+ >>> axis_channel = s_out.channel(
+ ... 0, factory=AnalogChannel, aref=AREF.ground)
+ >>> monitor_channel = s_in.channel(
+ ... 0, factory=AnalogChannel, aref=AREF.diff)
+ >>> for chan in [axis_channel, monitor_channel]:
+ ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10)
+
+ >>> axis_config = _config._AxisConfig()
+ >>> axis_config.update({'gain':20, 'sensitivity':8e-9})
+ >>> axis_channel_config = _config._ChannelConfig()
+ >>> monitor_channel_config = _config._ChannelConfig()
+ >>> monitor_channel_config['device'] = '/dev/comediX'
+
+ >>> p = PiezoAxis(axis_config=axis_config,
+ ... axis_channel_config=axis_channel_config,
+ ... monitor_channel_config=monitor_channel_config)
+ ... # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ NotImplementedError: piezo axis control and monitor on different devices
+ (/dev/comedi0 and /dev/comediX)
+
+ >>> monitor_channel_config['device'] = axis_channel_config['device']
+ >>> p = PiezoAxis(axis_config=axis_config,
+ ... axis_channel_config=axis_channel_config,
+ ... monitor_channel_config=monitor_channel_config,
+ ... axis_channel=axis_channel, monitor_channel=monitor_channel)
+
+ >>> p.setup_config()
+ >>> pprint(axis_channel_config)
+ {'channel': 0,
+ 'conversion-coefficients': array([ -1.00000000e+01, 3.05180438e-04]),
+ 'conversion-origin': 0.0,
+ 'device': '/dev/comedi0',
+ 'inverse-conversion-coefficients': array([ 0. , 3276.75]),
+ 'inverse-conversion-origin': -10.0,
+ 'maxdata': 65535L,
+ 'range': 0,
+ 'subdevice': 1}
+ >>> pprint(monitor_channel_config)
+ {'channel': 0,
+ 'conversion-coefficients': array([ -1.00000000e+01, 3.05180438e-04]),
+ 'conversion-origin': 0.0,
+ 'device': '/dev/comedi0',
+ 'inverse-conversion-coefficients': array([ 0. , 3276.75]),
+ 'inverse-conversion-origin': -10.0,
+ 'maxdata': 65535L,
+ 'range': 0,
+ 'subdevice': 0}
+
+ >>> convert_bits_to_meters(p.axis_channel_config, p.axis_config, 0)
+ ... # doctest: +ELLIPSIS
+ -1.6...e-06
+
+ >>> d.close()
+ """
+ def __init__(self, axis_config, axis_channel_config,
+ monitor_channel_config=None,
+ axis_channel=None, monitor_channel=None,
+ name = None):
+ self.axis_config = axis_config
+ self.axis_channel_config = axis_channel_config
+ self.monitor_channel_config = monitor_channel_config
+ if (monitor_channel_config and
+ axis_channel_config['device'] != monitor_channel_config['device']):
+ raise NotImplementedError(
+ ('piezo axis control and monitor on different devices '
+ '(%s and %s)') % (
+ axis_channel_config['device'],
+ monitor_channel_config['device']))
+ if not axis_channel:
+ raise NotImplementedError(
+ 'pypiezo not yet capable of opening its own axis channel')
+ #axis_channel = pycomedi...
+ self.axis_channel = axis_channel
+ if monitor_channel_config and not monitor_channel:
+ raise NotImplementedError(
+ 'pypiezo not yet capable of opening its own monitor channel')
+ #monitor_channel = pycomedi...
+ self.monitor_channel = monitor_channel
+ self.name = name
+
+ def setup_config(self):
+ "Initialize the axis (and monitor) configs."
+ _setup_channel_config(self.axis_channel_config, self.axis_channel)
+ if self.monitor_channel:
+ _setup_channel_config(
+ self.monitor_channel_config, self.monitor_channel)
+ if self.axis_config['minimum'] is None:
+ self.axis_config['minimum'] = convert_bits_to_volts(
+ self.axis_channel_config, 0)
+ if self.axis_config['maximum'] is None:
+ self.axis_config['maximum'] = convert_bits_to_volts(
+ self.axis_channel_config, self.axis_channel.get_maxdata())
+
+
+class InputChannel(object):
+ """An input channel monitoring some interesting parameter.
+
+ >>> from pprint import pprint
+ >>> from pycomedi.device import Device
+ >>> from pycomedi.subdevice import StreamingSubdevice
+ >>> from pycomedi.channel import AnalogChannel
+ >>> from pycomedi.constant import AREF, SUBDEVICE_TYPE, UNIT
+
+ >>> d = Device('/dev/comedi0')
+ >>> d.open()
+
+ >>> s = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai,
+ ... factory=StreamingSubdevice)
+
+ >>> channel = s.channel(0, factory=AnalogChannel, aref=AREF.diff)
+ >>> channel.range = channel.find_range(unit=UNIT.volt, min=-10, max=10)
+
+ >>> channel_config = _config._ChannelConfig()
+
+ >>> c = InputChannel(channel_config=channel_config, channel=channel)
+ >>> c.setup_config()
+ >>> pprint(channel_config)
+ {'channel': 0,
+ 'conversion-coefficients': array([ -1.00000000e+01, 3.05180438e-04]),
+ 'conversion-origin': 0.0,
+ 'device': '/dev/comedi0',
+ 'inverse-conversion-coefficients': array([ 0. , 3276.75]),
+ 'inverse-conversion-origin': -10.0,
+ 'maxdata': 65535L,
+ 'range': 0,
+ 'subdevice': 0}
+
+ >>> convert_bits_to_volts(c.channel_config, 0)
+ -10.0
+
+ >>> d.close()
+ """
+ def __init__(self, channel_config, channel=None, name=None):
+ self.channel_config = channel_config
+ if not channel:
+ raise NotImplementedError(
+ 'pypiezo not yet capable of opening its own channel')
+ #channel = pycomedi...
+ self.channel = channel
+ self.name = name
+
+ def setup_config(self):
+ _setup_channel_config(self.channel_config, self.channel)
+
+
+class Piezo (object):
+ """A piezo actuator-controlled experiment.
+
+ >>> from pprint import pprint
+ >>> from pycomedi.device import Device
+ >>> from pycomedi.subdevice import StreamingSubdevice
+ >>> from pycomedi.channel import AnalogChannel
+ >>> from pycomedi.constant import AREF, SUBDEVICE_TYPE, UNIT
+
+ >>> d = Device('/dev/comedi0')
+ >>> d.open()
+
+ >>> s_in = d.find_subdevice_by_type(SUBDEVICE_TYPE.ai,
+ ... factory=StreamingSubdevice)
+ >>> s_out = d.find_subdevice_by_type(SUBDEVICE_TYPE.ao,
+ ... factory=StreamingSubdevice)
+
+ >>> axis_channel = s_out.channel(
+ ... 0, factory=AnalogChannel, aref=AREF.ground)
+ >>> monitor_channel = s_in.channel(
+ ... 0, factory=AnalogChannel, aref=AREF.diff)
+ >>> input_channel = s_in.channel(1, factory=AnalogChannel, aref=AREF.diff)
+ >>> for chan in [axis_channel, monitor_channel, input_channel]:
+ ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10)
+
+ >>> axis_config = _config._AxisConfig()
+ >>> axis_config.update({'gain':20, 'sensitivity':8e-9})
+ >>> axis_channel_config = _config._ChannelConfig()
+ >>> monitor_channel_config = _config._ChannelConfig()
+ >>> input_channel_config = _config._ChannelConfig()
+
+ >>> a = PiezoAxis(axis_config=axis_config,
+ ... axis_channel_config=axis_channel_config,
+ ... monitor_channel_config=monitor_channel_config,
+ ... axis_channel=axis_channel, monitor_channel=monitor_channel,
+ ... name='z')
+ >>> a.setup_config()
+
+ >>> c = InputChannel(
+ ... channel_config=input_channel_config, channel=input_channel,
+ ... name='some-input')
+ >>> c.setup_config()
+
+ >>> p = Piezo(axes=[a], input_channels=[c])
+ >>> inputs = p.read_inputs()
+ >>> pprint(inputs) # doctest: +SKIP
+ {'some-input': 34494L, 'z-monitor': 32669L}
+
+ >>> pos = convert_volts_to_bits(p.axes[0].axis_channel_config, 0)
+ >>> pos
+ 32767.5
+ >>> p.jump('z', pos)
+ >>> p.last_output == {'z': int(pos)}
+ True
+
+ :meth:`ramp` raises an error if passed an invalid `data` `dtype`.
+
+ >>> output_data = _numpy.arange(0, int(pos), step=int(pos/10),
+ ... dtype=_numpy.float)
+ >>> output_data = output_data.reshape((len(output_data), 1))
+ >>> input_data = p.ramp(data=output_data, frequency=10,
+ ... output_names=['z'], input_names=['z-monitor', 'some-input'])
+ Traceback (most recent call last):
+ ...
+ ValueError: output dtype float64 does not match expected <type 'numpy.uint16'>
+ >>> output_data = _numpy.arange(0, int(pos), step=int(pos/10),
+ ... dtype=p.channel_dtype('z', direction='output'))
+ >>> output_data = output_data.reshape((len(output_data), 1))
+ >>> input_data = p.ramp(data=output_data, frequency=10,
+ ... output_names=['z'], input_names=['z-monitor', 'some-input'])
+ >>> input_data # doctest: +SKIP
+ array([[ 0, 25219],
+ [ 3101, 23553],
+ [ 6384, 22341],
+ [ 9664, 21465],
+ [12949, 20896],
+ [16232, 20614],
+ [19516, 20588],
+ [22799, 20801],
+ [26081, 21233],
+ [29366, 21870],
+ [32646, 22686]], dtype=uint16)
+
+ >>> p.last_output == {'z': output_data[-1]}
+ True
+
+ >>> data = p.named_ramp(data=output_data, frequency=10,
+ ... output_names=['z'], input_names=['z-monitor', 'some-input'])
+ >>> pprint(data) # doctest: +ELLIPSIS, +SKIP
+ {'some-input': array([21666, 20566, ..., 22395], dtype=uint16),
+ 'z': array([ 0, 3276, ..., 32760], dtype=uint16),
+ 'z-monitor': array([ 3102, 6384, ..., 32647], dtype=uint16)}
+
+ >>> d.close()
+ """
+ def __init__(self, axes, input_channels, base_config=None):
+ self.axes = axes
+ self.input_channels = input_channels
+ self.last_output = {}
+
+ def axis_by_name(self, name):
+ "Get an axis by its name."
+ for axis in self.axes:
+ if axis.name == name:
+ return axis
+ raise ValueError(name)
+
+ def channels(self, direction=None):
+ """Iterate through all `(name, channel)` tuples.
+
+ =========== ===================
+ `direction` Returned channels
+ =========== ===================
+ 'input' all input channels
+ 'output' all output channels
+ None all channels
+ =========== ===================
+ """
+ if direction not in ('input', 'output', None):
+ raise ValueError(direction)
+ for a in self.axes:
+ if direction != 'input':
+ yield (a.name, a.axis_channel)
+ if a.monitor_channel and direction != 'output':
+ yield ('%s-monitor' % a.name, a.monitor_channel)
+ if direction != 'output':
+ for c in self.input_channels:
+ yield (c.name, c.channel)
+
+ def channel_by_name(self, name, direction=None):
+ """Get a channel by its name.
+
+ Setting `direction` (see :meth:`channels`) may allow a more
+ efficient search.
+ """
+ for n,channel in self.channels(direction=direction):
+ if n == name:
+ return channel
+ raise ValueError(name)
+
+ def channel_dtype(self, channel_name, direction=None):
+ """Get a channel's data type by name.
+
+ Setting `direction` (see :meth:`channels`) may allow a more
+ efficient search.
+ """
+ channel = self.channel_by_name(name=channel_name, direction=direction)
+ return channel.subdevice.get_dtype()
+
+ def read_inputs(self):
+ "Read all inputs and return a `name`->`value` dictionary."
+ # There is no multi-channel read instruction, so preform reads
+ # sequentially.
+ ret = dict([(n, c.data_read()) for n,c in self.channels('input')])
+ _LOG.debug('current position: %s' % ret)
+ return ret
+
+ def jump(self, axis_name, position):
+ "Move the output named `axis_name` to `position`."
+ _LOG.debug('jump %s to %s' % (axis_name, position))
+ position = int(position)
+ channel = self.channel_by_name(name=axis_name)
+ channel.data_write(position)
+ self.last_output[axis_name] = position
+
+ def ramp(self, data, frequency, output_names, input_names=()):
+ """Synchronized IO ramp writing `data` and reading `in_names`.
+
+ Parameters
+ ----------
+ data : numpy array-like
+ Row for each cycle, column for each output channel.
+ frequency : float
+ Target cycle frequency in Hz.
+ output_names : list of strings
+ Names of output channels in the same order as the columns
+ of `data`.
+ input_names : list of strings
+ Names of input channels to monitor in the same order as
+ the columns of the returned array.
+ """
+ if len(data.shape) != 2:
+ raise ValueError(
+ 'ramp data must be two dimensional, not %d' % len(data.shape))
+ if data.shape[1] != len(output_names):
+ raise ValueError(
+ 'ramp data should have on column for each input, '
+ 'but has %d columns for %d inputs'
+ % (data.shape[1], len(output_names)))
+ n_samps = data.shape[0]
+ log_string = 'ramp %d samples at %g Hz. out: %s, in: %s' % (
+ n_samps, frequency, output_names, input_names)
+ _LOG.debug(log_string) # _LOG on one line for easy commenting-out
+ # TODO: check range?
+ output_channels = [self.channel_by_name(name=n, direction='output')
+ for n in output_names]
+ input_channels = [self.channel_by_name(name=n, direction='input')
+ for n in input_names]
+
+ ao_subdevice = output_channels[0].subdevice
+ ai_subdevice = input_channels[0].subdevice
+ device = ao_subdevice.device
+
+ output_dtype = ao_subdevice.get_dtype()
+ if data.dtype != output_dtype:
+ raise ValueError('output dtype %s does not match expected %s'
+ % (data.dtype, output_dtype))
+ input_data = _numpy.ndarray(
+ (n_samps, len(input_channels)), dtype=ai_subdevice.get_dtype())
+
+ _LOG.debug('setup ramp commands')
+ scan_period_ns = int(1e9 / frequency)
+ ai_cmd = ai_subdevice.get_cmd_generic_timed(
+ len(input_channels), scan_period_ns)
+ ao_cmd = ao_subdevice.get_cmd_generic_timed(
+ len(output_channels), scan_period_ns)
+
+ ai_cmd.start_src = TRIG_SRC.int
+ ai_cmd.start_arg = 0
+ ai_cmd.stop_src = TRIG_SRC.count
+ ai_cmd.stop_arg = n_samps
+ ai_cmd.chanlist = input_channels
+ #ao_cmd.start_src = TRIG_SRC.ext
+ #ao_cmd.start_arg = 18 # NI card AI_START1 internal AI start signal
+ ao_cmd.start_src = TRIG_SRC.int
+ ao_cmd.start_arg = 0
+ ao_cmd.stop_src = TRIG_SRC.count
+ ao_cmd.stop_arg = n_samps-1
+ ao_cmd.chanlist = output_channels
+
+ ai_subdevice.cmd = ai_cmd
+ ao_subdevice.cmd = ao_cmd
+ for i in range(3):
+ rc = ai_subdevice.command_test()
+ if rc is None: break
+ _LOG.debug('analog input test %d: %s' % (i, rc))
+ for i in range(3):
+ rc = ao_subdevice.command_test()
+ if rc is None: break
+ _LOG.debug('analog output test %d: %s' % (i, rc))
+
+ _LOG.debug('lock subdevices for ramp')
+ ao_subdevice.lock()
+ try:
+ ai_subdevice.lock()
+ try:
+ _LOG.debug('load ramp commands')
+ ao_subdevice.command()
+ ai_subdevice.command()
+
+ writer = Writer(ao_subdevice, data)
+ writer.start()
+ reader = Reader(ai_subdevice, input_data)
+ reader.start()
+
+ _LOG.debug('arm analog output')
+ device.do_insn(inttrig_insn(ao_subdevice))
+ _LOG.debug('trigger ramp (via analog input)')
+ device.do_insn(inttrig_insn(ai_subdevice))
+ _LOG.debug('ramp running')
+
+ writer.join()
+ reader.join()
+ _LOG.debug('ramp complete')
+ finally:
+ #_LOG.debug('AI flags: %s' % ai_subdevice.get_flags())
+ ai_subdevice.cancel()
+ ai_subdevice.unlock()
+ finally:
+ # release busy flag, which seems to not be cleared
+ # automatically. See
+ # http://groups.google.com/group/comedi_list/browse_thread/thread/4c7040989197abad/
+ #_LOG.debug('AO flags: %s' % ao_subdevice.get_flags())
+ ao_subdevice.cancel()
+ ao_subdevice.unlock()
+ _LOG.debug('unlocked subdevices after ramp')
+
+ for i,name in enumerate(output_names):
+ self.last_output[name] = data[-1,i]
+
+ if _base_config['matplotlib']:
+ if not _matplotlib:
+ raise _matplotlib_import_error
+ figure = _matplotlib_pyplot.figure()
+ axes = figure.add_subplot(1, 1, 1)
+ axes.hold(True)
+ timestamp = _time.strftime('%H%M%S')
+ axes.set_title('piezo ramp %s' % timestamp)
+ for d,names in [(data, output_names),
+ (input_data, input_names)]:
+ for i,name in enumerate(names):
+ axes.plot(d[:,i], label=name)
+ figure.show()
+ return input_data
+
+ def named_ramp(self, data, frequency, output_names, input_names=()):
+ input_data = self.ramp(
+ data=data, frequency=frequency, output_names=output_names,
+ input_names=input_names)
+ ret = {}
+ for i,name in enumerate(output_names):
+ ret[name] = data[:,i]
+ for i,name in enumerate(input_names):
+ ret[name] = input_data[:,i]
+ return ret
--- /dev/null
+"""Piezo configuration
+
+Broken out from the main modules to make it easy to override if you
+wish to use a different configuration file format.
+
+The HDF5- and YAML-backed config file classes are created dynamically:
+
+>>> print '\\n'.join([obj for obj in sorted(locals().keys())
+... if obj.endswith('Config')
+... and not obj.startswith('_')])
+HDF5_AxisConfig
+HDF5_BaseConfig
+HDF5_ChannelConfig
+HDF5_InputChannelConfig
+HDF5_OutputChannelConfig
+YAML_AxisConfig
+YAML_BaseConfig
+YAML_ChannelConfig
+YAML_InputChannelConfig
+YAML_OutputChannelConfig
+
+The first time you use them, the file they create will probably be
+empty or not exist.
+
+>>> import os
+>>> import tempfile
+>>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='pypiezo-')
+>>> os.close(fd)
+
+>>> c = HDF5_BaseConfig(filename=filename, group='/base')
+>>> c.load()
+
+Loading will create a stub group group if it hadn't existed before.
+
+>>> pprint_HDF5(filename)
+/
+ /base
+>>> print c.dump(from_file=True)
+<BLANKLINE>
+
+Saving fills in all the config values.
+
+>>> c['syslog'] = True
+>>> c.save()
+>>> pprint_HDF5(filename) # doctest: +REPORT_UDIFF
+/
+ /base
+ <HDF5 dataset "log-level": shape (), type "|S4">
+ warn
+ <HDF5 dataset "matplotlib": shape (), type "|S2">
+ no
+ <HDF5 dataset "syslog": shape (), type "|S3">
+ yes
+>>> print c.dump(from_file=True)
+log-level: warn
+matplotlib: no
+syslog: yes
+
+If you want more details, you can dump with help strings.
+
+>>> print c.dump(help=True, from_file=True) # doctest: +NORMALIZE_WHITESPACE
+log-level: warn\t(Module logging level. Default: warn. Choices:
+ \t critical, error, warn, info, debug)
+matplotlib: no\t(Plot piezo motion using `matplotlib`. Default: no.
+ \t Choices: yes, no)
+syslog: yes\t(Log to syslog (otherwise log to stderr). Default: no.
+ \t Choices: yes, no)
+
+Settings also support `None`, even if they have numeric types.
+
+>>> c = HDF5_AxisConfig(filename=filename, group='/z-axis')
+>>> c.load()
+>>> c.save()
+>>> c.load()
+>>> print c.dump(from_file=True)
+gain: 1.0
+maximum: None
+minimum: None
+sensitivity: 1.0
+>>> print (c['minimum'] == None)
+True
+
+Cleanup our temporary config file.
+
+>>> os.remove(filename)
+"""
+
+import logging as _logging
+import os.path as _os_path
+import sys as _sys
+
+import h5py as _h5py
+import yaml as _yaml
+
+from . import LOG as _LOG
+
+
+class _Setting (object):
+ "A named setting with arbitrart text values."
+ def __init__(self, name, help='', default=None):
+ self.name = name
+ self._help = help
+ self.default = default
+
+ def __str__(self):
+ return '<%s %s>' % (self.__class__.__name__, self.name)
+
+ def __repr__(self):
+ return self.__str__()
+
+ def help(self):
+ ret = '%s Default: %s.' % (
+ self._help, self.convert_to_text(self.default))
+ return ret.strip()
+
+ def convert_from_text(self, value):
+ return value
+
+ def convert_to_text(self, value):
+ return value
+
+
+class _ChoiceSetting (_Setting):
+ """A named setting with a limited number of possible values.
+
+ `choices` should be a list of `(config_file_value, Python value)`
+ pairs. For example
+
+ >>> s = _ChoiceSetting(name='bool',
+ ... choices=[('yes', True), ('no', False)])
+ >>> s.convert_from_text('yes')
+ True
+ >>> s.convert_to_text(True)
+ 'yes'
+ >>> s.convert_to_text('invalid')
+ Traceback (most recent call last):
+ ...
+ ValueError: invalid
+ >>> s.help()
+ 'Default: yes. Choices: yes, no'
+ """
+ def __init__(self, choices=None, **kwargs):
+ if 'default' not in kwargs:
+ if None not in [keyval[1] for keyval in choices]:
+ kwargs['default'] = choices[0][1]
+ super(_ChoiceSetting, self).__init__(**kwargs)
+ if choices == None:
+ choices = []
+ self.choices = choices
+
+ def help(self):
+ ret = '%s Choices: %s' % (
+ super(_ChoiceSetting, self).help(),
+ ', '.join([key for key,value in self.choices]))
+ return ret.strip()
+
+ def convert_from_text(self, value):
+ return dict(self.choices)[value]
+
+ def convert_to_text(self, value):
+ for keyval in self.choices:
+ key,val = keyval
+ if val == value:
+ return key
+ raise ValueError(value)
+
+
+class _BooleanSetting (_ChoiceSetting):
+ """A named settubg that can be either true or false.
+
+ >>> s = _BooleanSetting(name='bool')
+
+ >>> s.convert_from_text('yes')
+ True
+ >>> s.convert_to_text(True)
+ 'yes'
+ >>> s.convert_to_text('invalid')
+ Traceback (most recent call last):
+ ...
+ ValueError: invalid
+ >>> s.help()
+ 'Default: no. Choices: yes, no'
+ """
+ def __init__(self, **kwargs):
+ assert 'choices' not in kwargs
+ if 'default' not in kwargs:
+ kwargs['default'] = False
+ super(_BooleanSetting, self).__init__(
+ choices=[('yes', True), ('no', False)], **kwargs)
+
+
+class _NumericSetting (_Setting):
+ """A named setting with numeric values.
+
+ >>> s = _NumericSetting(name='float')
+ >>> s.default
+ 0
+ >>> s.convert_to_text(13)
+ '13'
+ """
+ _default_value = 0
+
+ def __init__(self, **kwargs):
+ if 'default' not in kwargs:
+ kwargs['default'] = self._default_value
+ super(_NumericSetting, self).__init__(**kwargs)
+
+ def convert_to_text(self, value):
+ return str(value)
+
+ def convert_from_text(self, value):
+ if value in [None, 'None']:
+ return None
+ return self._convert_from_text(value)
+
+ def _convert_from_text(self, value):
+ raise NotImplementedError()
+
+
+class _IntegerSetting (_NumericSetting):
+ """A named setting with integer values.
+
+ >>> s = _IntegerSetting(name='int')
+ >>> s.default
+ 1
+ >>> s.convert_from_text('8')
+ 8
+ """
+ _default_value = 1
+
+ def _convert_from_text(self, value):
+ return int(value)
+
+
+class _FloatSetting (_NumericSetting):
+ """A named setting with floating point values.
+
+ >>> s = _FloatSetting(name='float')
+ >>> s.default
+ 1.0
+ >>> s.convert_from_text('8')
+ 8.0
+ >>> s.convert_from_text('invalid')
+ Traceback (most recent call last):
+ ...
+ ValueError: invalid literal for float(): invalid
+ """
+ _default_value = 1.0
+
+ def _convert_from_text(self, value):
+ return float(value)
+
+
+class _FloatListSetting (_Setting):
+ """A named setting with a list of floating point values.
+
+ >>> s = _FloatListSetting(name='floatlist')
+ >>> s.default
+ []
+ >>> s.convert_to_text([1, 2.3])
+ '1, 2.3'
+ >>> s.convert_from_text('4.5, -6.7') # doctest: +ELLIPSIS
+ [4.5, -6.700...]
+ >>> s.convert_to_text([])
+ ''
+ >>> s.convert_from_text('')
+ []
+ """
+ def __init__(self, **kwargs):
+ if 'default' not in kwargs:
+ kwargs['default'] = []
+ super(_FloatListSetting, self).__init__(**kwargs)
+
+ def _convert_from_text(self, value):
+ if value is None:
+ return value
+ return float(value)
+
+ def convert_from_text(self, value):
+ if value is None:
+ return None
+ elif value == '':
+ return []
+ return [self._convert_from_text(x) for x in value.split(',')]
+
+ def convert_to_text(self, value):
+ if value is None:
+ return None
+ return ', '.join([str(x) for x in value])
+
+
+class _Config (dict):
+ "A class with a list `._keys` of `_Setting`\s."
+ settings = []
+
+ def __init__(self):
+ for s in self.settings:
+ self[s.name] = s.default
+
+ def dump(self, help=False):
+ """Return all settings and their values as a string
+
+ >>> b = _BaseConfig()
+ >>> print b.dump()
+ syslog: no
+ matplotlib: no
+ log-level: warn
+ >>> print b.dump(help=True) # doctest: +NORMALIZE_WHITESPACE
+ syslog: no (Log to syslog (otherwise log to stderr).
+ Default: no. Choices: yes, no)
+ matplotlib: no (Plot piezo motion using `matplotlib`.
+ Default: no. Choices: yes, no)
+ log-level: warn (Module logging level. Default: warn.
+ Choices: critical, error, warn, info, debug)
+ """
+ lines = []
+ settings = dict([(s.name, s) for s in self.settings])
+ for key,value in self.iteritems():
+ if key in settings:
+ setting = settings[key]
+ value_string = setting.convert_to_text(value)
+ if help:
+ help_string = '\t(%s)' % setting.help()
+ else:
+ help_string = ''
+ lines.append('%s: %s%s' % (key, value_string, help_string))
+ return '\n'.join(lines)
+
+
+class _BackedConfig (_Config):
+ "A `_Config` instance with some kind of storage interface"
+ def load(self):
+ raise NotImplementedError()
+
+ def save(self):
+ raise NotImplementedError()
+
+
+class _BaseConfig (_Config):
+ """Configure `pypiezo` module operation
+
+ >>> b = _BaseConfig()
+ >>> b.settings # doctest: +NORMALIZE_WHITESPACE
+ [<_ChoiceSetting log-level>, <_BooleanSetting syslog>,
+ <_BooleanSetting matplotlib>]
+ >>> print b['log-level'] == _logging.WARN
+ True
+ """
+ settings = [
+ _ChoiceSetting(
+ name='log-level',
+ help='Module logging level.',
+ default=_logging.WARN,
+ choices=[
+ ('critical', _logging.CRITICAL),
+ ('error', _logging.ERROR),
+ ('warn', _logging.WARN),
+ ('info', _logging.INFO),
+ ('debug', _logging.DEBUG),
+ ]),
+ _BooleanSetting(
+ name='syslog',
+ help='Log to syslog (otherwise log to stderr).',
+ default=False),
+ _BooleanSetting(
+ name='matplotlib',
+ help='Plot piezo motion using `matplotlib`.',
+ default=False),
+ ]
+
+
+class _AxisConfig (_Config):
+ "Configure a single piezo axis"
+ settings = [
+ _FloatSetting(
+ name='gain',
+ help=(
+ 'Volts applied at piezo per volt output from the DAQ card '
+ '(e.g. if your DAQ output is amplified before driving the '
+ 'piezo),')),
+ _FloatSetting(
+ name='sensitivity',
+ help='Meters of piezo deflection per volt applied to the piezo.'),
+ _FloatSetting(
+ name='minimum',
+ help='Set a lower limit on allowed output voltage',
+ default=None),
+ _FloatSetting(
+ name='maximum',
+ help='Set an upper limit on allowed output voltage',
+ default=None),
+ ]
+
+
+class _ChannelConfig (_Config):
+ settings = [
+ _Setting(
+ name='device',
+ help='Comedi device.',
+ default='/dev/comedi0'),
+ _IntegerSetting(
+ name='subdevice',
+ help='Comedi subdevice index. -1 for automatic detection.',
+ default=-1),
+ _IntegerSetting(
+ name='channel',
+ help='Subdevice channel index.',
+ default=0),
+ _IntegerSetting(
+ name='maxdata',
+ help="Channel's maximum bit value."),
+ _IntegerSetting(
+ name='range',
+ help="Channel's selected range index."),
+ _FloatListSetting(
+ name='conversion-coefficients',
+ help=('Bit to physical unit conversion coefficients starting with '
+ 'the constant coefficient.')),
+ _FloatSetting(
+ name='conversion-origin',
+ help=('Origin (bit offset) of bit to physical polynomial '
+ 'expansion.')),
+ _FloatListSetting(
+ name='inverse-conversion-coefficients',
+ help=('Physical unit to bit conversion coefficients starting with '
+ 'the constant coefficient.')),
+ _FloatSetting(
+ name='inverse-conversion-origin',
+ help=('Origin (physical unit offset) of physical to bit '
+ 'polynomial expansion.')),
+ ]
+
+
+class _OutputChannelConfig (_ChannelConfig):
+ pass
+
+
+class _InputChannelConfig (_ChannelConfig):
+ pass
+
+
+def pprint_HDF5(*args, **kwargs):
+ print pformat_HDF5(*args, **kwargs)
+
+def pformat_HDF5(filename, group='/'):
+ f = _h5py.File(filename, 'r')
+ cwg = f[group]
+ return '\n'.join(_pformat_hdf5(cwg))
+
+def _pformat_hdf5(cwg, depth=0):
+ lines = []
+ lines.append(' '*depth + cwg.name)
+ depth += 1
+ for key,value in cwg.iteritems():
+ if isinstance(value, _h5py.Group):
+ lines.extend(_pformat_hdf5(value, depth))
+ elif isinstance(value, _h5py.Dataset):
+ lines.append(' '*depth + str(value))
+ lines.append(' '*(depth+1) + str(value.value))
+ else:
+ lines.append(' '*depth + str(value))
+ return lines
+
+
+class _HDF5Config (_BackedConfig):
+ """Mixin to back a `_Config` class with an HDF5 file.
+
+ TODO: Special handling for Choice (enums), FloatList (arrays), etc.?
+
+ The `.save` and `.load` methods have an optional `group` argument
+ that allows you to save and load settings from an externally
+ opened HDF5 file. This can make it easier to stash several
+ related `_Config` classes in a single file. For example
+
+ >>> import os
+ >>> import tempfile
+ >>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='pypiezo-')
+ >>> os.close(fd)
+
+ >>> f = _h5py.File(filename, 'a')
+ >>> c = HDF5_BaseConfig(filename='untouched_file.h5',
+ ... group='/untouched/group')
+ >>> c['syslog'] = True
+ >>> group = f.create_group('base')
+ >>> c.save(group)
+ >>> pprint_HDF5(filename)
+ /
+ /base
+ <HDF5 dataset "log-level": shape (), type "|S4">
+ warn
+ <HDF5 dataset "matplotlib": shape (), type "|S2">
+ no
+ <HDF5 dataset "syslog": shape (), type "|S3">
+ yes
+ >>> d = HDF5_BaseConfig(filename='untouched_file.h5',
+ ... group='/untouched/group')
+ >>> d.load(group)
+ >>> d['syslog']
+ True
+
+ >>> f.close()
+ >>> os.remove(filename)
+ """
+ def __init__(self, filename, group='/', **kwargs):
+ super(_HDF5Config, self).__init__(**kwargs)
+ self.filename = filename
+ assert group.startswith('/'), group
+ if not group.endswith('/'):
+ group += '/'
+ self.group = group
+ self._file_checked = False
+
+ def _check_file(self):
+ if self._file_checked:
+ return
+ self._setup_file()
+ self._file_checked = True
+
+ def _setup_file(self):
+ f = _h5py.File(self.filename, 'a')
+ cwg = f # current working group
+
+ # Create the group where the settings are stored (if necessary)
+ gpath = ['']
+ for group in self.group.strip('/').split('/'):
+ gpath.append(group)
+ if group not in cwg.keys():
+ _LOG.debug('creating group %s in %s'
+ % ('/'.join(gpath), self.filename))
+ cwg.create_group(group)
+ cwg = cwg[group]
+ f.close()
+
+ def dump(self, help=False, from_file=False):
+ """Return the relevant group in `self.filename` as a string
+
+ Extends the base :meth:`dump` by adding the `from_file`
+ option. If `from_file` is true, dump all entries that
+ currently exist in the relevant group, rather than listing all
+ settings defined in the instance dictionary.
+ """
+ if from_file:
+ self._check_file()
+ f = _h5py.File(self.filename, 'r')
+ cwg = f[self.group]
+ lines = []
+ settings = dict([(s.name, s) for s in self.settings])
+ for key,value in cwg.iteritems():
+ if help and key in settings:
+ help_string = '\t(%s)' % settings[key].help()
+ else:
+ help_string = ''
+ lines.append('%s: %s%s' % (key, value.value, help_string))
+ return '\n'.join(lines)
+ return super(_HDF5Config, self).dump(help=help)
+
+ def load(self, group=None):
+ if group is None:
+ self._check_file()
+ f = _h5py.File(self.filename, 'r')
+ group = f[self.group]
+ else:
+ f = None
+ for s in self.settings:
+ if s.name not in group.keys():
+ continue
+ self[s.name] = s.convert_from_text(group[s.name].value)
+ if f:
+ f.close()
+
+ def save(self, group=None):
+ if group is None:
+ self._check_file()
+ f = _h5py.File(self.filename, 'a')
+ group = f[self.group]
+ else:
+ f = None
+ for s in self.settings:
+ group[s.name] = s.convert_to_text(self[s.name])
+ if f:
+ f.close()
+
+
+class _YAMLDumper (_yaml.SafeDumper):
+ def represent_bool(self, data):
+ "Use yes/no instead of the default true/false"
+ if data:
+ value = u'yes'
+ else:
+ value = u'no'
+ return self.represent_scalar(u'tag:yaml.org,2002:bool', value)
+
+
+_YAMLDumper.add_representer(bool, _YAMLDumper.represent_bool)
+
+
+class _YAMLConfig (_BackedConfig):
+ """Mixin to back a `_Config` class with a YAML file.
+
+ TODO: Special handling for Choice (enums), FloatList (arrays), etc.?
+
+ >>> import os
+ >>> import os.path
+ >>> import tempfile
+ >>> fd,filename = tempfile.mkstemp(suffix='.yaml', prefix='pypiezo-')
+ >>> os.close(fd)
+
+ >>> c = YAML_BaseConfig(filename=filename)
+ >>> c.load()
+
+ Saving writes all the config values to disk.
+
+ >>> c['syslog'] = True
+ >>> c.save()
+ >>> print open(c.filename, 'r').read()
+ log-level: warn
+ matplotlib: no
+ syslog: yes
+ <BLANKLINE>
+
+ Loading reads the config files from disk.
+
+ >>> c = YAML_BaseConfig(filename=filename)
+ >>> c.load()
+ >>> print c.dump()
+ syslog: yes
+ matplotlib: no
+ log-level: warn
+
+ Cleanup our temporary config file.
+
+ >>> os.remove(filename)
+ """
+ dumper = _YAMLDumper
+
+ def __init__(self, filename, **kwargs):
+ super(_YAMLConfig, self).__init__(**kwargs)
+ self.filename = filename
+
+ def load(self):
+ if not _os_path.exists(self.filename):
+ open(self.filename, 'a').close()
+ with open(self.filename, 'r') as f:
+ data = _yaml.safe_load(f)
+ if data == None:
+ return # empty file
+ settings = dict([(s.name, s) for s in self.settings])
+ for key,value in data.iteritems():
+ setting = settings[key]
+ if isinstance(setting, _BooleanSetting):
+ v = value
+ else:
+ v = setting.convert_from_text(value)
+ self[key] = v
+
+ def save(self):
+ data = {}
+ settings = dict([(s.name, s) for s in self.settings])
+ for key,value in self.iteritems():
+ if key in settings:
+ setting = settings[key]
+ if isinstance(setting, _BooleanSetting):
+ v = value
+ else:
+ v = setting.convert_to_text(value)
+ data[key] = v
+ with open(self.filename, 'w') as f:
+ _yaml.dump(data, stream=f, Dumper=self.dumper,
+ default_flow_style=False)
+
+
+# Define HDF5- and YAML-backed subclasses of the basic _Config types.
+for name,obj in locals().items():
+ if (obj != _Config and
+ type(obj) == type and
+ issubclass(obj, _Config) and
+ not issubclass(obj, _BackedConfig)):
+ for prefix,base in [('HDF5', _HDF5Config), ('YAML', _YAMLConfig)]:
+ _name = '%s%s' % (prefix, name)
+ _bases = (base, obj)
+ _dict = {}
+ _class = type(_name, _bases, _dict)
+ setattr(_sys.modules[__name__], _name, _class)
+
+del name, obj, prefix, base, _name, _bases, _dict, _class
+
+
+def find_base_config():
+ "Return the best `_BaseConfig` match after scanning the filesystem"
+ _LOG.info('looking for base_config file')
+ user_basepath = _os_path.join(_os_path.expanduser('~'), '.pypiezorc')
+ system_basepath = _os_path.join('/etc', 'pypiezo', 'config')
+ distributed_basepath = _os_path.join('/usr', 'share', 'pypiezo', 'config')
+ for basepath in [user_basepath, system_basepath, distributed_basepath]:
+ for (extension, config) in [('.h5', HDF5_BaseConfig),
+ ('.yaml', YAML_BaseConfig)]:
+ filename = basepath + extension
+ if _os_path.exists(filename):
+ _LOG.info('base_config file found at %s' % filename)
+ base_config = config(filename)
+ base_config.load()
+ return base_config
+ else:
+ _LOG.debug('no base_config file at %s' % filename)
+ _LOG.info('new base_config file at %s' % filename)
+ basepath = user_basepath
+ filename = basepath + extension
+ return config(filename)
--- /dev/null
+# Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of pypiezo.
+#
+# pypiezo is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# pypiezo is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with pypiezo. If not, see <http://www.gnu.org/licenses/>.
+
+"""Utilities detecting the position of the sample surface.
+
+TODO: doctests
+"""
+
+SLEEP_DURING_SURF_POS_AQUISITION = False # doesn't help
+
+#from time import sleep as _sleep
+import numpy as _numpy
+from scipy.optimize import leastsq as _leastsq
+
+try:
+ import matplotlib as _matplotlib
+ import matplotlib.pyplot as _matplotlib_pyplot
+ import time as _time # for timestamping lines on plots
+except ImportError, e:
+ _matplotlib = None
+ _matplotlib_import_error = e
+
+from . import LOG as _LOG
+from . import base_config as _base_config
+from . import base as _base
+
+
+class SurfaceError (Exception):
+ pass
+
+
+class PoorFit (SurfaceError):
+ pass
+
+
+class PoorGuess (PoorFit):
+ pass
+
+
+class FlatFit (PoorFit):
+ "Raised for slope in the non-contact region, or no slope in contact region"
+ def __init__(self, left_slope, right_slope):
+ self.left_slope = left_slope
+ self.right_slope = right_slope
+ msg = 'slopes not sufficiently different: %g and %g' % (
+ left_slope, right_slope)
+ super(FlatFit, self).__init__(msg)
+
+class EdgeKink (PoorFit):
+ def __init__(self, kink, edge, window):
+ self.kink = kink
+ self.edge = edge
+ self.window = window
+ msg = 'no kink (kink %d not within %d of %d)' % (kink, window, edge)
+ super(EdgeKink, self).__init__(self, msg)
+
+
+def _linspace(*args, **kwargs):
+ dtype = kwargs.pop('dtype')
+ out = _numpy.linspace(*args, **kwargs)
+ return out.reshape((len(out), 1)).astype(dtype)
+
+def _get_min_max_positions(piezo, axis_name, min_position=None,
+ max_position=None):
+ output_axis = piezo.axis_by_name(axis_name)
+ if min_position is None:
+ min_position = _base.convert_volts_to_bits(
+ output_axis.axis_channel_config,
+ output_axis.axis_config['minimum'])
+ if max_position is None:
+ max_position = _base.convert_volts_to_bits(
+ output_axis.axis_channel_config,
+ output_axis.axis_config['maximum'])
+ return (min_position, max_position)
+
+def get_surface_position_data(piezo, axis_name, max_deflection, steps=2000,
+ frequency=10e3, min_position=None,
+ max_position=None):
+ "Measure the distance to the surface"
+ _LOG.debug('get surface position')
+ orig_position = piezo.last_output[axis_name]
+ # fully retract the piezo
+ min_position,max_position = _get_min_max_positions(
+ piezo, axis_name, min_position=min_position, max_position=max_position)
+ _LOG.debug('retract the piezo to %d' % min_position)
+ dtype = piezo.channel_dtype(axis_name, direction='output')
+ out = _linspace(orig_position, min_position, steps, dtype=dtype)
+ out = out.reshape((len(out), 1)).astype(
+ piezo.channel_dtype(axis_name, direction='output'))
+ ramp_kwargs = {
+ 'frequency': frequency,
+ 'output_names': [axis_name],
+ 'input_names': ['deflection'],
+ }
+ ret1 = piezo.named_ramp(data=out, **ramp_kwargs)
+ # locate high deflection position
+ _LOG.debug('approach until there is dangerous deflection (> %d)'
+ % max_deflection)
+ if SLEEP_DURING_SURF_POS_AQUISITION == True:
+ _sleep(.2) # sleeping briefly seems to reduce bounce?
+ mtpod = piezo.move_to_pos_or_def(
+ axis_name=axis_name, position=max_position, deflection=max_deflection,
+ step=(max_position-min_position)/steps, return_data=True)
+ high_contact_pos = piezo.last_output[axis_name]
+ # fully retract the piezo again
+ _LOG.debug('retract the piezo to %d again' % min_position)
+ if SLEEP_DURING_SURF_POS_AQUISITION == True:
+ _sleep(.2)
+ out = _linspace(high_contact_pos, min_position, steps, dtype=dtype)
+ ret2 = piezo.named_ramp(data=out, **ramp_kwargs)
+ # scan to the high contact position
+ _LOG.debug('ramp in to the deflected position %d' % high_contact_pos)
+ if SLEEP_DURING_SURF_POS_AQUISITION == True:
+ _sleep(.2)
+ out = _linspace(min_position, high_contact_pos, steps, dtype=dtype)
+ data = piezo.named_ramp(data=out, **ramp_kwargs)
+ if SLEEP_DURING_SURF_POS_AQUISITION == True:
+ _sleep(.2)
+ # return to the original position
+ _LOG.debug('return to the original position %d' % orig_position)
+ out = _linspace(high_contact_pos, orig_position, steps, dtype=dtype)
+ ret3 = piezo.named_ramp(data=out, **ramp_kwargs)
+ return {'ret1':ret1, 'mtpod':mtpod, 'ret2':ret2,
+ 'approach':data, 'ret3':ret3}
+
+def bilinear(x, params):
+ """bilinear fit for surface bumps. Model has two linear regimes
+ which meet at x=kink_position and have independent slopes.
+
+ `x` should be a `numpy.ndarray`.
+ """
+ left_offset,left_slope,kink_position,right_slope = params
+ left_mask = x < kink_position
+ right_mask = x >= kink_position # = not left_mask
+ left_y = left_offset + left_slope*x
+ right_y = (left_offset + left_slope*kink_position
+ + right_slope*(x-kink_position))
+ return left_mask * left_y + right_mask * right_y
+
+def analyze_surface_position_data(
+ ddict, min_slope_ratio=10.0, kink_window=None, return_all_params=False):
+ """
+
+ min_slope_ratio : float
+ Minimum ratio between the non-contact "left" slope and the
+ contact "right" slope.
+ kink_window : int (in bits) or None
+ Raise `EdgeKink` if the kink is within `kink_window` of the
+ minimum or maximum `z` position during the approach. If
+ `None`, a default value of 2% of the approach range is used.
+ """
+ # ususes ddict["approach"] for analysis
+ # the others are just along to be plotted
+ _LOG.debug('snalyze surface position data')
+
+ data = ddict['approach']
+ # analyze data, using bilinear model
+ # y = p0 + p1 x for x <= p2
+ # = p0 + p1 p2 + p3 (x-p2) for x >= p2
+ dump_before_index = 0 # 25 # HACK!!
+ # Generate a reasonable guess...
+ start_pos = int(data['z'].min())
+ final_pos = int(data['z'].max())
+ start_def = int(data['deflection'].min())
+ final_def = int(data['deflection'].max())
+ # start_def and start_pos are probably for 2 different points
+ _LOG.debug('min deflection %d, max deflection %d'
+ % (start_def, final_def))
+ _LOG.debug('min position %d, max position %d'
+ % (start_pos, final_pos))
+
+ left_offset = start_def
+ left_slope = 0
+ kink_position = (final_pos+start_pos)/2.0
+ right_slope = 2.0*(final_def-start_def)/(final_pos-start_pos)
+ pstart = [left_offset, left_slope, kink_position, right_slope]
+ _LOG.debug('guessed params: %s' % pstart)
+
+ offset_scale = (final_pos - start_pos)/100
+ left_slope_scale = right_slope/10
+ kink_scale = (final_pos-start_pos)/100
+ right_slope_scale = right_slope
+ scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale]
+ _LOG.debug('guessed scale: %s' % scale)
+
+ def residual(p, y, x):
+ Y = bilinear(x, p)
+ return Y-y
+ params,cov,info,mesg,ier = _leastsq(
+ residual, pstart,
+ args=(data["deflection"][dump_before_index:],
+ data["z"][dump_before_index:]),
+ full_output=True, maxfev=10000)
+ _LOG.debug('best fit parameters: %s' % (params,))
+
+ if _base_config['matplotlib']:
+ if not _matplotlib:
+ raise _matplotlib_import_error
+ figure = _matplotlib_pyplot.figure()
+ axes = figure.add_subplot(1, 1, 1)
+ axes.hold(True)
+ timestamp = _time.strftime('%H%M%S')
+ axes.set_title('surf_pos %5g %5g %5g %5g' % tuple(params))
+ def plot_dict(d, label):
+ _pylab.plot(d["z"], d["deflection"], '.',label=label)
+ for n,name in [('ret1', 'first retract'),
+ ('mtpod', 'single step in'),
+ ('ret2', 'second retract'),
+ ('approach', 'main approach'),
+ ('ret3', 'return to start')]:
+ axes.plot(ddict[n]['z'], ddict[n]['deflection'], label=name)
+ def fit_fn(x, params):
+ if x <= params[2]:
+ return params[0] + params[1]*x
+ else:
+ return (params[0] + params[1]*params[2]
+ + params[3]*(x-params[2]))
+ axes.plot([start_pos, params[2], final_pos],
+ [fit_fn(start_pos, params), fit_fn(params[2], params),
+ fit_fn(final_pos, params)], '-',label='fit')
+ #_pylab.legend(loc='best')
+ figure.show()
+
+ # check that the fit is reasonable
+ # params[1] is slope in non-contact region
+ # params[2] is kink position
+ # params[3] is slope in contact region
+ if kink_window is None:
+ kink_window = int(0.02*(final_pos-start_pos))
+
+ if abs(params[1]*min_slope_ratio) > abs(params[3]):
+ raise FlatFit(left_slope=params[1], right_slope=params[3])
+ if params[2] < start_pos+kink_window:
+ raise EdgeKink(kink=params[2], edge=start_pos, window=kink_window)
+ if params[2] > final_pos-kink_window:
+ raise EdgeKink(kink=params[2], edge=final_pos, window=kink_window)
+ _LOG.debug('surface position %s' % params[2])
+ if return_all_parameters:
+ return params
+ return params[2]
+
+def get_surface_position(piezo, axis_name, max_deflection, **kwargs):
+ ddict = get_surface_position_data(piezo, axis_name, max_deflection)
+ return analyze_surface_position_data(ddict, **kwargs)
+++ /dev/null
-# Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
-#
-# This file is part of pypiezo.
-#
-# pypiezo is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the
-# Free Software Foundation, either version 3 of the License, or (at your
-# option) any later version.
-#
-# pypiezo is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with pypiezo. If not, see <http://www.gnu.org/licenses/>.
-
-from pycomedi.single_aio import AO
-
-class x_piezoError (Exception) :
- pass
-
-class x_piezo :
- def __init__(self, xp_chan=1) :
- self.verbose = False
- self.xpSensitivity = 34.0 # nm/Volt
- self.gain = 20 # Vpiezo / Voutput
-
- self.AO = AO(chan=(xp_chan,))
- self.AO.close()
- #self.AO = sngAO.sngAOobj([xp_chan], -10, 10)
-
- self.curOut = [self.nm2out(0)]*1
- self.xpMax = self.nm2out(2000) # limits at 2 microns
- self.xpMin = self.nm2out(-2000) # and -2 microns
-
- self.wanderStep = self.nm2out(5) - self.nm2out(0)
- # define default value changed callbacks
- self.setExternalXPiezo = None
- def curPos(self) :
- return self.curOut[0]
- def nm2out(self, nm) :
- # nm / (nm/Vp * Vp/Vout) = Vout
- # where Vout is Volts output by the DAQ card
- # and Vp is Volts applied across the piezo
- return self.phys_to_out(nm / (self.xpSensitivity * self.gain))
- def out2nm(self, out) :
- return self.out_to_phys(out) * self.xpSensitivity * self.gain
- def out_to_phys(self, output) :
- return self.AO.comedi_to_phys(0, output)
- def phys_to_out(self, physical) :
- return self.AO.phys_to_comedi(0, physical)
- def pCurVals(self) :
- print "X piezo output : %6d" % self.curPos()
- def jumpToPos(self, pos) :
- self._check_range(pos)
- if self.verbose :
- print "Jump X piezo to %d (%g nm)" % (pos, self.out2nm(pos))
- self.curOut[0] = pos
- self.AO.open()
- #self.AO.reserve()
- self.AO.write(self.curOut)
- self.AO.close()
- #self.AO.unreserve()
- if self.setExternalXPiezo != None :
- self.setExternalXPiezo(self.out2nm(self.curPos()))
- return self.curPos()
- def wander(self) :
- """
- steps in one direction until it hits the boundary,
- then reverses and walks back the other way.
- """
- newPos = self.curPos() + self.wanderStep
- if self.verbose :
- print "wander from %d to %d" % (self.curPos(), newPos)
- try :
- self._check_range(newPos)
- except Exception :
- # reached one limit, so turn around
- self.wanderStep = -self.wanderStep
- newPos += 2*self.wanderStep
- return self.jumpToPos(newPos)
- def _check_range(self, pos) :
- if pos > self.xpMax :
- raise x_piezoError, "X piezo pos = %d > %d = max" % (pos, self.xpMax)
- if pos < self.xpMin :
- raise x_piezoError, "X piezo pos = %d < %d = min" % (pos, self.xpMin)
-
-def test() :
- print "test x_piezo()"
- xp = x_piezo()
- xp.verbose = True
- xp.jumpToPos(xp.nm2out(0))
- xp.wander()
- xp.wander()
- xp.wander()
- xp.pCurVals()
- curPos = xp.curPos()
- curPosPhys = xp.out_to_phys(curPos)
- curPos2 = xp.phys_to_out(curPosPhys)
- if curPos != curPos2 :
- raise x_piezoError, "Conversion %d -> %g -> %d not round trip" % (curPos, curPosPhys, curPos2)
- print "success"
-
-if __name__ == "__main__" :
- test()
+++ /dev/null
-# Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
-#
-# This file is part of pypiezo.
-#
-# pypiezo is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the
-# Free Software Foundation, either version 3 of the License, or (at your
-# option) any later version.
-#
-# pypiezo is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with pypiezo. If not, see <http://www.gnu.org/licenses/>.
-
-"""The z_piezo module controls the one-dimensional displacement of a
-piezoelectric actuator, while allowing simultaneous data aquisition on
-two channels. The module is called z_piezo, because for force spectroscopy,
-the axis that needs simultaneous aquisition is the Z axis, although this
-module could be used to provide control in any "fast" axis.
-
-There are a few globals controlling the behavior of the entire module.
-
- USE_ABCD_DEFLECTION (default False)
- selects between using a preprocessed vertical deflection signal
- and using the 'raw' 4 photodiode output segments.
- TEXT_VERBOSE (default False)
- print text messages to stderr displaying actions taken
- PYLAB_INTERACTIVE_VERBOSE (default False)
- display pylab graphs of any ramps taken
- BASE_FIG_NUM (default 50)
- the figure number for z_piezo pylab figures will be this number + {0,1,..}
- LOG_RAMPS (default False)
- save the input and output of any ramps to the LOG_DIR directory
- LOG_DIR = '${DEFAULT}/z_piezo'
- where the ramp logs are saved to
- DEFAULT_ZP_CHAN (default 0)
- output channel controlling the z piezo
- DEFAULT_ZP_MON_CHAN (default 1)
- input channel monitoring the z piezo signal
- DEFAULT_DEF_CHAN (default 0, or in ABCD mode (2,3,4,5))
- input channel(s) monitoring deflection (or some other feedback signal)
-"""
-
-USE_ABCD_DEFLECTION = False
-TEXT_VERBOSE = False
-PYLAB_INTERACTIVE_VERBOSE = False
-BASE_FIG_NUM = 50
-LOG_RAMPS = False
-LOG_DIR = '${DEFAULT}/z_piezo'
-
-# Hackish defaults for the calibcant.config module
-DEFAULT_GAIN = 20 # Vpiezo / Voutput
-DEFAULT_SENSITIVITY = 6.790 # nm_surface/Volt_piezo (for S/N 4546EV piezo from 2009/01/09 calibration on 200nm deep pits, 10 um pitch Digital Instruments P/N 498-000-026)
-DEFAULT_ZERO_PHOTODIODE_BITS = 2**15
-DEFAULT_VPHOTO_IN_2_VOLTS = lambda vbits : (vbits-DEFAULT_ZERO_PHOTODIODE_BITS)/3276.8
-DEFAULT_VZP_OUT_2_VOLTS = lambda vbits : (vbits-DEFAULT_ZERO_PHOTODIODE_BITS)/3276.8
-
-DEFAULT_ZP_CHAN = 0
-DEFAULT_ZP_MON_CHAN = 1
-DEFAULT_DEF_CHAN = 0
-if USE_ABCD_DEFLECTION :
- DEFAULT_DEF_CHAN = (2,3,4,5)
- # The 4 photodiode segments are
- # A B
- # C D
- # looking in along the laser
-
-class NoComedi (ImportError):
- "Missing comedi. Don't initialize a z_piezo instance."
- pass
-
-try:
- import pycomedi.single_aio as single_aio
- import pycomedi.simult_aio as simult_aio
- HAS_COMEDI = True
-except ImportError, e:
- HAS_COMEDI = False
-
-from numpy import array, arange, ones, zeros, linspace, uint16, float, sin, pi
-from scipy.stats import linregress
-import data_logger
-
-if PYLAB_INTERACTIVE_VERBOSE :
- from pylab import figure, plot, title, legend, hold, subplot
- import time # for timestamping lines on plots
-
-class error (Exception) :
- pass
-class outOfRange (error) :
- pass
-
-# frontend:
-
-class z_piezo :
- def __init__(self, zp_chan=DEFAULT_ZP_CHAN,
- zp_mon_chan=DEFAULT_ZP_MON_CHAN,
- def_chan=DEFAULT_DEF_CHAN) :
- if HAS_COMEDI is not True:
- raise NoComedi
- self.sensitivity = DEFAULT_SENSITIVITY
- self.gain = DEFAULT_GAIN
- self.nm2Vout = self.sensitivity * self.gain # nm_surface / V_output
- self.chan_info = _chan_info(zp_chan, zp_mon_chan, def_chan)
-
- self.curIn = array([0]*self.chan_info.numIn, dtype=uint16)
- self.curOut = array([0]*self.chan_info.numOut, dtype=uint16)
- self._jump = _z_piezo_jump(self.chan_info)
- self._ramp = _z_piezo_ramp(self.chan_info)
- self.zpMax = int(self.pos_nm2out(1000)) # limits at 1 micron
- self.zpMin = int(self.pos_nm2out(-1000)) # and -1 micron
- # define default value changed callbacks
- self.setExternalZPiezo = None
- self.setExternalZPiezoMon = None
- self.setExternalDeflection = None
- def __del__(self) :
- pass
- def curPos(self) :
- return int(self.curOut[self.chan_info.zp_ind]) # cast as int so arithmetic is signed
- def curPosMon(self) :
- return int(self.curIn[self.chan_info.zp_mon_ind])
- def curDef(self) :
- if USE_ABCD_DEFLECTION :
- A = int(self.curIn[self.chan_info.def_ind[0]])
- B = int(self.curIn[self.chan_info.def_ind[1]])
- C = int(self.curIn[self.chan_info.def_ind[2]])
- D = int(self.curIn[self.chan_info.def_ind[3]])
- df = float((A+B)-(C+D))/(A+B+C+D)
- dfout = int(df * 2**15) + 2**15
- if TEXT_VERBOSE :
- print "Current deflection %d (%d, %d, %d, %d)" \
- % (dfout, A, B, C, D)
- return dfout
- else : # reading the deflection voltage directly
- return int(self.curIn[self.chan_info.def_ind])
- def pos_out2V(self, out) :
- return self._jump.out_to_phys(out)
- def pos_V2out(self, V) :
- # Vout is Volts output by the DAQ card
- # and Vpiezo is Volts applied across the piezo
- return self._jump.phys_to_out(V)
- def pos_nm2out(self, nm) :
- # Vout is Volts output by the DAQ card
- # and Vpiezo is Volts applied across the piezo
- return self.pos_V2out(nm / self.nm2Vout)
- def pos_out2nm(self, out) :
- return self.pos_out2V(out) * self.nm2Vout
- def posMon_nm2out(self, nm) :
- return self._jump.phys_to_out(nm / self.nm2Vout)
- def posMon_out2nm(self, out) :
- return self._jump.out_to_phys(out) * self.nm2Vout
- def def_in2V(self, input) :
- return self._jump.out_to_phys(input)
- def def_V2in(self, physical) :
- print physical, type(physical) # temporary debugging printout
- return self._jump.phys_to_out(physical)
- def curVals(self) :
- return {'Z piezo output': self.curPos(),
- 'Deflection input':self.curDef(),
- 'Z piezo input':self.curPosMon()}
- def pCurVals(self, curVals=None) :
- if curVals == None :
- curVals = self.curVals()
- print "Z piezo output : %6d" % curVals["Z piezo output"]
- print "Z piezo input : %6d" % curVals["Z piezo input"]
- print "Deflection input: %6d" % curVals["Deflection input"]
- def updateInputs(self) :
- self.curIn = self._jump.updateInputs()
- if self.setExternalZPiezoMon != None :
- self.setExternalZPiezoMon(self.posMon_out2nm(self.curPosMon()))
- if self.setExternalDeflection != None :
- self.setExternalDeflection(self.def_in2V(self.curDef()))
- def jumpToPos(self, pos) :
- self._check_range(pos)
- self.curOut[self.chan_info.zp_ind] = pos
- self.curIn = self._jump.jumpToPos(self.curOut)
- if self.setExternalZPiezo != None :
- self.setExternalZPiezo(self.pos_out2nm(self.curPos()))
- if TEXT_VERBOSE :
- print "Jumped to"
- self.pCurVals()
- return self.curVals()
- def ramp(self, out, freq) :
- for pos in out :
- self._check_range(pos)
- out = self._ramp.ramp(out, freq)
- self.curOut[self.chan_info.zp_ind] = out["Z piezo output"][-1]
- self.curIn[self.chan_info.zp_mon_ind] = out["Z piezo input"][-1]
- if USE_ABCD_DEFLECTION :
- for i in range(4) : # i is the photodiode element (0->A, 1->B, ...)
- self.curIn[i] = out["Deflection segment"][i][-1]
- else :
- self.curIn[self.chan_info.def_ind] = out["Deflection input"][-1]
-
- if self.setExternalZPiezo != None :
- self.setExternalZPiezo(self.pos_out2nm(self.curPos()))
- if self.setExternalZPiezoMon != None :
- self.setExternalZPiezoMon(self.posMon_out2nm(self.curPosMon()))
- if self.setExternalDeflection != None :
- self.setExternalDeflection(self.def_in2V(self.curDef()))
- if LOG_RAMPS :
- log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False,
- log_name="ramp")
- if USE_ABCD_DEFLECTION :
- v = out.pop("Deflection segment")
- log.write_dict_of_arrays(out)
- out["Deflection segment"] = v
- else :
- log.write_dict_of_arrays(out)
- if TEXT_VERBOSE :
- print "Ramped from (%g, %g) to (%g, %g) in %d points" \
- % (out["Z piezo output"][0], out["Deflection input"][0],
- out["Z piezo output"][-1], out["Deflection input"][-1],
- len(out["Z piezo output"]))
- if PYLAB_INTERACTIVE_VERBOSE :
- figure(BASE_FIG_NUM)
- subplot(211)
- title('ramp (def_in vs zp_out)')
- timestamp = time.strftime('%H%M%S')
- plot(out["Z piezo output"], out["Deflection input"], '.', label=timestamp)
- subplot(212)
- title('zp_in vs zp_out')
- plot(out["Z piezo output"], out["Z piezo input"], '.', label=timestamp)
- legend(loc='best')
- return out
- def _check_range(self, pos) :
- if pos > self.zpMax :
- raise outOfRange, "Z piezo pos = %d > %d = max" % (pos, self.zpMax)
- if pos < self.zpMin :
- raise outOfRange, "Z piezo pos = %d < %d = min" % (pos, self.zpMin)
-
-# backends:
-
-class _z_piezo_jump :
- def __init__(self, chan_info) :
- self.chan_info = chan_info
- self.reserved = False
- self.AI = single_aio.AI(chan=self.chan_info.inChan)
- self.AI.close()
- self.AO = single_aio.AO(chan=self.chan_info.outChan)
- self.AO.close()
- def reserve(self) :
- self.AI.open()
- self.AO.open()
- self.reserved = True
- def release(self) :
- self.AI.close()
- self.AO.close()
- self.reserved = False
- def updateInputs(self) :
- if self.reserved == False :
- self.AI.open()
- #self.reserve()
- self.reserved = False
- In = self.AI.read()
- if self.reserved == False :
- self.AI.close()
- #self.release()
- return In
- def jumpToPos(self, out) :
- if self.reserved == False :
- self.reserve()
- self.reserved = False # set to true inside reserve(), which we don't want
- self.AO.write(out)
- if self.reserved == False :
- self.release()
- return self.updateInputs()
- def out_to_phys(self, output) :
- return self.AO.comedi_to_phys(0, output)
- def phys_to_out(self, physical) :
- return self.AO.phys_to_comedi(0, physical)
-
-class _z_piezo_ramp :
- def __init__(self, chan_info) :
- self.chan_info = chan_info
- self.attempts=0
- self.failures=0
- self.verbose = True
- def ramp(self, outArray, freq) :
- # allocate and run the task
- npoints = int(len(outArray)/self.chan_info.numOut)
- in_data = array([0]*npoints*self.chan_info.numIn, dtype=uint16)
- if type(outArray) != type(in_data) or outArray.dtype != uint16 :
- out_data = array([0]*npoints*self.chan_info.numOut, dtype=uint16)
- for i in range(npoints) :
- out_data[i] = outArray[i]
- else :
- out_data = outArray
- if simult_aio.DONT_OUTPUT_LAST_SAMPLE_HACK == True:
- # duplicate the last output point
- npoints += 1
- out_hack = array([0]*npoints*self.chan_info.numOut, dtype=uint16)
- for i in range(npoints-1) :
- out_hack[i] = out_data[i]
- out_hack[-1] = out_data[-1]
- out_data = out_hack
- in_data = array([0]*npoints*self.chan_info.numIn, dtype=uint16)
-
- correlated = False
- ramp={}
- while not correlated :
- AIO = simult_aio.AIO(in_chan=self.chan_info.inChan,
- out_chan=self.chan_info.outChan)
- if self.verbose :
- print "setup AIO"
- AIO.setup(freq=freq, out_buffer=out_data)
- if self.verbose :
- print "arm AIO"
- AIO.arm()
- if self.verbose :
- print "read AIO"
- AIO.start_read(in_data)
- if self.verbose :
- print "close AIO"
- AIO.close()
- if self.verbose :
- print "finished AIO"
- self.attempts += 1
- ramp["Z piezo output"] = out_data[self.chan_info.zp_ind::self.chan_info.numOut]
- ramp["Z piezo input"] = in_data[self.chan_info.zp_mon_ind::self.chan_info.numIn]
- ramp["Deflection input"] = in_data[self.chan_info.def_ind::self.chan_info.numIn]
- failed = False
- gradient, intercept, r_value, p_value, std_err = linregress(ramp["Z piezo output"],
- ramp["Z piezo input"])
- rnge = ramp["Z piezo output"].max()-ramp["Z piezo output"].min()
- if gradient < .8 and rnge > 100 :
- if PYLAB_INTERACTIVE_VERBOSE :
- figure(BASE_FIG_NUM+3)
- subplot(211)
- title('ramp (def_in vs zp_out)')
- timestamp = time.strftime('%H%M%S')
- plot(ramp["Z piezo output"], ramp["Deflection input"], '.', label=timestamp)
- subplot(212)
- title('zp_in vs zp_out')
- plot(ramp["Z piezo output"], ramp["Z piezo input"], '.', label=timestamp)
- legend(loc='best')
- print "ramp failed slope (%g, %g), try again" % (gradient, rnge)
- failed = True
- if not failed :
- if simult_aio.DONT_OUTPUT_LAST_SAMPLE_HACK == True:
- # remove the duplicated last output point
- out_data = out_data[:-self.chan_info.numOut]
- in_data = in_data[:-self.chan_info.numIn]
- ramp["Z piezo output"] = out_data[self.chan_info.zp_ind::self.chan_info.numOut]
- ramp["Z piezo input"] = in_data[self.chan_info.zp_mon_ind::self.chan_info.numIn]
- ramp["Deflection input"] = in_data[self.chan_info.def_ind::self.chan_info.numIn]
-
- if USE_ABCD_DEFLECTION :
- ramp["Deflection segment"] = []
- for c_ind in self.chan_info.def_ind :
- ramp["Deflection segment"].append(in_data[c_ind::self.chan_info.numIn])
- A = ramp["Deflection segment"][0].astype(float)
- B = ramp["Deflection segment"][1].astype(float)
- C = ramp["Deflection segment"][2].astype(float)
- D = ramp["Deflection segment"][3].astype(float)
- ramp["Deflection input"] = (((A+B)-(C+D))/(A+B+C+D) * 2**15).astype(uint16)
- ramp["Deflection input"] += 2**15 # HACK, comedi uses unsigned ints...
- else :
- ramp["Deflection input"] = in_data[self.chan_info.def_ind::self.chan_info.numIn]
- correlated = True
- if PYLAB_INTERACTIVE_VERBOSE :
- figure(BASE_FIG_NUM+1)
- timestamp = time.strftime('%H%M%S')
- plot(ramp["Z piezo output"], ramp["Z piezo input"], '.', label=timestamp)
- title('goods')
- legend(loc='best')
- else :
- self.failures += 1
- if PYLAB_INTERACTIVE_VERBOSE :
- figure(BASE_FIG_NUM+2)
- timestamp = time.strftime('%H%M%S')
- plot(ramp["Z piezo output"], ramp["Z piezo input"], '.', label=timestamp)
- legend(loc='best')
- title('bads')
-
- #AIO = simAIO.simAIOobj(npoints,
- # freq,
- # self.chan_info.inChan,
- # self.chan_info.outChan)
- #err = AIO.configure()
- #err = AIO.commit()
- #err = AIO.write(outArray)
- #err = AIO.run()
- #(err, inArray) = AIO.read()
- #if err != 0 :
- # raise error, "Error in simAIO"
- #if len(inArray)*self.chan_info.numOut != len(outArray)*self.chan_info.numIn :
- # raise error, "Input array of wrong length"
- #
- #ramp={}
- #ramp["Z piezo output"] = outArray[self.chan_info.zp_ind::self.chan_info.numOut]
- #ramp["Z piezo input"] = inArray[self.chan_info.zp_mon_ind::self.chan_info.numIn]
- #ramp["Deflection input"] = inArray[self.chan_info.def_ind::self.chan_info.numIn]
-
- # task automatically freed as AIO goes out of scope
- return ramp
-
-class _chan_info :
- def __init__(self, zp_chan, zp_mon_chan, def_chan) :
- self.inChan = []
- if USE_ABCD_DEFLECTION :
- self.def_ind = [] # index in inChan array
- i=0
- for chan in def_chan :
- self.inChan.append(chan)
- self.def_ind.append(i)
- i += 1
- else :
- self.inChan.append(def_chan)
- self.def_ind = 0 # index in inChan array
- self.inChan.append(zp_mon_chan)
- self.zp_mon_ind = zp_mon_chan
- self.numIn = len(self.inChan)
-
- self.outChan = [zp_chan]
- self.zp_ind = 0 # index in outChan array
- self.numOut = len(self.outChan)
-
-
-def test() :
- print "test z_piezo()"
- zp = z_piezo()
- zp.verbose = True
- zp.jumpToPos(zp.pos_nm2out(0))
- if zp.verbose :
- zp.pCurVals()
- curPos = zp.curPos()
- curPosNm = zp.pos_out2nm(curPos)
- curPos2 = zp.pos_nm2out(curPosNm)
- if curPos != curPos2 :
- raise error, "Conversion %d -> %g -> %d not round trip" % (curPos, curPosNm, curPos2)
- ramp_positions = linspace(zp.curPos(), zp.pos_nm2out(100), 20)
- out = zp.ramp(ramp_positions, 10000)
- ramp_positions = linspace(zp.curPos(), zp.pos_nm2out(0), 20)
- out2 = zp.ramp(ramp_positions, 10000)
-
-if __name__ == "__main__" :
- test()
+++ /dev/null
-# Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
-#
-# This file is part of pypiezo.
-#
-# pypiezo is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the
-# Free Software Foundation, either version 3 of the License, or (at your
-# option) any later version.
-#
-# pypiezo is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with pypiezo. If not, see <http://www.gnu.org/licenses/>.
-
-TEXT_VERBOSE = False
-PYLAB_INTERACTIVE = True
-PYLAB_VERBOSE = False
-BASE_FIG_NUM = 60
-LOG_DATA = True
-LOG_DIR = '${DEFAULT}/z_piezo_utils'
-
-NULL_STEPS_BEFORE_SINGLE_PT_SWEEP = False
-SLEEP_DURING_SURF_POS_AQUISITION = False # doesn't help
-
-from numpy import array, arange, ones, zeros, linspace, uint16, float, sin, cos, pi
-from scipy.stats import linregress
-import curses_check_for_keypress
-import scipy.optimize
-import data_logger
-import time
-
-_flush_plot = None
-_final_flush_plot = None
-_pylab = None
-def _dummy_fn(): pass
-
-def _import_pylab():
- """Import pylab plotting functions for when we need to plot. This
- function can be called multiple times, and ensures that the pylab
- setup is only imported once. It defines the functions
- _flush_plot() to be called after incremental changes,
- _final_flush_plot(), to be called at the end of any graphical processing, and
- _pylab, a reference to the pylab module."""
- global _pylab
- global _flush_plot
- global _final_flush_plot
- if _pylab == None :
- import pylab as _pylab
- if _flush_plot == None :
- if PYLAB_INTERACTIVE :
- _flush_plot = _pylab.draw
- else :
- _flush_plot = _pylab.show
- if _final_flush_plot == None :
- if PYLAB_INTERACTIVE :
- _final_flush_plot = _pylab.draw
- else :
- _final_flush_plot = _dummy_fn
-
-class error (Exception) :
- pass
-class poorFit (error) :
- pass
-class tooClose (error) :
- pass
-
-def moveToPosOrDef(zpiezo, pos, defl, step, return_data=False) :
- if return_data or LOG_DATA or PYLAB_VERBOSE :
- aquire_data = True
- else :
- aquire_data = False
- zpiezo._check_range(pos)
- if step == 0 :
- raise error, "Must have non-zero step size"
- elif step < 0 and pos > zpiezo.curPos() :
- step = -step
- elif step > 0 and pos < zpiezo.curPos() :
- step = -step
-
- zpiezo._jump.reserve()
- zpiezo.updateInputs()
- if TEXT_VERBOSE :
- print "current z_piezo position %6d, current deflection %6d <? %6d" % (zpiezo.curPos(), zpiezo.curDef(), defl)
- if aquire_data :
- def_array=[zpiezo.curDef()]
- pos_array=[zpiezo.curPos()]
- if NULL_STEPS_BEFORE_SINGLE_PT_SWEEP == True :
- # take a few null steps to let the input deflection stabalize
- for i in range(100) :
- zpiezo.jumpToPos(zpiezo.curPos())
- if aquire_data :
- def_array.append(zpiezo.curDef())
- pos_array.append(zpiezo.curPos())
- # step in until we hit our target position or exceed our target deflection
- while zpiezo.curPos() != pos and zpiezo.curDef() < defl :
- distTo = pos - zpiezo.curPos()
- if abs(distTo) < abs(step) :
- jumpTo = pos
- else :
- jumpTo = zpiezo.curPos() + step
- zpiezo.jumpToPos(jumpTo)
- if TEXT_VERBOSE :
- print "current z_piezo position %6d, current deflection %6d >=< %6d" % (zpiezo.curPos(), zpiezo.curDef(), defl)
- if aquire_data :
- def_array.append(zpiezo.curDef())
- pos_array.append(zpiezo.curPos())
- zpiezo._jump.release()
-
- if zpiezo.setExternalZPiezo != None :
- zpiezo.setExternalZPiezo(zpiezo.pos_out2nm(zpiezo.curPos()))
- if PYLAB_VERBOSE :
- _import_pylab()
- _pylab.figure(BASE_FIG_NUM)
- timestamp = time.strftime('%H%M%S')
- _pylab.plot(pos_array, def_array, '.', label=timestamp)
- _pylab.title('step approach')
- _pylab.legend(loc='best')
- _flush_plot()
- if LOG_DATA :
- log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False,
- log_name="moveToPosOrDef")
- data = {"Z piezo output":array(pos_array),
- "Deflection input":array(def_array)}
- log.write_dict_of_arrays(data)
- if return_data :
- data = {"Z piezo output":array(pos_array),
- "Deflection input":array(def_array)}
- return (zpiezo.curVals(), data)
- else :
- return zpiezo.curVals()
-
-
-def wiggleForInterferenceMin(zpiezo, wig_amp=20000, wig_freq=100,
- plotVerbose=True) :
- "Output sin to measure interference"
- if PYLAB_VERBOSE or plotVerbose:
- _import_pylab()
- npoints = 1000
- scanfreq = wig_freq*npoints
- out = zeros((npoints,), dtype=uint16)
- for i in range(npoints) :
- out[i] = int(sin(4*pi*i/npoints)*wig_amp+32000)
- # 4 for 2 periods
- if PYLAB_VERBOSE or plotVerbose:
- _pylab.figure(BASE_FIG_NUM+1)
- a = _pylab.axes()
- _pylab.title('wiggle for interference')
- _pylab.hold(False)
- p = _pylab.plot(out, out, 'b.-')
- if LOG_DATA :
- log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False,
- log_name="wiggle")
- c = curses_check_for_keypress.check_for_keypress(test_mode=False)
- while c.input() == None :
- data = zpiezo.ramp(out, scanfreq)
- dmin = data["Deflection input"].min()
- dmax = data["Deflection input"].max()
- if PYLAB_VERBOSE or plotVerbose:
- p[0].set_ydata(data["Deflection input"])
- a.set_ylim([dmin,dmax])
- _flush_plot()
- if LOG_DATA :
- log.write_dict_of_arrays(data)
- if PYLAB_VERBOSE or plotVerbose:
- _pylab.hold(True)
-
-def getSurfPosData(zpiezo, maxDefl, textVerbose=False, plotVerbose=False) :
- "Measure the distance to the surface"
- if plotVerbose and not PYLAB_VERBOSE :
- _import_pylab()
- origPos = zpiezo.curPos()
- # fully retract the piezo
- if TEXT_VERBOSE or textVerbose :
- print "\tRetract the piezo"
- out = linspace(zpiezo.curPos(), zpiezo.zpMin, 2000)
- ret1 = zpiezo.ramp(out, 10e3)
- del(out) # start with a clean output vector, seems to help reduce bounce?
- zpiezo.updateInputs() # opening output seems to cause bounce?
- # locate high deflection position
- if TEXT_VERBOSE or textVerbose :
- print "\tApproach until there is dangerous deflection (> %d)" % maxDefl
- if SLEEP_DURING_SURF_POS_AQUISITION == True :
- time.sleep(.2) # sleeping briefly seems to reduce bounce?
- cp, mtpod = moveToPosOrDef(zpiezo, zpiezo.zpMax, maxDefl,
- (zpiezo.zpMax-zpiezo.zpMin)/2000,
- return_data=True)
- highContactPos = zpiezo.curPos()
- if highContactPos <= origPos : # started out too close, we need some room to move in...
- if TEXT_VERBOSE or textVerbose :
- print "\tToo close, return to the original position: ", origPos
- out = linspace(zpiezo.curPos(), origPos, 2000)
- ret3=zpiezo.ramp(out, 10e3)
- del(out)
- zpiezo.updateInputs()
- if PYLAB_VERBOSE or plotVerbose :
- _pylab.figure(BASE_FIG_NUM+2)
- def plot_dict(d, label) :
- print d.keys()
- _pylab.plot(d["Z piezo output"], d["Deflection input"], '.',label=label)
- plot_dict(ret1, 'First retract')
- plot_dict(mtpod, 'Single step in')
- plot_dict(ret3, 'Return to start')
- _pylab.legend(loc='best')
- _pylab.title('Too close')
- raise tooClose, "Way too close!\n(highContactPos %d <= origPos %d)" % (highContactPos, origPos)
- zpiezo.updateInputs()
- # fully retract the piezo again
- if TEXT_VERBOSE or textVerbose :
- print "\tRetract the piezo again"
- if SLEEP_DURING_SURF_POS_AQUISITION == True :
- time.sleep(.2)
- out = linspace(highContactPos, zpiezo.zpMin, 2000)
- ret2 = zpiezo.ramp(out, 10e3)
- del(out)
- zpiezo.updateInputs()
- if zpiezo.curPos() != zpiezo.zpMin :
- raise Exception, "bad code"
- # scan to the high contact position
- if TEXT_VERBOSE or textVerbose :
- print "\tRamp in to the deflected position: ", highContactPos
- if SLEEP_DURING_SURF_POS_AQUISITION == True :
- time.sleep(.2)
- out = linspace(zpiezo.curPos(), highContactPos, 4000)
- data = zpiezo.ramp(out, 10e3)
- del(out)
- zpiezo.updateInputs()
- if LOG_DATA :
- log = data_logger.data_log(LOG_DIR, noclobber_logsubdir=False,
- log_name="getSurfPos")
- log.write_dict_of_arrays(data)
- if SLEEP_DURING_SURF_POS_AQUISITION == True :
- time.sleep(.2)
- # return to the original position
- if TEXT_VERBOSE or textVerbose :
- print "\tReturn to the original position: ", origPos
- out = linspace(highContactPos, origPos, 2000)
- ret3=zpiezo.ramp(out, 10e3)
- del(out)
- zpiezo.updateInputs()
- return {"ret1":ret1, "mtpod":mtpod, "ret2":ret2,
- "approach":data, "ret3":ret3}
-
-def bilinear(x, params):
- """bilinear fit for surface bumps. Model has two linear regimes
- which meet at x=kink_position and have independent slopes.
- """
- assert type(x) == type(array([0,1])), "Invalid x array: %s" % type(x)
- left_offset,left_slope,kink_position,right_slope = params
- left_mask = x < kink_position
- right_mask = x >= kink_position # = not left_mask
- left_y = left_offset + left_slope*x
- right_y = left_offset + left_slope*kink_position \
- + right_slope*(x-kink_position)
- return left_mask * left_y + right_mask * right_y
-
-def analyzeSurfPosData(ddict, textVerbose=False, plotVerbose=False, retAllParams=False) :
- # ususes ddict["approach"] for analysis
- # the others are just along to be plotted
-
- data = ddict["approach"]
- # analyze data, using bilinear model
- # y = p0 + p1 x for x <= p2
- # = p0 + p1 p2 + p3 (x-p2) for x >= p2
- if TEXT_VERBOSE or textVerbose :
- print "\tAnalyze the data"
- dump_before_index = 0 # 25 # HACK!!
- # Generate a reasonable guess...
- startPos = int(data["Z piezo output"].min())
- finalPos = int(data["Z piezo output"].max())
- startDef = int(data["Deflection input"].min())
- finalDef = int(data["Deflection input"].max())
- # startDef and startPos are probably for 2 different points
-
- left_offset = startDef
- left_slope = 0
- kink_position = (finalPos+startPos)/2.0
- right_slope = 2.0*(finalDef-startDef)/(finalPos-startPos)
- pstart = [left_offset, left_slope, kink_position, right_slope]
-
- offset_scale = (finalPos - startPos)/100
- left_slope_scale = right_slope/10
- kink_scale = (finalPos-startPos)/100
- right_slope_scale = right_slope
- scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale]
- if TEXT_VERBOSE or textVerbose :
- print "start Def %d, final def %d, start pos %d, final pos %d" % (startDef, finalDef, startPos, finalPos)
- print "Guessed params ", pstart
- def residual(p, y, x):
- Y = bilinear(x, p)
- return Y-y
- leastsq = scipy.optimize.leastsq
- params,cov,info,mesg,ier = leastsq(
- residual, pstart,
- args=(data["Deflection input"][dump_before_index:],
- data["Z piezo output"][dump_before_index:]),
- full_output=True, maxfev=10000)
- if TEXT_VERBOSE or textVerbose :
- print "Best fit parameters: ", params
-
- if PYLAB_VERBOSE or plotVerbose :
- _import_pylab()
- _pylab.figure(BASE_FIG_NUM+3)
- def plot_dict(d, label) :
- _pylab.plot(d["Z piezo output"], d["Deflection input"], '.',label=label)
- try :
- plot_dict(ddict['ret1'], 'First retract')
- plot_dict(ddict['mtpod'], 'Single step in')
- plot_dict(ddict['ret2'], 'Second retract')
- except KeyError :
- pass # if we don't have the surrounding data, don't worry
- plot_dict(data, 'Main approach')
- try :
- plot_dict(ddict['ret3'], 'Return to start')
- except KeyError :
- pass # if we don't have the surrounding data, don't worry
- def fit_fn(x, params) :
- if x <= params[2] :
- return params[0] + params[1]*x
- else :
- return params[0] + params[1]*params[2] + params[3]*(x-params[2])
- _pylab.plot([startPos, params[2], finalPos],
- [fit_fn(startPos, params), fit_fn(params[2], params), fit_fn(finalPos, params)],
- '-',label='fit')
- _pylab.title('surf_pos %5g %5g %5g %5g' % (params[0], params[1], params[2], params[3]))
- _pylab.legend(loc='best')
- _flush_plot()
-
- # check that the fit is reasonable
- # params[1] is slope in non-contact region
- # params[3] is slope in contact region
- if abs(params[1]*10) > abs(params[3]) :
- raise poorFit, "Slope in non-contact region, or no slope in contact"
- if params[2] < startPos+0.02*(finalPos-startPos) : # params[2] is kink position
- raise poorFit, "No kink (kink %g less than %g, need more space to left)" % \
- (params[2], startPos+0.02*(finalPos-startPos))
- if params[2] > finalPos-0.02*(finalPos-startPos) :
- raise poorFit, "No kink (kink %g more than %g, need more space to right)" % \
- (params[2], finalPos-0.02*(finalPos-startPos))
- if params[3] < 0.5 * right_slope : # params[3] is slope in contact region
- raise poorFit, "Too far"
- if TEXT_VERBOSE or textVerbose :
- print "\tSurface position: ", params[2]
- if retAllParams :
- return params
- return params[2]
-
-def getSurfPos(zpiezo, maxDefl, textVerbose=False, plotVerbose=False) :
- ddict = getSurfPosData(zpiezo, maxDefl, textVerbose, plotVerbose)
- return analyzeSurfPosData(ddict, textVerbose, plotVerbose)
-
-def test_smoothness(zp, plotVerbose=True) :
- posA = 20000
- posB = 50000
- setpoint = zp.def_V2in(3)
- steps = 200
- outfreq = 1e5
- outarray = linspace(posB, posA, 1000)
- indata=[]
- outdata=[]
- curVals = zp.jumpToPos(posA)
- zp.pCurVals(curVals)
- time.sleep(1) # let jitters die down
- for i in range(10) :
- print "ramp %d to %d" % (zp.curPos(), posB)
- curVals, data = moveToPosOrDef(zp, posB, setpoint, step=steps,
- return_data = True)
- indata.append(data)
- out = zp.ramp(outarray, outfreq)
- outdata.append(out)
- if plotVerbose :
- from pylab import figure, plot, title, legend, hold, subplot
- if PYLAB_VERBOSE or plotVerbose :
- _import_pylab()
- _pylab.figure(BASE_FIG_NUM+4)
- for i in range(10) :
- _pylab.plot(indata[i]['Z piezo output'],
- indata[i]['Deflection input'], '+--', label='in')
- _pylab.plot(outdata[i]['Z piezo output'],
- outdata[i]['Deflection input'], '.-', label='out')
- _pylab.title('test smoothness (step in, ramp out)')
- #_pylab.legend(loc='best')
-
-def test() :
- import z_piezo
- zp = z_piezo.z_piezo()
- curVals = zp.moveToPosOrDef(zp.pos_nm2out(600), defl=zp.curDef()+6000, step=(zp.pos_nm2out(10)-zp.pos_nm2out(0)))
- if TEXT_VERBOSE :
- zp.pCurVals(curVals)
- pos = zp.getSurfPos(maxDefl=zp.curDef()+6000)
- if TEXT_VERBOSE :
- print "Surface at %g nm", pos
- print "success"
- if PYLAB_VERBOSE and _final_flush_plot != None:
- _final_flush_plot()
Topic :: Software Development :: Libraries :: Python Modules
"""
-def find_packages(root='calibcant'):
+def find_packages(root='pypiezo'):
packages = []
prefix = '.'+os.path.sep
for dirpath,dirnames,filenames in walk(root):
_this_dir = os.path.dirname(__file__)
packages = find_packages()
-setup(name="pypiezo",
+setup(name='pypiezo',
version=__version__,
- maintainer="W. Trevor King",
- maintainer_email="wking@drexel.edu",
- url = "http://www.physics.drexel.edu/~wking/unfolding-disasters/pypiezo/",
- download_url = "http://www.physics.drexel.edu/~wking/code/python/pypiezo-%s.tar.gz" % __version__,
- license = "GNU General Public License (GPL)",
- platforms = ["all"],
- description = __doc__,
- long_description = open(os.path.join(_this_dir, 'README'), 'r').read(),
- classifiers = filter(None, classifiers.split("\n")),
- packages = packages,
- provides = ['pypiezo (%s)' % __version__],
- requires = ['pycomedi (>= 0.3)'],
+ maintainer='W. Trevor King',
+ maintainer_email='wking@drexel.edu',
+ url='http://www.physics.drexel.edu/~wking/unfolding-disasters/pypiezo/',
+ download_url='http://www.physics.drexel.edu/~wking/code/python/pypiezo-%s.tar.gz' % __version__,
+ license='GNU General Public License (GPL)',
+ platforms=['all'],
+ description=__doc__,
+ long_description=open(os.path.join(_this_dir, 'README'), 'r').read(),
+ classifiers=filter(None, classifiers.split('\n')),
+ packages=packages,
+ provides=['pypiezo (%s)' % __version__],
+ requires=['pycomedi (>= 0.3)'],
)