OLD | NEW |
(Empty) | |
| 1 # Copyright 2014 Google Inc. |
| 2 # |
| 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. |
| 5 |
| 6 |
| 7 """Module to host the VerboseSubprocess, ChangeDir, and ReSearch classes. |
| 8 """ |
| 9 |
| 10 import os |
| 11 import re |
| 12 import subprocess |
| 13 |
| 14 |
| 15 def print_subprocess_args(prefix, *args, **kwargs): |
| 16 """Print out args in a human-readable manner.""" |
| 17 def quote_and_escape(string): |
| 18 """Quote and escape a string if necessary.""" |
| 19 if ' ' in string or '\n' in string: |
| 20 string = '"%s"' % string.replace('"', '\\"') |
| 21 return string |
| 22 if 'cwd' in kwargs: |
| 23 print '%scd %s' % (prefix, kwargs['cwd']) |
| 24 print prefix + ' '.join(quote_and_escape(arg) for arg in args[0]) |
| 25 if 'cwd' in kwargs: |
| 26 print '%scd -' % prefix |
| 27 |
| 28 |
| 29 class VerboseSubprocess(object): |
| 30 """Call subprocess methods, but print out command before executing. |
| 31 |
| 32 Attributes: |
| 33 verbose: (boolean) should we print out the command or not. If |
| 34 not, this is the same as calling the subprocess method |
| 35 quiet: (boolean) suppress stdout on check_call and call. |
| 36 prefix: (string) When verbose, what to print before each command. |
| 37 """ |
| 38 |
| 39 def __init__(self, verbose): |
| 40 self.verbose = verbose |
| 41 self.quiet = not verbose |
| 42 self.prefix = '~~$ ' |
| 43 |
| 44 def check_call(self, *args, **kwargs): |
| 45 """Wrapper for subprocess.check_call(). |
| 46 |
| 47 Args: |
| 48 *args: to be passed to subprocess.check_call() |
| 49 **kwargs: to be passed to subprocess.check_call() |
| 50 Returns: |
| 51 Whatever subprocess.check_call() returns. |
| 52 Raises: |
| 53 OSError or subprocess.CalledProcessError: raised by check_call. |
| 54 """ |
| 55 if self.verbose: |
| 56 print_subprocess_args(self.prefix, *args, **kwargs) |
| 57 if self.quiet: |
| 58 with open(os.devnull, 'w') as devnull: |
| 59 return subprocess.check_call(*args, stdout=devnull, **kwargs) |
| 60 else: |
| 61 return subprocess.check_call(*args, **kwargs) |
| 62 |
| 63 def call(self, *args, **kwargs): |
| 64 """Wrapper for subprocess.check(). |
| 65 |
| 66 Args: |
| 67 *args: to be passed to subprocess.check_call() |
| 68 **kwargs: to be passed to subprocess.check_call() |
| 69 Returns: |
| 70 Whatever subprocess.call() returns. |
| 71 Raises: |
| 72 OSError or subprocess.CalledProcessError: raised by call. |
| 73 """ |
| 74 if self.verbose: |
| 75 print_subprocess_args(self.prefix, *args, **kwargs) |
| 76 if self.quiet: |
| 77 with open(os.devnull, 'w') as devnull: |
| 78 return subprocess.call(*args, stdout=devnull, **kwargs) |
| 79 else: |
| 80 return subprocess.call(*args, **kwargs) |
| 81 |
| 82 def check_output(self, *args, **kwargs): |
| 83 """Wrapper for subprocess.check_output(). |
| 84 |
| 85 Args: |
| 86 *args: to be passed to subprocess.check_output() |
| 87 **kwargs: to be passed to subprocess.check_output() |
| 88 Returns: |
| 89 Whatever subprocess.check_output() returns. |
| 90 Raises: |
| 91 OSError or subprocess.CalledProcessError: raised by check_output. |
| 92 """ |
| 93 if self.verbose: |
| 94 print_subprocess_args(self.prefix, *args, **kwargs) |
| 95 return subprocess.check_output(*args, **kwargs) |
| 96 |
| 97 def strip_output(self, *args, **kwargs): |
| 98 """Wrap subprocess.check_output and str.strip(). |
| 99 |
| 100 Pass the given arguments into subprocess.check_output() and return |
| 101 the results, after stripping any excess whitespace. |
| 102 |
| 103 Args: |
| 104 *args: to be passed to subprocess.check_output() |
| 105 **kwargs: to be passed to subprocess.check_output() |
| 106 |
| 107 Returns: |
| 108 The output of the process as a string without leading or |
| 109 trailing whitespace. |
| 110 Raises: |
| 111 OSError or subprocess.CalledProcessError: raised by check_output. |
| 112 """ |
| 113 if self.verbose: |
| 114 print_subprocess_args(self.prefix, *args, **kwargs) |
| 115 return str(subprocess.check_output(*args, **kwargs)).strip() |
| 116 |
| 117 def popen(self, *args, **kwargs): |
| 118 """Wrapper for subprocess.Popen(). |
| 119 |
| 120 Args: |
| 121 *args: to be passed to subprocess.Popen() |
| 122 **kwargs: to be passed to subprocess.Popen() |
| 123 Returns: |
| 124 The output of subprocess.Popen() |
| 125 Raises: |
| 126 OSError or subprocess.CalledProcessError: raised by Popen. |
| 127 """ |
| 128 if self.verbose: |
| 129 print_subprocess_args(self.prefix, *args, **kwargs) |
| 130 return subprocess.Popen(*args, **kwargs) |
| 131 |
| 132 |
| 133 class ChangeDir(object): |
| 134 """Use with a with-statement to temporarily change directories.""" |
| 135 # pylint: disable=I0011,R0903 |
| 136 |
| 137 def __init__(self, directory, verbose=False): |
| 138 self._directory = directory |
| 139 self._verbose = verbose |
| 140 |
| 141 def __enter__(self): |
| 142 if self._directory != os.curdir: |
| 143 if self._verbose: |
| 144 print '~~$ cd %s' % self._directory |
| 145 cwd = os.getcwd() |
| 146 os.chdir(self._directory) |
| 147 self._directory = cwd |
| 148 |
| 149 def __exit__(self, etype, value, traceback): |
| 150 if self._directory != os.curdir: |
| 151 if self._verbose: |
| 152 print '~~$ cd %s' % self._directory |
| 153 os.chdir(self._directory) |
| 154 |
| 155 |
| 156 class ReSearch(object): |
| 157 """A collection of static methods for regexing things.""" |
| 158 |
| 159 @staticmethod |
| 160 def search_within_stream(input_stream, pattern, default=None): |
| 161 """Search for regular expression in a file-like object. |
| 162 |
| 163 Opens a file for reading and searches line by line for a match to |
| 164 the regex and returns the parenthesized group named return for the |
| 165 first match. Does not search across newlines. |
| 166 |
| 167 For example: |
| 168 pattern = '^root(:[^:]*){4}:(?P<return>[^:]*)' |
| 169 with open('/etc/passwd', 'r') as stream: |
| 170 return search_within_file(stream, pattern) |
| 171 should return root's home directory (/root on my system). |
| 172 |
| 173 Args: |
| 174 input_stream: file-like object to be read |
| 175 pattern: (string) to be passed to re.compile |
| 176 default: what to return if no match |
| 177 |
| 178 Returns: |
| 179 A string or whatever default is |
| 180 """ |
| 181 pattern_object = re.compile(pattern) |
| 182 for line in input_stream: |
| 183 match = pattern_object.search(line) |
| 184 if match: |
| 185 return match.group('return') |
| 186 return default |
| 187 |
| 188 @staticmethod |
| 189 def search_within_string(input_string, pattern, default=None): |
| 190 """Search for regular expression in a string. |
| 191 |
| 192 Args: |
| 193 input_string: (string) to be searched |
| 194 pattern: (string) to be passed to re.compile |
| 195 default: what to return if no match |
| 196 |
| 197 Returns: |
| 198 A string or whatever default is |
| 199 """ |
| 200 match = re.search(pattern, input_string) |
| 201 return match.group('return') if match else default |
| 202 |
| 203 @staticmethod |
| 204 def search_within_output(verbose, pattern, default, *args, **kwargs): |
| 205 """Search for regular expression in a process output. |
| 206 |
| 207 Does not search across newlines. |
| 208 |
| 209 Args: |
| 210 verbose: (boolean) shoule we call print_subprocess_args? |
| 211 pattern: (string) to be passed to re.compile |
| 212 default: what to return if no match |
| 213 *args: to be passed to subprocess.Popen() |
| 214 **kwargs: to be passed to subprocess.Popen() |
| 215 |
| 216 Returns: |
| 217 A string or whatever default is |
| 218 """ |
| 219 if verbose: |
| 220 print_subprocess_args('~~$ ', *args, **kwargs) |
| 221 proc = subprocess.Popen(*args, stdout=subprocess.PIPE, **kwargs) |
| 222 return ReSearch.search_within_stream(proc.stdout, pattern, default) |
| 223 |
| 224 |
OLD | NEW |