Set filetype and experiment fields in JPK driver.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if os.path.isdir(path):
43             return False
44         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
45             with Closing(zipfile.ZipFile(path, 'r')) as f:
46                 if 'header.properties' not in f.namelist():
47                     return False
48                 with Closing(f.open('header.properties')) as h:
49                     if 'jpk-data-file' in h.read():
50                         return True
51         else:
52             with Closing(open(path, 'r')) as f:
53                 headlines = []
54                 for i in range(3):
55                     headlines.append(f.readline())
56             if headlines[0].startswith('# xPosition') \
57                     and headlines[1].startswith('# yPosition'):
58                 return True
59         return False
60
61     def read(self, path, info=None):
62         if info == None:
63             info = {}
64         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
65             return self._read_zip(path, info)
66         else:
67             return self._read_old(path, info)
68
69     def _read_zip(self, path, info):
70         with Closing(zipfile.ZipFile(path, 'r')) as f:
71             f.path = path
72             zip_info = self._zip_info(f)
73             segments = []
74             for i in range(len([p for p in f.namelist()
75                                 if p.endswith('segment-header.properties')])):
76                 segments.append(self._zip_segment(f, path, info, zip_info, i))
77         if zip_info['file-format-version'] not in ['0.5']:
78             raise NotImplementedError(
79                 'JPK file version %s not supported (yet).'
80                 % zip_info['file-format-version'])
81         for name in ['approach', 'retract']:
82             if len([s for s in segments if s.info['name'] == name]) == 0:
83                 raise ValueError(
84                     'No segment for %s in %s, only %s'
85                     % (name, path, [s.info['name'] for s in segments]))
86         curve_info = self._zip_translate_params(zip_info,
87                                                 segments[0].info['raw info'])
88         for segment in segments:
89             segment.info['spring constant (N/m)'] = \
90                 curve_info['spring constant (N/m)']
91         return (segments, curve_info)
92
93     def _zip_info(self, zipfile):
94         with Closing(zipfile.open('header.properties')) as f:
95             info = self._parse_params(f.readlines())
96             return info
97
98     def _zip_segment(self, zipfile, path, info, zip_info, index):
99         prop_file = zipfile.open(os.path.join(
100                 'segments', str(index), 'segment-header.properties'))
101         prop = self._parse_params(prop_file.readlines())
102         prop_file.close()
103         expected_shape = (int(prop['force-segment-header']['num-points']),)
104         channels = []
105         for chan in prop['channels']['list']:
106             chan_info = prop['channel'][chan]
107             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
108             if channels[-1].shape != expected_shape:
109                     raise NotImplementedError(
110                         'Channel %d:%s in %s has strange shape %s != %s'
111                         % (index, chan, zipfile.path,
112                            channels[-1].shape, expected_shape))
113         d = curve.Data(
114             shape=(len(channels[0]), len(channels)),
115             dtype=channels[0].dtype,
116             info=self._zip_translate_segment_params(prop))
117         for i,chan in enumerate(channels):
118             d[:,i] = chan
119         return self._zip_scale_segment(d, path, info)
120
121     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
122         f = zipfile.open(os.path.join(
123                 'segments', str(segment_index),
124                 chan_info['data']['file']['name']), 'r')
125         assert chan_info['data']['file']['format'] == 'raw', \
126             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
127         assert chan_info['data']['type'] == 'float-data', \
128             'Non-float data format:\n%s' % pprint.pformat(chan_info)
129         data = numpy.frombuffer(
130             buffer(f.read()),
131             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
132         # '>' (big endian) byte order.
133         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
134         #    All forms of raw data are stored in chronological order
135         #    (the order in which they were collected), and the
136         #    individual values are stored in network byte order
137         #    (big-endian). The data type used to store the data is
138         #    specified by the "channel.*.data.type" property, and is
139         #    either short (2 bytes per value), integer (4 bytes), or
140         #    float (4 bytes, IEEE format).
141         f.close()
142         return data
143
144     def _zip_translate_params(self, params, chan_info):
145         info = {
146             'raw info':params,
147             'filetype':self.name,
148             #'time':self._time_from_TODO(raw_info[]),
149             }
150         # TODO: distinguish between force clamp and velocity clamp
151         # experiments.  Note that the JPK file format is flexible
152         # enough to support mixed experiments (i.e. both force clamp
153         # and velocity clamp segments in a single experiment), but I
154         # have no idea what sort of analysis such experiments would
155         # require ;).
156         info['experiment'] = experiment.VelocityClamp
157         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
158         assert force_unit == 'N', force_unit
159         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
160         assert force_base == 'distance', force_base
161         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
162         assert dist_unit == 'm', dist_unit
163         force_mult = float(
164             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
165         info['spring constant (N/m)'] = force_mult
166         return info
167
168     def _zip_translate_segment_params(self, params):
169         info = {
170             'raw info':params,
171             'columns':list(params['channels']['list']),
172             'name':params['force-segment-header']['name']['name'],
173             }
174         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
175             info['name'] = info['name'][:-len('-spm')]
176             if info['name'] == 'extend':
177                 info['name'] = 'approach'
178         else:
179             raise NotImplementedError(
180                 'Unrecognized segment type %s' % info['name'])
181         return info
182
183     def _zip_scale_segment(self, segment, path, info):
184         data = curve.Data(
185             shape=segment.shape,
186             dtype=segment.dtype,
187             info={})
188         data[:,:] = segment
189         segment.info['raw data'] = data
190
191         # raw column indices
192         channels = segment.info['raw info']['channels']['list']
193         z_col = channels.index('height')
194         d_col = channels.index('vDeflection')
195         
196         segment = self._zip_scale_channel(
197             segment, z_col, 'calibrated', path, info)
198         segment = self._zip_scale_channel(
199             segment, d_col, 'distance', path, info)
200
201         assert segment.info['columns'][z_col] == 'height (m)', \
202             segment.info['columns'][z_col]
203         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
204             segment.info['columns'][d_col]
205
206         # scaled column indices same as raw column indices,
207         # because columns is a copy of channels.list
208         segment.info['columns'][z_col] = 'z piezo (m)'
209         segment.info['columns'][d_col] = 'deflection (m)'
210         return segment
211
212     def _zip_scale_channel(self, segment, channel, conversion, path, info):
213         channel_name = segment.info['raw info']['channels']['list'][channel]
214         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
215         conversion_info = conversion_set['conversion'][conversion]
216         if conversion_info['base-calibration-slot'] \
217                 != conversion_set['conversions']['base']:
218             # Our conversion is stacked on a previous conversion.  Do
219             # the previous conversion first.
220             segment = self._zip_scale_channel(
221                 segment, channel, conversion_info['base-calibration-slot'],
222                 path=path, info=info)
223         if conversion_info['type'] == 'file':
224             # Michael Haggerty at JPK points out that the conversion
225             # information stored in the external file is reproduced in
226             # the force curve file.  So there is no need to actually
227             # read `conversion_info['file']`.  In fact, the data there
228             # may have changed with future calibrations, while the
229             # information stored directly in conversion_info retains
230             # the calibration information as it was when the experiment
231             # was performed.
232             pass  # Fall through to 'simple' conversion processing.
233         else:
234             assert conversion_info['type'] == 'simple', conversion_info['type']
235         assert conversion_info['scaling']['type'] == 'linear', \
236             conversion_info['scaling']['type']
237         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
238             conversion_info['scaling']['style']
239         multiplier = float(conversion_info['scaling']['multiplier'])
240         offset = float(conversion_info['scaling']['offset'])
241         unit = conversion_info['scaling']['unit']['unit']
242         segment[:,channel] = segment[:,channel] * multiplier + offset
243         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
244         return segment
245
246     def _parse_params(self, lines):
247         info = {}
248         for line in lines:
249             line = line.strip()
250             if line.startswith('#'):
251                 continue
252             else:
253                 # e.g.: force-segment-header.type=xy-position-segment-header
254                 fields = line.split('=', 1)
255                 assert len(fields) == 2, line
256                 setting = fields[0].split('.')
257                 sub_info = info  # drill down, e.g. info['force-s..']['type']
258                 for s in setting[:-1]:
259                     if s not in sub_info:
260                         sub_info[s] = {}
261                     sub_info = sub_info[s]
262                 if setting[-1] == 'list':  # split a space-delimited list
263                     sub_info[setting[-1]] = fields[1].split(' ')
264                 else:
265                     sub_info[setting[-1]] = fields[1]
266         return info
267
268     def _read_old(self, path, info):
269         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)