ee78f35350e6011e4e9825cad028d36e69029a6b
[hooke.git] / hooke / driver / jpk.py
1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 #                         W. Trevor King <wking@drexel.edu>
3 #
4 # This file is part of Hooke.
5 #
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Driver for JPK ForceRobot's velocity clamp data format.
21
22 This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12.
23 The specs are freely available from JPK, just email support@jpk.com.
24 """
25
26 import os.path
27 import pprint
28 import zipfile
29
30 import numpy
31
32 from .. import curve as curve
33 from ..util.util import Closing as Closing
34 from ..util.si import join_data_label, split_data_label
35 from . import Driver as Driver
36
37
38 class JPKDriver (Driver):
39     """Handle JPK ForceRobot's data format.
40     """
41     def __init__(self):
42         super(JPKDriver, self).__init__(name='jpk')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
48             with Closing(zipfile.ZipFile(path, 'r')) as f:
49                 if 'header.properties' not in f.namelist():
50                     return False
51                 with Closing(f.open('header.properties')) as h:
52                     if 'jpk-data-file' in h.read():
53                         return True
54         else:
55             with Closing(open(path, 'r')) as f:
56                 headlines = []
57                 for i in range(3):
58                     headlines.append(f.readline())
59             if headlines[0].startswith('# xPosition') \
60                     and headlines[1].startswith('# yPosition'):
61                 return True
62         return False
63
64     def read(self, path, info=None):
65         if info == None:
66             info = {}
67         if zipfile.is_zipfile(path):  # JPK file versions since at least 0.5
68             return self._read_zip(path, info)
69         else:
70             return self._read_old(path, info)
71
72     def _read_zip(self, path, info):
73         with Closing(zipfile.ZipFile(path, 'r')) as f:
74             f.path = path
75             zip_info = self._zip_info(f)
76             segments = []
77             for i in range(len([p for p in f.namelist()
78                                 if p.endswith('segment-header.properties')])):
79                 segments.append(self._zip_segment(f, path, info, zip_info, i))
80         if zip_info['file-format-version'] not in ['0.%d' % i
81                                                    for i in range(12)]:
82             raise NotImplementedError(
83                 'JPK file version %s not supported (yet).'
84                 % zip_info['file-format-version'])
85         curve_info = self._zip_translate_params(zip_info,
86                                                 segments[0].info['raw info'])
87         for segment in segments:  # HACK, should use curve-level spring constant
88             segment.info['spring constant (N/m)'] = \
89                 curve_info['spring constant (N/m)']
90         return (segments, curve_info)
91
92     def _zip_info(self, zipfile):
93         with Closing(zipfile.open('header.properties')) as f:
94             info = self._parse_params(f.readlines())
95             return info
96
97     def _zip_segment(self, zipfile, path, info, zip_info, index):
98         prop_file = zipfile.open(os.path.join(
99                 'segments', str(index), 'segment-header.properties'))
100         prop = self._parse_params(prop_file.readlines())
101         prop_file.close()
102         expected_shape = (int(prop['force-segment-header']['num-points']),)
103         channels = []
104         if 'list' not in prop['channels']:
105             prop['channels'] = {'list': prop['channels'].split()}
106         for chan in prop['channels']['list']:
107             chan_info = prop['channel'][chan]
108             channels.append(self._zip_channel(
109                     zipfile, index, chan, chan_info))
110             if channels[-1].shape != expected_shape:
111                 raise NotImplementedError(
112                     'Channel %d:%s in %s has strange shape %s != %s'
113                     % (index, chan, zipfile.path,
114                        channels[-1].shape, expected_shape))
115         if len(channels) > 0:
116             shape = (len(channels[0]), len(channels))
117             dtype = channels[0].dtype
118         else:  # no channels for this data block
119             shape = (0,0)
120             dtype = numpy.float32
121         d = curve.Data(
122             shape=shape,
123             dtype=dtype,
124             info=self._zip_translate_segment_params(prop))
125         for i,chan in enumerate(channels):
126             d[:,i] = chan
127         return self._zip_scale_segment(d, path, info)
128
129     def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
130         f = zipfile.open(os.path.join(
131                 'segments', str(segment_index),
132                 chan_info['data']['file']['name']), 'r')
133         assert chan_info['data']['file']['format'] == 'raw', \
134             'Non-raw data format:\n%s' % pprint.pformat(chan_info)
135         assert chan_info['data']['type'] == 'float-data', \
136             'Non-float data format:\n%s' % pprint.pformat(chan_info)
137         data = numpy.frombuffer(
138             buffer(f.read()),
139             dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
140         # '>' (big endian) byte order.
141         # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
142         #    All forms of raw data are stored in chronological order
143         #    (the order in which they were collected), and the
144         #    individual values are stored in network byte order
145         #    (big-endian). The data type used to store the data is
146         #    specified by the "channel.*.data.type" property, and is
147         #    either short (2 bytes per value), integer (4 bytes), or
148         #    float (4 bytes, IEEE format).
149         f.close()
150         return data
151
152     def _zip_translate_params(self, params, chan_info):
153         info = {
154             'raw info':params,
155             #'time':self._time_from_TODO(raw_info[]),
156             }
157         force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
158         assert force_unit == 'N', force_unit
159         force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
160         assert force_base == 'distance', force_base
161         dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
162         assert dist_unit == 'm', dist_unit
163         force_mult = float(
164             chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
165         info['spring constant (N/m)'] = force_mult
166         return info
167
168     def _zip_translate_segment_params(self, params):
169         info = {
170             'raw info':params,
171             'columns':list(params['channels']['list']),
172             'name':params['force-segment-header']['name']['name'],
173             }
174         if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
175             info['name'] = info['name'][:-len('-spm')]
176             if info['name'] == 'extend':
177                 info['name'] = 'approach'
178         else:
179             raise NotImplementedError(
180                 'Unrecognized segment type %s' % info['name'])
181         return info
182
183     def _zip_scale_segment(self, segment, path, info):
184         data = curve.Data(
185             shape=segment.shape,
186             dtype=segment.dtype,
187             info={})
188         data[:,:] = segment
189         segment.info['raw data'] = data
190
191         # raw column indices
192         channels = segment.info['raw info']['channels']['list']
193         for i,channel in enumerate(channels):
194             conversion = None
195             if channel == 'vDeflection':
196                 conversion = 'distance'
197             segment = self._zip_scale_channel(
198                 segment, channel, conversion=conversion, path=path, info=info)
199             name,unit = split_data_label(segment.info['columns'][i])
200             if name == 'vDeflection':
201                 assert unit == 'm', segment.info['columns'][i]
202                 segment.info['columns'][i] = join_data_label('deflection', 'm')
203                 # Invert because deflection voltage increases as the
204                 # tip moves away from the surface, but it makes more
205                 # sense to me to have it increase as it moves toward
206                 # the surface (positive tension on the protein chain).
207                 segment[:,i] *= -1
208             elif name == 'height':
209                 assert unit == 'm', segment.info['columns'][i]
210                 segment.info['columns'][i] = join_data_label('z piezo', 'm')
211         return segment
212
213     def _zip_scale_channel(self, segment, channel_name, conversion=None,
214                            path=None, info={}):
215         channel = segment.info['raw info']['channels']['list'].index(
216             channel_name)
217         conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
218         if conversion == None:
219             conversion = conversion_set['conversions']['default']
220         if conversion == conversion_set['conversions']['base']:
221             # Our conversion is the base data.
222             if conversion != 'volts':
223                 raise NotImplementedError(
224                     'unknown units for base channel: %s' % conversion)
225             segment.info['columns'][channel] = join_data_label(
226                 channel_name, 'V')
227             return segment
228         conversion_info = conversion_set['conversion'][conversion]
229         if conversion_info['base-calibration-slot'] \
230                 != conversion_set['conversions']['base']:
231             # Our conversion is stacked on a previous conversion.  Do
232             # the previous conversion first.
233             segment = self._zip_scale_channel(
234                 segment, channel_name,
235                 conversion_info['base-calibration-slot'],
236                 path=path, info=info)
237         if conversion_info['type'] == 'file':
238             # Michael Haggerty at JPK points out that the conversion
239             # information stored in the external file is reproduced in
240             # the force curve file.  So there is no need to actually
241             # read `conversion_info['file']`.  In fact, the data there
242             # may have changed with future calibrations, while the
243             # information stored directly in conversion_info retains
244             # the calibration information as it was when the experiment
245             # was performed.
246             pass  # Fall through to 'simple' conversion processing.
247         else:
248             assert conversion_info['type'] == 'simple', conversion_info['type']
249         assert conversion_info['scaling']['type'] == 'linear', \
250             conversion_info['scaling']['type']
251         assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
252             conversion_info['scaling']['style']
253         multiplier = float(conversion_info['scaling']['multiplier'])
254         offset = float(conversion_info['scaling']['offset'])
255         unit = conversion_info['scaling']['unit']['unit']
256         segment[:,channel] = segment[:,channel] * multiplier + offset
257         segment.info['columns'][channel] = join_data_label(channel_name, unit)
258         return segment
259
260     def _parse_params(self, lines):
261         info = {}
262         for line in lines:
263             line = line.strip()
264             if line.startswith('#'):
265                 continue
266             else:
267                 # e.g.: force-segment-header.type=xy-position-segment-header
268                 fields = line.split('=', 1)
269                 assert len(fields) == 2, line
270                 setting = fields[0].split('.')
271                 sub_info = info  # drill down, e.g. info['force-s..']['type']
272                 for s in setting[:-1]:
273                     if s not in sub_info:
274                         sub_info[s] = {}
275                     sub_info = sub_info[s]
276                 if setting[-1] == 'list':  # split a space-delimited list
277                     sub_info[setting[-1]] = fields[1].split(' ')
278                 else:
279                     sub_info[setting[-1]] = fields[1]
280         return info
281
282     def _read_old(self, path, info):
283         raise NotImplementedError(
284             "Early JPK files (pre-zip) are not supported by Hooke.  Please "
285             "use JPK's `out2jpk-force` script to convert your old files "
286             "to a more recent format before loading them with Hooke.")