"""Driver for JPK ForceRobot's velocity clamp data format.
"""
-import logging
import os.path
import pprint
import zipfile
return True
return False
- def read(self, path):
+ def read(self, path, info=None):
+ if info == None:
+ info = {}
if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
- return self._read_zip(path)
+ return self._read_zip(path, info)
else:
- return self._read_old(path)
+ return self._read_old(path, info)
- def _read_zip(self, path):
+ def _read_zip(self, path, info):
with Closing(zipfile.ZipFile(path, 'r')) as f:
f.path = path
- info = self._zip_info(f)
- approach = self._zip_segment(f, info, 0)
- retract = self._zip_segment(f, info, 1)
- assert approach.info['name'] == 'approach', approach.info['name']
- assert retract.info['name'] == 'retract', retract.info['name']
- return ([approach, retract],
- self._zip_translate_params(info, retract.info['raw info']))
+ zip_info = self._zip_info(f)
+ segments = []
+ for i in range(len([p for p in f.namelist()
+ if p.endswith('segment-header.properties')])):
+ segments.append(self._zip_segment(f, path, info, zip_info, i))
+ for name in ['approach', 'retract']:
+ if len([s for s in segments if s.info['name'] == name]) == 0:
+ raise ValueError(
+ 'No segment for %s in %s, only %s'
+ % (name, path, [s.info['name'] for s in segments]))
+ return (segments,
+ self._zip_translate_params(zip_info,
+ segments[0].info['raw info']))
def _zip_info(self, zipfile):
with Closing(zipfile.open('header.properties')) as f:
info = self._parse_params(f.readlines())
return info
- def _zip_segment(self, zipfile, info, index):
+ def _zip_segment(self, zipfile, path, info, zip_info, index):
prop_file = zipfile.open(os.path.join(
'segments', str(index), 'segment-header.properties'))
prop = self._parse_params(prop_file.readlines())
info=self._zip_translate_segment_params(prop))
for i,chan in enumerate(channels):
d[:,i] = chan
- return self._zip_scale_segment(d)
+ return self._zip_scale_segment(d, path, info)
def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
f = zipfile.open(os.path.join(
'columns':list(params['channels']['list']),
'name':params['force-segment-header']['name']['name'],
}
- if info['name'] == 'extend-spm':
- info['name'] = 'approach'
- elif info['name'] == 'retract-spm':
- info['name'] = 'retract'
+ if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
+ info['name'] = info['name'][:-len('-spm')]
+ if info['name'] == 'extend':
+ info['name'] = 'approach'
else:
raise NotImplementedError(
'Unrecognized segment type %s' % info['name'])
return info
- def _zip_scale_segment(self, segment):
+ def _zip_scale_segment(self, segment, path, info):
data = curve.Data(
shape=segment.shape,
dtype=segment.dtype,
z_col = channels.index('height')
d_col = channels.index('vDeflection')
- segment = self._zip_scale_channel(segment, z_col, 'calibrated')
- segment = self._zip_scale_channel(segment, d_col, 'distance')
+ segment = self._zip_scale_channel(
+ segment, z_col, 'calibrated', path, info)
+ segment = self._zip_scale_channel(
+ segment, d_col, 'distance', path, info)
assert segment.info['columns'][z_col] == 'height (m)', \
segment.info['columns'][z_col]
segment.info['columns'][d_col] = 'deflection (m)'
return segment
- def _zip_scale_channel(self, segment, channel, conversion):
+ def _zip_scale_channel(self, segment, channel, conversion, path, info):
channel_name = segment.info['raw info']['channels']['list'][channel]
conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
conversion_info = conversion_set['conversion'][conversion]
# Our conversion is stacked on a previous conversion. Do
# the previous conversion first.
segment = self._zip_scale_channel(
- segment, channel, conversion_info['base-calibration-slot'])
+ segment, channel, conversion_info['base-calibration-slot'],
+ info, path)
if conversion_info['type'] == 'file':
- if os.path.exists(conversion_info['file']):
- raise NotImplementedError('No calibration files were available for testing')
+ key = ('%s_%s_to_%s_calibration_file'
+ % (channel_name,
+ conversion_info['base-calibration-slot'],
+ conversion))
+ calib_path = conversion_info['file']
+ if key in info:
+ calib_path = os.path.join(os.path.dirname(path), info[key])
+ self.logger().debug(
+ 'Overriding %s -> %s calibration for %s channel: %s'
+ % (conversion_info['base-calibration-slot'],
+ conversion, channel_name, calib_path))
+ if os.path.exists(calib_path):
+ with file(calib_path, 'r') as f:
+ lines = [x.strip() for x in f.readlines()]
+ f.close()
+ calib = { # I've emailed JPK to confirm this file format.
+ 'title':lines[0],
+ 'multiplier':float(lines[1]),
+ 'offset':float(lines[2]),
+ 'unit':lines[3],
+ 'note':'\n'.join(lines[4:]),
+ }
+ segment[:,channel] = (segment[:,channel] * calib['multiplier']
+ + calib['offset'])
+ segment.info['columns'][channel] = (
+ '%s (%s)' % (channel_name, calib['unit']))
+ return segment
else:
- log = logging.getLogger('hooke')
- log.warn(
+ self.logger().warn(
'Skipping %s -> %s calibration for %s channel. Calibration file %s not found'
% (conversion_info['base-calibration-slot'],
- conversion, channel_name, conversion_info['file']))
+ conversion, channel_name, calib_path))
else:
assert conversion_info['type'] == 'simple', conversion_info['type']
assert conversion_info['scaling']['type'] == 'linear', \
sub_info[setting[-1]] = fields[1]
return info
- def _read_old(self, path):
+ def _read_old(self, path, info):
raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)
+
+# LocalWords: JPK ForceRobot's