Improve handling of the base calibration slot units in JPK driver.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             version = zip_info['file-format-version']
77             segments = []
78             for i in range(len([p for p in f.namelist()
79                                 if p.endswith('segment-header.properties')])):
80                 segments.append(self._zip_segment(
81                         f, path, info, zip_info, i, version))
82         if version not in ['0.%d' % i for i in range(13)]:
83             raise NotImplementedError(
84                 'JPK file version %s not supported (yet).' % version)
85         curve_info = self._zip_translate_params(
86             zip_info, segments[0].info['raw info'], version)
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info, version)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
131             return self._zip_calculate_channel(chan_info)
132         with Closing(zipfile.open(os.path.join(
133                     'segments', str(segment_index),
134                     chan_info['data']['file']['name']), 'r')) as f:
135             assert chan_info['data']['file']['format'] == 'raw', \
136                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
137             dtype = self._zip_channel_dtype(chan_info)
138             data = numpy.frombuffer(
139                 buffer(f.read()),
140                 dtype=dtype,)
141         return data
142
143     def _zip_calculate_channel(self, chan_info):
144         type_ = chan_info['data']['type']
145         n = int(chan_info['data']['num-points'])
146         if type_ == 'constant-data':
147             return float(chan_info['data']['value'])*numpy.ones(
148                 shape=(n,),
149                 dtype=numpy.float32)
150         elif type_ == 'raster-data':
151             start = float(chan_info['data']['start'])
152             step = float(chan_info['data']['step'])
153             return numpy.arange(
154                 start=start,
155                 stop=start + step*(n-0.5),
156                 step=step,
157                 dtype=numpy.float32)
158         else:
159             raise ValueError('Unrecognized data format "%s"' % type_)
160
161     def _zip_channel_dtype(self, chan_info):
162         type_ = chan_info['data']['type']
163         if type_ in ['float-data', 'float']:
164             dtype = numpy.dtype(numpy.float32)
165         elif type_ in ['integer-data', 'memory-integer-data']:
166             encoder = chan_info['data']['encoder']['type']
167             if encoder in ['signedinteger', 'signedinteger-limited']:
168                 dtype = numpy.dtype(numpy.int32)
169             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
170                 dtype = numpy.dtype(numpy.uint32)
171             else:
172                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
173                                  % (encoder, type_))
174         elif type_ in ['short-data', 'short', 'memory-short-data']:
175             encoder = chan_info['data']['encoder']['type']
176             if encoder in ['signedshort', 'signedshort-limited']:
177                 dtype = numpy.dtype(numpy.int16)
178             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
179                 dtype = numpy.dtype(numpy.uint16)
180             else:
181                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
182                                  % (encoder, type_))
183         else:
184             raise ValueError('Unrecognized data format "%s"' % type_)
185         byte_order = '>'
186         # '>' (big endian) byte order.
187         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
188         #    All forms of raw data are stored in chronological order
189         #    (the order in which they were collected), and the
190         #    individual values are stored in network byte order
191         #    (big-endian). The data type used to store the data is
192         #    specified by the "channel.*.data.type" property, and is
193         #    either short (2 bytes per value), integer (4 bytes), or
194         #    float (4 bytes, IEEE format).
195         return dtype.newbyteorder(byte_order)
196
197     def _zip_translate_params(self, params, chan_info, version):
198         info = {
199             'raw info':params,
200             #'time':self._time_from_TODO(raw_info[]),
201             }
202         force_unit = self._zip_unit(
203             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
204             version)
205         assert force_unit == 'N', force_unit
206         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
207         assert force_base == 'distance', force_base
208         dist_unit = self._zip_unit(
209             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
210             version)
211         assert dist_unit == 'm', dist_unit
212         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
213         assert distance_base == 'volts', distance_base
214         base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
215         assert base_conversion == distance_base, base_conversion
216         distance_base_unit = self._zip_unit(
217             chan_info['channel']['vDeflection']['data'],
218             version)
219         assert distance_base_unit == 'V', distance_base_unit
220         force_mult = float(
221             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
222         sens_mult = float(
223             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
224         info['spring constant (N/m)'] = force_mult
225         info['z piezo sensitivity (m/V)'] = sens_mult
226         return info
227
228     def _zip_translate_segment_params(self, params):
229         info = {
230             'raw info': params,
231             'columns': list(params['channels']['list']),
232             'name': self._zip_segment_name(params),
233             }
234         return info
235
236     def _zip_segment_name(self, params):
237         name = params['force-segment-header']['name']['name']
238         if name.endswith('-spm'):
239             name = name[:-len('-spm')]
240         if name == 'extend':
241             name = 'approach'
242         elif name.startswith('pause-at-'):
243             name = 'pause'
244         return name
245
246     def _zip_scale_segment(self, segment, path, info, version):
247         data = curve.Data(
248             shape=segment.shape,
249             dtype=segment.dtype,
250             info={})
251         data[:,:] = segment
252         segment.info['raw data'] = data
253
254         # raw column indices
255         channels = segment.info['raw info']['channels']['list']
256         for i,channel in enumerate(channels):
257             conversion = None
258             if channel == 'vDeflection':
259                 conversion = 'distance'
260             segment = self._zip_scale_channel(
261                 segment, channel, conversion=conversion,
262                 path=path, info=info, version=version)
263             name,unit = split_data_label(segment.info['columns'][i])
264             if name == 'vDeflection':
265                 assert unit == 'm', segment.info['columns'][i]
266                 segment.info['columns'][i] = join_data_label('deflection', 'm')
267                 # Invert because deflection voltage increases as the
268                 # tip moves away from the surface, but it makes more
269                 # sense to me to have it increase as it moves toward
270                 # the surface (positive tension on the protein chain).
271                 segment[:,i] *= -1
272             elif name == 'height':
273                 assert unit == 'm', segment.info['columns'][i]
274                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
275         return segment
276
277     def _zip_scale_channel(self, segment, channel_name,
278                            conversion=None, path=None, info={}, version=None):
279         channel = segment.info['raw info']['channels']['list'].index(
280             channel_name)
281         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
282         if conversion == None:
283             conversion = conversion_set['conversions']['default']
284         if conversion == conversion_set['conversions']['base']:
285             segment.info['columns'][channel] = join_data_label(
286                 channel_name,
287                 self._zip_unit(
288                     segment.info['raw info']['channel'][channel_name]['data'],
289                     version))
290             return segment
291         conversion_info = conversion_set['conversion'][conversion]
292         if conversion_info['base-calibration-slot'] \
293                 != conversion_set['conversions']['base']:
294             # Our conversion is stacked on a previous conversion.  Do
295             # the previous conversion first.
296             segment = self._zip_scale_channel(
297                 segment, channel_name,
298                 conversion_info['base-calibration-slot'],
299                 path=path, info=info, version=version)
300         if conversion_info['type'] == 'file':
301             # Michael Haggerty at JPK points out that the conversion
302             # information stored in the external file is reproduced in
303             # the force curve file.  So there is no need to actually
304             # read `conversion_info['file']`.  In fact, the data there
305             # may have changed with future calibrations, while the
306             # information stored directly in conversion_info retains
307             # the calibration information as it was when the experiment
308             # was performed.
309             pass  # Fall through to 'simple' conversion processing.
310         else:
311             assert conversion_info['type'] == 'simple', conversion_info['type']
312         assert conversion_info['scaling']['type'] == 'linear', \
313             conversion_info['scaling']['type']
314         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
315             conversion_info['scaling']['style']
316         multiplier = float(conversion_info['scaling']['multiplier'])
317         offset = float(conversion_info['scaling']['offset'])
318         unit = self._zip_unit(conversion_info['scaling'], version)
319         segment[:,channel] = segment[:,channel] * multiplier + offset
320         segment.info['columns'][channel] = join_data_label(channel_name, unit)
321         return segment
322
323     def _zip_unit(self, conversion_info, version):
324         if version in ['0.%d' % i for i in range(3)]:
325             return conversion_info['unit']
326         else:
327             return conversion_info['unit']['unit']
328
329     def _parse_params(self, lines):
330         info = {}
331         for line in lines:
332             line = line.strip()
333             if line.startswith('#'):
334                 continue
335             else:
336                 # e.g.: force-segment-header.type=xy-position-segment-header
337                 fields = line.split('=', 1)
338                 assert len(fields) == 2, line
339                 setting = fields[0].split('.')
340                 sub_info = info  # drill down, e.g. info['force-s..']['type']
341                 for s in setting[:-1]:
342                     if s not in sub_info:
343                         sub_info[s] = {}
344                     sub_info = sub_info[s]
345                 if setting[-1] == 'list':  # split a space-delimited list
346                     sub_info[setting[-1]] = fields[1].split(' ')
347                 else:
348                     sub_info[setting[-1]] = fields[1]
349         return info
350
351     def _read_old(self, path, info):
352         raise NotImplementedError(
353             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
354             "use JPK's `out2jpk-force` script to convert your old files "
355             "to a more recent format before loading them with Hooke.")