Ensure unique segment names in the JPK driver.
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
class JPKDriver (Driver):
    """Handle JPK ForceRobot's data format.

    `read` dispatches between the zip-based format (JPK file versions
    since at least 0.5) and the older plain-text format (handled by
    `_read_old`, currently unsupported).
    """
    def __init__(self):
        # Register this driver under the name 'jpk'.
        super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             version = zip_info['file-format-version']
77             segments = []
78             for i in range(len([p for p in f.namelist()
79                                 if p.endswith('segment-header.properties')])):
80                 segments.append(self._zip_segment(
81                         f, path, info, zip_info, i, version))
82         if version not in ['0.%d' % i for i in range(13)]:
83             raise NotImplementedError(
84                 'JPK file version %s not supported (yet).' % version)
85         curve_info = self._zip_translate_params(
86             zip_info, segments[0].info['raw info'], version)
87         for segment in segments:  # HACK, should use curve-level spring constant
88             for key in ['spring constant (N/m)',
89                         'z piezo sensitivity (m/V)']:
90                 if key in curve_info:
91                     segment.info['spring constant (N/m)'] = \
92                         curve_info['spring constant (N/m)']
93         names = [segment.info['name'] for segment in segments]
94         for name in set(names):  # ensure unique names
95             count = names.count(name)
96             if count > 1:
97                 i = 0
98                 for j,n in enumerate(names):
99                     if n == name:
100                         segments[j].info['name'] += '-%d' % i
101                         i += 1
102         return (segments, curve_info)
103
104     def _zip_info(self, zipfile):
105         with Closing(zipfile.open('header.properties')) as f:
106             info = self._parse_params(f.readlines())
107             return info
108
109     def _zip_segment(self, zipfile, path, info, zip_info, index, version):
110         prop_file = zipfile.open(os.path.join(
111                 'segments', str(index), 'segment-header.properties'))
112         prop = self._parse_params(prop_file.readlines())
113         prop_file.close()
114         expected_shape = (int(prop['force-segment-header']['num-points']),)
115         channels = []
116         if 'list' not in prop['channels']:
117             prop['channels'] = {'list': prop['channels'].split()}
118         for chan in prop['channels']['list']:
119             chan_info = prop['channel'][chan]
120             channels.append(self._zip_channel(
121                     zipfile, index, chan, chan_info))
122             if channels[-1].shape != expected_shape:
123                 raise NotImplementedError(
124                     'Channel %d:%s in %s has strange shape %s != %s'
125                     % (index, chan, zipfile.path,
126                        channels[-1].shape, expected_shape))
127         if len(channels) > 0:
128             shape = (len(channels[0]), len(channels))
129             dtype = channels[0].dtype
130         else:  # no channels for this data block
131             shape = (0,0)
132             dtype = numpy.float32
133         d = curve.Data(
134             shape=shape,
135             dtype=dtype,
136             info=self._zip_translate_segment_params(prop))
137         for i,chan in enumerate(channels):
138             d[:,i] = chan
139         return self._zip_scale_segment(d, path, info, version)
140
141     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
142         if chan_info['data']['type'] in ['constant-data', 'raster-data']:
143             return self._zip_calculate_channel(chan_info)
144         with Closing(zipfile.open(os.path.join(
145                     'segments', str(segment_index),
146                     chan_info['data']['file']['name']), 'r')) as f:
147             assert chan_info['data']['file']['format'] == 'raw', \
148                 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
149             dtype = self._zip_channel_dtype(chan_info)
150             data = numpy.frombuffer(
151                 buffer(f.read()),
152                 dtype=dtype,)
153         return data
154
155     def _zip_calculate_channel(self, chan_info):
156         type_ = chan_info['data']['type']
157         n = int(chan_info['data']['num-points'])
158         if type_ == 'constant-data':
159             return float(chan_info['data']['value'])*numpy.ones(
160                 shape=(n,),
161                 dtype=numpy.float32)
162         elif type_ == 'raster-data':
163             start = float(chan_info['data']['start'])
164             step = float(chan_info['data']['step'])
165             return numpy.arange(
166                 start=start,
167                 stop=start + step*(n-0.5),
168                 step=step,
169                 dtype=numpy.float32)
170         else:
171             raise ValueError('Unrecognized data format "%s"' % type_)
172
173     def _zip_channel_dtype(self, chan_info):
174         type_ = chan_info['data']['type']
175         if type_ in ['float-data', 'float']:
176             dtype = numpy.dtype(numpy.float32)
177         elif type_ in ['integer-data', 'memory-integer-data']:
178             encoder = chan_info['data']['encoder']['type']
179             if encoder in ['signedinteger', 'signedinteger-limited']:
180                 dtype = numpy.dtype(numpy.int32)
181             elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
182                 dtype = numpy.dtype(numpy.uint32)
183             else:
184                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
185                                  % (encoder, type_))
186         elif type_ in ['short-data', 'short', 'memory-short-data']:
187             encoder = chan_info['data']['encoder']['type']
188             if encoder in ['signedshort', 'signedshort-limited']:
189                 dtype = numpy.dtype(numpy.int16)
190             elif encoder in ['unsignedshort', 'unsignedshort-limited']:
191                 dtype = numpy.dtype(numpy.uint16)
192             else:
193                 raise ValueError('Unrecognized encoder type "%s" for "%s" data'
194                                  % (encoder, type_))
195         else:
196             raise ValueError('Unrecognized data format "%s"' % type_)
197         byte_order = '>'
198         # '>' (big endian) byte order.
199         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
200         #    All forms of raw data are stored in chronological order
201         #    (the order in which they were collected), and the
202         #    individual values are stored in network byte order
203         #    (big-endian). The data type used to store the data is
204         #    specified by the "channel.*.data.type" property, and is
205         #    either short (2 bytes per value), integer (4 bytes), or
206         #    float (4 bytes, IEEE format).
207         return dtype.newbyteorder(byte_order)
208
    def _zip_translate_params(self, params, chan_info, version):
        """Build the curve-level info dict from the raw zip headers.

        `params` is the curve's `header.properties` dict (stored under
        'raw info'); `chan_info` is the first segment's raw header,
        whose vDeflection conversion chain supplies the cantilever
        calibration.  Returns a dict with 'raw info' plus, when
        channels are present, 'spring constant (N/m)' and
        'z piezo sensitivity (m/V)'.

        The asserts below validate the expected JPK conversion layout
        (force <- distance <- volts); an unexpected file layout will
        raise `AssertionError`.
        """
        info = {
            'raw info':params,
            #'time':self._time_from_TODO(raw_info[]),
            }
        # No channels -> no vDeflection calibration to extract.
        if len(chan_info['channels']['list']) == 0:
            return info
        # Force conversion must produce Newtons and be based on the
        # distance conversion.
        force_unit = self._zip_unit(
            chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'],
            version)
        assert force_unit == 'N', force_unit
        force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
        assert force_base == 'distance', force_base
        # Distance conversion must produce meters and be based on the
        # raw voltage data.
        dist_unit = self._zip_unit(
            chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'],
            version)
        assert dist_unit == 'm', dist_unit
        distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot']
        assert distance_base == 'volts', distance_base
        base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base']
        assert base_conversion == distance_base, base_conversion
        # Raw vDeflection data must be in volts; the unit lives under
        # 'encoder' for integer-encoded data, or directly under 'data'.
        if 'encoder' in chan_info['channel']['vDeflection']['data']:
            distance_base_unit = self._zip_unit(
                chan_info['channel']['vDeflection']['data']['encoder']['scaling'],
                version)
        else:
            distance_base_unit = self._zip_unit(
                chan_info['channel']['vDeflection']['data'],
                version)
        assert distance_base_unit == 'V', distance_base_unit
        # The linear multipliers of the two conversions are the spring
        # constant (N/m) and the z piezo sensitivity (m/V).
        force_mult = float(
            chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
        sens_mult = float(
            chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier'])
        info['spring constant (N/m)'] = force_mult
        info['z piezo sensitivity (m/V)'] = sens_mult
        return info
246
247     def _zip_translate_segment_params(self, params):
248         info = {
249             'raw info': params,
250             'columns': list(params['channels']['list']),
251             'name': self._zip_segment_name(params),
252             }
253         return info
254
255     def _zip_segment_name(self, params):
256         name = params['force-segment-header']['name']['name']
257         if name.endswith('-spm'):
258             name = name[:-len('-spm')]
259         if name == 'extend':
260             name = 'approach'
261         elif name.startswith('pause-at-'):
262             name = 'pause'
263         return name
264
265     def _zip_scale_segment(self, segment, path, info, version):
266         data = curve.Data(
267             shape=segment.shape,
268             dtype=segment.dtype,
269             info={})
270         data[:,:] = segment
271         segment.info['raw data'] = data
272
273         # raw column indices
274         channels = segment.info['raw info']['channels']['list']
275         for i,channel in enumerate(channels):
276             conversion = None
277             if channel == 'vDeflection':
278                 conversion = 'distance'
279             segment = self._zip_scale_channel(
280                 segment, channel, conversion=conversion,
281                 path=path, info=info, version=version)
282             name,unit = split_data_label(segment.info['columns'][i])
283             if name == 'vDeflection':
284                 assert unit == 'm', segment.info['columns'][i]
285                 segment.info['columns'][i] = join_data_label('deflection', 'm')
286                 # Invert because deflection voltage increases as the
287                 # tip moves away from the surface, but it makes more
288                 # sense to me to have it increase as it moves toward
289                 # the surface (positive tension on the protein chain).
290                 segment[:,i] *= -1
291             elif name == 'height':
292                 assert unit == 'm', segment.info['columns'][i]
293                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
294         return segment
295
296     def _zip_scale_channel(self, segment, channel_name,
297                            conversion=None, path=None, info={}, version=None):
298         channel = segment.info['raw info']['channels']['list'].index(
299             channel_name)
300         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
301         if conversion == None:
302             conversion = conversion_set['conversions']['default']
303         if conversion == conversion_set['conversions']['base']:
304             segment.info['columns'][channel] = join_data_label(
305                 channel_name,
306                 self._zip_unit(
307                     segment.info['raw info']['channel'][channel_name]['data'],
308                     version))
309             return segment
310         conversion_info = conversion_set['conversion'][conversion]
311         if conversion_info['base-calibration-slot'] \
312                 != conversion_set['conversions']['base']:
313             # Our conversion is stacked on a previous conversion.  Do
314             # the previous conversion first.
315             segment = self._zip_scale_channel(
316                 segment, channel_name,
317                 conversion_info['base-calibration-slot'],
318                 path=path, info=info, version=version)
319         if conversion_info['type'] == 'file':
320             # Michael Haggerty at JPK points out that the conversion
321             # information stored in the external file is reproduced in
322             # the force curve file.  So there is no need to actually
323             # read `conversion_info['file']`.  In fact, the data there
324             # may have changed with future calibrations, while the
325             # information stored directly in conversion_info retains
326             # the calibration information as it was when the experiment
327             # was performed.
328             pass  # Fall through to 'simple' conversion processing.
329         else:
330             assert conversion_info['type'] == 'simple', conversion_info['type']
331         assert conversion_info['scaling']['type'] == 'linear', \
332             conversion_info['scaling']['type']
333         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
334             conversion_info['scaling']['style']
335         multiplier = float(conversion_info['scaling']['multiplier'])
336         offset = float(conversion_info['scaling']['offset'])
337         unit = self._zip_unit(conversion_info['scaling'], version)
338         segment[:,channel] = segment[:,channel] * multiplier + offset
339         segment.info['columns'][channel] = join_data_label(channel_name, unit)
340         return segment
341
342     def _zip_unit(self, conversion_info, version):
343         if version in ['0.%d' % i for i in range(3)]:
344             return conversion_info['unit']
345         else:
346             return conversion_info['unit']['unit']
347
348     def _parse_params(self, lines):
349         info = {}
350         for line in lines:
351             line = line.strip()
352             if line.startswith('#'):
353                 continue
354             else:
355                 # e.g.: force-segment-header.type=xy-position-segment-header
356                 fields = line.split('=', 1)
357                 assert len(fields) == 2, line
358                 setting = fields[0].split('.')
359                 sub_info = info  # drill down, e.g. info['force-s..']['type']
360                 for s in setting[:-1]:
361                     if s not in sub_info:
362                         sub_info[s] = {}
363                     sub_info = sub_info[s]
364                 if setting[-1] == 'list':  # split a space-delimited list
365                     if fields[1]:
366                         sub_info[setting[-1]] = fields[1].split(' ')
367                     else:
368                         sub_info[setting[-1]] = []
369                 else:
370                     sub_info[setting[-1]] = fields[1]
371         return info
372
    def _read_old(self, path, info):
        """Read a pre-zip (old-style text) JPK file.

        Not implemented: always raises `NotImplementedError` directing
        the user to JPK's `out2jpk-force` conversion script.
        """
        raise NotImplementedError(
            "Early JPK files (pre-zip) are not supported by Hooke.  Please "
            "use JPK's `out2jpk-force` script to convert your old files "
            "to a more recent format before loading them with Hooke.")