Break JPK data block name extraction out into ._zip_segment_name().
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.%d' % i
81                                                    for i in range(12)]:
82             raise NotImplementedError(
83                 'JPK file version %s not supported (yet).'
84                 % zip_info['file-format-version'])
85         curve_info = self._zip_translate_params(zip_info,
86                                                 segments[0].info['raw info'])
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         f = zipfile.open(os.path.join(
131                 'segments', str(segment_index),
132                 chan_info['data']['file']['name']), 'r')
133         assert chan_info['data']['file']['format'] == 'raw', \
134             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
135         assert chan_info['data']['type'] == 'float-data', \
136             'Non-float data format:\n%s' % pprint.pformat(chan_info)
137         data = numpy.frombuffer(
138             buffer(f.read()),
139             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
140         # '>' (big endian) byte order.
141         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
142         #    All forms of raw data are stored in chronological order
143         #    (the order in which they were collected), and the
144         #    individual values are stored in network byte order
145         #    (big-endian). The data type used to store the data is
146         #    specified by the "channel.*.data.type" property, and is
147         #    either short (2 bytes per value), integer (4 bytes), or
148         #    float (4 bytes, IEEE format).
149         f.close()
150         return data
151
152     def _zip_translate_params(self, params, chan_info):
153         info = {
154             'raw info':params,
155             #'time':self._time_from_TODO(raw_info[]),
156             }
157         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
158         assert force_unit == 'N', force_unit
159         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
160         assert force_base == 'distance', force_base
161         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
162         assert dist_unit == 'm', dist_unit
163         distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
164         assert distance_base == 'volts', distance_base
165         # Assume volts unit is V, but it is not specified in the JPK
166         # file format.
167         force_mult = float(
168             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
169         sens_mult = float(
170             chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
171         info['spring constant (N/m)'] = force_mult
172         info['z piezo sensitivity (m/V)'] = sens_mult
173         return info
174
175     def _zip_translate_segment_params(self, params):
176         info = {
177             'raw info': params,
178             'columns': list(params['channels']['list']),
179             'name': self._zip_segment_name(params),
180             }
181         return info
182
183     def _zip_segment_name(self, params):
184         name = params['force-segment-header']['name']['name']
185         if name.endswith('-spm'):
186             name = name[:-len('-spm')]
187         if name == 'extend':
188             name = 'approach'
189         elif name.startswith('pause-at-'):
190             name = 'pause'
191         return name
192
193     def _zip_scale_segment(self, segment, path, info):
194         data = curve.Data(
195             shape=segment.shape,
196             dtype=segment.dtype,
197             info={})
198         data[:,:] = segment
199         segment.info['raw data'] = data
200
201         # raw column indices
202         channels = segment.info['raw info']['channels']['list']
203         for i,channel in enumerate(channels):
204             conversion = None
205             if channel == 'vDeflection':
206                 conversion = 'distance'
207             segment = self._zip_scale_channel(
208                 segment, channel, conversion=conversion, path=path, info=info)
209             name,unit = split_data_label(segment.info['columns'][i])
210             if name == 'vDeflection':
211                 assert unit == 'm', segment.info['columns'][i]
212                 segment.info['columns'][i] = join_data_label('deflection', 'm')
213                 # Invert because deflection voltage increases as the
214                 # tip moves away from the surface, but it makes more
215                 # sense to me to have it increase as it moves toward
216                 # the surface (positive tension on the protein chain).
217                 segment[:,i] *= -1
218             elif name == 'height':
219                 assert unit == 'm', segment.info['columns'][i]
220                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
221         return segment
222
223     def _zip_scale_channel(self, segment, channel_name, conversion=None,
224                            path=None, info={}):
225         channel = segment.info['raw info']['channels']['list'].index(
226             channel_name)
227         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
228         if conversion == None:
229             conversion = conversion_set['conversions']['default']
230         if conversion == conversion_set['conversions']['base']:
231             # Our conversion is the base data.
232             if conversion != 'volts':
233                 raise NotImplementedError(
234                     'unknown units for base channel: %s' % conversion)
235             segment.info['columns'][channel] = join_data_label(
236                 channel_name, 'V')
237             return segment
238         conversion_info = conversion_set['conversion'][conversion]
239         if conversion_info['base-calibration-slot'] \
240                 != conversion_set['conversions']['base']:
241             # Our conversion is stacked on a previous conversion.  Do
242             # the previous conversion first.
243             segment = self._zip_scale_channel(
244                 segment, channel_name,
245                 conversion_info['base-calibration-slot'],
246                 path=path, info=info)
247         if conversion_info['type'] == 'file':
248             # Michael Haggerty at JPK points out that the conversion
249             # information stored in the external file is reproduced in
250             # the force curve file.  So there is no need to actually
251             # read `conversion_info['file']`.  In fact, the data there
252             # may have changed with future calibrations, while the
253             # information stored directly in conversion_info retains
254             # the calibration information as it was when the experiment
255             # was performed.
256             pass  # Fall through to 'simple' conversion processing.
257         else:
258             assert conversion_info['type'] == 'simple', conversion_info['type']
259         assert conversion_info['scaling']['type'] == 'linear', \
260             conversion_info['scaling']['type']
261         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
262             conversion_info['scaling']['style']
263         multiplier = float(conversion_info['scaling']['multiplier'])
264         offset = float(conversion_info['scaling']['offset'])
265         unit = conversion_info['scaling']['unit']['unit']
266         segment[:,channel] = segment[:,channel] * multiplier + offset
267         segment.info['columns'][channel] = join_data_label(channel_name, unit)
268         return segment
269
270     def _parse_params(self, lines):
271         info = {}
272         for line in lines:
273             line = line.strip()
274             if line.startswith('#'):
275                 continue
276             else:
277                 # e.g.: force-segment-header.type=xy-position-segment-header
278                 fields = line.split('=', 1)
279                 assert len(fields) == 2, line
280                 setting = fields[0].split('.')
281                 sub_info = info  # drill down, e.g. info['force-s..']['type']
282                 for s in setting[:-1]:
283                     if s not in sub_info:
284                         sub_info[s] = {}
285                     sub_info = sub_info[s]
286                 if setting[-1] == 'list':  # split a space-delimited list
287                     sub_info[setting[-1]] = fields[1].split(' ')
288                 else:
289                     sub_info[setting[-1]] = fields[1]
290         return info
291
292     def _read_old(self, path, info):
293         raise NotImplementedError(
294             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
295             "use JPK's `out2jpk-force` script to convert your old files "
296             "to a more recent format before loading them with Hooke.")