Don't assume you know which sJPK egments will be created.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.%d' % i
81                                                    for i in range(12)]:
82             raise NotImplementedError(
83                 'JPK file version %s not supported (yet).'
84                 % zip_info['file-format-version'])
85         curve_info = self._zip_translate_params(zip_info,
86                                                 segments[0].info['raw info'])
87         for segment in segments:
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         for chan in prop['channels']['list']:
105             chan_info = prop['channel'][chan]
106             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
107             if channels[-1].shape != expected_shape:
108                     raise NotImplementedError(
109                         'Channel %d:%s in %s has strange shape %s != %s'
110                         % (index, chan, zipfile.path,
111                            channels[-1].shape, expected_shape))
112         d = curve.Data(
113             shape=(len(channels[0]), len(channels)),
114             dtype=channels[0].dtype,
115             info=self._zip_translate_segment_params(prop))
116         for i,chan in enumerate(channels):
117             d[:,i] = chan
118         return self._zip_scale_segment(d, path, info)
119
120     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
121         f = zipfile.open(os.path.join(
122                 'segments', str(segment_index),
123                 chan_info['data']['file']['name']), 'r')
124         assert chan_info['data']['file']['format'] == 'raw', \
125             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
126         assert chan_info['data']['type'] == 'float-data', \
127             'Non-float data format:\n%s' % pprint.pformat(chan_info)
128         data = numpy.frombuffer(
129             buffer(f.read()),
130             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
131         # '>' (big endian) byte order.
132         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
133         #    All forms of raw data are stored in chronological order
134         #    (the order in which they were collected), and the
135         #    individual values are stored in network byte order
136         #    (big-endian). The data type used to store the data is
137         #    specified by the "channel.*.data.type" property, and is
138         #    either short (2 bytes per value), integer (4 bytes), or
139         #    float (4 bytes, IEEE format).
140         f.close()
141         return data
142
143     def _zip_translate_params(self, params, chan_info):
144         info = {
145             'raw info':params,
146             #'time':self._time_from_TODO(raw_info[]),
147             }
148         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
149         assert force_unit == 'N', force_unit
150         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
151         assert force_base == 'distance', force_base
152         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
153         assert dist_unit == 'm', dist_unit
154         force_mult = float(
155             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
156         info['spring constant (N/m)'] = force_mult
157         return info
158
159     def _zip_translate_segment_params(self, params):
160         info = {
161             'raw info':params,
162             'columns':list(params['channels']['list']),
163             'name':params['force-segment-header']['name']['name'],
164             }
165         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
166             info['name'] = info['name'][:-len('-spm')]
167             if info['name'] == 'extend':
168                 info['name'] = 'approach'
169         else:
170             raise NotImplementedError(
171                 'Unrecognized segment type %s' % info['name'])
172         return info
173
174     def _zip_scale_segment(self, segment, path, info):
175         data = curve.Data(
176             shape=segment.shape,
177             dtype=segment.dtype,
178             info={})
179         data[:,:] = segment
180         segment.info['raw data'] = data
181
182         # raw column indices
183         channels = segment.info['raw info']['channels']['list']
184         for i,channel in enumerate(channels):
185             conversion = None
186             if channel == 'vDeflection':
187                 conversion = 'distance'
188             segment = self._zip_scale_channel(
189                 segment, channel, conversion=conversion, path=path, info=info)
190             name,unit = split_data_label(segment.info['columns'][i])
191             if name == 'vDeflection':
192                 assert unit == 'm', segment.info['columns'][i]
193                 segment.info['columns'][i] = join_data_label('deflection', 'm')
194                 # Invert because deflection voltage increases as the
195                 # tip moves away from the surface, but it makes more
196                 # sense to me to have it increase as it moves toward
197                 # the surface (positive tension on the protein chain).
198                 segment[:,i] *= -1
199             elif name == 'height':
200                 assert unit == 'm', segment.info['columns'][i]
201                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
202         return segment
203
204     def _zip_scale_channel(self, segment, channel_name, conversion=None,
205                            path=None, info={}):
206         channel = segment.info['raw info']['channels']['list'].index(
207             channel_name)
208         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
209         if conversion == None:
210             conversion = conversion_set['conversions']['default']
211         if conversion == conversion_set['conversions']['base']:
212             # Our conversion is the base data.
213             if conversion != 'volts':
214                 raise NotImplementedError(
215                     'unknown units for base channel: %s' % conversion)
216             segment.info['columns'][channel] = join_data_label(
217                 channel_name, 'V')
218             return segment
219         conversion_info = conversion_set['conversion'][conversion]
220         if conversion_info['base-calibration-slot'] \
221                 != conversion_set['conversions']['base']:
222             # Our conversion is stacked on a previous conversion.  Do
223             # the previous conversion first.
224             segment = self._zip_scale_channel(
225                 segment, channel_name,
226                 conversion_info['base-calibration-slot'],
227                 path=path, info=info)
228         if conversion_info['type'] == 'file':
229             # Michael Haggerty at JPK points out that the conversion
230             # information stored in the external file is reproduced in
231             # the force curve file.  So there is no need to actually
232             # read `conversion_info['file']`.  In fact, the data there
233             # may have changed with future calibrations, while the
234             # information stored directly in conversion_info retains
235             # the calibration information as it was when the experiment
236             # was performed.
237             pass  # Fall through to 'simple' conversion processing.
238         else:
239             assert conversion_info['type'] == 'simple', conversion_info['type']
240         assert conversion_info['scaling']['type'] == 'linear', \
241             conversion_info['scaling']['type']
242         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
243             conversion_info['scaling']['style']
244         multiplier = float(conversion_info['scaling']['multiplier'])
245         offset = float(conversion_info['scaling']['offset'])
246         unit = conversion_info['scaling']['unit']['unit']
247         segment[:,channel] = segment[:,channel] * multiplier + offset
248         segment.info['columns'][channel] = join_data_label(channel_name, unit)
249         return segment
250
251     def _parse_params(self, lines):
252         info = {}
253         for line in lines:
254             line = line.strip()
255             if line.startswith('#'):
256                 continue
257             else:
258                 # e.g.: force-segment-header.type=xy-position-segment-header
259                 fields = line.split('=', 1)
260                 assert len(fields) == 2, line
261                 setting = fields[0].split('.')
262                 sub_info = info  # drill down, e.g. info['force-s..']['type']
263                 for s in setting[:-1]:
264                     if s not in sub_info:
265                         sub_info[s] = {}
266                     sub_info = sub_info[s]
267                 if setting[-1] == 'list':  # split a space-delimited list
268                     sub_info[setting[-1]] = fields[1].split(' ')
269                 else:
270                     sub_info[setting[-1]] = fields[1]
271         return info
272
273     def _read_old(self, path, info):
274         raise NotImplementedError(
275             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
276             "use JPK's `out2jpk-force` script to convert your old files "
277             "to a more recent format before loading them with Hooke.")