5c13ba741a4e325720bbaa9641c555b7a64cf325
[hooke.git] / hooke / plugin / curve.py
1 # Copyright (C) 2008-2010 Alberto Gomez-Casado
2 #                         Fabrizio Benedetti
3 #                         Massimo Sandal <devicerandom@gmail.com>
4 #                         W. Trevor King <wking@drexel.edu>
5 #
6 # This file is part of Hooke.
7 #
8 # Hooke is free software: you can redistribute it and/or modify it
9 # under the terms of the GNU Lesser General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
12 #
13 # Hooke is distributed in the hope that it will be useful, but WITHOUT
14 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
16 # Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with Hooke.  If not, see
20 # <http://www.gnu.org/licenses/>.
21
22 """The ``curve`` module provides :class:`CurvePlugin` and several
23 associated :class:`hooke.command.Command`\s for handling
24 :mod:`hooke.curve` classes.
25 """
26
27 import copy
28
29 import numpy
30
31 from ..command import Command, Argument, Failure
32 from ..curve import Data
33 from ..plugin import Builtin
34 from ..plugin.playlist import current_playlist_callback
35 from ..util.calculus import derivative
36 from ..util.fft import unitary_avg_power_spectrum
37 from ..util.si import ppSI, join_data_label, split_data_label
38
39
40
41 class CurvePlugin (Builtin):
42     def __init__(self):
43         super(CurvePlugin, self).__init__(name='curve')
44         self._commands = [
45             GetCommand(self), InfoCommand(self), DeltaCommand(self),
46             ExportCommand(self), DifferenceCommand(self),
47             DerivativeCommand(self), PowerSpectrumCommand(self)]
48
49
50 # Define common or complicated arguments
51
52 def current_curve_callback(hooke, command, argument, value):
53     if value != None:
54         return value
55     playlist = current_playlist_callback(hooke, command, argument, value)
56     curve = playlist.current()
57     if curve == None:
58         raise Failure('No curves in %s' % playlist)
59     return curve
60
61 CurveArgument = Argument(
62     name='curve', type='curve', callback=current_curve_callback,
63     help="""
64 :class:`hooke.curve.Curve` to act on.  Defaults to the current curve
65 of the current playlist.
66 """.strip())
67
68
69 # Define commands
70
71 class GetCommand (Command):
72     """Return a :class:`hooke.curve.Curve`.
73     """
74     def __init__(self, plugin):
75         super(GetCommand, self).__init__(
76             name='get curve', arguments=[CurveArgument],
77             help=self.__doc__, plugin=plugin)
78
79     def _run(self, hooke, inqueue, outqueue, params):
80         outqueue.put(params['curve'])
81
82 class InfoCommand (Command):
83     """Get selected information about a :class:`hooke.curve.Curve`.
84     """
85     def __init__(self, plugin):
86         args = [
87             CurveArgument,                    
88             Argument(name='all', type='bool', default=False, count=1,
89                      help='Get all curve information.'),
90             ]
91         self.fields = ['name', 'path', 'experiment', 'driver', 'filetype', 'note',
92                        'blocks', 'block sizes']
93         for field in self.fields:
94             args.append(Argument(
95                     name=field, type='bool', default=False, count=1,
96                     help='Get curve %s' % field))
97         super(InfoCommand, self).__init__(
98             name='curve info', arguments=args,
99             help=self.__doc__, plugin=plugin)
100
101     def _run(self, hooke, inqueue, outqueue, params):
102         fields = {}
103         for key in self.fields:
104             fields[key] = params[key]
105         if reduce(lambda x,y: x and y, fields.values()) == False:
106             params['all'] = True # No specific fields set, default to 'all'
107         if params['all'] == True:
108             for key in self.fields:
109                 fields[key] = True
110         lines = []
111         for key in self.fields:
112             if fields[key] == True:
113                 get = getattr(self, '_get_%s' % key.replace(' ', '_'))
114                 lines.append('%s: %s' % (key, get(params['curve'])))
115         outqueue.put('\n'.join(lines))
116
117     def _get_name(self, curve):
118         return curve.name
119
120     def _get_path(self, curve):
121         return curve.path
122
123     def _get_experiment(self, curve):
124         return curve.info.get('experiment', None)
125
126     def _get_driver(self, curve):
127         return curve.driver
128
129     def _get_filetype(self, curve):
130         return curve.info.get('filetype', None)
131
132     def _get_note(self, curve):
133         return curve.info.get('note', None)
134                               
135     def _get_blocks(self, curve):
136         return len(curve.data)
137
138     def _get_block_sizes(self, curve):
139         return [block.shape for block in curve.data]
140
141
142 class DeltaCommand (Command):
143     """Get distance information between two points.
144
145     With two points A and B, the returned distances are A-B.
146     """
147     def __init__(self, plugin):
148         super(DeltaCommand, self).__init__(
149             name='delta',
150             arguments=[
151                 CurveArgument,
152                 Argument(name='block', type='int', default=0,
153                     help="""
154 Data block that points are selected from.  For an approach/retract
155 force curve, `0` selects the approaching curve and `1` selects the
156 retracting curve.
157 """.strip()),
158                 Argument(name='point', type='point', optional=False, count=2,
159                          help="""
160 Indicies of points bounding the selected data.
161 """.strip()),
162                 Argument(name='SI', type='bool', default=False,
163                          help="""
164 Return distances in SI notation.
165 """.strip())
166                 ],
167             help=self.__doc__, plugin=plugin)
168
169     def _run(self, hooke, inqueue, outqueue, params):
170         data = params['curve'].data[params['block']]
171         As = data[params['point'][0],:]
172         Bs = data[params['point'][1],:]
173         ds = [A-B for A,B in zip(As, Bs)]
174         if params['SI'] == False:
175             out = [(name, d) for name,d in zip(data.info['columns'], ds)]
176         else:
177             out = []
178             for name,d in zip(data.info['columns'], ds):
179                 n,units = split_data_label(name)
180                 out.append(
181                   (n, ppSI(value=d, unit=units, decimals=2)))
182         outqueue.put(out)
183
184
185 class ExportCommand (Command):
186     """Export a :class:`hooke.curve.Curve` data block as TAB-delimeted
187     ASCII text.
188
189     A "#" prefixed header will optionally appear at the beginning of
190     the file naming the columns.
191     """
192     def __init__(self, plugin):
193         super(ExportCommand, self).__init__(
194             name='export block',
195             arguments=[
196                 CurveArgument,
197                 Argument(name='block', aliases=['set'], type='int', default=0,
198                          help="""
199 Data block to save.  For an approach/retract force curve, `0` selects
200 the approaching curve and `1` selects the retracting curve.
201 """.strip()),
202                 Argument(name='output', type='file', default='curve.dat',
203                          help="""
204 File name for the output data.  Defaults to 'curve.dat'
205 """.strip()),
206                 Argument(name='header', type='bool', default=True,
207                          help="""
208 True if you want the column-naming header line.
209 """.strip()),
210                 ],
211             help=self.__doc__, plugin=plugin)
212
213     def _run(self, hooke, inqueue, outqueue, params):
214         data = params['curve'].data[params['block']]
215
216         f = open(params['output'], 'w')
217         if params['header'] == True:
218             f.write('# %s \n' % ('\t'.join(data.info['columns'])))
219         numpy.savetxt(f, data, delimiter='\t')
220         f.close()
221
222 class DifferenceCommand (Command):
223     """Calculate the difference between two blocks of data.
224     """
225     def __init__(self, plugin):
226         super(DifferenceCommand, self).__init__(
227             name='block difference',
228             arguments=[
229                 CurveArgument,
230                 Argument(name='block one', aliases=['set one'], type='int',
231                          default=1,
232                          help="""
233 Block A in A-B.  For an approach/retract force curve, `0` selects the
234 approaching curve and `1` selects the retracting curve.
235 """.strip()),
236                 Argument(name='block two', aliases=['set two'], type='int',
237                          default=0,
238                          help='Block B in A-B.'),
239                 Argument(name='x column', type='int', default=0,
240                          help="""
241 Column of data to return as x values.
242 """.strip()),
243                 Argument(name='y column', type='int', default=1,
244                          help="""
245 Column of data block to difference.
246 """.strip()),
247                 ],
248             help=self.__doc__, plugin=plugin)
249
250     def _run(self, hooke, inqueue, outqueue, params):
251         a = params['curve'].data[params['block one']]
252         b = params['curve'].data[params['block two']]
253         assert a[:,params['x column']] == b[:,params['x column']]
254         out = Data((a.shape[0],2), dtype=a.dtype)
255         out[:,0] = a[:,params['x column']]
256         out[:,1] = a[:,params['y column']] - b[:,params['y column']]
257         outqueue.put(out)
258
259 class DerivativeCommand (Command):
260     """Calculate the derivative (actually, the discrete differentiation)
261     of a curve data block.
262
263     See :func:`hooke.util.calculus.derivative` for implementation
264     details.
265     """
266     def __init__(self, plugin):
267         super(DerivativeCommand, self).__init__(
268             name='block derivative',
269             arguments=[
270                 CurveArgument,
271                 Argument(name='block', aliases=['set'], type='int', default=0,
272                          help="""
273 Data block to differentiate.  For an approach/retract force curve, `0`
274 selects the approaching curve and `1` selects the retracting curve.
275 """.strip()),
276                 Argument(name='x column', type='string',
277                          help="""
278 Column of data block to differentiate with respect to.
279 """.strip()),
280                 Argument(name='f column', type='string',
281                          help="""
282 Column of data block to differentiate.
283 """.strip()),
284                 Argument(name='weights', type='dict', default={-1:-0.5, 1:0.5},
285                          help="""
286 Weighting scheme dictionary for finite differencing.  Defaults to
287 central differencing.
288 """.strip()),
289                 Argument(name='output column name', type='string',
290                          help="""
291 Name of the new column for storing the derivative (without units, defaults to `derivative of <f column name> with respect to <x column name>`).
292 """.strip()),
293                 ],
294             help=self.__doc__, plugin=plugin)
295
296     def _run(self, hooke, inqueue, outqueue, params):
297         data = params['curve'].data[params['block']]
298         # HACK? rely on params['curve'] being bound to the local hooke
299         # playlist (i.e. not a copy, as you would get by passing a
300         # curve through the queue).  Ugh.  Stupid queues.  As an
301         # alternative, we could pass lookup information through the
302         # queue...
303         new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
304         new.info = copy.deepcopy(data.info)
305         new[:,:-1] = data
306
307         x_col = data.info['columns'].index(params['x column'])
308         f_col = data.info['columns'].index(params['f column'])
309         d = derivative(
310             block, x_col=x_col, f_col=f_col, weights=params['weights'])
311
312         x_name,x_units = split_data_label(params['x column'])
313         f_name,f_units = split_data_label(params['f column'])
314         if params['output column name'] == None:
315             params['output column name'] = (
316                 'derivative of %s with respect to %s' % (
317                     params['f column'], params['x column']))
318
319         new.info['columns'].append(
320             join_data_label(params['output distance column'],
321                             '%s/%s' % (f_units/x_units)))
322         new[:,-1] = d[:,1]
323         params['curve'].data[params['block']] = new
324
325
326 class PowerSpectrumCommand (Command):
327     """Calculate the power spectrum of a data block.
328     """
329     def __init__(self, plugin):
330         super(PowerSpectrumCommand, self).__init__(
331             name='block power spectrum',
332             arguments=[
333                 CurveArgument,
334                 Argument(name='block', aliases=['set'], type='int', default=0,
335                          help="""
336 Data block to act on.  For an approach/retract force curve, `0`
337 selects the approaching curve and `1` selects the retracting curve.
338 """.strip()),
339                 Argument(name='column', type='string', optional=False,
340                          help="""
341 Name of the data block column containing to-be-transformed data.
342 """.strip()),
343                 Argument(name='bounds', type='point', optional=True, count=2,
344                          help="""
345 Indicies of points bounding the selected data.
346 """.strip()),
347                 Argument(name='freq', type='float', default=1.0,
348                          help="""
349 Sampling frequency.
350 """.strip()),
351                 Argument(name='freq units', type='string', default='Hz',
352                          help="""
353 Units for the sampling frequency.
354 """.strip()),
355                 Argument(name='chunk size', type='int', default=2048,
356                          help="""
357 Number of samples per chunk.  Use a power of two.
358 """.strip()),
359                 Argument(name='overlap', type='bool', default=False,
360                          help="""
361 If `True`, each chunk overlaps the previous chunk by half its length.
362 Otherwise, the chunks are end-to-end, and not overlapping.
363 """.strip()),
364                 Argument(name='output block name', type='string',
365                          help="""
366 Name of the new data block (defaults to `power spectrum of <source block name> <source column name>`) for storing the power spectrum.
367 """.strip()),
368                 ],
369             help=self.__doc__, plugin=plugin)
370
371     def _run(self, hooke, inqueue, outqueue, params):
372         data = params['curve'].data[params['block']]
373         col = data.info['columns'].index(params['column'])
374         d = data[:,col]
375         if bounds != None:
376             d = d[params['bounds'][0]:params['bounds'][1]]
377         freq_axis,power = unitary_avg_power_spectrum(
378             d, freq=params['freq'],
379             chunk_size=params['chunk size'],
380             overlap=params['overlap'])
381
382         name,data_units = split_data_label(params['column'])
383         b = Data((len(freq_axis),2), dtype=data.dtype)
384         if params['output block name'] == None:
385             params['output block name'] = 'power spectrum of %s %s' % (
386               params['output block name'], data.info['name'], params['column'])
387         b.info['name'] = params['output block name']
388         b.info['columns'] = [
389             join_data_label('frequency axis', params['freq units']),
390             join_data_label('power density',
391                             '%s^2/%s' % (data_units, params['freq units'])),
392             ]
393         b[:,0] = freq_axis
394         b[:,1] = power
395         params['curve'].data.append(b)
396         outqueue.put(b)