Fix distance_base_unit extraction in JPK driver for encoded data.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             version = zip_info['file-format-version']
77             segments = []
78             for i in range(len([p for p in f.namelist()
79                                 if p.endswith('segment-header.properties')])):
80                 segments.append(self._zip_segment(
81                         f, path, info, zip_info, i, version))
82         if version not in ['0.%d' % i for i in range(13)]:
83             raise NotImplementedError(
84                 'JPK file version %s not supported (yet).' % version)
85         curve_info = self._zip_translate_params(
86             zip_info, segments[0].info['raw info'], version)
87         for segment in segments:  # HACK, should use curve-level spring constant
88             for key in ['spring constant (N/m)',
89                         'z piezo sensitivity (m/V)']:
90                 if key in curve_info:
91                     segment.info['spring constant (N/m)'] = \
92                         curve_info['spring constant (N/m)']
93             
94         return (segments, curve_info)
95
96     def _zip_info(self, zipfile):
97         with Closing(zipfile.open('header.properties')) as f:
98             info = self._parse_params(f.readlines())
99             return info
100
101     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
102         prop_file = zipfile.open(os.path.join(
103                 'segments', str(index), 'segment-header.properties'))
104         prop = self._parse_params(prop_file.readlines())
105         prop_file.close()
106         expected_shape = (int(prop['force-segment-header']['num-points']),)
107         channels = []
108         if 'list' not in prop['channels']:
109             prop['channels'] = {'list': prop['channels'].split()}
110         for chan in prop['channels']['list']:
111             chan_info = prop['channel'][chan]
112             channels.append(self._zip_channel(
113                     zipfile, index, chan, chan_info))
114             if channels[-1].shape != expected_shape:
115                 raise NotImplementedError(
116                     'Channel %d:%s in %s has strange shape %s != %s'
117                     % (index, chan, zipfile.path,
118                        channels[-1].shape, expected_shape))
119         if len(channels) > 0:
120             shape = (len(channels[0]), len(channels))
121             dtype = channels[0].dtype
122         else:  # no channels for this data block
123             shape = (0,0)
124             dtype = numpy.float32
125         d = curve.Data(
126             shape=shape,
127             dtype=dtype,
128             info=self._zip_translate_segment_params(prop))
129         for i,chan in enumerate(channels):
130             d[:,i] = chan
131         return self._zip_scale_segment(d, path, info, version)
132
133     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
134         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
135             return self._zip_calculate_channel(chan_info)
136         with Closing(zipfile.open(os.path.join(
137                     'segments', str(segment_index),
138                     chan_info['data']['file']['name']), 'r')) as f:
139             assert chan_info['data']['file']['format'] == 'raw', \
140                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
141             dtype = self._zip_channel_dtype(chan_info)
142             data = numpy.frombuffer(
143                 buffer(f.read()),
144                 dtype=dtype,)
145         return data
146
147     def _zip_calculate_channel(self, chan_info):
148         type_ = chan_info['data']['type']
149         n = int(chan_info['data']['num-points'])
150         if type_ == 'constant-data':
151             return float(chan_info['data']['value'])*numpy.ones(
152                 shape=(n,),
153                 dtype=numpy.float32)
154         elif type_ == 'raster-data':
155             start = float(chan_info['data']['start'])
156             step = float(chan_info['data']['step'])
157             return numpy.arange(
158                 start=start,
159                 stop=start + step*(n-0.5),
160                 step=step,
161                 dtype=numpy.float32)
162         else:
163             raise ValueError('Unrecognized data format "%s"' % type_)
164
165     def _zip_channel_dtype(self, chan_info):
166         type_ = chan_info['data']['type']
167         if type_ in ['float-data', 'float']:
168             dtype = numpy.dtype(numpy.float32)
169         elif type_ in ['integer-data', 'memory-integer-data']:
170             encoder = chan_info['data']['encoder']['type']
171             if encoder in ['signedinteger', 'signedinteger-limited']:
172                 dtype = numpy.dtype(numpy.int32)
173             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
174                 dtype = numpy.dtype(numpy.uint32)
175             else:
176                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
177                                  % (encoder, type_))
178         elif type_ in ['short-data', 'short', 'memory-short-data']:
179             encoder = chan_info['data']['encoder']['type']
180             if encoder in ['signedshort', 'signedshort-limited']:
181                 dtype = numpy.dtype(numpy.int16)
182             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
183                 dtype = numpy.dtype(numpy.uint16)
184             else:
185                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
186                                  % (encoder, type_))
187         else:
188             raise ValueError('Unrecognized data format "%s"' % type_)
189         byte_order = '>'
190         # '>' (big endian) byte order.
191         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
192         #    All forms of raw data are stored in chronological order
193         #    (the order in which they were collected), and the
194         #    individual values are stored in network byte order
195         #    (big-endian). The data type used to store the data is
196         #    specified by the "channel.*.data.type" property, and is
197         #    either short (2 bytes per value), integer (4 bytes), or
198         #    float (4 bytes, IEEE format).
199         return dtype.newbyteorder(byte_order)
200
201     def _zip_translate_params(self, params, chan_info, version):
202         info = {
203             'raw info':params,
204             #'time':self._time_from_TODO(raw_info[]),
205             }
206         if len(chan_info['channels']['list']) == 0:
207             return info
208         force_unit = self._zip_unit(
209             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
210             version)
211         assert force_unit == 'N', force_unit
212         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
213         assert force_base == 'distance', force_base
214         dist_unit = self._zip_unit(
215             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
216             version)
217         assert dist_unit == 'm', dist_unit
218         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
219         assert distance_base == 'volts', distance_base
220         base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
221         assert base_conversion == distance_base, base_conversion
222         if 'encoder' in chan_info['channel']['vDeflection']['data']:
223             distance_base_unit = self._zip_unit(
224                 chan_info['channel']['vDeflection']['data']['encoder']['scaling'],
225                 version)
226         else:
227             distance_base_unit = self._zip_unit(
228                 chan_info['channel']['vDeflection']['data'],
229                 version)
230         assert distance_base_unit == 'V', distance_base_unit
231         force_mult = float(
232             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
233         sens_mult = float(
234             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
235         info['spring constant (N/m)'] = force_mult
236         info['z piezo sensitivity (m/V)'] = sens_mult
237         return info
238
239     def _zip_translate_segment_params(self, params):
240         info = {
241             'raw info': params,
242             'columns': list(params['channels']['list']),
243             'name': self._zip_segment_name(params),
244             }
245         return info
246
247     def _zip_segment_name(self, params):
248         name = params['force-segment-header']['name']['name']
249         if name.endswith('-spm'):
250             name = name[:-len('-spm')]
251         if name == 'extend':
252             name = 'approach'
253         elif name.startswith('pause-at-'):
254             name = 'pause'
255         return name
256
257     def _zip_scale_segment(self, segment, path, info, version):
258         data = curve.Data(
259             shape=segment.shape,
260             dtype=segment.dtype,
261             info={})
262         data[:,:] = segment
263         segment.info['raw data'] = data
264
265         # raw column indices
266         channels = segment.info['raw info']['channels']['list']
267         for i,channel in enumerate(channels):
268             conversion = None
269             if channel == 'vDeflection':
270                 conversion = 'distance'
271             segment = self._zip_scale_channel(
272                 segment, channel, conversion=conversion,
273                 path=path, info=info, version=version)
274             name,unit = split_data_label(segment.info['columns'][i])
275             if name == 'vDeflection':
276                 assert unit == 'm', segment.info['columns'][i]
277                 segment.info['columns'][i] = join_data_label('deflection', 'm')
278                 # Invert because deflection voltage increases as the
279                 # tip moves away from the surface, but it makes more
280                 # sense to me to have it increase as it moves toward
281                 # the surface (positive tension on the protein chain).
282                 segment[:,i] *= -1
283             elif name == 'height':
284                 assert unit == 'm', segment.info['columns'][i]
285                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
286         return segment
287
288     def _zip_scale_channel(self, segment, channel_name,
289                            conversion=None, path=None, info={}, version=None):
290         channel = segment.info['raw info']['channels']['list'].index(
291             channel_name)
292         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
293         if conversion == None:
294             conversion = conversion_set['conversions']['default']
295         if conversion == conversion_set['conversions']['base']:
296             segment.info['columns'][channel] = join_data_label(
297                 channel_name,
298                 self._zip_unit(
299                     segment.info['raw info']['channel'][channel_name]['data'],
300                     version))
301             return segment
302         conversion_info = conversion_set['conversion'][conversion]
303         if conversion_info['base-calibration-slot'] \
304                 != conversion_set['conversions']['base']:
305             # Our conversion is stacked on a previous conversion.  Do
306             # the previous conversion first.
307             segment = self._zip_scale_channel(
308                 segment, channel_name,
309                 conversion_info['base-calibration-slot'],
310                 path=path, info=info, version=version)
311         if conversion_info['type'] == 'file':
312             # Michael Haggerty at JPK points out that the conversion
313             # information stored in the external file is reproduced in
314             # the force curve file.  So there is no need to actually
315             # read `conversion_info['file']`.  In fact, the data there
316             # may have changed with future calibrations, while the
317             # information stored directly in conversion_info retains
318             # the calibration information as it was when the experiment
319             # was performed.
320             pass  # Fall through to 'simple' conversion processing.
321         else:
322             assert conversion_info['type'] == 'simple', conversion_info['type']
323         assert conversion_info['scaling']['type'] == 'linear', \
324             conversion_info['scaling']['type']
325         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
326             conversion_info['scaling']['style']
327         multiplier = float(conversion_info['scaling']['multiplier'])
328         offset = float(conversion_info['scaling']['offset'])
329         unit = self._zip_unit(conversion_info['scaling'], version)
330         segment[:,channel] = segment[:,channel] * multiplier + offset
331         segment.info['columns'][channel] = join_data_label(channel_name, unit)
332         return segment
333
334     def _zip_unit(self, conversion_info, version):
335         if version in ['0.%d' % i for i in range(3)]:
336             return conversion_info['unit']
337         else:
338             return conversion_info['unit']['unit']
339
340     def _parse_params(self, lines):
341         info = {}
342         for line in lines:
343             line = line.strip()
344             if line.startswith('#'):
345                 continue
346             else:
347                 # e.g.: force-segment-header.type=xy-position-segment-header
348                 fields = line.split('=', 1)
349                 assert len(fields) == 2, line
350                 setting = fields[0].split('.')
351                 sub_info = info  # drill down, e.g. info['force-s..']['type']
352                 for s in setting[:-1]:
353                     if s not in sub_info:
354                         sub_info[s] = {}
355                     sub_info = sub_info[s]
356                 if setting[-1] == 'list':  # split a space-delimited list
357                     if fields[1]:
358                         sub_info[setting[-1]] = fields[1].split(' ')
359                     else:
360                         sub_info[setting[-1]] = []
361                 else:
362                     sub_info[setting[-1]] = fields[1]
363         return info
364
365     def _read_old(self, path, info):
366         raise NotImplementedError(
367             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
368             "use JPK's `out2jpk-force` script to convert your old files "
369             "to a more recent format before loading them with Hooke.")