From a97af5f68d851047807e1fc5bfaf5935a5c6e67d Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Mon, 16 Aug 2010 12:33:39 -0400 Subject: [PATCH] Upgraded hooke.plugin.polymer_fit commands to ColumnAddingArgument subclasses. Now the following command works: $ bin/hk.py -c 'load_playlist test/data/test' -c 'zero_surface_contact_point --block retract' -c 'flat_filter_peaks --block retract --min_points 1' -c 'zero_surface_contact_point --block retract --ignore_after_last_peak_info_name "flat filter peaks"' -c 'convert_distance_to_force --block retract --deflection_column "surface deflection (m)"' -c 'remove_cantilever_from_extension --block retract' -c 'flat_peaks_to_polymer_peaks --block retract' -c 'polymer_fit_peaks --block retract' -c 'export_block --block retract --output myblock.dat' --persist See related commands in 725a18055518 and 27bf3c1e98e1. --- hooke/plugin/convfilt.py | 4 - hooke/plugin/flatfilt.py | 5 +- hooke/plugin/polymer_fit.py | 213 ++++++++++++++++-------------------- test/polymer_fit.py | 99 +++++++++++++++++ 4 files changed, 197 insertions(+), 124 deletions(-) create mode 100644 test/polymer_fit.py diff --git a/hooke/plugin/convfilt.py b/hooke/plugin/convfilt.py index 1c8124f..ff9e68c 100644 --- a/hooke/plugin/convfilt.py +++ b/hooke/plugin/convfilt.py @@ -46,7 +46,6 @@ from ..util.peak import find_peaks, find_peaks_arguments, Peak, _kwargs from . import Plugin, argument_to_setting from .curve import CurveArgument from .playlist import FilterCommand -from .vclamp import scale class ConvFiltPlugin (Plugin): @@ -153,9 +152,6 @@ class ConvolutionPeaksCommand (Command): if curve.info['experiment'] != VelocityClamp: raise Failure('%s operates on VelocityClamp experiments, not %s' % (self.name, curve.info['experiment'])) - for col in ['surface distance (m)', 'deflection (N)']: - if col not in curve.data[0].info['columns']: - scale(hooke, curve) data = None for block in curve.data: if block.info['name'].startswith('retract'): diff --git a/hooke/plugin/flatfilt.py b/hooke/plugin/flatfilt.py index 55a7d5b..952e6fc 100644 --- a/hooke/plugin/flatfilt.py +++ b/hooke/plugin/flatfilt.py @@ -137,8 +137,7 @@ Name of the column (without units) to use as the peak output. Argument(name='peak info name', type='string', default='flat filter peaks', help=""" -Name (without units) for storing the list of peaks in the `.info` -dictionary. +Name for storing the list of peaks in the `.info` dictionary. """.strip()), ] + plugin_arguments, help=self.__doc__, plugin=plugin) @@ -182,8 +181,6 @@ dictionary. name,def_unit = split_data_label(params['deflection column']) params['output peak column'] = join_data_label( params['output peak column'], def_unit) - params['peak info name'] = join_data_label( - params['peak info name'], def_unit) return params def _peak_name(self, params, index): diff --git a/hooke/plugin/polymer_fit.py b/hooke/plugin/polymer_fit.py index 9ee0c53..770dbb9 100644 --- a/hooke/plugin/polymer_fit.py +++ b/hooke/plugin/polymer_fit.py @@ -42,8 +42,7 @@ from ..util.fit import PoorFit, ModelFitter from ..util.peak import Peak from ..util.si import join_data_label, split_data_label from . import Plugin, argument_to_setting -from .curve import CurveArgument -from .vclamp import scale +from .curve import ColumnAccessCommand, ColumnAddingCommand kB = 1.3806503e-23 # Joules/Kelvin @@ -888,18 +887,16 @@ trans-trans-gauche (ttg) PEG states in units of kBT. self._settings.append(argument_to_setting( self.setting_section, argument)) argument.default = None # if argument isn't given, use the config. - self._arguments.extend([ # Non-configurable arguments - Argument(name='input distance column', type='string', - default='cantilever adjusted extension (m)', - help=""" + self._input_columns = [ + ('distance column', 'cantilever adjusted extension (m)', + """ Name of the column to use as the surface position input. """.strip()), - Argument(name='input deflection column', type='string', - default='deflection (N)', - help=""" + ('deflection column', 'deflection (N)', + """ Name of the column to use as the deflection input. """.strip()), - ]) + ] self._commands = [ PolymerFitCommand(self), PolymerFitPeaksCommand(self), TranslateFlatPeaksCommand(self), @@ -912,7 +909,7 @@ Name of the column to use as the deflection input. return self._settings -class PolymerFitCommand (Command): +class PolymerFitCommand (ColumnAddingCommand): """Polymer model (WLC, FJC, etc.) fitting. Fits an entropic elasticity function to a given chunk of the @@ -926,62 +923,53 @@ class PolymerFitCommand (Command): def __init__(self, plugin): super(PolymerFitCommand, self).__init__( name='polymer fit', - arguments=[ - CurveArgument, - Argument(name='block', aliases=['set'], type='int', default=0, - help=""" -Data block for which the fit should be calculated. For an -approach/retract force curve, `0` selects the approaching curve and -`1` selects the retracting curve. -""".strip()), - Argument(name='bounds', type='point', optional=False, count=2, - help=""" -Indicies of points bounding the selected data. -""".strip()), - ] + plugin._arguments + [ - Argument(name='output tension column', type='string', - default='polymer tension', - help=""" + columns=plugin._input_columns, + new_columns=[ + ('output tension column', 'polymer tension', + """ Name of the column (without units) to use as the polymer tension output. """.strip()), + ], + arguments=[ Argument(name='fit parameters info name', type='string', default='surface deflection offset', help=""" Name (without units) for storing the fit parameters in the `.info` dictionary. """.strip()), - ], + Argument(name='bounds', type='point', optional=False, count=2, + help=""" +Indicies of points bounding the selected data. +""".strip()), + ] + plugin._arguments, help=self.__doc__, plugin=plugin) def _run(self, hooke, inqueue, outqueue, params): - for key,value in params.items(): - if value == None: # Use configured default value. - params[key] = self.plugin.config[key] - scale(hooke, params['curve'], params['block']) # TODO: is autoscaling a good idea? (explicit is better than implicit) - data = params['curve'].data[params['block']] - # HACK? rely on params['curve'] being bound to the local hooke - # playlist (i.e. not a copy, as you would get by passing a - # curve through the queue). Ugh. Stupid queues. As an - # alternative, we could pass lookup information through the - # queue... + params = self.__setup_params(hooke, params) + data = self._block(hooke, params) model = params['polymer model'] - new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype) - new.info = copy.deepcopy(data.info) - new[:,:-1] = data - new.info['columns'].append( - join_data_label(params['output tension column'], 'N')) - z_data = data[:,data.info['columns'].index( - params['input distance column'])] - d_data = data[:,data.info['columns'].index( - params['input deflection column'])] + dist_data = self._get_column( + hooke=hooke, params=params, column_name='distance column') + def_data = self._get_column( + hooke=hooke, params=params, column_name='deflection column') start,stop = params['bounds'] tension_data,ps = self.fit_polymer_model( - params, z_data, d_data, start, stop, outqueue) - new.info[params['fit parameters info name']] = ps - new.info[params['fit parameters info name']]['model'] = model - new[:,-1] = tension_data - params['curve'].data[params['block']] = new + params, dist_data, def_data, start, stop, outqueue) + data.info[params['fit parameters info name']] = ps + data.info[params['fit parameters info name']]['model'] = model + self._set_column(hooke=hooke, params=params, + column_name='output tension column', + values=tension_data) + + def __setup_params(self, hooke, params): + for key,value in params.items(): + if value == None: # Use configured default value. + params[key] = self.plugin.config[key] + name,def_unit = split_data_label(params['deflection column']) + params['output tension column'] = join_data_label( + params['output tension column'], def_unit) + return params - def fit_polymer_model(self, params, z_data, d_data, start, stop, + def fit_polymer_model(self, params, dist_data, def_data, start, stop, outqueue=None): """Railyard for the `fit_*_model` family. @@ -990,20 +978,20 @@ Name (without units) for storing the fit parameters in the `.info` dictionary. """ fn = getattr(self, 'fit_%s_model' % params['polymer model'].replace('-','_')) - return fn(params, z_data, d_data, start, stop, outqueue) + return fn(params, dist_data, def_data, start, stop, outqueue) - def fit_FJC_model(self, params, z_data, d_data, start, stop, + def fit_FJC_model(self, params, dist_data, def_data, start, stop, outqueue=None): """Fit the data with :class:`FJC`. """ info = { 'temperature (K)': self.plugin.config['temperature'], - 'x data (m)': z_data[start:stop], + 'x data (m)': dist_data[start:stop], } if True: # TODO: optionally free persistence length info['Kuhn length (m)'] = ( params['FJC Kuhn length']) - model = FJC(d_data[start:stop], info=info, rescale=True) + model = FJC(def_data[start:stop], info=info, rescale=True) queue = Queue() params = model.fit(outqueue=queue) if True: # TODO: if Kuhn length fixed @@ -1014,23 +1002,23 @@ Name (without units) for storing the fit parameters in the `.info` dictionary. L = params[0] T = info['temperature (K)'] fit_info = queue.get(block=False) - f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan - f_data[start:stop] = FJC_fn(z_data[start:stop], T=T, L=L, a=a) + f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan + f_data[start:stop] = FJC_fn(dist_data[start:stop], T=T, L=L, a=a) return [f_data, fit_info] - def fit_FJC_PEG_model(self, params, z_data, d_data, start, stop, + def fit_FJC_PEG_model(self, params, dist_data, def_data, start, stop, outqueue=None): """Fit the data with :class:`FJC_PEG`. """ info = { 'temperature (K)': self.plugin.config['temperature'], - 'x data (m)': z_data[start:stop], + 'x data (m)': dist_data[start:stop], # TODO: more info } if True: # TODO: optionally free persistence length info['Kuhn length (m)'] = ( params['FJC Kuhn length']) - model = FJC_PEG(d_data[start:stop], info=info, rescale=True) + model = FJC_PEG(def_data[start:stop], info=info, rescale=True) queue = Queue() params = model.fit(outqueue=queue) if True: # TODO: if Kuhn length fixed @@ -1041,22 +1029,22 @@ Name (without units) for storing the fit parameters in the `.info` dictionary. N = params[0] T = info['temperature (K)'] fit_info = queue.get(block=False) - f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan - f_data[start:stop] = FJC_PEG_fn(z_data[start:stop], **kwargs) + f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan + f_data[start:stop] = FJC_PEG_fn(dist_data[start:stop], **kwargs) return [f_data, fit_info] - def fit_WLC_model(self, params, z_data, d_data, start, stop, + def fit_WLC_model(self, params, dist_data, def_data, start, stop, outqueue=None): """Fit the data with :class:`WLC`. """ info = { 'temperature (K)': self.plugin.config['temperature'], - 'x data (m)': z_data[start:stop], + 'x data (m)': dist_data[start:stop], } if True: # TODO: optionally free persistence length info['persistence length (m)'] = ( params['WLC persistence length']) - model = WLC(d_data[start:stop], info=info, rescale=True) + model = WLC(def_data[start:stop], info=info, rescale=True) queue = Queue() params = model.fit(outqueue=queue) if True: # TODO: if persistence length fixed @@ -1067,12 +1055,12 @@ Name (without units) for storing the fit parameters in the `.info` dictionary. L = params[0] T = info['temperature (K)'] fit_info = queue.get(block=False) - f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan - f_data[start:stop] = WLC_fn(z_data[start:stop], T=T, L=L, p=p) + f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan + f_data[start:stop] = WLC_fn(dist_data[start:stop], T=T, L=L, p=p) return [f_data, fit_info] -class PolymerFitPeaksCommand (Command): +class PolymerFitPeaksCommand (ColumnAccessCommand): """Polymer model (WLC, FJC, etc.) fitting. Use :class:`PolymerFitCommand` to fit the each peak in a list of @@ -1081,14 +1069,8 @@ class PolymerFitPeaksCommand (Command): def __init__(self, plugin): super(PolymerFitPeaksCommand, self).__init__( name='polymer fit peaks', + columns=plugin._input_columns, arguments=[ - CurveArgument, - Argument(name='block', aliases=['set'], type='int', default=0, - help=""" -Data block for which the fit should be calculated. For an -approach/retract force curve, `0` selects the approaching curve and -`1` selects the retracting curve. -""".strip()), Argument(name='peak info name', type='string', default='polymer peaks', help=""" @@ -1102,11 +1084,13 @@ Index of the selected peak in the list of peaks. Use `None` to fit all peaks. help=self.__doc__, plugin=plugin) def _run(self, hooke, inqueue, outqueue, params): - data = params['curve'].data[params['block']] + data = self._block(hooke, params) fit_command = [c for c in hooke.commands if c.name=='polymer fit'][0] inq = Queue() outq = Queue() + curve = params['curve'] + params['curve'] = None p = copy.deepcopy(params) p['curve'] = params['curve'] del(p['peak info name']) @@ -1122,7 +1106,7 @@ Index of the selected peak in the list of peaks. Use `None` to fit all peaks. raise ret -class TranslateFlatPeaksCommand (Command): +class TranslateFlatPeaksCommand (ColumnAccessCommand): """Translate flat filter peaks into polymer peaks for fitting. Use :class:`~hooke.plugin.flatfilt.FlatPeaksCommand` creates a @@ -1134,62 +1118,59 @@ class TranslateFlatPeaksCommand (Command): polymer loading regions. """ def __init__(self, plugin): - plugin_arguments = [a for a in plugin._arguments - if a.name in ['input distance column', - 'input deflection column']] - arguments = [ - CurveArgument, - Argument(name='block', aliases=['set'], type='int', default=0, - help=""" -Data block for which the fit should be calculated. For an -approach/retract force curve, `0` selects the approaching curve and -`1` selects the retracting curve. -""".strip()), - ] + plugin_arguments + [ - Argument(name='input peak info name', type='string', - default='flat filter peaks', - help=""" + super(TranslateFlatPeaksCommand, self).__init__( + name='flat peaks to polymer peaks', + columns=plugin._input_columns, + arguments=[ + Argument(name='input peak info name', type='string', + default='flat filter peaks', + help=""" Name for the input peaks in the `.info` dictionary. """.strip()), - Argument(name='output peak info name', type='string', - default='polymer peaks', - help=""" + Argument(name='output peak info name', type='string', + default='polymer peaks', + help=""" Name for the ouptput peaks in the `.info` dictionary. """.strip()), - Argument(name='end offset', type='int', default=-1, - help=""" + Argument(name='end offset', type='int', default=-1, + help=""" Number of points between the end of a new peak and the start of the old. """.strip()), - Argument(name='start fraction', type='float', default=0.2, - help=""" + Argument(name='start fraction', type='float', default=0.2, + help=""" Place the start of the new peak at `start_fraction` from the end of the previous old peak to the end of the new peak. Because the first new peak will have no previous old peak, it uses a distance of zero instead. """.strip()), - ] - super(TranslateFlatPeaksCommand, self).__init__( - name='flat peaks to polymer peaks', - arguments=arguments, + ] + plugin._arguments, help=self.__doc__, plugin=plugin) def _run(self, hooke, inqueue, outqueue, params): - data = params['curve'].data[params['block']] - z_data = data[:,data.info['columns'].index( - params['input distance column'])] - d_data = data[:,data.info['columns'].index( - params['input deflection column'])] - previous_old_stop = numpy.absolute(z_data).argmin() + data = self._block(hooke, params) + dist_data = self._get_column( + hooke=hooke, params=params, column_name='distance column') + def_data = self._get_column( + hooke=hooke, params=params, column_name='deflection column') + previous_old_stop = numpy.absolute(dist_data).argmin() new = [] - for i,peak in enumerate(data.info[params['input peak info name']]): + try: + peaks = data.info[params['input peak info name']] + except KeyError, e: + raise Failure('No value for %s in %s.info (%s): %s' + % (params['input peak info name'], data.info['name'], + sorted(data.info.keys()), e)) + for i,peak in enumerate(peaks): next_old_start = peak.index stop = next_old_start + params['end offset'] - z_start = z_data[previous_old_stop] + params['start fraction']*( - z_data[stop] - z_data[previous_old_stop]) - start = numpy.absolute(z_data - z_start).argmin() + dist_start = ( + dist_data[previous_old_stop] + + params['start fraction']*( + dist_data[stop] - dist_data[previous_old_stop])) + start = numpy.absolute(dist_data - dist_start).argmin() p = Peak('polymer peak %d' % i, index=start, - values=d_data[start:stop]) + values=def_data[start:stop]) new.append(p) previous_old_stop = peak.post_index() data.info[params['output peak info name']] = new diff --git a/test/polymer_fit.py b/test/polymer_fit.py new file mode 100644 index 0000000..40d654e --- /dev/null +++ b/test/polymer_fit.py @@ -0,0 +1,99 @@ +# Copyright (C) 2010 W. Trevor King +# +# This file is part of Hooke. +# +# Hooke is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# Hooke is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General +# Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with Hooke. If not, see +# . + +""" +>>> from hooke.hooke import Hooke, HookeRunner +>>> h = Hooke() +>>> r = HookeRunner() + +Prepare a curve for polymer fitting. + +>>> h = r.run_lines(h, ['load_playlist test/data/test']) # doctest: +ELLIPSIS + +Success + +>>> h = r.run_lines(h, ['zero_surface_contact_point --block retract'] +... ) # doctest: +ELLIPSIS, +REPORT_UDIFF +{'info':...'fitted parameters': [8.413...e-08, 2.812...e-10, 158.581...],...} +Success + +>>> h = r.run_lines(h, ['polynomial_flatten --block retract --deflection_column "surface deflection (m)" --degree 1']) +Success + +>>> h = r.run_lines(h, ['convert_distance_to_force --block retract --deflection_column "flattened deflection (m)"']) +Success + +>>> h = r.run_lines(h, ['remove_cantilever_from_extension --block retract']) +Success + +>>> h = r.run_lines(h, ['flat_filter_peaks --block retract --min_points 1'] +... ) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE +[, + , + , + , + , + ] +Success + + +Fit the flat filter peaks with a polymer tension. + +>>> h = r.run_lines(h, ['flat_peaks_to_polymer_peaks --block retract']) +Success + +>>> h = r.run_lines(h, ['polymer_fit_peaks --block retract']) +Success + + +Check the results. + +>>> curve = h.playlists.current().current() +>>> retract = curve.data[1] +>>> retract.info['columns'] # doctest: +NORMALIZE_WHITESPACE +['z piezo (m)', 'deflection (m)', + 'surface distance (m)', 'surface deflection (m)', + 'flattened deflection (m)', 'deflection (N)', + 'cantilever adjusted extension (m)', 'flat filter peaks (m)', + 'polymer peak 0 (N)', 'polymer peak 1 (N)', 'polymer peak 2 (N)', + 'polymer peak 3 (N)', 'polymer peak 4 (N)', 'polymer peak 5 (N)'] +>>> retract[:5,-2:] +Data([[ NaN, NaN], + [ NaN, NaN], + [ NaN, NaN], + [ NaN, NaN], + [ NaN, NaN]]) +>>> retract[1097:1103,-2:] # doctest: +ELLIPSIS +Data([[ NaN, 5.220...e-10], + [ NaN, 5.592...e-10], + [ NaN, 6.104...e-10], + [ NaN, 6.261...e-10], + [ NaN, 7.058...e-10], + [ NaN, NaN]]) +>>> retract[-5:,-2:] +Data([[ NaN, NaN], + [ NaN, NaN], + [ NaN, NaN], + [ NaN, NaN], + [ NaN, NaN]]) +""" -- 2.26.2