import copy
import hashlib
+import os
import os.path
import types
-import xml.dom.minidom
+
+if False: # YAML dump debugging code
+ """To help isolate data types etc. that give YAML problems.
+
+ This is usually caused by external C modules (e.g. numpy) that
+ define new types (e.g. numpy.dtype) which YAML cannot inspect.
+ """
+ import yaml.representer
+ import sys
+ def ignore_aliases(data):
+ print data, type(data)
+ sys.stdout.flush()
+ if data in [None, ()]:
+ return True
+ if isinstance(data, (str, unicode, bool, int, float)):
+ return True
+ yaml.representer.SafeRepresenter.ignore_aliases = staticmethod(
+ ignore_aliases)
+
+import yaml
+from yaml.representer import RepresenterError
from . import curve as curve
-from .compat import minidom as minidom # dynamically patch xml.sax.minidom
+from .util.itertools import reverse_enumerate
class NoteIndexList (list):
self.name = name
self.info = {}
self._index = 0
+ self._set_ignored_attrs()
def __str__(self):
return str(self.__unicode__())
def __repr__(self):
return self.__str__()
+ def _set_ignored_attrs(self):
+ self._ignored_attrs = ['_ignored_attrs', '_default_attrs']
+ self._default_attrs = {
+ 'info': {},
+ }
+
+ def __getstate__(self):
+ state = dict(self.__dict__)
+ for key in self._ignored_attrs:
+ if key in state:
+ del(state[key])
+ for key,value in self._default_attrs.items():
+ if key in state and state[key] == value:
+ del(state[key])
+ assert 'items' not in state
+ state['items'] = []
+ self._assert_clean_state(self, state)
+ for item in self: # save curves and their attributes
+ item_state = self._item_getstate(item)
+ self._assert_clean_state(item, item_state)
+ state['items'].append(item_state)
+ return state
+
+ def __setstate__(self, state):
+ self._set_ignored_attrs()
+ for key,value in self._default_attrs.items():
+ setattr(self, key, value)
+ for key,value in state.items():
+ if key == 'items':
+ continue
+ setattr(self, key, value)
+ for item_state in state['items']:
+ self.append(self._item_setstate(item_state))
+
+ def _item_getstate(self, item):
+ return item
+
+ def _item_setstate(self, state):
+ return state
+
+ def _assert_clean_state(self, owner, state):
+ for k,v in state.items():
+ if k == 'drivers': # HACK. Need better driver serialization.
+ continue
+ try:
+ yaml.dump((k,v))
+ except RepresenterError, e:
+ raise NotImplementedError(
+ 'cannot convert %s.%s = %s (%s) to YAML\n%s'
+ % (owner.__class__.__name__, k, v, type(v), e))
+
def _setup_item(self, item):
"""Perform any required initialization before returning an item.
"""
def items(self, reverse=False):
"""Iterate through `self` calling `_setup_item` on each item
before yielding.
+
+ Notes
+ -----
+ Updates :attr:`_index` during the iteration so
+ :func:`~hooke.plugin.curve.current_curve_callback` works as
+ expected in :class:`~hooke.command.Command`\s called from
+ :class:`~hooke.plugin.playlist.ApplyCommand`. After the
+ iteration completes, :attr:`_index` is restored to its
+ original value.
"""
+ index = self._index
items = self
if reverse == True:
- items = reversed(self)
- for item in items:
+ items = reverse_enumerate(self)
+ else:
+ items = enumerate(self)
+ for i,item in items:
+ self._index = i
self._setup_item(item)
yield item
+ self._index = index
- def filter(self, keeper_fn=lambda item:True, *args, **kwargs):
+ def filter(self, keeper_fn=lambda item:True, load_curves=True,
+ *args, **kwargs):
c = copy.deepcopy(self)
- for item in c.items(reverse=True):
+ if load_curves == True:
+ items = c.items(reverse=True)
+ else:
+ items = reversed(c)
+ for item in items:
if keeper_fn(item, *args, **kwargs) != True:
c.remove(item)
try: # attempt to maintain the same current item
c._index = 0
return c
+
class Playlist (NoteIndexList):
"""A :class:`NoteIndexList` of :class:`hooke.curve.Curve`\s.
def __init__(self, drivers, name=None):
super(Playlist, self).__init__(name=name)
self.drivers = drivers
- self._loaded = [] # List of loaded curves, see :meth:`._setup_item`.
self._max_loaded = 100 # curves to hold in memory simultaneously.
- def append_curve_by_path(self, path, info=None, identify=True):
- if self.path != None:
- path = os.path.join(os.path.dirname(self.path), path)
+ def _set_ignored_attrs(self):
+ super(Playlist, self)._set_ignored_attrs()
+ self._ignored_attrs.extend([
+ '_item_ignored_attrs', '_item_default_attrs',
+ '_loaded'])
+ self._item_ignored_attrs = ['data']
+ self._item_default_attrs = {
+ 'command_stack': [],
+ 'driver': None,
+ 'info': {},
+ 'name': None,
+ }
+ self._loaded = [] # List of loaded curves, see :meth:`._setup_item`.
+
+ def _item_getstate(self, item):
+ assert isinstance(item, curve.Curve), type(item)
+ state = item.__getstate__()
+ for key in self._item_ignored_attrs:
+ if key in state:
+ del(state[key])
+ for key,value in self._item_default_attrs.items():
+ if key in state and state[key] == value:
+ del(state[key])
+ return state
+
+ def _item_setstate(self, state):
+ for key,value in self._item_default_attrs.items():
+ if key not in state:
+ state[key] = value
+ item = curve.Curve(path=None)
+ item.__setstate__(state)
+ return item
+
+ def append_curve_by_path(self, path, info=None, identify=True, hooke=None):
path = os.path.normpath(path)
c = curve.Curve(path, info=info)
+ c.set_hooke(hooke)
if identify == True:
c.identify(self.drivers)
self.append(c)
oldest = self._loaded.pop(0)
oldest.unload()
+
class FilePlaylist (Playlist):
- version = '0.1'
+ """A file-backed :class:`Playlist`.
+ """
+ version = '0.2'
def __init__(self, drivers, name=None, path=None):
super(FilePlaylist, self).__init__(drivers, name)
+ self.path = self._base_path = None
self.set_path(path)
+ self._relative_curve_paths = True
+
+ def _set_ignored_attrs(self):
+ super(FilePlaylist, self)._set_ignored_attrs()
+ self._ignored_attrs.append('_digest')
self._digest = None
- self._ignored_keys = [
- 'experiment', # class instance, not very exciting.
- ]
+
+ def __getstate__(self):
+ state = super(FilePlaylist, self).__getstate__()
+ assert 'version' not in state, state
+ state['version'] = self.version
+ return state
+
+ def __setstate__(self, state):
+ assert('version') in state, state
+ version = state.pop('version')
+ assert version == FilePlaylist.version, (
+ 'invalid version %s (%s) != %s (%s)'
+ % (version, type(version),
+ FilePlaylist.version, type(FilePlaylist.version)))
+ super(FilePlaylist, self).__setstate__(state)
+
+ def _item_getstate(self, item):
+ state = super(FilePlaylist, self)._item_getstate(item)
+ if state.get('path', None) != None:
+ path = os.path.abspath(os.path.expanduser(state['path']))
+ if self._relative_curve_paths == True:
+ path = os.path.relpath(path, self._base_path)
+ state['path'] = path
+ return state
+
+ def _item_setstate(self, state):
+ item = super(FilePlaylist, self)._item_setstate(state)
+ if 'path' in state:
+ item.set_path(os.path.join(self._base_path, state['path']))
+ return item
def set_path(self, path):
- if path != None:
+ if path == None:
+ if self._base_path == None:
+ self._base_path = os.getcwd()
+ else:
if not path.endswith('.hkp'):
path += '.hkp'
self.path = path
+ self._base_path = os.path.dirname(os.path.abspath(
+ os.path.expanduser(self.path)))
if self.name == None:
self.name = os.path.basename(path)
+ def append_curve_by_path(self, path, *args, **kwargs):
+ if self._base_path != None:
+ path = os.path.join(self._base_path, path)
+ super(FilePlaylist, self).append_curve_by_path(path, *args, **kwargs)
+
def is_saved(self):
return self.digest() == self._digest
>>> c.info['note'] = 'The second curve'
>>> p.append(c)
>>> p.digest()
- '\\\x14\x87\x88*q\xf8\xaa\xa7\x84f\x82\xa1S>\xfd3+\xd0o'
+ '\xa1\x1ax\xb1|\x84uA\xe4\x1d\xbf`\x004|\x82\xc2\xdd\xc1\x9e'
"""
string = self.flatten()
return hashlib.sha1(string).digest()
- def _clean_key(self, key):
- """Replace spaces in keys with \\u00B7 (middle dot).
-
- This character is deemed unlikely to occur in keys to our
- playlist and curve info dictionaries, while many keys have
- spaces in them.
-
- \\u00B7 is allowed in XML 1.0 as of the 5th edition. See
- the `4th edition errata`_ for details.
-
- .. _4th edition errata:
- http://www.w3.org/XML/xml-V10-4e-errata#E09
- """
- return key.replace(' ', u'\u00B7')
-
- def _restore_key(self, key):
- """Restore keys encoded with :meth:`_clean_key`.
- """
- return key.replace(u'\u00B7', ' ')
-
- def flatten(self, absolute_paths=False):
+ def flatten(self):
"""Create a string representation of the playlist.
- A playlist is an XML document with the following syntax::
+ A playlist is a YAML document with the following minimal syntax::
- <?xml version="1.0" encoding="utf-8"?>
- <playlist attribute="value">
- <curve path="/my/file/path/"/ attribute="value" ...>
- <curve path="...">
- </playlist>
+ version: '0.2'
+ items:
+ - path: picoforce.000
+ - path: picoforce.001
Relative paths are interpreted relative to the location of the
playlist file.
-
+
Examples
--------
+ >>> from .engine import CommandMessage
+
>>> root_path = os.path.sep + 'path'
>>> p = FilePlaylist(drivers=[],
... path=os.path.join(root_path, 'to','playlist'))
>>> p.append(c)
>>> c = curve.Curve(os.path.join(root_path, 'to', 'curve', 'two'))
>>> c.info['attr with spaces'] = 'The second curve\\nwith endlines'
+ >>> c.command_stack.extend([
+ ... CommandMessage('command A', {'arg 0':0, 'arg 1':'X'}),
+ ... CommandMessage('command B', {'arg 0':1, 'arg 1':'Y'}),
+ ... ])
>>> p.append(c)
- >>> def _print(string):
- ... escaped_string = unicode(string, 'utf-8').encode('unicode escape')
- ... print escaped_string.replace('\\\\n', '\\n').replace('\\\\t', '\\t'),
- >>> _print(p.flatten()) # doctest: +NORMALIZE_WHITESPACE +REPORT_UDIFF
- <?xml version="1.0" encoding="utf-8"?>
- <playlist index="0" note="An example playlist" version="0.1">
- <curve note="The first curve" path="curve/one"/>
- <curve attr\\xb7with\\xb7spaces="The second curve
with endlines" path="curve/two"/>
- </playlist>
- >>> _print(p.flatten(absolute_paths=True)) # doctest: +NORMALIZE_WHITESPACE +REPORT_UDIFF
- <?xml version="1.0" encoding="utf-8"?>
- <playlist index="0" note="An example playlist" version="0.1">
- <curve note="The first curve" path="/path/to/curve/one"/>
- <curve attr\\xb7with\\xb7spaces="The second curve
with endlines" path="/path/to/curve/two"/>
- </playlist>
- """
- implementation = xml.dom.minidom.getDOMImplementation()
- # create the document DOM object and the root element
- doc = implementation.createDocument(None, 'playlist', None)
- root = doc.documentElement
- root.setAttribute('version', self.version) # store playlist version
- root.setAttribute('index', str(self._index))
- for key,value in self.info.items(): # save info variables
- if (key in self._ignored_keys
- or not isinstance(value, types.StringTypes)):
- continue
- root.setAttribute(self._clean_key(key), str(value))
- for curve in self: # save curves and their attributes
- curve_element = doc.createElement('curve')
- root.appendChild(curve_element)
- path = os.path.abspath(os.path.expanduser(curve.path))
- if absolute_paths == False:
- path = os.path.relpath(
- path,
- os.path.dirname(
- os.path.abspath(
- os.path.expanduser(self.path))))
- curve_element.setAttribute('path', path)
- for key,value in curve.info.items():
- if (key in self._ignored_keys
- or not isinstance(value, types.StringTypes)):
- continue
- curve_element.setAttribute(self._clean_key(key), str(value))
- string = doc.toprettyxml(encoding='utf-8')
- root.unlink() # break circular references for garbage collection
- return string
-
- def _from_xml_doc(self, doc, identify=True):
- """Load a playlist from an :class:`xml.dom.minidom.Document`
- instance.
+ >>> print p.flatten() # doctest: +REPORT_UDIFF
+ # Hooke playlist version 0.2
+ _base_path: /path/to
+ _index: 0
+ _max_loaded: 100
+ _relative_curve_paths: true
+ drivers: []
+ info: {note: An example playlist}
+ items:
+ - info: {note: The first curve}
+ name: one
+ path: curve/one
+ - command_stack: !!python/object/new:hooke.command_stack.CommandStack
+ listitems:
+ - !!python/object:hooke.engine.CommandMessage
+ arguments: {arg 0: 0, arg 1: X}
+ command: command A
+ - !!python/object:hooke.engine.CommandMessage
+ arguments: {arg 0: 1, arg 1: Y}
+ command: command B
+ info: {attr with spaces: 'The second curve
+ <BLANKLINE>
+ with endlines'}
+ name: two
+ path: curve/two
+ name: playlist.hkp
+ path: /path/to/playlist.hkp
+ version: '0.2'
+ <BLANKLINE>
+ >>> p._relative_curve_paths = False
+ >>> print p.flatten() # doctest: +REPORT_UDIFF
+ # Hooke playlist version 0.2
+ _base_path: /path/to
+ _index: 0
+ _max_loaded: 100
+ _relative_curve_paths: false
+ drivers: []
+ info: {note: An example playlist}
+ items:
+ - info: {note: The first curve}
+ name: one
+ path: /path/to/curve/one
+ - command_stack: !!python/object/new:hooke.command_stack.CommandStack
+ listitems:
+ - !!python/object:hooke.engine.CommandMessage
+ arguments: {arg 0: 0, arg 1: X}
+ command: command A
+ - !!python/object:hooke.engine.CommandMessage
+ arguments: {arg 0: 1, arg 1: Y}
+ command: command B
+ info: {attr with spaces: 'The second curve
+ <BLANKLINE>
+ with endlines'}
+ name: two
+ path: /path/to/curve/two
+ name: playlist.hkp
+ path: /path/to/playlist.hkp
+ version: '0.2'
+ <BLANKLINE>
"""
- root = doc.documentElement
- for attribute,value in root.attributes.items():
- attribute = self._restore_key(attribute)
- if attribute == 'version':
- assert value == self.version, \
- 'Cannot read v%s playlist with a v%s reader' \
- % (value, self.version)
- elif attribute == 'index':
- self._index = int(value)
- else:
- self.info[attribute] = value
- for curve_element in doc.getElementsByTagName('curve'):
- path = curve_element.getAttribute('path')
- info = dict([(self._restore_key(key), value)
- for key,value in curve_element.attributes.items()])
- info.pop('path')
- self.append_curve_by_path(path, info, identify=identify)
- self.jump(self._index) # ensure valid index
-
- def from_string(self, string, identify=True):
+ yaml_string = yaml.dump(self.__getstate__(), allow_unicode=True)
+ return ('# Hooke playlist version %s\n' % self.version) + yaml_string
+
+ def from_string(self, string):
u"""Load a playlist from a string.
Examples
--------
- >>> string = '''<?xml version="1.0" encoding="utf-8"?>
- ... <playlist index="1" note="An example playlist" version="0.1">
- ... <curve note="The first curve" path="../curve/one"/>
- ... <curve attr\xb7with\xb7spaces="The second curve
with endlines" path="../curve/two"/>
- ... </playlist>
+ Minimal example.
+
+ >>> string = '''# Hooke playlist version 0.2
+ ... version: '0.2'
+ ... items:
+ ... - path: picoforce.000
+ ... - path: picoforce.001
+ ... '''
+ >>> p = FilePlaylist(drivers=[],
+ ... path=os.path.join('/path', 'to', 'my', 'playlist'))
+ >>> p.from_string(string)
+ >>> for curve in p:
+ ... print curve.path
+ /path/to/my/picoforce.000
+ /path/to/my/picoforce.001
+
+ More complicated example.
+
+ >>> string = '''# Hooke playlist version 0.2
+ ... _base_path: /path/to
+ ... _digest: null
+ ... _index: 1
+ ... _max_loaded: 100
+ ... _relative_curve_paths: true
+ ... info: {note: An example playlist}
+ ... items:
+ ... - info: {note: The first curve}
+ ... path: curve/one
+ ... - command_stack: !!python/object/new:hooke.command_stack.CommandStack
+ ... listitems:
+ ... - !!python/object:hooke.engine.CommandMessage
+ ... arguments: {arg 0: 0, arg 1: X}
+ ... command: command A
+ ... - !!python/object:hooke.engine.CommandMessage
+ ... arguments: {arg 0: 1, arg 1: Y}
+ ... command: command B
+ ... info: {attr with spaces: 'The second curve
+ ...
+ ... with endlines'}
+ ... name: two
+ ... path: curve/two
+ ... name: playlist.hkp
+ ... path: /path/to/playlist.hkp
+ ... version: '0.2'
... '''
>>> p = FilePlaylist(drivers=[],
... path=os.path.join('path', 'to', 'my', 'playlist'))
- >>> p.from_string(string, identify=False)
+ >>> p.from_string(string)
>>> p._index
1
>>> p.info
- {u'note': u'An example playlist'}
+ {'note': 'An example playlist'}
>>> for curve in p:
- ... print curve.path
- path/to/curve/one
- path/to/curve/two
+ ... print curve.name, curve.path
+ one /path/to/curve/one
+ two /path/to/curve/two
>>> p[-1].info['attr with spaces']
- u'The second curve\\nwith endlines'
+ 'The second curve\\nwith endlines'
+ >>> type(p[-1].command_stack)
+ <class 'hooke.command_stack.CommandStack'>
+ >>> p[-1].command_stack # doctest: +NORMALIZE_WHITESPACE
+ [<CommandMessage command A {arg 0: 0, arg 1: X}>,
+ <CommandMessage command B {arg 0: 1, arg 1: Y}>]
"""
- doc = xml.dom.minidom.parseString(string)
- self._from_xml_doc(doc, identify=identify)
+ state = yaml.load(string)
+ self.__setstate__(state)
- def load(self, path=None, identify=True):
+ def save(self, path=None, makedirs=True):
+ """Saves the playlist to a YAML file.
+ """
+ self.set_path(path)
+ dirname = os.path.dirname(self.path) or '.'
+ if makedirs == True and not os.path.isdir(dirname):
+ os.makedirs(dirname)
+ with open(self.path, 'w') as f:
+ f.write(self.flatten())
+ self._digest = self.digest()
+
+ def load(self, path=None, identify=True, hooke=None):
"""Load a playlist from a file.
"""
self.set_path(path)
- doc = xml.dom.minidom.parse(self.path)
- self._from_xml_doc(doc, identify=identify)
+ with open(self.path, 'r') as f:
+ text = f.read()
+ self.from_string(text)
self._digest = self.digest()
+ for curve in self:
+ curve.set_hooke(hooke)
+ if identify == True:
+ curve.identify(self.drivers)
- def save(self, path=None):
- """Saves the playlist in a XML file.
- """
- self.set_path(path)
- f = file(self.path, 'w')
- f.write(self.flatten())
- f.close()
+
+class Playlists (NoteIndexList):
+ """A :class:`NoteIndexList` of :class:`FilePlaylist`\s.
+ """
+ def __init__(self, *arg, **kwargs):
+ super(Playlists, self).__init__(*arg, **kwargs)
+
+ def _item_getstate(self, item):
+ assert isinstance(item, FilePlaylist), type(item)
+ return item.__getstate__()
+
+ def _item_setstate(self, state):
+ item = FilePlaylist(drivers=[])
+ item.__setstate__(state)
+ return item