Zip files use slashes (not os.path.sep) for their path separator (JPK driver).
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 def slash_join(*args):
39     r"""Join path components with forward slashes regardless of OS.
40
41     Notes
42     -----
43     From the `PKZIP Application Note`_, section J (Explanation of fields):
44
45       file name: (Variable)
46
47         ... All slashes should be forward slashes ``/`` as opposed to
48         backwards slashes ``\`` for compatibility with Amiga and UNIX
49         file systems etc. ...
50
51     .. _PKZIP Application Note:
52       http://www.pkware.com/documents/casestudies/APPNOTE.TXT
53
54     Examples
55     --------
56
57     >>> sep = os.path.sep
58     >>> os.path.sep = '/'
59     >>> print slash_join('abc', 'def/ghi', 'jkl\\mno')
60     abc/def/ghi/jkl\mno
61     >>> os.path.sep = '\\'
62     >>> print slash_join('abc', 'def/ghi', 'jkl\\mno')
63     abc/def/ghi/jkl\mno
64     >>> os.path.sep = sep
65
66     Note that when :const:`os.path.sep` is ``/`` (e.g. UNIX),
67     ``def/ghi`` is a compound segment, but when :const:`os.path.sep`
68     is ``\`` (e.g. Windows), ``def/ghi`` is a single segment.
69     """
70     sep = os.path.sep
71     try:
72         os.path.sep = '/'
73         return os.path.join(*args)
74     finally:
75         os.path.sep = sep
76
77
78 class JPKDriver (Driver):
79     """Handle JPK ForceRobot's data format.
80     """
81     def __init__(self):
82         super(JPKDriver, self).__init__(name='jpk')
83
84     def is_me(self, path):
85         if os.path.isdir(path):
86             return False
87         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
88             with Closing(zipfile.ZipFile(path, 'r')) as f:
89                 if 'header.properties' not in f.namelist():
90                     return False
91                 with Closing(f.open('header.properties')) as h:
92                     if 'jpk-data-file' in h.read():
93                         return True
94         else:
95             with Closing(open(path, 'r')) as f:
96                 headlines = []
97                 for i in range(3):
98                     headlines.append(f.readline())
99             if headlines[0].startswith('# xPosition') \
100                     and headlines[1].startswith('# yPosition'):
101                 return True
102         return False
103
104     def read(self, path, info=None):
105         if info == None:
106             info = {}
107         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
108             return self._read_zip(path, info)
109         else:
110             return self._read_old(path, info)
111
112     def _read_zip(self, path, info):
113         with Closing(zipfile.ZipFile(path, 'r')) as f:
114             f.path = path
115             zip_info = self._zip_info(f)
116             version = zip_info['file-format-version']
117             if zip_info['jpk-data-file'] == 'jpk-data1D-file':
118                 return self._zip_read_1d(
119                     f, path, info, zip_info, version)
120             elif zip_info['jpk-data-file'] != 'spm-forcefile':
121                 raise ValueError('unrecognized JPK file type "%s"'
122                                  % zip_info['jpk-data-file'])
123             segments = []
124             for i in range(len([p for p in f.namelist()
125                                 if p.endswith('segment-header.properties')])):
126                 segments.append(self._zip_segment(
127                         f, path, info, zip_info, i, version))
128         if version not in ['0.%d' % i for i in range(13)]:
129             raise NotImplementedError(
130                 'JPK file version %s not supported (yet).' % version)
131         curve_info = self._zip_translate_params(
132             zip_info, segments[0].info['raw info'], version)
133         for segment in segments:  # HACK, should use curve-level spring constant
134             for key in ['spring constant (N/m)',
135                         'z piezo sensitivity (m/V)']:
136                 if key in curve_info:
137                     segment.info['spring constant (N/m)'] = \
138                         curve_info['spring constant (N/m)']
139         names = [segment.info['name'] for segment in segments]
140         for name in set(names):  # ensure unique names
141             count = names.count(name)
142             if count > 1:
143                 i = 0
144                 for j,n in enumerate(names):
145                     if n == name:
146                         segments[j].info['name'] += '-%d' % i
147                         i += 1
148         return (segments, curve_info)
149
150     def _zip_info(self, zipfile):
151         with Closing(zipfile.open('header.properties')) as f:
152             info = self._parse_params(f.readlines())
153             return info
154
155     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
156         with Closing(zipfile.open(slash_join(
157                     'segments', str(index), 'segment-header.properties'))
158                      ) as f:
159             prop = self._parse_params(f.readlines())
160         expected_shape = (int(prop['force-segment-header']['num-points']),)
161         channels = []
162         if 'list' not in prop['channels']:
163             prop['channels'] = {'list': prop['channels'].split()}
164         for chan in prop['channels']['list']:
165             chan_info = prop['channel'][chan]
166             channels.append(self._zip_channel(
167                     zipfile, index, chan, chan_info))
168             if channels[-1].shape != expected_shape:
169                 raise NotImplementedError(
170                     'channel %d:%s in %s has strange shape %s != %s'
171                     % (index, chan, zipfile.path,
172                        channels[-1].shape, expected_shape))
173         if len(channels) > 0:
174             shape = (len(channels[0]), len(channels))
175             dtype = channels[0].dtype
176         else:  # no channels for this data block
177             shape = (0,0)
178             dtype = numpy.float32
179         d = curve.Data(
180             shape=shape,
181             dtype=dtype,
182             info=self._zip_translate_segment_params(prop))
183         for i,chan in enumerate(channels):
184             d[:,i] = chan
185         return self._zip_scale_segment(d, path, info, version)
186
187     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
188         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
189             return self._zip_calculate_channel(chan_info)
190         with Closing(zipfile.open(slash_join(
191                     'segments', str(segment_index),
192                     chan_info['data']['file']['name']), 'r')) as f:
193             assert chan_info['data']['file']['format'] == 'raw', \
194                 'non-raw data format:\n%s' % pprint.pformat(chan_info)
195             dtype = self._zip_channel_dtype(chan_info)
196             data = numpy.frombuffer(
197                 buffer(f.read()),
198                 dtype=dtype,)
199         if dtype.kind in ['i', 'u']:
200             data = self._zip_channel_decode(data, chan_info)
201         return data
202
203     def _zip_calculate_channel(self, chan_info):
204         type_ = chan_info['data']['type']
205         n = int(chan_info['data']['num-points'])
206         if type_ == 'constant-data':
207             return float(chan_info['data']['value'])*numpy.ones(
208                 shape=(n,),
209                 dtype=numpy.float32)
210         elif type_ == 'raster-data':
211             start = float(chan_info['data']['start'])
212             step = float(chan_info['data']['step'])
213             return numpy.arange(
214                 start=start,
215                 stop=start + step*(n-0.5),
216                 step=step,
217                 dtype=numpy.float32)
218         else:
219             raise ValueError('unrecognized data format "%s"' % type_)
220
221     def _zip_channel_dtype(self, chan_info):
222         type_ = chan_info['data']['type']
223         if type_ in ['float-data', 'float']:
224             dtype = numpy.dtype(numpy.float32)
225         elif type_ in ['integer-data', 'memory-integer-data']:
226             encoder = chan_info['data']['encoder']['type']
227             if encoder in ['signedinteger', 'signedinteger-limited']:
228                 dtype = numpy.dtype(numpy.int32)
229             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
230                 dtype = numpy.dtype(numpy.uint32)
231             else:
232                 raise ValueError('unrecognized encoder type "%s" for "%s" data'
233                                  % (encoder, type_))
234         elif type_ in ['short-data', 'short', 'memory-short-data']:
235             encoder = chan_info['data']['encoder']['type']
236             if encoder in ['signedshort', 'signedshort-limited']:
237                 dtype = numpy.dtype(numpy.int16)
238             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
239                 dtype = numpy.dtype(numpy.uint16)
240             else:
241                 raise ValueError('unrecognized encoder type "%s" for "%s" data'
242                                  % (encoder, type_))
243         else:
244             raise ValueError('unrecognized data format "%s"' % type_)
245         byte_order = '>'
246         # '>' (big endian) byte order.
247         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
248         #    All forms of raw data are stored in chronological order
249         #    (the order in which they were collected), and the
250         #    individual values are stored in network byte order
251         #    (big-endian). The data type used to store the data is
252         #    specified by the "channel.*.data.type" property, and is
253         #    either short (2 bytes per value), integer (4 bytes), or
254         #    float (4 bytes, IEEE format).
255         return dtype.newbyteorder(byte_order)
256
257     def _zip_channel_decode(self, data, encoder_info):
258         return self._zip_apply_channel_scaling(
259             data, encoder_info['data']['encoder'])
260
261     def _zip_translate_params(self, params, chan_info, version):
262         info = {
263             'raw info':params,
264             #'time':self._time_from_TODO(raw_info[]),
265             }
266         if len(chan_info['channels']['list']) == 0:
267             return info
268         force_unit = self._zip_unit(
269             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
270             version)
271         assert force_unit == 'N', force_unit
272         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
273         assert force_base == 'distance', force_base
274         dist_unit = self._zip_unit(
275             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
276             version)
277         assert dist_unit == 'm', dist_unit
278         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
279         assert distance_base == 'volts', distance_base
280         base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
281         assert base_conversion == distance_base, base_conversion
282         if 'encoder' in chan_info['channel']['vDeflection']['data']:
283             distance_base_unit = self._zip_unit(
284                 chan_info['channel']['vDeflection']['data']['encoder']['scaling'],
285                 version)
286         else:
287             distance_base_unit = self._zip_unit(
288                 chan_info['channel']['vDeflection']['data'],
289                 version)
290         assert distance_base_unit == 'V', distance_base_unit
291         force_mult = float(
292             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
293         sens_mult = float(
294             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
295         info['spring constant (N/m)'] = force_mult
296         info['z piezo sensitivity (m/V)'] = sens_mult
297         return info
298
299     def _zip_translate_segment_params(self, params):
300         info = {
301             'raw info': params,
302             'columns': list(params['channels']['list']),
303             'name': self._zip_segment_name(params),
304             }
305         return info
306
307     def _zip_segment_name(self, params):
308         name = params['force-segment-header']['name']['name']
309         if name.endswith('-spm'):
310             name = name[:-len('-spm')]
311         if name == 'extend':
312             name = 'approach'
313         elif name.startswith('pause-at-'):
314             name = 'pause'
315         return name
316
317     def _zip_scale_segment(self, segment, path, info, version):
318         data = curve.Data(
319             shape=segment.shape,
320             dtype=segment.dtype,
321             info={})
322         data[:,:] = segment
323         segment.info['raw data'] = data
324
325         # raw column indices
326         channels = segment.info['raw info']['channels']['list']
327         for i,channel in enumerate(channels):
328             conversion = None
329             if channel == 'vDeflection':
330                 conversion = 'distance'
331             segment = self._zip_scale_channel(
332                 segment, channel, conversion=conversion,
333                 path=path, info=info, version=version)
334             name,unit = split_data_label(segment.info['columns'][i])
335             if name == 'vDeflection':
336                 assert unit == 'm', segment.info['columns'][i]
337                 segment.info['columns'][i] = join_data_label('deflection', 'm')
338                 # Invert because deflection voltage increases as the
339                 # tip moves away from the surface, but it makes more
340                 # sense to me to have it increase as it moves toward
341                 # the surface (positive tension on the protein chain).
342                 segment[:,i] *= -1
343             elif name == 'height':
344                 assert unit == 'm', segment.info['columns'][i]
345                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
346         return segment
347
348     def _zip_scale_channel(self, segment, channel_name,
349                            conversion=None, path=None, info={}, version=None):
350         channel = segment.info['raw info']['channels']['list'].index(
351             channel_name)
352         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
353         if conversion == None:
354             conversion = conversion_set['conversions']['default']
355         if conversion == conversion_set['conversions']['base']:
356             segment.info['columns'][channel] = join_data_label(
357                 channel_name,
358                 self._zip_unit(
359                     segment.info['raw info']['channel'][channel_name]['data'],
360                     version))
361             return segment
362         conversion_info = conversion_set['conversion'][conversion]
363         if conversion_info['base-calibration-slot'] \
364                 != conversion_set['conversions']['base']:
365             # Our conversion is stacked on a previous conversion.  Do
366             # the previous conversion first.
367             segment = self._zip_scale_channel(
368                 segment, channel_name,
369                 conversion_info['base-calibration-slot'],
370                 path=path, info=info, version=version)
371         if conversion_info['type'] == 'file':
372             # Michael Haggerty at JPK points out that the conversion
373             # information stored in the external file is reproduced in
374             # the force curve file.  So there is no need to actually
375             # read `conversion_info['file']`.  In fact, the data there
376             # may have changed with future calibrations, while the
377             # information stored directly in conversion_info retains
378             # the calibration information as it was when the experiment
379             # was performed.
380             pass  # Fall through to 'simple' conversion processing.
381         else:
382             assert conversion_info['type'] == 'simple', conversion_info['type']
383         segment[:,channel] = self._zip_apply_channel_scaling(
384             segment[:,channel], conversion_info)
385         unit = self._zip_unit(conversion_info['scaling'], version)
386         segment.info['columns'][channel] = join_data_label(channel_name, unit)
387         return segment
388
389     def _zip_apply_channel_scaling(self, channel_data, conversion_info):
390         assert conversion_info['scaling']['type'] == 'linear', \
391             conversion_info['scaling']['type']
392         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
393             conversion_info['scaling']['style']
394         multiplier = float(conversion_info['scaling']['multiplier'])
395         offset = float(conversion_info['scaling']['offset'])
396         return channel_data * multiplier + offset
397
398     def _zip_unit(self, conversion_info, version):
399         if version in ['0.%d' % i for i in range(3)]:
400             return conversion_info['unit']
401         else:
402             return conversion_info['unit']['unit']
403
404     def _parse_params(self, lines):
405         info = {}
406         for line in lines:
407             line = line.strip()
408             if line.startswith('#'):
409                 continue
410             else:
411                 # e.g.: force-segment-header.type=xy-position-segment-header
412                 fields = line.split('=', 1)
413                 assert len(fields) == 2, line
414                 setting = fields[0].split('.')
415                 sub_info = info  # drill down, e.g. info['force-s..']['type']
416                 for s in setting[:-1]:
417                     if s not in sub_info:
418                         sub_info[s] = {}
419                     sub_info = sub_info[s]
420                 if setting[-1] == 'list':  # split a space-delimited list
421                     if fields[1]:
422                         sub_info[setting[-1]] = fields[1].split(' ')
423                     else:
424                         sub_info[setting[-1]] = []
425                 else:
426                     sub_info[setting[-1]] = fields[1]
427         return info
428
429     def _zip_read_1d(self, zipfile, path, info, zip_info, version):
430         expected_shape = (int(zip_info['data']['num-points']),)
431         if zip_info['data']['type'] in ['constant-data', 'raster-data']:
432             return self._zip_calculate_channel(zip_info)
433         with Closing(zipfile.open(
434                 zip_info['data']['file']['name'], 'r')) as f:
435             assert zip_info['data']['file']['format'] == 'raw', \
436                 'non-raw data format:\n%s' % pprint.pformat(chan_info)
437             dtype = self._zip_channel_dtype(zip_info)
438             data = numpy.frombuffer(
439                 buffer(f.read()),
440                 dtype=dtype,)
441             if dtype.kind in ['i', 'u']:
442                 data = self._zip_channel_decode(data, zip_info)
443         if data.shape != expected_shape:
444             raise NotImplementedError(
445                 'channel %s has strange shape %s != %s'
446                 % (path, data.shape, expected_shape))
447         d = curve.Data(
448             shape=data.shape,
449             dtype=data.dtype,
450             info=zip_info)
451         d[:] = data
452         return d
453
454     def _read_old(self, path, info):
455         raise NotImplementedError(
456             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
457             "use JPK's `out2jpk-force` script to convert your old files "
458             "(%s) to a more recent format before loading them with Hooke."
459             % path)