+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of Hooke.
+#
+# Hooke is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# Hooke is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with Hooke. If not, see
+# <http://www.gnu.org/licenses/>.
+
"""Defines :class:`CommandLine` for driving Hooke from the command
line.
"""
+import codecs
import cmd
import optparse
import readline # including readline makes cmd.Cmd.cmdloop() smarter
import shlex
-from ..command import CommandExit, Exit, BooleanRequest, BooleanResponse, \
- Command, Argument, StoreValue
+from ..command import CommandExit, Exit, Command, Argument, StoreValue
+from ..interaction import Request, BooleanRequest, ReloadUserInterfaceConfig
from ..ui import UserInterface, CommandMessage
+from ..util.encoding import get_input_encoding, get_output_encoding
# Define a few helper classes.
self.cmd.stdout.write(msg.__class__.__name__+'\n')
self.cmd.stdout.write(str(msg).rstrip()+'\n')
break
- elif isinstance(msg, BooleanRequest):
- self._boolean_request(msg)
+ elif isinstance(msg, ReloadUserInterfaceConfig):
+ self.cmd.ui.reload_config(msg.config)
+ continue
+ elif isinstance(msg, Request):
+ self._handle_request(msg)
continue
self.cmd.stdout.write(str(msg).rstrip()+'\n')
def _parse_args(self, args):
- argv = shlex.split(args, comments=True, posix=True)
- options,args = self.parser.parse_args(argv)
+ options,args = self.parser.parse_args(args)
self._check_argument_length_bounds(args)
params = {}
for argument in self.parser.command_opts:
If not, raise optparse.OptParseError().
"""
min_args = 0
- max_args = -1
+ max_args = 0
for argument in self.parser.command_args:
if argument.optional == False and argument.count > 0:
min_args += argument.count
% (len(arguments), self.name_fn(self.command.name),
target_string))
- def _boolean_request(self, msg):
+ def _handle_request(self, msg):
+ """Repeatedly try to get a response to `msg`.
+ """
+ prompt = getattr(self, '_%s_request_prompt' % msg.type, None)
+ if prompt == None:
+ raise NotImplementedError('_%s_request_prompt' % msg.type)
+ prompt_string = prompt(msg)
+ parser = getattr(self, '_%s_request_parser' % msg.type, None)
+ if parser == None:
+ raise NotImplementedError('_%s_request_parser' % msg.type)
+ error = None
+ while True:
+ if error != None:
+ self.cmd.stdout.write(''.join([
+ error.__class__.__name__, ': ', str(error), '\n']))
+ self.cmd.stdout.write(prompt_string)
+ value = parser(msg, self.cmd.stdin.readline())
+ try:
+ response = msg.response(value)
+ break
+ except ValueError, error:
+ continue
+ self.cmd.inqueue.put(response)
+
+ def _boolean_request_prompt(self, msg):
if msg.default == True:
yn = ' [Y/n] '
else:
yn = ' [y/N] '
- self.cmd.stdout.write(msg.msg+yn)
- response = self.cmd.stdin.readline().strip().lower()
- if response.startswith('y'):
- self.cmd.inqueue.put(BooleanResponse(True))
- elif response.startswith('n'):
- self.cmd.inqueue.put(BooleanResponse(False))
+ return msg.msg + yn
+
+ def _boolean_request_parser(self, msg, response):
+ value = response.strip().lower()
+ if value.startswith('y'):
+ value = True
+ elif value.startswith('n'):
+ value = False
+ elif len(value) == 0:
+ value = msg.default
+ return value
+
+ def _string_request_prompt(self, msg):
+ if msg.default == None:
+ d = ' '
+ else:
+ d = ' [%s] ' % msg.default
+ return msg.msg + d
+
+ def _string_request_parser(self, msg, response):
+ return response.strip()
+
+ def _float_request_prompt(self, msg):
+ return self._string_request_prompt(msg)
+
+ def _float_request_parser(self, msg, resposne):
+ return float(response)
+
+ def _selection_request_prompt(self, msg):
+ options = []
+ for i,option in enumerate(msg.options):
+ options.append(' %d) %s' % (i,option))
+ options = ''.join(options)
+ if msg.default == None:
+ prompt = '? '
else:
- self.cmd.inqueue.put(BooleanResponse(msg.default))
+ prompt = '? [%d] ' % msg.default
+ return '\n'.join([msg,options,prompt])
+
+ def _selection_request_parser(self, msg, response):
+ return int(response)
+
class HelpCommand (CommandMethod):
def __init__(self, *args, **kwargs):
def __call__(self):
blocks = [self.command.help(name_fn=self.name_fn),
- '------',
+ '----',
'Usage: ' + self._usage_string(),
'']
self.cmd.stdout.write('\n'.join(blocks))
name='exit', aliases=['quit', 'EOF'], help=self.__doc__,
arguments = [
Argument(name='force', type='bool', default=False,
- callback=StoreValue(True), help="""
+ help="""
Exit without prompting the user. Use if you save often or don't make
typing mistakes ;).
""".strip()),
"""
_exit = True
if params['force'] == False:
- # TODO: get results of hooke.playlists.current().is_saved()
- is_saved = True
+ not_saved = [p.name for p in hooke.playlists
+ if p.is_saved() == False]
msg = 'Exit?'
default = True
- if is_saved == False:
- msg = 'You did not save your playlist. ' + msg
+ if len(not_saved) > 0:
+ msg = 'Unsaved playlists (%s). %s' \
+ % (', '.join([str(p) for p in not_saved]), msg)
default = False
outqueue.put(BooleanRequest(msg, default))
result = inqueue.get()
- assert isinstance(result, BooleanResponse)
+ assert result.type == 'boolean'
_exit = result.value
if _exit == True:
raise Exit()
# Now onto the main attraction.
class HookeCmd (cmd.Cmd):
- def __init__(self, commands, inqueue, outqueue):
+ def __init__(self, ui, commands, inqueue, outqueue):
cmd.Cmd.__init__(self)
+ self.ui = ui
self.commands = commands
self.local_commands = [LocalExitCommand(), LocalHelpCommand()]
self.prompt = 'hooke> '
setattr(self.__class__, 'complete_%s' % name,
CompleteCommand(self, command, self._name_fn))
+ def parseline(self, line):
+ """Override Cmd.parseline to use shlex.split.
+
+ Notes
+ -----
+ This allows us to handle comments cleanly. With the default
+ Cmd implementation, a pure comment line will call the .default
+ error message.
+
+ Since we use shlex to strip comments, we return a list of
+ split arguments rather than the raw argument string.
+ """
+ line = line.strip()
+ argv = shlex.split(line, comments=True, posix=True)
+ if len(argv) == 0:
+ return None, None, '' # return an empty line
+ elif argv[0] == '?':
+ argv[0] = 'help'
+ elif argv[0] == '!':
+ argv[0] = 'system'
+ return argv[0], argv[1:], line
+
+ def do_help(self, arg):
+ """Wrap Cmd.do_help to handle our .parseline argument list.
+ """
+ if len(arg) == 0:
+ return cmd.Cmd.do_help(self, '')
+ return cmd.Cmd.do_help(self, arg[0])
+
+ def empytline(self):
+ """Override Cmd.emptyline to not do anything.
+
+ Repeating the last non-empty command seems unwise. Explicit
+ is better than implicit.
+ """
+ pass
class CommandLine (UserInterface):
"""Command line interface. Simple and powerful.
def __init__(self):
super(CommandLine, self).__init__(name='command line')
- def run(self, commands, ui_to_command_queue, command_to_ui_queue):
- cmd = HookeCmd(commands,
+ def _cmd(self, commands, ui_to_command_queue, command_to_ui_queue):
+ cmd = HookeCmd(self, commands,
inqueue=ui_to_command_queue,
outqueue=command_to_ui_queue)
- cmd.cmdloop(self._splash_text())
+ #cmd.stdin = codecs.getreader(get_input_encoding())(cmd.stdin)
+ cmd.stdout = codecs.getwriter(get_output_encoding())(cmd.stdout)
+ return cmd
+
+ def run(self, commands, ui_to_command_queue, command_to_ui_queue):
+ cmd = self._cmd(commands, ui_to_command_queue, command_to_ui_queue)
+ cmd.cmdloop(self._splash_text(extra_info={
+ 'get-details':'run `license`',
+ }))
+
+ def run_lines(self, commands, ui_to_command_queue, command_to_ui_queue,
+ lines):
+ cmd = self._cmd(commands, ui_to_command_queue, command_to_ui_queue)
+ for line in lines:
+ cmd.onecmd(line)