1 # Copyright (C) 2008-2010 Alberto Gomez-Casado
3 # Massimo Sandal <devicerandom@gmail.com>
4 # W. Trevor King <wking@drexel.edu>
6 # This file is part of Hooke.
8 # Hooke is free software: you can redistribute it and/or modify it
9 # under the terms of the GNU Lesser General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # Hooke is distributed in the hope that it will be useful, but WITHOUT
14 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
16 # Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with Hooke. If not, see
20 # <http://www.gnu.org/licenses/>.
22 """The ``curve`` module provides :class:`CurvePlugin` and several
23 associated :class:`hooke.command.Command`\s for handling
24 :mod:`hooke.curve` classes.
31 from ..command import Command, Argument, Failure
32 from ..curve import Data
33 from ..plugin import Builtin
34 from ..plugin.playlist import current_playlist_callback
35 from ..util.calculus import derivative
36 from ..util.fft import unitary_avg_power_spectrum
37 from ..util.si import ppSI, join_data_label, split_data_label
41 class CurvePlugin (Builtin):
43 super(CurvePlugin, self).__init__(name='curve')
45 GetCommand(self), InfoCommand(self), DeltaCommand(self),
46 ExportCommand(self), DifferenceCommand(self),
47 DerivativeCommand(self), PowerSpectrumCommand(self)]
50 # Define common or complicated arguments
52 def current_curve_callback(hooke, command, argument, value):
55 playlist = current_playlist_callback(hooke, command, argument, value)
56 curve = playlist.current()
58 raise Failure('No curves in %s' % playlist)
61 CurveArgument = Argument(
62 name='curve', type='curve', callback=current_curve_callback,
64 :class:`hooke.curve.Curve` to act on. Defaults to the current curve
65 of the current playlist.
71 class GetCommand (Command):
72 """Return a :class:`hooke.curve.Curve`.
74 def __init__(self, plugin):
75 super(GetCommand, self).__init__(
76 name='get curve', arguments=[CurveArgument],
77 help=self.__doc__, plugin=plugin)
79 def _run(self, hooke, inqueue, outqueue, params):
80 outqueue.put(params['curve'])
82 class InfoCommand (Command):
83 """Get selected information about a :class:`hooke.curve.Curve`.
85 def __init__(self, plugin):
88 Argument(name='all', type='bool', default=False, count=1,
89 help='Get all curve information.'),
91 self.fields = ['name', 'path', 'experiment', 'driver', 'filetype', 'note',
92 'blocks', 'block sizes']
93 for field in self.fields:
95 name=field, type='bool', default=False, count=1,
96 help='Get curve %s' % field))
97 super(InfoCommand, self).__init__(
98 name='curve info', arguments=args,
99 help=self.__doc__, plugin=plugin)
101 def _run(self, hooke, inqueue, outqueue, params):
103 for key in self.fields:
104 fields[key] = params[key]
105 if reduce(lambda x,y: x and y, fields.values()) == False:
106 params['all'] = True # No specific fields set, default to 'all'
107 if params['all'] == True:
108 for key in self.fields:
111 for key in self.fields:
112 if fields[key] == True:
113 get = getattr(self, '_get_%s' % key.replace(' ', '_'))
114 lines.append('%s: %s' % (key, get(params['curve'])))
115 outqueue.put('\n'.join(lines))
117 def _get_name(self, curve):
120 def _get_path(self, curve):
123 def _get_experiment(self, curve):
124 return curve.info.get('experiment', None)
126 def _get_driver(self, curve):
129 def _get_filetype(self, curve):
130 return curve.info.get('filetype', None)
132 def _get_note(self, curve):
133 return curve.info.get('note', None)
135 def _get_blocks(self, curve):
136 return len(curve.data)
138 def _get_block_sizes(self, curve):
139 return [block.shape for block in curve.data]
142 class DeltaCommand (Command):
143 """Get distance information between two points.
145 With two points A and B, the returned distances are A-B.
147 def __init__(self, plugin):
148 super(DeltaCommand, self).__init__(
152 Argument(name='block', type='int', default=0,
154 Data block that points are selected from. For an approach/retract
155 force curve, `0` selects the approaching curve and `1` selects the
158 Argument(name='point', type='point', optional=False, count=2,
160 Indicies of points bounding the selected data.
162 Argument(name='SI', type='bool', default=False,
164 Return distances in SI notation.
167 help=self.__doc__, plugin=plugin)
169 def _run(self, hooke, inqueue, outqueue, params):
170 data = params['curve'].data[params['block']]
171 As = data[params['point'][0],:]
172 Bs = data[params['point'][1],:]
173 ds = [A-B for A,B in zip(As, Bs)]
174 if params['SI'] == False:
175 out = [(name, d) for name,d in zip(data.info['columns'], ds)]
178 for name,d in zip(data.info['columns'], ds):
179 n,units = split_data_label(name)
181 (n, ppSI(value=d, unit=units, decimals=2)))
185 class ExportCommand (Command):
186 """Export a :class:`hooke.curve.Curve` data block as TAB-delimeted
189 A "#" prefixed header will optionally appear at the beginning of
190 the file naming the columns.
192 def __init__(self, plugin):
193 super(ExportCommand, self).__init__(
197 Argument(name='block', type='int', default=0,
199 Data block to save. For an approach/retract force curve, `0` selects
200 the approaching curve and `1` selects the retracting curve.
202 Argument(name='output', type='file', default='curve.dat',
204 File name for the output data. Defaults to 'curve.dat'
206 Argument(name='header', type='bool', default=True,
208 True if you want the column-naming header line.
211 help=self.__doc__, plugin=plugin)
213 def _run(self, hooke, inqueue, outqueue, params):
214 data = params['curve'].data[params['block']]
216 f = open(params['output'], 'w')
217 if params['header'] == True:
218 f.write('# %s \n' % ('\t'.join(data.info['columns'])))
219 numpy.savetxt(f, data, delimiter='\t')
222 class DifferenceCommand (Command):
223 """Calculate the difference between two columns of data.
225 The difference is added to block A as a new column.
227 Note that the command will fail if the columns have different
228 lengths, so be careful when differencing columns from different
231 def __init__(self, plugin):
232 super(DifferenceCommand, self).__init__(
236 Argument(name='block A', type='int',
238 Block A in A-B. For an approach/retract force curve, `0` selects the
239 approaching curve and `1` selects the retracting curve. Defaults to
242 Argument(name='block B', type='int',
244 Block B in A-B. Defaults to matching `block A`.
246 Argument(name='column A', type='string',
248 Column of data from block A to difference. Defaults to the first column.
250 Argument(name='column B', type='string', default=1,
252 Column of data from block B to difference. Defaults to matching `column A`.
254 Argument(name='output column name', type='string',
256 Name of the new column for storing the difference (without units, defaults to
257 `difference of <block A> <column A> and <block B> <column B>`).
260 help=self.__doc__, plugin=plugin)
262 def _run(self, hooke, inqueue, outqueue, params):
263 data_A = params['curve'].data[params['block A']]
264 data_B = params['curve'].data[params['block B']]
265 # HACK? rely on params['curve'] being bound to the local hooke
266 # playlist (i.e. not a copy, as you would get by passing a
267 # curve through the queue). Ugh. Stupid queues. As an
268 # alternative, we could pass lookup information through the
270 new = Data((data_A.shape[0], data_A.shape[1]+1), dtype=data_A.dtype)
271 new.info = copy.deepcopy(data.info)
274 a_col = data_A.info['columns'].index(params['column A'])
275 b_col = data_A.info['columns'].index(params['column A'])
276 out = data_A[:,a_col] - data_B[:,b_col]
278 a_name,a_units = split_data_label(params['column A'])
279 b_name,b_units = split_data_label(params['column B'])
280 assert a_units == b_units, (
281 'Unit missmatch: %s != %s' % (a_units, b_units))
282 if params['output column name'] == None:
283 params['output column name'] = (
284 'difference of %s %s and %s %s' % (
285 block_A.info['name'], params['column A'],
286 block_B.info['name'], params['column B']))
287 new.info['columns'].append(
288 join_data_label(params['output distance column'], a_units))
290 params['curve'].data[params['block A']] = new
293 class DerivativeCommand (Command):
294 """Calculate the derivative (actually, the discrete differentiation)
295 of a curve data block.
297 See :func:`hooke.util.calculus.derivative` for implementation
300 def __init__(self, plugin):
301 super(DerivativeCommand, self).__init__(
305 Argument(name='block', type='int', default=0,
307 Data block to differentiate. For an approach/retract force curve, `0`
308 selects the approaching curve and `1` selects the retracting curve.
310 Argument(name='x column', type='string',
312 Column of data block to differentiate with respect to.
314 Argument(name='f column', type='string',
316 Column of data block to differentiate.
318 Argument(name='weights', type='dict', default={-1:-0.5, 1:0.5},
320 Weighting scheme dictionary for finite differencing. Defaults to
321 central differencing.
323 Argument(name='output column name', type='string',
325 Name of the new column for storing the derivative (without units, defaults to
326 `derivative of <f column name> with respect to <x column name>`).
329 help=self.__doc__, plugin=plugin)
331 def _run(self, hooke, inqueue, outqueue, params):
332 data = params['curve'].data[params['block']]
333 # HACK? rely on params['curve'] being bound to the local hooke
334 # playlist (i.e. not a copy, as you would get by passing a
335 # curve through the queue). Ugh. Stupid queues. As an
336 # alternative, we could pass lookup information through the
338 new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
339 new.info = copy.deepcopy(data.info)
342 x_col = data.info['columns'].index(params['x column'])
343 f_col = data.info['columns'].index(params['f column'])
345 block, x_col=x_col, f_col=f_col, weights=params['weights'])
347 x_name,x_units = split_data_label(params['x column'])
348 f_name,f_units = split_data_label(params['f column'])
349 if params['output column name'] == None:
350 params['output column name'] = (
351 'derivative of %s with respect to %s' % (
352 params['f column'], params['x column']))
354 new.info['columns'].append(
355 join_data_label(params['output distance column'],
356 '%s/%s' % (f_units/x_units)))
358 params['curve'].data[params['block']] = new
361 class PowerSpectrumCommand (Command):
362 """Calculate the power spectrum of a data block.
364 def __init__(self, plugin):
365 super(PowerSpectrumCommand, self).__init__(
366 name='power spectrum',
369 Argument(name='block', type='int', default=0,
371 Data block to act on. For an approach/retract force curve, `0`
372 selects the approaching curve and `1` selects the retracting curve.
374 Argument(name='column', type='string', optional=False,
376 Name of the data block column containing to-be-transformed data.
378 Argument(name='bounds', type='point', optional=True, count=2,
380 Indicies of points bounding the selected data.
382 Argument(name='freq', type='float', default=1.0,
386 Argument(name='freq units', type='string', default='Hz',
388 Units for the sampling frequency.
390 Argument(name='chunk size', type='int', default=2048,
392 Number of samples per chunk. Use a power of two.
394 Argument(name='overlap', type='bool', default=False,
396 If `True`, each chunk overlaps the previous chunk by half its length.
397 Otherwise, the chunks are end-to-end, and not overlapping.
399 Argument(name='output block name', type='string',
401 Name of the new data block for storing the power spectrum (defaults to
402 `power spectrum of <source block name> <source column name>`).
405 help=self.__doc__, plugin=plugin)
407 def _run(self, hooke, inqueue, outqueue, params):
408 data = params['curve'].data[params['block']]
409 col = data.info['columns'].index(params['column'])
412 d = d[params['bounds'][0]:params['bounds'][1]]
413 freq_axis,power = unitary_avg_power_spectrum(
414 d, freq=params['freq'],
415 chunk_size=params['chunk size'],
416 overlap=params['overlap'])
418 name,data_units = split_data_label(params['column'])
419 b = Data((len(freq_axis),2), dtype=data.dtype)
420 if params['output block name'] == None:
421 params['output block name'] = 'power spectrum of %s %s' % (
422 params['output block name'], data.info['name'], params['column'])
423 b.info['name'] = params['output block name']
424 b.info['columns'] = [
425 join_data_label('frequency axis', params['freq units']),
426 join_data_label('power density',
427 '%s^2/%s' % (data_units, params['freq units'])),
431 params['curve'].data.append(b)
435 class OldCruft (object):
437 def do_forcebase(self,args):
441 Measures the difference in force (in pN) between a point and a baseline
442 took as the average between two points.
444 The baseline is fixed once for a given curve and different force measurements,
445 unless the user wants it to be recalculated
447 Syntax: forcebase [rebase]
448 rebase: Forces forcebase to ask again the baseline
449 max: Instead of asking for a point to measure, asks for two points and use
450 the maximum peak in between
452 rebase=False #if true=we select rebase
453 maxpoint=False #if true=we measure the maximum peak
455 plot=self._get_displayed_plot()
456 whatset=1 #fixme: for all sets
457 if 'rebase' in args or (self.basecurrent != self.current.path):
463 print 'Select baseline'
464 self.basepoints=self._measure_N_points(N=2, whatset=whatset)
465 self.basecurrent=self.current.path
468 print 'Select two points'
469 points=self._measure_N_points(N=2, whatset=whatset)
470 boundpoints=[points[0].index, points[1].index]
473 y=min(plot.vectors[whatset][1][boundpoints[0]:boundpoints[1]])
475 print 'Chosen interval not valid. Try picking it again. Did you pick the same point as begin and end of interval?'
477 print 'Select point to measure'
478 points=self._measure_N_points(N=1, whatset=whatset)
479 #whatplot=points[0].dest
480 y=points[0].graph_coords[1]
482 #fixme: code duplication
483 boundaries=[self.basepoints[0].index, self.basepoints[1].index]
485 to_average=plot.vectors[whatset][1][boundaries[0]:boundaries[1]] #y points to average
487 avg=np.mean(to_average)
489 print str(forcebase*(10**12))+' pN'
490 to_dump='forcebase '+self.current.path+' '+str(forcebase*(10**12))+' pN'
491 self.outlet.push(to_dump)
494 def do_slope(self,args):
498 Measures the slope of a delimited chunk on the return trace.
499 The chunk can be delimited either by two manual clicks, or have
500 a fixed width, given as an argument.
502 Syntax: slope [width]
503 The facultative [width] parameter specifies how many
504 points will be considered for the fit. If [width] is
505 specified, only one click will be required.
506 (c) Marco Brucale, Massimo Sandal 2008
509 # Reads the facultative width argument
515 # Decides between the two forms of user input, as per (args)
517 # Gets the Xs of two clicked points as indexes on the current curve vector
518 print 'Click twice to delimit chunk'
519 points=self._measure_N_points(N=2,whatset=1)
521 print 'Click once on the leftmost point of the chunk (i.e.usually the peak)'
522 points=self._measure_N_points(N=1,whatset=1)
524 slope=self._slope(points,fitspan)
526 # Outputs the relevant slope parameter
529 to_dump='slope '+self.current.path+' '+str(slope)
530 self.outlet.push(to_dump)
532 def _slope(self,points,fitspan):
533 # Calls the function linefit_between
534 parameters=[0,0,[],[]]
536 clickedpoints=[points[0].index,points[1].index]
539 clickedpoints=[points[0].index-fitspan,points[0].index]
542 parameters=self.linefit_between(clickedpoints[0],clickedpoints[1])
544 print 'Cannot fit. Did you click twice the same point?'
547 # Outputs the relevant slope parameter
549 print str(parameters[0])
550 to_dump='slope '+self.curve.path+' '+str(parameters[0])
551 self.outlet.push(to_dump)
553 # Makes a vector with the fitted parameters and sends it to the GUI
554 xtoplot=parameters[2]
558 ytoplot.append((x*parameters[0])+parameters[1])
560 clickvector_x, clickvector_y=[], []
562 clickvector_x.append(item.graph_coords[0])
563 clickvector_y.append(item.graph_coords[1])
565 lineplot=self._get_displayed_plot(0) #get topmost displayed plot
567 lineplot.add_set(xtoplot,ytoplot)
568 lineplot.add_set(clickvector_x, clickvector_y)
571 if lineplot.styles==[]:
572 lineplot.styles=[None,None,None,'scatter']
574 lineplot.styles+=[None,'scatter']
575 if lineplot.colors==[]:
576 lineplot.colors=[None,None,'black',None]
578 lineplot.colors+=['black',None]
581 self._send_plot([lineplot])
586 def linefit_between(self,index1,index2,whatset=1):
588 Creates two vectors (xtofit,ytofit) slicing out from the
589 current return trace a portion delimited by the two indexes
591 Then does a least squares linear fit on that slice.
592 Finally returns [0]=the slope, [1]=the intercept of the
593 fitted 1st grade polynomial, and [2,3]=the actual (x,y) vectors
595 (c) Marco Brucale, Massimo Sandal 2008
597 # Translates the indexes into two vectors containing the x,y data to fit
598 xtofit=self.plots[0].vectors[whatset][0][index1:index2]
599 ytofit=self.plots[0].vectors[whatset][1][index1:index2]
601 # Does the actual linear fitting (simple least squares with numpy.polyfit)
603 linefit=np.polyfit(xtofit,ytofit,1)
605 return (linefit[0],linefit[1],xtofit,ytofit)