Don't specify curve or data block types. See doc/standards.txt.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.3', '0.4', '0.5']:
81             raise NotImplementedError(
82                 'JPK file version %s not supported (yet).'
83                 % zip_info['file-format-version'])
84         for name in ['approach', 'retract']:
85             if len([s for s in segments if s.info['name'] == name]) == 0:
86                 raise ValueError(
87                     'No segment for %s in %s, only %s'
88                     % (name, path, [s.info['name'] for s in segments]))
89         curve_info = self._zip_translate_params(zip_info,
90                                                 segments[0].info['raw info'])
91         for segment in segments:
92             segment.info['spring constant (N/m)'] = \
93                 curve_info['spring constant (N/m)']
94         return (segments, curve_info)
95
96     def _zip_info(self, zipfile):
97         with Closing(zipfile.open('header.properties')) as f:
98             info = self._parse_params(f.readlines())
99             return info
100
101     def _zip_segment(self, zipfile, path, info, zip_info, index):
102         prop_file = zipfile.open(os.path.join(
103                 'segments', str(index), 'segment-header.properties'))
104         prop = self._parse_params(prop_file.readlines())
105         prop_file.close()
106         expected_shape = (int(prop['force-segment-header']['num-points']),)
107         channels = []
108         for chan in prop['channels']['list']:
109             chan_info = prop['channel'][chan]
110             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
111             if channels[-1].shape != expected_shape:
112                     raise NotImplementedError(
113                         'Channel %d:%s in %s has strange shape %s != %s'
114                         % (index, chan, zipfile.path,
115                            channels[-1].shape, expected_shape))
116         d = curve.Data(
117             shape=(len(channels[0]), len(channels)),
118             dtype=channels[0].dtype,
119             info=self._zip_translate_segment_params(prop))
120         for i,chan in enumerate(channels):
121             d[:,i] = chan
122         return self._zip_scale_segment(d, path, info)
123
124     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
125         f = zipfile.open(os.path.join(
126                 'segments', str(segment_index),
127                 chan_info['data']['file']['name']), 'r')
128         assert chan_info['data']['file']['format'] == 'raw', \
129             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
130         assert chan_info['data']['type'] == 'float-data', \
131             'Non-float data format:\n%s' % pprint.pformat(chan_info)
132         data = numpy.frombuffer(
133             buffer(f.read()),
134             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
135         # '>' (big endian) byte order.
136         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
137         #    All forms of raw data are stored in chronological order
138         #    (the order in which they were collected), and the
139         #    individual values are stored in network byte order
140         #    (big-endian). The data type used to store the data is
141         #    specified by the "channel.*.data.type" property, and is
142         #    either short (2 bytes per value), integer (4 bytes), or
143         #    float (4 bytes, IEEE format).
144         f.close()
145         return data
146
147     def _zip_translate_params(self, params, chan_info):
148         info = {
149             'raw info':params,
150             #'time':self._time_from_TODO(raw_info[]),
151             }
152         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
153         assert force_unit == 'N', force_unit
154         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
155         assert force_base == 'distance', force_base
156         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
157         assert dist_unit == 'm', dist_unit
158         force_mult = float(
159             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
160         info['spring constant (N/m)'] = force_mult
161         return info
162
163     def _zip_translate_segment_params(self, params):
164         info = {
165             'raw info':params,
166             'columns':list(params['channels']['list']),
167             'name':params['force-segment-header']['name']['name'],
168             }
169         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
170             info['name'] = info['name'][:-len('-spm')]
171             if info['name'] == 'extend':
172                 info['name'] = 'approach'
173         else:
174             raise NotImplementedError(
175                 'Unrecognized segment type %s' % info['name'])
176         return info
177
178     def _zip_scale_segment(self, segment, path, info):
179         data = curve.Data(
180             shape=segment.shape,
181             dtype=segment.dtype,
182             info={})
183         data[:,:] = segment
184         segment.info['raw data'] = data
185
186         # raw column indices
187         channels = segment.info['raw info']['channels']['list']
188         for i,channel in enumerate(channels):
189             conversion = None
190             if channel == 'vDeflection':
191                 conversion = 'distance'
192             segment = self._zip_scale_channel(
193                 segment, channel, conversion=conversion, path=path, info=info)
194             name,unit = split_data_label(segment.info['columns'][i])
195             if name == 'vDeflection':
196                 assert unit == 'm', segment.info['columns'][i]
197                 segment.info['columns'][i] = join_data_label('deflection', 'm')
198                 # Invert because deflection voltage increases as the
199                 # tip moves away from the surface, but it makes more
200                 # sense to me to have it increase as it moves toward
201                 # the surface (positive tension on the protein chain).
202                 segment[:,i] *= -1
203             elif name == 'height':
204                 assert unit == 'm', segment.info['columns'][i]
205                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
206         return segment
207
208     def _zip_scale_channel(self, segment, channel_name, conversion=None,
209                            path=None, info={}):
210         channel = segment.info['raw info']['channels']['list'].index(
211             channel_name)
212         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
213         if conversion == None:
214             conversion = conversion_set['conversions']['default']
215         if conversion == conversion_set['conversions']['base']:
216             # Our conversion is the base data.
217             if conversion != 'volts':
218                 raise NotImplementedError(
219                     'unknown units for base channel: %s' % conversion)
220             segment.info['columns'][channel] = join_data_label(
221                 channel_name, 'V')
222             return segment
223         conversion_info = conversion_set['conversion'][conversion]
224         if conversion_info['base-calibration-slot'] \
225                 != conversion_set['conversions']['base']:
226             # Our conversion is stacked on a previous conversion.  Do
227             # the previous conversion first.
228             segment = self._zip_scale_channel(
229                 segment, channel_name,
230                 conversion_info['base-calibration-slot'],
231                 path=path, info=info)
232         if conversion_info['type'] == 'file':
233             # Michael Haggerty at JPK points out that the conversion
234             # information stored in the external file is reproduced in
235             # the force curve file.  So there is no need to actually
236             # read `conversion_info['file']`.  In fact, the data there
237             # may have changed with future calibrations, while the
238             # information stored directly in conversion_info retains
239             # the calibration information as it was when the experiment
240             # was performed.
241             pass  # Fall through to 'simple' conversion processing.
242         else:
243             assert conversion_info['type'] == 'simple', conversion_info['type']
244         assert conversion_info['scaling']['type'] == 'linear', \
245             conversion_info['scaling']['type']
246         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
247             conversion_info['scaling']['style']
248         multiplier = float(conversion_info['scaling']['multiplier'])
249         offset = float(conversion_info['scaling']['offset'])
250         unit = conversion_info['scaling']['unit']['unit']
251         segment[:,channel] = segment[:,channel] * multiplier + offset
252         segment.info['columns'][channel] = join_data_label(channel_name, unit)
253         return segment
254
255     def _parse_params(self, lines):
256         info = {}
257         for line in lines:
258             line = line.strip()
259             if line.startswith('#'):
260                 continue
261             else:
262                 # e.g.: force-segment-header.type=xy-position-segment-header
263                 fields = line.split('=', 1)
264                 assert len(fields) == 2, line
265                 setting = fields[0].split('.')
266                 sub_info = info  # drill down, e.g. info['force-s..']['type']
267                 for s in setting[:-1]:
268                     if s not in sub_info:
269                         sub_info[s] = {}
270                     sub_info = sub_info[s]
271                 if setting[-1] == 'list':  # split a space-delimited list
272                     sub_info[setting[-1]] = fields[1].split(' ')
273                 else:
274                     sub_info[setting[-1]] = fields[1]
275         return info
276
277     def _read_old(self, path, info):
278         raise NotImplementedError(
279             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
280             "use JPK's `out2jpk-force` script to convert your old files "
281             "to a more recent format before loading them with Hooke.")