Fixed docstrings so only Sphinx errors are "autosummary" and "missing attribute"
[be.git] / libbe / storage / vcs / base.py
1 # Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
2 #                         Alexander Belchenko <bialix@ukr.net>
3 #                         Ben Finney <benf@cybersource.com.au>
4 #                         Chris Ball <cjb@laptop.org>
5 #                         Gianluca Montecchi <gian@grys.it>
6 #                         W. Trevor King <wking@drexel.edu>
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License along
19 # with this program; if not, write to the Free Software Foundation, Inc.,
20 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
22 """Define the base :class:`VCS` (Version Control System) class, which
23 should be subclassed by other Version Control System backends.  The
24 base class implements a "do not version" VCS.
25 """
26
27 import codecs
28 import os
29 import os.path
30 import re
31 import shutil
32 import sys
33 import tempfile
34 import types
35
36 import libbe
37 import libbe.storage
38 import libbe.storage.base
39 import libbe.util.encoding
40 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
41 from libbe.util.utility import Dir, search_parent_directories
42 from libbe.util.subproc import CommandError, invoke
43 from libbe.util.plugin import import_by_name
44 import libbe.storage.util.upgrade as upgrade
45
46 if libbe.TESTING == True:
47     import unittest
48     import doctest
49
50     import libbe.ui.util.user
51
52 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg']
53 """List VCS modules in order of preference.
54
55 Don't list this module, it is implicitly last.
56 """
57
58 def set_preferred_vcs(name):
59     """Manipulate :data:`VCS_ORDER` to place `name` first.
60
61     This is primarily indended for testing purposes.
62     """
63     global VCS_ORDER
64     assert name in VCS_ORDER, \
65         'unrecognized VCS %s not in\n  %s' % (name, VCS_ORDER)
66     VCS_ORDER.remove(name)
67     VCS_ORDER.insert(0, name)
68
69 def _get_matching_vcs(matchfn):
70     """Return the first module for which matchfn(VCS_instance) is True.
71
72     Searches in :data:`VCS_ORDER`.
73     """
74     for submodname in VCS_ORDER:
75         module = import_by_name('libbe.storage.vcs.%s' % submodname)
76         vcs = module.new()
77         if matchfn(vcs) == True:
78             return vcs
79     return VCS()
80
81 def vcs_by_name(vcs_name):
82     """Return the module for the VCS with the given name.
83
84     Searches in :data:`VCS_ORDER`.
85     """
86     if vcs_name == VCS.name:
87         return new()
88     return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
89
90 def detect_vcs(dir):
91     """Return an VCS instance for the vcs being used in this directory.
92
93     Searches in :data:`VCS_ORDER`.
94     """
95     return _get_matching_vcs(lambda vcs: vcs._detect(dir))
96
97 def installed_vcs():
98     """Return an instance of an installed VCS.
99
100     Searches in :data:`VCS_ORDER`.
101     """
102     return _get_matching_vcs(lambda vcs: vcs.installed())
103
104
105 class VCSNotRooted (libbe.storage.base.ConnectionError):
106     def __init__(self, vcs):
107         msg = 'VCS not rooted'
108         libbe.storage.base.ConnectionError.__init__(self, msg)
109         self.vcs = vcs
110
111 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
112     def __init__(self, vcs):
113         msg = 'VCS unable to root'
114         libbe.storage.base.ConnectionError.__init__(self, msg)
115         self.vcs = vcs
116
117 class InvalidPath (InvalidID):
118     def __init__(self, path, root, msg=None, **kwargs):
119         if msg == None:
120             msg = 'Path "%s" not in root "%s"' % (path, root)
121         InvalidID.__init__(self, msg=msg, **kwargs)
122         self.path = path
123         self.root = root
124
125 class SpacerCollision (InvalidPath):
126     def __init__(self, path, spacer):
127         msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
128         InvalidPath.__init__(self, path, root=None, msg=msg)
129         self.spacer = spacer
130
131 class NoSuchFile (InvalidID):
132     def __init__(self, pathname, root='.'):
133         path = os.path.abspath(os.path.join(root, pathname))
134         InvalidID.__init__(self, 'No such file: %s' % path)
135
136
137 class CachedPathID (object):
138     """Cache Storage ID <-> path policy.
139  
140     Paths generated following::
141
142        .../.be/BUGDIR/bugs/BUG/comments/COMMENT
143           ^-- root path
144
145     See :mod:`libbe.util.id` for a discussion of ID formats.
146
147     Examples
148     --------
149
150     >>> dir = Dir()
151     >>> os.mkdir(os.path.join(dir.path, '.be'))
152     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
153     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
154     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
155     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
156     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
157     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
158     >>> file(os.path.join(dir.path, '.be', 'abc', 'values'),
159     ...      'w').close()
160     >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
161     ...      'w').close()
162     >>> file(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
163     ...      'w').close()
164     >>> c = CachedPathID()
165     >>> c.root(dir.path)
166     >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
167     'def/values'
168     >>> c.init()
169     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
170     ['abc', 'id-cache']
171     >>> c.connect()
172     >>> c.path('123/values') # doctest: +ELLIPSIS
173     u'.../.be/abc/bugs/123/values'
174     >>> c.disconnect()
175     >>> c.destroy()
176     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
177     ['abc']
178     >>> c.connect() # demonstrate auto init
179     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
180     ['abc', 'id-cache']
181     >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
182     u'.../.be/xyz'
183     >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
184     u'.../.be/xyz/def'
185     >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
186     u'.../.be/abc/bugs/123/comments/qrs'
187     >>> c.disconnect()
188     >>> c.connect()
189     >>> c.path('qrs') # doctest: +ELLIPSIS
190     u'.../.be/abc/bugs/123/comments/qrs'
191     >>> c.remove_id('qrs')
192     >>> c.path('qrs')
193     Traceback (most recent call last):
194       ...
195     InvalidID: qrs in revision None
196     >>> c.disconnect()
197     >>> c.destroy()
198     >>> dir.cleanup()
199     """
200     def __init__(self, encoding=None):
201         self.encoding = libbe.util.encoding.get_filesystem_encoding()
202         self._spacer_dirs = ['.be', 'bugs', 'comments']
203
204     def root(self, path):
205         self._root = os.path.abspath(path).rstrip(os.path.sep)
206         self._cache_path = os.path.join(
207             self._root, self._spacer_dirs[0], 'id-cache')
208
209     def init(self, verbose=True, cache=None):
210         """Create cache file for an existing .be directory.
211
212         The file contains multiple lines of the form::
213
214             UUID\tPATH
215         """
216         if cache == None:
217             self._cache = {}
218         else:
219             self._cache = cache
220         spaced_root = os.path.join(self._root, self._spacer_dirs[0])
221         for dirpath, dirnames, filenames in os.walk(spaced_root):
222             if dirpath == spaced_root:
223                 continue
224             try:
225                 id = self.id(dirpath)
226                 relpath = dirpath[len(self._root)+1:]
227                 if id.count('/') == 0:
228                     if verbose == True and id in self._cache:
229                         print >> sys.stderr, 'Multiple paths for %s: \n  %s\n  %s' % (id, self._cache[id], relpath)
230                     self._cache[id] = relpath
231             except InvalidPath:
232                 pass
233         if self._cache != cache:
234             self._changed = True
235         if cache == None:
236             self.disconnect()
237
238     def destroy(self):
239         if os.path.exists(self._cache_path):
240             os.remove(self._cache_path)
241
242     def connect(self):
243         if not os.path.exists(self._cache_path):
244             try:
245                 self.init()
246             except IOError:
247                 raise libbe.storage.base.ConnectionError
248         self._cache = {} # key: uuid, value: path
249         self._changed = False
250         f = codecs.open(self._cache_path, 'r', self.encoding)
251         for line in f:
252             fields = line.rstrip('\n').split('\t')
253             self._cache[fields[0]] = fields[1]
254         f.close()
255
256     def disconnect(self):
257         if self._changed == True:
258             f = codecs.open(self._cache_path, 'w', self.encoding)
259             for uuid,path in self._cache.items():
260                 f.write('%s\t%s\n' % (uuid, path))
261             f.close()
262         self._cache = {}
263
264     def path(self, id, relpath=False):
265         fields = id.split('/', 1)
266         uuid = fields[0]
267         if len(fields) == 1:
268             extra = []
269         else:
270             extra = fields[1:]
271         if uuid not in self._cache:
272             self.init(verbose=False, cache=self._cache)
273             if uuid not in self._cache:
274                 raise InvalidID(uuid)
275         if relpath == True:
276             return os.path.join(self._cache[uuid], *extra)
277         return os.path.join(self._root, self._cache[uuid], *extra)
278
279     def add_id(self, id, parent=None):
280         if id.count('/') > 0:
281             # not a UUID-level path
282             assert id.startswith(parent), \
283                 'Strange ID: "%s" should start with "%s"' % (id, parent)
284             path = self.path(id)
285         elif id in self._cache:
286             # already added
287             path = self.path(id)
288         else:
289             if parent == None:
290                 parent_path = ''
291                 spacer = self._spacer_dirs[0]
292             else:
293                 assert parent.count('/') == 0, \
294                     'Strange parent ID: "%s" should be UUID' % parent
295                 parent_path = self.path(parent, relpath=True)
296                 parent_spacer = parent_path.split(os.path.sep)[-2]
297                 i = self._spacer_dirs.index(parent_spacer)
298                 spacer = self._spacer_dirs[i+1]
299             path = os.path.join(parent_path, spacer, id)
300             self._cache[id] = path
301             self._changed = True
302             path = os.path.join(self._root, path)
303         return path
304
305     def remove_id(self, id):
306         if id.count('/') > 0:
307             return # not a UUID-level path
308         self._cache.pop(id)
309         self._changed = True
310
311     def id(self, path):
312         path = os.path.join(self._root, path)
313         if not path.startswith(self._root + os.path.sep):
314             raise InvalidPath(path, self._root)
315         path = path[len(self._root)+1:]
316         orig_path = path
317         if not path.startswith(self._spacer_dirs[0] + os.path.sep):
318             raise InvalidPath(path, self._spacer_dirs[0])
319         for spacer in self._spacer_dirs:
320             if not path.startswith(spacer + os.path.sep):
321                 break
322             id = path[len(spacer)+1:]
323             fields = path[len(spacer)+1:].split(os.path.sep,1)
324             if len(fields) == 1:
325                 break
326             path = fields[1]
327         for spacer in self._spacer_dirs:
328             if id.endswith(os.path.sep + spacer):
329                 raise SpacerCollision(orig_path, spacer)
330         if os.path.sep != '/':
331             id = id.replace(os.path.sep, '/')
332         return id
333
334
335 def new():
336     return VCS()
337
338 class VCS (libbe.storage.base.VersionedStorage):
339     """Implement a 'no-VCS' interface.
340
341     Support for other VCSs can be added by subclassing this class, and
342     overriding methods `_vcs_*()` with code appropriate for your VCS.
343
344     The methods `_u_*()` are utility methods available to the `_vcs_*()`
345     methods.
346     """
347     name = 'None'
348     client = 'false' # command-line tool for _u_invoke_client
349
350     def __init__(self, *args, **kwargs):
351         if 'encoding' not in kwargs:
352             kwargs['encoding'] = libbe.util.encoding.get_filesystem_encoding()
353         libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
354         self.versioned = False
355         self.interspersed_vcs_files = False
356         self.verbose_invoke = False
357         self._cached_path_id = CachedPathID()
358         self._rooted = False
359
360     def _vcs_version(self):
361         """
362         Return the VCS version string.
363         """
364         return '0'
365
366     def _vcs_get_user_id(self):
367         """
368         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
369         If the VCS has not been configured with a username, return None.
370         """
371         return None
372
373     def _vcs_detect(self, path=None):
374         """
375         Detect whether a directory is revision controlled with this VCS.
376         """
377         return True
378
379     def _vcs_root(self, path):
380         """
381         Get the VCS root.  This is the default working directory for
382         future invocations.  You would normally set this to the root
383         directory for your VCS.
384         """
385         if os.path.isdir(path) == False:
386             path = os.path.dirname(path)
387             if path == '':
388                 path = os.path.abspath('.')
389         return path
390
391     def _vcs_init(self, path):
392         """
393         Begin versioning the tree based at path.
394         """
395         pass
396
397     def _vcs_destroy(self):
398         """
399         Remove any files used in versioning (e.g. whatever _vcs_init()
400         created).
401         """
402         pass
403
404     def _vcs_add(self, path):
405         """
406         Add the already created file at path to version control.
407         """
408         pass
409
410     def _vcs_exists(self, path, revision=None):
411         """
412         Does the path exist in a given revision? (True/False)
413         """
414         raise NotImplementedError('Lazy BE developers')
415
416     def _vcs_remove(self, path):
417         """
418         Remove the file at path from version control.  Optionally
419         remove the file from the filesystem as well.
420         """
421         pass
422
423     def _vcs_update(self, path):
424         """
425         Notify the versioning system of changes to the versioned file
426         at path.
427         """
428         pass
429
430     def _vcs_is_versioned(self, path):
431         """
432         Return true if a path is under version control, False
433         otherwise.  You only need to set this if the VCS goes about
434         dumping VCS-specific files into the .be directory.
435
436         If you do need to implement this method (e.g. Arch), set
437           self.interspersed_vcs_files = True
438         """
439         assert self.interspersed_vcs_files == False
440         raise NotImplementedError
441
442     def _vcs_get_file_contents(self, path, revision=None):
443         """
444         Get the file contents as they were in a given revision.
445         Revision==None specifies the current revision.
446         """
447         if revision != None:
448             raise libbe.storage.base.InvalidRevision(
449                 'The %s VCS does not support revision specifiers' % self.name)
450         path = os.path.join(self.repo, path)
451         if not os.path.exists(path):
452             return libbe.util.InvalidObject
453         if os.path.isdir(path):
454             return libbe.storage.base.InvalidDirectory
455         f = open(path, 'rb')
456         contents = f.read()
457         f.close()
458         return contents
459
460     def _vcs_path(self, id, revision):
461         """
462         Return the relative path to object id as of revision.
463         
464         Revision will not be None.
465         """
466         raise NotImplementedError
467
468     def _vcs_isdir(self, path, revision):
469         """
470         Return True if path (as returned by _vcs_path) was a directory
471         as of revision, False otherwise.
472         
473         Revision will not be None.
474         """
475         raise NotImplementedError
476
477     def _vcs_listdir(self, path, revision):
478         """
479         Return a list of the contents of the directory path (as
480         returned by _vcs_path) as of revision.
481         
482         Revision will not be None, and ._vcs_isdir(path, revision)
483         will be True.
484         """
485         raise NotImplementedError
486
487     def _vcs_commit(self, commitfile, allow_empty=False):
488         """
489         Commit the current working directory, using the contents of
490         commitfile as the comment.  Return the name of the old
491         revision (or None if commits are not supported).
492
493         If allow_empty == False, raise EmptyCommit if there are no
494         changes to commit.
495         """
496         return None
497
498     def _vcs_revision_id(self, index):
499         """
500         Return the name of the <index>th revision.  Index will be an
501         integer (possibly <= 0).  The choice of which branch to follow
502         when crossing branches/merges is not defined.
503
504         Return None if revision IDs are not supported, or if the
505         specified revision does not exist.
506         """
507         return None
508
509     def _vcs_changed(self, revision):
510         """
511         Return a tuple of lists of ids
512           (new, modified, removed)
513         from the specified revision to the current situation.
514         """
515         return ([], [], [])
516
517     def version(self):
518         # Cache version string for efficiency.
519         if not hasattr(self, '_version'):
520             self._version = self._get_version()
521         return self._version
522
523     def _get_version(self):
524         try:
525             ret = self._vcs_version()
526             return ret
527         except OSError, e:
528             if e.errno == errno.ENOENT:
529                 return None
530             else:
531                 raise OSError, e
532         except CommandError:
533             return None
534
535     def installed(self):
536         if self.version() != None:
537             return True
538         return False
539
540     def get_user_id(self):
541         """
542         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
543         If the VCS has not been configured with a username, return None.
544         You can override the automatic lookup procedure by setting the
545         VCS.user_id attribute to a string of your choice.
546         """
547         if not hasattr(self, 'user_id'):
548             self.user_id = self._vcs_get_user_id()
549         return self.user_id
550
551     def _detect(self, path='.'):
552         """
553         Detect whether a directory is revision controlled with this VCS.
554         """
555         return self._vcs_detect(path)
556
557     def root(self):
558         """Set the root directory to the path's VCS root.
559
560         This is the default working directory for future invocations.
561         Consider the following usage case:
562
563         You have a project rooted in::
564
565           /path/to/source/
566
567         by which I mean the VCS repository is in, for example::
568
569           /path/to/source/.bzr
570
571         However, you're of in some subdirectory like::
572
573           /path/to/source/ui/testing
574
575         and you want to comment on a bug.  `root` will locate your VCS
576         root (``/path/to/source/``) and set the repo there.  This
577         means that it doesn't matter where you are in your project
578         tree when you call "be COMMAND", it always acts as if you called
579         it from the VCS root.
580         """
581         if self._detect(self.repo) == False:
582             raise VCSUnableToRoot(self)
583         root = self._vcs_root(self.repo)
584         self.repo = os.path.abspath(root)
585         if os.path.isdir(self.repo) == False:
586             self.repo = os.path.dirname(self.repo)
587         self.be_dir = os.path.join(
588             self.repo, self._cached_path_id._spacer_dirs[0])
589         self._cached_path_id.root(self.repo)
590         self._rooted = True
591
592     def _init(self):
593         """
594         Begin versioning the tree based at self.repo.
595         Also roots the vcs at path.
596
597         See Also
598         --------
599         root : called if the VCS has already been initialized.
600         """
601         if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
602             raise VCSUnableToRoot(self)
603         if self._vcs_detect(self.repo) == False:
604             self._vcs_init(self.repo)
605         if self._rooted == False:
606             self.root()
607         os.mkdir(self.be_dir)
608         self._vcs_add(self._u_rel_path(self.be_dir))
609         self._setup_storage_version()
610         self._cached_path_id.init()
611
612     def _destroy(self):
613         self._vcs_destroy()
614         self._cached_path_id.destroy()
615         if os.path.exists(self.be_dir):
616             shutil.rmtree(self.be_dir)
617
618     def _connect(self):
619         if self._rooted == False:
620             self.root()
621         if not os.path.isdir(self.be_dir):
622             raise libbe.storage.base.ConnectionError(self)
623         self._cached_path_id.connect()
624         self.check_storage_version()
625
626     def _disconnect(self):
627         self._cached_path_id.disconnect()
628
629     def path(self, id, revision=None, relpath=True):
630         if revision == None:
631             path = self._cached_path_id.path(id)
632             if relpath == True:
633                 return self._u_rel_path(path)
634             return path
635         path = self._vcs_path(id, revision)
636         if relpath == True:
637             return path
638         return os.path.join(self.repo, path)
639
640     def _add_path(self, path, directory=False):
641         relpath = self._u_rel_path(path)
642         reldirs = relpath.split(os.path.sep)
643         if directory == False:
644             reldirs = reldirs[:-1]
645         dir = self.repo
646         for reldir in reldirs:
647             dir = os.path.join(dir, reldir)
648             if not os.path.exists(dir):
649                 os.mkdir(dir)
650                 self._vcs_add(self._u_rel_path(dir))
651             elif not os.path.isdir(dir):
652                 raise libbe.storage.base.InvalidDirectory
653         if directory == False:
654             if not os.path.exists(path):
655                 open(path, 'w').close()
656             self._vcs_add(self._u_rel_path(path))
657
658     def _add(self, id, parent=None, **kwargs):
659         path = self._cached_path_id.add_id(id, parent)
660         self._add_path(path, **kwargs)
661
662     def _exists(self, id, revision=None):
663         if revision == None:
664             try:
665                 path = self.path(id, revision, relpath=False)
666             except InvalidID, e:
667                 return False
668             return os.path.exists(path)
669         path = self.path(id, revision, relpath=True)
670         return self._vcs_exists(relpath, revision)
671
672     def _remove(self, id):
673         path = self._cached_path_id.path(id)
674         if os.path.exists(path):
675             if os.path.isdir(path) and len(self.children(id)) > 0:
676                 raise libbe.storage.base.DirectoryNotEmpty(id)
677             self._vcs_remove(self._u_rel_path(path))
678             if os.path.exists(path):
679                 if os.path.isdir(path):
680                     os.rmdir(path)
681                 else:
682                     os.remove(path)
683         self._cached_path_id.remove_id(id)
684
685     def _recursive_remove(self, id):
686         path = self._cached_path_id.path(id)
687         for dirpath,dirnames,filenames in os.walk(path, topdown=False):
688             filenames.extend(dirnames)
689             for f in filenames:
690                 fullpath = os.path.join(dirpath, f)
691                 if os.path.exists(fullpath) == False:
692                     continue
693                 self._vcs_remove(self._u_rel_path(fullpath))
694         if os.path.exists(path):
695             shutil.rmtree(path)
696         path = self._cached_path_id.path(id, relpath=True)
697         for id,p in self._cached_path_id._cache.items():
698             if p.startswith(path):
699                 self._cached_path_id.remove_id(id)
700
701     def _ancestors(self, id=None, revision=None):
702         if id==None:
703             path = self.be_dir
704         else:
705             path = self.path(id, revision, relpath=False)
706         ancestors = []
707         while True:
708             if not path.startswith(self.repo + os.path.sep):
709                 break
710             path = os.path.dirname(path)
711             try:
712                 id = self._u_path_to_id(path)
713                 ancestors.append(id)
714             except (SpacerCollision, InvalidPath):
715                 pass    
716         return ancestors
717
718     def _children(self, id=None, revision=None):
719         if revision == None:
720             isdir = os.path.isdir
721             listdir = os.listdir
722         else:
723             isdir = lambda path : self._vcs_isdir(
724                 self._u_rel_path(path), revision)
725             listdir = lambda path : self._vcs_listdir(
726                 self._u_rel_path(path), revision)
727         if id==None:
728             path = self.be_dir
729         else:
730             path = self.path(id, revision, relpath=False)
731         if isdir(path) == False: 
732             return []
733         children = listdir(path)
734         for i,c in enumerate(children):
735             if c in self._cached_path_id._spacer_dirs:
736                 children[i] = None
737                 children.extend([os.path.join(c, c2) for c2 in
738                                  listdir(os.path.join(path, c))])
739             elif c in ['id-cache', 'version']:
740                 children[i] = None
741             elif self.interspersed_vcs_files \
742                     and self._vcs_is_versioned(c) == False:
743                 children[i] = None
744         for i,c in enumerate(children):
745             if c == None: continue
746             cpath = os.path.join(path, c)
747             if self.interspersed_vcs_files == True \
748                     and revision != None \
749                     and self._vcs_is_versioned(cpath) == False:
750                 children[i] = None
751             else:
752                 children[i] = self._u_path_to_id(cpath)
753                 children[i]
754         return [c for c in children if c != None]
755
756     def _get(self, id, default=libbe.util.InvalidObject, revision=None):
757         try:
758             relpath = self.path(id, revision, relpath=True)
759             contents = self._vcs_get_file_contents(relpath, revision)
760         except InvalidID, e:
761             if default == libbe.util.InvalidObject:
762                 raise e
763             return default
764         if contents in [libbe.storage.base.InvalidDirectory,
765                         libbe.util.InvalidObject] \
766                 or len(contents) == 0:
767             if default == libbe.util.InvalidObject:
768                 raise InvalidID(id, revision)
769             return default
770         return contents
771
772     def _set(self, id, value):
773         try:
774             path = self._cached_path_id.path(id)
775         except InvalidID, e:
776             raise
777         if not os.path.exists(path):
778             raise InvalidID(id)
779         if os.path.isdir(path):
780             raise libbe.storage.base.InvalidDirectory(id)
781         f = open(path, "wb")
782         f.write(value)
783         f.close()
784         self._vcs_update(self._u_rel_path(path))
785
786     def _commit(self, summary, body=None, allow_empty=False):
787         summary = summary.strip()+'\n'
788         if body is not None:
789             summary += '\n' + body.strip() + '\n'
790         descriptor, filename = tempfile.mkstemp()
791         revision = None
792         try:
793             temp_file = os.fdopen(descriptor, 'wb')
794             temp_file.write(summary)
795             temp_file.flush()
796             revision = self._vcs_commit(filename, allow_empty=allow_empty)
797             temp_file.close()
798         finally:
799             os.remove(filename)
800         return revision
801
802     def revision_id(self, index=None):
803         if index == None:
804             return None
805         try:
806             if int(index) != index:
807                 raise InvalidRevision(index)
808         except ValueError:
809             raise InvalidRevision(index)
810         revid = self._vcs_revision_id(index)
811         if revid == None:
812             raise libbe.storage.base.InvalidRevision(index)
813         return revid
814
815     def changed(self, revision):
816         new,mod,rem = self._vcs_changed(revision)
817         def paths_to_ids(paths):
818             for p in paths:
819                 try:
820                     id = self._u_path_to_id(p)
821                     yield id
822                 except (SpacerCollision, InvalidPath):
823                     pass
824         new_id = list(paths_to_ids(new))
825         mod_id = list(paths_to_ids(mod))
826         rem_id = list(paths_to_ids(rem))
827         return (new_id, mod_id, rem_id)
828
829     def _u_any_in_string(self, list, string):
830         """Return True if any of the strings in list are in string.
831         Otherwise return False.
832         """
833         for list_string in list:
834             if list_string in string:
835                 return True
836         return False
837
838     def _u_invoke(self, *args, **kwargs):
839         if 'cwd' not in kwargs:
840             kwargs['cwd'] = self.repo
841         if 'verbose' not in kwargs:
842             kwargs['verbose'] = self.verbose_invoke
843         if 'encoding' not in kwargs:
844             kwargs['encoding'] = self.encoding
845         return invoke(*args, **kwargs)
846
847     def _u_invoke_client(self, *args, **kwargs):
848         cl_args = [self.client]
849         cl_args.extend(args)
850         return self._u_invoke(cl_args, **kwargs)
851
852     def _u_search_parent_directories(self, path, filename):
853         """Find the file (or directory) named filename in path or in any of
854         path's parents.
855
856         e.g.
857           search_parent_directories("/a/b/c", ".be")
858         will return the path to the first existing file from
859           /a/b/c/.be
860           /a/b/.be
861           /a/.be
862           /.be
863         or None if none of those files exist.
864         """
865         try:
866             ret = search_parent_directories(path, filename)
867         except AssertionError, e:
868             return None
869         return ret
870
871     def _u_find_id_from_manifest(self, id, manifest, revision=None):
872         """Search for the relative path to id using manifest, a list of all
873         files.
874         
875         Returns None if the id is not found.
876         """
877         be_dir = self._cached_path_id._spacer_dirs[0]
878         be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
879         files = [f for f in manifest if f.startswith(be_dir_sep)]
880         for file in files:
881             if not file.startswith(be_dir+os.path.sep):
882                 continue
883             parts = file.split(os.path.sep)
884             dir = parts.pop(0) # don't add the first spacer dir
885             for part in parts[:-1]:
886                 dir = os.path.join(dir, part)
887                 if not dir in files:
888                     files.append(dir)
889         for file in files:
890             try:
891                 p_id = self._u_path_to_id(file)
892                 if p_id == id:
893                     return file
894             except (SpacerCollision, InvalidPath):
895                 pass
896         raise InvalidID(id, revision=revision)
897
898     def _u_find_id(self, id, revision):
899         """Search for the relative path to id as of revision.
900
901         Returns None if the id is not found.
902         """
903         assert self._rooted == True
904         be_dir = self._cached_path_id._spacer_dirs[0]
905         stack = [(be_dir, be_dir)]
906         while len(stack) > 0:
907             path,long_id = stack.pop()
908             if long_id.endswith('/'+id):
909                 return path
910             if self._vcs_isdir(path, revision) == False:
911                 continue
912             for child in self._vcs_listdir(path, revision):
913                 stack.append((os.path.join(path, child),
914                               '/'.join([long_id, child])))
915         raise InvalidID(id, revision=revision)
916
917     def _u_path_to_id(self, path):
918         return self._cached_path_id.id(path)
919
920     def _u_rel_path(self, path, root=None):
921         """Return the relative path to path from root.
922
923         Examples:
924
925         >>> vcs = new()
926         >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
927         '.be'
928         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
929         '.'
930         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
931         '.'
932         >>> vcs._u_rel_path("./a", ".")
933         'a'
934         """
935         if root == None:
936             if self.repo == None:
937                 raise VCSNotRooted(self)
938             root = self.repo
939         path = os.path.abspath(path)
940         absRoot = os.path.abspath(root)
941         absRootSlashedDir = os.path.join(absRoot,"")
942         if path in [absRoot, absRootSlashedDir]:
943             return '.'
944         if not path.startswith(absRootSlashedDir):
945             raise InvalidPath(path, absRootSlashedDir)
946         relpath = path[len(absRootSlashedDir):]
947         return relpath
948
949     def _u_abspath(self, path, root=None):
950         """Return the absolute path from a path realtive to root.
951
952         Examples
953         --------
954
955         >>> vcs = new()
956         >>> vcs._u_abspath(".be", "/a.b/c")
957         '/a.b/c/.be'
958         """
959         if root == None:
960             assert self.repo != None, "VCS not rooted"
961             root = self.repo
962         return os.path.abspath(os.path.join(root, path))
963
964     def _u_parse_commitfile(self, commitfile):
965         """Split the commitfile created in self.commit() back into summary and
966         header lines.
967         """
968         f = codecs.open(commitfile, 'r', self.encoding)
969         summary = f.readline()
970         body = f.read()
971         body.lstrip('\n')
972         if len(body) == 0:
973             body = None
974         f.close()
975         return (summary, body)
976
977     def check_storage_version(self):
978         version = self.storage_version()
979         if version != libbe.storage.STORAGE_VERSION:
980             upgrade.upgrade(self.repo, version)
981
982     def storage_version(self, revision=None, path=None):
983         """Return the storage version of the on-disk files.
984
985         See Also
986         --------
987         :mod:`libbe.storage.util.upgrade`
988         """
989         if path == None:
990             path = os.path.join(self.repo, '.be', 'version')
991         if not os.path.exists(path):
992             raise libbe.storage.InvalidStorageVersion(None)
993         if revision == None: # don't require connection
994             return libbe.util.encoding.get_file_contents(
995                 path, decode=True).rstrip('\n')
996         relpath = self._u_rel_path(path)
997         contents = self._vcs_get_file_contents(relpath, revision=revision)
998         if type(contents) != types.UnicodeType:
999             contents = unicode(contents, self.encoding)
1000         return contents.strip()
1001
1002     def _setup_storage_version(self):
1003         """
1004         Requires disk access.
1005         """
1006         assert self._rooted == True
1007         path = os.path.join(self.be_dir, 'version')
1008         if not os.path.exists(path):
1009             libbe.util.encoding.set_file_contents(path,
1010                 libbe.storage.STORAGE_VERSION+'\n')
1011             self._vcs_add(self._u_rel_path(path))
1012
1013 \f
1014 if libbe.TESTING == True:
1015     class VCSTestCase (unittest.TestCase):
1016         """
1017         Test cases for base VCS class (in addition to the Storage test
1018         cases).
1019         """
1020
1021         Class = VCS
1022
1023         def __init__(self, *args, **kwargs):
1024             super(VCSTestCase, self).__init__(*args, **kwargs)
1025             self.dirname = None
1026
1027         def setUp(self):
1028             """Set up test fixtures for Storage test case."""
1029             super(VCSTestCase, self).setUp()
1030             self.dir = Dir()
1031             self.dirname = self.dir.path
1032             self.s = self.Class(repo=self.dirname)
1033             if self.s.installed() == True:
1034                 self.s.init()
1035                 self.s.connect()
1036
1037         def tearDown(self):
1038             super(VCSTestCase, self).tearDown()
1039             if self.s.installed() == True:
1040                 self.s.disconnect()
1041                 self.s.destroy()
1042             self.dir.cleanup()
1043
1044     class VCS_installed_TestCase (VCSTestCase):
1045         def test_installed(self):
1046             """See if the VCS is installed.
1047             """
1048             self.failUnless(self.s.installed() == True,
1049                             '%(name)s VCS not found' % vars(self.Class))
1050
1051
1052     class VCS_detection_TestCase (VCSTestCase):
1053         def test_detection(self):
1054             """See if the VCS detects its installed repository
1055             """
1056             if self.s.installed():
1057                 self.s.disconnect()
1058                 self.failUnless(self.s._detect(self.dirname) == True,
1059                     'Did not detected %(name)s VCS after initialising'
1060                     % vars(self.Class))
1061                 self.s.connect()
1062
1063         def test_no_detection(self):
1064             """See if the VCS detects its installed repository
1065             """
1066             if self.s.installed() and self.Class.name != 'None':
1067                 self.s.disconnect()
1068                 self.s.destroy()
1069                 self.failUnless(self.s._detect(self.dirname) == False,
1070                     'Detected %(name)s VCS before initialising'
1071                     % vars(self.Class))
1072                 self.s.init()
1073                 self.s.connect()
1074
1075         def test_vcs_repo_in_specified_root_path(self):
1076             """VCS root directory should be in specified root path."""
1077             rp = os.path.realpath(self.s.repo)
1078             dp = os.path.realpath(self.dirname)
1079             vcs_name = self.Class.name
1080             self.failUnless(
1081                 dp == rp or rp == None,
1082                 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1083
1084     class VCS_get_user_id_TestCase(VCSTestCase):
1085         """Test cases for VCS.get_user_id method."""
1086
1087         def test_gets_existing_user_id(self):
1088             """Should get the existing user ID."""
1089             if self.s.installed():
1090                 user_id = self.s.get_user_id()
1091                 if user_id == None:
1092                     return
1093                 name,email = libbe.ui.util.user.parse_user_id(user_id)
1094                 if email != None:
1095                     self.failUnless('@' in email, email)
1096
1097     def make_vcs_testcase_subclasses(vcs_class, namespace):
1098         c = vcs_class()
1099         if c.installed():
1100             if c.versioned == True:
1101                 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1102                     vcs_class, namespace)
1103             else:
1104                 libbe.storage.base.make_storage_testcase_subclasses(
1105                     vcs_class, namespace)
1106
1107         if namespace != sys.modules[__name__]:
1108             # Make VCSTestCase subclasses for vcs_class in the namespace.
1109             vcs_testcase_classes = [
1110                 c for c in (
1111                     ob for ob in globals().values() if isinstance(ob, type))
1112                 if issubclass(c, VCSTestCase) \
1113                     and c.Class == VCS]
1114
1115             for base_class in vcs_testcase_classes:
1116                 testcase_class_name = vcs_class.__name__ + base_class.__name__
1117                 testcase_class_bases = (base_class,)
1118                 testcase_class_dict = dict(base_class.__dict__)
1119                 testcase_class_dict['Class'] = vcs_class
1120                 testcase_class = type(
1121                     testcase_class_name, testcase_class_bases, testcase_class_dict)
1122                 setattr(namespace, testcase_class_name, testcase_class)
1123
1124     make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1125
1126     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1127     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])