452f77219deac7465817b2d2dbf4937483cdce26
[hooke.git] / hooke / util / peak.py
1 # Copyright (C) 2007-2011 Fabrizio Benedetti
2 #                         Massimo Sandal <devicerandom@gmail.com>
3 #                         W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of Hooke.
6 #
7 # Hooke is free software: you can redistribute it and/or modify it
8 # under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
11 #
12 # Hooke is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
15 # Public License for more details.
16 #
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with Hooke.  If not, see
19 # <http://www.gnu.org/licenses/>.
20
21 """Utility functions for spotting peaks in signal data.
22 """
23
24 from math import floor
25
26 import numpy
27
28 from ..command import Argument
29
30
31 class Peak (object):
32     """A peak (signal spike) instance.
33
34     Peaks are a continuous series of signal data points that exceed
35     some threshold.
36
37     Parameters
38     ----------
39     name : string
40         Short comment explaining peak selection algorithm.
41     index : int
42         Offset of the first peak point in the original signal data.
43     values : array
44         Signal data values for each peak point.
45     """
46     def __init__(self, name='peak', index=0, values=[]):
47         self.name = name
48         self.index = index
49         self.values = values
50
51     def __str__(self):
52         return '<%s %s %d:%d>' % (self.__class__.__name__, self.name,
53                                   self.index, self.post_index())
54
55     def __repr__(self):
56         return '<%s %s %d %s>' % (self.__class__.__name__, self.name,
57                                   self.index, self.values)
58
59     def post_index(self):
60         """The index *after* the end of the peak.
61
62         Examples
63         --------
64
65         >>> p = Peak(index=5, values=numpy.arange(3))
66
67         The peak consists of indicies 5, 6, and 7.o
68
69         >>> p.post_index()
70         8
71         """
72         return self.index + len(self.values)
73
74 def mask_to_peaks(data, mask, name='peak'):
75     """Convert a mask array to a list of :class:`Peak`\s.
76
77     Parameters
78     ----------
79     data : array
80         Input data.
81     mask : array of booleans
82         `False` when data is noise, `True` for peaks.
83
84     Returns
85     -------
86     peaks : list of :mod:`Peak`\s.
87         A :mod:`Peak` instances for each continuous peak block.
88
89     See Also
90     --------
91     peaks_to_mask
92
93     Examples
94     --------
95
96     >>> data = numpy.arange(-10, -1)
97     >>> mask = numpy.zeros(data.shape, dtype=numpy.bool)
98     >>> mask[:2] = True
99     >>> mask[5:8] = True
100     >>> mask_to_peaks(data, mask, 'special points')
101     [<Peak special points 0 [-10  -9]>, <Peak special points 5 [-5 -4 -3]>]
102     """
103     peaks = []
104     i = 0
105     while i < len(mask):
106         if mask[i] == True: # masked peak point
107             start_i = i
108             while i < len(mask) and mask[i] == True:
109                 i += 1
110             post_i = i
111             peaks.append(Peak(
112                     name=name, index=start_i, values=data[start_i:post_i]))
113         else:
114             i += 1
115     return peaks
116
117 def peaks_to_mask(data, peaks):
118     """Convert a list of :class:`Peak`\s to a mask array.
119
120     Parameters
121     ----------
122     data : array
123         Input data.
124     peaks : list of :mod:`Peak`\s.
125         A :mod:`Peak` instances for each continuous peak block.
126
127     Returns
128     -------
129     mask : array of booleans
130         `False` when data is noise, `True` for peaks.
131
132     See Also
133     --------
134     mask_to_peaks
135
136     Examples
137     --------
138
139     >>> data = numpy.arange(-10, -1)
140     >>> mask = numpy.ones(data.shape, dtype=numpy.bool)
141     >>> mask[:2] = False
142     >>> mask[5:8] = False
143     >>> peaks = mask_to_peaks(data, mask, 'special points')
144     >>> mask2 = peaks_to_mask(data, peaks)
145     >>> min(mask == mask2)
146     True
147     """
148     mask = numpy.zeros(data.shape, dtype=numpy.bool)
149     for peak in peaks:
150         mask[peak.index:peak.post_index()] = True
151     return mask
152
153 def noise(data, cut_side='both', stable=0.005, max_cut=0.2):
154     """Find the portion of `data` that is "noise".
155
156     Parameters
157     ----------
158     data : array
159         Input signal.
160     cut_side : {'both', 'positive', 'negative'}
161         Side of the curve to cut from.
162     stable : float
163         Convergence threshold (stop if, when cutting more points, the
164         relative change in the standard deviation is less than
165         `stable`).
166     max_cut : float
167         Maximum fraction (0 to 1) of points to cut to evaluate noise,
168         even if it doesn't converge (to avoid "eating" all the noise).
169
170     Returns
171     -------
172     mask : array
173         :math:`mask[i] == True` if `i` indexes a noise portion of
174         `data`.  It is `False` otherwise.
175     mean : float
176         The most recently calculated mean.
177     std : float
178         The most recently calculated standard deviation.
179     converged : {True, False}
180         Whether the algorithm converged.
181
182     Notes
183     -----
184
185     Algorithm:
186
187     1) Calculate the mean and standard deviation of `data`
188     2) Remove the most extreme point (from `cut_side`).
189     3) Calclate mean and standard deviation of the remaining data.
190     4) If the relative change in standard deviation is less than
191       `stable` or the new standard deviation is zero, return with
192       `converged` set to `True`.
193     5) If `max cut` of the original number of points have been cut,
194        return with `converged` set to `False`.
195     6) Return to step (2)
196
197     The implementation relies on :meth:`numpy.ndarray.argmax`.
198
199     Examples
200     --------
201
202     For our test data, use a step decrease.
203
204     >>> data = numpy.zeros((50,), dtype=numpy.float)
205     >>> data[:5] = 1.0
206     >>> mask,mean,std,converged = noise(data, cut_side='positive')
207
208     The standard deviation will decrease as long as you're removing
209     non-zero points (however, the relative change in the standard
210     deviation increases), so we expect the first 10 values to be
211     masked.
212
213     >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
214     >>> expected_mask[:5] = False
215     >>> min(mask == expected_mask)
216     True
217
218     When the remaining points are all zero, the mean and standard
219     deviation are both zero.  So the algorithm exits with successful
220     convergence.
221
222     >>> mean
223     0.0
224     >>> std
225     0.0
226     >>> converged
227     True
228
229     If we had mad more of the initial points 1, the algorithm would
230     be limited by `max_cut` and not converge:
231
232     >>> max_cut = 0.2
233     >>> data[:2*max_cut*len(data)] = 1.0
234     >>> mask,mean,std,converged = noise(
235     ...     data, cut_side='positive', max_cut=max_cut)
236     >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
237     >>> expected_mask[:max_cut*len(data)] = False
238     >>> min(mask == expected_mask)
239     True
240     >>> converged
241     False
242     """
243     mask = numpy.ones(data.shape, dtype=numpy.bool)
244     masked_mean = data.mean()
245     masked_std = data.std()
246
247     if cut_side == 'both':
248         def new_mask(data, mask, masked_mean):
249             mask[(numpy.absolute(data-masked_mean)*mask).argmax()] = 0
250             return mask
251     elif cut_side == 'positive':
252         def new_mask(data, mask, masked_mean):
253             mask[(data*mask).argmax()] = 0
254             return mask
255     elif cut_side == 'negative':
256         def new_mask(data, mask, masked_mean):
257             mask[(data*mask).argmin()] = 0
258             return mask
259     else:
260         raise ValueError(cut_side)
261
262     num_cuts = 0
263     max_cuts = min(int(floor(max_cut * len(data))), len(data))
264     while num_cuts < max_cuts:
265         mask = new_mask(data, mask, masked_mean)
266         num_cuts += 1
267         new_masked_mean = (data*mask).mean()
268         new_masked_std = (data*mask).std()
269         rel_std = (masked_std-new_masked_std) / new_masked_std # >= 0
270         if rel_std < stable or new_masked_std == 0:
271             return (mask, new_masked_mean, new_masked_std, True)
272         masked_mean = new_masked_mean
273         masked_std = new_masked_std
274     return (mask, masked_mean, masked_std, False)
275
276 noise_arguments = [
277     Argument('cut side', type='string', default='both',
278              help="""
279 Select the side of the curve to cut from.  `positive`, `negative`, or
280 `both`.
281 """.strip()),
282     Argument('stable', type='float', default=0.005, help="""
283 Convergence threshold (stop if, when cutting more points, the relative
284 change in the standard deviation is less than `stable`).
285 """.strip()),
286     Argument('max cut', type='float', default=0.2, help="""
287 The maximum fraction (0 to 1) of points to cut to evaluate noise, even
288 if it doesn't converge (to avoid "eating" all the noise).
289 """.strip()),
290     ]
291 """List :func:`noise`'s :class:`~hooke.command.Argument`\s for easy use
292 by plugins in :mod:`~hooke.plugin`.
293 """
294
295 def above_noise(data, side='both', min_deviations=5.0, mean=None, std=None):
296     """Find locations where `data` is far from the `mean`.
297
298     Parameters
299     ----------
300     data : array
301         Input data.
302     side : {'both', 'positive', 'negative'}
303         Side of the curve that can be "above" the noise.
304     min_deviations : float
305         Number of standard deviations beyond the mean to define
306         "above" the noise.  Increase to tighten the filter.
307     mean : float
308         The mean of the input data's background noise.
309     std : float
310         The standard deviation of the input data's background noise.
311
312     Returns
313     -------
314     mask : array of booleans
315         `True` when data is beyond the threshold, otherwise `False`.
316
317     Notes
318     -----
319     If `mean` and `std` are None, they are calculted using
320     the respective `data` methods.
321
322     Examples
323     --------
324
325     >>> data = numpy.arange(-3, 4)
326     >>> above_noise(data, side='both', min_deviations=1.1, mean=0, std=1.0)
327     array([ True,  True, False, False, False,  True,  True], dtype=bool)
328     >>> above_noise(data, side='positive', min_deviations=1.1, mean=0, std=1.0)
329     array([False, False, False, False, False,  True,  True], dtype=bool)
330     >>> above_noise(data, side='negative', min_deviations=1.1, mean=0, std=1.0)
331     array([ True,  True, False, False, False, False, False], dtype=bool)
332     """
333     if mean == None:
334         mean = data.mean()
335     if std == None:
336         std = data.std()
337     if side == 'negative':
338         data = -data
339         mean = -mean
340     elif side == 'both':
341         data = numpy.absolute(data-mean)
342         mean = 0
343     return data > (min_deviations * std)
344
345 above_noise_arguments = [
346     Argument('side', type='string', default='both',
347              help="""
348 Select the side of the curve that counts as "above".  `positive`,
349 `negative`, or `both`.
350 """.strip()),
351     Argument('min deviations', type='float', default=5.0, help="""
352 Number of standard deviations above the noise to define a peak.
353 Increase to tighten the filter.
354 """.strip()),
355     ]
356 """List :func:`above_noise`' :class:`~hooke.command.Argument`\s for
357 easy use by plugins in :mod:`~hooke.plugin`.
358 """
359
360 def merge_double_peaks(data, peaks, see_double=10):
361     """Merge peaks that are "too close" together.
362
363     Parameters
364     ----------
365     data : array
366         Input data.
367     peaks : list of :mod:`Peak`\s.
368         A :mod:`Peak` instances for each continuous peak block.
369     see_double : int
370         If two peaks are separated by less than `see double` points,
371         count them (and the intervening data) as a single peak.
372
373     Returns
374     -------
375     peaks : list of :mod:`Peak`\s.
376         The modified list of :mod:`Peak`\s.
377
378     Examples
379     --------
380
381     >>> data = numpy.arange(150)
382     >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
383     ...          Peak(name='b', index=15, values=data[15:18]),
384     ...          Peak(name='c', index=23, values=data[23:27]),
385     ...          Peak(name='d', index=100, values=data[100:101])]
386     >>> peaks = merge_double_peaks(data, peaks, see_double=10)
387     >>> print '\\n'.join([str(p) for p in peaks])
388     <Peak a 10:27>
389     <Peak d 100:101>
390     >>> min(peaks[0].values == data[10:27])
391     True
392     """
393     i = 0
394     while i < len(peaks)-1:
395         peak = peaks[i]
396         next_peak = peaks[i+1]
397         if next_peak.index - peak.post_index() > see_double: # far enough apart
398             i += 1 # move on to the next peak
399         else: # too close.  merge the peaks
400             peaks[i] = Peak(name=peak.name, index=peak.index,
401                             values=data[peak.index:next_peak.post_index()])
402             peaks.pop(i+1)
403     return peaks
404
405 merge_double_peaks_arguments = [
406     Argument('see double', type='int', default=10, help="""
407 If two peaks are separated by less than `see double` points, count
408 them as a single peak.
409 """.strip()),
410     ]
411 """List :func:`merge_double_peaks`' :class:`~hooke.command.Argument`\s
412 for easy use by plugins in :mod:`~hooke.plugin`.
413 """
414
415 def drop_narrow_peaks(peaks, min_points=1):
416     """Drop peaks that are "too narrow".
417
418     Parameters
419     ----------
420     peaks : list of :mod:`Peak`\s.
421         A :mod:`Peak` instances for each continuous peak block.
422     min_points : int
423         Minimum number of points for :class:`Peak` acceptance.
424
425     Returns
426     -------
427     peaks : list of :mod:`Peak`\s.
428         The modified list of :mod:`Peak`\s.
429
430     Examples
431     --------
432
433     >>> data = numpy.arange(150)
434     >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
435     ...          Peak(name='b', index=15, values=data[15:18]),
436     ...          Peak(name='c', index=23, values=data[23:27]),
437     ...          Peak(name='d', index=100, values=data[100:101])]
438     >>> peaks = drop_narrow_peaks(peaks, min_points=3)
439     >>> print '\\n'.join([str(p) for p in peaks])
440     <Peak b 15:18>
441     <Peak c 23:27>
442     """
443     return [peak for peak in peaks if len(peak.values) >= min_points]
444
445 drop_narrow_peaks_arguments = [
446     Argument('min points', type='int', default=1, help="""
447 Minimum number of "feature" points for peak acceptance.
448 """.strip()),
449     ]
450 """List :func:`drop_narrow_peaks`' :class:`~hooke.command.Argument`\s
451 for easy use by plugins in :mod:`~hooke.plugin`.
452 """
453
454 def _kwargs(kwargs, arguments, translations={}, argument_input_keys=False):
455     """Split off kwargs for the arguments listed in arguments.
456
457     Also add the kwargs marked in `translations`.
458
459     Examples
460     --------
461
462     >>> import pprint
463     >>> kwargs = {'param_a':1, 'param_b':2, 'param_c':3}
464     >>> args = [Argument(name='param a')]
465     >>> translations = {'the_big_c_param':'param_c'}
466     >>> pprint.pprint(_kwargs(kwargs, args, translations,
467     ...                       argument_input_keys=False))
468     {'param_a': 1, 'the_big_c_param': 3}
469     >>> pprint.pprint(_kwargs(kwargs, args, translations,
470     ...                       argument_input_keys=True))
471     {'the_big_c_param': 3}
472     >>> kwargs = {'param a':1, 'param b':2, 'param c':3}
473     >>> translations = {'the_big_c_param':'param c'}
474     >>> pprint.pprint(_kwargs(kwargs, args, translations,
475     ...                       argument_input_keys=True))
476     {'param_a': 1, 'the_big_c_param': 3}
477     """
478     arg_keys = [arg.name for arg in arguments]
479     keys = [arg.name.replace(' ', '_') for arg in arguments]
480     ret = {}
481     for arg_key,key in zip(arg_keys, keys):
482         in_key = key
483         if argument_input_keys == True:
484             in_key = arg_key
485         if in_key in kwargs:
486             ret[key] = kwargs[in_key]
487     for target_key,source_key in translations.items():
488         if source_key in kwargs:
489             ret[target_key] = kwargs[source_key]
490     return ret
491
492 def find_peaks(data, **kwargs):
493     """Catch all peak finder.
494
495     Runs in succession:
496
497     1) :func:`noise`, to determine the standard deviation of the noise
498       in `data`.
499     2) :func:`above_noise` to select the regions of `data` that are
500       "above" the noise.
501     3) :func:`merge_double_peaks`
502     4) :func:`drop_narrow_peaks`
503
504     The input parameters may be any accepted by the above functions.
505     """
506     mask,mean,std,converged = noise(data, **_kwargs(kwargs, noise_arguments))
507     mask = above_noise(data, mean=mean, std=std,
508                        **_kwargs(kwargs, above_noise_arguments))
509     peaks = mask_to_peaks(data, mask)
510     peaks = merge_double_peaks(
511         data, peaks, **_kwargs(kwargs, merge_double_peaks_arguments))
512     return drop_narrow_peaks(
513         peaks, **_kwargs(kwargs, drop_narrow_peaks_arguments))
514
515 find_peaks_arguments = (noise_arguments
516                         + above_noise_arguments
517                         + merge_double_peaks_arguments
518                         + drop_narrow_peaks_arguments)
519 """List :func:`find_peaks`' :class:`~hooke.command.Argument`\s for
520 easy use by plugins in :mod:`~hooke.plugin`.
521 """