1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This file is part of Hooke.
5 # Hooke is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
10 # Hooke is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke. If not, see
17 # <http://www.gnu.org/licenses/>.
19 """Defines :class:`CommandLine` for driving Hooke from the command
26 import readline # including readline makes cmd.Cmd.cmdloop() smarter
29 from ..command import CommandExit, Exit, Command, Argument, StoreValue
30 from ..interaction import Request, BooleanRequest, ReloadUserInterfaceConfig
31 from ..ui import UserInterface, CommandMessage
32 from ..util.encoding import get_input_encoding, get_output_encoding
35 # Define a few helper classes.
class Default (object):
    """Marker for options not given on the command line.

    The class object itself is passed as the ``default`` of every
    option the command-line parser generates, so a parsed value that
    still equals `Default` means the user did not supply that option.
    """
class CommandLineParser (optparse.OptionParser):
    """Implement a command line syntax for a
    :class:`hooke.command.Command`.

    Every optional argument of `command` (except ``help``, which
    :class:`optparse.OptionParser` already provides) becomes a
    ``--name`` option whose default is the `Default` marker.  All
    other arguments are treated as positional.

    `name_fn` maps command and argument names to their command-line
    spelling.

    Attributes set by the constructor:

    * `command`: the wrapped command.
    * `command_opts`: the arguments exposed as ``--name`` options.
    * `command_args`: the positional arguments.  If one of them has
      ``count == -1`` (unbounded), it is moved to the end of the list.
    """
    def __init__(self, command, name_fn):
        optparse.OptionParser.__init__(self, prog=name_fn(command.name))
        self.command = command
        self.command_opts = []
        self.command_args = []
        for argument in command.arguments:
            if argument.name == 'help':
                continue # 'help' is a default OptionParser option
            if not argument.optional:
                self.command_args.append(argument)
                continue
            option_name = name_fn(argument.name)
            self.add_option(
                '--%s' % option_name, dest=option_name, default=Default)
            self.command_opts.append(argument)
        infinite_counters = [a for a in self.command_args if a.count == -1]
        # Only one positional argument can absorb "everything left over".
        assert len(infinite_counters) <= 1, \
            'Multiple infinite counts for %s: %s\nNeed a better CommandLineParser implementation.' \
            % (command.name, ', '.join([a.name for a in infinite_counters]))
        if infinite_counters: # move the big counter to the end.
            infinite_counter = infinite_counters[0]
            self.command_args.remove(infinite_counter)
            self.command_args.append(infinite_counter)

    def exit(self, status=0, msg=None):
        """Override :meth:`optparse.OptionParser.exit` which calls
        :func:`sys.exit`.

        Instead of terminating the interpreter, always raise
        :class:`optparse.OptParseError`, carrying `msg` when one is
        given and a generic marker text otherwise.  `status` is
        accepted for signature compatibility and ignored.
        """
        raise optparse.OptParseError(msg or 'OptParse EXIT')
class CommandMethod (object):
    """Base class for method replacer.

    Instances of `CommandMethod` subclasses are callables; their
    `__call__` methods provide the `do_*`, `help_*`, and `complete_*`
    methods of :class:`HookeCmd`.

    `cmd` is the owning command loop, `command` the command being
    wrapped, and `name_fn` the function mapping names to their
    command-line spelling.
    """
    def __init__(self, cmd, command, name_fn):
        self.cmd, self.command, self.name_fn = cmd, command, name_fn

    def __call__(self, *args, **kwargs):
        # Subclasses supply the real behaviour.
        raise NotImplementedError
93 class DoCommand (CommandMethod):
def __init__(self, *args, **kwargs):
    """Initialise the `CommandMethod` state, then build the
    :class:`CommandLineParser` used to parse this command's argument
    string.
    """
    super(DoCommand, self).__init__(*args, **kwargs)
    parser = CommandLineParser(self.command, self.name_fn)
    self.parser = parser
98 def __call__(self, args):
100 args = self._parse_args(args)
101 except optparse.OptParseError, e:
102 self.cmd.stdout.write(str(e).lstrip()+'\n')
103 self.cmd.stdout.write('Failure\n')
105 self.cmd.inqueue.put(CommandMessage(self.command, args))
107 msg = self.cmd.outqueue.get()
108 if isinstance(msg, Exit):
110 elif isinstance(msg, CommandExit):
111 self.cmd.stdout.write(msg.__class__.__name__+'\n')
112 self.cmd.stdout.write(str(msg).rstrip()+'\n')
114 elif isinstance(msg, ReloadUserInterfaceConfig):
115 self.cmd.ui.reload_config(msg.config)
117 elif isinstance(msg, Request):
118 self._handle_request(msg)
120 self.cmd.stdout.write(str(msg).rstrip()+'\n')
def _parse_args(self, args):
    """Turn the raw argument string `args` into a parameter dictionary.

    The string is split with shell-like quoting rules (``#`` starts a
    comment), run through ``self.parser``, and its positional part is
    checked by :meth:`_check_argument_length_bounds`.

    Returns a dict mapping argument names to values.  Options still
    holding the `Default` marker (i.e. not given by the user) are left
    out.  A positional argument with ``count == 1`` maps to a single
    string, one with ``count > 1`` to a list of that many strings, and
    an unbounded one to the list of everything remaining.
    """
    argv = shlex.split(args, comments=True, posix=True)
    options, positional = self.parser.parse_args(argv)
    self._check_argument_length_bounds(positional)

    params = {}
    for opt in self.parser.command_opts:
        given = getattr(options, self.name_fn(opt.name))
        if given != Default:
            params[opt.name] = given

    start = 0
    for arg in self.parser.command_args:
        count = arg.count
        if count == 1:
            params[arg.name] = positional[start]
        elif count > 1:
            params[arg.name] = positional[start:start+count]
        else: # count == -1: take everything that is left
            params[arg.name] = positional[start:]
        start += count
    return params
def _check_argument_length_bounds(self, arguments):
    """Check that there are an appropriate number of arguments in
    `arguments`.

    The lower bound is the sum of the positive counts of the
    non-optional positional arguments.  The upper bound is the sum of
    all counts, or unbounded (tracked as ``-1``) as soon as one
    positional argument has ``count == -1``.

    If not, raise optparse.OptParseError().
    """
    min_args = 0
    max_args = 0
    for argument in self.parser.command_args:
        count = argument.count
        if not argument.optional and count > 0:
            min_args += count
        if max_args >= 0: # otherwise already infinite
            if count == -1:
                max_args = -1
            else:
                max_args += count

    given = len(arguments)
    too_few = given < min_args
    too_many = max_args >= 0 and given > max_args
    if not (too_few or too_many):
        return

    if min_args == max_args:
        target_string = str(min_args)
    elif max_args == -1:
        # Exactly `min_args` arguments is acceptable, so say "at least".
        target_string = 'at least %d' % min_args
    else:
        target_string = '%d to %d' % (min_args, max_args)
    raise optparse.OptParseError(
        '%d arguments given, but %s takes %s'
        % (given, self.name_fn(self.command.name), target_string))
172 def _handle_request(self, msg):
173 """Repeatedly try to get a response to `msg`.
175 prompt = getattr(self, '_%s_request_prompt' % msg.type, None)
177 raise NotImplementedError('_%s_request_prompt' % msg.type)
178 prompt_string = prompt(msg)
179 parser = getattr(self, '_%s_request_parser' % msg.type, None)
181 raise NotImplementedError('_%s_request_parser' % msg.type)
185 self.cmd.stdout.write(''.join([
186 error.__class__.__name__, ': ', str(error), '\n']))
187 self.cmd.stdout.write(prompt_string)
188 value = parser(msg, self.cmd.stdin.readline())
190 response = msg.response(value)
192 except ValueError, error:
194 self.cmd.inqueue.put(response)
196 def _boolean_request_prompt(self, msg):
197 if msg.default == True:
203 def _boolean_request_parser(self, msg, response):
204 value = response.strip().lower()
205 if value.startswith('y'):
207 elif value.startswith('n'):
209 elif len(value) == 0:
213 def _string_request_prompt(self, msg):
214 if msg.default == None:
217 d = ' [%s] ' % msg.default
def _string_request_parser(self, msg, response):
    """Return the user's `response` with surrounding whitespace
    (including the trailing newline) removed.

    `msg` is not used; it is accepted so this method has the same
    signature as the other request parsers.
    """
    cleaned = response.strip()
    return cleaned
def _float_request_prompt(self, msg):
    """Return the prompt for a float request.

    Float requests are prompted exactly like string requests, so
    this delegates to :meth:`_string_request_prompt`.
    """
    prompt = self._string_request_prompt(msg)
    return prompt
def _float_request_parser(self, msg, response):
    """Convert the user's `response` text to a float.

    `msg` is not used; it is accepted so this method has the same
    signature as the other request parsers.

    Raises :class:`ValueError` if `response` is not a valid float
    literal (surrounding whitespace is tolerated by :func:`float`).
    """
    # The parameter used to be misspelled `resposne`, so the body's
    # reference to `response` raised NameError on every call.
    return float(response)
229 def _selection_request_prompt(self, msg):
231 for i,option in enumerate(msg.options):
232 options.append(' %d) %s' % (i,option))
233 options = ''.join(options)
234 if msg.default == None:
237 prompt = '? [%d] ' % msg.default
238 return '\n'.join([msg,options,prompt])
240 def _selection_request_parser(self, msg, response):
244 class HelpCommand (CommandMethod):
def __init__(self, *args, **kwargs):
    """Initialise the `CommandMethod` state, then build a
    :class:`CommandLineParser` for the command; the usage string is
    derived from that parser.
    """
    super(HelpCommand, self).__init__(*args, **kwargs)
    parser = CommandLineParser(self.command, self.name_fn)
    self.parser = parser
250 blocks = [self.command.help(name_fn=self.name_fn),
252 'Usage: ' + self._usage_string(),
254 self.cmd.stdout.write('\n'.join(blocks))
257 return self.command.help(name_fn=self.name_fn)
259 def _usage_string(self):
260 if len(self.parser.command_opts) == 0:
263 options_string = '[options]'
264 arg_string = ' '.join(
265 [self.name_fn(arg.name) for arg in self.parser.command_args])
266 return ' '.join([x for x in [self.parser.prog,
271 class CompleteCommand (CommandMethod):
272 def __call__(self, text, line, begidx, endidx):
276 # Define some additional commands
278 class LocalHelpCommand (Command):
279 """Called with an argument, prints that command's documentation.
281 With no argument, lists all available help topics as well as any
282 undocumented commands.
285 super(LocalHelpCommand, self).__init__(name='help', help=self.__doc__)
286 # We set .arguments now (vs. using the arguments option to __init__),
287 # to overwrite the default help argument. We don't override
288 # :meth:`cmd.Cmd.do_help`, so `help --help` is not a valid command.
290 Argument(name='command', type='string', optional=True,
291 help='The name of the command you want help with.')
def _run(self, hooke, inqueue, outqueue, params):
    """Placeholder that always raises :class:`NotImplementedError`.

    This command object exists only to describe ``help``; the actual
    work is done by :meth:`cmd.Cmd.do_help`.
    """
    # cmd.Cmd already implements .do_help()
    raise NotImplementedError
297 class LocalExitCommand (Command):
298 """Exit Hooke cleanly.
301 super(LocalExitCommand, self).__init__(
302 name='exit', aliases=['quit', 'EOF'], help=self.__doc__,
304 Argument(name='force', type='bool', default=False,
306 Exit without prompting the user. Use if you save often or don't make
311 def _run(self, hooke, inqueue, outqueue, params):
312 """The guts of the `do_exit/_quit/_EOF` commands.
314 A `True` return stops :meth:`.cmdloop` execution.
317 if params['force'] == False:
318 not_saved = [p.name for p in hooke.playlists
319 if p.is_saved() == False]
322 if len(not_saved) > 0:
323 msg = 'Unsaved playlists (%s). %s' \
324 % (', '.join([str(p) for p in not_saved]), msg)
326 outqueue.put(BooleanRequest(msg, default))
327 result = inqueue.get()
328 assert result.type == 'boolean'
334 # Now onto the main attraction.
336 class HookeCmd (cmd.Cmd):
def __init__(self, ui, commands, inqueue, outqueue):
    """Set up the interactive command loop.

    `ui` is the owning user interface, `commands` the list of Hooke
    commands to expose, and `inqueue`/`outqueue` the queues used to
    send messages to, and receive messages from, the command
    processor.  The local ``exit`` and ``help`` commands are added to
    `commands` before the `do_*`/`help_*`/`complete_*` methods are
    generated.
    """
    cmd.Cmd.__init__(self)
    self.ui = ui
    self.inqueue = inqueue
    self.outqueue = outqueue
    self.prompt = 'hooke> '
    self.commands = commands
    self.local_commands = [LocalExitCommand(), LocalHelpCommand()]
    # Needs .commands and .local_commands to be in place.
    self._add_command_methods()
def _name_fn(self, name):
    """Return `name` in its command-line spelling: every space is
    replaced by an underscore.
    """
    return '_'.join(name.split(' '))
350 def _add_command_methods(self):
351 for command in self.commands + self.local_commands:
352 for name in [command.name] + command.aliases:
353 name = self._name_fn(name)
354 setattr(self.__class__, 'help_%s' % name,
355 HelpCommand(self, command, self._name_fn))
357 setattr(self.__class__, 'do_%s' % name,
358 DoCommand(self, command, self._name_fn))
359 setattr(self.__class__, 'complete_%s' % name,
360 CompleteCommand(self, command, self._name_fn))
363 class CommandLine (UserInterface):
364 """Command line interface. Simple and powerful.
367 super(CommandLine, self).__init__(name='command line')
def _cmd(self, commands, ui_to_command_queue, command_to_ui_queue):
    """Build and return the :class:`HookeCmd` loop for `commands`.

    The loop sends on `ui_to_command_queue` and receives on
    `command_to_ui_queue`.  Its ``stdout`` is wrapped in a codec
    writer for the encoding reported by `get_output_encoding`.
    """
    shell = HookeCmd(
        self, commands,
        inqueue=ui_to_command_queue, outqueue=command_to_ui_queue)
    #shell.stdin = codecs.getreader(get_input_encoding())(shell.stdin)
    make_writer = codecs.getwriter(get_output_encoding())
    shell.stdout = make_writer(shell.stdout)
    return shell
def run(self, commands, ui_to_command_queue, command_to_ui_queue):
    """Run an interactive session: build the command loop and enter
    it, showing the splash text as the introduction.
    """
    shell = self._cmd(commands, ui_to_command_queue, command_to_ui_queue)
    banner = self._splash_text()
    shell.cmdloop(banner)
381 def run_lines(self, commands, ui_to_command_queue, command_to_ui_queue,
383 cmd = self._cmd(commands, ui_to_command_queue, command_to_ui_queue)