Add support for {,u}int{16,32} segments in JPK driver.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.%d' % i
81                                                    for i in range(12)]:
82             raise NotImplementedError(
83                 'JPK file version %s not supported (yet).'
84                 % zip_info['file-format-version'])
85         curve_info = self._zip_translate_params(zip_info,
86                                                 segments[0].info['raw info'])
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         with Closing(zipfile.open(os.path.join(
131                     'segments', str(segment_index),
132                     chan_info['data']['file']['name']), 'r')) as f:
133             assert chan_info['data']['file']['format'] == 'raw', \
134                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
135             dtype = self._zip_channel_dtype(chan_info)
136             data = numpy.frombuffer(
137                 buffer(f.read()),
138                 dtype=dtype,)
139         return data
140
141     def _zip_channel_dtype(self, chan_info):
142         type_ = chan_info['data']['type']
143         if type_ in ['float-data', 'float']:
144             dtype = numpy.dtype(numpy.float32)
145         elif type_ in ['integer-data', 'memory-integer-data']:
146             encoder = chan_info['data']['encoder']['type']
147             if encoder in ['signedinteger', 'signedinteger-limited']:
148                 dtype = numpy.dtype(numpy.int32)
149             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
150                 dtype = numpy.dtype(numpy.uint32)
151             else:
152                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
153                                  % (encoder, type_))
154         elif type_ in ['short-data', 'short', 'memory-short-data']:
155             encoder = chan_info['data']['encoder']['type']
156             if encoder in ['signedshort', 'signedshort-limited']:
157                 dtype = numpy.dtype(numpy.int16)
158             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
159                 dtype = numpy.dtype(numpy.uint16)
160             else:
161                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
162                                  % (encoder, type_))
163         else:
164             raise ValueError('Unrecognized data format "%s"' % type_)
165         byte_order = '>'
166         # '>' (big endian) byte order.
167         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
168         #    All forms of raw data are stored in chronological order
169         #    (the order in which they were collected), and the
170         #    individual values are stored in network byte order
171         #    (big-endian). The data type used to store the data is
172         #    specified by the "channel.*.data.type" property, and is
173         #    either short (2 bytes per value), integer (4 bytes), or
174         #    float (4 bytes, IEEE format).
175         return dtype.newbyteorder(byte_order)
176
177     def _zip_translate_params(self, params, chan_info):
178         info = {
179             'raw info':params,
180             #'time':self._time_from_TODO(raw_info[]),
181             }
182         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
183         assert force_unit == 'N', force_unit
184         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
185         assert force_base == 'distance', force_base
186         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
187         assert dist_unit == 'm', dist_unit
188         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
189         assert distance_base == 'volts', distance_base
190         # Assume volts unit is V, but it is not specified in the JPK
191         # file format.
192         force_mult = float(
193             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
194         sens_mult = float(
195             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
196         info['spring constant (N/m)'] = force_mult
197         info['z piezo sensitivity (m/V)'] = sens_mult
198         return info
199
200     def _zip_translate_segment_params(self, params):
201         info = {
202             'raw info': params,
203             'columns': list(params['channels']['list']),
204             'name': self._zip_segment_name(params),
205             }
206         return info
207
208     def _zip_segment_name(self, params):
209         name = params['force-segment-header']['name']['name']
210         if name.endswith('-spm'):
211             name = name[:-len('-spm')]
212         if name == 'extend':
213             name = 'approach'
214         elif name.startswith('pause-at-'):
215             name = 'pause'
216         return name
217
218     def _zip_scale_segment(self, segment, path, info):
219         data = curve.Data(
220             shape=segment.shape,
221             dtype=segment.dtype,
222             info={})
223         data[:,:] = segment
224         segment.info['raw data'] = data
225
226         # raw column indices
227         channels = segment.info['raw info']['channels']['list']
228         for i,channel in enumerate(channels):
229             conversion = None
230             if channel == 'vDeflection':
231                 conversion = 'distance'
232             segment = self._zip_scale_channel(
233                 segment, channel, conversion=conversion, path=path, info=info)
234             name,unit = split_data_label(segment.info['columns'][i])
235             if name == 'vDeflection':
236                 assert unit == 'm', segment.info['columns'][i]
237                 segment.info['columns'][i] = join_data_label('deflection', 'm')
238                 # Invert because deflection voltage increases as the
239                 # tip moves away from the surface, but it makes more
240                 # sense to me to have it increase as it moves toward
241                 # the surface (positive tension on the protein chain).
242                 segment[:,i] *= -1
243             elif name == 'height':
244                 assert unit == 'm', segment.info['columns'][i]
245                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
246         return segment
247
248     def _zip_scale_channel(self, segment, channel_name, conversion=None,
249                            path=None, info={}):
250         channel = segment.info['raw info']['channels']['list'].index(
251             channel_name)
252         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
253         if conversion == None:
254             conversion = conversion_set['conversions']['default']
255         if conversion == conversion_set['conversions']['base']:
256             # Our conversion is the base data.
257             if conversion != 'volts':
258                 raise NotImplementedError(
259                     'unknown units for base channel: %s' % conversion)
260             segment.info['columns'][channel] = join_data_label(
261                 channel_name, 'V')
262             return segment
263         conversion_info = conversion_set['conversion'][conversion]
264         if conversion_info['base-calibration-slot'] \
265                 != conversion_set['conversions']['base']:
266             # Our conversion is stacked on a previous conversion.  Do
267             # the previous conversion first.
268             segment = self._zip_scale_channel(
269                 segment, channel_name,
270                 conversion_info['base-calibration-slot'],
271                 path=path, info=info)
272         if conversion_info['type'] == 'file':
273             # Michael Haggerty at JPK points out that the conversion
274             # information stored in the external file is reproduced in
275             # the force curve file.  So there is no need to actually
276             # read `conversion_info['file']`.  In fact, the data there
277             # may have changed with future calibrations, while the
278             # information stored directly in conversion_info retains
279             # the calibration information as it was when the experiment
280             # was performed.
281             pass  # Fall through to 'simple' conversion processing.
282         else:
283             assert conversion_info['type'] == 'simple', conversion_info['type']
284         assert conversion_info['scaling']['type'] == 'linear', \
285             conversion_info['scaling']['type']
286         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
287             conversion_info['scaling']['style']
288         multiplier = float(conversion_info['scaling']['multiplier'])
289         offset = float(conversion_info['scaling']['offset'])
290         unit = conversion_info['scaling']['unit']['unit']
291         segment[:,channel] = segment[:,channel] * multiplier + offset
292         segment.info['columns'][channel] = join_data_label(channel_name, unit)
293         return segment
294
295     def _parse_params(self, lines):
296         info = {}
297         for line in lines:
298             line = line.strip()
299             if line.startswith('#'):
300                 continue
301             else:
302                 # e.g.: force-segment-header.type=xy-position-segment-header
303                 fields = line.split('=', 1)
304                 assert len(fields) == 2, line
305                 setting = fields[0].split('.')
306                 sub_info = info  # drill down, e.g. info['force-s..']['type']
307                 for s in setting[:-1]:
308                     if s not in sub_info:
309                         sub_info[s] = {}
310                     sub_info = sub_info[s]
311                 if setting[-1] == 'list':  # split a space-delimited list
312                     sub_info[setting[-1]] = fields[1].split(' ')
313                 else:
314                     sub_info[setting[-1]] = fields[1]
315         return info
316
317     def _read_old(self, path, info):
318         raise NotImplementedError(
319             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
320             "use JPK's `out2jpk-force` script to convert your old files "
321             "to a more recent format before loading them with Hooke.")