1 # Copyright (C) 2008-2010 Alberto Gomez-Casado
4 # Massimo Sandal <devicerandom@gmail.com>
5 # W. Trevor King <wking@drexel.edu>
7 # This file is part of Hooke.
9 # Hooke is free software: you can redistribute it and/or modify it
10 # under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, either version 3 of the
12 # License, or (at your option) any later version.
14 # Hooke is distributed in the hope that it will be useful, but WITHOUT
15 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
16 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
17 # Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with Hooke. If not, see
21 # <http://www.gnu.org/licenses/>.
23 """The ``vclamp`` module provides :class:`VelocityClampPlugin` and
24 several associated :class:`hooke.command.Command`\s for handling
25 common velocity clamp analysis tasks.
34 from ..command import Command, Argument, Failure, NullQueue
35 from ..config import Setting
36 from ..curve import Data
37 from ..plugin import Plugin
38 from ..util.fit import PoorFit, ModelFitter
39 from ..util.si import join_data_label, split_data_label
40 from .curve import CurveArgument
43 def scale(hooke, curve, block=None):
44 """Run 'add block force array' on `block`.
46 Runs 'zero block surface contact point' first, if necessary. Does
47 not run either command if the columns they add to the block are
50 If `block` is `None`, scale all blocks in `curve`.
52 commands = hooke.commands
53 contact = [c for c in hooke.commands
54 if c.name == 'zero block surface contact point'][0]
55 force = [c for c in hooke.commands if c.name == 'add block force array'][0]
56 cant_adjust = [c for c in hooke.commands
57 if c.name == 'add block cantilever adjusted extension array'][0]
59 outqueue = NullQueue()
61 for i in range(len(curve.data)):
62 scale(hooke, curve, block=i)
64 params = {'curve':curve, 'block':block}
66 if ('surface distance (m)' not in b.info['columns']
67 or 'surface deflection (m)' not in b.info['columns']):
69 contact.run(hooke, inqueue, outqueue, **params)
71 raise PoorFit('Could not fit %s %s: %s'
72 % (curve.path, block, str(e)))
73 if ('deflection (N)' not in b.info['columns']):
74 force.run(hooke, inqueue, outqueue, **params)
75 if ('cantilever adjusted extension (m)' not in b.info['columns']):
76 cant_adjust.run(hooke, inqueue, outqueue, **params)
80 class SurfacePositionModel (ModelFitter):
81 """Bilinear surface position model.
83 The bilinear model is symmetric, but the parameter guessing and
84 sanity checks assume the contact region occurs for lower indicies
85 ("left of") the non-contact region. We also assume that
86 tip-surface attractions produce positive deflections.
90 Algorithm borrowed from WTK's `piezo package`_, specifically
91 from :func:`piezo.z_piezo_utils.analyzeSurfPosData`.
94 http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
96 Fits the data to the bilinear :method:`model`.
98 In order for this model to produce a satisfactory fit, there
99 should be enough data in the off-surface region that interactions
100 due to proteins, etc. will not seriously skew the fit in the
101 off-surface region. If you don't have much of a tail, you can set
102 the `info` dict's `ignore non-contact before index` parameter to
103 the index of the last surface- or protein-related feature.
105 def model(self, params):
106 """A continuous, bilinear model.
113 p_0 + p_1 x & \text{if $x <= p_2$}, \\
114 p_0 + p_1 p_2 + p_3 (x-p_2) & \text{if $x >= p_2$}.
117 Where :math:`p_0` is a vertical offset, :math:`p_1` is the slope
118 of the first region, :math:`p_2` is the transition location, and
119 :math:`p_3` is the slope of the second region.
121 p = params # convenient alias
122 rNC_ignore = self.info['ignore non-contact before index']
123 if self.info['force zero non-contact slope'] == True:
125 p.append(0.) # restore the non-contact slope parameter
126 r2 = numpy.round(abs(p[2]))
128 self._model_data[:r2] = p[0] + p[1] * numpy.arange(r2)
129 if r2 < len(self._data)-1:
130 self._model_data[r2:] = \
131 p[0] + p[1]*p[2] + p[3] * numpy.arange(len(self._data)-r2)
133 self._model_data[r2:rNC_ignore] = self._data[r2:rNC_ignore]
134 return self._model_data
136 def set_data(self, data, info=None, *args, **kwargs):
137 super(SurfacePositionModel, self).set_data(data, info, *args, **kwargs)
138 if self.info == None:
141 ('force zero non-contact slope', False),
142 ('ignore non-contact before index', -1),
143 ('min position', 0), # Store postions etc. to avoid recalculating.
144 ('max position', len(data)),
145 ('max deflection', data.max()),
146 ('min deflection', data.min()),
148 if key not in self.info:
149 self.info[key] = value
152 self.info['max position'] - self.info['min position']),
154 self.info['max deflection'] - self.info['min deflection']),
156 if key not in self.info:
157 self.info[key] = value
159 def guess_initial_params(self, outqueue=None):
160 """Guess the initial parameters.
164 We guess initial parameters such that the offset (:math:`p_1`)
165 matches the minimum deflection, the kink (:math:`p_2`) occurs in
166 the middle of the data, the initial (contact) slope (:math:`p_0`)
167 produces the maximum deflection at the left-most point, and the
168 final (non-contact) slope (:math:`p_3`) is zero.
170 left_offset = self.info['min deflection']
171 left_slope = 2*(self.info['deflection range']
172 /self.info['position range'])
173 kink_position = (self.info['max position']
174 +self.info['min position'])/2.0
176 self.info['guessed contact slope'] = left_slope
177 params = [left_offset, left_slope, kink_position, right_slope]
178 if self.info['force zero non-contact slope'] == True:
182 def fit(self, *args, **kwargs):
183 """Fit the model to the data.
187 We change the `epsfcn` default from :func:`scipy.optimize.leastsq`'s
188 `0` to `1e-3`, so the initial Jacobian estimate takes larger steps,
189 which helps avoid being trapped in noise-generated local minima.
191 self.info['guessed contact slope'] = None
192 if 'epsfcn' not in kwargs:
193 kwargs['epsfcn'] = 1e-3 # take big steps to estimate the Jacobian
194 params = super(SurfacePositionModel, self).fit(*args, **kwargs)
195 params[2] = abs(params[2])
196 if self.info['force zero non-contact slope'] == True:
197 params = list(params)
198 params.append(0.) # restore the non-contact slope parameter
200 # check that the fit is reasonable, see the :meth:`model` docstring
201 # for parameter descriptions. HACK: hardcoded cutoffs.
202 if abs(params[3]*10) > abs(params[1]) :
203 raise PoorFit('Slope in non-contact region, or no slope in contact')
204 if params[2] < self.info['min position']+0.02*self.info['position range']:
206 'No kink (kink %g less than %g, need more space to left)'
208 self.info['min position']+0.02*self.info['position range']))
209 if params[2] > self.info['max position']-0.02*self.info['position range']:
211 'No kink (kink %g more than %g, need more space to right)'
213 self.info['max position']-0.02*self.info['position range']))
214 if (self.info['guessed contact slope'] != None
215 and abs(params[1]) < 0.5 * abs(self.info['guessed contact slope'])):
216 raise PoorFit('Too far (contact slope %g, but expected ~%g'
217 % (params[3], self.info['guessed contact slope']))
221 class VelocityClampPlugin (Plugin):
223 super(VelocityClampPlugin, self).__init__(name='vclamp')
225 SurfaceContactCommand(self), ForceCommand(self),
226 CantileverAdjustedExtensionCommand(self), FlattenCommand(self),
229 def default_settings(self):
231 Setting(section=self.setting_section, help=self.__doc__),
232 Setting(section=self.setting_section,
233 option='surface contact point algorithm',
235 help='Select the surface contact point algorithm. See the documentation for descriptions of available algorithms.')
239 class SurfaceContactCommand (Command):
240 """Automatically determine a block's surface contact point.
242 You can select the contact point algorithm with the creatively
243 named `surface contact point algorithm` configuration setting.
244 Currently available options are:
246 * fmms (:meth:`find_contact_point_fmms`)
247 * ms (:meth:`find_contact_point_ms`)
248 * wtk (:meth:`find_contact_point_wtk`)
250 def __init__(self, plugin):
251 super(SurfaceContactCommand, self).__init__(
252 name='zero block surface contact point',
255 Argument(name='block', aliases=['set'], type='int', default=0,
257 Data block for which the force should be calculated. For an
258 approach/retract force curve, `0` selects the approaching curve and `1`
259 selects the retracting curve.
261 Argument(name='ignore index', type='int', default=None,
263 Ignore the residual from the non-contact region before the indexed
264 point (for the `wtk` algorithm).
266 Argument(name='input distance column', type='string',
267 default='z piezo (m)',
269 Name of the column to use as the surface position input.
271 Argument(name='input deflection column', type='string',
272 default='deflection (m)',
274 Name of the column to use as the deflection input.
276 Argument(name='output distance column', type='string',
277 default='surface distance',
279 Name of the column (without units) to use as the surface position output.
281 Argument(name='output deflection column', type='string',
282 default='surface deflection',
284 Name of the column (without units) to use as the deflection output.
286 Argument(name='distance info name', type='string',
287 default='surface distance offset',
289 Name (without units) for storing the distance offset in the `.info` dictionary.
291 Argument(name='deflection info name', type='string',
292 default='surface deflection offset',
294 Name (without units) for storing the deflection offset in the `.info` dictionary.
296 Argument(name='fit parameters info name', type='string',
297 default='surface deflection offset',
299 Name (without units) for storing fit parameters in the `.info` dictionary.
302 help=self.__doc__, plugin=plugin)
304 def _run(self, hooke, inqueue, outqueue, params):
305 data = params['curve'].data[params['block']]
306 # HACK? rely on params['curve'] being bound to the local hooke
307 # playlist (i.e. not a copy, as you would get by passing a
308 # curve through the queue). Ugh. Stupid queues. As an
309 # alternative, we could pass lookup information through the
311 new = Data((data.shape[0], data.shape[1]+2), dtype=data.dtype)
312 new.info = copy.deepcopy(data.info)
314 name,dist_units = split_data_label(params['input distance column'])
315 name,def_units = split_data_label(params['input deflection column'])
316 new.info['columns'].extend([
317 join_data_label(params['output distance column'], dist_units),
318 join_data_label(params['output deflection column'], def_units),
320 dist_data = data[:,data.info['columns'].index(
321 params['input distance column'])]
322 def_data = data[:,data.info['columns'].index(
323 params['input deflection column'])]
324 i,def_offset,ps = self.find_contact_point(
325 params, dist_data, def_data, outqueue)
326 dist_offset = dist_data[i]
327 new.info[join_data_label(params['distance info name'], dist_units
329 new.info[join_data_label(params['deflection info name'], def_units
331 new.info[params['fit parameters info name']] = ps
332 new[:,-2] = dist_data - dist_offset
333 new[:,-1] = def_data - def_offset
334 params['curve'].data[params['block']] = new
336 def find_contact_point(self, params, z_data, d_data, outqueue=None):
337 """Railyard for the `find_contact_point_*` family.
339 Uses the `surface contact point algorithm` configuration
340 setting to call the appropriate backend algorithm.
342 fn = getattr(self, 'find_contact_point_%s'
343 % self.plugin.config['surface contact point algorithm'])
344 return fn(params, z_data, d_data, outqueue)
346 def find_contact_point_fmms(self, params, z_data, d_data, outqueue=None):
347 """Algorithm by Francesco Musiani and Massimo Sandal.
353 0) Driver-specific workarounds, e.g. deal with the PicoForce
354 trigger bug by excluding retraction portions with excessive
356 1) Select the second half (non-contact side) of the retraction
358 2) Fit the selection to a line.
359 3) If the fit is not almost horizontal, halve the selection
361 4) Average the selection and use it as a baseline.
362 5) Slide in from the start (contact side) of the retraction
363 curve, until you find a point with greater than baseline
364 deflection. That point is the contact point.
366 if params['curve'].info['filetype'] == 'picoforce':
367 # Take care of the picoforce trigger bug (TODO: example
368 # data file demonstrating the bug). We exclude portions
369 # of the curve that have too much standard deviation.
370 # Yes, a lot of magic is here.
371 check_start = len(d_data)-len(d_data)/20
372 monster_start = len(d_data)
374 # look at the non-contact tail
375 non_monster = d_data[check_start:monster_start]
376 if non_monster.std() < 2e-10: # HACK: hardcoded cutoff
378 else: # move further away from the monster
379 check_start -= len(d_data)/50
380 monster_start -= len(d_data)/50
381 z_data = z_data[:monster_start]
382 d_data = d_data[:monster_start]
384 # take half of the thing to start
385 selection_start = len(d_data)/2
387 z_chunk = z_data[selection_start:]
388 d_chunk = d_data[selection_start:]
389 slope,intercept,r,two_tailed_prob,stderr_of_the_estimate = \
390 scipy.stats.linregress(z_chunk, d_chunk)
391 # We stop if we found an almost-horizontal fit or if we're
392 # getting to small a selection. FIXME: 0.1 and 5./6 here
393 # are "magic numbers" (although reasonable)
394 if (abs(slope) < 0.1 # deflection (m) / surface (m)
395 or selection_start > 5./6*len(d_data)):
397 selection_start += 10
399 d_baseline = d_chunk.mean()
401 # find the first point above the calculated baseline
403 while i < len(d_data) and d_data[i] < ymean:
405 return (i, d_baseline, {})
407 def find_contact_point_ms(self, params, z_data, d_data, outqueue=None):
408 """Algorithm by Massimo Sandal.
412 WTK: At least the commits are by Massimo, and I see no notes
413 attributing the algorithm to anyone else.
419 xext=raw_plot.vectors[0][0]
420 yext=raw_plot.vectors[0][1]
421 xret2=raw_plot.vectors[1][0]
422 yret=raw_plot.vectors[1][1]
424 first_point=[xext[0], yext[0]]
425 last_point=[xext[-1], yext[-1]]
427 #regr=scipy.polyfit(first_point, last_point,1)[0:2]
428 diffx=abs(first_point[0]-last_point[0])
429 diffy=abs(first_point[1]-last_point[1])
431 #using polyfit results in numerical errors. good old algebra.
433 b=first_point[1]-(a*first_point[0])
434 baseline=scipy.polyval((a,b), xext)
436 ysub=[item-basitem for item,basitem in zip(yext,baseline)]
438 contact=ysub.index(min(ysub))
440 return xext,ysub,contact
442 #now, exploit a ClickedPoint instance to calculate index...
444 dummy.absolute_coords=(x_intercept,y_intercept)
445 dummy.find_graph_coords(xret2,yret)
448 return dummy.index, regr, regr_contact
452 def find_contact_point_wtk(self, params, z_data, d_data, outqueue=None):
453 """Algorithm by W. Trevor King.
457 Uses :class:`SurfacePositionModel` internally.
459 reverse = z_data[0] > z_data[-1]
460 if reverse == True: # approaching, contact region on the right
461 d_data = d_data[::-1]
462 s = SurfacePositionModel(d_data, info={
463 'force zero non-contact slope':True},
465 if params['ignore index'] != None:
466 s.info['ignore non-contact before index'] = params['ignore index']
467 offset,contact_slope,surface_index,non_contact_slope = s.fit(
471 'contact slope': contact_slope,
472 'surface index': surface_index,
473 'non-contact slope': non_contact_slope,
476 deflection_offset = offset + contact_slope*surface_index,
478 surface_index = len(d_data)-1-surface_index
479 return (numpy.round(surface_index), deflection_offset, info)
482 class ForceCommand (Command):
483 """Convert a deflection column from meters to newtons.
485 def __init__(self, plugin):
486 super(ForceCommand, self).__init__(
487 name='add block force array',
490 Argument(name='block', aliases=['set'], type='int', default=0,
492 Data block for which the force should be calculated. For an
493 approach/retract force curve, `0` selects the approaching curve and `1`
494 selects the retracting curve.
496 Argument(name='input deflection column', type='string',
497 default='surface deflection (m)',
499 Name of the column to use as the deflection input.
501 Argument(name='output deflection column', type='string',
502 default='deflection',
504 Name of the column (without units) to use as the deflection output.
506 Argument(name='spring constant info name', type='string',
507 default='spring constant (N/m)',
509 Name of the spring constant in the `.info` dictionary.
512 help=self.__doc__, plugin=plugin)
514 def _run(self, hooke, inqueue, outqueue, params):
515 data = params['curve'].data[params['block']]
516 # HACK? rely on params['curve'] being bound to the local hooke
517 # playlist (i.e. not a copy, as you would get by passing a
518 # curve through the queue). Ugh. Stupid queues. As an
519 # alternative, we could pass lookup information through the
521 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
522 new.info = copy.deepcopy(data.info)
524 new.info['columns'].append(
525 join_data_label(params['output deflection column'], 'N'))
526 d_data = data[:,data.info['columns'].index(
527 params['input deflection column'])]
528 new[:,-1] = d_data * data.info[params['spring constant info name']]
529 params['curve'].data[params['block']] = new
532 class CantileverAdjustedExtensionCommand (Command):
533 """Remove cantilever extension from a total extension column.
535 def __init__(self, plugin):
536 super(CantileverAdjustedExtensionCommand, self).__init__(
537 name='add block cantilever adjusted extension array',
540 Argument(name='block', aliases=['set'], type='int', default=0,
542 Data block for which the adjusted extension should be calculated. For
543 an approach/retract force curve, `0` selects the approaching curve and
544 `1` selects the retracting curve.
546 Argument(name='input distance column', type='string',
547 default='surface distance (m)',
549 Name of the column to use as the distance input.
551 Argument(name='input deflection column', type='string',
552 default='deflection (N)',
554 Name of the column to use as the deflection input.
556 Argument(name='output distance column', type='string',
557 default='cantilever adjusted extension',
559 Name of the column (without units) to use as the deflection output.
561 Argument(name='spring constant info name', type='string',
562 default='spring constant (N/m)',
564 Name of the spring constant in the `.info` dictionary.
567 help=self.__doc__, plugin=plugin)
569 def _run(self, hooke, inqueue, outqueue, params):
570 data = params['curve'].data[params['block']]
571 # HACK? rely on params['curve'] being bound to the local hooke
572 # playlist (i.e. not a copy, as you would get by passing a
573 # curve through the queue). Ugh. Stupid queues. As an
574 # alternative, we could pass lookup information through the
576 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
577 new.info = copy.deepcopy(data.info)
579 new.info['columns'].append(
580 join_data_label(params['output distance column'], 'm'))
581 z_data = data[:,data.info['columns'].index(
582 params['input distance column'])]
583 d_data = data[:,data.info['columns'].index(
584 params['input deflection column'])]
585 k = data.info[params['spring constant info name']]
587 z_name,z_unit = split_data_label(params['input distance column'])
588 assert z_unit == 'm', params['input distance column']
589 d_name,d_unit = split_data_label(params['input deflection column'])
590 assert d_unit == 'N', params['input deflection column']
591 k_name,k_unit = split_data_label(params['spring constant info name'])
592 assert k_unit == 'N/m', params['spring constant info name']
594 new[:,-1] = z_data - d_data / k
595 params['curve'].data[params['block']] = new
598 class FlattenCommand (Command):
599 """Flatten a deflection column.
601 Subtracts a polynomial fit from the non-contact part of the curve
602 to flatten it. The best polynomial fit is chosen among
603 polynomials of degree 1 to `max degree`.
605 .. todo: Why does flattening use a polynomial fit and not a sinusoid?
606 Isn't most of the oscillation due to laser interference?
607 See Jaschke 1995 ( 10.1063/1.1146018 )
608 and the figure 4 caption of Weisenhorn 1992 ( 10.1103/PhysRevB.45.11226 )
610 def __init__(self, plugin):
611 super(FlattenCommand, self).__init__(
612 name='add flattened extension array',
615 Argument(name='block', aliases=['set'], type='int', default=0,
617 Data block for which the adjusted extension should be calculated. For
618 an approach/retract force curve, `0` selects the approaching curve and
619 `1` selects the retracting curve.
621 Argument(name='max degree', type='int',
624 Highest order polynomial to consider for flattening. Using values
625 greater than one usually doesn't help and can give artifacts.
626 However, it could be useful too. (TODO: Back this up with some
629 Argument(name='input distance column', type='string',
630 default='surface distance (m)',
632 Name of the column to use as the distance input.
634 Argument(name='input deflection column', type='string',
635 default='deflection (N)',
637 Name of the column to use as the deflection input.
639 Argument(name='output deflection column', type='string',
640 default='flattened deflection',
642 Name of the column (without units) to use as the deflection output.
644 Argument(name='fit info name', type='string',
645 default='flatten fit',
647 Name of the flattening information in the `.info` dictionary.
650 help=self.__doc__, plugin=plugin)
652 def _run(self, hooke, inqueue, outqueue, params):
653 data = params['curve'].data[params['block']]
654 # HACK? rely on params['curve'] being bound to the local hooke
655 # playlist (i.e. not a copy, as you would get by passing a
656 # curve through the queue). Ugh. Stupid queues. As an
657 # alternative, we could pass lookup information through the
659 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
660 new.info = copy.deepcopy(data.info)
662 z_data = data[:,data.info['columns'].index(
663 params['input distance column'])]
664 d_data = data[:,data.info['columns'].index(
665 params['input deflection column'])]
667 d_name,d_unit = split_data_label(params['input deflection column'])
668 new.info['columns'].append(
669 join_data_label(params['output deflection column'], d_unit))
671 contact_index = numpy.absolute(z_data).argmin()
673 indices = numpy.argwhere(mask)
674 z_nc = z_data[indices].flatten()
675 d_nc = d_data[indices].flatten()
678 degree = poly_values = None
679 log = logging.getLogger('hooke')
680 for deg in range(params['max degree']):
682 pv = scipy.polyfit(z_nc, d_nc, deg)
683 df = scipy.polyval(pv, z_nc)
684 err = numpy.sqrt((df-d_nc)**2).sum()
686 log.warn('failed to flatten with a degree %d polynomial: %s'
689 if err < min_err: # new best fit
695 raise Failure('failed to flatten with all degrees')
696 new.info[params['fit info name']] = {
697 'error':min_err/len(z_nc),
699 'max degree':params['max degree'],
700 'polynomial values':poly_values,
702 new[:,-1] = d_data - mask*scipy.polyval(poly_values, z_data)
703 params['curve'].data[params['block']] = new