Make file-format-version=0.5 support explicit in JPKDriver
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if os.path.isdir(path):
43             return False
44         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
45             with Closing(zipfile.ZipFile(path, 'r')) as f:
46                 if 'header.properties' not in f.namelist():
47                     return False
48                 with Closing(f.open('header.properties')) as h:
49                     if 'jpk-data-file' in h.read():
50                         return True
51         else:
52             with Closing(open(path, 'r')) as f:
53                 headlines = []
54                 for i in range(3):
55                     headlines.append(f.readline())
56             if headlines[0].startswith('# xPosition') \
57                     and headlines[1].startswith('# yPosition'):
58                 return True
59         return False
60
61     def read(self, path, info=None):
62         if info == None:
63             info = {}
64         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
65             return self._read_zip(path, info)
66         else:
67             return self._read_old(path, info)
68
69     def _read_zip(self, path, info):
70         with Closing(zipfile.ZipFile(path, 'r')) as f:
71             f.path = path
72             zip_info = self._zip_info(f)
73             segments = []
74             for i in range(len([p for p in f.namelist()
75                                 if p.endswith('segment-header.properties')])):
76                 segments.append(self._zip_segment(f, path, info, zip_info, i))
77         if zip_info['file-format-version'] not in ['0.5']:
78             raise NotImplementedError(
79                 'JPK file version %s not supported (yet).'
80                 % zip_info['file-format-version'])
81         for name in ['approach', 'retract']:
82             if len([s for s in segments if s.info['name'] == name]) == 0:
83                 raise ValueError(
84                     'No segment for %s in %s, only %s'
85                     % (name, path, [s.info['name'] for s in segments]))
86         return (segments,
87                 self._zip_translate_params(zip_info,
88                                            segments[0].info['raw info']))
89
90     def _zip_info(self, zipfile):
91         with Closing(zipfile.open('header.properties')) as f:
92             info = self._parse_params(f.readlines())
93             return info
94
95     def _zip_segment(self, zipfile, path, info, zip_info, index):
96         prop_file = zipfile.open(os.path.join(
97                 'segments', str(index), 'segment-header.properties'))
98         prop = self._parse_params(prop_file.readlines())
99         prop_file.close()
100         expected_shape = (int(prop['force-segment-header']['num-points']),)
101         channels = []
102         for chan in prop['channels']['list']:
103             chan_info = prop['channel'][chan]
104             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
105             if channels[-1].shape != expected_shape:
106                     raise NotImplementedError(
107                         'Channel %d:%s in %s has strange shape %s != %s'
108                         % (index, chan, zipfile.path,
109                            channels[-1].shape, expected_shape))
110         d = curve.Data(
111             shape=(len(channels[0]), len(channels)),
112             dtype=channels[0].dtype,
113             info=self._zip_translate_segment_params(prop))
114         for i,chan in enumerate(channels):
115             d[:,i] = chan
116         return self._zip_scale_segment(d, path, info)
117
118     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
119         f = zipfile.open(os.path.join(
120                 'segments', str(segment_index),
121                 chan_info['data']['file']['name']), 'r')
122         assert chan_info['data']['file']['format'] == 'raw', \
123             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
124         assert chan_info['data']['type'] == 'float-data', \
125             'Non-float data format:\n%s' % pprint.pformat(chan_info)
126         data = numpy.frombuffer(
127             buffer(f.read()),
128             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
129         # '>' (big endian) byte order.
130         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
131         #    All forms of raw data are stored in chronological order
132         #    (the order in which they were collected), and the
133         #    individual values are stored in network byte order
134         #    (big-endian). The data type used to store the data is
135         #    specified by the "channel.*.data.type" property, and is
136         #    either short (2 bytes per value), integer (4 bytes), or
137         #    float (4 bytes, IEEE format).
138         f.close()
139         return data
140
141     def _zip_translate_params(self, params, chan_info):
142         info = {
143             'raw info':params,
144             #'time':self._time_from_TODO(raw_info[]),
145             }
146         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
147         assert force_unit == 'N', force_unit
148         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
149         assert force_base == 'distance', force_base
150         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
151         assert dist_unit == 'm', dist_unit
152         force_mult = float(
153             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
154         info['spring constant (N/m)'] = force_mult
155         return info
156
157     def _zip_translate_segment_params(self, params):
158         info = {
159             'raw info':params,
160             'columns':list(params['channels']['list']),
161             'name':params['force-segment-header']['name']['name'],
162             }
163         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
164             info['name'] = info['name'][:-len('-spm')]
165             if info['name'] == 'extend':
166                 info['name'] = 'approach'
167         else:
168             raise NotImplementedError(
169                 'Unrecognized segment type %s' % info['name'])
170         return info
171
172     def _zip_scale_segment(self, segment, path, info):
173         data = curve.Data(
174             shape=segment.shape,
175             dtype=segment.dtype,
176             info={})
177         data[:,:] = segment
178         segment.info['raw data'] = data
179
180         # raw column indices
181         channels = segment.info['raw info']['channels']['list']
182         z_col = channels.index('height')
183         d_col = channels.index('vDeflection')
184         
185         segment = self._zip_scale_channel(
186             segment, z_col, 'calibrated', path, info)
187         segment = self._zip_scale_channel(
188             segment, d_col, 'distance', path, info)
189
190         assert segment.info['columns'][z_col] == 'height (m)', \
191             segment.info['columns'][z_col]
192         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
193             segment.info['columns'][d_col]
194
195         # scaled column indices same as raw column indices,
196         # because columns is a copy of channels.list
197         segment.info['columns'][z_col] = 'z piezo (m)'
198         segment.info['columns'][d_col] = 'deflection (m)'
199         return segment
200
201     def _zip_scale_channel(self, segment, channel, conversion, path, info):
202         channel_name = segment.info['raw info']['channels']['list'][channel]
203         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
204         conversion_info = conversion_set['conversion'][conversion]
205         if conversion_info['base-calibration-slot'] \
206                 != conversion_set['conversions']['base']:
207             # Our conversion is stacked on a previous conversion.  Do
208             # the previous conversion first.
209             segment = self._zip_scale_channel(
210                 segment, channel, conversion_info['base-calibration-slot'],
211                 info, path)
212         if conversion_info['type'] == 'file':
213             key = ('%s_%s_to_%s_calibration_file'
214                    % (channel_name,
215                       conversion_info['base-calibration-slot'],
216                       conversion))
217             calib_path = conversion_info['file']
218             if key in info:
219                 calib_path = os.path.join(os.path.dirname(path), info[key])
220                 self.logger().debug(
221                     'Overriding %s -> %s calibration for %s channel: %s'
222                     % (conversion_info['base-calibration-slot'],
223                        conversion, channel_name, calib_path))
224             if os.path.exists(calib_path):
225                 with file(calib_path, 'r') as f:
226                     lines = [x.strip() for x in f.readlines()]
227                     f.close()
228                 calib = {  # I've emailed JPK to confirm this file format.
229                     'title':lines[0],
230                     'multiplier':float(lines[1]),
231                     'offset':float(lines[2]),
232                     'unit':lines[3],
233                     'note':'\n'.join(lines[4:]),
234                     }
235                 segment[:,channel] = (segment[:,channel] * calib['multiplier']
236                                       + calib['offset'])
237                 segment.info['columns'][channel] = (
238                     '%s (%s)' % (channel_name, calib['unit']))
239                 return segment
240             else:
241                 self.logger().warn(
242                     'Skipping %s -> %s calibration for %s channel.  Calibration file %s not found'
243                     % (conversion_info['base-calibration-slot'],
244                        conversion, channel_name, calib_path))
245         else:
246             assert conversion_info['type'] == 'simple', conversion_info['type']
247         assert conversion_info['scaling']['type'] == 'linear', \
248             conversion_info['scaling']['type']
249         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
250             conversion_info['scaling']['style']
251         multiplier = float(conversion_info['scaling']['multiplier'])
252         offset = float(conversion_info['scaling']['offset'])
253         unit = conversion_info['scaling']['unit']['unit']
254         segment[:,channel] = segment[:,channel] * multiplier + offset
255         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
256         return segment
257
258     def _parse_params(self, lines):
259         info = {}
260         for line in lines:
261             line = line.strip()
262             if line.startswith('#'):
263                 continue
264             else:
265                 # e.g.: force-segment-header.type=xy-position-segment-header
266                 fields = line.split('=', 1)
267                 assert len(fields) == 2, line
268                 setting = fields[0].split('.')
269                 sub_info = info  # drill down, e.g. info['force-s..']['type']
270                 for s in setting[:-1]:
271                     if s not in sub_info:
272                         sub_info[s] = {}
273                     sub_info = sub_info[s]
274                 if setting[-1] == 'list':  # split a space-delimited list
275                     sub_info[setting[-1]] = fields[1].split(' ')
276                 else:
277                     sub_info[setting[-1]] = fields[1]
278         return info
279
280     def _read_old(self, path, info):
281         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)