1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
35 class JPKDriver (Driver):
36 """Handle JPK ForceRobot's data format.
39 super(JPKDriver, self).__init__(name='jpk')
41 def is_me(self, path):
42 if os.path.isdir(path):
44 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
45 with Closing(zipfile.ZipFile(path, 'r')) as f:
46 if 'header.properties' not in f.namelist():
48 with Closing(f.open('header.properties')) as h:
49 if 'jpk-data-file' in h.read():
52 with Closing(open(path, 'r')) as f:
55 headlines.append(f.readline())
56 if headlines[0].startswith('# xPosition') \
57 and headlines[1].startswith('# yPosition'):
61 def read(self, path, info=None):
64 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
65 return self._read_zip(path, info)
67 return self._read_old(path, info)
69 def _read_zip(self, path, info):
70 with Closing(zipfile.ZipFile(path, 'r')) as f:
72 zip_info = self._zip_info(f)
74 for i in range(len([p for p in f.namelist()
75 if p.endswith('segment-header.properties')])):
76 segments.append(self._zip_segment(f, path, info, zip_info, i))
77 for name in ['approach', 'retract']:
78 if len([s for s in segments if s.info['name'] == name]) == 0:
80 'No segment for %s in %s, only %s'
81 % (name, path, [s.info['name'] for s in segments]))
83 self._zip_translate_params(zip_info,
84 segments[0].info['raw info']))
86 def _zip_info(self, zipfile):
87 with Closing(zipfile.open('header.properties')) as f:
88 info = self._parse_params(f.readlines())
91 def _zip_segment(self, zipfile, path, info, zip_info, index):
92 prop_file = zipfile.open(os.path.join(
93 'segments', str(index), 'segment-header.properties'))
94 prop = self._parse_params(prop_file.readlines())
96 expected_shape = (int(prop['force-segment-header']['num-points']),)
98 for chan in prop['channels']['list']:
99 chan_info = prop['channel'][chan]
100 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
101 if channels[-1].shape != expected_shape:
102 raise NotImplementedError(
103 'Channel %d:%s in %s has strange shape %s != %s'
104 % (index, chan, zipfile.path,
105 channels[-1].shape, expected_shape))
107 shape=(len(channels[0]), len(channels)),
108 dtype=channels[0].dtype,
109 info=self._zip_translate_segment_params(prop))
110 for i,chan in enumerate(channels):
112 return self._zip_scale_segment(d, path, info)
114 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
115 f = zipfile.open(os.path.join(
116 'segments', str(segment_index),
117 chan_info['data']['file']['name']), 'r')
118 assert chan_info['data']['file']['format'] == 'raw', \
119 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
120 assert chan_info['data']['type'] == 'float-data', \
121 'Non-float data format:\n%s' % pprint.pformat(chan_info)
122 data = numpy.frombuffer(
124 dtype=numpy.dtype(numpy.float32).newbyteorder('>'),
125 # Is JPK data always big endian? I can't find a config
126 # setting. The ForceRobot brochure
127 # http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf
128 # lists a PowerPC chip on page 4, under Control
129 # electronics, and PPCs are usually big endian.
130 # http://en.wikipedia.org/wiki/PowerPC#Endian_modes
135 def _zip_translate_params(self, params, chan_info):
138 #'time':self._time_from_TODO(raw_info[]),
140 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
141 assert force_unit == 'N', force_unit
142 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
143 assert force_base == 'distance', force_base
144 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
145 assert dist_unit == 'm', dist_unit
147 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
148 info['spring constant (N/m)'] = force_mult
151 def _zip_translate_segment_params(self, params):
154 'columns':list(params['channels']['list']),
155 'name':params['force-segment-header']['name']['name'],
157 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
158 info['name'] = info['name'][:-len('-spm')]
159 if info['name'] == 'extend':
160 info['name'] = 'approach'
162 raise NotImplementedError(
163 'Unrecognized segment type %s' % info['name'])
166 def _zip_scale_segment(self, segment, path, info):
172 segment.info['raw data'] = data
175 channels = segment.info['raw info']['channels']['list']
176 z_col = channels.index('height')
177 d_col = channels.index('vDeflection')
179 segment = self._zip_scale_channel(
180 segment, z_col, 'calibrated', path, info)
181 segment = self._zip_scale_channel(
182 segment, d_col, 'distance', path, info)
184 assert segment.info['columns'][z_col] == 'height (m)', \
185 segment.info['columns'][z_col]
186 assert segment.info['columns'][d_col] == 'vDeflection (m)', \
187 segment.info['columns'][d_col]
189 # scaled column indices same as raw column indices,
190 # because columns is a copy of channels.list
191 segment.info['columns'][z_col] = 'z piezo (m)'
192 segment.info['columns'][d_col] = 'deflection (m)'
195 def _zip_scale_channel(self, segment, channel, conversion, path, info):
196 channel_name = segment.info['raw info']['channels']['list'][channel]
197 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
198 conversion_info = conversion_set['conversion'][conversion]
199 if conversion_info['base-calibration-slot'] \
200 != conversion_set['conversions']['base']:
201 # Our conversion is stacked on a previous conversion. Do
202 # the previous conversion first.
203 segment = self._zip_scale_channel(
204 segment, channel, conversion_info['base-calibration-slot'],
206 if conversion_info['type'] == 'file':
207 key = ('%s_%s_to_%s_calibration_file'
209 conversion_info['base-calibration-slot'],
211 calib_path = conversion_info['file']
213 calib_path = os.path.join(os.path.dirname(path), info[key])
215 'Overriding %s -> %s calibration for %s channel: %s'
216 % (conversion_info['base-calibration-slot'],
217 conversion, channel_name, calib_path))
218 if os.path.exists(calib_path):
219 with file(calib_path, 'r') as f:
220 lines = [x.strip() for x in f.readlines()]
222 calib = { # I've emailed JPK to confirm this file format.
224 'multiplier':float(lines[1]),
225 'offset':float(lines[2]),
227 'note':'\n'.join(lines[4:]),
229 segment[:,channel] = (segment[:,channel] * calib['multiplier']
231 segment.info['columns'][channel] = (
232 '%s (%s)' % (channel_name, calib['unit']))
236 'Skipping %s -> %s calibration for %s channel. Calibration file %s not found'
237 % (conversion_info['base-calibration-slot'],
238 conversion, channel_name, calib_path))
240 assert conversion_info['type'] == 'simple', conversion_info['type']
241 assert conversion_info['scaling']['type'] == 'linear', \
242 conversion_info['scaling']['type']
243 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
244 conversion_info['scaling']['style']
245 multiplier = float(conversion_info['scaling']['multiplier'])
246 offset = float(conversion_info['scaling']['offset'])
247 unit = conversion_info['scaling']['unit']['unit']
248 segment[:,channel] = segment[:,channel] * multiplier + offset
249 segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
252 def _parse_params(self, lines):
256 if line.startswith('#'):
259 # e.g.: force-segment-header.type=xy-position-segment-header
260 fields = line.split('=', 1)
261 assert len(fields) == 2, line
262 setting = fields[0].split('.')
263 sub_info = info # drill down, e.g. info['force-s..']['type']
264 for s in setting[:-1]:
265 if s not in sub_info:
267 sub_info = sub_info[s]
268 if setting[-1] == 'list': # split a space-delimited list
269 sub_info[setting[-1]] = fields[1].split(' ')
271 sub_info[setting[-1]] = fields[1]
274 def _read_old(self, path, info):
275 raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)