69f307e69a49f4f99607f1f285699c2b42694bba
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2012 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@tremily.us>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it under the
7 # terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation, either version 3 of the License, or (at your option) any
9 # later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
12 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
14 # details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with Hooke.  If not, see <http://www.gnu.org/licenses/>.
18
19 """Driver for JPK ForceRobot's velocity clamp data format.
20
21 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
22 The specs are freely available from JPK, just email support@jpk.com.
23 """
24
25 import os.path
26 import pprint
27 import zipfile
28
29 import numpy
30
31 from .. import curve as curve
32 from ..util.util import Closing as Closing
33 from ..util.si import join_data_label, split_data_label
34 from . import Driver as Driver
35
36
37 def slash_join(*args):
38     r"""Join path components with forward slashes regardless of OS.
39
40     Notes
41     -----
42     From the `PKZIP Application Note`_, section J (Explanation of fields):
43
44       file name: (Variable)
45
46         ... All slashes should be forward slashes ``/`` as opposed to
47         backwards slashes ``\`` for compatibility with Amiga and UNIX
48         file systems etc. ...
49
50     .. _PKZIP Application Note:
51       http://www.pkware.com/documents/casestudies/APPNOTE.TXT
52
53     Examples
54     --------
55
56     >>> sep = os.path.sep
57     >>> os.path.sep = '/'
58     >>> print slash_join('abc', 'def/ghi', 'jkl\\mno')
59     abc/def/ghi/jkl\mno
60     >>> os.path.sep = '\\'
61     >>> print slash_join('abc', 'def/ghi', 'jkl\\mno')
62     abc/def/ghi/jkl\mno
63     >>> os.path.sep = sep
64
65     Note that when :const:`os.path.sep` is ``/`` (e.g. UNIX),
66     ``def/ghi`` is a compound segment, but when :const:`os.path.sep`
67     is ``\`` (e.g. Windows), ``def/ghi`` is a single segment.
68     """
69     sep = os.path.sep
70     try:
71         os.path.sep = '/'
72         return os.path.join(*args)
73     finally:
74         os.path.sep = sep
75
76
77 class JPKDriver (Driver):
78     """Handle JPK ForceRobot's data format.
79     """
80     def __init__(self):
81         super(JPKDriver, self).__init__(name='jpk')
82
83     def is_me(self, path):
84         if os.path.isdir(path):
85             return False
86         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
87             with Closing(zipfile.ZipFile(path, 'r')) as f:
88                 if 'header.properties' not in f.namelist():
89                     return False
90                 with Closing(f.open('header.properties')) as h:
91                     if 'jpk-data-file' in h.read():
92                         return True
93         else:
94             with Closing(open(path, 'r')) as f:
95                 headlines = []
96                 for i in range(3):
97                     headlines.append(f.readline())
98             if headlines[0].startswith('# xPosition') \
99                     and headlines[1].startswith('# yPosition'):
100                 return True
101         return False
102
103     def read(self, path, info=None):
104         if info == None:
105             info = {}
106         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
107             return self._read_zip(path, info)
108         else:
109             return self._read_old(path, info)
110
111     def _read_zip(self, path, info):
112         with Closing(zipfile.ZipFile(path, 'r')) as f:
113             f.path = path
114             zip_info = self._zip_info(f)
115             version = zip_info['file-format-version']
116             if zip_info['jpk-data-file'] == 'jpk-data1D-file':
117                 return self._zip_read_1d(
118                     f, path, info, zip_info, version)
119             elif zip_info['jpk-data-file'] != 'spm-forcefile':
120                 raise ValueError('unrecognized JPK file type "%s"'
121                                  % zip_info['jpk-data-file'])
122             segments = []
123             for i in range(len([p for p in f.namelist()
124                                 if p.endswith('segment-header.properties')])):
125                 segments.append(self._zip_segment(
126                         f, path, info, zip_info, i, version))
127         if version not in ['0.%d' % i for i in range(13)]:
128             raise NotImplementedError(
129                 'JPK file version %s not supported (yet).' % version)
130         curve_info = self._zip_translate_params(
131             zip_info, segments[0].info['raw info'], version)
132         for segment in segments:  # HACK, should use curve-level spring constant
133             for key in ['spring constant (N/m)',
134                         'z piezo sensitivity (m/V)']:
135                 if key in curve_info:
136                     segment.info['spring constant (N/m)'] = \
137                         curve_info['spring constant (N/m)']
138         names = [segment.info['name'] for segment in segments]
139         for name in set(names):  # ensure unique names
140             count = names.count(name)
141             if count > 1:
142                 i = 0
143                 for j,n in enumerate(names):
144                     if n == name:
145                         segments[j].info['name'] += '-%d' % i
146                         i += 1
147         return (segments, curve_info)
148
149     def _zip_info(self, zipfile):
150         with Closing(zipfile.open('header.properties')) as f:
151             info = self._parse_params(f.readlines())
152             return info
153
154     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
155         with Closing(zipfile.open(slash_join(
156                     'segments', str(index), 'segment-header.properties'))
157                      ) as f:
158             prop = self._parse_params(f.readlines())
159         expected_shape = (int(prop['force-segment-header']['num-points']),)
160         channels = []
161         if 'list' not in prop['channels']:
162             prop['channels'] = {'list': prop['channels'].split()}
163         for chan in prop['channels']['list']:
164             chan_info = prop['channel'][chan]
165             channels.append(self._zip_channel(
166                     zipfile, index, chan, chan_info))
167             if channels[-1].shape != expected_shape:
168                 raise NotImplementedError(
169                     'channel %d:%s in %s has strange shape %s != %s'
170                     % (index, chan, zipfile.path,
171                        channels[-1].shape, expected_shape))
172         if len(channels) > 0:
173             shape = (len(channels[0]), len(channels))
174             dtype = channels[0].dtype
175         else:  # no channels for this data block
176             shape = (0,0)
177             dtype = numpy.float32
178         d = curve.Data(
179             shape=shape,
180             dtype=dtype,
181             info=self._zip_translate_segment_params(prop))
182         for i,chan in enumerate(channels):
183             d[:,i] = chan
184         return self._zip_scale_segment(d, path, info, version)
185
186     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
187         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
188             return self._zip_calculate_channel(chan_info)
189         with Closing(zipfile.open(slash_join(
190                     'segments', str(segment_index),
191                     chan_info['data']['file']['name']), 'r')) as f:
192             assert chan_info['data']['file']['format'] == 'raw', \
193                 'non-raw data format:\n%s' % pprint.pformat(chan_info)
194             dtype = self._zip_channel_dtype(chan_info)
195             data = numpy.frombuffer(
196                 buffer(f.read()),
197                 dtype=dtype,)
198         if dtype.kind in ['i', 'u']:
199             data = self._zip_channel_decode(data, chan_info)
200         return data
201
202     def _zip_calculate_channel(self, chan_info):
203         type_ = chan_info['data']['type']
204         n = int(chan_info['data']['num-points'])
205         if type_ == 'constant-data':
206             return float(chan_info['data']['value'])*numpy.ones(
207                 shape=(n,),
208                 dtype=numpy.float32)
209         elif type_ == 'raster-data':
210             start = float(chan_info['data']['start'])
211             step = float(chan_info['data']['step'])
212             return numpy.arange(
213                 start=start,
214                 stop=start + step*(n-0.5),
215                 step=step,
216                 dtype=numpy.float32)
217         else:
218             raise ValueError('unrecognized data format "%s"' % type_)
219
220     def _zip_channel_dtype(self, chan_info):
221         type_ = chan_info['data']['type']
222         if type_ in ['float-data', 'float']:
223             dtype = numpy.dtype(numpy.float32)
224         elif type_ in ['integer-data', 'memory-integer-data']:
225             encoder = chan_info['data']['encoder']['type']
226             if encoder in ['signedinteger', 'signedinteger-limited']:
227                 dtype = numpy.dtype(numpy.int32)
228             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
229                 dtype = numpy.dtype(numpy.uint32)
230             else:
231                 raise ValueError('unrecognized encoder type "%s" for "%s" data'
232                                  % (encoder, type_))
233         elif type_ in ['short-data', 'short', 'memory-short-data']:
234             encoder = chan_info['data']['encoder']['type']
235             if encoder in ['signedshort', 'signedshort-limited']:
236                 dtype = numpy.dtype(numpy.int16)
237             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
238                 dtype = numpy.dtype(numpy.uint16)
239             else:
240                 raise ValueError('unrecognized encoder type "%s" for "%s" data'
241                                  % (encoder, type_))
242         else:
243             raise ValueError('unrecognized data format "%s"' % type_)
244         byte_order = '>'
245         # '>' (big endian) byte order.
246         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
247         #    All forms of raw data are stored in chronological order
248         #    (the order in which they were collected), and the
249         #    individual values are stored in network byte order
250         #    (big-endian). The data type used to store the data is
251         #    specified by the "channel.*.data.type" property, and is
252         #    either short (2 bytes per value), integer (4 bytes), or
253         #    float (4 bytes, IEEE format).
254         return dtype.newbyteorder(byte_order)
255
256     def _zip_channel_decode(self, data, encoder_info):
257         return self._zip_apply_channel_scaling(
258             data, encoder_info['data']['encoder'])
259
260     def _zip_translate_params(self, params, chan_info, version):
261         info = {
262             'raw info':params,
263             #'time':self._time_from_TODO(raw_info[]),
264             }
265         if len(chan_info['channels']['list']) == 0:
266             return info
267         force_unit = self._zip_unit(
268             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
269             version)
270         assert force_unit == 'N', force_unit
271         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
272         assert force_base == 'distance', force_base
273         dist_unit = self._zip_unit(
274             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
275             version)
276         assert dist_unit == 'm', dist_unit
277         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
278         assert distance_base == 'volts', distance_base
279         base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
280         assert base_conversion == distance_base, base_conversion
281         if 'encoder' in chan_info['channel']['vDeflection']['data']:
282             distance_base_unit = self._zip_unit(
283                 chan_info['channel']['vDeflection']['data']['encoder']['scaling'],
284                 version)
285         else:
286             distance_base_unit = self._zip_unit(
287                 chan_info['channel']['vDeflection']['data'],
288                 version)
289         assert distance_base_unit == 'V', distance_base_unit
290         force_mult = float(
291             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
292         sens_mult = float(
293             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
294         info['spring constant (N/m)'] = force_mult
295         info['z piezo sensitivity (m/V)'] = sens_mult
296         return info
297
298     def _zip_translate_segment_params(self, params):
299         info = {
300             'raw info': params,
301             'columns': list(params['channels']['list']),
302             'name': self._zip_segment_name(params),
303             }
304         return info
305
306     def _zip_segment_name(self, params):
307         name = params['force-segment-header']['name']['name']
308         if name.endswith('-spm'):
309             name = name[:-len('-spm')]
310         if name == 'extend':
311             name = 'approach'
312         elif name.startswith('pause-at-'):
313             name = 'pause'
314         return name
315
316     def _zip_scale_segment(self, segment, path, info, version):
317         data = curve.Data(
318             shape=segment.shape,
319             dtype=segment.dtype,
320             info={})
321         data[:,:] = segment
322         segment.info['raw data'] = data
323
324         # raw column indices
325         channels = segment.info['raw info']['channels']['list']
326         for i,channel in enumerate(channels):
327             conversion = None
328             if channel == 'vDeflection':
329                 conversion = 'distance'
330             segment = self._zip_scale_channel(
331                 segment, channel, conversion=conversion,
332                 path=path, info=info, version=version)
333             name,unit = split_data_label(segment.info['columns'][i])
334             if name == 'vDeflection':
335                 assert unit == 'm', segment.info['columns'][i]
336                 segment.info['columns'][i] = join_data_label('deflection', 'm')
337                 # Invert because deflection voltage increases as the
338                 # tip moves away from the surface, but it makes more
339                 # sense to me to have it increase as it moves toward
340                 # the surface (positive tension on the protein chain).
341                 segment[:,i] *= -1
342             elif name == 'height':
343                 assert unit == 'm', segment.info['columns'][i]
344                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
345         return segment
346
347     def _zip_scale_channel(self, segment, channel_name,
348                            conversion=None, path=None, info={}, version=None):
349         channel = segment.info['raw info']['channels']['list'].index(
350             channel_name)
351         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
352         if conversion == None:
353             conversion = conversion_set['conversions']['default']
354         if conversion == conversion_set['conversions']['base']:
355             segment.info['columns'][channel] = join_data_label(
356                 channel_name,
357                 self._zip_unit(
358                     segment.info['raw info']['channel'][channel_name]['data'],
359                     version))
360             return segment
361         conversion_info = conversion_set['conversion'][conversion]
362         if conversion_info['base-calibration-slot'] \
363                 != conversion_set['conversions']['base']:
364             # Our conversion is stacked on a previous conversion.  Do
365             # the previous conversion first.
366             segment = self._zip_scale_channel(
367                 segment, channel_name,
368                 conversion_info['base-calibration-slot'],
369                 path=path, info=info, version=version)
370         if conversion_info['type'] == 'file':
371             # Michael Haggerty at JPK points out that the conversion
372             # information stored in the external file is reproduced in
373             # the force curve file.  So there is no need to actually
374             # read `conversion_info['file']`.  In fact, the data there
375             # may have changed with future calibrations, while the
376             # information stored directly in conversion_info retains
377             # the calibration information as it was when the experiment
378             # was performed.
379             pass  # Fall through to 'simple' conversion processing.
380         else:
381             assert conversion_info['type'] == 'simple', conversion_info['type']
382         segment[:,channel] = self._zip_apply_channel_scaling(
383             segment[:,channel], conversion_info)
384         unit = self._zip_unit(conversion_info['scaling'], version)
385         segment.info['columns'][channel] = join_data_label(channel_name, unit)
386         return segment
387
388     def _zip_apply_channel_scaling(self, channel_data, conversion_info):
389         assert conversion_info['scaling']['type'] == 'linear', \
390             conversion_info['scaling']['type']
391         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
392             conversion_info['scaling']['style']
393         multiplier = float(conversion_info['scaling']['multiplier'])
394         offset = float(conversion_info['scaling']['offset'])
395         return channel_data * multiplier + offset
396
397     def _zip_unit(self, conversion_info, version):
398         if version in ['0.%d' % i for i in range(3)]:
399             return conversion_info['unit']
400         else:
401             return conversion_info['unit']['unit']
402
403     def _parse_params(self, lines):
404         info = {}
405         for line in lines:
406             line = line.strip()
407             if line.startswith('#'):
408                 continue
409             else:
410                 # e.g.: force-segment-header.type=xy-position-segment-header
411                 fields = line.split('=', 1)
412                 assert len(fields) == 2, line
413                 setting = fields[0].split('.')
414                 sub_info = info  # drill down, e.g. info['force-s..']['type']
415                 for s in setting[:-1]:
416                     if s not in sub_info:
417                         sub_info[s] = {}
418                     sub_info = sub_info[s]
419                 if setting[-1] == 'list':  # split a space-delimited list
420                     if fields[1]:
421                         sub_info[setting[-1]] = fields[1].split(' ')
422                     else:
423                         sub_info[setting[-1]] = []
424                 else:
425                     sub_info[setting[-1]] = fields[1]
426         return info
427
428     def _zip_read_1d(self, zipfile, path, info, zip_info, version):
429         expected_shape = (int(zip_info['data']['num-points']),)
430         if zip_info['data']['type'] in ['constant-data', 'raster-data']:
431             return self._zip_calculate_channel(zip_info)
432         with Closing(zipfile.open(
433                 zip_info['data']['file']['name'], 'r')) as f:
434             assert zip_info['data']['file']['format'] == 'raw', \
435                 'non-raw data format:\n%s' % pprint.pformat(chan_info)
436             dtype = self._zip_channel_dtype(zip_info)
437             data = numpy.frombuffer(
438                 buffer(f.read()),
439                 dtype=dtype,)
440             if dtype.kind in ['i', 'u']:
441                 data = self._zip_channel_decode(data, zip_info)
442         if data.shape != expected_shape:
443             raise NotImplementedError(
444                 'channel %s has strange shape %s != %s'
445                 % (path, data.shape, expected_shape))
446         d = curve.Data(
447             shape=data.shape,
448             dtype=data.dtype,
449             info=zip_info)
450         d[:] = data
451         return d
452
453     def _read_old(self, path, info):
454         raise NotImplementedError(
455             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
456             "use JPK's `out2jpk-force` script to convert your old files "
457             "(%s) to a more recent format before loading them with Hooke."
458             % path)