1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
38 class JPKDriver (Driver):
39 """Handle JPK ForceRobot's data format.
42 super(JPKDriver, self).__init__(name='jpk')
44 def is_me(self, path):
45 if os.path.isdir(path):
47 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
48 with Closing(zipfile.ZipFile(path, 'r')) as f:
49 if 'header.properties' not in f.namelist():
51 with Closing(f.open('header.properties')) as h:
52 if 'jpk-data-file' in h.read():
55 with Closing(open(path, 'r')) as f:
58 headlines.append(f.readline())
59 if headlines[0].startswith('# xPosition') \
60 and headlines[1].startswith('# yPosition'):
64 def read(self, path, info=None):
67 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
68 return self._read_zip(path, info)
70 return self._read_old(path, info)
72 def _read_zip(self, path, info):
73 with Closing(zipfile.ZipFile(path, 'r')) as f:
75 zip_info = self._zip_info(f)
76 version = zip_info['file-format-version']
77 if zip_info['jpk-data-file'] == 'jpk-data1D-file':
78 return self._zip_read_1d(
79 f, path, info, zip_info, version)
80 elif zip_info['jpk-data-file'] != 'spm-forcefile':
81 raise ValueError('unrecognized JPK file type "%s"'
82 % zip_info['jpk-data-file'])
84 for i in range(len([p for p in f.namelist()
85 if p.endswith('segment-header.properties')])):
86 segments.append(self._zip_segment(
87 f, path, info, zip_info, i, version))
88 if version not in ['0.%d' % i for i in range(13)]:
89 raise NotImplementedError(
90 'JPK file version %s not supported (yet).' % version)
91 curve_info = self._zip_translate_params(
92 zip_info, segments[0].info['raw info'], version)
93 for segment in segments: # HACK, should use curve-level spring constant
94 for key in ['spring constant (N/m)',
95 'z piezo sensitivity (m/V)']:
97 segment.info['spring constant (N/m)'] = \
98 curve_info['spring constant (N/m)']
99 names = [segment.info['name'] for segment in segments]
100 for name in set(names): # ensure unique names
101 count = names.count(name)
104 for j,n in enumerate(names):
106 segments[j].info['name'] += '-%d' % i
108 return (segments, curve_info)
110 def _zip_info(self, zipfile):
111 with Closing(zipfile.open('header.properties')) as f:
112 info = self._parse_params(f.readlines())
115 def _zip_segment(self, zipfile, path, info, zip_info, index, version):
116 with Closing(zipfile.open(os.path.join(
117 'segments', str(index), 'segment-header.properties'))
119 prop = self._parse_params(f.readlines())
120 expected_shape = (int(prop['force-segment-header']['num-points']),)
122 if 'list' not in prop['channels']:
123 prop['channels'] = {'list': prop['channels'].split()}
124 for chan in prop['channels']['list']:
125 chan_info = prop['channel'][chan]
126 channels.append(self._zip_channel(
127 zipfile, index, chan, chan_info))
128 if channels[-1].shape != expected_shape:
129 raise NotImplementedError(
130 'channel %d:%s in %s has strange shape %s != %s'
131 % (index, chan, zipfile.path,
132 channels[-1].shape, expected_shape))
133 if len(channels) > 0:
134 shape = (len(channels[0]), len(channels))
135 dtype = channels[0].dtype
136 else: # no channels for this data block
138 dtype = numpy.float32
142 info=self._zip_translate_segment_params(prop))
143 for i,chan in enumerate(channels):
145 return self._zip_scale_segment(d, path, info, version)
147 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
148 if chan_info['data']['type'] in ['constant-data', 'raster-data']:
149 return self._zip_calculate_channel(chan_info)
150 with Closing(zipfile.open(os.path.join(
151 'segments', str(segment_index),
152 chan_info['data']['file']['name']), 'r')) as f:
153 assert chan_info['data']['file']['format'] == 'raw', \
154 'non-raw data format:\n%s' % pprint.pformat(chan_info)
155 dtype = self._zip_channel_dtype(chan_info)
156 data = numpy.frombuffer(
159 if dtype.kind in ['i', 'u']:
160 data = self._zip_channel_decode(data, chan_info)
163 def _zip_calculate_channel(self, chan_info):
164 type_ = chan_info['data']['type']
165 n = int(chan_info['data']['num-points'])
166 if type_ == 'constant-data':
167 return float(chan_info['data']['value'])*numpy.ones(
170 elif type_ == 'raster-data':
171 start = float(chan_info['data']['start'])
172 step = float(chan_info['data']['step'])
175 stop=start + step*(n-0.5),
179 raise ValueError('unrecognized data format "%s"' % type_)
181 def _zip_channel_dtype(self, chan_info):
182 type_ = chan_info['data']['type']
183 if type_ in ['float-data', 'float']:
184 dtype = numpy.dtype(numpy.float32)
185 elif type_ in ['integer-data', 'memory-integer-data']:
186 encoder = chan_info['data']['encoder']['type']
187 if encoder in ['signedinteger', 'signedinteger-limited']:
188 dtype = numpy.dtype(numpy.int32)
189 elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
190 dtype = numpy.dtype(numpy.uint32)
192 raise ValueError('unrecognized encoder type "%s" for "%s" data'
194 elif type_ in ['short-data', 'short', 'memory-short-data']:
195 encoder = chan_info['data']['encoder']['type']
196 if encoder in ['signedshort', 'signedshort-limited']:
197 dtype = numpy.dtype(numpy.int16)
198 elif encoder in ['unsignedshort', 'unsignedshort-limited']:
199 dtype = numpy.dtype(numpy.uint16)
201 raise ValueError('unrecognized encoder type "%s" for "%s" data'
204 raise ValueError('unrecognized data format "%s"' % type_)
206 # '>' (big endian) byte order.
207 # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
208 # All forms of raw data are stored in chronological order
209 # (the order in which they were collected), and the
210 # individual values are stored in network byte order
211 # (big-endian). The data type used to store the data is
212 # specified by the "channel.*.data.type" property, and is
213 # either short (2 bytes per value), integer (4 bytes), or
214 # float (4 bytes, IEEE format).
215 return dtype.newbyteorder(byte_order)
217 def _zip_channel_decode(self, data, encoder_info):
218 return self._zip_apply_channel_scaling(
219 data, encoder_info['data']['encoder'])
221 def _zip_translate_params(self, params, chan_info, version):
224 #'time':self._time_from_TODO(raw_info[]),
226 if len(chan_info['channels']['list']) == 0:
228 force_unit = self._zip_unit(
229 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
231 assert force_unit == 'N', force_unit
232 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
233 assert force_base == 'distance', force_base
234 dist_unit = self._zip_unit(
235 chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
237 assert dist_unit == 'm', dist_unit
238 distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
239 assert distance_base == 'volts', distance_base
240 base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
241 assert base_conversion == distance_base, base_conversion
242 if 'encoder' in chan_info['channel']['vDeflection']['data']:
243 distance_base_unit = self._zip_unit(
244 chan_info['channel']['vDeflection']['data']['encoder']['scaling'],
247 distance_base_unit = self._zip_unit(
248 chan_info['channel']['vDeflection']['data'],
250 assert distance_base_unit == 'V', distance_base_unit
252 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
254 chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
255 info['spring constant (N/m)'] = force_mult
256 info['z piezo sensitivity (m/V)'] = sens_mult
259 def _zip_translate_segment_params(self, params):
262 'columns': list(params['channels']['list']),
263 'name': self._zip_segment_name(params),
267 def _zip_segment_name(self, params):
268 name = params['force-segment-header']['name']['name']
269 if name.endswith('-spm'):
270 name = name[:-len('-spm')]
273 elif name.startswith('pause-at-'):
277 def _zip_scale_segment(self, segment, path, info, version):
283 segment.info['raw data'] = data
286 channels = segment.info['raw info']['channels']['list']
287 for i,channel in enumerate(channels):
289 if channel == 'vDeflection':
290 conversion = 'distance'
291 segment = self._zip_scale_channel(
292 segment, channel, conversion=conversion,
293 path=path, info=info, version=version)
294 name,unit = split_data_label(segment.info['columns'][i])
295 if name == 'vDeflection':
296 assert unit == 'm', segment.info['columns'][i]
297 segment.info['columns'][i] = join_data_label('deflection', 'm')
298 # Invert because deflection voltage increases as the
299 # tip moves away from the surface, but it makes more
300 # sense to me to have it increase as it moves toward
301 # the surface (positive tension on the protein chain).
303 elif name == 'height':
304 assert unit == 'm', segment.info['columns'][i]
305 segment.info['columns'][i] = join_data_label('z piezo', 'm')
308 def _zip_scale_channel(self, segment, channel_name,
309 conversion=None, path=None, info={}, version=None):
310 channel = segment.info['raw info']['channels']['list'].index(
312 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
313 if conversion == None:
314 conversion = conversion_set['conversions']['default']
315 if conversion == conversion_set['conversions']['base']:
316 segment.info['columns'][channel] = join_data_label(
319 segment.info['raw info']['channel'][channel_name]['data'],
322 conversion_info = conversion_set['conversion'][conversion]
323 if conversion_info['base-calibration-slot'] \
324 != conversion_set['conversions']['base']:
325 # Our conversion is stacked on a previous conversion. Do
326 # the previous conversion first.
327 segment = self._zip_scale_channel(
328 segment, channel_name,
329 conversion_info['base-calibration-slot'],
330 path=path, info=info, version=version)
331 if conversion_info['type'] == 'file':
332 # Michael Haggerty at JPK points out that the conversion
333 # information stored in the external file is reproduced in
334 # the force curve file. So there is no need to actually
335 # read `conversion_info['file']`. In fact, the data there
336 # may have changed with future calibrations, while the
337 # information stored directly in conversion_info retains
338 # the calibration information as it was when the experiment
340 pass # Fall through to 'simple' conversion processing.
342 assert conversion_info['type'] == 'simple', conversion_info['type']
343 segment[:,channel] = self._zip_apply_channel_scaling(
344 segment[:,channel], conversion_info)
345 unit = self._zip_unit(conversion_info['scaling'], version)
346 segment.info['columns'][channel] = join_data_label(channel_name, unit)
349 def _zip_apply_channel_scaling(self, channel_data, conversion_info):
350 assert conversion_info['scaling']['type'] == 'linear', \
351 conversion_info['scaling']['type']
352 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
353 conversion_info['scaling']['style']
354 multiplier = float(conversion_info['scaling']['multiplier'])
355 offset = float(conversion_info['scaling']['offset'])
356 return channel_data * multiplier + offset
358 def _zip_unit(self, conversion_info, version):
359 if version in ['0.%d' % i for i in range(3)]:
360 return conversion_info['unit']
362 return conversion_info['unit']['unit']
364 def _parse_params(self, lines):
368 if line.startswith('#'):
371 # e.g.: force-segment-header.type=xy-position-segment-header
372 fields = line.split('=', 1)
373 assert len(fields) == 2, line
374 setting = fields[0].split('.')
375 sub_info = info # drill down, e.g. info['force-s..']['type']
376 for s in setting[:-1]:
377 if s not in sub_info:
379 sub_info = sub_info[s]
380 if setting[-1] == 'list': # split a space-delimited list
382 sub_info[setting[-1]] = fields[1].split(' ')
384 sub_info[setting[-1]] = []
386 sub_info[setting[-1]] = fields[1]
389 def _zip_read_1d(self, zipfile, path, info, zip_info, version):
390 expected_shape = (int(zip_info['data']['num-points']),)
391 if zip_info['data']['type'] in ['constant-data', 'raster-data']:
392 return self._zip_calculate_channel(zip_info)
393 with Closing(zipfile.open(
394 zip_info['data']['file']['name'], 'r')) as f:
395 assert zip_info['data']['file']['format'] == 'raw', \
396 'non-raw data format:\n%s' % pprint.pformat(chan_info)
397 dtype = self._zip_channel_dtype(zip_info)
398 data = numpy.frombuffer(
401 if dtype.kind in ['i', 'u']:
402 data = self._zip_channel_decode(data, zip_info)
403 if data.shape != expected_shape:
404 raise NotImplementedError(
405 'channel %s has strange shape %s != %s'
406 % (path, data.shape, expected_shape))
414 def _read_old(self, path, info):
415 raise NotImplementedError(
416 "Early JPK files (pre-zip) are not supported by Hooke. Please "
417 "use JPK's `out2jpk-force` script to convert your old files "
418 "(%s) to a more recent format before loading them with Hooke."