for entry in reversed(list(self._data[id].traverse())):
self._remove(entry.id)
+ def ancestors(self, *args, **kwargs):
+ """Return a list of the specified entry's ancestors' ids."""
+ if self.is_readable() == False:
+ raise NotReadable('Cannot list parents with unreadable storage.')
+ return self._ancestors(*args, **kwargs)
+
+ def _ancestors(self, id=None, revision=None):
+ if id == None:
+ return []
+ ancestors = []
+ stack = [id]
+ while len(stack) > 0:
+ id = stack.pop(0)
+ parent = self._data[id].parent
+ if parent != None and not parent.id.startswith('__'):
+ ancestor = parent.id
+ ancestors.append(ancestor)
+ stack.append(ancestor)
+ return ancestors
+
def children(self, *args, **kwargs):
"""Return a list of specified entry's children's ids."""
if self.is_readable() == False:
for entry in reversed(list(self._data[-1][id].traverse())):
self._remove(entry.id)
+ def _ancestors(self, id=None, revision=None):
+ if id == None:
+ return []
+ if revision == None:
+ revision = -1
+ else:
+ revision = int(revision)
+ ancestors = []
+ stack = [id]
+ while len(stack) > 0:
+ id = stack.pop(0)
+ parent = self._data[revision][id].parent
+ if parent != None and not parent.id.startswith('__'):
+ ancestor = parent.id
+ ancestors.append(ancestor)
+ stack.append(ancestor)
+ return ancestors
+
def _children(self, id=None, revision=None):
if id == None:
id = '__ROOT__'
self.failUnless(len(self.s.children()) == 0, self.s.children())
def test_add_identical_rooted(self):
- """
- Adding entries with the same ID should not increase the number of children.
+ """Adding entries with the same ID should not increase the number of children.
"""
for i in range(10):
self.s.add('some id', directory=False)
self.failUnless(s == ['some id'], s)
def test_add_rooted(self):
- """
- Adding entries should increase the number of children (rooted).
+ """Adding entries should increase the number of children (rooted).
"""
ids = []
for i in range(10):
self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_nonrooted(self):
- """
- Adding entries should increase the number of children (nonrooted).
+ """Adding entries should increase the number of children (nonrooted).
"""
self.s.add('parent', directory=True)
ids = []
s = self.s.children()
self.failUnless(s == ['parent'], s)
- def test_children(self):
+ def test_ancestors(self):
+ """Check ancestors lists.
"""
- Non-UUID ids should be returned as such.
+ self.s.add('parent', directory=True)
+ for i in range(10):
+ i_id = str(i)
+ self.s.add(i_id, 'parent', directory=True)
+ for j in range(10): # add some grandkids
+ j_id = str(20*(i+1)+j)
+ self.s.add(j_id, i_id, directory=(i%2 == 0))
+ ancestors = sorted(self.s.ancestors(j_id))
+ self.failUnless(ancestors == [i_id, 'parent'],
+ 'Unexpected ancestors for %s/%s, "%s"'
+ % (i_id, j_id, ancestors))
+
+ def test_children(self):
+ """Non-UUID ids should be returned as such.
"""
self.s.add('parent', directory=True)
ids = []
self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_add_invalid_directory(self):
- """
- Should not be able to add children to non-directories.
+ """Should not be able to add children to non-directories.
"""
self.s.add('parent', directory=False)
try:
self.s.children('parent'))
def test_remove_rooted(self):
- """
- Removing entries should decrease the number of children (rooted).
+ """Removing entries should decrease the number of children (rooted).
"""
ids = []
for i in range(10):
self.failUnless(s == ids, '\n %s\n !=\n %s' % (s, ids))
def test_remove_nonrooted(self):
- """
- Removing entries should decrease the number of children (nonrooted).
+ """Removing entries should decrease the number of children (nonrooted).
"""
self.s.add('parent', directory=True)
ids = []
self.failUnless(s == ['parent'], s)
def test_remove_directory_not_empty(self):
- """
- Removing a non-empty directory entry should raise exception.
+ """Removing a non-empty directory entry should raise exception.
"""
self.s.add('parent', directory=True)
ids = []
pass
def test_recursive_remove(self):
- """
- Recursive remove should empty the tree.
- """
+ """Recursive remove should empty the tree."""
self.s.add('parent', directory=True)
ids = []
for i in range(10):
val = 'unlikely value'
def test_get_default(self):
- """
- Get should return specified default if id not in Storage.
+ """Get should return specified default if id not in Storage.
"""
ret = self.s.get(self.id, default=self.val)
self.failUnless(ret == self.val,
% (vars(self.Class)['name'], ret, self.val))
def test_get_default_exception(self):
- """
- Get should raise exception if id not in Storage and no default.
+ """Get should raise exception if id not in Storage and no default.
"""
try:
ret = self.s.get(self.id)
pass
def test_get_initial_value(self):
- """
- Data value should be None before any value has been set.
+ """Data value should be None before any value has been set.
"""
self.s.add(self.id, directory=False)
ret = self.s.get(self.id)
% (vars(self.Class)['name'], ret))
def test_set_exception(self):
- """
- Set should raise exception if id not in Storage.
+ """Set should raise exception if id not in Storage.
"""
try:
self.s.set(self.id, self.val)
pass
def test_set(self):
- """
- Set should define the value returned by get.
+ """Set should define the value returned by get.
"""
self.s.add(self.id, directory=False)
self.s.set(self.id, self.val)
% (vars(self.Class)['name'], ret, self.val))
def test_unicode_set(self):
- """
- Set should define the value returned by get.
+ """Set should define the value returned by get.
"""
val = u'Fran\xe7ois'
self.s.add(self.id, directory=False)
val = 'unlikely value'
def test_get_set_persistence(self):
- """
- Set should define the value returned by get after reconnect.
+ """Set should define the value returned by get after reconnect.
"""
self.s.add(self.id, directory=False)
self.s.set(self.id, self.val)
% (vars(self.Class)['name'], ret, self.val))
def test_add_nonrooted_persistence(self):
- """
- Adding entries should increase the number of children after reconnect.
+ """Adding entries should increase the number of children after reconnect.
"""
self.s.add('parent', directory=True)
ids = []
pass
def test_revision_id_exception(self):
- """
- Invalid revision id should raise InvalidRevision.
+ """Invalid revision id should raise InvalidRevision.
"""
try:
rev = self.s.revision_id('highly unlikely revision id')
pass
def test_empty_commit_raises_exception(self):
- """
- Empty commit should raise exception.
+ """Empty commit should raise exception.
"""
self._setup_for_empty_commit()
try:
pass
def test_empty_commit_allowed(self):
- """
- Empty commit should _not_ raise exception if allow_empty=True.
+ """Empty commit should _not_ raise exception if allow_empty=True.
"""
self._setup_for_empty_commit()
self.s.commit(self.commit_msg, self.commit_body,
allow_empty=True)
def test_commit_revision_ids(self):
- """
- Commit / revision_id should agree on revision ids.
+ """Commit / revision_id should agree on revision ids.
"""
def val(i):
return '%s:%d' % (self.val, i+1)
% (vars(self.Class)['name'], i, rev, revs[i]))
def test_get_previous_version(self):
- """
- Get should be able to return the previous version.
+ """Get should be able to return the previous version.
"""
def val(i):
return '%s:%d' % (self.val, i+1)
% (vars(self.Class)['name'], ret, val(i), revs[i]))
def test_get_previous_children(self):
- """
- Children list should be revision dependent.
+ """Children list should be revision dependent.
"""
self.s.add('parent', directory=True)
revs = []
"""Test cases for VersionedStorage.changed() method."""
def test_changed(self):
+ """Changed lists should reflect past activity"""
self.s.add('dir', directory=True)
self.s.add('modified', parent='dir')
self.s.set('modified', 'some value to be modified')