Allow JPK file versions up to 0.12 (unfinished).
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.%d' % i
81                                                    for i in range(12)]:
82             raise NotImplementedError(
83                 'JPK file version %s not supported (yet).'
84                 % zip_info['file-format-version'])
85         for name in ['approach', 'retract']:
86             if len([s for s in segments if s.info['name'] == name]) == 0:
87                 raise ValueError(
88                     'No segment for %s in %s, only %s'
89                     % (name, path, [s.info['name'] for s in segments]))
90         curve_info = self._zip_translate_params(zip_info,
91                                                 segments[0].info['raw info'])
92         for segment in segments:
93             segment.info['spring constant (N/m)'] = \
94                 curve_info['spring constant (N/m)']
95         return (segments, curve_info)
96
97     def _zip_info(self, zipfile):
98         with Closing(zipfile.open('header.properties')) as f:
99             info = self._parse_params(f.readlines())
100             return info
101
102     def _zip_segment(self, zipfile, path, info, zip_info, index):
103         prop_file = zipfile.open(os.path.join(
104                 'segments', str(index), 'segment-header.properties'))
105         prop = self._parse_params(prop_file.readlines())
106         prop_file.close()
107         expected_shape = (int(prop['force-segment-header']['num-points']),)
108         channels = []
109         for chan in prop['channels']['list']:
110             chan_info = prop['channel'][chan]
111             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
112             if channels[-1].shape != expected_shape:
113                     raise NotImplementedError(
114                         'Channel %d:%s in %s has strange shape %s != %s'
115                         % (index, chan, zipfile.path,
116                            channels[-1].shape, expected_shape))
117         d = curve.Data(
118             shape=(len(channels[0]), len(channels)),
119             dtype=channels[0].dtype,
120             info=self._zip_translate_segment_params(prop))
121         for i,chan in enumerate(channels):
122             d[:,i] = chan
123         return self._zip_scale_segment(d, path, info)
124
125     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
126         f = zipfile.open(os.path.join(
127                 'segments', str(segment_index),
128                 chan_info['data']['file']['name']), 'r')
129         assert chan_info['data']['file']['format'] == 'raw', \
130             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
131         assert chan_info['data']['type'] == 'float-data', \
132             'Non-float data format:\n%s' % pprint.pformat(chan_info)
133         data = numpy.frombuffer(
134             buffer(f.read()),
135             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
136         # '>' (big endian) byte order.
137         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
138         #    All forms of raw data are stored in chronological order
139         #    (the order in which they were collected), and the
140         #    individual values are stored in network byte order
141         #    (big-endian). The data type used to store the data is
142         #    specified by the "channel.*.data.type" property, and is
143         #    either short (2 bytes per value), integer (4 bytes), or
144         #    float (4 bytes, IEEE format).
145         f.close()
146         return data
147
148     def _zip_translate_params(self, params, chan_info):
149         info = {
150             'raw info':params,
151             #'time':self._time_from_TODO(raw_info[]),
152             }
153         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
154         assert force_unit == 'N', force_unit
155         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
156         assert force_base == 'distance', force_base
157         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
158         assert dist_unit == 'm', dist_unit
159         force_mult = float(
160             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
161         info['spring constant (N/m)'] = force_mult
162         return info
163
164     def _zip_translate_segment_params(self, params):
165         info = {
166             'raw info':params,
167             'columns':list(params['channels']['list']),
168             'name':params['force-segment-header']['name']['name'],
169             }
170         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
171             info['name'] = info['name'][:-len('-spm')]
172             if info['name'] == 'extend':
173                 info['name'] = 'approach'
174         else:
175             raise NotImplementedError(
176                 'Unrecognized segment type %s' % info['name'])
177         return info
178
179     def _zip_scale_segment(self, segment, path, info):
180         data = curve.Data(
181             shape=segment.shape,
182             dtype=segment.dtype,
183             info={})
184         data[:,:] = segment
185         segment.info['raw data'] = data
186
187         # raw column indices
188         channels = segment.info['raw info']['channels']['list']
189         for i,channel in enumerate(channels):
190             conversion = None
191             if channel == 'vDeflection':
192                 conversion = 'distance'
193             segment = self._zip_scale_channel(
194                 segment, channel, conversion=conversion, path=path, info=info)
195             name,unit = split_data_label(segment.info['columns'][i])
196             if name == 'vDeflection':
197                 assert unit == 'm', segment.info['columns'][i]
198                 segment.info['columns'][i] = join_data_label('deflection', 'm')
199                 # Invert because deflection voltage increases as the
200                 # tip moves away from the surface, but it makes more
201                 # sense to me to have it increase as it moves toward
202                 # the surface (positive tension on the protein chain).
203                 segment[:,i] *= -1
204             elif name == 'height':
205                 assert unit == 'm', segment.info['columns'][i]
206                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
207         return segment
208
209     def _zip_scale_channel(self, segment, channel_name, conversion=None,
210                            path=None, info={}):
211         channel = segment.info['raw info']['channels']['list'].index(
212             channel_name)
213         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
214         if conversion == None:
215             conversion = conversion_set['conversions']['default']
216         if conversion == conversion_set['conversions']['base']:
217             # Our conversion is the base data.
218             if conversion != 'volts':
219                 raise NotImplementedError(
220                     'unknown units for base channel: %s' % conversion)
221             segment.info['columns'][channel] = join_data_label(
222                 channel_name, 'V')
223             return segment
224         conversion_info = conversion_set['conversion'][conversion]
225         if conversion_info['base-calibration-slot'] \
226                 != conversion_set['conversions']['base']:
227             # Our conversion is stacked on a previous conversion.  Do
228             # the previous conversion first.
229             segment = self._zip_scale_channel(
230                 segment, channel_name,
231                 conversion_info['base-calibration-slot'],
232                 path=path, info=info)
233         if conversion_info['type'] == 'file':
234             # Michael Haggerty at JPK points out that the conversion
235             # information stored in the external file is reproduced in
236             # the force curve file.  So there is no need to actually
237             # read `conversion_info['file']`.  In fact, the data there
238             # may have changed with future calibrations, while the
239             # information stored directly in conversion_info retains
240             # the calibration information as it was when the experiment
241             # was performed.
242             pass  # Fall through to 'simple' conversion processing.
243         else:
244             assert conversion_info['type'] == 'simple', conversion_info['type']
245         assert conversion_info['scaling']['type'] == 'linear', \
246             conversion_info['scaling']['type']
247         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
248             conversion_info['scaling']['style']
249         multiplier = float(conversion_info['scaling']['multiplier'])
250         offset = float(conversion_info['scaling']['offset'])
251         unit = conversion_info['scaling']['unit']['unit']
252         segment[:,channel] = segment[:,channel] * multiplier + offset
253         segment.info['columns'][channel] = join_data_label(channel_name, unit)
254         return segment
255
256     def _parse_params(self, lines):
257         info = {}
258         for line in lines:
259             line = line.strip()
260             if line.startswith('#'):
261                 continue
262             else:
263                 # e.g.: force-segment-header.type=xy-position-segment-header
264                 fields = line.split('=', 1)
265                 assert len(fields) == 2, line
266                 setting = fields[0].split('.')
267                 sub_info = info  # drill down, e.g. info['force-s..']['type']
268                 for s in setting[:-1]:
269                     if s not in sub_info:
270                         sub_info[s] = {}
271                     sub_info = sub_info[s]
272                 if setting[-1] == 'list':  # split a space-delimited list
273                     sub_info[setting[-1]] = fields[1].split(' ')
274                 else:
275                     sub_info[setting[-1]] = fields[1]
276         return info
277
278     def _read_old(self, path, info):
279         raise NotImplementedError(
280             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
281             "use JPK's `out2jpk-force` script to convert your old files "
282             "to a more recent format before loading them with Hooke.")