1 # Copyright (C) 2008-2010 Massimo Sandal <devicerandom@gmail.com>
2 # W. Trevor King <wking@drexel.edu>
4 # This file is part of Hooke.
6 # Hooke is free software: you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
11 # Hooke is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14 # Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with Hooke. If not, see
18 # <http://www.gnu.org/licenses/>.
20 """Driver for JPK ForceRobot's velocity clamp data format.
29 from .. import curve as curve
30 from .. import experiment as experiment
31 from ..util.util import Closing as Closing
32 from . import Driver as Driver
35 class JPKDriver (Driver):
36 """Handle JPK ForceRobot's data format.
39 super(JPKDriver, self).__init__(name='jpk')
41 def is_me(self, path):
42 if os.path.isdir(path):
44 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
45 with Closing(zipfile.ZipFile(path, 'r')) as f:
46 if 'header.properties' not in f.namelist():
48 with Closing(f.open('header.properties')) as h:
49 if 'jpk-data-file' in h.read():
52 with Closing(open(path, 'r')) as f:
55 headlines.append(f.readline())
56 if headlines[0].startswith('# xPosition') \
57 and headlines[1].startswith('# yPosition'):
61 def read(self, path, info=None):
64 if zipfile.is_zipfile(path): # JPK file versions since at least 0.5
65 return self._read_zip(path, info)
67 return self._read_old(path, info)
69 def _read_zip(self, path, info):
70 with Closing(zipfile.ZipFile(path, 'r')) as f:
72 zip_info = self._zip_info(f)
74 for i in range(len([p for p in f.namelist()
75 if p.endswith('segment-header.properties')])):
76 segments.append(self._zip_segment(f, path, info, zip_info, i))
77 if zip_info['file-format-version'] not in ['0.5']:
78 raise NotImplementedError(
79 'JPK file version %s not supported (yet).'
80 % zip_info['file-format-version'])
81 for name in ['approach', 'retract']:
82 if len([s for s in segments if s.info['name'] == name]) == 0:
84 'No segment for %s in %s, only %s'
85 % (name, path, [s.info['name'] for s in segments]))
86 curve_info = self._zip_translate_params(zip_info,
87 segments[0].info['raw info'])
88 for segment in segments:
89 segment.info['spring constant (N/m)'] = \
90 curve_info['spring constant (N/m)']
91 return (segments,curve_info)
93 def _zip_info(self, zipfile):
94 with Closing(zipfile.open('header.properties')) as f:
95 info = self._parse_params(f.readlines())
98 def _zip_segment(self, zipfile, path, info, zip_info, index):
99 prop_file = zipfile.open(os.path.join(
100 'segments', str(index), 'segment-header.properties'))
101 prop = self._parse_params(prop_file.readlines())
103 expected_shape = (int(prop['force-segment-header']['num-points']),)
105 for chan in prop['channels']['list']:
106 chan_info = prop['channel'][chan]
107 channels.append(self._zip_channel(zipfile, index, chan, chan_info))
108 if channels[-1].shape != expected_shape:
109 raise NotImplementedError(
110 'Channel %d:%s in %s has strange shape %s != %s'
111 % (index, chan, zipfile.path,
112 channels[-1].shape, expected_shape))
114 shape=(len(channels[0]), len(channels)),
115 dtype=channels[0].dtype,
116 info=self._zip_translate_segment_params(prop))
117 for i,chan in enumerate(channels):
119 return self._zip_scale_segment(d, path, info)
121 def _zip_channel(self, zipfile, segment_index, channel_name, chan_info):
122 f = zipfile.open(os.path.join(
123 'segments', str(segment_index),
124 chan_info['data']['file']['name']), 'r')
125 assert chan_info['data']['file']['format'] == 'raw', \
126 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
127 assert chan_info['data']['type'] == 'float-data', \
128 'Non-float data format:\n%s' % pprint.pformat(chan_info)
129 data = numpy.frombuffer(
131 dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
132 # '>' (big endian) byte order.
133 # From version 0.3 of JPKForceSpec.txt in the "Binary data" section:
134 # All forms of raw data are stored in chronological order
135 # (the order in which they were collected), and the
136 # individual values are stored in network byte order
137 # (big-endian). The data type used to store the data is
138 # specified by the "channel.*.data.type" property, and is
139 # either short (2 bytes per value), integer (4 bytes), or
140 # float (4 bytes, IEEE format).
144 def _zip_translate_params(self, params, chan_info):
147 #'time':self._time_from_TODO(raw_info[]),
149 force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit']
150 assert force_unit == 'N', force_unit
151 force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot']
152 assert force_base == 'distance', force_base
153 dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit']
154 assert dist_unit == 'm', dist_unit
156 chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier'])
157 info['spring constant (N/m)'] = force_mult
160 def _zip_translate_segment_params(self, params):
163 'columns':list(params['channels']['list']),
164 'name':params['force-segment-header']['name']['name'],
166 if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']:
167 info['name'] = info['name'][:-len('-spm')]
168 if info['name'] == 'extend':
169 info['name'] = 'approach'
171 raise NotImplementedError(
172 'Unrecognized segment type %s' % info['name'])
175 def _zip_scale_segment(self, segment, path, info):
181 segment.info['raw data'] = data
184 channels = segment.info['raw info']['channels']['list']
185 z_col = channels.index('height')
186 d_col = channels.index('vDeflection')
188 segment = self._zip_scale_channel(
189 segment, z_col, 'calibrated', path, info)
190 segment = self._zip_scale_channel(
191 segment, d_col, 'distance', path, info)
193 assert segment.info['columns'][z_col] == 'height (m)', \
194 segment.info['columns'][z_col]
195 assert segment.info['columns'][d_col] == 'vDeflection (m)', \
196 segment.info['columns'][d_col]
198 # scaled column indices same as raw column indices,
199 # because columns is a copy of channels.list
200 segment.info['columns'][z_col] = 'z piezo (m)'
201 segment.info['columns'][d_col] = 'deflection (m)'
204 def _zip_scale_channel(self, segment, channel, conversion, path, info):
205 channel_name = segment.info['raw info']['channels']['list'][channel]
206 conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set']
207 conversion_info = conversion_set['conversion'][conversion]
208 if conversion_info['base-calibration-slot'] \
209 != conversion_set['conversions']['base']:
210 # Our conversion is stacked on a previous conversion. Do
211 # the previous conversion first.
212 segment = self._zip_scale_channel(
213 segment, channel, conversion_info['base-calibration-slot'],
214 path=path, info=info)
215 if conversion_info['type'] == 'file':
216 # Michael Haggerty at JPK points out that the conversion
217 # information stored in the external file is reproduced in
218 # the force curve file. So there is no need to actually
219 # read `conversion_info['file']`. In fact, the data there
220 # may have changed with future calibrations, while the
221 # information stored directly in conversion_info retains
222 # the calibration information as it was when the experiment
224 pass # Fall through to 'simple' conversion processing.
226 assert conversion_info['type'] == 'simple', conversion_info['type']
227 assert conversion_info['scaling']['type'] == 'linear', \
228 conversion_info['scaling']['type']
229 assert conversion_info['scaling']['style'] == 'offsetmultiplier', \
230 conversion_info['scaling']['style']
231 multiplier = float(conversion_info['scaling']['multiplier'])
232 offset = float(conversion_info['scaling']['offset'])
233 unit = conversion_info['scaling']['unit']['unit']
234 segment[:,channel] = segment[:,channel] * multiplier + offset
235 segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit)
238 def _parse_params(self, lines):
242 if line.startswith('#'):
245 # e.g.: force-segment-header.type=xy-position-segment-header
246 fields = line.split('=', 1)
247 assert len(fields) == 2, line
248 setting = fields[0].split('.')
249 sub_info = info # drill down, e.g. info['force-s..']['type']
250 for s in setting[:-1]:
251 if s not in sub_info:
253 sub_info = sub_info[s]
254 if setting[-1] == 'list': # split a space-delimited list
255 sub_info[setting[-1]] = fields[1].split(' ')
257 sub_info[setting[-1]] = fields[1]
260 def _read_old(self, path, info):
261 raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path)