1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from ..util.si import join_data_label, split_data_label
33 from . import Driver as Driver
36 class JPKDriver (Driver):
37 """Handle JPK ForceRobot's data format.
40 super(JPKDriver, self).__init__(name='jpk')
42 def is_me(self, path):
43 if os.path.isdir(path):
45 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
46 with Closing(zipfile.ZipFile(path, 'r')) as f:
47 if 'header.properties' not in f.namelist():
49 with Closing(f.open('header.properties')) as h:
50 if 'jpk-data-file' in h.read():
53 with Closing(open(path, 'r')) as f:
56 headlines.append(f.readline())
57 if headlines[0].startswith('# xPosition') \
58 and headlines[1].startswith('# yPosition'):
62 def read(self, path, info=None):
65 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
66 return self._read_zip(path, info)
68 return self._read_old(path, info)
70 def _read_zip(self, path, info):
71 with Closing(zipfile.ZipFile(path, 'r')) as f:
73 zip_info = self._zip_info(f)
75 for i in range(len([p for p in f.namelist()
76 if p.endswith('segment-header.properties')])):
77 segments.append(self._zip_segment(f, path, info, zip_info, i))
78 if zip_info['file-format-version'] not in ['0.5']:
79 raise NotImplementedError(
80 'JPK file version %s not supported (yet).'
81 % zip_info['file-format-version'])
82 for name in ['approach', 'retract']:
83 if len([s for s in segments if s.info['name'] == name]) == 0:
85 'No segment for %s in %s, only %s'
86 % (name, path, [s.info['name'] for s in segments]))
87 curve_info = self._zip_translate_params(zip_info,
88 segments[0].info['raw info'])
89 for segment in segments:
90 segment.info['spring constant (N/m)'] = \
91 curve_info['spring constant (N/m)']
92 return (segments, curve_info)
94 def _zip_info(self, zipfile):
95 with Closing(zipfile.open('header.properties')) as f:
96 info = self._parse_params(f.readlines())
99 def _zip_segment(self, zipfile, path, info, zip_info, index):
100 prop_file = zipfile.open(os.path.join(
101 'segments', str(index), 'segment-header.properties'))
102 prop = self._parse_params(prop_file.readlines())
104 expected_shape = (int(prop['force-segment-header']['num-points']),)
106 for chan in prop['channels']['list']:
107 chan_info = prop['channel'][chan]
108 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
109 if channels[-1].shape != expected_shape:
110 raise NotImplementedError(
111 'Channel %d:%s in %s has strange shape %s != %s'
112 % (index, chan, zipfile.path,
113 channels[-1].shape, expected_shape))
115 shape=(len(channels[0]), len(channels)),
116 dtype=channels[0].dtype,
117 info=self._zip_translate_segment_params(prop))
118 for i,chan in enumerate(channels):
120 return self._zip_scale_segment(d, path, info)
122 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
123 f = zipfile.open(os.path.join(
124 'segments', str(segment_index),
125 chan_info['data']['file']['name']), 'r')
126 assert chan_info['data']['file']['format'] == 'raw', \
127 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
128 assert chan_info['data']['type'] == 'float-data', \
129 'Non-float data format:\n%s' % pprint.pformat(chan_info)
130 data = numpy.frombuffer(
132 dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
133 # '>' (big endian) byte order.
134 # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
135 # All forms of raw data are stored in chronological order
136 # (the order in which they were collected), and the
137 # individual values are stored in network byte order
138 # (big-endian). The data type used to store the data is
139 # specified by the "channel.*.data.type" property, and is
140 # either short (2 bytes per value), integer (4 bytes), or
141 # float (4 bytes, IEEE format).
145 def _zip_translate_params(self, params, chan_info):
148 'filetype':self.name,
149 #'time':self._time_from_TODO(raw_info[]),
151 # TODO: distinguish between force clamp and velocity clamp
152 # experiments. Note that the JPK file format is flexible
153 # enough to support mixed experiments (i.e. both force clamp
154 # and velocity clamp segments in a single experiment), but I
155 # have no idea what sort of analysis such experiments would
157 info['experiment'] = experiment.VelocityClamp
158 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
159 assert force_unit == 'N', force_unit
160 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
161 assert force_base == 'distance', force_base
162 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
163 assert dist_unit == 'm', dist_unit
165 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
166 info['spring constant (N/m)'] = force_mult
169 def _zip_translate_segment_params(self, params):
172 'columns':list(params['channels']['list']),
173 'name':params['force-segment-header']['name']['name'],
175 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
176 info['name'] = info['name'][:-len('-spm')]
177 if info['name'] == 'extend':
178 info['name'] = 'approach'
180 raise NotImplementedError(
181 'Unrecognized segment type %s' % info['name'])
184 def _zip_scale_segment(self, segment, path, info):
190 segment.info['raw data'] = data
193 channels = segment.info['raw info']['channels']['list']
194 for i,channel in enumerate(channels):
196 if channel == 'vDeflection':
197 conversion = 'distance'
198 segment = self._zip_scale_channel(
199 segment, channel, conversion=conversion, path=path, info=info)
200 name,unit = split_data_label(segment.info['columns'][i])
201 if name == 'vDeflection':
202 assert unit == 'm', segment.info['columns'][i]
203 segment.info['columns'][i] = join_data_label('deflection', 'm')
204 elif name == 'height':
205 assert unit == 'm', segment.info['columns'][i]
206 segment.info['columns'][i] = join_data_label('z piezo', 'm')
209 def _zip_scale_channel(self, segment, channel_name, conversion=None,
211 channel = segment.info['raw info']['channels']['list'].index(
213 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
214 if conversion == None:
215 conversion = conversion_set['conversions']['default']
216 if conversion == conversion_set['conversions']['base']:
217 # Our conversion is the base data.
218 if conversion != 'volts':
219 raise NotImplementedError(
220 'unknown units for base channel: %s' % conversion)
221 segment.info['columns'][channel] = join_data_label(
224 conversion_info = conversion_set['conversion'][conversion]
225 if conversion_info['base-calibration-slot'] \
226 != conversion_set['conversions']['base']:
227 # Our conversion is stacked on a previous conversion. Do
228 # the previous conversion first.
229 segment = self._zip_scale_channel(
230 segment, channel_name,
231 conversion_info['base-calibration-slot'],
232 path=path, info=info)
233 if conversion_info['type'] == 'file':
234 # Michael Haggerty at JPK points out that the conversion
235 # information stored in the external file is reproduced in
236 # the force curve file. So there is no need to actually
237 # read `conversion_info['file']`. In fact, the data there
238 # may have changed with future calibrations, while the
239 # information stored directly in conversion_info retains
240 # the calibration information as it was when the experiment
242 pass # Fall through to 'simple' conversion processing.
244 assert conversion_info['type'] == 'simple', conversion_info['type']
245 assert conversion_info['scaling']['type'] == 'linear', \
246 conversion_info['scaling']['type']
247 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
248 conversion_info['scaling']['style']
249 multiplier = float(conversion_info['scaling']['multiplier'])
250 offset = float(conversion_info['scaling']['offset'])
251 unit = conversion_info['scaling']['unit']['unit']
252 segment[:,channel] = segment[:,channel] * multiplier + offset
253 segment.info['columns'][channel] = join_data_label(channel_name, unit)
256 def _parse_params(self, lines):
260 if line.startswith('#'):
263 # e.g.: force-segment-header.type=xy-position-segment-header
264 fields = line.split('=', 1)
265 assert len(fields) == 2, line
266 setting = fields[0].split('.')
267 sub_info = info # drill down, e.g. info['force-s..']['type']
268 for s in setting[:-1]:
269 if s not in sub_info:
271 sub_info = sub_info[s]
272 if setting[-1] == 'list': # split a space-delimited list
273 sub_info[setting[-1]] = fields[1].split(' ')
275 sub_info[setting[-1]] = fields[1]
278 def _read_old(self, path, info):
279 raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)