1 # Copyright (C) 2007-2012 Fabrizio Benedetti <fabrizio.benedetti.82@gmail.com>
2 # W. Trevor King <wking@tremily.us>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it under the
7 # terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation, either version 3 of the License, or (at your option) any
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
12 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13 # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with Hooke. If not, see <http://www.gnu.org/licenses/>.
19 """Utility functions for spotting peaks in signal data.
22 from math import floor
26 from ..command import Argument
30 """A peak (signal spike) instance.
32 Peaks are a continuous series of signal data points that exceed
38 Short comment explaining peak selection algorithm.
40 Offset of the first peak point in the original signal data.
42 Signal data values for each peak point.
44 def __init__(self, name='peak', index=0, values=[]):
50 return '<%s %s %d:%d>' % (self.__class__.__name__, self.name,
51 self.index, self.post_index())
54 return '<%s %s %d %s>' % (self.__class__.__name__, self.name,
55 self.index, self.values)
58 """The index *after* the end of the peak.
63 >>> p = Peak(index=5, values=numpy.arange(3))
65 The peak consists of indicies 5, 6, and 7.o
70 return self.index + len(self.values)
72 def mask_to_peaks(data, mask, name='peak'):
73 """Convert a mask array to a list of :class:`Peak`\s.
79 mask : array of booleans
80 `False` when data is noise, `True` for peaks.
84 peaks : list of :mod:`Peak`\s.
85 A :mod:`Peak` instances for each continuous peak block.
94 >>> data = numpy.arange(-10, -1)
95 >>> mask = numpy.zeros(data.shape, dtype=numpy.bool)
98 >>> mask_to_peaks(data, mask, 'special points')
99 [<Peak special points 0 [-10 -9]>, <Peak special points 5 [-5 -4 -3]>]
104 if mask[i] == True: # masked peak point
106 while i < len(mask) and mask[i] == True:
110 name=name, index=start_i, values=data[start_i:post_i]))
115 def peaks_to_mask(data, peaks):
116 """Convert a list of :class:`Peak`\s to a mask array.
122 peaks : list of :mod:`Peak`\s.
123 A :mod:`Peak` instances for each continuous peak block.
127 mask : array of booleans
128 `False` when data is noise, `True` for peaks.
137 >>> data = numpy.arange(-10, -1)
138 >>> mask = numpy.ones(data.shape, dtype=numpy.bool)
140 >>> mask[5:8] = False
141 >>> peaks = mask_to_peaks(data, mask, 'special points')
142 >>> mask2 = peaks_to_mask(data, peaks)
143 >>> min(mask == mask2)
146 mask = numpy.zeros(data.shape, dtype=numpy.bool)
148 mask[peak.index:peak.post_index()] = True
151 def noise(data, cut_side='both', stable=0.005, max_cut=0.2):
152 """Find the portion of `data` that is "noise".
158 cut_side : {'both', 'positive', 'negative'}
159 Side of the curve to cut from.
161 Convergence threshold (stop if, when cutting more points, the
162 relative change in the standard deviation is less than
165 Maximum fraction (0 to 1) of points to cut to evaluate noise,
166 even if it doesn't converge (to avoid "eating" all the noise).
171 :math:`mask[i] == True` if `i` indexes a noise portion of
172 `data`. It is `False` otherwise.
174 The most recently calculated mean.
176 The most recently calculated standard deviation.
177 converged : {True, False}
178 Whether the algorithm converged.
185 1) Calculate the mean and standard deviation of `data`
186 2) Remove the most extreme point (from `cut_side`).
187 3) Calclate mean and standard deviation of the remaining data.
188 4) If the relative change in standard deviation is less than
189 `stable` or the new standard deviation is zero, return with
190 `converged` set to `True`.
191 5) If `max cut` of the original number of points have been cut,
192 return with `converged` set to `False`.
193 6) Return to step (2)
195 The implementation relies on :meth:`numpy.ndarray.argmax`.
200 For our test data, use a step decrease.
202 >>> data = numpy.zeros((50,), dtype=numpy.float)
204 >>> mask,mean,std,converged = noise(data, cut_side='positive')
206 The standard deviation will decrease as long as you're removing
207 non-zero points (however, the relative change in the standard
208 deviation increases), so we expect the first 10 values to be
211 >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
212 >>> expected_mask[:5] = False
213 >>> min(mask == expected_mask)
216 When the remaining points are all zero, the mean and standard
217 deviation are both zero. So the algorithm exits with successful
227 If we had mad more of the initial points 1, the algorithm would
228 be limited by `max_cut` and not converge:
231 >>> data[:2*max_cut*len(data)] = 1.0
232 >>> mask,mean,std,converged = noise(
233 ... data, cut_side='positive', max_cut=max_cut)
234 >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
235 >>> expected_mask[:max_cut*len(data)] = False
236 >>> min(mask == expected_mask)
241 mask = numpy.ones(data.shape, dtype=numpy.bool)
242 masked_mean = data.mean()
243 masked_std = data.std()
245 if cut_side == 'both':
246 def new_mask(data, mask, masked_mean):
247 mask[(numpy.absolute(data-masked_mean)*mask).argmax()] = 0
249 elif cut_side == 'positive':
250 def new_mask(data, mask, masked_mean):
251 mask[(data*mask).argmax()] = 0
253 elif cut_side == 'negative':
254 def new_mask(data, mask, masked_mean):
255 mask[(data*mask).argmin()] = 0
258 raise ValueError(cut_side)
261 max_cuts = min(int(floor(max_cut * len(data))), len(data))
262 while num_cuts < max_cuts:
263 mask = new_mask(data, mask, masked_mean)
265 new_masked_mean = (data*mask).mean()
266 new_masked_std = (data*mask).std()
267 rel_std = (masked_std-new_masked_std) / new_masked_std # >= 0
268 if rel_std < stable or new_masked_std == 0:
269 return (mask, new_masked_mean, new_masked_std, True)
270 masked_mean = new_masked_mean
271 masked_std = new_masked_std
272 return (mask, masked_mean, masked_std, False)
275 Argument('cut side', type='string', default='both',
277 Select the side of the curve to cut from. `positive`, `negative`, or
280 Argument('stable', type='float', default=0.005, help="""
281 Convergence threshold (stop if, when cutting more points, the relative
282 change in the standard deviation is less than `stable`).
284 Argument('max cut', type='float', default=0.2, help="""
285 The maximum fraction (0 to 1) of points to cut to evaluate noise, even
286 if it doesn't converge (to avoid "eating" all the noise).
289 """List :func:`noise`'s :class:`~hooke.command.Argument`\s for easy use
290 by plugins in :mod:`~hooke.plugin`.
293 def above_noise(data, side='both', min_deviations=5.0, mean=None, std=None):
294 """Find locations where `data` is far from the `mean`.
300 side : {'both', 'positive', 'negative'}
301 Side of the curve that can be "above" the noise.
302 min_deviations : float
303 Number of standard deviations beyond the mean to define
304 "above" the noise. Increase to tighten the filter.
306 The mean of the input data's background noise.
308 The standard deviation of the input data's background noise.
312 mask : array of booleans
313 `True` when data is beyond the threshold, otherwise `False`.
317 If `mean` and `std` are None, they are calculted using
318 the respective `data` methods.
323 >>> data = numpy.arange(-3, 4)
324 >>> above_noise(data, side='both', min_deviations=1.1, mean=0, std=1.0)
325 array([ True, True, False, False, False, True, True], dtype=bool)
326 >>> above_noise(data, side='positive', min_deviations=1.1, mean=0, std=1.0)
327 array([False, False, False, False, False, True, True], dtype=bool)
328 >>> above_noise(data, side='negative', min_deviations=1.1, mean=0, std=1.0)
329 array([ True, True, False, False, False, False, False], dtype=bool)
335 if side == 'negative':
339 data = numpy.absolute(data-mean)
341 return data > (min_deviations * std)
343 above_noise_arguments = [
344 Argument('side', type='string', default='both',
346 Select the side of the curve that counts as "above". `positive`,
347 `negative`, or `both`.
349 Argument('min deviations', type='float', default=5.0, help="""
350 Number of standard deviations above the noise to define a peak.
351 Increase to tighten the filter.
354 """List :func:`above_noise`' :class:`~hooke.command.Argument`\s for
355 easy use by plugins in :mod:`~hooke.plugin`.
358 def merge_double_peaks(data, peaks, see_double=10):
359 """Merge peaks that are "too close" together.
365 peaks : list of :mod:`Peak`\s.
366 A :mod:`Peak` instances for each continuous peak block.
368 If two peaks are separated by less than `see double` points,
369 count them (and the intervening data) as a single peak.
373 peaks : list of :mod:`Peak`\s.
374 The modified list of :mod:`Peak`\s.
379 >>> data = numpy.arange(150)
380 >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
381 ... Peak(name='b', index=15, values=data[15:18]),
382 ... Peak(name='c', index=23, values=data[23:27]),
383 ... Peak(name='d', index=100, values=data[100:101])]
384 >>> peaks = merge_double_peaks(data, peaks, see_double=10)
385 >>> print '\\n'.join([str(p) for p in peaks])
388 >>> min(peaks[0].values == data[10:27])
392 while i < len(peaks)-1:
394 next_peak = peaks[i+1]
395 if next_peak.index - peak.post_index() > see_double: # far enough apart
396 i += 1 # move on to the next peak
397 else: # too close. merge the peaks
398 peaks[i] = Peak(name=peak.name, index=peak.index,
399 values=data[peak.index:next_peak.post_index()])
403 merge_double_peaks_arguments = [
404 Argument('see double', type='int', default=10, help="""
405 If two peaks are separated by less than `see double` points, count
406 them as a single peak.
409 """List :func:`merge_double_peaks`' :class:`~hooke.command.Argument`\s
410 for easy use by plugins in :mod:`~hooke.plugin`.
413 def drop_narrow_peaks(peaks, min_points=1):
414 """Drop peaks that are "too narrow".
418 peaks : list of :mod:`Peak`\s.
419 A :mod:`Peak` instances for each continuous peak block.
421 Minimum number of points for :class:`Peak` acceptance.
425 peaks : list of :mod:`Peak`\s.
426 The modified list of :mod:`Peak`\s.
431 >>> data = numpy.arange(150)
432 >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
433 ... Peak(name='b', index=15, values=data[15:18]),
434 ... Peak(name='c', index=23, values=data[23:27]),
435 ... Peak(name='d', index=100, values=data[100:101])]
436 >>> peaks = drop_narrow_peaks(peaks, min_points=3)
437 >>> print '\\n'.join([str(p) for p in peaks])
441 return [peak for peak in peaks if len(peak.values) >= min_points]
443 drop_narrow_peaks_arguments = [
444 Argument('min points', type='int', default=1, help="""
445 Minimum number of "feature" points for peak acceptance.
448 """List :func:`drop_narrow_peaks`' :class:`~hooke.command.Argument`\s
449 for easy use by plugins in :mod:`~hooke.plugin`.
452 def _kwargs(kwargs, arguments, translations={}, argument_input_keys=False):
453 """Split off kwargs for the arguments listed in arguments.
455 Also add the kwargs marked in `translations`.
461 >>> kwargs = {'param_a':1, 'param_b':2, 'param_c':3}
462 >>> args = [Argument(name='param a')]
463 >>> translations = {'the_big_c_param':'param_c'}
464 >>> pprint.pprint(_kwargs(kwargs, args, translations,
465 ... argument_input_keys=False))
466 {'param_a': 1, 'the_big_c_param': 3}
467 >>> pprint.pprint(_kwargs(kwargs, args, translations,
468 ... argument_input_keys=True))
469 {'the_big_c_param': 3}
470 >>> kwargs = {'param a':1, 'param b':2, 'param c':3}
471 >>> translations = {'the_big_c_param':'param c'}
472 >>> pprint.pprint(_kwargs(kwargs, args, translations,
473 ... argument_input_keys=True))
474 {'param_a': 1, 'the_big_c_param': 3}
476 arg_keys = [arg.name for arg in arguments]
477 keys = [arg.name.replace(' ', '_') for arg in arguments]
479 for arg_key,key in zip(arg_keys, keys):
481 if argument_input_keys == True:
484 ret[key] = kwargs[in_key]
485 for target_key,source_key in translations.items():
486 if source_key in kwargs:
487 ret[target_key] = kwargs[source_key]
490 def find_peaks(data, **kwargs):
491 """Catch all peak finder.
495 1) :func:`noise`, to determine the standard deviation of the noise
497 2) :func:`above_noise` to select the regions of `data` that are
499 3) :func:`merge_double_peaks`
500 4) :func:`drop_narrow_peaks`
502 The input parameters may be any accepted by the above functions.
504 mask,mean,std,converged = noise(data, **_kwargs(kwargs, noise_arguments))
505 mask = above_noise(data, mean=mean, std=std,
506 **_kwargs(kwargs, above_noise_arguments))
507 peaks = mask_to_peaks(data, mask)
508 peaks = merge_double_peaks(
509 data, peaks, **_kwargs(kwargs, merge_double_peaks_arguments))
510 return drop_narrow_peaks(
511 peaks, **_kwargs(kwargs, drop_narrow_peaks_arguments))
513 find_peaks_arguments = (noise_arguments
514 + above_noise_arguments
515 + merge_double_peaks_arguments
516 + drop_narrow_peaks_arguments)
517 """List :func:`find_peaks`' :class:`~hooke.command.Argument`\s for
518 easy use by plugins in :mod:`~hooke.plugin`.