OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # |
| 3 # Copyright 2015 Google Inc. |
| 4 # |
| 5 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 # you may not use this file except in compliance with the License. |
| 7 # You may obtain a copy of the License at |
| 8 # |
| 9 # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 # |
| 11 # Unless required by applicable law or agreed to in writing, software |
| 12 # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 # See the License for the specific language governing permissions and |
| 15 # limitations under the License. |
| 16 |
| 17 """Appcommands-compatible command class with extra fixins.""" |
| 18 from __future__ import print_function |
| 19 |
| 20 import cmd |
| 21 import inspect |
| 22 import pdb |
| 23 import shlex |
| 24 import sys |
| 25 import traceback |
| 26 import types |
| 27 |
| 28 import six |
| 29 |
| 30 from google.apputils import app |
| 31 from google.apputils import appcommands |
| 32 import gflags as flags |
| 33 |
| 34 __all__ = [ |
| 35 'NewCmd', |
| 36 'Repl', |
| 37 ] |
| 38 |
| 39 flags.DEFINE_boolean( |
| 40 'debug_mode', False, |
| 41 'Show tracebacks on Python exceptions.') |
| 42 flags.DEFINE_boolean( |
| 43 'headless', False, |
| 44 'Assume no user is at the controlling console.') |
| 45 FLAGS = flags.FLAGS |
| 46 |
| 47 |
| 48 def _SafeMakeAscii(s): |
| 49 if isinstance(s, six.text_type): |
| 50 return s.encode('ascii') |
| 51 elif isinstance(s, str): |
| 52 return s.decode('ascii') |
| 53 else: |
| 54 return six.text_type(s).encode('ascii', 'backslashreplace') |
| 55 |
| 56 |
| 57 class NewCmd(appcommands.Cmd): |
| 58 |
| 59 """Featureful extension of appcommands.Cmd.""" |
| 60 |
| 61 def __init__(self, name, flag_values): |
| 62 super(NewCmd, self).__init__(name, flag_values) |
| 63 run_with_args = getattr(self, 'RunWithArgs', None) |
| 64 self._new_style = isinstance(run_with_args, types.MethodType) |
| 65 if self._new_style: |
| 66 func = run_with_args.__func__ |
| 67 |
| 68 argspec = inspect.getargspec(func) |
| 69 if argspec.args and argspec.args[0] == 'self': |
| 70 argspec = argspec._replace( # pylint: disable=protected-access |
| 71 args=argspec.args[1:]) |
| 72 self._argspec = argspec |
| 73 # TODO(craigcitro): Do we really want to support all this |
| 74 # nonsense? |
| 75 self._star_args = self._argspec.varargs is not None |
| 76 self._star_kwds = self._argspec.keywords is not None |
| 77 self._max_args = len(self._argspec.args or ()) |
| 78 self._min_args = self._max_args - len(self._argspec.defaults or ()) |
| 79 if self._star_args: |
| 80 self._max_args = sys.maxsize |
| 81 |
| 82 self._debug_mode = FLAGS.debug_mode |
| 83 self.surface_in_shell = True |
| 84 self.__doc__ = self.RunWithArgs.__doc__ |
| 85 |
| 86 def __getattr__(self, name): |
| 87 if name in self._command_flags: |
| 88 return self._command_flags[name].value |
| 89 return super(NewCmd, self).__getattribute__(name) |
| 90 |
| 91 def _GetFlag(self, flagname): |
| 92 if flagname in self._command_flags: |
| 93 return self._command_flags[flagname] |
| 94 else: |
| 95 return None |
| 96 |
| 97 def Run(self, argv): |
| 98 """Run this command. |
| 99 |
| 100 If self is a new-style command, we set up arguments and call |
| 101 self.RunWithArgs, gracefully handling exceptions. If not, we |
| 102 simply call self.Run(argv). |
| 103 |
| 104 Args: |
| 105 argv: List of arguments as strings. |
| 106 |
| 107 Returns: |
| 108 0 on success, nonzero on failure. |
| 109 """ |
| 110 if not self._new_style: |
| 111 return super(NewCmd, self).Run(argv) |
| 112 |
| 113 # TODO(craigcitro): We need to save and restore flags each time so |
| 114 # that we can per-command flags in the REPL. |
| 115 args = argv[1:] |
| 116 fail = None |
| 117 fail_template = '%s positional args, found %d, expected at %s %d' |
| 118 if len(args) < self._min_args: |
| 119 fail = fail_template % ('Not enough', len(args), |
| 120 'least', self._min_args) |
| 121 if len(args) > self._max_args: |
| 122 fail = fail_template % ('Too many', len(args), |
| 123 'most', self._max_args) |
| 124 if fail: |
| 125 print(fail) |
| 126 if self.usage: |
| 127 print('Usage: %s' % (self.usage,)) |
| 128 return 1 |
| 129 |
| 130 if self._debug_mode: |
| 131 return self.RunDebug(args, {}) |
| 132 else: |
| 133 return self.RunSafely(args, {}) |
| 134 |
| 135 def RunCmdLoop(self, argv): |
| 136 """Hook for use in cmd.Cmd-based command shells.""" |
| 137 try: |
| 138 args = shlex.split(argv) |
| 139 except ValueError as e: |
| 140 raise SyntaxError(self.EncodeForPrinting(e)) |
| 141 return self.Run([self._command_name] + args) |
| 142 |
| 143 @staticmethod |
| 144 def EncodeForPrinting(s): |
| 145 """Safely encode a string as the encoding for sys.stdout.""" |
| 146 encoding = sys.stdout.encoding or 'ascii' |
| 147 return six.text_type(s).encode(encoding, 'backslashreplace') |
| 148 |
| 149 def _FormatError(self, e): |
| 150 """Hook for subclasses to modify how error messages are printed.""" |
| 151 return _SafeMakeAscii(e) |
| 152 |
| 153 def _HandleError(self, e): |
| 154 message = self._FormatError(e) |
| 155 print('Exception raised in %s operation: %s' % ( |
| 156 self._command_name, message)) |
| 157 return 1 |
| 158 |
| 159 def _IsDebuggableException(self, e): |
| 160 """Hook for subclasses to skip debugging on certain exceptions.""" |
| 161 return not isinstance(e, app.UsageError) |
| 162 |
| 163 def RunDebug(self, args, kwds): |
| 164 """Run this command in debug mode.""" |
| 165 try: |
| 166 return_value = self.RunWithArgs(*args, **kwds) |
| 167 except BaseException as e: |
| 168 # Don't break into the debugger for expected exceptions. |
| 169 if not self._IsDebuggableException(e): |
| 170 return self._HandleError(e) |
| 171 print() |
| 172 print('****************************************************') |
| 173 print('** Unexpected Exception raised in execution! **') |
| 174 if FLAGS.headless: |
| 175 print('** --headless mode enabled, exiting. **') |
| 176 print('** See STDERR for traceback. **') |
| 177 else: |
| 178 print('** --debug_mode enabled, starting pdb. **') |
| 179 print('****************************************************') |
| 180 print() |
| 181 traceback.print_exc() |
| 182 print() |
| 183 if not FLAGS.headless: |
| 184 pdb.post_mortem() |
| 185 return 1 |
| 186 return return_value |
| 187 |
| 188 def RunSafely(self, args, kwds): |
| 189 """Run this command, turning exceptions into print statements.""" |
| 190 try: |
| 191 return_value = self.RunWithArgs(*args, **kwds) |
| 192 except BaseException as e: |
| 193 return self._HandleError(e) |
| 194 return return_value |
| 195 |
| 196 |
| 197 class CommandLoop(cmd.Cmd): |
| 198 |
| 199 """Instance of cmd.Cmd built to work with NewCmd.""" |
| 200 |
| 201 class TerminateSignal(Exception): |
| 202 |
| 203 """Exception type used for signaling loop completion.""" |
| 204 |
| 205 def __init__(self, commands, prompt): |
| 206 cmd.Cmd.__init__(self) |
| 207 self._commands = {'help': commands['help']} |
| 208 self._special_command_names = ['help', 'repl', 'EOF'] |
| 209 for name, command in commands.items(): |
| 210 if (name not in self._special_command_names and |
| 211 isinstance(command, NewCmd) and |
| 212 command.surface_in_shell): |
| 213 self._commands[name] = command |
| 214 setattr(self, 'do_%s' % (name,), command.RunCmdLoop) |
| 215 self._default_prompt = prompt |
| 216 self._set_prompt() |
| 217 self._last_return_code = 0 |
| 218 |
| 219 @property |
| 220 def last_return_code(self): |
| 221 return self._last_return_code |
| 222 |
| 223 def _set_prompt(self): |
| 224 self.prompt = self._default_prompt |
| 225 |
| 226 def do_EOF(self, *unused_args): # pylint: disable=invalid-name |
| 227 """Terminate the running command loop. |
| 228 |
| 229 This function raises an exception to avoid the need to do |
| 230 potentially-error-prone string parsing inside onecmd. |
| 231 |
| 232 Args: |
| 233 *unused_args: unused. |
| 234 |
| 235 Returns: |
| 236 Never returns. |
| 237 |
| 238 Raises: |
| 239 CommandLoop.TerminateSignal: always. |
| 240 """ |
| 241 raise CommandLoop.TerminateSignal() |
| 242 |
| 243 def postloop(self): |
| 244 print('Goodbye.') |
| 245 |
| 246 # pylint: disable=arguments-differ |
| 247 def completedefault(self, unused_text, line, unused_begidx, unused_endidx): |
| 248 if not line: |
| 249 return [] |
| 250 else: |
| 251 command_name = line.partition(' ')[0].lower() |
| 252 usage = '' |
| 253 if command_name in self._commands: |
| 254 usage = self._commands[command_name].usage |
| 255 if usage: |
| 256 print() |
| 257 print(usage) |
| 258 print('%s%s' % (self.prompt, line), end=' ') |
| 259 return [] |
| 260 # pylint: enable=arguments-differ |
| 261 |
| 262 def emptyline(self): |
| 263 print('Available commands:', end=' ') |
| 264 print(' '.join(list(self._commands))) |
| 265 |
| 266 def precmd(self, line): |
| 267 """Preprocess the shell input.""" |
| 268 if line == 'EOF': |
| 269 return line |
| 270 if line.startswith('exit') or line.startswith('quit'): |
| 271 return 'EOF' |
| 272 words = line.strip().split() |
| 273 if len(words) == 1 and words[0] not in ['help', 'ls', 'version']: |
| 274 return 'help %s' % (line.strip(),) |
| 275 return line |
| 276 |
| 277 def onecmd(self, line): |
| 278 """Process a single command. |
| 279 |
| 280 Runs a single command, and stores the return code in |
| 281 self._last_return_code. Always returns False unless the command |
| 282 was EOF. |
| 283 |
| 284 Args: |
| 285 line: (str) Command line to process. |
| 286 |
| 287 Returns: |
| 288 A bool signaling whether or not the command loop should terminate. |
| 289 """ |
| 290 try: |
| 291 self._last_return_code = cmd.Cmd.onecmd(self, line) |
| 292 except CommandLoop.TerminateSignal: |
| 293 return True |
| 294 except BaseException as e: |
| 295 name = line.split(' ')[0] |
| 296 print('Error running %s:' % name) |
| 297 print(e) |
| 298 self._last_return_code = 1 |
| 299 return False |
| 300 |
| 301 def get_names(self): |
| 302 names = dir(self) |
| 303 commands = (name for name in self._commands |
| 304 if name not in self._special_command_names) |
| 305 names.extend('do_%s' % (name,) for name in commands) |
| 306 names.remove('do_EOF') |
| 307 return names |
| 308 |
| 309 def do_help(self, command_name): |
| 310 """Print the help for command_name (if present) or general help.""" |
| 311 |
| 312 # TODO(craigcitro): Add command-specific flags. |
| 313 def FormatOneCmd(name, command, command_names): |
| 314 indent_size = appcommands.GetMaxCommandLength() + 3 |
| 315 if len(command_names) > 1: |
| 316 indent = ' ' * indent_size |
| 317 command_help = flags.TextWrap( |
| 318 command.CommandGetHelp('', cmd_names=command_names), |
| 319 indent=indent, |
| 320 firstline_indent='') |
| 321 first_help_line, _, rest = command_help.partition('\n') |
| 322 first_line = '%-*s%s' % (indent_size, |
| 323 name + ':', first_help_line) |
| 324 return '\n'.join((first_line, rest)) |
| 325 else: |
| 326 default_indent = ' ' |
| 327 return '\n' + flags.TextWrap( |
| 328 command.CommandGetHelp('', cmd_names=command_names), |
| 329 indent=default_indent, |
| 330 firstline_indent=default_indent) + '\n' |
| 331 |
| 332 if not command_name: |
| 333 print('\nHelp for commands:\n') |
| 334 command_names = list(self._commands) |
| 335 print('\n\n'.join( |
| 336 FormatOneCmd(name, command, command_names) |
| 337 for name, command in self._commands.items() |
| 338 if name not in self._special_command_names)) |
| 339 print() |
| 340 elif command_name in self._commands: |
| 341 print(FormatOneCmd(command_name, self._commands[command_name], |
| 342 command_names=[command_name])) |
| 343 return 0 |
| 344 |
| 345 def postcmd(self, stop, line): |
| 346 return bool(stop) or line == 'EOF' |
| 347 |
| 348 |
| 349 class Repl(NewCmd): |
| 350 |
| 351 """Start an interactive session.""" |
| 352 PROMPT = '> ' |
| 353 |
| 354 def __init__(self, name, fv): |
| 355 super(Repl, self).__init__(name, fv) |
| 356 self.surface_in_shell = False |
| 357 flags.DEFINE_string( |
| 358 'prompt', '', |
| 359 'Prompt to use for interactive shell.', |
| 360 flag_values=fv) |
| 361 |
| 362 def RunWithArgs(self): |
| 363 """Start an interactive session.""" |
| 364 prompt = FLAGS.prompt or self.PROMPT |
| 365 repl = CommandLoop(appcommands.GetCommandList(), prompt=prompt) |
| 366 print('Welcome! (Type help for more information.)') |
| 367 while True: |
| 368 try: |
| 369 repl.cmdloop() |
| 370 break |
| 371 except KeyboardInterrupt: |
| 372 print() |
| 373 return repl.last_return_code |
OLD | NEW |