Converted libbe.storage.vcs.hg to new Storage format.
authorW. Trevor King <wking@drexel.edu>
Sun, 13 Dec 2009 12:20:31 +0000 (07:20 -0500)
committerW. Trevor King <wking@drexel.edu>
Sun, 13 Dec 2009 12:20:31 +0000 (07:20 -0500)
libbe/storage/base.py
libbe/storage/vcs/base.py
libbe/storage/vcs/hg.py

index 84197966a2b7ce3402b9105429ed9e67a2ffca46..56b59ba286432761e6b87a0ff3e2674ed6958168 100644 (file)
@@ -149,7 +149,7 @@ class Storage (object):
         return self._init()
 
     def _init(self):
-        f = open(self.repo, 'wb')
+        f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
         root = Entry(id='__ROOT__', directory=True)
         d = {root.id:root}
         pickle.dump(dict((k,v._objects_to_ids()) for k,v in d.items()), f, -1)
@@ -162,7 +162,7 @@ class Storage (object):
         return self._destroy()
 
     def _destroy(self):
-        os.remove(self.repo)
+        os.remove(os.path.join(self.repo, 'repo.pkl'))
 
     def connect(self):
         """Open a connection to the repository."""
@@ -172,7 +172,7 @@ class Storage (object):
 
     def _connect(self):
         try:
-            f = open(self.repo, 'rb')
+            f = open(os.path.join(self.repo, 'repo.pkl'), 'rb')
         except IOError:
             raise ConnectionError(self)
         d = pickle.load(f)
@@ -183,7 +183,7 @@ class Storage (object):
         """Close the connection to the repository."""
         if self.is_writeable() == False:
             return
-        f = open(self.repo, 'wb')
+        f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
         pickle.dump(dict((k,v._objects_to_ids())
                          for k,v in self._data.items()), f, -1)
         f.close()
@@ -299,7 +299,7 @@ class VersionedStorage (Storage):
         self.versioned = True
 
     def _init(self):
-        f = open(self.repo, 'wb')
+        f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
         root = Entry(id='__ROOT__', directory=True)
         summary = Entry(id='__COMMIT__SUMMARY__', value='Initial commit')
         body = Entry(id='__COMMIT__BODY__')
@@ -310,7 +310,7 @@ class VersionedStorage (Storage):
 
     def _connect(self):
         try:
-            f = open(self.repo, 'rb')
+            f = open(os.path.join(self.repo, 'repo.pkl'), 'rb')
         except IOError:
             raise ConnectionError(self)
         d = pickle.load(f)
@@ -322,7 +322,7 @@ class VersionedStorage (Storage):
         """Close the connection to the repository."""
         if self.is_writeable() == False:
             return
-        f = open(self.repo, 'wb')
+        f = open(os.path.join(self.repo, 'repo.pkl'), 'wb')
         pickle.dump([dict((k,v._objects_to_ids())
                           for k,v in t.items()) for t in self._data], f, -1)
         f.close()
@@ -426,7 +426,7 @@ if TESTING == True:
             super(StorageTestCase, self).setUp()
             self.dir = Dir()
             self.dirname = self.dir.path
-            self.s = self.Class(repo=os.path.join(self.dirname, 'repo.pkl'))
+            self.s = self.Class(repo=self.dirname)
             self.assert_failed_connect()
             self.s.init()
             self.s.connect()
index 8c0ecf5c6d3424a42f66408f61faa0af40e8e1a6..faa891a0d8f3a358e7ab1309c52194cbcaaa661f 100644 (file)
@@ -83,10 +83,17 @@ def installed_vcs():
     return _get_matching_vcs(lambda vcs: vcs.installed())
 
 
-class VCSnotRooted (libbe.storage.base.ConnectionError):
-    def __init__(self):
+class VCSNotRooted (libbe.storage.base.ConnectionError):
+    def __init__(self, vcs):
         msg = 'VCS not rooted'
         libbe.storage.base.ConnectionError.__init__(self, msg)
+        self.vcs = vcs
+
+class VCSUnableToRoot (libbe.storage.base.ConnectionError):
+    def __init__(self, vcs):
+        msg = 'VCS unable to root'
+        libbe.storage.base.ConnectionError.__init__(self, msg)
+        self.vcs = vcs
 
 class InvalidPath (libbe.storage.base.InvalidID):
     def __init__(self, path, root, msg=None):
@@ -589,7 +596,10 @@ os.listdir(self.get_path("bugs")):
         Set the root directory to the path's VCS root.  This is the
         default working directory for future invocations.
         """
-        self.repo = os.path.abspath(self._vcs_root(self.repo))
+        root = self._vcs_root(self.repo)
+        if root == None:
+            raise VCSUnableToRoot(self)
+        self.repo = os.path.abspath(root)
         if os.path.isdir(self.repo) == False:
             self.repo = os.path.dirname(self.repo)
         self.be_dir = os.path.join(
@@ -601,8 +611,8 @@ os.listdir(self.get_path("bugs")):
         Begin versioning the tree based at self.repo.
         Also roots the vcs at path.
         """
+        self._vcs_init(self.repo)
         self.root()
-        self._vcs_init()
         os.mkdir(self.be_dir)
         self._vcs_add(self._u_rel_path(self.be_dir))
         self._cached_path_id.init()
@@ -798,7 +808,7 @@ os.listdir(self.get_path("bugs")):
                 exception = InvalidPath(path, self.repo)
         else:
             use_vcs = False
-            exception = VCSnotRooted
+            exception = VCSNotRooted(self)
         if use_vcs == False and allow_no_vcs==False:
             raise exception
         return use_vcs
@@ -814,7 +824,7 @@ os.listdir(self.get_path("bugs")):
         """
         if root == None:
             if self.repo == None:
-                raise VCSnotRooted
+                raise VCSNotRooted(self)
             root = self.repo
         path = os.path.abspath(path)
         absRoot = os.path.abspath(root)
@@ -832,7 +842,7 @@ os.listdir(self.get_path("bugs")):
         """
         if root == None:
             if self.repo == None:
-                raise VCSnotRooted
+                raise VCSNotRooted(self)
             root = self.repo
         path = os.path.abspath(path)
         absRoot = os.path.abspath(root)
@@ -929,6 +939,7 @@ if libbe.TESTING == True:
                 self.s.destroy()
             self.dir.cleanup()
 
+    class VCS_installed_TestCase (VCSTestCase):
         def test_installed(self):
             """
             See if the VCS is installed.
@@ -976,21 +987,23 @@ if libbe.TESTING == True:
 
         def test_gets_existing_user_id(self):
             """Should get the existing user ID."""
-            user_id = self.s.get_user_id()
-            if user_id == None:
-                return
-            name,email = libbe.ui.util.user.parse_user_id(user_id)
-            if email != None:
-                self.failUnless('@' in email, email)
-
-    def make_vcs_testcase_subclasses(storage_class, namespace):
-        c = storage_class()
-        if c.versioned == True:
-            libbe.storage.base.make_versioned_storage_testcase_subclasses(
-                storage_class, namespace)
-        else:
-            libbe.storage.base.make_storage_testcase_subclasses(
-                storage_class, namespace)
+            if self.s.installed():
+                user_id = self.s.get_user_id()
+                if user_id == None:
+                    return
+                name,email = libbe.ui.util.user.parse_user_id(user_id)
+                if email != None:
+                    self.failUnless('@' in email, email)
+
+    def make_vcs_testcase_subclasses(vcs_class, namespace):
+        c = vcs_class()
+        if c.installed():
+            if c.versioned == True:
+                libbe.storage.base.make_versioned_storage_testcase_subclasses(
+                    vcs_class, namespace)
+            else:
+                libbe.storage.base.make_storage_testcase_subclasses(
+                    vcs_class, namespace)
 
         if namespace != sys.modules[__name__]:
             # Make VCSTestCase subclasses for vcs_class in the namespace.
index ed277179201bb534451a4fef2dab7dba00680d92..67c5bf3114fa7c5f6a43b88b0f3927ea6a91659f 100644 (file)
@@ -26,7 +26,7 @@ import re
 import sys
 
 import libbe
-import vcs
+import base
 
 if libbe.TESTING == True:
     import unittest
@@ -36,62 +36,67 @@ if libbe.TESTING == True:
 def new():
     return Hg()
 
-class Hg(vcs.VCS):
-    name="hg"
-    client="hg"
+class Hg(base.VCS):
+    name='hg'
+    client='hg'
     versioned=True
+
     def _vcs_version(self):
-        status,output,error = self._u_invoke_client("--version")
+        status,output,error = self._u_invoke_client('--version')
         return output
+
+    def _vcs_get_user_id(self):
+        status,output,error = self._u_invoke_client(
+            'showconfig', 'ui.username')
+        return output.rstrip('\n')
+
     def _vcs_detect(self, path):
         """Detect whether a directory is revision-controlled using Mercurial"""
-        if self._u_search_parent_directories(path, ".hg") != None:
+        if self._u_search_parent_directories(path, '.hg') != None:
             return True
         return False
+
     def _vcs_root(self, path):
-        status,output,error = self._u_invoke_client("root", cwd=path)
+        status,output,error = self._u_invoke_client(
+            'root', expect=(0,255), cwd=path)
+        if status == 255:
+            # "abort: There is no Mercurial repository here
+            #  (.hg not found)!"
+            return None
         return output.rstrip('\n')
+
     def _vcs_init(self, path):
-        self._u_invoke_client("init", cwd=path)
-    def _vcs_get_user_id(self):
-        status,output,error = self._u_invoke_client("showconfig","ui.username")
-        return output.rstrip('\n')
-    def _vcs_set_user_id(self, value):
-        """
-        Supported by the Config Extension, but that is not part of
-        standard Mercurial.
-        http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension
-        """
-        raise vcs.SettingIDnotSupported
+        self._u_invoke_client('init', cwd=path)
+
     def _vcs_add(self, path):
-        self._u_invoke_client("add", path)
+        self._u_invoke_client('add', path)
+
     def _vcs_remove(self, path):
-        self._u_invoke_client("rm", "--force", path)
+        self._u_invoke_client('rm', '--force', path)
+
     def _vcs_update(self, path):
         pass
-    def _vcs_get_file_contents(self, path, revision=None, binary=False):
+
+    def _vcs_get_file_contents(self, path, revision=None):
         if revision == None:
-            return vcs.VCS._vcs_get_file_contents(self, path, revision, binary=binary)
+            return base.VCS._vcs_get_file_contents(self, path, revision)
         else:
             status,output,error = \
-                self._u_invoke_client("cat","-r",revision,path)
+                self._u_invoke_client('cat', '-r', revision, path)
             return output
-    def _vcs_duplicate_repo(self, directory, revision=None):
-        if revision == None:
-            return vcs.VCS._vcs_duplicate_repo(self, directory, revision)
-        else:
-            self._u_invoke_client("archive", "--rev", revision, directory)
+
     def _vcs_commit(self, commitfile, allow_empty=False):
         args = ['commit', '--logfile', commitfile]
         status,output,error = self._u_invoke_client(*args)
         if allow_empty == False:
-            strings = ["nothing changed"]
+            strings = ['nothing changed']
             if self._u_any_in_string(strings, output) == True:
-                raise vcs.EmptyCommit()
+                raise base.EmptyCommit()
         return self._vcs_revision_id(-1)
-    def _vcs_revision_id(self, index, style="id"):
-        args = ["identify", "--rev", str(int(index)), "--%s" % style]
-        kwargs = {"expect": (0,255)}
+
+    def _vcs_revision_id(self, index, style='id'):
+        args = ['identify', '--rev', str(int(index)), '--%s' % style]
+        kwargs = {'expect': (0,255)}
         status,output,error = self._u_invoke_client(*args, **kwargs)
         if status == 0:
             id = output.strip()
@@ -102,7 +107,7 @@ class Hg(vcs.VCS):
 
 \f    
 if libbe.TESTING == True:
-    vcs.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
+    base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
 
     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])