Merge from Ben Finney's RCS unittest patch
[be.git] / libbe / arch.py
1 # Copyright (C) 2005 Aaron Bentley and Panometrics, Inc.
2 # <abentley@panoramicfeedback.com>
3 #
4 #    This program is free software; you can redistribute it and/or modify
5 #    it under the terms of the GNU General Public License as published by
6 #    the Free Software Foundation; either version 2 of the License, or
7 #    (at your option) any later version.
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program; if not, write to the Free Software
16 #    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
18 import codecs
19 import os
20 import re
21 import shutil
22 import sys
23 import time
24 import unittest
25 import doctest
26
27 import config
28 from beuuid import uuid_gen
29 import rcs
30 from rcs import RCS
31
# Name of the Arch client executable used when the configuration does not
# specify one.
DEFAULT_CLIENT = "tla"

# Client read from the "arch_client" configuration value when this module is
# imported; falls back to DEFAULT_CLIENT.
client = config.get_val("arch_client", default=DEFAULT_CLIENT)
35
def new():
    """Factory hook: build and return a new Arch backend instance."""
    backend = Arch()
    return backend
38
39 class Arch(RCS):
40     name = "Arch"
41     client = client
42     versioned = True
43     _archive_name = None
44     _archive_dir = None
45     _tmp_archive = False
46     _project_name = None
47     _tmp_project = False
48     _arch_paramdir = os.path.expanduser("~/.arch-params")
49     def _rcs_help(self):
50         status,output,error = self._u_invoke_client("--help")
51         return output
52     def _rcs_detect(self, path):
53         """Detect whether a directory is revision-controlled using Arch"""
54         if self._u_search_parent_directories(path, "{arch}") != None :
55             config.set_val("arch_client", client)
56             return True
57         return False
58     def _rcs_init(self, path):
59         self._create_archive(path)
60         self._create_project(path)
61         self._add_project_code(path)
62     def _create_archive(self, path):
63         # Create a new archive
64         # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive
65         assert self._archive_name == None
66         id = self.get_user_id()
67         name, email = self._u_parse_id(id)
68         if email == None:
69             email = "%s@example.com" % name
70         trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8])
71         self._archive_name = "%s--%s" % (email, trailer)
72         self._archive_dir = "/tmp/%s" % trailer
73         self._tmp_archive = True
74         self._u_invoke_client("make-archive", self._archive_name,
75                               self._archive_dir, directory=path)
76     def _invoke_client(self, *args, **kwargs):
77         """
78         Invoke the client on our archive.
79         """
80         assert self._archive_name != None
81         command = args[0]
82         if len(args) > 1:
83             tailargs = args[1:]
84         else:
85             tailargs = []
86         arglist = [command, "-A", self._archive_name]
87         arglist.extend(tailargs)
88         args = tuple(arglist)
89         return self._u_invoke_client(*args, **kwargs)
90     def _remove_archive(self):
91         assert self._tmp_archive == True
92         assert self._archive_dir != None
93         assert self._archive_name != None
94         os.remove(os.path.join(self._arch_paramdir,
95                                "=locations", self._archive_name))
96         shutil.rmtree(self._archive_dir)
97         self._tmp_archive = False
98         self._archive_dir = False
99         self._archive_name = False
100     def _create_project(self, path):
101         """
102         Create a temporary Arch project in the directory PATH.  This
103         project will be removed by
104           __del__->cleanup->_rcs_cleanup->_remove_project
105         """
106         # http://mwolson.org/projects/GettingStartedWithArch.html
107         # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project
108         category = "bugs-everywhere"
109         branch = "mainline"
110         version = "0.1"
111         self._project_name = "%s--%s--%s" % (category, branch, version)
112         self._invoke_client("archive-setup", self._project_name,
113                             directory=path)
114         self._tmp_project = True
115     def _remove_project(self):
116         assert self._tmp_project == True
117         assert self._project_name != None
118         assert self._archive_dir != None
119         shutil.rmtree(os.path.join(self._archive_dir, self._project_name))
120         self._tmp_project = False
121         self._project_name = False
122     def _archive_project_name(self):
123         assert self._archive_name != None
124         assert self._project_name != None
125         return "%s/%s" % (self._archive_name, self._project_name)
126     def _adjust_naming_conventions(self, path):
127         """
128         By default, Arch restricts source code filenames to
129           ^[_=a-zA-Z0-9].*$
130         See
131           http://regexps.srparish.net/tutorial-tla/naming-conventions.html
132         Since our bug directory '.be' doesn't satisfy these conventions,
133         we need to adjust them.
134         
135         The conventions are specified in
136           project-root/{arch}/=tagging-method
137         """
138         tagpath = os.path.join(path, "{arch}", "=tagging-method")
139         lines_out = []
140         f = codecs.open(tagpath, "r", self.encoding)
141         for line in f:
142             if line.startswith("source "):
143                 lines_out.append("source ^[._=a-zA-X0-9].*$\n")
144             else:
145                 lines_out.append(line)
146         f.close()
147         f = codecs.open(tagpath, "w", self.encoding)
148         f.write("".join(lines_out))
149         f.close()
150
151     def _add_project_code(self, path):
152         # http://mwolson.org/projects/GettingStartedWithArch.html
153         # http://regexps.srparish.net/tutorial-tla/new-source.html
154         # http://regexps.srparish.net/tutorial-tla/importing-first.html
155         self._invoke_client("init-tree", self._project_name,
156                               directory=path)
157         self._adjust_naming_conventions(path)
158         self._invoke_client("import", "--summary", "Began versioning",
159                             directory=path)
160     def _rcs_cleanup(self):
161         if self._tmp_project == True:
162             self._remove_project()
163         if self._tmp_archive == True:
164             self._remove_archive()
165
166     def _rcs_root(self, path):
167         if not os.path.isdir(path):
168             dirname = os.path.dirname(path)
169         else:
170             dirname = path
171         status,output,error = self._u_invoke_client("tree-root", dirname)
172         root = output.rstrip('\n')
173         
174         self._get_archive_project_name(root)
175
176         return root
177
178     def _get_archive_name(self, root):
179         status,output,error = self._u_invoke_client("archives")
180         lines = output.split('\n')
181         # e.g. output:
182         # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52
183         #     /tmp/BEtestXXXXXX/rootdir
184         # (+ repeats)
185         for archive,location in zip(lines[::2], lines[1::2]):
186             if os.path.realpath(location) == os.path.realpath(root):
187                 self._archive_name = archive
188         assert self._archive_name != None
189
190     def _get_archive_project_name(self, root):
191         # get project names
192         status,output,error = self._u_invoke_client("tree-version", directory=root)
193         # e.g output
194         # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1
195         archive_name,project_name = output.rstrip('\n').split('/')
196         self._archive_name = archive_name
197         self._project_name = project_name
198     def _rcs_get_user_id(self):
199         try:
200             status,output,error = self._u_invoke_client('my-id')
201             return output.rstrip('\n')
202         except Exception, e:
203             if 'no arch user id set' in e.args[0]:
204                 return None
205             else:
206                 raise
207     def _rcs_set_user_id(self, value):
208         self._u_invoke_client('my-id', value)
209     def _rcs_add(self, path):
210         self._u_invoke_client("add-id", path)
211         realpath = os.path.realpath(self._u_abspath(path))
212         pathAdded = realpath in self._list_added(self.rootdir)
213         if self.paranoid and not pathAdded:
214             self._force_source(path)
215     def _list_added(self, root):
216         assert os.path.exists(root)
217         assert os.access(root, os.X_OK)
218         root = os.path.realpath(root)
219         status,output,error = self._u_invoke_client("inventory", "--source",
220                                                     "--both", "--all", root)
221         inv_str = output.rstrip('\n')
222         return [os.path.join(root, p) for p in inv_str.split('\n')]
223     def _add_dir_rule(self, rule, dirname, root):
224         inv_path = os.path.join(dirname, '.arch-inventory')
225         f = codecs.open(inv_path, "a", self.encoding)
226         f.write(rule)
227         f.close()
228         if os.path.realpath(inv_path) not in self._list_added(root):
229             paranoid = self.paranoid
230             self.paranoid = False
231             self.add(inv_path)
232             self.paranoid = paranoid
233     def _force_source(self, path):
234         rule = "source %s\n" % self._u_rel_path(path)
235         self._add_dir_rule(rule, os.path.dirname(path), self.rootdir)
236         if os.path.realpath(path) not in self._list_added(self.rootdir):
237             raise CantAddFile(path)
238     def _rcs_remove(self, path):
239         if not '.arch-ids' in path:
240             self._u_invoke_client("delete-id", path)
241     def _rcs_update(self, path):
242         pass
243     def _rcs_get_file_contents(self, path, revision=None):
244         if revision == None:
245             return RCS._rcs_get_file_contents(self, path, revision)
246         else:
247             status,output,error = \
248                 self._invoke_client("file-find", path, revision)
249             relpath = output.rstrip('\n')
250             abspath = os.path.join(self.rootdir, relpath)
251             f = codecs.open(abspath, "r", self.encoding)
252             contents = f.read()
253             f.close()
254             return contents
255     def _rcs_duplicate_repo(self, directory, revision=None):
256         if revision == None:
257             RCS._rcs_duplicate_repo(self, directory, revision)
258         else:
259             status,output,error = \
260                 self._u_invoke_client("get", revision,directory)
261     def _rcs_commit(self, commitfile):
262         summary,body = self._u_parse_commitfile(commitfile)
263         #status,output,error = self._invoke_client("make-log")
264         if body == None:
265             status,output,error \
266                 = self._u_invoke_client("commit","--summary",summary)
267         else:
268             status,output,error \
269                 = self._u_invoke_client("commit","--summary",summary,
270                                         "--log-message",body)
271         revision = None
272         revline = re.compile("[*] committed (.*)")
273         match = revline.search(output)
274         assert match != None, output+error
275         assert len(match.groups()) == 1
276         revpath = match.groups()[0]
277         assert not " " in revpath, revpath
278         assert revpath.startswith(self._archive_project_name()+'--')
279         revision = revpath[len(self._archive_project_name()+'--'):]
280         return revpath
281
class CantAddFile(Exception):
    """Error for a file that could not be added automatically.

    The offending path is kept in the ``file`` attribute.
    """
    def __init__(self, file):
        message = "Can't automatically add file %s" % file
        Exception.__init__(self, message)
        self.file = file
286
287
288 \f
# Generate the RCS test-case subclasses for the Arch backend inside this
# module first, so the loader below can find them.
_this_module = sys.modules[__name__]
rcs.make_rcs_testcase_subclasses(Arch, _this_module)

# Combine the unit tests found in this module with its doctests.
unitsuite = unittest.TestLoader().loadTestsFromModule(_this_module)
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])