import copy
from Queue import Queue
-from numpy import diff
+from numpy import absolute, diff
from scipy.signal.signaltools import medfilt
-from ..command import Command, Argument, Success, Failure, UncaughtException
+from ..command import Argument, Success, Failure, UncaughtException
from ..config import Setting
from ..curve import Data
from ..experiment import VelocityClamp
from ..plugin import Plugin, argument_to_setting
-from ..plugin.curve import CurveArgument
+from ..plugin.curve import ColumnAddingCommand
from ..plugin.playlist import FilterCommand
from ..util.fit import PoorFit
from ..util.peak import (find_peaks, peaks_to_mask,
self._settings.append(argument_to_setting(
self.setting_section, argument))
argument.default = None # if argument isn't given, use the config.
- self._arguments.extend([ # Non-configurable arguments
- Argument(name='retract block', type='string',
- default='retract',
- help="""
-Name of the retracting data block.
-""".strip()),
- Argument(name='input distance column', type='string',
- default='surface distance (m)',
- help="""
-Name of the column to use as the surface position input.
-""".strip()),
- Argument(name='input deflection column', type='string',
- default='deflection (N)',
- help="""
-Name of the column to use as the deflection input.
-""".strip()),
- Argument(name='output peak column', type='string',
- default='peak deflection',
- help="""
-Name of the column (without units) to use as the peak-maske deflection output.
-""".strip()),
- Argument(name='peak info name', type='string',
- default='flat filter peaks',
- help="""
-Name for storing the distance offset in the `.info` dictionary.
-""".strip()),
- ])
self._commands = [FlatPeaksCommand(self), FlatFilterCommand(self)]
def dependencies(self):
return self._settings
-class FlatPeaksCommand (Command):
+class FlatPeaksCommand (ColumnAddingCommand):
"""Detect peaks in velocity clamp data using noise statistics.
Notes
# function, just detecting peaks.
super(FlatPeaksCommand, self).__init__(
name='flat filter peaks',
+ columns=[
+ ('distance column', 'surface distance (m)', """
+Name of the column to use as the surface position input.
+""".strip()),
+ ('deflection column', 'surface deflection (m)', """
+Name of the column to use as the deflection input.
+""".strip()),
+ ],
+ new_columns=[
+ ('output peak column', 'flat filter peaks', """
+Name of the column (without units) to use as the peak output.
+""".split()),
+ ],
arguments=[
- CurveArgument,
+ Argument(name='peak info name', type='string',
+ default='flat filter peaks',
+ help="""
+Name (without units) for storing the list of peaks in the `.info`
+dictionary.
+""".strip()),
] + plugin_arguments,
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data,block_index,z_data,d_data,params = self._setup(hooke, params)
- start_index = 0
- while (start_index < len(z_data)
- and z_data[start_index] < params['blind window']):
- start_index += 1
- median = medfilt(d_data[start_index:], params['median window'])
+ params = self.__setup_params(hooke=hooke, params=params)
+ block = self._block(hooke=hooke, params=params)
+ dist_data = self._get_column(hooke=hooke, params=params,
+ column_name='distance column')
+ def_data = self._get_column(hooke=hooke, params=params,
+ column_name='deflection column')
+ start_index = absolute(dist_data-params['blind window']).argmin()
+ median = medfilt(def_data[start_index:], params['median window'])
deriv = diff(median)
peaks = find_peaks(deriv, **_kwargs(params, find_peaks_arguments,
argument_input_keys=True))
- d_name,d_unit = split_data_label(params['input deflection column'])
for i,peak in enumerate(peaks):
- peak.name = 'flat filter peak %d of %s' % (i, d_name)
+ peak.name = self._peak_name(params, i)
peak.index += start_index
- data.info[params['peak info name']] = peaks
-
- # Add a peak-masked deflection column.
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
- d_name,d_unit = split_data_label(params['input deflection column'])
- new.info['columns'].append(
- join_data_label(params['output peak column'], d_unit))
- new[:,-1] = peaks_to_mask(d_data, peaks) * d_data
+ block.info[params['peak info name']] = peaks
+
+ self._set_column(hooke=hooke, params=params,
+ column_name='output peak column',
+ values=peaks_to_mask(def_data, peaks) * def_data)
outqueue.put(peaks)
- params['curve'].data[block_index] = new
- def _setup(self, hooke, params):
+ def __setup_params(self, hooke, params):
"""Setup `params` from config and return the z piezo and
deflection arrays.
"""
- curve = params['curve']
+ curve = self._curve(hooke=hooke, params=params)
if curve.info['experiment'] != VelocityClamp:
raise Failure('%s operates on VelocityClamp experiments, not %s'
% (self.name, curve.info['experiment']))
- data = None
- for i,block in enumerate(curve.data):
- if block.info['name'].startswith(params['retract block']):
- data = block
- block_index = i
- break
- if data == None:
- raise Failure('No retraction blocks in %s.' % curve)
- z_data = data[:,data.info['columns'].index(
- params['input distance column'])]
- d_data = data[:,data.info['columns'].index(
- params['input deflection column'])]
for key,value in params.items():
if value == None: # Use configured default value.
params[key] = self.plugin.config[key]
# TODO: convert 'see double' from nm to points
- return (data, block_index, z_data, d_data, params)
+ name,def_unit = split_data_label(params['deflection column'])
+ params['output peak column'] = join_data_label(
+ params['output peak column'], def_unit)
+ params['peak info name'] = join_data_label(
+ params['peak info name'], def_unit)
+ return params
+
+ def _peak_name(self, params, index):
+ d_name,d_unit = split_data_label(params['deflection column'])
+ return 'flat filter peak %d of %s' % (index, d_name)
class FlatFilterCommand (FilterCommand):
def __init__(self, plugin):
super(FlatFilterCommand, self).__init__(
plugin, name='flat filter playlist')
- self.arguments.extend(plugin._arguments)
+ self.arguments.extend(plugin._arguments) # TODO: curve & block arguments
def filter(self, curve, hooke, inqueue, outqueue, params):
params['curve'] = curve