1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
35 class JPKDriver (Driver):
36 """Handle JPK ForceRobot's data format.
39 super(JPKDriver, self).__init__(name='jpk')
41 def is_me(self, path):
42 if os.path.isdir(path):
44 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
45 with Closing(zipfile.ZipFile(path, 'r')) as f:
46 if 'header.properties' not in f.namelist():
48 with Closing(f.open('header.properties')) as h:
49 if 'jpk-data-file' in h.read():
52 with Closing(open(path, 'r')) as f:
55 headlines.append(f.readline())
56 if headlines[0].startswith('# xPosition') \
57 and headlines[1].startswith('# yPosition'):
61 def read(self, path, info=None):
64 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
65 return self._read_zip(path, info)
67 return self._read_old(path, info)
69 def _read_zip(self, path, info):
70 with Closing(zipfile.ZipFile(path, 'r')) as f:
72 zip_info = self._zip_info(f)
74 for i in range(len([p for p in f.namelist()
75 if p.endswith('segment-header.properties')])):
76 segments.append(self._zip_segment(f, path, info, zip_info, i))
77 if zip_info['file-format-version'] not in ['0.5']:
78 raise NotImplementedError(
79 'JPK file version %s not supported (yet).'
80 % zip_info['file-format-version'])
81 for name in ['approach', 'retract']:
82 if len([s for s in segments if s.info['name'] == name]) == 0:
84 'No segment for %s in %s, only %s'
85 % (name, path, [s.info['name'] for s in segments]))
86 curve_info = self._zip_translate_params(zip_info,
87 segments[0].info['raw info'])
88 for segment in segments:
89 segment.info['spring constant (N/m)'] = \
90 curve_info['spring constant (N/m)']
91 return (segments, curve_info)
93 def _zip_info(self, zipfile):
94 with Closing(zipfile.open('header.properties')) as f:
95 info = self._parse_params(f.readlines())
98 def _zip_segment(self, zipfile, path, info, zip_info, index):
99 prop_file = zipfile.open(os.path.join(
100 'segments', str(index), 'segment-header.properties'))
101 prop = self._parse_params(prop_file.readlines())
103 expected_shape = (int(prop['force-segment-header']['num-points']),)
105 for chan in prop['channels']['list']:
106 chan_info = prop['channel'][chan]
107 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
108 if channels[-1].shape != expected_shape:
109 raise NotImplementedError(
110 'Channel %d:%s in %s has strange shape %s != %s'
111 % (index, chan, zipfile.path,
112 channels[-1].shape, expected_shape))
114 shape=(len(channels[0]), len(channels)),
115 dtype=channels[0].dtype,
116 info=self._zip_translate_segment_params(prop))
117 for i,chan in enumerate(channels):
119 return self._zip_scale_segment(d, path, info)
121 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
122 f = zipfile.open(os.path.join(
123 'segments', str(segment_index),
124 chan_info['data']['file']['name']), 'r')
125 assert chan_info['data']['file']['format'] == 'raw', \
126 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
127 assert chan_info['data']['type'] == 'float-data', \
128 'Non-float data format:\n%s' % pprint.pformat(chan_info)
129 data = numpy.frombuffer(
131 dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
132 # '>' (big endian) byte order.
133 # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
134 # All forms of raw data are stored in chronological order
135 # (the order in which they were collected), and the
136 # individual values are stored in network byte order
137 # (big-endian). The data type used to store the data is
138 # specified by the "channel.*.data.type" property, and is
139 # either short (2 bytes per value), integer (4 bytes), or
140 # float (4 bytes, IEEE format).
144 def _zip_translate_params(self, params, chan_info):
147 'filetype':self.name,
148 #'time':self._time_from_TODO(raw_info[]),
150 # TODO: distinguish between force clamp and velocity clamp
151 # experiments. Note that the JPK file format is flexible
152 # enough to support mixed experiments (i.e. both force clamp
153 # and velocity clamp segments in a single experiment), but I
154 # have no idea what sort of analysis such experiments would
156 info['experiment'] = experiment.VelocityClamp
157 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
158 assert force_unit == 'N', force_unit
159 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
160 assert force_base == 'distance', force_base
161 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
162 assert dist_unit == 'm', dist_unit
164 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
165 info['spring constant (N/m)'] = force_mult
168 def _zip_translate_segment_params(self, params):
171 'columns':list(params['channels']['list']),
172 'name':params['force-segment-header']['name']['name'],
174 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
175 info['name'] = info['name'][:-len('-spm')]
176 if info['name'] == 'extend':
177 info['name'] = 'approach'
179 raise NotImplementedError(
180 'Unrecognized segment type %s' % info['name'])
183 def _zip_scale_segment(self, segment, path, info):
189 segment.info['raw data'] = data
192 channels = segment.info['raw info']['channels']['list']
193 z_col = channels.index('height')
194 d_col = channels.index('vDeflection')
196 segment = self._zip_scale_channel(
197 segment, z_col, 'calibrated', path, info)
198 segment = self._zip_scale_channel(
199 segment, d_col, 'distance', path, info)
201 assert segment.info['columns'][z_col] == 'height (m)', \
202 segment.info['columns'][z_col]
203 assert segment.info['columns'][d_col] == 'vDeflection (m)', \
204 segment.info['columns'][d_col]
206 # scaled column indices same as raw column indices,
207 # because columns is a copy of channels.list
208 segment.info['columns'][z_col] = 'z piezo (m)'
209 segment.info['columns'][d_col] = 'deflection (m)'
212 def _zip_scale_channel(self, segment, channel, conversion, path, info):
213 channel_name = segment.info['raw info']['channels']['list'][channel]
214 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
215 conversion_info = conversion_set['conversion'][conversion]
216 if conversion_info['base-calibration-slot'] \
217 != conversion_set['conversions']['base']:
218 # Our conversion is stacked on a previous conversion. Do
219 # the previous conversion first.
220 segment = self._zip_scale_channel(
221 segment, channel, conversion_info['base-calibration-slot'],
222 path=path, info=info)
223 if conversion_info['type'] == 'file':
224 # Michael Haggerty at JPK points out that the conversion
225 # information stored in the external file is reproduced in
226 # the force curve file. So there is no need to actually
227 # read `conversion_info['file']`. In fact, the data there
228 # may have changed with future calibrations, while the
229 # information stored directly in conversion_info retains
230 # the calibration information as it was when the experiment
232 pass # Fall through to 'simple' conversion processing.
234 assert conversion_info['type'] == 'simple', conversion_info['type']
235 assert conversion_info['scaling']['type'] == 'linear', \
236 conversion_info['scaling']['type']
237 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
238 conversion_info['scaling']['style']
239 multiplier = float(conversion_info['scaling']['multiplier'])
240 offset = float(conversion_info['scaling']['offset'])
241 unit = conversion_info['scaling']['unit']['unit']
242 segment[:,channel] = segment[:,channel] * multiplier + offset
243 segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
246 def _parse_params(self, lines):
250 if line.startswith('#'):
253 # e.g.: force-segment-header.type=xy-position-segment-header
254 fields = line.split('=', 1)
255 assert len(fields) == 2, line
256 setting = fields[0].split('.')
257 sub_info = info # drill down, e.g. info['force-s..']['type']
258 for s in setting[:-1]:
259 if s not in sub_info:
261 sub_info = sub_info[s]
262 if setting[-1] == 'list': # split a space-delimited list
263 sub_info[setting[-1]] = fields[1].split(' ')
265 sub_info[setting[-1]] = fields[1]
268 def _read_old(self, path, info):
269 raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)