1 # Copyright (C) 2005-2012 Aaron Bentley <abentley@panoramicfeedback.com>
2 # Alexander Belchenko <bialix@ukr.net>
3 # Ben Finney <benf@cybersource.com.au>
4 # Chris Ball <cjb@laptop.org>
5 # Gianluca Montecchi <gian@grys.it>
6 # W. Trevor King <wking@drexel.edu>
8 # This file is part of Bugs Everywhere.
10 # Bugs Everywhere is free software: you can redistribute it and/or modify it
11 # under the terms of the GNU General Public License as published by the Free
12 # Software Foundation, either version 2 of the License, or (at your option) any
15 # Bugs Everywhere is distributed in the hope that it will be useful, but
16 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
20 # You should have received a copy of the GNU General Public License along with
21 # Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
23 """Define the base :class:`VCS` (Version Control System) class, which
24 should be subclassed by other Version Control System backends. The
25 base class implements a "do not version" VCS.
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
47 if libbe.TESTING == True:
51 import libbe.ui.util.user
53 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone']
54 """List VCS modules in order of preference.
56 Don't list this module, it is implicitly last.
59 def set_preferred_vcs(name):
60 """Manipulate :data:`VCS_ORDER` to place `name` first.
62 This is primarily indended for testing purposes.
65 assert name in VCS_ORDER, \
66 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
67 VCS_ORDER.remove(name)
68 VCS_ORDER.insert(0, name)
70 def _get_matching_vcs(matchfn):
71 """Return the first module for which matchfn(VCS_instance) is True.
73 Searches in :data:`VCS_ORDER`.
75 for submodname in VCS_ORDER:
76 module = import_by_name('libbe.storage.vcs.%s' % submodname)
78 if matchfn(vcs) == True:
82 def vcs_by_name(vcs_name):
83 """Return the module for the VCS with the given name.
85 Searches in :data:`VCS_ORDER`.
87 if vcs_name == VCS.name:
89 return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
92 """Return an VCS instance for the vcs being used in this directory.
94 Searches in :data:`VCS_ORDER`.
96 return _get_matching_vcs(lambda vcs: vcs._detect(dir))
99 """Return an instance of an installed VCS.
101 Searches in :data:`VCS_ORDER`.
103 return _get_matching_vcs(lambda vcs: vcs.installed())
106 class VCSNotRooted (libbe.storage.base.ConnectionError):
107 def __init__(self, vcs):
108 msg = 'VCS not rooted'
109 libbe.storage.base.ConnectionError.__init__(self, msg)
112 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
113 def __init__(self, vcs):
114 msg = 'VCS unable to root'
115 libbe.storage.base.ConnectionError.__init__(self, msg)
118 class InvalidPath (InvalidID):
119 def __init__(self, path, root, msg=None, **kwargs):
121 msg = 'Path "%s" not in root "%s"' % (path, root)
122 InvalidID.__init__(self, msg=msg, **kwargs)
126 class SpacerCollision (InvalidPath):
127 def __init__(self, path, spacer):
128 msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
129 InvalidPath.__init__(self, path, root=None, msg=msg)
132 class NoSuchFile (InvalidID):
133 def __init__(self, pathname, root='.'):
134 path = os.path.abspath(os.path.join(root, pathname))
135 InvalidID.__init__(self, 'No such file: %s' % path)
138 class CachedPathID (object):
139 """Cache Storage ID <-> path policy.
141 Paths generated following::
143 .../.be/BUGDIR/bugs/BUG/comments/COMMENT
146 See :mod:`libbe.util.id` for a discussion of ID formats.
152 >>> os.mkdir(os.path.join(dir.path, '.be'))
153 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
154 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
155 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
156 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
157 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
158 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
159 >>> open(os.path.join(dir.path, '.be', 'abc', 'values'),
161 >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
163 >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
165 >>> c = CachedPathID()
167 >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
170 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
173 >>> c.path('123/values') # doctest: +ELLIPSIS
174 u'.../.be/abc/bugs/123/values'
177 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
179 >>> c.connect() # demonstrate auto init
180 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
182 >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
184 >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
186 >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
187 u'.../.be/abc/bugs/123/comments/qrs'
190 >>> c.path('qrs') # doctest: +ELLIPSIS
191 u'.../.be/abc/bugs/123/comments/qrs'
192 >>> c.remove_id('qrs')
194 Traceback (most recent call last):
196 InvalidID: qrs in revision None
201 def __init__(self, encoding=None):
202 self.encoding = libbe.util.encoding.get_text_file_encoding()
203 self._spacer_dirs = ['.be', 'bugs', 'comments']
205 def root(self, path):
206 self._root = os.path.abspath(path).rstrip(os.path.sep)
207 self._cache_path = os.path.join(
208 self._root, self._spacer_dirs[0], 'id-cache')
210 def init(self, verbose=True, cache=None):
211 """Create cache file for an existing .be directory.
213 The file contains multiple lines of the form::
221 spaced_root = os.path.join(self._root, self._spacer_dirs[0])
222 for dirpath, dirnames, filenames in os.walk(spaced_root,
224 if dirpath == spaced_root:
227 id = self.id(dirpath)
228 relpath = dirpath[len(self._root + os.path.sep):]
229 if id.count('/') == 0:
230 if verbose == True and id in self._cache:
231 print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
232 self._cache[id] = relpath
235 if self._cache != cache:
241 if os.path.exists(self._cache_path):
242 os.remove(self._cache_path)
245 if not os.path.exists(self._cache_path):
249 raise libbe.storage.base.ConnectionError
250 self._cache = {} # key: uuid, value: path
251 self._changed = False
252 f = codecs.open(self._cache_path, 'r', self.encoding)
254 fields = line.rstrip('\n').split('\t')
255 self._cache[fields[0]] = fields[1]
258 def disconnect(self):
259 if self._changed == True:
260 f = codecs.open(self._cache_path, 'w', self.encoding)
261 for uuid,path in self._cache.items():
262 f.write('%s\t%s\n' % (uuid, path))
266 def path(self, id, relpath=False):
267 fields = id.split('/', 1)
273 if uuid not in self._cache:
274 self.init(verbose=False, cache=self._cache)
275 if uuid not in self._cache:
276 raise InvalidID(uuid)
278 return os.path.join(self._cache[uuid], *extra)
279 return os.path.join(self._root, self._cache[uuid], *extra)
281 def add_id(self, id, parent=None):
282 if id.count('/') > 0:
283 # not a UUID-level path
284 assert id.startswith(parent), \
285 'Strange ID: "%s" should start with "%s"' % (id, parent)
287 elif id in self._cache:
293 spacer = self._spacer_dirs[0]
295 assert parent.count('/') == 0, \
296 'Strange parent ID: "%s" should be UUID' % parent
297 parent_path = self.path(parent, relpath=True)
298 parent_spacer = parent_path.split(os.path.sep)[-2]
299 i = self._spacer_dirs.index(parent_spacer)
300 spacer = self._spacer_dirs[i+1]
301 path = os.path.join(parent_path, spacer, id)
302 self._cache[id] = path
304 path = os.path.join(self._root, path)
307 def remove_id(self, id):
308 if id.count('/') > 0:
309 return # not a UUID-level path
314 path = os.path.join(self._root, path)
315 if not path.startswith(self._root + os.path.sep):
316 raise InvalidPath(path, self._root)
317 path = path[len(self._root + os.path.sep):]
319 if not path.startswith(self._spacer_dirs[0] + os.path.sep):
320 raise InvalidPath(path, self._spacer_dirs[0])
321 for spacer in self._spacer_dirs:
322 if not path.startswith(spacer + os.path.sep):
324 id = path[len(spacer + os.path.sep):]
325 fields = path[len(spacer + os.path.sep):].split(os.path.sep,1)
329 for spacer in self._spacer_dirs:
330 if id.endswith(os.path.sep + spacer):
331 raise SpacerCollision(orig_path, spacer)
332 if os.path.sep != '/':
333 id = id.replace(os.path.sep, '/')
340 class VCS (libbe.storage.base.VersionedStorage):
341 """Implement a 'no-VCS' interface.
343 Support for other VCSs can be added by subclassing this class, and
344 overriding methods `_vcs_*()` with code appropriate for your VCS.
346 The methods `_u_*()` are utility methods available to the `_vcs_*()`
350 client = 'false' # command-line tool for _u_invoke_client
352 def __init__(self, *args, **kwargs):
353 if 'encoding' not in kwargs:
354 kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding()
355 libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
356 self.versioned = False
357 self.interspersed_vcs_files = False
358 self.verbose_invoke = False
359 self._cached_path_id = CachedPathID()
362 def _vcs_version(self):
364 Return the VCS version string.
368 def _vcs_get_user_id(self):
370 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
371 If the VCS has not been configured with a username, return None.
375 def _vcs_detect(self, path=None):
377 Detect whether a directory is revision controlled with this VCS.
381 def _vcs_root(self, path):
383 Get the VCS root. This is the default working directory for
384 future invocations. You would normally set this to the root
385 directory for your VCS.
387 if os.path.isdir(path) == False:
388 path = os.path.dirname(path)
390 path = os.path.abspath('.')
393 def _vcs_init(self, path):
395 Begin versioning the tree based at path.
399 def _vcs_destroy(self):
401 Remove any files used in versioning (e.g. whatever _vcs_init()
406 def _vcs_add(self, path):
408 Add the already created file at path to version control.
412 def _vcs_exists(self, path, revision=None):
414 Does the path exist in a given revision? (True/False)
416 raise NotImplementedError('Lazy BE developers')
418 def _vcs_remove(self, path):
420 Remove the file at path from version control. Optionally
421 remove the file from the filesystem as well.
425 def _vcs_update(self, path):
427 Notify the versioning system of changes to the versioned file
432 def _vcs_is_versioned(self, path):
434 Return true if a path is under version control, False
435 otherwise. You only need to set this if the VCS goes about
436 dumping VCS-specific files into the .be directory.
438 If you do need to implement this method (e.g. Arch), set
439 self.interspersed_vcs_files = True
441 assert self.interspersed_vcs_files == False
442 raise NotImplementedError
444 def _vcs_get_file_contents(self, path, revision=None):
446 Get the file contents as they were in a given revision.
447 Revision==None specifies the current revision.
450 raise libbe.storage.base.InvalidRevision(
451 'The %s VCS does not support revision specifiers' % self.name)
452 path = os.path.join(self.repo, path)
453 if not os.path.exists(path):
454 return libbe.util.InvalidObject
455 if os.path.isdir(path):
456 return libbe.storage.base.InvalidDirectory
462 def _vcs_path(self, id, revision):
464 Return the relative path to object id as of revision.
466 Revision will not be None.
468 raise NotImplementedError
470 def _vcs_isdir(self, path, revision):
472 Return True if path (as returned by _vcs_path) was a directory
473 as of revision, False otherwise.
475 Revision will not be None.
477 raise NotImplementedError
479 def _vcs_listdir(self, path, revision):
481 Return a list of the contents of the directory path (as
482 returned by _vcs_path) as of revision.
484 Revision will not be None, and ._vcs_isdir(path, revision)
487 raise NotImplementedError
489 def _vcs_commit(self, commitfile, allow_empty=False):
491 Commit the current working directory, using the contents of
492 commitfile as the comment. Return the name of the old
493 revision (or None if commits are not supported).
495 If allow_empty == False, raise EmptyCommit if there are no
500 def _vcs_revision_id(self, index):
502 Return the name of the <index>th revision. Index will be an
503 integer (possibly <= 0). The choice of which branch to follow
504 when crossing branches/merges is not defined.
506 Return None if revision IDs are not supported, or if the
507 specified revision does not exist.
511 def _vcs_changed(self, revision):
513 Return a tuple of lists of ids
514 (new, modified, removed)
515 from the specified revision to the current situation.
520 # Cache version string for efficiency.
521 if not hasattr(self, '_version'):
522 self._version = self._vcs_version()
525 def version_cmp(self, *args):
526 """Compare the installed VCS version `V_i` with another version
527 `V_o` (given in `*args`). Returns
538 >>> v = VCS(repo='.')
539 >>> v._version = '2.3.1 (release)'
540 >>> v.version_cmp(2,3,1)
542 >>> v.version_cmp(2,3,2)
544 >>> v.version_cmp(2,3,'a',5)
546 >>> v.version_cmp(2,3,0)
548 >>> v.version_cmp(2,3,1,'a',5)
550 >>> v.version_cmp(2,3,1,1)
554 >>> v._version = '2.0.0pre2'
555 >>> v._parsed_version = None
558 >>> v.version_cmp(2,0,1)
560 >>> v.version_cmp(2,0,0,'pre',1)
562 >>> v.version_cmp(2,0,0,'pre',2)
564 >>> v.version_cmp(2,0,0,'pre',3)
566 >>> v.version_cmp(2,0,0,'a',3)
568 >>> v.version_cmp(2,0,0,'rc',1)
571 if not hasattr(self, '_parsed_version') \
572 or self._parsed_version == None:
573 num_part = self.version().split(' ')[0]
574 self._parsed_version = []
575 for num in num_part.split('.'):
577 self._parsed_version.append(int(num))
578 except ValueError, e:
579 # bzr version number might contain non-numerical tags
580 splitter = re.compile(r'[\D]') # Match non-digits
581 splits = splitter.split(num)
582 # if len(tag) > 1 some splits will be empty; remove
583 splits = filter(lambda s: s != '', splits)
584 tag_starti = len(splits[0])
585 num_starti = num.find(splits[1], tag_starti)
586 tag = num[tag_starti:num_starti]
587 self._parsed_version.append(int(splits[0]))
588 self._parsed_version.append(tag)
589 self._parsed_version.append(int(splits[1]))
590 for current,other in zip(self._parsed_version, args):
591 if type(current) != type (other):
592 # one of them is a pre-release string
593 if type(current) != types.IntType:
597 c = cmp(current,other)
600 # see if one is longer than the other
601 verlen = len(self._parsed_version)
605 elif verlen > arglen:
606 if type(self._parsed_version[arglen]) != types.IntType:
607 return -1 # self is a prerelease
611 if type(args[verlen]) != types.IntType:
612 return 1 # args is a prerelease
617 if self.version() != None:
621 def get_user_id(self):
623 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
624 If the VCS has not been configured with a username, return None.
625 You can override the automatic lookup procedure by setting the
626 VCS.user_id attribute to a string of your choice.
628 if not hasattr(self, 'user_id'):
629 self.user_id = self._vcs_get_user_id()
630 if self.user_id == None:
632 name = libbe.ui.util.user.get_fallback_fullname()
633 email = libbe.ui.util.user.get_fallback_email()
634 self.user_id = libbe.ui.util.user.create_user_id(name, email)
637 def _detect(self, path='.'):
639 Detect whether a directory is revision controlled with this VCS.
641 return self._vcs_detect(path)
644 """Set the root directory to the path's VCS root.
646 This is the default working directory for future invocations.
647 Consider the following usage case:
649 You have a project rooted in::
653 by which I mean the VCS repository is in, for example::
657 However, you're of in some subdirectory like::
659 /path/to/source/ui/testing
661 and you want to comment on a bug. `root` will locate your VCS
662 root (``/path/to/source/``) and set the repo there. This
663 means that it doesn't matter where you are in your project
664 tree when you call "be COMMAND", it always acts as if you called
665 it from the VCS root.
667 if self._detect(self.repo) == False:
668 raise VCSUnableToRoot(self)
669 root = self._vcs_root(self.repo)
670 self.repo = os.path.realpath(root)
671 if os.path.isdir(self.repo) == False:
672 self.repo = os.path.dirname(self.repo)
673 self.be_dir = os.path.join(
674 self.repo, self._cached_path_id._spacer_dirs[0])
675 self._cached_path_id.root(self.repo)
680 Begin versioning the tree based at self.repo.
681 Also roots the vcs at path.
685 root : called if the VCS has already been initialized.
687 if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
688 raise VCSUnableToRoot(self)
689 if self._vcs_detect(self.repo) == False:
690 self._vcs_init(self.repo)
691 if self._rooted == False:
693 os.mkdir(self.be_dir)
694 self._vcs_add(self._u_rel_path(self.be_dir))
695 self._setup_storage_version()
696 self._cached_path_id.init()
700 self._cached_path_id.destroy()
701 if os.path.exists(self.be_dir):
702 shutil.rmtree(self.be_dir)
705 if self._rooted == False:
707 if not os.path.isdir(self.be_dir):
708 raise libbe.storage.base.ConnectionError(self)
709 self._cached_path_id.connect()
710 self.check_storage_version()
712 def _disconnect(self):
713 self._cached_path_id.disconnect()
715 def path(self, id, revision=None, relpath=True):
717 path = self._cached_path_id.path(id)
719 return self._u_rel_path(path)
721 path = self._vcs_path(id, revision)
724 return os.path.join(self.repo, path)
726 def _add_path(self, path, directory=False):
727 relpath = self._u_rel_path(path)
728 reldirs = relpath.split(os.path.sep)
729 if directory == False:
730 reldirs = reldirs[:-1]
732 for reldir in reldirs:
733 dir = os.path.join(dir, reldir)
734 if not os.path.exists(dir):
736 self._vcs_add(self._u_rel_path(dir))
737 elif not os.path.isdir(dir):
738 raise libbe.storage.base.InvalidDirectory
739 if directory == False:
740 if not os.path.exists(path):
741 open(path, 'w').close()
742 self._vcs_add(self._u_rel_path(path))
744 def _add(self, id, parent=None, **kwargs):
745 path = self._cached_path_id.add_id(id, parent)
746 self._add_path(path, **kwargs)
748 def _exists(self, id, revision=None):
751 path = self.path(id, revision, relpath=False)
754 return os.path.exists(path)
755 path = self.path(id, revision, relpath=True)
756 return self._vcs_exists(relpath, revision)
758 def _remove(self, id):
759 path = self._cached_path_id.path(id)
760 if os.path.exists(path):
761 if os.path.isdir(path) and len(self.children(id)) > 0:
762 raise libbe.storage.base.DirectoryNotEmpty(id)
763 self._vcs_remove(self._u_rel_path(path))
764 if os.path.exists(path):
765 if os.path.isdir(path):
769 self._cached_path_id.remove_id(id)
771 def _recursive_remove(self, id):
772 path = self._cached_path_id.path(id)
773 for dirpath,dirnames,filenames in os.walk(path, topdown=False):
774 filenames.extend(dirnames)
776 fullpath = os.path.join(dirpath, f)
777 if os.path.exists(fullpath) == False:
779 self._vcs_remove(self._u_rel_path(fullpath))
780 if os.path.exists(path):
782 path = self._cached_path_id.path(id, relpath=True)
783 for id,p in self._cached_path_id._cache.items():
784 if p.startswith(path):
785 self._cached_path_id.remove_id(id)
787 def _ancestors(self, id=None, revision=None):
791 path = self.path(id, revision, relpath=False)
794 if not path.startswith(self.repo + os.path.sep):
796 path = os.path.dirname(path)
798 id = self._u_path_to_id(path)
800 except (SpacerCollision, InvalidPath):
804 def _children(self, id=None, revision=None):
806 isdir = os.path.isdir
809 isdir = lambda path : self._vcs_isdir(
810 self._u_rel_path(path), revision)
811 listdir = lambda path : self._vcs_listdir(
812 self._u_rel_path(path), revision)
816 path = self.path(id, revision, relpath=False)
817 if isdir(path) == False:
819 children = listdir(path)
820 for i,c in enumerate(children):
821 if c in self._cached_path_id._spacer_dirs:
823 children.extend([os.path.join(c, c2) for c2 in
824 listdir(os.path.join(path, c))])
825 elif c in ['id-cache', 'version']:
827 elif self.interspersed_vcs_files \
828 and self._vcs_is_versioned(c) == False:
830 for i,c in enumerate(children):
831 if c == None: continue
832 cpath = os.path.join(path, c)
833 if self.interspersed_vcs_files == True \
834 and revision != None \
835 and self._vcs_is_versioned(cpath) == False:
838 children[i] = self._u_path_to_id(cpath)
839 return [c for c in children if c != None]
841 def _get(self, id, default=libbe.util.InvalidObject, revision=None):
843 relpath = self.path(id, revision, relpath=True)
844 contents = self._vcs_get_file_contents(relpath, revision)
846 if default == libbe.util.InvalidObject:
849 if contents in [libbe.storage.base.InvalidDirectory,
850 libbe.util.InvalidObject] \
851 or len(contents) == 0:
852 if default == libbe.util.InvalidObject:
853 raise InvalidID(id, revision)
857 def _set(self, id, value):
859 path = self._cached_path_id.path(id)
862 if not os.path.exists(path):
864 if os.path.isdir(path):
865 raise libbe.storage.base.InvalidDirectory(id)
869 self._vcs_update(self._u_rel_path(path))
871 def _commit(self, summary, body=None, allow_empty=False):
872 summary = summary.strip()+'\n'
874 summary += '\n' + body.strip() + '\n'
875 descriptor, filename = tempfile.mkstemp()
878 temp_file = os.fdopen(descriptor, 'wb')
879 temp_file.write(summary)
881 revision = self._vcs_commit(filename, allow_empty=allow_empty)
887 def revision_id(self, index=None):
891 if int(index) != index:
892 raise InvalidRevision(index)
894 raise InvalidRevision(index)
895 revid = self._vcs_revision_id(index)
897 raise libbe.storage.base.InvalidRevision(index)
900 def changed(self, revision):
901 new,mod,rem = self._vcs_changed(revision)
902 def paths_to_ids(paths):
905 id = self._u_path_to_id(p)
907 except (SpacerCollision, InvalidPath):
909 new_id = list(paths_to_ids(new))
910 mod_id = list(paths_to_ids(mod))
911 rem_id = list(paths_to_ids(rem))
912 return (new_id, mod_id, rem_id)
914 def _u_any_in_string(self, list, string):
915 """Return True if any of the strings in list are in string.
916 Otherwise return False.
918 for list_string in list:
919 if list_string in string:
923 def _u_invoke(self, *args, **kwargs):
924 if 'cwd' not in kwargs:
925 kwargs['cwd'] = self.repo
926 if 'verbose' not in kwargs:
927 kwargs['verbose'] = self.verbose_invoke
928 if 'encoding' not in kwargs:
929 kwargs['encoding'] = self.encoding
930 return invoke(*args, **kwargs)
932 def _u_invoke_client(self, *args, **kwargs):
933 cl_args = [self.client]
935 return self._u_invoke(cl_args, **kwargs)
937 def _u_search_parent_directories(self, path, filename):
938 """Find the file (or directory) named filename in path or in any of
942 search_parent_directories("/a/b/c", ".be")
943 will return the path to the first existing file from
948 or None if none of those files exist.
951 ret = search_parent_directories(path, filename)
952 except AssertionError, e:
956 def _u_find_id_from_manifest(self, id, manifest, revision=None):
957 """Search for the relative path to id using manifest, a list of all
960 Returns None if the id is not found.
962 be_dir = self._cached_path_id._spacer_dirs[0]
963 be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
964 files = [f for f in manifest if f.startswith(be_dir_sep)]
966 if not file.startswith(be_dir+os.path.sep):
968 parts = file.split(os.path.sep)
969 dir = parts.pop(0) # don't add the first spacer dir
970 for part in parts[:-1]:
971 dir = os.path.join(dir, part)
976 p_id = self._u_path_to_id(file)
979 except (SpacerCollision, InvalidPath):
981 raise InvalidID(id, revision=revision)
983 def _u_find_id(self, id, revision):
984 """Search for the relative path to id as of revision.
986 Returns None if the id is not found.
988 assert self._rooted == True
989 be_dir = self._cached_path_id._spacer_dirs[0]
990 stack = [(be_dir, be_dir)]
991 while len(stack) > 0:
992 path,long_id = stack.pop()
993 if long_id.endswith('/'+id):
995 if self._vcs_isdir(path, revision) == False:
997 for child in self._vcs_listdir(path, revision):
998 stack.append((os.path.join(path, child),
999 '/'.join([long_id, child])))
1000 raise InvalidID(id, revision=revision)
1002 def _u_path_to_id(self, path):
1003 return self._cached_path_id.id(path)
1005 def _u_rel_path(self, path, root=None):
1006 """Return the relative path to path from root.
1011 >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
1013 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
1015 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
1017 >>> vcs._u_rel_path("./a", ".")
1021 if self.repo == None:
1022 raise VCSNotRooted(self)
1024 path = os.path.abspath(path)
1025 absRoot = os.path.abspath(root)
1026 absRootSlashedDir = os.path.join(absRoot,"")
1027 if path in [absRoot, absRootSlashedDir]:
1029 if not path.startswith(absRootSlashedDir):
1030 raise InvalidPath(path, absRootSlashedDir)
1031 relpath = path[len(absRootSlashedDir):]
1034 def _u_abspath(self, path, root=None):
1035 """Return the absolute path from a path relative to root.
1041 >>> vcs._u_abspath(".be", "/a.b/c")
1045 assert self.repo != None, "VCS not rooted"
1047 return os.path.abspath(os.path.join(root, path))
1049 def _u_parse_commitfile(self, commitfile):
1050 """Split the commitfile created in self.commit() back into summary and
1053 f = codecs.open(commitfile, 'r', self.encoding)
1054 summary = f.readline()
1060 return (summary, body)
1062 def check_storage_version(self):
1063 version = self.storage_version()
1064 if version != libbe.storage.STORAGE_VERSION:
1065 upgrade.upgrade(self.repo, version)
1067 def storage_version(self, revision=None, path=None):
1068 """Return the storage version of the on-disk files.
1072 :mod:`libbe.storage.util.upgrade`
1075 path = os.path.join(self.repo, '.be', 'version')
1076 if not os.path.exists(path):
1077 raise libbe.storage.InvalidStorageVersion(None)
1078 if revision == None: # don't require connection
1079 return libbe.util.encoding.get_file_contents(
1080 path, decode=True).rstrip()
1081 relpath = self._u_rel_path(path)
1082 contents = self._vcs_get_file_contents(relpath, revision=revision)
1083 if type(contents) != types.UnicodeType:
1084 contents = unicode(contents, self.encoding)
1085 return contents.strip()
1087 def _setup_storage_version(self):
1089 Requires disk access.
1091 assert self._rooted == True
1092 path = os.path.join(self.be_dir, 'version')
1093 if not os.path.exists(path):
1094 libbe.util.encoding.set_file_contents(path,
1095 libbe.storage.STORAGE_VERSION+'\n')
1096 self._vcs_add(self._u_rel_path(path))
1099 if libbe.TESTING == True:
1100 class VCSTestCase (unittest.TestCase):
1102 Test cases for base VCS class (in addition to the Storage test
1108 def __init__(self, *args, **kwargs):
1109 super(VCSTestCase, self).__init__(*args, **kwargs)
1113 """Set up test fixtures for Storage test case."""
1114 super(VCSTestCase, self).setUp()
1116 self.dirname = self.dir.path
1117 self.s = self.Class(repo=self.dirname)
1118 if self.s.installed() == True:
1123 super(VCSTestCase, self).tearDown()
1124 if self.s.installed() == True:
1129 class VCS_installed_TestCase (VCSTestCase):
1130 def test_installed(self):
1131 """See if the VCS is installed.
1133 self.failUnless(self.s.installed() == True,
1134 '%(name)s VCS not found' % vars(self.Class))
1137 class VCS_detection_TestCase (VCSTestCase):
1138 def test_detection(self):
1139 """See if the VCS detects its installed repository
1141 if self.s.installed():
1143 self.failUnless(self.s._detect(self.dirname) == True,
1144 'Did not detected %(name)s VCS after initialising'
1148 def test_no_detection(self):
1149 """See if the VCS detects its installed repository
1151 if self.s.installed() and self.Class.name != 'None':
1154 self.failUnless(self.s._detect(self.dirname) == False,
1155 'Detected %(name)s VCS before initialising'
1160 def test_vcs_repo_in_specified_root_path(self):
1161 """VCS root directory should be in specified root path."""
1162 rp = os.path.realpath(self.s.repo)
1163 dp = os.path.realpath(self.dirname)
1164 vcs_name = self.Class.name
1166 dp == rp or rp == None,
1167 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1169 class VCS_get_user_id_TestCase(VCSTestCase):
1170 """Test cases for VCS.get_user_id method."""
1172 def test_get_existing_user_id(self):
1173 """Should get the existing user ID."""
1174 if self.s.installed():
1175 user_id = self.s.get_user_id()
1178 name,email = libbe.ui.util.user.parse_user_id(user_id)
1180 self.failUnless('@' in email, email)
1182 def make_vcs_testcase_subclasses(vcs_class, namespace):
1185 if c.versioned == True:
1186 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1187 vcs_class, namespace)
1189 libbe.storage.base.make_storage_testcase_subclasses(
1190 vcs_class, namespace)
1192 if namespace != sys.modules[__name__]:
1193 # Make VCSTestCase subclasses for vcs_class in the namespace.
1194 vcs_testcase_classes = [
1196 ob for ob in globals().values() if isinstance(ob, type))
1197 if issubclass(c, VCSTestCase) \
1200 for base_class in vcs_testcase_classes:
1201 testcase_class_name = vcs_class.__name__ + base_class.__name__
1202 testcase_class_bases = (base_class,)
1203 testcase_class_dict = dict(base_class.__dict__)
1204 testcase_class_dict['Class'] = vcs_class
1205 testcase_class = type(
1206 testcase_class_name, testcase_class_bases, testcase_class_dict)
1207 setattr(namespace, testcase_class_name, testcase_class)
1209 make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1211 unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1212 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])