Bzr storage now based off bzrlib module, not 'bzr' executible.
[be.git] / libbe / storage / vcs / bzr.py
1 # Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
2 #                         Ben Finney <benf@cybersource.com.au>
3 #                         Gianluca Montecchi <gian@grys.it>
4 #                         Marien Zwart <marienz@gentoo.org>
5 #                         W. Trevor King <wking@drexel.edu>
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """
22 Bazaar (bzr) backend.
23 """
24
25 try:
26     import bzrlib
27     import bzrlib.branch
28     import bzrlib.builtins
29     import bzrlib.config
30     import bzrlib.errors
31     import bzrlib.option
32 except ImportError:
33     bzrlib = None
34 import os
35 import os.path
36 import re
37 import shutil
38 import StringIO
39
40 import libbe
41 import base
42
43 if libbe.TESTING == True:
44     import doctest
45     import sys
46     import unittest
47
48
49 def new():
50     return Bzr()
51
52 class Bzr(base.VCS):
53     name = 'bzr'
54     client = None # bzrlib
55
56     def __init__(self, *args, **kwargs):
57         base.VCS.__init__(self, *args, **kwargs)
58         self.versioned = True
59
60     def _vcs_version(self):
61         if bzrlib == None:
62             return None
63         return bzrlib.__version__
64
65     def _vcs_get_user_id(self):
66         # excerpted from bzrlib.builtins.cmd_whoami.run()
67         try:
68             c = bzrlib.branch.Branch.open_containing(self.repo)[0].get_config()
69         except errors.NotBranchError:
70             c = bzrlib.config.GlobalConfig()
71         return c.username()
72
73     def _vcs_detect(self, path):
74         if self._u_search_parent_directories(path, '.bzr') != None :
75             return True
76         return False
77
78     def _vcs_root(self, path):
79         """Find the root of the deepest repository containing path."""
80         cmd = bzrlib.builtins.cmd_root()
81         cmd.outf = StringIO.StringIO()
82         cmd.run(filename=path)
83         return cmd.outf.getvalue().rstrip('\n')
84
85     def _vcs_init(self, path):
86         cmd = bzrlib.builtins.cmd_init()
87         cmd.outf = StringIO.StringIO()
88         cmd.run(location=path)
89
90     def _vcs_destroy(self):
91         vcs_dir = os.path.join(self.repo, '.bzr')
92         if os.path.exists(vcs_dir):
93             shutil.rmtree(vcs_dir)
94
95     def _vcs_add(self, path):
96         path = os.path.join(self.repo, path)
97         cmd = bzrlib.builtins.cmd_add()
98         cmd.outf = StringIO.StringIO()
99         cmd.run(file_list=[path], file_ids_from=self.repo)
100
101     def _vcs_remove(self, path):
102         # --force to also remove unversioned files.
103         path = os.path.join(self.repo, path)
104         cmd = bzrlib.builtins.cmd_remove()
105         cmd.outf = StringIO.StringIO()
106         cmd.run(file_list=[path], file_deletion_strategy='force')
107
108     def _vcs_update(self, path):
109         pass
110
111     def _parse_revision_string(self, revision=None):
112         if revision == None:
113             return revision
114         rev_opt = bzrlib.option.Option.OPTIONS['revision']
115         try:
116             rev_spec = rev_opt.type(revision)
117         except bzrlib.errors.NoSuchRevisionSpec:
118             raise base.InvalidRevision(revision)
119         return rev_spec
120
121     def _vcs_get_file_contents(self, path, revision=None):
122         if revision == None:
123             return base.VCS._vcs_get_file_contents(self, path, revision)
124         path = os.path.join(self.repo, path)
125         revision = self._parse_revision_string(revision)
126         cmd = bzrlib.builtins.cmd_cat()
127         cmd.outf = StringIO.StringIO()
128         try:
129             cmd.run(filename=path, revision=revision)
130         except bzrlib.errors.BzrCommandError, e:
131             if 'not present in revision' in str(e):
132                 raise base.InvalidID(path)
133             raise
134         return cmd.outf.getvalue()        
135
136     def _vcs_commit(self, commitfile, allow_empty=False):
137         cmd = bzrlib.builtins.cmd_commit()
138         cmd.outf = StringIO.StringIO()
139         cwd = os.getcwd()
140         os.chdir(self.repo)
141         try:
142             cmd.run(file=commitfile, unchanged=allow_empty)
143         except bzrlib.errors.BzrCommandError, e:
144             strings = ['no changes to commit.', # bzr 1.3.1
145                        'No changes to commit.'] # bzr 1.15.1
146             if self._u_any_in_string(strings, str(e)) == True:
147                 raise base.EmptyCommit()
148             raise
149         finally:
150             os.chdir(cwd)
151         return self._vcs_revision_id(-1)
152
153     def _vcs_revision_id(self, index):
154         cmd = bzrlib.builtins.cmd_revno()
155         cmd.outf = StringIO.StringIO()
156         cmd.run(location=self.repo)
157         current_revision = int(cmd.outf.getvalue())
158         if index > current_revision or index < -current_revision:
159             return None
160         if index >= 0:
161             return str(index) # bzr commit 0 is the empty tree.
162         return str(current_revision+index+1)
163
164 \f
165 if libbe.TESTING == True:
166     base.make_vcs_testcase_subclasses(Bzr, sys.modules[__name__])
167
168     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
169     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])