-# Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
+# Copyright (C) 2008-2012 Massimo Sandal <devicerandom@gmail.com>
# W. Trevor King <wking@drexel.edu>
#
# This file is part of Hooke.
#
-# Hooke is free software: you can redistribute it and/or modify it
-# under the terms of the GNU Lesser General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
+# Hooke is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
#
-# Hooke is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
-# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
-# Public License for more details.
+# Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
#
-# You should have received a copy of the GNU Lesser General Public
-# License along with Hooke. If not, see
-# <http://www.gnu.org/licenses/>.
+# You should have received a copy of the GNU Lesser General Public License
+# along with Hooke. If not, see <http://www.gnu.org/licenses/>.
"""Driver for JPK ForceRobot's velocity clamp data format.
from . import Driver as Driver
+def slash_join(*args):
+ r"""Join path components with forward slashes regardless of OS.
+
+ Notes
+ -----
+ From the `PKZIP Application Note`_, section J (Explanation of fields):
+
+ file name: (Variable)
+
+ ... All slashes should be forward slashes ``/`` as opposed to
+ backwards slashes ``\`` for compatibility with Amiga and UNIX
+ file systems etc. ...
+
+ .. _PKZIP Application Note:
+ http://www.pkware.com/documents/casestudies/APPNOTE.TXT
+
+ Examples
+ --------
+
+ >>> sep = os.path.sep
+ >>> os.path.sep = '/'
+ >>> print slash_join('abc', 'def/ghi', 'jkl\\mno')
+ abc/def/ghi/jkl\mno
+ >>> os.path.sep = '\\'
+ >>> print slash_join('abc', 'def/ghi', 'jkl\\mno')
+ abc/def/ghi/jkl\mno
+ >>> os.path.sep = sep
+
+ Note that when :const:`os.path.sep` is ``/`` (e.g. UNIX),
+ ``def/ghi`` is a compound segment, but when :const:`os.path.sep`
+ is ``\`` (e.g. Windows), ``def/ghi`` is a single segment.
+ """
+ sep = os.path.sep
+ try:
+ os.path.sep = '/'
+ return os.path.join(*args)
+ finally:
+ os.path.sep = sep
+
+
class JPKDriver (Driver):
"""Handle JPK ForceRobot's data format.
"""
f.path = path
zip_info = self._zip_info(f)
version = zip_info['file-format-version']
+ if zip_info['jpk-data-file'] == 'jpk-data1D-file':
+ return self._zip_read_1d(
+ f, path, info, zip_info, version)
+ elif zip_info['jpk-data-file'] != 'spm-forcefile':
+ raise ValueError('unrecognized JPK file type "%s"'
+ % zip_info['jpk-data-file'])
segments = []
for i in range(len([p for p in f.namelist()
if p.endswith('segment-header.properties')])):
curve_info = self._zip_translate_params(
zip_info, segments[0].info['raw info'], version)
for segment in segments: # HACK, should use curve-level spring constant
- segment.info['spring constant (N/m)'] = \
- curve_info['spring constant (N/m)']
+ for key in ['spring constant (N/m)',
+ 'z piezo sensitivity (m/V)']:
+ if key in curve_info:
+ segment.info['spring constant (N/m)'] = \
+ curve_info['spring constant (N/m)']
+ names = [segment.info['name'] for segment in segments]
+ for name in set(names): # ensure unique names
+ count = names.count(name)
+ if count > 1:
+ i = 0
+ for j,n in enumerate(names):
+ if n == name:
+ segments[j].info['name'] += '-%d' % i
+ i += 1
return (segments, curve_info)
def _zip_info(self, zipfile):
return info
def _zip_segment(self, zipfile, path, info, zip_info, index, version):
- prop_file = zipfile.open(os.path.join(
- 'segments', str(index), 'segment-header.properties'))
- prop = self._parse_params(prop_file.readlines())
- prop_file.close()
+ with Closing(zipfile.open(slash_join(
+ 'segments', str(index), 'segment-header.properties'))
+ ) as f:
+ prop = self._parse_params(f.readlines())
expected_shape = (int(prop['force-segment-header']['num-points']),)
channels = []
if 'list' not in prop['channels']:
zipfile, index, chan, chan_info))
if channels[-1].shape != expected_shape:
raise NotImplementedError(
- 'Channel %d:%s in %s has strange shape %s != %s'
+ 'channel %d:%s in %s has strange shape %s != %s'
% (index, chan, zipfile.path,
channels[-1].shape, expected_shape))
if len(channels) > 0:
def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
if chan_info['data']['type'] in ['constant-data', 'raster-data']:
return self._zip_calculate_channel(chan_info)
- with Closing(zipfile.open(os.path.join(
+ with Closing(zipfile.open(slash_join(
'segments', str(segment_index),
chan_info['data']['file']['name']), 'r')) as f:
assert chan_info['data']['file']['format'] == 'raw', \
- 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
+ 'non-raw data format:\n%s' % pprint.pformat(chan_info)
dtype = self._zip_channel_dtype(chan_info)
data = numpy.frombuffer(
buffer(f.read()),
dtype=dtype,)
+ if dtype.kind in ['i', 'u']:
+ data = self._zip_channel_decode(data, chan_info)
return data
def _zip_calculate_channel(self, chan_info):
step=step,
dtype=numpy.float32)
else:
- raise ValueError('Unrecognized data format "%s"' % type_)
+ raise ValueError('unrecognized data format "%s"' % type_)
def _zip_channel_dtype(self, chan_info):
type_ = chan_info['data']['type']
elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
dtype = numpy.dtype(numpy.uint32)
else:
- raise ValueError('Unrecognized encoder type "%s" for "%s" data'
+ raise ValueError('unrecognized encoder type "%s" for "%s" data'
% (encoder, type_))
elif type_ in ['short-data', 'short', 'memory-short-data']:
encoder = chan_info['data']['encoder']['type']
elif encoder in ['unsignedshort', 'unsignedshort-limited']:
dtype = numpy.dtype(numpy.uint16)
else:
- raise ValueError('Unrecognized encoder type "%s" for "%s" data'
+ raise ValueError('unrecognized encoder type "%s" for "%s" data'
% (encoder, type_))
else:
- raise ValueError('Unrecognized data format "%s"' % type_)
+ raise ValueError('unrecognized data format "%s"' % type_)
byte_order = '>'
# '>' (big endian) byte order.
# From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
# float (4 bytes, IEEE format).
return dtype.newbyteorder(byte_order)
+ def _zip_channel_decode(self, data, encoder_info):
+ return self._zip_apply_channel_scaling(
+ data, encoder_info['data']['encoder'])
+
def _zip_translate_params(self, params, chan_info, version):
info = {
'raw info':params,
assert distance_base == 'volts', distance_base
base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
assert base_conversion == distance_base, base_conversion
- distance_base_unit = self._zip_unit(
- chan_info['channel']['vDeflection']['data'],
- version)
+ if 'encoder' in chan_info['channel']['vDeflection']['data']:
+ distance_base_unit = self._zip_unit(
+ chan_info['channel']['vDeflection']['data']['encoder']['scaling'],
+ version)
+ else:
+ distance_base_unit = self._zip_unit(
+ chan_info['channel']['vDeflection']['data'],
+ version)
assert distance_base_unit == 'V', distance_base_unit
force_mult = float(
chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
pass # Fall through to 'simple' conversion processing.
else:
assert conversion_info['type'] == 'simple', conversion_info['type']
+ segment[:,channel] = self._zip_apply_channel_scaling(
+ segment[:,channel], conversion_info)
+ unit = self._zip_unit(conversion_info['scaling'], version)
+ segment.info['columns'][channel] = join_data_label(channel_name, unit)
+ return segment
+
+ def _zip_apply_channel_scaling(self, channel_data, conversion_info):
assert conversion_info['scaling']['type'] == 'linear', \
conversion_info['scaling']['type']
assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
conversion_info['scaling']['style']
multiplier = float(conversion_info['scaling']['multiplier'])
offset = float(conversion_info['scaling']['offset'])
- unit = self._zip_unit(conversion_info['scaling'], version)
- segment[:,channel] = segment[:,channel] * multiplier + offset
- segment.info['columns'][channel] = join_data_label(channel_name, unit)
- return segment
+ return channel_data * multiplier + offset
def _zip_unit(self, conversion_info, version):
if version in ['0.%d' % i for i in range(3)]:
sub_info[s] = {}
sub_info = sub_info[s]
if setting[-1] == 'list': # split a space-delimited list
- sub_info[setting[-1]] = fields[1].split(' ')
+ if fields[1]:
+ sub_info[setting[-1]] = fields[1].split(' ')
+ else:
+ sub_info[setting[-1]] = []
else:
sub_info[setting[-1]] = fields[1]
return info
+ def _zip_read_1d(self, zipfile, path, info, zip_info, version):
+ expected_shape = (int(zip_info['data']['num-points']),)
+ if zip_info['data']['type'] in ['constant-data', 'raster-data']:
+ return self._zip_calculate_channel(zip_info)
+ with Closing(zipfile.open(
+ zip_info['data']['file']['name'], 'r')) as f:
+ assert zip_info['data']['file']['format'] == 'raw', \
+ 'non-raw data format:\n%s' % pprint.pformat(chan_info)
+ dtype = self._zip_channel_dtype(zip_info)
+ data = numpy.frombuffer(
+ buffer(f.read()),
+ dtype=dtype,)
+ if dtype.kind in ['i', 'u']:
+ data = self._zip_channel_decode(data, zip_info)
+ if data.shape != expected_shape:
+ raise NotImplementedError(
+ 'channel %s has strange shape %s != %s'
+ % (path, data.shape, expected_shape))
+ d = curve.Data(
+ shape=data.shape,
+ dtype=data.dtype,
+ info=zip_info)
+ d[:] = data
+ return d
+
def _read_old(self, path, info):
raise NotImplementedError(
"Early JPK files (pre-zip) are not supported by Hooke. Please "
"use JPK's `out2jpk-force` script to convert your old files "
- "to a more recent format before loading them with Hooke.")
+ "(%s) to a more recent format before loading them with Hooke."
+ % path)