"""
import copy
+import logging
import numpy
import scipy
-from ..command import Command, Argument, Failure, NullQueue
+from ..command import Argument, Failure, NullQueue
from ..config import Setting
from ..curve import Data
-from ..plugin import Plugin
from ..util.fit import PoorFit, ModelFitter
-from .curve import CurveArgument
-
-
-def scale(hooke, curve):
- commands = hooke.commands
- contact = [c for c in hooke.commands
- if c.name == 'zero block surface contact point'][0]
- force = [c for c in hooke.commands if c.name == 'add block force array'][0]
- inqueue = None
- outqueue = NullQueue()
- for i,block in enumerate(curve.data):
- numpy.savetxt(open('curve.dat', 'w'), block, delimiter='\t')
- params = {'curve':curve, 'block':i}
- try:
- contact._run(hooke, inqueue, outqueue, params)
- except PoorFit, e:
- raise PoorFit('Could not fit %s %s: %s'
- % (curve.path, i, str(e)))
- force._run(hooke, inqueue, outqueue, params)
- return curve
+from ..util.si import join_data_label, split_data_label
+from . import Plugin
+from .curve import ColumnAddingCommand
+
class SurfacePositionModel (ModelFitter):
- """
+ """Bilinear surface position model.
The bilinear model is symmetric, but the parameter guessing and
sanity checks assume the contact region occurs for lower indicies
In order for this model to produce a satisfactory fit, there
should be enough data in the off-surface region that interactions
due to proteins, etc. will not seriously skew the fit in the
- off-surface region.
-
-
- We guess
+ off-surface region. If you don't have much of a tail, you can set
+ the `info` dict's `ignore non-contact before index` parameter to
+ the index of the last surface- or protein-related feature.
"""
+ _fit_check_arguments = [
+ Argument('min slope ratio', type='float', default=10.,
+ help="""
+Minimum `contact-slope/non-contact-slope` ratio for a "good" fit.
+""".strip()),
+ Argument('min contact fraction', type='float', default=0.02,
+ help="""
+Minimum fraction of points in the contact region for a "good" fit.
+""".strip()),
+ Argument('max contact fraction', type='float', default=0.98,
+ help="""
+Maximum fraction of points in the contact region for a "good" fit.
+""".strip()),
+ Argument('min slope guess ratio', type='float', default=0.5,
+ help="""
+Minimum `fit-contact-slope/guessed-contact-slope` ratio for a "good" fit.
+""".strip()),
+ ]
+
def model(self, params):
"""A continuous, bilinear model.
:math:`p_3` is the slope of the second region.
"""
p = params # convenient alias
- if self.info.get('force zero non-contact slope', None) == True:
+ rNC_ignore = self.info['ignore non-contact before index']
+ if self.info['force zero non-contact slope'] == True:
p = list(p)
p.append(0.) # restore the non-contact slope parameter
r2 = numpy.round(abs(p[2]))
if r2 >= 1:
+ r2 = min(r2, len(self._model_data))
self._model_data[:r2] = p[0] + p[1] * numpy.arange(r2)
if r2 < len(self._data)-1:
self._model_data[r2:] = \
p[0] + p[1]*p[2] + p[3] * numpy.arange(len(self._data)-r2)
+ if r2 < rNC_ignore:
+ self._model_data[r2:rNC_ignore] = self._data[r2:rNC_ignore]
return self._model_data
- def set_data(self, data, info=None):
- super(SurfacePositionModel, self).set_data(data, info)
+ def set_data(self, data, info=None, *args, **kwargs):
+ super(SurfacePositionModel, self).set_data(data, info, *args, **kwargs)
if info == None:
info = {}
- self.info = info
- self.info['min position'] = 0
- self.info['max position'] = len(data)
- self.info['max deflection'] = data.max()
- self.info['min deflection'] = data.min()
- self.info['position range'] = self.info['max position'] - self.info['min position']
- self.info['deflection range'] = self.info['max deflection'] - self.info['min deflection']
+ if getattr(self, 'info', None) == None:
+ self.info = {}
+ self.info.update(info)
+ for key,value in [
+ ('force zero non-contact slope', False),
+ ('ignore non-contact before index', -1),
+ ('min position', 0), # Store postions etc. to avoid recalculating.
+ ('max position', len(data)),
+ ('max deflection', data.max()),
+ ('min deflection', data.min()),
+ ]:
+ if key not in self.info:
+ self.info[key] = value
+ for key,value in [
+ ('position range',
+ self.info['max position'] - self.info['min position']),
+ ('deflection range',
+ self.info['max deflection'] - self.info['min deflection']),
+ ]:
+ if key not in self.info:
+ self.info[key] = value
+ for argument in self._fit_check_arguments:
+ if argument.name not in self.info:
+ self.info[argument.name] = argument.default
def guess_initial_params(self, outqueue=None):
"""Guess the initial parameters.
Notes
-----
We guess initial parameters such that the offset (:math:`p_1`)
- matches the minimum deflection, the kink (:math:`p_2`) occurs in
- the middle of the data, the initial (contact) slope (:math:`p_0`)
- produces the maximum deflection at the left-most point, and the
- final (non-contact) slope (:math:`p_3`) is zero.
+ matches the minimum deflection, the kink (:math:`p_2`) occurs
+ at the first point that the deflection crosses the middle of
+ its range, the initial (contact) slope (:math:`p_0`) produces
+ the right-most deflection at the kink point, and the final
+ (non-contact) slope (:math:`p_3`) is zero.
+
+ In the event of a tie, :meth:`argmax` returns the lowest index
+ to the maximum value.
+ >>> (numpy.arange(10) >= 5).argmax()
+ 5
"""
left_offset = self.info['min deflection']
- left_slope = 2*(self.info['deflection range']
- /self.info['position range'])
- kink_position = (self.info['max position']
- +self.info['min position'])/2.0
+ middle_deflection = (self.info['min deflection']
+ + self.info['deflection range']/2.)
+ kink_position = 2*(self._data > middle_deflection).argmax()
+ if kink_position == 0:
+ # jump vibration at the start of the retraction?
+ start = int(min(max(20, 0.01 * len(self._data)), 0.5*len(self._data)))
+ std = self._data[:start].std()
+ left_offset = self._data[start].mean()
+ stop = start
+ while abs(self._data[stop] - left_offset) < 3*std:
+ stop += 1
+ left_slope = (self._data[stop-start:stop].mean()
+ - left_offset) / (stop-start)
+ left_offset -= left_slope * start/2
+ kink_position = (self._data[-1] - left_offset)/left_slope
+ else:
+ left_slope = (self._data[-1] - self.info['min deflection']
+ )/kink_position
right_slope = 0
self.info['guessed contact slope'] = left_slope
params = [left_offset, left_slope, kink_position, right_slope]
- if self.info.get('force zero non-contact slope', None) == True:
+ if self.info['force zero non-contact slope'] == True:
params = params[:-1]
return params
- def guess_scale(self, params, outqueue=None):
- """Guess the parameter scales.
+ def fit(self, *args, **kwargs):
+ """Fit the model to the data.
Notes
-----
-
- We guess offset scale (:math:`p_0`) as one tenth of the total
- deflection range, the kink scale (:math:`p_2`) as one tenth of
- the total index range, the initial (contact) slope scale
- (:math:`p_1`) as one tenth of the contact slope estimation,
- and the final (non-contact) slope scale (:math:`p_3`) is as
- one tenth of the initial slope scale.
+ We change the `epsfcn` default from :func:`scipy.optimize.leastsq`'s
+ `0` to `1e-3`, so the initial Jacobian estimate takes larger steps,
+ which helps avoid being trapped in noise-generated local minima.
"""
- offset_scale = self.info['deflection range']/10.
- left_slope_scale = abs(params[1])/10.
- kink_scale = self.info['position range']/10.
- right_slope_scale = left_slope_scale/10.
- scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale]
- if self.info.get('force zero non-contact slope', None) == True:
- scale = scale[:-1]
- return scale
-
- def fit(self, *args, **kwargs):
self.info['guessed contact slope'] = None
+ if 'epsfcn' not in kwargs:
+ kwargs['epsfcn'] = 1e-3 # take big steps to estimate the Jacobian
params = super(SurfacePositionModel, self).fit(*args, **kwargs)
params[2] = abs(params[2])
- if self.info.get('force zero non-contact slope', None) == True:
+ if self.info['force zero non-contact slope'] == True:
params = list(params)
params.append(0.) # restore the non-contact slope parameter
# check that the fit is reasonable, see the :meth:`model` docstring
- # for parameter descriptions. HACK: hardcoded cutoffs.
- if abs(params[3]*10) > abs(params[1]) :
- raise PoorFit('Slope in non-contact region, or no slope in contact')
- if params[2] < self.info['min position']+0.02*self.info['position range']:
+ # for parameter descriptions.
+ slope_ratio = abs(params[1]/params[3])
+ if slope_ratio < self.info['min slope ratio']:
+ raise PoorFit(
+ 'Slope in non-contact region, or no slope in contact (slope ratio %g less than %g)'
+ % (slope_ratio, self.info['min slope ratio']))
+ contact_fraction = ((params[2]-self.info['min position'])
+ /self.info['position range'])
+ if contact_fraction < self.info['min contact fraction']:
+ raise PoorFit(
+ 'No kink (contact fraction %g less than %g)'
+ % (contact_fraction, self.info['min contact fraction']))
+ if contact_fraction > self.info['max contact fraction']:
raise PoorFit(
- 'No kink (kink %g less than %g, need more space to left)'
- % (params[2],
- self.info['min position']+0.02*self.info['position range']))
- if params[2] > self.info['max position']-0.02*self.info['position range']:
- raise poorFit(
- 'No kink (kink %g more than %g, need more space to right)'
- % (params[2],
- self.info['max position']-0.02*self.info['position range']))
+ 'No kink (contact fraction %g greater than %g)'
+ % (contact_fraction, self.info['max contact fraction']))
+ slope_guess_ratio = abs(params[1]/self.info['guessed contact slope'])
if (self.info['guessed contact slope'] != None
- and abs(params[1]) < 0.5 * abs(self.info['guessed contact slope'])):
- raise PoorFit('Too far (contact slope %g, but expected ~%g'
- % (params[3], self.info['guessed contact slope']))
+ and slope_guess_ratio < self.info['min slope guess ratio']):
+ raise PoorFit(
+ 'Too far (contact slope off guess by %g less than %g)'
+ % (slope_guess_ratio, self.info['min slope guess ratio']))
return params
+
class VelocityClampPlugin (Plugin):
def __init__(self):
super(VelocityClampPlugin, self).__init__(name='vclamp')
self._commands = [
SurfaceContactCommand(self), ForceCommand(self),
+ CantileverAdjustedExtensionCommand(self), FlattenCommand(self),
]
def default_settings(self):
]
-class SurfaceContactCommand (Command):
+class SurfaceContactCommand (ColumnAddingCommand):
"""Automatically determine a block's surface contact point.
- Uses the block's `z piezo (m)` and `deflection (m)` arrays.
- Stores the contact parameters in `block.info`'s `surface distance
- offset (m)` and `surface deflection offset (m)`. Model-specific
- fitting information is stored in `surface detection parameters`.
-
- The adjusted data columns `surface distance (m)` and `surface
- adjusted deflection (m)` are also added to the block.
-
You can select the contact point algorithm with the creatively
named `surface contact point algorithm` configuration setting.
Currently available options are:
* wtk (:meth:`find_contact_point_wtk`)
"""
def __init__(self, plugin):
+ self._wtk_fit_check_arguments = []
+ for argument in SurfacePositionModel._fit_check_arguments:
+ arg = copy.deepcopy(argument)
+ arg._help += ' (wtk model)'
+ self._wtk_fit_check_arguments.append(arg)
super(SurfaceContactCommand, self).__init__(
- name='zero block surface contact point',
+ name='zero surface contact point',
+ columns=[
+ ('distance column', 'z piezo (m)', """
+Name of the column to use as the surface position input.
+""".strip()),
+ ('deflection column', 'deflection (m)', """
+Name of the column to use as the deflection input.
+""".strip()),
+ ],
+ new_columns=[
+ ('output distance column', 'surface distance', """
+Name of the column (without units) to use as the surface position output.
+""".strip()),
+ ('output deflection column', 'surface deflection', """
+Name of the column (without units) to use as the deflection output.
+""".strip()),
+ ],
arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
+ Argument(name='ignore index', type='int', default=None,
help="""
-Data block for which the force should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and `1`
-selects the retracting curve.
+Ignore the residual from the non-contact region before the indexed
+point (for the `wtk` algorithm).
""".strip()),
- ],
+ Argument(name='ignore after last peak info name',
+ type='string', default=None,
+ help="""
+As an alternative to 'ignore index', ignore after the last peak in the
+peak list stored in the `.info` dictionary.
+""".strip()),
+ Argument(name='distance info name', type='string',
+ default='surface distance offset',
+ help="""
+Name (without units) for storing the distance offset in the `.info` dictionary.
+""".strip()),
+ Argument(name='deflection info name', type='string',
+ default='surface deflection offset',
+ help="""
+Name (without units) for storing the deflection offset in the `.info` dictionary.
+""".strip()),
+ Argument(name='fit parameters info name', type='string',
+ default='surface deflection offset',
+ help="""
+Name (without units) for storing fit parameters in the `.info` dictionary.
+""".strip()),
+ ] + self._wtk_fit_check_arguments,
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue...
- new = Data((data.shape[0], data.shape[1]+2), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-2] = data
- new.info['columns'].extend(
- ['surface distance (m)', 'surface adjusted deflection (m)'])
- z_data = data[:,data.info['columns'].index('z piezo (m)')]
- d_data = data[:,data.info['columns'].index('deflection (m)')]
- i,deflection_offset,ps = self.find_contact_point(
- params['curve'], z_data, d_data, outqueue)
- surface_offset = z_data[i]
- new.info['surface distance offset (m)'] = surface_offset
- new.info['surface deflection offset (m)'] = deflection_offset
- new.info['surface detection parameters'] = ps
- new[:,-2] = z_data - surface_offset
- new[:,-1] = d_data - deflection_offset
- data = params['curve'].data[params['block']]
- params['curve'].data[params['block']] = new
-
- def find_contact_point(self, curve, z_data, d_data, outqueue=None):
+ self._add_to_command_stack(params)
+ params = self._setup_params(hooke=hooke, params=params)
+ block = self._block(hooke=hooke, params=params)
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ i,def_offset,ps = self.find_contact_point(
+ params, dist_data, def_data, outqueue)
+ dist_offset = dist_data[i]
+ block.info[params['distance info name']] = dist_offset
+ block.info[params['deflection info name']] = def_offset
+ block.info[params['fit parameters info name']] = ps
+ self._set_column(hooke=hooke, params=params,
+ column_name='output distance column',
+ values=dist_data - dist_offset)
+ self._set_column(hooke=hooke, params=params,
+ column_name='output deflection column',
+ values=def_data - def_offset)
+
+ def _setup_params(self, hooke, params):
+ name,dist_unit = split_data_label(params['distance column'])
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output distance column'] = join_data_label(
+ params['output distance column'], dist_unit)
+ params['output deflection column'] = join_data_label(
+ params['output deflection column'], def_unit)
+ params['distance info name'] = join_data_label(
+ params['distance info name'], dist_unit)
+ params['deflection info name'] = join_data_label(
+ params['deflection info name'], def_unit)
+ return params
+
+ def find_contact_point(self, params, z_data, d_data, outqueue=None):
"""Railyard for the `find_contact_point_*` family.
Uses the `surface contact point algorithm` configuration
"""
fn = getattr(self, 'find_contact_point_%s'
% self.plugin.config['surface contact point algorithm'])
- return fn(curve, z_data, d_data, outqueue)
+ return fn(params, z_data, d_data, outqueue)
- def find_contact_point_fmms(self, curve, z_data, d_data, outqueue=None):
+ def find_contact_point_fmms(self, params, z_data, d_data, outqueue=None):
"""Algorithm by Francesco Musiani and Massimo Sandal.
Notes
curve, until you find a point with greater than baseline
deflection. That point is the contact point.
"""
- if curve.info['filetype'] == 'picoforce':
+ if params['curve'].info['filetype'] == 'picoforce':
# Take care of the picoforce trigger bug (TODO: example
# data file demonstrating the bug). We exclude portions
# of the curve that have too much standard deviation.
i += 1
return (i, d_baseline, {})
- def find_contact_point_ms(self, curve, z_data, d_data, outqueue=None):
+ def find_contact_point_ms(self, params, z_data, d_data, outqueue=None):
"""Algorithm by Massimo Sandal.
Notes
else:
return dummy.index
- def find_contact_point_wtk(self, curve, z_data, d_data, outqueue=None):
+ def find_contact_point_wtk(self, params, z_data, d_data, outqueue=None):
"""Algorithm by W. Trevor King.
Notes
-----
- Uses :func:`analyze_surf_pos_data_wtk` internally.
+ Uses :class:`SurfacePositionModel` internally.
"""
reverse = z_data[0] > z_data[-1]
if reverse == True: # approaching, contact region on the right
d_data = d_data[::-1]
- s = SurfacePositionModel(d_data)
- s.info['force zero non-contact slope'] = True
+ s = SurfacePositionModel(d_data, info={
+ 'force zero non-contact slope':True},
+ rescale=True)
+ for argument in self._wtk_fit_check_arguments:
+ s.info[argument.name] = params[argument.name]
+ ignore_index = None
+ if params['ignore index'] != None:
+ ignore_index = params['ignore index']
+ elif params['ignore after last peak info name'] != None:
+ peaks = z_data.info[params['ignore after last peak info name']]
+ if not len(peaks) > 0:
+ raise Failure('Need at least one peak in %s, not %s'
+ % (params['ignore after last peak info name'],
+ peaks))
+ ignore_index = peaks[-1].post_index()
+ if ignore_index != None:
+ s.info['ignore non-contact before index'] = ignore_index
offset,contact_slope,surface_index,non_contact_slope = s.fit(
outqueue=outqueue)
info = {
surface_index = len(d_data)-1-surface_index
return (numpy.round(surface_index), deflection_offset, info)
-class ForceCommand (Command):
- """Calculate a block's `deflection (N)` array.
- Uses the block's `deflection (m)` array and `spring constant
- (N/m)`.
+class ForceCommand (ColumnAddingCommand):
+ """Convert a deflection column from meters to newtons.
"""
def __init__(self, plugin):
super(ForceCommand, self).__init__(
- name='add block force array',
+ name='convert distance to force',
+ columns=[
+ ('deflection column', 'surface deflection (m)', """
+Name of the column to use as the deflection input.
+""".strip()),
+ ],
+ new_columns=[
+ ('output deflection column', 'deflection', """
+Name of the column (without units) to use as the deflection output.
+""".strip()),
+ ],
arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
+ Argument(name='spring constant info name', type='string',
+ default='spring constant (N/m)',
help="""
-Data block for which the force should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and `1`
-selects the retracting curve.
+Name of the spring constant in the `.info` dictionary.
""".strip()),
],
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue.
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
- new.info['columns'].append('deflection (N)')
- d_data = data[:,data.info['columns'].index('surface adjusted deflection (m)')]
- new[:,-1] = d_data * data.info['spring constant (N/m)']
- params['curve'].data[params['block']] = new
-
-
-class generalvclampCommands(object):
-
- def do_subtplot(self, args):
- '''
- SUBTPLOT
- (procplots.py plugin)
- Plots the difference between ret and ext current curve
- -------
- Syntax: subtplot
- '''
- #FIXME: sub_filter and sub_order must be args
-
- if len(self.plots[0].vectors) != 2:
- print 'This command only works on a curve with two different plots.'
- pass
-
- outplot=self.subtract_curves(sub_order=1)
-
- plot_graph=self.list_of_events['plot_graph']
- wx.PostEvent(self.frame,plot_graph(plots=[outplot]))
-
- def _plug_init(self):
- self.basecurrent=None
- self.basepoints=None
- self.autofile=''
-
- def do_distance(self,args):
- '''
- DISTANCE
- (generalvclamp.py)
- Measure the distance (in nm) between two points.
- For a standard experiment this is the delta X distance.
- For a force clamp experiment this is the delta Y distance (actually becomes
- an alias of zpiezo)
- -----------------
- Syntax: distance
- '''
- if self.current.curve.experiment == 'clamp':
- print 'You wanted to use zpiezo perhaps?'
- return
- else:
- dx,unitx,dy,unity=self._delta(set=1)
- print str(dx*(10**9))+' nm'
- to_dump='distance '+self.current.path+' '+str(dx*(10**9))+' nm'
- self.outlet.push(to_dump)
-
-
- def do_force(self,args):
- '''
- FORCE
- (generalvclamp.py)
- Measure the force difference (in pN) between two points
- ---------------
- Syntax: force
- '''
- if self.current.curve.experiment == 'clamp':
- print 'This command makes no sense for a force clamp experiment.'
- return
- dx,unitx,dy,unity=self._delta(set=1)
- print str(dy*(10**12))+' pN'
- to_dump='force '+self.current.path+' '+str(dy*(10**12))+' pN'
- self.outlet.push(to_dump)
-
-
- def do_forcebase(self,args):
- '''
- FORCEBASE
- (generalvclamp.py)
- Measures the difference in force (in pN) between a point and a baseline
- took as the average between two points.
-
- The baseline is fixed once for a given curve and different force measurements,
- unless the user wants it to be recalculated
- ------------
- Syntax: forcebase [rebase]
- rebase: Forces forcebase to ask again the baseline
- max: Instead of asking for a point to measure, asks for two points and use
- the maximum peak in between
- '''
- rebase=False #if true=we select rebase
- maxpoint=False #if true=we measure the maximum peak
-
- plot=self._get_displayed_plot()
- whatset=1 #fixme: for all sets
- if 'rebase' in args or (self.basecurrent != self.current.path):
- rebase=True
- if 'max' in args:
- maxpoint=True
-
- if rebase:
- print 'Select baseline'
- self.basepoints=self._measure_N_points(N=2, whatset=whatset)
- self.basecurrent=self.current.path
-
- if maxpoint:
- print 'Select two points'
- points=self._measure_N_points(N=2, whatset=whatset)
- boundpoints=[points[0].index, points[1].index]
- boundpoints.sort()
- try:
- y=min(plot.vectors[whatset][1][boundpoints[0]:boundpoints[1]])
- except ValueError:
- print 'Chosen interval not valid. Try picking it again. Did you pick the same point as begin and end of interval?'
- else:
- print 'Select point to measure'
- points=self._measure_N_points(N=1, whatset=whatset)
- #whatplot=points[0].dest
- y=points[0].graph_coords[1]
-
- #fixme: code duplication
- boundaries=[self.basepoints[0].index, self.basepoints[1].index]
- boundaries.sort()
- to_average=plot.vectors[whatset][1][boundaries[0]:boundaries[1]] #y points to average
-
- avg=np.mean(to_average)
- forcebase=abs(y-avg)
- print str(forcebase*(10**12))+' pN'
- to_dump='forcebase '+self.current.path+' '+str(forcebase*(10**12))+' pN'
- self.outlet.push(to_dump)
-
- def plotmanip_multiplier(self, plot, current):
- '''
- Multiplies all the Y values of an SMFS curve by a value stored in the 'force_multiplier'
- configuration variable. Useful for calibrations and other stuff.
- '''
-
- #not a smfs curve...
- if current.curve.experiment != 'smfs':
- return plot
-
- #only one set is present...
- if len(self.plots[0].vectors) != 2:
- return plot
-
- #multiplier is 1...
- if (self.config['force_multiplier']==1):
- return plot
-
- for i in range(len(plot.vectors[0][1])):
- plot.vectors[0][1][i]=plot.vectors[0][1][i]*self.config['force_multiplier']
-
- for i in range(len(plot.vectors[1][1])):
- plot.vectors[1][1][i]=plot.vectors[1][1][i]*self.config['force_multiplier']
-
- return plot
-
-
- def plotmanip_flatten(self, plot, current, customvalue=False):
- '''
- Subtracts a polynomial fit to the non-contact part of the curve, as to flatten it.
- the best polynomial fit is chosen among polynomials of degree 1 to n, where n is
- given by the configuration file or by the customvalue.
+ self._add_to_command_stack(params)
+ params = self._setup_params(hooke=hooke, params=params)
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ out = def_data * def_data.info[params['spring constant info name']]
+ self._set_column(hooke=hooke, params=params,
+ column_name='output deflection column',
+ values=out)
+
+ def _setup_params(self, hooke, params):
+ name,in_unit = split_data_label(params['deflection column'])
+ out_unit = 'N' # HACK: extract target units from k_unit.
+ params['output deflection column'] = join_data_label(
+ params['output deflection column'], out_unit)
+ name,k_unit = split_data_label(params['spring constant info name'])
+ expected_k_unit = '%s/%s' % (out_unit, in_unit)
+ if k_unit != expected_k_unit:
+ raise Failure('Cannot convert from %s to %s with %s'
+ % (params['deflection column'],
+ params['output deflection column'],
+ params['spring constant info name']))
+ return params
- customvalue= int (>0) --> starts the function even if config says no (default=False)
- '''
- #not a smfs curve...
- if current.curve.experiment != 'smfs':
- return plot
+class CantileverAdjustedExtensionCommand (ColumnAddingCommand):
+ """Remove cantilever extension from a total extension column.
- #only one set is present...
- if len(self.plots[0].vectors) != 2:
- return plot
+ If `distance column` and `deflection column` have the same units
+ (e.g. `z piezo (m)` and `deflection (m)`), `spring constant info
+ name` is ignored and a deflection/distance conversion factor of
+ one is used.
+ """
+ def __init__(self, plugin):
+ super(CantileverAdjustedExtensionCommand, self).__init__(
+ name='remove cantilever from extension',
+ columns=[
+ ('distance column', 'surface distance (m)', """
+Name of the column to use as the surface position input.
+""".strip()),
+ ('deflection column', 'deflection (N)', """
+Name of the column to use as the deflection input.
+""".strip()),
+ ],
+ new_columns=[
+ ('output distance column', 'cantilever adjusted extension', """
+Name of the column (without units) to use as the surface position output.
+""".strip()),
+ ],
+ arguments=[
+ Argument(name='spring constant info name', type='string',
+ default='spring constant (N/m)',
+ help="""
+Name of the spring constant in the `.info` dictionary.
+""".strip()),
+ ],
+ help=self.__doc__, plugin=plugin)
- #config is not flatten, and customvalue flag is false too
- if (not self.config['flatten']) and (not customvalue):
- return plot
+ def _run(self, hooke, inqueue, outqueue, params):
+ self._add_to_command_stack(params)
+ params = self._setup_params(hooke=hooke, params=params)
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ if params['spring constant info name'] == None:
+ k = 1.0 # distance and deflection in the same units
+ else:
+ k = def_data.info[params['spring constant info name']]
+ self._set_column(hooke=hooke, params=params,
+ column_name='output distance column',
+ values=dist_data - def_data / k)
+
+ def _setup_params(self, hooke, params):
+ name,dist_unit = split_data_label(params['distance column'])
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output distance column'] = join_data_label(
+ params['output distance column'], dist_unit)
+ if dist_unit == def_unit:
+ params['spring constant info name'] == None
+ else:
+ name,k_unit = split_data_label(params['spring constant info name'])
+ expected_k_unit = '%s/%s' % (def_unit, dist_unit)
+ if k_unit != expected_k_unit:
+ raise Failure('Cannot convert from %s to %s with %s'
+ % (params['deflection column'],
+ params['output distance column'],
+ params['spring constant info name']))
+ return params
- max_exponent=12
- delta_contact=0
- if customvalue:
- max_cycles=customvalue
- else:
- max_cycles=self.config['flatten'] #Using > 1 usually doesn't help and can give artefacts. However, it could be useful too.
+class FlattenCommand (ColumnAddingCommand):
+ """Flatten a deflection column.
- contact_index=self.find_contact_point()
+ Subtracts a polynomial fit from the non-contact part of the curve
+ to flatten it. The best polynomial fit is chosen among
+ polynomials of degree 1 to `max degree`.
- valn=[[] for item in range(max_exponent)]
- yrn=[0.0 for item in range(max_exponent)]
- errn=[0.0 for item in range(max_exponent)]
-
- #Check if we have a proper numerical value
- try:
- zzz=int(max_cycles)
- except:
- #Loudly and annoyingly complain if it's not a number, then fallback to zero
- print '''Warning: flatten value is not a number!
- Use "set flatten" or edit hooke.conf to set it properly
- Using zero.'''
- max_cycles=0
-
- for i in range(int(max_cycles)):
-
- x_ext=plot.vectors[0][0][contact_index+delta_contact:]
- y_ext=plot.vectors[0][1][contact_index+delta_contact:]
- x_ret=plot.vectors[1][0][contact_index+delta_contact:]
- y_ret=plot.vectors[1][1][contact_index+delta_contact:]
- for exponent in range(max_exponent):
- try:
- valn[exponent]=sp.polyfit(x_ext,y_ext,exponent)
- yrn[exponent]=sp.polyval(valn[exponent],x_ret)
- errn[exponent]=sp.sqrt(sum((yrn[exponent]-y_ext)**2)/float(len(y_ext)))
- except Exception,e:
- print 'Cannot flatten!'
- print e
- return plot
-
- best_exponent=errn.index(min(errn))
-
- #extension
- ycorr_ext=y_ext-yrn[best_exponent]+y_ext[0] #noncontact part
- yjoin_ext=np.array(plot.vectors[0][1][0:contact_index+delta_contact]) #contact part
- #retraction
- ycorr_ret=y_ret-yrn[best_exponent]+y_ext[0] #noncontact part
- yjoin_ret=np.array(plot.vectors[1][1][0:contact_index+delta_contact]) #contact part
-
- ycorr_ext=np.concatenate((yjoin_ext, ycorr_ext))
- ycorr_ret=np.concatenate((yjoin_ret, ycorr_ret))
-
- plot.vectors[0][1]=list(ycorr_ext)
- plot.vectors[1][1]=list(ycorr_ret)
-
- return plot
-
- #---SLOPE---
- def do_slope(self,args):
- '''
- SLOPE
- (generalvclamp.py)
- Measures the slope of a delimited chunk on the return trace.
- The chunk can be delimited either by two manual clicks, or have
- a fixed width, given as an argument.
- ---------------
- Syntax: slope [width]
- The facultative [width] parameter specifies how many
- points will be considered for the fit. If [width] is
- specified, only one click will be required.
- (c) Marco Brucale, Massimo Sandal 2008
- '''
-
- # Reads the facultative width argument
- try:
- fitspan=int(args)
- except:
- fitspan=0
-
- # Decides between the two forms of user input, as per (args)
- if fitspan == 0:
- # Gets the Xs of two clicked points as indexes on the current curve vector
- print 'Click twice to delimit chunk'
- points=self._measure_N_points(N=2,whatset=1)
- else:
- print 'Click once on the leftmost point of the chunk (i.e.usually the peak)'
- points=self._measure_N_points(N=1,whatset=1)
-
- slope=self._slope(points,fitspan)
-
- # Outputs the relevant slope parameter
- print 'Slope:'
- print str(slope)
- to_dump='slope '+self.current.path+' '+str(slope)
- self.outlet.push(to_dump)
-
- def _slope(self,points,fitspan):
- # Calls the function linefit_between
- parameters=[0,0,[],[]]
- try:
- clickedpoints=[points[0].index,points[1].index]
- clickedpoints.sort()
- except:
- clickedpoints=[points[0].index-fitspan,points[0].index]
+ .. todo: Why does flattening use a polynomial fit and not a sinusoid?
+ Isn't most of the oscillation due to laser interference?
+ See Jaschke 1995 ( 10.1063/1.1146018 )
+ and the figure 4 caption of Weisenhorn 1992 ( 10.1103/PhysRevB.45.11226 )
+ """
+ def __init__(self, plugin):
+ super(FlattenCommand, self).__init__(
+ name='polynomial flatten',
+ columns=[
+ ('distance column', 'surface distance (m)', """
+Name of the column to use as the surface position input.
+""".strip()),
+ ('deflection column', 'deflection (N)', """
+Name of the column to use as the deflection input.
+""".strip()),
+ ],
+ new_columns=[
+ ('output deflection column', 'flattened deflection', """
+Name of the column (without units) to use as the deflection output.
+""".strip()),
+ ],
+ arguments=[
+ Argument(name='degree', type='int',
+ default=1,
+ help="""
+Order of the polynomial used for flattening. Using values greater
+than one usually doesn't help and can give artifacts. However, it
+could be useful too. (TODO: Back this up with some theory...)
+""".strip()),
+ Argument(name='fit info name', type='string',
+ default='flatten fit',
+ help="""
+Name of the flattening information in the `.info` dictionary.
+""".strip()),
+ ],
+ help=self.__doc__, plugin=plugin)
+
+ def _run(self, hooke, inqueue, outqueue, params):
+ self._add_to_command_stack(params)
+ params = self._setup_params(hooke=hooke, params=params)
+ block = self._block(hooke=hooke, params=params)
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ degree = params['degree']
+ mask = dist_data > 0
+ indices = numpy.argwhere(mask)
+ if len(indices) == 0:
+ raise Failure('no positive distance values in %s'
+ % params['distance column'])
+ dist_nc = dist_data[indices].flatten()
+ def_nc = def_data[indices].flatten()
try:
- parameters=self.linefit_between(clickedpoints[0],clickedpoints[1])
- except:
- print 'Cannot fit. Did you click twice the same point?'
- return
-
- # Outputs the relevant slope parameter
- print 'Slope:'
- print str(parameters[0])
- to_dump='slope '+self.curve.path+' '+str(parameters[0])
- self.outlet.push(to_dump)
-
- # Makes a vector with the fitted parameters and sends it to the GUI
- xtoplot=parameters[2]
- ytoplot=[]
- x=0
- for x in xtoplot:
- ytoplot.append((x*parameters[0])+parameters[1])
-
- clickvector_x, clickvector_y=[], []
- for item in points:
- clickvector_x.append(item.graph_coords[0])
- clickvector_y.append(item.graph_coords[1])
-
- lineplot=self._get_displayed_plot(0) #get topmost displayed plot
-
- lineplot.add_set(xtoplot,ytoplot)
- lineplot.add_set(clickvector_x, clickvector_y)
-
-
- if lineplot.styles==[]:
- lineplot.styles=[None,None,None,'scatter']
- else:
- lineplot.styles+=[None,'scatter']
- if lineplot.colors==[]:
- lineplot.colors=[None,None,'black',None]
- else:
- lineplot.colors+=['black',None]
-
-
- self._send_plot([lineplot])
-
- return parameters[0]
-
-
- def linefit_between(self,index1,index2,whatset=1):
- '''
- Creates two vectors (xtofit,ytofit) slicing out from the
- current return trace a portion delimited by the two indexes
- given as arguments.
- Then does a least squares linear fit on that slice.
- Finally returns [0]=the slope, [1]=the intercept of the
- fitted 1st grade polynomial, and [2,3]=the actual (x,y) vectors
- used for the fit.
- (c) Marco Brucale, Massimo Sandal 2008
- '''
- # Translates the indexes into two vectors containing the x,y data to fit
- xtofit=self.plots[0].vectors[whatset][0][index1:index2]
- ytofit=self.plots[0].vectors[whatset][1][index1:index2]
-
- # Does the actual linear fitting (simple least squares with numpy.polyfit)
- linefit=[]
- linefit=np.polyfit(xtofit,ytofit,1)
-
- return (linefit[0],linefit[1],xtofit,ytofit)
-
-
- def fit_interval_nm(self,start_index,plot,nm,backwards):
- '''
- Calculates the number of points to fit, given a fit interval in nm
- start_index: index of point
- plot: plot to use
- backwards: if true, finds a point backwards.
- '''
- whatset=1 #FIXME: should be decidable
- x_vect=plot.vectors[1][0]
-
- c=0
- i=start_index
- start=x_vect[start_index]
- maxlen=len(x_vect)
- while abs(x_vect[i]-x_vect[start_index])*(10**9) < nm:
- if i==0 or i==maxlen-1: #we reached boundaries of vector!
- return c
-
- if backwards:
- i-=1
- else:
- i+=1
- c+=1
- return c
-
-
-
- def find_current_peaks(self,noflatten, a=True, maxpeak=True):
- #Find peaks.
- if a==True:
- a=self.convfilt_config['mindeviation']
- try:
- abs_devs=float(a)
- except:
- print "Bad input, using default."
- abs_devs=self.convfilt_config['mindeviation']
-
- defplot=self.current.curve.default_plots()[0]
- if not noflatten:
- flatten=self._find_plotmanip('flatten') #Extract flatten plotmanip
- defplot=flatten(defplot, self.current, customvalue=1) #Flatten curve before feeding it to has_peaks
- pk_location,peak_size=self.has_peaks(defplot, abs_devs, maxpeak)
- return pk_location, peak_size
-
-
- def pickup_contact_point(self,N=1,whatset=1):
- '''macro to pick up the contact point by clicking'''
- contact_point=self._measure_N_points(N=1, whatset=1)[0]
- contact_point_index=contact_point.index
- self.wlccontact_point=contact_point
- self.wlccontact_index=contact_point.index
- self.wlccurrent=self.current.path
- return contact_point, contact_point_index
-
-
- def baseline_points(self,peak_location, displayed_plot):
- clicks=self.config['baseline_clicks']
- if clicks==0:
- self.basepoints=[]
- base_index_0=peak_location[-1]+self.fit_interval_nm(peak_location[-1], displayed_plot, self.config['auto_right_baseline'],False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_0))
- base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'],False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_1))
- elif clicks>0:
- print 'Select baseline'
- if clicks==1:
- self.basepoints=self._measure_N_points(N=1, whatset=1)
- base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'], False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_1))
- else:
- self.basepoints=self._measure_N_points(N=2, whatset=1)
-
- self.basecurrent=self.current.path
- return self.basepoints
+ poly_values = scipy.polyfit(dist_nc, def_nc, degree)
+ def_nc_fit = scipy.polyval(poly_values, dist_nc)
+ except Exception, e:
+ raise Failure('failed to flatten with a degree %d polynomial: %s'
+ % (degree, e))
+ error = numpy.sqrt((def_nc_fit-def_nc)**2).sum() / len(def_nc)
+ block.info[params['fit info name']] = {
+ 'error':error,
+ 'degree':degree,
+ 'polynomial values':poly_values,
+ }
+ out = (def_data
+ - numpy.invert(mask)*scipy.polyval(poly_values[-1:], dist_data)
+ - mask*scipy.polyval(poly_values, dist_data))
+ self._set_column(hooke=hooke, params=params,
+ column_name='output deflection column',
+ values=out)
+
+ def _setup_params(self, hooke, params):
+ d_name,d_unit = split_data_label(params['deflection column'])
+ params['output deflection column'] = join_data_label(
+ params['output deflection column'], d_unit)
+ return params