Convert JPK deflection sign to follow Hooke convention.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from ..util.si import join_data_label, split_data_label
33 from . import Driver as Driver
34
35
36 class JPKDriver (Driver):
37     """Handle JPK ForceRobot's data format.
38     """
39     def __init__(self):
40         super(JPKDriver, self).__init__(name='jpk')
41
42     def is_me(self, path):
43         if os.path.isdir(path):
44             return False
45         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
46             with Closing(zipfile.ZipFile(path, 'r')) as f:
47                 if 'header.properties' not in f.namelist():
48                     return False
49                 with Closing(f.open('header.properties')) as h:
50                     if 'jpk-data-file' in h.read():
51                         return True
52         else:
53             with Closing(open(path, 'r')) as f:
54                 headlines = []
55                 for i in range(3):
56                     headlines.append(f.readline())
57             if headlines[0].startswith('# xPosition') \
58                     and headlines[1].startswith('# yPosition'):
59                 return True
60         return False
61
62     def read(self, path, info=None):
63         if info == None:
64             info = {}
65         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
66             return self._read_zip(path, info)
67         else:
68             return self._read_old(path, info)
69
70     def _read_zip(self, path, info):
71         with Closing(zipfile.ZipFile(path, 'r')) as f:
72             f.path = path
73             zip_info = self._zip_info(f)
74             segments = []
75             for i in range(len([p for p in f.namelist()
76                                 if p.endswith('segment-header.properties')])):
77                 segments.append(self._zip_segment(f, path, info, zip_info, i))
78         if zip_info['file-format-version'] not in ['0.5']:
79             raise NotImplementedError(
80                 'JPK file version %s not supported (yet).'
81                 % zip_info['file-format-version'])
82         for name in ['approach', 'retract']:
83             if len([s for s in segments if s.info['name'] == name]) == 0:
84                 raise ValueError(
85                     'No segment for %s in %s, only %s'
86                     % (name, path, [s.info['name'] for s in segments]))
87         curve_info = self._zip_translate_params(zip_info,
88                                                 segments[0].info['raw info'])
89         for segment in segments:
90             segment.info['spring constant (N/m)'] = \
91                 curve_info['spring constant (N/m)']
92         return (segments, curve_info)
93
94     def _zip_info(self, zipfile):
95         with Closing(zipfile.open('header.properties')) as f:
96             info = self._parse_params(f.readlines())
97             return info
98
99     def _zip_segment(self, zipfile, path, info, zip_info, index):
100         prop_file = zipfile.open(os.path.join(
101                 'segments', str(index), 'segment-header.properties'))
102         prop = self._parse_params(prop_file.readlines())
103         prop_file.close()
104         expected_shape = (int(prop['force-segment-header']['num-points']),)
105         channels = []
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
109             if channels[-1].shape != expected_shape:
110                     raise NotImplementedError(
111                         'Channel %d:%s in %s has strange shape %s != %s'
112                         % (index, chan, zipfile.path,
113                            channels[-1].shape, expected_shape))
114         d = curve.Data(
115             shape=(len(channels[0]), len(channels)),
116             dtype=channels[0].dtype,
117             info=self._zip_translate_segment_params(prop))
118         for i,chan in enumerate(channels):
119             d[:,i] = chan
120         return self._zip_scale_segment(d, path, info)
121
122     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
123         f = zipfile.open(os.path.join(
124                 'segments', str(segment_index),
125                 chan_info['data']['file']['name']), 'r')
126         assert chan_info['data']['file']['format'] == 'raw', \
127             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
128         assert chan_info['data']['type'] == 'float-data', \
129             'Non-float data format:\n%s' % pprint.pformat(chan_info)
130         data = numpy.frombuffer(
131             buffer(f.read()),
132             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
133         # '>' (big endian) byte order.
134         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
135         #    All forms of raw data are stored in chronological order
136         #    (the order in which they were collected), and the
137         #    individual values are stored in network byte order
138         #    (big-endian). The data type used to store the data is
139         #    specified by the "channel.*.data.type" property, and is
140         #    either short (2 bytes per value), integer (4 bytes), or
141         #    float (4 bytes, IEEE format).
142         f.close()
143         return data
144
145     def _zip_translate_params(self, params, chan_info):
146         info = {
147             'raw info':params,
148             'filetype':self.name,
149             #'time':self._time_from_TODO(raw_info[]),
150             }
151         # TODO: distinguish between force clamp and velocity clamp
152         # experiments.  Note that the JPK file format is flexible
153         # enough to support mixed experiments (i.e. both force clamp
154         # and velocity clamp segments in a single experiment), but I
155         # have no idea what sort of analysis such experiments would
156         # require ;).
157         info['experiment'] = experiment.VelocityClamp
158         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
159         assert force_unit == 'N', force_unit
160         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
161         assert force_base == 'distance', force_base
162         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
163         assert dist_unit == 'm', dist_unit
164         force_mult = float(
165             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
166         info['spring constant (N/m)'] = force_mult
167         return info
168
169     def _zip_translate_segment_params(self, params):
170         info = {
171             'raw info':params,
172             'columns':list(params['channels']['list']),
173             'name':params['force-segment-header']['name']['name'],
174             }
175         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
176             info['name'] = info['name'][:-len('-spm')]
177             if info['name'] == 'extend':
178                 info['name'] = 'approach'
179         else:
180             raise NotImplementedError(
181                 'Unrecognized segment type %s' % info['name'])
182         return info
183
184     def _zip_scale_segment(self, segment, path, info):
185         data = curve.Data(
186             shape=segment.shape,
187             dtype=segment.dtype,
188             info={})
189         data[:,:] = segment
190         segment.info['raw data'] = data
191
192         # raw column indices
193         channels = segment.info['raw info']['channels']['list']
194         for i,channel in enumerate(channels):
195             conversion = None
196             if channel == 'vDeflection':
197                 conversion = 'distance'
198             segment = self._zip_scale_channel(
199                 segment, channel, conversion=conversion, path=path, info=info)
200             name,unit = split_data_label(segment.info['columns'][i])
201             if name == 'vDeflection':
202                 assert unit == 'm', segment.info['columns'][i]
203                 segment.info['columns'][i] = join_data_label('deflection', 'm')
204                 # Invert because deflection voltage increases as the
205                 # tip moves away from the surface, but it makes more
206                 # sense to me to have it increase as it moves toward
207                 # the surface (positive tension on the protein chain).
208                 segment[:,i] *= -1
209             elif name == 'height':
210                 assert unit == 'm', segment.info['columns'][i]
211                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
212         return segment
213
214     def _zip_scale_channel(self, segment, channel_name, conversion=None,
215                            path=None, info={}):
216         channel = segment.info['raw info']['channels']['list'].index(
217             channel_name)
218         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
219         if conversion == None:
220             conversion = conversion_set['conversions']['default']
221         if conversion == conversion_set['conversions']['base']:
222             # Our conversion is the base data.
223             if conversion != 'volts':
224                 raise NotImplementedError(
225                     'unknown units for base channel: %s' % conversion)
226             segment.info['columns'][channel] = join_data_label(
227                 channel_name, 'V')
228             return segment
229         conversion_info = conversion_set['conversion'][conversion]
230         if conversion_info['base-calibration-slot'] \
231                 != conversion_set['conversions']['base']:
232             # Our conversion is stacked on a previous conversion.  Do
233             # the previous conversion first.
234             segment = self._zip_scale_channel(
235                 segment, channel_name,
236                 conversion_info['base-calibration-slot'],
237                 path=path, info=info)
238         if conversion_info['type'] == 'file':
239             # Michael Haggerty at JPK points out that the conversion
240             # information stored in the external file is reproduced in
241             # the force curve file.  So there is no need to actually
242             # read `conversion_info['file']`.  In fact, the data there
243             # may have changed with future calibrations, while the
244             # information stored directly in conversion_info retains
245             # the calibration information as it was when the experiment
246             # was performed.
247             pass  # Fall through to 'simple' conversion processing.
248         else:
249             assert conversion_info['type'] == 'simple', conversion_info['type']
250         assert conversion_info['scaling']['type'] == 'linear', \
251             conversion_info['scaling']['type']
252         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
253             conversion_info['scaling']['style']
254         multiplier = float(conversion_info['scaling']['multiplier'])
255         offset = float(conversion_info['scaling']['offset'])
256         unit = conversion_info['scaling']['unit']['unit']
257         segment[:,channel] = segment[:,channel] * multiplier + offset
258         segment.info['columns'][channel] = join_data_label(channel_name, unit)
259         return segment
260
261     def _parse_params(self, lines):
262         info = {}
263         for line in lines:
264             line = line.strip()
265             if line.startswith('#'):
266                 continue
267             else:
268                 # e.g.: force-segment-header.type=xy-position-segment-header
269                 fields = line.split('=', 1)
270                 assert len(fields) == 2, line
271                 setting = fields[0].split('.')
272                 sub_info = info  # drill down, e.g. info['force-s..']['type']
273                 for s in setting[:-1]:
274                     if s not in sub_info:
275                         sub_info[s] = {}
276                     sub_info = sub_info[s]
277                 if setting[-1] == 'list':  # split a space-delimited list
278                     sub_info[setting[-1]] = fields[1].split(' ')
279                 else:
280                     sub_info[setting[-1]] = fields[1]
281         return info
282
283     def _read_old(self, path, info):
284         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)