severity: minor
-status: open
+status: fixed
summary: 'Terminology: Version control system vs. RCS'
- Unknown meaning. For backwards compatibility with old BE bugs.
-rcs_name: bzr
+vcs_name: bzr
-Bugs Everywhere Directory v1.1
+Bugs Everywhere Directory v1.2
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Commit the currently pending changes to the repository"""
-from libbe import cmdutil, bugdir, editor, rcs
+from libbe import cmdutil, bugdir, editor, vcs
import sys
__desc__ = __doc__
>>> os.chdir(bd.root)
>>> full_path = "testfile"
>>> test_contents = "A test file"
- >>> bd.rcs.set_file_contents(full_path, test_contents)
+ >>> bd.vcs.set_file_contents(full_path, test_contents)
>>> execute(["Added %s." % (full_path)], manipulate_encodings=False) # doctest: +ELLIPSIS
Committed ...
>>> bd.cleanup()
elif options.body == "EDITOR":
body = editor.editor_string("Please enter your commit message above")
else:
- body = bd.rcs.get_file_contents(options.body, allow_no_rcs=True)
+ body = bd.vcs.get_file_contents(options.body, allow_no_vcs=True)
try:
- revision = bd.rcs.commit(summary, body=body,
+ revision = bd.vcs.commit(summary, body=body,
allow_empty=options.allow_empty)
- except rcs.EmptyCommit, e:
+ except vcs.EmptyCommit, e:
print e
return 1
else:
>>> import os
>>> bd = bugdir.SimpleBugDir()
>>> bd.set_sync_with_disk(True)
- >>> original = bd.rcs.commit("Original status")
+ >>> original = bd.vcs.commit("Original status")
>>> bug = bd.bug_from_uuid("a")
>>> bug.status = "closed"
- >>> changed = bd.rcs.commit("Closed bug a")
+ >>> changed = bd.vcs.commit("Closed bug a")
>>> os.chdir(bd.root)
- >>> if bd.rcs.versioned == True:
+ >>> if bd.vcs.versioned == True:
... execute([original], manipulate_encodings=False)
... else:
... print "Modified bugs:\\n a:cm: Bug A\\n Changed bug settings:\\n status: open -> closed"
a:cm: Bug A
Changed bug settings:
status: open -> closed
- >>> if bd.rcs.versioned == True:
+ >>> if bd.vcs.versioned == True:
... execute(["--modified", original], manipulate_encodings=False)
... else:
... print "a"
a
- >>> if bd.rcs.versioned == False:
+ >>> if bd.vcs.versioned == False:
... execute([original], manipulate_encodings=False)
... else:
... print "This directory is not revision-controlled."
raise cmdutil.UsageError("Too many arguments.")
bd = bugdir.BugDir(from_disk=True,
manipulate_encodings=manipulate_encodings)
- if bd.rcs.versioned == False:
+ if bd.vcs.versioned == False:
print "This directory is not revision-controlled."
else:
if revision == None: # get the most recent revision
- revision = bd.rcs.revision_id(-1)
+ revision = bd.vcs.revision_id(-1)
old_bd = bd.duplicate_bugdir(revision)
d = diff.Diff(old_bd, bd)
tree = d.report_tree()
return parser
longhelp="""
-Uses the RCS to compare the current tree with a previous tree, and
+Uses the VCS to compare the current tree with a previous tree, and
prints a pretty report. If REVISION is given, it is a specifier for
the particular previous tree to use. Specifiers are specific to their
-RCS.
+VCS.
For Arch your specifier must be a fully-qualified revision name.
def execute(args, manipulate_encodings=True):
"""
- >>> from libbe import utility, rcs
+ >>> from libbe import utility, vcs
>>> import os
>>> dir = utility.Dir()
>>> try:
>>> dir = utility.Dir()
>>> os.chdir(dir.path)
- >>> rcs = rcs.installed_rcs()
- >>> rcs.init('.')
- >>> print rcs.name
+ >>> vcs = vcs.installed_vcs()
+ >>> vcs.init('.')
+ >>> print vcs.name
Arch
>>> execute([], manipulate_encodings=False)
Using Arch for revision control.
Directory initialized.
- >>> rcs.cleanup()
+ >>> vcs.cleanup()
>>> try:
... execute(['--root', '.'], manipulate_encodings=False)
except bugdir.AlreadyInitialized:
raise cmdutil.UserError("Directory already initialized: %s" % options.root_dir)
bd.save()
- if bd.rcs.name is not "None":
- print "Using %s for revision control." % bd.rcs.name
+ if bd.vcs.name is not "None":
+ print "Using %s for revision control." % bd.vcs.name
else:
print "No revision control detected."
print "Directory initialized."
longhelp="""
This command initializes Bugs Everywhere support for the specified directory
and all its subdirectories. It will auto-detect any supported revision control
-system. You can use "be set rcs_name" to change the rcs being used.
+system. You can use "be set vcs_name" to change the vcs being used.
The directory defaults to your current working directory.
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Change tree settings"""
import textwrap
-from libbe import cmdutil, bugdir, rcs, settings_object
+from libbe import cmdutil, bugdir, vcs, settings_object
__desc__ = __doc__
def _value_string(bd, setting):
set = getattr(bugdir.BugDir, s)
dstr = set.__doc__.strip()
# per-setting comment adjustments
- if s == "rcs_name":
+ if s == "vcs_name":
lines = dstr.split('\n')
while lines[0].startswith("This property defaults to") == False:
lines.pop(0)
assert len(lines) != None, \
- "Unexpected rcs_name docstring:\n '%s'" % dstr
+ "Unexpected vcs_name docstring:\n '%s'" % dstr
lines.insert(
0, "The name of the revision control system to use.\n")
dstr = '\n'.join(lines)
# read only bugdir.
bd = libbe.bugdir.BugDir(from_disk=True,
manipulate_encodings=False)
- if bd.rcs.versioned == False: # no way to tell what's changed
+ if bd.vcs.versioned == False: # no way to tell what's changed
raise NotificationFailed("Not versioned")
subscribers = subscribe.get_bugdir_subscribers(bd, THIS_SERVER)
commit_msg = self.commit_command.stdout
assert commit_msg.startswith("Committed "), commit_msg
after_revision = commit_msg[len("Committed "):]
- before_revision = bd.rcs.revision_id(-2)
+ before_revision = bd.vcs.revision_id(-2)
else:
before_revision = previous_revision
if before_revision == None:
assigned = None
bug.assigned = assigned
bug.save()
-# bug.rcs.precommit(bug.path)
-# bug.rcs.commit(bug.path, "Auto-commit")
-# bug.rcs.postcommit(bug.path)
+# bug.vcs.precommit(bug.path)
+# bug.vcs.commit(bug.path, "Auto-commit")
+# bug.vcs.postcommit(bug.path)
raise cherrypy.HTTPRedirect(bug_list_url(bug_data["project"]))
def instantiate(self, project, bug):
import config
from beuuid import uuid_gen
-import rcs
-from rcs import RCS
+import vcs
+from vcs import VCS
DEFAULT_CLIENT = "tla"
def new():
return Arch()
-class Arch(RCS):
+class Arch(VCS):
name = "Arch"
client = client
versioned = True
_project_name = None
_tmp_project = False
_arch_paramdir = os.path.expanduser("~/.arch-params")
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
"""Detect whether a directory is revision-controlled using Arch"""
if self._u_search_parent_directories(path, "{arch}") != None :
config.set_val("arch_client", client)
return True
return False
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._create_archive(path)
self._create_project(path)
self._add_project_code(path)
"""
Create a temporary Arch archive in the directory PATH. This
archive will be removed by
- __del__->cleanup->_rcs_cleanup->_remove_archive
+ __del__->cleanup->_vcs_cleanup->_remove_archive
"""
# http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
assert self._archive_name == None
"""
Create a temporary Arch project in the directory PATH. This
project will be removed by
- __del__->cleanup->_rcs_cleanup->_remove_project
+ __del__->cleanup->_vcs_cleanup->_remove_project
"""
# http://mwolson.org/projects/GettingStartedWithArch.html
# http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
self._adjust_naming_conventions(path)
self._invoke_client("import", "--summary", "Began versioning",
directory=path)
- def _rcs_cleanup(self):
+ def _vcs_cleanup(self):
if self._tmp_project == True:
self._remove_project()
if self._tmp_archive == True:
self._remove_archive()
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
if not os.path.isdir(path):
dirname = os.path.dirname(path)
else:
archive_name,project_name = output.rstrip('\n').split('/')
self._archive_name = archive_name
self._project_name = project_name
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
try:
status,output,error = self._u_invoke_client('my-id')
return output.rstrip('\n')
return None
else:
raise
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
self._u_invoke_client('my-id', value)
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
self._u_invoke_client("add-id", path)
realpath = os.path.realpath(self._u_abspath(path))
pathAdded = realpath in self._list_added(self.rootdir)
self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
if os.path.realpath(path) not in self._list_added(self.rootdir):
raise CantAddFile(path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
if not '.arch-ids' in path:
self._u_invoke_client("delete-id", path)
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
status,output,error = \
self._invoke_client("file-find", path, revision)
contents = f.read()
f.close()
return contents
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision == None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ VCS._vcs_duplicate_repo(self, directory, revision)
else:
status,output,error = \
self._u_invoke_client("get", revision,directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
if allow_empty == False:
# arch applies empty commits without complaining, so check first
status,output,error = self._u_invoke_client("changes",expect=(0,1))
if status == 0:
- raise rcs.EmptyCommit()
+ raise vcs.EmptyCommit()
summary,body = self._u_parse_commitfile(commitfile)
args = ["commit", "--summary", summary]
if body != None:
assert revpath.startswith(self._archive_project_name()+'--')
revision = revpath[len(self._archive_project_name()+'--'):]
return revpath
- def _rcs_revision_id(self, index):
+ def _vcs_revision_id(self, index):
status,output,error = self._u_invoke_client("logs")
logs = output.splitlines()
first_log = logs.pop(0)
\f
-rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Arch, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
@doc_property(doc="The trunk of the comment tree")
def comment_root(): return {}
- def _get_rcs(self):
- if hasattr(self.bugdir, "rcs"):
- return self.bugdir.rcs
+ def _get_vcs(self):
+ if hasattr(self.bugdir, "vcs"):
+ return self.bugdir.vcs
@Property
- @cached_property(generator=_get_rcs)
- @local_property("rcs")
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
@doc_property(doc="A revision control system instance.")
- def rcs(): return {}
+ def vcs(): return {}
def __init__(self, bugdir=None, uuid=None, from_disk=False,
load_comments=False, summary=None):
if uuid == None:
self.uuid = uuid_gen()
self.time = int(time.time()) # only save to second precision
- if self.rcs != None:
- self.creator = self.rcs.get_user_id()
+ if self.vcs != None:
+ self.creator = self.vcs.get_user_id()
self.summary = summary
def __repr__(self):
def load_settings(self):
if self.sync_with_disk == False:
raise DiskAccessRequired("load settings")
- self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ self.settings = mapfile.map_load(self.vcs, self.get_path("values"))
self._setup_saved_settings()
def save_settings(self):
if self.sync_with_disk == False:
raise DiskAccessRequired("save settings")
assert self.summary != None, "Can't save blank bug"
- self.rcs.mkdir(self.get_path())
+ self.vcs.mkdir(self.get_path())
path = self.get_path("values")
- mapfile.map_save(self.rcs, path, self._get_saved_settings())
+ mapfile.map_save(self.vcs, path, self._get_saved_settings())
def save(self):
"""
raise DiskAccessRequired("remove")
self.comment_root.remove()
path = self.get_path()
- self.rcs.recursive_remove(path)
+ self.vcs.recursive_remove(path)
# methods for managing comments
cached_property, primed_property, change_hook_property, \
settings_property
import mapfile
-import rcs
+import vcs
import settings_object
import upgrade
import utility
loads new settings/bugs/comments that it doesn't already have in
memory and .sync_with_disk == True.
- Allow RCS initialization
+ Allow VCS initialization
========================
This one is for testing purposes. Setting it to True allows the
- BugDir to search for an installed RCS backend and initialize it in
+ BugDir to search for an installed VCS backend and initialize it in
the root directory. This is a convenience option for supporting
tests of versioning functionality (e.g. .duplicate_bugdir).
def encoding(): return {}
def _setup_user_id(self, user_id):
- self.rcs.user_id = user_id
+ self.vcs.user_id = user_id
def _guess_user_id(self):
- return self.rcs.get_user_id()
+ return self.vcs.get_user_id()
def _set_user_id(self, old_user_id, new_user_id):
self._setup_user_id(new_user_id)
self._prop_save_settings(old_user_id, new_user_id)
@_versioned_property(name="user_id",
doc=
"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note
-that the Arch RCS backend *enforces* ids with this format.""",
+that the Arch VCS backend *enforces* ids with this format.""",
change_hook=_set_user_id,
generator=_guess_user_id)
def user_id(): return {}
"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""")
def default_assignee(): return {}
- @_versioned_property(name="rcs_name",
- doc="""The name of the current RCS. Kept seperate to make saving/loading
-settings easy. Don't set this attribute. Set .rcs instead, and
-.rcs_name will be automatically adjusted.""",
+ @_versioned_property(name="vcs_name",
+ doc="""The name of the current VCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .vcs instead, and
+.vcs_name will be automatically adjusted.""",
default="None",
allowed=["None", "Arch", "bzr", "darcs", "git", "hg"])
- def rcs_name(): return {}
+ def vcs_name(): return {}
- def _get_rcs(self, rcs_name=None):
+ def _get_vcs(self, vcs_name=None):
"""Get and root a new revision control system"""
- if rcs_name == None:
- rcs_name = self.rcs_name
- new_rcs = rcs.rcs_by_name(rcs_name)
- self._change_rcs(None, new_rcs)
- return new_rcs
- def _change_rcs(self, old_rcs, new_rcs):
- new_rcs.encoding = self.encoding
- new_rcs.root(self.root)
- self.rcs_name = new_rcs.name
+ if vcs_name == None:
+ vcs_name = self.vcs_name
+ new_vcs = vcs.vcs_by_name(vcs_name)
+ self._change_vcs(None, new_vcs)
+ return new_vcs
+ def _change_vcs(self, old_vcs, new_vcs):
+ new_vcs.encoding = self.encoding
+ new_vcs.root(self.root)
+ self.vcs_name = new_vcs.name
@Property
- @change_hook_property(hook=_change_rcs)
- @cached_property(generator=_get_rcs)
- @local_property("rcs")
+ @change_hook_property(hook=_change_vcs)
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
@doc_property(doc="A revision control system instance.")
- def rcs(): return {}
+ def vcs(): return {}
def _bug_map_gen(self):
map = {}
def __init__(self, root=None, sink_to_existing_root=True,
- assert_new_BugDir=False, allow_rcs_init=False,
- manipulate_encodings=True, from_disk=False, rcs=None):
+ assert_new_BugDir=False, allow_vcs_init=False,
+ manipulate_encodings=True, from_disk=False, vcs=None):
list.__init__(self)
settings_object.SavedSettingsObject.__init__(self)
self._manipulate_encodings = manipulate_encodings
if not os.path.exists(root):
raise NoRootEntry(root)
self.root = root
- # get a temporary rcs until we've loaded settings
+ # get a temporary vcs until we've loaded settings
self.sync_with_disk = False
- self.rcs = self._guess_rcs()
+ self.vcs = self._guess_vcs()
if from_disk == True:
self.sync_with_disk = True
if assert_new_BugDir == True:
if os.path.exists(self.get_path()):
raise AlreadyInitialized, self.get_path()
- if rcs == None:
- rcs = self._guess_rcs(allow_rcs_init)
- self.rcs = rcs
+ if vcs == None:
+ vcs = self._guess_vcs(allow_vcs_init)
+ self.vcs = vcs
self._setup_user_id(self.user_id)
def __del__(self):
self.cleanup()
def cleanup(self):
- self.rcs.cleanup()
+ self.vcs.cleanup()
# methods for getting the BugDir situated in the filesystem
raise NoBugDir(path)
return beroot
- def _guess_rcs(self, allow_rcs_init=False):
+ def _guess_vcs(self, allow_vcs_init=False):
"""
Only called by __init__.
"""
deepdir = self.get_path()
if not os.path.exists(deepdir):
deepdir = os.path.dirname(deepdir)
- new_rcs = rcs.detect_rcs(deepdir)
+ new_vcs = vcs.detect_vcs(deepdir)
install = False
- if new_rcs.name == "None":
- if allow_rcs_init == True:
- new_rcs = rcs.installed_rcs()
- new_rcs.init(self.root)
- return new_rcs
+ if new_vcs.name == "None":
+ if allow_vcs_init == True:
+ new_vcs = vcs.installed_vcs()
+ new_vcs.init(self.root)
+ return new_vcs
# methods for saving/loading/accessing settings and properties.
return os.path.join(dir, *args)
def _get_settings(self, settings_path, for_duplicate_bugdir=False):
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- if allow_no_rcs == True:
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
assert for_duplicate_bugdir == True
if self.sync_with_disk == False and for_duplicate_bugdir == False:
# duplicates can ignore this bugdir's .sync_with_disk status
raise DiskAccessRequired("_get settings")
try:
- settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs)
- except rcs.NoSuchFile:
- settings = {"rcs_name": "None"}
+ settings = mapfile.map_load(self.vcs, settings_path, allow_no_vcs)
+ except vcs.NoSuchFile:
+ settings = {"vcs_name": "None"}
return settings
def _save_settings(self, settings_path, settings,
for_duplicate_bugdir=False):
- allow_no_rcs = not self.rcs.path_in_root(settings_path)
- if allow_no_rcs == True:
+ allow_no_vcs = not self.vcs.path_in_root(settings_path)
+ if allow_no_vcs == True:
assert for_duplicate_bugdir == True
if self.sync_with_disk == False and for_duplicate_bugdir == False:
# duplicates can ignore this bugdir's .sync_with_disk status
raise DiskAccessRequired("_save settings")
- self.rcs.mkdir(self.get_path(), allow_no_rcs)
- mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
+ self.vcs.mkdir(self.get_path(), allow_no_vcs)
+ mapfile.map_save(self.vcs, settings_path, settings, allow_no_vcs)
def load_settings(self):
self.settings = self._get_settings(self.get_path("settings"))
self._setup_encoding(self.encoding)
self._setup_severities(self.severities)
self._setup_status(self.active_status, self.inactive_status)
- self.rcs = rcs.rcs_by_name(self.rcs_name)
+ self.vcs = vcs.vcs_by_name(self.vcs_name)
self._setup_user_id(self.user_id)
def save_settings(self):
settings = self._get_saved_settings()
self._save_settings(self.get_path("settings"), settings)
- def get_version(self, path=None, use_none_rcs=False,
+ def get_version(self, path=None, use_none_vcs=False,
for_duplicate_bugdir=False):
"""
Requires disk access.
"""
if self.sync_with_disk == False:
raise DiskAccessRequired("get version")
- if use_none_rcs == True:
- RCS = rcs.rcs_by_name("None")
- RCS.root(self.root)
- RCS.encoding = encoding.get_encoding()
+ if use_none_vcs == True:
+ VCS = vcs.vcs_by_name("None")
+ VCS.root(self.root)
+ VCS.encoding = encoding.get_encoding()
else:
- RCS = self.rcs
+ VCS = self.vcs
if path == None:
path = self.get_path("version")
- allow_no_rcs = not RCS.path_in_root(path)
- if allow_no_rcs == True:
+ allow_no_vcs = not VCS.path_in_root(path)
+ if allow_no_vcs == True:
assert for_duplicate_bugdir == True
- version = RCS.get_file_contents(
- path, allow_no_rcs=allow_no_rcs).rstrip("\n")
+ version = VCS.get_file_contents(
+ path, allow_no_vcs=allow_no_vcs).rstrip("\n")
return version
def set_version(self):
"""
if self.sync_with_disk == False:
raise DiskAccessRequired("set version")
- self.rcs.mkdir(self.get_path())
- self.rcs.set_file_contents(self.get_path("version"),
+ self.vcs.mkdir(self.get_path())
+ self.vcs.set_file_contents(self.get_path("version"),
upgrade.BUGDIR_DISK_VERSION+"\n")
# methods controlling disk access
"""
Reqires disk access
"""
- version = self.get_version(use_none_rcs=True)
+ version = self.get_version(use_none_vcs=True)
if version != upgrade.BUGDIR_DISK_VERSION:
upgrade.upgrade(self.root, version)
else:
# methods for managing duplicate BugDirs
def duplicate_bugdir(self, revision):
- duplicate_path = self.rcs.duplicate_repo(revision)
+ duplicate_path = self.vcs.duplicate_repo(revision)
duplicate_version_path = os.path.join(duplicate_path, ".be", "version")
version = self.get_version(duplicate_version_path,
if version != upgrade.BUGDIR_DISK_VERSION:
upgrade.upgrade(duplicate_path, version)
- # setup revision RCS as None, since the duplicate may not be
+ # setup revision VCS as None, since the duplicate may not be
# initialized for versioning
duplicate_settings_path = os.path.join(duplicate_path,
".be", "settings")
duplicate_settings = self._get_settings(duplicate_settings_path,
for_duplicate_bugdir=True)
- if "rcs_name" in duplicate_settings:
- duplicate_settings["rcs_name"] = "None"
+ if "vcs_name" in duplicate_settings:
+ duplicate_settings["vcs_name"] = "None"
duplicate_settings["user_id"] = self.user_id
if "disabled" in bug.status_values:
# Hack to support old versions of BE bugs
return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings)
def remove_duplicate_bugdir(self):
- self.rcs.remove_duplicate_repo()
+ self.vcs.remove_duplicate_repo()
# methods for managing bugs
assert os.path.exists(dir.path)
root = dir.path
assert_new_BugDir = True
- rcs_init = True
+ vcs_init = True
else:
root = "/"
assert_new_BugDir = False
- rcs_init = False
+ vcs_init = False
BugDir.__init__(self, root, sink_to_existing_root=False,
assert_new_BugDir=assert_new_BugDir,
- allow_rcs_init=rcs_init,
+ allow_vcs_init=vcs_init,
manipulate_encodings=False)
if sync_with_disk == True: # postpone cleanup since dir.__del__() removes dir.
self._dir_ref = dir
def setUp(self):
self.dir = utility.Dir()
self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_rcs_init=True)
- self.rcs = self.bugdir.rcs
+ allow_vcs_init=True)
+ self.vcs = self.bugdir.vcs
def tearDown(self):
self.bugdir.cleanup()
self.dir.cleanup()
self.assertRaises(AlreadyInitialized, BugDir,
self.dir.path, assertNewBugDir=True)
def versionTest(self):
- if self.rcs.versioned == False:
+ if self.vcs.versioned == False:
return
- original = self.bugdir.rcs.commit("Began versioning")
+ original = self.bugdir.vcs.commit("Began versioning")
bugA = self.bugdir.bug_from_uuid("a")
bugA.status = "fixed"
self.bugdir.save()
- new = self.rcs.commit("Fixed bug a")
+ new = self.vcs.commit("Fixed bug a")
dupdir = self.bugdir.duplicate_bugdir(original)
self.failUnless(dupdir.root != self.bugdir.root,
"%s, %s" % (dupdir.root, self.bugdir.root))
self.original_working_dir = os.getcwd()
os.chdir(self.dir.path)
self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
- allow_rcs_init=True)
+ allow_vcs_init=True)
self.bugdir.new_bug("preexisting", summary="Hopefully not imported")
self.bugdir.save()
def tearDown(self):
import unittest
import doctest
-import rcs
-from rcs import RCS
+import vcs
+from vcs import VCS
def new():
return Bzr()
-class Bzr(RCS):
+class Bzr(VCS):
name = "bzr"
client = "bzr"
versioned = True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
if self._u_search_parent_directories(path, ".bzr") != None :
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
status,output,error = self._u_invoke_client("root", path)
return output.rstrip('\n')
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
status,output,error = self._u_invoke_client("whoami")
return output.rstrip('\n')
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
self._u_invoke_client("whoami", value)
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
# --force to also remove unversioned files.
self._u_invoke_client("remove", "--force", path)
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
status,output,error = \
self._u_invoke_client("cat","-r",revision,path)
return output
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision == None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ VCS._vcs_duplicate_repo(self, directory, revision)
else:
self._u_invoke_client("branch", "--revision", revision,
".", directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
args = ["commit", "--file", commitfile]
if allow_empty == True:
args.append("--unchanged")
strings = ["ERROR: no changes to commit.", # bzr 1.3.1
"ERROR: No changes to commit."] # bzr 1.15.1
if self._u_any_in_string(strings, error) == True:
- raise rcs.EmptyCommit()
+ raise vcs.EmptyCommit()
else:
- raise rcs.CommandError(args, status, error)
+ raise vcs.CommandError(args, status, error)
revision = None
revline = re.compile("Committed revision (.*)[.]")
match = revline.search(error)
assert len(match.groups()) == 1
revision = match.groups()[0]
return revision
- def _rcs_revision_id(self, index):
+ def _vcs_revision_id(self, index):
status,output,error = self._u_invoke_client("revno")
current_revision = int(output)
if index >= current_revision or index < -current_revision:
return str(current_revision+index+1)
\f
-rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
doc="An integer version of .date")
def _get_comment_body(self):
- if self.rcs != None and self.sync_with_disk == True:
- import rcs
+ if self.vcs != None and self.sync_with_disk == True:
+ import vcs
binary = not self.content_type.startswith("text/")
- return self.rcs.get_file_contents(self.get_path("body"), binary=binary)
+ return self.vcs.get_file_contents(self.get_path("body"), binary=binary)
def _set_comment_body(self, old=None, new=None, force=False):
- if (self.rcs != None and self.sync_with_disk == True) or force==True:
+ if (self.vcs != None and self.sync_with_disk == True) or force==True:
assert new != None, "Can't save empty comment"
binary = not self.content_type.startswith("text/")
- self.rcs.set_file_contents(self.get_path("body"), new, binary=binary)
+ self.vcs.set_file_contents(self.get_path("body"), new, binary=binary)
@Property
@change_hook_property(hook=_set_comment_body)
@doc_property(doc="The meat of the comment")
def body(): return {}
- def _get_rcs(self):
- if hasattr(self.bug, "rcs"):
- return self.bug.rcs
+ def _get_vcs(self):
+ if hasattr(self.bug, "vcs"):
+ return self.bug.vcs
@Property
- @cached_property(generator=_get_rcs)
- @local_property("rcs")
+ @cached_property(generator=_get_vcs)
+ @local_property("vcs")
@doc_property(doc="A revision control system instance.")
- def rcs(): return {}
+ def vcs(): return {}
def _extra_strings_check_fn(value):
return utility.iterable_full_of_strings(value, \
if uuid == None:
self.uuid = uuid_gen()
self.time = int(time.time()) # only save to second precision
- if self.rcs != None:
- self.author = self.rcs.get_user_id()
+ if self.vcs != None:
+ self.author = self.vcs.get_user_id()
self.in_reply_to = in_reply_to
self.body = body
def load_settings(self):
if self.sync_with_disk == False:
raise DiskAccessRequired("load settings")
- self.settings = mapfile.map_load(self.rcs, self.get_path("values"))
+ self.settings = mapfile.map_load(self.vcs, self.get_path("values"))
self._setup_saved_settings()
def save_settings(self):
if self.sync_with_disk == False:
raise DiskAccessRequired("save settings")
- self.rcs.mkdir(self.get_path())
+ self.vcs.mkdir(self.get_path())
path = self.get_path("values")
- mapfile.map_save(self.rcs, path, self._get_saved_settings())
+ mapfile.map_save(self.vcs, path, self._get_saved_settings())
def save(self):
"""
raise DiskAccessRequired("remove")
for comment in self.traverse():
path = comment.get_path()
- self.rcs.recursive_remove(path)
+ self.vcs.recursive_remove(path)
def add_reply(self, reply, allow_time_inversion=False):
if self.uuid != INVALID_UUID:
import doctest
import unittest
-import rcs
-from rcs import RCS
+import vcs
+from vcs import VCS
def new():
return Darcs()
-class Darcs(RCS):
+class Darcs(VCS):
name="darcs"
client="darcs"
versioned=True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
if self._u_search_parent_directories(path, "_darcs") != None :
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
# Assume that nothing funny is going on; in particular, that we aren't
# dealing with a bare repo.
if darcs_dir == None:
return None
return os.path.dirname(darcs_dir)
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
# following http://darcs.net/manual/node4.html#SECTION00410030000000000000
# as of June 29th, 2009
if self.rootdir == None:
if env_variable in os.environ:
return os.environ[env_variable]
return None
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
if self.rootdir == None:
self.root(".")
if self.rootdir == None:
- raise rcs.SettingIDnotSupported
+ raise vcs.SettingIDnotSupported
author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author")
f = codecs.open(author_path, "w", self.encoding)
f.write(value)
f.close()
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
if os.path.isdir(path):
return
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
if not os.path.isdir(self._u_abspath(path)):
os.remove(os.path.join(self.rootdir, path)) # darcs notices removal
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass # darcs notices changes
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision,
+ return VCS._vcs_get_file_contents(self, path, revision,
binary=binary)
else:
try:
return self._u_invoke_client("show", "contents", "--patch", revision, path)
- except rcs.CommandError:
+ except vcs.CommandError:
# Darcs versions < 2.0.0pre2 lack the "show contents" command
status,output,error = self._u_invoke_client("diff", "--unified",
status,output,error = self._u_invoke(args, stdin=target_patch)
if os.path.exists(os.path.join(self.rootdir, path)) == True:
- contents = RCS._rcs_get_file_contents(self, path,
+ contents = VCS._vcs_get_file_contents(self, path,
binary=binary)
else:
contents = ""
status,output,error = self._u_invoke(args, stdin=target_patch)
args=["patch", path]
status,output,error = self._u_invoke(args, stdin=major_patch)
- current_contents = RCS._rcs_get_file_contents(self, path,
+ current_contents = VCS._vcs_get_file_contents(self, path,
binary=binary)
return contents
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision==None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ VCS._vcs_duplicate_repo(self, directory, revision)
else:
self._u_invoke_client("put", "--to-patch", revision, directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
id = self.get_user_id()
if '@' not in id:
id = "%s <%s@invalid.com>" % (id, id)
empty_strings = ["No changes!"]
if self._u_any_in_string(empty_strings, output) == True:
if allow_empty == False:
- raise rcs.EmptyCommit()
+ raise vcs.EmptyCommit()
# note that darcs does _not_ make an empty revision.
# this returns the last non-empty revision id...
- revision = self._rcs_revision_id(-1)
+ revision = self._vcs_revision_id(-1)
else:
revline = re.compile("Finished recording patch '(.*)'")
match = revline.search(output)
assert len(match.groups()) == 1
revision = match.groups()[0]
return revision
- def _rcs_revision_id(self, index):
+ def _vcs_revision_id(self, index):
status,output,error = self._u_invoke_client("changes", "--xml")
revisions = []
xml_str = output.encode("unicode_escape").replace(r"\n", "\n")
except IndexError:
return None
\f
-rcs.make_rcs_testcase_subclasses(Darcs, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Darcs, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
def _bugdir_attribute_changes(self):
return self._settings_properties_attribute_changes( \
self.old_bugdir, self.new_bugdir,
- ["rcs_name"]) # tweaked by bugdir.duplicate_bugdir
+ ["vcs_name"]) # tweaked by bugdir.duplicate_bugdir
def _bug_attribute_changes(self, old, new):
return self._settings_properties_attribute_changes(old, new)
def _comment_attribute_changes(self, old, new):
if hasattr(self, "__report_tree"):
return self.__report_tree
bugdir_settings = sorted(self.new_bugdir.settings_properties)
- bugdir_settings.remove("rcs_name") # tweaked by bugdir.duplicate_bugdir
+ bugdir_settings.remove("vcs_name") # tweaked by bugdir.duplicate_bugdir
root = diff_tree("bugdir")
bugdir_attribute_changes = self._bugdir_attribute_changes()
if len(bugdir_attribute_changes) > 0:
import unittest
import doctest
-import rcs
-from rcs import RCS
+import vcs
+from vcs import VCS
def new():
return Git()
-class Git(RCS):
+class Git(VCS):
name="git"
client="git"
versioned=True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
if self._u_search_parent_directories(path, ".git") != None :
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
"""Find the root of the deepest repository containing path."""
# Assume that nothing funny is going on; in particular, that we aren't
# dealing with a bare repo.
gitdir = os.path.join(path, output.rstrip('\n'))
dirname = os.path.abspath(os.path.dirname(gitdir))
return dirname
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
status,output,error = \
self._u_invoke_client("config", "user.name", expect=(0,1))
if status == 0:
email = self._u_get_fallback_email()
return self._u_create_id(name, email)
return None # Git has no infomation
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
name,email = self._u_parse_id(value)
if email != None:
self._u_invoke_client("config", "user.email", email)
self._u_invoke_client("config", "user.name", name)
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
if os.path.isdir(path):
return
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
if not os.path.isdir(self._u_abspath(path)):
self._u_invoke_client("rm", "-f", path)
- def _rcs_update(self, path):
- self._rcs_add(path)
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_update(self, path):
+ self._vcs_add(path)
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
arg = "%s:%s" % (revision,path)
status,output,error = self._u_invoke_client("show", arg)
return output
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision==None:
- RCS._rcs_duplicate_repo(self, directory, revision)
+ VCS._vcs_duplicate_repo(self, directory, revision)
else:
#self._u_invoke_client("archive", revision, directory) # makes tarball
self._u_invoke_client("clone", "--no-checkout",".",directory)
self._u_invoke_client("checkout", revision, directory=directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
args = ['commit', '--all', '--file', commitfile]
if allow_empty == True:
args.append("--allow-empty")
strings = ["nothing to commit",
"nothing added to commit"]
if self._u_any_in_string(strings, output) == True:
- raise rcs.EmptyCommit()
+ raise vcs.EmptyCommit()
revision = None
revline = re.compile("(.*) (.*)[:\]] (.*)")
match = revline.search(output)
assert match != None, output+error
assert len(match.groups()) == 3
revision = match.groups()[1]
- full_revision = self._rcs_revision_id(-1)
+ full_revision = self._vcs_revision_id(-1)
assert full_revision.startswith(revision), \
"Mismatched revisions:\n%s\n%s" % (revision, full_revision)
return full_revision
- def _rcs_revision_id(self, index):
+ def _vcs_revision_id(self, index):
args = ["rev-list", "--first-parent", "--reverse", "HEAD"]
kwargs = {"expect":(0,128)}
status,output,error = self._u_invoke_client(*args, **kwargs)
if status == 128:
if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
return None
- raise rcs.CommandError(args, status, error)
+ raise vcs.CommandError(args, status, error)
commits = output.splitlines()
try:
return commits[index]
return None
\f
-rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Git, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
import unittest
import doctest
-import rcs
-from rcs import RCS
+import vcs
+from vcs import VCS
def new():
return Hg()
-class Hg(RCS):
+class Hg(VCS):
name="hg"
client="hg"
versioned=True
- def _rcs_help(self):
+ def _vcs_help(self):
status,output,error = self._u_invoke_client("--help")
return output
- def _rcs_detect(self, path):
+ def _vcs_detect(self, path):
"""Detect whether a directory is revision-controlled using Mercurial"""
if self._u_search_parent_directories(path, ".hg") != None:
return True
return False
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
status,output,error = self._u_invoke_client("root", directory=path)
return output.rstrip('\n')
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
self._u_invoke_client("init", directory=path)
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
status,output,error = self._u_invoke_client("showconfig","ui.username")
return output.rstrip('\n')
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
"""
Supported by the Config Extension, but that is not part of
standard Mercurial.
http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
"""
- raise rcs.SettingIDnotSupported
- def _rcs_add(self, path):
+ raise vcs.SettingIDnotSupported
+ def _vcs_add(self, path):
self._u_invoke_client("add", path)
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
self._u_invoke_client("rm", "--force", path)
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
if revision == None:
- return RCS._rcs_get_file_contents(self, path, revision, binary=binary)
+ return VCS._vcs_get_file_contents(self, path, revision, binary=binary)
else:
status,output,error = \
self._u_invoke_client("cat","-r",revision,path)
return output
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
if revision == None:
- return RCS._rcs_duplicate_repo(self, directory, revision)
+ return VCS._vcs_duplicate_repo(self, directory, revision)
else:
self._u_invoke_client("archive", "--rev", revision, directory)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
args = ['commit', '--logfile', commitfile]
status,output,error = self._u_invoke_client(*args)
if allow_empty == False:
strings = ["nothing changed"]
if self._u_any_in_string(strings, output) == True:
- raise rcs.EmptyCommit()
- return self._rcs_revision_id(-1)
- def _rcs_revision_id(self, index, style="id"):
+ raise vcs.EmptyCommit()
+ return self._vcs_revision_id(-1)
+ def _vcs_revision_id(self, index, style="id"):
args = ["identify", "--rev", str(int(index)), "--%s" % style]
kwargs = {"expect": (0,255)}
status,output,error = self._u_invoke_client(*args, **kwargs)
return None
\f
-rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__])
+vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
"""
return yaml.load(contents) or {}
-def map_save(rcs, path, map, allow_no_rcs=False):
+def map_save(vcs, path, map, allow_no_vcs=False):
"""Save the map as a mapfile to the specified path"""
contents = generate(map)
- rcs.set_file_contents(path, contents, allow_no_rcs)
+ vcs.set_file_contents(path, contents, allow_no_vcs)
-def map_load(rcs, path, allow_no_rcs=False):
- contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs)
+def map_load(vcs, path, allow_no_vcs=False):
+ contents = vcs.get_file_contents(path, allow_no_vcs=allow_no_vcs)
return parse(contents)
suite = doctest.DocTestSuite()
import encoding
import mapfile
-import rcs
+import vcs
# a list of all past versions
BUGDIR_DISK_VERSIONS = ["Bugs Everywhere Tree 1 0",
- "Bugs Everywhere Directory v1.1"]
+ "Bugs Everywhere Directory v1.1",
+ "Bugs Everywhere Directory v1.2"]
# the current version
BUGDIR_DISK_VERSION = BUGDIR_DISK_VERSIONS[-1]
final_version = None
def __init__(self, root):
self.root = root
- # use the "None" RCS to ensure proper encoding/decoding and
+ # use the "None" VCS to ensure proper encoding/decoding and
# simplify path construction.
- self.rcs = rcs.rcs_by_name("None")
- self.rcs.root(self.root)
- self.rcs.encoding = encoding.get_encoding()
+ self.vcs = vcs.vcs_by_name("None")
+ self.vcs.root(self.root)
+ self.vcs.encoding = encoding.get_encoding()
def get_path(self, *args):
"""
def check_initial_version(self):
path = self.get_path("version")
- version = self.rcs.get_file_contents(path).rstrip("\n")
+ version = self.vcs.get_file_contents(path).rstrip("\n")
assert version == self.initial_version, version
def set_version(self):
path = self.get_path("version")
- self.rcs.set_file_contents(path, self.final_version+"\n")
+ self.vcs.set_file_contents(path, self.final_version+"\n")
def upgrade(self):
print >> sys.stderr, "upgrading bugdir from '%s' to '%s'" \
raise NotImplementedError
-class Upgrade_1_0_to_2 (Upgrader):
+class Upgrade_1_0_to_1_1 (Upgrader):
initial_version = "Bugs Everywhere Tree 1 0"
- final_version = "Bugs Everywhere Directory v2"
+ final_version = "Bugs Everywhere Directory v1.1"
def _upgrade_mapfile(self, path):
- contents = self.rcs.get_file_contents(path)
+ contents = self.vcs.get_file_contents(path)
old_format = False
for line in contents.splitlines():
if len(line.split("=")) == 2:
contents = '\n'.join(newlines)
# load the YAML and save
map = mapfile.parse(contents)
- mapfile.map_save(self.rcs, path, map)
+ mapfile.map_save(self.vcs, path, map)
def _upgrade(self):
"""
path_list = c_path + [comment_uuid, "values"]
path = self.get_path(*path_list)
self._upgrade_mapfile(path)
- settings = mapfile.map_load(self.rcs, path)
+ settings = mapfile.map_load(self.vcs, path)
if "From" in settings:
settings["Author"] = settings.pop("From")
- mapfile.map_save(self.rcs, path, settings)
+ mapfile.map_save(self.vcs, path, settings)
-upgraders = [Upgrade_1_0_to_2]
+class Upgrade_1_1_to_1_2 (Upgrader):
+ initial_version = "Bugs Everywhere Directory v1.1"
+ final_version = "Bugs Everywhere Directory v1.2"
+ def _upgrade(self):
+ """
+ BugDir settings field "rcs_name" -> "vcs_name".
+ """
+ path = self.get_path("settings")
+ settings = mapfile.map_load(self.vcs, path)
+ if "rcs_name" in settings:
+ settings["vcs_name"] = settings.pop("rcs_name")
+ mapfile.map_save(self.vcs, path, settings)
+
+
+upgraders = [Upgrade_1_0_to_1_1,
+ Upgrade_1_1_to_1_2]
upgrade_classes = {}
for upgrader in upgraders:
upgrade_classes[(upgrader.initial_version,upgrader.final_version)]=upgrader
from utility import Dir, search_parent_directories
-def _get_matching_rcs(matchfn):
- """Return the first module for which matchfn(RCS_instance) is true"""
+def _get_matching_vcs(matchfn):
+ """Return the first module for which matchfn(VCS_instance) is true"""
import arch
import bzr
import darcs
import git
import hg
for module in [arch, bzr, darcs, git, hg]:
- rcs = module.new()
- if matchfn(rcs) == True:
- return rcs
- del(rcs)
- return RCS()
+ vcs = module.new()
+ if matchfn(vcs) == True:
+ return vcs
+ del(vcs)
+ return VCS()
-def rcs_by_name(rcs_name):
- """Return the module for the RCS with the given name"""
- return _get_matching_rcs(lambda rcs: rcs.name == rcs_name)
+def vcs_by_name(vcs_name):
+ """Return the module for the VCS with the given name"""
+ return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
-def detect_rcs(dir):
- """Return an RCS instance for the rcs being used in this directory"""
- return _get_matching_rcs(lambda rcs: rcs.detect(dir))
+def detect_vcs(dir):
+ """Return an VCS instance for the vcs being used in this directory"""
+ return _get_matching_vcs(lambda vcs: vcs.detect(dir))
-def installed_rcs():
- """Return an instance of an installed RCS"""
- return _get_matching_rcs(lambda rcs: rcs.installed())
+def installed_vcs():
+ """Return an instance of an installed VCS"""
+ return _get_matching_vcs(lambda vcs: vcs.installed())
class CommandError(Exception):
class SettingIDnotSupported(NotImplementedError):
pass
-class RCSnotRooted(Exception):
+class VCSnotRooted(Exception):
def __init__(self):
- msg = "RCS not rooted"
+ msg = "VCS not rooted"
Exception.__init__(self, msg)
class PathNotInRoot(Exception):
def new():
- return RCS()
+ return VCS()
-class RCS(object):
+class VCS(object):
"""
- This class implements a 'no-rcs' interface.
+ This class implements a 'no-vcs' interface.
- Support for other RCSs can be added by subclassing this class, and
- overriding methods _rcs_*() with code appropriate for your RCS.
+ Support for other VCSs can be added by subclassing this class, and
+ overriding methods _vcs_*() with code appropriate for your VCS.
- The methods _u_*() are utility methods available to the _rcs_*()
+ The methods _u_*() are utility methods available to the _vcs_*()
methods.
"""
name = "None"
def __del__(self):
self.cleanup()
- def _rcs_help(self):
+ def _vcs_help(self):
"""
Return the command help string.
(Allows a simple test to see if the client is installed.)
"""
pass
- def _rcs_detect(self, path=None):
+ def _vcs_detect(self, path=None):
"""
- Detect whether a directory is revision controlled with this RCS.
+ Detect whether a directory is revision controlled with this VCS.
"""
return True
- def _rcs_root(self, path):
+ def _vcs_root(self, path):
"""
- Get the RCS root. This is the default working directory for
+ Get the VCS root. This is the default working directory for
future invocations. You would normally set this to the root
- directory for your RCS.
+ directory for your VCS.
"""
if os.path.isdir(path)==False:
path = os.path.dirname(path)
if path == "":
path = os.path.abspath(".")
return path
- def _rcs_init(self, path):
+ def _vcs_init(self, path):
"""
Begin versioning the tree based at path.
"""
pass
- def _rcs_cleanup(self):
+ def _vcs_cleanup(self):
"""
- Remove any cruft that _rcs_init() created outside of the
+ Remove any cruft that _vcs_init() created outside of the
versioned tree.
"""
pass
- def _rcs_get_user_id(self):
+ def _vcs_get_user_id(self):
"""
- Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the RCS has not been configured with a username, return None.
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return None.
"""
return None
- def _rcs_set_user_id(self, value):
+ def _vcs_set_user_id(self, value):
"""
- Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the RCS has not been configured with a usename, so
+ Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the VCS has not been configured with a usename, so
that commits will have a reasonable FROM value.
"""
raise SettingIDnotSupported
- def _rcs_add(self, path):
+ def _vcs_add(self, path):
"""
Add the already created file at path to version control.
"""
pass
- def _rcs_remove(self, path):
+ def _vcs_remove(self, path):
"""
Remove the file at path from version control. Optionally
remove the file from the filesystem as well.
"""
pass
- def _rcs_update(self, path):
+ def _vcs_update(self, path):
"""
Notify the versioning system of changes to the versioned file
at path.
"""
pass
- def _rcs_get_file_contents(self, path, revision=None, binary=False):
+ def _vcs_get_file_contents(self, path, revision=None, binary=False):
"""
Get the file contents as they were in a given revision.
Revision==None specifies the current revision.
"""
assert revision == None, \
- "The %s RCS does not support revision specifiers" % self.name
+ "The %s VCS does not support revision specifiers" % self.name
if binary == False:
f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding)
else:
contents = f.read()
f.close()
return contents
- def _rcs_duplicate_repo(self, directory, revision=None):
+ def _vcs_duplicate_repo(self, directory, revision=None):
"""
Get the repository as it was in a given revision.
revision==None specifies the current revision.
dir specifies a directory to create the duplicate in.
"""
shutil.copytree(self.rootdir, directory, True)
- def _rcs_commit(self, commitfile, allow_empty=False):
+ def _vcs_commit(self, commitfile, allow_empty=False):
"""
Commit the current working directory, using the contents of
commitfile as the comment. Return the name of the old
changes to commit.
"""
return None
- def _rcs_revision_id(self, index):
+ def _vcs_revision_id(self, index):
"""
Return the name of the <index>th revision. Index will be an
integer (possibly <= 0). The choice of which branch to follow
return None
def installed(self):
try:
- self._rcs_help()
+ self._vcs_help()
return True
except OSError, e:
if e.errno == errno.ENOENT:
return False
def detect(self, path="."):
"""
- Detect whether a directory is revision controlled with this RCS.
+ Detect whether a directory is revision controlled with this VCS.
"""
- return self._rcs_detect(path)
+ return self._vcs_detect(path)
def root(self, path):
"""
- Set the root directory to the path's RCS root. This is the
+ Set the root directory to the path's VCS root. This is the
default working directory for future invocations.
"""
- self.rootdir = self._rcs_root(path)
+ self.rootdir = self._vcs_root(path)
def init(self, path):
"""
Begin versioning the tree based at path.
- Also roots the rcs at path.
+ Also roots the vcs at path.
"""
if os.path.isdir(path)==False:
path = os.path.dirname(path)
- self._rcs_init(path)
+ self._vcs_init(path)
self.root(path)
def cleanup(self):
- self._rcs_cleanup()
+ self._vcs_cleanup()
def get_user_id(self):
"""
- Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
- If the RCS has not been configured with a username, return the user's
+ Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
+ If the VCS has not been configured with a username, return the user's
id. You can override the automatic lookup procedure by setting the
- RCS.user_id attribute to a string of your choice.
+ VCS.user_id attribute to a string of your choice.
"""
if hasattr(self, "user_id"):
if self.user_id != None:
return self.user_id
- id = self._rcs_get_user_id()
+ id = self._vcs_get_user_id()
if id == None:
name = self._u_get_fallback_username()
email = self._u_get_fallback_email()
return id
def set_user_id(self, value):
"""
- Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>").
- This is run if the RCS has not been configured with a usename, so
+ Set the VCS's suggested user id (e.g "John Doe <jdoe@example.com>").
+ This is run if the VCS has not been configured with a usename, so
that commits will have a reasonable FROM value.
"""
- self._rcs_set_user_id(value)
+ self._vcs_set_user_id(value)
def add(self, path):
"""
Add the already created file at path to version control.
"""
- self._rcs_add(self._u_rel_path(path))
+ self._vcs_add(self._u_rel_path(path))
def remove(self, path):
"""
Remove a file from both version control and the filesystem.
"""
- self._rcs_remove(self._u_rel_path(path))
+ self._vcs_remove(self._u_rel_path(path))
if os.path.exists(path):
os.remove(path)
def recursive_remove(self, dirname):
fullpath = os.path.join(dirpath, path)
if os.path.exists(fullpath) == False:
continue
- self._rcs_remove(self._u_rel_path(fullpath))
+ self._vcs_remove(self._u_rel_path(fullpath))
if os.path.exists(dirname):
shutil.rmtree(dirname)
def update(self, path):
Notify the versioning system of changes to the versioned file
at path.
"""
- self._rcs_update(self._u_rel_path(path))
- def get_file_contents(self, path, revision=None, allow_no_rcs=False, binary=False):
+ self._vcs_update(self._u_rel_path(path))
+ def get_file_contents(self, path, revision=None, allow_no_vcs=False, binary=False):
"""
Get the file as it was in a given revision.
Revision==None specifies the current revision.
"""
if not os.path.exists(path):
raise NoSuchFile(path)
- if self._use_rcs(path, allow_no_rcs):
+ if self._use_vcs(path, allow_no_vcs):
relpath = self._u_rel_path(path)
- contents = self._rcs_get_file_contents(relpath,revision,binary=binary)
+ contents = self._vcs_get_file_contents(relpath,revision,binary=binary)
else:
f = codecs.open(path, "r", self.encoding)
contents = f.read()
f.close()
return contents
- def set_file_contents(self, path, contents, allow_no_rcs=False, binary=False):
+ def set_file_contents(self, path, contents, allow_no_vcs=False, binary=False):
"""
Set the file contents under version control.
"""
f.write(contents)
f.close()
- if self._use_rcs(path, allow_no_rcs):
+ if self._use_vcs(path, allow_no_vcs):
if add:
self.add(path)
else:
self.update(path)
- def mkdir(self, path, allow_no_rcs=False, check_parents=True):
+ def mkdir(self, path, allow_no_vcs=False, check_parents=True):
"""
Create (if neccessary) a directory at path under version
control.
if check_parents == True:
parent = os.path.dirname(path)
if not os.path.exists(parent): # recurse through parents
- self.mkdir(parent, allow_no_rcs, check_parents)
+ self.mkdir(parent, allow_no_vcs, check_parents)
if not os.path.exists(path):
os.mkdir(path)
- if self._use_rcs(path, allow_no_rcs):
+ if self._use_vcs(path, allow_no_vcs):
self.add(path)
else:
assert os.path.isdir(path)
- if self._use_rcs(path, allow_no_rcs):
+ if self._use_vcs(path, allow_no_vcs):
#self.update(path)# Don't update directories. Changing files
pass # underneath them should be sufficient.
"""
# Dirname in Baseir to protect against simlink attacks.
if self._duplicateBasedir == None:
- self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs')
+ self._duplicateBasedir = tempfile.mkdtemp(prefix='BEvcs')
self._duplicateDirname = \
os.path.join(self._duplicateBasedir, "duplicate")
- self._rcs_duplicate_repo(directory=self._duplicateDirname,
+ self._vcs_duplicate_repo(directory=self._duplicateDirname,
revision=revision)
return self._duplicateDirname
def remove_duplicate_repo(self):
temp_file.write(summary)
temp_file.flush()
self.precommit()
- revision = self._rcs_commit(filename, allow_empty=allow_empty)
+ revision = self._vcs_commit(filename, allow_empty=allow_empty)
temp_file.close()
self.postcommit()
finally:
"""
if index == None:
return None
- return self._rcs_revision_id(index)
+ return self._vcs_revision_id(index)
def _u_any_in_string(self, list, string):
"""
Return True if any of the strings in list are in string.
or None if none of those files exist.
"""
return search_parent_directories(path, filename)
- def _use_rcs(self, path, allow_no_rcs):
+ def _use_vcs(self, path, allow_no_vcs):
"""
- Try and decide if _rcs_add/update/mkdir/etc calls will
- succeed. Returns True is we think the rcs_call would
+ Try and decide if _vcs_add/update/mkdir/etc calls will
+ succeed. Returns True is we think the vcs_call would
succeeed, and False otherwise.
"""
- use_rcs = True
+ use_vcs = True
exception = None
if self.rootdir != None:
if self.path_in_root(path) == False:
- use_rcs = False
+ use_vcs = False
exception = PathNotInRoot(path, self.rootdir)
else:
- use_rcs = False
- exception = RCSnotRooted
- if use_rcs == False and allow_no_rcs==False:
+ use_vcs = False
+ exception = VCSnotRooted
+ if use_vcs == False and allow_no_vcs==False:
raise exception
- return use_rcs
+ return use_vcs
def path_in_root(self, path, root=None):
"""
Return the relative path to path from root.
- >>> rcs = new()
- >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c")
+ >>> vcs = new()
+ >>> vcs.path_in_root("/a.b/c/.be", "/a.b/c")
True
- >>> rcs.path_in_root("/a.b/.be", "/a.b/c")
+ >>> vcs.path_in_root("/a.b/.be", "/a.b/c")
False
"""
if root == None:
if self.rootdir == None:
- raise RCSnotRooted
+ raise VCSnotRooted
root = self.rootdir
path = os.path.abspath(path)
absRoot = os.path.abspath(root)
def _u_rel_path(self, path, root=None):
"""
Return the relative path to path from root.
- >>> rcs = new()
- >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c")
+ >>> vcs = new()
+ >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
'.be'
"""
if root == None:
if self.rootdir == None:
- raise RCSnotRooted
+ raise VCSnotRooted
root = self.rootdir
path = os.path.abspath(path)
absRoot = os.path.abspath(root)
def _u_abspath(self, path, root=None):
"""
Return the absolute path from a path realtive to root.
- >>> rcs = new()
- >>> rcs._u_abspath(".be", "/a.b/c")
+ >>> vcs = new()
+ >>> vcs._u_abspath(".be", "/a.b/c")
'/a.b/c/.be'
"""
if root == None:
- assert self.rootdir != None, "RCS not rooted"
+ assert self.rootdir != None, "VCS not rooted"
root = self.rootdir
return os.path.abspath(os.path.join(root, path))
def _u_create_id(self, name, email=None):
"""
- >>> rcs = new()
- >>> rcs._u_create_id("John Doe", "jdoe@example.com")
+ >>> vcs = new()
+ >>> vcs._u_create_id("John Doe", "jdoe@example.com")
'John Doe <jdoe@example.com>'
- >>> rcs._u_create_id("John Doe")
+ >>> vcs._u_create_id("John Doe")
'John Doe'
"""
assert len(name) > 0
return "%s <%s>" % (name, email)
def _u_parse_id(self, value):
"""
- >>> rcs = new()
- >>> rcs._u_parse_id("John Doe <jdoe@example.com>")
+ >>> vcs = new()
+ >>> vcs._u_parse_id("John Doe <jdoe@example.com>")
('John Doe', 'jdoe@example.com')
- >>> rcs._u_parse_id("John Doe")
+ >>> vcs._u_parse_id("John Doe")
('John Doe', None)
>>> try:
- ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>")
+ ... vcs._u_parse_id("John Doe <jdoe@example.com><what?>")
... except AssertionError:
... print "Invalid match"
Invalid match
return (summary, body)
\f
-def setup_rcs_test_fixtures(testcase):
- """Set up test fixtures for RCS test case."""
- testcase.rcs = testcase.Class()
+def setup_vcs_test_fixtures(testcase):
+ """Set up test fixtures for VCS test case."""
+ testcase.vcs = testcase.Class()
testcase.dir = Dir()
testcase.dirname = testcase.dir.path
- rcs_not_supporting_uninitialized_user_id = []
- rcs_not_supporting_set_user_id = ["None", "hg"]
- testcase.rcs_supports_uninitialized_user_id = (
- testcase.rcs.name not in rcs_not_supporting_uninitialized_user_id)
- testcase.rcs_supports_set_user_id = (
- testcase.rcs.name not in rcs_not_supporting_set_user_id)
+ vcs_not_supporting_uninitialized_user_id = []
+ vcs_not_supporting_set_user_id = ["None", "hg"]
+ testcase.vcs_supports_uninitialized_user_id = (
+ testcase.vcs.name not in vcs_not_supporting_uninitialized_user_id)
+ testcase.vcs_supports_set_user_id = (
+ testcase.vcs.name not in vcs_not_supporting_set_user_id)
- if not testcase.rcs.installed():
+ if not testcase.vcs.installed():
testcase.fail(
- "%(name)s RCS not found" % vars(testcase.Class))
+ "%(name)s VCS not found" % vars(testcase.Class))
if testcase.Class.name != "None":
testcase.failIf(
- testcase.rcs.detect(testcase.dirname),
- "Detected %(name)s RCS before initialising"
+ testcase.vcs.detect(testcase.dirname),
+ "Detected %(name)s VCS before initialising"
% vars(testcase.Class))
- testcase.rcs.init(testcase.dirname)
+ testcase.vcs.init(testcase.dirname)
-class RCSTestCase(unittest.TestCase):
- """Test cases for base RCS class."""
+class VCSTestCase(unittest.TestCase):
+ """Test cases for base VCS class."""
- Class = RCS
+ Class = VCS
def __init__(self, *args, **kwargs):
- super(RCSTestCase, self).__init__(*args, **kwargs)
+ super(VCSTestCase, self).__init__(*args, **kwargs)
self.dirname = None
def setUp(self):
- super(RCSTestCase, self).setUp()
- setup_rcs_test_fixtures(self)
+ super(VCSTestCase, self).setUp()
+ setup_vcs_test_fixtures(self)
def tearDown(self):
- del(self.rcs)
- super(RCSTestCase, self).tearDown()
+ del(self.vcs)
+ super(VCSTestCase, self).tearDown()
def full_path(self, rel_path):
return os.path.join(self.dirname, rel_path)
-class RCS_init_TestCase(RCSTestCase):
- """Test cases for RCS.init method."""
+class VCS_init_TestCase(VCSTestCase):
+ """Test cases for VCS.init method."""
def test_detect_should_succeed_after_init(self):
- """Should detect RCS in directory after initialization."""
+ """Should detect VCS in directory after initialization."""
self.failUnless(
- self.rcs.detect(self.dirname),
- "Did not detect %(name)s RCS after initialising"
+ self.vcs.detect(self.dirname),
+ "Did not detect %(name)s VCS after initialising"
% vars(self.Class))
- def test_rcs_rootdir_in_specified_root_path(self):
- """RCS root directory should be in specified root path."""
- rp = os.path.realpath(self.rcs.rootdir)
+ def test_vcs_rootdir_in_specified_root_path(self):
+ """VCS root directory should be in specified root path."""
+ rp = os.path.realpath(self.vcs.rootdir)
dp = os.path.realpath(self.dirname)
- rcs_name = self.Class.name
+ vcs_name = self.Class.name
self.failUnless(
dp == rp or rp == None,
- "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars())
+ "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
-class RCS_get_user_id_TestCase(RCSTestCase):
- """Test cases for RCS.get_user_id method."""
+class VCS_get_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.get_user_id method."""
def test_gets_existing_user_id(self):
"""Should get the existing user ID."""
- if not self.rcs_supports_uninitialized_user_id:
+ if not self.vcs_supports_uninitialized_user_id:
return
- user_id = self.rcs.get_user_id()
+ user_id = self.vcs.get_user_id()
self.failUnless(
user_id is not None,
"unable to get a user id")
-class RCS_set_user_id_TestCase(RCSTestCase):
- """Test cases for RCS.set_user_id method."""
+class VCS_set_user_id_TestCase(VCSTestCase):
+ """Test cases for VCS.set_user_id method."""
def setUp(self):
- super(RCS_set_user_id_TestCase, self).setUp()
+ super(VCS_set_user_id_TestCase, self).setUp()
- if self.rcs_supports_uninitialized_user_id:
- self.prev_user_id = self.rcs.get_user_id()
+ if self.vcs_supports_uninitialized_user_id:
+ self.prev_user_id = self.vcs.get_user_id()
else:
self.prev_user_id = "Uninitialized identity <bogus@example.org>"
- if self.rcs_supports_set_user_id:
+ if self.vcs_supports_set_user_id:
self.test_new_user_id = "John Doe <jdoe@example.com>"
- self.rcs.set_user_id(self.test_new_user_id)
+ self.vcs.set_user_id(self.test_new_user_id)
def tearDown(self):
- if self.rcs_supports_set_user_id:
- self.rcs.set_user_id(self.prev_user_id)
- super(RCS_set_user_id_TestCase, self).tearDown()
+ if self.vcs_supports_set_user_id:
+ self.vcs.set_user_id(self.prev_user_id)
+ super(VCS_set_user_id_TestCase, self).tearDown()
def test_raises_error_in_unsupported_vcs(self):
"""Should raise an error in a VCS that doesn't support it."""
- if self.rcs_supports_set_user_id:
+ if self.vcs_supports_set_user_id:
return
self.assertRaises(
SettingIDnotSupported,
- self.rcs.set_user_id, "foo")
+ self.vcs.set_user_id, "foo")
- def test_updates_user_id_in_supporting_rcs(self):
- """Should update the user ID in an RCS that supports it."""
- if not self.rcs_supports_set_user_id:
+ def test_updates_user_id_in_supporting_vcs(self):
+ """Should update the user ID in an VCS that supports it."""
+ if not self.vcs_supports_set_user_id:
return
- user_id = self.rcs.get_user_id()
+ user_id = self.vcs.get_user_id()
self.failUnlessEqual(
self.test_new_user_id, user_id,
"user id not set correctly (expected %s, got %s)"
% (self.test_new_user_id, user_id))
-def setup_rcs_revision_test_fixtures(testcase):
- """Set up revision test fixtures for RCS test case."""
+def setup_vcs_revision_test_fixtures(testcase):
+ """Set up revision test fixtures for VCS test case."""
testcase.test_dirs = ['a', 'a/b', 'c']
for path in testcase.test_dirs:
- testcase.rcs.mkdir(testcase.full_path(path))
+ testcase.vcs.mkdir(testcase.full_path(path))
testcase.test_files = ['a/text', 'a/b/text']
}
-class RCS_mkdir_TestCase(RCSTestCase):
- """Test cases for RCS.mkdir method."""
+class VCS_mkdir_TestCase(VCSTestCase):
+ """Test cases for VCS.mkdir method."""
def setUp(self):
- super(RCS_mkdir_TestCase, self).setUp()
- setup_rcs_revision_test_fixtures(self)
+ super(VCS_mkdir_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
def tearDown(self):
for path in reversed(sorted(self.test_dirs)):
- self.rcs.recursive_remove(self.full_path(path))
- super(RCS_mkdir_TestCase, self).tearDown()
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_mkdir_TestCase, self).tearDown()
def test_mkdir_creates_directory(self):
"""Should create specified directory in filesystem."""
"path %(full_path)s does not exist" % vars())
-class RCS_commit_TestCase(RCSTestCase):
- """Test cases for RCS.commit method."""
+class VCS_commit_TestCase(VCSTestCase):
+ """Test cases for VCS.commit method."""
def setUp(self):
- super(RCS_commit_TestCase, self).setUp()
- setup_rcs_revision_test_fixtures(self)
+ super(VCS_commit_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
def tearDown(self):
for path in reversed(sorted(self.test_dirs)):
- self.rcs.recursive_remove(self.full_path(path))
- super(RCS_commit_TestCase, self).tearDown()
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_commit_TestCase, self).tearDown()
def test_file_contents_as_specified(self):
"""Should set file contents as specified."""
test_contents = self.test_contents['rev_1']
for path in self.test_files:
full_path = self.full_path(path)
- self.rcs.set_file_contents(full_path, test_contents)
- current_contents = self.rcs.get_file_contents(full_path)
+ self.vcs.set_file_contents(full_path, test_contents)
+ current_contents = self.vcs.get_file_contents(full_path)
self.failUnlessEqual(test_contents, current_contents)
def test_file_contents_as_committed(self):
test_contents = self.test_contents['rev_1']
for path in self.test_files:
full_path = self.full_path(path)
- self.rcs.set_file_contents(full_path, test_contents)
- revision = self.rcs.commit("Initial file contents.")
- current_contents = self.rcs.get_file_contents(full_path)
+ self.vcs.set_file_contents(full_path, test_contents)
+ revision = self.vcs.commit("Initial file contents.")
+ current_contents = self.vcs.get_file_contents(full_path)
self.failUnlessEqual(test_contents, current_contents)
def test_file_contents_as_set_when_uncommitted(self):
"""Should set file contents as specified after commit."""
- if not self.rcs.versioned:
+ if not self.vcs.versioned:
return
for path in self.test_files:
full_path = self.full_path(path)
- self.rcs.set_file_contents(
+ self.vcs.set_file_contents(
full_path, self.test_contents['rev_1'])
- revision = self.rcs.commit("Initial file contents.")
- self.rcs.set_file_contents(
+ revision = self.vcs.commit("Initial file contents.")
+ self.vcs.set_file_contents(
full_path, self.test_contents['uncommitted'])
- current_contents = self.rcs.get_file_contents(full_path)
+ current_contents = self.vcs.get_file_contents(full_path)
self.failUnlessEqual(
self.test_contents['uncommitted'], current_contents)
def test_revision_file_contents_as_committed(self):
"""Should get file contents as committed to specified revision."""
- if not self.rcs.versioned:
+ if not self.vcs.versioned:
return
for path in self.test_files:
full_path = self.full_path(path)
- self.rcs.set_file_contents(
+ self.vcs.set_file_contents(
full_path, self.test_contents['rev_1'])
- revision = self.rcs.commit("Initial file contents.")
- self.rcs.set_file_contents(
+ revision = self.vcs.commit("Initial file contents.")
+ self.vcs.set_file_contents(
full_path, self.test_contents['uncommitted'])
- committed_contents = self.rcs.get_file_contents(
+ committed_contents = self.vcs.get_file_contents(
full_path, revision)
self.failUnlessEqual(
self.test_contents['rev_1'], committed_contents)
def test_revision_id_as_committed(self):
"""Check for compatibility between .commit() and .revision_id()"""
- if not self.rcs.versioned:
- self.failUnlessEqual(self.rcs.revision_id(5), None)
+ if not self.vcs.versioned:
+ self.failUnlessEqual(self.vcs.revision_id(5), None)
return
committed_revisions = []
for path in self.test_files:
full_path = self.full_path(path)
- self.rcs.set_file_contents(
+ self.vcs.set_file_contents(
full_path, self.test_contents['rev_1'])
- revision = self.rcs.commit("Initial %s contents." % path)
+ revision = self.vcs.commit("Initial %s contents." % path)
committed_revisions.append(revision)
- self.rcs.set_file_contents(
+ self.vcs.set_file_contents(
full_path, self.test_contents['uncommitted'])
- revision = self.rcs.commit("Altered %s contents." % path)
+ revision = self.vcs.commit("Altered %s contents." % path)
committed_revisions.append(revision)
for i,revision in enumerate(committed_revisions):
- self.failUnlessEqual(self.rcs.revision_id(i), revision)
+ self.failUnlessEqual(self.vcs.revision_id(i), revision)
i += -len(committed_revisions) # check negative indices
- self.failUnlessEqual(self.rcs.revision_id(i), revision)
+ self.failUnlessEqual(self.vcs.revision_id(i), revision)
i = len(committed_revisions)
- self.failUnlessEqual(self.rcs.revision_id(i), None)
- self.failUnlessEqual(self.rcs.revision_id(-i-1), None)
+ self.failUnlessEqual(self.vcs.revision_id(i), None)
+ self.failUnlessEqual(self.vcs.revision_id(-i-1), None)
def test_revision_id_as_committed(self):
"""Check revision id before first commit"""
- if not self.rcs.versioned:
- self.failUnlessEqual(self.rcs.revision_id(5), None)
+ if not self.vcs.versioned:
+ self.failUnlessEqual(self.vcs.revision_id(5), None)
return
committed_revisions = []
for path in self.test_files:
- self.failUnlessEqual(self.rcs.revision_id(0), None)
+ self.failUnlessEqual(self.vcs.revision_id(0), None)
-class RCS_duplicate_repo_TestCase(RCSTestCase):
- """Test cases for RCS.duplicate_repo method."""
+class VCS_duplicate_repo_TestCase(VCSTestCase):
+ """Test cases for VCS.duplicate_repo method."""
def setUp(self):
- super(RCS_duplicate_repo_TestCase, self).setUp()
- setup_rcs_revision_test_fixtures(self)
+ super(VCS_duplicate_repo_TestCase, self).setUp()
+ setup_vcs_revision_test_fixtures(self)
def tearDown(self):
- self.rcs.remove_duplicate_repo()
+ self.vcs.remove_duplicate_repo()
for path in reversed(sorted(self.test_dirs)):
- self.rcs.recursive_remove(self.full_path(path))
- super(RCS_duplicate_repo_TestCase, self).tearDown()
+ self.vcs.recursive_remove(self.full_path(path))
+ super(VCS_duplicate_repo_TestCase, self).tearDown()
def test_revision_file_contents_as_committed(self):
"""Should match file contents as committed to specified revision."""
- if not self.rcs.versioned:
+ if not self.vcs.versioned:
return
for path in self.test_files:
full_path = self.full_path(path)
- self.rcs.set_file_contents(
+ self.vcs.set_file_contents(
full_path, self.test_contents['rev_1'])
- revision = self.rcs.commit("Commit current status")
- self.rcs.set_file_contents(
+ revision = self.vcs.commit("Commit current status")
+ self.vcs.set_file_contents(
full_path, self.test_contents['uncommitted'])
- dup_repo_path = self.rcs.duplicate_repo(revision)
+ dup_repo_path = self.vcs.duplicate_repo(revision)
dup_file_path = os.path.join(dup_repo_path, path)
dup_file_contents = file(dup_file_path, 'rb').read()
self.failUnlessEqual(
self.test_contents['rev_1'], dup_file_contents)
- self.rcs.remove_duplicate_repo()
+ self.vcs.remove_duplicate_repo()
-def make_rcs_testcase_subclasses(rcs_class, namespace):
- """Make RCSTestCase subclasses for rcs_class in the namespace."""
- rcs_testcase_classes = [
+def make_vcs_testcase_subclasses(vcs_class, namespace):
+ """Make VCSTestCase subclasses for vcs_class in the namespace."""
+ vcs_testcase_classes = [
c for c in (
ob for ob in globals().values() if isinstance(ob, type))
- if issubclass(c, RCSTestCase)]
+ if issubclass(c, VCSTestCase)]
- for base_class in rcs_testcase_classes:
- testcase_class_name = rcs_class.__name__ + base_class.__name__
+ for base_class in vcs_testcase_classes:
+ testcase_class_name = vcs_class.__name__ + base_class.__name__
testcase_class_bases = (base_class,)
testcase_class_dict = dict(base_class.__dict__)
- testcase_class_dict['Class'] = rcs_class
+ testcase_class_dict['Class'] = vcs_class
testcase_class = type(
testcase_class_name, testcase_class_bases, testcase_class_dict)
setattr(namespace, testcase_class_name, testcase_class)