Index: subprocess2.py |
diff --git a/subprocess2.py b/subprocess2.py |
index 810631ae225feae33cfa53b62bd5ee44e1c1f6e6..430010c144afd6e3798fd1e8bd7b22cf94374d6b 100644 |
--- a/subprocess2.py |
+++ b/subprocess2.py |
@@ -13,11 +13,16 @@ import errno |
import logging |
import os |
import Queue |
+import select |
import subprocess |
import sys |
import time |
import threading |
+if sys.platform != 'win32': |
+ import fcntl |
+ |
+ |
# Constants forwarded from subprocess. |
PIPE = subprocess.PIPE |
STDOUT = subprocess.STDOUT |
@@ -203,7 +208,7 @@ def Popen(args, **kwargs): |
def _queue_pipe_read(pipe, name, done, dest): |
- """Queue characters read from a pipe into a queue. |
+ """Queues characters read from a pipe into a queue. |
Left outside the _tee_threads function to not introduce a function closure |
to speed up variable lookup. |
@@ -336,6 +341,80 @@ def _tee_threads(proc, timeout, start, stdin, args, kwargs): |
return proc.returncode |
+def _read_pipe(handles, pipe, out_fn): |
+ """Reads bytes from a pipe and calls the output callback.""" |
+ data = pipe.read() |
+ if not data: |
+ del handles[pipe] |
+ else: |
+ out_fn(data) |
+ |
+ |
+def _tee_posix(proc, timeout, start, stdin, args, kwargs): |
+ """Polls a process and its pipe using select.select(). |
+ |
+ TODO(maruel): Implement a non-polling method for OSes that support it. |
+ """ |
+ handles_r = {} |
+ if callable(kwargs.get('stdout')): |
+ handles_r[proc.stdout] = lambda: _read_pipe( |
+ handles_r, proc.stdout, kwargs['stdout']) |
+ if callable(kwargs.get('stderr')): |
+ handles_r[proc.stderr] = lambda: _read_pipe( |
+ handles_r, proc.stderr, kwargs['stderr']) |
+ |
+ handles_w = {} |
+ if isinstance(stdin, str): |
+ stdin_io = cStringIO.StringIO(stdin) |
+ def write_stdin(): |
+ data = stdin_io.read(1) |
+ if data: |
+ proc.stdin.write(data) |
+ else: |
+ del handles_w[proc.stdin] |
+ proc.stdin.close() |
+ handles_w[proc.stdin] = write_stdin |
+ else: |
+ # TODO(maruel): Fix me, it could be VOID. |
+ assert stdin is None |
+ |
+ # Make all the file objects of the child process non-blocking file. |
+ # TODO(maruel): Test if a pipe is handed to the child process. |
+ for pipe in (proc.stdin, proc.stdout, proc.stderr): |
+ fileno = pipe and getattr(pipe, 'fileno', lambda: None)() |
+ if fileno: |
+ # Note: making a pipe non-blocking means the C stdio could act wrong. In |
+ # particular, readline() cannot be used. Work around is to use os.read(). |
+ fl = fcntl.fcntl(fileno, fcntl.F_GETFL) |
+ fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK) |
+ |
+ timed_out = False |
+ while handles_r or handles_w or (timeout and proc.poll() is None): |
+ period = None |
+ if timeout: |
+ period = max(0, timeout - (time.time() - start)) |
+ if not period and not timed_out: |
+ proc.kill() |
+ timed_out = True |
+ if timed_out: |
+ period = 0.001 |
+ |
+ # It reconstructs objects on each loop, not very efficient. |
+ reads, writes, _, = select.select( |
+ handles_r.keys(), handles_w.keys(), [], period) |
+ for read in reads: |
+ handles_r[read]() |
+ for write in writes: |
+ handles_w[write]() |
+ |
+ # No pipe open anymore and if there was a time out, the child process was |
+ # killed already. |
+ proc.wait() |
+ if timed_out: |
+ return TIMED_OUT |
+ return proc.returncode |
+ |
+ |
def communicate(args, timeout=None, **kwargs): |
"""Wraps subprocess.Popen().communicate(). |
@@ -386,7 +465,12 @@ def communicate(args, timeout=None, **kwargs): |
if kwargs.get('stderr') == PIPE: |
stderr = [] |
kwargs['stderr'] = stderr.append |
- returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) |
+ if sys.platform == 'win32': |
+ # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE, |
+ # doesn't exist so _tee_win() cannot be used yet. |
+ returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) |
+ else: |
+ returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs) |
if not stdout is None: |
stdout = ''.join(stdout) |
if not stderr is None: |