1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
# Driver subclass for JPK ForceRobot data files.  The visible constructor
# call registers the driver with its base class under the name 'jpk'.
# NOTE(review): the `def __init__` line and the end of the class docstring
# are not visible in this view -- confirm against the full file.
35 class JPKDriver (Driver):
36 """Handle JPK ForceRobot's data format.
39 super(JPKDriver, self).__init__(name='jpk')
# Report whether `path` looks like a JPK data file.
# Directories are tested for first.  Zip archives are inspected for a
# 'header.properties' member whose contents include 'jpk-data-file'.
# Otherwise the file is opened as text and its leading header lines are
# compared against the '# xPosition' / '# yPosition' prefixes.
# NOTE(review): the statements that produce the result of each check are
# not visible in this view -- confirm the return values against the full
# file.
41 def is_me(self, path):
42 if os.path.isdir(path):
44 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
45 with Closing(zipfile.ZipFile(path, 'r')) as f:
# The archive listing is checked before header.properties is opened.
46 if 'header.properties' not in f.namelist():
48 with Closing(f.open('header.properties')) as h:
49 if 'jpk-data-file' in h.read():
# Presumably the non-zip branch: read the first lines of a text file.
52 with Closing(open(path, 'r')) as f:
55 headlines.append(f.readline())
56 if headlines[0].startswith('# xPosition') \
57 and headlines[1].startswith('# yPosition'):
# Load the data stored at `path`, choosing the reader by container type:
# zip archives go to _read_zip, and the other visible return hands the
# path to _read_old.  `info` defaults to None.
# NOTE(review): any normalisation of a None `info` before dispatch is not
# visible in this view -- confirm against the full file.
61 def read(self, path, info=None):
64 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
65 return self._read_zip(path, info)
67 return self._read_old(path, info)
# Read a zip-format JPK file.
# Opens the archive, parses its top-level header with _zip_info, and
# builds one segment (via _zip_segment) for every archive member whose
# name ends in 'segment-header.properties'.  Only file-format-version
# '0.5' is accepted; anything else raises NotImplementedError.  Segments
# named 'approach' and 'retract' must both be present.
# NOTE(review): the initialisation of `segments`, the exception type
# raised for a missing segment, and the head of the final return
# expression are not visible in this view.
69 def _read_zip(self, path, info):
70 with Closing(zipfile.ZipFile(path, 'r')) as f:
# Global (file-level) properties parsed from header.properties.
72 zip_info = self._zip_info(f)
# Segments are addressed by index 0..N-1, N being the header count.
74 for i in range(len([p for p in f.namelist()
75 if p.endswith('segment-header.properties')])):
76 segments.append(self._zip_segment(f, path, info, zip_info, i))
# The version check runs after the segments have been read.
77 if zip_info['file-format-version'] not in ['0.5']:
78 raise NotImplementedError(
79 'JPK file version %s not supported (yet).'
80 % zip_info['file-format-version'])
# Both segment names are required.
81 for name in ['approach', 'retract']:
82 if len([s for s in segments if s.info['name'] == name]) == 0:
84 'No segment for %s in %s, only %s'
85 % (name, path, [s.info['name'] for s in segments]))
87 self._zip_translate_params(zip_info,
88 segments[0].info['raw info']))
# Parse the archive's top-level 'header.properties' member with
# _parse_params.  The `zipfile` parameter is an open archive object and
# shadows the zipfile module inside this method.
# NOTE(review): the statement that hands `info` back to the caller is not
# visible in this view.
90 def _zip_info(self, zipfile):
91 with Closing(zipfile.open('header.properties')) as f:
92 info = self._parse_params(f.readlines())
# Load segment number `index` from an open JPK archive.
# Parses segments/<index>/segment-header.properties, reads every channel
# named in its channels.list through _zip_channel, checks each channel's
# shape against the declared num-points, and passes the assembled object
# `d` to _zip_scale_segment together with `path` and `info`.
# Raises NotImplementedError when a channel's shape differs from
# (num-points,).
# The `zipfile` parameter is an open archive object and shadows the
# zipfile module inside this method; `zip_info` is not used by the
# visible lines.
# NOTE(review): the constructor call that creates `d`, the `channels`
# initialiser and the loop body that fills `d` are not visible here.
# NOTE(review): prop_file is not closed by any visible line -- confirm.
# NOTE(review): member names are built with os.path.join, which uses the
# OS separator; zip member names use '/' -- confirm behaviour on Windows.
95 def _zip_segment(self, zipfile, path, info, zip_info, index):
96 prop_file = zipfile.open(os.path.join(
97 'segments', str(index), 'segment-header.properties'))
98 prop = self._parse_params(prop_file.readlines())
# Each channel is expected to be 1-D with num-points entries.
100 expected_shape = (int(prop['force-segment-header']['num-points']),)
102 for chan in prop['channels']['list']:
103 chan_info = prop['channel'][chan]
104 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
# NOTE(review): the message below reads `zipfile.path`; the standard
# ZipFile object documents `filename` -- confirm this attribute exists.
105 if channels[-1].shape != expected_shape:
106 raise NotImplementedError(
107 'Channel %d:%s in %s has strange shape %s != %s'
108 % (index, chan, zipfile.path,
109 channels[-1].shape, expected_shape))
111 shape=(len(channels[0]), len(channels)),
112 dtype=channels[0].dtype,
113 info=self._zip_translate_segment_params(prop))
114 for i,chan in enumerate(channels):
116 return self._zip_scale_segment(d, path, info)
# Read one channel's raw samples from an open JPK archive.
# The member opened is segments/<segment_index>/<name>, where <name> is
# chan_info['data']['file']['name'].  The channel must declare the 'raw'
# file format and the 'float-data' type (both enforced with assert); the
# bytes are decoded as big-endian float32 via numpy.frombuffer.
# `channel_name` is not used by the visible lines.
# NOTE(review): the data argument passed to frombuffer and the closing /
# return statements are not visible in this view.
118 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
119 f = zipfile.open(os.path.join(
120 'segments', str(segment_index),
121 chan_info['data']['file']['name']), 'r')
122 assert chan_info['data']['file']['format'] == 'raw', \
123 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
124 assert chan_info['data']['type'] == 'float-data', \
125 'Non-float data format:\n%s' % pprint.pformat(chan_info)
126 data = numpy.frombuffer(
128 dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
129 # '>' (big endian) byte order.
130 # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
131 # All forms of raw data are stored in chronological order
132 # (the order in which they were collected), and the
133 # individual values are stored in network byte order
134 # (big-endian). The data type used to store the data is
135 # specified by the "channel.*.data.type" property, and is
136 # either short (2 bytes per value), integer (4 bytes), or
137 # float (4 bytes, IEEE format).
# Build the curve-level info from parsed JPK properties.
# All visible lookups go through `chan_info`, specifically the
# vDeflection channel's conversion-set: the force conversion must be in
# 'N' and based on the 'distance' slot, and the distance conversion must
# be in 'm' (all three enforced with assert).  The force conversion's
# scaling multiplier is stored as info['spring constant (N/m)'].
# NOTE(review): the creation of `info`, the assignment head of
# `force_mult`, any use of `params`, and the return are not visible here.
141 def _zip_translate_params(self, params, chan_info):
144 #'time':self._time_from_TODO(raw_info[]),
146 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
147 assert force_unit == 'N', force_unit
148 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
149 assert force_base == 'distance', force_base
150 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
151 assert dist_unit == 'm', dist_unit
153 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
154 info['spring constant (N/m)'] = force_mult
# Build a segment's info dict from its parsed segment-header properties.
# 'columns' is a fresh list copied from channels.list and 'name' comes
# from force-segment-header.name.name.  A trailing '-spm' is stripped
# from the three recognised names, and 'extend' is renamed 'approach'.
# NOTE(review): the condition guarding the NotImplementedError, any other
# keys placed in the dict, and the return are not visible in this view.
157 def _zip_translate_segment_params(self, params):
160 'columns':list(params['channels']['list']),
161 'name':params['force-segment-header']['name']['name'],
# Normalise JPK segment names.
163 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
164 info['name'] = info['name'][:-len('-spm')]
165 if info['name'] == 'extend':
166 info['name'] = 'approach'
168 raise NotImplementedError(
169 'Unrecognized segment type %s' % info['name'])
# Convert a segment's raw channel data to physical units.
# A `data` object is stored under segment.info['raw data'].  The 'height'
# and 'vDeflection' columns are located by name in the raw channel list,
# then scaled through _zip_scale_channel using the 'calibrated' and
# 'distance' conversions respectively.  After scaling, both columns must
# be labelled in metres (asserted) and are renamed 'z piezo (m)' and
# 'deflection (m)'.
# NOTE(review): how `data` is produced and what the method returns are
# not visible in this view.
172 def _zip_scale_segment(self, segment, path, info):
178 segment.info['raw data'] = data
181 channels = segment.info['raw info']['channels']['list']
182 z_col = channels.index('height')
183 d_col = channels.index('vDeflection')
185 segment = self._zip_scale_channel(
186 segment, z_col, 'calibrated', path, info)
187 segment = self._zip_scale_channel(
188 segment, d_col, 'distance', path, info)
# Guard against conversions that ended in an unexpected unit.
190 assert segment.info['columns'][z_col] == 'height (m)', \
191 segment.info['columns'][z_col]
192 assert segment.info['columns'][d_col] == 'vDeflection (m)', \
193 segment.info['columns'][d_col]
195 # scaled column indices same as raw column indices,
196 # because columns is a copy of channels.list
197 segment.info['columns'][z_col] = 'z piezo (m)'
198 segment.info['columns'][d_col] = 'deflection (m)'
# Apply the named `conversion` to column `channel` of `segment`.
# The conversion is looked up in the channel's conversion-set.  When its
# base-calibration-slot is not the set's base conversion, the method first
# recurses to apply that prerequisite conversion.
# Two conversion types are visible:
#   * 'file'   -- multiplier and offset are read from a calibration file
#                 taken from conversion_info['file']; a path built from
#                 info[key], relative to the directory of `path`, is also
#                 visible as an alternative source.
#   * 'simple' -- linear offset/multiplier scaling taken directly from the
#                 conversion's 'scaling' properties (enforced with assert).
# In both cases the column is rescaled in place and its label becomes
# '<channel name> (<unit>)'.
# NOTE(review): the branch structure between these cases, the logging
# calls that carry the two message strings, and the return are only
# partly visible in this view.
201 def _zip_scale_channel(self, segment, channel, conversion, path, info):
202 channel_name = segment.info['raw info']['channels']['list'][channel]
203 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
204 conversion_info = conversion_set['conversion'][conversion]
205 if conversion_info['base-calibration-slot'] \
206 != conversion_set['conversions']['base']:
207 # Our conversion is stacked on a previous conversion. Do
208 # the previous conversion first.
209 segment = self._zip_scale_channel(
210 segment, channel, conversion_info['base-calibration-slot'],
212 if conversion_info['type'] == 'file':
213 key = ('%s_%s_to_%s_calibration_file'
215 conversion_info['base-calibration-slot'],
217 calib_path = conversion_info['file']
219 calib_path = os.path.join(os.path.dirname(path), info[key])
221 'Overriding %s -> %s calibration for %s channel: %s'
222 % (conversion_info['base-calibration-slot'],
223 conversion, channel_name, calib_path))
# NOTE(review): `file(...)` below is the Python 2 builtin; it does not
# exist on Python 3, where `open(...)` is the equivalent.
224 if os.path.exists(calib_path):
225 with file(calib_path, 'r') as f:
226 lines = [x.strip() for x in f.readlines()]
228 calib = { # I've emailed JPK to confirm this file format.
230 'multiplier':float(lines[1]),
231 'offset':float(lines[2]),
233 'note':'\n'.join(lines[4:]),
235 segment[:,channel] = (segment[:,channel] * calib['multiplier']
237 segment.info['columns'][channel] = (
238 '%s (%s)' % (channel_name, calib['unit']))
242 'Skipping %s -> %s calibration for %s channel. Calibration file %s not found'
243 % (conversion_info['base-calibration-slot'],
244 conversion, channel_name, calib_path))
# Only linear offset/multiplier scaling is handled for 'simple'.
246 assert conversion_info['type'] == 'simple', conversion_info['type']
247 assert conversion_info['scaling']['type'] == 'linear', \
248 conversion_info['scaling']['type']
249 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
250 conversion_info['scaling']['style']
251 multiplier = float(conversion_info['scaling']['multiplier'])
252 offset = float(conversion_info['scaling']['offset'])
253 unit = conversion_info['scaling']['unit']['unit']
254 segment[:,channel] = segment[:,channel] * multiplier + offset
255 segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
# Parse Java-properties-style lines ("a.b.c=value") into nested dicts.
# Each line is split on the first '='.  The dotted key is walked one
# component at a time to reach the innermost dict, and the last component
# becomes the key for the value.  A final component of 'list' stores the
# value split on single spaces; otherwise the raw string is stored.
# A line without '=' trips the assert.
# NOTE(review): the creation of `info`, the loop header, the handling of
# '#' lines and of missing intermediate keys, and the return are not
# visible in this view.
258 def _parse_params(self, lines):
262 if line.startswith('#'):
265 # e.g.: force-segment-header.type=xy-position-segment-header
266 fields = line.split('=', 1)
267 assert len(fields) == 2, line
268 setting = fields[0].split('.')
269 sub_info = info # drill down, e.g. info['force-s..']['type']
270 for s in setting[:-1]:
271 if s not in sub_info:
273 sub_info = sub_info[s]
274 if setting[-1] == 'list': # split a space-delimited list
275 sub_info[setting[-1]] = fields[1].split(' ')
277 sub_info[setting[-1]] = fields[1]
def _read_old(self, path, info):
    """Refuse to read an old-style JPK file.

    No reader exists for this format, so the call always fails.

    :param path: location of the file; quoted in the error message.
    :param info: accepted for signature parity with the other reader;
        not used.
    :raises NotImplementedError: always.
    """
    template = ('No old-style JPK files were available for testing, '
                'please send us yours: %s')
    raise NotImplementedError(template % path)