1 # Copyright (C) 2008-2010 Alberto Gomez-Casado
4 # Massimo Sandal <devicerandom@gmail.com>
5 # W. Trevor King <wking@drexel.edu>
7 # This file is part of Hooke.
9 # Hooke is free software: you can redistribute it and/or modify it
10 # under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, either version 3 of the
12 # License, or (at your option) any later version.
14 # Hooke is distributed in the hope that it will be useful, but WITHOUT
15 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
16 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
17 # Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with Hooke. If not, see
21 # <http://www.gnu.org/licenses/>.
23 """The ``vclamp`` module provides :class:`VelocityClampPlugin` and
24 several associated :class:`hooke.command.Command`\s for handling
25 common velocity clamp analysis tasks.
34 from ..command import Command, Argument, Failure, NullQueue
35 from ..config import Setting
36 from ..curve import Data
37 from ..plugin import Plugin
38 from ..util.fit import PoorFit, ModelFitter
39 from ..util.si import join_data_label, split_data_label
40 from .curve import CurveArgument
43 def scale(hooke, curve, block=None):
44 """Run 'add block force array' on `block`.
46 Runs 'zero block surface contact point' first, if necessary. Does
47 not run either command if the columns they add to the block are
50 If `block` is `None`, scale all blocks in `curve`.
52 commands = hooke.commands
53 contact = [c for c in hooke.commands
54 if c.name == 'zero block surface contact point'][0]
55 force = [c for c in hooke.commands if c.name == 'add block force array'][0]
56 cant_adjust = [c for c in hooke.commands
57 if c.name == 'add block cantilever adjusted extension array'][0]
59 outqueue = NullQueue()
61 for i in range(len(curve.data)):
62 scale(hooke, curve, block=i)
64 params = {'curve':curve, 'block':block}
66 if ('surface distance (m)' not in b.info['columns']
67 or 'surface deflection (m)' not in b.info['columns']):
69 contact.run(hooke, inqueue, outqueue, **params)
71 raise PoorFit('Could not fit %s %s: %s'
72 % (curve.path, block, str(e)))
73 if ('deflection (N)' not in b.info['columns']):
74 force.run(hooke, inqueue, outqueue, **params)
75 if ('cantilever adjusted extension (m)' not in b.info['columns']):
76 cant_adjust.run(hooke, inqueue, outqueue, **params)
79 class SurfacePositionModel (ModelFitter):
80 """Bilinear surface position model.
82 The bilinear model is symmetric, but the parameter guessing and
83 sanity checks assume the contact region occurs for lower indicies
84 ("left of") the non-contact region. We also assume that
85 tip-surface attractions produce positive deflections.
89 Algorithm borrowed from WTK's `piezo package`_, specifically
90 from :func:`piezo.z_piezo_utils.analyzeSurfPosData`.
93 http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
95 Fits the data to the bilinear :method:`model`.
97 In order for this model to produce a satisfactory fit, there
98 should be enough data in the off-surface region that interactions
99 due to proteins, etc. will not seriously skew the fit in the
102 def model(self, params):
103 """A continuous, bilinear model.
110 p_0 + p_1 x & \text{if $x <= p_2$}, \\
111 p_0 + p_1 p_2 + p_3 (x-p_2) & \text{if $x >= p_2$}.
114 Where :math:`p_0` is a vertical offset, :math:`p_1` is the slope
115 of the first region, :math:`p_2` is the transition location, and
116 :math:`p_3` is the slope of the second region.
118 p = params # convenient alias
119 if self.info.get('force zero non-contact slope', None) == True:
121 p.append(0.) # restore the non-contact slope parameter
122 r2 = numpy.round(abs(p[2]))
124 self._model_data[:r2] = p[0] + p[1] * numpy.arange(r2)
125 if r2 < len(self._data)-1:
126 self._model_data[r2:] = \
127 p[0] + p[1]*p[2] + p[3] * numpy.arange(len(self._data)-r2)
128 return self._model_data
130 def set_data(self, data, info=None):
131 super(SurfacePositionModel, self).set_data(data, info)
135 self.info['min position'] = 0
136 self.info['max position'] = len(data)
137 self.info['max deflection'] = data.max()
138 self.info['min deflection'] = data.min()
139 self.info['position range'] = self.info['max position'] - self.info['min position']
140 self.info['deflection range'] = self.info['max deflection'] - self.info['min deflection']
142 def guess_initial_params(self, outqueue=None):
143 """Guess the initial parameters.
147 We guess initial parameters such that the offset (:math:`p_1`)
148 matches the minimum deflection, the kink (:math:`p_2`) occurs in
149 the middle of the data, the initial (contact) slope (:math:`p_0`)
150 produces the maximum deflection at the left-most point, and the
151 final (non-contact) slope (:math:`p_3`) is zero.
153 left_offset = self.info['min deflection']
154 left_slope = 2*(self.info['deflection range']
155 /self.info['position range'])
156 kink_position = (self.info['max position']
157 +self.info['min position'])/2.0
159 self.info['guessed contact slope'] = left_slope
160 params = [left_offset, left_slope, kink_position, right_slope]
161 if self.info.get('force zero non-contact slope', None) == True:
165 def guess_scale(self, params, outqueue=None):
166 """Guess the parameter scales.
170 We guess offset scale (:math:`p_0`) as one tenth of the total
171 deflection range, the kink scale (:math:`p_2`) as one tenth of
172 the total index range, the initial (contact) slope scale
173 (:math:`p_1`) as one tenth of the contact slope estimation,
174 and the final (non-contact) slope scale (:math:`p_3`) is as
175 one tenth of the initial slope scale.
177 offset_scale = self.info['deflection range']/10.
178 left_slope_scale = abs(params[1])/10.
179 kink_scale = self.info['position range']/10.
180 right_slope_scale = left_slope_scale/10.
181 scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale]
182 if self.info.get('force zero non-contact slope', None) == True:
186 def fit(self, *args, **kwargs):
187 self.info['guessed contact slope'] = None
188 params = super(SurfacePositionModel, self).fit(*args, **kwargs)
189 params[2] = abs(params[2])
190 if self.info.get('force zero non-contact slope', None) == True:
191 params = list(params)
192 params.append(0.) # restore the non-contact slope parameter
194 # check that the fit is reasonable, see the :meth:`model` docstring
195 # for parameter descriptions. HACK: hardcoded cutoffs.
196 if abs(params[3]*10) > abs(params[1]) :
197 raise PoorFit('Slope in non-contact region, or no slope in contact')
198 if params[2] < self.info['min position']+0.02*self.info['position range']:
200 'No kink (kink %g less than %g, need more space to left)'
202 self.info['min position']+0.02*self.info['position range']))
203 if params[2] > self.info['max position']-0.02*self.info['position range']:
205 'No kink (kink %g more than %g, need more space to right)'
207 self.info['max position']-0.02*self.info['position range']))
208 if (self.info['guessed contact slope'] != None
209 and abs(params[1]) < 0.5 * abs(self.info['guessed contact slope'])):
210 raise PoorFit('Too far (contact slope %g, but expected ~%g'
211 % (params[3], self.info['guessed contact slope']))
214 class VelocityClampPlugin (Plugin):
216 super(VelocityClampPlugin, self).__init__(name='vclamp')
218 SurfaceContactCommand(self), ForceCommand(self),
219 CantileverAdjustedExtensionCommand(self), FlattenCommand(self),
222 def default_settings(self):
224 Setting(section=self.setting_section, help=self.__doc__),
225 Setting(section=self.setting_section,
226 option='surface contact point algorithm',
228 help='Select the surface contact point algorithm. See the documentation for descriptions of available algorithms.')
232 class SurfaceContactCommand (Command):
233 """Automatically determine a block's surface contact point.
235 You can select the contact point algorithm with the creatively
236 named `surface contact point algorithm` configuration setting.
237 Currently available options are:
239 * fmms (:meth:`find_contact_point_fmms`)
240 * ms (:meth:`find_contact_point_ms`)
241 * wtk (:meth:`find_contact_point_wtk`)
243 def __init__(self, plugin):
244 super(SurfaceContactCommand, self).__init__(
245 name='zero block surface contact point',
248 Argument(name='block', aliases=['set'], type='int', default=0,
250 Data block for which the force should be calculated. For an
251 approach/retract force curve, `0` selects the approaching curve and `1`
252 selects the retracting curve.
254 Argument(name='input distance column', type='string',
255 default='z piezo (m)',
257 Name of the column to use as the surface positioning input.
259 Argument(name='input deflection column', type='string',
260 default='deflection (m)',
262 Name of the column to use as the deflection input.
264 Argument(name='output distance column', type='string',
265 default='surface distance',
267 Name of the column (without units) to use as the surface positioning output.
269 Argument(name='output deflection column', type='string',
270 default='surface deflection',
272 Name of the column (without units) to use as the deflection output.
274 Argument(name='distance info name', type='string',
275 default='surface distance offset',
277 Name (without units) for storing the distance offset in the `.info` dictionary.
279 Argument(name='deflection info name', type='string',
280 default='surface deflection offset',
282 Name (without units) for storing the deflection offset in the `.info` dictionary.
284 Argument(name='fit parameters info name', type='string',
285 default='surface deflection offset',
287 Name (without units) for storing fit parameters in the `.info` dictionary.
290 help=self.__doc__, plugin=plugin)
292 def _run(self, hooke, inqueue, outqueue, params):
293 data = params['curve'].data[params['block']]
294 # HACK? rely on params['curve'] being bound to the local hooke
295 # playlist (i.e. not a copy, as you would get by passing a
296 # curve through the queue). Ugh. Stupid queues. As an
297 # alternative, we could pass lookup information through the
299 new = Data((data.shape[0], data.shape[1]+2), dtype=data.dtype)
300 new.info = copy.deepcopy(data.info)
302 name,dist_units = split_data_label(params['input distance column'])
303 name,def_units = split_data_label(params['input deflection column'])
304 new.info['columns'].extend([
305 join_data_label(params['output distance column'], dist_units),
306 join_data_label(params['output deflection column'], def_units),
308 dist_data = data[:,data.info['columns'].index(
309 params['input distance column'])]
310 def_data = data[:,data.info['columns'].index(
311 params['input deflection column'])]
312 i,def_offset,ps = self.find_contact_point(
313 params['curve'], dist_data, def_data, outqueue)
314 dist_offset = dist_data[i]
315 new.info[join_data_label(params['distance info name'], dist_units
317 new.info[join_data_label(params['deflection info name'], def_units
319 new.info[params['fit parameters info name']] = ps
320 new[:,-2] = dist_data - dist_offset
321 new[:,-1] = def_data - def_offset
322 params['curve'].data[params['block']] = new
324 def find_contact_point(self, curve, z_data, d_data, outqueue=None):
325 """Railyard for the `find_contact_point_*` family.
327 Uses the `surface contact point algorithm` configuration
328 setting to call the appropriate backend algorithm.
330 fn = getattr(self, 'find_contact_point_%s'
331 % self.plugin.config['surface contact point algorithm'])
332 return fn(curve, z_data, d_data, outqueue)
334 def find_contact_point_fmms(self, curve, z_data, d_data, outqueue=None):
335 """Algorithm by Francesco Musiani and Massimo Sandal.
341 0) Driver-specific workarounds, e.g. deal with the PicoForce
342 trigger bug by excluding retraction portions with excessive
344 1) Select the second half (non-contact side) of the retraction
346 2) Fit the selection to a line.
347 3) If the fit is not almost horizontal, halve the selection
349 4) Average the selection and use it as a baseline.
350 5) Slide in from the start (contact side) of the retraction
351 curve, until you find a point with greater than baseline
352 deflection. That point is the contact point.
354 if curve.info['filetype'] == 'picoforce':
355 # Take care of the picoforce trigger bug (TODO: example
356 # data file demonstrating the bug). We exclude portions
357 # of the curve that have too much standard deviation.
358 # Yes, a lot of magic is here.
359 check_start = len(d_data)-len(d_data)/20
360 monster_start = len(d_data)
362 # look at the non-contact tail
363 non_monster = d_data[check_start:monster_start]
364 if non_monster.std() < 2e-10: # HACK: hardcoded cutoff
366 else: # move further away from the monster
367 check_start -= len(d_data)/50
368 monster_start -= len(d_data)/50
369 z_data = z_data[:monster_start]
370 d_data = d_data[:monster_start]
372 # take half of the thing to start
373 selection_start = len(d_data)/2
375 z_chunk = z_data[selection_start:]
376 d_chunk = d_data[selection_start:]
377 slope,intercept,r,two_tailed_prob,stderr_of_the_estimate = \
378 scipy.stats.linregress(z_chunk, d_chunk)
379 # We stop if we found an almost-horizontal fit or if we're
380 # getting to small a selection. FIXME: 0.1 and 5./6 here
381 # are "magic numbers" (although reasonable)
382 if (abs(slope) < 0.1 # deflection (m) / surface (m)
383 or selection_start > 5./6*len(d_data)):
385 selection_start += 10
387 d_baseline = d_chunk.mean()
389 # find the first point above the calculated baseline
391 while i < len(d_data) and d_data[i] < ymean:
393 return (i, d_baseline, {})
395 def find_contact_point_ms(self, curve, z_data, d_data, outqueue=None):
396 """Algorithm by Massimo Sandal.
400 WTK: At least the commits are by Massimo, and I see no notes
401 attributing the algorithm to anyone else.
407 xext=raw_plot.vectors[0][0]
408 yext=raw_plot.vectors[0][1]
409 xret2=raw_plot.vectors[1][0]
410 yret=raw_plot.vectors[1][1]
412 first_point=[xext[0], yext[0]]
413 last_point=[xext[-1], yext[-1]]
415 #regr=scipy.polyfit(first_point, last_point,1)[0:2]
416 diffx=abs(first_point[0]-last_point[0])
417 diffy=abs(first_point[1]-last_point[1])
419 #using polyfit results in numerical errors. good old algebra.
421 b=first_point[1]-(a*first_point[0])
422 baseline=scipy.polyval((a,b), xext)
424 ysub=[item-basitem for item,basitem in zip(yext,baseline)]
426 contact=ysub.index(min(ysub))
428 return xext,ysub,contact
430 #now, exploit a ClickedPoint instance to calculate index...
432 dummy.absolute_coords=(x_intercept,y_intercept)
433 dummy.find_graph_coords(xret2,yret)
436 return dummy.index, regr, regr_contact
440 def find_contact_point_wtk(self, curve, z_data, d_data, outqueue=None):
441 """Algorithm by W. Trevor King.
445 Uses :func:`analyze_surf_pos_data_wtk` internally.
447 reverse = z_data[0] > z_data[-1]
448 if reverse == True: # approaching, contact region on the right
449 d_data = d_data[::-1]
450 s = SurfacePositionModel(d_data)
451 s.info['force zero non-contact slope'] = True
452 offset,contact_slope,surface_index,non_contact_slope = s.fit(
456 'contact slope': contact_slope,
457 'surface index': surface_index,
458 'non-contact slope': non_contact_slope,
461 deflection_offset = offset + contact_slope*surface_index,
463 surface_index = len(d_data)-1-surface_index
464 return (numpy.round(surface_index), deflection_offset, info)
467 class ForceCommand (Command):
468 """Convert a deflection column from meters to newtons.
470 def __init__(self, plugin):
471 super(ForceCommand, self).__init__(
472 name='add block force array',
475 Argument(name='block', aliases=['set'], type='int', default=0,
477 Data block for which the force should be calculated. For an
478 approach/retract force curve, `0` selects the approaching curve and `1`
479 selects the retracting curve.
481 Argument(name='input deflection column', type='string',
482 default='surface deflection (m)',
484 Name of the column to use as the deflection input.
486 Argument(name='output deflection column', type='string',
487 default='deflection',
489 Name of the column (without units) to use as the deflection output.
491 Argument(name='spring constant info name', type='string',
492 default='spring constant (N/m)',
494 Name of the spring constant in the `.info` dictionary.
497 help=self.__doc__, plugin=plugin)
499 def _run(self, hooke, inqueue, outqueue, params):
500 data = params['curve'].data[params['block']]
501 # HACK? rely on params['curve'] being bound to the local hooke
502 # playlist (i.e. not a copy, as you would get by passing a
503 # curve through the queue). Ugh. Stupid queues. As an
504 # alternative, we could pass lookup information through the
506 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
507 new.info = copy.deepcopy(data.info)
509 new.info['columns'].append(
510 join_data_label(params['output deflection column'], 'N'))
511 d_data = data[:,data.info['columns'].index(
512 params['input deflection column'])]
513 new[:,-1] = d_data * data.info[params['spring constant info name']]
514 params['curve'].data[params['block']] = new
517 class CantileverAdjustedExtensionCommand (Command):
518 """Remove cantilever extension from a total extension column.
520 def __init__(self, plugin):
521 super(CantileverAdjustedExtensionCommand, self).__init__(
522 name='add block cantilever adjusted extension array',
525 Argument(name='block', aliases=['set'], type='int', default=0,
527 Data block for which the adjusted extension should be calculated. For
528 an approach/retract force curve, `0` selects the approaching curve and
529 `1` selects the retracting curve.
531 Argument(name='input distance column', type='string',
532 default='surface distance (m)',
534 Name of the column to use as the distance input.
536 Argument(name='input deflection column', type='string',
537 default='deflection (N)',
539 Name of the column to use as the deflection input.
541 Argument(name='output distance column', type='string',
542 default='cantilever adjusted extension',
544 Name of the column (without units) to use as the deflection output.
546 Argument(name='spring constant info name', type='string',
547 default='spring constant (N/m)',
549 Name of the spring constant in the `.info` dictionary.
552 help=self.__doc__, plugin=plugin)
554 def _run(self, hooke, inqueue, outqueue, params):
555 data = params['curve'].data[params['block']]
556 # HACK? rely on params['curve'] being bound to the local hooke
557 # playlist (i.e. not a copy, as you would get by passing a
558 # curve through the queue). Ugh. Stupid queues. As an
559 # alternative, we could pass lookup information through the
561 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
562 new.info = copy.deepcopy(data.info)
564 new.info['columns'].append(
565 join_data_label(params['output distance column'], 'm'))
566 z_data = data[:,data.info['columns'].index(
567 params['input distance column'])]
568 d_data = data[:,data.info['columns'].index(
569 params['input deflection column'])]
570 k = data.info[params['spring constant info name']]
572 z_name,z_unit = split_data_label(params['input distance column'])
573 assert z_unit == 'm', params['input distance column']
574 d_name,d_unit = split_data_label(params['input deflection column'])
575 assert d_unit == 'N', params['input deflection column']
576 k_name,k_unit = split_data_label(params['spring constant info name'])
577 assert k_unit == 'N/m', params['spring constant info name']
579 new[:,-1] = z_data - d_data / k
580 params['curve'].data[params['block']] = new
583 class FlattenCommand (Command):
584 """Flatten a deflection column.
586 Subtracts a polynomial fit from the non-contact part of the curve
587 to flatten it. The best polynomial fit is chosen among
588 polynomials of degree 1 to `max degree`.
590 .. todo: Why does flattening use a polynomial fit and not a sinusoid?
591 Isn't most of the oscillation due to laser interference?
592 See Jaschke 1995 ( 10.1063/1.1146018 )
593 and the figure 4 caption of Weisenhorn 1992 ( 10.1103/PhysRevB.45.11226 )
595 def __init__(self, plugin):
596 super(FlattenCommand, self).__init__(
597 name='add flattened extension array',
600 Argument(name='block', aliases=['set'], type='int', default=0,
602 Data block for which the adjusted extension should be calculated. For
603 an approach/retract force curve, `0` selects the approaching curve and
604 `1` selects the retracting curve.
606 Argument(name='max degree', type='int',
609 Highest order polynomial to consider for flattening. Using values
610 greater than one usually doesn't help and can give artifacts.
611 However, it could be useful too. (TODO: Back this up with some
614 Argument(name='input distance column', type='string',
615 default='surface distance (m)',
617 Name of the column to use as the distance input.
619 Argument(name='input deflection column', type='string',
620 default='deflection (N)',
622 Name of the column to use as the deflection input.
624 Argument(name='output deflection column', type='string',
625 default='flattened deflection',
627 Name of the column (without units) to use as the deflection output.
629 Argument(name='fit info name', type='string',
630 default='flatten fit',
632 Name of the flattening information in the `.info` dictionary.
635 help=self.__doc__, plugin=plugin)
637 def _run(self, hooke, inqueue, outqueue, params):
638 data = params['curve'].data[params['block']]
639 # HACK? rely on params['curve'] being bound to the local hooke
640 # playlist (i.e. not a copy, as you would get by passing a
641 # curve through the queue). Ugh. Stupid queues. As an
642 # alternative, we could pass lookup information through the
644 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
645 new.info = copy.deepcopy(data.info)
647 z_data = data[:,data.info['columns'].index(
648 params['input distance column'])]
649 d_data = data[:,data.info['columns'].index(
650 params['input deflection column'])]
652 d_name,d_unit = split_data_label(params['input deflection column'])
653 new.info['columns'].append(
654 join_data_label(params['output deflection column'], d_unit))
656 contact_index = numpy.absolute(z_data).argmin()
658 indices = numpy.argwhere(mask)
659 z_nc = z_data[indices].flatten()
660 d_nc = d_data[indices].flatten()
663 degree = poly_values = None
664 log = logging.getLogger('hooke')
665 for deg in range(params['max degree']):
667 pv = scipy.polyfit(z_nc, d_nc, deg)
668 df = scipy.polyval(pv, z_nc)
669 err = numpy.sqrt((df-d_nc)**2).sum()
671 log.warn('failed to flatten with a degree %d polynomial: %s'
674 if err < min_err: # new best fit
680 raise Failure('failed to flatten with all degrees')
681 new.info[params['fit info name']] = {
682 'error':min_err/len(z_nc),
684 'max degree':params['max degree'],
685 'polynomial values':poly_values,
687 new[:,-1] = d_data - mask*scipy.polyval(poly_values, z_data)
688 params['curve'].data[params['block']] = new