a3aa16ef525caa52449581be290e0985f5a76704
[hooke.git] / hooke / util / peak.py
1 # Copyright (C) 2007-2012 Fabrizio Benedetti <fabrizio.benedetti.82@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it under the
7 # terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation, either version 3 of the License, or (at your option) any
9 # later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
12 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
14 # details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with Hooke.  If not, see <http://www.gnu.org/licenses/>.
18
19 """Utility functions for spotting peaks in signal data.
20 """
21
22 from math import floor
23
24 import numpy
25
26 from ..command import Argument
27
28
29 class Peak (object):
30     """A peak (signal spike) instance.
31
32     Peaks are a continuous series of signal data points that exceed
33     some threshold.
34
35     Parameters
36     ----------
37     name : string
38         Short comment explaining peak selection algorithm.
39     index : int
40         Offset of the first peak point in the original signal data.
41     values : array
42         Signal data values for each peak point.
43     """
44     def __init__(self, name='peak', index=0, values=[]):
45         self.name = name
46         self.index = index
47         self.values = values
48
49     def __str__(self):
50         return '<%s %s %d:%d>' % (self.__class__.__name__, self.name,
51                                   self.index, self.post_index())
52
53     def __repr__(self):
54         return '<%s %s %d %s>' % (self.__class__.__name__, self.name,
55                                   self.index, self.values)
56
57     def post_index(self):
58         """The index *after* the end of the peak.
59
60         Examples
61         --------
62
63         >>> p = Peak(index=5, values=numpy.arange(3))
64
65         The peak consists of indicies 5, 6, and 7.o
66
67         >>> p.post_index()
68         8
69         """
70         return self.index + len(self.values)
71
72 def mask_to_peaks(data, mask, name='peak'):
73     """Convert a mask array to a list of :class:`Peak`\s.
74
75     Parameters
76     ----------
77     data : array
78         Input data.
79     mask : array of booleans
80         `False` when data is noise, `True` for peaks.
81
82     Returns
83     -------
84     peaks : list of :mod:`Peak`\s.
85         A :mod:`Peak` instances for each continuous peak block.
86
87     See Also
88     --------
89     peaks_to_mask
90
91     Examples
92     --------
93
94     >>> data = numpy.arange(-10, -1)
95     >>> mask = numpy.zeros(data.shape, dtype=numpy.bool)
96     >>> mask[:2] = True
97     >>> mask[5:8] = True
98     >>> mask_to_peaks(data, mask, 'special points')
99     [<Peak special points 0 [-10  -9]>, <Peak special points 5 [-5 -4 -3]>]
100     """
101     peaks = []
102     i = 0
103     while i < len(mask):
104         if mask[i] == True: # masked peak point
105             start_i = i
106             while i < len(mask) and mask[i] == True:
107                 i += 1
108             post_i = i
109             peaks.append(Peak(
110                     name=name, index=start_i, values=data[start_i:post_i]))
111         else:
112             i += 1
113     return peaks
114
115 def peaks_to_mask(data, peaks):
116     """Convert a list of :class:`Peak`\s to a mask array.
117
118     Parameters
119     ----------
120     data : array
121         Input data.
122     peaks : list of :mod:`Peak`\s.
123         A :mod:`Peak` instances for each continuous peak block.
124
125     Returns
126     -------
127     mask : array of booleans
128         `False` when data is noise, `True` for peaks.
129
130     See Also
131     --------
132     mask_to_peaks
133
134     Examples
135     --------
136
137     >>> data = numpy.arange(-10, -1)
138     >>> mask = numpy.ones(data.shape, dtype=numpy.bool)
139     >>> mask[:2] = False
140     >>> mask[5:8] = False
141     >>> peaks = mask_to_peaks(data, mask, 'special points')
142     >>> mask2 = peaks_to_mask(data, peaks)
143     >>> min(mask == mask2)
144     True
145     """
146     mask = numpy.zeros(data.shape, dtype=numpy.bool)
147     for peak in peaks:
148         mask[peak.index:peak.post_index()] = True
149     return mask
150
151 def noise(data, cut_side='both', stable=0.005, max_cut=0.2):
152     """Find the portion of `data` that is "noise".
153
154     Parameters
155     ----------
156     data : array
157         Input signal.
158     cut_side : {'both', 'positive', 'negative'}
159         Side of the curve to cut from.
160     stable : float
161         Convergence threshold (stop if, when cutting more points, the
162         relative change in the standard deviation is less than
163         `stable`).
164     max_cut : float
165         Maximum fraction (0 to 1) of points to cut to evaluate noise,
166         even if it doesn't converge (to avoid "eating" all the noise).
167
168     Returns
169     -------
170     mask : array
171         :math:`mask[i] == True` if `i` indexes a noise portion of
172         `data`.  It is `False` otherwise.
173     mean : float
174         The most recently calculated mean.
175     std : float
176         The most recently calculated standard deviation.
177     converged : {True, False}
178         Whether the algorithm converged.
179
180     Notes
181     -----
182
183     Algorithm:
184
185     1) Calculate the mean and standard deviation of `data`
186     2) Remove the most extreme point (from `cut_side`).
187     3) Calclate mean and standard deviation of the remaining data.
188     4) If the relative change in standard deviation is less than
189       `stable` or the new standard deviation is zero, return with
190       `converged` set to `True`.
191     5) If `max cut` of the original number of points have been cut,
192        return with `converged` set to `False`.
193     6) Return to step (2)
194
195     The implementation relies on :meth:`numpy.ndarray.argmax`.
196
197     Examples
198     --------
199
200     For our test data, use a step decrease.
201
202     >>> data = numpy.zeros((50,), dtype=numpy.float)
203     >>> data[:5] = 1.0
204     >>> mask,mean,std,converged = noise(data, cut_side='positive')
205
206     The standard deviation will decrease as long as you're removing
207     non-zero points (however, the relative change in the standard
208     deviation increases), so we expect the first 10 values to be
209     masked.
210
211     >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
212     >>> expected_mask[:5] = False
213     >>> min(mask == expected_mask)
214     True
215
216     When the remaining points are all zero, the mean and standard
217     deviation are both zero.  So the algorithm exits with successful
218     convergence.
219
220     >>> mean
221     0.0
222     >>> std
223     0.0
224     >>> converged
225     True
226
227     If we had mad more of the initial points 1, the algorithm would
228     be limited by `max_cut` and not converge:
229
230     >>> max_cut = 0.2
231     >>> data[:2*max_cut*len(data)] = 1.0
232     >>> mask,mean,std,converged = noise(
233     ...     data, cut_side='positive', max_cut=max_cut)
234     >>> expected_mask = numpy.ones(data.shape, dtype=numpy.bool)
235     >>> expected_mask[:max_cut*len(data)] = False
236     >>> min(mask == expected_mask)
237     True
238     >>> converged
239     False
240     """
241     mask = numpy.ones(data.shape, dtype=numpy.bool)
242     masked_mean = data.mean()
243     masked_std = data.std()
244
245     if cut_side == 'both':
246         def new_mask(data, mask, masked_mean):
247             mask[(numpy.absolute(data-masked_mean)*mask).argmax()] = 0
248             return mask
249     elif cut_side == 'positive':
250         def new_mask(data, mask, masked_mean):
251             mask[(data*mask).argmax()] = 0
252             return mask
253     elif cut_side == 'negative':
254         def new_mask(data, mask, masked_mean):
255             mask[(data*mask).argmin()] = 0
256             return mask
257     else:
258         raise ValueError(cut_side)
259
260     num_cuts = 0
261     max_cuts = min(int(floor(max_cut * len(data))), len(data))
262     while num_cuts < max_cuts:
263         mask = new_mask(data, mask, masked_mean)
264         num_cuts += 1
265         new_masked_mean = (data*mask).mean()
266         new_masked_std = (data*mask).std()
267         rel_std = (masked_std-new_masked_std) / new_masked_std # >= 0
268         if rel_std < stable or new_masked_std == 0:
269             return (mask, new_masked_mean, new_masked_std, True)
270         masked_mean = new_masked_mean
271         masked_std = new_masked_std
272     return (mask, masked_mean, masked_std, False)
273
274 noise_arguments = [
275     Argument('cut side', type='string', default='both',
276              help="""
277 Select the side of the curve to cut from.  `positive`, `negative`, or
278 `both`.
279 """.strip()),
280     Argument('stable', type='float', default=0.005, help="""
281 Convergence threshold (stop if, when cutting more points, the relative
282 change in the standard deviation is less than `stable`).
283 """.strip()),
284     Argument('max cut', type='float', default=0.2, help="""
285 The maximum fraction (0 to 1) of points to cut to evaluate noise, even
286 if it doesn't converge (to avoid "eating" all the noise).
287 """.strip()),
288     ]
289 """List :func:`noise`'s :class:`~hooke.command.Argument`\s for easy use
290 by plugins in :mod:`~hooke.plugin`.
291 """
292
293 def above_noise(data, side='both', min_deviations=5.0, mean=None, std=None):
294     """Find locations where `data` is far from the `mean`.
295
296     Parameters
297     ----------
298     data : array
299         Input data.
300     side : {'both', 'positive', 'negative'}
301         Side of the curve that can be "above" the noise.
302     min_deviations : float
303         Number of standard deviations beyond the mean to define
304         "above" the noise.  Increase to tighten the filter.
305     mean : float
306         The mean of the input data's background noise.
307     std : float
308         The standard deviation of the input data's background noise.
309
310     Returns
311     -------
312     mask : array of booleans
313         `True` when data is beyond the threshold, otherwise `False`.
314
315     Notes
316     -----
317     If `mean` and `std` are None, they are calculted using
318     the respective `data` methods.
319
320     Examples
321     --------
322
323     >>> data = numpy.arange(-3, 4)
324     >>> above_noise(data, side='both', min_deviations=1.1, mean=0, std=1.0)
325     array([ True,  True, False, False, False,  True,  True], dtype=bool)
326     >>> above_noise(data, side='positive', min_deviations=1.1, mean=0, std=1.0)
327     array([False, False, False, False, False,  True,  True], dtype=bool)
328     >>> above_noise(data, side='negative', min_deviations=1.1, mean=0, std=1.0)
329     array([ True,  True, False, False, False, False, False], dtype=bool)
330     """
331     if mean == None:
332         mean = data.mean()
333     if std == None:
334         std = data.std()
335     if side == 'negative':
336         data = -data
337         mean = -mean
338     elif side == 'both':
339         data = numpy.absolute(data-mean)
340         mean = 0
341     return data > (min_deviations * std)
342
343 above_noise_arguments = [
344     Argument('side', type='string', default='both',
345              help="""
346 Select the side of the curve that counts as "above".  `positive`,
347 `negative`, or `both`.
348 """.strip()),
349     Argument('min deviations', type='float', default=5.0, help="""
350 Number of standard deviations above the noise to define a peak.
351 Increase to tighten the filter.
352 """.strip()),
353     ]
354 """List :func:`above_noise`' :class:`~hooke.command.Argument`\s for
355 easy use by plugins in :mod:`~hooke.plugin`.
356 """
357
358 def merge_double_peaks(data, peaks, see_double=10):
359     """Merge peaks that are "too close" together.
360
361     Parameters
362     ----------
363     data : array
364         Input data.
365     peaks : list of :mod:`Peak`\s.
366         A :mod:`Peak` instances for each continuous peak block.
367     see_double : int
368         If two peaks are separated by less than `see double` points,
369         count them (and the intervening data) as a single peak.
370
371     Returns
372     -------
373     peaks : list of :mod:`Peak`\s.
374         The modified list of :mod:`Peak`\s.
375
376     Examples
377     --------
378
379     >>> data = numpy.arange(150)
380     >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
381     ...          Peak(name='b', index=15, values=data[15:18]),
382     ...          Peak(name='c', index=23, values=data[23:27]),
383     ...          Peak(name='d', index=100, values=data[100:101])]
384     >>> peaks = merge_double_peaks(data, peaks, see_double=10)
385     >>> print '\\n'.join([str(p) for p in peaks])
386     <Peak a 10:27>
387     <Peak d 100:101>
388     >>> min(peaks[0].values == data[10:27])
389     True
390     """
391     i = 0
392     while i < len(peaks)-1:
393         peak = peaks[i]
394         next_peak = peaks[i+1]
395         if next_peak.index - peak.post_index() > see_double: # far enough apart
396             i += 1 # move on to the next peak
397         else: # too close.  merge the peaks
398             peaks[i] = Peak(name=peak.name, index=peak.index,
399                             values=data[peak.index:next_peak.post_index()])
400             peaks.pop(i+1)
401     return peaks
402
403 merge_double_peaks_arguments = [
404     Argument('see double', type='int', default=10, help="""
405 If two peaks are separated by less than `see double` points, count
406 them as a single peak.
407 """.strip()),
408     ]
409 """List :func:`merge_double_peaks`' :class:`~hooke.command.Argument`\s
410 for easy use by plugins in :mod:`~hooke.plugin`.
411 """
412
413 def drop_narrow_peaks(peaks, min_points=1):
414     """Drop peaks that are "too narrow".
415
416     Parameters
417     ----------
418     peaks : list of :mod:`Peak`\s.
419         A :mod:`Peak` instances for each continuous peak block.
420     min_points : int
421         Minimum number of points for :class:`Peak` acceptance.
422
423     Returns
424     -------
425     peaks : list of :mod:`Peak`\s.
426         The modified list of :mod:`Peak`\s.
427
428     Examples
429     --------
430
431     >>> data = numpy.arange(150)
432     >>> peaks = [Peak(name='a', index=10, values=data[10:12]),
433     ...          Peak(name='b', index=15, values=data[15:18]),
434     ...          Peak(name='c', index=23, values=data[23:27]),
435     ...          Peak(name='d', index=100, values=data[100:101])]
436     >>> peaks = drop_narrow_peaks(peaks, min_points=3)
437     >>> print '\\n'.join([str(p) for p in peaks])
438     <Peak b 15:18>
439     <Peak c 23:27>
440     """
441     return [peak for peak in peaks if len(peak.values) >= min_points]
442
443 drop_narrow_peaks_arguments = [
444     Argument('min points', type='int', default=1, help="""
445 Minimum number of "feature" points for peak acceptance.
446 """.strip()),
447     ]
448 """List :func:`drop_narrow_peaks`' :class:`~hooke.command.Argument`\s
449 for easy use by plugins in :mod:`~hooke.plugin`.
450 """
451
452 def _kwargs(kwargs, arguments, translations={}, argument_input_keys=False):
453     """Split off kwargs for the arguments listed in arguments.
454
455     Also add the kwargs marked in `translations`.
456
457     Examples
458     --------
459
460     >>> import pprint
461     >>> kwargs = {'param_a':1, 'param_b':2, 'param_c':3}
462     >>> args = [Argument(name='param a')]
463     >>> translations = {'the_big_c_param':'param_c'}
464     >>> pprint.pprint(_kwargs(kwargs, args, translations,
465     ...                       argument_input_keys=False))
466     {'param_a': 1, 'the_big_c_param': 3}
467     >>> pprint.pprint(_kwargs(kwargs, args, translations,
468     ...                       argument_input_keys=True))
469     {'the_big_c_param': 3}
470     >>> kwargs = {'param a':1, 'param b':2, 'param c':3}
471     >>> translations = {'the_big_c_param':'param c'}
472     >>> pprint.pprint(_kwargs(kwargs, args, translations,
473     ...                       argument_input_keys=True))
474     {'param_a': 1, 'the_big_c_param': 3}
475     """
476     arg_keys = [arg.name for arg in arguments]
477     keys = [arg.name.replace(' ', '_') for arg in arguments]
478     ret = {}
479     for arg_key,key in zip(arg_keys, keys):
480         in_key = key
481         if argument_input_keys == True:
482             in_key = arg_key
483         if in_key in kwargs:
484             ret[key] = kwargs[in_key]
485     for target_key,source_key in translations.items():
486         if source_key in kwargs:
487             ret[target_key] = kwargs[source_key]
488     return ret
489
490 def find_peaks(data, **kwargs):
491     """Catch all peak finder.
492
493     Runs in succession:
494
495     1) :func:`noise`, to determine the standard deviation of the noise
496       in `data`.
497     2) :func:`above_noise` to select the regions of `data` that are
498       "above" the noise.
499     3) :func:`merge_double_peaks`
500     4) :func:`drop_narrow_peaks`
501
502     The input parameters may be any accepted by the above functions.
503     """
504     mask,mean,std,converged = noise(data, **_kwargs(kwargs, noise_arguments))
505     mask = above_noise(data, mean=mean, std=std,
506                        **_kwargs(kwargs, above_noise_arguments))
507     peaks = mask_to_peaks(data, mask)
508     peaks = merge_double_peaks(
509         data, peaks, **_kwargs(kwargs, merge_double_peaks_arguments))
510     return drop_narrow_peaks(
511         peaks, **_kwargs(kwargs, drop_narrow_peaks_arguments))
512
513 find_peaks_arguments = (noise_arguments
514                         + above_noise_arguments
515                         + merge_double_peaks_arguments
516                         + drop_narrow_peaks_arguments)
517 """List :func:`find_peaks`' :class:`~hooke.command.Argument`\s for
518 easy use by plugins in :mod:`~hooke.plugin`.
519 """