31d98104e0d22e6ba91085e894de3139e7a9db3d
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if os.path.isdir(path):
43             return False
44         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
45             with Closing(zipfile.ZipFile(path, 'r')) as f:
46                 if 'header.properties' not in f.namelist():
47                     return False
48                 with Closing(f.open('header.properties')) as h:
49                     if 'jpk-data-file' in h.read():
50                         return True
51         else:
52             with Closing(open(path, 'r')) as f:
53                 headlines = []
54                 for i in range(3):
55                     headlines.append(f.readline())
56             if headlines[0].startswith('# xPosition') \
57                     and headlines[1].startswith('# yPosition'):
58                 return True
59         return False
60
61     def read(self, path, info=None):
62         if info == None:
63             info = {}
64         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
65             return self._read_zip(path, info)
66         else:
67             return self._read_old(path, info)
68
69     def _read_zip(self, path, info):
70         with Closing(zipfile.ZipFile(path, 'r')) as f:
71             f.path = path
72             zip_info = self._zip_info(f)
73             segments = []
74             for i in range(len([p for p in f.namelist()
75                                 if p.endswith('segment-header.properties')])):
76                 segments.append(self._zip_segment(f, path, info, zip_info, i))
77             for name in ['approach', 'retract']:
78                 if len([s for s in segments if s.info['name'] == name]) == 0:
79                     raise ValueError(
80                         'No segment for %s in %s, only %s'
81                         % (name, path, [s.info['name'] for s in segments]))
82             return (segments,
83                     self._zip_translate_params(zip_info,
84                                                segments[0].info['raw info']))
85
86     def _zip_info(self, zipfile):
87         with Closing(zipfile.open('header.properties')) as f:
88             info = self._parse_params(f.readlines())
89             return info
90
91     def _zip_segment(self, zipfile, path, info, zip_info, index):
92         prop_file = zipfile.open(os.path.join(
93                 'segments', str(index), 'segment-header.properties'))
94         prop = self._parse_params(prop_file.readlines())
95         prop_file.close()
96         expected_shape = (int(prop['force-segment-header']['num-points']),)
97         channels = []
98         for chan in prop['channels']['list']:
99             chan_info = prop['channel'][chan]
100             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
101             if channels[-1].shape != expected_shape:
102                     raise NotImplementedError(
103                         'Channel %d:%s in %s has strange shape %s != %s'
104                         % (index, chan, zipfile.path,
105                            channels[-1].shape, expected_shape))
106         d = curve.Data(
107             shape=(len(channels[0]), len(channels)),
108             dtype=channels[0].dtype,
109             info=self._zip_translate_segment_params(prop))
110         for i,chan in enumerate(channels):
111             d[:,i] = chan
112         return self._zip_scale_segment(d, path, info)
113
114     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
115         f = zipfile.open(os.path.join(
116                 'segments', str(segment_index),
117                 chan_info['data']['file']['name']), 'r')
118         assert chan_info['data']['file']['format'] == 'raw', \
119             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
120         assert chan_info['data']['type'] == 'float-data', \
121             'Non-float data format:\n%s' % pprint.pformat(chan_info)
122         data = numpy.frombuffer(
123             buffer(f.read()),
124             dtype=numpy.dtype(numpy.float32).newbyteorder('>'),
125             # Is JPK data always big endian?  I can't find a config
126             # setting.  The ForceRobot brochure
127             #   http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf
128             # lists a PowerPC chip on page 4, under Control
129             # electronics, and PPCs are usually big endian.
130             #   http://en.wikipedia.org/wiki/PowerPC#Endian_modes
131             )
132         f.close()
133         return data
134
135     def _zip_translate_params(self, params, chan_info):
136         info = {
137             'raw info':params,
138             #'time':self._time_from_TODO(raw_info[]),
139             }
140         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
141         assert force_unit == 'N', force_unit
142         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
143         assert force_base == 'distance', force_base
144         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
145         assert dist_unit == 'm', dist_unit
146         force_mult = float(
147             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
148         info['spring constant (N/m)'] = force_mult
149         return info
150
151     def _zip_translate_segment_params(self, params):
152         info = {
153             'raw info':params,
154             'columns':list(params['channels']['list']),
155             'name':params['force-segment-header']['name']['name'],
156             }
157         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
158             info['name'] = info['name'][:-len('-spm')]
159             if info['name'] == 'extend':
160                 info['name'] = 'approach'
161         else:
162             raise NotImplementedError(
163                 'Unrecognized segment type %s' % info['name'])
164         return info
165
166     def _zip_scale_segment(self, segment, path, info):
167         data = curve.Data(
168             shape=segment.shape,
169             dtype=segment.dtype,
170             info={})
171         data[:,:] = segment
172         segment.info['raw data'] = data
173
174         # raw column indices
175         channels = segment.info['raw info']['channels']['list']
176         z_col = channels.index('height')
177         d_col = channels.index('vDeflection')
178         
179         segment = self._zip_scale_channel(
180             segment, z_col, 'calibrated', path, info)
181         segment = self._zip_scale_channel(
182             segment, d_col, 'distance', path, info)
183
184         assert segment.info['columns'][z_col] == 'height (m)', \
185             segment.info['columns'][z_col]
186         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
187             segment.info['columns'][d_col]
188
189         # scaled column indices same as raw column indices,
190         # because columns is a copy of channels.list
191         segment.info['columns'][z_col] = 'z piezo (m)'
192         segment.info['columns'][d_col] = 'deflection (m)'
193         return segment
194
195     def _zip_scale_channel(self, segment, channel, conversion, path, info):
196         channel_name = segment.info['raw info']['channels']['list'][channel]
197         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
198         conversion_info = conversion_set['conversion'][conversion]
199         if conversion_info['base-calibration-slot'] \
200                 != conversion_set['conversions']['base']:
201             # Our conversion is stacked on a previous conversion.  Do
202             # the previous conversion first.
203             segment = self._zip_scale_channel(
204                 segment, channel, conversion_info['base-calibration-slot'],
205                 info, path)
206         if conversion_info['type'] == 'file':
207             key = ('%s_%s_to_%s_calibration_file'
208                    % (channel_name,
209                       conversion_info['base-calibration-slot'],
210                       conversion))
211             calib_path = conversion_info['file']
212             if key in info:
213                 calib_path = os.path.join(os.path.dirname(path), info[key])
214                 self.logger().debug(
215                     'Overriding %s -> %s calibration for %s channel: %s'
216                     % (conversion_info['base-calibration-slot'],
217                        conversion, channel_name, calib_path))
218             if os.path.exists(calib_path):
219                 with file(calib_path, 'r') as f:
220                     lines = [x.strip() for x in f.readlines()]
221                     f.close()
222                 calib = {  # I've emailed JPK to confirm this file format.
223                     'title':lines[0],
224                     'multiplier':float(lines[1]),
225                     'offset':float(lines[2]),
226                     'unit':lines[3],
227                     'note':'\n'.join(lines[4:]),
228                     }
229                 segment[:,channel] = (segment[:,channel] * calib['multiplier']
230                                       + calib['offset'])
231                 segment.info['columns'][channel] = (
232                     '%s (%s)' % (channel_name, calib['unit']))
233                 return segment
234             else:
235                 self.logger().warn(
236                     'Skipping %s -> %s calibration for %s channel.  Calibration file %s not found'
237                     % (conversion_info['base-calibration-slot'],
238                        conversion, channel_name, calib_path))
239         else:
240             assert conversion_info['type'] == 'simple', conversion_info['type']
241         assert conversion_info['scaling']['type'] == 'linear', \
242             conversion_info['scaling']['type']
243         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
244             conversion_info['scaling']['style']
245         multiplier = float(conversion_info['scaling']['multiplier'])
246         offset = float(conversion_info['scaling']['offset'])
247         unit = conversion_info['scaling']['unit']['unit']
248         segment[:,channel] = segment[:,channel] * multiplier + offset
249         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
250         return segment
251
252     def _parse_params(self, lines):
253         info = {}
254         for line in lines:
255             line = line.strip()
256             if line.startswith('#'):
257                 continue
258             else:
259                 # e.g.: force-segment-header.type=xy-position-segment-header
260                 fields = line.split('=', 1)
261                 assert len(fields) == 2, line
262                 setting = fields[0].split('.')
263                 sub_info = info  # drill down, e.g. info['force-s..']['type']
264                 for s in setting[:-1]:
265                     if s not in sub_info:
266                         sub_info[s] = {}
267                     sub_info = sub_info[s]
268                 if setting[-1] == 'list':  # split a space-delimited list
269                     sub_info[setting[-1]] = fields[1].split(' ')
270                 else:
271                     sub_info[setting[-1]] = fields[1]
272         return info
273
274     def _read_old(self, path, info):
275         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)