Extract 'z piezo sensitivity (m/V)' from JPK files into Curve.info.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.%d' % i
81                                                    for i in range(12)]:
82             raise NotImplementedError(
83                 'JPK file version %s not supported (yet).'
84                 % zip_info['file-format-version'])
85         curve_info = self._zip_translate_params(zip_info,
86                                                 segments[0].info['raw info'])
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         f = zipfile.open(os.path.join(
131                 'segments', str(segment_index),
132                 chan_info['data']['file']['name']), 'r')
133         assert chan_info['data']['file']['format'] == 'raw', \
134             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
135         assert chan_info['data']['type'] == 'float-data', \
136             'Non-float data format:\n%s' % pprint.pformat(chan_info)
137         data = numpy.frombuffer(
138             buffer(f.read()),
139             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
140         # '>' (big endian) byte order.
141         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
142         #    All forms of raw data are stored in chronological order
143         #    (the order in which they were collected), and the
144         #    individual values are stored in network byte order
145         #    (big-endian). The data type used to store the data is
146         #    specified by the "channel.*.data.type" property, and is
147         #    either short (2 bytes per value), integer (4 bytes), or
148         #    float (4 bytes, IEEE format).
149         f.close()
150         return data
151
152     def _zip_translate_params(self, params, chan_info):
153         info = {
154             'raw info':params,
155             #'time':self._time_from_TODO(raw_info[]),
156             }
157         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
158         assert force_unit == 'N', force_unit
159         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
160         assert force_base == 'distance', force_base
161         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
162         assert dist_unit == 'm', dist_unit
163         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
164         assert distance_base == 'volts', distance_base
165         # Assume volts unit is V, but it is not specified in the JPK
166         # file format.
167         force_mult = float(
168             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
169         sens_mult = float(
170             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
171         info['spring constant (N/m)'] = force_mult
172         info['z piezo sensitivity (m/V)'] = sens_mult
173         return info
174
175     def _zip_translate_segment_params(self, params):
176         info = {
177             'raw info':params,
178             'columns':list(params['channels']['list']),
179             'name':params['force-segment-header']['name']['name'],
180             }
181         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
182             info['name'] = info['name'][:-len('-spm')]
183             if info['name'] == 'extend':
184                 info['name'] = 'approach'
185         else:
186             raise NotImplementedError(
187                 'Unrecognized segment type %s' % info['name'])
188         return info
189
190     def _zip_scale_segment(self, segment, path, info):
191         data = curve.Data(
192             shape=segment.shape,
193             dtype=segment.dtype,
194             info={})
195         data[:,:] = segment
196         segment.info['raw data'] = data
197
198         # raw column indices
199         channels = segment.info['raw info']['channels']['list']
200         for i,channel in enumerate(channels):
201             conversion = None
202             if channel == 'vDeflection':
203                 conversion = 'distance'
204             segment = self._zip_scale_channel(
205                 segment, channel, conversion=conversion, path=path, info=info)
206             name,unit = split_data_label(segment.info['columns'][i])
207             if name == 'vDeflection':
208                 assert unit == 'm', segment.info['columns'][i]
209                 segment.info['columns'][i] = join_data_label('deflection', 'm')
210                 # Invert because deflection voltage increases as the
211                 # tip moves away from the surface, but it makes more
212                 # sense to me to have it increase as it moves toward
213                 # the surface (positive tension on the protein chain).
214                 segment[:,i] *= -1
215             elif name == 'height':
216                 assert unit == 'm', segment.info['columns'][i]
217                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
218         return segment
219
220     def _zip_scale_channel(self, segment, channel_name, conversion=None,
221                            path=None, info={}):
222         channel = segment.info['raw info']['channels']['list'].index(
223             channel_name)
224         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
225         if conversion == None:
226             conversion = conversion_set['conversions']['default']
227         if conversion == conversion_set['conversions']['base']:
228             # Our conversion is the base data.
229             if conversion != 'volts':
230                 raise NotImplementedError(
231                     'unknown units for base channel: %s' % conversion)
232             segment.info['columns'][channel] = join_data_label(
233                 channel_name, 'V')
234             return segment
235         conversion_info = conversion_set['conversion'][conversion]
236         if conversion_info['base-calibration-slot'] \
237                 != conversion_set['conversions']['base']:
238             # Our conversion is stacked on a previous conversion.  Do
239             # the previous conversion first.
240             segment = self._zip_scale_channel(
241                 segment, channel_name,
242                 conversion_info['base-calibration-slot'],
243                 path=path, info=info)
244         if conversion_info['type'] == 'file':
245             # Michael Haggerty at JPK points out that the conversion
246             # information stored in the external file is reproduced in
247             # the force curve file.  So there is no need to actually
248             # read `conversion_info['file']`.  In fact, the data there
249             # may have changed with future calibrations, while the
250             # information stored directly in conversion_info retains
251             # the calibration information as it was when the experiment
252             # was performed.
253             pass  # Fall through to 'simple' conversion processing.
254         else:
255             assert conversion_info['type'] == 'simple', conversion_info['type']
256         assert conversion_info['scaling']['type'] == 'linear', \
257             conversion_info['scaling']['type']
258         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
259             conversion_info['scaling']['style']
260         multiplier = float(conversion_info['scaling']['multiplier'])
261         offset = float(conversion_info['scaling']['offset'])
262         unit = conversion_info['scaling']['unit']['unit']
263         segment[:,channel] = segment[:,channel] * multiplier + offset
264         segment.info['columns'][channel] = join_data_label(channel_name, unit)
265         return segment
266
267     def _parse_params(self, lines):
268         info = {}
269         for line in lines:
270             line = line.strip()
271             if line.startswith('#'):
272                 continue
273             else:
274                 # e.g.: force-segment-header.type=xy-position-segment-header
275                 fields = line.split('=', 1)
276                 assert len(fields) == 2, line
277                 setting = fields[0].split('.')
278                 sub_info = info  # drill down, e.g. info['force-s..']['type']
279                 for s in setting[:-1]:
280                     if s not in sub_info:
281                         sub_info[s] = {}
282                     sub_info = sub_info[s]
283                 if setting[-1] == 'list':  # split a space-delimited list
284                     sub_info[setting[-1]] = fields[1].split(' ')
285                 else:
286                     sub_info[setting[-1]] = fields[1]
287         return info
288
289     def _read_old(self, path, info):
290         raise NotImplementedError(
291             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
292             "use JPK's `out2jpk-force` script to convert your old files "
293             "to a more recent format before loading them with Hooke.")