from ..util.peak import Peak
from ..util.si import join_data_label, split_data_label
from . import Plugin, argument_to_setting
-from .curve import CurveArgument
-from .vclamp import scale
+from .curve import ColumnAccessCommand, ColumnAddingCommand
kB = 1.3806503e-23 # Joules/Kelvin
self._settings.append(argument_to_setting(
self.setting_section, argument))
argument.default = None # if argument isn't given, use the config.
- self._arguments.extend([ # Non-configurable arguments
- Argument(name='input distance column', type='string',
- default='cantilever adjusted extension (m)',
- help="""
+ self._input_columns = [
+ ('distance column', 'cantilever adjusted extension (m)',
+ """
Name of the column to use as the surface position input.
""".strip()),
- Argument(name='input deflection column', type='string',
- default='deflection (N)',
- help="""
+ ('deflection column', 'deflection (N)',
+ """
Name of the column to use as the deflection input.
""".strip()),
- ])
+ ]
self._commands = [
PolymerFitCommand(self), PolymerFitPeaksCommand(self),
TranslateFlatPeaksCommand(self),
return self._settings
-class PolymerFitCommand (Command):
+class PolymerFitCommand (ColumnAddingCommand):
"""Polymer model (WLC, FJC, etc.) fitting.
Fits an entropic elasticity function to a given chunk of the
def __init__(self, plugin):
super(PolymerFitCommand, self).__init__(
name='polymer fit',
- arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
- help="""
-Data block for which the fit should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
- Argument(name='bounds', type='point', optional=False, count=2,
- help="""
-Indicies of points bounding the selected data.
-""".strip()),
- ] + plugin._arguments + [
- Argument(name='output tension column', type='string',
- default='polymer tension',
- help="""
+ columns=plugin._input_columns,
+ new_columns=[
+ ('output tension column', 'polymer tension',
+ """
Name of the column (without units) to use as the polymer tension output.
""".strip()),
+ ],
+ arguments=[
Argument(name='fit parameters info name', type='string',
default='surface deflection offset',
help="""
Name (without units) for storing the fit parameters in the `.info` dictionary.
""".strip()),
- ],
+ Argument(name='bounds', type='point', optional=False, count=2,
+ help="""
+Indicies of points bounding the selected data.
+""".strip()),
+ ] + plugin._arguments,
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- for key,value in params.items():
- if value == None: # Use configured default value.
- params[key] = self.plugin.config[key]
- scale(hooke, params['curve'], params['block']) # TODO: is autoscaling a good idea? (explicit is better than implicit)
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue...
+ params = self.__setup_params(hooke, params)
+ data = self._block(hooke, params)
model = params['polymer model']
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
- new.info['columns'].append(
- join_data_label(params['output tension column'], 'N'))
- z_data = data[:,data.info['columns'].index(
- params['input distance column'])]
- d_data = data[:,data.info['columns'].index(
- params['input deflection column'])]
+ dist_data = self._get_column(
+ hooke=hooke, params=params, column_name='distance column')
+ def_data = self._get_column(
+ hooke=hooke, params=params, column_name='deflection column')
start,stop = params['bounds']
tension_data,ps = self.fit_polymer_model(
- params, z_data, d_data, start, stop, outqueue)
- new.info[params['fit parameters info name']] = ps
- new.info[params['fit parameters info name']]['model'] = model
- new[:,-1] = tension_data
- params['curve'].data[params['block']] = new
+ params, dist_data, def_data, start, stop, outqueue)
+ data.info[params['fit parameters info name']] = ps
+ data.info[params['fit parameters info name']]['model'] = model
+ self._set_column(hooke=hooke, params=params,
+ column_name='output tension column',
+ values=tension_data)
+
+ def __setup_params(self, hooke, params):
+ for key,value in params.items():
+ if value == None: # Use configured default value.
+ params[key] = self.plugin.config[key]
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output tension column'] = join_data_label(
+ params['output tension column'], def_unit)
+ return params
- def fit_polymer_model(self, params, z_data, d_data, start, stop,
+ def fit_polymer_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Railyard for the `fit_*_model` family.
"""
fn = getattr(self, 'fit_%s_model'
% params['polymer model'].replace('-','_'))
- return fn(params, z_data, d_data, start, stop, outqueue)
+ return fn(params, dist_data, def_data, start, stop, outqueue)
- def fit_FJC_model(self, params, z_data, d_data, start, stop,
+ def fit_FJC_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Fit the data with :class:`FJC`.
"""
info = {
'temperature (K)': self.plugin.config['temperature'],
- 'x data (m)': z_data[start:stop],
+ 'x data (m)': dist_data[start:stop],
}
if True: # TODO: optionally free persistence length
info['Kuhn length (m)'] = (
params['FJC Kuhn length'])
- model = FJC(d_data[start:stop], info=info, rescale=True)
+ model = FJC(def_data[start:stop], info=info, rescale=True)
queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if Kuhn length fixed
L = params[0]
T = info['temperature (K)']
fit_info = queue.get(block=False)
- f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan
- f_data[start:stop] = FJC_fn(z_data[start:stop], T=T, L=L, a=a)
+ f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+ f_data[start:stop] = FJC_fn(dist_data[start:stop], T=T, L=L, a=a)
return [f_data, fit_info]
- def fit_FJC_PEG_model(self, params, z_data, d_data, start, stop,
+ def fit_FJC_PEG_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Fit the data with :class:`FJC_PEG`.
"""
info = {
'temperature (K)': self.plugin.config['temperature'],
- 'x data (m)': z_data[start:stop],
+ 'x data (m)': dist_data[start:stop],
# TODO: more info
}
if True: # TODO: optionally free persistence length
info['Kuhn length (m)'] = (
params['FJC Kuhn length'])
- model = FJC_PEG(d_data[start:stop], info=info, rescale=True)
+ model = FJC_PEG(def_data[start:stop], info=info, rescale=True)
queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if Kuhn length fixed
N = params[0]
T = info['temperature (K)']
fit_info = queue.get(block=False)
- f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan
- f_data[start:stop] = FJC_PEG_fn(z_data[start:stop], **kwargs)
+ f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+ f_data[start:stop] = FJC_PEG_fn(dist_data[start:stop], **kwargs)
return [f_data, fit_info]
- def fit_WLC_model(self, params, z_data, d_data, start, stop,
+ def fit_WLC_model(self, params, dist_data, def_data, start, stop,
outqueue=None):
"""Fit the data with :class:`WLC`.
"""
info = {
'temperature (K)': self.plugin.config['temperature'],
- 'x data (m)': z_data[start:stop],
+ 'x data (m)': dist_data[start:stop],
}
if True: # TODO: optionally free persistence length
info['persistence length (m)'] = (
params['WLC persistence length'])
- model = WLC(d_data[start:stop], info=info, rescale=True)
+ model = WLC(def_data[start:stop], info=info, rescale=True)
queue = Queue()
params = model.fit(outqueue=queue)
if True: # TODO: if persistence length fixed
L = params[0]
T = info['temperature (K)']
fit_info = queue.get(block=False)
- f_data = numpy.ones(z_data.shape, dtype=z_data.dtype) * numpy.nan
- f_data[start:stop] = WLC_fn(z_data[start:stop], T=T, L=L, p=p)
+ f_data = numpy.ones(dist_data.shape, dtype=dist_data.dtype) * numpy.nan
+ f_data[start:stop] = WLC_fn(dist_data[start:stop], T=T, L=L, p=p)
return [f_data, fit_info]
-class PolymerFitPeaksCommand (Command):
+class PolymerFitPeaksCommand (ColumnAccessCommand):
"""Polymer model (WLC, FJC, etc.) fitting.
Use :class:`PolymerFitCommand` to fit the each peak in a list of
def __init__(self, plugin):
super(PolymerFitPeaksCommand, self).__init__(
name='polymer fit peaks',
+ columns=plugin._input_columns,
arguments=[
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
- help="""
-Data block for which the fit should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
Argument(name='peak info name', type='string',
default='polymer peaks',
help="""
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
+ data = self._block(hooke, params)
fit_command = [c for c in hooke.commands
if c.name=='polymer fit'][0]
inq = Queue()
outq = Queue()
+ curve = params['curve']
+ params['curve'] = None
p = copy.deepcopy(params)
p['curve'] = params['curve']
del(p['peak info name'])
raise ret
-class TranslateFlatPeaksCommand (Command):
+class TranslateFlatPeaksCommand (ColumnAccessCommand):
"""Translate flat filter peaks into polymer peaks for fitting.
Use :class:`~hooke.plugin.flatfilt.FlatPeaksCommand` creates a
polymer loading regions.
"""
def __init__(self, plugin):
- plugin_arguments = [a for a in plugin._arguments
- if a.name in ['input distance column',
- 'input deflection column']]
- arguments = [
- CurveArgument,
- Argument(name='block', aliases=['set'], type='int', default=0,
- help="""
-Data block for which the fit should be calculated. For an
-approach/retract force curve, `0` selects the approaching curve and
-`1` selects the retracting curve.
-""".strip()),
- ] + plugin_arguments + [
- Argument(name='input peak info name', type='string',
- default='flat filter peaks',
- help="""
+ super(TranslateFlatPeaksCommand, self).__init__(
+ name='flat peaks to polymer peaks',
+ columns=plugin._input_columns,
+ arguments=[
+ Argument(name='input peak info name', type='string',
+ default='flat filter peaks',
+ help="""
Name for the input peaks in the `.info` dictionary.
""".strip()),
- Argument(name='output peak info name', type='string',
- default='polymer peaks',
- help="""
+ Argument(name='output peak info name', type='string',
+ default='polymer peaks',
+ help="""
Name for the ouptput peaks in the `.info` dictionary.
""".strip()),
- Argument(name='end offset', type='int', default=-1,
- help="""
+ Argument(name='end offset', type='int', default=-1,
+ help="""
Number of points between the end of a new peak and the start of the old.
""".strip()),
- Argument(name='start fraction', type='float', default=0.2,
- help="""
+ Argument(name='start fraction', type='float', default=0.2,
+ help="""
Place the start of the new peak at `start_fraction` from the end of
the previous old peak to the end of the new peak. Because the first
new peak will have no previous old peak, it uses a distance of zero
instead.
""".strip()),
- ]
- super(TranslateFlatPeaksCommand, self).__init__(
- name='flat peaks to polymer peaks',
- arguments=arguments,
+ ] + plugin._arguments,
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- z_data = data[:,data.info['columns'].index(
- params['input distance column'])]
- d_data = data[:,data.info['columns'].index(
- params['input deflection column'])]
- previous_old_stop = numpy.absolute(z_data).argmin()
+ data = self._block(hooke, params)
+ dist_data = self._get_column(
+ hooke=hooke, params=params, column_name='distance column')
+ def_data = self._get_column(
+ hooke=hooke, params=params, column_name='deflection column')
+ previous_old_stop = numpy.absolute(dist_data).argmin()
new = []
- for i,peak in enumerate(data.info[params['input peak info name']]):
+ try:
+ peaks = data.info[params['input peak info name']]
+ except KeyError, e:
+ raise Failure('No value for %s in %s.info (%s): %s'
+ % (params['input peak info name'], data.info['name'],
+ sorted(data.info.keys()), e))
+ for i,peak in enumerate(peaks):
next_old_start = peak.index
stop = next_old_start + params['end offset']
- z_start = z_data[previous_old_stop] + params['start fraction']*(
- z_data[stop] - z_data[previous_old_stop])
- start = numpy.absolute(z_data - z_start).argmin()
+ dist_start = (
+ dist_data[previous_old_stop]
+ + params['start fraction']*(
+ dist_data[stop] - dist_data[previous_old_stop]))
+ start = numpy.absolute(dist_data - dist_start).argmin()
p = Peak('polymer peak %d' % i,
index=start,
- values=d_data[start:stop])
+ values=def_data[start:stop])
new.append(p)
previous_old_stop = peak.post_index()
data.info[params['output peak info name']] = new