OLD | NEW |
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 from __future__ import with_statement | 10 from __future__ import with_statement |
| 11 import cStringIO |
11 import errno | 12 import errno |
12 import logging | 13 import logging |
13 import os | 14 import os |
| 15 import Queue |
14 import subprocess | 16 import subprocess |
15 import sys | 17 import sys |
16 import tempfile | |
17 import time | 18 import time |
18 import threading | 19 import threading |
19 | 20 |
20 | 21 |
21 # Constants forwarded from subprocess. | 22 # Constants forwarded from subprocess. |
22 PIPE = subprocess.PIPE | 23 PIPE = subprocess.PIPE |
23 STDOUT = subprocess.STDOUT | 24 STDOUT = subprocess.STDOUT |
24 # Sends stdout or stderr to os.devnull. | 25 # Sends stdout or stderr to os.devnull. |
25 VOID = object() | 26 VOID = object() |
26 # Error code when a process was killed because it timed out. | 27 # Error code when a process was killed because it timed out. |
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
163 if isinstance(args, basestring): | 164 if isinstance(args, basestring): |
164 tmp_str = args | 165 tmp_str = args |
165 elif isinstance(args, (list, tuple)): | 166 elif isinstance(args, (list, tuple)): |
166 tmp_str = ' '.join(args) | 167 tmp_str = ' '.join(args) |
167 else: | 168 else: |
168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) | 169 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) |
169 if kwargs.get('cwd', None): | 170 if kwargs.get('cwd', None): |
170 tmp_str += '; cwd=%s' % kwargs['cwd'] | 171 tmp_str += '; cwd=%s' % kwargs['cwd'] |
171 logging.debug(tmp_str) | 172 logging.debug(tmp_str) |
172 | 173 |
| 174 self.stdout_cb = None |
| 175 self.stderr_cb = None |
| 176 self.stdout_void = False |
| 177 self.stderr_void = False |
173 def fix(stream): | 178 def fix(stream): |
174 if kwargs.get(stream) in (VOID, os.devnull): | 179 if kwargs.get(stream) in (VOID, os.devnull): |
175 # Replaces VOID with handle to /dev/null. | 180 # Replaces VOID with handle to /dev/null. |
176 # Create a temporary file to workaround python's deadlock. | 181 # Create a temporary file to workaround python's deadlock. |
177 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 182 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
178 # When the pipe fills up, it will deadlock this process. Using a real | 183 # When the pipe fills up, it will deadlock this process. Using a real |
179 # file works around that issue. | 184 # file works around that issue. |
180 kwargs[stream] = open(os.devnull, 'w') | 185 kwargs[stream] = open(os.devnull, 'w') |
| 186 setattr(self, stream + '_void', True) |
| 187 if callable(kwargs.get(stream)): |
| 188 # Callable stdout/stderr should be used only with call() wrappers. |
| 189 setattr(self, stream + '_cb', kwargs[stream]) |
| 190 kwargs[stream] = PIPE |
181 | 191 |
182 fix('stdout') | 192 fix('stdout') |
183 fix('stderr') | 193 fix('stderr') |
184 | 194 |
185 self.start = time.time() | 195 self.start = time.time() |
| 196 self.timeout = None |
186 self.shell = kwargs.get('shell', None) | 197 self.shell = kwargs.get('shell', None) |
187 # Silence pylint on MacOSX | 198 # Silence pylint on MacOSX |
188 self.returncode = None | 199 self.returncode = None |
189 | 200 |
190 try: | 201 try: |
191 super(Popen, self).__init__(args, **kwargs) | 202 super(Popen, self).__init__(args, **kwargs) |
192 except OSError, e: | 203 except OSError, e: |
193 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 204 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
194 # Convert fork() emulation failure into a CygwinRebaseError(). | 205 # Convert fork() emulation failure into a CygwinRebaseError(). |
195 raise CygwinRebaseError( | 206 raise CygwinRebaseError( |
196 e.errno, | 207 e.errno, |
197 args, | 208 args, |
198 kwargs.get('cwd'), | 209 kwargs.get('cwd'), |
199 None, | 210 None, |
200 'Visit ' | 211 'Visit ' |
201 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure ' | 212 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure ' |
202 'to learn how to fix this error; you need to rebase your cygwin ' | 213 'to learn how to fix this error; you need to rebase your cygwin ' |
203 'dlls') | 214 'dlls') |
204 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go | 215 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go |
205 # through | 216 # through |
206 raise | 217 raise |
207 | 218 |
| 219 def _tee_threads(self, input): # pylint: disable=W0622 |
| 220 """Does I/O for a process's pipes using threads. |
| 221 |
| 222 It's the simplest and slowest implementation. Expect very slow behavior. |
| 223 |
| 224 If there is a callback and it doesn't keep up with the calls, the timeout |
| 225 effectiveness will be delayed accordingly. |
| 226 """ |
| 227 # Queue of either of <threadname> when done or (<threadname>, data). In |
| 228 # theory we would like to limit to ~64kb items to not cause large memory |
| 229 # usage when the callback blocks. It is not done because it slows down |
| 230 # processing on OSX10.6 by a factor of 2x, making it even slower than |
| 231 # Windows! Revisit this decision if it becomes a problem, e.g. crash |
| 232 # because of memory exhaustion. |
| 233 queue = Queue.Queue() |
| 234 done = threading.Event() |
| 235 |
| 236 def write_stdin(): |
| 237 try: |
| 238 stdin_io = cStringIO.StringIO(input) |
| 239 while True: |
| 240 data = stdin_io.read(1024) |
| 241 if data: |
| 242 self.stdin.write(data) |
| 243 else: |
| 244 self.stdin.close() |
| 245 break |
| 246 finally: |
| 247 queue.put('stdin') |
| 248 |
| 249 def _queue_pipe_read(pipe, name): |
| 250 """Queues characters read from a pipe into a queue.""" |
| 251 try: |
| 252 while True: |
| 253 data = pipe.read(1) |
| 254 if not data: |
| 255 break |
| 256 queue.put((name, data)) |
| 257 finally: |
| 258 queue.put(name) |
| 259 |
| 260 def timeout_fn(): |
| 261 try: |
| 262 done.wait(self.timeout) |
| 263 finally: |
| 264 queue.put('timeout') |
| 265 |
| 266 def wait_fn(): |
| 267 try: |
| 268 self.wait() |
| 269 finally: |
| 270 queue.put('wait') |
| 271 |
| 272 # Starts up to 5 threads: |
| 273 # Wait for the process to quit |
| 274 # Read stdout |
| 275 # Read stderr |
| 276 # Write stdin |
| 277 # Timeout |
| 278 threads = { |
| 279 'wait': threading.Thread(target=wait_fn), |
| 280 } |
| 281 if self.timeout is not None: |
| 282 threads['timeout'] = threading.Thread(target=timeout_fn) |
| 283 if self.stdout_cb: |
| 284 threads['stdout'] = threading.Thread( |
| 285 target=_queue_pipe_read, args=(self.stdout, 'stdout')) |
| 286 if self.stderr_cb: |
| 287 threads['stderr'] = threading.Thread( |
| 288 target=_queue_pipe_read, args=(self.stderr, 'stderr')) |
| 289 if input: |
| 290 threads['stdin'] = threading.Thread(target=write_stdin) |
| 291 for t in threads.itervalues(): |
| 292 t.start() |
| 293 |
| 294 timed_out = False |
| 295 try: |
| 296 # This thread needs to be optimized for speed. |
| 297 while threads: |
| 298 item = queue.get() |
| 299 if item[0] is 'stdout': |
| 300 self.stdout_cb(item[1]) |
| 301 elif item[0] is 'stderr': |
| 302 self.stderr_cb(item[1]) |
| 303 else: |
| 304 # A thread terminated. |
| 305 threads[item].join() |
| 306 del threads[item] |
| 307 if item == 'wait': |
| 308 # Terminate the timeout thread if necessary. |
| 309 done.set() |
| 310 elif item == 'timeout' and not timed_out and self.poll() is None: |
| 311 logging.debug('Timed out after %fs: killing' % self.timeout) |
| 312 self.kill() |
| 313 timed_out = True |
| 314 finally: |
| 315 # Stop the threads. |
| 316 done.set() |
| 317 if 'wait' in threads: |
| 318 # Accelerate things, otherwise it would hang until the child process is |
| 319 # done. |
| 320 logging.debug('Killing child because of an exception') |
| 321 self.kill() |
| 322 # Join threads. |
| 323 for thread in threads.itervalues(): |
| 324 thread.join() |
| 325 if timed_out: |
| 326 self.returncode = TIMED_OUT |
| 327 |
| 328 def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622 |
| 329 """Adds timeout and callbacks support. |
| 330 |
| 331 Returns (stdout, stderr) like subprocess.Popen().communicate(). |
| 332 |
| 333 - The process will be killed after |timeout| seconds and returncode set to |
| 334 TIMED_OUT. |
| 335 """ |
| 336 self.timeout = timeout |
| 337 if not self.timeout and not self.stdout_cb and not self.stderr_cb: |
| 338 return super(Popen, self).communicate(input) |
| 339 |
| 340 if self.timeout and self.shell: |
| 341 raise TypeError( |
| 342 'Using timeout and shell simultaneously will cause a process leak ' |
| 343 'since the shell will be killed instead of the child process.') |
| 344 |
| 345 stdout = None |
| 346 stderr = None |
| 347 # Convert to a lambda to workaround python's deadlock. |
| 348 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
| 349 # When the pipe fills up, it will deadlock this process. Using a thread |
| 350 # works around that issue. No need for thread safe function since the call |
| 351 # backs are guaranteed to be called from the main thread. |
| 352 if self.stdout and not self.stdout_cb and not self.stdout_void: |
| 353 stdout = cStringIO.StringIO() |
| 354 self.stdout_cb = stdout.write |
| 355 if self.stderr and not self.stderr_cb and not self.stderr_void: |
| 356 stderr = cStringIO.StringIO() |
| 357 self.stderr_cb = stderr.write |
| 358 self._tee_threads(input) |
| 359 if stdout: |
| 360 stdout = stdout.getvalue() |
| 361 if stderr: |
| 362 stderr = stderr.getvalue() |
| 363 return (stdout, stderr) |
| 364 |
208 | 365 |
209 def communicate(args, timeout=None, **kwargs): | 366 def communicate(args, timeout=None, **kwargs): |
210 """Wraps subprocess.Popen().communicate() and add timeout support. | 367 """Wraps subprocess.Popen().communicate() and add timeout support. |
211 | 368 |
212 Returns ((stdout, stderr), returncode). | 369 Returns ((stdout, stderr), returncode). |
213 | 370 |
214 - The process will be killed after |timeout| seconds and returncode set to | 371 - The process will be killed after |timeout| seconds and returncode set to |
215 TIMED_OUT. | 372 TIMED_OUT. |
216 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 373 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
217 """ | 374 """ |
218 stdin = kwargs.pop('stdin', None) | 375 stdin = kwargs.pop('stdin', None) |
219 if stdin is not None: | 376 if stdin is not None: |
220 if stdin is VOID: | 377 if stdin is VOID: |
221 kwargs['stdin'] = open(os.devnull, 'r') | 378 kwargs['stdin'] = open(os.devnull, 'r') |
222 stdin = None | 379 stdin = None |
223 else: | 380 else: |
224 assert isinstance(stdin, basestring) | 381 assert isinstance(stdin, basestring) |
225 # When stdin is passed as an argument, use it as the actual input data and | 382 # When stdin is passed as an argument, use it as the actual input data and |
226 # set the Popen() parameter accordingly. | 383 # set the Popen() parameter accordingly. |
227 kwargs['stdin'] = PIPE | 384 kwargs['stdin'] = PIPE |
228 | 385 |
229 if not timeout: | 386 proc = Popen(args, **kwargs) |
230 # Normal workflow. | 387 if stdin not in (None, VOID): |
231 proc = Popen(args, **kwargs) | 388 return proc.communicate(stdin, timeout), proc.returncode |
232 if stdin is not None: | 389 else: |
233 return proc.communicate(stdin), proc.returncode | 390 return proc.communicate(None, timeout), proc.returncode |
234 else: | |
235 return proc.communicate(), proc.returncode | |
236 | |
237 # Create a temporary file to workaround python's deadlock. | |
238 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | |
239 # When the pipe fills up, it will deadlock this process. Using a real file | |
240 # works around that issue. | |
241 with tempfile.TemporaryFile() as buff: | |
242 kwargs['stdout'] = buff | |
243 proc = Popen(args, **kwargs) | |
244 if proc.shell: | |
245 raise TypeError( | |
246 'Using timeout and shell simultaneously will cause a process leak ' | |
247 'since the shell will be killed instead of the child process.') | |
248 if stdin is not None: | |
249 proc.stdin.write(stdin) | |
250 while proc.returncode is None: | |
251 proc.poll() | |
252 if timeout and (time.time() - proc.start) > timeout: | |
253 proc.kill() | |
254 proc.wait() | |
255 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. | |
256 proc.returncode = TIMED_OUT | |
257 time.sleep(0.001) | |
258 # Now that the process died, reset the cursor and read the file. | |
259 buff.seek(0) | |
260 out = (buff.read(), None) | |
261 return out, proc.returncode | |
262 | 391 |
263 | 392 |
264 def call(args, **kwargs): | 393 def call(args, **kwargs): |
265 """Emulates subprocess.call(). | 394 """Emulates subprocess.call(). |
266 | 395 |
267 Automatically convert stdout=PIPE or stderr=PIPE to VOID. | 396 Automatically convert stdout=PIPE or stderr=PIPE to VOID. |
268 In no case they can be returned since no code path raises | 397 In no case they can be returned since no code path raises |
269 subprocess2.CalledProcessError. | 398 subprocess2.CalledProcessError. |
270 """ | 399 """ |
271 if kwargs.get('stdout') == PIPE: | 400 if kwargs.get('stdout') == PIPE: |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
314 | 443 |
315 - Throws if return code is not 0. | 444 - Throws if return code is not 0. |
316 - Works even prior to python 2.7. | 445 - Works even prior to python 2.7. |
317 - Blocks stdin by default if not specified since no output will be visible. | 446 - Blocks stdin by default if not specified since no output will be visible. |
318 - As per doc, "The stdout argument is not allowed as it is used internally." | 447 - As per doc, "The stdout argument is not allowed as it is used internally." |
319 """ | 448 """ |
320 kwargs.setdefault('stdin', VOID) | 449 kwargs.setdefault('stdin', VOID) |
321 if 'stdout' in kwargs: | 450 if 'stdout' in kwargs: |
322 raise ValueError('stdout argument not allowed, it will be overridden.') | 451 raise ValueError('stdout argument not allowed, it will be overridden.') |
323 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 452 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |