Skip some param translation in the JPK driver for channel-less segments.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             version = zip_info['file-format-version']
77             segments = []
78             for i in range(len([p for p in f.namelist()
79                                 if p.endswith('segment-header.properties')])):
80                 segments.append(self._zip_segment(
81                         f, path, info, zip_info, i, version))
82         if version not in ['0.%d' % i for i in range(13)]:
83             raise NotImplementedError(
84                 'JPK file version %s not supported (yet).' % version)
85         curve_info = self._zip_translate_params(
86             zip_info, segments[0].info['raw info'], version)
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info, version)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
131             return self._zip_calculate_channel(chan_info)
132         with Closing(zipfile.open(os.path.join(
133                     'segments', str(segment_index),
134                     chan_info['data']['file']['name']), 'r')) as f:
135             assert chan_info['data']['file']['format'] == 'raw', \
136                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
137             dtype = self._zip_channel_dtype(chan_info)
138             data = numpy.frombuffer(
139                 buffer(f.read()),
140                 dtype=dtype,)
141         return data
142
143     def _zip_calculate_channel(self, chan_info):
144         type_ = chan_info['data']['type']
145         n = int(chan_info['data']['num-points'])
146         if type_ == 'constant-data':
147             return float(chan_info['data']['value'])*numpy.ones(
148                 shape=(n,),
149                 dtype=numpy.float32)
150         elif type_ == 'raster-data':
151             start = float(chan_info['data']['start'])
152             step = float(chan_info['data']['step'])
153             return numpy.arange(
154                 start=start,
155                 stop=start + step*(n-0.5),
156                 step=step,
157                 dtype=numpy.float32)
158         else:
159             raise ValueError('Unrecognized data format "%s"' % type_)
160
161     def _zip_channel_dtype(self, chan_info):
162         type_ = chan_info['data']['type']
163         if type_ in ['float-data', 'float']:
164             dtype = numpy.dtype(numpy.float32)
165         elif type_ in ['integer-data', 'memory-integer-data']:
166             encoder = chan_info['data']['encoder']['type']
167             if encoder in ['signedinteger', 'signedinteger-limited']:
168                 dtype = numpy.dtype(numpy.int32)
169             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
170                 dtype = numpy.dtype(numpy.uint32)
171             else:
172                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
173                                  % (encoder, type_))
174         elif type_ in ['short-data', 'short', 'memory-short-data']:
175             encoder = chan_info['data']['encoder']['type']
176             if encoder in ['signedshort', 'signedshort-limited']:
177                 dtype = numpy.dtype(numpy.int16)
178             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
179                 dtype = numpy.dtype(numpy.uint16)
180             else:
181                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
182                                  % (encoder, type_))
183         else:
184             raise ValueError('Unrecognized data format "%s"' % type_)
185         byte_order = '>'
186         # '>' (big endian) byte order.
187         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
188         #    All forms of raw data are stored in chronological order
189         #    (the order in which they were collected), and the
190         #    individual values are stored in network byte order
191         #    (big-endian). The data type used to store the data is
192         #    specified by the "channel.*.data.type" property, and is
193         #    either short (2 bytes per value), integer (4 bytes), or
194         #    float (4 bytes, IEEE format).
195         return dtype.newbyteorder(byte_order)
196
197     def _zip_translate_params(self, params, chan_info, version):
198         info = {
199             'raw info':params,
200             #'time':self._time_from_TODO(raw_info[]),
201             }
202         if len(chan_info['channels']['list']) == 0:
203             return info
204         force_unit = self._zip_unit(
205             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
206             version)
207         assert force_unit == 'N', force_unit
208         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
209         assert force_base == 'distance', force_base
210         dist_unit = self._zip_unit(
211             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
212             version)
213         assert dist_unit == 'm', dist_unit
214         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
215         assert distance_base == 'volts', distance_base
216         base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
217         assert base_conversion == distance_base, base_conversion
218         distance_base_unit = self._zip_unit(
219             chan_info['channel']['vDeflection']['data'],
220             version)
221         assert distance_base_unit == 'V', distance_base_unit
222         force_mult = float(
223             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
224         sens_mult = float(
225             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
226         info['spring constant (N/m)'] = force_mult
227         info['z piezo sensitivity (m/V)'] = sens_mult
228         return info
229
230     def _zip_translate_segment_params(self, params):
231         info = {
232             'raw info': params,
233             'columns': list(params['channels']['list']),
234             'name': self._zip_segment_name(params),
235             }
236         return info
237
238     def _zip_segment_name(self, params):
239         name = params['force-segment-header']['name']['name']
240         if name.endswith('-spm'):
241             name = name[:-len('-spm')]
242         if name == 'extend':
243             name = 'approach'
244         elif name.startswith('pause-at-'):
245             name = 'pause'
246         return name
247
248     def _zip_scale_segment(self, segment, path, info, version):
249         data = curve.Data(
250             shape=segment.shape,
251             dtype=segment.dtype,
252             info={})
253         data[:,:] = segment
254         segment.info['raw data'] = data
255
256         # raw column indices
257         channels = segment.info['raw info']['channels']['list']
258         for i,channel in enumerate(channels):
259             conversion = None
260             if channel == 'vDeflection':
261                 conversion = 'distance'
262             segment = self._zip_scale_channel(
263                 segment, channel, conversion=conversion,
264                 path=path, info=info, version=version)
265             name,unit = split_data_label(segment.info['columns'][i])
266             if name == 'vDeflection':
267                 assert unit == 'm', segment.info['columns'][i]
268                 segment.info['columns'][i] = join_data_label('deflection', 'm')
269                 # Invert because deflection voltage increases as the
270                 # tip moves away from the surface, but it makes more
271                 # sense to me to have it increase as it moves toward
272                 # the surface (positive tension on the protein chain).
273                 segment[:,i] *= -1
274             elif name == 'height':
275                 assert unit == 'm', segment.info['columns'][i]
276                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
277         return segment
278
279     def _zip_scale_channel(self, segment, channel_name,
280                            conversion=None, path=None, info={}, version=None):
281         channel = segment.info['raw info']['channels']['list'].index(
282             channel_name)
283         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
284         if conversion == None:
285             conversion = conversion_set['conversions']['default']
286         if conversion == conversion_set['conversions']['base']:
287             segment.info['columns'][channel] = join_data_label(
288                 channel_name,
289                 self._zip_unit(
290                     segment.info['raw info']['channel'][channel_name]['data'],
291                     version))
292             return segment
293         conversion_info = conversion_set['conversion'][conversion]
294         if conversion_info['base-calibration-slot'] \
295                 != conversion_set['conversions']['base']:
296             # Our conversion is stacked on a previous conversion.  Do
297             # the previous conversion first.
298             segment = self._zip_scale_channel(
299                 segment, channel_name,
300                 conversion_info['base-calibration-slot'],
301                 path=path, info=info, version=version)
302         if conversion_info['type'] == 'file':
303             # Michael Haggerty at JPK points out that the conversion
304             # information stored in the external file is reproduced in
305             # the force curve file.  So there is no need to actually
306             # read `conversion_info['file']`.  In fact, the data there
307             # may have changed with future calibrations, while the
308             # information stored directly in conversion_info retains
309             # the calibration information as it was when the experiment
310             # was performed.
311             pass  # Fall through to 'simple' conversion processing.
312         else:
313             assert conversion_info['type'] == 'simple', conversion_info['type']
314         assert conversion_info['scaling']['type'] == 'linear', \
315             conversion_info['scaling']['type']
316         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
317             conversion_info['scaling']['style']
318         multiplier = float(conversion_info['scaling']['multiplier'])
319         offset = float(conversion_info['scaling']['offset'])
320         unit = self._zip_unit(conversion_info['scaling'], version)
321         segment[:,channel] = segment[:,channel] * multiplier + offset
322         segment.info['columns'][channel] = join_data_label(channel_name, unit)
323         return segment
324
325     def _zip_unit(self, conversion_info, version):
326         if version in ['0.%d' % i for i in range(3)]:
327             return conversion_info['unit']
328         else:
329             return conversion_info['unit']['unit']
330
331     def _parse_params(self, lines):
332         info = {}
333         for line in lines:
334             line = line.strip()
335             if line.startswith('#'):
336                 continue
337             else:
338                 # e.g.: force-segment-header.type=xy-position-segment-header
339                 fields = line.split('=', 1)
340                 assert len(fields) == 2, line
341                 setting = fields[0].split('.')
342                 sub_info = info  # drill down, e.g. info['force-s..']['type']
343                 for s in setting[:-1]:
344                     if s not in sub_info:
345                         sub_info[s] = {}
346                     sub_info = sub_info[s]
347                 if setting[-1] == 'list':  # split a space-delimited list
348                     sub_info[setting[-1]] = fields[1].split(' ')
349                 else:
350                     sub_info[setting[-1]] = fields[1]
351         return info
352
353     def _read_old(self, path, info):
354         raise NotImplementedError(
355             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
356             "use JPK's `out2jpk-force` script to convert your old files "
357             "to a more recent format before loading them with Hooke.")