Transition to libbe.LOG for logging
[be.git] / libbe / storage / vcs / base.py
1 # Copyright (C) 2005-2012 Aaron Bentley <abentley@panoramicfeedback.com>
2 #                         Alexander Belchenko <bialix@ukr.net>
3 #                         Ben Finney <benf@cybersource.com.au>
4 #                         Chris Ball <cjb@laptop.org>
5 #                         Gianluca Montecchi <gian@grys.it>
6 #                         W. Trevor King <wking@tremily.us>
7 #
8 # This file is part of Bugs Everywhere.
9 #
10 # Bugs Everywhere is free software: you can redistribute it and/or modify it
11 # under the terms of the GNU General Public License as published by the Free
12 # Software Foundation, either version 2 of the License, or (at your option) any
13 # later version.
14 #
15 # Bugs Everywhere is distributed in the hope that it will be useful, but
16 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17 # FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
18 # more details.
19 #
20 # You should have received a copy of the GNU General Public License along with
21 # Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.
22
23 """Define the base :py:class:`VCS` (Version Control System) class, which
24 should be subclassed by other Version Control System backends.  The
25 base class implements a "do not version" VCS.
26 """
27
28 import codecs
29 import os
30 import os.path
31 import re
32 import shutil
33 import sys
34 import tempfile
35 import types
36
37 import libbe
38 import libbe.storage
39 import libbe.storage.base
40 import libbe.util.encoding
41 from libbe.storage.base import EmptyCommit, InvalidRevision, InvalidID
42 from libbe.util.utility import Dir, search_parent_directories
43 from libbe.util.subproc import CommandError, invoke
44 from libbe.util.plugin import import_by_name
45 import libbe.storage.util.upgrade as upgrade
46
47 if libbe.TESTING == True:
48     import unittest
49     import doctest
50
51     import libbe.ui.util.user
52
53 VCS_ORDER = ['arch', 'bzr', 'darcs', 'git', 'hg', 'monotone']
54 """List VCS modules in order of preference.
55
56 Don't list this module, it is implicitly last.
57 """
58
59 def set_preferred_vcs(name):
60     """Manipulate :py:data:`VCS_ORDER` to place `name` first.
61
62     This is primarily indended for testing purposes.
63     """
64     global VCS_ORDER
65     assert name in VCS_ORDER, \
66         'unrecognized VCS %s not in\n  %s' % (name, VCS_ORDER)
67     VCS_ORDER.remove(name)
68     VCS_ORDER.insert(0, name)
69
70 def _get_matching_vcs(matchfn):
71     """Return the first module for which matchfn(VCS_instance) is True.
72
73     Searches in :py:data:`VCS_ORDER`.
74     """
75     for submodname in VCS_ORDER:
76         module = import_by_name('libbe.storage.vcs.%s' % submodname)
77         vcs = module.new()
78         if matchfn(vcs) == True:
79             return vcs
80     return VCS()
81
82 def vcs_by_name(vcs_name):
83     """Return the module for the VCS with the given name.
84
85     Searches in :py:data:`VCS_ORDER`.
86     """
87     if vcs_name == VCS.name:
88         return new()
89     return _get_matching_vcs(lambda vcs: vcs.name == vcs_name)
90
91 def detect_vcs(dir):
92     """Return an VCS instance for the vcs being used in this directory.
93
94     Searches in :py:data:`VCS_ORDER`.
95     """
96     return _get_matching_vcs(lambda vcs: vcs._detect(dir))
97
98 def installed_vcs():
99     """Return an instance of an installed VCS.
100
101     Searches in :py:data:`VCS_ORDER`.
102     """
103     return _get_matching_vcs(lambda vcs: vcs.installed())
104
105
106 class VCSNotRooted (libbe.storage.base.ConnectionError):
107     def __init__(self, vcs):
108         msg = 'VCS not rooted'
109         libbe.storage.base.ConnectionError.__init__(self, msg)
110         self.vcs = vcs
111
112 class VCSUnableToRoot (libbe.storage.base.ConnectionError):
113     def __init__(self, vcs):
114         msg = 'VCS unable to root'
115         libbe.storage.base.ConnectionError.__init__(self, msg)
116         self.vcs = vcs
117
118 class InvalidPath (InvalidID):
119     def __init__(self, path, root, msg=None, **kwargs):
120         if msg == None:
121             msg = 'Path "%s" not in root "%s"' % (path, root)
122         InvalidID.__init__(self, msg=msg, **kwargs)
123         self.path = path
124         self.root = root
125
126 class SpacerCollision (InvalidPath):
127     def __init__(self, path, spacer):
128         msg = 'Path "%s" collides with spacer directory "%s"' % (path, spacer)
129         InvalidPath.__init__(self, path, root=None, msg=msg)
130         self.spacer = spacer
131
132 class NoSuchFile (InvalidID):
133     def __init__(self, pathname, root='.'):
134         path = os.path.abspath(os.path.join(root, pathname))
135         InvalidID.__init__(self, 'No such file: %s' % path)
136
137
138 class CachedPathID (object):
139     """Cache Storage ID <-> path policy.
140  
141     Paths generated following::
142
143        .../.be/BUGDIR/bugs/BUG/comments/COMMENT
144           ^-- root path
145
146     See :py:mod:`libbe.util.id` for a discussion of ID formats.
147
148     Examples
149     --------
150
151     >>> dir = Dir()
152     >>> os.mkdir(os.path.join(dir.path, '.be'))
153     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc'))
154     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs'))
155     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123'))
156     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments'))
157     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def'))
158     >>> os.mkdir(os.path.join(dir.path, '.be', 'abc', 'bugs', '456'))
159     >>> open(os.path.join(dir.path, '.be', 'abc', 'values'),
160     ...      'w').close()
161     >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'values'),
162     ...      'w').close()
163     >>> open(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'),
164     ...      'w').close()
165     >>> c = CachedPathID()
166     >>> c.root(dir.path)
167     >>> c.id(os.path.join(dir.path, '.be', 'abc', 'bugs', '123', 'comments', 'def', 'values'))
168     'def/values'
169     >>> c.init()
170     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
171     ['abc', 'id-cache']
172     >>> c.connect()
173     >>> c.path('123/values') # doctest: +ELLIPSIS
174     u'.../.be/abc/bugs/123/values'
175     >>> c.disconnect()
176     >>> c.destroy()
177     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
178     ['abc']
179     >>> c.connect() # demonstrate auto init
180     >>> sorted(os.listdir(os.path.join(c._root, '.be')))
181     ['abc', 'id-cache']
182     >>> c.add_id(u'xyz', parent=None) # doctest: +ELLIPSIS
183     u'.../.be/xyz'
184     >>> c.add_id('xyz/def', parent='xyz') # doctest: +ELLIPSIS
185     u'.../.be/xyz/def'
186     >>> c.add_id('qrs', parent='123') # doctest: +ELLIPSIS
187     u'.../.be/abc/bugs/123/comments/qrs'
188     >>> c.disconnect()
189     >>> c.connect()
190     >>> c.path('qrs') # doctest: +ELLIPSIS
191     u'.../.be/abc/bugs/123/comments/qrs'
192     >>> c.remove_id('qrs')
193     >>> c.path('qrs')
194     Traceback (most recent call last):
195       ...
196     InvalidID: qrs in revision None
197     >>> c.disconnect()
198     >>> c.destroy()
199     >>> dir.cleanup()
200     """
201     def __init__(self, encoding=None):
202         self.encoding = libbe.util.encoding.get_text_file_encoding()
203         self._spacer_dirs = ['.be', 'bugs', 'comments']
204
205     def root(self, path):
206         self._root = os.path.abspath(path).rstrip(os.path.sep)
207         self._cache_path = os.path.join(
208             self._root, self._spacer_dirs[0], 'id-cache')
209
210     def init(self, cache=None):
211         """Create cache file for an existing .be directory.
212
213         The file contains multiple lines of the form::
214
215             UUID\tPATH
216         """
217         if cache == None:
218             self._cache = {}
219         else:
220             self._cache = cache
221         spaced_root = os.path.join(self._root, self._spacer_dirs[0])
222         for dirpath, dirnames, filenames in os.walk(spaced_root,
223                                                     followlinks=True):
224             if dirpath == spaced_root:
225                 continue
226             try:
227                 id = self.id(dirpath)
228                 relpath = dirpath[len(self._root + os.path.sep):]
229                 if id.count('/') == 0:
230                     if id in self._cache:
231                         libbe.LOG.warning(
232                             'multiple paths for {0}:\n  {1}\n  {2}'.format(
233                                 id, self._cache[id], relpath))
234                     self._cache[id] = relpath
235             except InvalidPath:
236                 pass
237         if self._cache != cache:
238             self._changed = True
239         if cache == None:
240             self.disconnect()
241
242     def destroy(self):
243         if os.path.exists(self._cache_path):
244             os.remove(self._cache_path)
245
246     def connect(self):
247         if not os.path.exists(self._cache_path):
248             try:
249                 self.init()
250             except IOError:
251                 raise libbe.storage.base.ConnectionError
252         self._cache = {} # key: uuid, value: path
253         self._changed = False
254         f = codecs.open(self._cache_path, 'r', self.encoding)
255         for line in f:
256             fields = line.rstrip('\n').split('\t')
257             self._cache[fields[0]] = fields[1]
258         f.close()
259
260     def disconnect(self):
261         if self._changed == True:
262             f = codecs.open(self._cache_path, 'w', self.encoding)
263             for uuid,path in self._cache.items():
264                 f.write('%s\t%s\n' % (uuid, path))
265             f.close()
266         self._cache = {}
267
268     def path(self, id, relpath=False):
269         fields = id.split('/', 1)
270         uuid = fields[0]
271         if len(fields) == 1:
272             extra = []
273         else:
274             extra = fields[1:]
275         if uuid not in self._cache:
276             self.init(cache=self._cache)
277             if uuid not in self._cache:
278                 raise InvalidID(uuid)
279         if relpath == True:
280             return os.path.join(self._cache[uuid], *extra)
281         return os.path.join(self._root, self._cache[uuid], *extra)
282
283     def add_id(self, id, parent=None):
284         if id.count('/') > 0:
285             # not a UUID-level path
286             assert id.startswith(parent), \
287                 'Strange ID: "%s" should start with "%s"' % (id, parent)
288             path = self.path(id)
289         elif id in self._cache:
290             # already added
291             path = self.path(id)
292         else:
293             if parent == None:
294                 parent_path = ''
295                 spacer = self._spacer_dirs[0]
296             else:
297                 assert parent.count('/') == 0, \
298                     'Strange parent ID: "%s" should be UUID' % parent
299                 parent_path = self.path(parent, relpath=True)
300                 parent_spacer = parent_path.split(os.path.sep)[-2]
301                 i = self._spacer_dirs.index(parent_spacer)
302                 spacer = self._spacer_dirs[i+1]
303             path = os.path.join(parent_path, spacer, id)
304             self._cache[id] = path
305             self._changed = True
306             path = os.path.join(self._root, path)
307         return path
308
309     def remove_id(self, id):
310         if id.count('/') > 0:
311             return # not a UUID-level path
312         self._cache.pop(id)
313         self._changed = True
314
315     def id(self, path):
316         path = os.path.join(self._root, path)
317         if not path.startswith(self._root + os.path.sep):
318             raise InvalidPath(path, self._root)
319         path = path[len(self._root + os.path.sep):]
320         orig_path = path
321         if not path.startswith(self._spacer_dirs[0] + os.path.sep):
322             raise InvalidPath(path, self._spacer_dirs[0])
323         for spacer in self._spacer_dirs:
324             if not path.startswith(spacer + os.path.sep):
325                 break
326             id = path[len(spacer + os.path.sep):]
327             fields = path[len(spacer + os.path.sep):].split(os.path.sep,1)
328             if len(fields) == 1:
329                 break
330             path = fields[1]
331         for spacer in self._spacer_dirs:
332             if id.endswith(os.path.sep + spacer):
333                 raise SpacerCollision(orig_path, spacer)
334         if os.path.sep != '/':
335             id = id.replace(os.path.sep, '/')
336         return id
337
338
339 def new():
340     return VCS()
341
342 class VCS (libbe.storage.base.VersionedStorage):
343     """Implement a 'no-VCS' interface.
344
345     Support for other VCSs can be added by subclassing this class, and
346     overriding methods `_vcs_*()` with code appropriate for your VCS.
347
348     The methods `_u_*()` are utility methods available to the `_vcs_*()`
349     methods.
350     """
351     name = 'None'
352     client = 'false' # command-line tool for _u_invoke_client
353
354     def __init__(self, *args, **kwargs):
355         if 'encoding' not in kwargs:
356             kwargs['encoding'] = libbe.util.encoding.get_text_file_encoding()
357         libbe.storage.base.VersionedStorage.__init__(self, *args, **kwargs)
358         self.versioned = False
359         self.interspersed_vcs_files = False
360         self._cached_path_id = CachedPathID()
361         self._rooted = False
362
363     def _vcs_version(self):
364         """
365         Return the VCS version string.
366         """
367         return '0'
368
369     def _vcs_get_user_id(self):
370         """
371         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
372         If the VCS has not been configured with a username, return None.
373         """
374         return None
375
376     def _vcs_detect(self, path=None):
377         """
378         Detect whether a directory is revision controlled with this VCS.
379         """
380         return True
381
382     def _vcs_root(self, path):
383         """
384         Get the VCS root.  This is the default working directory for
385         future invocations.  You would normally set this to the root
386         directory for your VCS.
387         """
388         if os.path.isdir(path) == False:
389             path = os.path.dirname(path)
390             if path == '':
391                 path = os.path.abspath('.')
392         return path
393
394     def _vcs_init(self, path):
395         """
396         Begin versioning the tree based at path.
397         """
398         pass
399
400     def _vcs_destroy(self):
401         """
402         Remove any files used in versioning (e.g. whatever _vcs_init()
403         created).
404         """
405         pass
406
407     def _vcs_add(self, path):
408         """
409         Add the already created file at path to version control.
410         """
411         pass
412
413     def _vcs_exists(self, path, revision=None):
414         """
415         Does the path exist in a given revision? (True/False)
416         """
417         raise NotImplementedError('Lazy BE developers')
418
419     def _vcs_remove(self, path):
420         """
421         Remove the file at path from version control.  Optionally
422         remove the file from the filesystem as well.
423         """
424         pass
425
426     def _vcs_update(self, path):
427         """
428         Notify the versioning system of changes to the versioned file
429         at path.
430         """
431         pass
432
433     def _vcs_is_versioned(self, path):
434         """
435         Return true if a path is under version control, False
436         otherwise.  You only need to set this if the VCS goes about
437         dumping VCS-specific files into the .be directory.
438
439         If you do need to implement this method (e.g. Arch), set
440           self.interspersed_vcs_files = True
441         """
442         assert self.interspersed_vcs_files == False
443         raise NotImplementedError
444
445     def _vcs_get_file_contents(self, path, revision=None):
446         """
447         Get the file contents as they were in a given revision.
448         Revision==None specifies the current revision.
449         """
450         if revision != None:
451             raise libbe.storage.base.InvalidRevision(
452                 'The %s VCS does not support revision specifiers' % self.name)
453         path = os.path.join(self.repo, path)
454         if not os.path.exists(path):
455             return libbe.util.InvalidObject
456         if os.path.isdir(path):
457             return libbe.storage.base.InvalidDirectory
458         f = open(path, 'rb')
459         contents = f.read()
460         f.close()
461         return contents
462
463     def _vcs_path(self, id, revision):
464         """
465         Return the relative path to object id as of revision.
466         
467         Revision will not be None.
468         """
469         raise NotImplementedError
470
471     def _vcs_isdir(self, path, revision):
472         """
473         Return True if path (as returned by _vcs_path) was a directory
474         as of revision, False otherwise.
475         
476         Revision will not be None.
477         """
478         raise NotImplementedError
479
480     def _vcs_listdir(self, path, revision):
481         """
482         Return a list of the contents of the directory path (as
483         returned by _vcs_path) as of revision.
484         
485         Revision will not be None, and ._vcs_isdir(path, revision)
486         will be True.
487         """
488         raise NotImplementedError
489
490     def _vcs_commit(self, commitfile, allow_empty=False):
491         """
492         Commit the current working directory, using the contents of
493         commitfile as the comment.  Return the name of the old
494         revision (or None if commits are not supported).
495
496         If allow_empty == False, raise EmptyCommit if there are no
497         changes to commit.
498         """
499         return None
500
501     def _vcs_revision_id(self, index):
502         """
503         Return the name of the <index>th revision.  Index will be an
504         integer (possibly <= 0).  The choice of which branch to follow
505         when crossing branches/merges is not defined.
506
507         Return None if revision IDs are not supported, or if the
508         specified revision does not exist.
509         """
510         return None
511
512     def _vcs_changed(self, revision):
513         """
514         Return a tuple of lists of ids
515           (new, modified, removed)
516         from the specified revision to the current situation.
517         """
518         return ([], [], [])
519
520     def version(self):
521         # Cache version string for efficiency.
522         if not hasattr(self, '_version'):
523             self._version = self._vcs_version()
524         return self._version
525
526     def version_cmp(self, *args):
527         """Compare the installed VCS version `V_i` with another version
528         `V_o` (given in `*args`).  Returns
529
530            === ===============
531             1  if `V_i > V_o`
532             0  if `V_i == V_o`
533            -1  if `V_i < V_o`
534            === ===============
535
536         Examples
537         --------
538
539         >>> v = VCS(repo='.')
540         >>> v._version = '2.3.1 (release)'
541         >>> v.version_cmp(2,3,1)
542         0
543         >>> v.version_cmp(2,3,2)
544         -1
545         >>> v.version_cmp(2,3,'a',5)
546         1
547         >>> v.version_cmp(2,3,0)
548         1
549         >>> v.version_cmp(2,3,1,'a',5)
550         1
551         >>> v.version_cmp(2,3,1,1)
552         -1
553         >>> v.version_cmp(3)
554         -1
555         >>> v._version = '2.0.0pre2'
556         >>> v._parsed_version = None
557         >>> v.version_cmp(3)
558         -1
559         >>> v.version_cmp(2,0,1)
560         -1
561         >>> v.version_cmp(2,0,0,'pre',1)
562         1
563         >>> v.version_cmp(2,0,0,'pre',2)
564         0
565         >>> v.version_cmp(2,0,0,'pre',3)
566         -1
567         >>> v.version_cmp(2,0,0,'a',3)
568         1
569         >>> v.version_cmp(2,0,0,'rc',1)
570         -1
571         """
572         if not hasattr(self, '_parsed_version') \
573                 or self._parsed_version == None:
574             num_part = self.version().split(' ')[0]
575             self._parsed_version = []
576             for num in num_part.split('.'):
577                 try:
578                     self._parsed_version.append(int(num))
579                 except ValueError, e:
580                     # bzr version number might contain non-numerical tags
581                     splitter = re.compile(r'[\D]') # Match non-digits
582                     splits = splitter.split(num)
583                     # if len(tag) > 1 some splits will be empty; remove
584                     splits = filter(lambda s: s != '', splits)
585                     tag_starti = len(splits[0])
586                     num_starti = num.find(splits[1], tag_starti)
587                     tag = num[tag_starti:num_starti]
588                     self._parsed_version.append(int(splits[0]))
589                     self._parsed_version.append(tag)
590                     self._parsed_version.append(int(splits[1]))
591         for current,other in zip(self._parsed_version, args):
592             if type(current) != type (other):
593                 # one of them is a pre-release string
594                 if type(current) != types.IntType:
595                     return -1
596                 else:
597                     return 1
598             c = cmp(current,other)
599             if c != 0:
600                 return c
601         # see if one is longer than the other
602         verlen = len(self._parsed_version)
603         arglen = len(args)
604         if verlen == arglen:
605             return 0
606         elif verlen > arglen:
607             if type(self._parsed_version[arglen]) != types.IntType:
608                 return -1 # self is a prerelease
609             else:
610                 return 1
611         else:
612             if type(args[verlen]) != types.IntType:
613                 return 1 # args is a prerelease
614             else:
615                 return -1
616
617     def installed(self):
618         if self.version() != None:
619             return True
620         return False
621
622     def get_user_id(self):
623         """
624         Get the VCS's suggested user id (e.g. "John Doe <jdoe@example.com>").
625         If the VCS has not been configured with a username, return None.
626         You can override the automatic lookup procedure by setting the
627         VCS.user_id attribute to a string of your choice.
628         """
629         if not hasattr(self, 'user_id'):
630             self.user_id = self._vcs_get_user_id()
631             if self.user_id == None:
632                 # guess missing info
633                 name = libbe.ui.util.user.get_fallback_fullname()
634                 email = libbe.ui.util.user.get_fallback_email()
635                 self.user_id = libbe.ui.util.user.create_user_id(name, email)
636         return self.user_id
637
638     def _detect(self, path='.'):
639         """
640         Detect whether a directory is revision controlled with this VCS.
641         """
642         return self._vcs_detect(path)
643
644     def root(self):
645         """Set the root directory to the path's VCS root.
646
647         This is the default working directory for future invocations.
648         Consider the following usage case:
649
650         You have a project rooted in::
651
652           /path/to/source/
653
654         by which I mean the VCS repository is in, for example::
655
656           /path/to/source/.bzr
657
658         However, you're of in some subdirectory like::
659
660           /path/to/source/ui/testing
661
662         and you want to comment on a bug.  `root` will locate your VCS
663         root (``/path/to/source/``) and set the repo there.  This
664         means that it doesn't matter where you are in your project
665         tree when you call "be COMMAND", it always acts as if you called
666         it from the VCS root.
667         """
668         if self._detect(self.repo) == False:
669             raise VCSUnableToRoot(self)
670         root = self._vcs_root(self.repo)
671         self.repo = os.path.realpath(root)
672         if os.path.isdir(self.repo) == False:
673             self.repo = os.path.dirname(self.repo)
674         self.be_dir = os.path.join(
675             self.repo, self._cached_path_id._spacer_dirs[0])
676         self._cached_path_id.root(self.repo)
677         self._rooted = True
678
679     def _init(self):
680         """
681         Begin versioning the tree based at self.repo.
682         Also roots the vcs at path.
683
684         See Also
685         --------
686         root : called if the VCS has already been initialized.
687         """
688         if not os.path.exists(self.repo) or not os.path.isdir(self.repo):
689             raise VCSUnableToRoot(self)
690         if self._vcs_detect(self.repo) == False:
691             self._vcs_init(self.repo)
692         if self._rooted == False:
693             self.root()
694         os.mkdir(self.be_dir)
695         self._vcs_add(self._u_rel_path(self.be_dir))
696         self._setup_storage_version()
697         self._cached_path_id.init()
698
699     def _destroy(self):
700         self._vcs_destroy()
701         self._cached_path_id.destroy()
702         if os.path.exists(self.be_dir):
703             shutil.rmtree(self.be_dir)
704
705     def _connect(self):
706         if self._rooted == False:
707             self.root()
708         if not os.path.isdir(self.be_dir):
709             raise libbe.storage.base.ConnectionError(self)
710         self._cached_path_id.connect()
711         self.check_storage_version()
712
713     def _disconnect(self):
714         self._cached_path_id.disconnect()
715
716     def path(self, id, revision=None, relpath=True):
717         if revision == None:
718             path = self._cached_path_id.path(id)
719             if relpath == True:
720                 return self._u_rel_path(path)
721             return path
722         path = self._vcs_path(id, revision)
723         if relpath == True:
724             return path
725         return os.path.join(self.repo, path)
726
727     def _add_path(self, path, directory=False):
728         relpath = self._u_rel_path(path)
729         reldirs = relpath.split(os.path.sep)
730         if directory == False:
731             reldirs = reldirs[:-1]
732         dir = self.repo
733         for reldir in reldirs:
734             dir = os.path.join(dir, reldir)
735             if not os.path.exists(dir):
736                 os.mkdir(dir)
737                 self._vcs_add(self._u_rel_path(dir))
738             elif not os.path.isdir(dir):
739                 raise libbe.storage.base.InvalidDirectory
740         if directory == False:
741             if not os.path.exists(path):
742                 open(path, 'w').close()
743             self._vcs_add(self._u_rel_path(path))
744
745     def _add(self, id, parent=None, **kwargs):
746         path = self._cached_path_id.add_id(id, parent)
747         self._add_path(path, **kwargs)
748
749     def _exists(self, id, revision=None):
750         if revision == None:
751             try:
752                 path = self.path(id, revision, relpath=False)
753             except InvalidID, e:
754                 return False
755             return os.path.exists(path)
756         path = self.path(id, revision, relpath=True)
757         return self._vcs_exists(relpath, revision)
758
759     def _remove(self, id):
760         path = self._cached_path_id.path(id)
761         if os.path.exists(path):
762             if os.path.isdir(path) and len(self.children(id)) > 0:
763                 raise libbe.storage.base.DirectoryNotEmpty(id)
764             self._vcs_remove(self._u_rel_path(path))
765             if os.path.exists(path):
766                 if os.path.isdir(path):
767                     os.rmdir(path)
768                 else:
769                     os.remove(path)
770         self._cached_path_id.remove_id(id)
771
772     def _recursive_remove(self, id):
773         path = self._cached_path_id.path(id)
774         for dirpath,dirnames,filenames in os.walk(path, topdown=False):
775             filenames.extend(dirnames)
776             for f in filenames:
777                 fullpath = os.path.join(dirpath, f)
778                 if os.path.exists(fullpath) == False:
779                     continue
780                 self._vcs_remove(self._u_rel_path(fullpath))
781         if os.path.exists(path):
782             shutil.rmtree(path)
783         path = self._cached_path_id.path(id, relpath=True)
784         for id,p in self._cached_path_id._cache.items():
785             if p.startswith(path):
786                 self._cached_path_id.remove_id(id)
787
788     def _ancestors(self, id=None, revision=None):
789         if id==None:
790             path = self.be_dir
791         else:
792             path = self.path(id, revision, relpath=False)
793         ancestors = []
794         while True:
795             if not path.startswith(self.repo + os.path.sep):
796                 break
797             path = os.path.dirname(path)
798             try:
799                 id = self._u_path_to_id(path)
800                 ancestors.append(id)
801             except (SpacerCollision, InvalidPath):
802                 pass    
803         return ancestors
804
805     def _children(self, id=None, revision=None):
806         if revision == None:
807             isdir = os.path.isdir
808             listdir = os.listdir
809         else:
810             isdir = lambda path : self._vcs_isdir(
811                 self._u_rel_path(path), revision)
812             listdir = lambda path : self._vcs_listdir(
813                 self._u_rel_path(path), revision)
814         if id==None:
815             path = self.be_dir
816         else:
817             path = self.path(id, revision, relpath=False)
818         if isdir(path) == False: 
819             return []
820         children = listdir(path)
821         for i,c in enumerate(children):
822             if c in self._cached_path_id._spacer_dirs:
823                 children[i] = None
824                 children.extend([os.path.join(c, c2) for c2 in
825                                  listdir(os.path.join(path, c))])
826             elif c in ['id-cache', 'version']:
827                 children[i] = None
828             elif self.interspersed_vcs_files \
829                     and self._vcs_is_versioned(c) == False:
830                 children[i] = None
831         for i,c in enumerate(children):
832             if c == None: continue
833             cpath = os.path.join(path, c)
834             if self.interspersed_vcs_files == True \
835                     and revision != None \
836                     and self._vcs_is_versioned(cpath) == False:
837                 children[i] = None
838             else:
839                 children[i] = self._u_path_to_id(cpath)
840         return [c for c in children if c != None]
841
842     def _get(self, id, default=libbe.util.InvalidObject, revision=None):
843         try:
844             relpath = self.path(id, revision, relpath=True)
845             contents = self._vcs_get_file_contents(relpath, revision)
846         except InvalidID, e:
847             if default == libbe.util.InvalidObject:
848                 raise e
849             return default
850         if contents in [libbe.storage.base.InvalidDirectory,
851                         libbe.util.InvalidObject] \
852                 or len(contents) == 0:
853             if default == libbe.util.InvalidObject:
854                 raise InvalidID(id, revision)
855             return default
856         return contents
857
858     def _set(self, id, value):
859         try:
860             path = self._cached_path_id.path(id)
861         except InvalidID, e:
862             raise
863         if not os.path.exists(path):
864             raise InvalidID(id)
865         if os.path.isdir(path):
866             raise libbe.storage.base.InvalidDirectory(id)
867         f = open(path, "wb")
868         f.write(value)
869         f.close()
870         self._vcs_update(self._u_rel_path(path))
871
872     def _commit(self, summary, body=None, allow_empty=False):
873         summary = summary.strip()+'\n'
874         if body is not None:
875             summary += '\n' + body.strip() + '\n'
876         descriptor, filename = tempfile.mkstemp()
877         revision = None
878         try:
879             temp_file = os.fdopen(descriptor, 'wb')
880             temp_file.write(summary)
881             temp_file.flush()
882             revision = self._vcs_commit(filename, allow_empty=allow_empty)
883             temp_file.close()
884         finally:
885             os.remove(filename)
886         return revision
887
888     def revision_id(self, index=None):
889         if index == None:
890             return None
891         try:
892             if int(index) != index:
893                 raise InvalidRevision(index)
894         except ValueError:
895             raise InvalidRevision(index)
896         revid = self._vcs_revision_id(index)
897         if revid == None:
898             raise libbe.storage.base.InvalidRevision(index)
899         return revid
900
901     def changed(self, revision):
902         new,mod,rem = self._vcs_changed(revision)
903         def paths_to_ids(paths):
904             for p in paths:
905                 try:
906                     id = self._u_path_to_id(p)
907                     yield id
908                 except (SpacerCollision, InvalidPath):
909                     pass
910         new_id = list(paths_to_ids(new))
911         mod_id = list(paths_to_ids(mod))
912         rem_id = list(paths_to_ids(rem))
913         return (new_id, mod_id, rem_id)
914
915     def _u_any_in_string(self, list, string):
916         """Return True if any of the strings in list are in string.
917         Otherwise return False.
918         """
919         for list_string in list:
920             if list_string in string:
921                 return True
922         return False
923
924     def _u_invoke(self, *args, **kwargs):
925         if 'cwd' not in kwargs:
926             kwargs['cwd'] = self.repo
927         if 'encoding' not in kwargs:
928             kwargs['encoding'] = self.encoding
929         return invoke(*args, **kwargs)
930
931     def _u_invoke_client(self, *args, **kwargs):
932         cl_args = [self.client]
933         cl_args.extend(args)
934         return self._u_invoke(cl_args, **kwargs)
935
936     def _u_search_parent_directories(self, path, filename):
937         """Find the file (or directory) named filename in path or in any of
938         path's parents.
939
940         e.g.
941           search_parent_directories("/a/b/c", ".be")
942         will return the path to the first existing file from
943           /a/b/c/.be
944           /a/b/.be
945           /a/.be
946           /.be
947         or None if none of those files exist.
948         """
949         try:
950             ret = search_parent_directories(path, filename)
951         except AssertionError, e:
952             return None
953         return ret
954
955     def _u_find_id_from_manifest(self, id, manifest, revision=None):
956         """Search for the relative path to id using manifest, a list of all
957         files.
958         
959         Returns None if the id is not found.
960         """
961         be_dir = self._cached_path_id._spacer_dirs[0]
962         be_dir_sep = self._cached_path_id._spacer_dirs[0] + os.path.sep
963         files = [f for f in manifest if f.startswith(be_dir_sep)]
964         for file in files:
965             if not file.startswith(be_dir+os.path.sep):
966                 continue
967             parts = file.split(os.path.sep)
968             dir = parts.pop(0) # don't add the first spacer dir
969             for part in parts[:-1]:
970                 dir = os.path.join(dir, part)
971                 if not dir in files:
972                     files.append(dir)
973         for file in files:
974             try:
975                 p_id = self._u_path_to_id(file)
976                 if p_id == id:
977                     return file
978             except (SpacerCollision, InvalidPath):
979                 pass
980         raise InvalidID(id, revision=revision)
981
982     def _u_find_id(self, id, revision):
983         """Search for the relative path to id as of revision.
984
985         Returns None if the id is not found.
986         """
987         assert self._rooted == True
988         be_dir = self._cached_path_id._spacer_dirs[0]
989         stack = [(be_dir, be_dir)]
990         while len(stack) > 0:
991             path,long_id = stack.pop()
992             if long_id.endswith('/'+id):
993                 return path
994             if self._vcs_isdir(path, revision) == False:
995                 continue
996             for child in self._vcs_listdir(path, revision):
997                 stack.append((os.path.join(path, child),
998                               '/'.join([long_id, child])))
999         raise InvalidID(id, revision=revision)
1000
1001     def _u_path_to_id(self, path):
1002         return self._cached_path_id.id(path)
1003
1004     def _u_rel_path(self, path, root=None):
1005         """Return the relative path to path from root.
1006
1007         Examples:
1008
1009         >>> vcs = new()
1010         >>> vcs._u_rel_path("/a.b/c/.be", "/a.b/c")
1011         '.be'
1012         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c")
1013         '.'
1014         >>> vcs._u_rel_path("/a.b/c/", "/a.b/c/")
1015         '.'
1016         >>> vcs._u_rel_path("./a", ".")
1017         'a'
1018         """
1019         if root == None:
1020             if self.repo == None:
1021                 raise VCSNotRooted(self)
1022             root = self.repo
1023         path = os.path.abspath(path)
1024         absRoot = os.path.abspath(root)
1025         absRootSlashedDir = os.path.join(absRoot,"")
1026         if path in [absRoot, absRootSlashedDir]:
1027             return '.'
1028         if not path.startswith(absRootSlashedDir):
1029             raise InvalidPath(path, absRootSlashedDir)
1030         relpath = path[len(absRootSlashedDir):]
1031         return relpath
1032
1033     def _u_abspath(self, path, root=None):
1034         """Return the absolute path from a path relative to root.
1035
1036         Examples
1037         --------
1038
1039         >>> vcs = new()
1040         >>> vcs._u_abspath(".be", "/a.b/c")
1041         '/a.b/c/.be'
1042         """
1043         if root == None:
1044             assert self.repo != None, "VCS not rooted"
1045             root = self.repo
1046         return os.path.abspath(os.path.join(root, path))
1047
1048     def _u_parse_commitfile(self, commitfile):
1049         """Split the commitfile created in self.commit() back into summary and
1050         header lines.
1051         """
1052         f = codecs.open(commitfile, 'r', self.encoding)
1053         summary = f.readline()
1054         body = f.read()
1055         body.lstrip('\n')
1056         if len(body) == 0:
1057             body = None
1058         f.close()
1059         return (summary, body)
1060
1061     def check_storage_version(self):
1062         version = self.storage_version()
1063         if version != libbe.storage.STORAGE_VERSION:
1064             upgrade.upgrade(self.repo, version)
1065
1066     def storage_version(self, revision=None, path=None):
1067         """Return the storage version of the on-disk files.
1068
1069         See Also
1070         --------
1071         libbe.storage.util.upgrade
1072         """
1073         if path == None:
1074             path = os.path.join(self.repo, '.be', 'version')
1075         if not os.path.exists(path):
1076             raise libbe.storage.InvalidStorageVersion(None)
1077         if revision == None: # don't require connection
1078             return libbe.util.encoding.get_file_contents(
1079                 path, decode=True).rstrip()
1080         relpath = self._u_rel_path(path)
1081         contents = self._vcs_get_file_contents(relpath, revision=revision)
1082         if type(contents) != types.UnicodeType:
1083             contents = unicode(contents, self.encoding)
1084         return contents.strip()
1085
1086     def _setup_storage_version(self):
1087         """
1088         Requires disk access.
1089         """
1090         assert self._rooted == True
1091         path = os.path.join(self.be_dir, 'version')
1092         if not os.path.exists(path):
1093             libbe.util.encoding.set_file_contents(path,
1094                 libbe.storage.STORAGE_VERSION+'\n')
1095             self._vcs_add(self._u_rel_path(path))
1096
1097
1098 if libbe.TESTING == True:
1099     class VCSTestCase (unittest.TestCase):
1100         """
1101         Test cases for base VCS class (in addition to the Storage test
1102         cases).
1103         """
1104
1105         Class = VCS
1106
1107         def __init__(self, *args, **kwargs):
1108             super(VCSTestCase, self).__init__(*args, **kwargs)
1109             self.dirname = None
1110
1111         def setUp(self):
1112             """Set up test fixtures for Storage test case."""
1113             super(VCSTestCase, self).setUp()
1114             self.dir = Dir()
1115             self.dirname = self.dir.path
1116             self.s = self.Class(repo=self.dirname)
1117             if self.s.installed() == True:
1118                 self.s.init()
1119                 self.s.connect()
1120
1121         def tearDown(self):
1122             super(VCSTestCase, self).tearDown()
1123             if self.s.installed() == True:
1124                 self.s.disconnect()
1125                 self.s.destroy()
1126             self.dir.cleanup()
1127
1128     class VCS_installed_TestCase (VCSTestCase):
1129         def test_installed(self):
1130             """See if the VCS is installed.
1131             """
1132             self.failUnless(self.s.installed() == True,
1133                             '%(name)s VCS not found' % vars(self.Class))
1134
1135
1136     class VCS_detection_TestCase (VCSTestCase):
1137         def test_detection(self):
1138             """See if the VCS detects its installed repository
1139             """
1140             if self.s.installed():
1141                 self.s.disconnect()
1142                 self.failUnless(self.s._detect(self.dirname) == True,
1143                     'Did not detected %(name)s VCS after initialising'
1144                     % vars(self.Class))
1145                 self.s.connect()
1146
1147         def test_no_detection(self):
1148             """See if the VCS detects its installed repository
1149             """
1150             if self.s.installed() and self.Class.name != 'None':
1151                 self.s.disconnect()
1152                 self.s.destroy()
1153                 self.failUnless(self.s._detect(self.dirname) == False,
1154                     'Detected %(name)s VCS before initialising'
1155                     % vars(self.Class))
1156                 self.s.init()
1157                 self.s.connect()
1158
1159         def test_vcs_repo_in_specified_root_path(self):
1160             """VCS root directory should be in specified root path."""
1161             rp = os.path.realpath(self.s.repo)
1162             dp = os.path.realpath(self.dirname)
1163             vcs_name = self.Class.name
1164             self.failUnless(
1165                 dp == rp or rp == None,
1166                 "%(vcs_name)s VCS root in wrong dir (%(dp)s %(rp)s)" % vars())
1167
1168     class VCS_get_user_id_TestCase(VCSTestCase):
1169         """Test cases for VCS.get_user_id method."""
1170
1171         def test_get_existing_user_id(self):
1172             """Should get the existing user ID."""
1173             if self.s.installed():
1174                 user_id = self.s.get_user_id()
1175                 if user_id == None:
1176                     return
1177                 name,email = libbe.ui.util.user.parse_user_id(user_id)
1178                 if email != None:
1179                     self.failUnless('@' in email, email)
1180
1181     def make_vcs_testcase_subclasses(vcs_class, namespace):
1182         c = vcs_class()
1183         if c.installed():
1184             if c.versioned == True:
1185                 libbe.storage.base.make_versioned_storage_testcase_subclasses(
1186                     vcs_class, namespace)
1187             else:
1188                 libbe.storage.base.make_storage_testcase_subclasses(
1189                     vcs_class, namespace)
1190
1191         if namespace != sys.modules[__name__]:
1192             # Make VCSTestCase subclasses for vcs_class in the namespace.
1193             vcs_testcase_classes = [
1194                 c for c in (
1195                     ob for ob in globals().values() if isinstance(ob, type))
1196                 if issubclass(c, VCSTestCase) \
1197                     and c.Class == VCS]
1198
1199             for base_class in vcs_testcase_classes:
1200                 testcase_class_name = vcs_class.__name__ + base_class.__name__
1201                 testcase_class_bases = (base_class,)
1202                 testcase_class_dict = dict(base_class.__dict__)
1203                 testcase_class_dict['Class'] = vcs_class
1204                 testcase_class = type(
1205                     testcase_class_name, testcase_class_bases, testcase_class_dict)
1206                 setattr(namespace, testcase_class_name, testcase_class)
1207
1208     make_vcs_testcase_subclasses(VCS, sys.modules[__name__])
1209
1210     unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
1211     suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])