Added calibration file support to the JPK driver.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or
7 # modify it under the terms of the GNU Lesser General Public
8 # License as published by the Free Software Foundation, either
9 # version 3 of the License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from . import Driver as Driver
32
33
34 class Closing (object):
35     """Add .__enter__() .__exit__() for `with` statements.
36
37     See :pep:`343`.
38     """
39     def __init__(self, obj):
40         self.obj = obj
41
42     def __enter__(self):
43         return self.obj
44
45     def __exit__(self, *exc_info):
46         try:
47             close_it = self.obj.close
48         except AttributeError:
49             pass
50         else:
51             close_it()
52
53
54 class JPKDriver (Driver):
55     """Handle JPK ForceRobot's data format.
56     """
57     def __init__(self):
58         super(JPKDriver, self).__init__(name='jpk')
59
60     def is_me(self, path):
61         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
62             with Closing(zipfile.ZipFile(path, 'r')) as f:
63                 if 'header.properties' not in f.namelist():
64                     return False
65                 with Closing(f.open('header.properties')) as h:
66                     if 'jpk-data-file' in h.read():
67                         return True
68         else:
69             with Closing(open(path, 'r')) as f:
70                 headlines = []
71                 for i in range(3):
72                     headlines.append(f.readline())
73                 if headlines[0].startswith('# xPosition') \
74                         and headlines[1].startswith('# yPosition'):
75                     return True
76         return False
77
78     def read(self, path, info=None):
79         if info == None:
80             info = {}
81         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
82             return self._read_zip(path, info)
83         else:
84             return self._read_old(path, info)
85
86     def _read_zip(self, path, info):
87         with Closing(zipfile.ZipFile(path, 'r')) as f:
88             f.path = path
89             zip_info = self._zip_info(f)
90             segments = []
91             for i in range(len([p for p in f.namelist()
92                                 if p.endswith('segment-header.properties')])):
93                 segments.append(self._zip_segment(f, path, info, zip_info, i))
94             for name in ['approach', 'retract']:
95                 if len([s for s in segments if s.info['name'] == name]) == 0:
96                     raise ValueError(
97                         'No segment for %s in %s, only %s'
98                         % (name, path, [s.info['name'] for s in segments]))
99             return (segments,
100                     self._zip_translate_params(zip_info,
101                                                segments[0].info['raw info']))
102
103     def _zip_info(self, zipfile):
104         with Closing(zipfile.open('header.properties')) as f:
105             info = self._parse_params(f.readlines())
106             return info
107
108     def _zip_segment(self, zipfile, path, info, zip_info, index):
109         prop_file = zipfile.open(os.path.join(
110                 'segments', str(index), 'segment-header.properties'))
111         prop = self._parse_params(prop_file.readlines())
112         prop_file.close()
113         expected_shape = (int(prop['force-segment-header']['num-points']),)
114         channels = []
115         for chan in prop['channels']['list']:
116             chan_info = prop['channel'][chan]
117             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
118             if channels[-1].shape != expected_shape:
119                     raise NotImplementedError(
120                         'Channel %d:%s in %s has strange shape %s != %s'
121                         % (index, chan, zipfile.path,
122                            channels[-1].shape, expected_shape))
123         d = curve.Data(
124             shape=(len(channels[0]), len(channels)),
125             dtype=channels[0].dtype,
126             info=self._zip_translate_segment_params(prop))
127         for i,chan in enumerate(channels):
128             d[:,i] = chan
129         return self._zip_scale_segment(d, path, info)
130
131     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
132         f = zipfile.open(os.path.join(
133                 'segments', str(segment_index),
134                 chan_info['data']['file']['name']), 'r')
135         assert chan_info['data']['file']['format'] == 'raw', \
136             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
137         assert chan_info['data']['type'] == 'float-data', \
138             'Non-float data format:\n%s' % pprint.pformat(chan_info)
139         data = numpy.frombuffer(
140             buffer(f.read()),
141             dtype=numpy.dtype(numpy.float32).newbyteorder('>'),
142             # Is JPK data always big endian?  I can't find a config
143             # setting.  The ForceRobot brochure
144             #   http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf
145             # lists a PowerPC chip on page 4, under Control
146             # electronics, and PPCs are usually big endian.
147             #   http://en.wikipedia.org/wiki/PowerPC#Endian_modes
148             )
149         f.close()
150         return data
151
152     def _zip_translate_params(self, params, chan_info):
153         info = {
154             'raw info':params,
155             #'time':self._time_from_TODO(raw_info[]),
156             }
157         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
158         assert force_unit == 'N', force_unit
159         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
160         assert force_base == 'distance', force_base
161         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
162         assert dist_unit == 'm', dist_unit
163         force_mult = float(
164             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
165         info['spring constant (N/m)'] = force_mult
166         return info
167
168     def _zip_translate_segment_params(self, params):
169         info = {
170             'raw info':params,
171             'columns':list(params['channels']['list']),
172             'name':params['force-segment-header']['name']['name'],
173             }
174         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
175             info['name'] = info['name'][:-len('-spm')]
176             if info['name'] == 'extend':
177                 info['name'] = 'approach'
178         else:
179             raise NotImplementedError(
180                 'Unrecognized segment type %s' % info['name'])
181         return info
182
183     def _zip_scale_segment(self, segment, path, info):
184         data = curve.Data(
185             shape=segment.shape,
186             dtype=segment.dtype,
187             info={})
188         data[:,:] = segment
189         segment.info['raw data'] = data
190
191         # raw column indices
192         channels = segment.info['raw info']['channels']['list']
193         z_col = channels.index('height')
194         d_col = channels.index('vDeflection')
195         
196         segment = self._zip_scale_channel(
197             segment, z_col, 'calibrated', path, info)
198         segment = self._zip_scale_channel(
199             segment, d_col, 'distance', path, info)
200
201         assert segment.info['columns'][z_col] == 'height (m)', \
202             segment.info['columns'][z_col]
203         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
204             segment.info['columns'][d_col]
205
206         # scaled column indices same as raw column indices,
207         # because columns is a copy of channels.list
208         segment.info['columns'][z_col] = 'z piezo (m)'
209         segment.info['columns'][d_col] = 'deflection (m)'
210         return segment
211
212     def _zip_scale_channel(self, segment, channel, conversion, path, info):
213         channel_name = segment.info['raw info']['channels']['list'][channel]
214         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
215         conversion_info = conversion_set['conversion'][conversion]
216         if conversion_info['base-calibration-slot'] \
217                 != conversion_set['conversions']['base']:
218             # Our conversion is stacked on a previous conversion.  Do
219             # the previous conversion first.
220             segment = self._zip_scale_channel(
221                 segment, channel, conversion_info['base-calibration-slot'],
222                 info, path)
223         if conversion_info['type'] == 'file':
224             key = ('%s_%s_to_%s_calibration_file'
225                    % (channel_name,
226                       conversion_info['base-calibration-slot'],
227                       conversion))
228             calib_path = conversion_info['file']
229             if key in info:
230                 calib_path = os.path.join(os.path.dirname(path), info[key])
231                 self.logger().debug(
232                     'Overriding %s -> %s calibration for %s channel: %s'
233                     % (conversion_info['base-calibration-slot'],
234                        conversion, channel_name, calib_path))
235             if os.path.exists(calib_path):
236                 with file(calib_path, 'r') as f:
237                     lines = [x.strip() for x in f.readlines()]
238                     f.close()
239                 calib = {  # I've emailed JPK to confirm this file format.
240                     'title':lines[0],
241                     'multiplier':float(lines[1]),
242                     'offset':float(lines[2]),
243                     'unit':lines[3],
244                     'note':'\n'.join(lines[4:]),
245                     }
246                 segment[:,channel] = (segment[:,channel] * calib['multiplier']
247                                       + calib['offset'])
248                 segment.info['columns'][channel] = (
249                     '%s (%s)' % (channel_name, calib['unit']))
250                 return segment
251             else:
252                 self.logger().warn(
253                     'Skipping %s -> %s calibration for %s channel.  Calibration file %s not found'
254                     % (conversion_info['base-calibration-slot'],
255                        conversion, channel_name, calib_path))
256         else:
257             assert conversion_info['type'] == 'simple', conversion_info['type']
258         assert conversion_info['scaling']['type'] == 'linear', \
259             conversion_info['scaling']['type']
260         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
261             conversion_info['scaling']['style']
262         multiplier = float(conversion_info['scaling']['multiplier'])
263         offset = float(conversion_info['scaling']['offset'])
264         unit = conversion_info['scaling']['unit']['unit']
265         segment[:,channel] = segment[:,channel] * multiplier + offset
266         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
267         return segment
268
269     def _parse_params(self, lines):
270         info = {}
271         for line in lines:
272             line = line.strip()
273             if line.startswith('#'):
274                 continue
275             else:
276                 # e.g.: force-segment-header.type=xy-position-segment-header
277                 fields = line.split('=', 1)
278                 assert len(fields) == 2, line
279                 setting = fields[0].split('.')
280                 sub_info = info  # drill down, e.g. info['force-s..']['type']
281                 for s in setting[:-1]:
282                     if s not in sub_info:
283                         sub_info[s] = {}
284                     sub_info = sub_info[s]
285                 if setting[-1] == 'list':  # split a space-delimited list
286                     sub_info[setting[-1]] = fields[1].split(' ')
287                 else:
288                     sub_info[setting[-1]] = fields[1]
289         return info
290
291     def _read_old(self, path, info):
292         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)
293
294 #  LocalWords:  JPK ForceRobot's