1 # Copyright (C) 2007-2012 Fabrizio Benedetti <fabrizio.benedetti.82@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Utility functions for spotting peaks in signal data.
23 from math import floor
27 from ..command import Argument
31 """A peak (signal spike) instance.
33 Peaks are a continuous series of signal data points that exceed
39 Short comment explaining peak selection algorithm.
41 Offset of the first peak point in the original signal data.
43 Signal data values for each peak point.
45 def __init__(self, name='peak', index=0, values=[]):
51 return '<%s %s %d:%d>' % (self.__class__.__name__, self.name,
52 self.index, self.post_index())
55 return '<%s %s %d %s>' % (self.__class__.__name__, self.name,
56 self.index, self.values)
59 """The index *after* the end of the peak.
64 >>> p = Peak(index=5, values=numpy.arange(3))
66 The peak consists of indicies 5, 6, and 7.o
71 return self.index + len(self.values)
73 def mask_to_peaks(data, mask, name='peak'):
74 """Convert a mask array to a list of :class:`Peak`\s.
80 mask : array of booleans
81 `False` when data is noise, `True` for peaks.
85 peaks : list of :mod:`Peak`\s.
86 A :mod:`Peak` instances for each continuous peak block.
95 >>> data = numpy.arange(-10, -1)
96 >>> mask = numpy.zeros(data.shape, dtype=numpy.bool)
99 >>> mask_to_peaks(data, mask, 'special points')
100 [<Peak special points 0 [-10 -9]>, <Peak special points 5 [-5 -4 -3]>]
105 if mask[i] == True: # masked peak point
107 while i < len(mask) and mask[i] == True:
111 name=name, index=start_i, values=data[start_i:post_i]))
116 def peaks_to_mask(data, peaks):
117 """Convert a list of :class:`Peak`\s to a mask array.
123 peaks : list of :mod:`Peak`\s.
124 A :mod:`Peak` instances for each continuous peak block.
128 mask : array of booleans
129 `False` when data is noise, `True` for peaks.
138 >>> data = numpy.arange(-10, -1)
139 >>> mask = numpy.ones(data.shape, dtype=numpy.bool)
141 >>> mask[5:8] = False
142 >>> peaks = mask_to_peaks(data, mask, 'special points')
143 >>> mask2 = peaks_to_mask(data, peaks)
144 >>> min(mask == mask2)
147 mask = numpy.zeros(data.shape, dtype=numpy.bool)
149 mask[peak.index:peak.post_index()] = True
152 def noise(data, cut_side='both', stable=0.005, max_cut=0.2):
153 """Find the portion of `data` that is "noise".
159 cut_side : {'both', 'positive', 'negative'}
160 Side of the curve to cut from.
162 Convergence threshold (stop if, when cutting more points, the
163 relative change in the standard deviation is less than
166 Maximum fraction (0 to 1) of points to cut to evaluate noise,
167 even if it doesn't converge (to avoid "eating" all the noise).
172 :math:`mask[i] == True` if `i` indexes a noise portion of
173 `data`. It is `False` otherwise.
175 The most recently calculated mean.
177 The most recently calculated standard deviation.
178 converged : {True, False}
179 Whether the algorithm converged.
186 1) Calculate the mean and standard deviation of `data`
187 2) Remove the most extreme point (from `cut_side`).
188 3) Calclate mean and standard deviation of the remaining data.
189 4) If the relative change in standard deviation is less than
190 `stable` or the new standard deviation is zero, return with
191 `converged` set to `True`.
192 5) If `max cut` of the original number of points have been cut,
193 return with `converged` set to `False`.
194 6) Return to step (2)
196 The implementation relies on :meth:`numpy.ndarray.argmax`.
201 For our test data, use a step decrease.
203 >>> data = numpy.zeros((50,), dtype=numpy.float)
205 >>> mask,mean,std,converged = noise(data, cut_side='positive')
207 The standard deviation will decrease as long as you're removing
208 non-zero points (however, the relative change in the standard
209 deviation increases), so we expect the first 10 values to be
212 >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
213 >>> expected_mask[:5] = False
214 >>> min(mask == expected_mask)
217 When the remaining points are all zero, the mean and standard
218 deviation are both zero. So the algorithm exits with successful
228 If we had mad more of the initial points 1, the algorithm would
229 be limited by `max_cut` and not converge:
232 >>> data[:2*max_cut*len(data)] = 1.0
233 >>> mask,mean,std,converged = noise(
234 ... data, cut_side='positive', max_cut=max_cut)
235 >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
236 >>> expected_mask[:max_cut*len(data)] = False
237 >>> min(mask == expected_mask)
242 mask = numpy.ones(data.shape, dtype=numpy.bool)
243 masked_mean = data.mean()
244 masked_std = data.std()
246 if cut_side == 'both':
247 def new_mask(data, mask, masked_mean):
248 mask[(numpy.absolute(data-masked_mean)*mask).argmax()] = 0
250 elif cut_side == 'positive':
251 def new_mask(data, mask, masked_mean):
252 mask[(data*mask).argmax()] = 0
254 elif cut_side == 'negative':
255 def new_mask(data, mask, masked_mean):
256 mask[(data*mask).argmin()] = 0
259 raise ValueError(cut_side)
262 max_cuts = min(int(floor(max_cut * len(data))), len(data))
263 while num_cuts < max_cuts:
264 mask = new_mask(data, mask, masked_mean)
266 new_masked_mean = (data*mask).mean()
267 new_masked_std = (data*mask).std()
268 rel_std = (masked_std-new_masked_std) / new_masked_std # >= 0
269 if rel_std < stable or new_masked_std == 0:
270 return (mask, new_masked_mean, new_masked_std, True)
271 masked_mean = new_masked_mean
272 masked_std = new_masked_std
273 return (mask, masked_mean, masked_std, False)
276 Argument('cut side', type='string', default='both',
278 Select the side of the curve to cut from. `positive`, `negative`, or
281 Argument('stable', type='float', default=0.005, help="""
282 Convergence threshold (stop if, when cutting more points, the relative
283 change in the standard deviation is less than `stable`).
285 Argument('max cut', type='float', default=0.2, help="""
286 The maximum fraction (0 to 1) of points to cut to evaluate noise, even
287 if it doesn't converge (to avoid "eating" all the noise).
290 """List :func:`noise`'s :class:`~hooke.command.Argument`\s for easy use
291 by plugins in :mod:`~hooke.plugin`.
294 def above_noise(data, side='both', min_deviations=5.0, mean=None, std=None):
295 """Find locations where `data` is far from the `mean`.
301 side : {'both', 'positive', 'negative'}
302 Side of the curve that can be "above" the noise.
303 min_deviations : float
304 Number of standard deviations beyond the mean to define
305 "above" the noise. Increase to tighten the filter.
307 The mean of the input data's background noise.
309 The standard deviation of the input data's background noise.
313 mask : array of booleans
314 `True` when data is beyond the threshold, otherwise `False`.
318 If `mean` and `std` are None, they are calculted using
319 the respective `data` methods.
324 >>> data = numpy.arange(-3, 4)
325 >>> above_noise(data, side='both', min_deviations=1.1, mean=0, std=1.0)
326 array([ True, True, False, False, False, True, True], dtype=bool)
327 >>> above_noise(data, side='positive', min_deviations=1.1, mean=0, std=1.0)
328 array([False, False, False, False, False, True, True], dtype=bool)
329 >>> above_noise(data, side='negative', min_deviations=1.1, mean=0, std=1.0)
330 array([ True, True, False, False, False, False, False], dtype=bool)
336 if side == 'negative':
340 data = numpy.absolute(data-mean)
342 return data > (min_deviations * std)
344 above_noise_arguments = [
345 Argument('side', type='string', default='both',
347 Select the side of the curve that counts as "above". `positive`,
348 `negative`, or `both`.
350 Argument('min deviations', type='float', default=5.0, help="""
351 Number of standard deviations above the noise to define a peak.
352 Increase to tighten the filter.
355 """List :func:`above_noise`' :class:`~hooke.command.Argument`\s for
356 easy use by plugins in :mod:`~hooke.plugin`.
359 def merge_double_peaks(data, peaks, see_double=10):
360 """Merge peaks that are "too close" together.
366 peaks : list of :mod:`Peak`\s.
367 A :mod:`Peak` instances for each continuous peak block.
369 If two peaks are separated by less than `see double` points,
370 count them (and the intervening data) as a single peak.
374 peaks : list of :mod:`Peak`\s.
375 The modified list of :mod:`Peak`\s.
380 >>> data = numpy.arange(150)
381 >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
382 ... Peak(name='b', index=15, values=data[15:18]),
383 ... Peak(name='c', index=23, values=data[23:27]),
384 ... Peak(name='d', index=100, values=data[100:101])]
385 >>> peaks = merge_double_peaks(data, peaks, see_double=10)
386 >>> print '\\n'.join([str(p) for p in peaks])
389 >>> min(peaks[0].values == data[10:27])
393 while i < len(peaks)-1:
395 next_peak = peaks[i+1]
396 if next_peak.index - peak.post_index() > see_double: # far enough apart
397 i += 1 # move on to the next peak
398 else: # too close. merge the peaks
399 peaks[i] = Peak(name=peak.name, index=peak.index,
400 values=data[peak.index:next_peak.post_index()])
404 merge_double_peaks_arguments = [
405 Argument('see double', type='int', default=10, help="""
406 If two peaks are separated by less than `see double` points, count
407 them as a single peak.
410 """List :func:`merge_double_peaks`' :class:`~hooke.command.Argument`\s
411 for easy use by plugins in :mod:`~hooke.plugin`.
414 def drop_narrow_peaks(peaks, min_points=1):
415 """Drop peaks that are "too narrow".
419 peaks : list of :mod:`Peak`\s.
420 A :mod:`Peak` instances for each continuous peak block.
422 Minimum number of points for :class:`Peak` acceptance.
426 peaks : list of :mod:`Peak`\s.
427 The modified list of :mod:`Peak`\s.
432 >>> data = numpy.arange(150)
433 >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
434 ... Peak(name='b', index=15, values=data[15:18]),
435 ... Peak(name='c', index=23, values=data[23:27]),
436 ... Peak(name='d', index=100, values=data[100:101])]
437 >>> peaks = drop_narrow_peaks(peaks, min_points=3)
438 >>> print '\\n'.join([str(p) for p in peaks])
442 return [peak for peak in peaks if len(peak.values) >= min_points]
444 drop_narrow_peaks_arguments = [
445 Argument('min points', type='int', default=1, help="""
446 Minimum number of "feature" points for peak acceptance.
449 """List :func:`drop_narrow_peaks`' :class:`~hooke.command.Argument`\s
450 for easy use by plugins in :mod:`~hooke.plugin`.
453 def _kwargs(kwargs, arguments, translations={}, argument_input_keys=False):
454 """Split off kwargs for the arguments listed in arguments.
456 Also add the kwargs marked in `translations`.
462 >>> kwargs = {'param_a':1, 'param_b':2, 'param_c':3}
463 >>> args = [Argument(name='param a')]
464 >>> translations = {'the_big_c_param':'param_c'}
465 >>> pprint.pprint(_kwargs(kwargs, args, translations,
466 ... argument_input_keys=False))
467 {'param_a': 1, 'the_big_c_param': 3}
468 >>> pprint.pprint(_kwargs(kwargs, args, translations,
469 ... argument_input_keys=True))
470 {'the_big_c_param': 3}
471 >>> kwargs = {'param a':1, 'param b':2, 'param c':3}
472 >>> translations = {'the_big_c_param':'param c'}
473 >>> pprint.pprint(_kwargs(kwargs, args, translations,
474 ... argument_input_keys=True))
475 {'param_a': 1, 'the_big_c_param': 3}
477 arg_keys = [arg.name for arg in arguments]
478 keys = [arg.name.replace(' ', '_') for arg in arguments]
480 for arg_key,key in zip(arg_keys, keys):
482 if argument_input_keys == True:
485 ret[key] = kwargs[in_key]
486 for target_key,source_key in translations.items():
487 if source_key in kwargs:
488 ret[target_key] = kwargs[source_key]
491 def find_peaks(data, **kwargs):
492 """Catch all peak finder.
496 1) :func:`noise`, to determine the standard deviation of the noise
498 2) :func:`above_noise` to select the regions of `data` that are
500 3) :func:`merge_double_peaks`
501 4) :func:`drop_narrow_peaks`
503 The input parameters may be any accepted by the above functions.
505 mask,mean,std,converged = noise(data, **_kwargs(kwargs, noise_arguments))
506 mask = above_noise(data, mean=mean, std=std,
507 **_kwargs(kwargs, above_noise_arguments))
508 peaks = mask_to_peaks(data, mask)
509 peaks = merge_double_peaks(
510 data, peaks, **_kwargs(kwargs, merge_double_peaks_arguments))
511 return drop_narrow_peaks(
512 peaks, **_kwargs(kwargs, drop_narrow_peaks_arguments))
514 find_peaks_arguments = (noise_arguments
515 + above_noise_arguments
516 + merge_double_peaks_arguments
517 + drop_narrow_peaks_arguments)
518 """List :func:`find_peaks`' :class:`~hooke.command.Argument`\s for
519 easy use by plugins in :mod:`~hooke.plugin`.