Transitioned help to Command-format
[be.git] / libbe / command / base.py
1 # Copyright
2
3 import codecs
4 import optparse
5 import os.path
6 import sys
7
8 import libbe
9 import libbe.ui.util.user
10 import libbe.util.encoding
11 import libbe.util.plugin
12
13 class UserError(Exception):
14     pass
15
16 class UnknownCommand(UserError):
17     def __init__(self, cmd):
18         Exception.__init__(self, "Unknown command '%s'" % cmd)
19         self.cmd = cmd
20
21
22 def get_command(command_name):
23     """Retrieves the module for a user command
24
25     >>> try:
26     ...     get_command('asdf')
27     ... except UnknownCommand, e:
28     ...     print e
29     Unknown command 'asdf'
30     >>> repr(get_command('list')).startswith("<module 'libbe.command.list' from ")
31     True
32     """
33     try:
34         cmd = libbe.util.plugin.import_by_name(
35             'libbe.command.%s' % command_name.replace("-", "_"))
36     except ImportError, e:
37         raise UnknownCommand(command_name)
38     return cmd
39
40 def get_command_class(module, command_name):
41     """Retrieves a command class from a module.
42
43     >>> import_xml_mod = get_command('import-xml')
44     >>> import_xml = get_command_class(import_xml_mod, 'import-xml')
45     >>> repr(import_xml)
46     "<class 'libbe.command.import_xml.Import_XML'>"
47     """
48     try:
49         cname = command_name.capitalize().replace('-', '_')
50         cmd = getattr(module, cname)
51     except ImportError, e:
52         raise UnknownCommand(command_name)
53     return cmd
54
55 def commands():
56     for modname in libbe.util.plugin.modnames('libbe.command'):
57         if modname not in ['base', 'util']:
58             yield modname
59
60 class CommandInput (object):
61     def __init__(self, name, help=''):
62         self.name = name
63         self.help = help
64
65 class Argument (CommandInput):
66     def __init__(self, metavar=None, default=None, type='string',
67                  optional=False, repeatable=False,
68                  completion_callback=None, *args, **kwargs):
69         CommandInput.__init__(self, *args, **kwargs)
70         self.metavar = metavar
71         self.default = default
72         self.type = type
73         self.optional = optional
74         self.repeatable = repeatable
75         self.completion_callback = completion_callback
76         if self.metavar == None:
77             self.metavar = self.name.upper()
78
79 class Option (CommandInput):
80     def __init__(self, callback=None, short_name=None, arg=None,
81                  *args, **kwargs):
82         CommandInput.__init__(self, *args, **kwargs)
83         self.callback = callback
84         self.short_name = short_name
85         self.arg = arg
86         if self.arg == None and self.callback == None:
87             # use an implicit boolean argument
88             self.arg = Argument(name=self.name, help=self.help,
89                                 default=False, type='bool')
90         self.validate()
91
92     def validate(self):
93         if self.arg == None:
94             assert self.callback != None, self.name
95             return
96         assert self.callback == None, '%s: %s' (self.name, self.callback)
97         assert self.arg.name == self.name, \
98             'Name missmatch: %s != %s' % (self.arg.name, self.name)
99         assert self.arg.optional == False, self.name
100         assert self.arg.repeatable == False, self.name
101
102     def __str__(self):
103         return '--%s' % self.name
104
105     def __repr__(self):
106         return '<Option %s>' % self.__str__()
107
108 class _DummyParser (optparse.OptionParser):
109     def __init__(self, command):
110         optparse.OptionParser.__init__(self)
111         self.remove_option('-h')
112         self.command = command
113         self._command_opts = []
114         for option in self.command.options:
115             self._add_option(option)
116
117     def _add_option(self, option):
118         # from libbe.ui.command_line.CmdOptionParser._add_option
119         option.validate()
120         long_opt = '--%s' % option.name
121         if option.short_name != None:
122             short_opt = '-%s' % option.short_name
123         assert '_' not in option.name, \
124             'Non-reconstructable option name %s' % option.name
125         kwargs = {'dest':option.name.replace('-', '_'),
126                   'help':option.help}
127         if option.arg == None or option.arg.type == 'bool':
128             kwargs['action'] = 'store_true'
129             kwargs['metavar'] = None
130             kwargs['default'] = False
131         else:
132             kwargs['type'] = option.arg.type
133             kwargs['action'] = 'store'
134             kwargs['metavar'] = option.arg.metavar
135             kwargs['default'] = option.arg.default
136         if option.short_name != None:
137             opt = optparse.Option(short_opt, long_opt, **kwargs)
138         else:
139             opt = optparse.Option(long_opt, **kwargs)
140         #option.takes_value = lambda : option.arg != None
141         opt._option = option
142         self._command_opts.append(opt)
143         self.add_option(opt)
144
145 class OptionFormatter (optparse.IndentedHelpFormatter):
146     def __init__(self, command):
147         optparse.IndentedHelpFormatter.__init__(self)
148         self.command = command
149     def option_help(self):
150         # based on optparse.OptionParser.format_option_help()
151         parser = _DummyParser(self.command)
152         self.store_option_strings(parser)
153         ret = []
154         ret.append(self.format_heading('Options'))
155         self.indent()
156         for option in parser._command_opts:
157             ret.append(self.format_option(option))
158             ret.append('\n')
159         self.dedent()
160         # Drop the last '\n', or the header if no options or option groups:
161         return ''.join(ret[:-1])
162
163 class Command (object):
164     """One-line command description.
165
166     >>> c = Command()
167     >>> print c.help()
168     usage: be command [options]
169     <BLANKLINE>
170     Options:
171       -h, --help  Print a help message.
172     <BLANKLINE>
173       --complete  Print a list of possible completions.
174     <BLANKLINE>
175     A detailed help message.
176     """
177
178     name = 'command'
179
180     def __init__(self, input_encoding=None, output_encoding=None):
181         self.status = None
182         self.result = None
183         self.ui = None # calling user-interface, e.g. for Help()
184         self.requires_bugdir = False
185         self.requires_storage = False
186         self.requires_unconnected_storage = False
187         self.restrict_file_access = True
188         self.input_encoding = None
189         self.output_encoding = None
190         self.options = [
191             Option(name='help', short_name='h',
192                 help='Print a help message.',
193                 callback=self.help),
194             Option(name='complete',
195                 help='Print a list of possible completions.',
196                 callback=self.complete),
197                 ]
198         self.args = []
199
200     def run(self, storage=None, bugdir=None, options=None, args=None):
201         if options == None:
202             options = {}
203         if args == None:
204             args = []
205         params = {}
206         for option in self.options:
207             assert option.name not in params, params[option.name]
208             if option.name in options:
209                 params[option.name] = options.pop(option.name)
210             elif option.arg != None:
211                 params[option.name] = option.arg.default
212             else: # non-arg options are flags, set to default flag value
213                 params[option.name] = False
214         assert 'user-id' not in params, params['user-id']
215         if 'user-id' in options:
216             params['user-id'] = options.pop('user-id')
217         else:
218             params['user-id'] = libbe.ui.util.user.get_user_id(storage)
219         if len(options) > 0:
220             raise UserError, 'Invalid option passed to command %s:\n  %s' \
221                 % (self.name, '\n  '.join(['%s: %s' % (k,v)
222                                            for k,v in options.items()]))
223         in_optional_args = False
224         for i,arg in enumerate(self.args):
225             if arg.repeatable == True:
226                 assert i == len(self.args)-1, arg.name
227             if in_optional_args == True:
228                 assert arg.optional == True, arg.name
229             else:
230                 in_optional_args = arg.optional
231             if i < len(args):
232                 if arg.repeatable == True:
233                     params[arg.name] = [args[i]]
234                 else:
235                     params[arg.name] = args[i]
236             else:  # no value given
237                 assert in_optional_args == True, arg.name
238                 if arg.repeatable == True:
239                     params[arg.name] = [arg.default]
240                 else:
241                     params[arg.name] = arg.default
242         if len(args) > len(self.args):  # add some additional repeats
243             assert self.args[-1].repeatable == True, self.args[-1].name
244             params[self.args[-1].name].extend(args[len(self.args):])
245
246         if params['help'] == True:
247             pass
248         else:
249             params.pop('help')
250         if params['complete'] != None:
251             pass
252         else:
253             params.pop('complete')
254
255         self._setup_io(self.input_encoding, self.output_encoding)
256         self.status = self._run(storage, bugdir, **params)
257         return self.status
258
259     def _run(self, storage, bugdir, **kwargs):
260         pass
261
262     def _setup_io(self, input_encoding=None, output_encoding=None):
263         if input_encoding == None:
264             input_encoding = libbe.util.encoding.get_input_encoding()
265         if output_encoding == None:
266             output_encoding = libbe.util.encoding.get_output_encoding()
267         self.stdin = codecs.getwriter(input_encoding)(sys.stdin)
268         self.stdin.encoding = input_encoding
269         self.stdout = codecs.getwriter(output_encoding)(sys.stdout)
270         self.stdout.encoding = output_encoding
271
272     def help(self, *args):       
273         return '\n\n'.join([self._usage(),
274                             self._option_help(),
275                             self._long_help()])
276
277     def _usage(self):
278         usage = 'usage: be %s [options]' % self.name
279         num_optional = 0
280         for arg in self.args:
281             usage += ' '
282             if arg.optional == True:
283                 usage += '['
284                 num_optional += 1
285             usage += arg.metavar
286             if arg.repeatable == True:
287                 usage += ' ...'
288         usage += ']'*num_optional
289         return usage
290
291     def _option_help(self):
292         o = OptionFormatter(self)
293         return o.option_help().strip('\n')
294
295     def _long_help(self):
296         return "A detailed help message."
297
298     def complete(self, argument=None, fragment=None):
299         if argument == None:
300             ret = ['--%s' % o.name for o in self.options]
301             if len(self.args) > 0 and self.args[0].completion_callback != None:
302                 ret.extend(self.args[0].completion_callback(self, argument))
303             return ret
304         elif argument.completion_callback != None:
305             # finish a particular argument
306             return argument.completion_callback(self, argument, fragment)
307         return [] # the particular argument doesn't supply completion info
308
309     def check_restricted_access(self, storage, path):
310         """
311         Check that the file at path is inside bugdir.root.  This is
312         important if you allow other users to execute becommands with
313         your username (e.g. if you're running be-handle-mail through
314         your ~/.procmailrc).  If this check wasn't made, a user could
315         e.g.  run
316           be commit -b ~/.ssh/id_rsa "Hack to expose ssh key"
317         which would expose your ssh key to anyone who could read the
318         VCS log.
319
320         >>> class DummyStorage (object): pass
321         >>> s = DummyStorage()
322         >>> s.repo = os.path.expanduser('~/x/')
323         >>> c = Command()
324         >>> try:
325         ...     c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
326         ... except UserError, e:
327         ...     assert str(e).startswith('file access restricted!'), str(e)
328         ...     print 'we got the expected error'
329         we got the expected error
330         >>> c.check_restricted_access(s, os.path.expanduser('~/x'))
331         >>> c.check_restricted_access(s, os.path.expanduser('~/x/y'))
332         >>> c.restrict_file_access = False
333         >>> c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
334         """
335         if self.restrict_file_access == True:
336             path = os.path.abspath(path)
337             repo = os.path.abspath(storage.repo).rstrip(os.path.sep)
338             if path == repo or path.startswith(repo+os.path.sep):
339                 return
340             raise UserError('file access restricted!\n  %s not in %s'
341                             % (path, repo))