Mercurial (hg) backend.
"""
+try:
+ # enable importing on demand to reduce startup time
+ from mercurial import demandimport; demandimport.enable()
+ import mercurial
+ import mercurial.version
+ import mercurial.dispatch
+ import mercurial.ui
+except ImportError:
+ mercurial = None
import os
import os.path
import re
import shutil
+import StringIO
+import sys
import time # work around http://mercurial.selenic.com/bts/issue618
import libbe
class Hg(base.VCS):
name='hg'
- client='hg'
+ client=None # mercurial module
def __init__(self, *args, **kwargs):
base.VCS.__init__(self, *args, **kwargs)
self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618
def _vcs_version(self):
- status,output,error = self._u_invoke_client('--version')
- return output
+ if mercurial == None:
+ return None
+ return mercurial.version.get_version()
+
+ def _u_invoke_client(self, *args, **kwargs):
+ if 'cwd' not in kwargs:
+ kwargs['cwd'] = self.repo
+ assert len(kwargs) == 1, kwargs
+ ui = mercurial.ui.ui(interactive=False)
+ fullargs = ['--cwd', kwargs['cwd']]
+ fullargs.extend(args)
+ stdout = sys.stdout
+ tmp_stdout = StringIO.StringIO()
+ sys.stdout = tmp_stdout
+ mercurial.dispatch._dispatch(ui, fullargs)
+ sys.stdout = stdout
+ return tmp_stdout.getvalue().rstrip('\n')
def _vcs_get_user_id(self):
- status,output,error = self._u_invoke_client(
- 'showconfig', 'ui.username')
- return output.rstrip('\n')
+ return self._u_invoke_client('showconfig', 'ui.username')
def _vcs_detect(self, path):
"""Detect whether a directory is revision-controlled using Mercurial"""
return False
def _vcs_root(self, path):
- status,output,error = self._u_invoke_client('root', cwd=path)
- return output.rstrip('\n')
+ return self._u_invoke_client('root', cwd=path)
def _vcs_init(self, path):
self._u_invoke_client('init', cwd=path)
if revision == None:
return base.VCS._vcs_get_file_contents(self, path, revision)
else:
- status,output,error = \
- self._u_invoke_client('cat', '-r', revision, path)
- return output
+ return self._u_invoke_client('cat', '-r', revision, path)
def _vcs_commit(self, commitfile, allow_empty=False):
args = ['commit', '--logfile', commitfile]
- status,output,error = self._u_invoke_client(*args)
+ output = self._u_invoke_client(*args)
# work around http://mercurial.selenic.com/bts/issue618
strings = ['nothing changed']
if self._u_any_in_string(strings, output) == True \
time.sleep(1)
for path in self.__updated:
os.utime(os.path.join(self.repo, path), None)
- status,output,error = self._u_invoke_client(*args)
+ output = self._u_invoke_client(*args)
self.__updated = []
# end work around
if allow_empty == False:
if index > 0:
index -= 1
args = ['identify', '--rev', str(int(index)), '--%s' % style]
- kwargs = {'expect': (0,255)}
- status,output,error = self._u_invoke_client(*args, **kwargs)
- if status == 0:
- id = output.strip()
- if id == '000000000000':
- return None # before initial commit.
- return id
- return None
+ output = self._u_invoke_client(*args)
+ id = output.strip()
+ if id == '000000000000':
+ return None # before initial commit.
+ return id
\f
if libbe.TESTING == True: