Example of a multi-dimensional float field:
>>> data = Field(
- ... 'f', 'data', help='example data', count=(2,3,4))
+ ... 'f', 'data', help='example data', count=(2,3,4), array=True)
>>> data.arg_count
24
>>> list(data.indexes()) # doctest: +ELLIPSIS
Example of a nested structure field:
>>> run = Structure('run', fields=[time, data])
- >>> runs = Field(run, 'runs', help='pair of runs', count=2)
+ >>> runs = Field(run, 'runs', help='pair of runs', count=2, array=True)
>>> runs.arg_count # = 2 * (1 + 24)
50
>>> data1 = numpy.arange(data.arg_count).reshape(data.count)
--------
Structure
"""
- def __init__(self, format, name, default=None, help=None, count=1):
+ def __init__(self, format, name, default=None, help=None, count=1,
+ array=False):
self.format = format
self.name = name
self.default = default
self.help = help
self.count = count
+ self.array = array
self.setup()
def setup(self):
"""
_LOG.debug('setup {}'.format(self))
self.item_count = _numpy.prod(self.count) # number of item repeats
+ if not self.array and self.item_count != 1:
+ raise ValueError(
+ '{} must be an array field to have a count of {}'.format(
+ self, self.count))
if isinstance(self.format, Structure):
self.structure_count = sum(
f.arg_count for f in self.format.fields)
def indexes(self):
"""Iterate through indexes to a possibly multi-dimensional array"""
- assert self.item_count > 1, self
+ assert self.array, self
try:
i = [0] * len(self.count)
except TypeError: # non-iterable count
index = []
for j,c in enumerate(reversed(self.count)):
index.insert(0, i % c)
- i /= c
+ i //= c
yield index
def pack_data(self, data=None):
If the field is repeated (count > 1), the incoming data should
be iterable with each iteration returning a single item.
"""
- if self.item_count > 1:
+ if self.array:
if data is None:
data = []
if hasattr(data, 'flat'): # take advantage of numpy's ndarray.flat
item = None
for arg in self.pack_item(item):
yield arg
- elif self.item_count:
+ else:
for arg in self.pack_item(data):
yield arg
count = self.count
else:
count = 0 # padding bytes, etc.
- if count == 1:
+ if not self.array:
+ assert count == 1, (self, self.count)
return unpacked[0]
if isinstance(self.format, Structure):
try:
struct run {
unsigned int time;
short data[2][3];
- }
+ };
struct experiment {
unsigned short version;
struct run runs[2];
- }
+ };
As:
>>> time = Field('I', 'time', default=0, help='POSIX time')
>>> data = Field(
- ... 'h', 'data', default=0, help='example data', count=(2,3))
+ ... 'h', 'data', default=0, help='example data', count=(2,3),
+ ... array=True)
>>> run = Structure('run', fields=[time, data])
>>> version = Field(
... 'H', 'version', default=1, help='example version')
- >>> runs = Field(run, 'runs', help='pair of runs', count=2)
+ >>> runs = Field(run, 'runs', help='pair of runs', count=2, array=True)
>>> experiment = Structure('experiment', fields=[version, runs])
The structures automatically calculate the flattened data format:
If you set ``count=0``, the field is ignored.
>>> experiment2 = Structure('experiment', fields=[
- ... version, Field('f', 'ignored', count=0), runs], byte_order='>')
+ ... version, Field('f', 'ignored', count=0, array=True), runs],
+ ... byte_order='>')
>>> experiment2.format
'>HIhhhhhhIhhhhhh'
>>> d = experiment2.unpack(b)
struct vector {
unsigned int length;
short data[length];
- }
+ };
You can generate a Python version of this structure in two ways,
with a dynamic ``length``, or with a dynamic ``data``. In both
>>> dynamic_length_vector = DynamicStructure('vector',
... fields=[
... DynamicLengthField('I', 'length'),
- ... Field('h', 'data', count=0),
+ ... Field('h', 'data', count=0, array=True),
... ],
... byte_order='>')
>>> class DynamicDataField (DynamicField):
>>> dynamic_data_vector = DynamicStructure('vector',
... fields=[
... Field('I', 'length'),
- ... DynamicDataField('h', 'data', count=0),
+ ... DynamicDataField('h', 'data', count=0, array=True),
... ],
... byte_order='>')
f.setup()
f.format.setup()
if isinstance(f.format, DynamicStructure):
- if f.item_count == 1:
- # TODO, fix in case we *want* an array
- d[f.name] = {}
- f.format.unpack_stream(
- stream, parents=parents, data=data, d=d[f.name])
- else:
+ if f.array:
d[f.name] = []
for i in range(f.item_count):
x = {}
d[f.name].append(x)
f.format.unpack_stream(
stream, parents=parents, data=data, d=x)
+ else:
+ assert f.item_count == 1, (f, f.count)
+ d[f.name] = {}
+ f.format.unpack_stream(
+ stream, parents=parents, data=data, d=d[f.name])
if hasattr(f, 'post_unpack'):
_LOG.debug('post-unpack {}'.format(f))
repeat = f.post_unpack(parents=parents, data=data)
f.setup()
f.format.setup()
x = [f.format.unpack_from(b) for b in bs]
- if len(x) == 1: # TODO, fix in case we *want* an array
+ if not f.array:
+ assert len(x) == 1, (f, f.count, x)
x = x[0]
return x
else: