1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
35 class JPKDriver (Driver):
36 """Handle JPK ForceRobot's data format.
39 super(JPKDriver, self).__init__(name='jpk')
41 def is_me(self, path):
42 if os.path.isdir(path):
44 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
45 with Closing(zipfile.ZipFile(path, 'r')) as f:
46 if 'header.properties' not in f.namelist():
48 with Closing(f.open('header.properties')) as h:
49 if 'jpk-data-file' in h.read():
52 with Closing(open(path, 'r')) as f:
55 headlines.append(f.readline())
56 if headlines[0].startswith('# xPosition') \
57 and headlines[1].startswith('# yPosition'):
61 def read(self, path, info=None):
64 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
65 return self._read_zip(path, info)
67 return self._read_old(path, info)
69 def _read_zip(self, path, info):
70 with Closing(zipfile.ZipFile(path, 'r')) as f:
72 zip_info = self._zip_info(f)
74 for i in range(len([p for p in f.namelist()
75 if p.endswith('segment-header.properties')])):
76 segments.append(self._zip_segment(f, path, info, zip_info, i))
77 if zip_info['file-format-version'] not in ['0.5']:
78 raise NotImplementedError(
79 'JPK file version %s not supported (yet).'
80 % zip_info['file-format-version'])
81 for name in ['approach', 'retract']:
82 if len([s for s in segments if s.info['name'] == name]) == 0:
84 'No segment for %s in %s, only %s'
85 % (name, path, [s.info['name'] for s in segments]))
87 self._zip_translate_params(zip_info,
88 segments[0].info['raw info']))
90 def _zip_info(self, zipfile):
91 with Closing(zipfile.open('header.properties')) as f:
92 info = self._parse_params(f.readlines())
95 def _zip_segment(self, zipfile, path, info, zip_info, index):
96 prop_file = zipfile.open(os.path.join(
97 'segments', str(index), 'segment-header.properties'))
98 prop = self._parse_params(prop_file.readlines())
100 expected_shape = (int(prop['force-segment-header']['num-points']),)
102 for chan in prop['channels']['list']:
103 chan_info = prop['channel'][chan]
104 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
105 if channels[-1].shape != expected_shape:
106 raise NotImplementedError(
107 'Channel %d:%s in %s has strange shape %s != %s'
108 % (index, chan, zipfile.path,
109 channels[-1].shape, expected_shape))
111 shape=(len(channels[0]), len(channels)),
112 dtype=channels[0].dtype,
113 info=self._zip_translate_segment_params(prop))
114 for i,chan in enumerate(channels):
116 return self._zip_scale_segment(d, path, info)
118 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
119 f = zipfile.open(os.path.join(
120 'segments', str(segment_index),
121 chan_info['data']['file']['name']), 'r')
122 assert chan_info['data']['file']['format'] == 'raw', \
123 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
124 assert chan_info['data']['type'] == 'float-data', \
125 'Non-float data format:\n%s' % pprint.pformat(chan_info)
126 data = numpy.frombuffer(
128 dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
129 # '>' (big endian) byte order.
130 # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
131 # All forms of raw data are stored in chronological order
132 # (the order in which they were collected), and the
133 # individual values are stored in network byte order
134 # (big-endian). The data type used to store the data is
135 # specified by the "channel.*.data.type" property, and is
136 # either short (2 bytes per value), integer (4 bytes), or
137 # float (4 bytes, IEEE format).
141 def _zip_translate_params(self, params, chan_info):
144 #'time':self._time_from_TODO(raw_info[]),
146 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
147 assert force_unit == 'N', force_unit
148 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
149 assert force_base == 'distance', force_base
150 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
151 assert dist_unit == 'm', dist_unit
153 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
154 info['spring constant (N/m)'] = force_mult
157 def _zip_translate_segment_params(self, params):
160 'columns':list(params['channels']['list']),
161 'name':params['force-segment-header']['name']['name'],
163 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
164 info['name'] = info['name'][:-len('-spm')]
165 if info['name'] == 'extend':
166 info['name'] = 'approach'
168 raise NotImplementedError(
169 'Unrecognized segment type %s' % info['name'])
172 def _zip_scale_segment(self, segment, path, info):
178 segment.info['raw data'] = data
181 channels = segment.info['raw info']['channels']['list']
182 z_col = channels.index('height')
183 d_col = channels.index('vDeflection')
185 segment = self._zip_scale_channel(
186 segment, z_col, 'calibrated', path, info)
187 segment = self._zip_scale_channel(
188 segment, d_col, 'distance', path, info)
190 assert segment.info['columns'][z_col] == 'height (m)', \
191 segment.info['columns'][z_col]
192 assert segment.info['columns'][d_col] == 'vDeflection (m)', \
193 segment.info['columns'][d_col]
195 # scaled column indices same as raw column indices,
196 # because columns is a copy of channels.list
197 segment.info['columns'][z_col] = 'z piezo (m)'
198 segment.info['columns'][d_col] = 'deflection (m)'
201 def _zip_scale_channel(self, segment, channel, conversion, path, info):
202 channel_name = segment.info['raw info']['channels']['list'][channel]
203 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
204 conversion_info = conversion_set['conversion'][conversion]
205 if conversion_info['base-calibration-slot'] \
206 != conversion_set['conversions']['base']:
207 # Our conversion is stacked on a previous conversion. Do
208 # the previous conversion first.
209 segment = self._zip_scale_channel(
210 segment, channel, conversion_info['base-calibration-slot'],
211 path=path, info=info)
212 if conversion_info['type'] == 'file':
213 # Michael Haggerty at JPK points out that the conversion
214 # information stored in the external file is reproduced in
215 # the force curve file. So there is no need to actually
216 # read `conversion_info['file']`. In fact, the data there
217 # may have changed with future calibrations, while the
218 # information stored directly in conversion_info retains
219 # the calibration information as it was when the experiment
221 pass # Fall through to 'simple' conversion processing.
223 assert conversion_info['type'] == 'simple', conversion_info['type']
224 assert conversion_info['scaling']['type'] == 'linear', \
225 conversion_info['scaling']['type']
226 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
227 conversion_info['scaling']['style']
228 multiplier = float(conversion_info['scaling']['multiplier'])
229 offset = float(conversion_info['scaling']['offset'])
230 unit = conversion_info['scaling']['unit']['unit']
231 segment[:,channel] = segment[:,channel] * multiplier + offset
232 segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
235 def _parse_params(self, lines):
239 if line.startswith('#'):
242 # e.g.: force-segment-header.type=xy-position-segment-header
243 fields = line.split('=', 1)
244 assert len(fields) == 2, line
245 setting = fields[0].split('.')
246 sub_info = info # drill down, e.g. info['force-s..']['type']
247 for s in setting[:-1]:
248 if s not in sub_info:
250 sub_info = sub_info[s]
251 if setting[-1] == 'list': # split a space-delimited list
252 sub_info[setting[-1]] = fields[1].split(' ')
254 sub_info[setting[-1]] = fields[1]
257 def _read_old(self, path, info):
258 raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)