Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(195)

Unified Diff: subprocess2.py

Issue 8505046: Revert r109283, r109282 and r109239. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: subprocess2.py
diff --git a/subprocess2.py b/subprocess2.py
index 430010c144afd6e3798fd1e8bd7b22cf94374d6b..4708aadf362ee347cf3d6528acc3c9007c917208 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -8,21 +8,15 @@ In theory you shouldn't need anything else in subprocess, or this module failed.
"""
from __future__ import with_statement
-import cStringIO
import errno
import logging
import os
-import Queue
-import select
import subprocess
import sys
+import tempfile
import time
import threading
-if sys.platform != 'win32':
- import fcntl
-
-
# Constants forwarded from subprocess.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
@@ -182,9 +176,6 @@ def Popen(args, **kwargs):
# When the pipe fills up, it will deadlock this process. Using a real file
# works around that issue.
kwargs[stream] = open(os.devnull, 'w')
- if callable(kwargs.get(stream)):
- # Callable stdout/stderr should be used only with call() wrappers.
- kwargs[stream] = PIPE
fix('stdout')
fix('stderr')
@@ -207,214 +198,6 @@ def Popen(args, **kwargs):
raise
-def _queue_pipe_read(pipe, name, done, dest):
- """Queues characters read from a pipe into a queue.
-
- Left outside the _tee_threads function to not introduce a function closure
- to speed up variable lookup.
- """
- while not done.isSet():
- data = pipe.read(1)
- if not data:
- break
- dest.put((name, data))
- dest.put(name)
-
-
-def _tee_threads(proc, timeout, start, stdin, args, kwargs):
- """Does I/O for a process's pipes using thread.
-
- It's the simplest and slowest implementation. Expect very slow behavior.
-
- If there is a callback and it doesn't keep up with the calls, the timeout
- effectiveness will be delayed accordingly.
- """
- # TODO(maruel): Implement a select based implementation on POSIX and a Windows
- # one using WaitForMultipleObjects().
- #
- # Queue of either of <threadname> when done or (<threadname>, data).
- # In theory we would like to limit to ~64kb items to not cause large memory
- # usage when the callback blocks. It is not done because it slows down
- # processing on OSX10.6 by a factor of 2x, making it even slower than Windows!
- # Revisit this decision if it becomes a problem, e.g. crash because of
- # memory exhaustion.
- queue = Queue.Queue()
- done = threading.Event()
-
- def write_stdin():
- stdin_io = cStringIO.StringIO(stdin)
- while not done.isSet():
- data = stdin_io.read(1024)
- if data:
- proc.stdin.write(data)
- else:
- proc.stdin.close()
- break
- queue.put('stdin')
-
- def timeout_fn():
- done.wait(timeout)
- # No need to close the pipes since killing should be sufficient.
- queue.put('timeout')
-
- # Starts up to 4 threads:
- # Read stdout
- # Read stderr
- # Write stdin
- # Timeout
- threads = {}
- if timeout is not None:
- threads['timeout'] = threading.Thread(target=timeout_fn)
- if callable(kwargs.get('stdout')):
- threads['stdout'] = threading.Thread(
- target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue))
- if callable(kwargs.get('stderr')):
- threads['stderr'] = threading.Thread(
- target=_queue_pipe_read,
- args=(proc.stderr, 'stderr', done, queue))
- if isinstance(stdin, str):
- threads['stdin'] = threading.Thread(target=write_stdin)
- for t in threads.itervalues():
- t.daemon = True
- t.start()
-
- timed_out = False
- try:
- while proc.returncode is None:
- assert threads
- proc.poll()
- item = queue.get()
- if isinstance(item, str):
- threads[item].join()
- del threads[item]
- if item == 'timeout' and not timed_out and proc.poll() is None:
- logging.debug('Timed out: killing')
- proc.kill()
- timed_out = True
- if not threads:
- # We won't be waken up anymore. Need to busy loop.
- break
- else:
- kwargs[item[0]](item[1])
- finally:
- # Stop the threads.
- done.set()
- # Join threads
- for thread in threads.itervalues():
- thread.join()
-
- # Flush the queue.
- try:
- while True:
- item = queue.get(False)
- if isinstance(item, str):
- if item == 'timeout':
- # TODO(maruel): Does it make sense at that point?
- if not timed_out and proc.poll() is None:
- logging.debug('Timed out: killing')
- proc.kill()
- timed_out = True
- else:
- kwargs[item[0]](item[1])
- except Queue.Empty:
- pass
-
- # Get the remainder.
- if callable(kwargs.get('stdout')):
- data = proc.stdout.read()
- while data:
- kwargs['stdout'](data)
- data = proc.stdout.read()
- if callable(kwargs.get('stderr')):
- data = proc.stderr.read()
- while data:
- kwargs['stderr'](data)
- data = proc.stderr.read()
-
- if proc.returncode is None:
- # Usually happens when killed with timeout but not listening to pipes.
- proc.wait()
-
- if timed_out:
- return TIMED_OUT
-
- return proc.returncode
-
-
-def _read_pipe(handles, pipe, out_fn):
- """Reads bytes from a pipe and calls the output callback."""
- data = pipe.read()
- if not data:
- del handles[pipe]
- else:
- out_fn(data)
-
-
-def _tee_posix(proc, timeout, start, stdin, args, kwargs):
- """Polls a process and its pipe using select.select().
-
- TODO(maruel): Implement a non-polling method for OSes that support it.
- """
- handles_r = {}
- if callable(kwargs.get('stdout')):
- handles_r[proc.stdout] = lambda: _read_pipe(
- handles_r, proc.stdout, kwargs['stdout'])
- if callable(kwargs.get('stderr')):
- handles_r[proc.stderr] = lambda: _read_pipe(
- handles_r, proc.stderr, kwargs['stderr'])
-
- handles_w = {}
- if isinstance(stdin, str):
- stdin_io = cStringIO.StringIO(stdin)
- def write_stdin():
- data = stdin_io.read(1)
- if data:
- proc.stdin.write(data)
- else:
- del handles_w[proc.stdin]
- proc.stdin.close()
- handles_w[proc.stdin] = write_stdin
- else:
- # TODO(maruel): Fix me, it could be VOID.
- assert stdin is None
-
- # Make all the file objects of the child process non-blocking file.
- # TODO(maruel): Test if a pipe is handed to the child process.
- for pipe in (proc.stdin, proc.stdout, proc.stderr):
- fileno = pipe and getattr(pipe, 'fileno', lambda: None)()
- if fileno:
- # Note: making a pipe non-blocking means the C stdio could act wrong. In
- # particular, readline() cannot be used. Work around is to use os.read().
- fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
- fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)
-
- timed_out = False
- while handles_r or handles_w or (timeout and proc.poll() is None):
- period = None
- if timeout:
- period = max(0, timeout - (time.time() - start))
- if not period and not timed_out:
- proc.kill()
- timed_out = True
- if timed_out:
- period = 0.001
-
- # It reconstructs objects on each loop, not very efficient.
- reads, writes, _, = select.select(
- handles_r.keys(), handles_w.keys(), [], period)
- for read in reads:
- handles_r[read]()
- for write in writes:
- handles_w[write]()
-
- # No pipe open anymore and if there was a time out, the child process was
- # killed already.
- proc.wait()
- if timed_out:
- return TIMED_OUT
- return proc.returncode
-
-
def communicate(args, timeout=None, **kwargs):
"""Wraps subprocess.Popen().communicate().
@@ -424,11 +207,6 @@ def communicate(args, timeout=None, **kwargs):
TIMED_OUT.
- Automatically passes stdin content as input so do not specify stdin=PIPE.
"""
- if timeout and kwargs.get('shell'):
- raise TypeError(
- 'Using timeout and shell simultaneously will cause a process leak '
- 'since the shell will be killed instead of the child process.')
-
stdin = kwargs.pop('stdin', None)
if stdin is not None:
if stdin is VOID:
@@ -440,42 +218,36 @@ def communicate(args, timeout=None, **kwargs):
# set the Popen() parameter accordingly.
kwargs['stdin'] = PIPE
- start = time.time()
- proc = Popen(args, **kwargs)
- need_buffering = (timeout or
- callable(kwargs.get('stdout')) or callable(kwargs.get('stderr')))
-
- if not need_buffering:
+ if not timeout:
# Normal workflow.
- if stdin not in (None, VOID):
+ proc = Popen(args, **kwargs)
+ if stdin is not None:
return proc.communicate(stdin), proc.returncode
else:
return proc.communicate(), proc.returncode
- stdout = None
- stderr = None
- # Convert to a lambda to workaround python's deadlock.
+ # Create a temporary file to workaround python's deadlock.
# http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
- # When the pipe fills up, it will deadlock this process. Using a thread
- # works around that issue. No need for thread safe function since the call
- # backs are guaranteed to be called from the main thread.
- if kwargs.get('stdout') == PIPE:
- stdout = []
- kwargs['stdout'] = stdout.append
- if kwargs.get('stderr') == PIPE:
- stderr = []
- kwargs['stderr'] = stderr.append
- if sys.platform == 'win32':
- # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE,
- # doesn't exist so _tee_win() cannot be used yet.
- returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
- else:
- returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs)
- if not stdout is None:
- stdout = ''.join(stdout)
- if not stderr is None:
- stderr = ''.join(stderr)
- return (stdout, stderr), returncode
+ # When the pipe fills up, it will deadlock this process. Using a real file
+ # works around that issue.
+ with tempfile.TemporaryFile() as buff:
+ start = time.time()
+ kwargs['stdout'] = buff
+ proc = Popen(args, **kwargs)
+ if stdin is not None:
+ proc.stdin.write(stdin)
+ while proc.returncode is None:
+ proc.poll()
+ if timeout and (time.time() - start) > timeout:
+ proc.kill()
+ proc.wait()
+ # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
+ proc.returncode = TIMED_OUT
+ time.sleep(0.001)
+ # Now that the process died, reset the cursor and read the file.
+ buff.seek(0)
+ out = [buff.read(), None]
+ return out, proc.returncode
def call(args, **kwargs):
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698