Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(84)

Unified Diff: subprocess2.py

Issue 8374026: Add callback support for stdout and stderr. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Cleanup test Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: subprocess2.py
diff --git a/subprocess2.py b/subprocess2.py
index 4708aadf362ee347cf3d6528acc3c9007c917208..810631ae225feae33cfa53b62bd5ee44e1c1f6e6 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed.
"""
from __future__ import with_statement
+import cStringIO
import errno
import logging
import os
+import Queue
import subprocess
import sys
-import tempfile
import time
import threading
@@ -176,6 +177,9 @@ def Popen(args, **kwargs):
# When the pipe fills up, it will deadlock this process. Using a real file
# works around that issue.
kwargs[stream] = open(os.devnull, 'w')
+ if callable(kwargs.get(stream)):
+ # Callable stdout/stderr should be used only with call() wrappers.
+ kwargs[stream] = PIPE
fix('stdout')
fix('stderr')
@@ -198,6 +202,140 @@ def Popen(args, **kwargs):
raise
+def _queue_pipe_read(pipe, name, done, dest):
+ """Queue characters read from a pipe into a queue.
+
+ Left outside the _tee_threads function to not introduce a function closure
+ to speed up variable lookup.
+ """
+ while not done.isSet():
+ data = pipe.read(1)
+ if not data:
+ break
+ dest.put((name, data))
Dirk Pranke 2011/11/04 01:15:11 This timeout doesn't really work if the pipes are
M-A Ruel 2011/11/04 01:41:48 If the process is killed, the pipes are closed, wh
+ dest.put(name)
+
+
def _tee_threads(proc, timeout, start, stdin, args, kwargs):
  """Does I/O for a process's pipes using threads.

  It's the simplest and slowest implementation. Expect very slow behavior.

  If there is a callback and it doesn't keep up with the calls, the timeout
  effectiveness will be delayed accordingly.

  Arguments:
  - proc: already-started subprocess.Popen instance to service.
  - timeout: seconds before killing the process, or None for no timeout.
  - start: process start time (unused in this function -- presumably kept for
    signature parity with future implementations; TODO confirm).
  - stdin: str data to write to proc's stdin, or None.
  - args: the command line (unused in this function -- TODO confirm needed).
  - kwargs: Popen keyword arguments; a callable 'stdout'/'stderr' value is
    invoked from this (main) thread with each chunk of data read.

  Returns proc's exit code, or TIMED_OUT if the process was killed because
  it exceeded |timeout|.
  """
  # TODO(maruel): Implement a select based implementation on POSIX and a Windows
  # one using WaitForMultipleObjects().
  #
  # Queue of either of <threadname> when done or (<threadname>, data).
  # In theory we would like to limit to ~64kb items to not cause large memory
  # usage when the callback blocks. It is not done because it slows down
  # processing on OSX10.6 by a factor of 2x, making it even slower than Windows!
  # Revisit this decision if it becomes a problem, e.g. crash because of
  # memory exhaustion.
  queue = Queue.Queue()
  done = threading.Event()

  def write_stdin():
    # Feeds |stdin| to the child in 1kb slices, then closes the pipe so the
    # child sees EOF. Stops early if |done| is set (e.g. on timeout).
    stdin_io = cStringIO.StringIO(stdin)
    while not done.isSet():
      data = stdin_io.read(1024)
      if data:
        proc.stdin.write(data)
      else:
        proc.stdin.close()
        break
    # Tell the main loop this thread is finished.
    queue.put('stdin')

  def timeout_fn():
    # Sleeps until the timeout expires or |done| is set, whichever comes
    # first, then wakes up the main loop.
    done.wait(timeout)
    # No need to close the pipes since killing should be sufficient.
    queue.put('timeout')

  # Starts up to 4 threads:
  # Read stdout
  # Read stderr
  # Write stdin
  # Timeout
  threads = {}
  if timeout is not None:
    threads['timeout'] = threading.Thread(target=timeout_fn)
  if callable(kwargs.get('stdout')):
    threads['stdout'] = threading.Thread(
        target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue))
  if callable(kwargs.get('stderr')):
    threads['stderr'] = threading.Thread(
        target=_queue_pipe_read,
        args=(proc.stderr, 'stderr', done, queue))
  if isinstance(stdin, str):
    threads['stdin'] = threading.Thread(target=write_stdin)
  for t in threads.itervalues():
    t.daemon = True
    t.start()

  timed_out = False
  try:
    # Main dispatch loop: pops items enqueued by the worker threads. A bare
    # thread-name string means that thread finished; a (name, data) tuple is
    # routed to the matching stdout/stderr callback.
    while proc.returncode is None:
      assert threads
      proc.poll()
      item = queue.get()
      if isinstance(item, str):
        threads[item].join()
        del threads[item]
        if item == 'timeout' and not timed_out and proc.poll() is None:
          logging.debug('Timed out: killing')
          proc.kill()
          timed_out = True
        if not threads:
          # No thread left to wake us up through the queue; stop waiting.
          break
      else:
        kwargs[item[0]](item[1])
  finally:
    # Stop the threads.
    done.set()
    # Join threads
    for thread in threads.itervalues():
      thread.join()

  # Flush the queue. Remaining (name, data) tuples are still delivered to the
  # callbacks; a late 'timeout' marker still triggers the kill if needed.
  try:
    while True:
      item = queue.get(False)
      if isinstance(item, str):
        if item == 'timeout':
          # TODO(maruel): Does it make sense at that point?
          if not timed_out and proc.poll() is None:
            logging.debug('Timed out: killing')
            proc.kill()
            timed_out = True
      else:
        kwargs[item[0]](item[1])
  except Queue.Empty:
    pass

  # Get the remainder. The reader threads are gone, so drain whatever is left
  # in the pipes directly from this thread.
  if callable(kwargs.get('stdout')):
    data = proc.stdout.read()
    while data:
      kwargs['stdout'](data)
      data = proc.stdout.read()
  if callable(kwargs.get('stderr')):
    data = proc.stderr.read()
    while data:
      kwargs['stderr'](data)
      data = proc.stderr.read()

  if proc.returncode is None:
    # Usually happens when killed with timeout but not listening to pipes.
    proc.wait()

  if timed_out:
    return TIMED_OUT

  return proc.returncode
+
+
def communicate(args, timeout=None, **kwargs):
"""Wraps subprocess.Popen().communicate().
@@ -207,6 +345,11 @@ def communicate(args, timeout=None, **kwargs):
TIMED_OUT.
- Automatically passes stdin content as input so do not specify stdin=PIPE.
"""
+ if timeout and kwargs.get('shell'):
+ raise TypeError(
+ 'Using timeout and shell simultaneously will cause a process leak '
+ 'since the shell will be killed instead of the child process.')
+
stdin = kwargs.pop('stdin', None)
if stdin is not None:
if stdin is VOID:
@@ -218,36 +361,37 @@ def communicate(args, timeout=None, **kwargs):
# set the Popen() parameter accordingly.
kwargs['stdin'] = PIPE
- if not timeout:
+ start = time.time()
+ proc = Popen(args, **kwargs)
+ need_buffering = (timeout or
+ callable(kwargs.get('stdout')) or callable(kwargs.get('stderr')))
+
+ if not need_buffering:
# Normal workflow.
- proc = Popen(args, **kwargs)
- if stdin is not None:
+ if stdin not in (None, VOID):
return proc.communicate(stdin), proc.returncode
else:
return proc.communicate(), proc.returncode
- # Create a temporary file to workaround python's deadlock.
+ stdout = None
+ stderr = None
+ # Convert to a lambda to workaround python's deadlock.
# http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
- # When the pipe fills up, it will deadlock this process. Using a real file
- # works around that issue.
- with tempfile.TemporaryFile() as buff:
- start = time.time()
- kwargs['stdout'] = buff
- proc = Popen(args, **kwargs)
- if stdin is not None:
- proc.stdin.write(stdin)
- while proc.returncode is None:
- proc.poll()
- if timeout and (time.time() - start) > timeout:
- proc.kill()
- proc.wait()
- # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
- proc.returncode = TIMED_OUT
- time.sleep(0.001)
- # Now that the process died, reset the cursor and read the file.
- buff.seek(0)
- out = [buff.read(), None]
- return out, proc.returncode
+  # When the pipe fills up, it will deadlock this process. Using a thread
+  # works around that issue. No thread-safe function is needed since the
+  # callbacks are guaranteed to be called from the main thread.
+ if kwargs.get('stdout') == PIPE:
+ stdout = []
+ kwargs['stdout'] = stdout.append
+ if kwargs.get('stderr') == PIPE:
+ stderr = []
+ kwargs['stderr'] = stderr.append
+ returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
+ if not stdout is None:
+ stdout = ''.join(stdout)
+ if not stderr is None:
+ stderr = ''.join(stderr)
+ return (stdout, stderr), returncode
def call(args, **kwargs):
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698