Ran update_copyright.py
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
43             with Closing(zipfile.ZipFile(path, 'r')) as f:
44                 if 'header.properties' not in f.namelist():
45                     return False
46                 with Closing(f.open('header.properties')) as h:
47                     if 'jpk-data-file' in h.read():
48                         return True
49         else:
50             with Closing(open(path, 'r')) as f:
51                 headlines = []
52                 for i in range(3):
53                     headlines.append(f.readline())
54             if headlines[0].startswith('# xPosition') \
55                     and headlines[1].startswith('# yPosition'):
56                 return True
57         return False
58
59     def read(self, path, info=None):
60         if info == None:
61             info = {}
62         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
63             return self._read_zip(path, info)
64         else:
65             return self._read_old(path, info)
66
67     def _read_zip(self, path, info):
68         with Closing(zipfile.ZipFile(path, 'r')) as f:
69             f.path = path
70             zip_info = self._zip_info(f)
71             segments = []
72             for i in range(len([p for p in f.namelist()
73                                 if p.endswith('segment-header.properties')])):
74                 segments.append(self._zip_segment(f, path, info, zip_info, i))
75             for name in ['approach', 'retract']:
76                 if len([s for s in segments if s.info['name'] == name]) == 0:
77                     raise ValueError(
78                         'No segment for %s in %s, only %s'
79                         % (name, path, [s.info['name'] for s in segments]))
80             return (segments,
81                     self._zip_translate_params(zip_info,
82                                                segments[0].info['raw info']))
83
84     def _zip_info(self, zipfile):
85         with Closing(zipfile.open('header.properties')) as f:
86             info = self._parse_params(f.readlines())
87             return info
88
89     def _zip_segment(self, zipfile, path, info, zip_info, index):
90         prop_file = zipfile.open(os.path.join(
91                 'segments', str(index), 'segment-header.properties'))
92         prop = self._parse_params(prop_file.readlines())
93         prop_file.close()
94         expected_shape = (int(prop['force-segment-header']['num-points']),)
95         channels = []
96         for chan in prop['channels']['list']:
97             chan_info = prop['channel'][chan]
98             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
99             if channels[-1].shape != expected_shape:
100                     raise NotImplementedError(
101                         'Channel %d:%s in %s has strange shape %s != %s'
102                         % (index, chan, zipfile.path,
103                            channels[-1].shape, expected_shape))
104         d = curve.Data(
105             shape=(len(channels[0]), len(channels)),
106             dtype=channels[0].dtype,
107             info=self._zip_translate_segment_params(prop))
108         for i,chan in enumerate(channels):
109             d[:,i] = chan
110         return self._zip_scale_segment(d, path, info)
111
112     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
113         f = zipfile.open(os.path.join(
114                 'segments', str(segment_index),
115                 chan_info['data']['file']['name']), 'r')
116         assert chan_info['data']['file']['format'] == 'raw', \
117             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
118         assert chan_info['data']['type'] == 'float-data', \
119             'Non-float data format:\n%s' % pprint.pformat(chan_info)
120         data = numpy.frombuffer(
121             buffer(f.read()),
122             dtype=numpy.dtype(numpy.float32).newbyteorder('>'),
123             # Is JPK data always big endian?  I can't find a config
124             # setting.  The ForceRobot brochure
125             #   http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf
126             # lists a PowerPC chip on page 4, under Control
127             # electronics, and PPCs are usually big endian.
128             #   http://en.wikipedia.org/wiki/PowerPC#Endian_modes
129             )
130         f.close()
131         return data
132
133     def _zip_translate_params(self, params, chan_info):
134         info = {
135             'raw info':params,
136             #'time':self._time_from_TODO(raw_info[]),
137             }
138         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
139         assert force_unit == 'N', force_unit
140         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
141         assert force_base == 'distance', force_base
142         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
143         assert dist_unit == 'm', dist_unit
144         force_mult = float(
145             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
146         info['spring constant (N/m)'] = force_mult
147         return info
148
149     def _zip_translate_segment_params(self, params):
150         info = {
151             'raw info':params,
152             'columns':list(params['channels']['list']),
153             'name':params['force-segment-header']['name']['name'],
154             }
155         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
156             info['name'] = info['name'][:-len('-spm')]
157             if info['name'] == 'extend':
158                 info['name'] = 'approach'
159         else:
160             raise NotImplementedError(
161                 'Unrecognized segment type %s' % info['name'])
162         return info
163
164     def _zip_scale_segment(self, segment, path, info):
165         data = curve.Data(
166             shape=segment.shape,
167             dtype=segment.dtype,
168             info={})
169         data[:,:] = segment
170         segment.info['raw data'] = data
171
172         # raw column indices
173         channels = segment.info['raw info']['channels']['list']
174         z_col = channels.index('height')
175         d_col = channels.index('vDeflection')
176         
177         segment = self._zip_scale_channel(
178             segment, z_col, 'calibrated', path, info)
179         segment = self._zip_scale_channel(
180             segment, d_col, 'distance', path, info)
181
182         assert segment.info['columns'][z_col] == 'height (m)', \
183             segment.info['columns'][z_col]
184         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
185             segment.info['columns'][d_col]
186
187         # scaled column indices same as raw column indices,
188         # because columns is a copy of channels.list
189         segment.info['columns'][z_col] = 'z piezo (m)'
190         segment.info['columns'][d_col] = 'deflection (m)'
191         return segment
192
193     def _zip_scale_channel(self, segment, channel, conversion, path, info):
194         channel_name = segment.info['raw info']['channels']['list'][channel]
195         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
196         conversion_info = conversion_set['conversion'][conversion]
197         if conversion_info['base-calibration-slot'] \
198                 != conversion_set['conversions']['base']:
199             # Our conversion is stacked on a previous conversion.  Do
200             # the previous conversion first.
201             segment = self._zip_scale_channel(
202                 segment, channel, conversion_info['base-calibration-slot'],
203                 info, path)
204         if conversion_info['type'] == 'file':
205             key = ('%s_%s_to_%s_calibration_file'
206                    % (channel_name,
207                       conversion_info['base-calibration-slot'],
208                       conversion))
209             calib_path = conversion_info['file']
210             if key in info:
211                 calib_path = os.path.join(os.path.dirname(path), info[key])
212                 self.logger().debug(
213                     'Overriding %s -> %s calibration for %s channel: %s'
214                     % (conversion_info['base-calibration-slot'],
215                        conversion, channel_name, calib_path))
216             if os.path.exists(calib_path):
217                 with file(calib_path, 'r') as f:
218                     lines = [x.strip() for x in f.readlines()]
219                     f.close()
220                 calib = {  # I've emailed JPK to confirm this file format.
221                     'title':lines[0],
222                     'multiplier':float(lines[1]),
223                     'offset':float(lines[2]),
224                     'unit':lines[3],
225                     'note':'\n'.join(lines[4:]),
226                     }
227                 segment[:,channel] = (segment[:,channel] * calib['multiplier']
228                                       + calib['offset'])
229                 segment.info['columns'][channel] = (
230                     '%s (%s)' % (channel_name, calib['unit']))
231                 return segment
232             else:
233                 self.logger().warn(
234                     'Skipping %s -> %s calibration for %s channel.  Calibration file %s not found'
235                     % (conversion_info['base-calibration-slot'],
236                        conversion, channel_name, calib_path))
237         else:
238             assert conversion_info['type'] == 'simple', conversion_info['type']
239         assert conversion_info['scaling']['type'] == 'linear', \
240             conversion_info['scaling']['type']
241         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
242             conversion_info['scaling']['style']
243         multiplier = float(conversion_info['scaling']['multiplier'])
244         offset = float(conversion_info['scaling']['offset'])
245         unit = conversion_info['scaling']['unit']['unit']
246         segment[:,channel] = segment[:,channel] * multiplier + offset
247         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
248         return segment
249
250     def _parse_params(self, lines):
251         info = {}
252         for line in lines:
253             line = line.strip()
254             if line.startswith('#'):
255                 continue
256             else:
257                 # e.g.: force-segment-header.type=xy-position-segment-header
258                 fields = line.split('=', 1)
259                 assert len(fields) == 2, line
260                 setting = fields[0].split('.')
261                 sub_info = info  # drill down, e.g. info['force-s..']['type']
262                 for s in setting[:-1]:
263                     if s not in sub_info:
264                         sub_info[s] = {}
265                     sub_info = sub_info[s]
266                 if setting[-1] == 'list':  # split a space-delimited list
267                     sub_info[setting[-1]] = fields[1].split(' ')
268                 else:
269                     sub_info[setting[-1]] = fields[1]
270         return info
271
272     def _read_old(self, path, info):
273         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)