Updated JPK driver for JPK's version 0.5 format and new driver architecture
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or
7 # modify it under the terms of the GNU Lesser General Public
8 # License as published by the Free Software Foundation, either
9 # version 3 of the License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import logging
24 import os.path
25 import pprint
26 import zipfile
27
28 import numpy
29
30 from .. import curve as curve
31 from .. import experiment as experiment
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
43             f = h = None
44             try:
45                 f = zipfile.ZipFile(path, 'r')
46                 if 'header.properties' not in f.namelist():
47                     return False
48                 h = f.open('header.properties')
49                 if 'jpk-data-file' in h.read():
50                     return True
51             finally:
52                 if h != None:
53                     h.close()
54                 if f != None:
55                     f.close()
56         else:
57             f = None
58             try:
59                 f = open(path, 'r')
60                 headlines = []
61                 for i in range(3):
62                     headlines.append(f.readline())
63                 if headlines[0].startswith('# xPosition') \
64                         and headlines[1].startswith('# yPosition'):
65                     return True
66             finally:
67                 if f != None:
68                     f.close()
69         return False
70
71     def read(self, path):
72         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
73             return self._read_zip(path)
74         else:
75             return self._read_old(path)
76
77     def _read_zip(self, path):
78         f = None
79         try:
80             f = zipfile.ZipFile(path, 'r')
81             f.path = path
82             info = self._zip_info(f)
83             approach = self._zip_segment(f, info, 0)
84             retract = self._zip_segment(f, info, 1)
85             assert approach.info['name'] == 'approach', approach.info['name']
86             assert retract.info['name'] == 'retract', retract.info['name']
87             return ([approach, retract],
88                     self._zip_translate_params(info, retract.info['raw info']))
89         finally:
90             if f != None:
91                 f.close()
92
93     def _zip_info(self, zipfile):
94         h = None
95         try:
96             h = zipfile.open('header.properties')
97             info = self._parse_params(h.readlines())
98             return info
99         finally:
100             if h != None:
101                 h.close()
102
103     def _zip_segment(self, zipfile, info, index):
104         prop_file = zipfile.open(os.path.join(
105                 'segments', str(index), 'segment-header.properties'))
106         prop = self._parse_params(prop_file.readlines())
107         prop_file.close()
108         expected_shape = (int(prop['force-segment-header']['num-points']),)
109         channels = []
110         for chan in prop['channels']['list']:
111             chan_info = prop['channel'][chan]
112             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
113             if channels[-1].shape != expected_shape:
114                     raise NotImplementedError(
115                         'Channel %d:%s in %s has strange shape %s != %s'
116                         % (index, chan, zipfile.path,
117                            channels[-1].shape, expected_shape))
118         d = curve.Data(
119             shape=(len(channels[0]), len(channels)),
120             dtype=channels[0].dtype,
121             info=self._zip_translate_segment_params(prop))
122         for i,chan in enumerate(channels):
123             d[:,i] = chan
124         return self._zip_scale_segment(d)
125
126     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
127         f = zipfile.open(os.path.join(
128                 'segments', str(segment_index),
129                 chan_info['data']['file']['name']), 'r')
130         assert chan_info['data']['file']['format'] == 'raw', \
131             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
132         assert chan_info['data']['type'] == 'float-data', \
133             'Non-float data format:\n%s' % pprint.pformat(chan_info)
134         data = numpy.frombuffer(
135             buffer(f.read()),
136             dtype=numpy.dtype(numpy.float32).newbyteorder('>'),
137             # Is JPK data always big endian?  I can't find a config
138             # setting.  The ForceRobot brochure
139             #   http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf
140             # lists a PowerPC chip on page 4, under Control
141             # electronics, and PPCs are usually big endian.
142             #   http://en.wikipedia.org/wiki/PowerPC#Endian_modes
143             )
144         f.close()
145         return data
146
147     def _zip_translate_params(self, params, chan_info):
148         info = {
149             'raw info':params,
150             #'time':self._time_from_TODO(raw_info[]),
151             }
152         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
153         assert force_unit == 'N', force_unit
154         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
155         assert force_base == 'distance', force_base
156         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
157         assert dist_unit == 'm', dist_unit
158         force_mult = float(
159             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
160         info['spring constant (N/m)'] = force_mult
161         return info
162
163     def _zip_translate_segment_params(self, params):
164         info = {
165             'raw info':params,
166             'columns':list(params['channels']['list']),
167             'name':params['force-segment-header']['name']['name'],
168             }
169         if info['name'] == 'extend-spm':
170             info['name'] = 'approach'
171         elif info['name'] == 'retract-spm':
172             info['name'] = 'retract'
173         else:
174             raise NotImplementedError(
175                 'Unrecognized segment type %s' % info['name'])
176         return info
177
178     def _zip_scale_segment(self, segment):
179         data = curve.Data(
180             shape=segment.shape,
181             dtype=segment.dtype,
182             info={})
183         data[:,:] = segment
184         segment.info['raw data'] = data
185
186         # raw column indices
187         channels = segment.info['raw info']['channels']['list']
188         z_col = channels.index('height')
189         d_col = channels.index('vDeflection')
190         
191         segment = self._zip_scale_channel(segment, z_col, 'calibrated')
192         segment = self._zip_scale_channel(segment, d_col, 'distance')
193
194         assert segment.info['columns'][z_col] == 'height (m)', \
195             segment.info['columns'][z_col]
196         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
197             segment.info['columns'][d_col]
198
199         # scaled column indices same as raw column indices,
200         # because columns is a copy of channels.list
201         segment.info['columns'][z_col] = 'z piezo (m)'
202         segment.info['columns'][d_col] = 'deflection (m)'
203         return segment
204
205     def _zip_scale_channel(self, segment, channel, conversion):
206         channel_name = segment.info['raw info']['channels']['list'][channel]
207         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
208         conversion_info = conversion_set['conversion'][conversion]
209         if conversion_info['base-calibration-slot'] \
210                 != conversion_set['conversions']['base']:
211             # Our conversion is stacked on a previous conversion.  Do
212             # the previous conversion first.
213             segment = self._zip_scale_channel(
214                 segment, channel, conversion_info['base-calibration-slot'])
215         if conversion_info['type'] == 'file':
216             if os.path.exists(conversion_info['file']):
217                 raise NotImplementedError('No calibration files were available for testing')
218             else:
219                 log = logging.getLogger('hooke')                
220                 log.warn(
221                     'Skipping %s -> %s calibration for %s channel.  Calibration file %s not found'
222                     % (conversion_info['base-calibration-slot'],
223                        conversion, channel_name, conversion_info['file']))
224         else:
225             assert conversion_info['type'] == 'simple', conversion_info['type']
226         assert conversion_info['scaling']['type'] == 'linear', \
227             conversion_info['scaling']['type']
228         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
229             conversion_info['scaling']['style']
230         multiplier = float(conversion_info['scaling']['multiplier'])
231         offset = float(conversion_info['scaling']['offset'])
232         unit = conversion_info['scaling']['unit']['unit']
233         segment[:,channel] = segment[:,channel] * multiplier + offset
234         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
235         return segment
236
237     def _parse_params(self, lines):
238         info = {}
239         for line in lines:
240             line = line.strip()
241             if line.startswith('#'):
242                 continue
243             else:
244                 # e.g.: force-segment-header.type=xy-position-segment-header
245                 fields = line.split('=', 1)
246                 assert len(fields) == 2, line
247                 setting = fields[0].split('.')
248                 sub_info = info  # drill down, e.g. info['force-s..']['type']
249                 for s in setting[:-1]:
250                     if s not in sub_info:
251                         sub_info[s] = {}
252                     sub_info = sub_info[s]
253                 if setting[-1] == 'list':  # split a space-delimited list
254                     sub_info[setting[-1]] = fields[1].split(' ')
255                 else:
256                     sub_info[setting[-1]] = fields[1]
257         return info
258
259     def _read_old(self, path):
260         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)