1 # Copyright (C) 2008-2010 Alberto Gomez-Kasai
3 # Massimo Sandal <devicerandom@gmail.com>
4 # W. Trevor King <wking@drexel.edu>
6 # This file is part of Hooke.
8 # Hooke is free software: you can redistribute it and/or modify it
9 # under the terms of the GNU Lesser General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # Hooke is distributed in the hope that it will be useful, but WITHOUT
14 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
16 # Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with Hooke. If not, see
20 # <http://www.gnu.org/licenses/>.
22 """The ``curve`` module provides :class:`CurvePlugin` and several
23 associated :class:`hooke.command.Command`\s for handling
24 :mod:`hooke.curve` classes.
31 from ..command import Command, Argument, Failure
32 from ..curve import Data
33 from ..util.calculus import derivative
34 from ..util.fft import unitary_avg_power_spectrum
35 from ..util.si import ppSI, join_data_label, split_data_label
37 from .playlist import current_playlist_callback
40 # Define common or complicated arguments
42 def current_curve_callback(hooke, command, argument, value):
45 playlist = current_playlist_callback(hooke, command, argument, value)
46 curve = playlist.current()
48 raise Failure('No curves in %s' % playlist)
51 CurveArgument = Argument(
52 name='curve', type='curve', callback=current_curve_callback,
54 :class:`hooke.curve.Curve` to act on. Defaults to the current curve
55 of the current playlist.
58 def _name_argument(name, default, help):
61 return Argument(name=name, type='string', default=default, help=help)
63 def block_argument(*args, **kwargs):
66 return _name_argument(*args, **kwargs)
68 def column_argument(*args, **kwargs):
71 return _name_argument(*args, **kwargs)
74 # Define useful command subclasses
76 class CurveCommand (Command):
77 """A :class:`~hooke.command.Command` operating on a
78 :class:`~hooke.curve.Curve`.
80 def __init__(self, **kwargs):
81 if 'arguments' in kwargs:
82 kwargs['arguments'].insert(0, CurveArgument)
84 kwargs['arguments'] = [CurveArgument]
85 super(CurveCommand, self).__init__(**kwargs)
87 def _curve(self, hooke, params):
88 # HACK? rely on params['curve'] being bound to the local hooke
89 # playlist (i.e. not a copy, as you would get by passing a
90 # curve through the queue). Ugh. Stupid queues. As an
91 # alternative, we could pass lookup information through the
93 return params['curve']
96 class BlockCommand (CurveCommand):
97 """A :class:`CurveCommand` operating on a :class:`~hooke.curve.Data` block.
99 def __init__(self, blocks=None, **kwargs):
101 blocks = [('block', None, 'Name of the data block to act on.')]
103 for name,default,help in blocks:
104 block_args.append(block_argument(name, default, help))
105 self._block_arguments = block_args
106 if 'arguments' not in kwargs:
107 kwargs['arguments'] = []
108 kwargs['arguments'] = block_args + kwargs['arguments']
109 super(BlockCommand, self).__init__(**kwargs)
111 def _block_names(self, hooke, params):
112 curve = self._curve(hooke, params)
113 return [b.info['name'] for b in curve.data]
115 def _block_index(self, hooke, params, name=None):
117 name = self._block_arguments[0].name
118 block_name = params[name]
119 if block_name == None:
120 curve = self._curve(hooke=hooke, params=params)
121 if len(curve.data) == 0:
122 raise Failure('no blocks in %s' % curve)
123 block_name = curve.data[0].info['name']
124 names = self._block_names(hooke=hooke, params=params)
126 return names.index(block_name)
127 except ValueError, e:
128 curve = self._curve(hooke, params)
129 raise Failure('no block named %s in %s (%s): %s'
130 % (block_name, curve, names, e))
132 def _block(self, hooke, params, name=None):
133 # HACK? rely on params['block'] being bound to the local hooke
134 # playlist (i.e. not a copy, as you would get by passing a
135 # block through the queue). Ugh. Stupid queues. As an
136 # alternative, we could pass lookup information through the
138 curve = self._curve(hooke, params)
139 index = self._block_index(hooke, params, name)
140 return curve.data[index]
143 class ColumnAccessCommand (BlockCommand):
144 """A :class:`BlockCommand` accessing a :class:`~hooke.curve.Data`
147 def __init__(self, columns=None, **kwargs):
149 columns = [('column', None, 'Name of the data column to act on.')]
151 for name,default,help in columns:
152 column_args.append(column_argument(name, default, help))
153 self._column_arguments = column_args
154 if 'arguments' not in kwargs:
155 kwargs['arguments'] = []
156 kwargs['arguments'] = column_args + kwargs['arguments']
157 super(ColumnAccessCommand, self).__init__(**kwargs)
159 def _get_column(self, hooke, params, block_name=None, column_name=None):
160 if column_name == None:
161 column_name = self._column_arguments[0].name
162 column_name = params[column_name]
163 block = self._block(hooke, params, block_name)
164 columns = block.info['columns']
166 column_index = columns.index(column_name)
167 except ValueError, e:
168 raise Failure('%s not in %s (%s): %s'
169 % (column_name, block.info['name'], columns, e))
170 return block[:,column_index]
173 class ColumnAddingCommand (ColumnAccessCommand):
174 """A :class:`ColumnAccessCommand` that also adds columns.
176 def __init__(self, new_columns=None, **kwargs):
177 if new_columns == None:
180 for name,default,help in new_columns:
181 column_args.append(column_argument(name, default, help))
182 self._new_column_arguments = column_args
183 if 'arguments' not in kwargs:
184 kwargs['arguments'] = []
185 kwargs['arguments'] = column_args + kwargs['arguments']
186 super(ColumnAddingCommand, self).__init__(**kwargs)
188 def _get_column(self, hooke, params, block_name=None, column_name=None):
189 if column_name == None and len(self._column_arguments) == 0:
190 column_name = self._new_column_arguments[0].name
191 return super(ColumnAddingCommand, self)._get_column(
192 hooke=hooke, params=params, block_name=block_name,
193 column_name=column_name)
195 def _set_column(self, hooke, params, block_name=None, column_name=None,
197 if column_name == None:
198 column_name = self._column_arguments[0].name
199 column_name = params[column_name]
200 block = self._block(hooke=hooke, params=params, name=block_name)
201 if column_name not in block.info['columns']:
202 new = Data((block.shape[0], block.shape[1]+1), dtype=block.dtype)
203 new.info = copy.deepcopy(block.info)
205 new.info['columns'].append(column_name)
207 block_index = self._block_index(hooke, params, name=block_name)
208 self._curve(hooke, params).data[block_index] = block
209 column_index = block.info['columns'].index(column_name)
210 block[:,column_index] = values
215 class CurvePlugin (Builtin):
217 super(CurvePlugin, self).__init__(name='curve')
219 GetCommand(self), InfoCommand(self), DeltaCommand(self),
220 ExportCommand(self), DifferenceCommand(self),
221 DerivativeCommand(self), PowerSpectrumCommand(self)]
226 class GetCommand (CurveCommand):
227 """Return a :class:`hooke.curve.Curve`.
229 def __init__(self, plugin):
230 super(GetCommand, self).__init__(
231 name='get curve', help=self.__doc__, plugin=plugin)
233 def _run(self, hooke, inqueue, outqueue, params):
234 outqueue.put(self._curve(hooke, params))
237 class InfoCommand (CurveCommand):
238 """Get selected information about a :class:`hooke.curve.Curve`.
240 def __init__(self, plugin):
242 Argument(name='all', type='bool', default=False, count=1,
243 help='Get all curve information.'),
245 self.fields = ['name', 'path', 'experiment', 'driver', 'filetype', 'note',
246 'blocks', 'block sizes']
247 for field in self.fields:
248 args.append(Argument(
249 name=field, type='bool', default=False, count=1,
250 help='Get curve %s' % field))
251 super(InfoCommand, self).__init__(
252 name='curve info', arguments=args,
253 help=self.__doc__, plugin=plugin)
255 def _run(self, hooke, inqueue, outqueue, params):
256 curve = self._curve(hooke, params)
258 for key in self.fields:
259 fields[key] = params[key]
260 if reduce(lambda x,y: x and y, fields.values()) == False:
261 params['all'] = True # No specific fields set, default to 'all'
262 if params['all'] == True:
263 for key in self.fields:
266 for key in self.fields:
267 if fields[key] == True:
268 get = getattr(self, '_get_%s' % key.replace(' ', '_'))
269 lines.append('%s: %s' % (key, get(curve)))
270 outqueue.put('\n'.join(lines))
272 def _get_name(self, curve):
275 def _get_path(self, curve):
278 def _get_experiment(self, curve):
279 return curve.info.get('experiment', None)
281 def _get_driver(self, curve):
284 def _get_filetype(self, curve):
285 return curve.info.get('filetype', None)
287 def _get_note(self, curve):
288 return curve.info.get('note', None)
290 def _get_blocks(self, curve):
291 return len(curve.data)
293 def _get_block_sizes(self, curve):
294 return [block.shape for block in curve.data]
297 class DeltaCommand (BlockCommand):
298 """Get distance information between two points.
300 With two points A and B, the returned distances are A-B.
302 def __init__(self, plugin):
303 super(DeltaCommand, self).__init__(
306 Argument(name='point', type='point', optional=False, count=2,
308 Indicies of points bounding the selected data.
310 Argument(name='SI', type='bool', default=False,
312 Return distances in SI notation.
315 help=self.__doc__, plugin=plugin)
317 def _run(self, hooke, inqueue, outqueue, params):
318 data = self._block(hooke, params)
319 As = data[params['point'][0],:]
320 Bs = data[params['point'][1],:]
321 ds = [A-B for A,B in zip(As, Bs)]
322 if params['SI'] == False:
323 out = [(name, d) for name,d in zip(data.info['columns'], ds)]
326 for name,d in zip(data.info['columns'], ds):
327 n,units = split_data_label(name)
329 (n, ppSI(value=d, unit=units, decimals=2)))
333 class ExportCommand (BlockCommand):
334 """Export a :class:`hooke.curve.Curve` data block as TAB-delimeted
337 A "#" prefixed header will optionally appear at the beginning of
338 the file naming the columns.
340 def __init__(self, plugin):
341 super(ExportCommand, self).__init__(
344 Argument(name='output', type='file', default='curve.dat',
346 File name for the output data. Defaults to 'curve.dat'
348 Argument(name='header', type='bool', default=True,
350 True if you want the column-naming header line.
353 help=self.__doc__, plugin=plugin)
355 def _run(self, hooke, inqueue, outqueue, params):
356 data = self._block(hooke, params)
358 with open(params['output'], 'w') as f:
359 if params['header'] == True:
360 f.write('# %s \n' % ('\t'.join(data.info['columns'])))
361 numpy.savetxt(f, data, delimiter='\t')
364 class DifferenceCommand (ColumnAddingCommand):
365 """Calculate the difference between two columns of data.
367 The difference is added to block A as a new column.
369 Note that the command will fail if the columns have different
370 lengths, so be careful when differencing columns from different
373 def __init__(self, plugin):
374 super(DifferenceCommand, self).__init__(
378 'Name of block A in A-B. Defaults to the first block'),
380 'Name of block B in A-B. Defaults to matching `block A`.'),
385 Column of data from block A to difference. Defaults to the first column.
389 Column of data from block B to difference. Defaults to matching `column A`.
393 ('output column', None,
395 Name of the new column for storing the difference (without units, defaults to
396 `difference of <block A> <column A> and <block B> <column B>`).
399 help=self.__doc__, plugin=plugin)
401 def _run(self, hooke, inqueue, outqueue, params):
402 params = self.__setup_params(hooke=hooke, params=params)
403 data_A = self._get_column(hooke=hooke, params=params,
404 block_name='block A',
405 column_name='column A')
406 data_B = self._get_column(hooke=hooke, params=params,
407 block_name='block B',
408 column_name='column B')
409 out = data_A - data_B
410 self._set_column(hooke=hooke, params=params,
411 block_name='block A',
412 column_name='output column',
415 def __setup_params(self, hooke, params):
416 curve = self._curve(hooke, params)
417 if params['block A'] == None:
418 params['block A'] = curve.data[0].info['name']
419 if params['block B'] == None:
420 params['block B'] = params['block A']
421 block_A = self._block(hooke, params=params, name='block A')
422 block_B = self._block(hooke, params=params, name='block B')
423 if params['column A'] == None:
424 params['column A'] = block.info['columns'][0]
425 if params['column B'] == None:
426 params['column B'] = params['column A']
427 a_name,a_unit = split_data_label(params['column A'])
428 b_name,b_unit = split_data_label(params['column B'])
430 raise Failure('Unit missmatch: %s != %s' % (a_unit, b_unit))
431 if params['output column'] == None:
432 params['output column'] = join_data_label(
433 'difference of %s %s and %s %s' % (
434 block_A.info['name'], a_name,
435 block_B.info['name'], b_name),
438 params['output column'] = join_data_label(
439 params['output column'], a_unit)
443 class DerivativeCommand (ColumnAddingCommand):
444 """Calculate the derivative (actually, the discrete differentiation)
447 See :func:`hooke.util.calculus.derivative` for implementation
450 def __init__(self, plugin):
451 super(DerivativeCommand, self).__init__(
455 'Column of data block to differentiate with respect to.'),
457 'Column of data block to differentiate.'),
460 ('output column', None,
462 Name of the new column for storing the derivative (without units, defaults to
463 `derivative of <f column> with respect to <x column>`).
467 Argument(name='weights', type='dict', default={-1:-0.5, 1:0.5},
469 Weighting scheme dictionary for finite differencing. Defaults to
470 central differencing.
473 help=self.__doc__, plugin=plugin)
475 def _run(self, hooke, inqueue, outqueue, params):
476 params = self.__setup_params(hooke=hooke, params=params)
477 x_data = self._get_column(hooke=hooke, params=params,
478 column_name='x column')
479 f_data = self._get_column(hooke=hooke, params=params,
480 column_name='f column')
482 x_data=x_data, f_data=f_data, weights=params['weights'])
483 self._set_column(hooke=hooke, params=params,
484 column_name='output column',
487 def __setup_params(self, hooke, params):
488 curve = self._curve(hooke, params)
489 x_name,x_unit = split_data_label(params['x column'])
490 f_name,f_unit = split_data_label(params['f column'])
491 d_unit = '%s/%s' % (f_unit, x_unit)
492 if params['output column'] == None:
493 params['output column'] = join_data_label(
494 'derivative of %s with respect to %s' % (
498 params['output column'] = join_data_label(
499 params['output column'], d_unit)
503 class PowerSpectrumCommand (ColumnAddingCommand):
504 """Calculate the power spectrum of a data column.
506 def __init__(self, plugin):
507 super(PowerSpectrumCommand, self).__init__(
508 name='power spectrum',
510 Argument(name='output block', type='string',
512 Name of the new data block for storing the power spectrum (defaults to
513 `power spectrum of <source block name> <source column name>`).
515 Argument(name='bounds', type='point', optional=True, count=2,
517 Indicies of points bounding the selected data.
519 Argument(name='freq', type='float', default=1.0,
523 Argument(name='freq units', type='string', default='Hz',
525 Units for the sampling frequency.
527 Argument(name='chunk size', type='int', default=2048,
529 Number of samples per chunk. Use a power of two.
531 Argument(name='overlap', type='bool', default=False,
533 If `True`, each chunk overlaps the previous chunk by half its length.
534 Otherwise, the chunks are end-to-end, and not overlapping.
537 help=self.__doc__, plugin=plugin)
539 def _run(self, hooke, inqueue, outqueue, params):
540 params = self.__setup_params(hooke=hooke, params=params)
541 data = self._get_column(hooke=hooke, params=params)
542 bounds = params['bounds']
544 data = data[bounds[0]:bounds[1]]
545 freq_axis,power = unitary_avg_power_spectrum(
546 data, freq=params['freq'],
547 chunk_size=params['chunk size'],
548 overlap=params['overlap'])
549 b = Data((len(freq_axis),2), dtype=data.dtype)
550 b.info['name'] = params['output block']
551 b.info['columns'] = [
552 params['output freq column'],
553 params['output power column'],
555 self._curve(hooke, params).data.append(b)
556 self._set_column(hooke, params, block_name='output block',
557 column_name='output freq column',
559 self._set_column(hooke, params, block_name='output block',
560 column_name='output power column',
564 def __setup_params(self, hooke, params):
565 if params['output block'] in self._block_names(hooke, params):
566 raise Failure('output block %s already exists in %s.'
567 % (params['output block'],
568 self._curve(hooke, params)))
569 data = self._get_column(hooke=hooke, params=params)
570 d_name,d_unit = split_data_label(data.info['name'])
571 if params['output block'] == None:
572 params['output block'] = 'power spectrum of %s %s' % (
573 data.info['name'], params['column'])
574 self.params['output freq column'] = join_data_label(
575 'frequency axis', params['freq units'])
576 self.params['output power column'] = join_data_label(
577 'power density', '%s^2/%s' % (data_units, params['freq units']))
581 class OldCruft (object):
583 def do_forcebase(self,args):
587 Measures the difference in force (in pN) between a point and a baseline
588 took as the average between two points.
590 The baseline is fixed once for a given curve and different force measurements,
591 unless the user wants it to be recalculated
593 Syntax: forcebase [rebase]
594 rebase: Forces forcebase to ask again the baseline
595 max: Instead of asking for a point to measure, asks for two points and use
596 the maximum peak in between
598 rebase=False #if true=we select rebase
599 maxpoint=False #if true=we measure the maximum peak
601 plot=self._get_displayed_plot()
602 whatset=1 #fixme: for all sets
603 if 'rebase' in args or (self.basecurrent != self.current.path):
609 print 'Select baseline'
610 self.basepoints=self._measure_N_points(N=2, whatset=whatset)
611 self.basecurrent=self.current.path
614 print 'Select two points'
615 points=self._measure_N_points(N=2, whatset=whatset)
616 boundpoints=[points[0].index, points[1].index]
619 y=min(plot.vectors[whatset][1][boundpoints[0]:boundpoints[1]])
621 print 'Chosen interval not valid. Try picking it again. Did you pick the same point as begin and end of interval?'
623 print 'Select point to measure'
624 points=self._measure_N_points(N=1, whatset=whatset)
625 #whatplot=points[0].dest
626 y=points[0].graph_coords[1]
628 #fixme: code duplication
629 boundaries=[self.basepoints[0].index, self.basepoints[1].index]
631 to_average=plot.vectors[whatset][1][boundaries[0]:boundaries[1]] #y points to average
633 avg=np.mean(to_average)
635 print str(forcebase*(10**12))+' pN'
636 to_dump='forcebase '+self.current.path+' '+str(forcebase*(10**12))+' pN'
637 self.outlet.push(to_dump)
640 def do_slope(self,args):
644 Measures the slope of a delimited chunk on the return trace.
645 The chunk can be delimited either by two manual clicks, or have
646 a fixed width, given as an argument.
648 Syntax: slope [width]
649 The facultative [width] parameter specifies how many
650 points will be considered for the fit. If [width] is
651 specified, only one click will be required.
652 (c) Marco Brucale, Massimo Sandal 2008
655 # Reads the facultative width argument
661 # Decides between the two forms of user input, as per (args)
663 # Gets the Xs of two clicked points as indexes on the current curve vector
664 print 'Click twice to delimit chunk'
665 points=self._measure_N_points(N=2,whatset=1)
667 print 'Click once on the leftmost point of the chunk (i.e.usually the peak)'
668 points=self._measure_N_points(N=1,whatset=1)
670 slope=self._slope(points,fitspan)
672 # Outputs the relevant slope parameter
675 to_dump='slope '+self.current.path+' '+str(slope)
676 self.outlet.push(to_dump)
678 def _slope(self,points,fitspan):
679 # Calls the function linefit_between
680 parameters=[0,0,[],[]]
682 clickedpoints=[points[0].index,points[1].index]
685 clickedpoints=[points[0].index-fitspan,points[0].index]
688 parameters=self.linefit_between(clickedpoints[0],clickedpoints[1])
690 print 'Cannot fit. Did you click twice the same point?'
693 # Outputs the relevant slope parameter
695 print str(parameters[0])
696 to_dump='slope '+self.curve.path+' '+str(parameters[0])
697 self.outlet.push(to_dump)
699 # Makes a vector with the fitted parameters and sends it to the GUI
700 xtoplot=parameters[2]
704 ytoplot.append((x*parameters[0])+parameters[1])
706 clickvector_x, clickvector_y=[], []
708 clickvector_x.append(item.graph_coords[0])
709 clickvector_y.append(item.graph_coords[1])
711 lineplot=self._get_displayed_plot(0) #get topmost displayed plot
713 lineplot.add_set(xtoplot,ytoplot)
714 lineplot.add_set(clickvector_x, clickvector_y)
717 if lineplot.styles==[]:
718 lineplot.styles=[None,None,None,'scatter']
720 lineplot.styles+=[None,'scatter']
721 if lineplot.colors==[]:
722 lineplot.colors=[None,None,'black',None]
724 lineplot.colors+=['black',None]
727 self._send_plot([lineplot])
732 def linefit_between(self,index1,index2,whatset=1):
734 Creates two vectors (xtofit,ytofit) slicing out from the
735 current return trace a portion delimited by the two indexes
737 Then does a least squares linear fit on that slice.
738 Finally returns [0]=the slope, [1]=the intercept of the
739 fitted 1st grade polynomial, and [2,3]=the actual (x,y) vectors
741 (c) Marco Brucale, Massimo Sandal 2008
743 # Translates the indexes into two vectors containing the x,y data to fit
744 xtofit=self.plots[0].vectors[whatset][0][index1:index2]
745 ytofit=self.plots[0].vectors[whatset][1][index1:index2]
747 # Does the actual linear fitting (simple least squares with numpy.polyfit)
749 linefit=np.polyfit(xtofit,ytofit,1)
751 return (linefit[0],linefit[1],xtofit,ytofit)