Work around mercurial (hg) issue 618.
[be.git] / libbe / storage / vcs / hg.py
1 # Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc.
2 #                         Ben Finney <benf@cybersource.com.au>
3 #                         Gianluca Montecchi <gian@grys.it>
4 #                         W. Trevor King <wking@drexel.edu>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with this program; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 """
21 Mercurial (hg) backend.
22 """
23
24 import os
25 import os.path
26 import re
27 import shutil
28 import time # work around http://mercurial.selenic.com/bts/issue618
29
30 import libbe
31 import base
32
33 if libbe.TESTING == True:
34     import doctest
35     import sys
36     import unittest
37
38
39 def new():
40     return Hg()
41
42 class Hg(base.VCS):
43     name='hg'
44     client='hg'
45
46     def __init__(self, *args, **kwargs):
47         base.VCS.__init__(self, *args, **kwargs)
48         self.versioned = True
49         self.__updated = [] # work around http://mercurial.selenic.com/bts/issue618
50
51     def _vcs_version(self):
52         status,output,error = self._u_invoke_client('--version')
53         return output
54
55     def _vcs_get_user_id(self):
56         status,output,error = self._u_invoke_client(
57             'showconfig', 'ui.username')
58         return output.rstrip('\n')
59
60     def _vcs_detect(self, path):
61         """Detect whether a directory is revision-controlled using Mercurial"""
62         if self._u_search_parent_directories(path, '.hg') != None:
63             return True
64         return False
65
66     def _vcs_root(self, path):
67         status,output,error = self._u_invoke_client('root', cwd=path)
68         return output.rstrip('\n')
69
70     def _vcs_init(self, path):
71         self._u_invoke_client('init', cwd=path)
72
73     def _vcs_destroy(self):
74         vcs_dir = os.path.join(self.repo, '.hg')
75         if os.path.exists(vcs_dir):
76             shutil.rmtree(vcs_dir)
77
78     def _vcs_add(self, path):
79         self._u_invoke_client('add', path)
80
81     def _vcs_remove(self, path):
82         self._u_invoke_client('rm', '--force', path)
83
84     def _vcs_update(self, path):
85         self.__updated.append(path) # work around http://mercurial.selenic.com/bts/issue618
86
87     def _vcs_get_file_contents(self, path, revision=None):
88         if revision == None:
89             return base.VCS._vcs_get_file_contents(self, path, revision)
90         else:
91             status,output,error = \
92                 self._u_invoke_client('cat', '-r', revision, path)
93             return output
94
95     def _vcs_commit(self, commitfile, allow_empty=False):
96         args = ['commit', '--logfile', commitfile]
97         status,output,error = self._u_invoke_client(*args)
98         # work around http://mercurial.selenic.com/bts/issue618
99         strings = ['nothing changed']
100         if self._u_any_in_string(strings, output) == True \
101                 and len(self.__updated) > 0:
102             time.sleep(1)
103             for path in self.__updated:
104                 os.utime(os.path.join(self.repo, path), None)
105             status,output,error = self._u_invoke_client(*args)
106         self.__updated = []
107         # end work around
108         if allow_empty == False:
109             strings = ['nothing changed']
110             if self._u_any_in_string(strings, output) == True:
111                 raise base.EmptyCommit()
112         return self._vcs_revision_id(-1)
113
114     def _vcs_revision_id(self, index, style='id'):
115         args = ['identify', '--rev', str(int(index)), '--%s' % style]
116         kwargs = {'expect': (0,255)}
117         status,output,error = self._u_invoke_client(*args, **kwargs)
118         if status == 0:
119             id = output.strip()
120             if id == '000000000000':
121                 return None # before initial commit.
122             return id
123         return None
124
125 \f    
126 if libbe.TESTING == True:
127     base.make_vcs_testcase_subclasses(Hg, sys.modules[__name__])
128
129     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
130     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])