from numpy import diff
from scipy.signal.signaltools import medfilt
-from ..command import Command, Argument, Failure
+from ..command import Command, Argument, Success, Failure, UncaughtException
from ..config import Setting
+from ..experiment import VelocityClamp
from ..plugin import Plugin, argument_to_setting
from ..plugin.curve import CurveArgument
from ..plugin.playlist import FilterCommand
from ..plugin.vclamp import scale
-from ..util.peak import find_peaks, find_peaks_arguments, Peak
+from ..util.fit import PoorFit
+from ..util.peak import find_peaks, find_peaks_arguments, Peak, _kwargs
class FlatFiltPlugin (Plugin):
def __init__(self):
super(FlatFiltPlugin, self).__init__(name='flatfilt')
self._arguments = [ # For Command initialization
- Argument('median filter', type='int', default=7, help="""
+ Argument('median window', type='int', default=7, help="""
Median window filter size (in points).
+""".strip()),
+ Argument('blind window', type='float', default=20e-9, help="""
+Meters after the contact point where we do not count peaks to avoid
+non-specific surface interaction.
+""".strip()),
+ Argument('min peaks', type='int', default=4, help="""
+Minimum number of peaks for curve acceptance.
""".strip()),
] + copy.deepcopy(find_peaks_arguments)
# Set flat-filter-specific defaults for the fit_peak_arguments.
for key,value in [('cut side', 'both'),
('stable', 0.005),
('max cut', 0.2),
- ('min deviation', 9.0),
+ ('min deviations', 9.0),
('min points', 4),
('see double', 10e-9),
]:
# function, just detecting peaks.
super(FlatPeaksCommand, self).__init__(
name='flat filter peaks',
- arguments=[CurveArgument] + config_arguments,
+ arguments=[
+ CurveArgument,
+ ] + config_arguments,
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- z_data,d_data,params = self._setup(params)
+ z_data,d_data,params = self._setup(hooke, params)
start_index = 0
- while z_data[start_index] < params['bind window']:
+ while (start_index < len(z_data)
+ and z_data[start_index] < params['blind window']):
start_index += 1
- median = medfilt(d_data[start_index:], params['median window']),
+ median = medfilt(d_data[start_index:], params['median window'])
deriv = diff(median)
- kwargs = dict([(a.name, params[a.name]) for a in find_peaks_arguments])
- peaks = find_peaks(deriv, **kwargs)
+ peaks = find_peaks(deriv, **_kwargs(params, find_peaks_arguments,
+ argument_input_keys=True))
for peak in peaks:
- peak.name = 'flat filter of %s with %s' \
- % (params['deflection column name'], params['convolution'])
+ peak.name = 'flat filter of %s' % (params['deflection column name'])
peak.index += start_index
outqueue.put(peaks)
- def _setup(self, params):
+ def _setup(self, hooke, params):
"""Setup `params` from config and return the z piezo and
deflection arrays.
"""
if curve.info['experiment'] != VelocityClamp:
raise Failure('%s operates on VelocityClamp experiments, not %s'
% (self.name, curve.info['experiment']))
- for col in ['surface z piezo (m)', 'deflection (N)']:
+ for col in ['surface distance (m)', 'deflection (N)']:
if col not in curve.data[0].info['columns']:
- scale(curve)
+ scale(hooke, curve)
data = None
- for block in curve.data:
+ for i,block in enumerate(curve.data):
if block.info['name'].startswith('retract'):
data = block
break
if data == None:
raise Failure('No retraction blocks in %s.' % curve)
- z_data = data[:,data.info['columns'].index('surface z piezo (m)')]
+ z_data = data[:,data.info['columns'].index('surface distance (m)')]
if 'flattened deflection (N)' in data.info['columns']:
params['deflection column name'] = 'flattened deflection (N)'
else:
for key,value in params.items():
if value == None: # Use configured default value.
params[key] = self.plugin.config[key]
+ # TODO: better option parser to do this automatically by Argument.type
+ for key in ['blind window', 'median window', 'max cut', 'min deviations', 'min points', 'see double', 'stable']:
+ params[key] = float(params[key])
+ # TODO: convert 'see double' from nm to points
return z_data,d_data,params
class FlatFilterCommand (FilterCommand):
self.arguments.extend(plugin._arguments)
def filter(self, curve, hooke, inqueue, outqueue, params):
+ params['curve'] = curve
inq = Queue()
outq = Queue()
- conv_command = [c for c in hooke.commands
+ filt_command = [c for c in hooke.commands
if c.name=='flat filter peaks'][0]
- conv_command.run(hooke, inq, outq, **params)
+ filt_command.run(hooke, inq, outq, **params)
peaks = outq.get()
- if not isinstance(peaks[0], Peak):
+ if isinstance(peaks, UncaughtException) \
+ and isinstance(peaks.exception, PoorFit):
+ return False
+ if not (isinstance(peaks, list) and (len(peaks) == 0
+ or isinstance(peaks[0], Peak))):
raise Failure('Expected a list of Peaks, not %s' % peaks)
ret = outq.get()
if not isinstance(ret, Success):
raise ret
- return len(peaks) >= params['min peaks']
+ if params['min peaks'] == None: # Use configured default value.
+ params['min peaks'] = self.plugin.config['min peaks']
+ return len(peaks) >= int(params['min peaks'])