import readline # including readline makes cmd.Cmd.cmdloop() smarter
import shlex
-from ..command import CommandExit, Exit, BooleanRequest, BooleanResponse, \
- Command, Argument, StoreValue
+from ..command import CommandExit, Exit, Command, Argument, StoreValue
+from ..interaction import Request, BooleanRequest
from ..ui import UserInterface, CommandMessage
self.command_opts.append(a)
else:
self.command_args.append(a)
+ infinite_counters = [a for a in self.command_args if a.count == -1]
+ assert len(infinite_counters) <= 1, \
+ 'Multiple infinite counts for %s: %s\nNeed a better CommandLineParser implementation.' \
+ % (command.name, ', '.join([a.name for a in infinite_counters]))
+ if len(infinite_counters) == 1: # move the big counter to the end.
+ infinite_counter = infinite_counters[0]
+ self.command_args.remove(infinite_counter)
+ self.command_args.append(infinite_counter)
def exit(self, status=0, msg=None):
"""Override :meth:`optparse.OptionParser.exit` which calls
self.cmd.stdout.write(msg.__class__.__name__+'\n')
self.cmd.stdout.write(str(msg).rstrip()+'\n')
break
- elif isinstance(msg, BooleanRequest):
- self._boolean_request(msg)
+ elif isinstance(msg, Request):
+ self._handle_request(msg)
continue
self.cmd.stdout.write(str(msg).rstrip()+'\n')
def _parse_args(self, args):
argv = shlex.split(args, comments=True, posix=True)
options,args = self.parser.parse_args(argv)
- if len(args) != len(self.parser.command_args):
- raise optparse.OptParseError('%d arguments given, but %s takes %d'
- % (len(args),
- self.name_fn(self.command.name),
- len(self.parser.command_args)))
+ self._check_argument_length_bounds(args)
params = {}
for argument in self.parser.command_opts:
value = getattr(options, self.name_fn(argument.name))
if value != Default:
params[argument.name] = value
- for i,argument in enumerate(self.parser.command_args):
- params[argument.name] = args[i]
+ arg_index = 0
+ for argument in self.parser.command_args:
+ if argument.count == 1:
+ params[argument.name] = args[arg_index]
+ elif argument.count > 1:
+ params[argument.name] = \
+ args[arg_index:arg_index+argument.count]
+ else: # argument.count == -1:
+ params[argument.name] = args[arg_index:]
+ arg_index += argument.count
return params
- def _boolean_request(self, msg):
+ def _check_argument_length_bounds(self, arguments):
+ """Check that there are an appropriate number of arguments in
+ `args`.
+
+ If not, raise optparse.OptParseError().
+ """
+ min_args = 0
+ max_args = -1
+ for argument in self.parser.command_args:
+ if argument.optional == False and argument.count > 0:
+ min_args += argument.count
+ if max_args >= 0: # otherwise already infinite
+ if argument.count == -1:
+ max_args = -1
+ else:
+ max_args += argument.count
+ if len(arguments) < min_args \
+ or (max_args >= 0 and len(arguments) > max_args):
+ if min_args == max_args:
+ target_string = str(min_args)
+ elif max_args == -1:
+ target_string = 'more than %d' % min_args
+ else:
+ target_string = '%d to %d' % (min_args, max_args)
+ raise optparse.OptParseError(
+ '%d arguments given, but %s takes %s'
+ % (len(arguments), self.name_fn(self.command.name),
+ target_string))
+
+ def _handle_request(self, msg):
+ """Repeatedly try to get a response to `msg`.
+ """
+ prompt = getattr(self, '_%s_request_prompt' % msg.type, None)
+ if prompt == None:
+ raise NotImplementedError('_%s_request_prompt' % msg.type)
+ prompt_string = prompt(msg)
+ parser = getattr(self, '_%s_request_parser' % msg.type, None)
+ if parser == None:
+ raise NotImplementedError('_%s_request_parser' % msg.type)
+ error = None
+ while True:
+ if error != None:
+ self.cmd.stdout.write(''.join([
+ error.__class__.__name__, ': ', str(error), '\n']))
+ self.cmd.stdout.write(prompt_string)
+ value = parser(msg, self.cmd.stdin.readline())
+ try:
+ response = msg.response(value)
+ break
+ except ValueError, error:
+ continue
+ self.cmd.inqueue.put(response)
+
+ def _boolean_request_prompt(self, msg):
if msg.default == True:
yn = ' [Y/n] '
else:
yn = ' [y/N] '
- self.cmd.stdout.write(msg.msg+yn)
- response = self.cmd.stdin.readline().strip().lower()
- if response.startswith('y'):
- self.cmd.inqueue.put(BooleanResponse(True))
- elif response.startswith('n'):
- self.cmd.inqueue.put(BooleanResponse(False))
+ return msg.msg + yn
+
+ def _boolean_request_parser(self, msg, response):
+ value = response.strip().lower()
+ if value.startswith('y'):
+ value = True
+ elif value.startswith('n'):
+ value = False
+ elif len(value) == 0:
+ value = msg.default
+ return value
+
+ def _string_request_prompt(self, msg):
+ if msg.default == None:
+ d = ' '
else:
- self.cmd.inqueue.put(BooleanResponse(msg.default))
+ d = ' [%s] ' % msg.default
+ return msg.msg + d
+
+ def _string_request_parser(self, msg, response):
+ return response.strip()
+
+ def _float_request_prompt(self, msg):
+ return self._string_request_prompt(msg)
+
+ def _float_request_parser(self, msg, resposne):
+ return float(response)
+
+ def _selection_request_prompt(self, msg):
+ options = []
+ for i,option in enumerate(msg.options):
+ options.append(' %d) %s' % (i,option))
+ options = ''.join(options)
+ if msg.default == None:
+ prompt = '? '
+ else:
+ prompt = '? [%d] ' % msg.default
+ return '\n'.join([msg,options,prompt])
+
+ def _selection_request_parser(self, msg, response):
+ return int(response)
+
class HelpCommand (CommandMethod):
def __init__(self, *args, **kwargs):
def __call__(self):
blocks = [self.command.help(name_fn=self.name_fn),
- '------',
+ '----',
'Usage: ' + self._usage_string(),
'']
self.cmd.stdout.write('\n'.join(blocks))
name='exit', aliases=['quit', 'EOF'], help=self.__doc__,
arguments = [
Argument(name='force', type='bool', default=False,
- callback=StoreValue(True), help="""
+ help="""
Exit without prompting the user. Use if you save often or don't make
typing mistakes ;).
""".strip()),
default = False
outqueue.put(BooleanRequest(msg, default))
result = inqueue.get()
- assert isinstance(result, BooleanResponse)
+ assert result.type == 'boolean'
_exit = result.value
if _exit == True:
raise Exit()