"""
import copy
-import Queue
+from Queue import Queue
import StringIO
import sys
import numpy
from scipy.optimize import newton
-from ..command import Command, Argument, Failure
+from ..command import Command, Argument, Success, Failure
from ..config import Setting
from ..curve import Data
-from ..plugin import Plugin
-from ..util.fit import PoorFit, ModelFitter
from ..util.callback import is_iterable
-from .curve import CurveArgument
-from .vclamp import scale
+from ..util.fit import PoorFit, ModelFitter
+from ..util.peak import Peak
+from ..util.si import join_data_label, split_data_label
+from . import Plugin, argument_to_setting
+from .curve import ColumnAccessCommand, ColumnAddingCommand
kB = 1.3806503e-23 # Joules/Kelvin
... 'x data (m)': x_data,
... }
>>> model = FJC(d_data, info=info, rescale=True)
- >>> outqueue = Queue.Queue()
- >>> Lp,a = model.fit(outqueue=outqueue)
+ >>> outqueue = Queue()
+ >>> L,a = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
- >>> model.L(Lp) # doctest: +ELLIPSIS
- 3.500...e-08
- >>> a # doctest: +ELLIPSIS
- 2.499...e-10
+ >>> print L
+ 3.5e-08
+ >>> print a
+ 2.5e-10
Fit the example data with a one-parameter fit (`L`). We introduce
some error in our fixed Kuhn length for fun.
>>> info['Kuhn length (m)'] = 2*a
>>> model = FJC(d_data, info=info, rescale=True)
- >>> Lp = model.fit(outqueue=outqueue)
+ >>> L = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
- >>> model.L(Lp) # doctest: +ELLIPSIS
+ >>> print L # doctest: +ELLIPSIS
3.199...e-08
"""
def Lp(self, L):
self._model_data[:] = FJC_fn(x_data, T, L, a)
return self._model_data
+ def fit(self, *args, **kwargs):
+ params = super(FJC, self).fit(*args, **kwargs)
+ if is_iterable(params):
+ params[0] = self.L(params[0]) # convert Lp -> L
+ params[1] = abs(params[1]) # take the absolute value of `a`
+ else: # params is a float
+ params = self.L(params) # convert Lp -> L
+ return params
+
def guess_initial_params(self, outqueue=None):
"""Guess initial fitting parameters.
return [Lp]
return [Lp, a]
- def guess_scale(self, params, outqueue=None):
- """Guess parameter scales.
-
- Returns
- -------
- Lp_scale : float
- A guess at the reparameterized contour length scale in meters.
- a_scale : float (optional)
- A guess at the Kuhn length in meters. If the length of
- `params` is less than 2, `a_scale` is not returned.
- """
- Lp_scale = 1.0
- if len(params) == 1:
- return [Lp_scale]
- return [Lp_scale] + [x/10.0 for x in params[1:]]
-
def inverse_FJC_PEG_fn(F_data, T=300, N=1, k=150., Lp=3.58e-10, Lh=2.8e-10, dG=3., a=7e-10):
"""Inverse poly(ethylene-glycol) adjusted extended FJC model.
... 'Gibbs free energy difference (Gp - Gh) (kBT)': kwargs['dG'],
... }
>>> model = FJC_PEG(d_data, info=info, rescale=True)
- >>> outqueue = Queue.Queue()
- >>> Nr,a = model.fit(outqueue=outqueue)
+ >>> outqueue = Queue()
+ >>> N,a = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
- >>> model.L(Nr) # doctest: +ELLIPSIS
- 122.999...
- >>> a # doctest: +ELLIPSIS
- 6.999...e-10
+ >>> print N
+ 123.0
+ >>> print a
+ 7e-10
Fit the example data with a one-parameter fit (`N`). We introduce
some error in our fixed Kuhn length for fun.
>>> info['Kuhn length (m)'] = 2*kwargs['a']
>>> model = FJC_PEG(d_data, info=info, rescale=True)
- >>> Nr = model.fit(outqueue=outqueue)
+ >>> N = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
- >>> model.L(Nr) # doctest: +ELLIPSIS
+ >>> print N # doctest: +ELLIPSIS
96.931...
"""
def Lr(self, L):
self._model_data[:] = FJC_PEG_fn(x_data, N=N, a=a, **kwargs)
return self._model_data
+ def fit(self, *args, **kwargs):
+ params = super(FJC_PEG, self).fit(*args, **kwargs)
+ if is_iterable(params):
+ params[0] = self.L(params[0]) # convert Nr -> N
+ params[1] = abs(params[1]) # take the absolute value of `a`
+ else: # params is a float
+ params = self.L(params) # convert Nr -> N
+ return params
+
def guess_initial_params(self, outqueue=None):
"""Guess initial fitting parameters.
return [Nr]
return [Nr, a]
- def guess_scale(self, params, outqueue=None):
- """Guess parameter scales.
-
- Returns
- -------
- N_scale : float
- A guess at the section count scale in meters.
- a_scale : float (optional)
- A guess at the Kuhn length in meters. If the length of
- `params` is less than 2, `a_scale` is not returned.
- """
- return [x/10.0 for x in params]
-
def WLC_fn(x_data, T, L, p):
"""The worm like chain interpolation model.
... 'x data (m)': x_data,
... }
>>> model = WLC(d_data, info=info, rescale=True)
- >>> outqueue = Queue.Queue()
- >>> Lp,p = model.fit(outqueue=outqueue)
+ >>> outqueue = Queue()
+ >>> L,p = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
- >>> model.L(Lp) # doctest: +ELLIPSIS
- 3.500...e-08
- >>> p # doctest: +ELLIPSIS
- 2.500...e-10
+ >>> print L
+ 3.5e-08
+ >>> print p
+ 2.5e-10
Fit the example data with a one-parameter fit (`L`). We introduce
some error in our fixed persistence length for fun.
>>> info['persistence length (m)'] = 2*p
>>> model = WLC(d_data, info=info, rescale=True)
- >>> Lp = model.fit(outqueue=outqueue)
+ >>> L = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
- >>> model.L(Lp) # doctest: +ELLIPSIS
+ >>> print L # doctest: +ELLIPSIS
3.318...e-08
"""
def Lp(self, L):
self._model_data[:] = WLC_fn(x_data, T, L, p)
return self._model_data
+ def fit(self, *args, **kwargs):
+ params = super(WLC, self).fit(*args, **kwargs)
+ if is_iterable(params):
+ params[0] = self.L(params[0]) # convert Lp -> L
+ params[1] = abs(params[1]) # take the absolute value of `p`
+ else: # params is a float
+ params = self.L(params) # convert Lp -> L
+ return params
+
def guess_initial_params(self, outqueue=None):
"""Guess initial fitting parameters.
return [Lp]
return [Lp, p]
- def guess_scale(self, params, outqueue=None):
- """Guess parameter scales.
-
- Returns
- -------
- Lp_scale : float
- A guess at the reparameterized contour length scale in meters.
- p_scale : float (optional)
- A guess at the persistence length in meters. If the
- length of `params` is less than 2, `p_scale` is not
- returned.
- """
- Lp_scale = 1.0
- if len(params) == 1:
- return [Lp_scale]
- return [Lp_scale] + [x/10.0 for x in params[1:]]
-
class PolymerFitPlugin (Plugin):
"""Polymer model (WLC, FJC, etc.) fitting.
"""
def __init__(self):
super(PolymerFitPlugin, self).__init__(name='polymer_fit')
- self._commands = [PolymerFitCommand(self),]
-
- def dependencies(self):
- return ['vclamp']
-
- def default_settings(self):
- return [
- Setting(section=self.setting_section, help=self.__doc__),
- Setting(section=self.setting_section,
- option='polymer model',
- value='WLC',
- help="Select the default polymer model for 'polymer fit'. See the documentation for descriptions of available algorithms."),
- Setting(section=self.setting_section,
- option='FJC Kuhn length',
- value=4e-10, type='float',
+ self._arguments = [ # For Command initialization
+ Argument('polymer model', default='WLC', help="""
+Select the default polymer model for 'polymer fit'. See the
+documentation for descriptions of available algorithms.
+""".strip()),
+ Argument('FJC Kuhn length', type='float', default=4e-10,
help='Kuhn length in meters'),
- Setting(section=self.setting_section,
- option='FJC-PEG Kuhn length',
- value=4e-10, type='float',
+ Argument('FJC_PEG Kuhn length', type='float', default=4e-10,
help='Kuhn length in meters'),
- Setting(section=self.setting_section,
- option='FJC-PEG elasticity',
- value=150.0, type='float',
+ Argument('FJC_PEG elasticity', type='float', default=150.0,
help='Elasticity of a PEG segment in Newtons per meter.'),
- Setting(section=self.setting_section,
- option='FJC-PEG delta G',
- value=3.0, type='float',
- help='Gibbs free energy difference between trans-trans-trans (ttt) and trans-trans-gauche (ttg) PEG states in units of kBT.'),
- Setting(section=self.setting_section,
- option='FJC-PEG L_helical',
- value=2.8e-10, type='float',
+ Argument('FJC_PEG delta G', type='float', default=3.0, help="""
+Gibbs free energy difference between trans-trans-trans (ttt) and
+trans-trans-gauche (ttg) PEG states in units of kBT.
+""".strip()),
+ Argument('FJC_PEG L_helical', type='float', default=2.8e-10,
help='Contour length of PEG in the ttg state.'),
- Setting(section=self.setting_section,
- option='FJC-PEG L_planar',
- value=3.58e-10, type='float',
+ Argument('FJC_PEG L_planar', type='float', default=3.58e-10,
help='Contour length of PEG in the ttt state.'),
- Setting(section=self.setting_section,
- option='WLC persistence length',
- value=4e-10, type='float',
- help='Persistence length in meters'),
+ Argument('WLC persistence length', type='float', default=4e-10,
+ help='Persistence length in meters'),
+ ]
+ self._settings = [
+ Setting(section=self.setting_section, help=self.__doc__)]
+ for argument in self._arguments:
+ self._settings.append(argument_to_setting(
+ self.setting_section, argument))
+ argument.default = None # if argument isn't given, use the config.
+ self._input_columns = [
+ ('distance column', 'cantilever adjusted extension (m)',
+ """
+Name of the column to use as the surface position input.
+""".strip()),
+ ('deflection column', 'deflection (N)',
+ """
+Name of the column to use as the deflection input.
+""".strip()),
+ ]
+ self._commands = [
+ PolymerFitCommand(self), PolymerFitPeaksCommand(self),
+ TranslateFlatPeaksCommand(self),
]
+ def dependencies(self):
+ return ['vclamp']
-class PolymerFitCommand (Command):
+ def default_settings(self):
+ return self._settings
+
+
+class PolymerFitCommand (ColumnAddingCommand):
"""Polymer model (WLC, FJC, etc.) fitting.
Fits an entropic elasticity function to a given chunk of the
def __init__(self, plugin):
super(PolymerFitCommand, self).__init__(
name='polymer fit',
+ columns=plugin._input_columns,
+ new_columns=[
+ ('output tension column', 'polymer tension',
+ """
+Name of the column (without units) to use as the polymer tension output.
+""".strip()),
+ ],
arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
+ Argument(name='fit parameters info name', type='string',
+ default='polymer fit',
help="""
-Data block for which the fit should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
+Name (without units) for storing the fit parameters in the `.info` dictionary.
""".strip()),
Argument(name='bounds', type='point', optional=False, count=2,
help="""
Indicies of points bounding the selected data.
""".strip()),
- ],
+ ] + plugin._arguments,
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- scale(hooke, params['curve'], params['block']) # TODO: is autoscaling a good idea? (explicit is better than implicit)
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue...
- model = self.plugin.config['polymer model']
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
- new.info['columns'].append('%s tension (N)' % model) # TODO: WLC fit for each peak, etc.
- z_data = data[:,data.info['columns'].index(
- 'cantilever adjusted extension (m)')]
- d_data = data[:,data.info['columns'].index('deflection (N)')]
+ params = self._setup_params(hooke, params)
+ data = self._block(hooke, params)
+ model = params['polymer model']
+ dist_data = self._get_column(
+ hooke=hooke, params=params, column_name='distance column')
+ def_data = self._get_column(
+ hooke=hooke, params=params, column_name='deflection column')
start,stop = params['bounds']
tension_data,ps = self.fit_polymer_model(
- params['curve'], z_data, d_data, start, stop, outqueue)
- new.info['%s polymer fit parameters' % model] = ps
- new[:,-1] = tension_data
- params['curve'].data[params['block']] = new
-
- def fit_polymer_model(self, curve, z_data, d_data, start, stop,
+ params, dist_data, def_data, start, stop, outqueue)
+ data.info[params['fit parameters info name']] = ps
+ data.info[params['fit parameters info name']]['model'] = model
+ self._set_column(hooke=hooke, params=params,
+ column_name='output tension column',
+ values=tension_data)
+
+ def _setup_params(self, hooke, params):
+ for key,value in params.items():
+ if value == None: # Use configured default value.
+ params[key] = self.plugin.config[key]
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output tension column'] = join_data_label(
+ params['output tension column'], def_unit)
+ return params
+
+ def fit_polymer_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Railyard for the `fit_*_model` family.
appropriate backend algorithm.
"""
fn = getattr(self, 'fit_%s_model'
- % self.plugin.config['polymer model'].replace('-','_'))
- return fn(curve, z_data, d_data, start, stop, outqueue)
+ % params['polymer model'].replace('-','_'))
+ return fn(params, dist_data, def_data, start, stop, outqueue)
- def fit_FJC_model(self, curve, z_data, d_data, start, stop,
+ def fit_FJC_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Fit the data with :class:`FJC`.
"""
info = {
'temperature (K)': self.plugin.config['temperature'],
- 'x data (m)': z_data[start:stop],
+ 'x data (m)': dist_data[start:stop],
}
if True: # TODO: optionally free persistence length
info['Kuhn length (m)'] = (
- self.plugin.config['FJC Kuhn length'])
- model = FJC(d_data[start:stop], info=info, rescale=True)
- queue = Queue.Queue()
+ params['FJC Kuhn length'])
+ model = FJC(def_data[start:stop], info=info, rescale=True)
+ queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if Kuhn length fixed
params = [params]
a = info['Kuhn length (m)']
else:
a = params[1]
- Lp = params[0]
- L = model.L(Lp)
+ L = params[0]
T = info['temperature (K)']
fit_info = queue.get(block=False)
- mask = numpy.zeros(z_data.shape, dtype=numpy.bool)
- mask[start:stop] = True
- return [FJC_fn(z_data, T=T, L=L, a=a) * mask,
- fit_info]
+ f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+ f_data[start:stop] = FJC_fn(dist_data[start:stop], T=T, L=L, a=a)
+ return [f_data, fit_info]
- def fit_FJC_PEG_model(self, curve, z_data, d_data, start, stop,
+ def fit_FJC_PEG_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Fit the data with :class:`FJC_PEG`.
"""
info = {
'temperature (K)': self.plugin.config['temperature'],
- 'x data (m)': z_data[start:stop],
+ 'x data (m)': dist_data[start:stop],
# TODO: more info
}
if True: # TODO: optionally free persistence length
info['Kuhn length (m)'] = (
- self.plugin.config['FJC Kuhn length'])
- model = FJC(d_data[start:stop], info=info, rescale=True)
- queue = Queue.Queue()
+ params['FJC Kuhn length'])
+ model = FJC_PEG(def_data[start:stop], info=info, rescale=True)
+ queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if Kuhn length fixed
params = [params]
a = info['Kuhn length (m)']
else:
a = params[1]
- Nr = params[0]
- N = model.L(Nr)
+ N = params[0]
T = info['temperature (K)']
fit_info = queue.get(block=False)
- mask = numpy.zeros(z_data.shape, dtype=numpy.bool)
- mask[start:stop] = True
- return [FJC_PEG_fn(z_data, **kwargs) * mask,
- fit_info]
+ f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+ f_data[start:stop] = FJC_PEG_fn(dist_data[start:stop], **kwargs)
+ return [f_data, fit_info]
- def fit_WLC_model(self, curve, z_data, d_data, start, stop,
+ def fit_WLC_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Fit the data with :class:`WLC`.
"""
info = {
'temperature (K)': self.plugin.config['temperature'],
- 'x data (m)': z_data[start:stop],
+ 'x data (m)': dist_data[start:stop],
}
if True: # TODO: optionally free persistence length
info['persistence length (m)'] = (
- self.plugin.config['WLC persistence length'])
- model = WLC(d_data[start:stop], info=info, rescale=True)
- queue = Queue.Queue()
+ params['WLC persistence length'])
+ model = WLC(def_data[start:stop], info=info, rescale=True)
+ queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if persistence length fixed
params = [params]
p = info['persistence length (m)']
else:
p = params[1]
- Lp = params[0]
- L = model.L(Lp)
+ info['persistence length (m)'] = p
+ L = params[0]
+ info['contour length (m)'] = L
T = info['temperature (K)']
fit_info = queue.get(block=False)
- mask = numpy.zeros(z_data.shape, dtype=numpy.bool)
- mask[start:stop] = True
- return [WLC_fn(z_data, T=T, L=L, p=p) * mask,
- fit_info]
+ info['fit'] = fit_info
+ info.pop('x data (m)')
+ f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+ f_data[start:stop] = WLC_fn(dist_data[start:stop], T=T, L=L, p=p)
+ return [f_data, info]
+
+
+class PolymerFitPeaksCommand (ColumnAccessCommand):
+ """Polymer model (WLC, FJC, etc.) fitting.
+
+ Use :class:`PolymerFitCommand` to fit the each peak in a list of
+ previously determined peaks.
+ """
+ def __init__(self, plugin):
+ super(PolymerFitPeaksCommand, self).__init__(
+ name='polymer fit peaks',
+ columns=plugin._input_columns,
+ arguments=[
+ Argument(name='peak info name', type='string',
+ default='polymer peaks',
+ help="""
+Name for the list of peaks in the `.info` dictionary.
+""".strip()),
+ Argument(name='peak index', type='int', count=-1, default=None,
+ help="""
+Index of the selected peak in the list of peaks. Use `None` to fit all peaks.
+""".strip()),
+ ] + plugin._arguments,
+ help=self.__doc__, plugin=plugin)
+
+ def _run(self, hooke, inqueue, outqueue, params):
+ data = self._block(hooke, params)
+ fit_command = hooke.command_by_name['polymer fit']
+ inq = Queue()
+ outq = Queue()
+ curve = params['curve']
+ params['curve'] = None
+ p = copy.deepcopy(params)
+ p['curve'] = params['curve']
+ del(p['peak info name'])
+ del(p['peak index'])
+ for i,peak in enumerate(data.info[params['peak info name']]):
+ if params['peak index'] == None or i in params['peak index']:
+ p['bounds'] = [peak.index, peak.post_index()]
+ p['output tension column'] = peak.name
+ p['fit parameters info name'] = peak.name
+ fit_command.run(hooke, inq, outq, **p)
+ ret = outq.get()
+ if not isinstance(ret, Success):
+ raise ret
+
+
+class TranslateFlatPeaksCommand (ColumnAccessCommand):
+ """Translate flat filter peaks into polymer peaks for fitting.
+
+ Use :class:`~hooke.plugin.flatfilt.FlatPeaksCommand` creates a
+ list of peaks for regions with large derivatives. For velocity
+ clamp measurements, these regions are usually the rebound phase
+ after a protein domain unfolds, the cantilever detaches, etc.
+ Because these features occur after the polymer loading phase, we
+ need to shift the selected regions back to align them with the
+ polymer loading regions.
+ """
+ def __init__(self, plugin):
+ super(TranslateFlatPeaksCommand, self).__init__(
+ name='flat peaks to polymer peaks',
+ columns=plugin._input_columns,
+ arguments=[
+ Argument(name='input peak info name', type='string',
+ default='flat filter peaks',
+ help="""
+Name for the input peaks in the `.info` dictionary.
+""".strip()),
+ Argument(name='output peak info name', type='string',
+ default='polymer peaks',
+ help="""
+Name for the ouptput peaks in the `.info` dictionary.
+""".strip()),
+ Argument(name='end offset', type='int', default=-1,
+ help="""
+Number of points between the end of a new peak and the start of the old.
+""".strip()),
+ Argument(name='start fraction', type='float', default=0.2,
+ help="""
+Place the start of the new peak at `start_fraction` from the end of
+the previous old peak to the end of the new peak. Because the first
+new peak will have no previous old peak, it uses a distance of zero
+instead.
+""".strip()),
+ ] + plugin._arguments,
+ help=self.__doc__, plugin=plugin)
+
+ def _run(self, hooke, inqueue, outqueue, params):
+ data = self._block(hooke, params)
+ dist_data = self._get_column(
+ hooke=hooke, params=params, column_name='distance column')
+ def_data = self._get_column(
+ hooke=hooke, params=params, column_name='deflection column')
+ previous_old_stop = numpy.absolute(dist_data).argmin()
+ new = []
+ try:
+ peaks = data.info[params['input peak info name']]
+ except KeyError, e:
+ raise Failure('No value for %s in %s.info (%s): %s'
+ % (params['input peak info name'], data.info['name'],
+ sorted(data.info.keys()), e))
+ for i,peak in enumerate(peaks):
+ next_old_start = peak.index
+ stop = next_old_start + params['end offset']
+ dist_start = (
+ dist_data[previous_old_stop]
+ + params['start fraction']*(
+ dist_data[stop] - dist_data[previous_old_stop]))
+ start = numpy.absolute(dist_data - dist_start).argmin()
+ p = Peak('polymer peak %d' % i,
+ index=start,
+ values=def_data[start:stop])
+ new.append(p)
+ previous_old_stop = peak.post_index()
+ data.info[params['output peak info name']] = new
# TODO: