86ea7d8099203e6c2428f5ad565a7442fbd43abf
[hooke.git] / hooke / plugin / curve.py
1 # Copyright (C) 2008-2010 Alberto Gomez-Casado
2 #                         Fabrizio Benedetti
3 #                         Massimo Sandal <devicerandom@gmail.com>
4 #                         W. Trevor King <wking@drexel.edu>
5 #
6 # This file is part of Hooke.
7 #
8 # Hooke is free software: you can redistribute it and/or modify it
9 # under the terms of the GNU Lesser General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
12 #
13 # Hooke is distributed in the hope that it will be useful, but WITHOUT
14 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
16 # Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with Hooke.  If not, see
20 # <http://www.gnu.org/licenses/>.
21
22 """The ``curve`` module provides :class:`CurvePlugin` and several
23 associated :class:`hooke.command.Command`\s for handling
24 :mod:`hooke.curve` classes.
25 """
26
27 import copy
28
29 import numpy
30
31 from ..command import Command, Argument, Failure
32 from ..curve import Data
33 from ..plugin import Builtin
34 from ..plugin.playlist import current_playlist_callback
35 from ..util.calculus import derivative
36 from ..util.fft import unitary_avg_power_spectrum
37 from ..util.si import ppSI, join_data_label, split_data_label
38
39
40
41 class CurvePlugin (Builtin):
42     def __init__(self):
43         super(CurvePlugin, self).__init__(name='curve')
44         self._commands = [
45             GetCommand(self), InfoCommand(self), DeltaCommand(self),
46             ExportCommand(self), DifferenceCommand(self),
47             DerivativeCommand(self), PowerSpectrumCommand(self)]
48
49
50 # Define common or complicated arguments
51
52 def current_curve_callback(hooke, command, argument, value):
53     if value != None:
54         return value
55     playlist = current_playlist_callback(hooke, command, argument, value)
56     curve = playlist.current()
57     if curve == None:
58         raise Failure('No curves in %s' % playlist)
59     return curve
60
61 CurveArgument = Argument(
62     name='curve', type='curve', callback=current_curve_callback,
63     help="""
64 :class:`hooke.curve.Curve` to act on.  Defaults to the current curve
65 of the current playlist.
66 """.strip())
67
68
69 # Define commands
70
71 class GetCommand (Command):
72     """Return a :class:`hooke.curve.Curve`.
73     """
74     def __init__(self, plugin):
75         super(GetCommand, self).__init__(
76             name='get curve', arguments=[CurveArgument],
77             help=self.__doc__, plugin=plugin)
78
79     def _run(self, hooke, inqueue, outqueue, params):
80         outqueue.put(params['curve'])
81
82 class InfoCommand (Command):
83     """Get selected information about a :class:`hooke.curve.Curve`.
84     """
85     def __init__(self, plugin):
86         args = [
87             CurveArgument,                    
88             Argument(name='all', type='bool', default=False, count=1,
89                      help='Get all curve information.'),
90             ]
91         self.fields = ['name', 'path', 'experiment', 'driver', 'filetype', 'note',
92                        'blocks', 'block sizes']
93         for field in self.fields:
94             args.append(Argument(
95                     name=field, type='bool', default=False, count=1,
96                     help='Get curve %s' % field))
97         super(InfoCommand, self).__init__(
98             name='curve info', arguments=args,
99             help=self.__doc__, plugin=plugin)
100
101     def _run(self, hooke, inqueue, outqueue, params):
102         fields = {}
103         for key in self.fields:
104             fields[key] = params[key]
105         if reduce(lambda x,y: x and y, fields.values()) == False:
106             params['all'] = True # No specific fields set, default to 'all'
107         if params['all'] == True:
108             for key in self.fields:
109                 fields[key] = True
110         lines = []
111         for key in self.fields:
112             if fields[key] == True:
113                 get = getattr(self, '_get_%s' % key.replace(' ', '_'))
114                 lines.append('%s: %s' % (key, get(params['curve'])))
115         outqueue.put('\n'.join(lines))
116
117     def _get_name(self, curve):
118         return curve.name
119
120     def _get_path(self, curve):
121         return curve.path
122
123     def _get_experiment(self, curve):
124         return curve.info.get('experiment', None)
125
126     def _get_driver(self, curve):
127         return curve.driver
128
129     def _get_filetype(self, curve):
130         return curve.info.get('filetype', None)
131
132     def _get_note(self, curve):
133         return curve.info.get('note', None)
134                               
135     def _get_blocks(self, curve):
136         return len(curve.data)
137
138     def _get_block_sizes(self, curve):
139         return [block.shape for block in curve.data]
140
141
142 class DeltaCommand (Command):
143     """Get distance information between two points.
144
145     With two points A and B, the returned distances are A-B.
146     """
147     def __init__(self, plugin):
148         super(DeltaCommand, self).__init__(
149             name='delta',
150             arguments=[
151                 CurveArgument,
152                 Argument(name='block', type='int', default=0,
153                     help="""
154 Data block that points are selected from.  For an approach/retract
155 force curve, `0` selects the approaching curve and `1` selects the
156 retracting curve.
157 """.strip()),
158                 Argument(name='point', type='point', optional=False, count=2,
159                          help="""
160 Indicies of points bounding the selected data.
161 """.strip()),
162                 Argument(name='SI', type='bool', default=False,
163                          help="""
164 Return distances in SI notation.
165 """.strip())
166                 ],
167             help=self.__doc__, plugin=plugin)
168
169     def _run(self, hooke, inqueue, outqueue, params):
170         data = params['curve'].data[params['block']]
171         As = data[params['point'][0],:]
172         Bs = data[params['point'][1],:]
173         ds = [A-B for A,B in zip(As, Bs)]
174         if params['SI'] == False:
175             out = [(name, d) for name,d in zip(data.info['columns'], ds)]
176         else:
177             out = []
178             for name,d in zip(data.info['columns'], ds):
179                 n,units = split_data_label(name)
180                 out.append(
181                   (n, ppSI(value=d, unit=units, decimals=2)))
182         outqueue.put(out)
183
184
185 class ExportCommand (Command):
186     """Export a :class:`hooke.curve.Curve` data block as TAB-delimeted
187     ASCII text.
188
189     A "#" prefixed header will optionally appear at the beginning of
190     the file naming the columns.
191     """
192     def __init__(self, plugin):
193         super(ExportCommand, self).__init__(
194             name='export block',
195             arguments=[
196                 CurveArgument,
197                 Argument(name='block', type='int', default=0,
198                          help="""
199 Data block to save.  For an approach/retract force curve, `0` selects
200 the approaching curve and `1` selects the retracting curve.
201 """.strip()),
202                 Argument(name='output', type='file', default='curve.dat',
203                          help="""
204 File name for the output data.  Defaults to 'curve.dat'
205 """.strip()),
206                 Argument(name='header', type='bool', default=True,
207                          help="""
208 True if you want the column-naming header line.
209 """.strip()),
210                 ],
211             help=self.__doc__, plugin=plugin)
212
213     def _run(self, hooke, inqueue, outqueue, params):
214         data = params['curve'].data[params['block']]
215
216         f = open(params['output'], 'w')
217         if params['header'] == True:
218             f.write('# %s \n' % ('\t'.join(data.info['columns'])))
219         numpy.savetxt(f, data, delimiter='\t')
220         f.close()
221
222 class DifferenceCommand (Command):
223     """Calculate the difference between two columns of data.
224
225     The difference is added to block A as a new column.
226
227     Note that the command will fail if the columns have different
228     lengths, so be careful when differencing columns from different
229     blocks.
230     """
231     def __init__(self, plugin):
232         super(DifferenceCommand, self).__init__(
233             name='difference',
234             arguments=[
235                 CurveArgument,
236                 Argument(name='block A', type='int',
237                          help="""
238 Block A in A-B.  For an approach/retract force curve, `0` selects the
239 approaching curve and `1` selects the retracting curve.  Defaults to
240 the first block.
241 """.strip()),
242                 Argument(name='block B', type='int',
243                          help="""
244 Block B in A-B.  Defaults to matching `block A`.
245 """.strip()),
246                 Argument(name='column A', type='string',
247                          help="""
248 Column of data from block A to difference.  Defaults to the first column.
249 """.strip()),
250                 Argument(name='column B', type='string', default=1,
251                          help="""
252 Column of data from block B to difference.  Defaults to matching `column A`.
253 """.strip()),
254                 Argument(name='output column name', type='string',
255                          help="""
256 Name of the new column for storing the difference (without units, defaults to
257 `difference of <block A> <column A> and <block B> <column B>`).
258 """.strip()),
259                 ],
260             help=self.__doc__, plugin=plugin)
261
262     def _run(self, hooke, inqueue, outqueue, params):
263         data_A = params['curve'].data[params['block A']]
264         data_B = params['curve'].data[params['block B']]
265         # HACK? rely on params['curve'] being bound to the local hooke
266         # playlist (i.e. not a copy, as you would get by passing a
267         # curve through the queue).  Ugh.  Stupid queues.  As an
268         # alternative, we could pass lookup information through the
269         # queue...
270         new = Data((data_A.shape[0], data_A.shape[1]+1), dtype=data_A.dtype)
271         new.info = copy.deepcopy(data.info)
272         new[:,:-1] = data_A
273
274         a_col = data_A.info['columns'].index(params['column A'])
275         b_col = data_A.info['columns'].index(params['column A'])
276         out = data_A[:,a_col] - data_B[:,b_col]
277
278         a_name,a_units = split_data_label(params['column A'])
279         b_name,b_units = split_data_label(params['column B'])
280         assert a_units == b_units, (
281             'Unit missmatch: %s != %s' % (a_units, b_units))
282         if params['output column name'] == None:
283             params['output column name'] = (
284                 'difference of %s %s and %s %s' % (
285                     block_A.info['name'], params['column A'],
286                     block_B.info['name'], params['column B']))
287         new.info['columns'].append(
288             join_data_label(params['output distance column'], a_units)
289         new[:,-1] = out
290         params['curve'].data[params['block A']] = new
291
292
293 class DerivativeCommand (Command):
294     """Calculate the derivative (actually, the discrete differentiation)
295     of a curve data block.
296
297     See :func:`hooke.util.calculus.derivative` for implementation
298     details.
299     """
300     def __init__(self, plugin):
301         super(DerivativeCommand, self).__init__(
302             name='derivative',
303             arguments=[
304                 CurveArgument,
305                 Argument(name='block', type='int', default=0,
306                          help="""
307 Data block to differentiate.  For an approach/retract force curve, `0`
308 selects the approaching curve and `1` selects the retracting curve.
309 """.strip()),
310                 Argument(name='x column', type='string',
311                          help="""
312 Column of data block to differentiate with respect to.
313 """.strip()),
314                 Argument(name='f column', type='string',
315                          help="""
316 Column of data block to differentiate.
317 """.strip()),
318                 Argument(name='weights', type='dict', default={-1:-0.5, 1:0.5},
319                          help="""
320 Weighting scheme dictionary for finite differencing.  Defaults to
321 central differencing.
322 """.strip()),
323                 Argument(name='output column name', type='string',
324                          help="""
325 Name of the new column for storing the derivative (without units, defaults to
326 `derivative of <f column name> with respect to <x column name>`).
327 """.strip()),
328                 ],
329             help=self.__doc__, plugin=plugin)
330
331     def _run(self, hooke, inqueue, outqueue, params):
332         data = params['curve'].data[params['block']]
333         # HACK? rely on params['curve'] being bound to the local hooke
334         # playlist (i.e. not a copy, as you would get by passing a
335         # curve through the queue).  Ugh.  Stupid queues.  As an
336         # alternative, we could pass lookup information through the
337         # queue...
338         new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
339         new.info = copy.deepcopy(data.info)
340         new[:,:-1] = data
341
342         x_col = data.info['columns'].index(params['x column'])
343         f_col = data.info['columns'].index(params['f column'])
344         d = derivative(
345             block, x_col=x_col, f_col=f_col, weights=params['weights'])
346
347         x_name,x_units = split_data_label(params['x column'])
348         f_name,f_units = split_data_label(params['f column'])
349         if params['output column name'] == None:
350             params['output column name'] = (
351                 'derivative of %s with respect to %s' % (
352                     params['f column'], params['x column']))
353
354         new.info['columns'].append(
355             join_data_label(params['output distance column'],
356                             '%s/%s' % (f_units/x_units)))
357         new[:,-1] = d[:,1]
358         params['curve'].data[params['block']] = new
359
360
361 class PowerSpectrumCommand (Command):
362     """Calculate the power spectrum of a data block.
363     """
364     def __init__(self, plugin):
365         super(PowerSpectrumCommand, self).__init__(
366             name='power spectrum',
367             arguments=[
368                 CurveArgument,
369                 Argument(name='block', type='int', default=0,
370                          help="""
371 Data block to act on.  For an approach/retract force curve, `0`
372 selects the approaching curve and `1` selects the retracting curve.
373 """.strip()),
374                 Argument(name='column', type='string', optional=False,
375                          help="""
376 Name of the data block column containing to-be-transformed data.
377 """.strip()),
378                 Argument(name='bounds', type='point', optional=True, count=2,
379                          help="""
380 Indicies of points bounding the selected data.
381 """.strip()),
382                 Argument(name='freq', type='float', default=1.0,
383                          help="""
384 Sampling frequency.
385 """.strip()),
386                 Argument(name='freq units', type='string', default='Hz',
387                          help="""
388 Units for the sampling frequency.
389 """.strip()),
390                 Argument(name='chunk size', type='int', default=2048,
391                          help="""
392 Number of samples per chunk.  Use a power of two.
393 """.strip()),
394                 Argument(name='overlap', type='bool', default=False,
395                          help="""
396 If `True`, each chunk overlaps the previous chunk by half its length.
397 Otherwise, the chunks are end-to-end, and not overlapping.
398 """.strip()),
399                 Argument(name='output block name', type='string',
400                          help="""
401 Name of the new data block for storing the power spectrum (defaults to
402 `power spectrum of <source block name> <source column name>`).
403 """.strip()),
404                 ],
405             help=self.__doc__, plugin=plugin)
406
407     def _run(self, hooke, inqueue, outqueue, params):
408         data = params['curve'].data[params['block']]
409         col = data.info['columns'].index(params['column'])
410         d = data[:,col]
411         if bounds != None:
412             d = d[params['bounds'][0]:params['bounds'][1]]
413         freq_axis,power = unitary_avg_power_spectrum(
414             d, freq=params['freq'],
415             chunk_size=params['chunk size'],
416             overlap=params['overlap'])
417
418         name,data_units = split_data_label(params['column'])
419         b = Data((len(freq_axis),2), dtype=data.dtype)
420         if params['output block name'] == None:
421             params['output block name'] = 'power spectrum of %s %s' % (
422               params['output block name'], data.info['name'], params['column'])
423         b.info['name'] = params['output block name']
424         b.info['columns'] = [
425             join_data_label('frequency axis', params['freq units']),
426             join_data_label('power density',
427                             '%s^2/%s' % (data_units, params['freq units'])),
428             ]
429         b[:,0] = freq_axis
430         b[:,1] = power
431         params['curve'].data.append(b)
432         outqueue.put(b)