#
# This file is part of Hooke.
#
-# Hooke is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation, either
-# version 3 of the License, or (at your option) any later version.
+# Hooke is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
#
-# Hooke is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
+# Hooke is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with Hooke. If not, see
# <http://www.gnu.org/licenses/>.
-"""The ``flatfilt`` module provides :class:`~FlatFiltPlugin` and
+"""The ``flatfilt`` module provides :class:`FlatFiltPlugin` and
several associated :class:`~hooke.command.Command`\s for removing flat
(featureless) :mod:`~hooke.curve.Curve`\s from
:class:`~hooke.playlist.Playlist`\s.
See Also
--------
-:mod:`~hooke.plugin.convfilt` for a convolution-based filter for
-:class:`~hooke.experiment.VelocityClamp` experiments.
+:mod:`~hooke.plugin.convfilt` for a convolution-based filter.
"""
import copy
-from multiprocessing import Queue
+from Queue import Queue
-from numpy import diff
+from numpy import absolute, diff
from scipy.signal.signaltools import medfilt
-from ..command import Command, Argument, Failure
+from ..command import Argument, Success, Failure, UncaughtException
from ..config import Setting
-from ..plugin import Plugin, argument_to_setting
-from ..plugin.curve import CurveArgument
-from ..plugin.playlist import FilterCommand
-from ..plugin.vclamp import scale
-from ..util.peak import find_peaks, find_peaks_arguments, Peak
+from ..curve import Data
+from ..util.fit import PoorFit
+from ..util.peak import (find_peaks, peaks_to_mask,
+ find_peaks_arguments, Peak, _kwargs)
+from ..util.si import join_data_label, split_data_label
+from . import Plugin, argument_to_setting
+from .curve import ColumnAddingCommand
+from .playlist import FilterCommand
class FlatFiltPlugin (Plugin):
def __init__(self):
super(FlatFiltPlugin, self).__init__(name='flatfilt')
self._arguments = [ # For Command initialization
- Argument('median filter', type='int', default=7, help="""
+ Argument('median window', type='int', default=7, help="""
Median window filter size (in points).
+""".strip()),
+ Argument('blind window', type='float', default=20e-9, help="""
+Meters after the contact point where we do not count peaks to avoid
+non-specific surface interaction.
+""".strip()),
+ Argument('min peaks', type='int', default=4, help="""
+Minimum number of peaks for curve acceptance.
""".strip()),
] + copy.deepcopy(find_peaks_arguments)
# Set flat-filter-specific defaults for the fit_peak_arguments.
for key,value in [('cut side', 'both'),
('stable', 0.005),
('max cut', 0.2),
- ('min deviation', 9.0),
+ ('min deviations', 9.0),
('min points', 4),
- ('see double', 10e-9),
+ ('see double', 10), # TODO: points vs. meters. 10e-9),
]:
argument = [a for a in self._arguments if a.name == key][0]
argument.default = value
self._settings.append(argument_to_setting(
self.setting_section, argument))
argument.default = None # if argument isn't given, use the config.
- self._commands = [FlatPeaksCommand(self), FlatFilterCommand(self)]
+ self._commands = [FlatPeaksCommand(self)]
+ # append FlatFilterCommand so it can steal arguments from
+ # FlatPeaksCommand.
+ self._commands.append(FlatFilterCommand(self))
def dependencies(self):
return ['vclamp']
return self._settings
-class FlatPeaksCommand (Command):
+class FlatPeaksCommand (ColumnAddingCommand):
"""Detect peaks in velocity clamp data using noise statistics.
Notes
-----
-
Noise analysis on the retraction curve:
1) A median window filter (using
The algorithm was originally Francesco Musiani's idea.
"""
def __init__(self, plugin):
- config_arguments = [a for a in plugin._arguments
+ plugin_arguments = [a for a in plugin._arguments
if a.name != 'min peaks']
# Drop min peaks, since we're not filtering with this
# function, just detecting peaks.
super(FlatPeaksCommand, self).__init__(
name='flat filter peaks',
- arguments=[CurveArgument] + config_arguments,
+ columns=[
+ ('distance column', 'surface distance (m)', """
+Name of the column to use as the surface position input.
+""".strip()),
+ ('deflection column', 'surface deflection (m)', """
+Name of the column to use as the deflection input.
+""".strip()),
+ ],
+ new_columns=[
+ ('output peak column', 'flat filter peaks', """
+Name of the column (without units) to use as the peak output.
+""".strip()),
+ ],
+ arguments=[
+ Argument(name='peak info name', type='string',
+ default='flat filter peaks',
+ help="""
+Name for storing the list of peaks in the `.info` dictionary.
+""".strip()),
+ ] + plugin_arguments,
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- z_data,d_data,params = self._setup(params)
- start_index = 0
- while z_data[start_index] < params['bind window']:
- start_index += 1
- median = medfilt(d_data[start_index:], params['median window']),
+ self._add_to_command_stack(params)
+ params = self._setup_params(hooke=hooke, params=params)
+ block = self._block(hooke=hooke, params=params)
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ start_index = absolute(dist_data-params['blind window']).argmin()
+ median = medfilt(def_data[start_index:], params['median window'])
deriv = diff(median)
- kwargs = dict([(a.name, params[a.name]) for a in find_peaks_arguments])
- peaks = find_peaks(deriv, **kwargs)
- for peak in peaks:
- peak.name = 'flat filter of %s with %s' \
- % (params['deflection column name'], params['convolution'])
+ peaks = find_peaks(deriv, **_kwargs(params, find_peaks_arguments,
+ argument_input_keys=True))
+ for i,peak in enumerate(peaks):
+ peak.name = self._peak_name(params, i)
peak.index += start_index
+ block.info[params['peak info name']] = peaks
+
+ self._set_column(hooke=hooke, params=params,
+ column_name='output peak column',
+ values=peaks_to_mask(def_data, peaks) * def_data)
outqueue.put(peaks)
- def _setup(self, params):
+ def _setup_params(self, hooke, params):
"""Setup `params` from config and return the z piezo and
deflection arrays.
"""
- curve = params['curve']
- if curve.info['experiment'] != VelocityClamp:
- raise Failure('%s operates on VelocityClamp experiments, not %s'
- % (self.name, curve.info['experiment']))
- for col in ['surface z piezo (m)', 'deflection (N)']:
- if col not in curve.data[0].info['columns']:
- scale(curve)
- data = None
- for block in curve.data:
- if block.info['name'].startswith('retract'):
- data = block
- break
- if data == None:
- raise Failure('No retraction blocks in %s.' % curve)
- z_data = data[:,data.info['columns'].index('surface z piezo (m)')]
- if 'flattened deflection (N)' in data.info['columns']:
- params['deflection column name'] = 'flattened deflection (N)'
- else:
- params['deflection column name'] = 'deflection (N)'
- d_data = data[:,data.info['columns'].index(
- params['deflection column name'])]
+ curve = self._curve(hooke=hooke, params=params)
for key,value in params.items():
- if value == None: # Use configured default value.
+ if value == None and key in self.plugin.config:
+ # Use configured default value.
params[key] = self.plugin.config[key]
- return z_data,d_data,params
+ # TODO: convert 'see double' from nm to points
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output peak column'] = join_data_label(
+ params['output peak column'], def_unit)
+ return params
+
+ def _peak_name(self, params, index):
+ d_name,d_unit = split_data_label(params['deflection column'])
+ return 'flat filter peak %d of %s' % (index, d_name)
+
class FlatFilterCommand (FilterCommand):
u"""Create a subset playlist of curves with enough flat peaks.
.. [#sandal2008] M. Sandal, F. Valle, I. Tessari, S. Mammi, E. Bergantino,
F. Musiani, M. Brucale, L. Bubacco, B. Samorì.
- "Conformational equilibria in monomeric :math:`\alpha`-Synuclein at the
+ "Conformational equilibria in monomeric α-Synuclein at the
single molecule level."
PLOS Biology, 2009.
doi: `10.1371/journal.pbio.0060006 <http://dx.doi.org/10.1371/journal.pbio.0060006>`_
See Also
--------
- FlatCommand : Underlying flat-based peak detection.
+ FlatPeaksCommand : Underlying flat-based peak detection.
"""
def __init__(self, plugin):
+ flat_peaks = [c for c in plugin._commands
+ if c.name == 'flat filter peaks'][0]
+ flat_peaks_arguments = [a for a in flat_peaks.arguments
+ if a.name not in ['help', 'stack']]
+ flat_peaks_arg_names = [a.name for a in flat_peaks_arguments]
+ plugin_arguments = [a for a in plugin._arguments
+ if a.name not in flat_peaks_arg_names]
+ arguments = flat_peaks_arguments + plugin_arguments
super(FlatFilterCommand, self).__init__(
plugin, name='flat filter playlist')
- self.arguments.extend(plugin._arguments)
+ self.arguments.extend(arguments)
def filter(self, curve, hooke, inqueue, outqueue, params):
+ params['curve'] = curve
inq = Queue()
outq = Queue()
- conv_command = [c for c in self.hooke.commands
+ filt_command = [c for c in hooke.commands
if c.name=='flat filter peaks'][0]
- conv_command.run(hooke, inq, outq, **params)
+ filt_command.run(hooke, inq, outq, **params)
peaks = outq.get()
- if not isinstance(peaks[0], Peak):
- raise Failure('Expected a list of Peaks, not %s' % peaks)
+ if isinstance(peaks, UncaughtException) \
+ and isinstance(peaks.exception, PoorFit):
+ return False
+ if not (isinstance(peaks, list) and (len(peaks) == 0
+ or isinstance(peaks[0], Peak))):
+ raise Failure('Expected a list of Peaks, not %s: %s'
+ % (type(peaks), peaks))
ret = outq.get()
if not isinstance(ret, Success):
raise ret
- return len(peaks) >= params['min peaks']
-
+ if params['min peaks'] == None: # Use configured default value.
+ params['min peaks'] = self.plugin.config['min peaks']
+ return len(peaks) >= int(params['min peaks'])