1 # Copyright (C) 2008-2010 Alberto Gomez-Casado
4 # Massimo Sandal <devicerandom@gmail.com>
5 # W. Trevor King <wking@drexel.edu>
7 # This file is part of Hooke.
9 # Hooke is free software: you can redistribute it and/or modify it
10 # under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, either version 3 of the
12 # License, or (at your option) any later version.
14 # Hooke is distributed in the hope that it will be useful, but WITHOUT
15 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
16 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
17 # Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with Hooke. If not, see
21 # <http://www.gnu.org/licenses/>.
23 """The ``vclamp`` module provides :class:`VelocityClampPlugin` and
24 several associated :class:`hooke.command.Command`\s for handling
25 common velocity clamp analysis tasks.
34 from ..command import Command, Argument, Failure, NullQueue
35 from ..config import Setting
36 from ..curve import Data
37 from ..plugin import Plugin
38 from ..util.fit import PoorFit, ModelFitter
39 from ..util.si import join_data_label, split_data_label
40 from .curve import CurveArgument
43 def scale(hooke, curve, block=None):
44 """Run 'add block force array' on `block`.
46 Runs 'zero block surface contact point' first, if necessary. Does
47 not run either command if the columns they add to the block are
50 If `block` is `None`, scale all blocks in `curve`.
52 commands = hooke.commands
53 contact = [c for c in hooke.commands
54 if c.name == 'zero block surface contact point'][0]
55 force = [c for c in hooke.commands if c.name == 'add block force array'][0]
56 cant_adjust = [c for c in hooke.commands
57 if c.name == 'add block cantilever adjusted extension array'][0]
59 outqueue = NullQueue()
61 for i in range(len(curve.data)):
62 scale(hooke, curve, block=i)
64 params = {'curve':curve, 'block':block}
66 if ('surface distance (m)' not in b.info['columns']
67 or 'surface deflection (m)' not in b.info['columns']):
69 contact.run(hooke, inqueue, outqueue, **params)
71 raise PoorFit('Could not fit %s %s: %s'
72 % (curve.path, block, str(e)))
73 if ('deflection (N)' not in b.info['columns']):
74 force.run(hooke, inqueue, outqueue, **params)
75 if ('cantilever adjusted extension (m)' not in b.info['columns']):
76 cant_adjust.run(hooke, inqueue, outqueue, **params)
80 class SurfacePositionModel (ModelFitter):
81 """Bilinear surface position model.
83 The bilinear model is symmetric, but the parameter guessing and
84 sanity checks assume the contact region occurs for lower indicies
85 ("left of") the non-contact region. We also assume that
86 tip-surface attractions produce positive deflections.
90 Algorithm borrowed from WTK's `piezo package`_, specifically
91 from :func:`piezo.z_piezo_utils.analyzeSurfPosData`.
94 http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
96 Fits the data to the bilinear :method:`model`.
98 In order for this model to produce a satisfactory fit, there
99 should be enough data in the off-surface region that interactions
100 due to proteins, etc. will not seriously skew the fit in the
101 off-surface region. If you don't have much of a tail, you can set
102 the `info` dict's `ignore non-contact before index` parameter to
103 the index of the last surface- or protein-related feature.
105 def model(self, params):
106 """A continuous, bilinear model.
113 p_0 + p_1 x & \text{if $x <= p_2$}, \\
114 p_0 + p_1 p_2 + p_3 (x-p_2) & \text{if $x >= p_2$}.
117 Where :math:`p_0` is a vertical offset, :math:`p_1` is the slope
118 of the first region, :math:`p_2` is the transition location, and
119 :math:`p_3` is the slope of the second region.
121 p = params # convenient alias
122 rNC_ignore = self.info['ignore non-contact before index']
123 if self.info['force zero non-contact slope'] == True:
125 p.append(0.) # restore the non-contact slope parameter
126 r2 = numpy.round(abs(p[2]))
128 self._model_data[:r2] = p[0] + p[1] * numpy.arange(r2)
129 if r2 < len(self._data)-1:
130 self._model_data[r2:] = \
131 p[0] + p[1]*p[2] + p[3] * numpy.arange(len(self._data)-r2)
133 self._model_data[r2:rNC_ignore] = self._data[r2:rNC_ignore]
134 return self._model_data
136 def set_data(self, data, info=None, *args, **kwargs):
137 super(SurfacePositionModel, self).set_data(data, info, *args, **kwargs)
138 if self.info == None:
141 ('force zero non-contact slope', False),
142 ('ignore non-contact before index', 6158),
143 ('min position', 0), # Store postions etc. to avoid recalculating.
144 ('max position', len(data)),
145 ('max deflection', data.max()),
146 ('min deflection', data.min()),
148 if key not in self.info:
149 self.info[key] = value
152 self.info['max position'] - self.info['min position']),
154 self.info['max deflection'] - self.info['min deflection']),
156 if key not in self.info:
157 self.info[key] = value
159 def guess_initial_params(self, outqueue=None):
160 """Guess the initial parameters.
164 We guess initial parameters such that the offset (:math:`p_1`)
165 matches the minimum deflection, the kink (:math:`p_2`) occurs in
166 the middle of the data, the initial (contact) slope (:math:`p_0`)
167 produces the maximum deflection at the left-most point, and the
168 final (non-contact) slope (:math:`p_3`) is zero.
170 left_offset = self.info['min deflection']
171 left_slope = 2*(self.info['deflection range']
172 /self.info['position range'])
173 kink_position = (self.info['max position']
174 +self.info['min position'])/2.0
176 self.info['guessed contact slope'] = left_slope
177 params = [left_offset, left_slope, kink_position, right_slope]
178 if self.info['force zero non-contact slope'] == True:
182 def fit(self, *args, **kwargs):
183 """Fit the model to the data.
187 We change the `epsfcn` default from :func:`scipy.optimize.leastsq`'s
188 `0` to `1e-3`, so the initial Jacobian estimate takes larger steps,
189 which helps avoid being trapped in noise-generated local minima.
191 self.info['guessed contact slope'] = None
192 if 'epsfcn' not in kwargs:
193 kwargs['epsfcn'] = 1e-3 # take big steps to estimate the Jacobian
194 params = super(SurfacePositionModel, self).fit(*args, **kwargs)
195 params[2] = abs(params[2])
196 if self.info['force zero non-contact slope'] == True:
197 params = list(params)
198 params.append(0.) # restore the non-contact slope parameter
200 # check that the fit is reasonable, see the :meth:`model` docstring
201 # for parameter descriptions. HACK: hardcoded cutoffs.
202 if abs(params[3]*10) > abs(params[1]) :
203 raise PoorFit('Slope in non-contact region, or no slope in contact')
204 if params[2] < self.info['min position']+0.02*self.info['position range']:
206 'No kink (kink %g less than %g, need more space to left)'
208 self.info['min position']+0.02*self.info['position range']))
209 if params[2] > self.info['max position']-0.02*self.info['position range']:
211 'No kink (kink %g more than %g, need more space to right)'
213 self.info['max position']-0.02*self.info['position range']))
214 if (self.info['guessed contact slope'] != None
215 and abs(params[1]) < 0.5 * abs(self.info['guessed contact slope'])):
216 raise PoorFit('Too far (contact slope %g, but expected ~%g'
217 % (params[3], self.info['guessed contact slope']))
221 class VelocityClampPlugin (Plugin):
223 super(VelocityClampPlugin, self).__init__(name='vclamp')
225 SurfaceContactCommand(self), ForceCommand(self),
226 CantileverAdjustedExtensionCommand(self), FlattenCommand(self),
229 def default_settings(self):
231 Setting(section=self.setting_section, help=self.__doc__),
232 Setting(section=self.setting_section,
233 option='surface contact point algorithm',
235 help='Select the surface contact point algorithm. See the documentation for descriptions of available algorithms.')
239 class SurfaceContactCommand (Command):
240 """Automatically determine a block's surface contact point.
242 You can select the contact point algorithm with the creatively
243 named `surface contact point algorithm` configuration setting.
244 Currently available options are:
246 * fmms (:meth:`find_contact_point_fmms`)
247 * ms (:meth:`find_contact_point_ms`)
248 * wtk (:meth:`find_contact_point_wtk`)
250 def __init__(self, plugin):
251 super(SurfaceContactCommand, self).__init__(
252 name='zero block surface contact point',
255 Argument(name='block', aliases=['set'], type='int', default=0,
257 Data block for which the force should be calculated. For an
258 approach/retract force curve, `0` selects the approaching curve and `1`
259 selects the retracting curve.
261 Argument(name='input distance column', type='string',
262 default='z piezo (m)',
264 Name of the column to use as the surface position input.
266 Argument(name='input deflection column', type='string',
267 default='deflection (m)',
269 Name of the column to use as the deflection input.
271 Argument(name='output distance column', type='string',
272 default='surface distance',
274 Name of the column (without units) to use as the surface position output.
276 Argument(name='output deflection column', type='string',
277 default='surface deflection',
279 Name of the column (without units) to use as the deflection output.
281 Argument(name='distance info name', type='string',
282 default='surface distance offset',
284 Name (without units) for storing the distance offset in the `.info` dictionary.
286 Argument(name='deflection info name', type='string',
287 default='surface deflection offset',
289 Name (without units) for storing the deflection offset in the `.info` dictionary.
291 Argument(name='fit parameters info name', type='string',
292 default='surface deflection offset',
294 Name (without units) for storing fit parameters in the `.info` dictionary.
297 help=self.__doc__, plugin=plugin)
299 def _run(self, hooke, inqueue, outqueue, params):
300 data = params['curve'].data[params['block']]
301 # HACK? rely on params['curve'] being bound to the local hooke
302 # playlist (i.e. not a copy, as you would get by passing a
303 # curve through the queue). Ugh. Stupid queues. As an
304 # alternative, we could pass lookup information through the
306 new = Data((data.shape[0], data.shape[1]+2), dtype=data.dtype)
307 new.info = copy.deepcopy(data.info)
309 name,dist_units = split_data_label(params['input distance column'])
310 name,def_units = split_data_label(params['input deflection column'])
311 new.info['columns'].extend([
312 join_data_label(params['output distance column'], dist_units),
313 join_data_label(params['output deflection column'], def_units),
315 dist_data = data[:,data.info['columns'].index(
316 params['input distance column'])]
317 def_data = data[:,data.info['columns'].index(
318 params['input deflection column'])]
319 i,def_offset,ps = self.find_contact_point(
320 params['curve'], dist_data, def_data, outqueue)
321 dist_offset = dist_data[i]
322 new.info[join_data_label(params['distance info name'], dist_units
324 new.info[join_data_label(params['deflection info name'], def_units
326 new.info[params['fit parameters info name']] = ps
327 new[:,-2] = dist_data - dist_offset
328 new[:,-1] = def_data - def_offset
329 params['curve'].data[params['block']] = new
331 def find_contact_point(self, curve, z_data, d_data, outqueue=None):
332 """Railyard for the `find_contact_point_*` family.
334 Uses the `surface contact point algorithm` configuration
335 setting to call the appropriate backend algorithm.
337 fn = getattr(self, 'find_contact_point_%s'
338 % self.plugin.config['surface contact point algorithm'])
339 return fn(curve, z_data, d_data, outqueue)
341 def find_contact_point_fmms(self, curve, z_data, d_data, outqueue=None):
342 """Algorithm by Francesco Musiani and Massimo Sandal.
348 0) Driver-specific workarounds, e.g. deal with the PicoForce
349 trigger bug by excluding retraction portions with excessive
351 1) Select the second half (non-contact side) of the retraction
353 2) Fit the selection to a line.
354 3) If the fit is not almost horizontal, halve the selection
356 4) Average the selection and use it as a baseline.
357 5) Slide in from the start (contact side) of the retraction
358 curve, until you find a point with greater than baseline
359 deflection. That point is the contact point.
361 if curve.info['filetype'] == 'picoforce':
362 # Take care of the picoforce trigger bug (TODO: example
363 # data file demonstrating the bug). We exclude portions
364 # of the curve that have too much standard deviation.
365 # Yes, a lot of magic is here.
366 check_start = len(d_data)-len(d_data)/20
367 monster_start = len(d_data)
369 # look at the non-contact tail
370 non_monster = d_data[check_start:monster_start]
371 if non_monster.std() < 2e-10: # HACK: hardcoded cutoff
373 else: # move further away from the monster
374 check_start -= len(d_data)/50
375 monster_start -= len(d_data)/50
376 z_data = z_data[:monster_start]
377 d_data = d_data[:monster_start]
379 # take half of the thing to start
380 selection_start = len(d_data)/2
382 z_chunk = z_data[selection_start:]
383 d_chunk = d_data[selection_start:]
384 slope,intercept,r,two_tailed_prob,stderr_of_the_estimate = \
385 scipy.stats.linregress(z_chunk, d_chunk)
386 # We stop if we found an almost-horizontal fit or if we're
387 # getting to small a selection. FIXME: 0.1 and 5./6 here
388 # are "magic numbers" (although reasonable)
389 if (abs(slope) < 0.1 # deflection (m) / surface (m)
390 or selection_start > 5./6*len(d_data)):
392 selection_start += 10
394 d_baseline = d_chunk.mean()
396 # find the first point above the calculated baseline
398 while i < len(d_data) and d_data[i] < ymean:
400 return (i, d_baseline, {})
402 def find_contact_point_ms(self, curve, z_data, d_data, outqueue=None):
403 """Algorithm by Massimo Sandal.
407 WTK: At least the commits are by Massimo, and I see no notes
408 attributing the algorithm to anyone else.
414 xext=raw_plot.vectors[0][0]
415 yext=raw_plot.vectors[0][1]
416 xret2=raw_plot.vectors[1][0]
417 yret=raw_plot.vectors[1][1]
419 first_point=[xext[0], yext[0]]
420 last_point=[xext[-1], yext[-1]]
422 #regr=scipy.polyfit(first_point, last_point,1)[0:2]
423 diffx=abs(first_point[0]-last_point[0])
424 diffy=abs(first_point[1]-last_point[1])
426 #using polyfit results in numerical errors. good old algebra.
428 b=first_point[1]-(a*first_point[0])
429 baseline=scipy.polyval((a,b), xext)
431 ysub=[item-basitem for item,basitem in zip(yext,baseline)]
433 contact=ysub.index(min(ysub))
435 return xext,ysub,contact
437 #now, exploit a ClickedPoint instance to calculate index...
439 dummy.absolute_coords=(x_intercept,y_intercept)
440 dummy.find_graph_coords(xret2,yret)
443 return dummy.index, regr, regr_contact
447 def find_contact_point_wtk(self, curve, z_data, d_data, outqueue=None):
448 """Algorithm by W. Trevor King.
452 Uses :class:`SurfacePositionModel` internally.
454 reverse = z_data[0] > z_data[-1]
455 if reverse == True: # approaching, contact region on the right
456 d_data = d_data[::-1]
457 s = SurfacePositionModel(d_data, info={
458 'force zero non-contact slope':True},
460 offset,contact_slope,surface_index,non_contact_slope = s.fit(
464 'contact slope': contact_slope,
465 'surface index': surface_index,
466 'non-contact slope': non_contact_slope,
469 deflection_offset = offset + contact_slope*surface_index,
471 surface_index = len(d_data)-1-surface_index
472 return (numpy.round(surface_index), deflection_offset, info)
475 class ForceCommand (Command):
476 """Convert a deflection column from meters to newtons.
478 def __init__(self, plugin):
479 super(ForceCommand, self).__init__(
480 name='add block force array',
483 Argument(name='block', aliases=['set'], type='int', default=0,
485 Data block for which the force should be calculated. For an
486 approach/retract force curve, `0` selects the approaching curve and `1`
487 selects the retracting curve.
489 Argument(name='input deflection column', type='string',
490 default='surface deflection (m)',
492 Name of the column to use as the deflection input.
494 Argument(name='output deflection column', type='string',
495 default='deflection',
497 Name of the column (without units) to use as the deflection output.
499 Argument(name='spring constant info name', type='string',
500 default='spring constant (N/m)',
502 Name of the spring constant in the `.info` dictionary.
505 help=self.__doc__, plugin=plugin)
507 def _run(self, hooke, inqueue, outqueue, params):
508 data = params['curve'].data[params['block']]
509 # HACK? rely on params['curve'] being bound to the local hooke
510 # playlist (i.e. not a copy, as you would get by passing a
511 # curve through the queue). Ugh. Stupid queues. As an
512 # alternative, we could pass lookup information through the
514 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
515 new.info = copy.deepcopy(data.info)
517 new.info['columns'].append(
518 join_data_label(params['output deflection column'], 'N'))
519 d_data = data[:,data.info['columns'].index(
520 params['input deflection column'])]
521 new[:,-1] = d_data * data.info[params['spring constant info name']]
522 params['curve'].data[params['block']] = new
525 class CantileverAdjustedExtensionCommand (Command):
526 """Remove cantilever extension from a total extension column.
528 def __init__(self, plugin):
529 super(CantileverAdjustedExtensionCommand, self).__init__(
530 name='add block cantilever adjusted extension array',
533 Argument(name='block', aliases=['set'], type='int', default=0,
535 Data block for which the adjusted extension should be calculated. For
536 an approach/retract force curve, `0` selects the approaching curve and
537 `1` selects the retracting curve.
539 Argument(name='input distance column', type='string',
540 default='surface distance (m)',
542 Name of the column to use as the distance input.
544 Argument(name='input deflection column', type='string',
545 default='deflection (N)',
547 Name of the column to use as the deflection input.
549 Argument(name='output distance column', type='string',
550 default='cantilever adjusted extension',
552 Name of the column (without units) to use as the deflection output.
554 Argument(name='spring constant info name', type='string',
555 default='spring constant (N/m)',
557 Name of the spring constant in the `.info` dictionary.
560 help=self.__doc__, plugin=plugin)
562 def _run(self, hooke, inqueue, outqueue, params):
563 data = params['curve'].data[params['block']]
564 # HACK? rely on params['curve'] being bound to the local hooke
565 # playlist (i.e. not a copy, as you would get by passing a
566 # curve through the queue). Ugh. Stupid queues. As an
567 # alternative, we could pass lookup information through the
569 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
570 new.info = copy.deepcopy(data.info)
572 new.info['columns'].append(
573 join_data_label(params['output distance column'], 'm'))
574 z_data = data[:,data.info['columns'].index(
575 params['input distance column'])]
576 d_data = data[:,data.info['columns'].index(
577 params['input deflection column'])]
578 k = data.info[params['spring constant info name']]
580 z_name,z_unit = split_data_label(params['input distance column'])
581 assert z_unit == 'm', params['input distance column']
582 d_name,d_unit = split_data_label(params['input deflection column'])
583 assert d_unit == 'N', params['input deflection column']
584 k_name,k_unit = split_data_label(params['spring constant info name'])
585 assert k_unit == 'N/m', params['spring constant info name']
587 new[:,-1] = z_data - d_data / k
588 params['curve'].data[params['block']] = new
591 class FlattenCommand (Command):
592 """Flatten a deflection column.
594 Subtracts a polynomial fit from the non-contact part of the curve
595 to flatten it. The best polynomial fit is chosen among
596 polynomials of degree 1 to `max degree`.
598 .. todo: Why does flattening use a polynomial fit and not a sinusoid?
599 Isn't most of the oscillation due to laser interference?
600 See Jaschke 1995 ( 10.1063/1.1146018 )
601 and the figure 4 caption of Weisenhorn 1992 ( 10.1103/PhysRevB.45.11226 )
603 def __init__(self, plugin):
604 super(FlattenCommand, self).__init__(
605 name='add flattened extension array',
608 Argument(name='block', aliases=['set'], type='int', default=0,
610 Data block for which the adjusted extension should be calculated. For
611 an approach/retract force curve, `0` selects the approaching curve and
612 `1` selects the retracting curve.
614 Argument(name='max degree', type='int',
617 Highest order polynomial to consider for flattening. Using values
618 greater than one usually doesn't help and can give artifacts.
619 However, it could be useful too. (TODO: Back this up with some
622 Argument(name='input distance column', type='string',
623 default='surface distance (m)',
625 Name of the column to use as the distance input.
627 Argument(name='input deflection column', type='string',
628 default='deflection (N)',
630 Name of the column to use as the deflection input.
632 Argument(name='output deflection column', type='string',
633 default='flattened deflection',
635 Name of the column (without units) to use as the deflection output.
637 Argument(name='fit info name', type='string',
638 default='flatten fit',
640 Name of the flattening information in the `.info` dictionary.
643 help=self.__doc__, plugin=plugin)
645 def _run(self, hooke, inqueue, outqueue, params):
646 data = params['curve'].data[params['block']]
647 # HACK? rely on params['curve'] being bound to the local hooke
648 # playlist (i.e. not a copy, as you would get by passing a
649 # curve through the queue). Ugh. Stupid queues. As an
650 # alternative, we could pass lookup information through the
652 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
653 new.info = copy.deepcopy(data.info)
655 z_data = data[:,data.info['columns'].index(
656 params['input distance column'])]
657 d_data = data[:,data.info['columns'].index(
658 params['input deflection column'])]
660 d_name,d_unit = split_data_label(params['input deflection column'])
661 new.info['columns'].append(
662 join_data_label(params['output deflection column'], d_unit))
664 contact_index = numpy.absolute(z_data).argmin()
666 indices = numpy.argwhere(mask)
667 z_nc = z_data[indices].flatten()
668 d_nc = d_data[indices].flatten()
671 degree = poly_values = None
672 log = logging.getLogger('hooke')
673 for deg in range(params['max degree']):
675 pv = scipy.polyfit(z_nc, d_nc, deg)
676 df = scipy.polyval(pv, z_nc)
677 err = numpy.sqrt((df-d_nc)**2).sum()
679 log.warn('failed to flatten with a degree %d polynomial: %s'
682 if err < min_err: # new best fit
688 raise Failure('failed to flatten with all degrees')
689 new.info[params['fit info name']] = {
690 'error':min_err/len(z_nc),
692 'max degree':params['max degree'],
693 'polynomial values':poly_values,
695 new[:,-1] = d_data - mask*scipy.polyval(poly_values, z_data)
696 params['curve'].data[params['block']] = new