dc65fb76348d8beaf7752fe745d90144a8577ed7
[hooke.git] / hooke / util / peak.py
1 # Copyright (C) 2007-2012 Fabrizio Benedetti <fabrizio.benedetti.82@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Utility functions for spotting peaks in signal data.
21 """
22
23 from math import floor
24
25 import numpy
26
27 from ..command import Argument
28
29
30 class Peak (object):
31     """A peak (signal spike) instance.
32
33     Peaks are a continuous series of signal data points that exceed
34     some threshold.
35
36     Parameters
37     ----------
38     name : string
39         Short comment explaining peak selection algorithm.
40     index : int
41         Offset of the first peak point in the original signal data.
42     values : array
43         Signal data values for each peak point.
44     """
45     def __init__(self, name='peak', index=0, values=[]):
46         self.name = name
47         self.index = index
48         self.values = values
49
50     def __str__(self):
51         return '<%s %s %d:%d>' % (self.__class__.__name__, self.name,
52                                   self.index, self.post_index())
53
54     def __repr__(self):
55         return '<%s %s %d %s>' % (self.__class__.__name__, self.name,
56                                   self.index, self.values)
57
58     def post_index(self):
59         """The index *after* the end of the peak.
60
61         Examples
62         --------
63
64         >>> p = Peak(index=5, values=numpy.arange(3))
65
66         The peak consists of indicies 5, 6, and 7.o
67
68         >>> p.post_index()
69         8
70         """
71         return self.index + len(self.values)
72
73 def mask_to_peaks(data, mask, name='peak'):
74     """Convert a mask array to a list of :class:`Peak`\s.
75
76     Parameters
77     ----------
78     data : array
79         Input data.
80     mask : array of booleans
81         `False` when data is noise, `True` for peaks.
82
83     Returns
84     -------
85     peaks : list of :mod:`Peak`\s.
86         A :mod:`Peak` instances for each continuous peak block.
87
88     See Also
89     --------
90     peaks_to_mask
91
92     Examples
93     --------
94
95     >>> data = numpy.arange(-10, -1)
96     >>> mask = numpy.zeros(data.shape, dtype=numpy.bool)
97     >>> mask[:2] = True
98     >>> mask[5:8] = True
99     >>> mask_to_peaks(data, mask, 'special points')
100     [<Peak special points 0 [-10  -9]>, <Peak special points 5 [-5 -4 -3]>]
101     """
102     peaks = []
103     i = 0
104     while i < len(mask):
105         if mask[i] == True: # masked peak point
106             start_i = i
107             while i < len(mask) and mask[i] == True:
108                 i += 1
109             post_i = i
110             peaks.append(Peak(
111                     name=name, index=start_i, values=data[start_i:post_i]))
112         else:
113             i += 1
114     return peaks
115
116 def peaks_to_mask(data, peaks):
117     """Convert a list of :class:`Peak`\s to a mask array.
118
119     Parameters
120     ----------
121     data : array
122         Input data.
123     peaks : list of :mod:`Peak`\s.
124         A :mod:`Peak` instances for each continuous peak block.
125
126     Returns
127     -------
128     mask : array of booleans
129         `False` when data is noise, `True` for peaks.
130
131     See Also
132     --------
133     mask_to_peaks
134
135     Examples
136     --------
137
138     >>> data = numpy.arange(-10, -1)
139     >>> mask = numpy.ones(data.shape, dtype=numpy.bool)
140     >>> mask[:2] = False
141     >>> mask[5:8] = False
142     >>> peaks = mask_to_peaks(data, mask, 'special points')
143     >>> mask2 = peaks_to_mask(data, peaks)
144     >>> min(mask == mask2)
145     True
146     """
147     mask = numpy.zeros(data.shape, dtype=numpy.bool)
148     for peak in peaks:
149         mask[peak.index:peak.post_index()] = True
150     return mask
151
152 def noise(data, cut_side='both', stable=0.005, max_cut=0.2):
153     """Find the portion of `data` that is "noise".
154
155     Parameters
156     ----------
157     data : array
158         Input signal.
159     cut_side : {'both', 'positive', 'negative'}
160         Side of the curve to cut from.
161     stable : float
162         Convergence threshold (stop if, when cutting more points, the
163         relative change in the standard deviation is less than
164         `stable`).
165     max_cut : float
166         Maximum fraction (0 to 1) of points to cut to evaluate noise,
167         even if it doesn't converge (to avoid "eating" all the noise).
168
169     Returns
170     -------
171     mask : array
172         :math:`mask[i] == True` if `i` indexes a noise portion of
173         `data`.  It is `False` otherwise.
174     mean : float
175         The most recently calculated mean.
176     std : float
177         The most recently calculated standard deviation.
178     converged : {True, False}
179         Whether the algorithm converged.
180
181     Notes
182     -----
183
184     Algorithm:
185
186     1) Calculate the mean and standard deviation of `data`
187     2) Remove the most extreme point (from `cut_side`).
188     3) Calclate mean and standard deviation of the remaining data.
189     4) If the relative change in standard deviation is less than
190       `stable` or the new standard deviation is zero, return with
191       `converged` set to `True`.
192     5) If `max cut` of the original number of points have been cut,
193        return with `converged` set to `False`.
194     6) Return to step (2)
195
196     The implementation relies on :meth:`numpy.ndarray.argmax`.
197
198     Examples
199     --------
200
201     For our test data, use a step decrease.
202
203     >>> data = numpy.zeros((50,), dtype=numpy.float)
204     >>> data[:5] = 1.0
205     >>> mask,mean,std,converged = noise(data, cut_side='positive')
206
207     The standard deviation will decrease as long as you're removing
208     non-zero points (however, the relative change in the standard
209     deviation increases), so we expect the first 10 values to be
210     masked.
211
212     >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
213     >>> expected_mask[:5] = False
214     >>> min(mask == expected_mask)
215     True
216
217     When the remaining points are all zero, the mean and standard
218     deviation are both zero.  So the algorithm exits with successful
219     convergence.
220
221     >>> mean
222     0.0
223     >>> std
224     0.0
225     >>> converged
226     True
227
228     If we had mad more of the initial points 1, the algorithm would
229     be limited by `max_cut` and not converge:
230
231     >>> max_cut = 0.2
232     >>> data[:2*max_cut*len(data)] = 1.0
233     >>> mask,mean,std,converged = noise(
234     ...     data, cut_side='positive', max_cut=max_cut)
235     >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
236     >>> expected_mask[:max_cut*len(data)] = False
237     >>> min(mask == expected_mask)
238     True
239     >>> converged
240     False
241     """
242     mask = numpy.ones(data.shape, dtype=numpy.bool)
243     masked_mean = data.mean()
244     masked_std = data.std()
245
246     if cut_side == 'both':
247         def new_mask(data, mask, masked_mean):
248             mask[(numpy.absolute(data-masked_mean)*mask).argmax()] = 0
249             return mask
250     elif cut_side == 'positive':
251         def new_mask(data, mask, masked_mean):
252             mask[(data*mask).argmax()] = 0
253             return mask
254     elif cut_side == 'negative':
255         def new_mask(data, mask, masked_mean):
256             mask[(data*mask).argmin()] = 0
257             return mask
258     else:
259         raise ValueError(cut_side)
260
261     num_cuts = 0
262     max_cuts = min(int(floor(max_cut * len(data))), len(data))
263     while num_cuts < max_cuts:
264         mask = new_mask(data, mask, masked_mean)
265         num_cuts += 1
266         new_masked_mean = (data*mask).mean()
267         new_masked_std = (data*mask).std()
268         rel_std = (masked_std-new_masked_std) / new_masked_std # >= 0
269         if rel_std < stable or new_masked_std == 0:
270             return (mask, new_masked_mean, new_masked_std, True)
271         masked_mean = new_masked_mean
272         masked_std = new_masked_std
273     return (mask, masked_mean, masked_std, False)
274
275 noise_arguments = [
276     Argument('cut side', type='string', default='both',
277              help="""
278 Select the side of the curve to cut from.  `positive`, `negative`, or
279 `both`.
280 """.strip()),
281     Argument('stable', type='float', default=0.005, help="""
282 Convergence threshold (stop if, when cutting more points, the relative
283 change in the standard deviation is less than `stable`).
284 """.strip()),
285     Argument('max cut', type='float', default=0.2, help="""
286 The maximum fraction (0 to 1) of points to cut to evaluate noise, even
287 if it doesn't converge (to avoid "eating" all the noise).
288 """.strip()),
289     ]
290 """List :func:`noise`'s :class:`~hooke.command.Argument`\s for easy use
291 by plugins in :mod:`~hooke.plugin`.
292 """
293
294 def above_noise(data, side='both', min_deviations=5.0, mean=None, std=None):
295     """Find locations where `data` is far from the `mean`.
296
297     Parameters
298     ----------
299     data : array
300         Input data.
301     side : {'both', 'positive', 'negative'}
302         Side of the curve that can be "above" the noise.
303     min_deviations : float
304         Number of standard deviations beyond the mean to define
305         "above" the noise.  Increase to tighten the filter.
306     mean : float
307         The mean of the input data's background noise.
308     std : float
309         The standard deviation of the input data's background noise.
310
311     Returns
312     -------
313     mask : array of booleans
314         `True` when data is beyond the threshold, otherwise `False`.
315
316     Notes
317     -----
318     If `mean` and `std` are None, they are calculted using
319     the respective `data` methods.
320
321     Examples
322     --------
323
324     >>> data = numpy.arange(-3, 4)
325     >>> above_noise(data, side='both', min_deviations=1.1, mean=0, std=1.0)
326     array([ True,  True, False, False, False,  True,  True], dtype=bool)
327     >>> above_noise(data, side='positive', min_deviations=1.1, mean=0, std=1.0)
328     array([False, False, False, False, False,  True,  True], dtype=bool)
329     >>> above_noise(data, side='negative', min_deviations=1.1, mean=0, std=1.0)
330     array([ True,  True, False, False, False, False, False], dtype=bool)
331     """
332     if mean == None:
333         mean = data.mean()
334     if std == None:
335         std = data.std()
336     if side == 'negative':
337         data = -data
338         mean = -mean
339     elif side == 'both':
340         data = numpy.absolute(data-mean)
341         mean = 0
342     return data > (min_deviations * std)
343
344 above_noise_arguments = [
345     Argument('side', type='string', default='both',
346              help="""
347 Select the side of the curve that counts as "above".  `positive`,
348 `negative`, or `both`.
349 """.strip()),
350     Argument('min deviations', type='float', default=5.0, help="""
351 Number of standard deviations above the noise to define a peak.
352 Increase to tighten the filter.
353 """.strip()),
354     ]
355 """List :func:`above_noise`' :class:`~hooke.command.Argument`\s for
356 easy use by plugins in :mod:`~hooke.plugin`.
357 """
358
359 def merge_double_peaks(data, peaks, see_double=10):
360     """Merge peaks that are "too close" together.
361
362     Parameters
363     ----------
364     data : array
365         Input data.
366     peaks : list of :mod:`Peak`\s.
367         A :mod:`Peak` instances for each continuous peak block.
368     see_double : int
369         If two peaks are separated by less than `see double` points,
370         count them (and the intervening data) as a single peak.
371
372     Returns
373     -------
374     peaks : list of :mod:`Peak`\s.
375         The modified list of :mod:`Peak`\s.
376
377     Examples
378     --------
379
380     >>> data = numpy.arange(150)
381     >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
382     ...          Peak(name='b', index=15, values=data[15:18]),
383     ...          Peak(name='c', index=23, values=data[23:27]),
384     ...          Peak(name='d', index=100, values=data[100:101])]
385     >>> peaks = merge_double_peaks(data, peaks, see_double=10)
386     >>> print '\\n'.join([str(p) for p in peaks])
387     <Peak a 10:27>
388     <Peak d 100:101>
389     >>> min(peaks[0].values == data[10:27])
390     True
391     """
392     i = 0
393     while i < len(peaks)-1:
394         peak = peaks[i]
395         next_peak = peaks[i+1]
396         if next_peak.index - peak.post_index() > see_double: # far enough apart
397             i += 1 # move on to the next peak
398         else: # too close.  merge the peaks
399             peaks[i] = Peak(name=peak.name, index=peak.index,
400                             values=data[peak.index:next_peak.post_index()])
401             peaks.pop(i+1)
402     return peaks
403
404 merge_double_peaks_arguments = [
405     Argument('see double', type='int', default=10, help="""
406 If two peaks are separated by less than `see double` points, count
407 them as a single peak.
408 """.strip()),
409     ]
410 """List :func:`merge_double_peaks`' :class:`~hooke.command.Argument`\s
411 for easy use by plugins in :mod:`~hooke.plugin`.
412 """
413
414 def drop_narrow_peaks(peaks, min_points=1):
415     """Drop peaks that are "too narrow".
416
417     Parameters
418     ----------
419     peaks : list of :mod:`Peak`\s.
420         A :mod:`Peak` instances for each continuous peak block.
421     min_points : int
422         Minimum number of points for :class:`Peak` acceptance.
423
424     Returns
425     -------
426     peaks : list of :mod:`Peak`\s.
427         The modified list of :mod:`Peak`\s.
428
429     Examples
430     --------
431
432     >>> data = numpy.arange(150)
433     >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
434     ...          Peak(name='b', index=15, values=data[15:18]),
435     ...          Peak(name='c', index=23, values=data[23:27]),
436     ...          Peak(name='d', index=100, values=data[100:101])]
437     >>> peaks = drop_narrow_peaks(peaks, min_points=3)
438     >>> print '\\n'.join([str(p) for p in peaks])
439     <Peak b 15:18>
440     <Peak c 23:27>
441     """
442     return [peak for peak in peaks if len(peak.values) >= min_points]
443
444 drop_narrow_peaks_arguments = [
445     Argument('min points', type='int', default=1, help="""
446 Minimum number of "feature" points for peak acceptance.
447 """.strip()),
448     ]
449 """List :func:`drop_narrow_peaks`' :class:`~hooke.command.Argument`\s
450 for easy use by plugins in :mod:`~hooke.plugin`.
451 """
452
453 def _kwargs(kwargs, arguments, translations={}, argument_input_keys=False):
454     """Split off kwargs for the arguments listed in arguments.
455
456     Also add the kwargs marked in `translations`.
457
458     Examples
459     --------
460
461     >>> import pprint
462     >>> kwargs = {'param_a':1, 'param_b':2, 'param_c':3}
463     >>> args = [Argument(name='param a')]
464     >>> translations = {'the_big_c_param':'param_c'}
465     >>> pprint.pprint(_kwargs(kwargs, args, translations,
466     ...                       argument_input_keys=False))
467     {'param_a': 1, 'the_big_c_param': 3}
468     >>> pprint.pprint(_kwargs(kwargs, args, translations,
469     ...                       argument_input_keys=True))
470     {'the_big_c_param': 3}
471     >>> kwargs = {'param a':1, 'param b':2, 'param c':3}
472     >>> translations = {'the_big_c_param':'param c'}
473     >>> pprint.pprint(_kwargs(kwargs, args, translations,
474     ...                       argument_input_keys=True))
475     {'param_a': 1, 'the_big_c_param': 3}
476     """
477     arg_keys = [arg.name for arg in arguments]
478     keys = [arg.name.replace(' ', '_') for arg in arguments]
479     ret = {}
480     for arg_key,key in zip(arg_keys, keys):
481         in_key = key
482         if argument_input_keys == True:
483             in_key = arg_key
484         if in_key in kwargs:
485             ret[key] = kwargs[in_key]
486     for target_key,source_key in translations.items():
487         if source_key in kwargs:
488             ret[target_key] = kwargs[source_key]
489     return ret
490
491 def find_peaks(data, **kwargs):
492     """Catch all peak finder.
493
494     Runs in succession:
495
496     1) :func:`noise`, to determine the standard deviation of the noise
497       in `data`.
498     2) :func:`above_noise` to select the regions of `data` that are
499       "above" the noise.
500     3) :func:`merge_double_peaks`
501     4) :func:`drop_narrow_peaks`
502
503     The input parameters may be any accepted by the above functions.
504     """
505     mask,mean,std,converged = noise(data, **_kwargs(kwargs, noise_arguments))
506     mask = above_noise(data, mean=mean, std=std,
507                        **_kwargs(kwargs, above_noise_arguments))
508     peaks = mask_to_peaks(data, mask)
509     peaks = merge_double_peaks(
510         data, peaks, **_kwargs(kwargs, merge_double_peaks_arguments))
511     return drop_narrow_peaks(
512         peaks, **_kwargs(kwargs, drop_narrow_peaks_arguments))
513
514 find_peaks_arguments = (noise_arguments
515                         + above_noise_arguments
516                         + merge_double_peaks_arguments
517                         + drop_narrow_peaks_arguments)
518 """List :func:`find_peaks`' :class:`~hooke.command.Argument`\s for
519 easy use by plugins in :mod:`~hooke.plugin`.
520 """