Confirm big-endian binary data format for JPK files.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if os.path.isdir(path):
43             return False
44         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
45             with Closing(zipfile.ZipFile(path, 'r')) as f:
46                 if 'header.properties' not in f.namelist():
47                     return False
48                 with Closing(f.open('header.properties')) as h:
49                     if 'jpk-data-file' in h.read():
50                         return True
51         else:
52             with Closing(open(path, 'r')) as f:
53                 headlines = []
54                 for i in range(3):
55                     headlines.append(f.readline())
56             if headlines[0].startswith('# xPosition') \
57                     and headlines[1].startswith('# yPosition'):
58                 return True
59         return False
60
61     def read(self, path, info=None):
62         if info == None:
63             info = {}
64         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
65             return self._read_zip(path, info)
66         else:
67             return self._read_old(path, info)
68
69     def _read_zip(self, path, info):
70         with Closing(zipfile.ZipFile(path, 'r')) as f:
71             f.path = path
72             zip_info = self._zip_info(f)
73             segments = []
74             for i in range(len([p for p in f.namelist()
75                                 if p.endswith('segment-header.properties')])):
76                 segments.append(self._zip_segment(f, path, info, zip_info, i))
77             for name in ['approach', 'retract']:
78                 if len([s for s in segments if s.info['name'] == name]) == 0:
79                     raise ValueError(
80                         'No segment for %s in %s, only %s'
81                         % (name, path, [s.info['name'] for s in segments]))
82             return (segments,
83                     self._zip_translate_params(zip_info,
84                                                segments[0].info['raw info']))
85
86     def _zip_info(self, zipfile):
87         with Closing(zipfile.open('header.properties')) as f:
88             info = self._parse_params(f.readlines())
89             return info
90
91     def _zip_segment(self, zipfile, path, info, zip_info, index):
92         prop_file = zipfile.open(os.path.join(
93                 'segments', str(index), 'segment-header.properties'))
94         prop = self._parse_params(prop_file.readlines())
95         prop_file.close()
96         expected_shape = (int(prop['force-segment-header']['num-points']),)
97         channels = []
98         for chan in prop['channels']['list']:
99             chan_info = prop['channel'][chan]
100             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
101             if channels[-1].shape != expected_shape:
102                     raise NotImplementedError(
103                         'Channel %d:%s in %s has strange shape %s != %s'
104                         % (index, chan, zipfile.path,
105                            channels[-1].shape, expected_shape))
106         d = curve.Data(
107             shape=(len(channels[0]), len(channels)),
108             dtype=channels[0].dtype,
109             info=self._zip_translate_segment_params(prop))
110         for i,chan in enumerate(channels):
111             d[:,i] = chan
112         return self._zip_scale_segment(d, path, info)
113
114     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
115         f = zipfile.open(os.path.join(
116                 'segments', str(segment_index),
117                 chan_info['data']['file']['name']), 'r')
118         assert chan_info['data']['file']['format'] == 'raw', \
119             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
120         assert chan_info['data']['type'] == 'float-data', \
121             'Non-float data format:\n%s' % pprint.pformat(chan_info)
122         data = numpy.frombuffer(
123             buffer(f.read()),
124             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
125         # '>' (big endian) byte order.
126         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
127         #    All forms of raw data are stored in chronological order
128         #    (the order in which they were collected), and the
129         #    individual values are stored in network byte order
130         #    (big-endian). The data type used to store the data is
131         #    specified by the "channel.*.data.type" property, and is
132         #    either short (2 bytes per value), integer (4 bytes), or
133         #    float (4 bytes, IEEE format).
134         f.close()
135         return data
136
137     def _zip_translate_params(self, params, chan_info):
138         info = {
139             'raw info':params,
140             #'time':self._time_from_TODO(raw_info[]),
141             }
142         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
143         assert force_unit == 'N', force_unit
144         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
145         assert force_base == 'distance', force_base
146         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
147         assert dist_unit == 'm', dist_unit
148         force_mult = float(
149             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
150         info['spring constant (N/m)'] = force_mult
151         return info
152
153     def _zip_translate_segment_params(self, params):
154         info = {
155             'raw info':params,
156             'columns':list(params['channels']['list']),
157             'name':params['force-segment-header']['name']['name'],
158             }
159         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
160             info['name'] = info['name'][:-len('-spm')]
161             if info['name'] == 'extend':
162                 info['name'] = 'approach'
163         else:
164             raise NotImplementedError(
165                 'Unrecognized segment type %s' % info['name'])
166         return info
167
168     def _zip_scale_segment(self, segment, path, info):
169         data = curve.Data(
170             shape=segment.shape,
171             dtype=segment.dtype,
172             info={})
173         data[:,:] = segment
174         segment.info['raw data'] = data
175
176         # raw column indices
177         channels = segment.info['raw info']['channels']['list']
178         z_col = channels.index('height')
179         d_col = channels.index('vDeflection')
180         
181         segment = self._zip_scale_channel(
182             segment, z_col, 'calibrated', path, info)
183         segment = self._zip_scale_channel(
184             segment, d_col, 'distance', path, info)
185
186         assert segment.info['columns'][z_col] == 'height (m)', \
187             segment.info['columns'][z_col]
188         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
189             segment.info['columns'][d_col]
190
191         # scaled column indices same as raw column indices,
192         # because columns is a copy of channels.list
193         segment.info['columns'][z_col] = 'z piezo (m)'
194         segment.info['columns'][d_col] = 'deflection (m)'
195         return segment
196
197     def _zip_scale_channel(self, segment, channel, conversion, path, info):
198         channel_name = segment.info['raw info']['channels']['list'][channel]
199         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
200         conversion_info = conversion_set['conversion'][conversion]
201         if conversion_info['base-calibration-slot'] \
202                 != conversion_set['conversions']['base']:
203             # Our conversion is stacked on a previous conversion.  Do
204             # the previous conversion first.
205             segment = self._zip_scale_channel(
206                 segment, channel, conversion_info['base-calibration-slot'],
207                 info, path)
208         if conversion_info['type'] == 'file':
209             key = ('%s_%s_to_%s_calibration_file'
210                    % (channel_name,
211                       conversion_info['base-calibration-slot'],
212                       conversion))
213             calib_path = conversion_info['file']
214             if key in info:
215                 calib_path = os.path.join(os.path.dirname(path), info[key])
216                 self.logger().debug(
217                     'Overriding %s -> %s calibration for %s channel: %s'
218                     % (conversion_info['base-calibration-slot'],
219                        conversion, channel_name, calib_path))
220             if os.path.exists(calib_path):
221                 with file(calib_path, 'r') as f:
222                     lines = [x.strip() for x in f.readlines()]
223                     f.close()
224                 calib = {  # I've emailed JPK to confirm this file format.
225                     'title':lines[0],
226                     'multiplier':float(lines[1]),
227                     'offset':float(lines[2]),
228                     'unit':lines[3],
229                     'note':'\n'.join(lines[4:]),
230                     }
231                 segment[:,channel] = (segment[:,channel] * calib['multiplier']
232                                       + calib['offset'])
233                 segment.info['columns'][channel] = (
234                     '%s (%s)' % (channel_name, calib['unit']))
235                 return segment
236             else:
237                 self.logger().warn(
238                     'Skipping %s -> %s calibration for %s channel.  Calibration file %s not found'
239                     % (conversion_info['base-calibration-slot'],
240                        conversion, channel_name, calib_path))
241         else:
242             assert conversion_info['type'] == 'simple', conversion_info['type']
243         assert conversion_info['scaling']['type'] == 'linear', \
244             conversion_info['scaling']['type']
245         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
246             conversion_info['scaling']['style']
247         multiplier = float(conversion_info['scaling']['multiplier'])
248         offset = float(conversion_info['scaling']['offset'])
249         unit = conversion_info['scaling']['unit']['unit']
250         segment[:,channel] = segment[:,channel] * multiplier + offset
251         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
252         return segment
253
254     def _parse_params(self, lines):
255         info = {}
256         for line in lines:
257             line = line.strip()
258             if line.startswith('#'):
259                 continue
260             else:
261                 # e.g.: force-segment-header.type=xy-position-segment-header
262                 fields = line.split('=', 1)
263                 assert len(fields) == 2, line
264                 setting = fields[0].split('.')
265                 sub_info = info  # drill down, e.g. info['force-s..']['type']
266                 for s in setting[:-1]:
267                     if s not in sub_info:
268                         sub_info[s] = {}
269                     sub_info = sub_info[s]
270                 if setting[-1] == 'list':  # split a space-delimited list
271                     sub_info[setting[-1]] = fields[1].split(' ')
272                 else:
273                     sub_info[setting[-1]] = fields[1]
274         return info
275
276     def _read_old(self, path, info):
277         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)