import os
import os.path
import errno
+import sys
import time
import types
try: # import core module, Python >= 2.5
check_fn=lambda s: s in status_values,
require_save=True)
def status(): return {}
-
+
@property
def active(self):
return self.status in active_status_values
if self.bugdir != None:
self.storage = self.bugdir.storage
if from_storage == False:
- if self.storage != None and self.storage.is_writeable():
+ if self.storage != None and self.storage.is_writeable():
self.save()
def __repr__(self):
severitychar = self.severity[0]
chars = "%c%c" % (statuschar, severitychar)
bugout = "%s:%s: %s" % (self.id.user(),chars,self.summary.rstrip('\n'))
-
+
if show_comments == True:
self.comment_root.sort(cmp=libbe.comment.cmp_time, reverse=True)
comout = self.comment_root.string_thread(flatten=False)
self.explicit_attrs.append(attr_name)
setattr(self, attr_name, text)
elif verbose == True:
- print >> sys.stderr, "Ignoring unknown tag %s in %s" \
+ print >> sys.stderr, 'Ignoring unknown tag %s in %s' \
% (child.tag, comment.tag)
if uuid != self.uuid:
if not hasattr(self, 'alt_id') or self.alt_id == None:
except KeyError:
if ignore_missing_references == True:
print >> sys.stderr, \
- "Ignoring missing reference to %s" % c.in_reply_to
+ 'Ignoring missing reference to %s' % c.in_reply_to
parent = default_parent
if parent.uuid != comment.INVALID_UUID:
c.in_reply_to = parent.uuid
if settings_mapfile == None:
settings_mapfile = \
self.storage.get(self.id.storage("values"), default="\n")
- self.settings = mapfile.parse(settings_mapfile)
+ try:
+ self.settings = mapfile.parse(settings_mapfile)
+ except mapfile.InvalidMapfileContents, e:
+ raise Exception('Invalid settings file for bug %s\n'
+ '(BE version missmatch?)' % self.id.user())
self._setup_saved_settings()
def save_settings(self):
"""
Save any loaded contents to storage. Because of lazy loading
of comments, this is actually not too inefficient.
-
+
However, if self.storage.is_writeable() == True, then any
changes are automatically written to storage as soon as they
happen, so calling this method will just waste time (unless
# next _get_comment_root returns a fresh version. Turn of
# writing temporarily so we don't write our blank comment
# tree to disk.
- w = self.storage.writeable
+ w = self.storage.writeable
self.storage.writeable = False
self.comment_root = None
self.storage.writeable = w
def remove(self):
self.storage.recursive_remove(self.id.storage())
-
+
# methods for managing comments
def uuids(self):
val_2 = getattr(bug_2, attr)
if val_1 == None: val_1 = None
if val_2 == None: val_2 = None
-
+
if invert == True :
return -cmp(val_1, val_2)
else :
if val != 0 :
return val
return 0
-
+
cmp_full = BugCompoundComparator()
if settings_mapfile == None:
settings_mapfile = \
self.storage.get(self.id.storage('settings'), default='\n')
- self.settings = mapfile.parse(settings_mapfile)
+ try:
+ self.settings = mapfile.parse(settings_mapfile)
+ except mapfile.InvalidMapfileContents, e:
+ raise Exception('Invalid settings file for bugdir %s\n'
+ '(BE version missmatch?)' % self.id.user())
self._setup_saved_settings()
#self._setup_user_id(self.user_id)
self._setup_severities(self.severities)
Duplicate bugdirs are read-only copies used for generating
diffs between revisions.
"""
- dbd = copy.copy(self)
- dbd.storage = copy.copy(self.storage)
- dbd._bug_map = copy.copy(self._bug_map)
- dbd.storage.writeable = False
- added,changed,removed = self.storage.changed_since(revision)
- for id in added:
- pass
- for id in removed:
- pass
- for id in changed:
- parsed = libbe.util.id.parse_id(id)
- if parsed['type'] == 'bugdir':
- assert parsed['remaining'] == ['settings'], parsed['remaining']
- dbd._settings = copy.copy(self._settings)
- mf = self.storage.get(self.id.storage('settings'), default='\n',
- revision=revision)
- dbd.load_settings(mf)
- else:
- if parsed['bug'] not in self:
- self._load_bug(parsed['bug'])
- dbd._load_bug(parsed['bug'])
- else:
- bug = copy.copy(self._bug_map[parsed['bug']])
- bug.settings = copy.copy(bug.settings)
- dbd._bug_map[parsed['bug']] = bug
- if parsed['type'] == 'bug':
- assert parsed['remaining'] == ['values'], parsed['remaining']
- mf = self.storage.get(self.id.storage('values'), default='\n',
- revision=revision)
- bug.load_settings(mf)
- elif parsed['type'] == 'comment':
- assert parsed['remaining'] in [['values'], ['body']], \
- parsed['remaining']
- bug.comment_root = copy.deepcopy(bug.comment_root)
- comment = bug.comment_from_uuid(parsed['comment'])
- if parsed['remaining'] == ['values']:
- mf = self.storage.get(self.id.storage('values'), default='\n',
- revision=revision)
- comment.load_settings(mf)
- else:
- body = self.storage.get(self.id.storage('body'), default='\n',
- revision=revision)
- comment.body = body
- else:
- assert 1==0, 'Unkown type "%s" for id "%s"' % (type, id)
- dbd.storage.readable = False # so we won't read in added bugs, etc.
+ s = copy.deepcopy(self.storage)
+ s.writeable = False
+ class RevisionedStorageGet (object):
+ def __init__(self, storage, default_revision):
+ self.s = storage
+ self.sget = self.s.get
+ self.r = default_revision
+ def get(self, *args, **kwargs):
+ if not 'revision' in kwargs or kwargs['revision'] == None:
+ kwargs['revision'] = self.r
+ return self.sget(*args, **kwargs)
+ rsg = RevisionedStorageGet(s, revision)
+ s.get = rsg.get
+ dbd = BugDir(s, from_storage=True)
+# dbd = copy.copy(self)
+# dbd.storage = copy.copy(self.storage)
+# dbd._bug_map = copy.copy(self._bug_map)
+# dbd.storage.writeable = False
+# added,changed,removed = self.storage.changed_since(revision)
+# for id in added:
+# pass
+# for id in removed:
+# pass
+# for id in changed:
+# parsed = libbe.util.id.parse_id(id)
+# if parsed['type'] == 'bugdir':
+# assert parsed['remaining'] == ['settings'], parsed['remaining']
+# dbd._settings = copy.copy(self._settings)
+# mf = self.storage.get(self.id.storage('settings'), default='\n',
+# revision=revision)
+# dbd.load_settings(mf)
+# else:
+# if parsed['bug'] not in self:
+# self._load_bug(parsed['bug'])
+# dbd._load_bug(parsed['bug'])
+# else:
+# bug = copy.copy(self._bug_map[parsed['bug']])
+# bug.settings = copy.copy(bug.settings)
+# dbd._bug_map[parsed['bug']] = bug
+# if parsed['type'] == 'bug':
+# assert parsed['remaining'] == ['values'], parsed['remaining']
+# mf = self.storage.get(self.id.storage('values'), default='\n',
+# revision=revision)
+# bug.load_settings(mf)
+# elif parsed['type'] == 'comment':
+# assert parsed['remaining'] in [['values'], ['body']], \
+# parsed['remaining']
+# bug.comment_root = copy.deepcopy(bug.comment_root)
+# comment = bug.comment_from_uuid(parsed['comment'])
+# if parsed['remaining'] == ['values']:
+# mf = self.storage.get(self.id.storage('values'), default='\n',
+# revision=revision)
+# comment.load_settings(mf)
+# else:
+# body = self.storage.get(self.id.storage('body'), default='\n',
+# revision=revision)
+# comment.body = body
+# else:
+# assert 1==0, 'Unkown type "%s" for id "%s"' % (type, id)
+# dbd.storage.readable = False # so we won't read in added bugs, etc.
return dbd
if libbe.TESTING == True:
['a', 'b']
>>> bugdir.cleanup()
"""
- def __init__(self, memory=True):
+ def __init__(self, memory=True, versioned=False):
if memory == True:
storage = None
else:
dir = utility.Dir()
self._dir_ref = dir # postpone cleanup since dir.cleanup() removes dir.
- storage = libbe.storage.base.Storage(dir.path)
+ if versioned == False:
+ storage = libbe.storage.base.Storage(dir.path)
+ else:
+ storage = libbe.storage.base.VersionedStorage(dir.path)
storage.init()
storage.connect()
BugDir.__init__(self, storage=storage, uuid='abc123')
self.storage.disconnect()
self.storage.connect()
self._clear_bugs()
-
+
# class BugDirTestCase(unittest.TestCase):
# def setUp(self):
# self.dir = utility.Dir()
# "Invalid comment: %d\n%s" % (index, comment))
# def testSyncedComments(self):
# self.testComments(sync_with_disk=True)
-
+
class SimpleBugDirTestCase (unittest.TestCase):
def setUp(self):
# create a pre-existing bugdir in a temporary directory
uuids = sorted([bug.uuid for bug in bugdir])
self.failUnless(uuids == [], uuids)
bugdir.cleanup()
-
+
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
self.stdout = codecs.getwriter(output_encoding)(sys.stdout)
self.stdout.encoding = output_encoding
- def help(self, *args):
+ def help(self, *args):
return '\n\n'.join([self._usage(),
self._option_help(),
self._long_help().rstrip('\n')])
def _get_storage(self):
"""
Callback for use by commands that need it.
-
+
Note that with the current implementation,
_get_unconnected_storage() will not work after this method
runs, but that shouldn't be an issue for any command I can
class Commit (libbe.command.Command):
"""Commit the currently pending changes to the repository
- >>> import os, sys
- >>> import libbe.storage.vcs
- >>> import libbe.storage.vcs.base
- >>> import libbe.util.utility
+ >>> import sys
+ >>> import libbe.bugdir
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False, versioned=True)
>>> cmd = Commit()
+ >>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> cmd.stdout = sys.stdout
- >>> dir = libbe.util.utility.Dir()
- >>> vcs = libbe.storage.vcs.installed_vcs()
- >>> vcs.repo = dir.path
- >>> vcs.init()
- >>> vcs.connect()
- >>> cmd._storage = vcs
- >>> if vcs.name in libbe.storage.vcs.base.VCS_ORDER:
- ... bd = libbe.bugdir.BugDir(vcs, from_storage=False)
- ... bd.extra_strings = ['hi there']
- ... cmd.run({'user-id':'Joe'}, ['Making a commit']) # doctest: +ELLIPSIS
- ... else:
- ... print 'Committed ...'
+ >>> bd.extra_strings = ['hi there']
+ >>> bd.flush_reload()
+ >>> cmd.run({'user-id':'Joe'}, ['Making a commit']) # doctest: +ELLIPSIS
Committed ...
- >>> vcs.disconnect()
- >>> vcs.destroy()
- >>> dir.cleanup()
+ >>> bd.cleanup()
"""
name = 'commit'
>>> import sys
>>> import libbe.bugdir
- >>> bd = libbe.bugdir.SimpleBugDir(memory=False)
+ >>> bd = libbe.bugdir.SimpleBugDir(memory=False, versioned=True)
>>> cmd = Diff()
>>> cmd._storage = bd.storage
>>> cmd._setup_io = lambda i_enc,o_enc : None
>>> bug = bd.bug_from_uuid('a')
>>> bug.status = 'closed'
>>> changed = bd.storage.commit('Closed bug a')
- >>> if bd.storage.versioned == True:
- ... ret = cmd.run(args=[original])
- ... else:
- ... print 'Modified bugs:\\n a:cm: Bug A\\n Changed bug settings:\\n status: open -> closed'
+ >>> ret = cmd.run(args=[original])
Modified bugs:
- a:cm: Bug A
+ abc/a:cm: Bug A
Changed bug settings:
status: open -> closed
- >>> if bd.storage.versioned == True:
- ... ret = cmd.run({'subscribe':'%(bugdir_id)s:mod', 'uuids':True}, [original])
- ... else:
- ... print 'a'
+ >>> ret = cmd.run({'subscribe':'%(bugdir_id)s:mod', 'uuids':True}, [original])
a
- >>> if bd.storage.versioned == False:
- ... ret = cmd.run(args=[original])
- ... else:
- ... raise libbe.command.UserError('This repository not revision-controlled.')
+ >>> bd.storage.versioned = False
+ >>> ret = cmd.run(args=[original])
Traceback (most recent call last):
...
UserError: This repository is not revision-controlled.
if params['repo'] == None:
if params['revision'] == None: # get the most recent revision
params['revision'] = bugdir.storage.revision_id(-1)
- old_bd = bugdir.duplicate_bugdir(params['revision']) # TODO
+ old_bd = bugdir.duplicate_bugdir(params['revision'])
else:
old_storage = libbe.storage.get_storage(params['repo'])
old_storage.connect()
raise libbe.command.UserError(
'%s is not revision-controlled.'
% storage.repo)
- old_bd = old_bd_current.duplicate_bugdir(revision) # TODO
- d = libbe.diff.Diff(old_bd, bugir)
+ old_bd = old_bd_current.duplicate_bugdir(revision)
+ d = libbe.diff.Diff(old_bd, bugdir)
tree = d.report_tree(subscriptions)
if params['uuids'] == True:
>>> bd.cleanup()
"""
name = 'html'
-
+
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
self.options.extend([
new.explicit_attrs = []
else:
croot_bug,croot_comment = (None, None)
-
+
if params['xml-file'] == '-':
- xml = self.stdin.read().encode(self.stdin.encoding)
+ xml = self.stdin.read().encode(self.stdin.encoding)
else:
self._check_restricted_access(storage, params['xml-file'])
xml = libbe.util.encoding.get_file_contents(
else:
print >> sys.stderr, 'ignoring unknown tag %s in %s' \
% (child.tag, comment_list.tag)
-
+
# merge the new root_comments
if params['add-only'] == True:
accept_changes = False
croot_bug.merge(new_croot_bug, accept_changes=accept_changes,
accept_extra_strings=accept_extra_strings,
accept_comments=accept_comments)
-
+
# merge the new croot_bugs
merged_bugs = []
old_bugs = []
accept_comments=accept_comments)
merged_bugs.append(new)
old_bugs.append(old)
-
+
# protect against programmer error causing data loss:
if croot_bug != None:
comms = [c.uuid for c in croot_comment.traverse()]
if not new in merged_bugs:
assert bugdir.has_bug(new.uuid), \
"bug %s wasn't added" % (new.uuid)
-
+
# save new information
bugdir.storage.writeable = writeable
if croot_bug != None:
import libbe.command
import libbe.command.util
-# get a list of * for cmp_*() comparing two bugs.
+# get a list of * for cmp_*() comparing two bugs.
AVAILABLE_CMPS = [fn[4:] for fn in dir(libbe.bug) if fn[:4] == 'cmp_']
AVAILABLE_CMPS.remove('attr') # a cmp_* template.
# parser.add_option(short, long, action="store_true",
# dest=attr, help=help, default=False)
# return parser
-#
+#
# ])
def _run(self, **params):
self.result = bugs
if len(bugs) == 0 and params['xml'] == False:
print >> self.stdout, "No matching bugs found"
-
+
# sort bugs
bugs = self._sort_bugs(bugs, cmp_list)
Short name : abc/a
Severity : minor
Status : open
- Assigned :
- Reporter :
+ Assigned :
+ Reporter :
Creator : John Doe <jdoe@example.com>
Created : ...
Bug A
Short name : abc/b
Severity : minor
Status : closed
- Assigned :
- Reporter :
+ Assigned :
+ Reporter :
Creator : Jane Doe <jdoe@example.com>
Created : ...
Bug B
comment.storage = None
comment.alt_id = comment.uuid
comment.storage = bugdir.storage
- comment.uuid = libbe.util.id.uuid_gen()
+ comment.uuid = libbe.util.id.uuid_gen()
comment.save() # force onto disk under bugA
for comment in newCommTree: # just the child comments
def _long_help(self):
return """
-Show or change per-tree settings.
+Show or change per-tree settings.
If name and value are supplied, the name is set to a new value.
If no value is specified, the current value is printed.
-If no arguments are provided, all names and values are listed.
+If no arguments are provided, all names and values are listed.
To unset a setting, set it to "none".
repeatable=True,
completion_callback=libbe.command.util.complete_bug_id),
])
-
+
def _run(self, **params):
bugdir = self._get_bugdir()
for bug_id in params['bug-id']:
Short name : abc/a
Severity : minor
Status : open
- Assigned :
- Reporter :
+ Assigned :
+ Reporter :
Creator : John Doe <jdoe@example.com>
Created : ...
Bug A
repeatable=True,
completion_callback=libbe.command.util.complete_bug_id),
])
-
+
def _run(self, **params):
bugdir = self._get_bugdir()
for bug_id in params['bug-id']:
params['types'] = 'all'
servers = params['servers'].split(',')
types = params['types'].split(',')
-
+
if len(params['id']) == 0:
params['id'] = [libbe.diff.BUGDIR_ID]
for _id in params['id']:
else: # add the tag
estrs = subscribe(estrs, subscriber, types, servers, type_root)
entity.extra_strings = estrs # reassign to notice change
-
+
if params['list-all'] == True:
bugdir.load_all_bugs()
subscriptions = get_bugdir_subscribers(bugdir, servers[0])
for estr in entity.extra_strings:
if estr.startswith(TAG):
subscriptions.append(estr[len(TAG):])
-
+
if len(subscriptions) > 0:
print >> self.stdout, 'Subscriptions for %s:' % entity_name
print >> self.stdout, '\n'.join(subscriptions)
for estr in bug.extra_strings:
if estr.startswith(TAG_TAG):
tags.append(estr[len(TAG_TAG):])
-
+
if len(tags) > 0:
print "Tags for %s:" % bug.id.user()
print '\n'.join(tags)
>>> bd.cleanup()
"""
name = 'target'
-
+
def __init__(self, *args, **kwargs):
libbe.command.Command.__init__(self, *args, **kwargs)
self.options.extend([
if len(comps) == 1 and os.path.isdir(comps[0]):
comps.extend(glob.glob(comps[0]+'/*'))
return comps
-
+
def complete_status(command, argument, fragment=None):
return [fragment]
def complete_severity(command, argument, fragment=None):
Set from_storage=False to create a new comment.
The uuid option is required when from_storage==True.
-
+
The in_reply_to and body options are only used if
from_storage==False (the default). When from_storage==True,
they are loaded from the bug database.
-
+
in_reply_to should be the uuid string of the parent comment.
"""
Tree.__init__(self)
settings_object.SavedSettingsObject.__init__(self)
self.bug = bug
self.storage = None
- self.uuid = uuid
+ self.uuid = uuid
self.id = libbe.util.id.ID(self, 'comment')
if from_storage == False:
if uuid == None:
if self.bug != None:
self.storage = self.bug.storage
if from_storage == False:
- if self.storage != None and self.storage.is_writeable():
+ if self.storage != None and self.storage.is_writeable():
self.save()
def __cmp__(self, other):
self.body = base64.decodestring(body)
self.extra_strings = estrs
- def merge(self, other, accept_changes=True,
+ def merge(self, other, accept_changes=True,
accept_extra_strings=True, change_exception=False):
"""
Merge info from other into this comment. Overrides any
>>> print comm.string(indent=2)
--------- Comment ---------
Name: //abc
- From:
+ From:
Date: Thu, 01 Jan 1970 00:00:00 +0000
<BLANKLINE>
Some
lines.extend(body.splitlines())
else:
lines.append("Content type %s not printable. Try XML output instead" % self.content_type)
-
+
istring = ' '*indent
sep = '\n' + istring
return istring + sep.join(lines).rstrip('\n')
"""
Return a string displaying a thread of comments.
bug_shortname is only used if auto_name_map == True.
-
+
string_method_name (defaults to "string") is the name of the
Comment method used to generate the output string for each
Comment in the thread. The method must take the arguments
indent and shortname.
-
+
SIDE-EFFECT: if auto_name_map==True, calls comment_shortnames()
which will sort the tree by comment.time. Avoid by calling
name_map = {}
>>> print a.string_thread(flatten=True)
--------- Comment ---------
Name: //a
- From:
+ From:
Date: Thu, 20 Nov 2008 01:00:00 +0000
<BLANKLINE>
Insightful remarks
--------- Comment ---------
Name: //b
- From:
+ From:
Date: Thu, 20 Nov 2008 02:00:00 +0000
<BLANKLINE>
Critique original comment
--------- Comment ---------
Name: //c
- From:
+ From:
Date: Thu, 20 Nov 2008 03:00:00 +0000
<BLANKLINE>
Begin flamewar :p
--------- Comment ---------
Name: //d
- From:
+ From:
Date: Thu, 20 Nov 2008 04:00:00 +0000
<BLANKLINE>
Useful examples
>>> print a.string_thread()
--------- Comment ---------
Name: //a
- From:
+ From:
Date: Thu, 20 Nov 2008 01:00:00 +0000
<BLANKLINE>
Insightful remarks
--------- Comment ---------
Name: //b
- From:
+ From:
Date: Thu, 20 Nov 2008 02:00:00 +0000
<BLANKLINE>
Critique original comment
--------- Comment ---------
Name: //c
- From:
+ From:
Date: Thu, 20 Nov 2008 03:00:00 +0000
<BLANKLINE>
Begin flamewar :p
--------- Comment ---------
Name: //d
- From:
+ From:
Date: Thu, 20 Nov 2008 04:00:00 +0000
<BLANKLINE>
Useful examples
if settings_mapfile == None:
settings_mapfile = \
self.storage.get(self.id.storage("values"), default="\n")
- self.settings = mapfile.parse(settings_mapfile)
+ try:
+ self.settings = mapfile.parse(settings_mapfile)
+ except mapfile.InvalidMapfileContents, e:
+ raise Exception('Invalid settings file for comment %s\n'
+ '(BE version missmatch?)' % self.id.user())
self._setup_saved_settings()
def save_settings(self):
def save(self):
"""
Save any loaded contents to storage.
-
+
However, if self.storage.is_writeable() == True, then any
changes are automatically written to storage as soon as they
happen, so calling this method will just waste time (unless
val_2 = getattr(comment_2, attr)
if val_1 == None: val_1 = None
if val_2 == None: val_2 = None
-
+
if invert == True :
return -cmp(val_1, val_2)
else :
if val != 0 :
return val
return 0
-
+
cmp_full = CommentCompoundComparator()
if libbe.TESTING == True:
def __init__(self, id, subscription_type, **kwargs):
if 'type_root' not in kwargs:
if id == BUGDIR_ID:
- kwargs['type_root'] = BUGDIR_TYPE_ALL
+ kwargs['type_root'] = BUGDIR_TYPE_ALL
else:
- kwargs['type_root'] = BUG_TYPE_ALL
+ kwargs['type_root'] = BUG_TYPE_ALL
if type(subscription_type) in types.StringTypes:
subscription_type = type_from_name(subscription_type, **kwargs)
self.id = id
else:
decode = False
value = self._get(*args, **kwargs)
- if decode == True:
+ if decode == True and type(value) != types.UnicodeType:
return unicode(value, self.encoding)
+ if decode == False and type(value) != types.StringType:
+ return value.encode(self.encoding)
return value
def _get(self, id, default=InvalidObject, revision=None):
self.failUnless(s == val,
"%s.get() returned %s not %s"
% (vars(self.Class)['name'], s, self.val))
-
+
class Storage_persistence_TestCase (StorageTestCase):
"""Test cases for Storage.disconnect and .connect methods."""
revs.append(self.s.commit('%s: %d' % (self.commit_msg, i),
self.commit_body))
for i in range(10):
- rev = self.s.revision_id(i+1)
+ rev = self.s.revision_id(i+1)
self.failUnless(rev == revs[i],
"%s.revision_id(%d) returned %s not %s"
% (vars(self.Class)['name'], i+1, rev, revs[i]))
self.failUnless(ret == val(i),
"%s.get() returned %s not %s for revision %s"
% (vars(self.Class)['name'], ret, val(i), revs[i]))
-
+
def make_storage_testcase_subclasses(storage_class, namespace):
"""Make StorageTestCase subclasses for storage_class in namespace."""
storage_testcase_classes = [
if encoding == None:
encoding = default_encoding
config = ConfigParser.ConfigParser()
- if os.path.exists(path()) == False: # touch file or config
+ if os.path.exists(path()) == False: # touch file or config
open(path(), 'w').close() # read chokes on missing file
f = codecs.open(path(), 'r', encoding)
config.readfp(f, path())
import errno
import os.path
+import types
import yaml
import libbe
class IllegalValue(Exception):
def __init__(self, value):
Exception.__init__(self, 'Illegal value "%s"' % value)
- self.value = value
+ self.value = value
+
+class InvalidMapfileContents(Exception):
+ def __init__(self, contents):
+ Exception.__init__(self, 'Invalid YAML contents')
+ self.contents = contents
def generate(map):
"""Generate a YAML mapfile content string.
- >>> generate({"q":"p"})
+ >>> generate({'q':'p'})
'q: p\\n\\n'
- >>> generate({"q":u"Fran\u00e7ais"})
+ >>> generate({'q':u'Fran\u00e7ais'})
'q: Fran\\xc3\\xa7ais\\n\\n'
- >>> generate({"q":u"hello"})
+ >>> generate({'q':u'hello'})
'q: hello\\n\\n'
- >>> generate({"q=":"p"})
+ >>> generate({'q=':'p'})
Traceback (most recent call last):
IllegalKey: Illegal key "q="
- >>> generate({"q:":"p"})
+ >>> generate({'q:':'p'})
Traceback (most recent call last):
IllegalKey: Illegal key "q:"
- >>> generate({"q\\n":"p"})
+ >>> generate({'q\\n':'p'})
Traceback (most recent call last):
IllegalKey: Illegal key "q\\n"
- >>> generate({"":"p"})
+ >>> generate({'':'p'})
Traceback (most recent call last):
IllegalKey: Illegal key ""
- >>> generate({">q":"p"})
+ >>> generate({'>q':'p'})
Traceback (most recent call last):
IllegalKey: Illegal key ">q"
- >>> generate({"q":"p\\n"})
+ >>> generate({'q':'p\\n'})
Traceback (most recent call last):
IllegalValue: Illegal value "p\\n"
"""
'p'
>>> parse('q: \\'p\\'\\n\\n')['q']
'p'
- >>> contents = generate({"a":"b", "c":"d", "e":"f"})
+ >>> contents = generate({'a':'b', 'c':'d', 'e':'f'})
>>> dict = parse(contents)
- >>> dict["a"]
+ >>> dict['a']
'b'
- >>> dict["c"]
+ >>> dict['c']
'd'
- >>> dict["e"]
+ >>> dict['e']
'f'
- >>> contents = generate({"q":u"Fran\u00e7ais"})
+ >>> contents = generate({'q':u'Fran\u00e7ais'})
>>> dict = parse(contents)
- >>> dict["q"]
+ >>> dict['q']
u'Fran\\xe7ais'
+ >>> dict = parse('a!')
+ Traceback (most recent call last):
+ ...
+ InvalidMapfileContents: Invalid YAML contents
"""
- return yaml.load(contents) or {}
+ c = yaml.load(contents)
+ if type(c) == types.StringType:
+ raise InvalidMapfileContents(
+ 'Unable to parse YAML (BE format missmatch?):\n\n%s' % contents)
+ return c or {}
if libbe.TESTING == True:
suite = doctest.DocTestSuite()
settings as primed.
"""
for property in self.settings_properties:
- if property not in self.settings:
- self.settings[property] = EMPTY
- elif self.settings[property] == UNPRIMED:
+ if property not in self.settings \
+ or self.settings[property] == UNPRIMED:
self.settings[property] = EMPTY
if flag_as_loaded == True:
self._settings_loaded = True
import libbe
import libbe.bug as bug
-import libbe.util.encoding as encoding
import libbe.storage.util.mapfile as mapfile
+import libbe.util.encoding as encoding
+import libbe.util.id
-if libbe.TESTING == True:
- import doctest
# a list of all past versions
-BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0",
- "Bugs Everywhere Directory v1.1",
- "Bugs Everywhere Directory v1.2",
- "Bugs Everywhere Directory v1.3"]
+BUGDIR_DISK_VERSIONS = ['Bugs Everywhere Tree 1 0',
+ 'Bugs Everywhere Directory v1.1',
+ 'Bugs Everywhere Directory v1.2',
+ 'Bugs Everywhere Directory v1.3']
# the current version
BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1]
"Class for converting between different on-disk BE storage formats."
initial_version = None
final_version = None
- def __init__(self, root):
- self.root = root
+ def __init__(self, repo):
+ self.repo = repo
- def get_path(self, *args):
+ def get_path(self, id):
"""
- Return a path relative to .root.
+ Return a path relative to .repo.
"""
+ if id == 'version':
+ return os.path.join(self.repo, id)
+
+TODO
dir = os.path.join(self.root, '.be')
if len(args) == 0:
return dir
def check_initial_version(self):
path = self.get_path('version')
- version = self.vcs.get_file_contents(path).rstrip('\n')
+ version = encoding.get_file_contents(path).rstrip('\n')
assert version == self.initial_version, version
def set_version(self):
- path = self.get_path("version")
- self.vcs.set_file_contents(path, self.final_version+"\n")
+ path = self.get_path('version')
+ encoding.set_file_contents(path, self.final_version+'\n')
def upgrade(self):
- print >> sys.stderr, "upgrading bugdir from '%s' to '%s'" \
+ print >> sys.stderr, 'upgrading bugdir from "%s" to "%s"' \
% (self.initial_version, self.final_version)
self.check_initial_version()
self.set_version()
if version_b == target_version:
break
i += 1
-
-if libbe.TESTING == True:
- suite = doctest.DocTestSuite()
http://regexps.srparish.net/tutorial-tla/naming-conventions.html
Since our bug directory '.be' doesn't satisfy these conventions,
we need to adjust them.
-
+
The conventions are specified in
project-root/{arch}/=tagging-method
"""
dirname = path
status,output,error = self._u_invoke_client('tree-root', dirname)
root = output.rstrip('\n')
-
+
self._get_archive_project_name(root)
return root
id = self.id(dirpath)
relpath = dirpath[len(self._root)+1:]
if id.count('/') == 0:
+ if id in self._cache:
+ import sys
+ print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
self._cache[id] = relpath
except InvalidPath:
pass
dumping VCS-specific files into the .be directory.
If you do need to implement this method (e.g. Arch), set
- self.interspersed_vcs_files = True
+ self.interspersed_vcs_files = True
"""
assert self.interspersed_vcs_files == False
raise NotImplementedError
def _vcs_version(self):
status,output,error = self._u_invoke_client('--version')
- return output
+ return output
def _vcs_get_user_id(self):
status,output,error = self._u_invoke_client('whoami')
return base.VCS._vcs_get_file_contents(self, path, revision)
else:
status,output,error = \
- self._u_invoke_client('cat', '-r', revision,path)
+ self._u_invoke_client('cat', '-r', revision, path)
return output
def _vcs_commit(self, commitfile, allow_empty=False):
return str(index) # bzr commit 0 is the empty tree.
return str(current_revision+index+1)
-\f
+\f
if libbe.TESTING == True:
base.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
def _vcs_detect(self, path):
if self._u_search_parent_directories(path, "_darcs") != None :
return True
- return False
+ return False
def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
'diff', '--unified', '--patch', revision, path,
unicode_output=False)
target_patch = output
-
+
# '--output -' to be supported in GNU patch > 2.5.9
# but that hasn't been released as of June 30th, 2009.
except IndexError:
return None
-\f
+\f
if libbe.TESTING == True:
base.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__])
def _vcs_detect(self, path):
if self._u_search_parent_directories(path, '.git') != None :
return True
- return False
+ return False
def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
except IndexError:
return None
-\f
+\f
if libbe.TESTING == True:
base.make_vcs_testcase_subclasses(Git, sys.modules[__name__])
return id
return None
-\f
+\f
if libbe.TESTING == True:
base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
if options['no-pager'] == True:
paginate = 'never'
libbe.ui.util.pager.run_pager(paginate)
-
+
command_name = args[0]
try:
module = libbe.command.get_command(command_name)
encoding = get_filesystem_encoding()
f = codecs.open(path, mode, encoding)
else:
- f = open(path, mode)
+ f = open(path, mode)
contents = f.read()
f.close()
return contents
q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
else:
# win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
shell=True, cwd=cwd)
except OSError, e :
strerror = "%s\nwhile executing %s" % (e.args[1], args)
fields = _split(id)
if len(fields) == 1:
yield fields[0]
-
+
REGEXP = '#([-a-f0-9]*)(/[-a-g0-9]*)?(/[-a-g0-9]*)?#'
self._siblings = siblings
def sibling_uuids(self):
return self._siblings
-
+
class IDtestCase(unittest.TestCase):
def setUp(self):
self.bugdir = DummyObject('1234abcd')
self.bug = DummyObject('abcdef', ['a1234', 'ab9876'])
self.bug.bugdir = self.bugdir
self.bugdir.bug_from_uuid = lambda uuid: self.bug
- self.bugdir.uuids = lambda : self.bug.sibling_uuids() + [self.bug.uuid]
+ self.bugdir.uuids = lambda : self.bug.sibling_uuids() + [self.bug.uuid]
self.comment = DummyObject('12345678', ['1234abcd', '1234cdef'])
self.comment.bug = self.bug
self.bug.comment_from_uuid = lambda uuid: self.comment
- self.bug.uuids = lambda : self.comment.sibling_uuids() + [self.comment.uuid]
+ self.bug.uuids = lambda : self.comment.sibling_uuids() + [self.comment.uuid]
self.bd_id = ID(self.bugdir, 'bugdir')
self.b_id = ID(self.bug, 'bug')
self.c_id = ID(self.comment, 'comment')
>>> 'plugin' in [n for n in modnames('libbe.util')]
True
"""
- components = prefix.split('.')
+ components = prefix.split('.')
modfiles = os.listdir(os.path.join(_PLUGIN_PATH, *components))
modfiles.sort()
for modfile in modfiles:
else:
assert _MSWINDOWS==True, 'invalid platform'
# win32 don't have os.execvp() so have to run command in a shell
- q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
+ q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
shell=True, cwd=cwd)
except OSError, e:
raise CommandError(args, status=e.args[0], stderr=e)
thread.start()
threads.append(thread)
std_X_arrays.append(stderr_array)
-
+
# also listen to the last processes stdout
stdout_array = []
thread = Thread(target=proc._readerthread,
thread.start()
threads.append(thread)
std_X_arrays.append(stdout_array)
-
+
# join threads as they die
for thread in threads:
thread.join()
-
+
# read output from reader threads
std_X_strings = []
for std_X_array in std_X_arrays:
"""
Find the file (or directory) named filename in path or in any
of path's parents.
-
+
e.g.
search_parent_directories("/a/b/c", ".be")
will return the path to the first existing file from
time_val = calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT))
timesign = -int(timezone_str[0]+"1") # "+" -> time_val ahead of GMT
timezone_tuple = time.strptime(timezone_str[1:], "%H%M")
- timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60
+ timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60
return time_val + timesign*timezone
def handy_time(time_val):
>>> underlined("Underlined String")
'Underlined String\\n================='
"""
-
+
return "%s\n%s" % (instring, "="*len(instring))
if libbe.TESTING == True: