Index: subprocess2.py |
diff --git a/subprocess2.py b/subprocess2.py |
index 430010c144afd6e3798fd1e8bd7b22cf94374d6b..4708aadf362ee347cf3d6528acc3c9007c917208 100644 |
--- a/subprocess2.py |
+++ b/subprocess2.py |
@@ -8,21 +8,15 @@ In theory you shouldn't need anything else in subprocess, or this module failed. |
""" |
from __future__ import with_statement |
-import cStringIO |
import errno |
import logging |
import os |
-import Queue |
-import select |
import subprocess |
import sys |
+import tempfile |
import time |
import threading |
-if sys.platform != 'win32': |
- import fcntl |
- |
- |
# Constants forwarded from subprocess. |
PIPE = subprocess.PIPE |
STDOUT = subprocess.STDOUT |
@@ -182,9 +176,6 @@ def Popen(args, **kwargs): |
# When the pipe fills up, it will deadlock this process. Using a real file |
# works around that issue. |
kwargs[stream] = open(os.devnull, 'w') |
- if callable(kwargs.get(stream)): |
- # Callable stdout/stderr should be used only with call() wrappers. |
- kwargs[stream] = PIPE |
fix('stdout') |
fix('stderr') |
@@ -207,214 +198,6 @@ def Popen(args, **kwargs): |
raise |
-def _queue_pipe_read(pipe, name, done, dest): |
- """Queues characters read from a pipe into a queue. |
- |
- Left outside the _tee_threads function to not introduce a function closure |
- to speed up variable lookup. |
- """ |
- while not done.isSet(): |
- data = pipe.read(1) |
- if not data: |
- break |
- dest.put((name, data)) |
- dest.put(name) |
- |
- |
-def _tee_threads(proc, timeout, start, stdin, args, kwargs): |
- """Does I/O for a process's pipes using thread. |
- |
- It's the simplest and slowest implementation. Expect very slow behavior. |
- |
- If there is a callback and it doesn't keep up with the calls, the timeout |
- effectiveness will be delayed accordingly. |
- """ |
- # TODO(maruel): Implement a select based implementation on POSIX and a Windows |
- # one using WaitForMultipleObjects(). |
- # |
- # Queue of either of <threadname> when done or (<threadname>, data). |
- # In theory we would like to limit to ~64kb items to not cause large memory |
- # usage when the callback blocks. It is not done because it slows down |
- # processing on OSX10.6 by a factor of 2x, making it even slower than Windows! |
- # Revisit this decision if it becomes a problem, e.g. crash because of |
- # memory exhaustion. |
- queue = Queue.Queue() |
- done = threading.Event() |
- |
- def write_stdin(): |
- stdin_io = cStringIO.StringIO(stdin) |
- while not done.isSet(): |
- data = stdin_io.read(1024) |
- if data: |
- proc.stdin.write(data) |
- else: |
- proc.stdin.close() |
- break |
- queue.put('stdin') |
- |
- def timeout_fn(): |
- done.wait(timeout) |
- # No need to close the pipes since killing should be sufficient. |
- queue.put('timeout') |
- |
- # Starts up to 4 threads: |
- # Read stdout |
- # Read stderr |
- # Write stdin |
- # Timeout |
- threads = {} |
- if timeout is not None: |
- threads['timeout'] = threading.Thread(target=timeout_fn) |
- if callable(kwargs.get('stdout')): |
- threads['stdout'] = threading.Thread( |
- target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) |
- if callable(kwargs.get('stderr')): |
- threads['stderr'] = threading.Thread( |
- target=_queue_pipe_read, |
- args=(proc.stderr, 'stderr', done, queue)) |
- if isinstance(stdin, str): |
- threads['stdin'] = threading.Thread(target=write_stdin) |
- for t in threads.itervalues(): |
- t.daemon = True |
- t.start() |
- |
- timed_out = False |
- try: |
- while proc.returncode is None: |
- assert threads |
- proc.poll() |
- item = queue.get() |
- if isinstance(item, str): |
- threads[item].join() |
- del threads[item] |
- if item == 'timeout' and not timed_out and proc.poll() is None: |
- logging.debug('Timed out: killing') |
- proc.kill() |
- timed_out = True |
- if not threads: |
- # We won't be waken up anymore. Need to busy loop. |
- break |
- else: |
- kwargs[item[0]](item[1]) |
- finally: |
- # Stop the threads. |
- done.set() |
- # Join threads |
- for thread in threads.itervalues(): |
- thread.join() |
- |
- # Flush the queue. |
- try: |
- while True: |
- item = queue.get(False) |
- if isinstance(item, str): |
- if item == 'timeout': |
- # TODO(maruel): Does it make sense at that point? |
- if not timed_out and proc.poll() is None: |
- logging.debug('Timed out: killing') |
- proc.kill() |
- timed_out = True |
- else: |
- kwargs[item[0]](item[1]) |
- except Queue.Empty: |
- pass |
- |
- # Get the remainder. |
- if callable(kwargs.get('stdout')): |
- data = proc.stdout.read() |
- while data: |
- kwargs['stdout'](data) |
- data = proc.stdout.read() |
- if callable(kwargs.get('stderr')): |
- data = proc.stderr.read() |
- while data: |
- kwargs['stderr'](data) |
- data = proc.stderr.read() |
- |
- if proc.returncode is None: |
- # Usually happens when killed with timeout but not listening to pipes. |
- proc.wait() |
- |
- if timed_out: |
- return TIMED_OUT |
- |
- return proc.returncode |
- |
- |
-def _read_pipe(handles, pipe, out_fn): |
- """Reads bytes from a pipe and calls the output callback.""" |
- data = pipe.read() |
- if not data: |
- del handles[pipe] |
- else: |
- out_fn(data) |
- |
- |
-def _tee_posix(proc, timeout, start, stdin, args, kwargs): |
- """Polls a process and its pipe using select.select(). |
- |
- TODO(maruel): Implement a non-polling method for OSes that support it. |
- """ |
- handles_r = {} |
- if callable(kwargs.get('stdout')): |
- handles_r[proc.stdout] = lambda: _read_pipe( |
- handles_r, proc.stdout, kwargs['stdout']) |
- if callable(kwargs.get('stderr')): |
- handles_r[proc.stderr] = lambda: _read_pipe( |
- handles_r, proc.stderr, kwargs['stderr']) |
- |
- handles_w = {} |
- if isinstance(stdin, str): |
- stdin_io = cStringIO.StringIO(stdin) |
- def write_stdin(): |
- data = stdin_io.read(1) |
- if data: |
- proc.stdin.write(data) |
- else: |
- del handles_w[proc.stdin] |
- proc.stdin.close() |
- handles_w[proc.stdin] = write_stdin |
- else: |
- # TODO(maruel): Fix me, it could be VOID. |
- assert stdin is None |
- |
- # Make all the file objects of the child process non-blocking file. |
- # TODO(maruel): Test if a pipe is handed to the child process. |
- for pipe in (proc.stdin, proc.stdout, proc.stderr): |
- fileno = pipe and getattr(pipe, 'fileno', lambda: None)() |
- if fileno: |
- # Note: making a pipe non-blocking means the C stdio could act wrong. In |
- # particular, readline() cannot be used. Work around is to use os.read(). |
- fl = fcntl.fcntl(fileno, fcntl.F_GETFL) |
- fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK) |
- |
- timed_out = False |
- while handles_r or handles_w or (timeout and proc.poll() is None): |
- period = None |
- if timeout: |
- period = max(0, timeout - (time.time() - start)) |
- if not period and not timed_out: |
- proc.kill() |
- timed_out = True |
- if timed_out: |
- period = 0.001 |
- |
- # It reconstructs objects on each loop, not very efficient. |
- reads, writes, _, = select.select( |
- handles_r.keys(), handles_w.keys(), [], period) |
- for read in reads: |
- handles_r[read]() |
- for write in writes: |
- handles_w[write]() |
- |
- # No pipe open anymore and if there was a time out, the child process was |
- # killed already. |
- proc.wait() |
- if timed_out: |
- return TIMED_OUT |
- return proc.returncode |
- |
- |
def communicate(args, timeout=None, **kwargs): |
"""Wraps subprocess.Popen().communicate(). |
@@ -424,11 +207,6 @@ def communicate(args, timeout=None, **kwargs): |
TIMED_OUT. |
- Automatically passes stdin content as input so do not specify stdin=PIPE. |
""" |
- if timeout and kwargs.get('shell'): |
- raise TypeError( |
- 'Using timeout and shell simultaneously will cause a process leak ' |
- 'since the shell will be killed instead of the child process.') |
- |
stdin = kwargs.pop('stdin', None) |
if stdin is not None: |
if stdin is VOID: |
@@ -440,42 +218,36 @@ def communicate(args, timeout=None, **kwargs): |
# set the Popen() parameter accordingly. |
kwargs['stdin'] = PIPE |
- start = time.time() |
- proc = Popen(args, **kwargs) |
- need_buffering = (timeout or |
- callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) |
- |
- if not need_buffering: |
+ if not timeout: |
# Normal workflow. |
- if stdin not in (None, VOID): |
+ proc = Popen(args, **kwargs) |
+ if stdin is not None: |
return proc.communicate(stdin), proc.returncode |
else: |
return proc.communicate(), proc.returncode |
- stdout = None |
- stderr = None |
- # Convert to a lambda to workaround python's deadlock. |
+ # Create a temporary file to work around Python's deadlock. |
# http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
- # When the pipe fills up, it will deadlock this process. Using a thread |
- # works around that issue. No need for thread safe function since the call |
- # backs are guaranteed to be called from the main thread. |
- if kwargs.get('stdout') == PIPE: |
- stdout = [] |
- kwargs['stdout'] = stdout.append |
- if kwargs.get('stderr') == PIPE: |
- stderr = [] |
- kwargs['stderr'] = stderr.append |
- if sys.platform == 'win32': |
- # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE, |
- # doesn't exist so _tee_win() cannot be used yet. |
- returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) |
- else: |
- returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs) |
- if not stdout is None: |
- stdout = ''.join(stdout) |
- if not stderr is None: |
- stderr = ''.join(stderr) |
- return (stdout, stderr), returncode |
+ # When the pipe fills up, it will deadlock this process. Using a real file |
+ # works around that issue. |
+ with tempfile.TemporaryFile() as buff: |
+ start = time.time() |
+ kwargs['stdout'] = buff |
+ proc = Popen(args, **kwargs) |
+ if stdin is not None: |
+ proc.stdin.write(stdin) |
+ while proc.returncode is None: |
+ proc.poll() |
+ if timeout and (time.time() - start) > timeout: |
+ proc.kill() |
+ proc.wait() |
+ # The return code is -9 on Linux and 1 on Windows. Standardize to TIMED_OUT. |
+ proc.returncode = TIMED_OUT |
+ time.sleep(0.001) |
+ # Now that the process has exited, reset the cursor and read the file. |
+ buff.seek(0) |
+ out = [buff.read(), None] |
+ return out, proc.returncode |
def call(args, **kwargs): |