f560c09eca55bfa2443e28cef1334978ecbce1ce
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or
7 # modify it under the terms of the GNU Lesser General Public
8 # License as published by the Free Software Foundation, either
9 # version 3 of the License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import logging
24 import os.path
25 import pprint
26 import zipfile
27
28 import numpy
29
30 from .. import curve as curve
31 from .. import experiment as experiment
32 from . import Driver as Driver
33
34
class Closing (object):
    """Wrap an object so it can be used in a `with` statement.

    Entering the block yields the wrapped object itself; leaving the
    block calls the object's `.close()` method, if it has one.

    See :pep:`343`.
    """
    def __init__(self, obj):
        self.obj = obj

    def __enter__(self):
        return self.obj

    def __exit__(self, *exc_info):
        # Always returns None, so an exception raised inside the
        # `with` block is never suppressed.
        try:
            closer = self.obj.close
        except AttributeError:
            return  # nothing to close
        closer()
53
54
55 class JPKDriver (Driver):
56     """Handle JPK ForceRobot's data format.
57     """
    def __init__(self):
        """Initialize the driver, passing the name 'jpk' to the base class.
        """
        super(JPKDriver, self).__init__(name='jpk')
60
61     def is_me(self, path):
62         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
63             with Closing(zipfile.ZipFile(path, 'r')) as f:
64                 if 'header.properties' not in f.namelist():
65                     return False
66                 with Closing(f.open('header.properties')) as h:
67                     if 'jpk-data-file' in h.read():
68                         return True
69         else:
70             with Closing(open(path, 'r')) as f:
71                 headlines = []
72                 for i in range(3):
73                     headlines.append(f.readline())
74                 if headlines[0].startswith('# xPosition') \
75                         and headlines[1].startswith('# yPosition'):
76                     return True
77         return False
78
79     def read(self, path):
80         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
81             return self._read_zip(path)
82         else:
83             return self._read_old(path)
84
85     def _read_zip(self, path):
86         with Closing(zipfile.ZipFile(path, 'r')) as f:
87             f.path = path
88             info = self._zip_info(f)
89             approach = self._zip_segment(f, info, 0)
90             retract = self._zip_segment(f, info, 1)
91             assert approach.info['name'] == 'approach', approach.info['name']
92             assert retract.info['name'] == 'retract', retract.info['name']
93             return ([approach, retract],
94                     self._zip_translate_params(info, retract.info['raw info']))
95
96     def _zip_info(self, zipfile):
97         with Closing(zipfile.open('header.properties')) as f:
98             info = self._parse_params(f.readlines())
99             return info
100
101     def _zip_segment(self, zipfile, info, index):
102         prop_file = zipfile.open(os.path.join(
103                 'segments', str(index), 'segment-header.properties'))
104         prop = self._parse_params(prop_file.readlines())
105         prop_file.close()
106         expected_shape = (int(prop['force-segment-header']['num-points']),)
107         channels = []
108         for chan in prop['channels']['list']:
109             chan_info = prop['channel'][chan]
110             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
111             if channels[-1].shape != expected_shape:
112                     raise NotImplementedError(
113                         'Channel %d:%s in %s has strange shape %s != %s'
114                         % (index, chan, zipfile.path,
115                            channels[-1].shape, expected_shape))
116         d = curve.Data(
117             shape=(len(channels[0]), len(channels)),
118             dtype=channels[0].dtype,
119             info=self._zip_translate_segment_params(prop))
120         for i,chan in enumerate(channels):
121             d[:,i] = chan
122         return self._zip_scale_segment(d)
123
124     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
125         f = zipfile.open(os.path.join(
126                 'segments', str(segment_index),
127                 chan_info['data']['file']['name']), 'r')
128         assert chan_info['data']['file']['format'] == 'raw', \
129             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
130         assert chan_info['data']['type'] == 'float-data', \
131             'Non-float data format:\n%s' % pprint.pformat(chan_info)
132         data = numpy.frombuffer(
133             buffer(f.read()),
134             dtype=numpy.dtype(numpy.float32).newbyteorder('>'),
135             # Is JPK data always big endian?  I can't find a config
136             # setting.  The ForceRobot brochure
137             #   http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf
138             # lists a PowerPC chip on page 4, under Control
139             # electronics, and PPCs are usually big endian.
140             #   http://en.wikipedia.org/wiki/PowerPC#Endian_modes
141             )
142         f.close()
143         return data
144
145     def _zip_translate_params(self, params, chan_info):
146         info = {
147             'raw info':params,
148             #'time':self._time_from_TODO(raw_info[]),
149             }
150         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
151         assert force_unit == 'N', force_unit
152         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
153         assert force_base == 'distance', force_base
154         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
155         assert dist_unit == 'm', dist_unit
156         force_mult = float(
157             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
158         info['spring constant (N/m)'] = force_mult
159         return info
160
161     def _zip_translate_segment_params(self, params):
162         info = {
163             'raw info':params,
164             'columns':list(params['channels']['list']),
165             'name':params['force-segment-header']['name']['name'],
166             }
167         if info['name'] == 'extend-spm':
168             info['name'] = 'approach'
169         elif info['name'] == 'retract-spm':
170             info['name'] = 'retract'
171         else:
172             raise NotImplementedError(
173                 'Unrecognized segment type %s' % info['name'])
174         return info
175
176     def _zip_scale_segment(self, segment):
177         data = curve.Data(
178             shape=segment.shape,
179             dtype=segment.dtype,
180             info={})
181         data[:,:] = segment
182         segment.info['raw data'] = data
183
184         # raw column indices
185         channels = segment.info['raw info']['channels']['list']
186         z_col = channels.index('height')
187         d_col = channels.index('vDeflection')
188         
189         segment = self._zip_scale_channel(segment, z_col, 'calibrated')
190         segment = self._zip_scale_channel(segment, d_col, 'distance')
191
192         assert segment.info['columns'][z_col] == 'height (m)', \
193             segment.info['columns'][z_col]
194         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
195             segment.info['columns'][d_col]
196
197         # scaled column indices same as raw column indices,
198         # because columns is a copy of channels.list
199         segment.info['columns'][z_col] = 'z piezo (m)'
200         segment.info['columns'][d_col] = 'deflection (m)'
201         return segment
202
203     def _zip_scale_channel(self, segment, channel, conversion):
204         channel_name = segment.info['raw info']['channels']['list'][channel]
205         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
206         conversion_info = conversion_set['conversion'][conversion]
207         if conversion_info['base-calibration-slot'] \
208                 != conversion_set['conversions']['base']:
209             # Our conversion is stacked on a previous conversion.  Do
210             # the previous conversion first.
211             segment = self._zip_scale_channel(
212                 segment, channel, conversion_info['base-calibration-slot'])
213         if conversion_info['type'] == 'file':
214             if os.path.exists(conversion_info['file']):
215                 raise NotImplementedError('No calibration files were available for testing')
216             else:
217                 log = logging.getLogger('hooke')                
218                 log.warn(
219                     'Skipping %s -> %s calibration for %s channel.  Calibration file %s not found'
220                     % (conversion_info['base-calibration-slot'],
221                        conversion, channel_name, conversion_info['file']))
222         else:
223             assert conversion_info['type'] == 'simple', conversion_info['type']
224         assert conversion_info['scaling']['type'] == 'linear', \
225             conversion_info['scaling']['type']
226         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
227             conversion_info['scaling']['style']
228         multiplier = float(conversion_info['scaling']['multiplier'])
229         offset = float(conversion_info['scaling']['offset'])
230         unit = conversion_info['scaling']['unit']['unit']
231         segment[:,channel] = segment[:,channel] * multiplier + offset
232         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
233         return segment
234
235     def _parse_params(self, lines):
236         info = {}
237         for line in lines:
238             line = line.strip()
239             if line.startswith('#'):
240                 continue
241             else:
242                 # e.g.: force-segment-header.type=xy-position-segment-header
243                 fields = line.split('=', 1)
244                 assert len(fields) == 2, line
245                 setting = fields[0].split('.')
246                 sub_info = info  # drill down, e.g. info['force-s..']['type']
247                 for s in setting[:-1]:
248                     if s not in sub_info:
249                         sub_info[s] = {}
250                     sub_info = sub_info[s]
251                 if setting[-1] == 'list':  # split a space-delimited list
252                     sub_info[setting[-1]] = fields[1].split(' ')
253                 else:
254                     sub_info[setting[-1]] = fields[1]
255         return info
256
257     def _read_old(self, path):
258         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)