-# Copyright (C) 2008-2010 Alberto Gomez-Kasai
-# Fabiano's Benedetti
-# Massimo Sandal <devicerandom@gmail.com>
-# W. Trevor King <wking@drexel.edu>
+# Copyright (C) 2010-2012 W. Trevor King <wking@tremily.us>
#
# This file is part of Hooke.
#
-# Hooke is free software: you can redistribute it and/or modify it
-# under the terms of the GNU Lesser General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
+# Hooke is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
#
-# Hooke is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
-# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
-# Public License for more details.
+# Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
#
-# You should have received a copy of the GNU Lesser General Public
-# License along with Hooke. If not, see
-# <http://www.gnu.org/licenses/>.
+# You should have received a copy of the GNU Lesser General Public License
+# along with Hooke. If not, see <http://www.gnu.org/licenses/>.
"""The ``curve`` module provides :class:`CurvePlugin` and several
associated :class:`hooke.command.Command`\s for handling
"""
import copy
+import os.path
+import re
+from FFT_tools import unitary_avg_power_spectrum
import numpy
+import yaml
from ..command import Command, Argument, Failure
from ..command_stack import CommandStack
from ..curve import Data
from ..engine import CommandMessage
from ..util.calculus import derivative
-from ..util.fft import unitary_avg_power_spectrum
from ..util.si import ppSI, join_data_label, split_data_label
from . import Builtin
from .playlist import current_playlist_callback
# Define common or complicated arguments
-def current_curve_callback(hooke, command, argument, value):
+def current_curve_callback(hooke, command, argument, value, load=True):
if value != None:
return value
playlist = current_playlist_callback(hooke, command, argument, value)
- curve = playlist.current()
+ curve = playlist.current(load=load)
if curve == None:
raise Failure('No curves in %s' % playlist)
return curve
+def unloaded_current_curve_callback(hooke, command, argument, value):
+ return current_curve_callback(
+ hooke=hooke, command=command, argument=argument, value=value,
+ load=False)
+
CurveArgument = Argument(
name='curve', type='curve', callback=current_curve_callback,
help="""
pass # no need to place duplicate calls on the stack.
else:
curve.command_stack.append(CommandMessage(
- self.name, params))
+ self.name, dict(params)))
class BlockCommand (CurveCommand):
if column_name == None:
column_name = self._column_arguments[0].name
column_name = params[column_name]
+ if column_name is None:
+ return None
block = self._block(hooke, params, block_name)
columns = block.info['columns']
try:
def __init__(self):
super(CurvePlugin, self).__init__(name='curve')
self._commands = [
- GetCommand(self), InfoCommand(self), DeltaCommand(self),
- ExportCommand(self), DifferenceCommand(self),
+ GetCommand(self), InfoCommand(self), BlockInfoCommand(self),
+ DeltaCommand(self), ExportCommand(self), DifferenceCommand(self),
DerivativeCommand(self), PowerSpectrumCommand(self),
- ClearStackCommand(self)]
+ ScaledColumnAdditionCommand(self), ClearStackCommand(self)]
# Define commands
Argument(name='all', type='bool', default=False, count=1,
help='Get all curve information.'),
]
- self.fields = ['name', 'path', 'experiment', 'driver', 'filetype', 'note',
- 'blocks', 'block sizes']
+ self.fields = ['name', 'path', 'driver', 'note', 'command stack',
+ 'blocks', 'block names', 'block sizes']
for field in self.fields:
args.append(Argument(
name=field, type='bool', default=False, count=1,
fields = {}
for key in self.fields:
fields[key] = params[key]
- if reduce(lambda x,y: x and y, fields.values()) == False:
+ if reduce(lambda x,y: x or y, fields.values()) == False:
params['all'] = True # No specific fields set, default to 'all'
if params['all'] == True:
for key in self.fields:
def _get_path(self, curve):
return curve.path
- def _get_experiment(self, curve):
- return curve.info.get('experiment', None)
-
def _get_driver(self, curve):
return curve.driver
- def _get_filetype(self, curve):
- return curve.info.get('filetype', None)
-
def _get_note(self, curve):
return curve.info.get('note', None)
+ def _get_command_stack(self, curve):
+ return curve.command_stack
+
def _get_blocks(self, curve):
return len(curve.data)
+ def _get_block_names(self, curve):
+ return [block.info['name'] for block in curve.data]
+
def _get_block_sizes(self, curve):
return [block.shape for block in curve.data]
+class BlockInfoCommand (BlockCommand):
+ """Get selected information about a :class:`hooke.curve.Curve` data block.
+ """
+ def __init__(self, plugin):
+ super(BlockInfoCommand, self).__init__(
+ name='block info', arguments=[
+ Argument(
+ name='key', count=-1, optional=False,
+ help='Dot-separted (.) key selection regexp.'),
+ Argument(
+ name='output',
+ help="""
+File name for the output (appended).
+""".strip()),
+ ],
+ help=self.__doc__, plugin=plugin)
+
+ def _run(self, hooke, inqueue, outqueue, params):
+ block = self._block(hooke, params)
+ values = {'index': self._block_index(hooke, params)}
+ for key in params['key']:
+ keys = [(0, key.split('.'), block.info)]
+ while len(keys) > 0:
+ index,key_stack,info = keys.pop(0)
+ regexp = re.compile(key_stack[index])
+ matched = False
+ for k,v in info.items():
+ if regexp.match(k):
+ matched = True
+ new_stack = copy.copy(key_stack)
+ new_stack[index] = k
+ if index+1 == len(key_stack):
+ vals = values
+ for k in new_stack[:-1]:
+ if k not in vals:
+ vals[k] = {}
+ vals = vals[k]
+ vals[new_stack[-1]] = v
+ else:
+ keys.append((index+1, new_stack, v))
+ if matched == False:
+ raise ValueError(
+ 'no match found for %s (%s) in %s'
+ % (key_stack[index], key, sorted(info.keys())))
+ if params['output'] != None:
+ curve = self._curve(hooke, params)
+ with open(params['output'], 'a') as f:
+ yaml.dump({curve.name:{
+ 'path': curve.path,
+ block.info['name']: values
+ }}, f)
+ outqueue.put(values)
+
+
class DeltaCommand (BlockCommand):
"""Get distance information between two points.
def _run(self, hooke, inqueue, outqueue, params):
data = self._block(hooke, params)
- with open(params['output'], 'w') as f:
+ with open(os.path.expanduser(params['output']), 'w') as f:
if params['header'] == True:
f.write('# %s \n' % ('\t'.join(data.info['columns'])))
numpy.savetxt(f, data, delimiter='\t')
def _run(self, hooke, inqueue, outqueue, params):
self._add_to_command_stack(params)
- params = self.__setup_params(hooke=hooke, params=params)
+ params = self._setup_params(hooke=hooke, params=params)
data_A = self._get_column(hooke=hooke, params=params,
block_name='block A',
column_name='column A')
column_name='output column',
values=out)
- def __setup_params(self, hooke, params):
+ def _setup_params(self, hooke, params):
curve = self._curve(hooke, params)
if params['block A'] == None:
params['block A'] = curve.data[0].info['name']
def _run(self, hooke, inqueue, outqueue, params):
self._add_to_command_stack(params)
- params = self.__setup_params(hooke=hooke, params=params)
+ params = self._setup_params(hooke=hooke, params=params)
x_data = self._get_column(hooke=hooke, params=params,
column_name='x column')
f_data = self._get_column(hooke=hooke, params=params,
column_name='output column',
values=d)
- def __setup_params(self, hooke, params):
+ def _setup_params(self, hooke, params):
curve = self._curve(hooke, params)
x_name,x_unit = split_data_label(params['x column'])
f_name,f_unit = split_data_label(params['f column'])
def _run(self, hooke, inqueue, outqueue, params):
self._add_to_command_stack(params)
- params = self.__setup_params(hooke=hooke, params=params)
+ params = self._setup_params(hooke=hooke, params=params)
data = self._get_column(hooke=hooke, params=params)
bounds = params['bounds']
if bounds != None:
values=power)
outqueue.put(b)
- def __setup_params(self, hooke, params):
+ def _setup_params(self, hooke, params):
if params['output block'] in self._block_names(hooke, params):
raise Failure('output block %s already exists in %s.'
% (params['output block'],
return params
+class ScaledColumnAdditionCommand (ColumnAddingCommand):
+ """Add one affine transformed column to another: `o=A*i1+B*i2+C`.
+ """
+ def __init__(self, plugin):
+ super(ScaledColumnAdditionCommand, self).__init__(
+ name='scaled column addition',
+ columns=[
+ ('input column 1', 'input column (m)', """
+Name of the first column to use as the transform input.
+""".strip()),
+ ('input column 2', None, """
+Name of the second column to use as the transform input.
+""".strip()),
+ ],
+ new_columns=[
+ ('output column', 'output column (m)', """
+Name of the column to use as the transform output.
+""".strip()),
+ ],
+ arguments=[
+ Argument(name='scale 1', type='float', default=None,
+ help="""
+A float value for the first scale constant.
+""".strip()),
+ Argument(name='scale 1 name', type='string', default=None,
+ help="""
+The name of the first scale constant in the `.info` dictionary.
+""".strip()),
+ Argument(name='scale 2', type='float', default=None,
+ help="""
+A float value for the second scale constant.
+""".strip()),
+ Argument(name='scale 2 name', type='string', default=None,
+ help="""
+The name of the second scale constant in the `.info` dictionary.
+""".strip()),
+ Argument(name='constant', type='float', default=None,
+ help="""
+A float value for the offset constant.
+""".strip()),
+ Argument(name='constant name', type='string', default=None,
+ help="""
+The name of the offset constant in the `.info` dictionary.
+""".strip()),
+ ],
+ help=self.__doc__, plugin=plugin)
+
+ def _run(self, hooke, inqueue, outqueue, params):
+ self._add_to_command_stack(params)
+ i1 = self._get_column(hooke=hooke, params=params,
+ column_name='input column 1')
+ i2 = self._get_column(hooke=hooke, params=params,
+ column_name='input column 2')
+ if i1 is None:
+ i1 = 0
+ if i2 is None:
+ i2 = 0
+ # what if both i1 and i2 are None?
+ a = self._get_constant(params, i1.info, 'scale 1')
+ b = self._get_constant(params, i2.info, 'scale 2')
+ c = self._get_constant(params, i1.info, 'constant')
+ out = a*i1 + b*i2 + c
+ self._set_column(hooke=hooke, params=params,
+ column_name='output column', values=out)
+
+ def _get_constant(self, params, info, name):
+ a = params[name]
+ pname = params[name + ' name']
+ b = None
+ if pname is not None:
+ pname_entries = pname.split('|')
+ b = info
+ for entry in pname_entries:
+ b = b[entry]
+ if a is None and b is None:
+ return 0
+ if a is None:
+ a = 1
+ if b is None:
+ b = 1
+ return a*b
+
+
class ClearStackCommand (CurveCommand):
"""Empty a curve's command stack.
"""
super(ClearStackCommand, self).__init__(
name='clear curve command stack',
help=self.__doc__, plugin=plugin)
+ i,arg = [(i,arg) for i,arg in enumerate(self.arguments)
+ if arg.name == 'curve'][0]
+ arg = copy.copy(arg)
+ arg.callback = unloaded_current_curve_callback
+ self.arguments[i] = arg
def _run(self, hooke, inqueue, outqueue, params):
curve = self._curve(hooke, params)
maxpoint=True
if rebase:
- print 'Select baseline'
+ print('Select baseline')
self.basepoints=self._measure_N_points(N=2, whatset=whatset)
self.basecurrent=self.current.path
if maxpoint:
- print 'Select two points'
+ print('Select two points')
points=self._measure_N_points(N=2, whatset=whatset)
boundpoints=[points[0].index, points[1].index]
boundpoints.sort()
try:
y=min(plot.vectors[whatset][1][boundpoints[0]:boundpoints[1]])
except ValueError:
- print 'Chosen interval not valid. Try picking it again. Did you pick the same point as begin and end of interval?'
+ print('Chosen interval not valid. Try picking it again. Did '
+ 'you pick the same point as begin and end of interval?')
else:
- print 'Select point to measure'
+ print('Select point to measure')
points=self._measure_N_points(N=1, whatset=whatset)
#whatplot=points[0].dest
y=points[0].graph_coords[1]
avg=np.mean(to_average)
forcebase=abs(y-avg)
- print str(forcebase*(10**12))+' pN'
+ print('{} pN'.format(forcebase * 10**12))
to_dump='forcebase '+self.current.path+' '+str(forcebase*(10**12))+' pN'
self.outlet.push(to_dump)
# Decides between the two forms of user input, as per (args)
if fitspan == 0:
# Gets the Xs of two clicked points as indexes on the current curve vector
- print 'Click twice to delimit chunk'
+ print('Click twice to delimit chunk')
points=self._measure_N_points(N=2,whatset=1)
else:
- print 'Click once on the leftmost point of the chunk (i.e.usually the peak)'
+ print('Click once on the leftmost point of the chunk (i.e.usually the peak)')
points=self._measure_N_points(N=1,whatset=1)
slope=self._slope(points,fitspan)
# Outputs the relevant slope parameter
- print 'Slope:'
- print str(slope)
+ print('Slope:')
+ print(str(slope))
to_dump='slope '+self.current.path+' '+str(slope)
self.outlet.push(to_dump)
try:
parameters=self.linefit_between(clickedpoints[0],clickedpoints[1])
except:
- print 'Cannot fit. Did you click twice the same point?'
+ print('Cannot fit. Did you click twice the same point?')
return
# Outputs the relevant slope parameter
- print 'Slope:'
- print str(parameters[0])
+ print('Slope:')
+ print(str(parameters[0]))
to_dump='slope '+self.curve.path+' '+str(parameters[0])
self.outlet.push(to_dump)