| Index: subprocess2.py
|
| diff --git a/subprocess2.py b/subprocess2.py
|
| index c2f393a990004224f5329a03a74ca5bfd4f7ac86..3ad804791fcabb0b423be212f31ef16c5d060064 100644
|
| --- a/subprocess2.py
|
| +++ b/subprocess2.py
|
| @@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed.
|
| """
|
|
|
| from __future__ import with_statement
|
| +import cStringIO
|
| import errno
|
| import logging
|
| import os
|
| +import Queue
|
| import subprocess
|
| import sys
|
| -import tempfile
|
| import time
|
| import threading
|
|
|
| @@ -170,6 +171,10 @@ class Popen(subprocess.Popen):
|
| tmp_str += '; cwd=%s' % kwargs['cwd']
|
| logging.debug(tmp_str)
|
|
|
| + self.stdout_cb = None
|
| + self.stderr_cb = None
|
| + self.stdout_void = False
|
| + self.stderr_void = False
|
| def fix(stream):
|
| if kwargs.get(stream) in (VOID, os.devnull):
|
| # Replaces VOID with handle to /dev/null.
|
| @@ -178,11 +183,17 @@ class Popen(subprocess.Popen):
|
| # When the pipe fills up, it will deadlock this process. Using a real
|
| # file works around that issue.
|
| kwargs[stream] = open(os.devnull, 'w')
|
| + setattr(self, stream + '_void', True)
|
| + if callable(kwargs.get(stream)):
|
| + # Callable stdout/stderr should be used only with call() wrappers.
|
| + setattr(self, stream + '_cb', kwargs[stream])
|
| + kwargs[stream] = PIPE
|
|
|
| fix('stdout')
|
| fix('stderr')
|
|
|
| self.start = time.time()
|
| + self.timeout = None
|
| self.shell = kwargs.get('shell', None)
|
| # Silence pylint on MacOSX
|
| self.returncode = None
|
| @@ -205,6 +216,152 @@ class Popen(subprocess.Popen):
|
| # through
|
| raise
|
|
|
| + def _tee_threads(self, input): # pylint: disable=W0622
|
| + """Does I/O for a process's pipes using threads.
|
| +
|
| + It's the simplest and slowest implementation. Expect very slow behavior.
|
| +
|
| + If there is a callback and it doesn't keep up with the calls, the timeout
|
| + effectiveness will be delayed accordingly.
|
| + """
|
| + # Queue of either of <threadname> when done or (<threadname>, data). In
|
| + # theory we would like to limit to ~64kb items to not cause large memory
|
| + # usage when the callback blocks. It is not done because it slows down
|
| + # processing on OSX10.6 by a factor of 2x, making it even slower than
|
| + # Windows! Revisit this decision if it becomes a problem, e.g. crash
|
| + # because of memory exhaustion.
|
| + queue = Queue.Queue()
|
| + done = threading.Event()
|
| +
|
| + def write_stdin():
|
| + try:
|
| + stdin_io = cStringIO.StringIO(input)
|
| + while True:
|
| + data = stdin_io.read(1024)
|
| + if data:
|
| + self.stdin.write(data)
|
| + else:
|
| + self.stdin.close()
|
| + break
|
| + finally:
|
| + queue.put('stdin')
|
| +
|
| + def _queue_pipe_read(pipe, name):
|
| + """Queues characters read from a pipe into a queue."""
|
| + try:
|
| + while True:
|
| + data = pipe.read(1)
|
| + if not data:
|
| + break
|
| + queue.put((name, data))
|
| + finally:
|
| + queue.put(name)
|
| +
|
| + def timeout_fn():
|
| + try:
|
| + done.wait(self.timeout)
|
| + finally:
|
| + queue.put('timeout')
|
| +
|
| + def wait_fn():
|
| + try:
|
| + self.wait()
|
| + finally:
|
| + queue.put('wait')
|
| +
|
| + # Starts up to 5 threads:
|
| + # Wait for the process to quit
|
| + # Read stdout
|
| + # Read stderr
|
| + # Write stdin
|
| + # Timeout
|
| + threads = {
|
| + 'wait': threading.Thread(target=wait_fn),
|
| + }
|
| + if self.timeout is not None:
|
| + threads['timeout'] = threading.Thread(target=timeout_fn)
|
| + if self.stdout_cb:
|
| + threads['stdout'] = threading.Thread(
|
| + target=_queue_pipe_read, args=(self.stdout, 'stdout'))
|
| + if self.stderr_cb:
|
| + threads['stderr'] = threading.Thread(
|
| + target=_queue_pipe_read, args=(self.stderr, 'stderr'))
|
| + if input:
|
| + threads['stdin'] = threading.Thread(target=write_stdin)
|
| + for t in threads.itervalues():
|
| + t.start()
|
| +
|
| + timed_out = False
|
| + try:
|
| + # This thread needs to be optimized for speed.
|
| + while threads:
|
| + item = queue.get()
|
| + if item[0] is 'stdout':
|
| + self.stdout_cb(item[1])
|
| + elif item[0] is 'stderr':
|
| + self.stderr_cb(item[1])
|
| + else:
|
| + # A thread terminated.
|
| + threads[item].join()
|
| + del threads[item]
|
| + if item == 'wait':
|
| + # Terminate the timeout thread if necessary.
|
| + done.set()
|
| + elif item == 'timeout' and not timed_out and self.poll() is None:
|
| + logging.debug('Timed out after %fs: killing' % self.timeout)
|
| + self.kill()
|
| + timed_out = True
|
| + finally:
|
| + # Stop the threads.
|
| + done.set()
|
| + if 'wait' in threads:
|
| + # Accelerate things, otherwise it would hang until the child process is
|
| + # done.
|
| + logging.debug('Killing child because of an exception')
|
| + self.kill()
|
| + # Join threads.
|
| + for thread in threads.itervalues():
|
| + thread.join()
|
| + if timed_out:
|
| + self.returncode = TIMED_OUT
|
| +
|
| + def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622
|
| + """Adds timeout and callbacks support.
|
| +
|
| + Returns (stdout, stderr) like subprocess.Popen().communicate().
|
| +
|
| + - The process will be killed after |timeout| seconds and returncode set to
|
| + TIMED_OUT.
|
| + """
|
| + self.timeout = timeout
|
| + if not self.timeout and not self.stdout_cb and not self.stderr_cb:
|
| + return super(Popen, self).communicate(input)
|
| +
|
| + if self.timeout and self.shell:
|
| + raise TypeError(
|
| + 'Using timeout and shell simultaneously will cause a process leak '
|
| + 'since the shell will be killed instead of the child process.')
|
| +
|
| + stdout = None
|
| + stderr = None
|
| + # Convert to a lambda to workaround python's deadlock.
|
| + # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
|
| + # When the pipe fills up, it will deadlock this process. Using a thread
|
| + # works around that issue. No need for thread safe function since the call
|
| + # backs are guaranteed to be called from the main thread.
|
| + if self.stdout and not self.stdout_cb and not self.stdout_void:
|
| + stdout = cStringIO.StringIO()
|
| + self.stdout_cb = stdout.write
|
| + if self.stderr and not self.stderr_cb and not self.stderr_void:
|
| + stderr = cStringIO.StringIO()
|
| + self.stderr_cb = stderr.write
|
| + self._tee_threads(input)
|
| + if stdout:
|
| + stdout = stdout.getvalue()
|
| + if stderr:
|
| + stderr = stderr.getvalue()
|
| + return (stdout, stderr)
|
| +
|
|
|
| def communicate(args, timeout=None, **kwargs):
|
| """Wraps subprocess.Popen().communicate() and add timeout support.
|
| @@ -226,39 +383,11 @@ def communicate(args, timeout=None, **kwargs):
|
| # set the Popen() parameter accordingly.
|
| kwargs['stdin'] = PIPE
|
|
|
| - if not timeout:
|
| - # Normal workflow.
|
| - proc = Popen(args, **kwargs)
|
| - if stdin is not None:
|
| - return proc.communicate(stdin), proc.returncode
|
| - else:
|
| - return proc.communicate(), proc.returncode
|
| -
|
| - # Create a temporary file to workaround python's deadlock.
|
| - # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
|
| - # When the pipe fills up, it will deadlock this process. Using a real file
|
| - # works around that issue.
|
| - with tempfile.TemporaryFile() as buff:
|
| - kwargs['stdout'] = buff
|
| - proc = Popen(args, **kwargs)
|
| - if proc.shell:
|
| - raise TypeError(
|
| - 'Using timeout and shell simultaneously will cause a process leak '
|
| - 'since the shell will be killed instead of the child process.')
|
| - if stdin is not None:
|
| - proc.stdin.write(stdin)
|
| - while proc.returncode is None:
|
| - proc.poll()
|
| - if timeout and (time.time() - proc.start) > timeout:
|
| - proc.kill()
|
| - proc.wait()
|
| - # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
|
| - proc.returncode = TIMED_OUT
|
| - time.sleep(0.001)
|
| - # Now that the process died, reset the cursor and read the file.
|
| - buff.seek(0)
|
| - out = (buff.read(), None)
|
| - return out, proc.returncode
|
| + proc = Popen(args, **kwargs)
|
| + if stdin not in (None, VOID):
|
| + return proc.communicate(stdin, timeout), proc.returncode
|
| + else:
|
| + return proc.communicate(None, timeout), proc.returncode
|
|
|
|
|
| def call(args, **kwargs):
|
|
|