-# Copyright
+# Copyright (C) 2012 W. Trevor King <wking@tremily.us>
+#
+# This file is part of igor.
+#
+# igor is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# igor is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with igor. If not, see <http://www.gnu.org/licenses/>.
"""Structure and Field classes for declaring structures
from . import LOG as _LOG
-_buffer = buffer # save builtin buffer for clobbered situations
-
-
class Field (object):
"""Represent a Structure field.
Example of a multi-dimensional float field:
>>> data = Field(
- ... 'f', 'data', help='example data', count=(2,3,4))
+ ... 'f', 'data', help='example data', count=(2,3,4), array=True)
>>> data.arg_count
24
>>> list(data.indexes()) # doctest: +ELLIPSIS
Example of a nested structure field:
>>> run = Structure('run', fields=[time, data])
- >>> runs = Field(run, 'runs', help='pair of runs', count=2)
+ >>> runs = Field(run, 'runs', help='pair of runs', count=2, array=True)
>>> runs.arg_count # = 2 * (1 + 24)
50
>>> data1 = numpy.arange(data.arg_count).reshape(data.count)
--------
Structure
"""
- def __init__(self, format, name, default=None, help=None, count=1):
+ def __init__(self, format, name, default=None, help=None, count=1,
+ array=False):
self.format = format
self.name = name
self.default = default
self.help = help
self.count = count
+ self.array = array
self.setup()
def setup(self):
"""
_LOG.debug('setup {}'.format(self))
self.item_count = _numpy.prod(self.count) # number of item repeats
+ if not self.array and self.item_count != 1:
+ raise ValueError(
+ '{} must be an array field to have a count of {}'.format(
+ self, self.count))
if isinstance(self.format, Structure):
self.structure_count = sum(
f.arg_count for f in self.format.fields)
def indexes(self):
"""Iterate through indexes to a possibly multi-dimensional array"""
- assert self.item_count > 1, self
+ assert self.array, self
try:
i = [0] * len(self.count)
except TypeError: # non-iterable count
index = []
for j,c in enumerate(reversed(self.count)):
index.insert(0, i % c)
- i /= c
+ i //= c
yield index
def pack_data(self, data=None):
If the field is repeated (count > 1), the incoming data should
be iterable with each iteration returning a single item.
"""
- if self.item_count > 1:
+ if self.array:
if data is None:
data = []
if hasattr(data, 'flat'): # take advantage of numpy's ndarray.flat
item = None
for arg in self.pack_item(item):
yield arg
- elif self.item_count:
+ else:
for arg in self.pack_item(data):
yield arg
_LOG.debug('unpack {} for {} {}'.format(data, self, self.format))
iterator = iter(data)
try:
- items = [iterator.next() for i in range(self.arg_count)]
+ items = [next(iterator) for i in range(self.arg_count)]
except StopIteration:
raise ValueError('not enough data to unpack {}'.format(self))
try:
- iterator.next()
+ next(iterator)
except StopIteration:
pass
else:
count = self.count
else:
count = 0 # padding bytes, etc.
- if count == 1:
+ if not self.array:
+ assert count == 1, (self, self.count)
return unpacked[0]
if isinstance(self.format, Structure):
try:
struct run {
unsigned int time;
short data[2][3];
- }
+ };
struct experiment {
unsigned short version;
struct run runs[2];
- }
+ };
As:
>>> time = Field('I', 'time', default=0, help='POSIX time')
>>> data = Field(
- ... 'h', 'data', default=0, help='example data', count=(2,3))
+ ... 'h', 'data', default=0, help='example data', count=(2,3),
+ ... array=True)
>>> run = Structure('run', fields=[time, data])
>>> version = Field(
... 'H', 'version', default=1, help='example version')
- >>> runs = Field(run, 'runs', help='pair of runs', count=2)
+ >>> runs = Field(run, 'runs', help='pair of runs', count=2, array=True)
>>> experiment = Structure('experiment', fields=[version, runs])
The structures automatically calculate the flattened data format:
If you set ``count=0``, the field is ignored.
>>> experiment2 = Structure('experiment', fields=[
- ... version, Field('f', 'ignored', count=0), runs], byte_order='>')
+ ... version, Field('f', 'ignored', count=0, array=True), runs],
+ ... byte_order='>')
>>> experiment2.format
'>HIhhhhhhIhhhhhh'
>>> d = experiment2.unpack(b)
iterator = iter(args)
for f in self.fields:
try:
- items = [iterator.next() for i in range(f.arg_count)]
+ items = [next(iterator) for i in range(f.arg_count)]
except StopIteration:
raise ValueError('not enough data to unpack {}.{}'.format(
self, f))
data[f.name] = f.unpack_data(items)
try:
- iterator.next()
+ next(iterator)
except StopIteration:
pass
else:
struct vector {
unsigned int length;
short data[length];
- }
+ };
You can generate a Python version of this structure in two ways,
with a dynamic ``length``, or with a dynamic ``data``. In both
>>> dynamic_length_vector = DynamicStructure('vector',
... fields=[
... DynamicLengthField('I', 'length'),
- ... Field('h', 'data', count=0),
+ ... Field('h', 'data', count=0, array=True),
... ],
... byte_order='>')
>>> class DynamicDataField (DynamicField):
>>> dynamic_data_vector = DynamicStructure('vector',
... fields=[
... Field('I', 'length'),
- ... DynamicDataField('h', 'data', count=0),
+ ... DynamicDataField('h', 'data', count=0, array=True),
... ],
... byte_order='>')
- >>> b = '\x00\x00\x00\x02\x01\x02\x03\x04'
+ >>> b = b'\x00\x00\x00\x02\x01\x02\x03\x04'
>>> d = dynamic_length_vector.unpack(b)
>>> pprint(d)
{'data': array([258, 772]), 'length': 2}
if data is None:
parents = [self]
data = d = {}
- if _LOG.level == _logging.DEBUG:
+ if _LOG.level <= _logging.DEBUG:
stream = DebuggingStream(stream)
else:
parents = parents + [self]
f.setup()
f.format.setup()
if isinstance(f.format, DynamicStructure):
- if f.item_count == 1:
- # TODO, fix in case we *want* an array
- d[f.name] = {}
- f.format.unpack_stream(
- stream, parents=parents, data=data, d=d[f.name])
- else:
+ if f.array:
d[f.name] = []
for i in range(f.item_count):
x = {}
d[f.name].append(x)
f.format.unpack_stream(
stream, parents=parents, data=data, d=x)
+ else:
+ assert f.item_count == 1, (f, f.count)
+ d[f.name] = {}
+ f.format.unpack_stream(
+ stream, parents=parents, data=data, d=d[f.name])
if hasattr(f, 'post_unpack'):
_LOG.debug('post-unpack {}'.format(f))
repeat = f.post_unpack(parents=parents, data=data)
f.setup()
f.format.setup()
x = [f.format.unpack_from(b) for b in bs]
- if len(x) == 1: # TODO, fix in case we *want* an array
+ if not f.array:
+ assert len(x) == 1, (f, f.count, x)
x = x[0]
return x
else: