X-Git-Url: http://git.tremily.us/?p=hooke.git;a=blobdiff_plain;f=hooke%2Fdriver%2Fjpk.py;h=ba38f6a485d5b3c764318be8e21d45399c7c00c9;hp=fec6cf523b01de9955de74f012c8da79ce90f50e;hb=91cf002670de3d4668b84d53fad05be90ba7d585;hpb=378f0ba6bb8884c5cc2694ca0dba901cd924055d diff --git a/hooke/driver/jpk.py b/hooke/driver/jpk.py index fec6cf5..ba38f6a 100644 --- a/hooke/driver/jpk.py +++ b/hooke/driver/jpk.py @@ -73,17 +73,17 @@ class JPKDriver (Driver): with Closing(zipfile.ZipFile(path, 'r')) as f: f.path = path zip_info = self._zip_info(f) + version = zip_info['file-format-version'] segments = [] for i in range(len([p for p in f.namelist() if p.endswith('segment-header.properties')])): - segments.append(self._zip_segment(f, path, info, zip_info, i)) - if zip_info['file-format-version'] not in ['0.%d' % i - for i in range(12)]: + segments.append(self._zip_segment( + f, path, info, zip_info, i, version)) + if version not in ['0.%d' % i for i in range(12)]: raise NotImplementedError( - 'JPK file version %s not supported (yet).' - % zip_info['file-format-version']) - curve_info = self._zip_translate_params(zip_info, - segments[0].info['raw info']) + 'JPK file version %s not supported (yet).' % version) + curve_info = self._zip_translate_params( + zip_info, segments[0].info['raw info'], version) for segment in segments: # HACK, should use curve-level spring constant segment.info['spring constant (N/m)'] = \ curve_info['spring constant (N/m)'] @@ -94,7 +94,7 @@ class JPKDriver (Driver): info = self._parse_params(f.readlines()) return info - def _zip_segment(self, zipfile, path, info, zip_info, index): + def _zip_segment(self, zipfile, path, info, zip_info, index, version): prop_file = zipfile.open(os.path.join( 'segments', str(index), 'segment-header.properties')) prop = self._parse_params(prop_file.readlines()) @@ -124,19 +124,65 @@ class JPKDriver (Driver): info=self._zip_translate_segment_params(prop)) for i,chan in enumerate(channels): d[:,i] = chan - return self._zip_scale_segment(d, path, info) + return self._zip_scale_segment(d, path, info, version) def _zip_channel(self, zipfile, segment_index, channel_name, chan_info): - f = zipfile.open(os.path.join( - 'segments', str(segment_index), - chan_info['data']['file']['name']), 'r') - assert chan_info['data']['file']['format'] == 'raw', \ - 'Non-raw data format:\n%s' % pprint.pformat(chan_info) - assert chan_info['data']['type'] == 'float-data', \ - 'Non-float data format:\n%s' % pprint.pformat(chan_info) - data = numpy.frombuffer( - buffer(f.read()), - dtype=numpy.dtype(numpy.float32).newbyteorder('>')) + if chan_info['data']['type'] in ['constant-data', 'raster-data']: + return self._zip_calculate_channel(chan_info) + with Closing(zipfile.open(os.path.join( + 'segments', str(segment_index), + chan_info['data']['file']['name']), 'r')) as f: + assert chan_info['data']['file']['format'] == 'raw', \ + 'Non-raw data format:\n%s' % pprint.pformat(chan_info) + dtype = self._zip_channel_dtype(chan_info) + data = numpy.frombuffer( + buffer(f.read()), + dtype=dtype,) + return data + + def _zip_calculate_channel(self, chan_info): + type_ = chan_info['data']['type'] + n = int(chan_info['data']['num-points']) + if type_ == 'constant-data': + return float(chan_info['data']['value'])*numpy.ones( + shape=(n,), + dtype=numpy.float32) + elif type_ == 'raster-data': + start = float(chan_info['data']['start']) + step = float(chan_info['data']['step']) + return numpy.arange( + start=start, + stop=start + step*(n-0.5), + step=step, + dtype=numpy.float32) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + + def _zip_channel_dtype(self, chan_info): + type_ = chan_info['data']['type'] + if type_ in ['float-data', 'float']: + dtype = numpy.dtype(numpy.float32) + elif type_ in ['integer-data', 'memory-integer-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedinteger', 'signedinteger-limited']: + dtype = numpy.dtype(numpy.int32) + elif encoder in ['unsignedinteger', 'unsignedinteger-limited']: + dtype = numpy.dtype(numpy.uint32) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + elif type_ in ['short-data', 'short', 'memory-short-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedshort', 'signedshort-limited']: + dtype = numpy.dtype(numpy.int16) + elif encoder in ['unsignedshort', 'unsignedshort-limited']: + dtype = numpy.dtype(numpy.uint16) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + byte_order = '>' # '>' (big endian) byte order. # From version 0.3 of JPKForceSpec.txt in the "Binary data" section: # All forms of raw data are stored in chronological order @@ -146,19 +192,22 @@ class JPKDriver (Driver): # specified by the "channel.*.data.type" property, and is # either short (2 bytes per value), integer (4 bytes), or # float (4 bytes, IEEE format). - f.close() - return data + return dtype.newbyteorder(byte_order) - def _zip_translate_params(self, params, chan_info): + def _zip_translate_params(self, params, chan_info, version): info = { 'raw info':params, #'time':self._time_from_TODO(raw_info[]), } - force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit'] + force_unit = self._zip_segment_conversion_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['force'], + version) assert force_unit == 'N', force_unit force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot'] assert force_base == 'distance', force_base - dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit'] + dist_unit = self._zip_segment_conversion_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance'], + version) assert dist_unit == 'm', dist_unit distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot'] assert distance_base == 'volts', distance_base @@ -190,7 +239,7 @@ class JPKDriver (Driver): name = 'pause' return name - def _zip_scale_segment(self, segment, path, info): + def _zip_scale_segment(self, segment, path, info, version): data = curve.Data( shape=segment.shape, dtype=segment.dtype, @@ -205,7 +254,8 @@ class JPKDriver (Driver): if channel == 'vDeflection': conversion = 'distance' segment = self._zip_scale_channel( - segment, channel, conversion=conversion, path=path, info=info) + segment, channel, conversion=conversion, + path=path, info=info, version=version) name,unit = split_data_label(segment.info['columns'][i]) if name == 'vDeflection': assert unit == 'm', segment.info['columns'][i] @@ -220,8 +270,8 @@ class JPKDriver (Driver): segment.info['columns'][i] = join_data_label('z piezo', 'm') return segment - def _zip_scale_channel(self, segment, channel_name, conversion=None, - path=None, info={}): + def _zip_scale_channel(self, segment, channel_name, + conversion=None, path=None, info={}, version=None): channel = segment.info['raw info']['channels']['list'].index( channel_name) conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set'] @@ -243,7 +293,7 @@ class JPKDriver (Driver): segment = self._zip_scale_channel( segment, channel_name, conversion_info['base-calibration-slot'], - path=path, info=info) + path=path, info=info, version=version) if conversion_info['type'] == 'file': # Michael Haggerty at JPK points out that the conversion # information stored in the external file is reproduced in @@ -262,11 +312,17 @@ class JPKDriver (Driver): conversion_info['scaling']['style'] multiplier = float(conversion_info['scaling']['multiplier']) offset = float(conversion_info['scaling']['offset']) - unit = conversion_info['scaling']['unit']['unit'] + unit = self._zip_segment_conversion_unit(conversion_info, version) segment[:,channel] = segment[:,channel] * multiplier + offset segment.info['columns'][channel] = join_data_label(channel_name, unit) return segment + def _zip_segment_conversion_unit(self, conversion_info, version): + if version in ['0.%d' % i for i in range(3)]: + return conversion_info['scaling']['unit'] + else: + return conversion_info['scaling']['unit']['unit'] + def _parse_params(self, lines): info = {} for line in lines: