>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = Assign()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
>>> bd.bug_from_uuid('a').assigned is None
True
- >>> ret = cmd.run(bd.storage, bd, {'user-id':u'Fran\xe7ois'}, ['-', '/a'])
+ >>> ret = cmd.run({'user-id':u'Fran\xe7ois'}, ['-', '/a'])
>>> bd.flush_reload()
>>> bd.bug_from_uuid('a').assigned
u'Fran\\xe7ois'
- >>> ret = cmd.run(bd.storage, bd, args=['someone', '/a', '/b'])
+ >>> ret = cmd.run(args=['someone', '/a', '/b'])
>>> bd.flush_reload()
>>> bd.bug_from_uuid('a').assigned
'someone'
>>> bd.bug_from_uuid('b').assigned
'someone'
- >>> ret = cmd.run(bd.storage, bd, args=['none', '/a'])
+ >>> ret = cmd.run(args=['none', '/a'])
>>> bd.flush_reload()
>>> bd.bug_from_uuid('a').assigned is None
True
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.args.extend([
libbe.command.Argument(
- name='assignee', metavar='ASSIGNEE', default=None,
+ name='assigned', metavar='ASSIGNED', default=None,
completion_callback=libbe.command.util.complete_assigned),
libbe.command.Argument(
name='bug-id', metavar='BUG-ID', default=None,
completion_callback=libbe.command.util.complete_bug_id),
])
- def _run(self, storage, bugdir, **params):
- assignee = params['assignee']
- if assignee == 'none':
- assignee = None
- elif assignee == '-':
- assignee = params['user-id']
+ def _run(self, **params):
+ assigned = params['assigned']
+ if assigned == 'none':
+ assigned = None
+ elif assigned == '-':
+ assigned = self._get_user_id()
+ bugdir = self._get_bugdir()
for bug_id in params['bug-id']:
bug,dummy_comment = \
libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
- if bug.assigned != assignee:
- bug.assigned = assignee
+ if bug.assigned != assigned:
+ bug.assigned = assigned
return 0
def _long_help(self):
return """
Assign a person to fix a bug.
-Assignees should be the person's Bugs Everywhere identity, the same
+Assigneds should be the person's Bugs Everywhere identity, the same
string that appears in Creator fields.
-Special assignee strings:
+Special assigned strings:
"-" assign the bug to yourself
"none" un-assigns the bug
"""
name = 'command'
- def __init__(self, input_encoding=None, output_encoding=None):
+ def __init__(self, input_encoding=None, output_encoding=None,
+ get_unconnected_storage=None, ui=None):
+ self.input_encoding = input_encoding
+ self.output_encoding = output_encoding
+ self.get_unconnected_storage = get_unconnected_storage
+ self.ui = ui # calling user-interface, e.g. for Help()
self.status = None
self.result = None
- self.ui = None # calling user-interface, e.g. for Help()
- self.requires_bugdir = False
- self.requires_storage = False
- self.requires_unconnected_storage = False
self.restrict_file_access = True
- self.input_encoding = None
- self.output_encoding = None
self.options = [
Option(name='help', short_name='h',
help='Print a help message.',
]
self.args = []
- def run(self, storage=None, bugdir=None, options=None, args=None):
+ def run(self, options=None, args=None):
if options == None:
options = {}
if args == None:
params[option.name] = False
assert 'user-id' not in params, params['user-id']
if 'user-id' in options:
- params['user-id'] = options.pop('user-id')
- else:
- params['user-id'] = libbe.ui.util.user.get_user_id(storage)
+ self._user_id = options.pop('user-id')
if len(options) > 0:
raise UserError, 'Invalid option passed to command %s:\n %s' \
% (self.name, '\n '.join(['%s: %s' % (k,v)
params.pop('complete')
self._setup_io(self.input_encoding, self.output_encoding)
- self.status = self._run(storage, bugdir, **params)
+ self.status = self._run(**params)
return self.status
- def _run(self, storage, bugdir, **kwargs):
- pass
+ def _run(self, **kwargs):
+ raise NotImplementedError
def _setup_io(self, input_encoding=None, output_encoding=None):
if input_encoding == None:
return argument.completion_callback(self, argument, fragment)
return [] # the particular argument doesn't supply completion info
- def check_restricted_access(self, storage, path):
+ def _check_restricted_access(self, storage, path):
"""
Check that the file at path is inside bugdir.root. This is
important if you allow other users to execute becommands with
>>> s.repo = os.path.expanduser('~/x/')
>>> c = Command()
>>> try:
- ... c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
+ ... c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
... except UserError, e:
... assert str(e).startswith('file access restricted!'), str(e)
... print 'we got the expected error'
we got the expected error
- >>> c.check_restricted_access(s, os.path.expanduser('~/x'))
- >>> c.check_restricted_access(s, os.path.expanduser('~/x/y'))
+ >>> c._check_restricted_access(s, os.path.expanduser('~/x'))
+ >>> c._check_restricted_access(s, os.path.expanduser('~/x/y'))
>>> c.restrict_file_access = False
- >>> c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
+ >>> c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
"""
if self.restrict_file_access == True:
path = os.path.abspath(path)
return
raise UserError('file access restricted!\n %s not in %s'
% (path, repo))
+
+ def _get_unconnected_storage(self):
+ """Callback for use by commands that need it."""
+ if not hasattr(self, '_unconnected_storage'):
+ if self.get_unconnected_storage == None:
+ raise NotImplementedError
+ self._unconnected_storage = self.get_unconnected_storage()
+ return self._unconnected_storage
+
+ def _get_storage(self):
+ """
+ Callback for use by commands that need it.
+
+ Note that with the current implementation,
+ _get_unconnected_storage() will not work after this method
+ runs, but that shouldn't be an issue for any command I can
+ think of...
+ """
+ if not hasattr(self, '_storage'):
+ self._storage = self._get_unconnected_storage()
+ self._storage.connect()
+ return self._storage
+
+ def _get_bugdir(self):
+ """Callback for use by commands that need it."""
+ if not hasattr(self, '_bugdir'):
+ self._bugdir = libbe.bugdir.BugDir(self._get_storage(), from_storage=True)
+ return self._bugdir
+
+ def _get_user_id(self):
+ """Callback for use by commands that need it."""
+ if not hasattr(self, '_user_id'):
+ self._user_id = libbe.ui.util.user.get_user_id(self._get_storage())
+ return self._user_id
+
+ def cleanup(self):
+ if hasattr(self, '_storage'):
+ self._storage.disconnect()
>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = Comment()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
- >>> ret = cmd.run(bd.storage, bd, {'user-id':u'Fran\\xe7ois'},
+ >>> ret = cmd.run({'user-id':u'Fran\\xe7ois'},
... ['/a', 'This is a comment about a'])
>>> bd.flush_reload()
>>> bug = bd.bug_from_uuid('a')
>>> if 'EDITOR' in os.environ:
... del os.environ['EDITOR']
- >>> ret = cmd.run(bd.storage, bd, {'user-id':u'Frank'}, ['/b'])
+ >>> ret = cmd.run({'user-id':u'Frank'}, ['/b'])
Traceback (most recent call last):
UserError: No comment supplied, and EDITOR not specified.
>>> os.environ['EDITOR'] = "echo 'I like cheese' > "
- >>> ret = cmd.run(bd.storage, bd, {'user-id':u'Frank'}, ['/b'])
+ >>> ret = cmd.run({'user-id':u'Frank'}, ['/b'])
>>> bd.flush_reload()
>>> bug = bd.bug_from_uuid('b')
>>> bug.load_comments(load_full=False)
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.options.extend([
libbe.command.Option(name='author', short_name='a',
help='Set the comment author',
completion_callback=libbe.command.util.complete_assigned),
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
bug,parent = \
libbe.command.util.bug_comment_from_user_id(bugdir, params['id'])
if params['comment'] == None:
if not body.endswith('\n'):
body+='\n'
if params['author'] == None:
- params['author'] = params['user-id']
+ params['author'] = self._get_user_id()
new = parent.new_reply(body=body)
for key in ['alt-id', 'author', 'content-type']:
>>> vcs.repo = dir.path
>>> vcs.init()
>>> vcs.connect()
+ >>> cmd._storage = vcs
>>> if vcs.name in libbe.storage.vcs.base.VCS_ORDER:
... bd = libbe.bugdir.BugDir(vcs, from_storage=False)
... bd.extra_strings = ['hi there']
- ... cmd.run(vcs, None, {'user-id':'Joe'},
- ... ['Making a commit']) # doctest: +ELLIPSIS
+ ... cmd.run({'user-id':'Joe'}, ['Making a commit']) # doctest: +ELLIPSIS
... else:
... print 'Committed ...'
Committed ...
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_storage = True
self.options.extend([
libbe.command.Option(name='body', short_name='b',
help='Provide the detailed body for the commit message. In the special case that FILE == "EDITOR", spawn an editor to enter the body text (in which case you cannot use stdin for the summary)',
name='comment', metavar='COMMENT', default=None),
])
- def _run(self, storage, bugdir=None, **params):
+ def _run(self, **params):
if params['comment'] == '-': # read summary from stdin
assert params['body'] != 'EDITOR', \
'Cannot spawn and editor when the summary is using stdin.'
summary = sys.stdin.readline()
else:
summary = params['comment']
+ storage = self._get_storage()
if params['body'] == None:
body = None
elif params['body'] == 'EDITOR':
body = libbe.ui.util.editor.editor_string(
'Please enter your commit message above')
else:
- self.check_restricted_access(storage, params['body'])
+ self._check_restricted_access(storage, params['body'])
body = libbe.util.encoding.get_file_contents(
params['body'], decode=True)
try:
>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = Depend()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a', '/b'])
+ >>> ret = cmd.run(args=['/a', '/b'])
a blocked by:
b
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a'])
+ >>> ret = cmd.run(args=['/a'])
a blocked by:
b
- >>> ret = cmd.run(bd.storage, bd, {'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ >>> ret = cmd.run({'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
a blocked by:
b closed
- >>> ret = cmd.run(bd.storage, bd, {}, ['/b', '/a'])
+ >>> ret = cmd.run(args=['/b', '/a'])
b blocked by:
a
b blocks:
a
- >>> ret = cmd.run(bd.storage, bd, {'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
+ >>> ret = cmd.run({'show-status':True}, ['/a']) # doctest: +NORMALIZE_WHITESPACE
a blocked by:
b closed
a blocks:
b closed
- >>> ret = cmd.run(bd.storage, bd, {'repair':True})
- >>> ret = cmd.run(bd.storage, bd, {'remove':True}, ['/b', '/a'])
+ >>> ret = cmd.run({'repair':True})
+ >>> ret = cmd.run({'remove':True}, ['/b', '/a'])
b blocks:
a
- >>> ret = cmd.run(bd.storage, bd, {'remove':True}, ['/a', '/b'])
+ >>> ret = cmd.run({'remove':True}, ['/a', '/b'])
>>> bd.cleanup()
"""
name = 'depend'
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.options.extend([
libbe.command.Option(name='remove', short_name='r',
help='Remove dependency (instead of adding it)'),
completion_callback=libbe.command.util.complete_bug_id),
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
if params['repair'] == True and params['bug-id'] != None:
raise libbe.command.UsageError(
'No arguments with --repair calls.')
and params['blocking-bug-id'] != None:
raise libbe.command.UsageError(
'Only one bug id used in tree mode.')
+ bugdir = self._get_bugdir()
if params['repair'] == True:
good,fixed,broken = check_dependencies(bugdir, repair_broken_links=True)
assert len(broken) == 0, broken
>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = Due()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a'])
+ >>> ret = cmd.run(args=['/a'])
No due date assigned.
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a', 'Thu, 01 Jan 1970 00:00:00 +0000'])
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a'])
+ >>> ret = cmd.run(args=['/a', 'Thu, 01 Jan 1970 00:00:00 +0000'])
+ >>> ret = cmd.run(args=['/a'])
Thu, 01 Jan 1970 00:00:00 +0000
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a', 'none'])
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a'])
+ >>> ret = cmd.run(args=['/a', 'none'])
+ >>> ret = cmd.run(args=['/a'])
No due date assigned.
>>> bd.cleanup()
"""
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.args.extend([
libbe.command.Argument(
name='bug-id', metavar='BUG-ID',
name='due', metavar='DUE', optional=True),
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
bugdir, params['bug-id'])
if params['due'] == None:
completion_callback=self.complete_topic)
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
if params['topic'] == None:
if hasattr(self.ui, 'help'):
self.ui.help()
elif params['topic'] in libbe.command.commands():
module = libbe.command.get_command(params['topic'])
Class = libbe.command.get_command_class(module,params['topic'])
- c = Class()
+ c = Class(get_unconnected_storage=self.get_unconnected_storage,
+ ui=self.ui)
print >> self.stdout, c.help().rstrip('\n')
elif params['topic'] in TOPICS:
print >> self.stdout, TOPICS[params['topic']].rstrip('\n')
>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = HTML()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
>>> cwd = os.getcwd()
>>> os.chdir(bd.storage.repo)
- >>> ret = cmd.run(bd.storage, bd)
+ >>> ret = cmd.run()
>>> os.path.exists('./html_export')
True
>>> os.path.exists('./html_export/index.html')
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.options.extend([
libbe.command.Option(name='output', short_name='o',
help='Set the output path (%default)',
help='Verbose output, default is %default'),
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
if params['export-template'] == True:
html_gen.write_default_template(params['export-template-dir'])
return 0
+ bugdir = self._get_bugdir()
bugdir.load_all_bugs()
html_gen = HTMLGen(bugdir,
template=params['template-dir'],
>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = Import_XML()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
>>> cmd.stdin = StringIO.StringIO('<be-xml><comment><uuid>c</uuid><body>This is a comment about a</body></comment></be-xml>')
>>> cmd.stdin.encoding = 'ascii'
- >>> ret = cmd.run(bd.storage, bd, {'comment-root':'/a'}, ['-'])
+ >>> ret = cmd.run({'comment-root':'/a'}, ['-'])
>>> bd.flush_reload()
>>> bug = bd.bug_from_uuid('a')
>>> bug.load_comments(load_full=False)
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.options.extend([
libbe.command.Option(name='ignore-missing-references', short_name='i',
help="If any comment's <in-reply-to> refers to a non-existent comment, ignore it (instead of raising an exception)."),
name='xml-file', metavar='XML-FILE'),
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
writeable = bugdir.storage.writeable
bugdir.storage.writeable = False
if params['comment-root'] != None:
if params['xml-file'] == '-':
xml = self.stdin.read().encode(self.stdin.encoding)
else:
- self.check_restricted_access(storage, params['xml-file'])
+ self._check_restricted_access(storage, params['xml-file'])
xml = libbe.util.encoding.get_file_contents(
params['xml-file'])
def setUp(self):
self.bugdir = libbe.bugdir.SimpleBugDir(memory=False)
self.cmd = Import_XML()
+ self.cmd._storage = self.bugdir.storage
self.cmd._setup_io = lambda i_enc,o_enc : None
bugA = self.bugdir.bug_from_uuid('a')
self.bugdir.remove_bug(bugA)
def tearDown(self):
self.bugdir.cleanup()
def _execute(self, params={}, args=[]):
- self.cmd.run(self.bugdir.storage, self.bugdir, params, args)
+ self.cmd.run(params, args)
self.bugdir.flush_reload()
def testCleanBugdir(self):
uuids = list(self.bugdir.uuids())
... except libbe.storage.ConnectionError:
... True
True
- >>> cmd.run(vcs)
+ >>> cmd._unconnected_storage = vcs
+ >>> cmd.run()
No revision control detected.
BE repository initialized.
>>> bd = libbe.bugdir.BugDir(vcs)
>>> vcs = libbe.storage.vcs.installed_vcs()
>>> vcs.repo = dir.path
>>> vcs._vcs_init(vcs.repo)
+ >>> cmd._unconnected_storage = vcs
>>> if vcs.name in libbe.storage.vcs.base.VCS_ORDER:
- ... cmd.run(vcs) # doctest: +ELLIPSIS
+ ... cmd.run() # doctest: +ELLIPSIS
... else:
... vcs.init()
... vcs.connect()
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_unconnected_storage = True
- def _run(self, storage, bugdir=None, **params):
+ def _run(self, **params):
+ storage = self._get_unconnected_storage()
if not os.path.isdir(storage.repo):
raise libbe.command.UserError(
'No such directory: %s' % storage.repo)
>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = List()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
- >>> ret = cmd.run(bd.storage, bd)
+
+ >>> ret = cmd.run()
abc/a:om: Bug A
- >>> ret = cmd.run(bd.storage, bd, {'status':'closed'})
+ >>> ret = cmd.run({'status':'closed'})
abc/b:cm: Bug B
>>> bd.storage.writeable
True
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.options.extend([
libbe.command.Option(name='status',
help='Only show bugs matching the STATUS specifier',
#
# ])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
writeable = bugdir.storage.writeable
bugdir.storage.writeable = False
cmp_list, status, severity, assigned, extra_strings_regexps = \
>>> import libbe.comment
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = Merge()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
>>> dummy = dummy.new_reply('1 2 3 4')
>>> dummy.time = 2
- >>> ret = cmd.run(bd.storage, bd, {}, ['/a', '/b'])
+ >>> ret = cmd.run(args=['/a', '/b'])
Merged bugs #abc/a# and #abc/b#
>>> bd.flush_reload()
>>> a = bd.bug_from_uuid('a')
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.args.extend([
libbe.command.Argument(
name='bug-id', metavar='BUG-ID', default=None,
completion_callback=libbe.command.util.complete_bug_id),
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
bugA,dummy_comment = \
libbe.command.util.bug_comment_from_user_id(
bugdir, params['bug-id'])
if comment.alt_id == None:
comment.storage = None
comment.alt_id = comment.uuid
- comment.storage = storage
+ comment.storage = bugdir.storage
comment.uuid = libbe.util.id.uuid_gen()
comment.save() # force onto disk under bugA
>>> import libbe.util.id
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = New()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
>>> libbe.util.id.uuid_gen = lambda: 'X'
- >>> ret = cmd.run(bd.storage, bd, args=['this is a test',])
+ >>> ret = cmd.run(args=['this is a test',])
Created bug with ID abc/X
>>> bd.flush_reload()
>>> bug = bd.bug_from_uuid('X')
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.options.extend([
libbe.command.Option(name='reporter', short_name='r',
help='The user who reported the bug',
libbe.command.Argument(name='summary', metavar='SUMMARY')
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
if params['summary'] == '-': # read summary from stdin
summary = self.stdin.readline()
else:
summary = params['summary']
+ bugdir = self._get_bugdir()
bug = bugdir.new_bug(summary=summary.strip())
if params['reporter'] != None:
bug.reporter = params['reporter']
>>> import libbe.bugdir
>>> bd = libbe.bugdir.SimpleBugDir(memory=False)
>>> cmd = Remove()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
>>> print bd.bug_from_uuid('b').status
closed
- >>> ret = cmd.run(bd.storage, bd, args=['/b'])
+ >>> ret = cmd.run(args=['/b'])
Removed bug abc/b
>>> bd.flush_reload()
>>> try:
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
- self.requires_bugdir = True
self.args.extend([
libbe.command.Argument(
name='bug-id', metavar='BUG-ID', default=None,
completion_callback=libbe.command.util.complete_bug_id),
])
- def _run(self, storage, bugdir, **params):
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
user_ids = []
for bug_id in params['bug-id']:
bug,dummy_comment = libbe.command.util.bug_comment_from_user_id(
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""Show or change a bug's severity level"""
-from libbe import cmdutil, bugdir, bug
-__desc__ = __doc__
-def execute(args, manipulate_encodings=True, restrict_file_access=False,
- dir="."):
- """
- >>> import os
- >>> bd = bugdir.SimpleBugDir()
- >>> os.chdir(bd.root)
- >>> execute(["a"], manipulate_encodings=False)
- minor
- >>> execute(["a", "wishlist"], manipulate_encodings=False)
- >>> execute(["a"], manipulate_encodings=False)
- wishlist
- >>> execute(["a", "none"], manipulate_encodings=False)
+import libbe
+import libbe.bug
+import libbe.command
+import libbe.command.util
+
+class Severity (libbe.command.Command):
+ """Change a bug's severity level
+
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> cmd = Severity()
+ >>> cmd._storage = bd.storage
+ >>> cmd._setup_io = lambda i_enc,o_enc : None
+ >>> cmd.stdout = sys.stdout
+
+ >>> bd.bug_from_uuid('a').severity
+ 'minor'
+ >>> ret = cmd.run(args=['wishlist', '/a'])
+ >>> bd.flush_reload()
+ >>> bd.bug_from_uuid('a').severity
+ 'wishlist'
+ >>> ret = cmd.run(args=['none', '/a'])
Traceback (most recent call last):
UserError: Invalid severity level: none
>>> bd.cleanup()
"""
- parser = get_parser()
- options, args = parser.parse_args(args)
- complete(options, args, parser)
- if len(args) not in (1,2):
- raise cmdutil.UsageError
- bd = bugdir.BugDir(from_disk=True,
- manipulate_encodings=manipulate_encodings,
- root=dir)
- bug = cmdutil.bug_from_id(bd, args[0])
- if len(args) == 1:
- print bug.severity
- elif len(args) == 2:
- try:
- bug.severity = args[1]
- except ValueError, e:
- if e.name != "severity":
- raise e
- raise cmdutil.UserError ("Invalid severity level: %s" % e.value)
+ name = 'severity'
-def get_parser():
- parser = cmdutil.CmdOptionParser("be severity BUG-ID [SEVERITY]")
- return parser
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.args.extend([
+ libbe.command.Argument(
+ name='severity', metavar='SEVERITY', default=None,
+ completion_callback=libbe.command.util.complete_severity),
+ libbe.command.Argument(
+ name='bug-id', metavar='BUG-ID', default=None,
+ repeatable=True,
+ completion_callback=libbe.command.util.complete_bug_id),
+ ])
+
+ def _run(self, **params):
+ bugdir = self._get_bugdir()
+ for bug_id in params['bug-id']:
+ bug,dummy_comment = \
+ libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
+ if bug.severity != params['severity']:
+ try:
+ bug.severity = params['severity']
+ except ValueError, e:
+ if e.name != 'severity':
+ raise e
+ raise libbe.command.UserError(
+ 'Invalid severity level: %s' % e.value)
+ return 0
-def help():
- longhelp=["""
+ def _long_help(self):
+ ret = ["""
Show or change a bug's severity level.
If no severity is specified, the current value is printed. If a severity level
Severity levels are:
"""]
- try: # See if there are any per-tree severity configurations
- bd = bugdir.BugDir(from_disk=True, manipulate_encodings=False)
- except bugdir.NoBugDir, e:
- pass # No tree, just show the defaults
- longest_severity_len = max([len(s) for s in bug.severity_values])
- for severity in bug.severity_values :
- description = bug.severity_description[severity]
- s = "%*s : %s\n" % (longest_severity_len, severity, description)
- longhelp.append(s)
- longhelp = ''.join(longhelp)
- return get_parser().help_str() + longhelp
-
-def complete(options, args, parser):
- for option,value in cmdutil.option_value_pairs(options, parser):
- if value == "--complete":
- # no argument-options at the moment, so this is future-proofing
- raise cmdutil.GetCompletions()
- for pos,value in enumerate(args):
- if value == "--complete":
- try: # See if there are any per-tree severity configurations
- bd = bugdir.BugDir(from_disk=True,
- manipulate_encodings=False)
- except bugdir.NoBugDir:
- bd = None
- if pos == 0: # fist positional argument is a bug id
- ids = []
- if bd != None:
- bd.load_all_bugs()
- filter = lambda bg : bg.active==True
- bugs = [bg for bg in bd if filter(bg)==True]
- ids = [bd.bug_shortname(bg) for bg in bugs]
- raise cmdutil.GetCompletions(ids)
- elif pos == 1: # second positional argument is a severity
- raise cmdutil.GetCompletions(bug.severity_values)
- raise cmdutil.GetCompletions()
+ try: # See if there are any per-tree severity configurations
+ bd = self._get_bugdir()
+ except NotImplementedError:
+ pass # No tree, just show the defaults
+ longest_severity_len = max([len(s) for s in libbe.bug.severity_values])
+ for severity in bug.severity_values :
+ description = bug.severity_description[severity]
+ ret.append('%*s : %s\n' \
+ % (longest_severity_len, severity, description))
+ return ''.join(ret)
print e
return 1
Class = getattr(module, command_name.capitalize())
- command = Class()
- command.ui = self
+ def gucs():
+ return libbe.storage.get_storage(options['repo'])
+ command = Class(get_unconnected_storage=gucs, ui=ui)
parser = CmdOptionParser(command)
- storage = None
- bugdir = None
- if command.requires_bugdir == True:
- assert command.requires_unconnected_storage == False
- storage = libbe.storage.get_storage(options['repo'])
- storage.connect()
- bugdir = libbe.bugdir.BugDir(storage, from_storage=True)
- elif command.requires_storage == True \
- or command.requires_unconnected_storage == True:
- storage = libbe.storage.get_storage(options['repo'])
- if command.requires_unconnected_storage == False:
- storage.connect()
try:
options,args = parser.parse_args(args[1:])
- command.run(storage, bugdir, options, args)
+ command.run(options, args)
except CallbackExit:
- if storage != None: storage.disconnect()
+ command.cleanup()
return 0
except libbe.command.UserError, e:
- if storage != None: storage.disconnect()
+ command.cleanup()
print 'ERROR:\n', e
return 1
if storage != None: storage.disconnect()
def restrict_file_access(bugdir, path):
- """
- Check that the file at path is inside bugdir.root. This is
- important if you allow other users to execute becommands with your
- username (e.g. if you're running be-handle-mail through your
- ~/.procmailrc). If this check wasn't made, a user could e.g.
- run
- be commit -b ~/.ssh/id_rsa "Hack to expose ssh key"
- which would expose your ssh key to anyone who could read the VCS
- log.
- """
- in_root = bugdir.vcs.path_in_root(path, bugdir.root)
- if in_root == False:
- raise UserError('file access restricted!\n %s not in %s'
- % (path, bugdir.root))
def parse_id(id):
"""