Index: subprocess2.py |
diff --git a/subprocess2.py b/subprocess2.py |
index 4708aadf362ee347cf3d6528acc3c9007c917208..810631ae225feae33cfa53b62bd5ee44e1c1f6e6 100644 |
--- a/subprocess2.py |
+++ b/subprocess2.py |
@@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. |
""" |
from __future__ import with_statement |
+import cStringIO |
import errno |
import logging |
import os |
+import Queue |
import subprocess |
import sys |
-import tempfile |
import time |
import threading |
@@ -176,6 +177,9 @@ def Popen(args, **kwargs): |
# When the pipe fills up, it will deadlock this process. Using a real file |
# works around that issue. |
kwargs[stream] = open(os.devnull, 'w') |
+ if callable(kwargs.get(stream)): |
+ # Callable stdout/stderr should be used only with call() wrappers. |
+ kwargs[stream] = PIPE |
fix('stdout') |
fix('stderr') |
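For illustration, a minimal usage sketch of the effect of the conversion above (the command and the chunks list are made up; Popen(), PIPE and communicate() are this module's own). Popen() only swaps the callable for PIPE; it is the call()/communicate() wrappers that read the pipe and feed each chunk to the callable:

    import subprocess2

    chunks = []
    # Popen() alone just swaps the callable for PIPE; communicate() reads the
    # pipe and feeds each chunk to the callable instead of buffering it.
    (out, err), returncode = subprocess2.communicate(
        ['hostname'], stdout=chunks.append)
    # out is None here: the data went to chunks instead of being returned.
    print 'got %d bytes, exit code %d' % (len(''.join(chunks)), returncode)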
@@ -198,6 +202,140 @@ def Popen(args, **kwargs): |
raise |
+def _queue_pipe_read(pipe, name, done, dest): |
+ """Queue characters read from a pipe into a queue. |
+ |
+ Left outside the _tee_threads function to not introduce a function closure |
+ to speed up variable lookup. |
+ """ |
+ while not done.isSet(): |
+ data = pipe.read(1) |
+ if not data: |
+ break |
+ dest.put((name, data)) |
+ dest.put(name) |
+ |
+ |
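As a side note, a standalone toy (not part of the patch) of the queue protocol used by _queue_pipe_read(): while data flows the reader puts (name, data) tuples, and when the pipe closes it puts the bare name as a completion sentinel, which is how the consumer tells the two cases apart:

    import Queue
    import threading

    def toy_reader(name, chunks, out_queue):
        # Mimics _queue_pipe_read(): stream (name, data) items, then the bare
        # name as a "this reader is done" sentinel.
        for data in chunks:
            out_queue.put((name, data))
        out_queue.put(name)

    q = Queue.Queue()
    t = threading.Thread(target=toy_reader, args=('stdout', ['foo', 'bar'], q))
    t.start()
    pending = set(['stdout'])
    while pending:
        item = q.get()
        if isinstance(item, str):
            pending.remove(item)  # Sentinel: that reader thread finished.
        else:
            print 'from %s: %r' % item
    t.join()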
+def _tee_threads(proc, timeout, start, stdin, args, kwargs): |
+ """Does I/O for a process's pipes using thread. |
+ |
+ It's the simplest and slowest implementation. Expect very slow behavior. |
+ |
+ If there is a callback and it doesn't keep up with the calls, the timeout |
+ effectiveness will be delayed accordingly. |
+ """ |
+ # TODO(maruel): Implement a select()-based version on POSIX and a |
+ # WaitForMultipleObjects()-based one on Windows. |
+ # |
+ # Queue of items that are either <threadname> (when that thread is done) or |
+ # (<threadname>, data) tuples. |
+ # In theory we would like to limit items to ~64kb to avoid large memory |
+ # usage when the callback blocks. It is not done because it slows down |
+ # processing on OSX 10.6 by a factor of 2x, making it even slower than |
+ # Windows! Revisit this decision if it becomes a problem, e.g. a crash |
+ # because of memory exhaustion. |
+ queue = Queue.Queue() |
+ done = threading.Event() |
+ |
+ def write_stdin(): |
+ stdin_io = cStringIO.StringIO(stdin) |
+ while not done.isSet(): |
+ data = stdin_io.read(1024) |
+ if data: |
+ proc.stdin.write(data) |
+ else: |
+ proc.stdin.close() |
+ break |
+ queue.put('stdin') |
+ |
+ def timeout_fn(): |
+ done.wait(timeout) |
+ # No need to close the pipes since killing should be sufficient. |
+ queue.put('timeout') |
+ |
+ # Starts up to 4 threads: |
+ # Read stdout |
+ # Read stderr |
+ # Write stdin |
+ # Timeout |
+ threads = {} |
+ if timeout is not None: |
+ threads['timeout'] = threading.Thread(target=timeout_fn) |
+ if callable(kwargs.get('stdout')): |
+ threads['stdout'] = threading.Thread( |
+ target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) |
+ if callable(kwargs.get('stderr')): |
+ threads['stderr'] = threading.Thread( |
+ target=_queue_pipe_read, |
+ args=(proc.stderr, 'stderr', done, queue)) |
+ if isinstance(stdin, str): |
+ threads['stdin'] = threading.Thread(target=write_stdin) |
+ for t in threads.itervalues(): |
+ t.daemon = True |
+ t.start() |
+ |
+ timed_out = False |
+ try: |
+ while proc.returncode is None: |
+ assert threads |
+ proc.poll() |
+ item = queue.get() |
+ if isinstance(item, str): |
+ threads[item].join() |
+ del threads[item] |
+ if item == 'timeout' and not timed_out and proc.poll() is None: |
+ logging.debug('Timed out: killing') |
+ proc.kill() |
+ timed_out = True |
+ if not threads: |
+ # We won't be woken up anymore; stop waiting on the queue. |
+ break |
+ else: |
+ kwargs[item[0]](item[1]) |
+ finally: |
+ # Stop the threads. |
+ done.set() |
+ # Join threads |
+ for thread in threads.itervalues(): |
+ thread.join() |
+ |
+ # Flush the queue. |
+ try: |
+ while True: |
+ item = queue.get(False) |
+ if isinstance(item, str): |
+ if item == 'timeout': |
+ # TODO(maruel): Does it make sense at that point? |
+ if not timed_out and proc.poll() is None: |
+ logging.debug('Timed out: killing') |
+ proc.kill() |
+ timed_out = True |
+ else: |
+ kwargs[item[0]](item[1]) |
+ except Queue.Empty: |
+ pass |
+ |
+ # Get the remainder. |
+ if callable(kwargs.get('stdout')): |
+ data = proc.stdout.read() |
+ while data: |
+ kwargs['stdout'](data) |
+ data = proc.stdout.read() |
+ if callable(kwargs.get('stderr')): |
+ data = proc.stderr.read() |
+ while data: |
+ kwargs['stderr'](data) |
+ data = proc.stderr.read() |
+ |
+ if proc.returncode is None: |
+ # Usually happens when the process was killed on timeout but no pipe was read. |
+ proc.wait() |
+ |
+ if timed_out: |
+ return TIMED_OUT |
+ |
+ return proc.returncode |
+ |
+ |
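For reference, a standalone toy (not part of the patch) of the wake-up trick the timeout thread relies on: the main loop blocks on queue.get() with no timeout of its own, so the only way to enforce one is to have a helper thread post a 'timeout' item once done.wait(timeout) expires; done.set() in the finally block unblocks that wait early on normal exit:

    import Queue
    import threading

    q = Queue.Queue()
    done = threading.Event()

    def timeout_fn(timeout):
        # Returns early if done.set() is called before the timeout expires.
        done.wait(timeout)
        q.put('timeout')

    threading.Thread(target=timeout_fn, args=(0.5,)).start()
    # Blocks for ~0.5s, then wakes up with the 'timeout' item.
    print 'woken by %r' % q.get()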
def communicate(args, timeout=None, **kwargs): |
"""Wraps subprocess.Popen().communicate(). |
@@ -207,6 +345,11 @@ def communicate(args, timeout=None, **kwargs): |
TIMED_OUT. |
- Automatically passes stdin content as input so do not specify stdin=PIPE. |
""" |
+ if timeout and kwargs.get('shell'): |
+ raise TypeError( |
+ 'Using timeout and shell simultaneously will cause a process leak ' |
+ 'since the shell will be killed instead of the child process.') |
+ |
stdin = kwargs.pop('stdin', None) |
if stdin is not None: |
if stdin is VOID: |
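To illustrate the guard added in this hunk (the command string is made up): with shell=True, kill() would only reach the shell and leak the real child, so combining it with a timeout is now rejected up front:

    import subprocess2

    try:
        # Hypothetical call: shell=True plus timeout is now rejected early.
        subprocess2.communicate('sleep 100', shell=True, timeout=1)
    except TypeError, e:
        print e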
@@ -218,36 +361,37 @@ def communicate(args, timeout=None, **kwargs): |
# set the Popen() parameter accordingly. |
kwargs['stdin'] = PIPE |
- if not timeout: |
+ start = time.time() |
+ proc = Popen(args, **kwargs) |
+ need_buffering = (timeout or |
+ callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) |
+ |
+ if not need_buffering: |
# Normal workflow. |
- proc = Popen(args, **kwargs) |
- if stdin is not None: |
+ if stdin not in (None, VOID): |
return proc.communicate(stdin), proc.returncode |
else: |
return proc.communicate(), proc.returncode |
- # Create a temporary file to workaround python's deadlock. |
+ stdout = None |
+ stderr = None |
+ # Convert PIPE to a callback to work around python's deadlock. |
# http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
- # When the pipe fills up, it will deadlock this process. Using a real file |
- # works around that issue. |
- with tempfile.TemporaryFile() as buff: |
- start = time.time() |
- kwargs['stdout'] = buff |
- proc = Popen(args, **kwargs) |
- if stdin is not None: |
- proc.stdin.write(stdin) |
- while proc.returncode is None: |
- proc.poll() |
- if timeout and (time.time() - start) > timeout: |
- proc.kill() |
- proc.wait() |
- # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. |
- proc.returncode = TIMED_OUT |
- time.sleep(0.001) |
- # Now that the process died, reset the cursor and read the file. |
- buff.seek(0) |
- out = [buff.read(), None] |
- return out, proc.returncode |
+ # When the pipe fills up, it will deadlock this process. Using a thread |
+ # works around that issue. There is no need for a thread-safe function since |
+ # the callbacks are guaranteed to be called from the main thread. |
+ if kwargs.get('stdout') == PIPE: |
+ stdout = [] |
+ kwargs['stdout'] = stdout.append |
+ if kwargs.get('stderr') == PIPE: |
+ stderr = [] |
+ kwargs['stderr'] = stderr.append |
+ returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) |
+ if stdout is not None: |
+ stdout = ''.join(stdout) |
+ if stderr is not None: |
+ stderr = ''.join(stderr) |
+ return (stdout, stderr), returncode |
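Finally, a hedged usage sketch of the reworked buffering path (the command line is made up; PIPE, TIMED_OUT and communicate() come from this module): with stdout=PIPE and a timeout, output is teed through the reader thread and the return code is normalized to TIMED_OUT when the child is killed.

    import subprocess2

    (out, err), returncode = subprocess2.communicate(
        ['python', '-c',
         'import sys,time; sys.stdout.write("x"*100); sys.stdout.flush(); '
         'time.sleep(100)'],
        stdout=subprocess2.PIPE,
        timeout=1)
    if returncode == subprocess2.TIMED_OUT:
        # Partial output captured before the kill is still returned.
        print 'timed out, got %d bytes of stdout' % len(out)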
def call(args, **kwargs): |