I'm using the 0.12 specs for the JPK driver, so allow version 0.12 files.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             version = zip_info['file-format-version']
77             segments = []
78             for i in range(len([p for p in f.namelist()
79                                 if p.endswith('segment-header.properties')])):
80                 segments.append(self._zip_segment(
81                         f, path, info, zip_info, i, version))
82         if version not in ['0.%d' % i for i in range(13)]:
83             raise NotImplementedError(
84                 'JPK file version %s not supported (yet).' % version)
85         curve_info = self._zip_translate_params(
86             zip_info, segments[0].info['raw info'], version)
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info, version)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
131             return self._zip_calculate_channel(chan_info)
132         with Closing(zipfile.open(os.path.join(
133                     'segments', str(segment_index),
134                     chan_info['data']['file']['name']), 'r')) as f:
135             assert chan_info['data']['file']['format'] == 'raw', \
136                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
137             dtype = self._zip_channel_dtype(chan_info)
138             data = numpy.frombuffer(
139                 buffer(f.read()),
140                 dtype=dtype,)
141         return data
142
143     def _zip_calculate_channel(self, chan_info):
144         type_ = chan_info['data']['type']
145         n = int(chan_info['data']['num-points'])
146         if type_ == 'constant-data':
147             return float(chan_info['data']['value'])*numpy.ones(
148                 shape=(n,),
149                 dtype=numpy.float32)
150         elif type_ == 'raster-data':
151             start = float(chan_info['data']['start'])
152             step = float(chan_info['data']['step'])
153             return numpy.arange(
154                 start=start,
155                 stop=start + step*(n-0.5),
156                 step=step,
157                 dtype=numpy.float32)
158         else:
159             raise ValueError('Unrecognized data format "%s"' % type_)
160
161     def _zip_channel_dtype(self, chan_info):
162         type_ = chan_info['data']['type']
163         if type_ in ['float-data', 'float']:
164             dtype = numpy.dtype(numpy.float32)
165         elif type_ in ['integer-data', 'memory-integer-data']:
166             encoder = chan_info['data']['encoder']['type']
167             if encoder in ['signedinteger', 'signedinteger-limited']:
168                 dtype = numpy.dtype(numpy.int32)
169             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
170                 dtype = numpy.dtype(numpy.uint32)
171             else:
172                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
173                                  % (encoder, type_))
174         elif type_ in ['short-data', 'short', 'memory-short-data']:
175             encoder = chan_info['data']['encoder']['type']
176             if encoder in ['signedshort', 'signedshort-limited']:
177                 dtype = numpy.dtype(numpy.int16)
178             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
179                 dtype = numpy.dtype(numpy.uint16)
180             else:
181                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
182                                  % (encoder, type_))
183         else:
184             raise ValueError('Unrecognized data format "%s"' % type_)
185         byte_order = '>'
186         # '>' (big endian) byte order.
187         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
188         #    All forms of raw data are stored in chronological order
189         #    (the order in which they were collected), and the
190         #    individual values are stored in network byte order
191         #    (big-endian). The data type used to store the data is
192         #    specified by the "channel.*.data.type" property, and is
193         #    either short (2 bytes per value), integer (4 bytes), or
194         #    float (4 bytes, IEEE format).
195         return dtype.newbyteorder(byte_order)
196
197     def _zip_translate_params(self, params, chan_info, version):
198         info = {
199             'raw info':params,
200             #'time':self._time_from_TODO(raw_info[]),
201             }
202         force_unit = self._zip_segment_conversion_unit(
203             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force'],
204             version)
205         assert force_unit == 'N', force_unit
206         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
207         assert force_base == 'distance', force_base
208         dist_unit = self._zip_segment_conversion_unit(
209             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance'],
210             version)
211         assert dist_unit == 'm', dist_unit
212         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
213         assert distance_base == 'volts', distance_base
214         # Assume volts unit is V, but it is not specified in the JPK
215         # file format.
216         force_mult = float(
217             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
218         sens_mult = float(
219             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
220         info['spring constant (N/m)'] = force_mult
221         info['z piezo sensitivity (m/V)'] = sens_mult
222         return info
223
224     def _zip_translate_segment_params(self, params):
225         info = {
226             'raw info': params,
227             'columns': list(params['channels']['list']),
228             'name': self._zip_segment_name(params),
229             }
230         return info
231
232     def _zip_segment_name(self, params):
233         name = params['force-segment-header']['name']['name']
234         if name.endswith('-spm'):
235             name = name[:-len('-spm')]
236         if name == 'extend':
237             name = 'approach'
238         elif name.startswith('pause-at-'):
239             name = 'pause'
240         return name
241
242     def _zip_scale_segment(self, segment, path, info, version):
243         data = curve.Data(
244             shape=segment.shape,
245             dtype=segment.dtype,
246             info={})
247         data[:,:] = segment
248         segment.info['raw data'] = data
249
250         # raw column indices
251         channels = segment.info['raw info']['channels']['list']
252         for i,channel in enumerate(channels):
253             conversion = None
254             if channel == 'vDeflection':
255                 conversion = 'distance'
256             segment = self._zip_scale_channel(
257                 segment, channel, conversion=conversion,
258                 path=path, info=info, version=version)
259             name,unit = split_data_label(segment.info['columns'][i])
260             if name == 'vDeflection':
261                 assert unit == 'm', segment.info['columns'][i]
262                 segment.info['columns'][i] = join_data_label('deflection', 'm')
263                 # Invert because deflection voltage increases as the
264                 # tip moves away from the surface, but it makes more
265                 # sense to me to have it increase as it moves toward
266                 # the surface (positive tension on the protein chain).
267                 segment[:,i] *= -1
268             elif name == 'height':
269                 assert unit == 'm', segment.info['columns'][i]
270                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
271         return segment
272
273     def _zip_scale_channel(self, segment, channel_name,
274                            conversion=None, path=None, info={}, version=None):
275         channel = segment.info['raw info']['channels']['list'].index(
276             channel_name)
277         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
278         if conversion == None:
279             conversion = conversion_set['conversions']['default']
280         if conversion == conversion_set['conversions']['base']:
281             # Our conversion is the base data.
282             if conversion != 'volts':
283                 raise NotImplementedError(
284                     'unknown units for base channel: %s' % conversion)
285             segment.info['columns'][channel] = join_data_label(
286                 channel_name, 'V')
287             return segment
288         conversion_info = conversion_set['conversion'][conversion]
289         if conversion_info['base-calibration-slot'] \
290                 != conversion_set['conversions']['base']:
291             # Our conversion is stacked on a previous conversion.  Do
292             # the previous conversion first.
293             segment = self._zip_scale_channel(
294                 segment, channel_name,
295                 conversion_info['base-calibration-slot'],
296                 path=path, info=info, version=version)
297         if conversion_info['type'] == 'file':
298             # Michael Haggerty at JPK points out that the conversion
299             # information stored in the external file is reproduced in
300             # the force curve file.  So there is no need to actually
301             # read `conversion_info['file']`.  In fact, the data there
302             # may have changed with future calibrations, while the
303             # information stored directly in conversion_info retains
304             # the calibration information as it was when the experiment
305             # was performed.
306             pass  # Fall through to 'simple' conversion processing.
307         else:
308             assert conversion_info['type'] == 'simple', conversion_info['type']
309         assert conversion_info['scaling']['type'] == 'linear', \
310             conversion_info['scaling']['type']
311         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
312             conversion_info['scaling']['style']
313         multiplier = float(conversion_info['scaling']['multiplier'])
314         offset = float(conversion_info['scaling']['offset'])
315         unit = self._zip_segment_conversion_unit(conversion_info, version)
316         segment[:,channel] = segment[:,channel] * multiplier + offset
317         segment.info['columns'][channel] = join_data_label(channel_name, unit)
318         return segment
319
320     def _zip_segment_conversion_unit(self, conversion_info, version):
321         if version in ['0.%d' % i for i in range(3)]:
322             return conversion_info['scaling']['unit']
323         else:
324             return conversion_info['scaling']['unit']['unit']
325
326     def _parse_params(self, lines):
327         info = {}
328         for line in lines:
329             line = line.strip()
330             if line.startswith('#'):
331                 continue
332             else:
333                 # e.g.: force-segment-header.type=xy-position-segment-header
334                 fields = line.split('=', 1)
335                 assert len(fields) == 2, line
336                 setting = fields[0].split('.')
337                 sub_info = info  # drill down, e.g. info['force-s..']['type']
338                 for s in setting[:-1]:
339                     if s not in sub_info:
340                         sub_info[s] = {}
341                     sub_info = sub_info[s]
342                 if setting[-1] == 'list':  # split a space-delimited list
343                     sub_info[setting[-1]] = fields[1].split(' ')
344                 else:
345                     sub_info[setting[-1]] = fields[1]
346         return info
347
348     def _read_old(self, path, info):
349         raise NotImplementedError(
350             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
351             "use JPK's `out2jpk-force` script to convert your old files "
352             "to a more recent format before loading them with Hooke.")