9 import libbe.ui.util.user
10 import libbe.util.encoding
11 import libbe.util.plugin
13 class UserError(Exception):
16 class UnknownCommand(UserError):
17 def __init__(self, cmd):
18 Exception.__init__(self, "Unknown command '%s'" % cmd)
22 def get_command(command_name):
23 """Retrieves the module for a user command
26 ... get_command('asdf')
27 ... except UnknownCommand, e:
29 Unknown command 'asdf'
30 >>> repr(get_command('list')).startswith("<module 'libbe.command.list' from ")
34 cmd = libbe.util.plugin.import_by_name(
35 'libbe.command.%s' % command_name.replace("-", "_"))
36 except ImportError, e:
37 raise UnknownCommand(command_name)
def get_command_class(module, command_name):
    """Retrieves a command class from a module.

    The class name is derived from `command_name` by capitalizing it
    and replacing dashes with underscores (e.g. 'import-xml' becomes
    'Import_xml'); that name is then looked up as an attribute of
    `module`.

    Raises UnknownCommand if `module` has no attribute of that name.

    >>> import_xml_mod = get_command('import-xml')
    >>> import_xml = get_command_class(import_xml_mod, 'import-xml')
    >>> repr(import_xml)
    "<class 'libbe.command.import_xml.Import_XML'>"
    """
    cname = command_name.capitalize().replace('-', '_')
    try:
        cmd = getattr(module, cname)
    except (ImportError, AttributeError):
        # getattr() reports a missing class with AttributeError, which
        # previously escaped to the caller.  ImportError is kept so the
        # existing behaviour is unchanged.
        raise UnknownCommand(command_name)
    return cmd
56 for modname in libbe.util.plugin.modnames('libbe.command'):
57 if modname not in ['base', 'util']:
60 class CommandInput (object):
61 def __init__(self, name, help=''):
65 class Argument (CommandInput):
66 def __init__(self, metavar=None, default=None, type='string',
67 optional=False, repeatable=False,
68 completion_callback=None, *args, **kwargs):
69 CommandInput.__init__(self, *args, **kwargs)
70 self.metavar = metavar
71 self.default = default
73 self.optional = optional
74 self.repeatable = repeatable
75 self.completion_callback = completion_callback
76 if self.metavar == None:
77 self.metavar = self.name.upper()
class Option (CommandInput):
    """A named command option, displayed as ``--<name>``.

    An option carries either an Argument (`arg`) describing the value
    it takes, or a `callback`, but not both.  When neither is given
    the option becomes a boolean flag: an implicit Argument of type
    'bool' with default False is created for it.

    callback   -- callable associated with the option, or None
    short_name -- single-letter alias (e.g. 'h' for ``-h``), or None
    arg        -- Argument describing the option's value, or None

    Remaining positional and keyword arguments are passed on to
    CommandInput.__init__.
    """
    def __init__(self, callback=None, short_name=None, arg=None,
                 *args, **kwargs):
        CommandInput.__init__(self, *args, **kwargs)
        self.callback = callback
        self.short_name = short_name
        self.arg = arg
        if self.arg is None and self.callback is None:
            # use an implicit boolean argument
            self.arg = Argument(name=self.name, help=self.help,
                                default=False, type='bool')
        self.validate()

    def validate(self):
        """Check that the option is internally consistent.

        Raises AssertionError if the option has neither an argument
        nor a callback, has both, or has an argument that is
        misnamed, optional, or repeatable.
        """
        if self.arg is None:
            assert self.callback is not None, self.name
            return
        # The '%' operator was missing here, so a failing assertion
        # raised TypeError while building its own message.
        assert self.callback is None, \
            '%s: %s' % (self.name, self.callback)
        assert self.arg.name == self.name, \
            'Name missmatch: %s != %s' % (self.arg.name, self.name)
        assert self.arg.optional == False, self.name
        assert self.arg.repeatable == False, self.name

    def __str__(self):
        return '--%s' % self.name

    def __repr__(self):
        return '<Option %s>' % self.__str__()
108 class _DummyParser (optparse.OptionParser):
109 def __init__(self, command):
110 optparse.OptionParser.__init__(self)
111 self.remove_option('-h')
112 self.command = command
113 self._command_opts = []
114 for option in self.command.options:
115 self._add_option(option)
117 def _add_option(self, option):
118 # from libbe.ui.command_line.CmdOptionParser._add_option
120 long_opt = '--%s' % option.name
121 if option.short_name != None:
122 short_opt = '-%s' % option.short_name
123 assert '_' not in option.name, \
124 'Non-reconstructable option name %s' % option.name
125 kwargs = {'dest':option.name.replace('-', '_'),
127 if option.arg == None or option.arg.type == 'bool':
128 kwargs['action'] = 'store_true'
129 kwargs['metavar'] = None
130 kwargs['default'] = False
132 kwargs['type'] = option.arg.type
133 kwargs['action'] = 'store'
134 kwargs['metavar'] = option.arg.metavar
135 kwargs['default'] = option.arg.default
136 if option.short_name != None:
137 opt = optparse.Option(short_opt, long_opt, **kwargs)
139 opt = optparse.Option(long_opt, **kwargs)
140 #option.takes_value = lambda : option.arg != None
142 self._command_opts.append(opt)
145 class OptionFormatter (optparse.IndentedHelpFormatter):
146 def __init__(self, command):
147 optparse.IndentedHelpFormatter.__init__(self)
148 self.command = command
149 def option_help(self):
150 # based on optparse.OptionParser.format_option_help()
151 parser = _DummyParser(self.command)
152 self.store_option_strings(parser)
154 ret.append(self.format_heading('Options'))
156 for option in parser._command_opts:
157 ret.append(self.format_option(option))
160 # Drop the last '\n', or the header if no options or option groups:
161 return ''.join(ret[:-1])
163 class Command (object):
164 """One-line command description.
168 usage: be command [options]
171 -h, --help Print a help message.
173 --complete Print a list of possible completions.
175 A detailed help message.
180 def __init__(self, input_encoding=None, output_encoding=None):
183 self.ui = None # calling user-interface, e.g. for Help()
184 self.requires_bugdir = False
185 self.requires_storage = False
186 self.requires_unconnected_storage = False
187 self.restrict_file_access = True
188 self.input_encoding = None
189 self.output_encoding = None
191 Option(name='help', short_name='h',
192 help='Print a help message.',
194 Option(name='complete',
195 help='Print a list of possible completions.',
196 callback=self.complete),
200 def run(self, storage=None, bugdir=None, options=None, args=None):
206 for option in self.options:
207 assert option.name not in params, params[option.name]
208 if option.name in options:
209 params[option.name] = options.pop(option.name)
210 elif option.arg != None:
211 params[option.name] = option.arg.default
212 else: # non-arg options are flags, set to default flag value
213 params[option.name] = False
214 assert 'user-id' not in params, params['user-id']
215 if 'user-id' in options:
216 params['user-id'] = options.pop('user-id')
218 params['user-id'] = libbe.ui.util.user.get_user_id(storage)
220 raise UserError, 'Invalid option passed to command %s:\n %s' \
221 % (self.name, '\n '.join(['%s: %s' % (k,v)
222 for k,v in options.items()]))
223 in_optional_args = False
224 for i,arg in enumerate(self.args):
225 if arg.repeatable == True:
226 assert i == len(self.args)-1, arg.name
227 if in_optional_args == True:
228 assert arg.optional == True, arg.name
230 in_optional_args = arg.optional
232 if arg.repeatable == True:
233 params[arg.name] = [args[i]]
235 params[arg.name] = args[i]
236 else: # no value given
237 assert in_optional_args == True, arg.name
238 if arg.repeatable == True:
239 params[arg.name] = [arg.default]
241 params[arg.name] = arg.default
242 if len(args) > len(self.args): # add some additional repeats
243 assert self.args[-1].repeatable == True, self.args[-1].name
244 params[self.args[-1].name].extend(args[len(self.args):])
246 if params['help'] == True:
250 if params['complete'] != None:
253 params.pop('complete')
255 self._setup_io(self.input_encoding, self.output_encoding)
256 self.status = self._run(storage, bugdir, **params)
259 def _run(self, storage, bugdir, **kwargs):
def _setup_io(self, input_encoding=None, output_encoding=None):
    """Wrap sys.stdin and sys.stdout in codec streams.

    Sets self.stdin to a decoding reader around sys.stdin and
    self.stdout to an encoding writer around sys.stdout, and records
    the encoding used on each wrapper's `encoding` attribute.

    input_encoding  -- encoding for stdin; when None, taken from
                       libbe.util.encoding.get_input_encoding()
    output_encoding -- encoding for stdout; when None, taken from
                       libbe.util.encoding.get_output_encoding()
    """
    if input_encoding is None:
        input_encoding = libbe.util.encoding.get_input_encoding()
    if output_encoding is None:
        output_encoding = libbe.util.encoding.get_output_encoding()
    # stdin is read from, so it needs a StreamReader (decoder); a
    # StreamWriter would encode on write and do nothing useful on read.
    self.stdin = codecs.getreader(input_encoding)(sys.stdin)
    self.stdin.encoding = input_encoding
    self.stdout = codecs.getwriter(output_encoding)(sys.stdout)
    self.stdout.encoding = output_encoding
272 def help(self, *args):
273 return '\n\n'.join([self._usage(),
278 usage = 'usage: be %s [options]' % self.name
280 for arg in self.args:
282 if arg.optional == True:
286 if arg.repeatable == True:
288 usage += ']'*num_optional
def _option_help(self):
    """Return the formatted option listing for this command.

    The text is produced by an OptionFormatter built around this
    command, with leading and trailing newlines removed.
    """
    formatter = OptionFormatter(self)
    listing = formatter.option_help()
    return listing.strip('\n')
def _long_help(self):
    """Return the detailed help text shown after the option listing."""
    detailed = "A detailed help message."
    return detailed
def complete(self, argument=None, fragment=None):
    """Return a list of possible completions.

    argument -- the argument being completed, or None when no
                particular argument has been selected yet
    fragment -- passed through to the completion callback

    With no `argument`, the result lists every option as ``--<name>``
    followed by whatever the first positional argument's completion
    callback returns (if it has one).  With an `argument`, the result
    is whatever that argument's completion callback returns, or an
    empty list if it has no callback.
    """
    if argument is None:
        ret = ['--%s' % o.name for o in self.options]
        if len(self.args) > 0 \
                and self.args[0].completion_callback is not None:
            # Pass `fragment` here too, matching the per-argument
            # call below; it was previously dropped.
            ret.extend(self.args[0].completion_callback(
                    self, argument, fragment))
        return ret
    elif argument.completion_callback is not None:
        # finish a particular argument
        return argument.completion_callback(self, argument, fragment)
    return [] # the particular argument doesn't supply completion info
309 def check_restricted_access(self, storage, path):
311 Check that the file at path is inside bugdir.root. This is
312 important if you allow other users to execute becommands with
313 your username (e.g. if you're running be-handle-mail through
314 your ~/.procmailrc). If this check wasn't made, a user could
316 be commit -b ~/.ssh/id_rsa "Hack to expose ssh key"
317 which would expose your ssh key to anyone who could read the
320 >>> class DummyStorage (object): pass
321 >>> s = DummyStorage()
322 >>> s.repo = os.path.expanduser('~/x/')
325 ... c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
326 ... except UserError, e:
327 ... assert str(e).startswith('file access restricted!'), str(e)
328 ... print 'we got the expected error'
329 we got the expected error
330 >>> c.check_restricted_access(s, os.path.expanduser('~/x'))
331 >>> c.check_restricted_access(s, os.path.expanduser('~/x/y'))
332 >>> c.restrict_file_access = False
333 >>> c.check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
335 if self.restrict_file_access == True:
336 path = os.path.abspath(path)
337 repo = os.path.abspath(storage.repo).rstrip(os.path.sep)
338 if path == repo or path.startswith(repo+os.path.sep):
340 raise UserError('file access restricted!\n %s not in %s'