Upgraded hooke.plugin.polymer_fit commands to ColumnAddingArgument subclasses.
authorW. Trevor King <wking@drexel.edu>
Mon, 16 Aug 2010 16:33:39 +0000 (12:33 -0400)
committerW. Trevor King <wking@drexel.edu>
Mon, 16 Aug 2010 16:33:39 +0000 (12:33 -0400)
Now the following command works:

    $ bin/hk.py
        -c 'load_playlist test/data/test'
        -c 'zero_surface_contact_point --block retract'
        -c 'flat_filter_peaks --block retract --min_points 1'
        -c 'zero_surface_contact_point --block retract
                --ignore_after_last_peak_info_name "flat filter peaks"'
        -c 'convert_distance_to_force --block retract
                --deflection_column "surface deflection (m)"'
        -c 'remove_cantilever_from_extension --block retract'
        -c 'flat_peaks_to_polymer_peaks --block retract'
        -c 'polymer_fit_peaks --block retract'
        -c 'export_block --block retract --output myblock.dat'
        --persist

See related commands in 725a18055518 and 27bf3c1e98e1.

hooke/plugin/convfilt.py
hooke/plugin/flatfilt.py
hooke/plugin/polymer_fit.py
test/polymer_fit.py [new file with mode: 0644]

index 1c8124f..ff9e68c 100644 (file)
@@ -46,7 +46,6 @@ from ..util.peak import find_peaks, find_peaks_arguments, Peak, _kwargs
 from . import Plugin, argument_to_setting
 from .curve import CurveArgument
 from .playlist import FilterCommand
-from .vclamp import scale
 
 
 class ConvFiltPlugin (Plugin):
@@ -153,9 +152,6 @@ class ConvolutionPeaksCommand (Command):
         if curve.info['experiment'] != VelocityClamp:
             raise Failure('%s operates on VelocityClamp experiments, not %s'
                           % (self.name, curve.info['experiment']))
-        for col in ['surface distance (m)', 'deflection (N)']:
-            if col not in curve.data[0].info['columns']:
-                scale(hooke, curve)
         data = None
         for block in curve.data:
             if block.info['name'].startswith('retract'):
index 55a7d5b..952e6fc 100644 (file)
@@ -137,8 +137,7 @@ Name of the column (without units) to use as the peak output.
                 Argument(name='peak info name', type='string',
                          default='flat filter peaks',
                          help="""
-Name (without units) for storing the list of peaks in the `.info`
-dictionary.
+Name for storing the list of peaks in the `.info` dictionary.
 """.strip()),
                 ] + plugin_arguments,
             help=self.__doc__, plugin=plugin)
@@ -182,8 +181,6 @@ dictionary.
         name,def_unit = split_data_label(params['deflection column'])
         params['output peak column'] = join_data_label(
             params['output peak column'], def_unit)
-        params['peak info name'] = join_data_label(
-            params['peak info name'], def_unit)
         return params
 
     def _peak_name(self, params, index):
index 9ee0c53..770dbb9 100644 (file)
@@ -42,8 +42,7 @@ from ..util.fit import PoorFit, ModelFitter
 from ..util.peak import Peak
 from ..util.si import join_data_label, split_data_label
 from . import Plugin, argument_to_setting
-from .curve import CurveArgument
-from .vclamp import scale
+from .curve import ColumnAccessCommand, ColumnAddingCommand
 
 
 kB = 1.3806503e-23  # Joules/Kelvin
@@ -888,18 +887,16 @@ trans-trans-gauche (ttg) PEG states in units of kBT.
             self._settings.append(argument_to_setting(
                     self.setting_section, argument))
             argument.default = None # if argument isn't given, use the config.
-        self._arguments.extend([  # Non-configurable arguments
-                Argument(name='input distance column', type='string',
-                         default='cantilever adjusted extension (m)',
-                         help="""
+        self._input_columns = [
+            ('distance column', 'cantilever adjusted extension (m)',
+             """
 Name of the column to use as the surface position input.
 """.strip()),
-                Argument(name='input deflection column', type='string',
-                         default='deflection (N)',
-                         help="""
+            ('deflection column', 'deflection (N)',
+             """
 Name of the column to use as the deflection input.
 """.strip()),
-                ])
+            ]
         self._commands = [
             PolymerFitCommand(self), PolymerFitPeaksCommand(self),
             TranslateFlatPeaksCommand(self),
@@ -912,7 +909,7 @@ Name of the column to use as the deflection input.
         return self._settings
 
 
-class PolymerFitCommand (Command):
+class PolymerFitCommand (ColumnAddingCommand):
     """Polymer model (WLC, FJC, etc.) fitting.
 
     Fits an entropic elasticity function to a given chunk of the
@@ -926,62 +923,53 @@ class PolymerFitCommand (Command):
     def __init__(self, plugin):
         super(PolymerFitCommand, self).__init__(
             name='polymer fit',
-            arguments=[
-                CurveArgument,
-                Argument(name='block', aliases=['set'], type='int', default=0,
-                         help="""
-Data block for which the fit should be calculated.  For an
-approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
-                Argument(name='bounds', type='point', optional=False, count=2,
-                         help="""
-Indicies of points bounding the selected data.
-""".strip()),
-                ] + plugin._arguments + [
-                Argument(name='output tension column', type='string',
-                         default='polymer tension',
-                         help="""
+            columns=plugin._input_columns,
+            new_columns=[
+                ('output tension column', 'polymer tension',
+                 """
 Name of the column (without units) to use as the polymer tension output.
 """.strip()),
+                ],
+            arguments=[
                 Argument(name='fit parameters info name', type='string',
                          default='surface deflection offset',
                          help="""
 Name (without units) for storing the fit parameters in the `.info` dictionary.
 """.strip()),
-                ],
+                Argument(name='bounds', type='point', optional=False, count=2,
+                         help="""
+Indicies of points bounding the selected data.
+""".strip()),
+                ] + plugin._arguments,
             help=self.__doc__, plugin=plugin)
 
     def _run(self, hooke, inqueue, outqueue, params):
-        for key,value in params.items():
-            if value == None:  # Use configured default value.
-                params[key] = self.plugin.config[key]
-        scale(hooke, params['curve'], params['block'])  # TODO: is autoscaling a good idea? (explicit is better than implicit)
-        data = params['curve'].data[params['block']]
-        # HACK? rely on params['curve'] being bound to the local hooke
-        # playlist (i.e. not a copy, as you would get by passing a
-        # curve through the queue).  Ugh.  Stupid queues.  As an
-        # alternative, we could pass lookup information through the
-        # queue...
+        params = self.__setup_params(hooke, params)
+        data = self._block(hooke, params)
         model = params['polymer model']
-        new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
-        new.info = copy.deepcopy(data.info)
-        new[:,:-1] = data
-        new.info['columns'].append(
-            join_data_label(params['output tension column'], 'N'))
-        z_data = data[:,data.info['columns'].index(
-                params['input distance column'])]
-        d_data = data[:,data.info['columns'].index(
-                params['input deflection column'])]
+        dist_data = self._get_column(
+            hooke=hooke, params=params, column_name='distance column')
+        def_data = self._get_column(
+            hooke=hooke, params=params, column_name='deflection column')
         start,stop = params['bounds']
         tension_data,ps = self.fit_polymer_model(
-            params, z_data, d_data, start, stop, outqueue)
-        new.info[params['fit parameters info name']] = ps
-        new.info[params['fit parameters info name']]['model'] = model
-        new[:,-1] = tension_data
-        params['curve'].data[params['block']] = new
+            params, dist_data, def_data, start, stop, outqueue)
+        data.info[params['fit parameters info name']] = ps
+        data.info[params['fit parameters info name']]['model'] = model
+        self._set_column(hooke=hooke, params=params,
+                         column_name='output tension column',
+                         values=tension_data)
+
+    def __setup_params(self, hooke, params):
+        for key,value in params.items():
+            if value == None:  # Use configured default value.
+                params[key] = self.plugin.config[key]
+        name,def_unit = split_data_label(params['deflection column'])
+        params['output tension column'] = join_data_label(
+            params['output tension column'], def_unit)
+        return params
 
-    def fit_polymer_model(self, params, z_data, d_data, start, stop,
+    def fit_polymer_model(self, params, dist_data, def_data, start, stop,
                           outqueue=None):
         """Railyard for the `fit_*_model` family.
 
@@ -990,20 +978,20 @@ Name (without units) for storing the fit parameters in the `.info` dictionary.
         """
         fn = getattr(self, 'fit_%s_model'
                      % params['polymer model'].replace('-','_'))
-        return fn(params, z_data, d_data, start, stop, outqueue)
+        return fn(params, dist_data, def_data, start, stop, outqueue)
 
-    def fit_FJC_model(self, params, z_data, d_data, start, stop,
+    def fit_FJC_model(self, params, dist_data, def_data, start, stop,
                       outqueue=None):
         """Fit the data with :class:`FJC`.
         """
         info = {
             'temperature (K)': self.plugin.config['temperature'],
-            'x data (m)': z_data[start:stop],
+            'x data (m)': dist_data[start:stop],
             }
         if True:  # TODO: optionally free persistence length
             info['Kuhn length (m)'] = (
                 params['FJC Kuhn length'])
-        model = FJC(d_data[start:stop], info=info, rescale=True)
+        model = FJC(def_data[start:stop], info=info, rescale=True)
         queue = Queue()
         params = model.fit(outqueue=queue)
         if True:  # TODO: if Kuhn length fixed
@@ -1014,23 +1002,23 @@ Name (without units) for storing the fit parameters in the `.info` dictionary.
         L = params[0]
         T = info['temperature (K)']
         fit_info = queue.get(block=False)
-        f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan
-        f_data[start:stop] = FJC_fn(z_data[start:stop], T=T, L=L, a=a)
+        f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+        f_data[start:stop] = FJC_fn(dist_data[start:stop], T=T, L=L, a=a)
         return [f_data, fit_info]
 
-    def fit_FJC_PEG_model(self, params, z_data, d_data, start, stop,
+    def fit_FJC_PEG_model(self, params, dist_data, def_data, start, stop,
                           outqueue=None):
         """Fit the data with :class:`FJC_PEG`.
         """
         info = {
             'temperature (K)': self.plugin.config['temperature'],
-            'x data (m)': z_data[start:stop],
+            'x data (m)': dist_data[start:stop],
             # TODO: more info
             }
         if True:  # TODO: optionally free persistence length
             info['Kuhn length (m)'] = (
                 params['FJC Kuhn length'])
-        model = FJC_PEG(d_data[start:stop], info=info, rescale=True)
+        model = FJC_PEG(def_data[start:stop], info=info, rescale=True)
         queue = Queue()
         params = model.fit(outqueue=queue)
         if True:  # TODO: if Kuhn length fixed
@@ -1041,22 +1029,22 @@ Name (without units) for storing the fit parameters in the `.info` dictionary.
         N = params[0]
         T = info['temperature (K)']
         fit_info = queue.get(block=False)
-        f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan
-        f_data[start:stop] = FJC_PEG_fn(z_data[start:stop], **kwargs)
+        f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+        f_data[start:stop] = FJC_PEG_fn(dist_data[start:stop], **kwargs)
         return [f_data, fit_info]
 
-    def fit_WLC_model(self, params, z_data, d_data, start, stop,
+    def fit_WLC_model(self, params, dist_data, def_data, start, stop,
                       outqueue=None):
         """Fit the data with :class:`WLC`.
         """
         info = {
             'temperature (K)': self.plugin.config['temperature'],
-            'x data (m)': z_data[start:stop],
+            'x data (m)': dist_data[start:stop],
             }
         if True:  # TODO: optionally free persistence length
             info['persistence length (m)'] = (
                 params['WLC persistence length'])
-        model = WLC(d_data[start:stop], info=info, rescale=True)
+        model = WLC(def_data[start:stop], info=info, rescale=True)
         queue = Queue()
         params = model.fit(outqueue=queue)
         if True:  # TODO: if persistence length fixed
@@ -1067,12 +1055,12 @@ Name (without units) for storing the fit parameters in the `.info` dictionary.
         L = params[0]
         T = info['temperature (K)']
         fit_info = queue.get(block=False)
-        f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan
-        f_data[start:stop] = WLC_fn(z_data[start:stop], T=T, L=L, p=p)
+        f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+        f_data[start:stop] = WLC_fn(dist_data[start:stop], T=T, L=L, p=p)
         return [f_data, fit_info]
 
 
-class PolymerFitPeaksCommand (Command):
+class PolymerFitPeaksCommand (ColumnAccessCommand):
     """Polymer model (WLC, FJC, etc.) fitting.
 
     Use :class:`PolymerFitCommand` to fit the each peak in a list of
@@ -1081,14 +1069,8 @@ class PolymerFitPeaksCommand (Command):
     def __init__(self, plugin):
         super(PolymerFitPeaksCommand, self).__init__(
             name='polymer fit peaks',
+            columns=plugin._input_columns,
             arguments=[
-                CurveArgument,
-                Argument(name='block', aliases=['set'], type='int', default=0,
-                         help="""
-Data block for which the fit should be calculated.  For an
-approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
                 Argument(name='peak info name', type='string',
                          default='polymer peaks',
                          help="""
@@ -1102,11 +1084,13 @@ Index of the selected peak in the list of peaks.  Use `None` to fit all peaks.
             help=self.__doc__, plugin=plugin)
 
     def _run(self, hooke, inqueue, outqueue, params):
-        data = params['curve'].data[params['block']]
+        data = self._block(hooke, params)
         fit_command = [c for c in hooke.commands
                        if c.name=='polymer fit'][0]
         inq = Queue()
         outq = Queue()
+        curve = params['curve']
+        params['curve'] = None
         p = copy.deepcopy(params)
         p['curve'] = params['curve']
         del(p['peak info name'])
@@ -1122,7 +1106,7 @@ Index of the selected peak in the list of peaks.  Use `None` to fit all peaks.
                 raise ret
 
 
-class TranslateFlatPeaksCommand (Command):
+class TranslateFlatPeaksCommand (ColumnAccessCommand):
     """Translate flat filter peaks into polymer peaks for fitting.
 
     Use :class:`~hooke.plugin.flatfilt.FlatPeaksCommand` creates a
@@ -1134,62 +1118,59 @@ class TranslateFlatPeaksCommand (Command):
     polymer loading regions.
     """
     def __init__(self, plugin):
-        plugin_arguments = [a for a in plugin._arguments
-                            if a.name in ['input distance column',
-                                          'input deflection column']]
-        arguments = [
-            CurveArgument,
-            Argument(name='block', aliases=['set'], type='int', default=0,
-                     help="""
-Data block for which the fit should be calculated.  For an
-approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
-            ] + plugin_arguments + [
-            Argument(name='input peak info name', type='string',
-                     default='flat filter peaks',
-                     help="""
+        super(TranslateFlatPeaksCommand, self).__init__(
+            name='flat peaks to polymer peaks',
+            columns=plugin._input_columns,
+            arguments=[
+                Argument(name='input peak info name', type='string',
+                         default='flat filter peaks',
+                         help="""
 Name for the input peaks in the `.info` dictionary.
 """.strip()),
-            Argument(name='output peak info name', type='string',
-                     default='polymer peaks',
-                     help="""
+                Argument(name='output peak info name', type='string',
+                         default='polymer peaks',
+                         help="""
 Name for the ouptput peaks in the `.info` dictionary.
 """.strip()),
-            Argument(name='end offset', type='int', default=-1,
-                     help="""
+                Argument(name='end offset', type='int', default=-1,
+                         help="""
 Number of points between the end of a new peak and the start of the old.
 """.strip()),
-            Argument(name='start fraction', type='float', default=0.2,
-                     help="""
+                Argument(name='start fraction', type='float', default=0.2,
+                         help="""
 Place the start of the new peak at `start_fraction` from the end of
 the previous old peak to the end of the new peak.  Because the first
 new peak will have no previous old peak, it uses a distance of zero
 instead.
 """.strip()),
-            ]
-        super(TranslateFlatPeaksCommand, self).__init__(
-            name='flat peaks to polymer peaks',
-            arguments=arguments,
+            ] + plugin._arguments,
             help=self.__doc__, plugin=plugin)
 
     def _run(self, hooke, inqueue, outqueue, params):
-        data = params['curve'].data[params['block']]
-        z_data = data[:,data.info['columns'].index(
-                params['input distance column'])]
-        d_data = data[:,data.info['columns'].index(
-                params['input deflection column'])]
-        previous_old_stop = numpy.absolute(z_data).argmin()
+        data = self._block(hooke, params)
+        dist_data = self._get_column(
+            hooke=hooke, params=params, column_name='distance column')
+        def_data = self._get_column(
+            hooke=hooke, params=params, column_name='deflection column')
+        previous_old_stop = numpy.absolute(dist_data).argmin()
         new = []
-        for i,peak in enumerate(data.info[params['input peak info name']]):
+        try:
+            peaks = data.info[params['input peak info name']]
+        except KeyError, e:
+            raise Failure('No value for %s in %s.info (%s): %s'
+                          % (params['input peak info name'], data.info['name'],
+                             sorted(data.info.keys()), e))
+        for i,peak in enumerate(peaks):
             next_old_start = peak.index
             stop = next_old_start + params['end offset'] 
-            z_start = z_data[previous_old_stop] + params['start fraction']*(
-                z_data[stop] - z_data[previous_old_stop])
-            start = numpy.absolute(z_data - z_start).argmin()
+            dist_start = (
+                dist_data[previous_old_stop]
+                + params['start fraction']*(
+                    dist_data[stop] - dist_data[previous_old_stop]))
+            start = numpy.absolute(dist_data - dist_start).argmin()
             p = Peak('polymer peak %d' % i,
                      index=start,
-                     values=d_data[start:stop])
+                     values=def_data[start:stop])
             new.append(p)
             previous_old_stop = peak.post_index()
         data.info[params['output peak info name']] = new
diff --git a/test/polymer_fit.py b/test/polymer_fit.py
new file mode 100644 (file)
index 0000000..40d654e
--- /dev/null
@@ -0,0 +1,99 @@
+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of Hooke.
+#
+# Hooke is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# Hooke is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with Hooke.  If not, see
+# <http://www.gnu.org/licenses/>.
+
+"""
+>>> from hooke.hooke import Hooke, HookeRunner
+>>> h = Hooke()
+>>> r = HookeRunner()
+
+Prepare a curve for polymer fitting.
+
+>>> h = r.run_lines(h, ['load_playlist test/data/test']) # doctest: +ELLIPSIS
+<FilePlaylist test.hkp>
+Success
+<BLANKLINE>
+>>> h = r.run_lines(h, ['zero_surface_contact_point --block retract']
+...     ) # doctest: +ELLIPSIS, +REPORT_UDIFF
+{'info':...'fitted parameters': [8.413...e-08, 2.812...e-10, 158.581...],...}
+Success
+<BLANKLINE>
+>>> h = r.run_lines(h, ['polynomial_flatten --block retract --deflection_column "surface deflection (m)" --degree 1'])
+Success
+<BLANKLINE>
+>>> h = r.run_lines(h, ['convert_distance_to_force --block retract --deflection_column "flattened deflection (m)"'])
+Success
+<BLANKLINE>
+>>> h = r.run_lines(h, ['remove_cantilever_from_extension --block retract'])
+Success
+<BLANKLINE>
+>>> h = r.run_lines(h, ['flat_filter_peaks --block retract --min_points 1']
+...     )  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+[<Peak flat filter peak 0 of surface deflection 510
+  [ -1.065...e-09  -2.244...e-09]>,
+ <Peak flat filter peak 1 of surface deflection 610
+  [ -1.156...e-09  -8.840...e-10  -3.173...e-10  -7.480...e-10]>,
+ <Peak flat filter peak 2 of surface deflection 704
+  [ -7.933...e-10  -1.654...e-09]>,
+ <Peak flat filter peak 3 of surface deflection 812
+  [ -1.745...e-09]>,
+ <Peak flat filter peak 4 of surface deflection 916 [ -2.085...e-09]>,
+ <Peak flat filter peak 5 of surface deflection 1103
+  [ -1.768...e-09  -8.885...e-09  -1.722...e-09]>]
+Success
+<BLANKLINE>
+
+Fit the flat filter peaks with a polymer tension.
+
+>>> h = r.run_lines(h, ['flat_peaks_to_polymer_peaks --block retract'])
+Success
+<BLANKLINE>
+>>> h = r.run_lines(h, ['polymer_fit_peaks --block retract'])
+Success
+<BLANKLINE>
+
+Check the results.
+
+>>> curve = h.playlists.current().current()
+>>> retract = curve.data[1]
+>>> retract.info['columns']  # doctest: +NORMALIZE_WHITESPACE
+['z piezo (m)', 'deflection (m)',
+ 'surface distance (m)', 'surface deflection (m)',
+ 'flattened deflection (m)', 'deflection (N)',
+ 'cantilever adjusted extension (m)', 'flat filter peaks (m)',
+ 'polymer peak 0 (N)', 'polymer peak 1 (N)', 'polymer peak 2 (N)',
+ 'polymer peak 3 (N)', 'polymer peak 4 (N)', 'polymer peak 5 (N)']
+>>> retract[:5,-2:]
+Data([[ NaN,  NaN],
+       [ NaN,  NaN],
+       [ NaN,  NaN],
+       [ NaN,  NaN],
+       [ NaN,  NaN]])
+>>> retract[1097:1103,-2:]  # doctest: +ELLIPSIS
+Data([[             NaN,   5.220...e-10],
+       [             NaN,   5.592...e-10],
+       [             NaN,   6.104...e-10],
+       [             NaN,   6.261...e-10],
+       [             NaN,   7.058...e-10],
+       [             NaN,              NaN]])
+>>> retract[-5:,-2:]
+Data([[ NaN,  NaN],
+       [ NaN,  NaN],
+       [ NaN,  NaN],
+       [ NaN,  NaN],
+       [ NaN,  NaN]])
+"""