Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(215)

Unified Diff: subprocess2.py

Issue 8749015: Reimplement r109239 but using Popen.communicate() instead. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Address comments Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: subprocess2.py
diff --git a/subprocess2.py b/subprocess2.py
index c2f393a990004224f5329a03a74ca5bfd4f7ac86..3ad804791fcabb0b423be212f31ef16c5d060064 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed.
"""
from __future__ import with_statement
+import cStringIO
import errno
import logging
import os
+import Queue
import subprocess
import sys
-import tempfile
import time
import threading
@@ -170,6 +171,10 @@ class Popen(subprocess.Popen):
tmp_str += '; cwd=%s' % kwargs['cwd']
logging.debug(tmp_str)
+ self.stdout_cb = None
+ self.stderr_cb = None
+ self.stdout_void = False
+ self.stderr_void = False
def fix(stream):
if kwargs.get(stream) in (VOID, os.devnull):
# Replaces VOID with handle to /dev/null.
@@ -178,11 +183,17 @@ class Popen(subprocess.Popen):
# When the pipe fills up, it will deadlock this process. Using a real
# file works around that issue.
kwargs[stream] = open(os.devnull, 'w')
+ setattr(self, stream + '_void', True)
+ if callable(kwargs.get(stream)):
+ # Callable stdout/stderr should be used only with call() wrappers.
+ setattr(self, stream + '_cb', kwargs[stream])
+ kwargs[stream] = PIPE
fix('stdout')
fix('stderr')
self.start = time.time()
+ self.timeout = None
self.shell = kwargs.get('shell', None)
# Silence pylint on MacOSX
self.returncode = None
@@ -205,6 +216,152 @@ class Popen(subprocess.Popen):
# through
raise
+ def _tee_threads(self, input): # pylint: disable=W0622
+ """Does I/O for a process's pipes using threads.
+
+ It's the simplest and slowest implementation. Expect very slow behavior.
+
+ If there is a callback and it doesn't keep up with the calls, the timeout
+ effectiveness will be delayed accordingly.
+ """
+ # Queue of either of <threadname> when done or (<threadname>, data). In
+ # theory we would like to limit to ~64kb items to not cause large memory
+ # usage when the callback blocks. It is not done because it slows down
+ # processing on OSX10.6 by a factor of 2x, making it even slower than
+ # Windows! Revisit this decision if it becomes a problem, e.g. crash
+ # because of memory exhaustion.
+ queue = Queue.Queue()
+ done = threading.Event()
+
+ def write_stdin():
+ try:
+ stdin_io = cStringIO.StringIO(input)
+ while True:
+ data = stdin_io.read(1024)
+ if data:
+ self.stdin.write(data)
+ else:
+ self.stdin.close()
+ break
+ finally:
+ queue.put('stdin')
+
+ def _queue_pipe_read(pipe, name):
+ """Queues characters read from a pipe into a queue."""
+ try:
+ while True:
+ data = pipe.read(1)
+ if not data:
+ break
+ queue.put((name, data))
+ finally:
+ queue.put(name)
+
+ def timeout_fn():
+ try:
+ done.wait(self.timeout)
+ finally:
+ queue.put('timeout')
+
+ def wait_fn():
+ try:
+ self.wait()
+ finally:
+ queue.put('wait')
+
+ # Starts up to 5 threads:
+ # Wait for the process to quit
+ # Read stdout
+ # Read stderr
+ # Write stdin
+ # Timeout
+ threads = {
+ 'wait': threading.Thread(target=wait_fn),
+ }
+ if self.timeout is not None:
+ threads['timeout'] = threading.Thread(target=timeout_fn)
+ if self.stdout_cb:
+ threads['stdout'] = threading.Thread(
+ target=_queue_pipe_read, args=(self.stdout, 'stdout'))
+ if self.stderr_cb:
+ threads['stderr'] = threading.Thread(
+ target=_queue_pipe_read, args=(self.stderr, 'stderr'))
+ if input:
+ threads['stdin'] = threading.Thread(target=write_stdin)
+ for t in threads.itervalues():
+ t.start()
+
+ timed_out = False
+ try:
+ # This thread needs to be optimized for speed.
+ while threads:
+ item = queue.get()
+ if item[0] is 'stdout':
+ self.stdout_cb(item[1])
+ elif item[0] is 'stderr':
+ self.stderr_cb(item[1])
+ else:
+ # A thread terminated.
+ threads[item].join()
+ del threads[item]
+ if item == 'wait':
+ # Terminate the timeout thread if necessary.
+ done.set()
+ elif item == 'timeout' and not timed_out and self.poll() is None:
+ logging.debug('Timed out after %fs: killing' % self.timeout)
+ self.kill()
+ timed_out = True
+ finally:
+ # Stop the threads.
+ done.set()
+ if 'wait' in threads:
+ # Accelerate things, otherwise it would hang until the child process is
+ # done.
+ logging.debug('Killing child because of an exception')
+ self.kill()
+ # Join threads.
+ for thread in threads.itervalues():
+ thread.join()
+ if timed_out:
+ self.returncode = TIMED_OUT
+
+ def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622
+ """Adds timeout and callbacks support.
+
+ Returns (stdout, stderr) like subprocess.Popen().communicate().
+
+ - The process will be killed after |timeout| seconds and returncode set to
+ TIMED_OUT.
+ """
+ self.timeout = timeout
+ if not self.timeout and not self.stdout_cb and not self.stderr_cb:
+ return super(Popen, self).communicate(input)
+
+ if self.timeout and self.shell:
+ raise TypeError(
+ 'Using timeout and shell simultaneously will cause a process leak '
+ 'since the shell will be killed instead of the child process.')
+
+ stdout = None
+ stderr = None
+ # Convert to a lambda to workaround python's deadlock.
+ # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
+ # When the pipe fills up, it will deadlock this process. Using a thread
+ # works around that issue. No need for thread safe function since the call
+ # backs are guaranteed to be called from the main thread.
+ if self.stdout and not self.stdout_cb and not self.stdout_void:
+ stdout = cStringIO.StringIO()
+ self.stdout_cb = stdout.write
+ if self.stderr and not self.stderr_cb and not self.stderr_void:
+ stderr = cStringIO.StringIO()
+ self.stderr_cb = stderr.write
+ self._tee_threads(input)
+ if stdout:
+ stdout = stdout.getvalue()
+ if stderr:
+ stderr = stderr.getvalue()
+ return (stdout, stderr)
+
def communicate(args, timeout=None, **kwargs):
"""Wraps subprocess.Popen().communicate() and add timeout support.
@@ -226,39 +383,11 @@ def communicate(args, timeout=None, **kwargs):
# set the Popen() parameter accordingly.
kwargs['stdin'] = PIPE
- if not timeout:
- # Normal workflow.
- proc = Popen(args, **kwargs)
- if stdin is not None:
- return proc.communicate(stdin), proc.returncode
- else:
- return proc.communicate(), proc.returncode
-
- # Create a temporary file to workaround python's deadlock.
- # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
- # When the pipe fills up, it will deadlock this process. Using a real file
- # works around that issue.
- with tempfile.TemporaryFile() as buff:
- kwargs['stdout'] = buff
- proc = Popen(args, **kwargs)
- if proc.shell:
- raise TypeError(
- 'Using timeout and shell simultaneously will cause a process leak '
- 'since the shell will be killed instead of the child process.')
- if stdin is not None:
- proc.stdin.write(stdin)
- while proc.returncode is None:
- proc.poll()
- if timeout and (time.time() - proc.start) > timeout:
- proc.kill()
- proc.wait()
- # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
- proc.returncode = TIMED_OUT
- time.sleep(0.001)
- # Now that the process died, reset the cursor and read the file.
- buff.seek(0)
- out = (buff.read(), None)
- return out, proc.returncode
+ proc = Popen(args, **kwargs)
+ if stdin not in (None, VOID):
+ return proc.communicate(stdin, timeout), proc.returncode
+ else:
+ return proc.communicate(None, timeout), proc.returncode
def call(args, **kwargs):
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698