Index: subprocess2.py |
diff --git a/subprocess2.py b/subprocess2.py |
index c2f393a990004224f5329a03a74ca5bfd4f7ac86..3ad804791fcabb0b423be212f31ef16c5d060064 100644 |
--- a/subprocess2.py |
+++ b/subprocess2.py |
@@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. |
""" |
from __future__ import with_statement |
+import cStringIO |
import errno |
import logging |
import os |
+import Queue |
import subprocess |
import sys |
-import tempfile |
import time |
import threading |
@@ -170,6 +171,10 @@ class Popen(subprocess.Popen): |
tmp_str += '; cwd=%s' % kwargs['cwd'] |
logging.debug(tmp_str) |
+ self.stdout_cb = None |
+ self.stderr_cb = None |
+ self.stdout_void = False |
+ self.stderr_void = False |
def fix(stream): |
if kwargs.get(stream) in (VOID, os.devnull): |
# Replaces VOID with handle to /dev/null. |
@@ -178,11 +183,17 @@ class Popen(subprocess.Popen): |
# When the pipe fills up, it will deadlock this process. Using a real |
# file works around that issue. |
kwargs[stream] = open(os.devnull, 'w') |
+ setattr(self, stream + '_void', True) |
+ if callable(kwargs.get(stream)): |
+ # Callable stdout/stderr should be used only with call() wrappers. |
+ setattr(self, stream + '_cb', kwargs[stream]) |
+ kwargs[stream] = PIPE |
fix('stdout') |
fix('stderr') |
self.start = time.time() |
+ self.timeout = None |
self.shell = kwargs.get('shell', None) |
# Silence pylint on MacOSX |
self.returncode = None |
@@ -205,6 +216,152 @@ class Popen(subprocess.Popen): |
# through |
raise |
+  def _tee_threads(self, input):  # pylint: disable=W0622
+    """Does I/O for a process's pipes using threads.
+
+    It's the simplest and slowest implementation. Expect very slow behavior.
+
+    If there is a callback and it doesn't keep up with the calls, the timeout
+    effectiveness will be delayed accordingly.
+
+    Worker threads only push items on a queue; this method drains the queue,
+    so self.stdout_cb and self.stderr_cb are only ever called from the thread
+    that called this method.
+
+    Arguments:
+    - input: data to write to the child's stdin. When false, nothing is
+             written and self.stdin, if set, is closed right away.
+
+    Side effects:
+    - Kills the child once self.timeout seconds elapsed, if it is not None,
+      and then sets self.returncode to TIMED_OUT.
+    - Kills the child if an exception, e.g. from a callback, interrupts the
+      processing before the child exited.
+    """
+    # Queue of either of <threadname> when done or (<threadname>, data).  In
+    # theory we would like to limit to ~64kb items to not cause large memory
+    # usage when the callback blocks. It is not done because it slows down
+    # processing on OSX10.6 by a factor of 2x, making it even slower than
+    # Windows!  Revisit this decision if it becomes a problem, e.g. crash
+    # because of memory exhaustion.
+    queue = Queue.Queue()
+    # Set once the child exited (or on error) to release the timeout thread.
+    done = threading.Event()
+
+    def write_stdin():
+      """Feeds |input| to the child's stdin in 1kb chunks, then closes it."""
+      try:
+        stdin_io = cStringIO.StringIO(input)
+        while True:
+          data = stdin_io.read(1024)
+          if data:
+            self.stdin.write(data)
+          else:
+            self.stdin.close()
+            break
+      finally:
+        queue.put('stdin')
+
+    def _queue_pipe_read(pipe, name):
+      """Queues characters read from a pipe into a queue."""
+      try:
+        while True:
+          # One byte at a time so data reaches the callback as soon as it is
+          # produced; presumably a larger read size would block until that
+          # many bytes are available -- confirm before changing.
+          data = pipe.read(1)
+          if not data:
+            break
+          queue.put((name, data))
+      finally:
+        queue.put(name)
+
+    def timeout_fn():
+      """Signals 'timeout' after self.timeout seconds or once |done| is set."""
+      try:
+        done.wait(self.timeout)
+      finally:
+        queue.put('timeout')
+
+    def wait_fn():
+      """Signals 'wait' once the child process exited."""
+      try:
+        self.wait()
+      finally:
+        queue.put('wait')
+
+    # Starts up to 5 threads:
+    # Wait for the process to quit
+    # Read stdout
+    # Read stderr
+    # Write stdin
+    # Timeout
+    threads = {
+        'wait': threading.Thread(target=wait_fn),
+    }
+    if self.timeout is not None:
+      threads['timeout'] = threading.Thread(target=timeout_fn)
+    if self.stdout_cb:
+      threads['stdout'] = threading.Thread(
+          target=_queue_pipe_read, args=(self.stdout, 'stdout'))
+    if self.stderr_cb:
+      threads['stderr'] = threading.Thread(
+          target=_queue_pipe_read, args=(self.stderr, 'stderr'))
+    if input:
+      threads['stdin'] = threading.Thread(target=write_stdin)
+    elif self.stdin:
+      # Pipe but no input: close it so a child reading its stdin gets EOF
+      # instead of blocking, like subprocess.Popen.communicate() does.
+      self.stdin.close()
+    for t in threads.itervalues():
+      t.start()
+
+    timed_out = False
+    try:
+      # This thread needs to be optimized for speed.
+      while threads:
+        item = queue.get()
+        # Compare by value: an identity test ('is') against a string literal
+        # only works by accident of interning. A bare thread name never
+        # matches here since item[0] is then a single character.
+        if item[0] == 'stdout':
+          self.stdout_cb(item[1])
+        elif item[0] == 'stderr':
+          self.stderr_cb(item[1])
+        else:
+          # A thread terminated.
+          threads[item].join()
+          del threads[item]
+          if item == 'wait':
+            # Terminate the timeout thread if necessary.
+            done.set()
+          elif item == 'timeout' and not timed_out and self.poll() is None:
+            logging.debug('Timed out after %fs: killing', self.timeout)
+            self.kill()
+            timed_out = True
+    finally:
+      # Stop the threads.
+      done.set()
+      if 'wait' in threads:
+        # Accelerate things, otherwise it would hang until the child process
+        # is done.
+        logging.debug('Killing child because of an exception')
+        self.kill()
+      # Join threads.
+      for thread in threads.itervalues():
+        thread.join()
+      if timed_out:
+        # The raw exit code of a killed process differs per platform;
+        # report the module's TIMED_OUT value instead.
+        self.returncode = TIMED_OUT
+ |
+  def communicate(self, input=None, timeout=None):  # pylint: disable=W0221,W0622
+    """Runs the child to completion with optional timeout and callbacks.
+
+    Returns (stdout, stderr) like subprocess.Popen().communicate(). An entry
+    is None unless this call buffered the corresponding stream itself.
+
+    Arguments:
+    - input: data to send to the child's stdin, if any.
+    - timeout: the process will be killed after |timeout| seconds and
+               returncode set to TIMED_OUT.
+
+    Raises TypeError when a timeout is requested for a process started with
+    shell=True.
+    """
+    self.timeout = timeout
+    if not (self.timeout or self.stdout_cb or self.stderr_cb):
+      # Nothing special was requested: defer to the stock implementation.
+      return super(Popen, self).communicate(input)
+
+    if self.timeout and self.shell:
+      raise TypeError(
+          'Using timeout and shell simultaneously will cause a process leak '
+          'since the shell will be killed instead of the child process.')
+
+    # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
+    # When the pipe fills up, it will deadlock this process. Having
+    # _tee_threads() drain the pipes from threads works around that issue.
+    # Any piped stream without a user callback is collected in a memory
+    # buffer whose write method stands in as the callback. No need for a
+    # thread safe function since the callbacks are guaranteed to be called
+    # from the main thread.
+    streams = ('stdout', 'stderr')
+    buffers = {}
+    for name in streams:
+      if not getattr(self, name):
+        continue
+      if getattr(self, name + '_cb') or getattr(self, name + '_void'):
+        continue
+      buffers[name] = cStringIO.StringIO()
+      setattr(self, name + '_cb', buffers[name].write)
+
+    self._tee_threads(input)
+
+    return tuple(
+        buffers[name].getvalue() if name in buffers else None
+        for name in streams)
+ |
def communicate(args, timeout=None, **kwargs): |
"""Wraps subprocess.Popen().communicate() and add timeout support. |
@@ -226,39 +383,11 @@ def communicate(args, timeout=None, **kwargs): |
# set the Popen() parameter accordingly. |
kwargs['stdin'] = PIPE |
- if not timeout: |
- # Normal workflow. |
- proc = Popen(args, **kwargs) |
- if stdin is not None: |
- return proc.communicate(stdin), proc.returncode |
- else: |
- return proc.communicate(), proc.returncode |
- |
- # Create a temporary file to workaround python's deadlock. |
- # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
- # When the pipe fills up, it will deadlock this process. Using a real file |
- # works around that issue. |
- with tempfile.TemporaryFile() as buff: |
- kwargs['stdout'] = buff |
- proc = Popen(args, **kwargs) |
- if proc.shell: |
- raise TypeError( |
- 'Using timeout and shell simultaneously will cause a process leak ' |
- 'since the shell will be killed instead of the child process.') |
- if stdin is not None: |
- proc.stdin.write(stdin) |
- while proc.returncode is None: |
- proc.poll() |
- if timeout and (time.time() - proc.start) > timeout: |
- proc.kill() |
- proc.wait() |
- # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. |
- proc.returncode = TIMED_OUT |
- time.sleep(0.001) |
- # Now that the process died, reset the cursor and read the file. |
- buff.seek(0) |
- out = (buff.read(), None) |
- return out, proc.returncode |
+ proc = Popen(args, **kwargs) |
+ if stdin not in (None, VOID): |
+ return proc.communicate(stdin, timeout), proc.returncode |
+ else: |
+ return proc.communicate(None, timeout), proc.returncode |
def call(args, **kwargs): |