92c9a8da63c6ae21fd63bd9aa4abd834cbb93b4b
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             version = zip_info['file-format-version']
77             segments = []
78             for i in range(len([p for p in f.namelist()
79                                 if p.endswith('segment-header.properties')])):
80                 segments.append(self._zip_segment(
81                         f, path, info, zip_info, i, version))
82         if version not in ['0.%d' % i for i in range(13)]:
83             raise NotImplementedError(
84                 'JPK file version %s not supported (yet).' % version)
85         curve_info = self._zip_translate_params(
86             zip_info, segments[0].info['raw info'], version)
87         for segment in segments:  # HACK, should use curve-level spring constant
88             for key in ['spring constant (N/m)',
89                         'z piezo sensitivity (m/V)']:
90                 if key in curve_info:
91                     segment.info['spring constant (N/m)'] = \
92                         curve_info['spring constant (N/m)']
93             
94         return (segments, curve_info)
95
96     def _zip_info(self, zipfile):
97         with Closing(zipfile.open('header.properties')) as f:
98             info = self._parse_params(f.readlines())
99             return info
100
101     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
102         prop_file = zipfile.open(os.path.join(
103                 'segments', str(index), 'segment-header.properties'))
104         prop = self._parse_params(prop_file.readlines())
105         prop_file.close()
106         expected_shape = (int(prop['force-segment-header']['num-points']),)
107         channels = []
108         if 'list' not in prop['channels']:
109             prop['channels'] = {'list': prop['channels'].split()}
110         for chan in prop['channels']['list']:
111             chan_info = prop['channel'][chan]
112             channels.append(self._zip_channel(
113                     zipfile, index, chan, chan_info))
114             if channels[-1].shape != expected_shape:
115                 raise NotImplementedError(
116                     'Channel %d:%s in %s has strange shape %s != %s'
117                     % (index, chan, zipfile.path,
118                        channels[-1].shape, expected_shape))
119         if len(channels) > 0:
120             shape = (len(channels[0]), len(channels))
121             dtype = channels[0].dtype
122         else:  # no channels for this data block
123             shape = (0,0)
124             dtype = numpy.float32
125         d = curve.Data(
126             shape=shape,
127             dtype=dtype,
128             info=self._zip_translate_segment_params(prop))
129         for i,chan in enumerate(channels):
130             d[:,i] = chan
131         return self._zip_scale_segment(d, path, info, version)
132
133     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
134         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
135             return self._zip_calculate_channel(chan_info)
136         with Closing(zipfile.open(os.path.join(
137                     'segments', str(segment_index),
138                     chan_info['data']['file']['name']), 'r')) as f:
139             assert chan_info['data']['file']['format'] == 'raw', \
140                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
141             dtype = self._zip_channel_dtype(chan_info)
142             data = numpy.frombuffer(
143                 buffer(f.read()),
144                 dtype=dtype,)
145         return data
146
147     def _zip_calculate_channel(self, chan_info):
148         type_ = chan_info['data']['type']
149         n = int(chan_info['data']['num-points'])
150         if type_ == 'constant-data':
151             return float(chan_info['data']['value'])*numpy.ones(
152                 shape=(n,),
153                 dtype=numpy.float32)
154         elif type_ == 'raster-data':
155             start = float(chan_info['data']['start'])
156             step = float(chan_info['data']['step'])
157             return numpy.arange(
158                 start=start,
159                 stop=start + step*(n-0.5),
160                 step=step,
161                 dtype=numpy.float32)
162         else:
163             raise ValueError('Unrecognized data format "%s"' % type_)
164
165     def _zip_channel_dtype(self, chan_info):
166         type_ = chan_info['data']['type']
167         if type_ in ['float-data', 'float']:
168             dtype = numpy.dtype(numpy.float32)
169         elif type_ in ['integer-data', 'memory-integer-data']:
170             encoder = chan_info['data']['encoder']['type']
171             if encoder in ['signedinteger', 'signedinteger-limited']:
172                 dtype = numpy.dtype(numpy.int32)
173             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
174                 dtype = numpy.dtype(numpy.uint32)
175             else:
176                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
177                                  % (encoder, type_))
178         elif type_ in ['short-data', 'short', 'memory-short-data']:
179             encoder = chan_info['data']['encoder']['type']
180             if encoder in ['signedshort', 'signedshort-limited']:
181                 dtype = numpy.dtype(numpy.int16)
182             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
183                 dtype = numpy.dtype(numpy.uint16)
184             else:
185                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
186                                  % (encoder, type_))
187         else:
188             raise ValueError('Unrecognized data format "%s"' % type_)
189         byte_order = '>'
190         # '>' (big endian) byte order.
191         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
192         #    All forms of raw data are stored in chronological order
193         #    (the order in which they were collected), and the
194         #    individual values are stored in network byte order
195         #    (big-endian). The data type used to store the data is
196         #    specified by the "channel.*.data.type" property, and is
197         #    either short (2 bytes per value), integer (4 bytes), or
198         #    float (4 bytes, IEEE format).
199         return dtype.newbyteorder(byte_order)
200
201     def _zip_translate_params(self, params, chan_info, version):
202         info = {
203             'raw info':params,
204             #'time':self._time_from_TODO(raw_info[]),
205             }
206         if len(chan_info['channels']['list']) == 0:
207             return info
208         force_unit = self._zip_unit(
209             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
210             version)
211         assert force_unit == 'N', force_unit
212         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
213         assert force_base == 'distance', force_base
214         dist_unit = self._zip_unit(
215             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
216             version)
217         assert dist_unit == 'm', dist_unit
218         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
219         assert distance_base == 'volts', distance_base
220         base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
221         assert base_conversion == distance_base, base_conversion
222         distance_base_unit = self._zip_unit(
223             chan_info['channel']['vDeflection']['data'],
224             version)
225         assert distance_base_unit == 'V', distance_base_unit
226         force_mult = float(
227             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
228         sens_mult = float(
229             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
230         info['spring constant (N/m)'] = force_mult
231         info['z piezo sensitivity (m/V)'] = sens_mult
232         return info
233
234     def _zip_translate_segment_params(self, params):
235         info = {
236             'raw info': params,
237             'columns': list(params['channels']['list']),
238             'name': self._zip_segment_name(params),
239             }
240         return info
241
242     def _zip_segment_name(self, params):
243         name = params['force-segment-header']['name']['name']
244         if name.endswith('-spm'):
245             name = name[:-len('-spm')]
246         if name == 'extend':
247             name = 'approach'
248         elif name.startswith('pause-at-'):
249             name = 'pause'
250         return name
251
252     def _zip_scale_segment(self, segment, path, info, version):
253         data = curve.Data(
254             shape=segment.shape,
255             dtype=segment.dtype,
256             info={})
257         data[:,:] = segment
258         segment.info['raw data'] = data
259
260         # raw column indices
261         channels = segment.info['raw info']['channels']['list']
262         for i,channel in enumerate(channels):
263             conversion = None
264             if channel == 'vDeflection':
265                 conversion = 'distance'
266             segment = self._zip_scale_channel(
267                 segment, channel, conversion=conversion,
268                 path=path, info=info, version=version)
269             name,unit = split_data_label(segment.info['columns'][i])
270             if name == 'vDeflection':
271                 assert unit == 'm', segment.info['columns'][i]
272                 segment.info['columns'][i] = join_data_label('deflection', 'm')
273                 # Invert because deflection voltage increases as the
274                 # tip moves away from the surface, but it makes more
275                 # sense to me to have it increase as it moves toward
276                 # the surface (positive tension on the protein chain).
277                 segment[:,i] *= -1
278             elif name == 'height':
279                 assert unit == 'm', segment.info['columns'][i]
280                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
281         return segment
282
283     def _zip_scale_channel(self, segment, channel_name,
284                            conversion=None, path=None, info={}, version=None):
285         channel = segment.info['raw info']['channels']['list'].index(
286             channel_name)
287         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
288         if conversion == None:
289             conversion = conversion_set['conversions']['default']
290         if conversion == conversion_set['conversions']['base']:
291             segment.info['columns'][channel] = join_data_label(
292                 channel_name,
293                 self._zip_unit(
294                     segment.info['raw info']['channel'][channel_name]['data'],
295                     version))
296             return segment
297         conversion_info = conversion_set['conversion'][conversion]
298         if conversion_info['base-calibration-slot'] \
299                 != conversion_set['conversions']['base']:
300             # Our conversion is stacked on a previous conversion.  Do
301             # the previous conversion first.
302             segment = self._zip_scale_channel(
303                 segment, channel_name,
304                 conversion_info['base-calibration-slot'],
305                 path=path, info=info, version=version)
306         if conversion_info['type'] == 'file':
307             # Michael Haggerty at JPK points out that the conversion
308             # information stored in the external file is reproduced in
309             # the force curve file.  So there is no need to actually
310             # read `conversion_info['file']`.  In fact, the data there
311             # may have changed with future calibrations, while the
312             # information stored directly in conversion_info retains
313             # the calibration information as it was when the experiment
314             # was performed.
315             pass  # Fall through to 'simple' conversion processing.
316         else:
317             assert conversion_info['type'] == 'simple', conversion_info['type']
318         assert conversion_info['scaling']['type'] == 'linear', \
319             conversion_info['scaling']['type']
320         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
321             conversion_info['scaling']['style']
322         multiplier = float(conversion_info['scaling']['multiplier'])
323         offset = float(conversion_info['scaling']['offset'])
324         unit = self._zip_unit(conversion_info['scaling'], version)
325         segment[:,channel] = segment[:,channel] * multiplier + offset
326         segment.info['columns'][channel] = join_data_label(channel_name, unit)
327         return segment
328
329     def _zip_unit(self, conversion_info, version):
330         if version in ['0.%d' % i for i in range(3)]:
331             return conversion_info['unit']
332         else:
333             return conversion_info['unit']['unit']
334
335     def _parse_params(self, lines):
336         info = {}
337         for line in lines:
338             line = line.strip()
339             if line.startswith('#'):
340                 continue
341             else:
342                 # e.g.: force-segment-header.type=xy-position-segment-header
343                 fields = line.split('=', 1)
344                 assert len(fields) == 2, line
345                 setting = fields[0].split('.')
346                 sub_info = info  # drill down, e.g. info['force-s..']['type']
347                 for s in setting[:-1]:
348                     if s not in sub_info:
349                         sub_info[s] = {}
350                     sub_info = sub_info[s]
351                 if setting[-1] == 'list':  # split a space-delimited list
352                     sub_info[setting[-1]] = fields[1].split(' ')
353                 else:
354                     sub_info[setting[-1]] = fields[1]
355         return info
356
357     def _read_old(self, path, info):
358         raise NotImplementedError(
359             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
360             "use JPK's `out2jpk-force` script to convert your old files "
361             "to a more recent format before loading them with Hooke.")