1 # Copyright (C) 2008-2010 Alberto Gomez-Kasai
3 # Massimo Sandal <devicerandom@gmail.com>
4 # W. Trevor King <wking@drexel.edu>
6 # This file is part of Hooke.
8 # Hooke is free software: you can redistribute it and/or modify it
9 # under the terms of the GNU Lesser General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # Hooke is distributed in the hope that it will be useful, but WITHOUT
14 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
16 # Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with Hooke. If not, see
20 # <http://www.gnu.org/licenses/>.
22 """The ``curve`` module provides :class:`CurvePlugin` and several
23 associated :class:`hooke.command.Command`\s for handling
24 :mod:`hooke.curve` classes.
31 from ..command import Command, Argument, Failure
32 from ..curve import Data
33 from ..plugin import Builtin
34 from ..plugin.playlist import current_playlist_callback
35 from ..util.calculus import derivative
36 from ..util.fft import unitary_avg_power_spectrum
37 from ..util.si import ppSI, join_data_label, split_data_label
40 # Define common or complicated arguments
42 def current_curve_callback(hooke, command, argument, value):
45 playlist = current_playlist_callback(hooke, command, argument, value)
46 curve = playlist.current()
48 raise Failure('No curves in %s' % playlist)
51 CurveArgument = Argument(
52 name='curve', type='curve', callback=current_curve_callback,
54 :class:`hooke.curve.Curve` to act on. Defaults to the current curve
55 of the current playlist.
58 def _name_argument(name, default, help):
61 return Argument(name=name, type='string', default=default, help=help)
63 def block_argument(*args, **kwargs):
66 return _name_argument(*args, **kwargs)
68 def column_argument(*args, **kwargs):
71 return _name_argument(*args, **kwargs)
74 # Define useful command subclasses
76 class CurveCommand (Command):
77 """A :class:`~hooke.command.Command` operating on a
78 :class:`~hooke.curve.Curve`.
80 def __init__(self, **kwargs):
81 if 'arguments' in kwargs:
82 kwargs['arguments'].insert(0, CurveArgument)
84 kwargs['arguments'] = [CurveArgument]
85 super(CurveCommand, self).__init__(**kwargs)
87 def _curve(self, hooke, params):
88 # HACK? rely on params['curve'] being bound to the local hooke
89 # playlist (i.e. not a copy, as you would get by passing a
90 # curve through the queue). Ugh. Stupid queues. As an
91 # alternative, we could pass lookup information through the
93 return params['curve']
96 class BlockCommand (CurveCommand):
97 """A :class:`CurveCommand` operating on a :class:`~hooke.curve.Data` block.
99 def __init__(self, blocks=None, **kwargs):
101 blocks = [('block', None, 'Name of the data block to act on.')]
103 for name,default,help in blocks:
104 block_args.append(block_argument(name, default, help))
105 self._block_arguments = block_args
106 if 'arguments' not in kwargs:
107 kwargs['arguments'] = []
108 kwargs['arguments'] = block_args + kwargs['arguments']
109 super(BlockCommand, self).__init__(**kwargs)
111 def _block_names(self, hooke, params):
112 curve = self._curve(hooke, params)
113 return [b.info['name'] for b in curve.data]
115 def _block_index(self, hooke, params, name=None):
117 name = self._block_arguments[0].name
118 block_name = params[name]
119 if block_name == None:
120 curve = self._curve(hooke=hooke, params=params)
121 if len(curve.data) == 0:
122 raise Failure('no blocks in %s' % curve)
123 block_name = curve.data[0].info['name']
124 names = self._block_names(hooke=hooke, params=params)
126 return names.index(block_name)
127 except ValueError, e:
128 curve = self._curve(hooke, params)
129 raise Failure('no block named %s in %s (%s): %s'
130 % (block_name, curve, names, e))
132 def _block(self, hooke, params, name=None):
133 # HACK? rely on params['block'] being bound to the local hooke
134 # playlist (i.e. not a copy, as you would get by passing a
135 # block through the queue). Ugh. Stupid queues. As an
136 # alternative, we could pass lookup information through the
138 curve = self._curve(hooke, params)
139 index = self._block_index(hooke, params, name)
140 return curve.data[index]
143 class ColumnAccessCommand (BlockCommand):
144 """A :class:`BlockCommand` accessing a :class:`~hooke.curve.Data`
147 def __init__(self, columns=None, **kwargs):
149 columns = [('column', None, 'Name of the data column to act on.')]
151 for name,default,help in columns:
152 column_args.append(column_argument(name, default, help))
153 self._column_arguments = column_args
154 if 'arguments' not in kwargs:
155 kwargs['arguments'] = []
156 kwargs['arguments'] = column_args + kwargs['arguments']
157 super(ColumnAccessCommand, self).__init__(**kwargs)
159 def _get_column(self, hooke, params, block_name=None, column_name=None):
160 if column_name == None:
161 column_name = self._column_arguments[0].name
162 column_name = params[column_name]
163 block = self._block(hooke, params, block_name)
164 column_index = block.info['columns'].index(column_name)
165 return block[:,column_index]
168 class ColumnAddingCommand (ColumnAccessCommand):
169 """A :class:`ColumnAccessCommand` that also adds columns.
171 def __init__(self, new_columns=None, **kwargs):
172 if new_columns == None:
175 for name,default,help in new_columns:
176 column_args.append(column_argument(name, default, help))
177 self._new_column_arguments = column_args
178 if 'arguments' not in kwargs:
179 kwargs['arguments'] = []
180 kwargs['arguments'] = column_args + kwargs['arguments']
181 super(ColumnAddingCommand, self).__init__(**kwargs)
183 def _get_column(self, hooke, params, block_name=None, column_name=None):
184 if column_name == None and len(self._column_arguments) == 0:
185 column_name = self._new_column_arguments[0].name
186 return super(ColumnAddingCommand, self)._get_column(
187 hooke=hooke, params=params, block_name=block_name,
188 column_name=column_name)
190 def _set_column(self, hooke, params, block_name=None, column_name=None,
192 if column_name == None:
193 column_name = self._column_arguments[0].name
194 column_name = params[column_name]
195 block = self._block(hooke=hooke, params=params, name=block_name)
196 if column_name not in block.info['columns']:
197 new = Data((block.shape[0], block.shape[1]+1), dtype=block.dtype)
198 new.info = copy.deepcopy(block.info)
200 new.info['columns'].append(column_name)
202 block_index = self._block_index(hooke, params, name=block_name)
203 self._curve(hooke, params).data[block_index] = block
204 column_index = block.info['columns'].index(column_name)
205 block[:,column_index] = values
210 class CurvePlugin (Builtin):
212 super(CurvePlugin, self).__init__(name='curve')
214 GetCommand(self), InfoCommand(self), DeltaCommand(self),
215 ExportCommand(self), DifferenceCommand(self),
216 DerivativeCommand(self), PowerSpectrumCommand(self)]
221 class GetCommand (CurveCommand):
222 """Return a :class:`hooke.curve.Curve`.
224 def __init__(self, plugin):
225 super(GetCommand, self).__init__(
226 name='get curve', help=self.__doc__, plugin=plugin)
228 def _run(self, hooke, inqueue, outqueue, params):
229 outqueue.put(self._curve(hooke, params))
232 class InfoCommand (CurveCommand):
233 """Get selected information about a :class:`hooke.curve.Curve`.
235 def __init__(self, plugin):
237 Argument(name='all', type='bool', default=False, count=1,
238 help='Get all curve information.'),
240 self.fields = ['name', 'path', 'experiment', 'driver', 'filetype', 'note',
241 'blocks', 'block sizes']
242 for field in self.fields:
243 args.append(Argument(
244 name=field, type='bool', default=False, count=1,
245 help='Get curve %s' % field))
246 super(InfoCommand, self).__init__(
247 name='curve info', arguments=args,
248 help=self.__doc__, plugin=plugin)
250 def _run(self, hooke, inqueue, outqueue, params):
251 curve = self._curve(hooke, params)
253 for key in self.fields:
254 fields[key] = params[key]
255 if reduce(lambda x,y: x and y, fields.values()) == False:
256 params['all'] = True # No specific fields set, default to 'all'
257 if params['all'] == True:
258 for key in self.fields:
261 for key in self.fields:
262 if fields[key] == True:
263 get = getattr(self, '_get_%s' % key.replace(' ', '_'))
264 lines.append('%s: %s' % (key, get(curve)))
265 outqueue.put('\n'.join(lines))
267 def _get_name(self, curve):
270 def _get_path(self, curve):
273 def _get_experiment(self, curve):
274 return curve.info.get('experiment', None)
276 def _get_driver(self, curve):
279 def _get_filetype(self, curve):
280 return curve.info.get('filetype', None)
282 def _get_note(self, curve):
283 return curve.info.get('note', None)
285 def _get_blocks(self, curve):
286 return len(curve.data)
288 def _get_block_sizes(self, curve):
289 return [block.shape for block in curve.data]
292 class DeltaCommand (BlockCommand):
293 """Get distance information between two points.
295 With two points A and B, the returned distances are A-B.
297 def __init__(self, plugin):
298 super(DeltaCommand, self).__init__(
301 Argument(name='point', type='point', optional=False, count=2,
303 Indicies of points bounding the selected data.
305 Argument(name='SI', type='bool', default=False,
307 Return distances in SI notation.
310 help=self.__doc__, plugin=plugin)
312 def _run(self, hooke, inqueue, outqueue, params):
313 data = self._block(hooke, params)
314 As = data[params['point'][0],:]
315 Bs = data[params['point'][1],:]
316 ds = [A-B for A,B in zip(As, Bs)]
317 if params['SI'] == False:
318 out = [(name, d) for name,d in zip(data.info['columns'], ds)]
321 for name,d in zip(data.info['columns'], ds):
322 n,units = split_data_label(name)
324 (n, ppSI(value=d, unit=units, decimals=2)))
328 class ExportCommand (BlockCommand):
329 """Export a :class:`hooke.curve.Curve` data block as TAB-delimeted
332 A "#" prefixed header will optionally appear at the beginning of
333 the file naming the columns.
335 def __init__(self, plugin):
336 super(ExportCommand, self).__init__(
339 Argument(name='output', type='file', default='curve.dat',
341 File name for the output data. Defaults to 'curve.dat'
343 Argument(name='header', type='bool', default=True,
345 True if you want the column-naming header line.
348 help=self.__doc__, plugin=plugin)
350 def _run(self, hooke, inqueue, outqueue, params):
351 data = self._block(hooke, params)
353 with open(params['output'], 'w') as f:
354 if params['header'] == True:
355 f.write('# %s \n' % ('\t'.join(data.info['columns'])))
356 numpy.savetxt(f, data, delimiter='\t')
359 class DifferenceCommand (ColumnAddingCommand):
360 """Calculate the difference between two columns of data.
362 The difference is added to block A as a new column.
364 Note that the command will fail if the columns have different
365 lengths, so be careful when differencing columns from different
368 def __init__(self, plugin):
369 super(DifferenceCommand, self).__init__(
373 'Name of block A in A-B. Defaults to the first block'),
375 'Name of block B in A-B. Defaults to matching `block A`.'),
380 Column of data from block A to difference. Defaults to the first column.
384 Column of data from block B to difference. Defaults to matching `column A`.
388 ('output column', None,
390 Name of the new column for storing the difference (without units, defaults to
391 `difference of <block A> <column A> and <block B> <column B>`).
394 help=self.__doc__, plugin=plugin)
396 def _run(self, hooke, inqueue, outqueue, params):
397 params = self.__setup_params(hooke=hooke, params=params)
398 data_A = self._get_column(hooke=hooke, params=params,
399 block_name='block A',
400 column_name='column A')
401 data_B = self._get_column(hooke=hooke, params=params,
402 block_name='block B',
403 column_name='column B')
404 out = data_A - data_B
405 self._set_column(hooke=hooke, params=params,
406 block_name='block A',
407 column_name='output column',
410 def __setup_params(self, hooke, params):
411 curve = self._curve(hooke, params)
412 if params['block A'] == None:
413 params['block A'] = curve.data[0].info['name']
414 if params['block B'] == None:
415 params['block B'] = params['block A']
416 block_A = self._block(hooke, params=params, name='block A')
417 block_B = self._block(hooke, params=params, name='block B')
418 if params['column A'] == None:
419 params['column A'] = block.info['columns'][0]
420 if params['column B'] == None:
421 params['column B'] = params['column A']
422 a_name,a_unit = split_data_label(params['column A'])
423 b_name,b_unit = split_data_label(params['column B'])
425 raise Failure('Unit missmatch: %s != %s' % (a_unit, b_unit))
426 if params['output column'] == None:
427 params['output column'] = join_data_label(
428 'difference of %s %s and %s %s' % (
429 block_A.info['name'], a_name,
430 block_B.info['name'], b_name),
433 params['output column'] = join_data_label(
434 params['output column'], a_unit)
438 class DerivativeCommand (ColumnAddingCommand):
439 """Calculate the derivative (actually, the discrete differentiation)
442 See :func:`hooke.util.calculus.derivative` for implementation
445 def __init__(self, plugin):
446 super(DerivativeCommand, self).__init__(
450 'Column of data block to differentiate with respect to.'),
452 'Column of data block to differentiate.'),
455 ('output column', None,
457 Name of the new column for storing the derivative (without units, defaults to
458 `derivative of <f column> with respect to <x column>`).
462 Argument(name='weights', type='dict', default={-1:-0.5, 1:0.5},
464 Weighting scheme dictionary for finite differencing. Defaults to
465 central differencing.
468 help=self.__doc__, plugin=plugin)
470 def _run(self, hooke, inqueue, outqueue, params):
471 params = self.__setup_params(hooke=hooke, params=params)
472 x_data = self._get_column(hooke=hooke, params=params,
473 column_name='x column')
474 f_data = self._get_column(hooke=hooke, params=params,
475 column_name='f column')
477 x_data=x_data, f_data=f_data, weights=params['weights'])
478 self._set_column(hooke=hooke, params=params,
479 column_name='output column',
482 def __setup_params(self, hooke, params):
483 curve = self._curve(hooke, params)
484 x_name,x_unit = split_data_label(params['x column'])
485 f_name,f_unit = split_data_label(params['f column'])
486 d_unit = '%s/%s' % (f_unit, x_unit)
487 if params['output column'] == None:
488 params['output column'] = join_data_label(
489 'derivative of %s with respect to %s' % (
493 params['output column'] = join_data_label(
494 params['output column'], d_unit)
498 class PowerSpectrumCommand (ColumnAddingCommand):
499 """Calculate the power spectrum of a data column.
501 def __init__(self, plugin):
502 super(PowerSpectrumCommand, self).__init__(
503 name='power spectrum',
505 Argument(name='output block', type='string',
507 Name of the new data block for storing the power spectrum (defaults to
508 `power spectrum of <source block name> <source column name>`).
510 Argument(name='bounds', type='point', optional=True, count=2,
512 Indicies of points bounding the selected data.
514 Argument(name='freq', type='float', default=1.0,
518 Argument(name='freq units', type='string', default='Hz',
520 Units for the sampling frequency.
522 Argument(name='chunk size', type='int', default=2048,
524 Number of samples per chunk. Use a power of two.
526 Argument(name='overlap', type='bool', default=False,
528 If `True`, each chunk overlaps the previous chunk by half its length.
529 Otherwise, the chunks are end-to-end, and not overlapping.
532 help=self.__doc__, plugin=plugin)
534 def _run(self, hooke, inqueue, outqueue, params):
535 params = self.__setup_params(hooke=hooke, params=params)
536 data = self._get_column(hooke=hooke, params=params)
537 bounds = params['bounds']
539 data = data[bounds[0]:bounds[1]]
540 freq_axis,power = unitary_avg_power_spectrum(
541 data, freq=params['freq'],
542 chunk_size=params['chunk size'],
543 overlap=params['overlap'])
544 b = Data((len(freq_axis),2), dtype=data.dtype)
545 b.info['name'] = params['output block']
546 b.info['columns'] = [
547 params['output freq column'],
548 params['output power column'],
550 self._curve(hooke, params).data.append(b)
551 self._set_column(hooke, params, block_name='output block',
552 column_name='output freq column',
554 self._set_column(hooke, params, block_name='output block',
555 column_name='output power column',
559 def __setup_params(self, hooke, params):
560 if params['output block'] in self._block_names(hooke, params):
561 raise Failure('output block %s already exists in %s.'
562 % (params['output block'],
563 self._curve(hooke, params)))
564 data = self._get_column(hooke=hooke, params=params)
565 d_name,d_unit = split_data_label(data.info['name'])
566 if params['output block'] == None:
567 params['output block'] = 'power spectrum of %s %s' % (
568 data.info['name'], params['column'])
569 self.params['output freq column'] = join_data_label(
570 'frequency axis', params['freq units'])
571 self.params['output power column'] = join_data_label(
572 'power density', '%s^2/%s' % (data_units, params['freq units']))
576 class OldCruft (object):
578 def do_forcebase(self,args):
582 Measures the difference in force (in pN) between a point and a baseline
583 took as the average between two points.
585 The baseline is fixed once for a given curve and different force measurements,
586 unless the user wants it to be recalculated
588 Syntax: forcebase [rebase]
589 rebase: Forces forcebase to ask again the baseline
590 max: Instead of asking for a point to measure, asks for two points and use
591 the maximum peak in between
593 rebase=False #if true=we select rebase
594 maxpoint=False #if true=we measure the maximum peak
596 plot=self._get_displayed_plot()
597 whatset=1 #fixme: for all sets
598 if 'rebase' in args or (self.basecurrent != self.current.path):
604 print 'Select baseline'
605 self.basepoints=self._measure_N_points(N=2, whatset=whatset)
606 self.basecurrent=self.current.path
609 print 'Select two points'
610 points=self._measure_N_points(N=2, whatset=whatset)
611 boundpoints=[points[0].index, points[1].index]
614 y=min(plot.vectors[whatset][1][boundpoints[0]:boundpoints[1]])
616 print 'Chosen interval not valid. Try picking it again. Did you pick the same point as begin and end of interval?'
618 print 'Select point to measure'
619 points=self._measure_N_points(N=1, whatset=whatset)
620 #whatplot=points[0].dest
621 y=points[0].graph_coords[1]
623 #fixme: code duplication
624 boundaries=[self.basepoints[0].index, self.basepoints[1].index]
626 to_average=plot.vectors[whatset][1][boundaries[0]:boundaries[1]] #y points to average
628 avg=np.mean(to_average)
630 print str(forcebase*(10**12))+' pN'
631 to_dump='forcebase '+self.current.path+' '+str(forcebase*(10**12))+' pN'
632 self.outlet.push(to_dump)
635 def do_slope(self,args):
639 Measures the slope of a delimited chunk on the return trace.
640 The chunk can be delimited either by two manual clicks, or have
641 a fixed width, given as an argument.
643 Syntax: slope [width]
644 The facultative [width] parameter specifies how many
645 points will be considered for the fit. If [width] is
646 specified, only one click will be required.
647 (c) Marco Brucale, Massimo Sandal 2008
650 # Reads the facultative width argument
656 # Decides between the two forms of user input, as per (args)
658 # Gets the Xs of two clicked points as indexes on the current curve vector
659 print 'Click twice to delimit chunk'
660 points=self._measure_N_points(N=2,whatset=1)
662 print 'Click once on the leftmost point of the chunk (i.e.usually the peak)'
663 points=self._measure_N_points(N=1,whatset=1)
665 slope=self._slope(points,fitspan)
667 # Outputs the relevant slope parameter
670 to_dump='slope '+self.current.path+' '+str(slope)
671 self.outlet.push(to_dump)
673 def _slope(self,points,fitspan):
674 # Calls the function linefit_between
675 parameters=[0,0,[],[]]
677 clickedpoints=[points[0].index,points[1].index]
680 clickedpoints=[points[0].index-fitspan,points[0].index]
683 parameters=self.linefit_between(clickedpoints[0],clickedpoints[1])
685 print 'Cannot fit. Did you click twice the same point?'
688 # Outputs the relevant slope parameter
690 print str(parameters[0])
691 to_dump='slope '+self.curve.path+' '+str(parameters[0])
692 self.outlet.push(to_dump)
694 # Makes a vector with the fitted parameters and sends it to the GUI
695 xtoplot=parameters[2]
699 ytoplot.append((x*parameters[0])+parameters[1])
701 clickvector_x, clickvector_y=[], []
703 clickvector_x.append(item.graph_coords[0])
704 clickvector_y.append(item.graph_coords[1])
706 lineplot=self._get_displayed_plot(0) #get topmost displayed plot
708 lineplot.add_set(xtoplot,ytoplot)
709 lineplot.add_set(clickvector_x, clickvector_y)
712 if lineplot.styles==[]:
713 lineplot.styles=[None,None,None,'scatter']
715 lineplot.styles+=[None,'scatter']
716 if lineplot.colors==[]:
717 lineplot.colors=[None,None,'black',None]
719 lineplot.colors+=['black',None]
722 self._send_plot([lineplot])
727 def linefit_between(self,index1,index2,whatset=1):
729 Creates two vectors (xtofit,ytofit) slicing out from the
730 current return trace a portion delimited by the two indexes
732 Then does a least squares linear fit on that slice.
733 Finally returns [0]=the slope, [1]=the intercept of the
734 fitted 1st grade polynomial, and [2,3]=the actual (x,y) vectors
736 (c) Marco Brucale, Massimo Sandal 2008
738 # Translates the indexes into two vectors containing the x,y data to fit
739 xtofit=self.plots[0].vectors[whatset][0][index1:index2]
740 ytofit=self.plots[0].vectors[whatset][1][index1:index2]
742 # Does the actual linear fitting (simple least squares with numpy.polyfit)
744 linefit=np.polyfit(xtofit,ytofit,1)
746 return (linefit[0],linefit[1],xtofit,ytofit)