1 # Copyright (C) 2007-2011 Fabrizio Benedetti
2 # Massimo Sandal <devicerandom@gmail.com>
3 # W. Trevor King <wking@drexel.edu>
5 # This file is part of Hooke.
7 # Hooke is free software: you can redistribute it and/or modify it
8 # under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
12 # Hooke is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
15 # Public License for more details.
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with Hooke. If not, see
19 # <http://www.gnu.org/licenses/>.
21 """Utility functions for spotting peaks in signal data.
24 from math import floor
28 from ..command import Argument
32 """A peak (signal spike) instance.
34 Peaks are a continuous series of signal data points that exceed
40 Short comment explaining peak selection algorithm.
42 Offset of the first peak point in the original signal data.
44 Signal data values for each peak point.
46 def __init__(self, name='peak', index=0, values=[]):
52 return '<%s %s %d:%d>' % (self.__class__.__name__, self.name,
53 self.index, self.post_index())
56 return '<%s %s %d %s>' % (self.__class__.__name__, self.name,
57 self.index, self.values)
60 """The index *after* the end of the peak.
65 >>> p = Peak(index=5, values=numpy.arange(3))
67 The peak consists of indicies 5, 6, and 7.o
72 return self.index + len(self.values)
74 def mask_to_peaks(data, mask, name='peak'):
75 """Convert a mask array to a list of :class:`Peak`\s.
81 mask : array of booleans
82 `False` when data is noise, `True` for peaks.
86 peaks : list of :mod:`Peak`\s.
87 A :mod:`Peak` instances for each continuous peak block.
96 >>> data = numpy.arange(-10, -1)
97 >>> mask = numpy.zeros(data.shape, dtype=numpy.bool)
100 >>> mask_to_peaks(data, mask, 'special points')
101 [<Peak special points 0 [-10 -9]>, <Peak special points 5 [-5 -4 -3]>]
106 if mask[i] == True: # masked peak point
108 while i < len(mask) and mask[i] == True:
112 name=name, index=start_i, values=data[start_i:post_i]))
117 def peaks_to_mask(data, peaks):
118 """Convert a list of :class:`Peak`\s to a mask array.
124 peaks : list of :mod:`Peak`\s.
125 A :mod:`Peak` instances for each continuous peak block.
129 mask : array of booleans
130 `False` when data is noise, `True` for peaks.
139 >>> data = numpy.arange(-10, -1)
140 >>> mask = numpy.ones(data.shape, dtype=numpy.bool)
142 >>> mask[5:8] = False
143 >>> peaks = mask_to_peaks(data, mask, 'special points')
144 >>> mask2 = peaks_to_mask(data, peaks)
145 >>> min(mask == mask2)
148 mask = numpy.zeros(data.shape, dtype=numpy.bool)
150 mask[peak.index:peak.post_index()] = True
153 def noise(data, cut_side='both', stable=0.005, max_cut=0.2):
154 """Find the portion of `data` that is "noise".
160 cut_side : {'both', 'positive', 'negative'}
161 Side of the curve to cut from.
163 Convergence threshold (stop if, when cutting more points, the
164 relative change in the standard deviation is less than
167 Maximum fraction (0 to 1) of points to cut to evaluate noise,
168 even if it doesn't converge (to avoid "eating" all the noise).
173 :math:`mask[i] == True` if `i` indexes a noise portion of
174 `data`. It is `False` otherwise.
176 The most recently calculated mean.
178 The most recently calculated standard deviation.
179 converged : {True, False}
180 Whether the algorithm converged.
187 1) Calculate the mean and standard deviation of `data`
188 2) Remove the most extreme point (from `cut_side`).
189 3) Calclate mean and standard deviation of the remaining data.
190 4) If the relative change in standard deviation is less than
191 `stable` or the new standard deviation is zero, return with
192 `converged` set to `True`.
193 5) If `max cut` of the original number of points have been cut,
194 return with `converged` set to `False`.
195 6) Return to step (2)
197 The implementation relies on :meth:`numpy.ndarray.argmax`.
202 For our test data, use a step decrease.
204 >>> data = numpy.zeros((50,), dtype=numpy.float)
206 >>> mask,mean,std,converged = noise(data, cut_side='positive')
208 The standard deviation will decrease as long as you're removing
209 non-zero points (however, the relative change in the standard
210 deviation increases), so we expect the first 10 values to be
213 >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
214 >>> expected_mask[:5] = False
215 >>> min(mask == expected_mask)
218 When the remaining points are all zero, the mean and standard
219 deviation are both zero. So the algorithm exits with successful
229 If we had mad more of the initial points 1, the algorithm would
230 be limited by `max_cut` and not converge:
233 >>> data[:2*max_cut*len(data)] = 1.0
234 >>> mask,mean,std,converged = noise(
235 ... data, cut_side='positive', max_cut=max_cut)
236 >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
237 >>> expected_mask[:max_cut*len(data)] = False
238 >>> min(mask == expected_mask)
243 mask = numpy.ones(data.shape, dtype=numpy.bool)
244 masked_mean = data.mean()
245 masked_std = data.std()
247 if cut_side == 'both':
248 def new_mask(data, mask, masked_mean):
249 mask[(numpy.absolute(data-masked_mean)*mask).argmax()] = 0
251 elif cut_side == 'positive':
252 def new_mask(data, mask, masked_mean):
253 mask[(data*mask).argmax()] = 0
255 elif cut_side == 'negative':
256 def new_mask(data, mask, masked_mean):
257 mask[(data*mask).argmin()] = 0
260 raise ValueError(cut_side)
263 max_cuts = min(int(floor(max_cut * len(data))), len(data))
264 while num_cuts < max_cuts:
265 mask = new_mask(data, mask, masked_mean)
267 new_masked_mean = (data*mask).mean()
268 new_masked_std = (data*mask).std()
269 rel_std = (masked_std-new_masked_std) / new_masked_std # >= 0
270 if rel_std < stable or new_masked_std == 0:
271 return (mask, new_masked_mean, new_masked_std, True)
272 masked_mean = new_masked_mean
273 masked_std = new_masked_std
274 return (mask, masked_mean, masked_std, False)
277 Argument('cut side', type='string', default='both',
279 Select the side of the curve to cut from. `positive`, `negative`, or
282 Argument('stable', type='float', default=0.005, help="""
283 Convergence threshold (stop if, when cutting more points, the relative
284 change in the standard deviation is less than `stable`).
286 Argument('max cut', type='float', default=0.2, help="""
287 The maximum fraction (0 to 1) of points to cut to evaluate noise, even
288 if it doesn't converge (to avoid "eating" all the noise).
291 """List :func:`noise`'s :class:`~hooke.command.Argument`\s for easy use
292 by plugins in :mod:`~hooke.plugin`.
295 def above_noise(data, side='both', min_deviations=5.0, mean=None, std=None):
296 """Find locations where `data` is far from the `mean`.
302 side : {'both', 'positive', 'negative'}
303 Side of the curve that can be "above" the noise.
304 min_deviations : float
305 Number of standard deviations beyond the mean to define
306 "above" the noise. Increase to tighten the filter.
308 The mean of the input data's background noise.
310 The standard deviation of the input data's background noise.
314 mask : array of booleans
315 `True` when data is beyond the threshold, otherwise `False`.
319 If `mean` and `std` are None, they are calculted using
320 the respective `data` methods.
325 >>> data = numpy.arange(-3, 4)
326 >>> above_noise(data, side='both', min_deviations=1.1, mean=0, std=1.0)
327 array([ True, True, False, False, False, True, True], dtype=bool)
328 >>> above_noise(data, side='positive', min_deviations=1.1, mean=0, std=1.0)
329 array([False, False, False, False, False, True, True], dtype=bool)
330 >>> above_noise(data, side='negative', min_deviations=1.1, mean=0, std=1.0)
331 array([ True, True, False, False, False, False, False], dtype=bool)
337 if side == 'negative':
341 data = numpy.absolute(data-mean)
343 return data > (min_deviations * std)
345 above_noise_arguments = [
346 Argument('side', type='string', default='both',
348 Select the side of the curve that counts as "above". `positive`,
349 `negative`, or `both`.
351 Argument('min deviations', type='float', default=5.0, help="""
352 Number of standard deviations above the noise to define a peak.
353 Increase to tighten the filter.
356 """List :func:`above_noise`' :class:`~hooke.command.Argument`\s for
357 easy use by plugins in :mod:`~hooke.plugin`.
360 def merge_double_peaks(data, peaks, see_double=10):
361 """Merge peaks that are "too close" together.
367 peaks : list of :mod:`Peak`\s.
368 A :mod:`Peak` instances for each continuous peak block.
370 If two peaks are separated by less than `see double` points,
371 count them (and the intervening data) as a single peak.
375 peaks : list of :mod:`Peak`\s.
376 The modified list of :mod:`Peak`\s.
381 >>> data = numpy.arange(150)
382 >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
383 ... Peak(name='b', index=15, values=data[15:18]),
384 ... Peak(name='c', index=23, values=data[23:27]),
385 ... Peak(name='d', index=100, values=data[100:101])]
386 >>> peaks = merge_double_peaks(data, peaks, see_double=10)
387 >>> print '\\n'.join([str(p) for p in peaks])
390 >>> min(peaks[0].values == data[10:27])
394 while i < len(peaks)-1:
396 next_peak = peaks[i+1]
397 if next_peak.index - peak.post_index() > see_double: # far enough apart
398 i += 1 # move on to the next peak
399 else: # too close. merge the peaks
400 peaks[i] = Peak(name=peak.name, index=peak.index,
401 values=data[peak.index:next_peak.post_index()])
405 merge_double_peaks_arguments = [
406 Argument('see double', type='int', default=10, help="""
407 If two peaks are separated by less than `see double` points, count
408 them as a single peak.
411 """List :func:`merge_double_peaks`' :class:`~hooke.command.Argument`\s
412 for easy use by plugins in :mod:`~hooke.plugin`.
415 def drop_narrow_peaks(peaks, min_points=1):
416 """Drop peaks that are "too narrow".
420 peaks : list of :mod:`Peak`\s.
421 A :mod:`Peak` instances for each continuous peak block.
423 Minimum number of points for :class:`Peak` acceptance.
427 peaks : list of :mod:`Peak`\s.
428 The modified list of :mod:`Peak`\s.
433 >>> data = numpy.arange(150)
434 >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
435 ... Peak(name='b', index=15, values=data[15:18]),
436 ... Peak(name='c', index=23, values=data[23:27]),
437 ... Peak(name='d', index=100, values=data[100:101])]
438 >>> peaks = drop_narrow_peaks(peaks, min_points=3)
439 >>> print '\\n'.join([str(p) for p in peaks])
443 return [peak for peak in peaks if len(peak.values) >= min_points]
445 drop_narrow_peaks_arguments = [
446 Argument('min points', type='int', default=1, help="""
447 Minimum number of "feature" points for peak acceptance.
450 """List :func:`drop_narrow_peaks`' :class:`~hooke.command.Argument`\s
451 for easy use by plugins in :mod:`~hooke.plugin`.
454 def _kwargs(kwargs, arguments, translations={}, argument_input_keys=False):
455 """Split off kwargs for the arguments listed in arguments.
457 Also add the kwargs marked in `translations`.
463 >>> kwargs = {'param_a':1, 'param_b':2, 'param_c':3}
464 >>> args = [Argument(name='param a')]
465 >>> translations = {'the_big_c_param':'param_c'}
466 >>> pprint.pprint(_kwargs(kwargs, args, translations,
467 ... argument_input_keys=False))
468 {'param_a': 1, 'the_big_c_param': 3}
469 >>> pprint.pprint(_kwargs(kwargs, args, translations,
470 ... argument_input_keys=True))
471 {'the_big_c_param': 3}
472 >>> kwargs = {'param a':1, 'param b':2, 'param c':3}
473 >>> translations = {'the_big_c_param':'param c'}
474 >>> pprint.pprint(_kwargs(kwargs, args, translations,
475 ... argument_input_keys=True))
476 {'param_a': 1, 'the_big_c_param': 3}
478 arg_keys = [arg.name for arg in arguments]
479 keys = [arg.name.replace(' ', '_') for arg in arguments]
481 for arg_key,key in zip(arg_keys, keys):
483 if argument_input_keys == True:
486 ret[key] = kwargs[in_key]
487 for target_key,source_key in translations.items():
488 if source_key in kwargs:
489 ret[target_key] = kwargs[source_key]
492 def find_peaks(data, **kwargs):
493 """Catch all peak finder.
497 1) :func:`noise`, to determine the standard deviation of the noise
499 2) :func:`above_noise` to select the regions of `data` that are
501 3) :func:`merge_double_peaks`
502 4) :func:`drop_narrow_peaks`
504 The input parameters may be any accepted by the above functions.
506 mask,mean,std,converged = noise(data, **_kwargs(kwargs, noise_arguments))
507 mask = above_noise(data, mean=mean, std=std,
508 **_kwargs(kwargs, above_noise_arguments))
509 peaks = mask_to_peaks(data, mask)
510 peaks = merge_double_peaks(
511 data, peaks, **_kwargs(kwargs, merge_double_peaks_arguments))
512 return drop_narrow_peaks(
513 peaks, **_kwargs(kwargs, drop_narrow_peaks_arguments))
515 find_peaks_arguments = (noise_arguments
516 + above_noise_arguments
517 + merge_double_peaks_arguments
518 + drop_narrow_peaks_arguments)
519 """List :func:`find_peaks`' :class:`~hooke.command.Argument`\s for
520 easy use by plugins in :mod:`~hooke.plugin`.