Close parenthesis on column append in DifferenceCommand
[hooke.git] / hooke / plugin / vclamp.py
index 2bc39d68fef0563268cdb865e099e60d759215cb..40c1ecb543b20a697768e6b20171271d1780e117 100644 (file)
@@ -6,15 +6,15 @@
 #
 # This file is part of Hooke.
 #
-# Hooke is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation, either
-# version 3 of the License, or (at your option) any later version.
+# Hooke is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
 #
-# Hooke is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU Lesser General Public License for more details.
+# Hooke is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
+# Public License for more details.
 #
 # You should have received a copy of the GNU Lesser General Public
 # License along with Hooke.  If not, see
@@ -25,43 +25,517 @@ several associated :class:`hooke.command.Command`\s for handling
 common velocity clamp analysis tasks.
 """
 
-from ..command import Command, Argument, Failure
-from ..plugin import Builtin
+import copy
+
+import numpy
+import scipy
+
+from ..command import Command, Argument, Failure, NullQueue
+from ..config import Setting
+from ..curve import Data
+from ..plugin import Plugin
+from ..util.fit import PoorFit, ModelFitter
+from ..util.si import join_data_label, split_data_label
+from .curve import CurveArgument
+
+
+def scale(hooke, curve, block=None):
+    """Run 'add block force array' on `block`.
+
+    Runs 'zero block surface contact point' first, if necessary.  Does
+    not run either command if the columns they add to the block are
+    already present.
+
+    If `block` is `None`, scale all blocks in `curve`.
+    """
+    commands = hooke.commands
+    contact = [c for c in hooke.commands
+               if c.name == 'zero block surface contact point'][0]
+    force = [c for c in hooke.commands if c.name == 'add block force array'][0]
+    cant_adjust = [c for c in hooke.commands
+               if c.name == 'add block cantilever adjusted extension array'][0]
+    inqueue = None
+    outqueue = NullQueue()
+    if block == None:
+        for i in range(len(curve.data)):
+            scale(hooke, curve, block=i)
+    else:
+        params = {'curve':curve, 'block':block}
+        b = curve.data[block]
+        if ('surface distance (m)' not in b.info['columns']
+            or 'surface adjusted deflection (m)' not in b.info['columns']):
+            try:
+                contact._run(hooke, inqueue, outqueue, params)
+            except PoorFit, e:
+                raise PoorFit('Could not fit %s %s: %s'
+                              % (curve.path, block, str(e)))
+        if ('deflection (N)' not in b.info['columns']):
+            force._run(hooke, inqueue, outqueue, params)
+        if ('cantilever adjusted extension (m)' not in b.info['columns']):
+            cant_adjust._run(hooke, inqueue, outqueue, params)
+    return curve
 
+class SurfacePositionModel (ModelFitter):
+    """Bilinear surface position model.
 
-# Utility functions
+    The bilinear model is symmetric, but the parameter guessing and
+    sanity checks assume the contact region occurs for lower indicies
+    ("left of") the non-contact region.  We also assume that
+    tip-surface attractions produce positive deflections.
 
-def scale(curve):
-    return curve
+    Notes
+    -----
+    Algorithm borrowed from WTK's `piezo package`_, specifically
+    from :func:`piezo.z_piezo_utils.analyzeSurfPosData`.
 
+    .. _piezo package:
+      http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
 
-class VelocityClampPlugin (Builtin):
-    def __init__(self):
-        super(VelocityClampPlugin, self).__init__(name='vclamp')
-        self._commands = []
-#            NextCommand(self), PreviousCommand(self), JumpCommand(self),
-#            ]
+    Fits the data to the bilinear :method:`model`.
 
-class generalvclampCommands(object):
-
-    def do_subtplot(self, args):
-        '''
-        SUBTPLOT
-        (procplots.py plugin)
-        Plots the difference between ret and ext current curve
-        -------
-        Syntax: subtplot
-        '''
-        #FIXME: sub_filter and sub_order must be args
+    In order for this model to produce a satisfactory fit, there
+    should be enough data in the off-surface region that interactions
+    due to proteins, etc. will not seriously skew the fit in the
+    off-surface region.
+    """
+    def model(self, params):
+        """A continuous, bilinear model.
 
-        if len(self.plots[0].vectors) != 2:
-            print 'This command only works on a curve with two different plots.'
-            pass
+        Notes
+        -----
+        .. math::
+    
+          y = \begin{cases}
+            p_0 + p_1 x                 & \text{if $x <= p_2$}, \\
+            p_0 + p_1 p_2 + p_3 (x-p_2) & \text{if $x >= p_2$}.
+              \end{cases}
+    
+        Where :math:`p_0` is a vertical offset, :math:`p_1` is the slope
+        of the first region, :math:`p_2` is the transition location, and
+        :math:`p_3` is the slope of the second region.
+        """
+        p = params  # convenient alias
+        if self.info.get('force zero non-contact slope', None) == True:
+            p = list(p)
+            p.append(0.)  # restore the non-contact slope parameter
+        r2 = numpy.round(abs(p[2]))
+        if r2 >= 1:
+            self._model_data[:r2] = p[0] + p[1] * numpy.arange(r2)
+        if r2 < len(self._data)-1:
+            self._model_data[r2:] = \
+                p[0] + p[1]*p[2] + p[3] * numpy.arange(len(self._data)-r2)
+        return self._model_data
+
+    def set_data(self, data, info=None):
+        super(SurfacePositionModel, self).set_data(data, info)
+        if info == None:
+            info = {}
+        self.info = info
+        self.info['min position'] = 0
+        self.info['max position'] = len(data)
+        self.info['max deflection'] = data.max()
+        self.info['min deflection'] = data.min()
+        self.info['position range'] = self.info['max position'] - self.info['min position']
+        self.info['deflection range'] = self.info['max deflection'] - self.info['min deflection']
+
+    def guess_initial_params(self, outqueue=None):
+        """Guess the initial parameters.
+
+        Notes
+        -----
+        We guess initial parameters such that the offset (:math:`p_1`)
+        matches the minimum deflection, the kink (:math:`p_2`) occurs in
+        the middle of the data, the initial (contact) slope (:math:`p_0`)
+        produces the maximum deflection at the left-most point, and the
+        final (non-contact) slope (:math:`p_3`) is zero.
+        """
+        left_offset = self.info['min deflection']
+        left_slope = 2*(self.info['deflection range']
+                        /self.info['position range'])
+        kink_position = (self.info['max position']
+                         +self.info['min position'])/2.0
+        right_slope = 0
+        self.info['guessed contact slope'] = left_slope
+        params = [left_offset, left_slope, kink_position, right_slope]
+        if self.info.get('force zero non-contact slope', None) == True:
+            params = params[:-1]
+        return params
+
+    def guess_scale(self, params, outqueue=None):
+        """Guess the parameter scales.
+
+        Notes
+        -----
+        We guess offset scale (:math:`p_0`) as one tenth of the total
+        deflection range, the kink scale (:math:`p_2`) as one tenth of
+        the total index range, the initial (contact) slope scale
+        (:math:`p_1`) as one tenth of the contact slope estimation,
+        and the final (non-contact) slope scale (:math:`p_3`) is as
+        one tenth of the initial slope scale.
+        """
+        offset_scale = self.info['deflection range']/10.
+        left_slope_scale = abs(params[1])/10.
+        kink_scale = self.info['position range']/10.
+        right_slope_scale = left_slope_scale/10.
+        scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale]
+        if self.info.get('force zero non-contact slope', None) == True:
+            scale = scale[:-1]
+        return scale
+
+    def fit(self, *args, **kwargs):
+        self.info['guessed contact slope'] = None
+        params = super(SurfacePositionModel, self).fit(*args, **kwargs)
+        params[2] = abs(params[2])
+        if self.info.get('force zero non-contact slope', None) == True:
+            params = list(params)
+            params.append(0.)  # restore the non-contact slope parameter
+
+        # check that the fit is reasonable, see the :meth:`model` docstring
+        # for parameter descriptions.  HACK: hardcoded cutoffs.
+        if abs(params[3]*10) > abs(params[1]) :
+            raise PoorFit('Slope in non-contact region, or no slope in contact')
+        if params[2] < self.info['min position']+0.02*self.info['position range']:
+            raise PoorFit(
+                'No kink (kink %g less than %g, need more space to left)'
+                % (params[2],
+                   self.info['min position']+0.02*self.info['position range']))
+        if params[2] > self.info['max position']-0.02*self.info['position range']:
+            raise poorFit(
+                'No kink (kink %g more than %g, need more space to right)'
+                % (params[2],
+                   self.info['max position']-0.02*self.info['position range']))
+        if (self.info['guessed contact slope'] != None
+            and abs(params[1]) < 0.5 * abs(self.info['guessed contact slope'])):
+            raise PoorFit('Too far (contact slope %g, but expected ~%g'
+                          % (params[3], self.info['guessed contact slope']))
+        return params
+
+class VelocityClampPlugin (Plugin):
+    def __init__(self):
+        super(VelocityClampPlugin, self).__init__(name='vclamp')
+        self._commands = [
+            SurfaceContactCommand(self), ForceCommand(self),
+            CantileverAdjustedExtensionCommand(self),
+            ]
+
+    def default_settings(self):
+        return [
+            Setting(section=self.setting_section, help=self.__doc__),
+            Setting(section=self.setting_section,
+                    option='surface contact point algorithm',
+                    value='wtk',
+                    help='Select the surface contact point algorithm.  See the documentation for descriptions of available algorithms.')
+            ]
+
+
+class SurfaceContactCommand (Command):
+    """Automatically determine a block's surface contact point.
+
+    You can select the contact point algorithm with the creatively
+    named `surface contact point algorithm` configuration setting.
+    Currently available options are:
+
+    * fmms (:meth:`find_contact_point_fmms`)
+    * ms (:meth:`find_contact_point_ms`)
+    * wtk (:meth:`find_contact_point_wtk`)
+    """
+    def __init__(self, plugin):
+        super(SurfaceContactCommand, self).__init__(
+            name='zero block surface contact point',
+            arguments=[
+                CurveArgument,
+                Argument(name='block', aliases=['set'], type='int', default=0,
+                         help="""
+Data block for which the force should be calculated.  For an
+approach/retract force curve, `0` selects the approaching curve and `1`
+selects the retracting curve.
+""".strip()),
+                Argument(name='input distance column', type='string',
+                         default='z piezo (m)',
+                         help="""
+Name of the column to use as the surface positioning input.
+""".strip()),
+                Argument(name='input deflection column', type='string',
+                         default='deflection (m)',
+                         help="""
+Name of the column to use as the deflection input.
+""".strip()),
+                Argument(name='output distance column', type='string',
+                         default='surface distance',
+                         help="""
+Name of the column (without units) to use as the surface positioning output.
+""".strip()),
+                Argument(name='output deflection column', type='string',
+                         default='surface deflection',
+                         help="""
+Name of the column (without units) to use as the deflection output.
+""".strip()),
+                Argument(name='distance info name', type='string',
+                         default='surface distance offset',
+                         help="""
+Name of the column (without units) for storing the distance offset in the `.info` dictionary.
+""".strip()),
+                Argument(name='deflection info name', type='string',
+                         default='surface deflection offset',
+                         help="""
+Name of the column (without units) for storing the deflection offset in the `.info` dictionary.
+""".strip()),
+                Argument(name='fit parameters info name', type='string',
+                         default='surface deflection offset',
+                         help="""
+Name of the column (without units) for storing the deflection offset in the `.info` dictionary.
+""".strip()),
+                ],
+            help=self.__doc__, plugin=plugin)
+
+    def _run(self, hooke, inqueue, outqueue, params):
+        data = params['curve'].data[params['block']]
+        # HACK? rely on params['curve'] being bound to the local hooke
+        # playlist (i.e. not a copy, as you would get by passing a
+        # curve through the queue).  Ugh.  Stupid queues.  As an
+        # alternative, we could pass lookup information through the
+        # queue...
+        new = Data((data.shape[0], data.shape[1]+2), dtype=data.dtype)
+        new.info = copy.deepcopy(data.info)
+        new[:,:-2] = data
+        name,dist_units = split_data_label(params['input distance column'])
+        name,def_units = split_data_label(params['input deflection column'])
+        new.info['columns'].extend([
+                join_data_label(params['output distance column'], dist_units),
+                join_data_label(params['output deflection column'], def_units),
+                ])
+        dist_data = data[:,data.info['columns'].index(
+                params['input distance column'])]
+        def_data = data[:,data.info['columns'].index(
+                params['input deflection column'])]
+        i,def_offset,ps = self.find_contact_point(
+            params['curve'], dist_data, def_data, outqueue)
+        dist_offset = dist_data[i]
+        new.info[join_data_label(params['distance info name'], dist_units
+                                 )] = dist_offset
+        new.info[join_data_label(params['deflection info name'], def_units
+                                 )] = def_offset
+        new.info[params['fit parameters info name']] = ps
+        new[:,-2] = dist_data - dist_offset
+        new[:,-1] = def_data - def_offset
+        params['curve'].data[params['block']] = new
+
+    def find_contact_point(self, curve, z_data, d_data, outqueue=None):
+        """Railyard for the `find_contact_point_*` family.
+
+        Uses the `surface contact point algorithm` configuration
+        setting to call the appropriate backend algorithm.
+        """
+        fn = getattr(self, 'find_contact_point_%s'
+                     % self.plugin.config['surface contact point algorithm'])
+        return fn(curve, z_data, d_data, outqueue)
+
+    def find_contact_point_fmms(self, curve, z_data, d_data, outqueue=None):
+        """Algorithm by Francesco Musiani and Massimo Sandal.
+
+        Notes
+        -----
+        Algorithm:
+
+        0) Driver-specific workarounds, e.g. deal with the PicoForce
+          trigger bug by excluding retraction portions with excessive
+          deviation.
+        1) Select the second half (non-contact side) of the retraction
+          curve.
+        2) Fit the selection to a line.
+        3) If the fit is not almost horizontal, halve the selection
+          and retrun to (2).
+        4) Average the selection and use it as a baseline.
+        5) Slide in from the start (contact side) of the retraction
+        curve, until you find a point with greater than baseline
+        deflection.  That point is the contact point.
+        """
+        if curve.info['filetype'] == 'picoforce':
+            # Take care of the picoforce trigger bug (TODO: example
+            # data file demonstrating the bug).  We exclude portions
+            # of the curve that have too much standard deviation.
+            # Yes, a lot of magic is here.
+            check_start = len(d_data)-len(d_data)/20
+            monster_start = len(d_data)
+            while True:
+                # look at the non-contact tail
+                non_monster = d_data[check_start:monster_start]
+                if non_monster.std() < 2e-10: # HACK: hardcoded cutoff
+                    break
+                else: # move further away from the monster
+                    check_start -= len(d_data)/50
+                    monster_start -= len(d_data)/50
+            z_data = z_data[:monster_start]
+            d_data = d_data[:monster_start]
+
+        # take half of the thing to start
+        selection_start = len(d_data)/2
+        while True:
+            z_chunk = z_data[selection_start:]
+            d_chunk = d_data[selection_start:]
+            slope,intercept,r,two_tailed_prob,stderr_of_the_estimate = \
+                scipy.stats.linregress(z_chunk, d_chunk)
+            # We stop if we found an almost-horizontal fit or if we're
+            # getting to small a selection.  FIXME: 0.1 and 5./6 here
+            # are "magic numbers" (although reasonable)
+            if (abs(slope) < 0.1  # deflection (m) / surface (m)
+                or selection_start > 5./6*len(d_data)):
+                break
+            selection_start += 10
+
+        d_baseline = d_chunk.mean()
+
+        # find the first point above the calculated baseline
+        i = 0
+        while i < len(d_data) and d_data[i] < ymean:
+            i += 1
+        return (i, d_baseline, {})
+
+    def find_contact_point_ms(self, curve, z_data, d_data, outqueue=None):
+        """Algorithm by Massimo Sandal.
+
+        Notes
+        -----
+        WTK: At least the commits are by Massimo, and I see no notes
+        attributing the algorithm to anyone else.
+
+        Algorithm:
+
+        * ?
+        """
+        xext=raw_plot.vectors[0][0]
+        yext=raw_plot.vectors[0][1]
+        xret2=raw_plot.vectors[1][0]
+        yret=raw_plot.vectors[1][1]
+
+        first_point=[xext[0], yext[0]]
+        last_point=[xext[-1], yext[-1]]
+
+        #regr=scipy.polyfit(first_point, last_point,1)[0:2]
+        diffx=abs(first_point[0]-last_point[0])
+        diffy=abs(first_point[1]-last_point[1])
+
+        #using polyfit results in numerical errors. good old algebra.
+        a=diffy/diffx
+        b=first_point[1]-(a*first_point[0])
+        baseline=scipy.polyval((a,b), xext)
+
+        ysub=[item-basitem for item,basitem in zip(yext,baseline)]
+
+        contact=ysub.index(min(ysub))
+
+        return xext,ysub,contact
+
+        #now, exploit a ClickedPoint instance to calculate index...
+        dummy=ClickedPoint()
+        dummy.absolute_coords=(x_intercept,y_intercept)
+        dummy.find_graph_coords(xret2,yret)
+
+        if debug:
+            return dummy.index, regr, regr_contact
+        else:
+            return dummy.index
+
+    def find_contact_point_wtk(self, curve, z_data, d_data, outqueue=None):
+        """Algorithm by W. Trevor King.
+
+        Notes
+        -----
+        Uses :func:`analyze_surf_pos_data_wtk` internally.
+        """
+        reverse = z_data[0] > z_data[-1]
+        if reverse == True:    # approaching, contact region on the right
+            d_data = d_data[::-1]
+        s = SurfacePositionModel(d_data)
+        s.info['force zero non-contact slope'] = True
+        offset,contact_slope,surface_index,non_contact_slope = s.fit(
+            outqueue=outqueue)
+        info = {
+            'offset': offset,
+            'contact slope': contact_slope,
+            'surface index': surface_index,
+            'non-contact slope': non_contact_slope,
+            'reversed': reverse,
+            }
+        deflection_offset = offset + contact_slope*surface_index,
+        if reverse == True:
+            surface_index = len(d_data)-1-surface_index
+        return (numpy.round(surface_index), deflection_offset, info)
+
+class ForceCommand (Command):
+    """Calculate a block's `deflection (N)` array.
+
+    Uses the block's `deflection (m)` array and
+    `spring constant (N/m)`.
+    """
+    def __init__(self, plugin):
+        super(ForceCommand, self).__init__(
+            name='add block force array',
+            arguments=[
+                CurveArgument,
+                Argument(name='block', aliases=['set'], type='int', default=0,
+                         help="""
+Data block for which the force should be calculated.  For an
+approach/retract force curve, `0` selects the approaching curve and `1`
+selects the retracting curve.
+""".strip()),
+                ],
+            help=self.__doc__, plugin=plugin)
+
+    def _run(self, hooke, inqueue, outqueue, params):
+        data = params['curve'].data[params['block']]
+        # HACK? rely on params['curve'] being bound to the local hooke
+        # playlist (i.e. not a copy, as you would get by passing a
+        # curve through the queue).  Ugh.  Stupid queues.  As an
+        # alternative, we could pass lookup information through the
+        # queue.
+        new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
+        new.info = copy.deepcopy(data.info)
+        new[:,:-1] = data
+        new.info['columns'].append('deflection (N)')
+        d_data = data[:,data.info['columns'].index('surface adjusted deflection (m)')]
+        new[:,-1] = d_data * data.info['spring constant (N/m)']
+        params['curve'].data[params['block']] = new
+
+
+class CantileverAdjustedExtensionCommand (Command):
+    """Calculate a block's `cantilever adjusted extension (m)` array.
+
+    Uses the block's `deflection (m)` and `surface distance offset (m)`
+    arrays and `spring constant (N/m)`.
+    """
+    def __init__(self, plugin):
+        super(CantileverAdjustedExtensionCommand, self).__init__(
+            name='add block cantilever adjusted extension array',
+            arguments=[
+                CurveArgument,
+                Argument(name='block', aliases=['set'], type='int', default=0,
+                         help="""
+Data block for which the adjusted extension should be calculated.  For
+an approach/retract force curve, `0` selects the approaching curve and
+`1` selects the retracting curve.
+""".strip()),
+                ],
+            help=self.__doc__, plugin=plugin)
+
+    def _run(self, hooke, inqueue, outqueue, params):
+        data = params['curve'].data[params['block']]
+        # HACK? rely on params['curve'] being bound to the local hooke
+        # playlist (i.e. not a copy, as you would get by passing a
+        # curve through the queue).  Ugh.  Stupid queues.  As an
+        # alternative, we could pass lookup information through the
+        # queue.
+        new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
+        new.info = copy.deepcopy(data.info)
+        new[:,:-1] = data
+        new.info['columns'].append('cantilever adjusted extension (m)')
+        z_data = data[:,data.info['columns'].index('surface distance (m)')]
+        d_data = data[:,data.info['columns'].index('deflection (N)')]
+        new[:,-1] = z_data - d_data / data.info['spring constant (N/m)']
+        params['curve'].data[params['block']] = new
 
-        outplot=self.subtract_curves(sub_order=1)
 
-        plot_graph=self.list_of_events['plot_graph']
-        wx.PostEvent(self.frame,plot_graph(plots=[outplot]))
+class generalvclampCommands(object):
 
     def _plug_init(self):
         self.basecurrent=None
@@ -437,7 +911,6 @@ class generalvclampCommands(object):
         return contact_point, contact_point_index
 
 
-
     def baseline_points(self,peak_location, displayed_plot):
         clicks=self.config['baseline_clicks']
         if clicks==0: