</comment>
</bug>
"""
- for attr in other.explicit_attrs:
- old = getattr(self, attr)
- new = getattr(other, attr)
- if old != new:
- if accept_changes == True:
- setattr(self, attr, new)
- elif change_exception == True:
- raise ValueError, \
- 'Merge would change %s "%s"->"%s" for bug %s' \
- % (attr, old, new, self.uuid)
+ if hasattr(other, 'explicit_attrs'):
+ for attr in other.explicit_attrs:
+ old = getattr(self, attr)
+ new = getattr(other, attr)
+ if old != new:
+ if accept_changes:
+ setattr(self, attr, new)
+ elif change_exception:
+ raise ValueError(
+ ('Merge would change {} "{}"->"{}" for bug {}'
+ ).format(attr, old, new, self.uuid))
for estr in other.extra_strings:
if not estr in self.extra_strings:
if accept_extra_strings == True:
- self.extra_strings.append(estr)
+ self.extra_strings += [estr]
elif change_exception == True:
raise ValueError, \
'Merge would add extra string "%s" for bug %s' \
directory=False)
self.save_settings()
for bug in self:
+ bug.bugdir = self
+ bug.storage = self.storage
bug.save()
# methods for managing bugs
def append(self, bug, update=False):
super(BugDir, self).append(bug)
if update:
+ bug.bugdir = self
+ bug.storage = self.storage
self._bug_map_gen()
if (hasattr(self, '_uuids_cache') and
not bug.uuid in self._uuids_cache):
def xml(self, indent=0, show_bugs=False, show_comments=False):
"""
>>> bug.load_severities(bug.severity_def)
- >>> bug.load_status(active_status_def=bug.active_status_def, inactive_status_def=bug.inactive_status_def)
+ >>> bug.load_status(
+ ... active_status_def=bug.active_status_def,
+ ... inactive_status_def=bug.inactive_status_def)
>>> bugdirA = SimpleBugDir(memory=True)
>>> bugdirA.severities
>>> bugdirA.severities = (('minor', 'The standard bug level.'),)
</bug>
</bugdir>
>>> bug.load_severities(bug.severity_def)
- >>> bug.load_status(active_status_def=bug.active_status_def, inactive_status_def=bug.inactive_status_def)
+ >>> bug.load_status(
+ ... active_status_def=bug.active_status_def,
+ ... inactive_status_def=bug.inactive_status_def)
+ >>> bugdirA.cleanup()
"""
info = [('uuid', self.uuid),
('short-name', self.id.user()),
"""
Note: If a bugdir uuid is given, set .alt_id to it's value.
>>> bug.load_severities(bug.severity_def)
- >>> bug.load_status(active_status_def=bug.active_status_def, inactive_status_def=bug.inactive_status_def)
+ >>> bug.load_status(
+ ... active_status_def=bug.active_status_def,
+ ... inactive_status_def=bug.inactive_status_def)
>>> bugdirA = SimpleBugDir(memory=True)
>>> bugdirA.severities = (('minor', 'The standard bug level.'),)
>>> bugdirA.inactive_status = (
>>> bugdirC.xml(show_bugs=True, show_comments=True) == xml
True
>>> bug.load_severities(bug.severity_def)
- >>> bug.load_status(active_status_def=bug.active_status_def, inactive_status_def=bug.inactive_status_def)
+ >>> bug.load_status(
+ ... active_status_def=bug.active_status_def,
+ ... inactive_status_def=bug.inactive_status_def)
+ >>> bugdirA.cleanup()
"""
if type(xml_string) == types.UnicodeType:
xml_string = xml_string.strip().encode('unicode_escape')
self.alt_id = uuid
self.extra_strings = estrs
+ def merge(self, other, accept_changes=True,
+ accept_extra_strings=True, accept_bugs=True,
+ accept_comments=True, change_exception=False):
+ """Merge info from other into this bugdir.
+
+ Overrides any attributes in self that are listed in
+ other.explicit_attrs.
+
+ >>> bugdirA = SimpleBugDir()
+ >>> bugdirA.extra_strings += ['TAG: favorite']
+ >>> bugdirB = SimpleBugDir()
+ >>> bugdirB.explicit_attrs = ['target']
+ >>> bugdirB.target = '1234'
+ >>> bugdirB.extra_strings += ['TAG: very helpful']
+ >>> bugdirB.extra_strings += ['TAG: useful']
+ >>> bugA = bugdirB.bug_from_uuid('a')
+ >>> commA = bugA.comment_root.new_reply(body='comment A')
+ >>> commA.uuid = 'uuid-commA'
+ >>> commA.date = 'Thu, 01 Jan 1970 00:01:00 +0000'
+ >>> bugC = bugdirB.new_bug(summary='bug C', _uuid='c')
+ >>> bugC.alt_id = 'alt-c'
+ >>> bugC.time_string = 'Thu, 01 Jan 1970 00:02:00 +0000'
+ >>> bugdirA.merge(
+ ... bugdirB, accept_changes=False, accept_extra_strings=False,
+ ... accept_bugs=False, change_exception=False)
+ >>> print(bugdirA.target)
+ None
+ >>> bugdirA.merge(
+ ... bugdirB, accept_changes=False, accept_extra_strings=False,
+ ... accept_bugs=False, change_exception=True)
+ Traceback (most recent call last):
+ ...
+ ValueError: Merge would change target "None"->"1234" for bugdir abc123
+ >>> print(bugdirA.target)
+ None
+ >>> bugdirA.merge(
+ ... bugdirB, accept_changes=True, accept_extra_strings=False,
+ ... accept_bugs=False, change_exception=True)
+ Traceback (most recent call last):
+ ...
+ ValueError: Merge would add extra string "TAG: useful" for bugdir abc123
+ >>> print(bugdirA.target)
+ 1234
+ >>> print(bugdirA.extra_strings)
+ ['TAG: favorite']
+ >>> bugdirA.merge(
+ ... bugdirB, accept_changes=True, accept_extra_strings=True,
+ ... accept_bugs=False, change_exception=True)
+ Traceback (most recent call last):
+ ...
+ ValueError: Merge would add bug c (alt: alt-c) to bugdir abc123
+ >>> print(bugdirA.extra_strings)
+ ['TAG: favorite', 'TAG: useful', 'TAG: very helpful']
+ >>> bugdirA.merge(
+ ... bugdirB, accept_changes=True, accept_extra_strings=True,
+ ... accept_bugs=True, change_exception=True)
+ >>> print(bugdirA.xml(show_bugs=True, show_comments=True))
+ ... # doctest: +ELLIPSIS, +REPORT_UDIFF
+ <bugdir>
+ <uuid>abc123</uuid>
+ <short-name>abc</short-name>
+ <target>1234</target>
+ <extra-string>TAG: favorite</extra-string>
+ <extra-string>TAG: useful</extra-string>
+ <extra-string>TAG: very helpful</extra-string>
+ <bug>
+ <uuid>a</uuid>
+ <short-name>abc/a</short-name>
+ <severity>minor</severity>
+ <status>open</status>
+ <creator>John Doe <jdoe@example.com></creator>
+ <created>Thu, 01 Jan 1970 00:00:00 +0000</created>
+ <summary>Bug A</summary>
+ <comment>
+ <uuid>uuid-commA</uuid>
+ <short-name>abc/a/uui</short-name>
+ <author></author>
+ <date>Thu, 01 Jan 1970 00:01:00 +0000</date>
+ <content-type>text/plain</content-type>
+ <body>comment A</body>
+ </comment>
+ </bug>
+ <bug>
+ <uuid>b</uuid>
+ <short-name>abc/b</short-name>
+ <severity>minor</severity>
+ <status>closed</status>
+ <creator>Jane Doe <jdoe@example.com></creator>
+ <created>Thu, 01 Jan 1970 00:00:00 +0000</created>
+ <summary>Bug B</summary>
+ </bug>
+ <bug>
+ <uuid>c</uuid>
+ <short-name>abc/c</short-name>
+ <severity>minor</severity>
+ <status>open</status>
+ <created>Thu, 01 Jan 1970 00:02:00 +0000</created>
+ <summary>bug C</summary>
+ </bug>
+ </bugdir>
+ >>> bugdirA.cleanup()
+ >>> bugdirB.cleanup()
+ """
+ if hasattr(other, 'explicit_attrs'):
+ for attr in other.explicit_attrs:
+ old = getattr(self, attr)
+ new = getattr(other, attr)
+ if old != new:
+ if accept_changes:
+ setattr(self, attr, new)
+ elif change_exception:
+ raise ValueError(
+ ('Merge would change {} "{}"->"{}" for bugdir {}'
+ ).format(attr, old, new, self.uuid))
+ for estr in other.extra_strings:
+ if not estr in self.extra_strings:
+ if accept_extra_strings:
+ self.extra_strings += [estr]
+ elif change_exception:
+ raise ValueError(
+ ('Merge would add extra string "{}" for bugdir {}'
+ ).format(estr, self.uuid))
+ for o_bug in other:
+ try:
+ s_bug = self.bug_from_uuid(o_bug.uuid)
+ except KeyError as e:
+ try:
+ s_bug = self.bug_from_uuid(o_bug.alt_id)
+ except KeyError as e:
+ s_bug = None
+ if s_bug is None:
+ if accept_bugs:
+ o_bug_copy = copy.copy(o_bug)
+ o_bug_copy.bugdir = self
+ o_bug_copy.id = libbe.util.id.ID(o_bug_copy, 'bug')
+ self.append(o_bug_copy)
+ elif change_exception:
+ raise ValueError(
+ ('Merge would add bug {} (alt: {}) to bugdir {}'
+ ).format(o_bug.uuid, o_bug.alt_id, self.uuid))
+ else:
+ s_bug.merge(o_bug, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ change_exception=change_exception)
+
# methods for id generation
def sibling_uuids(self):
def _run(self, **params):
assigned = parse_assigned(self, params['assigned'])
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
for bug_id in params['bug-id']:
- bug,dummy_comment = \
- libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, bug_id))
if bug.assigned != assigned:
bug.assigned = assigned
if bug.status == 'open':
def setup_command(self, command):
command._get_unconnected_storage = self.get_unconnected_storage
command._get_storage = self.get_storage
- command._get_bugdir = self.get_bugdir
+ command._get_bugdirs = self.get_bugdirs
def get_unconnected_storage(self):
"""
def set_storage(self, storage):
self._storage = storage
- def get_bugdir(self):
+ def get_bugdirs(self):
"""Callback for use by commands that need it."""
- if not hasattr(self, '_bugdir'):
- self._bugdir = libbe.bugdir.BugDir(self.get_storage(),
- from_storage=True)
- return self._bugdir
-
- def set_bugdir(self, bugdir):
- self._bugdir = bugdir
+ if not hasattr(self, '_bugdirs'):
+ storage = self.get_storage()
+ self._bugdirs = dict(
+ (uuid, libbe.bugdir.BugDir(
+ storage=storage,
+ uuid=uuid,
+ from_storage=True))
+ for uuid in storage.children())
+ return self._bugdirs
+
+ def set_bugdirs(self, bugdirs):
+ self._bugdirs = bugdirs
def cleanup(self):
if hasattr(self, '_storage'):
])
def _run(self, **params):
- bugdir = self._get_bugdir()
- bug,parent = \
- libbe.command.util.bug_comment_from_user_id(bugdir, params['id'])
+ bugdirs = self._get_bugdirs()
+ bugdir,bug,parent = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['id']))
if params['comment'] == None:
# try to launch an editor for comment-body entry
try:
# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
import copy
+import itertools
import os
import libbe
import libbe.bug
+import libbe.bugdir
import libbe.command
import libbe.command.util
import libbe.util.tree
self.target = target
self.extra_strings_regexps = extra_strings_regexps
- def __call__(self, bugdir, bug):
+ def __call__(self, bugdirs, bug):
if self.status != 'all' and not bug.status in self.status:
return False
if self.severity != 'all' and not bug.severity in self.severity:
if self.target == 'all':
pass
else:
- target_bug = libbe.command.target.bug_target(bugdir, bug)
+ target_bug = libbe.command.target.bug_target(bugdirs, bug)
if self.target in ['none', None]:
if target_bug.summary != None:
return False
"""Add/remove bug dependencies
>>> import sys
- >>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> io = libbe.command.StringInputOutput()
>>> io.stdout = sys.stdout
and params['blocking-bug-id'] != None:
raise libbe.command.UserError(
'Only one bug id used in tree mode.')
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
if params['repair'] == True:
- good,fixed,broken = check_dependencies(bugdir, repair_broken_links=True)
+ good,fixed,broken = check_dependencies(
+ bugdirs, repair_broken_links=True)
assert len(broken) == 0, broken
if len(fixed) > 0:
print >> self.stdout, 'Fixed the following links:'
severity = parse_severity(params['severity'])
filter = Filter(status, severity)
- bugA, dummy_comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, params['bug-id'])
+ bugdir,bugA,dummy_comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['bug-id']))
if params['tree-depth'] != None:
- dtree = DependencyTree(bugdir, bugA, params['tree-depth'], filter)
+ dtree = DependencyTree(bugdirs, bugA, params['tree-depth'], filter)
if len(dtree.blocked_by_tree()) > 0:
print >> self.stdout, '%s blocked by:' % bugA.id.user()
for depth,node in dtree.blocked_by_tree().thread():
return 0
if params['blocking-bug-id'] != None:
- bugB,dummy_comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, params['blocking-bug-id'])
+ bugdirB,bugB,dummy_comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['blocking-bug-id']))
if params['remove'] == True:
remove_block(bugA, bugB)
else: # add the dependency
add_block(bugA, bugB)
- blocked_by = get_blocked_by(bugdir, bugA)
+ blocked_by = get_blocked_by(bugdirs, bugA)
if len(blocked_by) > 0:
print >> self.stdout, '%s blocked by:' % bugA.id.user()
print >> self.stdout, \
'\n'.join([self.bug_string(_bug, params)
for _bug in blocked_by])
- blocks = get_blocks(bugdir, bugA)
+ blocks = get_blocks(bugdirs, bugA)
if len(blocks) > 0:
print >> self.stdout, '%s blocks:' % bugA.id.user()
print >> self.stdout, \
blocks_string = _generate_blocks_string(blocked_bug)
_add_remove_extra_string(blocking_bug, blocks_string, add=False)
-def get_blocks(bugdir, bug):
+def get_blocks(bugdirs, bug):
"""
Return a list of bugs that the given bug blocks.
"""
blocks = []
for uuid in _get_blocks(bug):
- blocks.append(bugdir.bug_from_uuid(uuid))
+ blocks.append(libbe.command.util.bug_from_uuid(bugdirs, uuid))
return blocks
-def get_blocked_by(bugdir, bug):
+def get_blocked_by(bugdirs, bug):
"""
Return a list of bugs blocking the given bug.
"""
blocked_by = []
for uuid in _get_blocked_by(bug):
- blocked_by.append(bugdir.bug_from_uuid(uuid))
+ blocked_by.append(libbe.command.util.bug_from_uuid(bugdirs, uuid))
return blocked_by
-def check_dependencies(bugdir, repair_broken_links=False):
+def check_dependencies(bugdirs, repair_broken_links=False):
"""
Check that links are bi-directional for all bugs in bugdir.
>>> import libbe.bugdir
- >>> bd = libbe.bugdir.SimpleBugDir()
- >>> a = bd.bug_from_uuid("a")
- >>> b = bd.bug_from_uuid("b")
+ >>> bugdir = libbe.bugdir.SimpleBugDir()
+ >>> bugdirs = {bugdir.uuid: bugdir}
+ >>> a = bugdir.bug_from_uuid('a')
+ >>> b = bugdir.bug_from_uuid('b')
>>> blocked_by_string = _generate_blocked_by_string(b)
>>> _add_remove_extra_string(a, blocked_by_string, add=True)
- >>> good,repaired,broken = check_dependencies(bd, repair_broken_links=False)
+ >>> good,repaired,broken = check_dependencies(
+ ... bugdirs, repair_broken_links=False)
>>> good
[]
>>> repaired
[(Bug(uuid='a'), Bug(uuid='b'))]
>>> _get_blocks(b)
[]
- >>> good,repaired,broken = check_dependencies(bd, repair_broken_links=True)
+ >>> good,repaired,broken = check_dependencies(
+ ... bugdirs, repair_broken_links=True)
>>> _get_blocks(b)
['a']
>>> good
[(Bug(uuid='a'), Bug(uuid='b'))]
>>> broken
[]
+ >>> bugdir.cleanup()
"""
- if bugdir.storage != None:
- bugdir.load_all_bugs()
+ for bugdir in bugdirs.values():
+ if bugdir.storage is not None:
+ bugdir.load_all_bugs()
good_links = []
fixed_links = []
broken_links = []
- for bug in bugdir:
- for blocker in get_blocked_by(bugdir, bug):
- blocks = get_blocks(bugdir, blocker)
- if (bug, blocks) in good_links+fixed_links+broken_links:
- continue # already checked that link
- if bug not in blocks:
- if repair_broken_links == True:
- _repair_one_way_link(bug, blocker, blocks=True)
- fixed_links.append((bug, blocker))
+ for bugdir in bugdirs.values():
+ for bug in bugdir:
+ for blocker in get_blocked_by(bugdirs, bug):
+ blocks = get_blocks(bugdirs, blocker)
+ if (bug, blocks) in good_links+fixed_links+broken_links:
+ continue # already checked that link
+ if bug not in blocks:
+ if repair_broken_links == True:
+ _repair_one_way_link(bug, blocker, blocks=True)
+ fixed_links.append((bug, blocker))
+ else:
+ broken_links.append((bug, blocker))
else:
- broken_links.append((bug, blocker))
- else:
- good_links.append((bug, blocker))
- for blockee in get_blocks(bugdir, bug):
- blocked_by = get_blocked_by(bugdir, blockee)
- if (blockee, bug) in good_links+fixed_links+broken_links:
- continue # already checked that link
- if bug not in blocked_by:
- if repair_broken_links == True:
- _repair_one_way_link(blockee, bug, blocks=False)
- fixed_links.append((blockee, bug))
+ good_links.append((bug, blocker))
+ for blockee in get_blocks(bugdirs, bug):
+ blocked_by = get_blocked_by(bugdirs, blockee)
+ if (blockee, bug) in good_links+fixed_links+broken_links:
+ continue # already checked that link
+ if bug not in blocked_by:
+ if repair_broken_links == True:
+ _repair_one_way_link(blockee, bug, blocks=False)
+ fixed_links.append((blockee, bug))
+ else:
+ broken_links.append((blockee, bug))
else:
- broken_links.append((blockee, bug))
- else:
- good_links.append((blockee, bug))
+ good_links.append((blockee, bug))
return (good_links, fixed_links, broken_links)
class DependencyTree (object):
"""
Note: should probably be DependencyDiGraph.
"""
- def __init__(self, bugdir, root_bug, depth_limit=0, filter=None):
- self.bugdir = bugdir
+ def __init__(self, bugdirs, root_bug, depth_limit=0, filter=None):
+ self.bugdirs = bugdirs
self.root_bug = root_bug
self.depth_limit = depth_limit
self.filter = filter
node = stack.pop()
if self.depth_limit > 0 and node.depth == self.depth_limit:
continue
- for bug in child_fn(self.bugdir, node.bug):
- if not self.filter(self.bugdir, bug):
+ for bug in child_fn(self.bugdirs, node.bug):
+ if not self.filter(self.bugdirs, bug):
continue
child = libbe.util.tree.Tree()
child.bug = bug
params['subscribe'])
except ValueError, e:
raise libbe.command.UserError(e.msg)
- bugdir = self._get_bugdir()
- if bugdir.storage.versioned == False:
- raise libbe.command.UserError(
- 'This repository is not revision-controlled.')
+ bugdirs = self._get_bugdirs()
+ for uuid,bugdir in sorted(bugdirs.items()):
+ self.diff(bugdir, subscriptions, params=params)
+
+
+ def diff(self, bugdir, subscriptions, params):
if params['repo'] == None:
+ if bugdir.storage.versioned == False:
+ raise libbe.command.UserError(
+ 'This repository is not revision-controlled.')
if params['revision'] == None: # get the most recent revision
params['revision'] = bugdir.storage.revision_id(-1)
old_bd = libbe.bugdir.RevisionedBugDir(bugdir, params['revision'])
else:
if old_bd_current.storage.versioned == False:
raise libbe.command.UserError(
- '%s is not revision-controlled.'
- % storage.repo)
+ '{} is not revision-controlled.'.format(
+ bugdir.storage.repo))
old_bd = libbe.bugdir.RevisionedBugDir(old_bd_current,revision)
d = libbe.diff.Diff(old_bd, bugdir)
tree = d.report_tree(subscriptions)
])
def _run(self, **params):
- bugdir = self._get_bugdir()
- bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, params['bug-id'])
+ bugdirs = self._get_bugdirs()
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['bug-id']))
if params['due'] == None:
due_time = get_due(bug)
if due_time is None:
import codecs
import htmlentitydefs
+import itertools
import os
import os.path
import re
>>> import sys
>>> import libbe.bugdir
- >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> bugdir = libbe.bugdir.SimpleBugDir(memory=False)
>>> io = libbe.command.StringInputOutput()
>>> io.stdout = sys.stdout
>>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_storage(bd.storage)
+ >>> ui.storage_callbacks.set_storage(bugdir.storage)
>>> cmd = HTML(ui=ui)
- >>> ret = ui.run(cmd, {'output':os.path.join(bd.storage.repo, 'html_export')})
- >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export'))
+ >>> ret = ui.run(cmd, {
+ ... 'output':os.path.join(bugdir.storage.repo, 'html_export')})
+ >>> os.path.exists(os.path.join(bugdir.storage.repo, 'html_export'))
True
- >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'index.html'))
+ >>> os.path.exists(os.path.join(
+ ... bugdir.storage.repo, 'html_export', 'index.html'))
True
- >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'index_inactive.html'))
+ >>> os.path.exists(os.path.join(
+ ... bugdir.storage.repo, 'html_export', 'index_inactive.html'))
True
- >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs'))
+ >>> os.path.exists(os.path.join(
+ ... bugdir.storage.repo, 'html_export', 'bugs'))
True
- >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs', 'a', 'index.html'))
+ >>> os.path.exists(os.path.join(
+ ... bugdir.storage.repo, 'html_export', 'bugs', 'a', 'index.html'))
True
- >>> os.path.exists(os.path.join(bd.storage.repo, 'html_export', 'bugs', 'b', 'index.html'))
+ >>> os.path.exists(os.path.join(
+ ... bugdir.storage.repo, 'html_export', 'bugs', 'b', 'index.html'))
True
>>> ui.cleanup()
- >>> bd.cleanup()
+ >>> bugdir.cleanup()
"""
name = 'html'
def _run(self, **params):
if params['export-template'] == True:
- bugdir = None
+ bugdirs = None
else:
- bugdir = self._get_bugdir()
- bugdir.load_all_bugs()
- html_gen = HTMLGen(bugdir,
+ bugdirs = self._get_bugdirs()
+ for bugdir in bugdirs.values():
+ bugdir.load_all_bugs()
+ html_gen = HTMLGen(bugdirs,
template_dir=params['template-dir'],
title=params['title'],
header=params['index-header'],
Html = HTML # alias for libbe.command.base.get_command_class()
class HTMLGen (object):
- def __init__(self, bd, template_dir=None,
+ def __init__(self, bugdirs, template_dir=None,
title="Site Title", header="Header",
min_id_length=-1,
verbose=False, encoding=None, stdout=None,
):
self.generation_time = time.ctime()
- self.bd = bd
+ self.bugdirs = bugdirs
self.title = title
self.header = header
self.verbose = verbose
bugs_active = []
bugs_inactive = []
bugs_target = []
- bugs = [b for b in self.bd]
+ bugs = list(itertools.chain(*list(
+ [bug for bug in bugdir]
+ for bugdir in self.bugdirs.values())))
bugs.sort()
for b in bugs:
template = self.template.get_template('target_index.html')
template_info['targets'] = [
(target, sorted(libbe.command.depend.get_blocked_by(
- self.bd, target)))
+ target.bugdir, target)))
for target in bugs]
else:
template = self.template.get_template('standard_index.html')
def _long_to_linked_user(self, text):
"""
>>> import libbe.bugdir
- >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
- >>> h = HTMLGen(bd)
+ >>> bugdir = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> h = HTMLGen({bugdir.uuid: bugdir})
>>> h._long_to_linked_user('A link #abc123/a#, and a non-link #x#y#.')
'A link <a href="./a/">abc/a</a>, and a non-link #x#y#.'
- >>> bd.cleanup()
+ >>> bugdir.cleanup()
"""
replacer = libbe.util.id.IDreplacer(
- [self.bd], self._long_to_linked_user_replacer, wrap=False)
+ self.bugdirs, self._long_to_linked_user_replacer, wrap=False)
return re.sub(
libbe.util.id.REGEXP, replacer, text)
"""
>>> import libbe.bugdir
>>> import libbe.util.id
- >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
- >>> a = bd.bug_from_uuid('a')
+ >>> bugdir = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> bugdirs = {bugdir.uuid: bugdir}
+ >>> a = bugdir.bug_from_uuid('a')
>>> uuid_gen = libbe.util.id.uuid_gen
>>> libbe.util.id.uuid_gen = lambda : '0123'
>>> c = a.new_comment('comment for link testing')
>>> libbe.util.id.uuid_gen = uuid_gen
>>> c.uuid
'0123'
- >>> h = HTMLGen(bd)
- >>> h._long_to_linked_user_replacer([bd], 'abc123')
+ >>> h = HTMLGen(bugdirs)
+ >>> h._long_to_linked_user_replacer(bugdirs, 'abc123')
'#abc123#'
- >>> h._long_to_linked_user_replacer([bd], 'abc123/a')
+ >>> h._long_to_linked_user_replacer(bugdirs, 'abc123/a')
'<a href="./a/">abc/a</a>'
- >>> h._long_to_linked_user_replacer([bd], 'abc123/a/0123')
+ >>> h._long_to_linked_user_replacer(bugdirs, 'abc123/a/0123')
'<a href="./a/#0123">abc/a/012</a>'
- >>> h._long_to_linked_user_replacer([bd], 'x')
+ >>> h._long_to_linked_user_replacer(bugdirs, 'x')
'#x#'
- >>> h._long_to_linked_user_replacer([bd], '')
+ >>> h._long_to_linked_user_replacer(bugdirs, '')
'##'
- >>> bd.cleanup()
+ >>> bugdir.cleanup()
"""
try:
- p = libbe.util.id.parse_user(bugdirs[0], long_id)
+ p = libbe.util.id.parse_user(bugdirs, long_id)
except (libbe.util.id.MultipleIDMatches,
libbe.util.id.NoIDMatches,
libbe.util.id.InvalidIDStructure), e:
if p['type'] == 'bugdir':
return '#%s#' % long_id
elif p['type'] == 'bug':
- bug,comment = libbe.command.util.bug_comment_from_user_id(
- bugdirs[0], long_id)
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, long_id))
return '<a href="./%s/">%s</a>' \
% (self._truncated_bug_id(bug), bug.id.user())
elif p['type'] == 'comment':
- bug,comment = libbe.command.util.bug_comment_from_user_id(
- bugdirs[0], long_id)
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, long_id))
return '<a href="./%s/#%s">%s</a>' \
% (self._truncated_bug_id(bug),
self._truncated_comment_id(comment),
import libbe
import libbe.bug
+import libbe.bugdir
import libbe.command
import libbe.command.util
import libbe.comment
import libbe.util.encoding
+import libbe.util.id
import libbe.util.utility
if libbe.TESTING == True:
import libbe.bugdir
+
class Import_XML (libbe.command.Command):
"""Import comments and bugs from XML
>>> cmd = Import_XML(ui=ui)
>>> ui.io.set_stdin('<be-xml><comment><uuid>c</uuid><body>This is a comment about a</body></comment></be-xml>')
- >>> ret = ui.run(cmd, {'comment-root':'/a'}, ['-'])
+ >>> ret = ui.run(cmd, {'root':'/a'}, ['-'])
>>> bd.flush_reload()
>>> bug = bd.bug_from_uuid('a')
>>> bug.load_comments(load_full=False)
help='If any bug or comment listed in the XML file already exists in the bug repository, do not alter the repository version.'),
libbe.command.Option(name='preserve-uuids', short_name='p',
help='Preserve UUIDs for trusted input (potential name collisions).'),
- libbe.command.Option(name='comment-root', short_name='c',
- help='Supply a bug or comment ID as the root of any <comment> elements that are direct children of the <be-xml> element. If any such <comment> elements exist, you are required to set this option.',
+ libbe.command.Option(name='root', short_name='r',
+ help='Supply a bugdir, bug, or comment ID as the root of '
+ 'any non-bugdir elements that are direct children of the '
+ '<be-xml> element. If any such elements exist, you are '
+ 'required to set this option.',
arg=libbe.command.Argument(
- name='comment-root', metavar='ID',
+ name='root', metavar='ID',
completion_callback=libbe.command.util.complete_bug_comment_id)),
])
self.args.extend([
])
def _run(self, **params):
- bugdir = self._get_bugdir()
- writeable = bugdir.storage.writeable
- bugdir.storage.writeable = False
- if params['comment-root'] != None:
- croot_bug,croot_comment = \
- libbe.command.util.bug_comment_from_user_id(
- bugdir, params['comment-root'])
- croot_bug.load_comments(load_full=True)
- if croot_comment.uuid == libbe.comment.INVALID_UUID:
- croot_comment = croot_bug.comment_root
- else:
- croot_comment = croot_bug.comment_from_uuid(croot_comment.uuid)
- new_croot_bug = libbe.bug.Bug(bugdir=bugdir, uuid=croot_bug.uuid)
- new_croot_bug.explicit_attrs = []
- new_croot_bug.comment_root = copy.deepcopy(croot_bug.comment_root)
- if croot_comment.uuid == libbe.comment.INVALID_UUID:
- new_croot_comment = new_croot_bug.comment_root
- else:
- new_croot_comment = \
- new_croot_bug.comment_from_uuid(croot_comment.uuid)
- for new in new_croot_bug.comments():
- new.explicit_attrs = []
+ storage = self._get_storage()
+ bugdirs = self._get_bugdirs()
+ writeable = storage.writeable
+ storage.writeable = False
+ if params['root'] != None:
+ root_bugdir,root_bug,root_comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['root']))
else:
- croot_bug,croot_comment = (None, None)
+ root_bugdir,root_bug,root_comment = (None, None, None)
+ xml = self._read_xml(storage, params)
+ version,root_bugdirs,root_bugs,root_comments = self._parse_xml(
+ xml, params)
+
+ if params['add-only']:
+ accept_changes = False
+ accept_extra_strings = False
+ else:
+ accept_changes = True
+ accept_extra_strings = True
+
+ dirty_items = list(self._merge_comments(
+ bugdirs, root_bug, root_comment, root_comments,
+ params, accept_changes, accept_extra_strings))
+ dirty_items.extend(self._merge_bugs(
+ bugdirs, root_bugdir, root_bugs,
+ params, accept_changes, accept_extra_strings))
+ dirty_items.extend(self._merge_bugdirs(
+ bugdirs, root_bugdirs,
+ params, accept_changes, accept_extra_strings))
+
+ # protect against programmer error causing data loss:
+ if root_bug is not None:
+ # check for each of the new comments
+ comms = []
+ for c in root_bug.comments():
+ comms.append(c.uuid)
+ if c.alt_id != None:
+ comms.append(c.alt_id)
+ if root_comment.uuid == libbe.comment.INVALID_UUID:
+ root_text = root_bug.id.user()
+ else:
+ root_text = root_comment.id.user()
+ for new in root_comments:
+ assert new.uuid in comms or new.alt_id in comms, \
+ "comment %s (alt: %s) wasn't added to %s" \
+ % (new.uuid, new.alt_id, root_text)
+ for new in root_bugs:
+ # check for each of the new bugs
+ try:
+ libbe.command.util.bug_from_uuid(bugdirs, new.uuid)
+ except libbe.bugdir.NoBugMatches:
+ try:
+ libbe.command.util.bug_from_uuid(bugdirs, new.alt_id)
+ except libbe.bugdir.NoBugMatches:
+ raise AssertionError(
+ "bug {} (alt: {}) wasn't added to {}".format(
+ new.uuid, new.alt_id, root_bugdir.id.user()))
+ for new in root_bugdirs:
+ assert new.uuid in bugdirs or new.alt_id in bugdirs, (
+ "bugdir {} wasn't added to {}".format(
+ new.uuid, sorted(bugdirs.keys())))
+
+ # save new information
+ storage.writeable = writeable
+ for item in dirty_items:
+ item.save()
+
+ def _read_xml(self, storage, params):
if params['xml-file'] == '-':
- xml = self.stdin.read().encode(self.stdin.encoding)
+ return self.stdin.read().encode(self.stdin.encoding)
else:
- self._check_restricted_access(bugdir.storage, params['xml-file'])
- xml = libbe.util.encoding.get_file_contents(
- params['xml-file'])
+ self._check_restricted_access(storage, params['xml-file'])
+ return libbe.util.encoding.get_file_contents(params['xml-file'])
- # parse the xml
+ def _parse_xml(self, xml, params):
+ version = {}
+ root_bugdirs = []
root_bugs = []
root_comments = []
- version = {}
be_xml = ElementTree.XML(xml)
if be_xml.tag != 'be-xml':
raise libbe.util.utility.InvalidXML(
'import-xml', be_xml, 'root element must be <be-xml>')
for child in be_xml.getchildren():
- if child.tag == 'bug':
- new = libbe.bug.Bug(bugdir=bugdir)
+ if child.tag == 'bugdir':
+ new = libbe.bugdir.BugDir(storage=None)
+ new.from_xml(child, preserve_uuids=params['preserve-uuids'])
+ root_bugdirs.append(new)
+ elif child.tag == 'bug':
+ new = libbe.bug.Bug()
new.from_xml(child, preserve_uuids=params['preserve-uuids'])
root_bugs.append(new)
elif child.tag == 'comment':
- new = libbe.comment.Comment(croot_bug)
+ new = libbe.comment.Comment()
new.from_xml(child, preserve_uuids=params['preserve-uuids'])
root_comments.append(new)
elif child.tag == 'version':
text = text.decode('unicode_escape').strip()
version[child.tag] = text
else:
- print >> sys.stderr, 'ignoring unknown tag %s in %s' \
- % (gchild.tag, child.tag)
+ sys.stderr.write(
+ 'ignoring unknown tag {} in {}\n'.format(
+ gchild.tag, child.tag))
else:
- print >> sys.stderr, 'ignoring unknown tag %s in %s' \
- % (child.tag, comment_list.tag)
+ sys.stderr.write('ignoring unknown tag {} in {}\n'.format(
+ child.tag, be_xml.tag))
+ return (version, root_bugdirs, root_bugs, root_comments)
- # merge the new root_comments
- if params['add-only'] == True:
- accept_changes = False
- accept_extra_strings = False
+ def _merge_comments(self, bugdirs, bug, root_comment, comments,
+ params, accept_changes, accept_extra_strings,
+ accept_comments=True):
+ if len(comments) == 0:
+ return
+ if bug is None:
+ raise libbe.command.UserError(
+ 'No root bug for merging comments:\n{}'.format(
+ '\n\n'.join([c.string() for c in comments])))
+ bug.load_comments(load_full=True)
+ if root_comment.uuid == libbe.comment.INVALID_UUID:
+ root_comment = bug.comment_root
else:
- accept_changes = True
- accept_extra_strings = True
- accept_comments = True
- if len(root_comments) > 0:
- if croot_bug == None:
- raise libbe.command.UserError(
- '--comment-root option is required for your root comments:\n%s'
- % '\n\n'.join([c.string() for c in root_comments]))
- try:
- # link new comments
- new_croot_bug.add_comments(root_comments,
- default_parent=new_croot_comment,
- ignore_missing_references= \
- params['ignore-missing-references'])
- except libbe.comment.MissingReference, e:
- raise libbe.command.UserError(e)
- croot_bug.merge(new_croot_bug, accept_changes=accept_changes,
- accept_extra_strings=accept_extra_strings,
- accept_comments=accept_comments)
-
- # merge the new croot_bugs
- merged_bugs = []
- old_bugs = []
- for new in root_bugs:
+ root_comment = bug.comment_from_uuid(root_comment.uuid)
+ new_bug = libbe.bug.Bug(bugdir=bug.bugdir, uuid=bug.uuid)
+ new_bug.explicit_attrs = []
+ new_bug.comment_root = copy.deepcopy(bug.comment_root)
+ if root_comment.uuid == libbe.comment.INVALID_UUID:
+ new_root_comment = new_bug.comment_root
+ else:
+ new_root_comment = new_bug.comment_from_uuid(
+ root_comment.uuid)
+ for new in new_bug.comments():
+ new.explicit_attrs = []
+ try:
+ new_bug.add_comments(
+ comments,
+ default_parent=root_comment,
+ ignore_missing_references=params['ignore-missing-references'])
+ except libbe.comment.MissingReference as e:
+ raise libbe.command.UserError(e)
+ bug.merge(new_bug, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_comments=accept_comments)
+ yield bug
+
+ def _merge_bugs(self, bugdirs, bugdir, bugs,
+ params, accept_changes, accept_extra_strings,
+ accept_comments=True):
+ for new in bugs:
try:
old = bugdir.bug_from_uuid(new.alt_id)
except KeyError:
- old = None
- if old == None:
- bugdir.append(new)
+ bugdir.append(new, update=True)
+ yield new
else:
old.load_comments(load_full=True)
old.merge(new, accept_changes=accept_changes,
accept_extra_strings=accept_extra_strings,
accept_comments=accept_comments)
- merged_bugs.append(new)
- old_bugs.append(old)
+ yield old
- # protect against programmer error causing data loss:
- if croot_bug != None:
- comms = []
- for c in croot_comment.traverse():
- comms.append(c.uuid)
- if c.alt_id != None:
- comms.append(c.alt_id)
- if croot_comment.uuid == libbe.comment.INVALID_UUID:
- root_text = croot_bug.id.user()
+ def _merge_bugdirs(self, bugdirs, new_bugdirs,
+ params, accept_changes, accept_extra_strings,
+ accept_comments=True):
+ for new in new_bugdirs:
+ if new.alt_id in bugdirs:
+ old = bugdirs[new.alt_id]
+ old.load_all_bugs()
+ old.merge(new, accept_changes=accept_changes,
+ accept_extra_strings=accept_extra_strings,
+ accept_bugs=True,
+ accept_comments=accept_comments)
+ yield old
else:
- root_text = croot_comment.id.user()
- for new in root_comments:
- assert new.uuid in comms or new.alt_id in comms, \
- "comment %s (alt: %s) wasn't added to %s" \
- % (new.uuid, new.alt_id, root_text)
- for new in root_bugs:
- if not new in merged_bugs:
- assert bugdir.has_bug(new.uuid), \
- "bug %s wasn't added" % (new.uuid)
-
- # save new information
- bugdir.storage.writeable = writeable
- if croot_bug != None:
- croot_bug.save()
- for new in root_bugs:
- if not new in merged_bugs:
- new.save()
- for old in old_bugs:
- old.save()
+ bugdirs[new.uuid] = new
+ new.storage = self._get_storage()
+ yield new
def _long_help(self):
return """
to share this information, as that will preserve a more detailed
history.
-The XML file should be formatted similarly to
+The XML file should be formatted similarly to:
+
<be-xml>
<version>
<tag>1.0.0</tag>
<revno>446</revno>
<revision-id>a@b.com-20091119214553-iqyw2cpqluww3zna</revision-id>
<version>
- <bug>
- ...
- <comment>...</comment>
- <comment>...</comment>
- </bug>
+ <bugdir>
+ <bug>
+ ...
+ <comment>...</comment>
+ <comment>...</comment>
+ </bug>
+ <bug>...</bug>
+ </bugdir>
+ <bug>...</bug>
<bug>...</bug>
<comment>...</comment>
<comment>...</comment>
</be-xml>
-where the ellipses mark output commpatible with Bug.xml() and
-Comment.xml(). Take a look at the output of `be show --xml` for some
-explicit examples. Unrecognized tags are ignored. Missing tags are
-left at the default value. The version tag is not required, but is
-strongly recommended.
-
-The bug and comment UUIDs are always auto-generated, so if you set a
-<uuid> field, but no <alt-id> field, your <uuid> will be used as the
-comment's <alt-id>. An exception is raised if <alt-id> conflicts with
-an existing comment. Bugs do not have a permantent alt-id, so they
-the <uuid>s you specify are not saved. The <uuid>s _are_ used to
-match agains prexisting bug and comment uuids, and comment alt-ids,
-and fields explicitly given in the XML file will replace old versions
-unless the --add-only flag.
+
+where the ellipses mark output commpatible with BugDir.xml(),
+Bug.xml(), and Comment.xml(). Take a look at the output of `be show
+--xml` for some explicit examples. Unrecognized tags are ignored.
+Missing tags are left at the default value. The version tag is not
+required, but is strongly recommended.
+
+The bugdir, bug, and comment UUIDs are always auto-generated, so if
+you set a <uuid> field, but no <alt-id> field, your <uuid> will be
+used as the object's <alt-id>. An exception is raised if <alt-id>
+conflicts with an existing object. Bugdirs and bugs do not have a
+permantent alt-id, so they the <uuid>s you specify are not saved. The
+<uuid>s _are_ used to match agains prexisting bug and comment uuids,
+and comment alt-ids, and fields explicitly given in the XML file will
+replace old versions unless the --add-only flag.
*.extra_strings recieves special treatment, and if --add-only is not
set, the resulting list concatenates both source lists and removes
Here's an example of import activity:
Repository
- bug (uuid=B, creator=John, status=open)
- estr (don't forget your towel)
- estr (helps with space travel)
- com (uuid=C1, author=Jane, body=Hello)
- com (uuid=C2, author=Jess, body=World)
+ bugdir (uuid=abc123)
+ bug (uuid=B, creator=John, status=open)
+ estr (don't forget your towel)
+ estr (helps with space travel)
+ com (uuid=C1, author=Jane, body=Hello)
+ com (uuid=C2, author=Jess, body=World)
XML
- bug (uuid=B, status=fixed)
- estr (don't forget your towel)
- estr (watch out for flying dolphins)
- com (uuid=C1, body=So long)
- com (uuid=C3, author=Jed, body=And thanks)
+ bugdir (uuid=abc123)
+ bug (uuid=B, status=fixed)
+ estr (don't forget your towel)
+ estr (watch out for flying dolphins)
+ com (uuid=C1, body=So long)
+ com (uuid=C3, author=Jed, body=And thanks)
Result
- bug (uuid=B, creator=John, status=fixed)
- estr (don't forget your towel)
- estr (helps with space travel)
- estr (watch out for flying dolphins)
- com (uuid=C1, author=Jane, body=So long)
- com (uuid=C2, author=Jess, body=World)
- com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
+ bugdir (uuid=abc123)
+ bug (uuid=B, creator=John, status=fixed)
+ estr (don't forget your towel)
+ estr (helps with space travel)
+ estr (watch out for flying dolphins)
+ com (uuid=C1, author=Jane, body=So long)
+ com (uuid=C2, author=Jess, body=World)
+ com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
Result, with --add-only
- bug (uuid=B, creator=John, status=open)
- estr (don't forget your towel)
- estr (helps with space travel)
- com (uuid=C1, author=Jane, body=Hello)
- com (uuid=C2, author=Jess, body=World)
- com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
+ bugdir (uuid=abc123)
+ bug (uuid=B, creator=John, status=open)
+ estr (don't forget your towel)
+ estr (helps with space travel)
+ com (uuid=C1, author=Jane, body=Hello)
+ com (uuid=C2, author=Jess, body=World)
+ com (uuid=C4, alt-id=C3, author=Jed, body=And thanks)
Examples:
-Import comments (e.g. emails from an mbox) and append to bug XYZ
- $ be-mbox-to-xml mail.mbox | be import-xml --c XYZ -
-Or you can append those emails underneath the prexisting comment XYZ-3
- $ be-mbox-to-xml mail.mbox | be import-xml --c XYZ-3 -
+Import comments (e.g. emails from an mbox) and append to bug /XYZ:
+
+ $ be-mbox-to-xml mail.mbox | be import-xml --r /XYZ -
+
+Or you can append those emails underneath the prexisting comment /XYZ/3:
+
+ $ be-mbox-to-xml mail.mbox | be import-xml --r /XYZ/3 -
+
+User creates a new bug:
-User creates a new bug
user$ be new "The demuxulizer is broken"
Created bug with ID 48f
user$ be comment 48f
<Describe bug>
...
-User exports bug as xml and emails it to the developers
+
+User exports bug as xml and emails it to the developers:
+
user$ be show --xml 48f > 48f.xml
user$ cat 48f.xml | mail -s "Demuxulizer bug xml" devs@b.com
or equivalently (with a slightly fancier be-handle-mail compatible
email):
user$ be email-bugs 48f
-Devs recieve email, and save it's contents as demux-bug.xml
+
+Devs recieve email, and save it's contents as demux-bug.xml:
+
dev$ cat demux-bug.xml | be import-xml -
"""
bugB.save()
self.xml = """
<be-xml>
- <bug>
- <uuid>b</uuid>
- <status>fixed</status>
- <summary>a test bug</summary>
- <extra-string>don't forget your towel</extra-string>
- <extra-string>watch out for flying dolphins</extra-string>
- <comment>
- <uuid>c1</uuid>
- <body>So long</body>
- </comment>
- <comment>
- <uuid>c3</uuid>
- <author>Jed</author>
- <body>And thanks</body>
- </comment>
- </bug>
+ <bugdir>
+ <uuid>abc123</uuid>
+ <bug>
+ <uuid>b</uuid>
+ <status>fixed</status>
+ <summary>a test bug</summary>
+ <extra-string>don't forget your towel</extra-string>
+ <extra-string>watch out for flying dolphins</extra-string>
+ <comment>
+ <uuid>c1</uuid>
+ <body>So long</body>
+ </comment>
+ <comment>
+ <uuid>c3</uuid>
+ <author>Jed</author>
+ <body>And thanks</body>
+ </comment>
+ </bug>
+ </bugdir>
</be-xml>
"""
self.root_comment_xml = """
def testRootCommentsNotAddOnly(self):
bugB = self.bugdir.bug_from_uuid('b')
initial_bugB_summary = bugB.summary
- self._execute(self.root_comment_xml, {'comment-root':'/b'}, ['-'])
+ self._execute(self.root_comment_xml, {'root':'/b'}, ['-'])
uuids = list(self.bugdir.uuids())
uuids = list(self.bugdir.uuids())
self.failUnless(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
initial_bugB_summary = bugB.summary
self._execute(self.root_comment_xml,
- {'comment-root':'/b', 'add-only':True}, ['-'])
+ {'root':'/b', 'add-only':True}, ['-'])
uuids = list(self.bugdir.uuids())
self.failUnless(uuids == ['b'], uuids)
bugB = self.bugdir.bug_from_uuid('b')
storage.connect()
self.ui.storage_callbacks.set_storage(storage)
bd = libbe.bugdir.BugDir(storage, from_storage=False)
- self.ui.storage_callbacks.set_bugdir(bd)
+ self.ui.storage_callbacks.set_bugdirs({bd.uuid: bd})
if bd.storage.name is not 'None':
print >> self.stdout, \
'Using %s for revision control.' % storage.name
# You should have received a copy of the GNU General Public License along with
# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
+import itertools
import os
import re
# ])
def _run(self, **params):
- bugdir = self._get_bugdir()
- writeable = bugdir.storage.writeable
- bugdir.storage.writeable = False
+ storage = self._get_storage()
+ bugdirs = self._get_bugdirs()
+ writeable = storage.writeable
+ storage.writeable = False
cmp_list, status, severity, assigned, extra_strings_regexps = \
- self._parse_params(bugdir, params)
+ self._parse_params(bugdirs, params)
filter = Filter(status, severity, assigned,
extra_strings_regexps=extra_strings_regexps)
- bugs = [bugdir.bug_from_uuid(uuid) for uuid in bugdir.uuids()]
- bugs = [b for b in bugs if filter(bugdir, b) == True]
+ bugs = list(itertools.chain(*list(
+ [bugdir.bug_from_uuid(uuid) for uuid in bugdir.uuids()]
+ for bugdir in bugdirs.values())))
+ bugs = [b for b in bugs if filter(bugdirs, b) == True]
self.result = bugs
if len(bugs) == 0 and params['xml'] == False:
print >> self.stdout, 'No matching bugs found'
print >> self.stdout, bug.id.user()
else:
self._list_bugs(bugs, show_tags=params['tags'], xml=params['xml'])
- bugdir.storage.writeable = writeable
+ storage.writeable = writeable
return 0
- def _parse_params(self, bugdir, params):
+ def _parse_params(self, bugdirs, params):
cmp_list = []
if params['sort'] != None:
for cmp in params['sort'].split(','):
assigned = 'all'
else:
assigned = libbe.command.util.select_values(
- params['assigned'], libbe.command.util.assignees(bugdir))
+ params['assigned'], libbe.command.util.assignees(bugdirs))
for i in range(len(assigned)):
if assigned[i] == '-':
assigned[i] = params['user-id']
>>> io = libbe.command.StringInputOutput()
>>> io.stdout = sys.stdout
>>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
>>> cmd = Merge(ui=ui)
>>> a = bd.bug_from_uuid('a')
... cmp=libbe.comment.cmp_time)
>>> mergeA = a_comments[0]
>>> mergeA.time = 3
- >>> print a.string(show_comments=True) # doctest: +ELLIPSIS
+ >>> print a.string(show_comments=True)
+ ... # doctest: +ELLIPSIS, +REPORT_UDIFF
ID : a
Short name : abc/a
Severity : minor
... libbe.comment.cmp_time)
>>> mergeB = b_comments[0]
>>> mergeB.time = 3
- >>> print b.string(show_comments=True) # doctest: +ELLIPSIS
+ >>> print b.string(show_comments=True)
+ ... # doctest: +ELLIPSIS, +REPORT_UDIFF
ID : b
Short name : abc/b
Severity : minor
])
def _run(self, **params):
- bugdir = self._get_bugdir()
- bugA,dummy_comment = \
- libbe.command.util.bug_comment_from_user_id(
- bugdir, params['bug-id'])
+ storage = self._get_storage()
+ bugdirs = self._get_bugdirs()
+ bugdirA,bugA,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['bug-id']))
bugA.load_comments()
- bugB,dummy_comment = \
- libbe.command.util.bug_comment_from_user_id(
- bugdir, params['bug-id-to-merge'])
+ bugdirB,bugB,dummy_comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['bug-id-to-merge']))
bugB.load_comments()
mergeA = bugA.new_comment('Merged from bug #%s#' % bugB.id.long_user())
newCommTree = copy.deepcopy(bugB.comment_root)
if comment.alt_id == None:
comment.storage = None
comment.alt_id = comment.uuid
- comment.storage = bugdir.storage
+ comment.storage = storage
comment.uuid = libbe.util.id.uuid_gen()
comment.save() # force onto disk under bugA
arg=libbe.command.Argument(
name='severity', metavar='SEVERITY',
completion_callback=libbe.command.util.complete_severity)),
+ libbe.command.Option(name='bugdir', short_name='b',
+ help='Short bugdir UUID for the new bug. You '
+ 'only need to set this if you have multiple bugdirs in '
+ 'your repository.',
+ arg=libbe.command.Argument(
+ name='bugdir', metavar='ID', default=None,
+ completion_callback=libbe.command.util.complete_bugdir_id)),
libbe.command.Option(name='full-uuid', short_name='f',
help='Print the full UUID for the new bug')
])
summary = self.stdin.readline()
else:
summary = params['summary']
- bugdir = self._get_bugdir()
- bugdir.storage.writeable = False
+ storage = self._get_storage()
+ bugdirs = self._get_bugdirs()
+ if params['bugdir']:
+ bugdir = bugdirs[bugdir]
+ elif len(bugdirs) == 1:
+ bugdir = bugdirs.values()[0]
+ else:
+ raise libbe.command.UserError(
+ 'Ambiguous bugdir {}'.format(sorted(bugdirs.values())))
+ storage.writeable = False
bug = bugdir.new_bug(summary=summary.strip())
if params['creator'] != None:
bug.creator = params['creator']
bug.status = params['status']
if params['severity'] != None:
bug.severity = params['severity']
- bugdir.storage.writeable = True
+ storage.writeable = True
bug.save()
if params['full-uuid']:
bug_id = bug.id.long_user()
])
def _run(self, **params):
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
user_ids = []
for bug_id in params['bug-id']:
- bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, bug_id)
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, bug_id))
user_ids.append(bug.id.user())
bugdir.remove_bug(bug)
if len(user_ids) == 1:
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
+ self.options.extend([
+ libbe.command.Option(name='bugdir', short_name='b',
+ help='Short bugdir UUID to act on. You '
+ 'only need to set this if you have multiple bugdirs in '
+ 'your repository.',
+ arg=libbe.command.Argument(
+ name='bugdir', metavar='ID', default=None,
+ completion_callback=libbe.command.util.complete_bugdir_id)),
+ ])
self.args.extend([
libbe.command.Argument(
name='setting', metavar='SETTING', optional=True,
])
def _run(self, **params):
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
+ if params['bugdir']:
+ bugdir = bugdirs[bugdir]
+ elif len(bugdirs) == 1:
+ bugdir = bugdirs.values()[0]
+ else:
+ raise libbe.command.UserError(
+ 'Ambiguous bugdir {}'.format(sorted(bugdirs.values())))
if params['setting'] == None:
keys = bugdir.settings_properties
keys.sort()
>>> io = libbe.command.StringInputOutput()
>>> io.stdout = sys.stdout
>>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
>>> cmd = Severity(ui=ui)
>>> bd.bug_from_uuid('a').severity
])
def _run(self, **params):
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
for bug_id in params['bug-id']:
- bug,dummy_comment = \
- libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, bug_id))
if bug.severity != params['severity']:
try:
bug.severity = params['severity']
def _long_help(self):
try: # See if there are any per-tree severity configurations
- bd = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
except NotImplementedError:
pass # No tree, just show the defaults
longest_severity_len = max([len(s) for s in libbe.bug.severity_values])
>>> io.stdout = sys.stdout
>>> io.stdout.encoding = 'ascii'
>>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
>>> cmd = Show(ui=ui)
>>> ret = ui.run(cmd, args=['/a',]) # doctest: +ELLIPSIS
])
def _run(self, **params):
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
if params['only-raw-body'] == True:
if len(params['id']) != 1:
raise libbe.command.UserError(
'only one ID accepted with --only-raw-body')
- bug,comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, params['id'][0])
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['id'][0]))
if comment == bug.comment_root:
raise libbe.command.UserError(
"--only-raw-body requires a comment ID, not '%s'"
sys.__stdout__.write(comment.body)
return 0
print >> self.stdout, \
- output(bugdir, params['id'], encoding=self.stdout.encoding,
+ output(bugdirs, params['id'], encoding=self.stdout.encoding,
as_xml=params['xml'],
with_comments=not params['no-comments'])
return 0
order of the listed IDs.
"""
-def _sort_ids(bugdir, ids, with_comments=True):
+def _sort_ids(bugdirs, ids, with_comments=True):
bugs = []
root_comments = {}
for id in ids:
- p = libbe.util.id.parse_user(bugdir, id)
+ p = libbe.util.id.parse_user(bugdirs, id)
if p['type'] == 'bug':
bugs.append(p['bug'])
elif with_comments == True:
def _xml_footer():
return ['</be-xml>']
-def output(bd, ids, encoding, as_xml=True, with_comments=True):
+def output(bugdirs, ids, encoding, as_xml=True, with_comments=True):
if ids == None or len(ids) == 0:
- bd.load_all_bugs()
- ids = [bug.id.user() for bug in bd]
- bugs,root_comments = _sort_ids(bd, ids, with_comments)
+ ids = []
+ for bugdir in bugdirs.values():
+ bugdir.load_all_bugs()
+ ids.extend([bug.id.user() for bug in bugdir])
+ uuids,root_comments = _sort_ids(bugdirs, ids, with_comments)
lines = []
if as_xml:
lines.extend(_xml_header(encoding))
else:
spaces_left = len(ids) - 1
- for bugname in bugs:
- bug = bd.bug_from_uuid(bugname)
+ for bugname in uuids:
+ bug = libbe.command.util.bug_from_uuid(bugdirs, bugname)
if as_xml:
lines.append(bug.xml(indent=2, show_comments=with_comments))
else:
spaces_left -= 1
lines.append('') # add a blank line between bugs/comments
for bugname,comments in root_comments.items():
- bug = bd.bug_from_uuid(bugname)
+ bug = libbe.command.util.bug_from_uuid(bugdirs, bugname)
if as_xml:
lines.extend([' <bug>', ' <uuid>%s</uuid>' % bug.uuid])
for commname in comments:
>>> io = libbe.command.StringInputOutput()
>>> io.stdout = sys.stdout
>>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
>>> cmd = Status(ui=ui)
>>> cmd._storage = bd.storage
])
def _run(self, **params):
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
for bug_id in params['bug-id']:
- bug,dummy_comment = \
- libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, bug_id))
if bug.status != params['status']:
try:
bug.status = params['status']
def _long_help(self):
try: # See if there are any per-tree status configurations
- bd = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
except NotImplementedError:
pass # No tree, just show the defaults
longest_status_len = max([len(s) for s in libbe.bug.status_values])
>>> io = libbe.command.StringInputOutput()
>>> io.stdout = sys.stdout
>>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> ui.storage_callbacks.set_storage(bd.storage)
>>> cmd = Subscribe(ui=ui)
>>> a = bd.bug_from_uuid('a')
Subscriptions for abc/a:
John Doe <j@doe.com> all *
>>> ret = ui.run(cmd, {'unsubscribe':True, 'subscriber':'John Doe <j@doe.com>'}, ['/a'])
- >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>', 'types':'new'}, ['DIR']) # doctest: +NORMALIZE_WHITESPACE
- Subscriptions for bug directory:
+ >>> ret = ui.run(cmd,
+ ... {'subscriber':'Jane Doe <J@doe.com>', 'types':'new'},
+ ... [bd.uuid[:3]]) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc:
Jane Doe <J@doe.com> new *
- >>> ret = ui.run(cmd, {'subscriber':'Jane Doe <J@doe.com>'}, ['DIR']) # doctest: +NORMALIZE_WHITESPACE
- Subscriptions for bug directory:
+ >>> ret = ui.run(cmd,
+ ... {'subscriber':'Jane Doe <J@doe.com>'},
+ ... [bd.uuid]) # doctest: +NORMALIZE_WHITESPACE
+ Subscriptions for abc:
Jane Doe <J@doe.com> all *
>>> ui.cleanup()
>>> bd.cleanup()
])
def _run(self, **params):
- bugdir = self._get_bugdir()
+ storage = self._get_storage()
+ bugdirs = self._get_bugdirs()
if params['list-all'] == True or params['list'] == True:
- writeable = bugdir.storage.writeable
- bugdir.storage.writeable = False
+ writeable = storage.writeable
+ storage.writeable = False
if params['list-all'] == True:
assert len(params['id']) == 0, params['id']
subscriber = params['subscriber']
types = params['types'].split(',')
if len(params['id']) == 0:
- params['id'] = [libbe.diff.BUGDIR_ID]
+ params['id'] = bugdirs.keys()
for _id in params['id']:
- if _id == libbe.diff.BUGDIR_ID: # directory-wide subscriptions
+ p = libbe.util.id.parse_user(bugdirs, _id)
+ if p['type'] == 'bugdir':
type_root = libbe.diff.BUGDIR_TYPE_ALL
- entity = bugdir
- entity_name = 'bug directory'
+ entity = bugdirs[p['bugdir']]
else: # bug-specific subscriptions
type_root = libbe.diff.BUG_TYPE_ALL
- bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, _id)
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, _id))
entity = bug
- entity_name = bug.id.user()
+ entity_name = entity.id.user()
if params['list-all'] == True:
entity_name = 'anything in the bug directory'
types = [libbe.diff.type_from_name(name, type_root, default=libbe.diff.INVALID_TYPE,
entity.extra_strings = estrs # reassign to notice change
if params['list-all'] == True:
- bugdir.load_all_bugs()
- subscriptions = get_bugdir_subscribers(bugdir, servers[0])
+ subscriptions = []
+ for bugdir in bugdirs.values():
+ bugdir.load_all_bugs()
+ subscriptions.extend(
+ get_bugdir_subscribers(bugdir, servers[0]))
else:
subscriptions = []
for estr in entity.extra_strings:
print >> self.stdout, 'Subscriptions for %s:' % entity_name
print >> self.stdout, '\n'.join(subscriptions)
if params['list-all'] == True or params['list'] == True:
- bugdir.storage.writeable = writeable
+ storage.writeable = writeable
return 0
def _long_help(self):
return """
-ID can be either a bug id, or blank/"DIR", in which case it refers to the
-whole bug directory.
+ID can be either a bug ID, a bugdir ID, or blank, in which case it
+refers to all known bugdirs.
SERVERS specifies the servers from which you would like to receive
notification. Multiple severs may be specified in a comma-separated
# You should have received a copy of the GNU General Public License along with
# Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
+import itertools
+
import libbe
import libbe.command
import libbe.command.util
>>> io = libbe.command.StringInputOutput()
>>> io.stdout = sys.stdout
>>> ui = libbe.command.UserInterface(io=io)
- >>> ui.storage_callbacks.set_bugdir(bd)
+ >>> ui.storage_callbacks.set_bugdirs({bd.uuid: bd})
>>> cmd = Tag(ui=ui)
>>> a = bd.bug_from_uuid('a')
if params['id'] != None and params['list'] == True:
raise libbe.command.UserError(
'Do not specify a bug id with the --list option.')
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
if params['list'] == True:
- tags = get_all_tags(bugdir)
+ tags = list(itertools.chain(*
+ [get_all_tags(bugdir) for bugdir in bugdirs.values()]))
tags.sort()
if len(tags) > 0:
print >> self.stdout, '\n'.join(tags)
return 0
- bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, params['id'])
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['id']))
if len(params['tag']) > 0:
tags = get_tags(bug)
for tag in params['tag']:
import libbe.command
import libbe.command.util
import libbe.command.depend
+import libbe.util.id
class Target (libbe.command.Command):
help="Print the UUID for the target bug whose summary "
"matches TARGET. If TARGET is not given, print the UUID "
"of the current bugdir target."),
+ libbe.command.Option(name='bugdir', short_name='b',
+ help='Short bugdir UUID for the target resolution. You '
+ 'only need to set this if you have multiple bugdirs in '
+ 'your repository.',
+ arg=libbe.command.Argument(
+ name='bugdir', metavar='ID', default=None,
+ completion_callback=libbe.command.util.complete_bugdir_id)),
])
self.args.extend([
libbe.command.Argument(
if params['target'] != None:
raise libbe.command.UserError('Too many arguments')
params['target'] = params.pop('id')
- bugdir = self._get_bugdir()
+ bugdirs = self._get_bugdirs()
if params['resolve'] == True:
- bug = bug_from_target_summary(bugdir, params['target'])
+ if params['bugdir']:
+ bugdir = bugdirs[bugdir]
+ elif len(bugdirs) == 1:
+ bugdir = bugdirs.values()[0]
+ else:
+ raise libbe.command.UserError(
+ 'Ambiguous bugdir {}'.format(sorted(bugdirs.values())))
+ bug = bug_from_target_summary(bugdirs, bugdir, params['target'])
if bug == None:
print >> self.stdout, 'No target assigned.'
else:
print >> self.stdout, bug.uuid
return 0
- bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
- bugdir, params['id'])
+ bugdir,bug,comment = (
+ libbe.command.util.bugdir_bug_comment_from_user_id(
+ bugdirs, params['id']))
if params['target'] == None:
- target = bug_target(bugdir, bug)
+ target = bug_target(bugdirs, bug)
if target == None:
print >> self.stdout, 'No target assigned.'
else:
print >> self.stdout, target.summary
else:
if params['target'] == 'none':
- target = remove_target(bugdir, bug)
+ target = remove_target(bugdirs, bug)
else:
- target = add_target(bugdir, bug, params['target'])
+ target = add_target(bugdirs, bugdir, bug, params['target'])
return 0
def usage(self):
$ be set target $(be target --resolve SUMMARY)
"""
-def bug_from_target_summary(bugdir, summary=None):
+def bug_from_target_summary(bugdirs, bugdir, summary=None):
if summary == None:
if bugdir.target == None:
return None
% '\n '.join([bug.uuid for bug in matched]))
return matched[0]
-def bug_target(bugdir, bug):
+def bug_target(bugdirs, bug):
if bug.severity == 'target':
return bug
matched = []
- for blocked in libbe.command.depend.get_blocks(bugdir, bug):
+ for blocked in libbe.command.depend.get_blocks(bugdirs, bug):
if blocked.severity == 'target':
matched.append(blocked)
if len(matched) == 0:
'\n '.join([b.uuid for b in matched])))
return matched[0]
-def remove_target(bugdir, bug):
- target = bug_target(bugdir, bug)
+def remove_target(bugdirs, bug):
+ target = bug_target(bugdirs, bug)
libbe.command.depend.remove_block(target, bug)
return target
-def add_target(bugdir, bug, summary):
- target = bug_from_target_summary(bugdir, summary)
+def add_target(bugdirs, bugdir, bug, summary):
+ target = bug_from_target_summary(bugdirs, bugdir, summary)
if target == None:
target = bugdir.new_bug(summary=summary)
target.severity = 'target'
libbe.command.depend.add_block(target, bug)
return target
-def targets(bugdir):
+def targets(bugdirs):
"""Generate all possible target bug summaries."""
- bugdir.load_all_bugs()
- for bug in bugdir:
- if bug.severity == 'target':
- yield bug.summary
+ for bugdir in bugdirs.values():
+ bugdir.load_all_bugs()
+ for bug in bugdir:
+ if bug.severity == 'target':
+ yield bug.summary
-def target_dict(bugdir):
+def target_dict(bugdirs):
"""
Return a dict with bug UUID keys and bug summary values for all
target bugs.
"""
ret = {}
- bugdir.load_all_bugs()
- for bug in bugdir:
- if bug.severity == 'target':
- ret[bug.uuid] = bug.summary
+ for bug in targets(bugdirs):
+ ret[bug.uuid] = bug
return ret
def complete_target(command, argument, fragment=None):
"""List possible command completions for fragment."""
- return targets(command._get_bugdir())
+ return targets(command._get_bugdirs())
class Completer (object):
def __init__(self, options):
self.options = options
- def __call__(self, bugdir, fragment=None):
+ def __call__(self, bugdirs, fragment=None):
return [fragment]
def complete_command(command, argument, fragment=None):
return comp_path(fragment)
def complete_status(command, argument, fragment=None):
- bd = command._get_bugdir()
+ bd = sorted(command._get_bugdirs().items())[1]
import libbe.bug
return libbe.bug.status_values
def complete_severity(command, argument, fragment=None):
- bd = command._get_bugdir()
+ bd = sorted(command._get_bugdirs().items())[1]
import libbe.bug
return libbe.bug.severity_values
-def assignees(bugdir):
- bugdir.load_all_bugs()
- return list(set([bug.assigned for bug in bugdir
- if bug.assigned != None]))
+def assignees(bugdirs):
+ ret = set()
+ for bugdir in bugdirs.values():
+ bugdir.load_all_bugs()
+ ret.update(set([bug.assigned for bug in bugdir
+ if bug.assigned != None]))
+ return list(ret)
def complete_assigned(command, argument, fragment=None):
- return assignees(command._get_bugdir())
+ return assignees(command._get_bugdirs())
def complete_extra_strings(command, argument, fragment=None):
if fragment == None:
return []
return [fragment]
+def complete_bugdir_id(command, argument, fragment=None):
+ bugdirs = command._get_bugdirs()
+ return bugdirs.keys()
+
def complete_bug_id(command, argument, fragment=None):
return complete_bug_comment_id(command, argument, fragment,
comments=False)
active_only=True, comments=True):
import libbe.bugdir
import libbe.util.id
- bd = command._get_bugdir()
+ bugdirs = command._get_bugdirs()
if fragment == None or len(fragment) == 0:
fragment = '/'
try:
- p = libbe.util.id.parse_user(bd, fragment)
+ p = libbe.util.id.parse_user(bugdirs, fragment)
matches = None
root,residual = (fragment, None)
if not root.endswith('/'):
common = e.common
matches = e.matches
root,residual = libbe.util.id.residual(common, fragment)
- p = libbe.util.id.parse_user(bd, e.common)
+ p = libbe.util.id.parse_user(bugdirs, e.common)
bug = None
if matches == None: # fragment was complete, get a list of children uuids
if p['type'] == 'bugdir':
- matches = bd.uuids()
- common = bd.id.user()
+ bugdir = bugdirs[p['bugdir']]
+ matches = bugdir.uuids()
+ common = bugdir.id.user()
elif p['type'] == 'bug':
if comments == False:
return [fragment]
- bug = bd.bug_from_uuid(p['bug'])
+ bugdir = bugdirs[p['bugdir']]
+ bug = bugdir.bug_from_uuid(p['bug'])
matches = bug.uuids()
common = bug.id.user()
else:
assert p['type'] == 'comment', p
return [fragment]
if p['type'] == 'bugdir':
- child_fn = bd.bug_from_uuid
+ bugdir = bugdirs[p['bugdir']]
+ child_fn = bugdir.bug_from_uuid
elif p['type'] == 'bug':
if comments == False:
return[fragment]
+ bugdir = bugdirs[p['bugdir']]
if bug == None:
- bug = bd.bug_from_uuid(p['bug'])
+ bug = bugdir.bug_from_uuid(p['bug'])
child_fn = bug.comment_from_uuid
elif p['type'] == 'comment':
assert matches == None, matches
possible_values = whitelisted_values
return possible_values
-def bug_comment_from_user_id(bugdir, id):
- p = libbe.util.id.parse_user(bugdir, id)
- if not p['type'] in ['bug', 'comment']:
+def bugdir_bug_comment_from_user_id(bugdirs, id):
+ p = libbe.util.id.parse_user(bugdirs, id)
+ if not p['type'] in ['bugdir', 'bug', 'comment']:
+ raise libbe.command.UserError(
+ '{} is a {} id, not a bugdir, bug, or comment id'.format(
+ id, p['type']))
+ if p['bugdir'] not in bugdirs:
raise libbe.command.UserError(
- '%s is a %s id, not a bug or comment id' % (id, p['type']))
+ "{} doesn't belong to any bugdirs in {}".format(
+ id, sorted(bugdirs.keys())))
+ bugdir = bugdirs[p['bugdir']]
if p['bugdir'] != bugdir.uuid:
raise libbe.command.UserError(
"%s doesn't belong to this bugdir (%s)"
% (id, bugdir.uuid))
- bug = bugdir.bug_from_uuid(p['bug'])
- if 'comment' in p:
- comment = bug.comment_from_uuid(p['comment'])
+ if 'bug' in p:
+ bug = bugdir.bug_from_uuid(p['bug'])
+ if 'comment' in p:
+ comment = bug.comment_from_uuid(p['comment'])
+ else:
+ comment = bug.comment_root
else:
- comment = bug.comment_root
- return (bug, comment)
+ bug = comment = None
+ return (bugdir, bug, comment)
+
+def bug_from_uuid(bugdirs, uuid):
+ error = None
+ for bugdir in bugdirs.values():
+ try:
+ bug = bugdir.bug_from_uuid(uuid)
+ except libbe.bugdir.NoBugMatches as e:
+ error = e
+ else:
+ return bug
+ raise error
def save_comments(bug):
for comment in bug.comment_root.traverse():
+ comment.bug = bug
+ comment.storage = bug.storage
comment.save()
assert self.uuid != INVALID_UUID, self
if self.content_type.startswith('text/') \
and self.bug != None and self.bug.bugdir != None:
- new = libbe.util.id.short_to_long_text([self.bug.bugdir], new)
+ new = libbe.util.id.short_to_long_text(
+ {self.bug.bugdir.uuid: self.bug.bugdir}, new)
if (self.storage != None and self.storage.writeable == True) \
or force==True:
assert new != None, "Can't save empty comment"
<extra-string>TAG: very helpful</extra-string>
</comment>
"""
- for attr in other.explicit_attrs:
- old = getattr(self, attr)
- new = getattr(other, attr)
- if old != new:
- if accept_changes == True:
- setattr(self, attr, new)
- elif change_exception == True:
- raise ValueError, \
- 'Merge would change %s "%s"->"%s" for comment %s' \
- % (attr, old, new, self.uuid)
+ if hasattr(other, 'explicit_attrs'):
+ for attr in other.explicit_attrs:
+ old = getattr(self, attr)
+ new = getattr(other, attr)
+ if old != new:
+ if accept_changes:
+ setattr(self, attr, new)
+ elif change_exception:
+ raise ValueError(
+ ('Merge would change {} "{}"->"{}" for comment {}'
+ ).format(attr, old, new, self.uuid))
if self.alt_id == self.uuid:
self.alt_id = None
for estr in other.extra_strings:
if self.content_type.startswith("text/"):
body = (self.body or "")
if self.bug != None and self.bug.bugdir != None:
- body = libbe.util.id.long_to_short_text([self.bug.bugdir], body)
+ body = libbe.util.id.long_to_short_text(
+ {self.bug.bugdir.uuid: self.bug.bugdir}, body)
lines.extend(body.splitlines())
else:
lines.append("Content type %s not printable. Try XML output instead" % self.content_type)
long_to_short_text : conversion on a block of text
"""
ids = _split(id, check_length=True)
- matching_bugdirs = [bd for bd in bugdirs if bd.uuid == ids[0]]
+ matching_bugdirs = [bd for bd in bugdirs.values() if bd.uuid == ids[0]]
if len(matching_bugdirs) == 0:
- raise NoIDMatches(id, [bd.uuid for bd in bugdirs])
+ raise NoIDMatches(id, [bd.uuid for bd in bugdirs.values()])
elif len(matching_bugdirs) > 1:
- raise MultipleIDMatches(id, '', [bd.uuid for bd in bugdirs])
+ raise MultipleIDMatches(id, '', [bd.uuid for bd in bugdirs.values()])
bugdir = matching_bugdirs[0]
objects = [bugdir]
if len(ids) >= 2:
"""
ids = _split(id, check_length=True)
ids[0] = _expand(ids[0], common=None,
- other_ids=[bd.uuid for bd in bugdirs])
+ other_ids=[bd.uuid for bd in bugdirs.values()])
if len(ids) == 1:
return _assemble(ids)
- bugdir = [bd for bd in bugdirs if bd.uuid == ids[0]][0]
+ bugdir = [bd for bd in bugdirs.values() if bd.uuid == ids[0]][0]
ids[1] = _expand(ids[1], common=bugdir.id.user(),
other_ids=bugdir.uuids())
if len(ids) == 2:
ret[type] = arg
return ret
-def parse_user(bugdir, id):
+def parse_user(bugdirs, id):
"""Parse a user ID (see :class:`ID`), returning a dict of parsed
information.
This function tries to expand IDs before parsing, so it can handle
both short and long IDs successfully.
"""
- long_id = short_to_long_user([bugdir], id)
+ long_id = short_to_long_user(bugdirs, id)
return _parse_user(long_id)
if libbe.TESTING == True:
class ShortLongParseTestCase(unittest.TestCase):
def setUp(self):
self.bugdir = DummyObject('1234abcd')
+ self.bugdirs = {self.bugdir.uuid: self.bugdir}
self.bug = DummyObject('abcdef', self.bugdir, ['a1234', 'ab9876'])
self.comment = DummyObject('12345678', self.bug, ['1234abcd', '1234cdef'])
self.bd_id = self.bugdir.id
None, '123/abc', ['1234abcd','1234cdef','12345678'])),
]
def test_short_to_long_text(self):
- self.failUnless(short_to_long_text([self.bugdir], self.short) == self.long,
- '\n' + self.short + '\n' + short_to_long_text([self.bugdir], self.short) + '\n' + self.long)
+ self.failUnless(short_to_long_text(
+ self.bugdirs, self.short) == self.long,
+ '\n' + self.short + '\n' + short_to_long_text(
+ self.bugdirs, self.short) + '\n' + self.long)
def test_long_to_short_text(self):
- self.failUnless(long_to_short_text([self.bugdir], self.long) == self.short,
- '\n' + long_to_short_text([self.bugdir], self.long) + '\n' + self.short)
+ self.failUnless(long_to_short_text(
+ self.bugdirs, self.long) == self.short,
+ '\n' + long_to_short_text(
+ self.bugdirs, self.long
+ ) + '\n' + self.short)
def test_parse_user(self):
for short_id,parsed in self.short_id_parse_pairs:
- ret = parse_user(self.bugdir, short_id)
+ ret = parse_user(self.bugdirs, short_id)
self.failUnless(ret == parsed,
'got %s\nexpected %s' % (ret, parsed))
def test_parse_user_exceptions(self):
for short_id,exception in self.short_id_exception_pairs:
try:
- ret = parse_user(self.bugdir, short_id)
+ ret = parse_user(self.bugdirs, short_id)
self.fail('Expected parse_user(bugdir, "%s") to raise %s,'
'\n but it returned %s'
% (short_id, exception.__class__.__name__, ret))