1 # Copyright (C) 2009-2012 Chris Ball <cjb@laptop.org>
2 # Phil Schumm <philschumm@gmail.com>
3 # Robert Lehmann <mail@robertlehmann.de>
4 # W. Trevor King <wking@drexel.edu>
6 # This file is part of Bugs Everywhere.
8 # Bugs Everywhere is free software: you can redistribute it and/or modify it
9 # under the terms of the GNU General Public License as published by the Free
10 # Software Foundation, either version 2 of the License, or (at your option) any
13 # Bugs Everywhere is distributed in the hope that it will be useful, but
14 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
15 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
18 # You should have received a copy of the GNU General Public License along with
19 # Bugs Everywhere. If not, see <http://www.gnu.org/licenses/>.
31 import libbe.ui.util.user
32 import libbe.util.encoding
33 import libbe.util.http
34 import libbe.util.plugin
class UserError (Exception):
    """An error due to improper BE usage."""
class UsageError (UserError):
    r"""A serious parsing error due to invalid BE command construction.

    The distinction between `UserError`\s and the more specific
    `UsageError`\s is that when displaying a `UsageError` to the user,
    the user is pointed towards the command usage information.  Use
    the more general `UserError` if you feel that usage information
    would not be particularly enlightening.

    The constructor stores three attributes on the instance:

    * `command`: the `command` argument, unchanged (may be `None`).
    * `command_name`: the `command_name` argument, or `command.name`
      when no name was given but a command was.
    * `message`: the `message` argument, which is also passed to the
      base exception constructor.
    """
    def __init__(self, command=None, command_name=None, message=None):
        super(UsageError, self).__init__(message)
        self.command = command
        if command_name is None and command is not None:
            # No explicit name: fall back on the command's own name.
            command_name = command.name
        self.command_name = command_name
        self.message = message
class UnknownCommand (UsageError):
    """A `UsageError` for a command name that could not be resolved.

    The stored message always starts with ``Unknown command '<name>'``.
    If an additional `message` is supplied, it is appended on a second
    line, wrapped in parentheses.
    """
    def __init__(self, command_name, message=None):
        text = "Unknown command '%s'" % command_name
        if message is not None:
            text = '%s\n(%s)' % (text, message)
        super(UnknownCommand, self).__init__(
            command_name=command_name, message=text)
def get_command(command_name):
    """Retrieves the module for a user command

    Dashes in `command_name` are replaced by underscores to build the
    module name below `libbe.command`.  If importing that module
    raises `ImportError`, an `UnknownCommand` carrying the import
    error's text is raised instead.

    >>> try:
    ...     get_command('asdf')
    ... except UnknownCommand as e:
    ...     print(e)
    Unknown command 'asdf'
    (No module named asdf)
    >>> repr(get_command('list')).startswith("<module 'libbe.command.list' from ")
    True
    """
    modname = 'libbe.command.%s' % command_name.replace("-", "_")
    try:
        cmd = libbe.util.plugin.import_by_name(modname)
    except ImportError as e:
        raise UnknownCommand(command_name, message=unicode(e))
    return cmd
def get_command_class(module=None, command_name=None):
    """Retrieves a command class from a module.

    When `module` is `None` it is first looked up with
    `get_command(command_name)`.  The class is then fetched from the
    module under the capitalized command name with dashes turned into
    underscores.  A missing attribute is reported as `UnknownCommand`
    rather than leaking a bare `AttributeError`.

    >>> import_xml_mod = get_command('import-xml')
    >>> import_xml = get_command_class(import_xml_mod, 'import-xml')
    >>> repr(import_xml)
    "<class 'libbe.command.import_xml.Import_XML'>"
    >>> import_xml = get_command_class(command_name='import-xml')
    >>> repr(import_xml)
    "<class 'libbe.command.import_xml.Import_XML'>"
    """
    if module is None:
        module = get_command(command_name)
    try:
        cname = command_name.capitalize().replace('-', '_')
        cmd = getattr(module, cname)
    except (ImportError, AttributeError):
        # getattr() signals a missing class with AttributeError;
        # ImportError is still caught for backward compatibility.
        raise UnknownCommand(command_name)
    return cmd
def modname_to_command_name(modname):
    """Little hack to replicate

    >>> def real_modname_to_command_name(modname):
    ...     mod = libbe.util.plugin.import_by_name(
    ...         'libbe.command.%s' % modname)
    ...     attrs = [getattr(mod, name) for name in dir(mod)]
    ...     commands = []
    ...     for attr_name in dir(mod):
    ...         attr = getattr(mod, attr_name)
    ...         try:
    ...             if issubclass(attr, Command):
    ...                 commands.append(attr)
    ...         except TypeError:
    ...             pass
    ...     if len(commands) == 0:
    ...         raise Exception('No Command classes in %s' % dir(mod))
    ...     return commands[0].name
    >>> real_modname_to_command_name('new')
    'new'
    >>> real_modname_to_command_name('import_xml')
    'import-xml'

    without importing the module: every underscore in `modname` is
    simply turned into a dash.
    """
    pieces = modname.split('_')
    return '-'.join(pieces)
def commands(command_names=False):
    """Iterate over the command modules found under `libbe.command`.

    The `base` and `util` modules are skipped.  With
    `command_names == False` (the default) the raw module names are
    yielded; otherwise each module name is first converted with
    `modname_to_command_name`.
    """
    skipped = ('base', 'util')
    for modname in libbe.util.plugin.modnames('libbe.command'):
        if modname in skipped:
            continue
        if command_names == False:
            yield modname
        else:
            yield modname_to_command_name(modname)
class CommandInput (object):
    """Common base for named command inputs carrying a help string.

    Instances are rendered as ``<ClassName name>`` by both `str` and
    `repr`.
    """
    def __init__(self, name, help=''):
        self.name = name
        self.help = help

    def __str__(self):
        class_name = self.__class__.__name__
        return '<%s %s>' % (class_name, self.name)

    def __repr__(self):
        return self.__str__()
class Argument (CommandInput):
    """A value-carrying command input.

    Besides the `name` and `help` handled by `CommandInput` (passed
    through `*args` / `**kwargs`), an argument stores:

    * `metavar`: placeholder text; defaults to the upper-cased name.
    * `default`: value used when nothing is supplied.
    * `type`: type label (the default is ``'string'``).
    * `optional` / `repeatable`: flags consulted when positional
      arguments are parsed.
    * `completion_callback`: optional callable used for completion.
    """
    def __init__(self, metavar=None, default=None, type='string',
                 optional=False, repeatable=False,
                 completion_callback=None, *args, **kwargs):
        CommandInput.__init__(self, *args, **kwargs)
        self.metavar = metavar
        self.default = default
        self.type = type
        self.optional = optional
        self.repeatable = repeatable
        self.completion_callback = completion_callback
        if self.metavar is None:
            # No explicit placeholder: derive one from the name.
            self.metavar = self.name.upper()
class Option (CommandInput):
    """A named ``--option`` for a command.

    An option either carries an `Argument` (`arg`) or a `callback`,
    never both.  When neither is given, an implicit boolean argument
    defaulting to `False` is created.  `short_name` is the optional
    one-letter alias.  The constructor finishes by calling
    `validate`, so an inconsistent option fails with an
    `AssertionError`.
    """
    def __init__(self, callback=None, short_name=None, arg=None,
                 *args, **kwargs):
        CommandInput.__init__(self, *args, **kwargs)
        self.callback = callback
        self.short_name = short_name
        self.arg = arg
        if self.arg is None and self.callback is None:
            # use an implicit boolean argument
            self.arg = Argument(name=self.name, help=self.help,
                                default=False, type='bool')
        self.validate()

    def validate(self):
        """Assert that the option's settings are self-consistent."""
        if self.arg is None:
            assert self.callback is not None, self.name
            return
        # The '%' operator was missing here, so a failing check raised
        # TypeError ("'str' object is not callable") instead of the
        # intended AssertionError.
        assert self.callback is None, '%s: %s' % (self.name, self.callback)
        assert self.arg.name == self.name, \
            'Name missmatch: %s != %s' % (self.arg.name, self.name)
        assert self.arg.optional == False, self.name
        assert self.arg.repeatable == False, self.name

    def __str__(self):
        return '--%s' % self.name

    def __repr__(self):
        return '<Option %s>' % self.__str__()
198 class _DummyParser (optparse.OptionParser):
199 def __init__(self, command):
200 optparse.OptionParser.__init__(self)
201 self.remove_option('-h')
202 self.command = command
203 self._command_opts = []
204 for option in self.command.options:
205 self._add_option(option)
207 def _add_option(self, option):
208 # from libbe.ui.command_line.CmdOptionParser._add_option
210 long_opt = '--%s' % option.name
211 if option.short_name != None:
212 short_opt = '-%s' % option.short_name
213 assert '_' not in option.name, \
214 'Non-reconstructable option name %s' % option.name
215 kwargs = {'dest':option.name.replace('-', '_'),
217 if option.arg == None or option.arg.type == 'bool':
218 kwargs['action'] = 'store_true'
219 kwargs['metavar'] = None
220 kwargs['default'] = False
222 kwargs['type'] = option.arg.type
223 kwargs['action'] = 'store'
224 kwargs['metavar'] = option.arg.metavar
225 kwargs['default'] = option.arg.default
226 if option.short_name != None:
227 opt = optparse.Option(short_opt, long_opt, **kwargs)
229 opt = optparse.Option(long_opt, **kwargs)
230 #option.takes_value = lambda : option.arg != None
232 self._command_opts.append(opt)
235 class OptionFormatter (optparse.IndentedHelpFormatter):
236 def __init__(self, command):
237 optparse.IndentedHelpFormatter.__init__(self)
238 self.command = command
239 def option_help(self):
240 # based on optparse.OptionParser.format_option_help()
241 parser = _DummyParser(self.command)
242 self.store_option_strings(parser)
244 ret.append(self.format_heading('Options'))
246 for option in parser._command_opts:
247 ret.append(self.format_option(option))
250 # Drop the last '\n', or the header if no options or option groups:
251 return ''.join(ret[:-1])
253 class Command (object):
254 """One-line command description here.
258 usage: be command [options]
261 -h, --help Print a help message.
263 --complete Print a list of possible completions.
265 A detailed help message.
267 user_agent = 'BE-HTTP-Command'
271 def __init__(self, ui=None, server=None):
272 self.ui = ui # calling user-interface
273 self.server = server # location of eventual execution
276 self.restrict_file_access = True
278 Option(name='help', short_name='h',
279 help='Print a help message.',
281 Option(name='complete',
282 help='Print a list of possible completions.',
283 callback=self.complete),
287 def run(self, options=None, args=None):
288 self.status = 1 # in case we raise an exception
289 params = self._parse_options_args(options, args)
290 if params['help'] == True:
294 if params['complete'] != None:
297 params.pop('complete')
300 self.status = self._run_remote(**params)
302 self.status = self._run(**params)
305 def _parse_options_args(self, options=None, args=None):
311 for option in self.options:
312 assert option.name not in params, params[option.name]
313 if option.name in options:
314 params[option.name] = options.pop(option.name)
315 elif option.arg != None:
316 params[option.name] = option.arg.default
317 else: # non-arg options are flags, set to default flag value
318 params[option.name] = False
319 assert 'user-id' not in params, params['user-id']
320 if 'user-id' in options:
321 self._user_id = options.pop('user-id')
323 raise UserError, 'Invalid option passed to command %s:\n %s' \
324 % (self.name, '\n '.join(['%s: %s' % (k,v)
325 for k,v in options.items()]))
326 in_optional_args = False
327 for i,arg in enumerate(self.args):
328 if arg.repeatable == True:
329 assert i == len(self.args)-1, arg.name
330 if in_optional_args == True:
331 assert arg.optional == True, arg.name
333 in_optional_args = arg.optional
335 if arg.repeatable == True:
336 params[arg.name] = [args[i]]
338 params[arg.name] = args[i]
339 else: # no value given
340 assert in_optional_args == True, arg.name
341 params[arg.name] = arg.default
342 if len(args) > len(self.args): # add some additional repeats
343 assert self.args[-1].repeatable == True, self.args[-1].name
344 params[self.args[-1].name].extend(args[len(self.args):])
347 def _run(self, **kwargs):
348 raise NotImplementedError
350 def _run_remote(self, **kwargs):
351 data = yaml.safe_dump({
352 'command': self.name,
353 'parameters': kwargs,
355 url = urlparse.urljoin(self.server, 'run')
356 page,final_url,info = libbe.util.http.get_post_url(
357 url=url, get=False, data=data, agent=self.user_agent)
358 self.stdout.write(page)
361 def help(self, *args):
362 return '\n\n'.join([self.usage(),
364 self._long_help().rstrip('\n')])
367 usage = 'usage: be %s [options]' % self.name
369 for arg in self.args:
371 if arg.optional == True:
375 if arg.repeatable == True:
377 usage += ']'*num_optional
380 def _option_help(self):
381 o = OptionFormatter(self)
382 return o.option_help().strip('\n')
384 def _long_help(self):
385 return "A detailed help message."
387 def complete(self, argument=None, fragment=None):
389 ret = ['--%s' % o.name for o in self.options
390 if o.name != 'complete']
391 if len(self.args) > 0 and self.args[0].completion_callback != None:
392 ret.extend(self.args[0].completion_callback(self, argument, fragment))
394 elif argument.completion_callback != None:
395 # finish a particular argument
396 return argument.completion_callback(self, argument, fragment)
397 return [] # the particular argument doesn't supply completion info
399 def _check_restricted_access(self, storage, path):
401 Check that the file at path is inside bugdir.root. This is
402 important if you allow other users to execute becommands with
403 your username (e.g. if you're running be-handle-mail through
404 your ~/.procmailrc). If this check wasn't made, a user could
406 be commit -b ~/.ssh/id_rsa "Hack to expose ssh key"
407 which would expose your ssh key to anyone who could read the
410 >>> class DummyStorage (object): pass
411 >>> s = DummyStorage()
412 >>> s.repo = os.path.expanduser('~/x/')
415 ... c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
416 ... except UserError, e:
417 ... assert str(e).startswith('file access restricted!'), str(e)
418 ... print 'we got the expected error'
419 we got the expected error
420 >>> c._check_restricted_access(s, os.path.expanduser('~/x'))
421 >>> c._check_restricted_access(s, os.path.expanduser('~/x/y'))
422 >>> c.restrict_file_access = False
423 >>> c._check_restricted_access(s, os.path.expanduser('~/.ssh/id_rsa'))
425 if self.restrict_file_access == True:
426 path = os.path.abspath(path)
427 repo = os.path.abspath(storage.repo).rstrip(os.path.sep)
428 if path == repo or path.startswith(repo+os.path.sep):
430 raise UserError('file access restricted!\n %s not in %s'
436 class InputOutput (object):
437 def __init__(self, stdin=None, stdout=None):
441 def setup_command(self, command):
442 if not hasattr(self.stdin, 'encoding'):
443 self.stdin.encoding = libbe.util.encoding.get_input_encoding()
444 if not hasattr(self.stdout, 'encoding'):
445 self.stdout.encoding = libbe.util.encoding.get_output_encoding()
446 command.stdin = self.stdin
447 command.stdin.encoding = self.stdin.encoding
448 command.stdout = self.stdout
449 command.stdout.encoding = self.stdout.encoding
class StdInputOutput (InputOutput):
    """`InputOutput` bound to the process's `sys.stdin` / `sys.stdout`.

    Both streams are wrapped with `codecs` readers/writers.  An
    encoding left as `None` is looked up through
    `libbe.util.encoding`.
    """
    def __init__(self, input_encoding=None, output_encoding=None):
        stdin,stdout = self._get_io(input_encoding, output_encoding)
        InputOutput.__init__(self, stdin, stdout)

    def _get_io(self, input_encoding=None, output_encoding=None):
        """Return a `(stdin, stdout)` pair of codec-wrapped streams.

        Each wrapper gets an `encoding` attribute naming the codec it
        was created with.
        """
        if input_encoding is None:
            input_encoding = libbe.util.encoding.get_input_encoding()
        if output_encoding is None:
            output_encoding = libbe.util.encoding.get_output_encoding()
        stdin = codecs.getreader(input_encoding)(sys.stdin)
        stdin.encoding = input_encoding
        stdout = codecs.getwriter(output_encoding)(sys.stdout)
        stdout.encoding = output_encoding
        return (stdin, stdout)
470 class StringInputOutput (InputOutput):
472 >>> s = StringInputOutput()
473 >>> s.set_stdin('hello')
478 >>> print >> s.stdout, 'goodbye'
484 Also works with unicode strings
486 >>> s.set_stdin(u'hello')
489 >>> print >> s.stdout, u'goodbye'
494 stdin = StringIO.StringIO()
495 stdin.encoding = 'utf-8'
496 stdout = StringIO.StringIO()
497 stdout.encoding = 'utf-8'
498 InputOutput.__init__(self, stdin, stdout)
500 def set_stdin(self, stdin_string):
501 self.stdin = StringIO.StringIO(stdin_string)
503 def get_stdout(self):
504 ret = self.stdout.getvalue()
505 self.stdout.truncate(size=0)
class UnconnectedStorageGetter (object):
    """Callable that fetches the storage for a fixed location.

    Calling an instance returns
    `libbe.storage.get_storage(self.location)`.
    """
    def __init__(self, location):
        self.location = location

    def __call__(self):
        location = self.location
        return libbe.storage.get_storage(location)
515 class StorageCallbacks (object):
516 def __init__(self, location=None):
519 self.location = location
520 self._get_unconnected_storage = UnconnectedStorageGetter(location)
522 def setup_command(self, command):
523 command._get_unconnected_storage = self.get_unconnected_storage
524 command._get_storage = self.get_storage
525 command._get_bugdirs = self.get_bugdirs
527 def get_unconnected_storage(self):
529 Callback for use by commands that need it.
531 The returned Storage instance is may actually be connected,
532 but commands that make use of the returned value should only
533 make use of non-connected Storage methods. This is mainly
534 intended for the init command, which calls Storage.init().
536 if not hasattr(self, '_unconnected_storage'):
537 if self._get_unconnected_storage == None:
538 raise NotImplementedError
539 self._unconnected_storage = self._get_unconnected_storage()
540 return self._unconnected_storage
542 def set_unconnected_storage(self, unconnected_storage):
543 self._unconnected_storage = unconnected_storage
545 def get_storage(self):
546 """Callback for use by commands that need it."""
547 if not hasattr(self, '_storage'):
548 self._storage = self.get_unconnected_storage()
549 self._storage.connect()
550 version = self._storage.storage_version()
551 if version != libbe.storage.STORAGE_VERSION:
552 raise libbe.storage.InvalidStorageVersion(version)
555 def set_storage(self, storage):
556 self._storage = storage
558 def get_bugdirs(self):
559 """Callback for use by commands that need it."""
560 if not hasattr(self, '_bugdirs'):
561 storage = self.get_storage()
562 self._bugdirs = dict(
563 (uuid, libbe.bugdir.BugDir(
567 for uuid in storage.children())
570 def set_bugdirs(self, bugdirs):
571 self._bugdirs = bugdirs
574 if hasattr(self, '_storage'):
575 self._storage.disconnect()
577 class UserInterface (object):
578 def __init__(self, io=None, location=None):
580 io = StringInputOutput()
582 self.storage_callbacks = StorageCallbacks(location)
583 self.restrict_file_access = True
586 raise NotImplementedError
588 def run(self, command, options=None, args=None):
589 self.setup_command(command)
590 return command.run(options, args)
592 def setup_command(self, command):
593 if command.ui is None:
595 if self.io is not None:
596 self.io.setup_command(command)
597 if self.storage_callbacks is not None:
598 self.storage_callbacks.setup_command(command)
599 command.restrict_file_access = self.restrict_file_access
600 command._get_user_id = self._get_user_id
602 def _get_user_id(self):
603 """Callback for use by commands that need it."""
604 if not hasattr(self, '_user_id'):
605 self._user_id = libbe.ui.util.user.get_user_id(
606 self.storage_callbacks.get_storage())
610 if self.storage_callbacks is not None:
611 self.storage_callbacks.cleanup()