| Index: subprocess2.py
|
| diff --git a/subprocess2.py b/subprocess2.py
|
| index 430010c144afd6e3798fd1e8bd7b22cf94374d6b..4708aadf362ee347cf3d6528acc3c9007c917208 100644
|
| --- a/subprocess2.py
|
| +++ b/subprocess2.py
|
| @@ -8,21 +8,15 @@ In theory you shouldn't need anything else in subprocess, or this module failed.
|
| """
|
|
|
| from __future__ import with_statement
|
| -import cStringIO
|
| import errno
|
| import logging
|
| import os
|
| -import Queue
|
| -import select
|
| import subprocess
|
| import sys
|
| +import tempfile
|
| import time
|
| import threading
|
|
|
| -if sys.platform != 'win32':
|
| - import fcntl
|
| -
|
| -
|
| # Constants forwarded from subprocess.
|
| PIPE = subprocess.PIPE
|
| STDOUT = subprocess.STDOUT
|
| @@ -182,9 +176,6 @@ def Popen(args, **kwargs):
|
| # When the pipe fills up, it will deadlock this process. Using a real file
|
| # works around that issue.
|
| kwargs[stream] = open(os.devnull, 'w')
|
| - if callable(kwargs.get(stream)):
|
| - # Callable stdout/stderr should be used only with call() wrappers.
|
| - kwargs[stream] = PIPE
|
|
|
| fix('stdout')
|
| fix('stderr')
|
| @@ -207,214 +198,6 @@ def Popen(args, **kwargs):
|
| raise
|
|
|
|
|
| -def _queue_pipe_read(pipe, name, done, dest):
|
| - """Queues characters read from a pipe into a queue.
|
| -
|
| - Left outside the _tee_threads function to not introduce a function closure
|
| - to speed up variable lookup.
|
| - """
|
| - while not done.isSet():
|
| - data = pipe.read(1)
|
| - if not data:
|
| - break
|
| - dest.put((name, data))
|
| - dest.put(name)
|
| -
|
| -
|
| -def _tee_threads(proc, timeout, start, stdin, args, kwargs):
|
| - """Does I/O for a process's pipes using thread.
|
| -
|
| - It's the simplest and slowest implementation. Expect very slow behavior.
|
| -
|
| - If there is a callback and it doesn't keep up with the calls, the timeout
|
| - effectiveness will be delayed accordingly.
|
| - """
|
| - # TODO(maruel): Implement a select based implementation on POSIX and a Windows
|
| - # one using WaitForMultipleObjects().
|
| - #
|
| - # Queue of either of <threadname> when done or (<threadname>, data).
|
| - # In theory we would like to limit to ~64kb items to not cause large memory
|
| - # usage when the callback blocks. It is not done because it slows down
|
| - # processing on OSX10.6 by a factor of 2x, making it even slower than Windows!
|
| - # Revisit this decision if it becomes a problem, e.g. crash because of
|
| - # memory exhaustion.
|
| - queue = Queue.Queue()
|
| - done = threading.Event()
|
| -
|
| - def write_stdin():
|
| - stdin_io = cStringIO.StringIO(stdin)
|
| - while not done.isSet():
|
| - data = stdin_io.read(1024)
|
| - if data:
|
| - proc.stdin.write(data)
|
| - else:
|
| - proc.stdin.close()
|
| - break
|
| - queue.put('stdin')
|
| -
|
| - def timeout_fn():
|
| - done.wait(timeout)
|
| - # No need to close the pipes since killing should be sufficient.
|
| - queue.put('timeout')
|
| -
|
| - # Starts up to 4 threads:
|
| - # Read stdout
|
| - # Read stderr
|
| - # Write stdin
|
| - # Timeout
|
| - threads = {}
|
| - if timeout is not None:
|
| - threads['timeout'] = threading.Thread(target=timeout_fn)
|
| - if callable(kwargs.get('stdout')):
|
| - threads['stdout'] = threading.Thread(
|
| - target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue))
|
| - if callable(kwargs.get('stderr')):
|
| - threads['stderr'] = threading.Thread(
|
| - target=_queue_pipe_read,
|
| - args=(proc.stderr, 'stderr', done, queue))
|
| - if isinstance(stdin, str):
|
| - threads['stdin'] = threading.Thread(target=write_stdin)
|
| - for t in threads.itervalues():
|
| - t.daemon = True
|
| - t.start()
|
| -
|
| - timed_out = False
|
| - try:
|
| - while proc.returncode is None:
|
| - assert threads
|
| - proc.poll()
|
| - item = queue.get()
|
| - if isinstance(item, str):
|
| - threads[item].join()
|
| - del threads[item]
|
| - if item == 'timeout' and not timed_out and proc.poll() is None:
|
| - logging.debug('Timed out: killing')
|
| - proc.kill()
|
| - timed_out = True
|
| - if not threads:
|
| - # We won't be waken up anymore. Need to busy loop.
|
| - break
|
| - else:
|
| - kwargs[item[0]](item[1])
|
| - finally:
|
| - # Stop the threads.
|
| - done.set()
|
| - # Join threads
|
| - for thread in threads.itervalues():
|
| - thread.join()
|
| -
|
| - # Flush the queue.
|
| - try:
|
| - while True:
|
| - item = queue.get(False)
|
| - if isinstance(item, str):
|
| - if item == 'timeout':
|
| - # TODO(maruel): Does it make sense at that point?
|
| - if not timed_out and proc.poll() is None:
|
| - logging.debug('Timed out: killing')
|
| - proc.kill()
|
| - timed_out = True
|
| - else:
|
| - kwargs[item[0]](item[1])
|
| - except Queue.Empty:
|
| - pass
|
| -
|
| - # Get the remainder.
|
| - if callable(kwargs.get('stdout')):
|
| - data = proc.stdout.read()
|
| - while data:
|
| - kwargs['stdout'](data)
|
| - data = proc.stdout.read()
|
| - if callable(kwargs.get('stderr')):
|
| - data = proc.stderr.read()
|
| - while data:
|
| - kwargs['stderr'](data)
|
| - data = proc.stderr.read()
|
| -
|
| - if proc.returncode is None:
|
| - # Usually happens when killed with timeout but not listening to pipes.
|
| - proc.wait()
|
| -
|
| - if timed_out:
|
| - return TIMED_OUT
|
| -
|
| - return proc.returncode
|
| -
|
| -
|
| -def _read_pipe(handles, pipe, out_fn):
|
| - """Reads bytes from a pipe and calls the output callback."""
|
| - data = pipe.read()
|
| - if not data:
|
| - del handles[pipe]
|
| - else:
|
| - out_fn(data)
|
| -
|
| -
|
| -def _tee_posix(proc, timeout, start, stdin, args, kwargs):
|
| - """Polls a process and its pipe using select.select().
|
| -
|
| - TODO(maruel): Implement a non-polling method for OSes that support it.
|
| - """
|
| - handles_r = {}
|
| - if callable(kwargs.get('stdout')):
|
| - handles_r[proc.stdout] = lambda: _read_pipe(
|
| - handles_r, proc.stdout, kwargs['stdout'])
|
| - if callable(kwargs.get('stderr')):
|
| - handles_r[proc.stderr] = lambda: _read_pipe(
|
| - handles_r, proc.stderr, kwargs['stderr'])
|
| -
|
| - handles_w = {}
|
| - if isinstance(stdin, str):
|
| - stdin_io = cStringIO.StringIO(stdin)
|
| - def write_stdin():
|
| - data = stdin_io.read(1)
|
| - if data:
|
| - proc.stdin.write(data)
|
| - else:
|
| - del handles_w[proc.stdin]
|
| - proc.stdin.close()
|
| - handles_w[proc.stdin] = write_stdin
|
| - else:
|
| - # TODO(maruel): Fix me, it could be VOID.
|
| - assert stdin is None
|
| -
|
| - # Make all the file objects of the child process non-blocking file.
|
| - # TODO(maruel): Test if a pipe is handed to the child process.
|
| - for pipe in (proc.stdin, proc.stdout, proc.stderr):
|
| - fileno = pipe and getattr(pipe, 'fileno', lambda: None)()
|
| - if fileno:
|
| - # Note: making a pipe non-blocking means the C stdio could act wrong. In
|
| - # particular, readline() cannot be used. Work around is to use os.read().
|
| - fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
|
| - fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)
|
| -
|
| - timed_out = False
|
| - while handles_r or handles_w or (timeout and proc.poll() is None):
|
| - period = None
|
| - if timeout:
|
| - period = max(0, timeout - (time.time() - start))
|
| - if not period and not timed_out:
|
| - proc.kill()
|
| - timed_out = True
|
| - if timed_out:
|
| - period = 0.001
|
| -
|
| - # It reconstructs objects on each loop, not very efficient.
|
| - reads, writes, _, = select.select(
|
| - handles_r.keys(), handles_w.keys(), [], period)
|
| - for read in reads:
|
| - handles_r[read]()
|
| - for write in writes:
|
| - handles_w[write]()
|
| -
|
| - # No pipe open anymore and if there was a time out, the child process was
|
| - # killed already.
|
| - proc.wait()
|
| - if timed_out:
|
| - return TIMED_OUT
|
| - return proc.returncode
|
| -
|
| -
|
| def communicate(args, timeout=None, **kwargs):
|
| """Wraps subprocess.Popen().communicate().
|
|
|
| @@ -424,11 +207,6 @@ def communicate(args, timeout=None, **kwargs):
|
| TIMED_OUT.
|
| - Automatically passes stdin content as input so do not specify stdin=PIPE.
|
| """
|
| - if timeout and kwargs.get('shell'):
|
| - raise TypeError(
|
| - 'Using timeout and shell simultaneously will cause a process leak '
|
| - 'since the shell will be killed instead of the child process.')
|
| -
|
| stdin = kwargs.pop('stdin', None)
|
| if stdin is not None:
|
| if stdin is VOID:
|
| @@ -440,42 +218,36 @@ def communicate(args, timeout=None, **kwargs):
|
| # set the Popen() parameter accordingly.
|
| kwargs['stdin'] = PIPE
|
|
|
| - start = time.time()
|
| - proc = Popen(args, **kwargs)
|
| - need_buffering = (timeout or
|
| - callable(kwargs.get('stdout')) or callable(kwargs.get('stderr')))
|
| -
|
| - if not need_buffering:
|
| + if not timeout:
|
| # Normal workflow.
|
| - if stdin not in (None, VOID):
|
| + proc = Popen(args, **kwargs)
|
| + if stdin is not None:
|
| return proc.communicate(stdin), proc.returncode
|
| else:
|
| return proc.communicate(), proc.returncode
|
|
|
| - stdout = None
|
| - stderr = None
|
| - # Convert to a lambda to workaround python's deadlock.
|
| + # Create a temporary file to workaround python's deadlock.
|
| # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
|
| - # When the pipe fills up, it will deadlock this process. Using a thread
|
| - # works around that issue. No need for thread safe function since the call
|
| - # backs are guaranteed to be called from the main thread.
|
| - if kwargs.get('stdout') == PIPE:
|
| - stdout = []
|
| - kwargs['stdout'] = stdout.append
|
| - if kwargs.get('stderr') == PIPE:
|
| - stderr = []
|
| - kwargs['stderr'] = stderr.append
|
| - if sys.platform == 'win32':
|
| - # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE,
|
| - # doesn't exist so _tee_win() cannot be used yet.
|
| - returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
|
| - else:
|
| - returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs)
|
| - if not stdout is None:
|
| - stdout = ''.join(stdout)
|
| - if not stderr is None:
|
| - stderr = ''.join(stderr)
|
| - return (stdout, stderr), returncode
|
| + # When the pipe fills up, it will deadlock this process. Using a real file
|
| + # works around that issue.
|
| + with tempfile.TemporaryFile() as buff:
|
| + start = time.time()
|
| + kwargs['stdout'] = buff
|
| + proc = Popen(args, **kwargs)
|
| + if stdin is not None:
|
| + proc.stdin.write(stdin)
|
| + while proc.returncode is None:
|
| + proc.poll()
|
| + if timeout and (time.time() - start) > timeout:
|
| + proc.kill()
|
| + proc.wait()
|
| + # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
|
| + proc.returncode = TIMED_OUT
|
| + time.sleep(0.001)
|
| + # Now that the process died, reset the cursor and read the file.
|
| + buff.seek(0)
|
| + out = [buff.read(), None]
|
| + return out, proc.returncode
|
|
|
|
|
| def call(args, **kwargs):
|
|
|