X-Git-Url: http://git.tremily.us/?p=hooke.git;a=blobdiff_plain;f=hooke%2Fdriver%2Fjpk.py;h=7b33c5b19bbce5b083703d50069222e513e221c6;hp=218e4b897d21aa206b7afb750da7636e47ee4f23;hb=fa02071d288637ba1877b3c4633c3a4dd5a5dc4f;hpb=c46aaa51003a6722a28eea0e49f63ef1bb0e882d diff --git a/hooke/driver/jpk.py b/hooke/driver/jpk.py index 218e4b8..7b33c5b 100644 --- a/hooke/driver/jpk.py +++ b/hooke/driver/jpk.py @@ -3,15 +3,15 @@ # # This file is part of Hooke. # -# Hooke is free software: you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation, either -# version 3 of the License, or (at your option) any later version. +# Hooke is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. # -# Hooke is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. +# Hooke is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General +# Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with Hooke. If not, see @@ -29,6 +29,7 @@ import numpy from .. import curve as curve from .. import experiment as experiment from ..util.util import Closing as Closing +from ..util.si import join_data_label, split_data_label from . import Driver as Driver @@ -39,6 +40,8 @@ class JPKDriver (Driver): super(JPKDriver, self).__init__(name='jpk') def is_me(self, path): + if os.path.isdir(path): + return False if zipfile.is_zipfile(path): # JPK file versions since at least 0.5 with Closing(zipfile.ZipFile(path, 'r')) as f: if 'header.properties' not in f.namelist(): @@ -72,14 +75,21 @@ class JPKDriver (Driver): for i in range(len([p for p in f.namelist() if p.endswith('segment-header.properties')])): segments.append(self._zip_segment(f, path, info, zip_info, i)) - for name in ['approach', 'retract']: - if len([s for s in segments if s.info['name'] == name]) == 0: - raise ValueError( - 'No segment for %s in %s, only %s' - % (name, path, [s.info['name'] for s in segments])) - return (segments, - self._zip_translate_params(zip_info, - segments[0].info['raw info'])) + if zip_info['file-format-version'] not in ['0.5']: + raise NotImplementedError( + 'JPK file version %s not supported (yet).' + % zip_info['file-format-version']) + for name in ['approach', 'retract']: + if len([s for s in segments if s.info['name'] == name]) == 0: + raise ValueError( + 'No segment for %s in %s, only %s' + % (name, path, [s.info['name'] for s in segments])) + curve_info = self._zip_translate_params(zip_info, + segments[0].info['raw info']) + for segment in segments: + segment.info['spring constant (N/m)'] = \ + curve_info['spring constant (N/m)'] + return (segments, curve_info) def _zip_info(self, zipfile): with Closing(zipfile.open('header.properties')) as f: @@ -119,22 +129,32 @@ class JPKDriver (Driver): 'Non-float data format:\n%s' % pprint.pformat(chan_info) data = numpy.frombuffer( buffer(f.read()), - dtype=numpy.dtype(numpy.float32).newbyteorder('>'), - # Is JPK data always big endian? I can't find a config - # setting. The ForceRobot brochure - # http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf - # lists a PowerPC chip on page 4, under Control - # electronics, and PPCs are usually big endian. - # http://en.wikipedia.org/wiki/PowerPC#Endian_modes - ) + dtype=numpy.dtype(numpy.float32).newbyteorder('>')) + # '>' (big endian) byte order. + # From version 0.3 of JPKForceSpec.txt in the "Binary data" section: + # All forms of raw data are stored in chronological order + # (the order in which they were collected), and the + # individual values are stored in network byte order + # (big-endian). The data type used to store the data is + # specified by the "channel.*.data.type" property, and is + # either short (2 bytes per value), integer (4 bytes), or + # float (4 bytes, IEEE format). f.close() return data def _zip_translate_params(self, params, chan_info): info = { 'raw info':params, + 'filetype':self.name, #'time':self._time_from_TODO(raw_info[]), } + # TODO: distinguish between force clamp and velocity clamp + # experiments. Note that the JPK file format is flexible + # enough to support mixed experiments (i.e. both force clamp + # and velocity clamp segments in a single experiment), but I + # have no idea what sort of analysis such experiments would + # require ;). + info['experiment'] = experiment.VelocityClamp force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit'] assert force_unit == 'N', force_unit force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot'] @@ -171,69 +191,55 @@ class JPKDriver (Driver): # raw column indices channels = segment.info['raw info']['channels']['list'] - z_col = channels.index('height') - d_col = channels.index('vDeflection') - - segment = self._zip_scale_channel( - segment, z_col, 'calibrated', path, info) - segment = self._zip_scale_channel( - segment, d_col, 'distance', path, info) - - assert segment.info['columns'][z_col] == 'height (m)', \ - segment.info['columns'][z_col] - assert segment.info['columns'][d_col] == 'vDeflection (m)', \ - segment.info['columns'][d_col] - - # scaled column indices same as raw column indices, - # because columns is a copy of channels.list - segment.info['columns'][z_col] = 'z piezo (m)' - segment.info['columns'][d_col] = 'deflection (m)' + for i,channel in enumerate(channels): + conversion = None + if channel == 'vDeflection': + conversion = 'distance' + segment = self._zip_scale_channel( + segment, channel, conversion=conversion, path=path, info=info) + name,unit = split_data_label(segment.info['columns'][i]) + if name == 'vDeflection': + assert unit == 'm', segment.info['columns'][i] + segment.info['columns'][i] = join_data_label('deflection', 'm') + elif name == 'height': + assert unit == 'm', segment.info['columns'][i] + segment.info['columns'][i] = join_data_label('z piezo', 'm') return segment - def _zip_scale_channel(self, segment, channel, conversion, path, info): - channel_name = segment.info['raw info']['channels']['list'][channel] + def _zip_scale_channel(self, segment, channel_name, conversion=None, + path=None, info={}): + channel = segment.info['raw info']['channels']['list'].index( + channel_name) conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set'] + if conversion == None: + conversion = conversion_set['conversions']['default'] + if conversion == conversion_set['conversions']['base']: + # Our conversion is the base data. + if conversion != 'volts': + raise NotImplementedError( + 'unknown units for base channel: %s' % conversion) + segment.info['columns'][channel] = join_data_label( + channel_name, 'V') + return segment conversion_info = conversion_set['conversion'][conversion] if conversion_info['base-calibration-slot'] \ != conversion_set['conversions']['base']: # Our conversion is stacked on a previous conversion. Do # the previous conversion first. segment = self._zip_scale_channel( - segment, channel, conversion_info['base-calibration-slot'], - info, path) + segment, channel_name, + conversion_info['base-calibration-slot'], + path=path, info=info) if conversion_info['type'] == 'file': - key = ('%s_%s_to_%s_calibration_file' - % (channel_name, - conversion_info['base-calibration-slot'], - conversion)) - calib_path = conversion_info['file'] - if key in info: - calib_path = os.path.join(os.path.dirname(path), info[key]) - self.logger().debug( - 'Overriding %s -> %s calibration for %s channel: %s' - % (conversion_info['base-calibration-slot'], - conversion, channel_name, calib_path)) - if os.path.exists(calib_path): - with file(calib_path, 'r') as f: - lines = [x.strip() for x in f.readlines()] - f.close() - calib = { # I've emailed JPK to confirm this file format. - 'title':lines[0], - 'multiplier':float(lines[1]), - 'offset':float(lines[2]), - 'unit':lines[3], - 'note':'\n'.join(lines[4:]), - } - segment[:,channel] = (segment[:,channel] * calib['multiplier'] - + calib['offset']) - segment.info['columns'][channel] = ( - '%s (%s)' % (channel_name, calib['unit'])) - return segment - else: - self.logger().warn( - 'Skipping %s -> %s calibration for %s channel. Calibration file %s not found' - % (conversion_info['base-calibration-slot'], - conversion, channel_name, calib_path)) + # Michael Haggerty at JPK points out that the conversion + # information stored in the external file is reproduced in + # the force curve file. So there is no need to actually + # read `conversion_info['file']`. In fact, the data there + # may have changed with future calibrations, while the + # information stored directly in conversion_info retains + # the calibration information as it was when the experiment + # was performed. + pass # Fall through to 'simple' conversion processing. else: assert conversion_info['type'] == 'simple', conversion_info['type'] assert conversion_info['scaling']['type'] == 'linear', \ @@ -244,7 +250,7 @@ class JPKDriver (Driver): offset = float(conversion_info['scaling']['offset']) unit = conversion_info['scaling']['unit']['unit'] segment[:,channel] = segment[:,channel] * multiplier + offset - segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit) + segment.info['columns'][channel] = join_data_label(channel_name, unit) return segment def _parse_params(self, lines): @@ -271,5 +277,3 @@ class JPKDriver (Driver): def _read_old(self, path, info): raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path) - -# LocalWords: JPK ForceRobot's