X-Git-Url: http://git.tremily.us/?p=hooke.git;a=blobdiff_plain;f=hooke%2Fdriver%2Fjpk.py;h=afa95704251c9fbb7a8ed214b726a38a2d45b55e;hp=2cbb852d627ec5e93ccbeb928ce1dbce18b54e64;hb=1e1315aa80b01c7442fc5792d74b7e60d08a225e;hpb=565f9d7b69d2e4a9ea447d7a50f8f835c3e08642 diff --git a/hooke/driver/jpk.py b/hooke/driver/jpk.py index 2cbb852..afa9570 100644 --- a/hooke/driver/jpk.py +++ b/hooke/driver/jpk.py @@ -18,6 +18,9 @@ # . """Driver for JPK ForceRobot's velocity clamp data format. + +This driver is based on JPK's :file:`JPKForceSpec.txt` version 0.12. +The specs are freely available from JPK, just email support@jpk.com. """ import os.path @@ -27,8 +30,8 @@ import zipfile import numpy from .. import curve as curve -from .. import experiment as experiment from ..util.util import Closing as Closing +from ..util.si import join_data_label, split_data_label from . import Driver as Driver @@ -39,6 +42,8 @@ class JPKDriver (Driver): super(JPKDriver, self).__init__(name='jpk') def is_me(self, path): + if os.path.isdir(path): + return False if zipfile.is_zipfile(path): # JPK file versions since at least 0.5 with Closing(zipfile.ZipFile(path, 'r')) as f: if 'header.properties' not in f.namelist(): @@ -68,100 +73,183 @@ class JPKDriver (Driver): with Closing(zipfile.ZipFile(path, 'r')) as f: f.path = path zip_info = self._zip_info(f) + version = zip_info['file-format-version'] segments = [] for i in range(len([p for p in f.namelist() if p.endswith('segment-header.properties')])): - segments.append(self._zip_segment(f, path, info, zip_info, i)) - for name in ['approach', 'retract']: - if len([s for s in segments if s.info['name'] == name]) == 0: - raise ValueError( - 'No segment for %s in %s, only %s' - % (name, path, [s.info['name'] for s in segments])) - return (segments, - self._zip_translate_params(zip_info, - segments[0].info['raw info'])) + segments.append(self._zip_segment( + f, path, info, zip_info, i, version)) + if version not in ['0.%d' % i for i in range(13)]: + raise NotImplementedError( + 'JPK file version %s not supported (yet).' % version) + curve_info = self._zip_translate_params( + zip_info, segments[0].info['raw info'], version) + for segment in segments: # HACK, should use curve-level spring constant + for key in ['spring constant (N/m)', + 'z piezo sensitivity (m/V)']: + if key in curve_info: + segment.info['spring constant (N/m)'] = \ + curve_info['spring constant (N/m)'] + + return (segments, curve_info) def _zip_info(self, zipfile): with Closing(zipfile.open('header.properties')) as f: info = self._parse_params(f.readlines()) return info - def _zip_segment(self, zipfile, path, info, zip_info, index): + def _zip_segment(self, zipfile, path, info, zip_info, index, version): prop_file = zipfile.open(os.path.join( 'segments', str(index), 'segment-header.properties')) prop = self._parse_params(prop_file.readlines()) prop_file.close() expected_shape = (int(prop['force-segment-header']['num-points']),) channels = [] + if 'list' not in prop['channels']: + prop['channels'] = {'list': prop['channels'].split()} for chan in prop['channels']['list']: chan_info = prop['channel'][chan] - channels.append(self._zip_channel(zipfile, index, chan, chan_info)) + channels.append(self._zip_channel( + zipfile, index, chan, chan_info)) if channels[-1].shape != expected_shape: - raise NotImplementedError( - 'Channel %d:%s in %s has strange shape %s != %s' - % (index, chan, zipfile.path, - channels[-1].shape, expected_shape)) + raise NotImplementedError( + 'Channel %d:%s in %s has strange shape %s != %s' + % (index, chan, zipfile.path, + channels[-1].shape, expected_shape)) + if len(channels) > 0: + shape = (len(channels[0]), len(channels)) + dtype = channels[0].dtype + else: # no channels for this data block + shape = (0,0) + dtype = numpy.float32 d = curve.Data( - shape=(len(channels[0]), len(channels)), - dtype=channels[0].dtype, + shape=shape, + dtype=dtype, info=self._zip_translate_segment_params(prop)) for i,chan in enumerate(channels): d[:,i] = chan - return self._zip_scale_segment(d, path, info) + return self._zip_scale_segment(d, path, info, version) def _zip_channel(self, zipfile, segment_index, channel_name, chan_info): - f = zipfile.open(os.path.join( - 'segments', str(segment_index), - chan_info['data']['file']['name']), 'r') - assert chan_info['data']['file']['format'] == 'raw', \ - 'Non-raw data format:\n%s' % pprint.pformat(chan_info) - assert chan_info['data']['type'] == 'float-data', \ - 'Non-float data format:\n%s' % pprint.pformat(chan_info) - data = numpy.frombuffer( - buffer(f.read()), - dtype=numpy.dtype(numpy.float32).newbyteorder('>'), - # Is JPK data always big endian? I can't find a config - # setting. The ForceRobot brochure - # http://www.jpk.com/forcerobot300-1.download.6d694150f14773dc76bc0c3a8a6dd0e8.pdf - # lists a PowerPC chip on page 4, under Control - # electronics, and PPCs are usually big endian. - # http://en.wikipedia.org/wiki/PowerPC#Endian_modes - ) - f.close() + if chan_info['data']['type'] in ['constant-data', 'raster-data']: + return self._zip_calculate_channel(chan_info) + with Closing(zipfile.open(os.path.join( + 'segments', str(segment_index), + chan_info['data']['file']['name']), 'r')) as f: + assert chan_info['data']['file']['format'] == 'raw', \ + 'Non-raw data format:\n%s' % pprint.pformat(chan_info) + dtype = self._zip_channel_dtype(chan_info) + data = numpy.frombuffer( + buffer(f.read()), + dtype=dtype,) return data - def _zip_translate_params(self, params, chan_info): + def _zip_calculate_channel(self, chan_info): + type_ = chan_info['data']['type'] + n = int(chan_info['data']['num-points']) + if type_ == 'constant-data': + return float(chan_info['data']['value'])*numpy.ones( + shape=(n,), + dtype=numpy.float32) + elif type_ == 'raster-data': + start = float(chan_info['data']['start']) + step = float(chan_info['data']['step']) + return numpy.arange( + start=start, + stop=start + step*(n-0.5), + step=step, + dtype=numpy.float32) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + + def _zip_channel_dtype(self, chan_info): + type_ = chan_info['data']['type'] + if type_ in ['float-data', 'float']: + dtype = numpy.dtype(numpy.float32) + elif type_ in ['integer-data', 'memory-integer-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedinteger', 'signedinteger-limited']: + dtype = numpy.dtype(numpy.int32) + elif encoder in ['unsignedinteger', 'unsignedinteger-limited']: + dtype = numpy.dtype(numpy.uint32) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + elif type_ in ['short-data', 'short', 'memory-short-data']: + encoder = chan_info['data']['encoder']['type'] + if encoder in ['signedshort', 'signedshort-limited']: + dtype = numpy.dtype(numpy.int16) + elif encoder in ['unsignedshort', 'unsignedshort-limited']: + dtype = numpy.dtype(numpy.uint16) + else: + raise ValueError('Unrecognized encoder type "%s" for "%s" data' + % (encoder, type_)) + else: + raise ValueError('Unrecognized data format "%s"' % type_) + byte_order = '>' + # '>' (big endian) byte order. + # From version 0.3 of JPKForceSpec.txt in the "Binary data" section: + # All forms of raw data are stored in chronological order + # (the order in which they were collected), and the + # individual values are stored in network byte order + # (big-endian). The data type used to store the data is + # specified by the "channel.*.data.type" property, and is + # either short (2 bytes per value), integer (4 bytes), or + # float (4 bytes, IEEE format). + return dtype.newbyteorder(byte_order) + + def _zip_translate_params(self, params, chan_info, version): info = { 'raw info':params, #'time':self._time_from_TODO(raw_info[]), } - force_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['unit']['unit'] + if len(chan_info['channels']['list']) == 0: + return info + force_unit = self._zip_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling'], + version) assert force_unit == 'N', force_unit force_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['base-calibration-slot'] assert force_base == 'distance', force_base - dist_unit = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['unit']['unit'] + dist_unit = self._zip_unit( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling'], + version) assert dist_unit == 'm', dist_unit + distance_base = chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['base-calibration-slot'] + assert distance_base == 'volts', distance_base + base_conversion = chan_info['channel']['vDeflection']['conversion-set']['conversions']['base'] + assert base_conversion == distance_base, base_conversion + distance_base_unit = self._zip_unit( + chan_info['channel']['vDeflection']['data'], + version) + assert distance_base_unit == 'V', distance_base_unit force_mult = float( chan_info['channel']['vDeflection']['conversion-set']['conversion']['force']['scaling']['multiplier']) + sens_mult = float( + chan_info['channel']['vDeflection']['conversion-set']['conversion']['distance']['scaling']['multiplier']) info['spring constant (N/m)'] = force_mult + info['z piezo sensitivity (m/V)'] = sens_mult return info def _zip_translate_segment_params(self, params): info = { - 'raw info':params, - 'columns':list(params['channels']['list']), - 'name':params['force-segment-header']['name']['name'], + 'raw info': params, + 'columns': list(params['channels']['list']), + 'name': self._zip_segment_name(params), } - if info['name'] in ['extend-spm', 'retract-spm', 'pause-at-end-spm']: - info['name'] = info['name'][:-len('-spm')] - if info['name'] == 'extend': - info['name'] = 'approach' - else: - raise NotImplementedError( - 'Unrecognized segment type %s' % info['name']) return info - def _zip_scale_segment(self, segment, path, info): + def _zip_segment_name(self, params): + name = params['force-segment-header']['name']['name'] + if name.endswith('-spm'): + name = name[:-len('-spm')] + if name == 'extend': + name = 'approach' + elif name.startswith('pause-at-'): + name = 'pause' + return name + + def _zip_scale_segment(self, segment, path, info, version): data = curve.Data( shape=segment.shape, dtype=segment.dtype, @@ -171,69 +259,60 @@ class JPKDriver (Driver): # raw column indices channels = segment.info['raw info']['channels']['list'] - z_col = channels.index('height') - d_col = channels.index('vDeflection') - - segment = self._zip_scale_channel( - segment, z_col, 'calibrated', path, info) - segment = self._zip_scale_channel( - segment, d_col, 'distance', path, info) - - assert segment.info['columns'][z_col] == 'height (m)', \ - segment.info['columns'][z_col] - assert segment.info['columns'][d_col] == 'vDeflection (m)', \ - segment.info['columns'][d_col] - - # scaled column indices same as raw column indices, - # because columns is a copy of channels.list - segment.info['columns'][z_col] = 'z piezo (m)' - segment.info['columns'][d_col] = 'deflection (m)' + for i,channel in enumerate(channels): + conversion = None + if channel == 'vDeflection': + conversion = 'distance' + segment = self._zip_scale_channel( + segment, channel, conversion=conversion, + path=path, info=info, version=version) + name,unit = split_data_label(segment.info['columns'][i]) + if name == 'vDeflection': + assert unit == 'm', segment.info['columns'][i] + segment.info['columns'][i] = join_data_label('deflection', 'm') + # Invert because deflection voltage increases as the + # tip moves away from the surface, but it makes more + # sense to me to have it increase as it moves toward + # the surface (positive tension on the protein chain). + segment[:,i] *= -1 + elif name == 'height': + assert unit == 'm', segment.info['columns'][i] + segment.info['columns'][i] = join_data_label('z piezo', 'm') return segment - def _zip_scale_channel(self, segment, channel, conversion, path, info): - channel_name = segment.info['raw info']['channels']['list'][channel] + def _zip_scale_channel(self, segment, channel_name, + conversion=None, path=None, info={}, version=None): + channel = segment.info['raw info']['channels']['list'].index( + channel_name) conversion_set = segment.info['raw info']['channel'][channel_name]['conversion-set'] + if conversion == None: + conversion = conversion_set['conversions']['default'] + if conversion == conversion_set['conversions']['base']: + segment.info['columns'][channel] = join_data_label( + channel_name, + self._zip_unit( + segment.info['raw info']['channel'][channel_name]['data'], + version)) + return segment conversion_info = conversion_set['conversion'][conversion] if conversion_info['base-calibration-slot'] \ != conversion_set['conversions']['base']: # Our conversion is stacked on a previous conversion. Do # the previous conversion first. segment = self._zip_scale_channel( - segment, channel, conversion_info['base-calibration-slot'], - info, path) + segment, channel_name, + conversion_info['base-calibration-slot'], + path=path, info=info, version=version) if conversion_info['type'] == 'file': - key = ('%s_%s_to_%s_calibration_file' - % (channel_name, - conversion_info['base-calibration-slot'], - conversion)) - calib_path = conversion_info['file'] - if key in info: - calib_path = os.path.join(os.path.dirname(path), info[key]) - self.logger().debug( - 'Overriding %s -> %s calibration for %s channel: %s' - % (conversion_info['base-calibration-slot'], - conversion, channel_name, calib_path)) - if os.path.exists(calib_path): - with file(calib_path, 'r') as f: - lines = [x.strip() for x in f.readlines()] - f.close() - calib = { # I've emailed JPK to confirm this file format. - 'title':lines[0], - 'multiplier':float(lines[1]), - 'offset':float(lines[2]), - 'unit':lines[3], - 'note':'\n'.join(lines[4:]), - } - segment[:,channel] = (segment[:,channel] * calib['multiplier'] - + calib['offset']) - segment.info['columns'][channel] = ( - '%s (%s)' % (channel_name, calib['unit'])) - return segment - else: - self.logger().warn( - 'Skipping %s -> %s calibration for %s channel. Calibration file %s not found' - % (conversion_info['base-calibration-slot'], - conversion, channel_name, calib_path)) + # Michael Haggerty at JPK points out that the conversion + # information stored in the external file is reproduced in + # the force curve file. So there is no need to actually + # read `conversion_info['file']`. In fact, the data there + # may have changed with future calibrations, while the + # information stored directly in conversion_info retains + # the calibration information as it was when the experiment + # was performed. + pass # Fall through to 'simple' conversion processing. else: assert conversion_info['type'] == 'simple', conversion_info['type'] assert conversion_info['scaling']['type'] == 'linear', \ @@ -242,11 +321,17 @@ class JPKDriver (Driver): conversion_info['scaling']['style'] multiplier = float(conversion_info['scaling']['multiplier']) offset = float(conversion_info['scaling']['offset']) - unit = conversion_info['scaling']['unit']['unit'] + unit = self._zip_unit(conversion_info['scaling'], version) segment[:,channel] = segment[:,channel] * multiplier + offset - segment.info['columns'][channel] = '%s (%s)' % (channel_name, unit) + segment.info['columns'][channel] = join_data_label(channel_name, unit) return segment + def _zip_unit(self, conversion_info, version): + if version in ['0.%d' % i for i in range(3)]: + return conversion_info['unit'] + else: + return conversion_info['unit']['unit'] + def _parse_params(self, lines): info = {} for line in lines: @@ -264,10 +349,16 @@ class JPKDriver (Driver): sub_info[s] = {} sub_info = sub_info[s] if setting[-1] == 'list': # split a space-delimited list - sub_info[setting[-1]] = fields[1].split(' ') + if fields[1]: + sub_info[setting[-1]] = fields[1].split(' ') + else: + sub_info[setting[-1]] = [] else: sub_info[setting[-1]] = fields[1] return info def _read_old(self, path, info): - raise NotImplementedError('No old-style JPK files were available for testing, please send us yours: %s' % path) + raise NotImplementedError( + "Early JPK files (pre-zip) are not supported by Hooke. Please " + "use JPK's `out2jpk-force` script to convert your old files " + "to a more recent format before loading them with Hooke.")