-# Copyright (C) 2008-2010 Alberto Gomez-Casado
-# Fabrizio Benedetti
+# Copyright (C) 2008-2010 Alberto Gomez-Kasai
+# Fabiano's Benedetti
# Massimo Sandal <devicerandom@gmail.com>
# W. Trevor King <wking@drexel.edu>
#
from ..util.si import ppSI, join_data_label, split_data_label
-
-class CurvePlugin (Builtin):
- def __init__(self):
- super(CurvePlugin, self).__init__(name='curve')
- self._commands = [
- GetCommand(self), InfoCommand(self), DeltaCommand(self),
- ExportCommand(self), DifferenceCommand(self),
- DerivativeCommand(self), PowerSpectrumCommand(self)]
-
-
# Define common or complicated arguments
def current_curve_callback(hooke, command, argument, value):
of the current playlist.
""".strip())
+def _name_argument(name, default, help):
+ """TODO
+ """
+ return Argument(name=name, type='string', default=default, help=help)
+
+def block_argument(*args, **kwargs):
+ """TODO
+ """
+ return _name_argument(*args, **kwargs)
+
+def column_argument(*args, **kwargs):
+ """TODO
+ """
+ return _name_argument(*args, **kwargs)
+
+
+# Define useful command subclasses
+
+class CurveCommand (Command):
+ """A :class:`~hooke.command.Command` operating on a
+ :class:`~hooke.curve.Curve`.
+ """
+ def __init__(self, **kwargs):
+ if 'arguments' in kwargs:
+ kwargs['arguments'].insert(0, CurveArgument)
+ else:
+ kwargs['arguments'] = [CurveArgument]
+ super(CurveCommand, self).__init__(**kwargs)
+
+ def _curve(self, hooke, params):
+ # HACK? rely on params['curve'] being bound to the local hooke
+ # playlist (i.e. not a copy, as you would get by passing a
+ # curve through the queue). Ugh. Stupid queues. As an
+ # alternative, we could pass lookup information through the
+ # queue...
+ return params['curve']
+
+
+class BlockCommand (CurveCommand):
+ """A :class:`CurveCommand` operating on a :class:`~hooke.curve.Data` block.
+ """
+ def __init__(self, blocks=None, **kwargs):
+ if blocks == None:
+ blocks = [('block', None, 'Name of the data block to act on.')]
+ block_args = []
+ for name,default,help in blocks:
+ block_args.append(block_argument(name, default, help))
+ self._block_arguments = block_args
+ if 'arguments' not in kwargs:
+ kwargs['arguments'] = []
+ kwargs['arguments'] = block_args + kwargs['arguments']
+ super(BlockCommand, self).__init__(**kwargs)
+
+ def _block_names(self, hooke, params):
+ curve = self._curve(hooke, params)
+ return [b.info['name'] for b in curve.data]
+
+ def _block_index(self, hooke, params, name=None):
+ if name == None:
+ name = self._block_arguments[0].name
+ block_name = params[name]
+ if block_name == None:
+ curve = self._curve(hooke=hooke, params=params)
+ if len(curve.data) == 0:
+ raise Failure('no blocks in %s' % curve)
+ block_name = curve.data[0].info['name']
+ names = self._block_names(hooke=hooke, params=params)
+ try:
+ return names.index(block_name)
+ except ValueError, e:
+ curve = self._curve(hooke, params)
+ raise Failure('no block named %s in %s (%s): %s'
+ % (block_name, curve, names, e))
+
+ def _block(self, hooke, params, name=None):
+ # HACK? rely on params['block'] being bound to the local hooke
+ # playlist (i.e. not a copy, as you would get by passing a
+ # block through the queue). Ugh. Stupid queues. As an
+ # alternative, we could pass lookup information through the
+ # queue...
+ curve = self._curve(hooke, params)
+ index = self._block_index(hooke, params, name)
+ return curve.data[index]
+
+
+class ColumnAccessCommand (BlockCommand):
+ """A :class:`BlockCommand` accessing a :class:`~hooke.curve.Data`
+ block column.
+ """
+ def __init__(self, columns=None, **kwargs):
+ if columns == None:
+ columns = [('column', None, 'Name of the data column to act on.')]
+ column_args = []
+ for name,default,help in columns:
+ column_args.append(column_argument(name, default, help))
+ self._column_arguments = column_args
+ if 'arguments' not in kwargs:
+ kwargs['arguments'] = []
+ kwargs['arguments'] = column_args + kwargs['arguments']
+ super(ColumnAccessCommand, self).__init__(**kwargs)
+
+ def _get_column(self, hooke, params, block_name=None, column_name=None):
+ if column_name == None:
+ column_name = self._column_arguments[0].name
+ column_name = params[column_name]
+ block = self._block(hooke, params, block_name)
+ column_index = block.info['columns'].index(column_name)
+ return block[:,column_index]
+
+
+class ColumnAddingCommand (ColumnAccessCommand):
+ """A :class:`ColumnAccessCommand` that also adds columns.
+ """
+ def __init__(self, new_columns=None, **kwargs):
+ if new_columns == None:
+ new_columns = []
+ column_args = []
+ for name,default,help in new_columns:
+ column_args.append(column_argument(name, default, help))
+ self._new_column_arguments = column_args
+ if 'arguments' not in kwargs:
+ kwargs['arguments'] = []
+ kwargs['arguments'] = column_args + kwargs['arguments']
+ super(ColumnAddingCommand, self).__init__(**kwargs)
+
+ def _get_column(self, hooke, params, block_name=None, column_name=None):
+ if column_name == None and len(self._column_arguments) == 0:
+ column_name = self._new_column_arguments[0].name
+ return super(ColumnAddingCommand, self)._get_column(
+ hooke=hooke, params=params, block_name=block_name,
+ column_name=column_name)
+
+ def _set_column(self, hooke, params, block_name=None, column_name=None,
+ values=None):
+ if column_name == None:
+ column_name = self._column_arguments[0].name
+ column_name = params[column_name]
+ block = self._block(hooke=hooke, params=params, name=block_name)
+ if column_name not in block.info['columns']:
+ new = Data((block.shape[0], block.shape[1]+1), dtype=block.dtype)
+ new.info = copy.deepcopy(block.info)
+ new[:,:-1] = block
+ new.info['columns'].append(column_name)
+ block = new
+ block_index = self._block_index(hooke, params, name=block_name)
+ self._curve(hooke, params).data[block_index] = block
+ column_index = block.info['columns'].index(column_name)
+ block[:,column_index] = values
+
+
+# The plugin itself
+
+class CurvePlugin (Builtin):
+ def __init__(self):
+ super(CurvePlugin, self).__init__(name='curve')
+ self._commands = [
+ GetCommand(self), InfoCommand(self), DeltaCommand(self),
+ ExportCommand(self), DifferenceCommand(self),
+ DerivativeCommand(self), PowerSpectrumCommand(self)]
+
# Define commands
-class GetCommand (Command):
+class GetCommand (CurveCommand):
"""Return a :class:`hooke.curve.Curve`.
"""
def __init__(self, plugin):
super(GetCommand, self).__init__(
- name='get curve', arguments=[CurveArgument],
- help=self.__doc__, plugin=plugin)
+ name='get curve', help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- outqueue.put(params['curve'])
+ outqueue.put(self._curve(hooke, params))
-class InfoCommand (Command):
+
+class InfoCommand (CurveCommand):
"""Get selected information about a :class:`hooke.curve.Curve`.
"""
def __init__(self, plugin):
args = [
- CurveArgument,
Argument(name='all', type='bool', default=False, count=1,
help='Get all curve information.'),
]
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
+ curve = self._curve(hooke, params)
fields = {}
for key in self.fields:
fields[key] = params[key]
for key in self.fields:
if fields[key] == True:
get = getattr(self, '_get_%s' % key.replace(' ', '_'))
- lines.append('%s: %s' % (key, get(params['curve'])))
+ lines.append('%s: %s' % (key, get(curve)))
outqueue.put('\n'.join(lines))
def _get_name(self, curve):
return [block.shape for block in curve.data]
-class DeltaCommand (Command):
+class DeltaCommand (BlockCommand):
"""Get distance information between two points.
With two points A and B, the returned distances are A-B.
super(DeltaCommand, self).__init__(
name='delta',
arguments=[
- CurveArgument,
- Argument(name='block', type='int', default=0,
- help="""
-Data block that points are selected from. For an approach/retract
-force curve, `0` selects the approaching curve and `1` selects the
-retracting curve.
-""".strip()),
Argument(name='point', type='point', optional=False, count=2,
help="""
Indicies of points bounding the selected data.
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
+ data = self._block(hooke, params)
As = data[params['point'][0],:]
Bs = data[params['point'][1],:]
ds = [A-B for A,B in zip(As, Bs)]
outqueue.put(out)
-class ExportCommand (Command):
+class ExportCommand (BlockCommand):
"""Export a :class:`hooke.curve.Curve` data block as TAB-delimeted
ASCII text.
super(ExportCommand, self).__init__(
name='export block',
arguments=[
- CurveArgument,
- Argument(name='block', type='int', default=0,
- help="""
-Data block to save. For an approach/retract force curve, `0` selects
-the approaching curve and `1` selects the retracting curve.
-""".strip()),
Argument(name='output', type='file', default='curve.dat',
help="""
File name for the output data. Defaults to 'curve.dat'
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
+ data = self._block(hooke, params)
+
+ with open(params['output'], 'w') as f:
+ if params['header'] == True:
+ f.write('# %s \n' % ('\t'.join(data.info['columns'])))
+ numpy.savetxt(f, data, delimiter='\t')
- f = open(params['output'], 'w')
- if params['header'] == True:
- f.write('# %s \n' % ('\t'.join(data.info['columns'])))
- numpy.savetxt(f, data, delimiter='\t')
- f.close()
-class DifferenceCommand (Command):
+class DifferenceCommand (ColumnAddingCommand):
"""Calculate the difference between two columns of data.
The difference is added to block A as a new column.
def __init__(self, plugin):
super(DifferenceCommand, self).__init__(
name='difference',
- arguments=[
- CurveArgument,
- Argument(name='block A', type='int',
- help="""
-Block A in A-B. For an approach/retract force curve, `0` selects the
-approaching curve and `1` selects the retracting curve. Defaults to
-the first block.
-""".strip()),
- Argument(name='block B', type='int',
- help="""
-Block B in A-B. Defaults to matching `block A`.
-""".strip()),
- Argument(name='column A', type='string',
- help="""
+ blocks=[
+ ('block A', None,
+ 'Name of block A in A-B. Defaults to the first block'),
+ ('block B', None,
+ 'Name of block B in A-B. Defaults to matching `block A`.'),
+ ],
+ columns=[
+ ('column A', None,
+ """
Column of data from block A to difference. Defaults to the first column.
""".strip()),
- Argument(name='column B', type='string', default=1,
- help="""
+ ('column B', None,
+ """
Column of data from block B to difference. Defaults to matching `column A`.
""".strip()),
- Argument(name='output column name', type='string',
- help="""
+ ],
+ new_columns=[
+ ('output column', None,
+ """
Name of the new column for storing the difference (without units, defaults to
`difference of <block A> <column A> and <block B> <column B>`).
""".strip()),
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data_A = params['curve'].data[params['block A']]
- data_B = params['curve'].data[params['block B']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue...
- new = Data((data_A.shape[0], data_A.shape[1]+1), dtype=data_A.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data_A
-
- a_col = data_A.info['columns'].index(params['column A'])
- b_col = data_A.info['columns'].index(params['column A'])
- out = data_A[:,a_col] - data_B[:,b_col]
-
- a_name,a_units = split_data_label(params['column A'])
- b_name,b_units = split_data_label(params['column B'])
- assert a_units == b_units, (
- 'Unit missmatch: %s != %s' % (a_units, b_units))
- if params['output column name'] == None:
- params['output column name'] = (
+ params = self.__setup_params(hooke=hooke, params=params)
+ data_A = self._get_column(hooke=hooke, params=params,
+ block_name='block A',
+ column_name='column A')
+ data_B = self._get_column(hooke=hooke, params=params,
+ block_name='block B',
+ column_name='column B')
+ out = data_A - data_B
+ self._set_column(hooke=hooke, params=params,
+ block_name='block A',
+ column_name='output column',
+ values=out)
+
+ def __setup_params(self, hooke, params):
+ curve = self._curve(hooke, params)
+ if params['block A'] == None:
+ params['block A'] = curve.data[0].info['name']
+ if params['block B'] == None:
+ params['block B'] = params['block A']
+ block_A = self._block(hooke, params=params, name='block A')
+ block_B = self._block(hooke, params=params, name='block B')
+ if params['column A'] == None:
+ params['column A'] = block.info['columns'][0]
+ if params['column B'] == None:
+ params['column B'] = params['column A']
+ a_name,a_unit = split_data_label(params['column A'])
+ b_name,b_unit = split_data_label(params['column B'])
+ if a_unit != b_unit:
+ raise Failure('Unit missmatch: %s != %s' % (a_unit, b_unit))
+ if params['output column'] == None:
+ params['output column'] = join_data_label(
'difference of %s %s and %s %s' % (
- block_A.info['name'], params['column A'],
- block_B.info['name'], params['column B']))
- new.info['columns'].append(
- join_data_label(params['output distance column'], a_units))
- new[:,-1] = out
- params['curve'].data[params['block A']] = new
+ block_A.info['name'], a_name,
+ block_B.info['name'], b_name),
+ a_unit)
+ else:
+ params['output column'] = join_data_label(
+ params['output column'], a_unit)
+ return params
-class DerivativeCommand (Command):
+class DerivativeCommand (ColumnAddingCommand):
"""Calculate the derivative (actually, the discrete differentiation)
- of a curve data block.
+ of a data column.
See :func:`hooke.util.calculus.derivative` for implementation
details.
def __init__(self, plugin):
super(DerivativeCommand, self).__init__(
name='derivative',
- arguments=[
- CurveArgument,
- Argument(name='block', type='int', default=0,
- help="""
-Data block to differentiate. For an approach/retract force curve, `0`
-selects the approaching curve and `1` selects the retracting curve.
-""".strip()),
- Argument(name='x column', type='string',
- help="""
-Column of data block to differentiate with respect to.
-""".strip()),
- Argument(name='f column', type='string',
- help="""
-Column of data block to differentiate.
+ columns=[
+ ('x column', None,
+ 'Column of data block to differentiate with respect to.'),
+ ('f column', None,
+ 'Column of data block to differentiate.'),
+ ],
+ new_columns=[
+ ('output column', None,
+ """
+Name of the new column for storing the derivative (without units, defaults to
+`derivative of <f column> with respect to <x column>`).
""".strip()),
+ ],
+ arguments=[
Argument(name='weights', type='dict', default={-1:-0.5, 1:0.5},
help="""
Weighting scheme dictionary for finite differencing. Defaults to
central differencing.
-""".strip()),
- Argument(name='output column name', type='string',
- help="""
-Name of the new column for storing the derivative (without units, defaults to
-`derivative of <f column name> with respect to <x column name>`).
""".strip()),
],
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- # HACK? rely on params['curve'] being bound to the local hooke
- # playlist (i.e. not a copy, as you would get by passing a
- # curve through the queue). Ugh. Stupid queues. As an
- # alternative, we could pass lookup information through the
- # queue...
- new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
- new.info = copy.deepcopy(data.info)
- new[:,:-1] = data
-
- x_col = data.info['columns'].index(params['x column'])
- f_col = data.info['columns'].index(params['f column'])
+ params = self.__setup_params(hooke=hooke, params=params)
+ x_data = self._get_column(hooke=hooke, params=params,
+ column_name='x column')
+ f_data = self._get_column(hooke=hooke, params=params,
+ column_name='f column')
d = derivative(
- block, x_col=x_col, f_col=f_col, weights=params['weights'])
-
- x_name,x_units = split_data_label(params['x column'])
- f_name,f_units = split_data_label(params['f column'])
- if params['output column name'] == None:
- params['output column name'] = (
+ x_data=x_data, f_data=f_data, weights=params['weights'])
+ self._set_column(hooke=hooke, params=params,
+ column_name='output column',
+ values=d)
+
+ def __setup_params(self, hooke, params):
+ curve = self._curve(hooke, params)
+ x_name,x_unit = split_data_label(params['x column'])
+ f_name,f_unit = split_data_label(params['f column'])
+ d_unit = '%s/%s' % (f_unit, x_unit)
+ if params['output column'] == None:
+ params['output column'] = join_data_label(
'derivative of %s with respect to %s' % (
- params['f column'], params['x column']))
-
- new.info['columns'].append(
- join_data_label(params['output distance column'],
- '%s/%s' % (f_units/x_units)))
- new[:,-1] = d[:,1]
- params['curve'].data[params['block']] = new
+ f_name, x_name),
+ d_unit)
+ else:
+ params['output column'] = join_data_label(
+ params['output column'], d_unit)
+ return params
-class PowerSpectrumCommand (Command):
- """Calculate the power spectrum of a data block.
+class PowerSpectrumCommand (ColumnAddingCommand):
+ """Calculate the power spectrum of a data column.
"""
def __init__(self, plugin):
super(PowerSpectrumCommand, self).__init__(
name='power spectrum',
arguments=[
- CurveArgument,
- Argument(name='block', type='int', default=0,
- help="""
-Data block to act on. For an approach/retract force curve, `0`
-selects the approaching curve and `1` selects the retracting curve.
-""".strip()),
- Argument(name='column', type='string', optional=False,
+ Argument(name='output block', type='string',
help="""
-Name of the data block column containing to-be-transformed data.
+Name of the new data block for storing the power spectrum (defaults to
+`power spectrum of <source block name> <source column name>`).
""".strip()),
Argument(name='bounds', type='point', optional=True, count=2,
help="""
help="""
If `True`, each chunk overlaps the previous chunk by half its length.
Otherwise, the chunks are end-to-end, and not overlapping.
-""".strip()),
- Argument(name='output block name', type='string',
- help="""
-Name of the new data block for storing the power spectrum (defaults to
-`power spectrum of <source block name> <source column name>`).
""".strip()),
],
help=self.__doc__, plugin=plugin)
def _run(self, hooke, inqueue, outqueue, params):
- data = params['curve'].data[params['block']]
- col = data.info['columns'].index(params['column'])
- d = data[:,col]
+ params = self.__setup_params(hooke=hooke, params=params)
+ data = self._get_column(hooke=hooke, params=params)
+ bounds = params['bounds']
if bounds != None:
- d = d[params['bounds'][0]:params['bounds'][1]]
+ data = data[bounds[0]:bounds[1]]
freq_axis,power = unitary_avg_power_spectrum(
- d, freq=params['freq'],
+ data, freq=params['freq'],
chunk_size=params['chunk size'],
overlap=params['overlap'])
-
- name,data_units = split_data_label(params['column'])
b = Data((len(freq_axis),2), dtype=data.dtype)
- if params['output block name'] == None:
- params['output block name'] = 'power spectrum of %s %s' % (
- params['output block name'], data.info['name'], params['column'])
- b.info['name'] = params['output block name']
+ b.info['name'] = params['output block']
b.info['columns'] = [
- join_data_label('frequency axis', params['freq units']),
- join_data_label('power density',
- '%s^2/%s' % (data_units, params['freq units'])),
+ params['output freq column'],
+ params['output power column'],
]
- b[:,0] = freq_axis
- b[:,1] = power
- params['curve'].data.append(b)
+ self._curve(hooke, params).data.append(b)
+ self._set_column(hooke, params, block_name='output block',
+ column_name='output freq column',
+ values=freq_axis)
+ self._set_column(hooke, params, block_name='output block',
+ column_name='output power column',
+ values=power)
outqueue.put(b)
+ def __setup_params(self, hooke, params):
+ if params['output block'] in self._block_names(hooke, params):
+ raise Failure('output block %s already exists in %s.'
+ % (params['output block'],
+ self._curve(hooke, params)))
+ data = self._get_column(hooke=hooke, params=params)
+ d_name,d_unit = split_data_label(data.info['name'])
+ if params['output block'] == None:
+ params['output block'] = 'power spectrum of %s %s' % (
+ data.info['name'], params['column'])
+ self.params['output freq column'] = join_data_label(
+ 'frequency axis', params['freq units'])
+ self.params['output power column'] = join_data_label(
+ 'power density', '%s^2/%s' % (data_units, params['freq units']))
+ return params
+
class OldCruft (object):