From f52fc3a243edf5ccef2dcdfd0c4b4cded4357e13 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Wed, 9 Dec 2009 07:23:54 -0500 Subject: [PATCH] Rethought libbe.util.id module --- libbe/util/id.py | 294 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 254 insertions(+), 40 deletions(-) diff --git a/libbe/util/id.py b/libbe/util/id.py index d57205f..d443706 100644 --- a/libbe/util/id.py +++ b/libbe/util/id.py @@ -20,10 +20,13 @@ Handle ID creation and parsing. """ import os.path +import re import libbe if libbe.TESTING == True: + import doctest + import sys import unittest try: @@ -60,6 +63,25 @@ except ImportError: return output.rstrip('\n') +HIERARCHY = ['bugdir', 'bug', 'comment'] + + +class MultipleIDMatches (ValueError): + def __init__(self, id, matches): + msg = ("More than one id matches %s. " + "Please be more specific.\n%s" % (id, matches)) + ValueError.__init__(self, msg) + self.id = id + self.matches = matches + +class NoIDMatches (KeyError): + def __init__(self, id, possible_ids): + msg = "No id matches %s.\n%s" % (id, possible_ids) + KeyError.__init__(self, msg) + self.id = id + self.possible_ids = possible_ids + + def _assemble(*args): args = list(args) for i,arg in enumerate(args): @@ -74,50 +96,242 @@ def _split(id): args[i] = None return args -def _is_a_uuid(id): - if id.startswith('uuid:'): - return True - return False - -def _uuid_to_id(id): - return 'uuid:' + id - -def _id_to_uuid(id): - return id[len('uuid:'):] - -def bugdir_id(bugdir, *args): - return _assemble(_uuid_to_id(bugdir.uuid), *args) - -def bug_id(bug, *args): - if bug.bugdir == None: - bdid = None - else: - bdid = bugdir_id(bug.bugdir) - return _assemble(bdid, _uuid_to_id(bug.uuid), *args) - -def comment_id(comment, *args): - if comment.bug == None: - bid = None - else: - bid = bug_id(comment.bug) - return _assemble(bid, _uuid_to_id(comment.uuid), *args) - -def parse_id(id): - args = _split(id) - ret = {'bugdir':_id_to_uuid(args.pop(0))} - type = 'bugdir' - for 
child_name in ['bug', 'comment']: - if len(args) > 0 and _is_a_uuid(args[0]): - ret[child_name] = _id_to_uuid(args.pop(0)) - type = child_name - ret['type'] = type - ret['remaining'] = os.path.join(args) +def _truncate(uuid, other_uuids, min_length=3): + chars = min_length + for id in other_uuids: + if id == uuid: + continue + while (id[:chars] == uuid[:chars]): + chars+=1 + return uuid[:chars] + +def _expand(truncated_id, other_ids): + matches = [] + for id in other_ids: + if id.startswith(truncated_id): + matches.append(id) + if len(matches) > 1: + raise MultipleIDMatches(truncated_id, matches) + if len(matches) == 0: + raise NoIDMatches(truncated_id, other_ids) + return matches[0] + + +class ID (object): + """ + IDs have several formats specialized for different uses. + + In storage, all objects are represented by their uuid alone, + because that is the simplest globally unique identifier. You can + generate ids of this sort with the .storage() method. Because an + object's storage may be distributed across several chunks, and the + chunks may not have their own uuid, we generate chunk ids by + prepending the object's uuid to the chunk name. The user id types + do not support this chunk extension feature. + + For users, the full uuids are a bit overwhelming, so we truncate + them while retaining local uniqueness (with regards to the other + objects currently in storage). We also prepend truncated parent + ids for two reasons: + (1) so that a user can locate the repository containing the + referenced object. It would be hard to find bug 'XYZ' if + that's all you knew. Much easier with 'ABC/XYZ', where ABC + is the bugdir. Each project can publish a list of bugdir-id + - to - location mappings, e.g. + ABC...(full uuid)...DEF https://server.com/projectX/be/ + which is easier than publishing all-object-ids-to-location + mappings. 
+ (2) because it's easier to generate and parse truncated ids if + you don't have to fetch all the ids in the storage + repository, but can restrict yourself to a specific branch. + You can generate ids of this sort with the .user() method, + although in order to perform the truncation, your object (and its + parents) must define a .sibling_uuids() method. + + + While users can use the convenient short user ids in the short + term, the truncation will inevitably lead to name collision. To + avoid that, we provide a non-truncated form of the short user ids + via the .long_user() method. These long user ids should be + converted to short user ids by intelligent user interfaces. + + Related tools: + * get uuids back out of the user ids: + parse_user() + * scan text for user ids & convert to long user ids: + short_to_long_user() + * scan text for long user ids & convert to short user ids: + long_to_short_user() + + Supported types: 'bugdir', 'bug', 'comment' + """ + def __init__(self, object, type): + self._object = object + self._type = type + assert self._type in HIERARCHY, self._type + self.uuid = self._object.uuid + + def storage(self, *args): + return _assemble(self._object.uuid, *args) + + def _ancestors(self): + ret = [self._object] + index = HIERARCHY.index(self._type) + if index == 0: + return ret + o = self._object + for i in range(index, 0, -1): + parent_name = HIERARCHY[i-1] + o = getattr(o, parent_name) + ret.insert(0, o) + return ret + + def long_user(self): + return _assemble(*[o.uuid for o in self._ancestors()]) + + def user(self): + return _assemble(*[_truncate(o.uuid, o.sibling_uuids()) + for o in self._ancestors()]) + +def parse_user(id): + """ + >>> parse_user('ABC/DEF/GHI') == \\ + ... {'bugdir':'ABC', 'bug':'DEF', 'comment':'GHI', 'type':'comment'} + True + >>> parse_user('ABC/DEF') == \\ + ... {'bugdir':'ABC', 'bug':'DEF', 'type':'bug'} + True + >>> parse_user('ABC') == \\ + ... 
{'bugdir':'ABC', 'type':'bugdir'} + True + """ + ret = {} + args = _split(id) + assert len(args) > 0 and len(args) < 4, 'Invalid id "%s"' % id + for type,arg in zip(HIERARCHY, args): + assert len(arg) > 0, 'Invalid part "%s" of id "%s"' % (arg, id) + ret['type'] = type + ret[type] = arg return ret +REGEXP = '#([-a-f0-9]*)(/[-a-g0-9]*)?(/[-a-g0-9]*)?#' + +class IDreplacer (object): + def __init__(self, bugdirs, direction): + self.bugdirs = bugdirs + self.direction = direction + def __call__(self, match): + ids = [m.lstrip('/') for m in match.groups() if m != None] + ids = self.switch_ids(ids) + return '#' + '/'.join(ids) + '#' + def switch_id(self, id, sibling_uuids): + if id == None: + return None + if self.direction == 'long_to_short': + return _truncate(id, sibling_uuids) + return _expand(id, sibling_uuids) + def switch_ids(self, ids): + assert ids[0] != None, ids + if self.direction == 'long_to_short': + bugdir = [bd for bd in self.bugdirs if bd.uuid == ids[0]][0] + objects = [bugdir] + if len(ids) >= 2: + bug = bugdir.bug_from_uuid(ids[1]) + objects.append(bug) + if len(ids) >= 3: + comment = bug.comment_from_uuid(ids[2]) + objects.append(comment) + for i,obj in enumerate(objects): + ids[i] = self.switch_id(ids[i], obj.sibling_uuids()) + else: + ids[0] = self.switch_id(ids[0], [bd.uuid for bd in self.bugdirs]) + if len(ids) == 1: + return ids + bugdir = [bd for bd in self.bugdirs if bd.uuid == ids[0]][0] + ids[1] = self.switch_id(ids[1], bugdir.uuids()) + if len(ids) == 2: + return ids + bug = bugdir.bug_from_uuid(ids[1]) + ids[2] = self.switch_id(ids[2], bug.uuids()) + return ids + +def short_to_long_user(bugdirs, text): + return re.sub(REGEXP, IDreplacer(bugdirs, 'short_to_long'), text) +def long_to_short_user(bugdirs, text): + return re.sub(REGEXP, IDreplacer(bugdirs, 'long_to_short'), text) + if libbe.TESTING == True: class UUIDtestCase(unittest.TestCase): def testUUID_gen(self): id = uuid_gen() - self.failUnless(len(id) == 36, "invalid UUID '%s'" % id) + 
self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id) + + class DummyObject (object): + def __init__(self, uuid, siblings=[]): + self.uuid = uuid + self._siblings = siblings + def sibling_uuids(self): + return self._siblings + + class IDtestCase(unittest.TestCase): + def setUp(self): + self.bugdir = DummyObject('1234abcd') + self.bug = DummyObject('abcdef', ['a1234', 'ab9876']) + self.bug.bugdir = self.bugdir + self.comment = DummyObject('12345678', ['1234abcd', '1234cdef']) + self.comment.bug = self.bug + self.bd_id = ID(self.bugdir, 'bugdir') + self.b_id = ID(self.bug, 'bug') + self.c_id = ID(self.comment, 'comment') + def test_storage(self): + self.failUnless(self.bd_id.storage() == self.bugdir.uuid, + self.bd_id.storage()) + self.failUnless(self.b_id.storage() == self.bug.uuid, + self.b_id.storage()) + self.failUnless(self.c_id.storage() == self.comment.uuid, + self.c_id.storage()) + self.failUnless(self.bd_id.storage('x','y','z') == \ + '1234abcd/x/y/z', self.bd_id.storage()) + def test_long_user(self): + self.failUnless(self.bd_id.long_user() == self.bugdir.uuid, + self.bd_id.long_user()) + self.failUnless(self.b_id.long_user() == \ + '/'.join([self.bugdir.uuid, self.bug.uuid]), + self.b_id.long_user()) + self.failUnless(self.c_id.long_user() == + '/'.join([self.bugdir.uuid, self.bug.uuid, + self.comment.uuid]), + self.c_id.long_user) + def test_user(self): + self.failUnless(self.bd_id.user() == '123', + self.bd_id.user()) + self.failUnless(self.b_id.user() == '123/abc', + self.b_id.user()) + self.failUnless(self.c_id.user() == '123/abc/12345', + self.c_id.user()) + + class IDtestCase(unittest.TestCase): + def setUp(self): + self.bugdir = DummyObject('1234abcd') + self.bug = DummyObject('abcdef', ['a1234', 'ab9876']) + self.bug.bugdir = self.bugdir + self.bugdir.bug_from_uuid = lambda uuid: self.bug + self.bugdir.uuids = lambda : self.bug.sibling_uuids() + [self.bug.uuid] + self.comment = DummyObject('12345678', ['1234abcd', '1234cdef']) + self.comment.bug 
= self.bug + self.bug.comment_from_uuid = lambda uuid: self.comment + self.bug.uuids = lambda : self.comment.sibling_uuids() + [self.comment.uuid] + self.bd_id = ID(self.bugdir, 'bugdir') + self.b_id = ID(self.bug, 'bug') + self.c_id = ID(self.comment, 'comment') + self.short = 'bla bla #123/abc# bla bla #123/abc/12345# bla bla' + self.long = 'bla bla #1234abcd/abcdef# bla bla #1234abcd/abcdef/12345678# bla bla' + def test_short_to_long(self): + self.failUnless(short_to_long_user([self.bugdir], self.short) == self.long, + '\n' + self.short + '\n' + short_to_long_user([self.bugdir], self.short) + '\n' + self.long) + def test_long_to_short(self): + self.failUnless(long_to_short_user([self.bugdir], self.long) == self.short, + '\n' + long_to_short_user([self.bugdir], self.long) + '\n' + self.short) - suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase) + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) -- 2.26.2