Chromium Code Reviews| Index: subprocess2.py |
| diff --git a/subprocess2.py b/subprocess2.py |
| index 4708aadf362ee347cf3d6528acc3c9007c917208..810631ae225feae33cfa53b62bd5ee44e1c1f6e6 100644 |
| --- a/subprocess2.py |
| +++ b/subprocess2.py |
| @@ -8,12 +8,13 @@ In theory you shouldn't need anything else in subprocess, or this module failed. |
| """ |
| from __future__ import with_statement |
| +import cStringIO |
| import errno |
| import logging |
| import os |
| +import Queue |
| import subprocess |
| import sys |
| -import tempfile |
| import time |
| import threading |
| @@ -176,6 +177,9 @@ def Popen(args, **kwargs): |
| # When the pipe fills up, it will deadlock this process. Using a real file |
| # works around that issue. |
| kwargs[stream] = open(os.devnull, 'w') |
| + if callable(kwargs.get(stream)): |
| + # Callable stdout/stderr should be used only with call() wrappers. |
| + kwargs[stream] = PIPE |
| fix('stdout') |
| fix('stderr') |
| @@ -198,6 +202,140 @@ def Popen(args, **kwargs): |
| raise |
| +def _queue_pipe_read(pipe, name, done, dest): |
| + """Queue characters read from a pipe into a queue. |
| + |
| + Left outside the _tee_threads function to avoid introducing a closure, |
| + which speeds up variable lookup. |
| + """ |
| + while not done.isSet(): |
| + data = pipe.read(1) |
| + if not data: |
| + break |
| + dest.put((name, data)) |
|
Dirk Pranke
2011/11/04 01:15:11
This timeout doesn't really work if the pipes are
M-A Ruel
2011/11/04 01:41:48
If the process is killed, the pipes are closed, wh
|
| + dest.put(name) |
| + |
| + |
| +def _tee_threads(proc, timeout, start, stdin, args, kwargs): |
| + """Does I/O for a process's pipes using threads. |
| + |
| + It's the simplest and slowest implementation. Expect very slow behavior. |
| + |
| + If there is a callback and it doesn't keep up with the calls, the timeout |
| + effectiveness will be delayed accordingly. |
| + """ |
| + # TODO(maruel): Implement a select based implementation on POSIX and a Windows |
| + # one using WaitForMultipleObjects(). |
| + # |
| + # Queue of either <threadname> when done or (<threadname>, data). |
| + # In theory we would like to limit to ~64kb items to not cause large memory |
| + # usage when the callback blocks. It is not done because it slows down |
| + # processing on OSX10.6 by a factor of 2x, making it even slower than Windows! |
| + # Revisit this decision if it becomes a problem, e.g. crash because of |
| + # memory exhaustion. |
| + queue = Queue.Queue() |
| + done = threading.Event() |
| + |
| + def write_stdin(): |
| + stdin_io = cStringIO.StringIO(stdin) |
| + while not done.isSet(): |
| + data = stdin_io.read(1024) |
| + if data: |
| + proc.stdin.write(data) |
| + else: |
| + proc.stdin.close() |
| + break |
| + queue.put('stdin') |
| + |
| + def timeout_fn(): |
| + done.wait(timeout) |
| + # No need to close the pipes since killing should be sufficient. |
| + queue.put('timeout') |
| + |
| + # Starts up to 4 threads: |
| + # Read stdout |
| + # Read stderr |
| + # Write stdin |
| + # Timeout |
| + threads = {} |
| + if timeout is not None: |
| + threads['timeout'] = threading.Thread(target=timeout_fn) |
| + if callable(kwargs.get('stdout')): |
| + threads['stdout'] = threading.Thread( |
| + target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) |
| + if callable(kwargs.get('stderr')): |
| + threads['stderr'] = threading.Thread( |
| + target=_queue_pipe_read, |
| + args=(proc.stderr, 'stderr', done, queue)) |
| + if isinstance(stdin, str): |
| + threads['stdin'] = threading.Thread(target=write_stdin) |
| + for t in threads.itervalues(): |
| + t.daemon = True |
| + t.start() |
| + |
| + timed_out = False |
| + try: |
| + while proc.returncode is None: |
| + assert threads |
| + proc.poll() |
| + item = queue.get() |
| + if isinstance(item, str): |
| + threads[item].join() |
| + del threads[item] |
| + if item == 'timeout' and not timed_out and proc.poll() is None: |
| + logging.debug('Timed out: killing') |
| + proc.kill() |
| + timed_out = True |
| + if not threads: |
| + # We won't be woken up anymore. Need to busy loop. |
| + break |
| + else: |
| + kwargs[item[0]](item[1]) |
| + finally: |
| + # Stop the threads. |
| + done.set() |
| + # Join threads |
| + for thread in threads.itervalues(): |
| + thread.join() |
| + |
| + # Flush the queue. |
| + try: |
| + while True: |
| + item = queue.get(False) |
| + if isinstance(item, str): |
| + if item == 'timeout': |
| + # TODO(maruel): Does it make sense at that point? |
| + if not timed_out and proc.poll() is None: |
| + logging.debug('Timed out: killing') |
| + proc.kill() |
| + timed_out = True |
| + else: |
| + kwargs[item[0]](item[1]) |
| + except Queue.Empty: |
| + pass |
| + |
| + # Get the remainder. |
| + if callable(kwargs.get('stdout')): |
| + data = proc.stdout.read() |
| + while data: |
| + kwargs['stdout'](data) |
| + data = proc.stdout.read() |
| + if callable(kwargs.get('stderr')): |
| + data = proc.stderr.read() |
| + while data: |
| + kwargs['stderr'](data) |
| + data = proc.stderr.read() |
| + |
| + if proc.returncode is None: |
| + # Usually happens when killed with timeout but not listening to pipes. |
| + proc.wait() |
| + |
| + if timed_out: |
| + return TIMED_OUT |
| + |
| + return proc.returncode |
| + |
| + |
| def communicate(args, timeout=None, **kwargs): |
| """Wraps subprocess.Popen().communicate(). |
| @@ -207,6 +345,11 @@ def communicate(args, timeout=None, **kwargs): |
| TIMED_OUT. |
| - Automatically passes stdin content as input so do not specify stdin=PIPE. |
| """ |
| + if timeout and kwargs.get('shell'): |
| + raise TypeError( |
| + 'Using timeout and shell simultaneously will cause a process leak ' |
| + 'since the shell will be killed instead of the child process.') |
| + |
| stdin = kwargs.pop('stdin', None) |
| if stdin is not None: |
| if stdin is VOID: |
| @@ -218,36 +361,37 @@ def communicate(args, timeout=None, **kwargs): |
| # set the Popen() parameter accordingly. |
| kwargs['stdin'] = PIPE |
| - if not timeout: |
| + start = time.time() |
| + proc = Popen(args, **kwargs) |
| + need_buffering = (timeout or |
| + callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) |
| + |
| + if not need_buffering: |
| # Normal workflow. |
| - proc = Popen(args, **kwargs) |
| - if stdin is not None: |
| + if stdin not in (None, VOID): |
| return proc.communicate(stdin), proc.returncode |
| else: |
| return proc.communicate(), proc.returncode |
| - # Create a temporary file to workaround python's deadlock. |
| + stdout = None |
| + stderr = None |
| + # Convert to a lambda to work around Python's deadlock. |
| # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
| - # When the pipe fills up, it will deadlock this process. Using a real file |
| - # works around that issue. |
| - with tempfile.TemporaryFile() as buff: |
| - start = time.time() |
| - kwargs['stdout'] = buff |
| - proc = Popen(args, **kwargs) |
| - if stdin is not None: |
| - proc.stdin.write(stdin) |
| - while proc.returncode is None: |
| - proc.poll() |
| - if timeout and (time.time() - start) > timeout: |
| - proc.kill() |
| - proc.wait() |
| - # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. |
| - proc.returncode = TIMED_OUT |
| - time.sleep(0.001) |
| - # Now that the process died, reset the cursor and read the file. |
| - buff.seek(0) |
| - out = [buff.read(), None] |
| - return out, proc.returncode |
| + # When the pipe fills up, it will deadlock this process. Using a thread |
| + # works around that issue. No need for thread-safe functions since the |
| + # callbacks are guaranteed to be called from the main thread. |
| + if kwargs.get('stdout') == PIPE: |
| + stdout = [] |
| + kwargs['stdout'] = stdout.append |
| + if kwargs.get('stderr') == PIPE: |
| + stderr = [] |
| + kwargs['stderr'] = stderr.append |
| + returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) |
| + if not stdout is None: |
| + stdout = ''.join(stdout) |
| + if not stderr is None: |
| + stderr = ''.join(stderr) |
| + return (stdout, stderr), returncode |
| def call(args, **kwargs): |