Use 'simple' conversion handler to process 'file'-type JPK conversions.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
33
34
35 class JPKDriver (Driver):
36     """Handle JPK ForceRobot's data format.
37     """
38     def __init__(self):
39         super(JPKDriver, self).__init__(name='jpk')
40
41     def is_me(self, path):
42         if os.path.isdir(path):
43             return False
44         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
45             with Closing(zipfile.ZipFile(path, 'r')) as f:
46                 if 'header.properties' not in f.namelist():
47                     return False
48                 with Closing(f.open('header.properties')) as h:
49                     if 'jpk-data-file' in h.read():
50                         return True
51         else:
52             with Closing(open(path, 'r')) as f:
53                 headlines = []
54                 for i in range(3):
55                     headlines.append(f.readline())
56             if headlines[0].startswith('# xPosition') \
57                     and headlines[1].startswith('# yPosition'):
58                 return True
59         return False
60
61     def read(self, path, info=None):
62         if info == None:
63             info = {}
64         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
65             return self._read_zip(path, info)
66         else:
67             return self._read_old(path, info)
68
69     def _read_zip(self, path, info):
70         with Closing(zipfile.ZipFile(path, 'r')) as f:
71             f.path = path
72             zip_info = self._zip_info(f)
73             segments = []
74             for i in range(len([p for p in f.namelist()
75                                 if p.endswith('segment-header.properties')])):
76                 segments.append(self._zip_segment(f, path, info, zip_info, i))
77         if zip_info['file-format-version'] not in ['0.5']:
78             raise NotImplementedError(
79                 'JPK file version %s not supported (yet).'
80                 % zip_info['file-format-version'])
81         for name in ['approach', 'retract']:
82             if len([s for s in segments if s.info['name'] == name]) == 0:
83                 raise ValueError(
84                     'No segment for %s in %s, only %s'
85                     % (name, path, [s.info['name'] for s in segments]))
86         return (segments,
87                 self._zip_translate_params(zip_info,
88                                            segments[0].info['raw info']))
89
90     def _zip_info(self, zipfile):
91         with Closing(zipfile.open('header.properties')) as f:
92             info = self._parse_params(f.readlines())
93             return info
94
95     def _zip_segment(self, zipfile, path, info, zip_info, index):
96         prop_file = zipfile.open(os.path.join(
97                 'segments', str(index), 'segment-header.properties'))
98         prop = self._parse_params(prop_file.readlines())
99         prop_file.close()
100         expected_shape = (int(prop['force-segment-header']['num-points']),)
101         channels = []
102         for chan in prop['channels']['list']:
103             chan_info = prop['channel'][chan]
104             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
105             if channels[-1].shape != expected_shape:
106                     raise NotImplementedError(
107                         'Channel %d:%s in %s has strange shape %s != %s'
108                         % (index, chan, zipfile.path,
109                            channels[-1].shape, expected_shape))
110         d = curve.Data(
111             shape=(len(channels[0]), len(channels)),
112             dtype=channels[0].dtype,
113             info=self._zip_translate_segment_params(prop))
114         for i,chan in enumerate(channels):
115             d[:,i] = chan
116         return self._zip_scale_segment(d, path, info)
117
118     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
119         f = zipfile.open(os.path.join(
120                 'segments', str(segment_index),
121                 chan_info['data']['file']['name']), 'r')
122         assert chan_info['data']['file']['format'] == 'raw', \
123             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
124         assert chan_info['data']['type'] == 'float-data', \
125             'Non-float data format:\n%s' % pprint.pformat(chan_info)
126         data = numpy.frombuffer(
127             buffer(f.read()),
128             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
129         # '>' (big endian) byte order.
130         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
131         #    All forms of raw data are stored in chronological order
132         #    (the order in which they were collected), and the
133         #    individual values are stored in network byte order
134         #    (big-endian). The data type used to store the data is
135         #    specified by the "channel.*.data.type" property, and is
136         #    either short (2 bytes per value), integer (4 bytes), or
137         #    float (4 bytes, IEEE format).
138         f.close()
139         return data
140
141     def _zip_translate_params(self, params, chan_info):
142         info = {
143             'raw info':params,
144             #'time':self._time_from_TODO(raw_info[]),
145             }
146         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
147         assert force_unit == 'N', force_unit
148         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
149         assert force_base == 'distance', force_base
150         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
151         assert dist_unit == 'm', dist_unit
152         force_mult = float(
153             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
154         info['spring constant (N/m)'] = force_mult
155         return info
156
157     def _zip_translate_segment_params(self, params):
158         info = {
159             'raw info':params,
160             'columns':list(params['channels']['list']),
161             'name':params['force-segment-header']['name']['name'],
162             }
163         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
164             info['name'] = info['name'][:-len('-spm')]
165             if info['name'] == 'extend':
166                 info['name'] = 'approach'
167         else:
168             raise NotImplementedError(
169                 'Unrecognized segment type %s' % info['name'])
170         return info
171
172     def _zip_scale_segment(self, segment, path, info):
173         data = curve.Data(
174             shape=segment.shape,
175             dtype=segment.dtype,
176             info={})
177         data[:,:] = segment
178         segment.info['raw data'] = data
179
180         # raw column indices
181         channels = segment.info['raw info']['channels']['list']
182         z_col = channels.index('height')
183         d_col = channels.index('vDeflection')
184         
185         segment = self._zip_scale_channel(
186             segment, z_col, 'calibrated', path, info)
187         segment = self._zip_scale_channel(
188             segment, d_col, 'distance', path, info)
189
190         assert segment.info['columns'][z_col] == 'height (m)', \
191             segment.info['columns'][z_col]
192         assert segment.info['columns'][d_col] == 'vDeflection (m)', \
193             segment.info['columns'][d_col]
194
195         # scaled column indices same as raw column indices,
196         # because columns is a copy of channels.list
197         segment.info['columns'][z_col] = 'z piezo (m)'
198         segment.info['columns'][d_col] = 'deflection (m)'
199         return segment
200
201     def _zip_scale_channel(self, segment, channel, conversion, path, info):
202         channel_name = segment.info['raw info']['channels']['list'][channel]
203         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
204         conversion_info = conversion_set['conversion'][conversion]
205         if conversion_info['base-calibration-slot'] \
206                 != conversion_set['conversions']['base']:
207             # Our conversion is stacked on a previous conversion.  Do
208             # the previous conversion first.
209             segment = self._zip_scale_channel(
210                 segment, channel, conversion_info['base-calibration-slot'],
211                 path=path, info=info)
212         if conversion_info['type'] == 'file':
213             # Michael Haggerty at JPK points out that the conversion
214             # information stored in the external file is reproduced in
215             # the force curve file.  So there is no need to actually
216             # read `conversion_info['file']`.  In fact, the data there
217             # may have changed with future calibrations, while the
218             # information stored directly in conversion_info retains
219             # the calibration information as it was when the experiment
220             # was performed.
221             pass  # Fall through to 'simple' conversion processing.
222         else:
223             assert conversion_info['type'] == 'simple', conversion_info['type']
224         assert conversion_info['scaling']['type'] == 'linear', \
225             conversion_info['scaling']['type']
226         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
227             conversion_info['scaling']['style']
228         multiplier = float(conversion_info['scaling']['multiplier'])
229         offset = float(conversion_info['scaling']['offset'])
230         unit = conversion_info['scaling']['unit']['unit']
231         segment[:,channel] = segment[:,channel] * multiplier + offset
232         segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
233         return segment
234
235     def _parse_params(self, lines):
236         info = {}
237         for line in lines:
238             line = line.strip()
239             if line.startswith('#'):
240                 continue
241             else:
242                 # e.g.: force-segment-header.type=xy-position-segment-header
243                 fields = line.split('=', 1)
244                 assert len(fields) == 2, line
245                 setting = fields[0].split('.')
246                 sub_info = info  # drill down, e.g. info['force-s..']['type']
247                 for s in setting[:-1]:
248                     if s not in sub_info:
249                         sub_info[s] = {}
250                     sub_info = sub_info[s]
251                 if setting[-1] == 'list':  # split a space-delimited list
252                     sub_info[setting[-1]] = fields[1].split(' ')
253                 else:
254                     sub_info[setting[-1]] = fields[1]
255         return info
256
257     def _read_old(self, path, info):
258         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)