1 # Copyright (C) 2008-2010 Alberto Gomez-Casado
4 # Massimo Sandal <devicerandom@gmail.com>
5 # W. Trevor King <wking@drexel.edu>
7 # This file is part of Hooke.
9 # Hooke is free software: you can redistribute it and/or modify it
10 # under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, either version 3 of the
12 # License, or (at your option) any later version.
14 # Hooke is distributed in the hope that it will be useful, but WITHOUT
15 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
16 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
17 # Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with Hooke. If not, see
21 # <http://www.gnu.org/licenses/>.
23 """The ``vclamp`` module provides :class:`VelocityClampPlugin` and
24 several associated :class:`hooke.command.Command`\s for handling
25 common velocity clamp analysis tasks.
34 from ..command import Command, Argument, Failure, NullQueue
35 from ..config import Setting
36 from ..curve import Data
37 from ..plugin import Plugin
38 from ..util.fit import PoorFit, ModelFitter
39 from ..util.si import join_data_label, split_data_label
40 from .curve import CurveArgument
43 def scale(hooke, curve, block=None):
44 """Run 'add block force array' on `block`.
46 Runs 'zero block surface contact point' first, if necessary. Does
47 not run either command if the columns they add to the block are
50 If `block` is `None`, scale all blocks in `curve`.
52 commands = hooke.commands
53 contact = [c for c in hooke.commands
54 if c.name == 'zero block surface contact point'][0]
55 force = [c for c in hooke.commands if c.name == 'add block force array'][0]
56 cant_adjust = [c for c in hooke.commands
57 if c.name == 'add block cantilever adjusted extension array'][0]
59 outqueue = NullQueue()
61 for i in range(len(curve.data)):
62 scale(hooke, curve, block=i)
64 params = {'curve':curve, 'block':block}
66 if ('surface distance (m)' not in b.info['columns']
67 or 'surface deflection (m)' not in b.info['columns']):
69 contact.run(hooke, inqueue, outqueue, **params)
71 raise PoorFit('Could not fit %s %s: %s'
72 % (curve.path, block, str(e)))
73 if ('deflection (N)' not in b.info['columns']):
74 force.run(hooke, inqueue, outqueue, **params)
75 if ('cantilever adjusted extension (m)' not in b.info['columns']):
76 cant_adjust.run(hooke, inqueue, outqueue, **params)
80 class SurfacePositionModel (ModelFitter):
81 """Bilinear surface position model.
83 The bilinear model is symmetric, but the parameter guessing and
84 sanity checks assume the contact region occurs for lower indicies
85 ("left of") the non-contact region. We also assume that
86 tip-surface attractions produce positive deflections.
90 Algorithm borrowed from WTK's `piezo package`_, specifically
91 from :func:`piezo.z_piezo_utils.analyzeSurfPosData`.
94 http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
96 Fits the data to the bilinear :method:`model`.
98 In order for this model to produce a satisfactory fit, there
99 should be enough data in the off-surface region that interactions
100 due to proteins, etc. will not seriously skew the fit in the
101 off-surface region. If you don't have much of a tail, you can set
102 the `info` dict's `ignore non-contact before index` parameter to
103 the index of the last surface- or protein-related feature.
105 def model(self, params):
106 """A continuous, bilinear model.
113 p_0 + p_1 x & \text{if $x <= p_2$}, \\
114 p_0 + p_1 p_2 + p_3 (x-p_2) & \text{if $x >= p_2$}.
117 Where :math:`p_0` is a vertical offset, :math:`p_1` is the slope
118 of the first region, :math:`p_2` is the transition location, and
119 :math:`p_3` is the slope of the second region.
121 p = params # convenient alias
122 rNC_ignore = self.info['ignore non-contact before index']
123 if self.info['force zero non-contact slope'] == True:
125 p.append(0.) # restore the non-contact slope parameter
126 r2 = numpy.round(abs(p[2]))
128 self._model_data[:r2] = p[0] + p[1] * numpy.arange(r2)
129 if r2 < len(self._data)-1:
130 self._model_data[r2:] = \
131 p[0] + p[1]*p[2] + p[3] * numpy.arange(len(self._data)-r2)
133 self._model_data[r2:rNC_ignore] = self._data[r2:rNC_ignore]
134 return self._model_data
136 def set_data(self, data, info=None, *args, **kwargs):
137 super(SurfacePositionModel, self).set_data(data, info, *args, **kwargs)
138 if self.info == None:
141 ('force zero non-contact slope', False),
142 ('ignore non-contact before index', 6158),
143 ('min position', 0), # Store postions etc. to avoid recalculating.
144 ('max position', len(data)),
145 ('max deflection', data.max()),
146 ('min deflection', data.min()),
148 if key not in self.info:
149 self.info[key] = value
152 self.info['max position'] - self.info['min position']),
154 self.info['max deflection'] - self.info['min deflection']),
156 if key not in self.info:
157 self.info[key] = value
159 def guess_initial_params(self, outqueue=None):
160 """Guess the initial parameters.
164 We guess initial parameters such that the offset (:math:`p_1`)
165 matches the minimum deflection, the kink (:math:`p_2`) occurs in
166 the middle of the data, the initial (contact) slope (:math:`p_0`)
167 produces the maximum deflection at the left-most point, and the
168 final (non-contact) slope (:math:`p_3`) is zero.
170 left_offset = self.info['min deflection']
171 left_slope = 2*(self.info['deflection range']
172 /self.info['position range'])
173 kink_position = (self.info['max position']
174 +self.info['min position'])/2.0
176 self.info['guessed contact slope'] = left_slope
177 params = [left_offset, left_slope, kink_position, right_slope]
178 if self.info['force zero non-contact slope'] == True:
182 def guess_scale(self, params, outqueue=None):
183 """Guess the parameter scales.
187 We the scale as one tenth for each parameter.
190 left_slope_scale = 0.1
192 right_slope_scale = 0.1
193 scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale]
194 if self.info['force zero non-contact slope'] == True:
198 def fit(self, *args, **kwargs):
199 """Fit the model to the data.
203 We change the `epsfcn` default from :func:`scipy.optimize.leastsq`'s
204 `0` to `1e-3`, so the initial Jacobian estimate takes larger steps,
205 which helps avoid being trapped in noise-generated local minima.
207 self.info['guessed contact slope'] = None
208 if 'epsfcn' not in kwargs:
209 kwargs['epsfcn'] = 1e-3 # take big steps to estimate the Jacobian
210 params = super(SurfacePositionModel, self).fit(*args, **kwargs)
211 params[2] = abs(params[2])
212 if self.info['force zero non-contact slope'] == True:
213 params = list(params)
214 params.append(0.) # restore the non-contact slope parameter
216 # check that the fit is reasonable, see the :meth:`model` docstring
217 # for parameter descriptions. HACK: hardcoded cutoffs.
218 if abs(params[3]*10) > abs(params[1]) :
219 raise PoorFit('Slope in non-contact region, or no slope in contact')
220 if params[2] < self.info['min position']+0.02*self.info['position range']:
222 'No kink (kink %g less than %g, need more space to left)'
224 self.info['min position']+0.02*self.info['position range']))
225 if params[2] > self.info['max position']-0.02*self.info['position range']:
227 'No kink (kink %g more than %g, need more space to right)'
229 self.info['max position']-0.02*self.info['position range']))
230 if (self.info['guessed contact slope'] != None
231 and abs(params[1]) < 0.5 * abs(self.info['guessed contact slope'])):
232 raise PoorFit('Too far (contact slope %g, but expected ~%g'
233 % (params[3], self.info['guessed contact slope']))
237 class VelocityClampPlugin (Plugin):
239 super(VelocityClampPlugin, self).__init__(name='vclamp')
241 SurfaceContactCommand(self), ForceCommand(self),
242 CantileverAdjustedExtensionCommand(self), FlattenCommand(self),
245 def default_settings(self):
247 Setting(section=self.setting_section, help=self.__doc__),
248 Setting(section=self.setting_section,
249 option='surface contact point algorithm',
251 help='Select the surface contact point algorithm. See the documentation for descriptions of available algorithms.')
255 class SurfaceContactCommand (Command):
256 """Automatically determine a block's surface contact point.
258 You can select the contact point algorithm with the creatively
259 named `surface contact point algorithm` configuration setting.
260 Currently available options are:
262 * fmms (:meth:`find_contact_point_fmms`)
263 * ms (:meth:`find_contact_point_ms`)
264 * wtk (:meth:`find_contact_point_wtk`)
266 def __init__(self, plugin):
267 super(SurfaceContactCommand, self).__init__(
268 name='zero block surface contact point',
271 Argument(name='block', aliases=['set'], type='int', default=0,
273 Data block for which the force should be calculated. For an
274 approach/retract force curve, `0` selects the approaching curve and `1`
275 selects the retracting curve.
277 Argument(name='input distance column', type='string',
278 default='z piezo (m)',
280 Name of the column to use as the surface position input.
282 Argument(name='input deflection column', type='string',
283 default='deflection (m)',
285 Name of the column to use as the deflection input.
287 Argument(name='output distance column', type='string',
288 default='surface distance',
290 Name of the column (without units) to use as the surface position output.
292 Argument(name='output deflection column', type='string',
293 default='surface deflection',
295 Name of the column (without units) to use as the deflection output.
297 Argument(name='distance info name', type='string',
298 default='surface distance offset',
300 Name (without units) for storing the distance offset in the `.info` dictionary.
302 Argument(name='deflection info name', type='string',
303 default='surface deflection offset',
305 Name (without units) for storing the deflection offset in the `.info` dictionary.
307 Argument(name='fit parameters info name', type='string',
308 default='surface deflection offset',
310 Name (without units) for storing fit parameters in the `.info` dictionary.
313 help=self.__doc__, plugin=plugin)
315 def _run(self, hooke, inqueue, outqueue, params):
316 data = params['curve'].data[params['block']]
317 # HACK? rely on params['curve'] being bound to the local hooke
318 # playlist (i.e. not a copy, as you would get by passing a
319 # curve through the queue). Ugh. Stupid queues. As an
320 # alternative, we could pass lookup information through the
322 new = Data((data.shape[0], data.shape[1]+2), dtype=data.dtype)
323 new.info = copy.deepcopy(data.info)
325 name,dist_units = split_data_label(params['input distance column'])
326 name,def_units = split_data_label(params['input deflection column'])
327 new.info['columns'].extend([
328 join_data_label(params['output distance column'], dist_units),
329 join_data_label(params['output deflection column'], def_units),
331 dist_data = data[:,data.info['columns'].index(
332 params['input distance column'])]
333 def_data = data[:,data.info['columns'].index(
334 params['input deflection column'])]
335 i,def_offset,ps = self.find_contact_point(
336 params['curve'], dist_data, def_data, outqueue)
337 dist_offset = dist_data[i]
338 new.info[join_data_label(params['distance info name'], dist_units
340 new.info[join_data_label(params['deflection info name'], def_units
342 new.info[params['fit parameters info name']] = ps
343 new[:,-2] = dist_data - dist_offset
344 new[:,-1] = def_data - def_offset
345 params['curve'].data[params['block']] = new
347 def find_contact_point(self, curve, z_data, d_data, outqueue=None):
348 """Railyard for the `find_contact_point_*` family.
350 Uses the `surface contact point algorithm` configuration
351 setting to call the appropriate backend algorithm.
353 fn = getattr(self, 'find_contact_point_%s'
354 % self.plugin.config['surface contact point algorithm'])
355 return fn(curve, z_data, d_data, outqueue)
357 def find_contact_point_fmms(self, curve, z_data, d_data, outqueue=None):
358 """Algorithm by Francesco Musiani and Massimo Sandal.
364 0) Driver-specific workarounds, e.g. deal with the PicoForce
365 trigger bug by excluding retraction portions with excessive
367 1) Select the second half (non-contact side) of the retraction
369 2) Fit the selection to a line.
370 3) If the fit is not almost horizontal, halve the selection
372 4) Average the selection and use it as a baseline.
373 5) Slide in from the start (contact side) of the retraction
374 curve, until you find a point with greater than baseline
375 deflection. That point is the contact point.
377 if curve.info['filetype'] == 'picoforce':
378 # Take care of the picoforce trigger bug (TODO: example
379 # data file demonstrating the bug). We exclude portions
380 # of the curve that have too much standard deviation.
381 # Yes, a lot of magic is here.
382 check_start = len(d_data)-len(d_data)/20
383 monster_start = len(d_data)
385 # look at the non-contact tail
386 non_monster = d_data[check_start:monster_start]
387 if non_monster.std() < 2e-10: # HACK: hardcoded cutoff
389 else: # move further away from the monster
390 check_start -= len(d_data)/50
391 monster_start -= len(d_data)/50
392 z_data = z_data[:monster_start]
393 d_data = d_data[:monster_start]
395 # take half of the thing to start
396 selection_start = len(d_data)/2
398 z_chunk = z_data[selection_start:]
399 d_chunk = d_data[selection_start:]
400 slope,intercept,r,two_tailed_prob,stderr_of_the_estimate = \
401 scipy.stats.linregress(z_chunk, d_chunk)
402 # We stop if we found an almost-horizontal fit or if we're
403 # getting to small a selection. FIXME: 0.1 and 5./6 here
404 # are "magic numbers" (although reasonable)
405 if (abs(slope) < 0.1 # deflection (m) / surface (m)
406 or selection_start > 5./6*len(d_data)):
408 selection_start += 10
410 d_baseline = d_chunk.mean()
412 # find the first point above the calculated baseline
414 while i < len(d_data) and d_data[i] < ymean:
416 return (i, d_baseline, {})
418 def find_contact_point_ms(self, curve, z_data, d_data, outqueue=None):
419 """Algorithm by Massimo Sandal.
423 WTK: At least the commits are by Massimo, and I see no notes
424 attributing the algorithm to anyone else.
430 xext=raw_plot.vectors[0][0]
431 yext=raw_plot.vectors[0][1]
432 xret2=raw_plot.vectors[1][0]
433 yret=raw_plot.vectors[1][1]
435 first_point=[xext[0], yext[0]]
436 last_point=[xext[-1], yext[-1]]
438 #regr=scipy.polyfit(first_point, last_point,1)[0:2]
439 diffx=abs(first_point[0]-last_point[0])
440 diffy=abs(first_point[1]-last_point[1])
442 #using polyfit results in numerical errors. good old algebra.
444 b=first_point[1]-(a*first_point[0])
445 baseline=scipy.polyval((a,b), xext)
447 ysub=[item-basitem for item,basitem in zip(yext,baseline)]
449 contact=ysub.index(min(ysub))
451 return xext,ysub,contact
453 #now, exploit a ClickedPoint instance to calculate index...
455 dummy.absolute_coords=(x_intercept,y_intercept)
456 dummy.find_graph_coords(xret2,yret)
459 return dummy.index, regr, regr_contact
463 def find_contact_point_wtk(self, curve, z_data, d_data, outqueue=None):
464 """Algorithm by W. Trevor King.
468 Uses :class:`SurfacePositionModel` internally.
470 reverse = z_data[0] > z_data[-1]
471 if reverse == True: # approaching, contact region on the right
472 d_data = d_data[::-1]
473 s = SurfacePositionModel(d_data, info={
474 'force zero non-contact slope':True},
476 offset,contact_slope,surface_index,non_contact_slope = s.fit(
480 'contact slope': contact_slope,
481 'surface index': surface_index,
482 'non-contact slope': non_contact_slope,
485 deflection_offset = offset + contact_slope*surface_index,
487 surface_index = len(d_data)-1-surface_index
488 return (numpy.round(surface_index), deflection_offset, info)
491 class ForceCommand (Command):
492 """Convert a deflection column from meters to newtons.
494 def __init__(self, plugin):
495 super(ForceCommand, self).__init__(
496 name='add block force array',
499 Argument(name='block', aliases=['set'], type='int', default=0,
501 Data block for which the force should be calculated. For an
502 approach/retract force curve, `0` selects the approaching curve and `1`
503 selects the retracting curve.
505 Argument(name='input deflection column', type='string',
506 default='surface deflection (m)',
508 Name of the column to use as the deflection input.
510 Argument(name='output deflection column', type='string',
511 default='deflection',
513 Name of the column (without units) to use as the deflection output.
515 Argument(name='spring constant info name', type='string',
516 default='spring constant (N/m)',
518 Name of the spring constant in the `.info` dictionary.
521 help=self.__doc__, plugin=plugin)
523 def _run(self, hooke, inqueue, outqueue, params):
524 data = params['curve'].data[params['block']]
525 # HACK? rely on params['curve'] being bound to the local hooke
526 # playlist (i.e. not a copy, as you would get by passing a
527 # curve through the queue). Ugh. Stupid queues. As an
528 # alternative, we could pass lookup information through the
530 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
531 new.info = copy.deepcopy(data.info)
533 new.info['columns'].append(
534 join_data_label(params['output deflection column'], 'N'))
535 d_data = data[:,data.info['columns'].index(
536 params['input deflection column'])]
537 new[:,-1] = d_data * data.info[params['spring constant info name']]
538 params['curve'].data[params['block']] = new
541 class CantileverAdjustedExtensionCommand (Command):
542 """Remove cantilever extension from a total extension column.
544 def __init__(self, plugin):
545 super(CantileverAdjustedExtensionCommand, self).__init__(
546 name='add block cantilever adjusted extension array',
549 Argument(name='block', aliases=['set'], type='int', default=0,
551 Data block for which the adjusted extension should be calculated. For
552 an approach/retract force curve, `0` selects the approaching curve and
553 `1` selects the retracting curve.
555 Argument(name='input distance column', type='string',
556 default='surface distance (m)',
558 Name of the column to use as the distance input.
560 Argument(name='input deflection column', type='string',
561 default='deflection (N)',
563 Name of the column to use as the deflection input.
565 Argument(name='output distance column', type='string',
566 default='cantilever adjusted extension',
568 Name of the column (without units) to use as the deflection output.
570 Argument(name='spring constant info name', type='string',
571 default='spring constant (N/m)',
573 Name of the spring constant in the `.info` dictionary.
576 help=self.__doc__, plugin=plugin)
578 def _run(self, hooke, inqueue, outqueue, params):
579 data = params['curve'].data[params['block']]
580 # HACK? rely on params['curve'] being bound to the local hooke
581 # playlist (i.e. not a copy, as you would get by passing a
582 # curve through the queue). Ugh. Stupid queues. As an
583 # alternative, we could pass lookup information through the
585 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
586 new.info = copy.deepcopy(data.info)
588 new.info['columns'].append(
589 join_data_label(params['output distance column'], 'm'))
590 z_data = data[:,data.info['columns'].index(
591 params['input distance column'])]
592 d_data = data[:,data.info['columns'].index(
593 params['input deflection column'])]
594 k = data.info[params['spring constant info name']]
596 z_name,z_unit = split_data_label(params['input distance column'])
597 assert z_unit == 'm', params['input distance column']
598 d_name,d_unit = split_data_label(params['input deflection column'])
599 assert d_unit == 'N', params['input deflection column']
600 k_name,k_unit = split_data_label(params['spring constant info name'])
601 assert k_unit == 'N/m', params['spring constant info name']
603 new[:,-1] = z_data - d_data / k
604 params['curve'].data[params['block']] = new
607 class FlattenCommand (Command):
608 """Flatten a deflection column.
610 Subtracts a polynomial fit from the non-contact part of the curve
611 to flatten it. The best polynomial fit is chosen among
612 polynomials of degree 1 to `max degree`.
614 .. todo: Why does flattening use a polynomial fit and not a sinusoid?
615 Isn't most of the oscillation due to laser interference?
616 See Jaschke 1995 ( 10.1063/1.1146018 )
617 and the figure 4 caption of Weisenhorn 1992 ( 10.1103/PhysRevB.45.11226 )
619 def __init__(self, plugin):
620 super(FlattenCommand, self).__init__(
621 name='add flattened extension array',
624 Argument(name='block', aliases=['set'], type='int', default=0,
626 Data block for which the adjusted extension should be calculated. For
627 an approach/retract force curve, `0` selects the approaching curve and
628 `1` selects the retracting curve.
630 Argument(name='max degree', type='int',
633 Highest order polynomial to consider for flattening. Using values
634 greater than one usually doesn't help and can give artifacts.
635 However, it could be useful too. (TODO: Back this up with some
638 Argument(name='input distance column', type='string',
639 default='surface distance (m)',
641 Name of the column to use as the distance input.
643 Argument(name='input deflection column', type='string',
644 default='deflection (N)',
646 Name of the column to use as the deflection input.
648 Argument(name='output deflection column', type='string',
649 default='flattened deflection',
651 Name of the column (without units) to use as the deflection output.
653 Argument(name='fit info name', type='string',
654 default='flatten fit',
656 Name of the flattening information in the `.info` dictionary.
659 help=self.__doc__, plugin=plugin)
661 def _run(self, hooke, inqueue, outqueue, params):
662 data = params['curve'].data[params['block']]
663 # HACK? rely on params['curve'] being bound to the local hooke
664 # playlist (i.e. not a copy, as you would get by passing a
665 # curve through the queue). Ugh. Stupid queues. As an
666 # alternative, we could pass lookup information through the
668 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
669 new.info = copy.deepcopy(data.info)
671 z_data = data[:,data.info['columns'].index(
672 params['input distance column'])]
673 d_data = data[:,data.info['columns'].index(
674 params['input deflection column'])]
676 d_name,d_unit = split_data_label(params['input deflection column'])
677 new.info['columns'].append(
678 join_data_label(params['output deflection column'], d_unit))
680 contact_index = numpy.absolute(z_data).argmin()
682 indices = numpy.argwhere(mask)
683 z_nc = z_data[indices].flatten()
684 d_nc = d_data[indices].flatten()
687 degree = poly_values = None
688 log = logging.getLogger('hooke')
689 for deg in range(params['max degree']):
691 pv = scipy.polyfit(z_nc, d_nc, deg)
692 df = scipy.polyval(pv, z_nc)
693 err = numpy.sqrt((df-d_nc)**2).sum()
695 log.warn('failed to flatten with a degree %d polynomial: %s'
698 if err < min_err: # new best fit
704 raise Failure('failed to flatten with all degrees')
705 new.info[params['fit info name']] = {
706 'error':min_err/len(z_nc),
708 'max degree':params['max degree'],
709 'polynomial values':poly_values,
711 new[:,-1] = d_data - mask*scipy.polyval(poly_values, z_data)
712 params['curve'].data[params['block']] = new