$ python -m cProfile -o profile be [command] [args]
$ python -c "import pstats; p=pstats.Stats('profile'); p.sort_stats('cumulative').print_stats(20)"
+
+It's often useful to toss a
+ import sys, traceback
+ print >> sys.stderr, '-'*60, '\n', '\n'.join(traceback.format_stack()[-10:])
+into expensive functions (e.g. libbe.util.subproc.invoke()), if you're
+not sure why they're being called.
def active(self):
return self.status in active_status_values
- def _get_user_id(self):
- if self.bugdir != None:
- return self.bugdir._get_user_id()
- return None
-
@_versioned_property(name="creator",
- doc="The user who entered the bug into the system",
- generator=_get_user_id)
+ doc="The user who entered the bug into the system")
def creator(): return {}
@_versioned_property(name="reporter",
elif assignee == '-':
assignee = params['user-id']
for bug_id in params['bug-id']:
- p = libbe.util.id.parse_user(bugdir, bug_id)
- if p['type'] != 'bug':
- raise libbe.command.UserError(
- '%s is a %s id, not a bug id' % (bug_id, p['type']))
- if p['bugdir'] != bugdir.uuid:
- raise libbe.command.UserError(
- "%s doesn't belong to this bugdir (%s)"
- % (bug_id, bugdir.uuid))
- bug = bugdir.bug_from_uuid(p['bug'])
+ bug,dummy_comment = \
+ libbe.command.util.bug_comment_from_user_id(bugdir, bug_id)
if bug.assigned != assignee:
bug.assigned = assignee
import codecs
import optparse
+import os.path
import sys
import libbe
import libbe.util.encoding
import libbe.util.plugin
-
class UserError(Exception):
pass
def __repr__(self):
return '<Option %s>' % self.__str__()
-class _DummyParser (object):
- def __init__(self, options):
- self.option_list = options
- self.option_groups = []
- for option in self.option_list: # add required methods and attributes
- option.dest = option.name
- option._short_opts = []
- if option.short_name != None:
- option._short_opts.append('-' + option.short_name)
- option._long_opts = ['--' + option.name]
- option.takes_value = lambda : option.arg != None
- if option.takes_value():
- option.metavar = option.arg.metavar
- else:
- option.metavar = None
+class _DummyParser (optparse.OptionParser):
+ def __init__(self, command):
+ optparse.OptionParser.__init__(self)
+ self.remove_option('-h')
+ self.command = command
+ self._command_opts = []
+ for option in self.command.options:
+ self._add_option(option)
+
+ def _add_option(self, option):
+ # from libbe.ui.command_line.CmdOptionParser._add_option
+ option.validate()
+ long_opt = '--%s' % option.name
+ if option.short_name != None:
+ short_opt = '-%s' % option.short_name
+ assert '_' not in option.name, \
+ 'Non-reconstructable option name %s' % option.name
+ kwargs = {'dest':option.name.replace('-', '_'),
+ 'help':option.help}
+ if option.arg == None or option.arg.type == 'bool':
+ kwargs['action'] = 'store_true'
+ kwargs['metavar'] = None
+ kwargs['default'] = False
+ else:
+ kwargs['type'] = option.arg.type
+ kwargs['action'] = 'store'
+ kwargs['metavar'] = option.arg.metavar
+ kwargs['default'] = option.arg.default
+ if option.short_name != None:
+ opt = optparse.Option(short_opt, long_opt, **kwargs)
+ else:
+ opt = optparse.Option(long_opt, **kwargs)
+ #option.takes_value = lambda : option.arg != None
+ opt._option = option
+ self._command_opts.append(opt)
+ self.add_option(opt)
class OptionFormatter (optparse.IndentedHelpFormatter):
- def __init__(self, options):
+ def __init__(self, command):
optparse.IndentedHelpFormatter.__init__(self)
- self.options = options
+ self.command = command
def option_help(self):
# based on optparse.OptionParser.format_option_help()
- parser = _DummyParser(self.options)
+ parser = _DummyParser(self.command)
self.store_option_strings(parser)
ret = []
ret.append(self.format_heading('Options'))
self.indent()
- for option in self.options:
+ for option in parser._command_opts:
ret.append(self.format_option(option))
ret.append('\n')
self.dedent()
usage: be command [options]
<BLANKLINE>
Options:
- -h HELP, --help=HELP Print a help message.
+ -h, --help Print a help message.
<BLANKLINE>
- --complete=STRING Print a list of possible completions.
+ --complete Print a list of possible completions.
<BLANKLINE>
- A detailed help message.
+ A detailed help message.
"""
name = 'command'
self.status = None
self.result = None
self.requires_bugdir = False
+ self.requires_storage = False
self.requires_unconnected_storage = False
+ self.restrict_file_access = True
self.input_encoding = None
self.output_encoding = None
self.options = [
return usage
def _option_help(self):
- o = OptionFormatter(self.options)
+ o = OptionFormatter(self)
return o.option_help().strip('\n')
def _long_help(self):
# finish a particular argument
return argument.completion_callback(self, argument, fragment)
return [] # the particular argument doesn't supply completion info
+
+ def check_restricted_access(self, storage, path):
+ """
+ Check that the file at path is inside bugdir.root. This is
+ important if you allow other users to execute becommands with
+ your username (e.g. if you're running be-handle-mail through
+ your ~/.procmailrc). If this check wasn't made, a user could
+ e.g. run
+ be commit -b ~/.ssh/id_rsa "Hack to expose ssh key"
+ which would expose your ssh key to anyone who could read the
+ VCS log.
+
+ >>> class DummyStorage (object): pass
+ >>> s = DummyStorage()
+ >>> s.repo = os.path.expanduser('~/x/')
+ >>> c = Command()
+ >>> try:
+ ... c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
+ ... except UserError, e:
+ ... assert str(e).startswith('file access restricted!'), str(e)
+ ... print 'we got the expected error'
+ we got the expected error
+ >>> c.check_restricted_access(s, os.path.expanduser('~/x'))
+ >>> c.check_restricted_access(s, os.path.expanduser('~/x/y'))
+ >>> c.restrict_file_access = False
+ >>> c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
+ """
+ if self.restrict_file_access == True:
+ path = os.path.abspath(path)
+ repo = os.path.abspath(storage.repo).rstrip(os.path.sep)
+ if path == repo or path.startswith(repo+os.path.sep):
+ return
+ raise UserError('file access restricted!\n %s not in %s'
+ % (path, repo))
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""Add a comment to a bug"""
-from libbe import cmdutil, bugdir, comment, editor
+
import os
import sys
-__desc__ = __doc__
-def execute(args, manipulate_encodings=True, restrict_file_access=False,
- dir="."):
- """
+import libbe
+import libbe.command
+import libbe.command.util
+import libbe.comment
+import libbe.ui.util.editor
+import libbe.util.id
+
+
+class Comment (libbe.command.Command):
+ """Add a comment to a bug
+
>>> import time
- >>> bd = bugdir.SimpleBugDir()
- >>> os.chdir(bd.root)
- >>> execute(["a", "This is a comment about a"], manipulate_encodings=False)
- >>> bd._clear_bugs()
- >>> bug = cmdutil.bug_from_id(bd, "a")
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> cmd = Comment()
+ >>> cmd._setup_io = lambda i_enc,o_enc : None
+ >>> cmd.stdout = sys.stdout
+
+ >>> cmd.run(bd.storage, bd, {'user-id':u'Fran\\xe7ois'},
+ ... ['/a', 'This is a comment about a'])
+ >>> bd.flush_reload()
+ >>> bug = bd.bug_from_uuid('a')
>>> bug.load_comments(load_full=False)
>>> comment = bug.comment_root[0]
+ >>> comment.id.storage() == comment.uuid
+ True
>>> print comment.body
This is a comment about a
<BLANKLINE>
- >>> comment.author == bd.user_id
- True
+ >>> comment.author
+ u'Fran\\xe7ois'
>>> comment.time <= int(time.time())
True
>>> comment.in_reply_to is None
True
>>> if 'EDITOR' in os.environ:
- ... del os.environ["EDITOR"]
- >>> execute(["b"], manipulate_encodings=False)
+ ... del os.environ['EDITOR']
+ >>> cmd.run(bd.storage, bd, {'user-id':u'Frank'}, ['/b'])
Traceback (most recent call last):
UserError: No comment supplied, and EDITOR not specified.
- >>> os.environ["EDITOR"] = "echo 'I like cheese' > "
- >>> execute(["b"], manipulate_encodings=False)
- >>> bd._clear_bugs()
- >>> bug = cmdutil.bug_from_id(bd, "b")
+ >>> os.environ['EDITOR'] = "echo 'I like cheese' > "
+ >>> cmd.run(bd.storage, bd, {'user-id':u'Frank'}, ['/b'])
+ >>> bd.flush_reload()
+ >>> bug = bd.bug_from_uuid('b')
>>> bug.load_comments(load_full=False)
>>> comment = bug.comment_root[0]
>>> print comment.body
<BLANKLINE>
>>> bd.cleanup()
"""
- parser = get_parser()
- options, args = parser.parse_args(args)
- complete(options, args, parser)
- if len(args) == 0:
- raise cmdutil.UsageError("Please specify a bug or comment id.")
- if len(args) > 2:
- raise cmdutil.UsageError("Too many arguments.")
+ name = 'comment'
- shortname = args[0]
-
- bd = bugdir.BugDir(from_disk=True,
- manipulate_encodings=manipulate_encodings,
- root=dir)
- bug, parent = cmdutil.bug_comment_from_id(bd, shortname)
-
- if len(args) == 1: # try to launch an editor for comment-body entry
- try:
- if parent == bug.comment_root:
- parent_body = bug.summary+"\n"
- else:
- parent_body = parent.body
- estr = "Please enter your comment above\n\n> %s\n" \
- % ("\n> ".join(parent_body.splitlines()))
- body = editor.editor_string(estr)
- except editor.CantFindEditor, e:
- raise cmdutil.UserError, "No comment supplied, and EDITOR not specified."
- if body is None:
- raise cmdutil.UserError("No comment entered.")
- elif args[1] == '-': # read body from stdin
- binary = not (options.content_type == None
- or options.content_type.startswith("text/"))
- if not binary:
- body = sys.stdin.read()
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.requires_bugdir = True
+ self.options.extend([
+ libbe.command.Option(name='author', short_name='a',
+ help='Set the comment author',
+ arg=libbe.command.Argument(
+ name='author', metavar='AUTHOR')),
+ libbe.command.Option(name='alt-id',
+ help='Set an alternate comment ID',
+ arg=libbe.command.Argument(
+ name='alt-id', metavar='ID')),
+ libbe.command.Option(name='content-type', short_name='c',
+ help='Set comment content-type (e.g. text/plain)',
+ arg=libbe.command.Argument(name='content-type',
+ metavar='MIME')),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='id', metavar='ID', default=None,
+ completion_callback=libbe.command.util.complete_bug_comment_id),
+ libbe.command.Argument(
+ name='comment', metavar='COMMENT', default=None,
+ optional=True,
+ completion_callback=libbe.command.util.complete_assigned),
+ ])
+ def _run(self, storage, bugdir, **params):
+ bug,parent = \
+ libbe.command.util.bug_comment_from_user_id(bugdir, params['id'])
+ if params['comment'] == None:
+ # try to launch an editor for comment-body entry
+ try:
+ if parent == bug.comment_root:
+ parent_body = bug.summary+'\n'
+ else:
+ parent_body = parent.body
+ estr = 'Please enter your comment above\n\n> %s\n' \
+ % ('\n> '.join(parent_body.splitlines()))
+ body = libbe.ui.util.editor.editor_string(estr)
+ except libbe.ui.util.editor.CantFindEditor, e:
+ raise libbe.command.UserError(
+ 'No comment supplied, and EDITOR not specified.')
+ if body is None:
+ raise libbe.command.UserError('No comment entered.')
+ elif params['comment'] == '-': # read body from stdin
+ binary = not (params['content-type'] == None
+ or params['content-type'].startswith("text/"))
+ if not binary:
+ body = self.stdin.read()
+ if not body.endswith('\n'):
+ body += '\n'
+ else: # read-in without decoding
+ body = sys.stdin.read()
+ else: # body given on command line
+ body = params['comment']
if not body.endswith('\n'):
body+='\n'
- else: # read-in without decoding
- body = sys.__stdin__.read()
- else: # body = arg[1]
- body = args[1]
- if not body.endswith('\n'):
- body+='\n'
-
- new = parent.new_reply(body=body, content_type=options.content_type)
- if options.author != None:
- new.author = options.author
- if options.alt_id != None:
- new.alt_id = options.alt_id
+ if params['author'] == None:
+ params['author'] = params['user-id']
-def get_parser():
- parser = cmdutil.CmdOptionParser("be comment ID [COMMENT]")
- parser.add_option("-a", "--author", metavar="AUTHOR", dest="author",
- help="Set the comment author", default=None)
- parser.add_option("--alt-id", metavar="ID", dest="alt_id",
- help="Set an alternate comment ID", default=None)
- parser.add_option("-c", "--content-type", metavar="MIME", dest="content_type",
- help="Set comment content-type (e.g. text/plain)", default=None)
- return parser
+ new = parent.new_reply(body=body)
+ for key in ['alt-id', 'author', 'content-type']:
+ if params[key] != None:
+ setattr(new, key, params[key])
-longhelp="""
+ def _long_help(self):
+ return """
To add a comment to a bug, use the bug ID as the argument. To reply
to another comment, specify the comment name (as shown in "be show"
output). COMMENT, if specified, should be either the text of your
COMMENT is unspecified and EDITOR is not set, no comment will be
created.
"""
-
-def help():
- return get_parser().help_str() + longhelp
-
-def complete(options, args, parser):
- for option,value in cmdutil.option_value_pairs(options, parser):
- if value == "--complete":
- # no argument-options at the moment, so this is future-proofing
- raise cmdutil.GetCompletions()
- for pos,value in enumerate(args):
- if value == "--complete":
- if pos == 0: # fist positional argument is a bug or comment id
- if len(args) >= 2:
- partial = args[1].split(':')[0] # take only bugid portion
- else:
- partial = ""
- ids = []
- try:
- bd = bugdir.BugDir(from_disk=True,
- manipulate_encodings=False)
- bugs = []
- for uuid in bd.uuids():
- if uuid.startswith(partial):
- bug = bd.bug_from_uuid(uuid)
- if bug.active == True:
- bugs.append(bug)
- for bug in bugs:
- shortname = bd.bug_shortname(bug)
- ids.append(shortname)
- bug.load_comments(load_full=False)
- for id,comment in bug.comment_shortnames(shortname):
- ids.append(id)
- except bugdir.NoBugDir:
- pass
- raise cmdutil.GetCompletions(ids)
- raise cmdutil.GetCompletions()
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""Commit the currently pending changes to the repository"""
-from libbe import cmdutil, bugdir, editor, vcs
+
import sys
-__desc__ = __doc__
-def execute(args, manipulate_encodings=True, restrict_file_access=False,
- dir="."):
- """
- >>> import os
- >>> from libbe import bug
- >>> bd = bugdir.SimpleBugDir()
- >>> os.chdir(bd.root)
- >>> full_path = "testfile"
- >>> test_contents = "A test file"
- >>> bd.vcs.set_file_contents(full_path, test_contents)
- >>> execute(["Added %s." % (full_path)], manipulate_encodings=False) # doctest: +ELLIPSIS
+import libbe
+import libbe.bugdir
+import libbe.command
+import libbe.command.util
+import libbe.storage
+import libbe.ui.util.editor
+
+
+class Commit (libbe.command.Command):
+ """Commit the currently pending changes to the repository
+
+ >>> import os, sys
+ >>> import libbe.storage.vcs
+ >>> import libbe.storage.vcs.base
+ >>> import libbe.util.utility
+ >>> cmd = Commit()
+ >>> cmd._setup_io = lambda i_enc,o_enc : None
+ >>> cmd.stdout = sys.stdout
+
+ >>> dir = libbe.util.utility.Dir()
+ >>> vcs = libbe.storage.vcs.installed_vcs()
+ >>> vcs.repo = dir.path
+ >>> vcs.init()
+ >>> vcs.connect()
+ >>> if vcs.name in libbe.storage.vcs.base.VCS_ORDER:
+ ... bd = libbe.bugdir.BugDir(vcs, from_storage=False)
+ ... bd.extra_strings = ['hi there']
+ ... cmd.run(vcs, None, {'user-id':'Joe'},
+ ... ['Making a commit']) # doctest: +ELLIPSIS
+ ... else:
+ ... print 'Committed ...'
Committed ...
- >>> bd.cleanup()
+ >>> vcs.disconnect()
+ >>> vcs.destroy()
+ >>> dir.cleanup()
"""
- parser = get_parser()
- options, args = parser.parse_args(args)
- cmdutil.default_complete(options, args, parser)
- if len(args) != 1:
- raise cmdutil.UsageError("Please supply a commit message")
- bd = bugdir.BugDir(from_disk=True,
- manipulate_encodings=manipulate_encodings,
- root=dir)
- if args[0] == '-': # read summary from stdin
- assert options.body != "EDITOR", \
- "Cannot spawn and editor when the summary is using stdin."
- summary = sys.stdin.readline()
- else:
- summary = args[0]
- if options.body == None:
- body = None
- elif options.body == "EDITOR":
- body = editor.editor_string("Please enter your commit message above")
- else:
- if restrict_file_access == True:
- cmdutil.restrict_file_access(bd, options.body)
- body = bd.vcs.get_file_contents(options.body, allow_no_vcs=True)
- try:
- revision = bd.vcs.commit(summary, body=body,
- allow_empty=options.allow_empty)
- except vcs.EmptyCommit, e:
- print e
- return 1
- else:
- print "Committed %s" % revision
+ name = 'commit'
-def get_parser():
- parser = cmdutil.CmdOptionParser("be commit COMMENT")
- parser.add_option("-b", "--body", metavar="FILE", dest="body",
- help='Provide a detailed body for the commit message. In the special case that FILE == "EDITOR", spawn an editor to enter the body text (in which case you cannot use stdin for the summary)', default=None)
- parser.add_option("-a", "--allow-empty", dest="allow_empty",
- help="Allow empty commits",
- default=False, action="store_true")
- return parser
+ def __init__(self, *args, **kwargs):
+ libbe.command.Command.__init__(self, *args, **kwargs)
+ self.requires_storage = True
+ self.options.extend([
+ libbe.command.Option(name='body', short_name='b',
+ help='Provide the detailed body for the commit message. In the special case that FILE == "EDITOR", spawn an editor to enter the body text (in which case you cannot use stdin for the summary)',
+ arg=libbe.command.Argument(name='body', metavar='FILE',
+ completion_callback=libbe.command.util.complete_path)),
+ libbe.command.Option(name='allow-empty', short_name='a',
+ help='Allow empty commits'),
+ ])
+ self.args.extend([
+ libbe.command.Argument(
+ name='comment', metavar='COMMENT', default=None),
+ ])
-longhelp="""
+ def _run(self, storage, bugdir=None, **params):
+ if params['comment'] == '-': # read summary from stdin
+ assert params['body'] != 'EDITOR', \
+ 'Cannot spawn and editor when the summary is using stdin.'
+ summary = sys.stdin.readline()
+ else:
+ summary = params['comment']
+ if params['body'] == None:
+ body = None
+ elif params['body'] == 'EDITOR':
+ body = libbe.ui.util.editor.editor_string(
+ 'Please enter your commit message above')
+ else:
+ self.check_restricted_access(storage, params['body'])
+ body = libbe.util.encoding.get_file_contents(
+ params['body'], decode=True)
+ try:
+ revision = storage.commit(summary, body=body,
+ allow_empty=params['allow-empty'])
+ print >> self.stdout, 'Committed %s' % revision
+ except libbe.storage.EmptyCommit, e:
+ print >> self.stdout, e
+ return 1
+
+ def _long_help(self):
+ return """
Commit the current repository status. The summary specified on the
commandline is a string (only one line) that describes the commit
briefly or "-", in which case the string will be read from stdin.
"""
-
-def help():
- return get_parser().help_str() + longhelp
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Generate a static HTML dump of the current repository status"""
from libbe import cmdutil, bugdir, bug
+import libbe.util.encoding
import codecs, os, os.path, re, string, time
import xml.sax.saxutils, htmlentitydefs
return dir_path
def _write_file(self, content, path_array, mode='w'):
- f = codecs.open(os.path.join(*path_array), mode, self.encoding)
- f.write(content)
- f.close()
+ return libbe.util.encoding.set_file_contents(
+ os.path.join(*path_array), content, mode, self.encoding)
def _read_file(self, path_array, mode='r'):
- f = codecs.open(os.path.join(*path_array), mode, self.encoding)
- content = f.read()
- f.close()
- return content
+ return libbe.util.encoding.get_file_contents(
+ os.path.join(*path_array), mode, self.encoding, decode=True)
def write_default_template(self, out_dir):
if self.verbose:
import libbe
import libbe.bugdir
import libbe.command
-import libbe.command.util
import libbe.storage
class Init (libbe.command.Command):
>>> dir = libbe.util.utility.Dir()
>>> vcs = libbe.storage.vcs.installed_vcs()
+ >>> vcs.repo = dir.path
+ >>> vcs._vcs_init(vcs.repo)
>>> if vcs.name in libbe.storage.vcs.base.VCS_ORDER:
- ... vcs.repo = dir.path
- ... vcs._vcs_init(vcs.repo)
... cmd.run(vcs) # doctest: +ELLIPSIS
... else:
+ ... vcs.init()
+ ... vcs.connect()
... print 'Using ... for revision control.\\nDirectory initialized.'
Using ... for revision control.
BE repository initialized.
>>> vcs.destroy()
>>> dir.cleanup()
"""
-
name = 'init'
def __init__(self, *args, **kwargs):
return [fragment]
def complete_bug_id(command, argument, fragment=None):
return [fragment]
+def complete_bug_comment_id(command, argument, fragment=None):
+ return [fragment]
def select_values(string, possible_values, name="unkown"):
"""
% (name, value, possible_values))
possible_values = whitelisted_values
return possible_values
+
+def bug_comment_from_user_id(bugdir, id):
+ p = libbe.util.id.parse_user(bugdir, id)
+ if not p['type'] in ['bug', 'comment']:
+ raise libbe.command.UserError(
+ '%s is a %s id, not a bug or comment id' % (id, p['type']))
+ if p['bugdir'] != bugdir.uuid:
+ raise libbe.command.UserError(
+ "%s doesn't belong to this bugdir (%s)"
+ % (id, bugdir.uuid))
+ bug = bugdir.bug_from_uuid(p['bug'])
+ if 'comment' in p:
+ comment = bug.comment_from_uuid(p['comment'])
+ else:
+ comment = bug.comment_root
+ return (bug, comment)
from disk *now*, rather than waiting and lazy loading as required.
"""
uuids = []
- for id in libbe.util.id.child_uuids(bug.storage.children()):
+ for id in libbe.util.id.child_uuids(
+ bug.storage.children(
+ bug.id.storage())):
uuids.append(id)
comments = []
for uuid in uuids:
doc="Alternate ID for linking imported comments. Internally comments are linked (via In-reply-to) to the parent's UUID. However, these UUIDs are generated internally, so Alt-id is provided as a user-controlled linking target.")
def alt_id(): return {}
- def _get_user_id(self):
- if self.bug != None:
- return self.bug._get_user_id()
- return None
-
@_versioned_property(name="Author",
- doc="The author of the comment",
- generator=_get_user_id)
+ doc="The author of the comment")
def author(): return {}
@_versioned_property(name="In-reply-to",
reply.in_reply_to = self.uuid
self.append(reply)
- def new_reply(self, body=None, content_type=None):
+ def new_reply(self, body=None):
"""
>>> comm = Comment(bug=None, body="Some insightful remarks")
>>> repA = comm.new_reply("Critique original comment")
True
"""
reply = Comment(self.bug, body=body)
- if content_type != None: # set before saving body to decide binary format
- reply.content_type = content_type
- if reply.storage != None and reply.storage.is_writeable():
- reply.save()
self.add_reply(reply)
return reply
encoding = default_encoding
config = ConfigParser.ConfigParser()
if os.path.exists(path()) == False: # touch file or config
- open(path(), "w").close() # read chokes on missing file
- f = codecs.open(path(), "r", encoding)
+ open(path(), 'w').close() # read chokes on missing file
+ f = codecs.open(path(), 'r', encoding)
config.readfp(f, path())
f.close()
if value is not None:
config.set(section, name, value)
else:
config.remove_option(section, name)
- f = codecs.open(path(), "w", encoding)
+ f = codecs.open(path(), 'w', encoding)
config.write(f)
f.close()
if encoding == None:
encoding = default_encoding
config = ConfigParser.ConfigParser()
- f = codecs.open(path(), "r", encoding)
+ f = codecs.open(path(), 'r', encoding)
config.readfp(f, path())
f.close()
try:
self.be_dir = os.path.join(
self.repo, self._cached_path_id._spacer_dirs[0])
self._cached_path_id.root(self.repo)
- self._rooted == True
+ self._rooted = True
def _init(self):
"""
raise VCSUnableToRoot(self)
if self._vcs_detect(self.repo) == False:
self._vcs_init(self.repo)
- self.root()
+ if self._rooted == False:
+ self.root()
os.mkdir(self.be_dir)
self._vcs_add(self._u_rel_path(self.be_dir))
self._cached_path_id.init()
Split the commitfile created in self.commit() back into
summary and header lines.
"""
- f = codecs.open(commitfile, "r", self.encoding)
+ f = codecs.open(commitfile, 'r', self.encoding)
summary = f.readline()
body = f.read()
body.lstrip('\n')
def __init__(self, command):
self.command = command
optparse.OptionParser.__init__(self)
- self.disable_interspersed_args()
self.remove_option('-h')
+ self.disable_interspersed_args()
self._option_by_name = {}
for option in self.command.options:
self._add_option(option)
if option.arg == None: # a callback option
kwargs['action'] = 'callback'
kwargs['callback'] = self.callback
+ elif option.arg.type == 'bool':
+ kwargs['action'] = 'store_true'
+ kwargs['metavar'] = None
+ kwargs['default'] = False
else:
- if option.arg.type == 'bool':
- kwargs['action'] = 'store_true'
- else:
- kwargs['type'] = option.arg.type
- kwargs['action'] = 'store'
+ kwargs['type'] = option.arg.type
+ kwargs['action'] = 'store'
kwargs['metavar'] = option.arg.metavar
kwargs['default'] = option.arg.default
if option.short_name != None:
raise CallbackExit
def complete(self, argument=None, fragment=None):
- print argument, fragment
comps = self.command.complete(argument, fragment)
if fragment != None:
comps = [c for c in comps if c.startswith(fragment)]
usage: be [options] [COMMAND [command-options] [COMMAND-ARGS ...]]
<BLANKLINE>
Options:
- -h HELP, --help=HELP Print a help message.
+ -h, --help Print a help message.
<BLANKLINE>
- --complete=STRING Print a list of possible completions.
+ --complete Print a list of possible completions.
<BLANKLINE>
- --version=VERSION Print version string.
+ --version Print version string.
+ ...
+ >>> try:
+ ... options,args = p.parse_args(['--complete']) # doctest: +ELLIPSIS
+ ... except CallbackExit:
+ ... print ' got callback'
+ --help
+ --complete
+ --version
...
- >>> options,args = p.parse_args(['--complete']) # doctest: +ELLIPSIS
+ subscribe
+ tag
+ target
+ got callback
"""
name = 'be'
storage = libbe.storage.get_storage(options['repo'])
storage.connect()
bugdir = libbe.bugdir.BugDir(storage, from_storage=True)
- elif: command.requires_unconnected_storage == True:
+ elif command.requires_storage == True \
+ or command.requires_unconnected_storage == True:
storage = libbe.storage.get_storage(options['repo'])
+ if command.requires_unconnected_storage == False:
+ storage.connect()
try:
options,args = parser.parse_args(args[1:])
command.run(storage, bugdir, options, args)
import tempfile
import libbe
+import libbe.util.encoding
+
if libbe.TESTING == True:
import doctest
-default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding()
-
comment_marker = u"== Anything below this line will be ignored\n"
class CantFindEditor(Exception):
>>> del os.environ["VISUAL"]
"""
if encoding == None:
- encoding = default_encoding
+ encoding = libbe.util.encoding.get_filesystem_encoding()
for name in ('VISUAL', 'EDITOR'):
try:
editor = os.environ[name]
os.close(fhandle)
oldmtime = os.path.getmtime(fname)
os.system("%s %s" % (editor, fname))
- f = codecs.open(fname, "r", encoding)
- output = trimmed_string(f.read())
- f.close()
+ output = libbe.util.encoding.get_file_contents(
+ fname, encoding=encoding, decode=True)
if output.rstrip('\n') == "":
output = None
finally:
import codecs
import locale
import sys
+import types
import libbe
if libbe.TESTING == True:
except LookupError:
return False
+def get_file_contents(path, mode='r', encoding=None, decode=False):
+ if decode == True:
+ if encoding == None:
+ encoding = get_filesystem_encoding()
+ f = codecs.open(path, mode, encoding)
+ else:
+ f = open(path, mode)
+ contents = f.read()
+ f.close()
+ return contents
+
+def set_file_contents(path, contents, mode='w', encoding=None):
+ if type(value) == types.UnicodeType:
+ if encoding == None:
+ encoding = get_filesystem_encoding()
+ f = codecs.open(path, mode, encoding)
+ else:
+ f = open(path, mode)
+ f.write(contents)
+ f.close()
+
if libbe.TESTING == True:
suite = doctest.DocTestSuite()