800f78e13ba70835145634a92c0affb467ce63a0
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if os.path.isdir(path):
43             return False
44         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
45             with Closing(zipfile.ZipFile(path, 'r')) as f:
46                 if 'header.properties' not in f.namelist():
47                     return False
48                 with Closing(f.open('header.properties')) as h:
49                     if 'jpk-data-file' in h.read():
50                         return True
51         else:
52             with Closing(open(path, 'r')) as f:
53                 headlines = []
54                 for i in range(3):
55                     headlines.append(f.readline())
56             if headlines[0].startswith('# xPosition') \
57                     and headlines[1].startswith('# yPosition'):
58                 return True
59         return False
60
61     def read(self, path, info=None):
62         if info == None:
63             info = {}
64         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
65             return self._read_zip(path, info)
66         else:
67             return self._read_old(path, info)
68
69     def _read_zip(self, path, info):
70         with Closing(zipfile.ZipFile(path, 'r')) as f:
71             f.path = path
72             zip_info = self._zip_info(f)
73             segments = []
74             for i in range(len([p for p in f.namelist()
75                                 if p.endswith('segment-header.properties')])):
76                 segments.append(self._zip_segment(f, path, info, zip_info, i))
77         if zip_info['file-format-version'] not in ['0.5']:
78             raise NotImplementedError(
79                 'JPK file version %s not supported (yet).'
80                 % zip_info['file-format-version'])
81         for name in ['approach', 'retract']:
82             if len([s for s in segments if s.info['name'] == name]) == 0:
83                 raise ValueError(
84                     'No segment for %s in %s, only %s'
85                     % (name, path, [s.info['name'] for s in segments]))
86         curve_info = self._zip_translate_params(zip_info,
87                                                 segments[0].info['raw info'])
88         for segment in segments:
89             segment.info['spring constant (N/m)'] = \
90                 curve_info['spring constant (N/m)']
91         return (segments,curve_info)
92
93     def _zip_info(self, zipfile):
94         with Closing(zipfile.open('header.properties')) as f:
95             info = self._parse_params(f.readlines())
96             return info
97
98     def _zip_segment(self, zipfile, path, info, zip_info, index):
99         prop_file = zipfile.open(os.path.join(
100                 'segments', str(index), 'segment-header.properties'))
101         prop = self._parse_params(prop_file.readlines())
102         prop_file.close()
103         expected_shape = (int(prop['force-segment-header']['num-points']),)
104         channels = []
105         for chan in prop['channels']['list']:
106             chan_info = prop['channel'][chan]
107             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
108             if channels[-1].shape != expected_shape:
109                     raise NotImplementedError(
110                         'Channel %d:%s in %s has strange shape %s != %s'
111                         % (index, chan, zipfile.path,
112                            channels[-1].shape, expected_shape))
113         d = curve.Data(
114             shape=(len(channels[0]), len(channels)),
115             dtype=channels[0].dtype,
116             info=self._zip_translate_segment_params(prop))
117         for i,chan in enumerate(channels):
118             d[:,i] = chan
119         return self._zip_scale_segment(d, path, info)
120
121     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
122         f = zipfile.open(os.path.join(
123                 'segments', str(segment_index),
124                 chan_info['data']['file']['name']), 'r')
125         assert chan_info['data']['file']['format'] == 'raw', \
126             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
127         assert chan_info['data']['type'] == 'float-data', \
128             'Non-float data format:\n%s' % pprint.pformat(chan_info)
129         data = numpy.frombuffer(
130             buffer(f.read()),
131             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
132         # '>' (big endian) byte order.
133         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
134         #    All forms of raw data are stored in chronological order
135         #    (the order in which they were collected), and the
136         #    individual values are stored in network byte order
137         #    (big-endian). The data type used to store the data is
138         #    specified by the "channel.*.data.type" property, and is
139         #    either short (2 bytes per value), integer (4 bytes), or
140         #    float (4 bytes, IEEE format).
141         f.close()
142         return data
143
144     def _zip_translate_params(self, params, chan_info):
145         info = {
146             'raw info':params,
147             #'time':self._time_from_TODO(raw_info[]),
148             }
149         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
150         assert force_unit == 'N', force_unit
151         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
152         assert force_base == 'distance', force_base
153         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
154         assert dist_unit == 'm', dist_unit
155         force_mult = float(
156             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
157         info['spring constant (N/m)'] = force_mult
158         return info
159
160     def _zip_translate_segment_params(self, params):
161         info = {
162             'raw info':params,
163             'columns':list(params['channels']['list']),
164             'name':params['force-segment-header']['name']['name'],
165             }
166         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
167             info['name'] = info['name'][:-len('-spm')]
168             if info['name'] == 'extend':
169                 info['name'] = 'approach'
170         else:
171             raise NotImplementedError(
172                 'Unrecognized segment type %s' % info['name'])
173         return info
174
175     def _zip_scale_segment(self, segment, path, info):
176         data = curve.Data(
177             shape=segment.shape,
178             dtype=segment.dtype,
179             info={})
180         data[:,:] = segment
181         segment.info['raw data'] = data
182
183         # raw column indices
184         channels = segment.info['raw info']['channels']['list']
185         z_col = channels.index('height')
186         d_col = channels.index('vDeflection')
187         
188         segment = self._zip_scale_channel(
189             segment, z_col, 'calibrated', path, info)
190         segment = self._zip_scale_channel(
191             segment, d_col, 'distance', path, info)
192
193         assert segment.info['columns'][z_col] == 'height (m)', \
194             segment.info['columns'][z_col]
195         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
196             segment.info['columns'][d_col]
197
198         # scaled column indices same as raw column indices,
199         # because columns is a copy of channels.list
200         segment.info['columns'][z_col] = 'z piezo (m)'
201         segment.info['columns'][d_col] = 'deflection (m)'
202         return segment
203
204     def _zip_scale_channel(self, segment, channel, conversion, path, info):
205         channel_name = segment.info['raw info']['channels']['list'][channel]
206         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
207         conversion_info = conversion_set['conversion'][conversion]
208         if conversion_info['base-calibration-slot'] \
209                 != conversion_set['conversions']['base']:
210             # Our conversion is stacked on a previous conversion.  Do
211             # the previous conversion first.
212             segment = self._zip_scale_channel(
213                 segment, channel, conversion_info['base-calibration-slot'],
214                 path=path, info=info)
215         if conversion_info['type'] == 'file':
216             # Michael Haggerty at JPK points out that the conversion
217             # information stored in the external file is reproduced in
218             # the force curve file.  So there is no need to actually
219             # read `conversion_info['file']`.  In fact, the data there
220             # may have changed with future calibrations, while the
221             # information stored directly in conversion_info retains
222             # the calibration information as it was when the experiment
223             # was performed.
224             pass  # Fall through to 'simple' conversion processing.
225         else:
226             assert conversion_info['type'] == 'simple', conversion_info['type']
227         assert conversion_info['scaling']['type'] == 'linear', \
228             conversion_info['scaling']['type']
229         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
230             conversion_info['scaling']['style']
231         multiplier = float(conversion_info['scaling']['multiplier'])
232         offset = float(conversion_info['scaling']['offset'])
233         unit = conversion_info['scaling']['unit']['unit']
234         segment[:,channel] = segment[:,channel] * multiplier + offset
235         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
236         return segment
237
238     def _parse_params(self, lines):
239         info = {}
240         for line in lines:
241             line = line.strip()
242             if line.startswith('#'):
243                 continue
244             else:
245                 # e.g.: force-segment-header.type=xy-position-segment-header
246                 fields = line.split('=', 1)
247                 assert len(fields) == 2, line
248                 setting = fields[0].split('.')
249                 sub_info = info  # drill down, e.g. info['force-s..']['type']
250                 for s in setting[:-1]:
251                     if s not in sub_info:
252                         sub_info[s] = {}
253                     sub_info = sub_info[s]
254                 if setting[-1] == 'list':  # split a space-delimited list
255                     sub_info[setting[-1]] = fields[1].split(' ')
256                 else:
257                     sub_info[setting[-1]] = fields[1]
258         return info
259
260     def _read_old(self, path, info):
261         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)