1 # Copyright (C) 2008-2012 Ben Finney <benf@cybersource.com.au>
2 # Chris Ball <cjb@laptop.org>
3 # Gianluca Montecchi <gian@grys.it>
4 # Robert Lehmann <mail@robertlehmann.de>
5 # W. Trevor King <wking@drexel.edu>
7 # This file is part of Bugs Everywhere.
9 # Bugs Everywhere is free software: you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by the Free
11 # Software Foundation, either version 2 of the License, or (at your option) any
14 # Bugs Everywhere is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
16 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
19 # You should have received a copy of the GNU General Public License along with
20 # Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
24 .. _Git: http://git-scm.com/
34 import pygit2 as _pygit2
35 except ImportError, error:
37 _pygit2_import_error = error
40 from ...ui.util import user as _user
41 from ...util import encoding as _encoding
42 from ..base import EmptyCommit as _EmptyCommit
45 if libbe.TESTING == True:
57 class PygitGit(base.VCS):
58 """:py:class:`base.VCS` implementation for Git.
60 Using :py:mod:`pygit2` for the Git activity.
64 _null_oid = '\00' * 20
66 def __init__(self, *args, **kwargs):
67 base.VCS.__init__(self, *args, **kwargs)
69 self._pygit_repository = None
71 def __getstate__(self):
72 """`pygit2.Repository`\s don't seem to pickle well.
74 attrs = dict(self.__dict__)
75 if self._pygit_repository is not None:
76 attrs['_pygit_repository'] = self._pygit_repository.path
79 def __setstate__(self, state):
80 """`pygit2.Repository`\s don't seem to pickle well.
82 self.__dict__.update(state)
83 if self._pygit_repository is not None:
84 gitdir = self._pygit_repository
85 self._pygit_repository = _pygit2.Repository(gitdir)
87 def _vcs_version(self):
89 return getattr(_pygit2, '__verison__', '?')
92 def _vcs_get_user_id(self):
94 name = self._pygit_repository.config['user.name']
98 email = self._pygit_repository.config['user.email']
101 if name != '' or email != '': # got something!
102 # guess missing info, if necessary
104 name = _user.get_fallback_fullname()
106 email = _user.get_fallback_email()
108 raise ValueError((name, email))
109 return _user.create_user_id(name, email)
110 return None # Git has no infomation
112 def _vcs_detect(self, path):
114 _pygit2.discover_repository(path)
119 def _vcs_root(self, path):
120 """Find the root of the deepest repository containing path."""
121 # Assume that nothing funny is going on; in particular, that we aren't
122 # dealing with a bare repo.
123 gitdir = _pygit2.discover_repository(path)
124 self._pygit_repository = _pygit2.Repository(gitdir)
125 dirname,tip = os.path.split(gitdir)
126 if tip == '': # split('x/y/z/.git/') == ('x/y/z/.git', '')
127 dirname,tip = os.path.split(dirname)
128 assert tip == '.git', tip
131 def _vcs_init(self, path):
133 self._pygit_repository = _pygit2.init_repository(path, bare)
135 def _vcs_destroy(self):
136 vcs_dir = os.path.join(self.repo, '.git')
137 if os.path.exists(vcs_dir):
138 shutil.rmtree(vcs_dir)
140 def _vcs_add(self, path):
141 abspath = self._u_abspath(path)
142 if os.path.isdir(abspath):
144 self._pygit_repository.index.read()
145 self._pygit_repository.index.add(path)
146 self._pygit_repository.index.write()
148 def _vcs_remove(self, path):
149 abspath = self._u_abspath(path)
150 if not os.path.isdir(self._u_abspath(abspath)):
151 self._pygit_repository.index.read()
152 del self._pygit_repository.index[path]
153 self._pygit_repository.index.write()
154 os.remove(os.path.join(self.repo, path))
156 def _vcs_update(self, path):
159 def _git_get_commit(self, revision):
160 if isinstance(revision, str):
161 revision = unicode(revision, 'ascii')
162 commit = self._pygit_repository.revparse_single(revision)
163 assert commit.type == _pygit2.GIT_OBJ_COMMIT, commit
166 def _git_get_object(self, path, revision):
167 commit = self._git_get_commit(revision=revision)
169 sections = path.split(os.path.sep)
170 for section in sections[:-1]: # traverse trees
173 if entry.name == section:
174 eobj = entry.to_object()
175 if eobj.type == _pygit2.GIT_OBJ_TREE:
179 raise ValueError(path) # not a directory
180 if child_tree is None:
181 raise ValueError((path, sections, section, [e.name for e in tree]))
182 raise ValueError(path) # not found
186 if entry.name == sections[-1]:
187 eobj = entry.to_object()
190 def _vcs_get_file_contents(self, path, revision=None):
192 return base.VCS._vcs_get_file_contents(self, path, revision)
194 blob = self._git_get_object(path=path, revision=revision)
195 if blob.type != _pygit2.GIT_OBJ_BLOB:
196 raise ValueError(path) # not a file
197 return blob.read_raw()
199 def _vcs_path(self, id, revision):
200 return self._u_find_id(id, revision)
202 def _vcs_isdir(self, path, revision):
203 obj = self._git_get_object(path=path, revision=revision)
204 return obj.type == _pygit2.GIT_OBJ_TREE
206 def _vcs_listdir(self, path, revision):
207 tree = self._git_get_object(path=path, revision=revision)
208 assert tree.type == _pygit2.GIT_OBJ_TREE, tree
209 return [e.name for e in tree]
211 def _vcs_commit(self, commitfile, allow_empty=False):
212 self._pygit_repository.index.read()
213 tree_oid = self._pygit_repository.index.write_tree()
215 self._pygit_repository.head
216 except _pygit2.GitError: # no head; this is the first commit
218 tree = self._pygit_repository[tree_oid]
219 if not allow_empty and len(tree) == 0:
222 parents = [self._pygit_repository.head.oid]
223 if (not allow_empty and
224 tree_oid == self._pygit_repository.head.tree.oid):
227 user_id = self.get_user_id()
228 name,email = _user.parse_user_id(user_id)
229 # using default times is recent, see
230 # https://github.com/libgit2/pygit2/pull/129
231 author = _pygit2.Signature(name, email)
233 message = _encoding.get_file_contents(commitfile, decode=False)
234 encoding = _encoding.get_text_file_encoding()
235 commit_oid = self._pygit_repository.create_commit(
236 update_ref, author, committer, message, tree_oid, parents,
238 commit = self._pygit_repository[commit_oid]
241 def _vcs_revision_id(self, index):
242 walker = self._pygit_repository.walk(
243 self._pygit_repository.head.oid, _pygit2.GIT_SORT_TIME)
245 target_i = -1 - index # -1: 0, -2: 1, ...
246 for i,commit in enumerate(walker):
250 revisions = [commit.hex for commit in walker]
251 # revisions is [newest, older, ..., oldest]
252 if index > len(revisions):
254 return revisions[len(revisions) - index]
256 raise NotImplementedError('initial revision')
259 def _vcs_changed(self, revision):
260 commit = self._git_get_commit(revision=revision)
261 diff = commit.tree.diff(self._pygit_repository.head.tree)
265 for hunk in diff.changes['hunks']:
266 if hunk.old_oid == self._null_hex: # pygit2 uses hex in hunk.*_oid
267 new.add(hunk.new_file)
268 elif hunk.new_oid == self._null_hex:
269 removed.add(hunk.old_file)
271 modified.add(hunk.new_file)
272 return (list(new), list(modified), list(removed))
275 class ExecGit (PygitGit):
276 """:class:`base.VCS` implementation for Git.
281 def _vcs_version(self):
283 status,output,error = self._u_invoke_client('--version')
284 except CommandError: # command not found?
286 return output.strip()
288 def _vcs_get_user_id(self):
289 status,output,error = self._u_invoke_client(
290 'config', 'user.name', expect=(0,1))
292 name = output.rstrip('\n')
295 status,output,error = self._u_invoke_client(
296 'config', 'user.email', expect=(0,1))
298 email = output.rstrip('\n')
301 if name != '' or email != '': # got something!
302 # guess missing info, if necessary
304 name = _user.get_fallback_fullname()
306 email = _user.get_fallback_email()
307 return _user.create_user_id(name, email)
308 return None # Git has no infomation
310 def _vcs_detect(self, path):
311 if self._u_search_parent_directories(path, '.git') != None :
315 def _vcs_root(self, path):
316 """Find the root of the deepest repository containing path."""
317 # Assume that nothing funny is going on; in particular, that we aren't
318 # dealing with a bare repo.
319 if os.path.isdir(path) != True:
320 path = os.path.dirname(path)
321 status,output,error = self._u_invoke_client('rev-parse', '--git-dir',
323 gitdir = os.path.join(path, output.rstrip('\n'))
324 dirname = os.path.abspath(os.path.dirname(gitdir))
327 def _vcs_init(self, path):
328 self._u_invoke_client('init', cwd=path)
330 def _vcs_destroy(self):
331 vcs_dir = os.path.join(self.repo, '.git')
332 if os.path.exists(vcs_dir):
333 shutil.rmtree(vcs_dir)
335 def _vcs_add(self, path):
336 if os.path.isdir(path):
338 self._u_invoke_client('add', path)
340 def _vcs_remove(self, path):
341 if not os.path.isdir(self._u_abspath(path)):
342 self._u_invoke_client('rm', '-f', path)
344 def _vcs_update(self, path):
347 def _vcs_get_file_contents(self, path, revision=None):
349 return base.VCS._vcs_get_file_contents(self, path, revision)
351 arg = '%s:%s' % (revision,path)
352 status,output,error = self._u_invoke_client('show', arg)
355 def _vcs_path(self, id, revision):
356 return self._u_find_id(id, revision)
358 def _vcs_isdir(self, path, revision):
359 arg = '%s:%s' % (revision,path)
360 args = ['ls-tree', arg]
361 kwargs = {'expect':(0,128)}
362 status,output,error = self._u_invoke_client(*args, **kwargs)
364 if 'not a tree object' in error:
366 raise base.CommandError(args, status, stderr=error)
369 def _vcs_listdir(self, path, revision):
370 arg = '%s:%s' % (revision,path)
371 status,output,error = self._u_invoke_client(
372 'ls-tree', '--name-only', arg)
373 return output.rstrip('\n').splitlines()
375 def _vcs_commit(self, commitfile, allow_empty=False):
376 args = ['commit', '--file', commitfile]
377 if allow_empty == True:
378 args.append('--allow-empty')
379 status,output,error = self._u_invoke_client(*args)
381 kwargs = {'expect':(0,1)}
382 status,output,error = self._u_invoke_client(*args, **kwargs)
383 strings = ['nothing to commit',
384 'nothing added to commit']
385 if self._u_any_in_string(strings, output) == True:
386 raise base.EmptyCommit()
387 full_revision = self._vcs_revision_id(-1)
388 assert full_revision[:7] in output, \
389 'Mismatched revisions:\n%s\n%s' % (full_revision, output)
392 def _vcs_revision_id(self, index):
393 args = ['rev-list', '--first-parent', '--reverse', 'HEAD']
394 kwargs = {'expect':(0,128)}
395 status,output,error = self._u_invoke_client(*args, **kwargs)
397 if error.startswith("fatal: ambiguous argument 'HEAD': unknown "):
399 raise base.CommandError(args, status, stderr=error)
400 revisions = output.splitlines()
403 return revisions[index-1]
405 return revisions[index]
411 def _diff(self, revision):
412 status,output,error = self._u_invoke_client('diff', revision)
415 def _parse_diff(self, diff_text):
416 """_parse_diff(diff_text) -> (new,modified,removed)
418 `new`, `modified`, and `removed` are lists of files.
422 diff --git a/dir/changed b/dir/changed
423 index 6c3ea8c..2f2f7c7 100644
431 diff --git a/dir/deleted b/dir/deleted
432 deleted file mode 100644
433 index 225ec04..0000000
440 diff --git a/dir/moved b/dir/moved
441 deleted file mode 100644
442 index 5ef102f..0000000
450 diff --git a/dir/moved2 b/dir/moved2
452 index 0000000..5ef102f
460 diff --git a/dir/new b/dir/new
462 index 0000000..94954ab
472 lines = diff_text.splitlines()
473 for i,line in enumerate(lines):
474 if not line.startswith('diff '):
476 file_a,file_b = line.split()[-2:]
477 assert file_a.startswith('a/'), \
478 'missformed file_a %s' % file_a
479 assert file_b.startswith('b/'), \
480 'missformed file_b %s' % file_b
482 assert file_b[2:] == file, \
483 'diff file missmatch %s != %s' % (file_a, file_b)
484 if lines[i+1].startswith('new '):
486 elif lines[i+1].startswith('index '):
487 modified.append(file)
488 elif lines[i+1].startswith('deleted '):
490 return (new,modified,removed)
492 def _vcs_changed(self, revision):
493 return self._parse_diff(self._diff(revision))
496 if libbe.TESTING == True:
497 base.make_vcs_testcase_subclasses(PygitGit, sys.modules[__name__])
498 base.make_vcs_testcase_subclasses(ExecGit, sys.modules[__name__])
500 unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
501 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])