- f = zipfile.open(os.path.join(
- 'segments', str(segment_index),
- chan_info['data']['file']['name']), 'r')
- assert chan_info['data']['file']['format'] == 'raw', \
- 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
- assert chan_info['data']['type'] == 'float-data', \
- 'Non-float data format:\n%s' % pprint.pformat(chan_info)
- data = numpy.frombuffer(
- buffer(f.read()),
- dtype=numpy.dtype(numpy.float32).newbyteorder('>'))
+ if chan_info['data']['type'] in ['constant-data', 'raster-data']:
+ return self._zip_calculate_channel(chan_info)
+ with Closing(zipfile.open(os.path.join(
+ 'segments', str(segment_index),
+ chan_info['data']['file']['name']), 'r')) as f:
+ assert chan_info['data']['file']['format'] == 'raw', \
+ 'Non-raw data format:\n%s' % pprint.pformat(chan_info)
+ dtype = self._zip_channel_dtype(chan_info)
+ data = numpy.frombuffer(
+ buffer(f.read()),
+ dtype=dtype,)
+ return data
+
+ def _zip_calculate_channel(self, chan_info):
+ type_ = chan_info['data']['type']
+ n = int(chan_info['data']['num-points'])
+ if type_ == 'constant-data':
+ return float(chan_info['data']['value'])*numpy.ones(
+ shape=(n,),
+ dtype=numpy.float32)
+ elif type_ == 'raster-data':
+ start = float(chan_info['data']['start'])
+ step = float(chan_info['data']['step'])
+ return numpy.arange(
+ start=start,
+ stop=start + step*(n-0.5),
+ step=step,
+ dtype=numpy.float32)
+ else:
+ raise ValueError('Unrecognized data format "%s"' % type_)
+
+ def _zip_channel_dtype(self, chan_info):
+ type_ = chan_info['data']['type']
+ if type_ in ['float-data', 'float']:
+ dtype = numpy.dtype(numpy.float32)
+ elif type_ in ['integer-data', 'memory-integer-data']:
+ encoder = chan_info['data']['encoder']['type']
+ if encoder in ['signedinteger', 'signedinteger-limited']:
+ dtype = numpy.dtype(numpy.int32)
+ elif encoder in ['unsignedinteger', 'unsignedinteger-limited']:
+ dtype = numpy.dtype(numpy.uint32)
+ else:
+ raise ValueError('Unrecognized encoder type "%s" for "%s" data'
+ % (encoder, type_))
+ elif type_ in ['short-data', 'short', 'memory-short-data']:
+ encoder = chan_info['data']['encoder']['type']
+ if encoder in ['signedshort', 'signedshort-limited']:
+ dtype = numpy.dtype(numpy.int16)
+ elif encoder in ['unsignedshort', 'unsignedshort-limited']:
+ dtype = numpy.dtype(numpy.uint16)
+ else:
+ raise ValueError('Unrecognized encoder type "%s" for "%s" data'
+ % (encoder, type_))
+ else:
+ raise ValueError('Unrecognized data format "%s"' % type_)
+ byte_order = '>'