X-Git-Url: http://git.tremily.us/?p=hooke.git;a=blobdiff_plain;f=hooke%2Fdriver%2Fjpk.py;h=ba38f6a485d5b3c764318be8e21d45399c7c00c9;hp=bcacd85e288f8494b4fe48d285eb7a536ac459e5;hb=91cf002670de3d4668b84d53fad05be90ba7d585;hpb=d4e877dfa7ebb48f1f7963f9a73cd83b53da0099 diff --git a/hooke/driver/jpk.py b/hooke/driver/jpk.py index bcacd85..ba38f6a 100644 --- a/hooke/driver/jpk.py +++ b/hooke/driver/jpk.py @@ -73,23 +73,18 @@ class JPKDriver (Driver): with Closing(zipfile.ZipFile(path, 'r')) as f: f.path = path zip_info = self._zip_info(f) + version = zip_info['file-format-version'] segments = [] for i in range(len([p for p in f.namelist() if p.endswith('segment-header.properties')])): - segments.append(self._zip_segment(f, path, info, zip_info, i)) - if zip_info['file-format-version'] not in ['0.%d' % i - for i in range(12)]: + segments.append(self._zip_segment( + f, path, info, zip_info, i, version)) + if version not in ['0.%d' % i for i in range(12)]: raise NotImplementedError( - 'JPK file version %s not supported (yet).' - % zip_info['file-format-version']) - for name in ['approach', 'retract']: - if len([s for s in segments if s.info['name'] == name]) == 0: - raise ValueError( - 'No segment for %s in %s, only %s' - % (name, path, [s.info['name'] for s in segments])) - curve_info = self._zip_translate_params(zip_info, - segments[0].info['raw info']) - for segment in segments: + 'JPK file version %s not supported (yet).' % version) + curve_info = self._zip_translate_params( + zip_info, segments[0].info['raw info'], version) + for segment in segments: # HACK, should use curve-level spring constant segment.info['spring constant (N/m)'] = \ curve_info['spring constant (N/m)'] return (segments, curve_info) @@ -99,40 +94,95 @@ class JPKDriver (Driver): info = self._parse_params(f.readlines()) return info - def _zip_segment(self, zipfile, path, info, zip_info, index): + def _zip_segment(self, zipfile, path, info, zip_info, index, version): prop_file = zipfile.open(os.path.join( 'segments', str(index), 'segment-header.properties')) prop = self._parse_params(prop_file.readlines()) prop_file.close() expected_shape = (int(prop['force-segment-header']['num-points']),) channels = [] + if 'list' not in prop['channels']: + prop['channels'] = {'list': prop['channels'].split()} for chan in prop['channels']['list']: chan_info = prop['channel'][chan] - channels.append(self._zip_channel(zipfile, index, chan, chan_info)) + channels.append(self._zip_channel( + zipfile, index, chan, chan_info)) if channels[-1].shape != expected_shape: - raise NotImplementedError( - 'Channel %d:%s in %s has strange shape %s != %s' - % (index, chan, zipfile.path, - channels[-1].shape, expected_shape)) + raise NotImplementedError( + 'Channel %d:%s in %s has strange shape %s != %s' + % (index, chan, zipfile.path, + channels[-1].shape, expected_shape)) + if len(channels) > 0: + shape = (len(channels[0]), len(channels)) + dtype = channels[0].dtype + else: # no channels for this data block + shape = (0,0) + dtype = numpy.float32 d = curve.Data( - shape=(len(channels[0]), len(channels)), - dtype=channels[0].dtype, + shape=shape, + dtype=dtype, info=self._zip_translate_segment_params(prop)) for i,chan in enumerate(channels): d[:,i] = chan - return self._zip_scale_segment(d, path, info) + return self._zip_scale_segment(d, path, info, version) def _zip_channel(self, zipfile, segment_index, channel_name, chan_info): - f = zipfile.open(os.path.join( - 'segments', str(segment_index), - chan_info['data']['file']['name']), 'r') - assert chan_info['data']['file']['format'] == 'raw', \ - 'Non-raw data format:\n%s' % pprint.pformat(chan_info) - assert chan_info['data']['type'] == 'float-data', \ - 'Non-float data format:\n%s' % pprint.pformat(chan_info) - data = numpy.frombuffer( - buffer(f.read()), - dtype=numpy.dtype(numpy.float32).newbyteorder('>')) + if chan_info['data']['type'] in ['constant-data', 'raster-data']: + return self._zip_calculate_channel(chan_info) + with Closing(zipfile.open(os.path.join( + 'segments', str(segment_index), + chan_info['data']['file']['name']), 'r')) as f: + assert chan_info['data']['file']['format'] == 'raw', \ + 'Non-raw data format:\n%s' % pprint.pformat(chan_info) + dtype = self._zip_channel_dtype(chan_info) + data = numpy.frombuffer( + buffer(f.read()), + dtype=dtype,) + return data + + def _zip_calculate_channel(self, chan_info): + type_ = chan_info['data']['type'] + n = int(chan_info['data']['num-points']) + if type_ == 'constant-data': + return float(chan_info['data']['value'])*numpy.ones( + shape=(n,), + dtype=numpy.float32) + elif type_ == 'raster-data': + start = float(chan_info['data']['start']) + step = float(chan_info['data']['step']) + return numpy.arange( + start=start, + stop=start + step*(n-0.5), + step=step, + dtype=numpy.float32) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + + def _zip_channel_dtype(self, chan_info): + type_ = chan_info['data']['type'] + if type_ in ['float-data', 'float']: + dtype = numpy.dtype(numpy.float32) + elif type_ in ['integer-data', 'memory-integer-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedinteger', 'signedinteger-limited']: + dtype = numpy.dtype(numpy.int32) + elif encoder in ['unsignedinteger', 'unsignedinteger-limited']: + dtype = numpy.dtype(numpy.uint32) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + elif type_ in ['short-data', 'short', 'memory-short-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedshort', 'signedshort-limited']: + dtype = numpy.dtype(numpy.int16) + elif encoder in ['unsignedshort', 'unsignedshort-limited']: + dtype = numpy.dtype(numpy.uint16) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + byte_order = '>' # '>' (big endian) byte order. # From version 0.3 of JPKForceSpec.txt in the "Binary data" section: # All forms of raw data are stored in chronological order @@ -142,41 +192,54 @@ class JPKDriver (Driver): # specified by the "channel.*.data.type" property, and is # either short (2 bytes per value), integer (4 bytes), or # float (4 bytes, IEEE format). - f.close() - return data + return dtype.newbyteorder(byte_order) - def _zip_translate_params(self, params, chan_info): + def _zip_translate_params(self, params, chan_info, version): info = { 'raw info':params, #'time':self._time_from_TODO(raw_info[]), } - force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit'] + force_unit = self._zip_segment_conversion_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['force'], + version) assert force_unit == 'N', force_unit force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot'] assert force_base == 'distance', force_base - dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit'] + dist_unit = self._zip_segment_conversion_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance'], + version) assert dist_unit == 'm', dist_unit + distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot'] + assert distance_base == 'volts', distance_base + # Assume volts unit is V, but it is not specified in the JPK + # file format. force_mult = float( chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier']) + sens_mult = float( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier']) info['spring constant (N/m)'] = force_mult + info['z piezo sensitivity (m/V)'] = sens_mult return info def _zip_translate_segment_params(self, params): info = { - 'raw info':params, - 'columns':list(params['channels']['list']), - 'name':params['force-segment-header']['name']['name'], + 'raw info': params, + 'columns': list(params['channels']['list']), + 'name': self._zip_segment_name(params), } - if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']: - info['name'] = info['name'][:-len('-spm')] - if info['name'] == 'extend': - info['name'] = 'approach' - else: - raise NotImplementedError( - 'Unrecognized segment type %s' % info['name']) return info - def _zip_scale_segment(self, segment, path, info): + def _zip_segment_name(self, params): + name = params['force-segment-header']['name']['name'] + if name.endswith('-spm'): + name = name[:-len('-spm')] + if name == 'extend': + name = 'approach' + elif name.startswith('pause-at-'): + name = 'pause' + return name + + def _zip_scale_segment(self, segment, path, info, version): data = curve.Data( shape=segment.shape, dtype=segment.dtype, @@ -191,7 +254,8 @@ class JPKDriver (Driver): if channel == 'vDeflection': conversion = 'distance' segment = self._zip_scale_channel( - segment, channel, conversion=conversion, path=path, info=info) + segment, channel, conversion=conversion, + path=path, info=info, version=version) name,unit = split_data_label(segment.info['columns'][i]) if name == 'vDeflection': assert unit == 'm', segment.info['columns'][i] @@ -206,8 +270,8 @@ class JPKDriver (Driver): segment.info['columns'][i] = join_data_label('z piezo', 'm') return segment - def _zip_scale_channel(self, segment, channel_name, conversion=None, - path=None, info={}): + def _zip_scale_channel(self, segment, channel_name, + conversion=None, path=None, info={}, version=None): channel = segment.info['raw info']['channels']['list'].index( channel_name) conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set'] @@ -229,7 +293,7 @@ class JPKDriver (Driver): segment = self._zip_scale_channel( segment, channel_name, conversion_info['base-calibration-slot'], - path=path, info=info) + path=path, info=info, version=version) if conversion_info['type'] == 'file': # Michael Haggerty at JPK points out that the conversion # information stored in the external file is reproduced in @@ -248,11 +312,17 @@ class JPKDriver (Driver): conversion_info['scaling']['style'] multiplier = float(conversion_info['scaling']['multiplier']) offset = float(conversion_info['scaling']['offset']) - unit = conversion_info['scaling']['unit']['unit'] + unit = self._zip_segment_conversion_unit(conversion_info, version) segment[:,channel] = segment[:,channel] * multiplier + offset segment.info['columns'][channel] = join_data_label(channel_name, unit) return segment + def _zip_segment_conversion_unit(self, conversion_info, version): + if version in ['0.%d' % i for i in range(3)]: + return conversion_info['scaling']['unit'] + else: + return conversion_info['scaling']['unit']['unit'] + def _parse_params(self, lines): info = {} for line in lines: