Chromium Code Reviews| Index: subprocess2.py |
| diff --git a/subprocess2.py b/subprocess2.py |
| index 99ecefcf7f54fdfd86698a2be1931360751e5a93..8d8b2e86f378b22e7b8b21bde6d4240189bc281c 100644 |
| --- a/subprocess2.py |
| +++ b/subprocess2.py |
| @@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. |
| """ |
| from __future__ import with_statement |
| +import cStringIO |
| import errno |
| import logging |
| import os |
| +import Queue |
| import subprocess |
| import sys |
| -import tempfile |
| import time |
| import threading |
| @@ -170,6 +171,10 @@ class Popen(subprocess.Popen): |
| tmp_str += '; cwd=%s' % kwargs['cwd'] |
| logging.debug(tmp_str) |
| + self.stdout_cb = None |
| + self.stderr_cb = None |
| + self.stdout_void = False |
| + self.stderr_void = False |
| def fix(stream): |
| if kwargs.get(stream) in (VOID, os.devnull): |
| # Replaces VOID with handle to /dev/null. |
| @@ -178,11 +183,17 @@ class Popen(subprocess.Popen): |
| # When the pipe fills up, it will deadlock this process. Using a real |
| # file works around that issue. |
| kwargs[stream] = open(os.devnull, 'w') |
| + setattr(self, stream + '_void', True) |
| + if callable(kwargs.get(stream)): |
| + # Callable stdout/stderr should be used only with call() wrappers. |
| + setattr(self, stream + '_cb', kwargs[stream]) |
| + kwargs[stream] = PIPE |
| fix('stdout') |
| fix('stderr') |
| self.start = time.time() |
| + self.timeout = None |
| self.shell = kwargs.get('shell', None) |
| try: |
| @@ -203,6 +214,153 @@ class Popen(subprocess.Popen): |
| # through |
| raise |
| + def _tee_threads(self, input): # pylint: disable=W0622 |
| + """Does I/O for a process's pipes using thread. |
| + |
| + It's the simplest and slowest implementation. Expect very slow behavior. |
| + |
| + If there is a callback and it doesn't keep up with the calls, the timeout |
| + effectiveness will be delayed accordingly. |
| + """ |
| + # Queue of either of <threadname> when done or (<threadname>, data). In |
| + # theory we would like to limit to ~64kb items to not cause large memory |
| + # usage when the callback blocks. It is not done because it slows down |
| + # processing on OSX10.6 by a factor of 2x, making it even slower than |
| + # Windows! Revisit this decision if it becomes a problem, e.g. crash |
| + # because of memory exhaustion. |
| + queue = Queue.Queue() |
| + done = threading.Event() |
| + |
| + def write_stdin(): |
| + try: |
| + stdin_io = cStringIO.StringIO(input) |
| + while True: |
| + data = stdin_io.read(1024) |
| + if data: |
| + self.stdin.write(data) |
| + else: |
| + self.stdin.close() |
| + break |
| + finally: |
| + queue.put('stdin') |
| + |
| + def _queue_pipe_read(pipe, name): |
| + """Queues characters read from a pipe into a queue.""" |
| + try: |
| + while True: |
| + data = pipe.read(1) |
| + if not data: |
| + break |
| + queue.put((name, data)) |
| + finally: |
| + queue.put(name) |
| + |
| + def timeout_fn(): |
| + try: |
| + done.wait(self.timeout) |
| + finally: |
| + queue.put('timeout') |
| + |
| + def wait_fn(): |
| + try: |
| + self.wait() |
| + finally: |
| + queue.put('wait') |
| + |
| + # Starts up to 5 threads: |
| + # Wait for the process to quit |
| + # Read stdout |
| + # Read stderr |
| + # Write stdin |
| + # Timeout |
| + threads = { |
| + 'wait': threading.Thread(target=wait_fn), |
|
M-A Ruel
2011/11/30 20:44:51
By putting wait in its own thread, I don't need to
|
| + } |
| + if self.timeout is not None: |
| + threads['timeout'] = threading.Thread(target=timeout_fn) |
| + if self.stdout_cb: |
| + threads['stdout'] = threading.Thread( |
| + target=_queue_pipe_read, args=(self.stdout, 'stdout')) |
| + if self.stderr_cb: |
| + threads['stderr'] = threading.Thread( |
| + target=_queue_pipe_read, args=(self.stderr, 'stderr')) |
| + if input: |
| + threads['stdin'] = threading.Thread(target=write_stdin) |
| + for t in threads.itervalues(): |
| + t.daemon = True |
| + t.start() |
| + |
| + timed_out = False |
| + try: |
| + # This thread needs to be optimized for speed. |
| + while threads: |
| + item = queue.get() |
| + if item[0] is 'stdout': |
| + self.stdout_cb(item[1]) |
| + elif item[0] is 'stderr': |
| + self.stderr_cb(item[1]) |
| + else: |
| + # A thread terminated. |
| + threads[item].join() |
| + del threads[item] |
| + if item == 'wait': |
| + # Terminate the timeout thread if necessary. |
| + done.set() |
| + elif item == 'timeout' and not timed_out and self.poll() is None: |
| + logging.debug('Timed out after %fs: killing' % self.timeout) |
| + self.kill() |
| + timed_out = True |
| + finally: |
| + # Stop the threads. |
| + done.set() |
| + if 'wait' in threads: |
| + # Accelerate things, otherwise it would hang until the child process is |
| + # done. |
| + logging.debug('Killing child because of an exception') |
| + self.kill() |
| + # Join threads. |
| + for thread in threads.itervalues(): |
| + thread.join() |
| + if timed_out: |
| + self.returncode = TIMED_OUT |
| + |
| + def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622 |
| + """Adds timeout and callbacks support. |
| + |
| + Returns (stdout, stderr) like subprocess.Popen().communicate(). |
| + |
| + - The process will be killed after |timeout| seconds and returncode set to |
| + TIMED_OUT. |
| + """ |
| + self.timeout = timeout |
| + if not self.timeout and not self.stdout_cb and not self.stderr_cb: |
| + return super(Popen, self).communicate(input) |
| + |
| + if self.timeout and self.shell: |
| + raise TypeError( |
| + 'Using timeout and shell simultaneously will cause a process leak ' |
| + 'since the shell will be killed instead of the child process.') |
| + |
| + stdout = None |
| + stderr = None |
| + # Convert to a lambda to workaround python's deadlock. |
| + # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
| + # When the pipe fills up, it will deadlock this process. Using a thread |
| + # works around that issue. No need for thread safe function since the call |
| + # backs are guaranteed to be called from the main thread. |
| + if self.stdout and not self.stdout_cb and not self.stdout_void: |
| + stdout = cStringIO.StringIO() |
| + self.stdout_cb = stdout.write |
| + if self.stderr and not self.stderr_cb and not self.stderr_void: |
| + stderr = cStringIO.StringIO() |
| + self.stderr_cb = stderr.write |
| + self._tee_threads(input) |
| + if stdout: |
| + stdout = stdout.getvalue() |
| + if stderr: |
| + stderr = stderr.getvalue() |
| + return (stdout, stderr) |
| + |
| def communicate(args, timeout=None, **kwargs): |
| """Wraps subprocess.Popen().communicate() and add timeout support. |
| @@ -224,39 +382,11 @@ def communicate(args, timeout=None, **kwargs): |
| # set the Popen() parameter accordingly. |
| kwargs['stdin'] = PIPE |
| - if not timeout: |
| - # Normal workflow. |
| - proc = Popen(args, **kwargs) |
| - if stdin is not None: |
| - return proc.communicate(stdin), proc.returncode |
| - else: |
| - return proc.communicate(), proc.returncode |
| - |
| - # Create a temporary file to workaround python's deadlock. |
| - # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
| - # When the pipe fills up, it will deadlock this process. Using a real file |
| - # works around that issue. |
| - with tempfile.TemporaryFile() as buff: |
| - kwargs['stdout'] = buff |
| - proc = Popen(args, **kwargs) |
| - if proc.shell: |
| - raise TypeError( |
| - 'Using timeout and shell simultaneously will cause a process leak ' |
| - 'since the shell will be killed instead of the child process.') |
| - if stdin is not None: |
| - proc.stdin.write(stdin) |
| - while proc.returncode is None: |
| - proc.poll() |
| - if timeout and (time.time() - proc.start) > timeout: |
| - proc.kill() |
| - proc.wait() |
| - # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. |
| - proc.returncode = TIMED_OUT |
| - time.sleep(0.001) |
| - # Now that the process died, reset the cursor and read the file. |
| - buff.seek(0) |
| - out = (buff.read(), None) |
| - return out, proc.returncode |
| + proc = Popen(args, **kwargs) |
| + if stdin not in (None, VOID): |
| + return proc.communicate(stdin, timeout), proc.returncode |
| + else: |
| + return proc.communicate(None, timeout), proc.returncode |
| def call(args, **kwargs): |