dcc10409a4dea2cb1cfd7484bcaf15cee65f820f
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.%d' % i
81                                                    for i in range(12)]:
82             raise NotImplementedError(
83                 'JPK file version %s not supported (yet).'
84                 % zip_info['file-format-version'])
85         curve_info = self._zip_translate_params(zip_info,
86                                                 segments[0].info['raw info'])
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
131             return self._zip_calculate_channel(chan_info)
132         with Closing(zipfile.open(os.path.join(
133                     'segments', str(segment_index),
134                     chan_info['data']['file']['name']), 'r')) as f:
135             assert chan_info['data']['file']['format'] == 'raw', \
136                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
137             dtype = self._zip_channel_dtype(chan_info)
138             data = numpy.frombuffer(
139                 buffer(f.read()),
140                 dtype=dtype,)
141         return data
142
143     def _zip_calculate_channel(self, chan_info):
144         type_ = chan_info['data']['type']
145         n = int(chan_info['data']['num-points'])
146         if type_ == 'constant-data':
147             return float(chan_info['data']['value'])*numpy.ones(
148                 shape=(n,),
149                 dtype=numpy.float32)
150         elif type_ == 'raster-data':
151             start = float(chan_info['data']['start'])
152             step = float(chan_info['data']['step'])
153             return numpy.arange(
154                 start=start,
155                 stop=start + step*(n-0.5),
156                 step=step,
157                 dtype=numpy.float32)
158         else:
159             raise ValueError('Unrecognized data format "%s"' % type_)
160
161     def _zip_channel_dtype(self, chan_info):
162         type_ = chan_info['data']['type']
163         if type_ in ['float-data', 'float']:
164             dtype = numpy.dtype(numpy.float32)
165         elif type_ in ['integer-data', 'memory-integer-data']:
166             encoder = chan_info['data']['encoder']['type']
167             if encoder in ['signedinteger', 'signedinteger-limited']:
168                 dtype = numpy.dtype(numpy.int32)
169             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
170                 dtype = numpy.dtype(numpy.uint32)
171             else:
172                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
173                                  % (encoder, type_))
174         elif type_ in ['short-data', 'short', 'memory-short-data']:
175             encoder = chan_info['data']['encoder']['type']
176             if encoder in ['signedshort', 'signedshort-limited']:
177                 dtype = numpy.dtype(numpy.int16)
178             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
179                 dtype = numpy.dtype(numpy.uint16)
180             else:
181                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
182                                  % (encoder, type_))
183         else:
184             raise ValueError('Unrecognized data format "%s"' % type_)
185         byte_order = '>'
186         # '>' (big endian) byte order.
187         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
188         #    All forms of raw data are stored in chronological order
189         #    (the order in which they were collected), and the
190         #    individual values are stored in network byte order
191         #    (big-endian). The data type used to store the data is
192         #    specified by the "channel.*.data.type" property, and is
193         #    either short (2 bytes per value), integer (4 bytes), or
194         #    float (4 bytes, IEEE format).
195         return dtype.newbyteorder(byte_order)
196
197     def _zip_translate_params(self, params, chan_info):
198         info = {
199             'raw info':params,
200             #'time':self._time_from_TODO(raw_info[]),
201             }
202         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
203         assert force_unit == 'N', force_unit
204         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
205         assert force_base == 'distance', force_base
206         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
207         assert dist_unit == 'm', dist_unit
208         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
209         assert distance_base == 'volts', distance_base
210         # Assume volts unit is V, but it is not specified in the JPK
211         # file format.
212         force_mult = float(
213             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
214         sens_mult = float(
215             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
216         info['spring constant (N/m)'] = force_mult
217         info['z piezo sensitivity (m/V)'] = sens_mult
218         return info
219
220     def _zip_translate_segment_params(self, params):
221         info = {
222             'raw info': params,
223             'columns': list(params['channels']['list']),
224             'name': self._zip_segment_name(params),
225             }
226         return info
227
228     def _zip_segment_name(self, params):
229         name = params['force-segment-header']['name']['name']
230         if name.endswith('-spm'):
231             name = name[:-len('-spm')]
232         if name == 'extend':
233             name = 'approach'
234         elif name.startswith('pause-at-'):
235             name = 'pause'
236         return name
237
238     def _zip_scale_segment(self, segment, path, info):
239         data = curve.Data(
240             shape=segment.shape,
241             dtype=segment.dtype,
242             info={})
243         data[:,:] = segment
244         segment.info['raw data'] = data
245
246         # raw column indices
247         channels = segment.info['raw info']['channels']['list']
248         for i,channel in enumerate(channels):
249             conversion = None
250             if channel == 'vDeflection':
251                 conversion = 'distance'
252             segment = self._zip_scale_channel(
253                 segment, channel, conversion=conversion, path=path, info=info)
254             name,unit = split_data_label(segment.info['columns'][i])
255             if name == 'vDeflection':
256                 assert unit == 'm', segment.info['columns'][i]
257                 segment.info['columns'][i] = join_data_label('deflection', 'm')
258                 # Invert because deflection voltage increases as the
259                 # tip moves away from the surface, but it makes more
260                 # sense to me to have it increase as it moves toward
261                 # the surface (positive tension on the protein chain).
262                 segment[:,i] *= -1
263             elif name == 'height':
264                 assert unit == 'm', segment.info['columns'][i]
265                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
266         return segment
267
268     def _zip_scale_channel(self, segment, channel_name, conversion=None,
269                            path=None, info={}):
270         channel = segment.info['raw info']['channels']['list'].index(
271             channel_name)
272         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
273         if conversion == None:
274             conversion = conversion_set['conversions']['default']
275         if conversion == conversion_set['conversions']['base']:
276             # Our conversion is the base data.
277             if conversion != 'volts':
278                 raise NotImplementedError(
279                     'unknown units for base channel: %s' % conversion)
280             segment.info['columns'][channel] = join_data_label(
281                 channel_name, 'V')
282             return segment
283         conversion_info = conversion_set['conversion'][conversion]
284         if conversion_info['base-calibration-slot'] \
285                 != conversion_set['conversions']['base']:
286             # Our conversion is stacked on a previous conversion.  Do
287             # the previous conversion first.
288             segment = self._zip_scale_channel(
289                 segment, channel_name,
290                 conversion_info['base-calibration-slot'],
291                 path=path, info=info)
292         if conversion_info['type'] == 'file':
293             # Michael Haggerty at JPK points out that the conversion
294             # information stored in the external file is reproduced in
295             # the force curve file.  So there is no need to actually
296             # read `conversion_info['file']`.  In fact, the data there
297             # may have changed with future calibrations, while the
298             # information stored directly in conversion_info retains
299             # the calibration information as it was when the experiment
300             # was performed.
301             pass  # Fall through to 'simple' conversion processing.
302         else:
303             assert conversion_info['type'] == 'simple', conversion_info['type']
304         assert conversion_info['scaling']['type'] == 'linear', \
305             conversion_info['scaling']['type']
306         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
307             conversion_info['scaling']['style']
308         multiplier = float(conversion_info['scaling']['multiplier'])
309         offset = float(conversion_info['scaling']['offset'])
310         unit = conversion_info['scaling']['unit']['unit']
311         segment[:,channel] = segment[:,channel] * multiplier + offset
312         segment.info['columns'][channel] = join_data_label(channel_name, unit)
313         return segment
314
315     def _parse_params(self, lines):
316         info = {}
317         for line in lines:
318             line = line.strip()
319             if line.startswith('#'):
320                 continue
321             else:
322                 # e.g.: force-segment-header.type=xy-position-segment-header
323                 fields = line.split('=', 1)
324                 assert len(fields) == 2, line
325                 setting = fields[0].split('.')
326                 sub_info = info  # drill down, e.g. info['force-s..']['type']
327                 for s in setting[:-1]:
328                     if s not in sub_info:
329                         sub_info[s] = {}
330                     sub_info = sub_info[s]
331                 if setting[-1] == 'list':  # split a space-delimited list
332                     sub_info[setting[-1]] = fields[1].split(' ')
333                 else:
334                     sub_info[setting[-1]] = fields[1]
335         return info
336
337     def _read_old(self, path, info):
338         raise NotImplementedError(
339             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
340             "use JPK's `out2jpk-force` script to convert your old files "
341             "to a more recent format before loading them with Hooke.")