+++ /dev/null
-<?xml version="1.0" ?>
-<config>
-
-<!-- Parameters to calculate the noise absolute deviation.
- positive= cut out most positive (1) or negative (0) values (default=0)
- maxcut=cut at maximum a maxcut fraction of all points. (default=0.2)
- stable=convergency threshold (when cutting more points the ratio doesn't change more than stable, then stop) (default=0.005)
--->
-<noise_absdev positive="0" maxcut="0.2" stable="0.005"/>
-
-<!-- Parameters of the convfilt.
- minpeaks=number minimum of peaks we want (default=5)
- mindeviation=minimum absolute deviation of convolution to define a peak (default=5)
- seedouble=if two peaks distance less than seedouble points, count them as a single peak (default=10)
- convolution=the actual convolution vector (DO NOT TOUCH UNLESS YOU KNOW WHAT YOU ARE DOING)
- blindwindow=nm after the contact point where we do not count peaks.
- -->
-<convfilt minpeaks="5" mindeviation="5" seedouble="10" convolution="[6.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0]" blindwindow="20"/>
-<!--convfilt minpeaks="5" mindeviation="5" seedouble="10" convolution="[11.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0]" blindwindow="100"/-->
-</config>
+++ /dev/null
-[convfilt]
- [[blindwindow]]
- default = 20
- maximum = 10000
- minimum = 0
- type = float
- value = 20
-
- [[convolution]]
- default = '[6.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0]'
- type = string
- #value = '[6.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0]'
- value = '[11.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0]'
-
- [[maxcut]]
- default = 0.2
- maximum = 1
- minimum = 0
- type = float
- value = 0.2
- visible = False
-
- [[medianfilter]]
- default = 7
- maximum = 100
- minimum = 0
- type = integer
- value = 7
-
- [[mindeviation]]
- default = 5
- maximum = 100
- minimum = 0
- type = float
- value = 5
-
- [[minpeaks]]
- default = 5
- maximum = 20
- minimum = 0
- type = integer
- value = 1
-
- [[positive]]
- default = False
- type = boolean
- value = False
-
- [[seedouble]]
- default = 10
- maximum = 1000
- minimum = 0
- type = integer
- value = 10
-
- [[stable]]
- default = 0.005
- maximum = 1
- minimum = 0
- type = float
- value = 0.005
-
-[flatfilt]
- #[[median_filter]]
- #default = 7
- #maximum = 100
- #minimum = 0
- #type = integer
- #value = 7
-
- [[min_npks]]
- default = 4
- maximum = 10000
- minimum = 1
- type = integer
- value = 4
-
- [[min_deviation]]
- default = 9
- maximum = 100
- minimum = 1
- type = float
- value = 9
-
class Request (Interaction):
"""Command engine requests for information from the UI.
+
+ >>> r = Request('test', 'Does response_class work?')
+ >>> r.response_class()
+ <class 'hooke.interaction.Response'>
"""
- def __init__(self, type, response_class,
- msg, default=None, validator=None):
+ def __init__(self, type, msg, default=None, validator=None):
super(Request, self).__init__(type)
- self.response_class = response_class
self.msg = msg
self.default = default
self.validator = validator
+ def response_class(self):
+ class_name = self.__class__.__name__.replace('Request', 'Response')
+ return globals()[class_name]
+
def response(self, value):
if self.validator != None:
self.validator(value)
- return self.response_class(value)
+ return self.response_class()(value)
class Response (Interaction):
"""UI response to a :class:`Request`.
class BooleanRequest (Request):
def __init__(self, msg, default=None):
super(BooleanRequest, self).__init__(
- 'boolean', BooleanResponse, msg, default,
+ 'boolean', msg, default,
validator = InList([True, False, default]))
class BooleanResponse (Response):
class StringRequest (Request):
def __init__(self, msg, default=None):
- super(StringRequest, self).__init__(
- 'string', StringResponse, msg, default)
+ super(StringRequest, self).__init__('string', msg, default)
class StringResponse (Response):
def __init__(self, value):
class FloatRequest (Request):
def __init__(self, msg, default=None):
- super(FloatRequest, self).__init__(
- 'float', FloatResponse, msg, default)
+ super(FloatRequest, self).__init__('float', msg, default)
class FloatResponse (Response):
def __init__(self, value):
class SelectionRequest (Request):
def __init__(self, msg, default=None, options=[]):
- super(SelectionRequest, self).__init__(
- 'selection', SelectionResponse, msg, default)
+ super(SelectionRequest, self).__init__('selection', msg, default)
self.options = options
class SelectionResponse (Response):
class PointRequest (Request):
def __init__(self, msg, curve, block=0, default=None):
-        super(PointRequest, self).__init__(
-            'point', PointResponse, msg, default)
+        super(PointRequest, self).__init__('point', msg, default)
+        # Store the curve/block the point should be picked from.  The old
+        # `self.options = options` referenced an undefined name (`options`
+        # is not a parameter of this constructor) and raised NameError.
+        self.curve = curve
+        self.block = block
class PointResponse (Response):
PLUGIN_MODULES = [
# ('autopeak', True),
-# ('curvetools', True),
('convfilt', True),
('cut', True),
+# ('fclamp', True),
# ('fit', True),
# ('flatfilts-rolf', True),
('flatfilt', True),
-# ('generalclamp', True),
-# ('generaltccd', True),
-# ('generalvclamp', True),
# ('jumpstat', True),
# ('macro', True),
# ('massanalysis', True),
# ('multidistance', True),
# ('multifit', True),
# ('pcluster', True),
-# ('peakspot', True),
# ('procplots', True),
# ('review', True),
# ('showconvoluted', True),
# ('superimpose', True),
+# ('tccd', True),
# ('tutorial', True),
+ ('vclamp', True),
]
"""List of plugin modules and whether they should be included by
default. TODO: autodiscovery
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2008-2010 Alberto Gomez-Casado
+# Fabrizio Benedetti
+# Massimo Sandal <devicerandom@gmail.com>
+# W. Trevor King <wking@drexel.edu>
+#
+# This file is part of Hooke.
+#
+# Hooke is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
+#
+# Hooke is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with Hooke. If not, see
+# <http://www.gnu.org/licenses/>.
+
+"""The ``convfilt`` module provides :class:`ConvFiltPlugin` and
+for convolution-based filtering of :mod:`hooke.curve.Curve`\s.
+
+See Also
+--------
+:mod:`~hooke.plugin.flatfilt` for a collection of additional filters.
+
+:class:`ConfFiltPlugin` was broken out of
+:class:`~hooke.plugin.flatfilt` to separate its large number of
+configuration settings.
+"""
+
+import copy
+from multiprocessing import Queue
+
+import numpy
+
+from ..command import Command, Argument, Failure, Success
+from ..config import Setting
+from ..experiment import VelocityClamp
+from ..plugin import Plugin, argument_to_setting
+from ..plugin.curve import CurveArgument
+from ..plugin.playlist import FilterCommand
+from ..plugin.vclamp import scale
+from ..util.peak import find_peaks, find_peaks_arguments, Peak
+
+
+class ConvFiltPlugin (Plugin):
+ """Convolution-based peak recognition and filtering.
+ """
+ def __init__(self):
+ super(ConvFiltPlugin, self).__init__(name='convfilt')
+ self._arguments = [ # for Command initialization
+ Argument('convolution', type='float', count='-1',
+ default=[11.0]+[-1.0]*11, help="""
+Convolution vector roughly matching post-peak cantilever rebound.
+This should roughly match the shape of the feature you're looking for.
+""".strip()), # TODO: per-curve convolution vector + interpolation, to
+ # take advantage of the known spring constant.
+ Argument('blind window', type='float', default=20e-9, help="""
+Meters after the contact point where we do not count peaks to avoid
+non-specific surface interaction.
+""".strip()),
+ Argument('min peaks', type='int', default=5, help="""
+Minimum number of peaks for curve acceptance.
+""".strip()),
+ ] + copy.deepcopy(find_peaks_arguments)
+ # Set convolution-specific defaults for the fit_peak_arguments.
+ for key,value in [('cut side', 'positive'),
+ ('stable', 0.005),
+ ('max cut', 0.2),
+ ('min deviation', 5.0),
+ ('min points', 1),
+ ('see double', 10e-9),
+ ]:
+ argument = [a for a in self._arguments if a.name == key][0]
+ argument.default = value
+ self._settings = [
+ Setting(section=self.setting_section, help=self.__doc__)]
+ for argument in self._arguments:
+ self._settings.append(argument_to_setting(
+ self.setting_section, argument))
+ argument.default = None # if argument isn't given, use the config.
+ self._commands = [ConvolutionPeaksCommand(self),
+ ConvolutionFilterCommand(self)]
+
+ def dependencies(self):
+ return ['vclamp']
+
+ def default_settings(self):
+ return self._settings
+
+
+class ConvolutionPeaksCommand (Command):
+ """Detect peaks in velocity clamp data with a convolution.
+
+ Notes
+ -----
+ A simplified version of Kasas' filter [#kasas2000].
+ For any given retracting curve:
+
+ 1) The contact point is found.
+ 2) The off-surface data is convolved (using :func:`numpy.convolve`)
+ with a vector that encodes the approximate shape of the target
+ feature.
+ 3) Peaks in the convolved curve are extracted with
+ :func:`~hooke.plugins.peak.find_peaks`.
+
+ The convolution algorithm, with appropriate thresholds, usually
+ recognizes peaks well more than 95% of the time.
+
+ .. [#kasas2000] S. Kasas, B.M. Riederer, S. Catsicas, B. Cappella,
+ G. Dietler.
+ "Fuzzy logic algorithm to extract specific interaction forces
+ from atomic force microscopy data"
+ Rev. Sci. Instrum., 2000.
+ doi: `10.1063/1.1150583 <http://dx.doi.org/10.1063/1.1150583>`_
+ """
+ def __init__(self, plugin):
+ config_arguments = [a for a in plugin._arguments
+ if a.name != 'min peaks']
+ # Drop min peaks, since we're not filtering with this
+ # function, just detecting peaks.
+ super(ConvolutionPeaksCommand, self).__init__(
+ name='convolution peaks',
+ arguments=[CurveArgument] + config_arguments,
+ help=self.__doc__, plugin=plugin)
+
+    def _run(self, hooke, inqueue, outqueue, params):
+        z_data,d_data,params = self._setup(params)
+        # Skip data within the blind window past the contact point; the
+        # config key is 'blind window' (see the Argument definition above),
+        # so 'bind window' would raise KeyError.
+        start_index = 0
+        while z_data[start_index] < params['blind window']:
+            start_index += 1
+        conv = numpy.convolve(d_data[start_index:], params['convolution'],
+                              mode='valid')
+        kwargs = dict([(a.name, params[a.name]) for a in find_peaks_arguments])
+        peaks = find_peaks(conv, **kwargs)
+        for peak in peaks:
+            peak.name = 'convolution of %s with %s' \
+                % (params['deflection column name'], params['convolution'])
+            # Translate indices from the convolved sub-array back to the
+            # full retraction block.
+            peak.index += start_index
+        outqueue.put(peaks)
+
+ def _setup(self, params):
+ """Setup `params` from config and return the z piezo and
+ deflection arrays.
+ """
+ curve = params['curve']
+ if curve.info['experiment'] != VelocityClamp:
+ raise Failure('%s operates on VelocityClamp experiments, not %s'
+ % (self.name, curve.info['experiment']))
+ for col in ['surface z piezo (m)', 'deflection (N)']:
+ if col not in curve.data[0].info['columns']:
+ scale(curve)
+ data = None
+ for block in curve.data:
+ if block.info['name'].startswith('retract'):
+ data = block
+ break
+ if data == None:
+ raise Failure('No retraction blocks in %s.' % curve)
+ z_data = data[:,data.info['columns'].index('surface z piezo (m)')]
+ if 'flattened deflection (N)' in data.info['columns']:
+ params['deflection column name'] = 'flattened deflection (N)'
+ else:
+ params['deflection column name'] = 'deflection (N)'
+ d_data = data[:,data.info['columns'].index(
+ params['deflection column name'])]
+ for key,value in params.items():
+ if value == None: # Use configured default value.
+ params[key] = self.plugin.config[key]
+ return z_data,d_data,params
+
+
+class ConvolutionFilterCommand (FilterCommand):
+ u"""Create a subset playlist of curves with enough convolution peaks.
+
+ Notes
+ -----
+ This filter can reduce a dataset like the one in [#brucale2009] to
+ analyze by hand by about 80-90% (depending on the overall
+ cleanliness of the data set). Thousands of curves can be
+ automatically filtered this way in a few minutes on a standard PC,
+ but the algorithm could still be optimized.
+
+ .. [#brucale2009] M. Brucale, M. Sandal, S. Di Maio, A. Rampioni,
+ I. Tessari, L. Tosatto, M. Bisaglia, L. Bubacco, B. Samorì.
+ "Pathogenic mutations shift the equilibria of
+ :math:`\alpha`-Synuclein single molecules towards structured
+ conformers."
+ Chembiochem., 2009.
+ doi: `10.1002/cbic.200800581 <http://dx.doi.org/10.1002/cbic.200800581>`_
+
+ See Also
+ --------
+ ConvolutionCommand : Underlying convolution-based peak detection.
+ """
+ def __init__(self, plugin):
+ super(ConvolutionFilterCommand, self).__init__(
+ plugin, name='convolution filter playlist')
+ self.arguments.extend(plugin._arguments)
+
+    def filter(self, curve, hooke, inqueue, outqueue, params):
+        inq = Queue()
+        outq = Queue()
+        # Use the `hooke` instance passed in; Command instances are not
+        # shown to carry a `.hooke` attribute.
+        conv_command = [c for c in hooke.commands
+                        if c.name=='convolution peaks'][0]
+        conv_command.run(hooke, inq, outq, **params)
+        peaks = outq.get()
+        if not isinstance(peaks[0], Peak):
+            raise Failure('Expected a list of Peaks, not %s' % peaks)
+        ret = outq.get()
+        if not isinstance(ret, Success):
+            raise ret
+        return len(peaks) >= params['min peaks']
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2008-2010 Alberto Gomez-Casado
+# Fabrizio Benedetti
+# Massimo Sandal <devicerandom@gmail.com>
+# W. Trevor King <wking@drexel.edu>
+#
+# This file is part of Hooke.
+#
+# Hooke is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
+#
+# Hooke is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with Hooke. If not, see
+# <http://www.gnu.org/licenses/>.
+
+"""The ``flatfilt`` module provides :class:`~FlatFiltPlugin` and
+several associated :class:`~hooke.command.Command`\s for removing flat
+(featureless) :mod:`~hooke.curve.Curve`\s from
+:class:`~hooke.playlist.Playlist`\s.
+
+See Also
+--------
+:mod:`~hooke.plugin.convfilt` for a convolution-based filter for
+:class:`~hooke.experiment.VelocityClamp` experiments.
+"""
+
+import copy
+from multiprocessing import Queue
+
+from numpy import diff
+from scipy.signal.signaltools import medfilt
+
+from ..command import Command, Argument, Failure, Success
+from ..config import Setting
+from ..experiment import VelocityClamp
+from ..plugin import Plugin, argument_to_setting
+from ..plugin.curve import CurveArgument
+from ..plugin.playlist import FilterCommand
+from ..plugin.vclamp import scale
+from ..util.peak import find_peaks, find_peaks_arguments, Peak
+
+
+class FlatFiltPlugin (Plugin):
+ """Standard-devitiation-based peak recognition and filtering.
+ """
+ def __init__(self):
+ super(FlatFiltPlugin, self).__init__(name='flatfilt')
+ self._arguments = [ # For Command initialization
+ Argument('median filter', type='int', default=7, help="""
+Median window filter size (in points).
+""".strip()),
+ ] + copy.deepcopy(find_peaks_arguments)
+ # Set flat-filter-specific defaults for the fit_peak_arguments.
+ for key,value in [('cut side', 'both'),
+ ('stable', 0.005),
+ ('max cut', 0.2),
+ ('min deviation', 9.0),
+ ('min points', 4),
+ ('see double', 10e-9),
+ ]:
+ argument = [a for a in self._arguments if a.name == key][0]
+ argument.default = value
+ self._settings = [
+ Setting(section=self.setting_section, help=self.__doc__)]
+ for argument in self._arguments:
+ self._settings.append(argument_to_setting(
+ self.setting_section, argument))
+ argument.default = None # if argument isn't given, use the config.
+ self._commands = [FlatPeaksCommand(self), FlatFilterCommand(self)]
+
+ def dependencies(self):
+ return ['vclamp']
+
+ def default_settings(self):
+ return self._settings
+
+
+class FlatPeaksCommand (Command):
+ """Detect peaks in velocity clamp data using noise statistics.
+
+ Notes
+ -----
+
+ Noise analysis on the retraction curve:
+
+ 1) A median window filter (using
+ :func:`scipy.signal.signaltools.medfilt`) smooths the
+ deflection.
+ 2) The deflection derivative is calculated (using
+ :func:`numpy.diff` which uses forward differencing).
+ 3) Peaks in the derivative curve are extracted with
+ :func:`~hooke.plugins.peak.find_peaks`.
+
+ The algorithm was originally Francesco Musiani's idea.
+ """
+ def __init__(self, plugin):
+ config_arguments = [a for a in plugin._arguments
+ if a.name != 'min peaks']
+ # Drop min peaks, since we're not filtering with this
+ # function, just detecting peaks.
+ super(FlatPeaksCommand, self).__init__(
+ name='flat filter peaks',
+ arguments=[CurveArgument] + config_arguments,
+ help=self.__doc__, plugin=plugin)
+
+    def _run(self, hooke, inqueue, outqueue, params):
+        z_data,d_data,params = self._setup(params)
+        # Median-filter the deflection to suppress noise before
+        # differentiating.  The flatfilt plugin defines no 'blind window'
+        # or 'convolution' settings (those belong to convfilt), and its
+        # filter-size key is 'median filter', so the previous lookups
+        # would raise KeyError.  Also note medfilt(...) must not carry a
+        # trailing comma, which made `median` a 1-tuple.
+        median = medfilt(d_data, params['median filter'])
+        deriv = diff(median)
+        kwargs = dict([(a.name, params[a.name]) for a in find_peaks_arguments])
+        peaks = find_peaks(deriv, **kwargs)
+        for peak in peaks:
+            peak.name = 'flat filter of %s' \
+                % (params['deflection column name'])
+        outqueue.put(peaks)
+
+ def _setup(self, params):
+ """Setup `params` from config and return the z piezo and
+ deflection arrays.
+ """
+ curve = params['curve']
+ if curve.info['experiment'] != VelocityClamp:
+ raise Failure('%s operates on VelocityClamp experiments, not %s'
+ % (self.name, curve.info['experiment']))
+ for col in ['surface z piezo (m)', 'deflection (N)']:
+ if col not in curve.data[0].info['columns']:
+ scale(curve)
+ data = None
+ for block in curve.data:
+ if block.info['name'].startswith('retract'):
+ data = block
+ break
+ if data == None:
+ raise Failure('No retraction blocks in %s.' % curve)
+ z_data = data[:,data.info['columns'].index('surface z piezo (m)')]
+ if 'flattened deflection (N)' in data.info['columns']:
+ params['deflection column name'] = 'flattened deflection (N)'
+ else:
+ params['deflection column name'] = 'deflection (N)'
+ d_data = data[:,data.info['columns'].index(
+ params['deflection column name'])]
+ for key,value in params.items():
+ if value == None: # Use configured default value.
+ params[key] = self.plugin.config[key]
+ return z_data,d_data,params
+
+class FlatFilterCommand (FilterCommand):
+ u"""Create a subset playlist of curves with enough flat peaks.
+
+ Notes
+ -----
+ This type of filter is of course very raw, and requires relatively
+ conservative settings to safely avoid false negatives (that is, to
+ avoid discarding interesting curves). Using it on the protein
+ unfolding experiments described by Sandal [#sandal2008] it has
+ been found to reduce the data set to analyze by hand by 60-80%.
+
+ .. [#sandal2008] M. Sandal, F. Valle, I. Tessari, S. Mammi, E. Bergantino,
+ F. Musiani, M. Brucale, L. Bubacco, B. Samorì.
+ "Conformational equilibria in monomeric :math:`\alpha`-Synuclein at the
+ single molecule level."
+ PLOS Biology, 2009.
+ doi: `10.1371/journal.pbio.0060006 <http://dx.doi.org/10.1371/journal.pbio.0060006>`_
+
+ See Also
+ --------
+ FlatCommand : Underlying flat-based peak detection.
+ """
+ def __init__(self, plugin):
+ super(FlatFilterCommand, self).__init__(
+ plugin, name='flat filter playlist')
+ self.arguments.extend(plugin._arguments)
+
+    def filter(self, curve, hooke, inqueue, outqueue, params):
+        inq = Queue()
+        outq = Queue()
+        # Use the `hooke` instance passed in; Command instances are not
+        # shown to carry a `.hooke` attribute.
+        flat_command = [c for c in hooke.commands
+                        if c.name=='flat filter peaks'][0]
+        flat_command.run(hooke, inq, outq, **params)
+        peaks = outq.get()
+        if not isinstance(peaks[0], Peak):
+            raise Failure('Expected a list of Peaks, not %s' % peaks)
+        ret = outq.get()
+        if not isinstance(ret, Success):
+            raise ret
+        # flatfilt defines no 'min peaks' setting (that is convfilt's), so
+        # params['min peaks'] would raise KeyError; per-peak acceptance
+        # criteria are already enforced by find_peaks ('min points', etc.),
+        # so accept any curve with at least one surviving peak.
+        # TODO(review): confirm the intended peak-count threshold.
+        return len(peaks) > 0
+
+++ /dev/null
-# Copyright (C) 2008-2010 Alberto Gomez-Casado
-# Fabrizio Benedetti
-# Massimo Sandal <devicerandom@gmail.com>
-# W. Trevor King <wking@drexel.edu>
-#
-# This file is part of Hooke.
-#
-# Hooke is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation, either
-# version 3 of the License, or (at your option) any later version.
-#
-# Hooke is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with Hooke. If not, see
-# <http://www.gnu.org/licenses/>.
-
-"""Force spectroscopy curves filtering of flat curves
-
-Other plugin dependencies:
-procplots.py (plot processing plugin)
-"""
-
-from hooke.libhooke import WX_GOOD
-
-import wxversion
-wxversion.select(WX_GOOD)
-import xml.dom.minidom
-import wx
-import scipy
-import numpy
-from numpy import diff
-#import pickle
-
-from .. import libpeakspot as lps
-from .. import curve as lhc
-
-
-class flatfiltsCommands(object):
-
- def _plug_init(self):
- #configurate convfilt variables
- convfilt_configurator=ConvfiltConfig()
- self.convfilt_config=convfilt_configurator.load_config('convfilt.conf')
-
- def do_flatfilt(self,args):
- '''
- FLATFILT
- (flatfilts.py)
- Filters out flat (featureless) curves of the current playlist,
- creating a playlist containing only the curves with potential
- features.
- ------------
- Syntax:
- flatfilt [min_npks min_deviation]
-
- min_npks = minmum number of points over the deviation
- (default=4)
-
- min_deviation = minimum signal/noise ratio
- (default=9)
-
- If called without arguments, it uses default values, that
- should work most of the times.
- '''
- median_filter=7
- min_npks=4
- min_deviation=9
-
- args=args.split(' ')
- if len(args) == 2:
- min_npks=int(args[0])
- min_deviation=int(args[1])
- else:
- pass
-
- print 'Processing playlist...'
- notflat_list=[]
-
- c=0
- for item in self.current_list:
- c+=1
-
- try:
- notflat=self.has_features(item, median_filter, min_npks, min_deviation)
- print 'Curve',item.path, 'is',c,'of',len(self.current_list),': features are ',notflat
- except:
- notflat=False
- print 'Curve',item.path, 'is',c,'of',len(self.current_list),': cannot be filtered. Probably unable to retrieve force data from corrupt file.'
-
- if notflat:
- item.features=notflat
- item.curve=None #empty the item object, to further avoid memory leak
- notflat_list.append(item)
-
- if len(notflat_list)==0:
- print 'Found nothing interesting. Check your playlist, could be a bug or criteria could be too much stringent'
- return
- else:
- print 'Found ',len(notflat_list),' potentially interesting curves'
- print 'Regenerating playlist...'
- self.pointer=0
- self.current_list=notflat_list
- self.current=self.current_list[self.pointer]
- self.do_plot(0)
-
- def has_features(self,item,median_filter,min_npks,min_deviation):
- '''
- decides if a curve is flat enough to be rejected from analysis: it sees if there
- are at least min_npks points that are higher than min_deviation times the absolute value
- of noise.
-
- Algorithm original idea by Francesco Musiani, with my tweaks and corrections.
- '''
- retvalue=False
-
- item.identify(self.drivers)
- #we assume the first is the plot with the force curve
- #do the median to better resolve features from noise
- flat_plot=self.plotmanip_median(item.curve.default_plots()[0], item, customvalue=median_filter)
- flat_vects=flat_plot.vectors
- item.curve.close_all()
- #needed to avoid *big* memory leaks!
- del item.curve
- del item
-
- #absolute value of derivate
- yretdiff=diff(flat_vects[1][1])
- yretdiff=[abs(value) for value in yretdiff]
- #average of derivate values
- diffmean=numpy.mean(yretdiff)
- yretdiff.sort()
- yretdiff.reverse()
- c_pks=0
- for value in yretdiff:
- if value/diffmean > min_deviation:
- c_pks+=1
- else:
- break
-
- if c_pks>=min_npks:
- retvalue = c_pks
-
- del flat_plot, flat_vects, yretdiff
-
- return retvalue
-
- ################################################################
- #-----CONVFILT-------------------------------------------------
- #-----Convolution-based peak recognition and filtering.
- #Requires the libpeakspot.py library
-
- def has_peaks(self, plot, abs_devs=None, maxpeak=True, window=10):
- '''
- Finds peak position in a force curve.
- FIXME: should be moved in libpeakspot.py
- '''
- if abs_devs==None:
- abs_devs=self.convfilt_config['mindeviation']
-
-
- xret=plot.vectors[1][0]
- yret=plot.vectors[1][1]
- #Calculate convolution.
- convoluted=lps.conv_dx(yret, self.convfilt_config['convolution'])
-
- #surely cut everything before the contact point
- cut_index=self.find_contact_point(plot)
- #cut even more, before the blind window
- start_x=xret[cut_index]
- blind_index=0
- for value in xret[cut_index:]:
- if abs((value) - (start_x)) > self.convfilt_config['blindwindow']*(10**-9):
- break
- blind_index+=1
- cut_index+=blind_index
- #do the dirty convolution-peak finding stuff
- noise_level=lps.noise_absdev(convoluted[cut_index:], self.convfilt_config['positive'], self.convfilt_config['maxcut'], self.convfilt_config['stable'])
- above=lps.abovenoise(convoluted,noise_level,cut_index,abs_devs)
- peak_location,peak_size=lps.find_peaks(above,seedouble=self.convfilt_config['seedouble'])
-
- #take the minimum or the maximum of a peak
- for i in range(len(peak_location)):
- peak=peak_location[i]
- valpk=min(yret[peak-window:peak+window]) #maximum in force (near the unfolding point)
- index_pk=yret[peak-window:peak+window].index(valpk)+(peak-window)
-
- if maxpeak==False:
- valpk=max(yret[peak:peak+window]) #minimum in force, near the baseline
- index_pk=yret[peak:peak+window].index(valpk)+(peak)
-
-# Let's explain that for the minimum. Immaging that we know that there is a peak at position/region 100 and you have found its y-value,
-# Now you look in the array, from 100-10 to 100+10 (if the window is 10).
-# This "100-10 to 100+10" is substancially a new array with its index. In this array you have 20
-# elements, so the index of your y-value will be 10.
-# Now to find the index in the TOTAL array you have to add the "position" of the "region" (that in this case
-# correspond to 100) and also substract the window size ---> (+100-10)
-
- peak_location[i]=index_pk
-
- return peak_location,peak_size
-
-
- def exec_has_peaks(self,item,abs_devs):
- '''
- encapsulates has_peaks for the purpose of correctly treating the curve objects in the convfilt loop,
- to avoid memory leaks
- '''
- item.identify(self.drivers)
- #we assume the first is the plot with the force curve
- plot=item.curve.default_plots()[0]
-
- if 'flatten' in self.config['plotmanips']:
- #If flatten is present, use it for better recognition of peaks...
- flatten=self._find_plotmanip('flatten') #extract flatten plot manipulator
- plot=flatten(plot, item, customvalue=1)
-
- peak_location,peak_size=self.has_peaks(plot,abs_devs)
- #close all open files
- item.curve.close_all()
- #needed to avoid *big* memory leaks!
- del item.curve
- del item
- return peak_location, peak_size
-
- #------------------------
- #------commands----------
- #------------------------
- def do_peaks(self,args):
- '''
- PEAKS
- (flatfilts.py)
- Test command for convolution filter / test.
- ----
- Syntax: peaks [deviations]
- absolute deviation = number of times the convolution signal is above the noise absolute deviation.
- Default is 5.
- '''
- if len(args)==0:
- args=self.convfilt_config['mindeviation']
-
- try:
- abs_devs=float(args)
- except:
- print 'Wrong argument, using config value'
- abs_devs=float(self.convfilt_config['mindeviation'])
-
- defplots=self.current.curve.default_plots()[0] #we need the raw, uncorrected plots
-
- if 'flatten' in self.config['plotmanips']:
- flatten=self._find_plotmanip('flatten') #extract flatten plot manipulator
- defplots=flatten(defplots, self.current)
- else:
- print 'You have the flatten plot manipulator not loaded. Enabling it could give you better results.'
-
- peak_location,peak_size=self.has_peaks(defplots,abs_devs)
- print 'Found '+str(len(peak_location))+' peaks.'
- to_dump='peaks '+self.current.path+' '+str(len(peak_location))
- self.outlet.push(to_dump)
- #print peak_location
-
- #if no peaks, we have nothing to plot. exit.
- if len(peak_location)==0:
- return
-
- #otherwise, we plot the peak locations.
- xplotted_ret=self.plots[0].vectors[1][0]
- yplotted_ret=self.plots[0].vectors[1][1]
- xgood=[xplotted_ret[index] for index in peak_location]
- ygood=[yplotted_ret[index] for index in peak_location]
-
- recplot=self._get_displayed_plot()
- recplot.vectors.append([xgood,ygood])
- if recplot.styles==[]:
- recplot.styles=[None,None,'scatter']
- recplot.colors=[None,None,None]
- else:
- recplot.styles+=['scatter']
- recplot.colors+=[None]
-
- self._send_plot([recplot])
-
- def do_convfilt(self,args):
- '''
- CONVFILT
- (flatfilts.py)
- Filters out flat (featureless) curves of the current playlist,
- creating a playlist containing only the curves with potential
- features.
- ------------
- Syntax:
- convfilt [min_npks min_deviation]
-
- min_npks = minmum number of peaks
- (to set the default, see convfilt.conf file; CONVCONF and SETCONF commands)
-
- min_deviation = minimum signal/noise ratio *in the convolution*
- (to set the default, see convfilt.conf file; CONVCONF and SETCONF commands)
-
- If called without arguments, it uses default values.
- '''
-
- min_npks=self.convfilt_config['minpeaks']
- min_deviation=self.convfilt_config['mindeviation']
-
- args=args.split(' ')
- if len(args) == 2:
- min_npks=int(args[0])
- min_deviation=int(args[1])
- else:
- pass
-
- print 'Processing playlist...'
- print '(Please wait)'
- notflat_list=[]
-
- c=0
- for item in self.current_list:
- c+=1
-
- try:
- peak_location,peak_size=self.exec_has_peaks(item,min_deviation)
- if len(peak_location)>=min_npks:
- isok='+'
- else:
- isok=''
- print 'Curve',item.path, 'is',c,'of',len(self.current_list),': found '+str(len(peak_location))+' peaks.'+isok
- except:
- peak_location,peak_size=[],[]
- print 'Curve',item.path, 'is',c,'of',len(self.current_list),': cannot be filtered. Probably unable to retrieve force data from corrupt file.'
-
- if len(peak_location)>=min_npks:
- item.peak_location=peak_location
- item.peak_size=peak_size
- item.curve=None #empty the item object, to further avoid memory leak
- notflat_list.append(item)
-
- #Warn that no flattening had been done.
- if not ('flatten' in self.config['plotmanips']):
- print 'Flatten manipulator was not found. Processing was done without flattening.'
- print 'Try to enable it in your configuration file for better results.'
-
- if len(notflat_list)==0:
- print 'Found nothing interesting. Check your playlist, could be a bug or criteria could be too much stringent'
- return
- else:
- print 'Found ',len(notflat_list),' potentially interesting curves'
- print 'Regenerating playlist...'
- self.pointer=0
- self.current_list=notflat_list
- self.current=self.current_list[self.pointer]
- self.do_plot(0)
-
-
- def do_setconv(self,args):
- '''
- SETCONV
- (flatfilts.py)
- Sets the convfilt configuration variables
- ------
- Syntax: setconv variable value
- '''
- args=args.split()
- #FIXME: a general "set dictionary" function has to be built
- if len(args)==0:
- print self.convfilt_config
- else:
- if not (args[0] in self.convfilt_config.keys()):
- print 'This is not an internal convfilt variable!'
- print 'Run "setconv" without arguments to see a list of defined variables.'
- return
-
- if len(args)==1:
- print self.convfilt_config[args[0]]
- elif len(args)>1:
- try:
- self.convfilt_config[args[0]]=eval(args[1])
- except NameError: #we have a string argument
- self.convfilt_config[args[0]]=args[1]
-
-
-#########################
-#HANDLING OF CONFIGURATION FILE
-class ConvfiltConfig(object):
- '''
- Handling of convfilt configuration file
-
- Mostly based on the simple-yet-useful examples of the Python Library Reference
- about xml.dom.minidom
-
- FIXME: starting to look a mess, should require refactoring
- '''
-
- def __init__(self):
- self.config={}
-
-
- def load_config(self, filename):
- myconfig=file(filename)
- #the following 3 lines are needed to strip newlines. otherwise, since newlines
- #are XML elements too, the parser would read them (and re-save them, multiplying
- #newlines...)
- #yes, I'm an XML n00b
- the_file=myconfig.read()
- the_file_lines=the_file.split('\n')
- the_file=''.join(the_file_lines)
-
- self.config_tree=xml.dom.minidom.parseString(the_file)
-
- def getText(nodelist):
- #take the text from a nodelist
- #from Python Library Reference 13.7.2
- rc = ''
- for node in nodelist:
- if node.nodeType == node.TEXT_NODE:
- rc += node.data
- return rc
-
- def handleConfig(config):
- noiseabsdev_elements=config.getElementsByTagName("noise_absdev")
- convfilt_elements=config.getElementsByTagName("convfilt")
- handleAbsdev(noiseabsdev_elements)
- handleConvfilt(convfilt_elements)
-
- def handleAbsdev(noiseabsdev_elements):
- for element in noiseabsdev_elements:
- for attribute in element.attributes.keys():
- self.config[attribute]=element.getAttribute(attribute)
-
- def handleConvfilt(convfilt_elements):
- for element in convfilt_elements:
- for attribute in element.attributes.keys():
- self.config[attribute]=element.getAttribute(attribute)
-
- handleConfig(self.config_tree)
- #making items in the dictionary machine-readable
- for item in self.config.keys():
- try:
- self.config[item]=eval(self.config[item])
- except NameError: #if it's an unreadable string, keep it as a string
- pass
-
- return self.config
+++ /dev/null
-# Copyright (C) 2007-2010 Fabrizio Benedetti
-# Massimo Sandal <devicerandom@gmail.com>
-# W. Trevor King <wking@drexel.edu>
-#
-# This file is part of Hooke.
-#
-# Hooke is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation, either
-# version 3 of the License, or (at your option) any later version.
-#
-# Hooke is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with Hooke. If not, see
-# <http://www.gnu.org/licenses/>.
-
-"""a library of helping functions for spotting force spectroscopy peaks.
-"""
-
-import numpy as np
-
-def conv_dx(data,vect):
- '''
- Returns the right-centered convolution of data with vector vect
- '''
- dim=len(data)
- window=len(vect)
- temparr=[0.0]*dim
-
- end=dim-window
-
- for j in range(end):
- for k in range(window):
- temparr[j]+=data[j+k]*vect[k]
-
- return temparr
-
-def absdev(arr):
- '''
- Calculates the absolute deviation of a vector
- '''
- med=0.0
- absD=0.0
-
- med=np.mean(arr)
- for j in arr:
- absD+=abs(j-med)
- return absD/len(arr)
-
-def noise_absdev(data,positive=False,maxcut=0.2,stable=0.005):
- '''
- Returns the standard deviation of the noise.
- The logic is: we cut the most negative (or positive) data points until the absolute deviation
- becomes stable (it doesn't vary more than 0.005) or we have cut more than maxcut*len(data) points.
- Then calculate the absolute deviation.
-
- If positive=True we cut the most positive data points, False=we cut the negative ones.
- '''
- out=[item for item in data] #we copy just to be sure...
- out.sort()
- if positive:
- out.reverse()
-
- temp_absdev=absdev(out)
-
- for index in range(len(out)):
- cutindex=(index+1)*5
- cut_absdev=absdev(out[cutindex:]) #we jump five points after five points...
- if 1-(cut_absdev/temp_absdev) < stable and cutindex<(maxcut*len(out)):
- temp_absdev=cut_absdev
- else:
- break
-
- return cut_absdev
-
-def abovenoise(convoluted,noise_level,cut_index=0,abs_devs=4):
- '''
- Generates a vector which is 0 where the vector is less than abs_devs*noise_level ; 1 if not (spike).
- '''
- #calculate absolute noise deviation
- #noise_level=noise_absdev(convoluted[cut_index:])
-
- above=[]
-
- for index in range(len(convoluted)):
- if index<cut_index:
- above.append(0)
- else:
- #FIXME: should calculate the *average* (here we assume that convolution mean is 0, which is FALSE!)
- if convoluted[index] < -1*noise_level*abs_devs:
- above.append(convoluted[index])
- else:
- above.append(0)
- return above
-
-def find_peaks(above, seedouble=10):
- '''
- Finds individual peak location.
- abovenoise() finds all points above a given threshold in the convolution. This point is often not unique
- but there is a "cluster" of points above a threshold.
- Here we obtain only the coordinates of the largest *convolution* spike for each cluster.
-
- above=vector obtained from abovenoise()
- seedouble=value at which we want to "delete" double peaks. That is, if two peaks have a distance
- < than $seedouble points , only the first is kept.
- '''
- nonzero=[]
- peaks_location=[]
- peaks_size=[]
-
- for index in range(len(above)):
- if above[index] != 0:
- nonzero.append(index)
- else:
- if len(nonzero) != 0:
- if len(nonzero)==1:
- nonzero.append(nonzero[0]+1)
- peaks_size.append(min(above[nonzero[0]:nonzero[-1]]))
- peaks_location.append(above[nonzero[0]:nonzero[-1]].index(peaks_size[-1])+nonzero[0])
- nonzero=[]
- else:
- pass
-
- #recursively eliminate double peaks
- #(maybe not the smartest of cycles, but i'm asleep...)
- temp_location=None
- while temp_location != []:
- temp_location=[]
- if len(peaks_location)>1:
- for index in range(len(peaks_location)-1):
- if peaks_location[index+1]-peaks_location[index] < seedouble:
- temp_location=peaks_location[:index]+peaks_location[index+1:]
- if temp_location != []:
- peaks_location=temp_location
-
- return peaks_location,peaks_size
# License along with Hooke. If not, see
# <http://www.gnu.org/licenses/>.
-"""Plugin regarding general velocity clamp measurements
+"""The ``vclamp`` module provides :class:`VelocityClampPlugin` and
+several associated :class:`hooke.command.Command`\s for handling
+common velocity clamp analysis tasks.
"""
-from hooke.libhooke import WX_GOOD, ClickedPoint
-import wxversion
-wxversion.select(WX_GOOD)
-from wx import PostEvent
-import numpy as np
-import scipy as sp
-import copy
-import os.path
-import time
+from ..command import Command, Argument, Failure
+from ..plugin import Builtin
-import warnings
-warnings.simplefilter('ignore',np.RankWarning)
+# Utility functions
+
+def scale(curve):
+ return curve
+
+
+class VelocityClampPlugin (Builtin):
+ def __init__(self):
+ super(VelocityClampPlugin, self).__init__(name='vclamp')
+ self._commands = []
+# NextCommand(self), PreviousCommand(self), JumpCommand(self),
+# ]
class generalvclampCommands(object):
self.outlet.push(to_dump)
def _slope(self,points,fitspan):
- # Calls the function linefit_between
+ # Calls the function linefit_between
parameters=[0,0,[],[]]
try:
clickedpoints=[points[0].index,points[1].index]
self._send_plot([lineplot])
- return parameters[0]
+ return parameters[0]
def linefit_between(self,index1,index2,whatset=1):
return (linefit[0],linefit[1],xtofit,ytofit)
- def fit_interval_nm(self,start_index,plot,nm,backwards):
+ def fit_interval_nm(self,start_index,plot,nm,backwards):
'''
Calculates the number of points to fit, given a fit interval in nm
start_index: index of point
- def find_current_peaks(self,noflatten, a=True, maxpeak=True):
+ def find_current_peaks(self,noflatten, a=True, maxpeak=True):
#Find peaks.
if a==True:
a=self.convfilt_config['mindeviation']
return pk_location, peak_size
- def pickup_contact_point(self,N=1,whatset=1):
- '''macro to pick up the contact point by clicking'''
- contact_point=self._measure_N_points(N=1, whatset=1)[0]
- contact_point_index=contact_point.index
- self.wlccontact_point=contact_point
- self.wlccontact_index=contact_point.index
- self.wlccurrent=self.current.path
- return contact_point, contact_point_index
+ def pickup_contact_point(self,N=1,whatset=1):
+ '''macro to pick up the contact point by clicking'''
+ contact_point=self._measure_N_points(N=1, whatset=1)[0]
+ contact_point_index=contact_point.index
+ self.wlccontact_point=contact_point
+ self.wlccontact_index=contact_point.index
+ self.wlccurrent=self.current.path
+ return contact_point, contact_point_index
- def baseline_points(self,peak_location, displayed_plot):
- clicks=self.config['baseline_clicks']
- if clicks==0:
- self.basepoints=[]
- base_index_0=peak_location[-1]+self.fit_interval_nm(peak_location[-1], displayed_plot, self.config['auto_right_baseline'],False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_0))
- base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'],False)
+ def baseline_points(self,peak_location, displayed_plot):
+ clicks=self.config['baseline_clicks']
+ if clicks==0:
+ self.basepoints=[]
+ base_index_0=peak_location[-1]+self.fit_interval_nm(peak_location[-1], displayed_plot, self.config['auto_right_baseline'],False)
+ self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_0))
+ base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'],False)
+ self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_1))
+ elif clicks>0:
+ print 'Select baseline'
+ if clicks==1:
+ self.basepoints=self._measure_N_points(N=1, whatset=1)
+ base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'], False)
self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_1))
- elif clicks>0:
- print 'Select baseline'
- if clicks==1:
- self.basepoints=self._measure_N_points(N=1, whatset=1)
- base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'], False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_1))
- else:
- self.basepoints=self._measure_N_points(N=2, whatset=1)
+ else:
+ self.basepoints=self._measure_N_points(N=2, whatset=1)
- self.basecurrent=self.current.path
- return self.basepoints
+ self.basecurrent=self.current.path
+ return self.basepoints
super(CommandLine, self).__init__(name='command line')
def run(self, commands, ui_to_command_queue, command_to_ui_queue):
+ print 'running in ui'
cmd = HookeCmd(self, commands,
inqueue=ui_to_command_queue,
outqueue=command_to_ui_queue)
+ print 'cmdloop'
cmd.cmdloop(self._splash_text())
def run_lines(self, commands, ui_to_command_queue, command_to_ui_queue,
--- /dev/null
+# Copyright (C) 2007-2010 Fabrizio Benedetti
+# Massimo Sandal <devicerandom@gmail.com>
+# W. Trevor King <wking@drexel.edu>
+#
+# This file is part of Hooke.
+#
+# Hooke is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
+#
+# Hooke is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with Hooke. If not, see
+# <http://www.gnu.org/licenses/>.
+
+"""Utility functions for spotting peaks in signal data.
+"""
+
+from math import floor
+
+import numpy
+
+from ..command import Argument
+
+
class Peak (object):
    """A peak (signal spike) instance.

    Peaks are a continuous series of signal data points that exceed
    some threshold.

    Parameters
    ----------
    name : string
        Short comment explaining peak selection algorithm.
    index : int
        Offset of the first peak point in the original signal data.
    values : array
        Signal data values for each peak point.
    """
    def __init__(self, name='peak', index=0, values=None):
        # `values=None` sentinel avoids the shared-mutable-default
        # pitfall: a literal `[]` default is a single list shared by
        # every Peak constructed without explicit values.
        if values is None:
            values = []
        self.name = name
        self.index = index
        self.values = values

    def __str__(self):
        # Terse form: class, name, and the [index, post_index) span.
        return '<%s %s %d:%d>' % (self.__class__.__name__, self.name,
                                  self.index, self.post_index())

    def __repr__(self):
        # Verbose form: include the raw values for debugging.
        return '<%s %s %d %s>' % (self.__class__.__name__, self.name,
                                  self.index, self.values)

    def post_index(self):
        """The index *after* the end of the peak.

        Examples
        --------

        >>> p = Peak(index=5, values=numpy.arange(3))

        The peak consists of indices 5, 6, and 7.

        >>> p.post_index()
        8
        """
        return self.index + len(self.values)
+
def mask_to_peaks(data, mask, name):
    """Convert a mask array to a list of :class:`Peak`\s.

    Each maximal run of consecutive `True` entries in `mask` becomes
    one :class:`Peak` carrying the matching slice of `data`.

    Parameters
    ----------
    data : array
        Input data.
    mask : array of booleans
        `False` when data is noise, `True` for peaks.
    name : string
        Name given to every generated :class:`Peak`.

    Returns
    -------
    peaks : list of :class:`Peak`\s.
        A :class:`Peak` instance for each continuous peak block.

    See Also
    --------
    peaks_to_mask

    Examples
    --------

    >>> data = numpy.arange(-10, -1)
    >>> mask = numpy.zeros(data.shape, dtype=bool)
    >>> mask[:2] = True
    >>> mask[5:8] = True
    >>> mask_to_peaks(data, mask, 'special points')
    [<Peak special points 0 [-10 -9]>, <Peak special points 5 [-5 -4 -3]>]
    """
    peaks = []
    run_start = None  # start index of the current True-run, or None
    for position, flagged in enumerate(mask):
        if flagged and run_start is None:
            run_start = position  # a new peak begins here
        elif not flagged and run_start is not None:
            # The run just ended; [run_start, position) is one peak.
            peaks.append(Peak(name=name, index=run_start,
                              values=data[run_start:position]))
            run_start = None
    if run_start is not None:  # a peak running off the end of the data
        peaks.append(Peak(name=name, index=run_start,
                          values=data[run_start:len(mask)]))
    return peaks
+
def peaks_to_mask(data, peaks):
    """Convert a list of :class:`Peak`\s to a mask array.

    Parameters
    ----------
    data : array
        Input data.
    peaks : list of :class:`Peak`\s.
        A :class:`Peak` instance for each continuous peak block.

    Returns
    -------
    mask : array of booleans
        `False` when data is noise, `True` for peaks.

    See Also
    --------
    mask_to_peaks

    Examples
    --------

    >>> data = numpy.arange(-10, -1)
    >>> mask = numpy.ones(data.shape, dtype=bool)
    >>> mask[:2] = False
    >>> mask[5:8] = False
    >>> peaks = mask_to_peaks(data, mask, 'special points')
    >>> mask2 = peaks_to_mask(data, peaks)
    >>> min(mask == mask2)
    True
    """
    # `numpy.bool` was a deprecated alias for the builtin `bool`
    # (removed in NumPy 1.24); use the builtin directly.
    mask = numpy.zeros(data.shape, dtype=bool)
    for peak in peaks:
        # Flag the half-open interval [index, post_index) as peak points.
        mask[peak.index:peak.post_index()] = True
    return mask
+
def noise(data, cut_side='both', stable=0.005, max_cut=0.2):
    """Find the portion of `data` that is "noise".

    Parameters
    ----------
    data : array
        Input signal.
    cut_side : {'both', 'positive', 'negative'}
        Side of the curve to cut from.
    stable : float
        Convergence threshold (stop if, when cutting more points, the
        relative change in the standard deviation is less than
        `stable`).
    max_cut : float
        Maximum fraction (0 to 1) of points to cut to evaluate noise,
        even if it doesn't converge (to avoid "eating" all the noise).

    Returns
    -------
    mask : array
        :math:`mask[i] == True` if `i` indexes a noise portion of
        `data`.  It is `False` otherwise.
    mean : float
        The most recently calculated mean.
    std : float
        The most recently calculated standard deviation.
    converged : {True, False}
        Whether the algorithm converged.

    Raises
    ------
    ValueError
        If `cut_side` is not one of the three recognized strings.

    Notes
    -----

    Algorithm:

    1) Calculate the mean and standard deviation of `data`
    2) Remove the most extreme point (from `cut_side`).
    3) Calculate mean and standard deviation of the remaining data.
    4) If the relative change in standard deviation is less than
       `stable` or the new standard deviation is zero, return with
       `converged` set to `True`.
    5) If `max_cut` of the original number of points have been cut,
       return with `converged` set to `False`.
    6) Return to step (2)

    The implementation relies on :meth:`numpy.ndarray.argmax`.

    NOTE(review): the running mean/std are computed over `data*mask`,
    i.e. cut points contribute zeros rather than being removed from the
    sample.  This matches the original implementation; confirm before
    relying on the exact statistics.

    Examples
    --------

    For our test data, use a step decrease.

    >>> data = numpy.zeros((50,), dtype=numpy.float64)
    >>> data[:5] = 1.0
    >>> mask,mean,std,converged = noise(data, cut_side='positive')

    The standard deviation will decrease as long as you're removing
    non-zero points (however, the relative change in the standard
    deviation increases), so we expect the first 5 values to be
    masked.

    >>> expected_mask = numpy.ones(data.shape, dtype=bool)
    >>> expected_mask[:5] = False
    >>> min(mask == expected_mask)
    True

    When the remaining points are all zero, the mean and standard
    deviation are both zero.  So the algorithm exits with successful
    convergence.

    >>> mean
    0.0
    >>> std
    0.0
    >>> converged
    True

    If we had made more of the initial points 1, the algorithm would
    be limited by `max_cut` and not converge:

    >>> max_cut = 0.2
    >>> data[:int(2*max_cut*len(data))] = 1.0
    >>> mask,mean,std,converged = noise(
    ...     data, cut_side='positive', max_cut=max_cut)
    >>> expected_mask = numpy.ones(data.shape, dtype=bool)
    >>> expected_mask[:int(max_cut*len(data))] = False
    >>> min(mask == expected_mask)
    True
    >>> converged
    False
    """
    # `numpy.bool` alias was removed in NumPy 1.24; use builtin `bool`.
    mask = numpy.ones(data.shape, dtype=bool)
    masked_mean = data.mean()
    masked_std = data.std()

    # Pick the single-point cut rule once, outside the loop.
    if cut_side == 'both':
        def new_mask(data, mask, masked_mean):
            # ndarray has no `.abs()` method (the original
            # `(data-masked_mean).abs()` raised AttributeError);
            # use numpy.absolute instead.
            mask[(numpy.absolute(data-masked_mean)*mask).argmax()] = 0
            return mask
    elif cut_side == 'positive':
        def new_mask(data, mask, masked_mean):
            mask[(data*mask).argmax()] = 0
            return mask
    elif cut_side == 'negative':
        def new_mask(data, mask, masked_mean):
            mask[(data*mask).argmin()] = 0
            return mask
    else:
        raise ValueError(cut_side)

    num_cuts = 0
    max_cuts = min(int(floor(max_cut * len(data))), len(data))
    while num_cuts < max_cuts:
        mask = new_mask(data, mask, masked_mean)
        num_cuts += 1
        new_masked_mean = (data*mask).mean()
        new_masked_std = (data*mask).std()
        if new_masked_std == 0:
            # Converged (all remaining signal identical).  Tested
            # *before* the division below, which the original performed
            # first (dividing by zero).
            return (mask, new_masked_mean, new_masked_std, True)
        rel_std = (masked_std-new_masked_std) / new_masked_std  # >= 0
        if rel_std < stable:
            return (mask, new_masked_mean, new_masked_std, True)
        masked_mean = new_masked_mean
        masked_std = new_masked_std
    return (mask, masked_mean, masked_std, False)
+
# Declarative descriptions of :func:`noise`'s keyword parameters, for
# plugins that expose the function as a command.  Argument names use
# spaces; :func:`_kwargs` converts them to underscored parameter names.
noise_arguments = [
    Argument('cut side', type='string', default='both',
             help="""
Select the side of the curve to cut from. `positive`, `negative`, or
`both`.
""".strip()),
    Argument('stable', type='float', default=0.005, help="""
Convergence threshold (stop if, when cutting more points, the relative
change in the standard deviation is less than `stable`).
""".strip()),
    Argument('max cut', type='float', default=0.2, help="""
The maximum fraction (0 to 1) of points to cut to evaluate noise, even
if it doesn't converge (to avoid "eating" all the noise).
""".strip()),
    ]
"""List :func:`noise`'s :class:`~hooke.command.Argument`\s for easy use
by plugins in :mod:`~hooke.plugin`.
"""
+
def above_noise(data, side='both', min_deviations=5.0, mean=None, std=None):
    """Find locations where `data` is far from the `mean`.

    Parameters
    ----------
    data : array
        Input data.
    side : {'both', 'positive', 'negative'}
        Side of the curve that can be "above" the noise.
    min_deviations : float
        Number of standard deviations beyond the mean to define
        "above" the noise.  Increase to tighten the filter.
    mean : float
        The mean of the input data's background noise.
    std : float
        The standard deviation of the input data's background noise.

    Returns
    -------
    mask : array of booleans
        `True` when data is beyond the threshold, otherwise `False`.

    Notes
    -----
    If `mean` and `std` are None, they are calculated using
    the respective `data` methods.

    Examples
    --------

    >>> data = numpy.arange(-3, 4)
    >>> above_noise(data, side='both', min_deviations=1.0, mean=0, std=1.0)
    array([ True,  True, False, False, False,  True,  True], dtype=bool)
    >>> above_noise(data, side='positive', min_deviations=1.0, mean=0, std=1.0)
    array([False, False, False, False, False,  True,  True], dtype=bool)
    >>> above_noise(data, side='negative', min_deviations=1.0, mean=0, std=1.0)
    array([ True,  True, False, False, False, False, False], dtype=bool)
    """
    if mean is None:
        mean = data.mean()
    if std is None:
        # Original read `std == data.std()` -- a comparison, not an
        # assignment -- which left `std` as None and broke the
        # threshold arithmetic below.
        std = data.std()
    if side == 'negative':
        # Mirror the data so the "above" test applies to the negative tail.
        data = -data
        mean = -mean
    elif side == 'both':
        # Fold both tails onto the positive side of the mean.
        data = numpy.absolute(data-mean)
        mean = 0
    # Threshold is `min_deviations` *standard deviations* beyond the
    # mean, per the documented contract (the original added
    # `min_deviations + std` and dropped the mean offset).
    return data > (mean + min_deviations * std)
+
# Declarative descriptions of :func:`above_noise`'s keyword parameters,
# for plugins that expose the function as a command.
above_noise_arguments = [
    Argument('side', type='string', default='both',
             help="""
Select the side of the curve that counts as "above". `positive`,
`negative`, or `both`.
""".strip()),
    # Renamed from 'min deviation' to 'min deviations' so that
    # :func:`_kwargs` (which only replaces spaces with underscores)
    # produces the `min_deviations` keyword that :func:`above_noise`
    # actually accepts.
    Argument('min deviations', type='float', default=5.0, help="""
Number of standard deviations above the noise to define a peak.
Increase to tighten the filter.
""".strip()),
    ]
"""List :func:`above_noise`'s :class:`~hooke.command.Argument`\s for
easy use by plugins in :mod:`~hooke.plugin`.
"""
+
def merge_double_peaks(data, peaks, see_double=10):
    """Merge peaks that are "too close" together.

    Parameters
    ----------
    data : array
        Input data.
    peaks : list of :class:`Peak`\s.
        A :class:`Peak` instance for each continuous peak block.
    see_double : int
        If two peaks are separated by less than `see double` points,
        count them (and the intervening data) as a single peak.

    Returns
    -------
    peaks : list of :class:`Peak`\s.
        The input list, modified in place.

    Examples
    --------

    >>> data = numpy.arange(150)
    >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
    ...          Peak(name='b', index=15, values=data[15:18]),
    ...          Peak(name='c', index=23, values=data[23:27]),
    ...          Peak(name='d', index=100, values=data[100:101])]
    >>> peaks = merge_double_peaks(data, peaks, see_double=10)
    >>> print '\\n'.join([str(p) for p in peaks])
    <Peak a 10:27>
    <Peak d 100:101>
    >>> min(peaks[0].values == data[10:27])
    True
    """
    position = 0
    while position + 1 < len(peaks):
        left = peaks[position]
        right = peaks[position + 1]
        gap = right.index - left.post_index()
        if gap > see_double:
            # Far enough apart -- keep both and move on.
            position += 1
            continue
        # Too close: absorb `right` (and the gap between them) into a
        # replacement peak that keeps `left`'s name and start index.
        peaks[position] = Peak(
            name=left.name, index=left.index,
            values=data[left.index:right.post_index()])
        del peaks[position + 1]
    return peaks
+
# Declarative description of :func:`merge_double_peaks`'s tunable
# parameter, for plugins that expose the function as a command.
merge_double_peaks_arguments = [
    Argument('see double', type='int', default=10, help="""
If two peaks are separated by less than `see double` points, count
them as a single peak.
""".strip()),
    ]
"""List :func:`merge_double_peaks`' :class:`~hooke.command.Argument`\s
for easy use by plugins in :mod:`~hooke.plugin`.
"""
+
def drop_narrow_peaks(peaks, min_points=1):
    """Drop peaks that are "too narrow".

    Parameters
    ----------
    peaks : list of :class:`Peak`\s.
        A :class:`Peak` instance for each continuous peak block.
    min_points : int
        Minimum number of points for :class:`Peak` acceptance.

    Returns
    -------
    peaks : list of :class:`Peak`\s.
        A new list holding only the sufficiently wide peaks.

    Examples
    --------

    >>> data = numpy.arange(150)
    >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
    ...          Peak(name='b', index=15, values=data[15:18]),
    ...          Peak(name='c', index=23, values=data[23:27]),
    ...          Peak(name='d', index=100, values=data[100:101])]
    >>> peaks = drop_narrow_peaks(peaks, min_points=3)
    >>> print '\\n'.join([str(p) for p in peaks])
    <Peak b 15:18>
    <Peak c 23:27>
    """
    wide_enough = []
    for peak in peaks:
        if len(peak.values) >= min_points:
            wide_enough.append(peak)
    return wide_enough
+
# Declarative description of :func:`drop_narrow_peaks`'s tunable
# parameter, for plugins that expose the function as a command.
drop_narrow_peaks_arguments = [
    Argument('min points', type='int', default=1, help="""
Minimum number of "feature" points for peak acceptance.
""".strip()),
    ]
"""List :func:`drop_narrow_peaks`' :class:`~hooke.command.Argument`\s
for easy use by plugins in :mod:`~hooke.plugin`.
"""
+
+def _kwargs(kwargs, arguments, translations={}):
+ """Split off kwargs for the arguments listed in arguments.
+
+ Also add the kwargs marked in `translations`.
+
+ Examples
+ --------
+
+ >>> import pprint
+ >>> kwargs = {'param_a':1, 'param_b':2, 'param_c':3}
+ >>> args = [Argument(name='param a')]
+ >>> translations = {'the_big_c_param':'param_c'}
+ >>> pprint.pprint(_kwargs(kwargs, args, translations))
+ {'param_a': 1, 'the_big_c_param': 3}
+ """
+ keys = [arg.name.replace(' ', '_') for arg in arguments]
+ ret = dict([(key, kwargs[key]) for key in keys])
+ for target_key,source_key in translations.items():
+ if source_key in kwargs:
+ ret[target_key] = kwargs[source_key]
+ return ret
+
def find_peaks(data, **kwargs):
    """Catch-all peak finder.

    Runs in succession:

    1) :func:`noise`, to determine the mean and standard deviation of
       the noise in `data`.
    2) :func:`above_noise` to select the regions of `data` that are
       "above" the noise.
    3) :func:`merge_double_peaks`
    4) :func:`drop_narrow_peaks`

    The input parameters may be any accepted by the above functions.

    Raises
    ------
    ValueError
        If the noise estimate does not converge.
    """
    mask,mean,std,converged = noise(data, **_kwargs(kwargs, noise_arguments))
    if converged == False:
        raise ValueError(kwargs)
    above_kwargs = _kwargs(kwargs, above_noise_arguments)
    if 'min_deviation' in above_kwargs:
        # The Argument was historically named 'min deviation', but the
        # function parameter is `min_deviations`; translate so the call
        # doesn't raise TypeError.
        above_kwargs['min_deviations'] = above_kwargs.pop('min_deviation')
    # Pass the noise statistics along (the original computed them in
    # step 1 and then discarded them, so `above_noise` recomputed
    # mean/std from the whole of `data`, peaks included).
    mask = above_noise(data, mean=mean, std=std, **above_kwargs)
    # The original call dropped `mask_to_peaks`'s required `name`
    # argument, so every invocation raised TypeError.
    peaks = mask_to_peaks(data, mask, 'peak')
    peaks = merge_double_peaks(
        data, peaks, **_kwargs(kwargs, merge_double_peaks_arguments))
    return drop_narrow_peaks(
        peaks, **_kwargs(kwargs, drop_narrow_peaks_arguments))
+
# The union of every stage's arguments, so a plugin can expose
# :func:`find_peaks` as a single command.
find_peaks_arguments = (noise_arguments
                    + above_noise_arguments
                    + merge_double_peaks_arguments
                    + drop_narrow_peaks_arguments)
"""List :func:`find_peaks`' :class:`~hooke.command.Argument`\s for
easy use by plugins in :mod:`~hooke.plugin`.
"""