1 # Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc.
2 # Alexander Belchenko <bialix@ukr.net>
3 # Ben Finney <benf@cybersource.com.au>
4 # Chris Ball <cjb@laptop.org>
5 # Gianluca Montecchi <gian@grys.it>
6 # W. Trevor King <wking@drexel.edu>
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License along
19 # with this program; if not, write to the Free Software Foundation, Inc.,
20 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 Define the base VCS (Version Control System) class, which should be
24 subclassed by other Version Control System backends. The base class
25 implements a "do not version" VCS.
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
47 if libbe.TESTING == True:
51 import libbe.ui.util.user
53 # List VCS modules in order of preference.
54 # Don't list this module, it is implicitly last.
55 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
57 def set_preferred_vcs(name):
59 assert name in VCS_ORDER, \
60 'unrecognized VCS %s not in\n %s' % (name, VCS_ORDER)
61 VCS_ORDER.remove(name)
62 VCS_ORDER.insert(0, name)
64 def _get_matching_vcs(matchfn):
65 """Return the first module for which matchfn(VCS_instance) is true"""
66 for submodname in VCS_ORDER:
67 module = import_by_name('libbe.storage.vcs.%s' % submodname)
69 if matchfn(vcs) == True:
73 def vcs_by_name(vcs_name):
74 """Return the module for the VCS with the given name"""
75 if vcs_name == VCS.name:
77 return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
80 """Return an VCS instance for the vcs being used in this directory"""
81 return _get_matching_vcs(lambda vcs: vcs._detect(dir))
84 """Return an instance of an installed VCS"""
85 return _get_matching_vcs(lambda vcs: vcs.installed())
88 class VCSNotRooted (libbe.storage.base.ConnectionError):
89 def __init__(self, vcs):
90 msg = 'VCS not rooted'
91 libbe.storage.base.ConnectionError.__init__(self, msg)
94 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
95 def __init__(self, vcs):
96 msg = 'VCS unable to root'
97 libbe.storage.base.ConnectionError.__init__(self, msg)
100 class InvalidPath (InvalidID):
101 def __init__(self, path, root, msg=None, **kwargs):
103 msg = 'Path "%s" not in root "%s"' % (path, root)
104 InvalidID.__init__(self, msg=msg, **kwargs)
108 class SpacerCollision (InvalidPath):
109 def __init__(self, path, spacer):
110 msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
111 InvalidPath.__init__(self, path, root=None, msg=msg)
114 class NoSuchFile (InvalidID):
115 def __init__(self, pathname, root='.'):
116 path = os.path.abspath(os.path.join(root, pathname))
117 InvalidID.__init__(self, 'No such file: %s' % path)
120 class CachedPathID (object):
122 Storage ID <-> path policy.
123 .../.be/BUGDIR/bugs/BUG/comments/COMMENT
127 >>> os.mkdir(os.path.join(dir.path, '.be'))
128 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
129 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
130 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
131 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
132 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
133 >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
134 >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
136 >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
138 >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
140 >>> c = CachedPathID()
142 >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
145 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
148 >>> c.path('123/values') # doctest: +ELLIPSIS
149 u'.../.be/abc/bugs/123/values'
152 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
154 >>> c.connect() # demonstrate auto init
155 >>> sorted(os.listdir(os.path.join(c._root, '.be')))
157 >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
159 >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
161 >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
162 u'.../.be/abc/bugs/123/comments/qrs'
165 >>> c.path('qrs') # doctest: +ELLIPSIS
166 u'.../.be/abc/bugs/123/comments/qrs'
167 >>> c.remove_id('qrs')
169 Traceback (most recent call last):
176 def __init__(self, encoding=None):
177 self.encoding = libbe.util.encoding.get_filesystem_encoding()
178 self._spacer_dirs = ['.be', 'bugs', 'comments']
180 def root(self, path):
181 self._root = os.path.abspath(path).rstrip(os.path.sep)
182 self._cache_path = os.path.join(
183 self._root, self._spacer_dirs[0], 'id-cache')
187 Create cache file for an existing .be directory.
188 File if multiple lines of the form:
192 spaced_root = os.path.join(self._root, self._spacer_dirs[0])
193 for dirpath, dirnames, filenames in os.walk(spaced_root):
194 if dirpath == spaced_root:
197 id = self.id(dirpath)
198 relpath = dirpath[len(self._root)+1:]
199 if id.count('/') == 0:
200 if id in self._cache:
201 print >> sys.stderr, 'Multiple paths for %s: \n %s\n %s' % (id, self._cache[id], relpath)
202 self._cache[id] = relpath
209 if os.path.exists(self._cache_path):
210 os.remove(self._cache_path)
213 if not os.path.exists(self._cache_path):
217 raise libbe.storage.base.ConnectionError
218 self._cache = {} # key: uuid, value: path
219 self._changed = False
220 f = codecs.open(self._cache_path, 'r', self.encoding)
222 fields = line.rstrip('\n').split('\t')
223 self._cache[fields[0]] = fields[1]
226 def disconnect(self):
227 if self._changed == True:
228 f = codecs.open(self._cache_path, 'w', self.encoding)
229 for uuid,path in self._cache.items():
230 f.write('%s\t%s\n' % (uuid, path))
234 def path(self, id, relpath=False):
235 fields = id.split('/', 1)
241 if uuid not in self._cache:
242 raise InvalidID(uuid)
244 return os.path.join(self._cache[uuid], *extra)
245 return os.path.join(self._root, self._cache[uuid], *extra)
247 def add_id(self, id, parent=None):
248 if id.count('/') > 0:
249 # not a UUID-level path
250 assert id.startswith(parent), \
251 'Strange ID: "%s" should start with "%s"' % (id, parent)
253 elif id in self._cache:
259 spacer = self._spacer_dirs[0]
261 assert parent.count('/') == 0, \
262 'Strange parent ID: "%s" should be UUID' % parent
263 parent_path = self.path(parent, relpath=True)
264 parent_spacer = parent_path.split(os.path.sep)[-2]
265 i = self._spacer_dirs.index(parent_spacer)
266 spacer = self._spacer_dirs[i+1]
267 path = os.path.join(parent_path, spacer, id)
268 self._cache[id] = path
270 path = os.path.join(self._root, path)
273 def remove_id(self, id):
274 if id.count('/') > 0:
275 return # not a UUID-level path
280 path = os.path.join(self._root, path)
281 if not path.startswith(self._root + os.path.sep):
282 raise InvalidPath(path, self._root)
283 path = path[len(self._root)+1:]
285 if not path.startswith(self._spacer_dirs[0] + os.path.sep):
286 raise InvalidPath(path, self._spacer_dirs[0])
287 for spacer in self._spacer_dirs:
288 if not path.startswith(spacer + os.path.sep):
290 id = path[len(spacer)+1:]
291 fields = path[len(spacer)+1:].split(os.path.sep,1)
295 for spacer in self._spacer_dirs:
296 if id.endswith(os.path.sep + spacer):
297 raise SpacerCollision(orig_path, spacer)
298 if os.path.sep != '/':
299 id = id.replace(os.path.sep, '/')
306 class VCS (libbe.storage.base.VersionedStorage):
308 This class implements a 'no-vcs' interface.
310 Support for other VCSs can be added by subclassing this class, and
311 overriding methods _vcs_*() with code appropriate for your VCS.
313 The methods _u_*() are utility methods available to the _vcs_*()
316 Sink to existing root
317 ======================
319 Consider the following usage case:
320 You have a bug directory rooted in
322 by which I mean the '.be' directory is at
324 However, you're of in some subdirectory like
325 /path/to/source/GUI/testing
326 and you want to comment on a bug. Setting sink_to_root=True when
327 you initialize your BugDir will cause it to search for the '.be'
328 file in the ancestors of the path you passed in as 'root'.
329 /path/to/source/GUI/testing/.be miss
330 /path/to/source/GUI/.be miss
331 /path/to/source/.be hit!
332 So it still roots itself appropriately without much work for you.
337 BugDirs live completely in memory when .sync_with_disk is False.
338 This is the default configuration setup by BugDir(from_disk=False).
339 If .sync_with_disk == True (e.g. BugDir(from_disk=True)), then
340 any changes to the BugDir will be immediately written to disk.
342 If you want to change .sync_with_disk, we suggest you use
343 .set_sync_with_disk(), which propogates the new setting through to
344 all bugs/comments/etc. that have been loaded into memory. If
345 you've been living in memory and want to move to
346 .sync_with_disk==True, but you're not sure if anything has been
347 changed in memory, a call to .save() immediately before the
348 .set_sync_with_disk(True) call is a safe move.
350 Regardless of .sync_with_disk, a call to .save() will write out
351 all the contents that the BugDir instance has loaded into memory.
352 If sync_with_disk has been True over the course of all interesting
353 changes, this .save() call will be a waste of time.
355 The BugDir will only load information from the file system when it
356 loads new settings/bugs/comments that it doesn't already have in
357 memory and .sync_with_disk == True.
359 Allow storage initialization
360 ========================
362 This one is for testing purposes. Setting it to True allows the
363 BugDir to search for an installed Storage backend and initialize
364 it in the root directory. This is a convenience option for
365 supporting tests of versioning functionality
366 (e.g. .duplicate_bugdir).
368 Disable encoding manipulation
369 =============================
371 This one is for testing purposed. You might have non-ASCII
372 Unicode in your bugs, comments, files, etc. BugDir instances try
373 and support your preferred encoding scheme (e.g. "utf-8") when
374 dealing with stream and file input/output. For stream output,
375 this involves replacing sys.stdout and sys.stderr
376 (libbe.encode.set_IO_stream_encodings). However this messes up
377 doctest's output catching. In order to support doctest tests
378 using BugDirs, set manipulate_encodings=False, and stick to ASCII
383 if sink_to_existing_root == True:
384 self.root = self._find_root(root)
386 if not os.path.exists(root):
388 raise NoRootEntry(root)
390 # get a temporary storage until we've loaded settings
391 self.sync_with_disk = False
392 self.storage = self._guess_storage()
394 if assert_new_BugDir == True:
395 if os.path.exists(self.get_path()):
396 raise AlreadyInitialized, self.get_path()
398 storage = self._guess_storage(allow_storage_init)
399 self.storage = storage
400 self._setup_user_id(self.user_id)
403 # methods for getting the BugDir situated in the filesystem
405 def _find_root(self, path):
407 Search for an existing bug database dir and it's ancestors and
408 return a BugDir rooted there. Only called by __init__, and
409 then only if sink_to_existing_root == True.
411 if not os.path.exists(path):
413 raise NoRootEntry(path)
414 versionfile=utility.search_parent_directories(path,
415 os.path.join(".be", "version"))
416 if versionfile != None:
417 beroot = os.path.dirname(versionfile)
418 root = os.path.dirname(beroot)
421 beroot = utility.search_parent_directories(path, ".be")
427 def _guess_storage(self, allow_storage_init=False):
429 Only called by __init__.
431 deepdir = self.get_path()
432 if not os.path.exists(deepdir):
433 deepdir = os.path.dirname(deepdir)
434 new_storage = storage.detect_storage(deepdir)
436 if new_storage.name == "None":
437 if allow_storage_init == True:
438 new_storage = storage.installed_storage()
439 new_storage.init(self.root)
442 os.listdir(self.get_path("bugs")):
445 client = 'false' # command-line tool for _u_invoke_client
447 def __init__(self, *args, **kwargs):
448 if 'encoding' not in kwargs:
449 kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
450 libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
451 self.versioned = False
452 self.interspersed_vcs_files = False
453 self.verbose_invoke = False
454 self._cached_path_id = CachedPathID()
457 def _vcs_version(self):
459 Return the VCS version string.
463 def _vcs_get_user_id(self):
465 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
466 If the VCS has not been configured with a username, return None.
470 def _vcs_detect(self, path=None):
472 Detect whether a directory is revision controlled with this VCS.
476 def _vcs_root(self, path):
478 Get the VCS root. This is the default working directory for
479 future invocations. You would normally set this to the root
480 directory for your VCS.
482 if os.path.isdir(path) == False:
483 path = os.path.dirname(path)
485 path = os.path.abspath('.')
488 def _vcs_init(self, path):
490 Begin versioning the tree based at path.
494 def _vcs_destroy(self):
496 Remove any files used in versioning (e.g. whatever _vcs_init()
501 def _vcs_add(self, path):
503 Add the already created file at path to version control.
507 def _vcs_remove(self, path):
509 Remove the file at path from version control. Optionally
510 remove the file from the filesystem as well.
514 def _vcs_update(self, path):
516 Notify the versioning system of changes to the versioned file
521 def _vcs_is_versioned(self, path):
523 Return true if a path is under version control, False
524 otherwise. You only need to set this if the VCS goes about
525 dumping VCS-specific files into the .be directory.
527 If you do need to implement this method (e.g. Arch), set
528 self.interspersed_vcs_files = True
530 assert self.interspersed_vcs_files == False
531 raise NotImplementedError
533 def _vcs_get_file_contents(self, path, revision=None):
535 Get the file contents as they were in a given revision.
536 Revision==None specifies the current revision.
539 raise libbe.storage.base.InvalidRevision(
540 'The %s VCS does not support revision specifiers' % self.name)
541 path = os.path.join(self.repo, path)
542 if not os.path.exists(path):
543 return libbe.util.InvalidObject
544 if os.path.isdir(path):
545 return libbe.storage.base.InvalidDirectory
551 def _vcs_path(self, id, revision):
553 Return the path to object id as of revision.
555 Revision will not be None.
557 raise NotImplementedError
559 def _vcs_isdir(self, path, revision):
561 Return True if path (as returned by _vcs_path) was a directory
562 as of revision, False otherwise.
564 Revision will not be None.
566 raise NotImplementedError
568 def _vcs_listdir(self, path, revision):
570 Return a list of the contents of the directory path (as
571 returned by _vcs_path) as of revision.
573 Revision will not be None, and ._vcs_isdir(path, revision)
576 raise NotImplementedError
578 def _vcs_commit(self, commitfile, allow_empty=False):
580 Commit the current working directory, using the contents of
581 commitfile as the comment. Return the name of the old
582 revision (or None if commits are not supported).
584 If allow_empty == False, raise EmptyCommit if there are no
589 def _vcs_revision_id(self, index):
591 Return the name of the <index>th revision. Index will be an
592 integer (possibly <= 0). The choice of which branch to follow
593 when crossing branches/merges is not defined.
595 Return None if revision IDs are not supported, or if the
596 specified revision does not exist.
601 # Cache version string for efficiency.
602 if not hasattr(self, '_version'):
603 self._version = self._get_version()
606 def _get_version(self):
608 ret = self._vcs_version()
611 if e.errno == errno.ENOENT:
619 if self.version() != None:
623 def get_user_id(self):
625 Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
626 If the VCS has not been configured with a username, return None.
627 You can override the automatic lookup procedure by setting the
628 VCS.user_id attribute to a string of your choice.
630 if not hasattr(self, 'user_id'):
631 self.user_id = self._vcs_get_user_id()
634 def _detect(self, path='.'):
636 Detect whether a directory is revision controlled with this VCS.
638 return self._vcs_detect(path)
642 Set the root directory to the path's VCS root. This is the
643 default working directory for future invocations.
645 if self._detect(self.repo) == False:
646 raise VCSUnableToRoot(self)
647 root = self._vcs_root(self.repo)
648 self.repo = os.path.abspath(root)
649 if os.path.isdir(self.repo) == False:
650 self.repo = os.path.dirname(self.repo)
651 self.be_dir = os.path.join(
652 self.repo, self._cached_path_id._spacer_dirs[0])
653 self._cached_path_id.root(self.repo)
658 Begin versioning the tree based at self.repo.
659 Also roots the vcs at path.
661 if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
662 raise VCSUnableToRoot(self)
663 if self._vcs_detect(self.repo) == False:
664 self._vcs_init(self.repo)
665 if self._rooted == False:
667 os.mkdir(self.be_dir)
668 self._vcs_add(self._u_rel_path(self.be_dir))
669 self._setup_storage_version()
670 self._cached_path_id.init()
674 self._cached_path_id.destroy()
675 if os.path.exists(self.be_dir):
676 shutil.rmtree(self.be_dir)
679 if self._rooted == False:
681 if not os.path.isdir(self.be_dir):
682 raise libbe.storage.base.ConnectionError(self)
683 self._cached_path_id.connect()
684 self.check_storage_version()
686 def disconnect(self):
687 self._cached_path_id.disconnect()
689 def _add_path(self, path, directory=False):
690 relpath = self._u_rel_path(path)
691 reldirs = relpath.split(os.path.sep)
692 if directory == False:
693 reldirs = reldirs[:-1]
695 for reldir in reldirs:
696 dir = os.path.join(dir, reldir)
697 if not os.path.exists(dir):
699 self._vcs_add(self._u_rel_path(dir))
700 elif not os.path.isdir(dir):
701 raise libbe.storage.base.InvalidDirectory
702 if directory == False:
703 if not os.path.exists(path):
704 open(path, 'w').close()
705 self._vcs_add(self._u_rel_path(path))
707 def _add(self, id, parent=None, **kwargs):
708 path = self._cached_path_id.add_id(id, parent)
709 self._add_path(path, **kwargs)
711 def _remove(self, id):
712 path = self._cached_path_id.path(id)
713 if os.path.exists(path):
714 if os.path.isdir(path) and len(self.children(id)) > 0:
715 raise libbe.storage.base.DirectoryNotEmpty(id)
716 self._vcs_remove(self._u_rel_path(path))
717 if os.path.exists(path):
718 if os.path.isdir(path):
722 self._cached_path_id.remove_id(id)
724 def _recursive_remove(self, id):
725 path = self._cached_path_id.path(id)
726 for dirpath,dirnames,filenames in os.walk(path, topdown=False):
727 filenames.extend(dirnames)
729 fullpath = os.path.join(dirpath, f)
730 if os.path.exists(fullpath) == False:
732 self._vcs_remove(self._u_rel_path(fullpath))
733 if os.path.exists(path):
735 path = self._cached_path_id.path(id, relpath=True)
736 for id,p in self._cached_path_id._cache.items():
737 if p.startswith(path):
738 self._cached_path_id.remove_id(id)
740 def _children(self, id=None, revision=None):
742 id_to_path = self._cached_path_id.path
743 isdir = os.path.isdir
746 id_to_path = lambda id : self._vcs_path(id, revision)
747 isdir = lambda path : self._vcs_isdir(path, revision)
748 listdir = lambda path : self._vcs_listdir(path, revision)
752 path = id_to_path(id)
753 if isdir(path) == False:
755 children = listdir(path)
756 for i,c in enumerate(children):
757 if c in self._cached_path_id._spacer_dirs:
759 children.extend([os.path.join(c, c2) for c2 in
760 listdir(os.path.join(path, c))])
761 elif c in ['id-cache', 'version']:
763 for i,c in enumerate(children):
764 if c == None: continue
765 cpath = os.path.join(path, c)
766 if self.interspersed_vcs_files == True \
767 and revision != None \
768 and self._vcs_is_versioned(cpath) == False:
771 children[i] = self._u_path_to_id(cpath)
773 return [c for c in children if c != None]
775 def _get(self, id, default=libbe.util.InvalidObject, revision=None):
777 path = self._cached_path_id.path(id)
779 if default == libbe.util.InvalidObject:
782 relpath = self._u_rel_path(path)
784 contents = self._vcs_get_file_contents(relpath, revision)
786 if InvalidID == None:
789 if contents in [libbe.storage.base.InvalidDirectory,
790 libbe.util.InvalidObject]:
792 elif len(contents) == 0:
796 def _set(self, id, value):
798 path = self._cached_path_id.path(id)
801 if not os.path.exists(path):
803 if os.path.isdir(path):
804 raise libbe.storage.base.InvalidDirectory(id)
808 self._vcs_update(self._u_rel_path(path))
810 def _commit(self, summary, body=None, allow_empty=False):
811 summary = summary.strip()+'\n'
813 summary += '\n' + body.strip() + '\n'
814 descriptor, filename = tempfile.mkstemp()
817 temp_file = os.fdopen(descriptor, 'wb')
818 temp_file.write(summary)
820 revision = self._vcs_commit(filename, allow_empty=allow_empty)
826 def revision_id(self, index=None):
830 if int(index) != index:
831 raise InvalidRevision(index)
833 raise InvalidRevision(index)
834 revid = self._vcs_revision_id(index)
836 raise libbe.storage.base.InvalidRevision(index)
839 def _u_any_in_string(self, list, string):
841 Return True if any of the strings in list are in string.
842 Otherwise return False.
844 for list_string in list:
845 if list_string in string:
849 def _u_invoke(self, *args, **kwargs):
850 if 'cwd' not in kwargs:
851 kwargs['cwd'] = self.repo
852 if 'verbose' not in kwargs:
853 kwargs['verbose'] = self.verbose_invoke
854 if 'encoding' not in kwargs:
855 kwargs['encoding'] = self.encoding
856 return invoke(*args, **kwargs)
858 def _u_invoke_client(self, *args, **kwargs):
859 cl_args = [self.client]
861 return self._u_invoke(cl_args, **kwargs)
863 def _u_search_parent_directories(self, path, filename):
865 Find the file (or directory) named filename in path or in any
869 search_parent_directories("/a/b/c", ".be")
870 will return the path to the first existing file from
875 or None if none of those files exist.
877 return search_parent_directories(path, filename)
879 def _u_find_id(self, id, revision):
881 Search for the relative path to id as of revision.
882 Returns None if the id is not found.
884 assert self._rooted == True
885 be_dir = self._cached_path_id._spacer_dirs[0]
886 stack = [(be_dir, be_dir)]
887 while len(stack) > 0:
888 path,long_id = stack.pop()
889 if long_id.endswith('/'+id):
891 if self._vcs_isdir(path, revision) == False:
893 for child in self._vcs_listdir(path, revision):
894 stack.append((os.path.join(path, child),
895 '/'.join([long_id, child])))
896 raise InvalidID(id, revision=revision)
898 def _u_path_to_id(self, path):
899 return self._cached_path_id.id(path)
901 def _u_rel_path(self, path, root=None):
903 Return the relative path to path from root.
905 >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
907 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
909 >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
911 >>> vcs._u_rel_path("./a", ".")
915 if self.repo == None:
916 raise VCSNotRooted(self)
918 path = os.path.abspath(path)
919 absRoot = os.path.abspath(root)
920 absRootSlashedDir = os.path.join(absRoot,"")
921 if path in [absRoot, absRootSlashedDir]:
923 if not path.startswith(absRootSlashedDir):
924 raise InvalidPath(path, absRootSlashedDir)
925 relpath = path[len(absRootSlashedDir):]
928 def _u_abspath(self, path, root=None):
930 Return the absolute path from a path realtive to root.
932 >>> vcs._u_abspath(".be", "/a.b/c")
936 assert self.repo != None, "VCS not rooted"
938 return os.path.abspath(os.path.join(root, path))
940 def _u_parse_commitfile(self, commitfile):
942 Split the commitfile created in self.commit() back into
943 summary and header lines.
945 f = codecs.open(commitfile, 'r', self.encoding)
946 summary = f.readline()
952 return (summary, body)
954 def check_storage_version(self):
955 version = self.storage_version()
956 if version != libbe.storage.STORAGE_VERSION:
957 upgrade.upgrade(self.repo, version)
959 def storage_version(self, revision=None, path=None):
961 Requires disk access.
964 path = os.path.join(self.repo, '.be', 'version')
965 if not os.path.exists(path):
966 raise libbe.storage.InvalidStorageVersion(None)
967 if revision == None: # don't require connection
968 return libbe.util.encoding.get_file_contents(
969 path, decode=True).rstrip('\n')
970 contents = self._vcs_get_file_contents(path, revision=revision)
971 if type(contents) != types.UnicodeType:
972 contents = unicode(contents, self.encoding)
973 return contents.strip()
975 def _setup_storage_version(self):
977 Requires disk access.
979 assert self._rooted == True
980 path = os.path.join(self.be_dir, 'version')
981 if not os.path.exists(path):
982 libbe.util.encoding.set_file_contents(path,
983 libbe.storage.STORAGE_VERSION+'\n')
984 self._vcs_add(self._u_rel_path(path))
987 if libbe.TESTING == True:
988 class VCSTestCase (unittest.TestCase):
990 Test cases for base VCS class (in addition to the Storage test
996 def __init__(self, *args, **kwargs):
997 super(VCSTestCase, self).__init__(*args, **kwargs)
1001 """Set up test fixtures for Storage test case."""
1002 super(VCSTestCase, self).setUp()
1004 self.dirname = self.dir.path
1005 self.s = self.Class(repo=self.dirname)
1006 if self.s.installed() == True:
1011 super(VCSTestCase, self).tearDown()
1012 if self.s.installed() == True:
1017 class VCS_installed_TestCase (VCSTestCase):
1018 def test_installed(self):
1020 See if the VCS is installed.
1022 self.failUnless(self.s.installed() == True,
1023 '%(name)s VCS not found' % vars(self.Class))
1026 class VCS_detection_TestCase (VCSTestCase):
1027 def test_detection(self):
1029 See if the VCS detects its installed repository
1031 if self.s.installed():
1033 self.failUnless(self.s._detect(self.dirname) == True,
1034 'Did not detected %(name)s VCS after initialising'
1038 def test_no_detection(self):
1040 See if the VCS detects its installed repository
1042 if self.s.installed() and self.Class.name != 'None':
1045 self.failUnless(self.s._detect(self.dirname) == False,
1046 'Detected %(name)s VCS before initialising'
1051 def test_vcs_repo_in_specified_root_path(self):
1052 """VCS root directory should be in specified root path."""
1053 rp = os.path.realpath(self.s.repo)
1054 dp = os.path.realpath(self.dirname)
1055 vcs_name = self.Class.name
1057 dp == rp or rp == None,
1058 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1060 class VCS_get_user_id_TestCase(VCSTestCase):
1061 """Test cases for VCS.get_user_id method."""
1063 def test_gets_existing_user_id(self):
1064 """Should get the existing user ID."""
1065 if self.s.installed():
1066 user_id = self.s.get_user_id()
1069 name,email = libbe.ui.util.user.parse_user_id(user_id)
1071 self.failUnless('@' in email, email)
1073 def make_vcs_testcase_subclasses(vcs_class, namespace):
1076 if c.versioned == True:
1077 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1078 vcs_class, namespace)
1080 libbe.storage.base.make_storage_testcase_subclasses(
1081 vcs_class, namespace)
1083 if namespace != sys.modules[__name__]:
1084 # Make VCSTestCase subclasses for vcs_class in the namespace.
1085 vcs_testcase_classes = [
1087 ob for ob in globals().values() if isinstance(ob, type))
1088 if issubclass(c, VCSTestCase)]
1090 for base_class in vcs_testcase_classes:
1091 testcase_class_name = vcs_class.__name__ + base_class.__name__
1092 testcase_class_bases = (base_class,)
1093 testcase_class_dict = dict(base_class.__dict__)
1094 testcase_class_dict['Class'] = vcs_class
1095 testcase_class = type(
1096 testcase_class_name, testcase_class_bases, testcase_class_dict)
1097 setattr(namespace, testcase_class_name, testcase_class)
1099 make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1101 unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1102 suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])