"""
import copy
-import Queue
+from Queue import Queue
import StringIO
import sys
from ..command import Command, Argument, Failure
from ..config import Setting
from ..curve import Data
-from ..plugin import Plugin
+from ..plugin import Plugin, argument_to_setting
from ..util.callback import is_iterable
from ..util.fit import PoorFit, ModelFitter
from ..util.si import join_data_label, split_data_label
... 'x data (m)': x_data,
... }
>>> model = FJC(d_data, info=info, rescale=True)
- >>> outqueue = Queue.Queue()
+ >>> outqueue = Queue()
>>> Lp,a = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
>>> model.L(Lp) # doctest: +ELLIPSIS
... 'Gibbs free energy difference (Gp - Gh) (kBT)': kwargs['dG'],
... }
>>> model = FJC_PEG(d_data, info=info, rescale=True)
- >>> outqueue = Queue.Queue()
+ >>> outqueue = Queue()
>>> Nr,a = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
>>> model.L(Nr) # doctest: +ELLIPSIS
... 'x data (m)': x_data,
... }
>>> model = WLC(d_data, info=info, rescale=True)
- >>> outqueue = Queue.Queue()
+ >>> outqueue = Queue()
>>> Lp,p = model.fit(outqueue=outqueue)
>>> fit_info = outqueue.get(block=False)
>>> model.L(Lp) # doctest: +ELLIPSIS
"""
def __init__(self):
super(PolymerFitPlugin, self).__init__(name='polymer_fit')
- self._commands = [PolymerFitCommand(self),]
-
- def dependencies(self):
- return ['vclamp']
-
- def default_settings(self):
- return [
- Setting(section=self.setting_section, help=self.__doc__),
- Setting(section=self.setting_section,
- option='polymer model',
- value='WLC',
- help="Select the default polymer model for 'polymer fit'. See the documentation for descriptions of available algorithms."),
- Setting(section=self.setting_section,
- option='FJC Kuhn length',
- value=4e-10, type='float',
+ self._arguments = [ # For Command initialization
+ Argument('polymer model', default='WLC', help="""
+Select the default polymer model for 'polymer fit'. See the
+documentation for descriptions of available algorithms.
+""".strip()),
+ Argument('FJC Kuhn length', type='float', default=4e-10,
help='Kuhn length in meters'),
- Setting(section=self.setting_section,
- option='FJC-PEG Kuhn length',
- value=4e-10, type='float',
+ Argument('FJC_PEG Kuhn length', type='float', default=4e-10,
help='Kuhn length in meters'),
- Setting(section=self.setting_section,
- option='FJC-PEG elasticity',
- value=150.0, type='float',
+ Argument('FJC_PEG elasticity', type='float', default=150.0,
help='Elasticity of a PEG segment in Newtons per meter.'),
- Setting(section=self.setting_section,
- option='FJC-PEG delta G',
- value=3.0, type='float',
- help='Gibbs free energy difference between trans-trans-trans (ttt) and trans-trans-gauche (ttg) PEG states in units of kBT.'),
- Setting(section=self.setting_section,
- option='FJC-PEG L_helical',
- value=2.8e-10, type='float',
+ Argument('FJC_PEG delta G', type='float', default=3.0, help="""
+Gibbs free energy difference between trans-trans-trans (ttt) and
+trans-trans-gauche (ttg) PEG states in units of kBT.
+""".strip()),
+ Argument('FJC_PEG L_helical', type='float', default=2.8e-10,
help='Contour length of PEG in the ttg state.'),
- Setting(section=self.setting_section,
- option='FJC-PEG L_planar',
- value=3.58e-10, type='float',
+ Argument('FJC_PEG L_planar', type='float', default=3.58e-10,
help='Contour length of PEG in the ttt state.'),
- Setting(section=self.setting_section,
- option='WLC persistence length',
- value=4e-10, type='float',
- help='Persistence length in meters'),
+ Argument('WLC persistence length', type='float', default=4e-10,
+ help='Persistence length in meters'),
+ ]
+ self._settings = [
+ Setting(section=self.setting_section, help=self.__doc__)]
+ for argument in self._arguments:
+ self._settings.append(argument_to_setting(
+ self.setting_section, argument))
+ argument.default = None # if argument isn't given, use the config.
+ self._arguments.extend([ # Non-configurable arguments
+ Argument(name='input distance column', type='string',
+ default='cantilever adjusted extension (m)',
+ help="""
+Name of the column to use as the surface position input.
+""".strip()),
+ Argument(name='input deflection column', type='string',
+ default='deflection (N)',
+ help="""
+Name of the column to use as the deflection input.
+""".strip()),
+ ])
+ self._commands = [
+ PolymerFitCommand(self), PolymerFitPeaksCommand(self),
]
+ def dependencies(self):
+ return ['vclamp']
+
+ def default_settings(self):
+ return self._settings
+
class PolymerFitCommand (Command):
"""Polymer model (WLC, FJC, etc.) fitting.
help="""
Indicies of points bounding the selected data.
""".strip()),
- Argument(name='input distance column', type='string',
- default='cantilever adjusted extension (m)',
- help="""
-Name of the column to use as the surface position input.
-""".strip()),
- Argument(name='input deflection column', type='string',
- default='deflection (N)',
- help="""
-Name of the column to use as the deflection input.
-""".strip()),
+ ] + plugin._arguments + [
Argument(name='output tension column', type='string',
default='polymer tension',
help="""
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
+ print params['curve'], id(params['curve']), 'PFC'
+ for key,value in params.items():
+ if value == None: # Use configured default value.
+ params[key] = self.plugin.config[key]
scale(hooke, params['curve'], params['block']) # TODO: is autoscaling a good idea? (explicit is better than implicit)
data = params['curve'].data[params['block']]
# HACK? rely on params['curve'] being bound to the local hooke
# curve through the queue). Ugh. Stupid queues. As an
# alternative, we could pass lookup information through the
# queue...
- model = self.plugin.config['polymer model']
+ model = params['polymer model']
new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
new.info = copy.deepcopy(data.info)
new[:,:-1] = data
new.info['columns'].append(
join_data_label(params['output tension column'], 'N'))
+ print 'add column', id(params['curve']), new.info['columns'][-1], 'to', new.info['columns'][:-1]
z_data = data[:,data.info['columns'].index(
params['input distance column'])]
d_data = data[:,data.info['columns'].index(
params['input deflection column'])]
start,stop = params['bounds']
tension_data,ps = self.fit_polymer_model(
- params['curve'], z_data, d_data, start, stop, outqueue)
+ params, z_data, d_data, start, stop, outqueue)
new.info[params['fit parameters info name']] = ps
new.info[params['fit parameters info name']]['model'] = model
+ print 'add info', params['fit parameters info name']
new[:,-1] = tension_data
params['curve'].data[params['block']] = new
- def fit_polymer_model(self, curve, z_data, d_data, start, stop,
+ def fit_polymer_model(self, params, z_data, d_data, start, stop,
outqueue=None):
"""Railyard for the `fit_*_model` family.
appropriate backend algorithm.
"""
fn = getattr(self, 'fit_%s_model'
- % self.plugin.config['polymer model'].replace('-','_'))
- return fn(curve, z_data, d_data, start, stop, outqueue)
+ % params['polymer model'].replace('-','_'))
+ return fn(params, z_data, d_data, start, stop, outqueue)
- def fit_FJC_model(self, curve, z_data, d_data, start, stop,
+ def fit_FJC_model(self, params, z_data, d_data, start, stop,
outqueue=None):
"""Fit the data with :class:`FJC`.
"""
}
if True: # TODO: optionally free persistence length
info['Kuhn length (m)'] = (
- self.plugin.config['FJC Kuhn length'])
+ params['FJC Kuhn length'])
model = FJC(d_data[start:stop], info=info, rescale=True)
- queue = Queue.Queue()
+ queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if Kuhn length fixed
params = [params]
return [FJC_fn(z_data, T=T, L=L, a=a) * mask,
fit_info]
- def fit_FJC_PEG_model(self, curve, z_data, d_data, start, stop,
+ def fit_FJC_PEG_model(self, params, z_data, d_data, start, stop,
outqueue=None):
"""Fit the data with :class:`FJC_PEG`.
"""
}
if True: # TODO: optionally free persistence length
info['Kuhn length (m)'] = (
- self.plugin.config['FJC Kuhn length'])
+ params['FJC Kuhn length'])
model = FJC(d_data[start:stop], info=info, rescale=True)
- queue = Queue.Queue()
+ queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if Kuhn length fixed
params = [params]
return [FJC_PEG_fn(z_data, **kwargs) * mask,
fit_info]
- def fit_WLC_model(self, curve, z_data, d_data, start, stop,
+ def fit_WLC_model(self, params, z_data, d_data, start, stop,
outqueue=None):
"""Fit the data with :class:`WLC`.
"""
}
if True: # TODO: optionally free persistence length
info['persistence length (m)'] = (
- self.plugin.config['WLC persistence length'])
+ params['WLC persistence length'])
model = WLC(d_data[start:stop], info=info, rescale=True)
- queue = Queue.Queue()
+ queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if persistence length fixed
params = [params]
fit_info]
+
+class PolymerFitPeaksCommand (Command):
+ """Polymer model (WLC, FJC, etc.) fitting.
+
+ Use :class:`PolymerFitCommand` to fit the each peak in a list of
+ previously determined peaks.
+ """
+ def __init__(self, plugin):
+ super(PolymerFitPeaksCommand, self).__init__(
+ name='polymer fit peaks',
+ arguments=[
+ CurveArgument,
+ Argument(name='block', aliases=['set'], type='int', default=0,
+ help="""
+Data block for which the fit should be calculated. For an
+approach/retract force curve, `0` selects the approaching curve and
+`1` selects the retracting curve.
+""".strip()),
+ Argument(name='peak info name', type='string',
+ default='flat filter peaks',
+ help="""
+Name for storing the distance offset in the `.info` dictionary.
+""".strip()),
+ Argument(name='peak index', type='int', count=-1, default=None,
+ help="""
+Index of the selected peak in the list of peaks. Use `None` to fit all peaks.
+""".strip()),
+ ] + plugin._arguments,
+ help=self.__doc__, plugin=plugin)
+
+ def _run(self, hooke, inqueue, outqueue, params):
+ print params['curve'], id(params['curve']), 'PFPC'
+ data = params['curve'].data[params['block']]
+ fit_command = [c for c in hooke.commands
+ if c.name=='polymer fit'][0]
+ inq = Queue()
+ outq = Queue()
+ p = copy.deepcopy(params)
+ p['curve'] = params['curve']
+ del(p['peak info name'])
+ del(p['peak index'])
+ for i,peak in enumerate(data.info[params['peak info name']]):
+ if params['peak index'] == None or i in params['peak index']:
+ p['bounds'] = [peak.index, peak.post_index()]
+ p['output tension column'] = peak.name
+ p['fit parameters info name'] = peak.name
+ fit_command.run(hooke, inq, outq, **p)
+
# TODO:
# def dist2fit(self):
# '''Calculates the average distance from data to fit, scaled by