X-Git-Url: http://git.tremily.us/?p=hooke.git;a=blobdiff_plain;f=hooke%2Fdriver%2Fjpk.py;h=afa95704251c9fbb7a8ed214b726a38a2d45b55e;hp=3cbe1df8a7815f1ab68b04623f27e6f19470e2e7;hb=1e1315aa80b01c7442fc5792d74b7e60d08a225e;hpb=1c9f3cc24c9a66b5ecc8f10b2155db2f030efb16 diff --git a/hooke/driver/jpk.py b/hooke/driver/jpk.py index 3cbe1df..afa9570 100644 --- a/hooke/driver/jpk.py +++ b/hooke/driver/jpk.py @@ -18,6 +18,9 @@ # . """Driver for JPK ForceRobot's velocity clamp data format. + +This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12. +The specs are freely available from JPK, just email support@jpk.com. """ import os.path @@ -27,7 +30,6 @@ import zipfile import numpy from .. import curve as curve -from .. import experiment as experiment from ..util.util import Closing as Closing from ..util.si import join_data_label, split_data_label from . import Driver as Driver @@ -71,24 +73,24 @@ class JPKDriver (Driver): with Closing(zipfile.ZipFile(path, 'r')) as f: f.path = path zip_info = self._zip_info(f) + version = zip_info['file-format-version'] segments = [] for i in range(len([p for p in f.namelist() if p.endswith('segment-header.properties')])): - segments.append(self._zip_segment(f, path, info, zip_info, i)) - if zip_info['file-format-version'] not in ['0.3', '0.4', '0.5']: + segments.append(self._zip_segment( + f, path, info, zip_info, i, version)) + if version not in ['0.%d' % i for i in range(13)]: raise NotImplementedError( - 'JPK file version %s not supported (yet).' - % zip_info['file-format-version']) - for name in ['approach', 'retract']: - if len([s for s in segments if s.info['name'] == name]) == 0: - raise ValueError( - 'No segment for %s in %s, only %s' - % (name, path, [s.info['name'] for s in segments])) - curve_info = self._zip_translate_params(zip_info, - segments[0].info['raw info']) - for segment in segments: - segment.info['spring constant (N/m)'] = \ - curve_info['spring constant (N/m)'] + 'JPK file version %s not supported (yet).' % version) + curve_info = self._zip_translate_params( + zip_info, segments[0].info['raw info'], version) + for segment in segments: # HACK, should use curve-level spring constant + for key in ['spring constant (N/m)', + 'z piezo sensitivity (m/V)']: + if key in curve_info: + segment.info['spring constant (N/m)'] = \ + curve_info['spring constant (N/m)'] + return (segments, curve_info) def _zip_info(self, zipfile): @@ -96,40 +98,95 @@ class JPKDriver (Driver): info = self._parse_params(f.readlines()) return info - def _zip_segment(self, zipfile, path, info, zip_info, index): + def _zip_segment(self, zipfile, path, info, zip_info, index, version): prop_file = zipfile.open(os.path.join( 'segments', str(index), 'segment-header.properties')) prop = self._parse_params(prop_file.readlines()) prop_file.close() expected_shape = (int(prop['force-segment-header']['num-points']),) channels = [] + if 'list' not in prop['channels']: + prop['channels'] = {'list': prop['channels'].split()} for chan in prop['channels']['list']: chan_info = prop['channel'][chan] - channels.append(self._zip_channel(zipfile, index, chan, chan_info)) + channels.append(self._zip_channel( + zipfile, index, chan, chan_info)) if channels[-1].shape != expected_shape: - raise NotImplementedError( - 'Channel %d:%s in %s has strange shape %s != %s' - % (index, chan, zipfile.path, - channels[-1].shape, expected_shape)) + raise NotImplementedError( + 'Channel %d:%s in %s has strange shape %s != %s' + % (index, chan, zipfile.path, + channels[-1].shape, expected_shape)) + if len(channels) > 0: + shape = (len(channels[0]), len(channels)) + dtype = channels[0].dtype + else: # no channels for this data block + shape = (0,0) + dtype = numpy.float32 d = curve.Data( - shape=(len(channels[0]), len(channels)), - dtype=channels[0].dtype, + shape=shape, + dtype=dtype, info=self._zip_translate_segment_params(prop)) for i,chan in enumerate(channels): d[:,i] = chan - return self._zip_scale_segment(d, path, info) + return self._zip_scale_segment(d, path, info, version) def _zip_channel(self, zipfile, segment_index, channel_name, chan_info): - f = zipfile.open(os.path.join( - 'segments', str(segment_index), - chan_info['data']['file']['name']), 'r') - assert chan_info['data']['file']['format'] == 'raw', \ - 'Non-raw data format:\n%s' % pprint.pformat(chan_info) - assert chan_info['data']['type'] == 'float-data', \ - 'Non-float data format:\n%s' % pprint.pformat(chan_info) - data = numpy.frombuffer( - buffer(f.read()), - dtype=numpy.dtype(numpy.float32).newbyteorder('>')) + if chan_info['data']['type'] in ['constant-data', 'raster-data']: + return self._zip_calculate_channel(chan_info) + with Closing(zipfile.open(os.path.join( + 'segments', str(segment_index), + chan_info['data']['file']['name']), 'r')) as f: + assert chan_info['data']['file']['format'] == 'raw', \ + 'Non-raw data format:\n%s' % pprint.pformat(chan_info) + dtype = self._zip_channel_dtype(chan_info) + data = numpy.frombuffer( + buffer(f.read()), + dtype=dtype,) + return data + + def _zip_calculate_channel(self, chan_info): + type_ = chan_info['data']['type'] + n = int(chan_info['data']['num-points']) + if type_ == 'constant-data': + return float(chan_info['data']['value'])*numpy.ones( + shape=(n,), + dtype=numpy.float32) + elif type_ == 'raster-data': + start = float(chan_info['data']['start']) + step = float(chan_info['data']['step']) + return numpy.arange( + start=start, + stop=start + step*(n-0.5), + step=step, + dtype=numpy.float32) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + + def _zip_channel_dtype(self, chan_info): + type_ = chan_info['data']['type'] + if type_ in ['float-data', 'float']: + dtype = numpy.dtype(numpy.float32) + elif type_ in ['integer-data', 'memory-integer-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedinteger', 'signedinteger-limited']: + dtype = numpy.dtype(numpy.int32) + elif encoder in ['unsignedinteger', 'unsignedinteger-limited']: + dtype = numpy.dtype(numpy.uint32) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + elif type_ in ['short-data', 'short', 'memory-short-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedshort', 'signedshort-limited']: + dtype = numpy.dtype(numpy.int16) + elif encoder in ['unsignedshort', 'unsignedshort-limited']: + dtype = numpy.dtype(numpy.uint16) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + byte_order = '>' # '>' (big endian) byte order. # From version 0.3 of JPKForceSpec.txt in the "Binary data" section: # All forms of raw data are stored in chronological order @@ -139,49 +196,60 @@ class JPKDriver (Driver): # specified by the "channel.*.data.type" property, and is # either short (2 bytes per value), integer (4 bytes), or # float (4 bytes, IEEE format). - f.close() - return data + return dtype.newbyteorder(byte_order) - def _zip_translate_params(self, params, chan_info): + def _zip_translate_params(self, params, chan_info, version): info = { 'raw info':params, - 'filetype':self.name, #'time':self._time_from_TODO(raw_info[]), } - # TODO: distinguish between force clamp and velocity clamp - # experiments. Note that the JPK file format is flexible - # enough to support mixed experiments (i.e. both force clamp - # and velocity clamp segments in a single experiment), but I - # have no idea what sort of analysis such experiments would - # require ;). - info['experiment'] = experiment.VelocityClamp() - force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit'] + if len(chan_info['channels']['list']) == 0: + return info + force_unit = self._zip_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'], + version) assert force_unit == 'N', force_unit force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot'] assert force_base == 'distance', force_base - dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit'] + dist_unit = self._zip_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'], + version) assert dist_unit == 'm', dist_unit + distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot'] + assert distance_base == 'volts', distance_base + base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base'] + assert base_conversion == distance_base, base_conversion + distance_base_unit = self._zip_unit( + chan_info['channel']['vDeflection']['data'], + version) + assert distance_base_unit == 'V', distance_base_unit force_mult = float( chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier']) + sens_mult = float( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier']) info['spring constant (N/m)'] = force_mult + info['z piezo sensitivity (m/V)'] = sens_mult return info def _zip_translate_segment_params(self, params): info = { - 'raw info':params, - 'columns':list(params['channels']['list']), - 'name':params['force-segment-header']['name']['name'], + 'raw info': params, + 'columns': list(params['channels']['list']), + 'name': self._zip_segment_name(params), } - if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']: - info['name'] = info['name'][:-len('-spm')] - if info['name'] == 'extend': - info['name'] = 'approach' - else: - raise NotImplementedError( - 'Unrecognized segment type %s' % info['name']) return info - def _zip_scale_segment(self, segment, path, info): + def _zip_segment_name(self, params): + name = params['force-segment-header']['name']['name'] + if name.endswith('-spm'): + name = name[:-len('-spm')] + if name == 'extend': + name = 'approach' + elif name.startswith('pause-at-'): + name = 'pause' + return name + + def _zip_scale_segment(self, segment, path, info, version): data = curve.Data( shape=segment.shape, dtype=segment.dtype, @@ -196,7 +264,8 @@ class JPKDriver (Driver): if channel == 'vDeflection': conversion = 'distance' segment = self._zip_scale_channel( - segment, channel, conversion=conversion, path=path, info=info) + segment, channel, conversion=conversion, + path=path, info=info, version=version) name,unit = split_data_label(segment.info['columns'][i]) if name == 'vDeflection': assert unit == 'm', segment.info['columns'][i] @@ -211,20 +280,19 @@ class JPKDriver (Driver): segment.info['columns'][i] = join_data_label('z piezo', 'm') return segment - def _zip_scale_channel(self, segment, channel_name, conversion=None, - path=None, info={}): + def _zip_scale_channel(self, segment, channel_name, + conversion=None, path=None, info={}, version=None): channel = segment.info['raw info']['channels']['list'].index( channel_name) conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set'] if conversion == None: conversion = conversion_set['conversions']['default'] if conversion == conversion_set['conversions']['base']: - # Our conversion is the base data. - if conversion != 'volts': - raise NotImplementedError( - 'unknown units for base channel: %s' % conversion) segment.info['columns'][channel] = join_data_label( - channel_name, 'V') + channel_name, + self._zip_unit( + segment.info['raw info']['channel'][channel_name]['data'], + version)) return segment conversion_info = conversion_set['conversion'][conversion] if conversion_info['base-calibration-slot'] \ @@ -234,7 +302,7 @@ class JPKDriver (Driver): segment = self._zip_scale_channel( segment, channel_name, conversion_info['base-calibration-slot'], - path=path, info=info) + path=path, info=info, version=version) if conversion_info['type'] == 'file': # Michael Haggerty at JPK points out that the conversion # information stored in the external file is reproduced in @@ -253,11 +321,17 @@ class JPKDriver (Driver): conversion_info['scaling']['style'] multiplier = float(conversion_info['scaling']['multiplier']) offset = float(conversion_info['scaling']['offset']) - unit = conversion_info['scaling']['unit']['unit'] + unit = self._zip_unit(conversion_info['scaling'], version) segment[:,channel] = segment[:,channel] * multiplier + offset segment.info['columns'][channel] = join_data_label(channel_name, unit) return segment + def _zip_unit(self, conversion_info, version): + if version in ['0.%d' % i for i in range(3)]: + return conversion_info['unit'] + else: + return conversion_info['unit']['unit'] + def _parse_params(self, lines): info = {} for line in lines: @@ -275,10 +349,16 @@ class JPKDriver (Driver): sub_info[s] = {} sub_info = sub_info[s] if setting[-1] == 'list': # split a space-delimited list - sub_info[setting[-1]] = fields[1].split(' ') + if fields[1]: + sub_info[setting[-1]] = fields[1].split(' ') + else: + sub_info[setting[-1]] = [] else: sub_info[setting[-1]] = fields[1] return info def _read_old(self, path, info): - raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path) + raise NotImplementedError( + "Early JPK files (pre-zip) are not supported by Hooke. Please " + "use JPK's `out2jpk-force` script to convert your old files " + "to a more recent format before loading them with Hooke.")