Remove debugging printout from JPK driver
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21 """
22
23 import os.path
24 import pprint
25 import zipfile
26
27 import numpy
28
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from ..util.si import join_data_label, split_data_label
33 from . import Driver as Driver
34
35
36 class JPKDriver (Driver):
37     """Handle JPK ForceRobot's data format.
38     """
39     def __init__(self):
40         super(JPKDriver, self).__init__(name='jpk')
41
42     def is_me(self, path):
43         if os.path.isdir(path):
44             return False
45         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
46             with Closing(zipfile.ZipFile(path, 'r')) as f:
47                 if 'header.properties' not in f.namelist():
48                     return False
49                 with Closing(f.open('header.properties')) as h:
50                     if 'jpk-data-file' in h.read():
51                         return True
52         else:
53             with Closing(open(path, 'r')) as f:
54                 headlines = []
55                 for i in range(3):
56                     headlines.append(f.readline())
57             if headlines[0].startswith('# xPosition') \
58                     and headlines[1].startswith('# yPosition'):
59                 return True
60         return False
61
62     def read(self, path, info=None):
63         if info == None:
64             info = {}
65         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
66             return self._read_zip(path, info)
67         else:
68             return self._read_old(path, info)
69
70     def _read_zip(self, path, info):
71         with Closing(zipfile.ZipFile(path, 'r')) as f:
72             f.path = path
73             zip_info = self._zip_info(f)
74             segments = []
75             for i in range(len([p for p in f.namelist()
76                                 if p.endswith('segment-header.properties')])):
77                 segments.append(self._zip_segment(f, path, info, zip_info, i))
78         if zip_info['file-format-version'] not in ['0.5']:
79             raise NotImplementedError(
80                 'JPK file version %s not supported (yet).'
81                 % zip_info['file-format-version'])
82         for name in ['approach', 'retract']:
83             if len([s for s in segments if s.info['name'] == name]) == 0:
84                 raise ValueError(
85                     'No segment for %s in %s, only %s'
86                     % (name, path, [s.info['name'] for s in segments]))
87         curve_info = self._zip_translate_params(zip_info,
88                                                 segments[0].info['raw info'])
89         for segment in segments:
90             segment.info['spring constant (N/m)'] = \
91                 curve_info['spring constant (N/m)']
92         return (segments, curve_info)
93
94     def _zip_info(self, zipfile):
95         with Closing(zipfile.open('header.properties')) as f:
96             info = self._parse_params(f.readlines())
97             return info
98
99     def _zip_segment(self, zipfile, path, info, zip_info, index):
100         prop_file = zipfile.open(os.path.join(
101                 'segments', str(index), 'segment-header.properties'))
102         prop = self._parse_params(prop_file.readlines())
103         prop_file.close()
104         expected_shape = (int(prop['force-segment-header']['num-points']),)
105         channels = []
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(zipfile, index, chan, chan_info))
109             if channels[-1].shape != expected_shape:
110                     raise NotImplementedError(
111                         'Channel %d:%s in %s has strange shape %s != %s'
112                         % (index, chan, zipfile.path,
113                            channels[-1].shape, expected_shape))
114         d = curve.Data(
115             shape=(len(channels[0]), len(channels)),
116             dtype=channels[0].dtype,
117             info=self._zip_translate_segment_params(prop))
118         for i,chan in enumerate(channels):
119             d[:,i] = chan
120         return self._zip_scale_segment(d, path, info)
121
122     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
123         f = zipfile.open(os.path.join(
124                 'segments', str(segment_index),
125                 chan_info['data']['file']['name']), 'r')
126         assert chan_info['data']['file']['format'] == 'raw', \
127             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
128         assert chan_info['data']['type'] == 'float-data', \
129             'Non-float data format:\n%s' % pprint.pformat(chan_info)
130         data = numpy.frombuffer(
131             buffer(f.read()),
132             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
133         # '>' (big endian) byte order.
134         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
135         #    All forms of raw data are stored in chronological order
136         #    (the order in which they were collected), and the
137         #    individual values are stored in network byte order
138         #    (big-endian). The data type used to store the data is
139         #    specified by the "channel.*.data.type" property, and is
140         #    either short (2 bytes per value), integer (4 bytes), or
141         #    float (4 bytes, IEEE format).
142         f.close()
143         return data
144
145     def _zip_translate_params(self, params, chan_info):
146         info = {
147             'raw info':params,
148             'filetype':self.name,
149             #'time':self._time_from_TODO(raw_info[]),
150             }
151         # TODO: distinguish between force clamp and velocity clamp
152         # experiments.  Note that the JPK file format is flexible
153         # enough to support mixed experiments (i.e. both force clamp
154         # and velocity clamp segments in a single experiment), but I
155         # have no idea what sort of analysis such experiments would
156         # require ;).
157         info['experiment'] = experiment.VelocityClamp
158         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
159         assert force_unit == 'N', force_unit
160         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
161         assert force_base == 'distance', force_base
162         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
163         assert dist_unit == 'm', dist_unit
164         force_mult = float(
165             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
166         info['spring constant (N/m)'] = force_mult
167         return info
168
169     def _zip_translate_segment_params(self, params):
170         info = {
171             'raw info':params,
172             'columns':list(params['channels']['list']),
173             'name':params['force-segment-header']['name']['name'],
174             }
175         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
176             info['name'] = info['name'][:-len('-spm')]
177             if info['name'] == 'extend':
178                 info['name'] = 'approach'
179         else:
180             raise NotImplementedError(
181                 'Unrecognized segment type %s' % info['name'])
182         return info
183
184     def _zip_scale_segment(self, segment, path, info):
185         data = curve.Data(
186             shape=segment.shape,
187             dtype=segment.dtype,
188             info={})
189         data[:,:] = segment
190         segment.info['raw data'] = data
191
192         # raw column indices
193         channels = segment.info['raw info']['channels']['list']
194         for i,channel in enumerate(channels):
195             conversion = None
196             if channel == 'vDeflection':
197                 conversion = 'distance'
198             segment = self._zip_scale_channel(
199                 segment, channel, conversion=conversion, path=path, info=info)
200             name,unit = split_data_label(segment.info['columns'][i])
201             if name == 'vDeflection':
202                 assert unit == 'm', segment.info['columns'][i]
203                 segment.info['columns'][i] = join_data_label('deflection', 'm')
204             elif name == 'height':
205                 assert unit == 'm', segment.info['columns'][i]
206                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
207         return segment
208
209     def _zip_scale_channel(self, segment, channel_name, conversion=None,
210                            path=None, info={}):
211         channel = segment.info['raw info']['channels']['list'].index(
212             channel_name)
213         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
214         if conversion == None:
215             conversion = conversion_set['conversions']['default']
216         if conversion == conversion_set['conversions']['base']:
217             # Our conversion is the base data.
218             if conversion != 'volts':
219                 raise NotImplementedError(
220                     'unknown units for base channel: %s' % conversion)
221             segment.info['columns'][channel] = join_data_label(
222                 channel_name, 'V')
223             return segment
224         conversion_info = conversion_set['conversion'][conversion]
225         if conversion_info['base-calibration-slot'] \
226                 != conversion_set['conversions']['base']:
227             # Our conversion is stacked on a previous conversion.  Do
228             # the previous conversion first.
229             segment = self._zip_scale_channel(
230                 segment, channel_name,
231                 conversion_info['base-calibration-slot'],
232                 path=path, info=info)
233         if conversion_info['type'] == 'file':
234             # Michael Haggerty at JPK points out that the conversion
235             # information stored in the external file is reproduced in
236             # the force curve file.  So there is no need to actually
237             # read `conversion_info['file']`.  In fact, the data there
238             # may have changed with future calibrations, while the
239             # information stored directly in conversion_info retains
240             # the calibration information as it was when the experiment
241             # was performed.
242             pass  # Fall through to 'simple' conversion processing.
243         else:
244             assert conversion_info['type'] == 'simple', conversion_info['type']
245         assert conversion_info['scaling']['type'] == 'linear', \
246             conversion_info['scaling']['type']
247         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
248             conversion_info['scaling']['style']
249         multiplier = float(conversion_info['scaling']['multiplier'])
250         offset = float(conversion_info['scaling']['offset'])
251         unit = conversion_info['scaling']['unit']['unit']
252         segment[:,channel] = segment[:,channel] * multiplier + offset
253         segment.info['columns'][channel] = join_data_label(channel_name, unit)
254         return segment
255
256     def _parse_params(self, lines):
257         info = {}
258         for line in lines:
259             line = line.strip()
260             if line.startswith('#'):
261                 continue
262             else:
263                 # e.g.: force-segment-header.type=xy-position-segment-header
264                 fields = line.split('=', 1)
265                 assert len(fields) == 2, line
266                 setting = fields[0].split('.')
267                 sub_info = info  # drill down, e.g. info['force-s..']['type']
268                 for s in setting[:-1]:
269                     if s not in sub_info:
270                         sub_info[s] = {}
271                     sub_info = sub_info[s]
272                 if setting[-1] == 'list':  # split a space-delimited list
273                     sub_info[setting[-1]] = fields[1].split(' ')
274                 else:
275                     sub_info[setting[-1]] = fields[1]
276         return info
277
278     def _read_old(self, path, info):
279         raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)