3 """Structure and Field classes for declaring structures
5 There are a few formats that can be used to represent the same data, a
6 binary packed format with all the data in a buffer, a linearized
7 format with each field in a single Python list, and a nested format
8 with each field in a hierarchy of Python dictionaries.
11 from __future__ import absolute_import
12 import struct as _struct
14 import numpy as _numpy
# Keep a module-private alias to the ``buffer`` builtin: several methods in
# this module take a parameter that is itself named ``buffer``, which hides
# the builtin inside their bodies.
_buffer = buffer  # save builtin buffer for clobbered situations
21 """Represent a Structure field.
23 The format argument can be a format character from the ``struct``
24 documentation (e.g., ``c`` for ``char``, ``h`` for ``short``, ...)
25 or ``Structure`` instance (for building nested structures).
30 >>> from pprint import pprint
Example of an unsigned integer field:
36 ... 'I', 'time', default=0, help='POSIX time')
39 >>> list(time.pack_data(1))
41 >>> list(time.pack_item(2))
43 >>> time.unpack_data([3])
45 >>> time.unpack_item([4])
48 Example of a multi-dimensional float field:
51 ... 'f', 'data', help='example data', count=(2,3,4))
54 >>> list(data.indexes()) # doctest: +ELLIPSIS
55 [[0, 0, 0], [0, 0, 1], [0, 0, 2], [0, 0, 3], [0, 1, 0], ..., [1, 2, 3]]
56 >>> list(data.pack_data(
57 ... [[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]],
58 ... [[12, 13, 14, 15], [16, 17, 18, 19], [20, 21, 22, 23]]])
59 ... ) # doctest: +ELLIPSIS
60 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ..., 19, 20, 21, 22, 23]
61 >>> list(data.pack_item(3))
63 >>> data.unpack_data(range(data.total_count))
64 array([[[ 0, 1, 2, 3],
71 >>> data.unpack_item([3])
74 Example of a nested structure field:
76 >>> run = Structure('run', fields=[time, data])
77 >>> runs = Field(run, 'runs', help='pair of runs', count=2)
78 >>> runs.total_count # = 2 * (1 + 24)
80 >>> data1 = numpy.arange(data.total_count).reshape(data.count)
81 >>> data2 = data1 + data.total_count
82 >>> list(runs.pack_data(
83 ... [{'time': 100, 'data': data1},
84 ... {'time': 101, 'data': data2}])
85 ... ) # doctest: +ELLIPSIS
86 [100, 0, 1, 2, ..., 22, 23, 101, 24, 25, ..., 46, 47]
87 >>> list(runs.pack_item({'time': 100, 'data': data1})
88 ... ) # doctest: +ELLIPSIS
89 [100, 0, 1, 2, ..., 22, 23]
90 >>> pprint(runs.unpack_data(range(runs.total_count)))
91 [{'data': array([[[ 1, 2, 3, 4],
99 {'data': array([[[26, 27, 28, 29],
107 >>> pprint(runs.unpack_item(range(runs.structure_count)))
108 {'data': array([[[ 1, 2, 3, 4],
117 If you don't give enough values for an array field, the remaining
118 values are filled in with their defaults.
120 >>> list(data.pack_data(
121 ... [[[0, 1, 2, 3], [4, 5, 6]], [[10]]])) # doctest: +ELLIPSIS
122 Traceback (most recent call last):
124 ValueError: no default for <Field data ...>
126 >>> list(data.pack_data(
127 ... [[[0, 1, 2, 3], [4, 5, 6]], [[10]]]))
128 [0, 1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
134 def __init__(self, format, name, default=None, help=None, count=1):
137 self.default = default
140 self.item_count = _numpy.prod(count) # number of item repeats
141 if isinstance(self.format, Structure):
142 self.structure_count = sum(f.total_count for f in format.fields)
143 self.total_count = self.item_count * self.structure_count
145 self.total_count = self.item_count # struct.Struct format chars
148 return self.__repr__()
151 return '<{} {} {}>'.format(
152 self.__class__.__name__, self.name, id(self))
155 """Iterate through indexes to a possibly multi-dimensional array"""
156 assert self.item_count > 1, self
158 i = [0] * len(self.count)
159 except TypeError: # non-iterable count
160 for i in range(self.count):
163 for i in range(self.item_count):
165 for j,c in enumerate(reversed(self.count)):
166 index.insert(0, i % c)
170 def pack_data(self, data=None):
171 """Linearize a single field's data to a flat list.
173 If the field is repeated (count > 1), the incoming data should
174 be iterable with each iteration returning a single item.
176 if self.item_count > 1:
179 if hasattr(data, 'flat'): # take advantage of numpy's ndarray.flat
181 for item in data.flat:
183 for arg in self.pack_item(item):
185 if items < self.item_count:
186 if f.default is None:
188 'no default for {}.{}'.format(self, f))
189 for i in range(self.item_count - items):
192 for index in self.indexes():
194 if isinstance(index, int):
202 for arg in self.pack_item(item):
205 for arg in self.pack_item(data):
208 def pack_item(self, item=None):
209 """Linearize a single count of the field's data to a flat iterable
211 if isinstance(self.format, Structure):
212 for i in self.format._pack_item(item):
215 if self.default is None:
216 raise ValueError('no default for {}'.format(self))
221 def unpack_data(self, data):
222 """Inverse of .pack_data"""
223 iterator = iter(data)
225 items = [iterator.next() for i in range(self.total_count)]
226 except StopIteration:
227 raise ValueError('not enough data to unpack {}'.format(self))
230 except StopIteration:
233 raise ValueError('too much data to unpack {}'.format(self))
234 if isinstance(self.format, Structure):
235 # break into per-structure clumps
236 s = self.structure_count
237 items = zip(*[items[i::s] for i in range(s)])
239 items = [[i] for i in items]
240 unpacked = [self.unpack_item(i) for i in items]
243 if isinstance(self.format, Structure):
249 raise NotImplementedError('reshape Structure field')
251 unpacked = _numpy.array(unpacked)
252 unpacked = unpacked.reshape(self.count)
255 def unpack_item(self, item):
"""Inverse of .pack_item"""
257 if isinstance(self.format, Structure):
258 return self.format._unpack_item(item)
260 assert len(item) == 1, item
264 class Structure (_struct.Struct):
265 r"""Represent a C structure.
267 A convenient wrapper around struct.Struct that uses Fields and
268 adds dict-handling methods for transparent name assignment.
278 >>> from pprint import pprint
280 Represent the C structures::
288 unsigned short version;
294 >>> time = Field('I', 'time', default=0, help='POSIX time')
296 ... 'h', 'data', default=0, help='example data', count=(2,3))
297 >>> run = Structure('run', fields=[time, data])
299 ... 'H', 'version', default=1, help='example version')
300 >>> runs = Field(run, 'runs', help='pair of runs', count=2)
301 >>> experiment = Structure('experiment', fields=[version, runs])
303 The structures automatically calculate the flattened data format:
307 >>> run.size # 4 + 2*3*2
309 >>> experiment.format
311 >>> experiment.size # 2 + 2*(4 + 2*3*2)
314 You can read data out of any object supporting the buffer
317 >>> b = array.array('B', range(experiment.size))
318 >>> experiment.set_byte_order('>')
319 >>> d = experiment.unpack_from(buffer=b)
321 {'runs': [{'data': array([[1543, 2057, 2571],
322 [3085, 3599, 4113]]),
324 {'data': array([[5655, 6169, 6683],
325 [7197, 7711, 8225]]),
328 >>> [hex(x) for x in d['runs'][0]['data'].flat]
329 ['0x607L', '0x809L', '0xa0bL', '0xc0dL', '0xe0fL', '0x1011L']
331 You can also read out from strings:
333 >>> d = experiment.unpack(b.tostring())
335 {'runs': [{'data': array([[1543, 2057, 2571],
336 [3085, 3599, 4113]]),
338 {'data': array([[5655, 6169, 6683],
339 [7197, 7711, 8225]]),
343 If you don't give enough values for an array field, the remaining
344 values are filled in with their defaults.
346 >>> experiment.pack_into(buffer=b, data=d)
347 >>> b.tostring()[:17]
348 '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10'
349 >>> b.tostring()[17:]
350 '\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !'
351 >>> run0 = d['runs'].pop(0)
352 >>> b = experiment.pack(data=d)
354 '\x00\x01\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f '
356 '!\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
358 def __init__(self, name, fields, byte_order='='):
359 # '=' for native byte order, standard size and alignment
360 # See http://docs.python.org/library/struct for details
363 self.set_byte_order(byte_order)
369 return '<{} {} {}>'.format(
370 self.__class__.__name__, self.name, id(self))
372 def set_byte_order(self, byte_order):
373 """Allow changing the format byte_order on the fly.
375 if (hasattr(self, 'format') and self.format != None
376 and self.format.startswith(byte_order)):
377 return # no need to change anything
379 for field in self.fields:
380 if isinstance(field.format, Structure):
381 field_format = field.format.sub_format(
384 field_format = [field.format]*field.item_count
385 format.extend(field_format)
386 super(Structure, self).__init__(
387 format=byte_order+''.join(format).replace('P', 'L'))
389 def sub_format(self):
390 return self.format.lstrip('=<>') # byte order handled by parent
392 def _pack_item(self, item=None):
393 """Linearize a single count of the structure's data to a flat iterable
397 for f in self.fields:
402 for arg in f.pack_data(data):
405 def _unpack_item(self, args):
"""Inverse of ._pack_item"""
408 iterator = iter(args)
409 for f in self.fields:
411 items = [iterator.next() for i in range(f.total_count)]
412 except StopIteration:
413 raise ValueError('not enough data to unpack {}.{}'.format(
415 data[f.name] = f.unpack_data(items)
418 except StopIteration:
421 raise ValueError('too much data to unpack {}'.format(self))
424 def pack(self, data):
425 args = list(self._pack_item(data))
426 return super(Structure, self).pack(*args)
428 def pack_into(self, buffer, offset=0, data={}):
429 args = list(self._pack_item(data))
430 return super(Structure, self).pack_into(
431 buffer, offset, *args)
433 def unpack(self, *args, **kwargs):
434 args = super(Structure, self).unpack(*args, **kwargs)
435 return self._unpack_item(args)
437 def unpack_from(self, buffer, offset=0, *args, **kwargs):
439 args = super(Structure, self).unpack_from(
440 buffer, offset, *args, **kwargs)
441 except _struct.error as e:
442 if not self.name in ('WaveHeader2', 'WaveHeader5'):
444 # HACK! For WaveHeader5, when npnts is 0, wData is
445 # optional. If we couldn't unpack the structure, fill in
446 # wData with zeros and try again, asserting that npnts is
448 if len(buffer) - offset < self.size:
449 # missing wData? Pad with zeros
450 buffer += _buffer('\x00'*(self.size + offset - len(buffer)))
451 args = super(Structure, self).unpack_from(buffer, offset)
452 data = self._unpack_item(args)
453 assert data['npnts'] == 0, data['npnts']
454 return self._unpack_item(args)