| Index: tools/misc_utils.py
|
| diff --git a/tools/misc_utils.py b/tools/misc_utils.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..13978a47421116fd9c1cd45936e584431150618c
|
| --- /dev/null
|
| +++ b/tools/misc_utils.py
|
| @@ -0,0 +1,224 @@
|
| +# Copyright 2014 Google Inc.
|
| +#
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +
|
| +"""Module to host the VerboseSubprocess, ChangeDir, and ReSearch classes.
|
| +"""
|
| +
|
| +import os
|
| +import re
|
| +import subprocess
|
| +
|
| +
|
| +def print_subprocess_args(prefix, *args, **kwargs):
|
| + """Print out args in a human-readable manner."""
|
| + def quote_and_escape(string):
|
| + """Quote and escape a string if necessary."""
|
| + if ' ' in string or '\n' in string:
|
| + string = '"%s"' % string.replace('"', '\\"')
|
| + return string
|
| + if 'cwd' in kwargs:
|
| + print '%scd %s' % (prefix, kwargs['cwd'])
|
| + print prefix + ' '.join(quote_and_escape(arg) for arg in args[0])
|
| + if 'cwd' in kwargs:
|
| + print '%scd -' % prefix
|
| +
|
| +
|
| +class VerboseSubprocess(object):
|
| + """Call subprocess methods, but print out command before executing.
|
| +
|
| + Attributes:
|
| + verbose: (boolean) should we print out the command or not. If
|
| + not, this is the same as calling the subprocess method
|
| + quiet: (boolean) suppress stdout on check_call and call.
|
| + prefix: (string) When verbose, what to print before each command.
|
| + """
|
| +
|
| + def __init__(self, verbose):
|
| + self.verbose = verbose
|
| + self.quiet = not verbose
|
| + self.prefix = '~~$ '
|
| +
|
| + def check_call(self, *args, **kwargs):
|
| + """Wrapper for subprocess.check_call().
|
| +
|
| + Args:
|
| + *args: to be passed to subprocess.check_call()
|
| + **kwargs: to be passed to subprocess.check_call()
|
| + Returns:
|
| + Whatever subprocess.check_call() returns.
|
| + Raises:
|
| + OSError or subprocess.CalledProcessError: raised by check_call.
|
| + """
|
| + if self.verbose:
|
| + print_subprocess_args(self.prefix, *args, **kwargs)
|
| + if self.quiet:
|
| + with open(os.devnull, 'w') as devnull:
|
| + return subprocess.check_call(*args, stdout=devnull, **kwargs)
|
| + else:
|
| + return subprocess.check_call(*args, **kwargs)
|
| +
|
| + def call(self, *args, **kwargs):
|
| + """Wrapper for subprocess.check().
|
| +
|
| + Args:
|
| + *args: to be passed to subprocess.check_call()
|
| + **kwargs: to be passed to subprocess.check_call()
|
| + Returns:
|
| + Whatever subprocess.call() returns.
|
| + Raises:
|
| + OSError or subprocess.CalledProcessError: raised by call.
|
| + """
|
| + if self.verbose:
|
| + print_subprocess_args(self.prefix, *args, **kwargs)
|
| + if self.quiet:
|
| + with open(os.devnull, 'w') as devnull:
|
| + return subprocess.call(*args, stdout=devnull, **kwargs)
|
| + else:
|
| + return subprocess.call(*args, **kwargs)
|
| +
|
| + def check_output(self, *args, **kwargs):
|
| + """Wrapper for subprocess.check_output().
|
| +
|
| + Args:
|
| + *args: to be passed to subprocess.check_output()
|
| + **kwargs: to be passed to subprocess.check_output()
|
| + Returns:
|
| + Whatever subprocess.check_output() returns.
|
| + Raises:
|
| + OSError or subprocess.CalledProcessError: raised by check_output.
|
| + """
|
| + if self.verbose:
|
| + print_subprocess_args(self.prefix, *args, **kwargs)
|
| + return subprocess.check_output(*args, **kwargs)
|
| +
|
| + def strip_output(self, *args, **kwargs):
|
| + """Wrap subprocess.check_output and str.strip().
|
| +
|
| + Pass the given arguments into subprocess.check_output() and return
|
| + the results, after stripping any excess whitespace.
|
| +
|
| + Args:
|
| + *args: to be passed to subprocess.check_output()
|
| + **kwargs: to be passed to subprocess.check_output()
|
| +
|
| + Returns:
|
| + The output of the process as a string without leading or
|
| + trailing whitespace.
|
| + Raises:
|
| + OSError or subprocess.CalledProcessError: raised by check_output.
|
| + """
|
| + if self.verbose:
|
| + print_subprocess_args(self.prefix, *args, **kwargs)
|
| + return str(subprocess.check_output(*args, **kwargs)).strip()
|
| +
|
| + def popen(self, *args, **kwargs):
|
| + """Wrapper for subprocess.Popen().
|
| +
|
| + Args:
|
| + *args: to be passed to subprocess.Popen()
|
| + **kwargs: to be passed to subprocess.Popen()
|
| + Returns:
|
| + The output of subprocess.Popen()
|
| + Raises:
|
| + OSError or subprocess.CalledProcessError: raised by Popen.
|
| + """
|
| + if self.verbose:
|
| + print_subprocess_args(self.prefix, *args, **kwargs)
|
| + return subprocess.Popen(*args, **kwargs)
|
| +
|
| +
|
| +class ChangeDir(object):
|
| + """Use with a with-statement to temporarily change directories."""
|
| + # pylint: disable=I0011,R0903
|
| +
|
| + def __init__(self, directory, verbose=False):
|
| + self._directory = directory
|
| + self._verbose = verbose
|
| +
|
| + def __enter__(self):
|
| + if self._directory != os.curdir:
|
| + if self._verbose:
|
| + print '~~$ cd %s' % self._directory
|
| + cwd = os.getcwd()
|
| + os.chdir(self._directory)
|
| + self._directory = cwd
|
| +
|
| + def __exit__(self, etype, value, traceback):
|
| + if self._directory != os.curdir:
|
| + if self._verbose:
|
| + print '~~$ cd %s' % self._directory
|
| + os.chdir(self._directory)
|
| +
|
| +
|
| +class ReSearch(object):
|
| + """A collection of static methods for regexing things."""
|
| +
|
| + @staticmethod
|
| + def search_within_stream(input_stream, pattern, default=None):
|
| + """Search for regular expression in a file-like object.
|
| +
|
| + Opens a file for reading and searches line by line for a match to
|
| + the regex and returns the parenthesized group named return for the
|
| + first match. Does not search across newlines.
|
| +
|
| + For example:
|
| + pattern = '^root(:[^:]*){4}:(?P<return>[^:]*)'
|
| + with open('/etc/passwd', 'r') as stream:
|
| + return search_within_file(stream, pattern)
|
| + should return root's home directory (/root on my system).
|
| +
|
| + Args:
|
| + input_stream: file-like object to be read
|
| + pattern: (string) to be passed to re.compile
|
| + default: what to return if no match
|
| +
|
| + Returns:
|
| + A string or whatever default is
|
| + """
|
| + pattern_object = re.compile(pattern)
|
| + for line in input_stream:
|
| + match = pattern_object.search(line)
|
| + if match:
|
| + return match.group('return')
|
| + return default
|
| +
|
| + @staticmethod
|
| + def search_within_string(input_string, pattern, default=None):
|
| + """Search for regular expression in a string.
|
| +
|
| + Args:
|
| + input_string: (string) to be searched
|
| + pattern: (string) to be passed to re.compile
|
| + default: what to return if no match
|
| +
|
| + Returns:
|
| + A string or whatever default is
|
| + """
|
| + match = re.search(pattern, input_string)
|
| + return match.group('return') if match else default
|
| +
|
| + @staticmethod
|
| + def search_within_output(verbose, pattern, default, *args, **kwargs):
|
| + """Search for regular expression in a process output.
|
| +
|
| + Does not search across newlines.
|
| +
|
| + Args:
|
| + verbose: (boolean) shoule we call print_subprocess_args?
|
| + pattern: (string) to be passed to re.compile
|
| + default: what to return if no match
|
| + *args: to be passed to subprocess.Popen()
|
| + **kwargs: to be passed to subprocess.Popen()
|
| +
|
| + Returns:
|
| + A string or whatever default is
|
| + """
|
| + if verbose:
|
| + print_subprocess_args('~~$ ', *args, **kwargs)
|
| + proc = subprocess.Popen(*args, stdout=subprocess.PIPE, **kwargs)
|
| + return ReSearch.search_within_stream(proc.stdout, pattern, default)
|
| +
|
| +
|
|
|