import numpy
import scipy
-from ..command import Command, Argument, Failure, NullQueue
+from ..command import Argument, Failure, NullQueue
from ..config import Setting
from ..curve import Data
from ..plugin import Plugin
from ..util.fit import PoorFit, ModelFitter
from ..util.si import join_data_label, split_data_label
-from .curve import CurveArgument
-
-
-def scale(hooke, curve, block=None):
- """Run 'add block force array' on `block`.
-
- Runs 'zero block surface contact point' first, if necessary. Does
- not run either command if the columns they add to the block are
- already present.
-
- If `block` is `None`, scale all blocks in `curve`.
- """
- commands = hooke.commands
- contact = [c for c in hooke.commands
- if c.name == 'zero block surface contact point'][0]
- force = [c for c in hooke.commands if c.name == 'add block force array'][0]
- cant_adjust = [c for c in hooke.commands
- if c.name == 'add block cantilever adjusted extension array'][0]
- inqueue = None
- outqueue = NullQueue()
- if block == None:
- for i in range(len(curve.data)):
- scale(hooke, curve, block=i)
- else:
- params = {'curve':curve, 'block':block}
- b = curve.data[block]
- if ('surface distance (m)' not in b.info['columns']
- or 'surface deflection (m)' not in b.info['columns']):
- try:
- contact.run(hooke, inqueue, outqueue, **params)
- except PoorFit, e:
- raise PoorFit('Could not fit %s %s: %s'
- % (curve.path, block, str(e)))
- if ('deflection (N)' not in b.info['columns']):
- force.run(hooke, inqueue, outqueue, **params)
- if ('cantilever adjusted extension (m)' not in b.info['columns']):
- cant_adjust.run(hooke, inqueue, outqueue, **params)
- return curve
+from .curve import ColumnAddingCommand
class SurfacePositionModel (ModelFitter):
]
-class SurfaceContactCommand (Command):
+class SurfaceContactCommand (ColumnAddingCommand):
"""Automatically determine a block's surface contact point.
You can select the contact point algorithm with the creatively
"""
def __init__(self, plugin):
super(SurfaceContactCommand, self).__init__(
- name='zero block surface contact point',
- arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
- help="""
-Data block for which the force should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and `1`
-selects the retracting curve.
-""".strip()),
- Argument(name='ignore index', type='int', default=None,
- help="""
-Ignore the residual from the non-contact region before the indexed
-point (for the `wtk` algorithm).
-""".strip()),
- Argument(name='input distance column', type='string',
- default='z piezo (m)',
- help="""
+ name='zero surface contact point',
+ columns=[
+ ('distance column', 'z piezo (m)', """
Name of the column to use as the surface position input.
""".strip()),
- Argument(name='input deflection column', type='string',
- default='deflection (m)',
- help="""
+ ('deflection column', 'deflection (m)', """
Name of the column to use as the deflection input.
""".strip()),
- Argument(name='output distance column', type='string',
- default='surface distance',
- help="""
+ ],
+ new_columns=[
+ ('output distance column', 'surface distance', """
Name of the column (without units) to use as the surface position output.
""".strip()),
- Argument(name='output deflection column', type='string',
- default='surface deflection',
- help="""
+ ('output deflection column', 'surface deflection', """
Name of the column (without units) to use as the deflection output.
+""".strip()),
+ ],
+ arguments=[
+ Argument(name='ignore index', type='int', default=None,
+ help="""
+Ignore the residual from the non-contact region before the indexed
+point (for the `wtk` algorithm).
+""".strip()),
+ Argument(name='ignore after last peak info name',
+ type='string', default=None,
+ help="""
+As an alternative to 'ignore index', ignore after the last peak in the
+peak list stored in the `.info` dictionary.
""".strip()),
Argument(name='distance info name', type='string',
default='surface distance offset',
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue...
- new = Data((data.shape[0], data.shape[1]+2), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-2] = data
- name,dist_units = split_data_label(params['input distance column'])
- name,def_units = split_data_label(params['input deflection column'])
- new.info['columns'].extend([
- join_data_label(params['output distance column'], dist_units),
- join_data_label(params['output deflection column'], def_units),
- ])
- dist_data = data[:,data.info['columns'].index(
- params['input distance column'])]
- def_data = data[:,data.info['columns'].index(
- params['input deflection column'])]
+ params = self.__setup_params(hooke=hooke, params=params)
+ block = self._block(hooke=hooke, params=params)
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
i,def_offset,ps = self.find_contact_point(
params, dist_data, def_data, outqueue)
dist_offset = dist_data[i]
- new.info[join_data_label(params['distance info name'], dist_units
- )] = dist_offset
- new.info[join_data_label(params['deflection info name'], def_units
- )] = def_offset
- new.info[params['fit parameters info name']] = ps
- new[:,-2] = dist_data - dist_offset
- new[:,-1] = def_data - def_offset
- params['curve'].data[params['block']] = new
+ block.info[params['distance info name']] = dist_offset
+ block.info[params['deflection info name']] = def_offset
+ block.info[params['fit parameters info name']] = ps
+ self._set_column(hooke=hooke, params=params,
+ column_name='output distance column',
+ values=dist_data - dist_offset)
+ self._set_column(hooke=hooke, params=params,
+ column_name='output deflection column',
+ values=def_data - def_offset)
+
+ def __setup_params(self, hooke, params):
+ name,dist_unit = split_data_label(params['distance column'])
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output distance column'] = join_data_label(
+ params['output distance column'], dist_unit)
+ params['output deflection column'] = join_data_label(
+ params['output deflection column'], def_unit)
+ params['distance info name'] = join_data_label(
+ params['distance info name'], dist_unit)
+ params['deflection info name'] = join_data_label(
+ params['deflection info name'], def_unit)
+ return params
def find_contact_point(self, params, z_data, d_data, outqueue=None):
"""Railyard for the `find_contact_point_*` family.
s = SurfacePositionModel(d_data, info={
'force zero non-contact slope':True},
rescale=True)
+ ignore_index = None
if params['ignore index'] != None:
- s.info['ignore non-contact before index'] = params['ignore index']
+ ignore_index = params['ignore index']
+ elif params['ignore after last peak info name'] != None:
+ peaks = z_data.info[params['ignore after last peak info name']]
+ if not len(peaks) > 0:
+ raise Failure('Need at least one peak in %s, not %s'
+ % (params['ignore after last peak info name'],
+ peaks))
+ ignore_index = peaks[-1].post_index()
+ if ignore_index != None:
+ s.info['ignore non-contact before index'] = ignore_index
offset,contact_slope,surface_index,non_contact_slope = s.fit(
outqueue=outqueue)
info = {
return (numpy.round(surface_index), deflection_offset, info)
-class ForceCommand (Command):
+class ForceCommand (ColumnAddingCommand):
"""Convert a deflection column from meters to newtons.
"""
def __init__(self, plugin):
super(ForceCommand, self).__init__(
- name='add block force array',
- arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
- help="""
-Data block for which the force should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and `1`
-selects the retracting curve.
-""".strip()),
- Argument(name='input deflection column', type='string',
- default='surface deflection (m)',
- help="""
+ name='convert distance to force',
+ columns=[
+ ('deflection column', 'surface deflection (m)', """
Name of the column to use as the deflection input.
""".strip()),
- Argument(name='output deflection column', type='string',
- default='deflection',
- help="""
+ ],
+ new_columns=[
+ ('output deflection column', 'deflection', """
Name of the column (without units) to use as the deflection output.
""".strip()),
+ ],
+ arguments=[
Argument(name='spring constant info name', type='string',
default='spring constant (N/m)',
help="""
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue.
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
- new.info['columns'].append(
- join_data_label(params['output deflection column'], 'N'))
- d_data = data[:,data.info['columns'].index(
- params['input deflection column'])]
- new[:,-1] = d_data * data.info[params['spring constant info name']]
- params['curve'].data[params['block']] = new
-
-
-class CantileverAdjustedExtensionCommand (Command):
+ params = self.__setup_params(hooke=hooke, params=params)
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ out = def_data * def_data.info[params['spring constant info name']]
+ self._set_column(hooke=hooke, params=params,
+ column_name='output deflection column',
+ values=out)
+
+ def __setup_params(self, hooke, params):
+ name,in_unit = split_data_label(params['deflection column'])
+ out_unit = 'N' # HACK: extract target units from k_unit.
+ params['output deflection column'] = join_data_label(
+ params['output deflection column'], out_unit)
+ name,k_unit = split_data_label(params['spring constant info name'])
+ expected_k_unit = '%s/%s' % (out_unit, in_unit)
+ if k_unit != expected_k_unit:
+ raise Failure('Cannot convert from %s to %s with %s'
+ % (params['deflection column'],
+ params['output deflection column'],
+ params['spring constant info name']))
+ return params
+
+
+class CantileverAdjustedExtensionCommand (ColumnAddingCommand):
"""Remove cantilever extension from a total extension column.
+
+ If `distance column` and `deflection column` have the same units
+ (e.g. `z piezo (m)` and `deflection (m)`), `spring constant info
+ name` is ignored and a deflection/distance conversion factor of
+ one is used.
"""
def __init__(self, plugin):
super(CantileverAdjustedExtensionCommand, self).__init__(
- name='add block cantilever adjusted extension array',
- arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
- help="""
-Data block for which the adjusted extension should be calculated. For
-an approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
- Argument(name='input distance column', type='string',
- default='surface distance (m)',
- help="""
-Name of the column to use as the distance input.
+ name='remove cantilever from extension',
+ columns=[
+ ('distance column', 'surface distance (m)', """
+Name of the column to use as the surface position input.
""".strip()),
- Argument(name='input deflection column', type='string',
- default='deflection (N)',
- help="""
+ ('deflection column', 'deflection (N)', """
Name of the column to use as the deflection input.
""".strip()),
- Argument(name='output distance column', type='string',
- default='cantilever adjusted extension',
- help="""
-Name of the column (without units) to use as the deflection output.
+ ],
+ new_columns=[
+ ('output distance column', 'cantilever adjusted extension', """
+Name of the column (without units) to use as the surface position output.
""".strip()),
+ ],
+ arguments=[
Argument(name='spring constant info name', type='string',
default='spring constant (N/m)',
help="""
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue.
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
- new.info['columns'].append(
- join_data_label(params['output distance column'], 'm'))
- z_data = data[:,data.info['columns'].index(
- params['input distance column'])]
- d_data = data[:,data.info['columns'].index(
- params['input deflection column'])]
- k = data.info[params['spring constant info name']]
-
- z_name,z_unit = split_data_label(params['input distance column'])
- assert z_unit == 'm', params['input distance column']
- d_name,d_unit = split_data_label(params['input deflection column'])
- assert d_unit == 'N', params['input deflection column']
- k_name,k_unit = split_data_label(params['spring constant info name'])
- assert k_unit == 'N/m', params['spring constant info name']
-
- new[:,-1] = z_data - d_data / k
- params['curve'].data[params['block']] = new
-
-
-class FlattenCommand (Command):
+ params = self.__setup_params(hooke=hooke, params=params)
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ if params['spring constant info name'] == None:
+ k = 1.0 # distance and deflection in the same units
+ else:
+ k = def_data.info[params['spring constant info name']]
+ self._set_column(hooke=hooke, params=params,
+ column_name='output distance column',
+ values=dist_data - def_data / k)
+
+ def __setup_params(self, hooke, params):
+ name,dist_unit = split_data_label(params['distance column'])
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output distance column'] = join_data_label(
+ params['output distance column'], dist_unit)
+ if dist_unit == def_unit:
+ params['spring constant info name'] == None
+ else:
+ name,k_unit = split_data_label(params['spring constant info name'])
+ expected_k_unit = '%s/%s' % (def_unit, dist_unit)
+ if k_unit != expected_k_unit:
+ raise Failure('Cannot convert from %s to %s with %s'
+ % (params['deflection column'],
+ params['output distance column'],
+ params['spring constant info name']))
+ return params
+
+
+class FlattenCommand (ColumnAddingCommand):
"""Flatten a deflection column.
Subtracts a polynomial fit from the non-contact part of the curve
"""
def __init__(self, plugin):
super(FlattenCommand, self).__init__(
- name='add flattened extension array',
- arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
- help="""
-Data block for which the adjusted extension should be calculated. For
-an approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
- Argument(name='max degree', type='int',
- default=1,
- help="""
-Highest order polynomial to consider for flattening. Using values
-greater than one usually doesn't help and can give artifacts.
-However, it could be useful too. (TODO: Back this up with some
-theory...)
-""".strip()),
- Argument(name='input distance column', type='string',
- default='surface distance (m)',
- help="""
-Name of the column to use as the distance input.
+ name='polynomial flatten',
+ columns=[
+ ('distance column', 'surface distance (m)', """
+Name of the column to use as the surface position input.
""".strip()),
- Argument(name='input deflection column', type='string',
- default='deflection (N)',
- help="""
+ ('deflection column', 'deflection (N)', """
Name of the column to use as the deflection input.
""".strip()),
- Argument(name='output deflection column', type='string',
- default='flattened deflection',
- help="""
+ ],
+ new_columns=[
+ ('output deflection column', 'flattened deflection', """
Name of the column (without units) to use as the deflection output.
+""".strip()),
+ ],
+ arguments=[
+ Argument(name='degree', type='int',
+ default=1,
+ help="""
+Order of the polynomial used for flattening. Using values greater
+than one usually doesn't help and can give artifacts. However, it
+could be useful too. (TODO: Back this up with some theory...)
""".strip()),
Argument(name='fit info name', type='string',
default='flatten fit',
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue.
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
- z_data = data[:,data.info['columns'].index(
- params['input distance column'])]
- d_data = data[:,data.info['columns'].index(
- params['input deflection column'])]
-
- d_name,d_unit = split_data_label(params['input deflection column'])
- new.info['columns'].append(
- join_data_label(params['output deflection column'], d_unit))
-
- contact_index = numpy.absolute(z_data).argmin()
- mask = z_data > 0
+ params = self.__setup_params(hooke=hooke, params=params)
+ block = self._block(hooke=hooke, params=params)
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ degree = params['degree']
+ mask = dist_data > 0
indices = numpy.argwhere(mask)
- z_nc = z_data[indices].flatten()
- d_nc = d_data[indices].flatten()
-
- min_err = numpy.inf
- degree = poly_values = None
- log = logging.getLogger('hooke')
- for deg in range(params['max degree']):
- try:
- pv = scipy.polyfit(z_nc, d_nc, deg)
- df = scipy.polyval(pv, z_nc)
- err = numpy.sqrt((df-d_nc)**2).sum()
- except Exception,e:
- log.warn('failed to flatten with a degree %d polynomial: %s'
- % (deg, e))
- continue
- if err < min_err: # new best fit
- min_err = err
- degree = deg
- poly_values = pv
-
- if degree == None:
- raise Failure('failed to flatten with all degrees')
- new.info[params['fit info name']] = {
- 'error':min_err/len(z_nc),
+ if len(indices) == 0:
+ raise Failure('no positive distance values in %s'
+ % params['distance column'])
+ dist_nc = dist_data[indices].flatten()
+ def_nc = def_data[indices].flatten()
+
+ try:
+ poly_values = scipy.polyfit(dist_nc, def_nc, degree)
+ def_nc_fit = scipy.polyval(poly_values, dist_nc)
+ except Exception, e:
+ raise Failure('failed to flatten with a degree %d polynomial: %s'
+ % (degree, e))
+ error = numpy.sqrt((def_nc_fit-def_nc)**2).sum() / len(def_nc)
+ block.info[params['fit info name']] = {
+ 'error':error,
'degree':degree,
- 'max degree':params['max degree'],
'polynomial values':poly_values,
}
- new[:,-1] = d_data - mask*scipy.polyval(poly_values, z_data)
- params['curve'].data[params['block']] = new
+ out = def_data - mask*scipy.polyval(poly_values, dist_data)
+ self._set_column(hooke=hooke, params=params,
+ column_name='output deflection column',
+ values=out)
+
+ def __setup_params(self, hooke, params):
+ d_name,d_unit = split_data_label(params['deflection column'])
+ params['output deflection column'] = join_data_label(
+ params['output deflection column'], d_unit)
+ return params