1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
35 class JPKDriver (Driver):
36 """Handle JPK ForceRobot's data format.
39 super(JPKDriver, self).__init__(name='jpk')
41 def is_me(self, path):
42 if os.path.isdir(path):
44 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
45 with Closing(zipfile.ZipFile(path, 'r')) as f:
46 if 'header.properties' not in f.namelist():
48 with Closing(f.open('header.properties')) as h:
49 if 'jpk-data-file' in h.read():
52 with Closing(open(path, 'r')) as f:
55 headlines.append(f.readline())
56 if headlines[0].startswith('# xPosition') \
57 and headlines[1].startswith('# yPosition'):
61 def read(self, path, info=None):
64 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
65 return self._read_zip(path, info)
67 return self._read_old(path, info)
69 def _read_zip(self, path, info):
70 with Closing(zipfile.ZipFile(path, 'r')) as f:
72 zip_info = self._zip_info(f)
74 for i in range(len([p for p in f.namelist()
75 if p.endswith('segment-header.properties')])):
76 segments.append(self._zip_segment(f, path, info, zip_info, i))
77 for name in ['approach', 'retract']:
78 if len([s for s in segments if s.info['name'] == name]) == 0:
80 'No segment for %s in %s, only %s'
81 % (name, path, [s.info['name'] for s in segments]))
83 self._zip_translate_params(zip_info,
84 segments[0].info['raw info']))
86 def _zip_info(self, zipfile):
87 with Closing(zipfile.open('header.properties')) as f:
88 info = self._parse_params(f.readlines())
91 def _zip_segment(self, zipfile, path, info, zip_info, index):
92 prop_file = zipfile.open(os.path.join(
93 'segments', str(index), 'segment-header.properties'))
94 prop = self._parse_params(prop_file.readlines())
96 expected_shape = (int(prop['force-segment-header']['num-points']),)
98 for chan in prop['channels']['list']:
99 chan_info = prop['channel'][chan]
100 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
101 if channels[-1].shape != expected_shape:
102 raise NotImplementedError(
103 'Channel %d:%s in %s has strange shape %s != %s'
104 % (index, chan, zipfile.path,
105 channels[-1].shape, expected_shape))
107 shape=(len(channels[0]), len(channels)),
108 dtype=channels[0].dtype,
109 info=self._zip_translate_segment_params(prop))
110 for i,chan in enumerate(channels):
112 return self._zip_scale_segment(d, path, info)
114 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
115 f = zipfile.open(os.path.join(
116 'segments', str(segment_index),
117 chan_info['data']['file']['name']), 'r')
118 assert chan_info['data']['file']['format'] == 'raw', \
119 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
120 assert chan_info['data']['type'] == 'float-data', \
121 'Non-float data format:\n%s' % pprint.pformat(chan_info)
122 data = numpy.frombuffer(
124 dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
125 # '>' (big endian) byte order.
126 # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
127 # All forms of raw data are stored in chronological order
128 # (the order in which they were collected), and the
129 # individual values are stored in network byte order
130 # (big-endian). The data type used to store the data is
131 # specified by the "channel.*.data.type" property, and is
132 # either short (2 bytes per value), integer (4 bytes), or
133 # float (4 bytes, IEEE format).
137 def _zip_translate_params(self, params, chan_info):
140 #'time':self._time_from_TODO(raw_info[]),
142 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
143 assert force_unit == 'N', force_unit
144 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
145 assert force_base == 'distance', force_base
146 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
147 assert dist_unit == 'm', dist_unit
149 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
150 info['spring constant (N/m)'] = force_mult
153 def _zip_translate_segment_params(self, params):
156 'columns':list(params['channels']['list']),
157 'name':params['force-segment-header']['name']['name'],
159 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
160 info['name'] = info['name'][:-len('-spm')]
161 if info['name'] == 'extend':
162 info['name'] = 'approach'
164 raise NotImplementedError(
165 'Unrecognized segment type %s' % info['name'])
168 def _zip_scale_segment(self, segment, path, info):
174 segment.info['raw data'] = data
177 channels = segment.info['raw info']['channels']['list']
178 z_col = channels.index('height')
179 d_col = channels.index('vDeflection')
181 segment = self._zip_scale_channel(
182 segment, z_col, 'calibrated', path, info)
183 segment = self._zip_scale_channel(
184 segment, d_col, 'distance', path, info)
186 assert segment.info['columns'][z_col] == 'height (m)', \
187 segment.info['columns'][z_col]
188 assert segment.info['columns'][d_col] == 'vDeflection (m)', \
189 segment.info['columns'][d_col]
191 # scaled column indices same as raw column indices,
192 # because columns is a copy of channels.list
193 segment.info['columns'][z_col] = 'z piezo (m)'
194 segment.info['columns'][d_col] = 'deflection (m)'
197 def _zip_scale_channel(self, segment, channel, conversion, path, info):
198 channel_name = segment.info['raw info']['channels']['list'][channel]
199 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
200 conversion_info = conversion_set['conversion'][conversion]
201 if conversion_info['base-calibration-slot'] \
202 != conversion_set['conversions']['base']:
203 # Our conversion is stacked on a previous conversion. Do
204 # the previous conversion first.
205 segment = self._zip_scale_channel(
206 segment, channel, conversion_info['base-calibration-slot'],
208 if conversion_info['type'] == 'file':
209 key = ('%s_%s_to_%s_calibration_file'
211 conversion_info['base-calibration-slot'],
213 calib_path = conversion_info['file']
215 calib_path = os.path.join(os.path.dirname(path), info[key])
217 'Overriding %s -> %s calibration for %s channel: %s'
218 % (conversion_info['base-calibration-slot'],
219 conversion, channel_name, calib_path))
220 if os.path.exists(calib_path):
221 with file(calib_path, 'r') as f:
222 lines = [x.strip() for x in f.readlines()]
224 calib = { # I've emailed JPK to confirm this file format.
226 'multiplier':float(lines[1]),
227 'offset':float(lines[2]),
229 'note':'\n'.join(lines[4:]),
231 segment[:,channel] = (segment[:,channel] * calib['multiplier']
233 segment.info['columns'][channel] = (
234 '%s (%s)' % (channel_name, calib['unit']))
238 'Skipping %s -> %s calibration for %s channel. Calibration file %s not found'
239 % (conversion_info['base-calibration-slot'],
240 conversion, channel_name, calib_path))
242 assert conversion_info['type'] == 'simple', conversion_info['type']
243 assert conversion_info['scaling']['type'] == 'linear', \
244 conversion_info['scaling']['type']
245 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
246 conversion_info['scaling']['style']
247 multiplier = float(conversion_info['scaling']['multiplier'])
248 offset = float(conversion_info['scaling']['offset'])
249 unit = conversion_info['scaling']['unit']['unit']
250 segment[:,channel] = segment[:,channel] * multiplier + offset
251 segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
254 def _parse_params(self, lines):
258 if line.startswith('#'):
261 # e.g.: force-segment-header.type=xy-position-segment-header
262 fields = line.split('=', 1)
263 assert len(fields) == 2, line
264 setting = fields[0].split('.')
265 sub_info = info # drill down, e.g. info['force-s..']['type']
266 for s in setting[:-1]:
267 if s not in sub_info:
269 sub_info = sub_info[s]
270 if setting[-1] == 'list': # split a space-delimited list
271 sub_info[setting[-1]] = fields[1].split(' ')
273 sub_info[setting[-1]] = fields[1]
276 def _read_old(self, path, info):
277 raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)