1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
35 class JPKDriver (Driver):
36 """Handle JPK ForceRobot's data format.
39 super(JPKDriver, self).__init__(name='jpk')
41 def is_me(self, path):
42 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
43 with Closing(zipfile.ZipFile(path, 'r')) as f:
44 if 'header.properties' not in f.namelist():
46 with Closing(f.open('header.properties')) as h:
47 if 'jpk-data-file' in h.read():
50 with Closing(open(path, 'r')) as f:
53 headlines.append(f.readline())
54 if headlines[0].startswith('# xPosition') \
55 and headlines[1].startswith('# yPosition'):
59 def read(self, path, info=None):
62 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
63 return self._read_zip(path, info)
65 return self._read_old(path, info)
67 def _read_zip(self, path, info):
68 with Closing(zipfile.ZipFile(path, 'r')) as f:
70 zip_info = self._zip_info(f)
72 for i in range(len([p for p in f.namelist()
73 if p.endswith('segment-header.properties')])):
74 segments.append(self._zip_segment(f, path, info, zip_info, i))
75 for name in ['approach', 'retract']:
76 if len([s for s in segments if s.info['name'] == name]) == 0:
78 'No segment for %s in %s, only %s'
79 % (name, path, [s.info['name'] for s in segments]))
81 self._zip_translate_params(zip_info,
82 segments[0].info['raw info']))
84 def _zip_info(self, zipfile):
85 with Closing(zipfile.open('header.properties')) as f:
86 info = self._parse_params(f.readlines())
89 def _zip_segment(self, zipfile, path, info, zip_info, index):
90 prop_file = zipfile.open(os.path.join(
91 'segments', str(index), 'segment-header.properties'))
92 prop = self._parse_params(prop_file.readlines())
94 expected_shape = (int(prop['force-segment-header']['num-points']),)
96 for chan in prop['channels']['list']:
97 chan_info = prop['channel'][chan]
98 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
99 if channels[-1].shape != expected_shape:
100 raise NotImplementedError(
101 'Channel %d:%s in %s has strange shape %s != %s'
102 % (index, chan, zipfile.path,
103 channels[-1].shape, expected_shape))
105 shape=(len(channels[0]), len(channels)),
106 dtype=channels[0].dtype,
107 info=self._zip_translate_segment_params(prop))
108 for i,chan in enumerate(channels):
110 return self._zip_scale_segment(d, path, info)
112 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
113 f = zipfile.open(os.path.join(
114 'segments', str(segment_index),
115 chan_info['data']['file']['name']), 'r')
116 assert chan_info['data']['file']['format'] == 'raw', \
117 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
118 assert chan_info['data']['type'] == 'float-data', \
119 'Non-float data format:\n%s' % pprint.pformat(chan_info)
120 data = numpy.frombuffer(
122 dtype=numpy.dtype(numpy.float32).newbyteorder('>'),
123 # Is JPK data always big endian? I can't find a config
124 # setting. The ForceRobot brochure
125 # http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf
126 # lists a PowerPC chip on page 4, under Control
127 # electronics, and PPCs are usually big endian.
128 # http://en.wikipedia.org/wiki/PowerPC#Endian_modes
133 def _zip_translate_params(self, params, chan_info):
136 #'time':self._time_from_TODO(raw_info[]),
138 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
139 assert force_unit == 'N', force_unit
140 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
141 assert force_base == 'distance', force_base
142 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
143 assert dist_unit == 'm', dist_unit
145 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
146 info['spring constant (N/m)'] = force_mult
149 def _zip_translate_segment_params(self, params):
152 'columns':list(params['channels']['list']),
153 'name':params['force-segment-header']['name']['name'],
155 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
156 info['name'] = info['name'][:-len('-spm')]
157 if info['name'] == 'extend':
158 info['name'] = 'approach'
160 raise NotImplementedError(
161 'Unrecognized segment type %s' % info['name'])
164 def _zip_scale_segment(self, segment, path, info):
170 segment.info['raw data'] = data
173 channels = segment.info['raw info']['channels']['list']
174 z_col = channels.index('height')
175 d_col = channels.index('vDeflection')
177 segment = self._zip_scale_channel(
178 segment, z_col, 'calibrated', path, info)
179 segment = self._zip_scale_channel(
180 segment, d_col, 'distance', path, info)
182 assert segment.info['columns'][z_col] == 'height (m)', \
183 segment.info['columns'][z_col]
184 assert segment.info['columns'][d_col] == 'vDeflection (m)', \
185 segment.info['columns'][d_col]
187 # scaled column indices same as raw column indices,
188 # because columns is a copy of channels.list
189 segment.info['columns'][z_col] = 'z piezo (m)'
190 segment.info['columns'][d_col] = 'deflection (m)'
193 def _zip_scale_channel(self, segment, channel, conversion, path, info):
194 channel_name = segment.info['raw info']['channels']['list'][channel]
195 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
196 conversion_info = conversion_set['conversion'][conversion]
197 if conversion_info['base-calibration-slot'] \
198 != conversion_set['conversions']['base']:
199 # Our conversion is stacked on a previous conversion. Do
200 # the previous conversion first.
201 segment = self._zip_scale_channel(
202 segment, channel, conversion_info['base-calibration-slot'],
204 if conversion_info['type'] == 'file':
205 key = ('%s_%s_to_%s_calibration_file'
207 conversion_info['base-calibration-slot'],
209 calib_path = conversion_info['file']
211 calib_path = os.path.join(os.path.dirname(path), info[key])
213 'Overriding %s -> %s calibration for %s channel: %s'
214 % (conversion_info['base-calibration-slot'],
215 conversion, channel_name, calib_path))
216 if os.path.exists(calib_path):
217 with file(calib_path, 'r') as f:
218 lines = [x.strip() for x in f.readlines()]
220 calib = { # I've emailed JPK to confirm this file format.
222 'multiplier':float(lines[1]),
223 'offset':float(lines[2]),
225 'note':'\n'.join(lines[4:]),
227 segment[:,channel] = (segment[:,channel] * calib['multiplier']
229 segment.info['columns'][channel] = (
230 '%s (%s)' % (channel_name, calib['unit']))
234 'Skipping %s -> %s calibration for %s channel. Calibration file %s not found'
235 % (conversion_info['base-calibration-slot'],
236 conversion, channel_name, calib_path))
238 assert conversion_info['type'] == 'simple', conversion_info['type']
239 assert conversion_info['scaling']['type'] == 'linear', \
240 conversion_info['scaling']['type']
241 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
242 conversion_info['scaling']['style']
243 multiplier = float(conversion_info['scaling']['multiplier'])
244 offset = float(conversion_info['scaling']['offset'])
245 unit = conversion_info['scaling']['unit']['unit']
246 segment[:,channel] = segment[:,channel] * multiplier + offset
247 segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
250 def _parse_params(self, lines):
254 if line.startswith('#'):
257 # e.g.: force-segment-header.type=xy-position-segment-header
258 fields = line.split('=', 1)
259 assert len(fields) == 2, line
260 setting = fields[0].split('.')
261 sub_info = info # drill down, e.g. info['force-s..']['type']
262 for s in setting[:-1]:
263 if s not in sub_info:
265 sub_info = sub_info[s]
266 if setting[-1] == 'list': # split a space-delimited list
267 sub_info[setting[-1]] = fields[1].split(' ')
269 sub_info[setting[-1]] = fields[1]
272 def _read_old(self, path, info):
273 raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)