Chromium Code Reviews

Unified Diff: subprocess42.py

Issue 2291223003: It's time to bring subprocess42 to depot_tools. (Closed)
Patch Set: Created 4 years, 4 months ago
Index: subprocess42.py
diff --git a/subprocess42.py b/subprocess42.py
new file mode 100644
index 0000000000000000000000000000000000000000..ba2769636eea52be60582a5e44a43b950a4b83c9
--- /dev/null
+++ b/subprocess42.py
@@ -0,0 +1,607 @@
+# Copyright 2013 The LUCI Authors. All rights reserved.
+# Use of this source code is governed by the Apache v2.0 license that can be
+# found in the LICENSE file.
+
+# This is a direct vendored copy of the original which is located at:
+# https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py
+# at commit 45eab10a17a55f1978f3bfa02c35457dee1c64e4.
+
+"""subprocess42 is the answer to life the universe and everything.
+
+It has the particularity of having a Popen implementation that can yield output
+as it is produced, while enforcing a timeout and NOT requiring the use of
+worker threads.
+
+Example:
+ Wait for a child process with a timeout, send SIGTERM, wait a grace period
+ then send SIGKILL:
+
+ def wait_terminate_then_kill(proc, timeout, grace):
+ try:
+ return proc.wait(timeout)
+ except subprocess42.TimeoutExpired:
+ proc.terminate()
+ try:
+ return proc.wait(grace)
+ except subprocess42.TimeoutExpired:
+ proc.kill()
+ return proc.wait()
+
+
+TODO(maruel): Add VOID support like subprocess2.
+"""
+
+import contextlib
+import errno
+import os
+import signal
+import threading
+import time
+
+import subprocess
+
+from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611
+from subprocess import list2cmdline
+
+
+# Default maxsize argument.
+MAX_SIZE = 16384
+
+
+if subprocess.mswindows:
+ import msvcrt # pylint: disable=F0401
+ from ctypes import wintypes
+ from ctypes import windll
+
+
+  # Which signal is received depends on how this process was called and is
+  # outside the control of this script. See the Popen docstring for more
+  # details.
+ STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM)
+
+
+ def ReadFile(handle, desired_bytes):
+ """Calls kernel32.ReadFile()."""
+ c_read = wintypes.DWORD()
+ buff = wintypes.create_string_buffer(desired_bytes+1)
+ windll.kernel32.ReadFile(
+ handle, buff, desired_bytes, wintypes.byref(c_read), None)
+ # NULL terminate it.
+ buff[c_read.value] = '\x00'
+ return wintypes.GetLastError(), buff.value
+
+ def PeekNamedPipe(handle):
+ """Calls kernel32.PeekNamedPipe(). Simplified version."""
+ c_avail = wintypes.DWORD()
+ c_message = wintypes.DWORD()
+ success = windll.kernel32.PeekNamedPipe(
+ handle, None, 0, None, wintypes.byref(c_avail),
+ wintypes.byref(c_message))
+ if not success:
+ raise OSError(wintypes.GetLastError())
+ return c_avail.value
+
+ def recv_multi_impl(conns, maxsize, timeout):
+ """Reads from the first available pipe.
+
+ It will immediately return on a closed connection, independent of timeout.
+
+ Arguments:
+ - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
+ - timeout: If None, it is blocking. If 0 or above, will return None if no
+ data is available within |timeout| seconds.
+
+ Returns:
+ tuple(int(index), str(data), bool(closed)).
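+
+    Example (illustrative sketch; assumes 'proc' is a Popen with piped stdout
+    and stderr):
+      # Wait up to 1 second for data from either pipe; maxsize=None falls
+      # back to MAX_SIZE.
+      index, data, closed = recv_multi_impl(
+          [proc.stdout, proc.stderr], None, 1)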
+ """
+ assert conns
+ assert timeout is None or isinstance(timeout, (int, float)), timeout
+ maxsize = max(maxsize or MAX_SIZE, 1)
+
+ # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
+ # for proc.stdout and proc.stderr but they are implemented as named pipes on
+ # Windows. Since named pipes are not waitable object, they can't be passed
+ # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
+ # and finally WFMO(). This requires caching the events handles in the Popen
+ # object and remembering the pending ReadFile() calls. This will require
+ # some re-architecture to store the relevant event handle and OVERLAPPEDIO
+ # object in Popen or the file object.
+ start = time.time()
+ handles = [
+ (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
+ ]
+ while True:
+ for index, handle in handles:
+ try:
+ avail = min(PeekNamedPipe(handle), maxsize)
+ if avail:
+ return index, ReadFile(handle, avail)[1], False
+ except OSError:
+ # The pipe closed.
+ return index, None, True
+
+ if timeout is not None and (time.time() - start) >= timeout:
+ return None, None, False
+ # Polling rocks.
+ time.sleep(0.001)
+
+else:
+ import fcntl # pylint: disable=F0401
+ import select
+
+
+ # Signals that mean this process should exit quickly.
+ STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)
+
+
+ def recv_multi_impl(conns, maxsize, timeout):
+ """Reads from the first available pipe.
+
+ It will immediately return on a closed connection, independent of timeout.
+
+ Arguments:
+ - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
+ - timeout: If None, it is blocking. If 0 or above, will return None if no
+ data is available within |timeout| seconds.
+
+ Returns:
+ tuple(int(index), str(data), bool(closed)).
+ """
+ assert conns
+ assert timeout is None or isinstance(timeout, (int, float)), timeout
+ maxsize = max(maxsize or MAX_SIZE, 1)
+
+    # A timeout of 0 would make select() return immediately; use a small value
+    # instead so it waits for data.
+ if timeout == 0:
+ timeout = 0.001
+ try:
+ r, _, _ = select.select(conns, [], [], timeout)
+ except select.error:
+ r = None
+ if not r:
+ return None, None, False
+
+ conn = r[0]
+ # Temporarily make it non-blocking.
+ # TODO(maruel): This is not very efficient when the caller is doing this in
+ # a loop. Add a mechanism to have the caller handle this.
+ flags = fcntl.fcntl(conn, fcntl.F_GETFL)
+ if not conn.closed:
+ # pylint: disable=E1101
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+ try:
+ try:
+ data = conn.read(maxsize)
+ except IOError as e:
+ # On posix, this means the read would block.
+ if e.errno == errno.EAGAIN:
+ return conns.index(conn), None, False
+ raise e
+
+ if not data:
+ # On posix, this means the channel closed.
+ return conns.index(conn), None, True
+
+ return conns.index(conn), data, False
+ finally:
+ if not conn.closed:
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags)
+
+
+class TimeoutExpired(Exception):
+ """Compatible with python3 subprocess."""
+ def __init__(self, cmd, timeout, output=None, stderr=None):
+ self.cmd = cmd
+ self.timeout = timeout
+ self.output = output
+ # Non-standard:
+ self.stderr = stderr
+ super(TimeoutExpired, self).__init__(str(self))
+
+ def __str__(self):
+ return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)
+
+
+class Popen(subprocess.Popen):
+ """Adds timeout support on stdout and stderr.
+
+ Inspired by
+ http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
+
+  Unlike subprocess.Popen, yield_any(), recv_*() and communicate() will close
+  stdout and stderr once the child process closes them and all pending data
+  has been read.
+
+ Arguments:
+ - detached: If True, the process is created in a new process group. On
+ Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0).
+
+ Additional members:
+ - start: timestamp when this process started.
+ - end: timestamp when this process exited, as seen by this process.
+ - detached: If True, the child process was started as a detached process.
+ - gid: process group id, if any.
+ - duration: time in seconds the process lasted.
+
+ Additional methods:
+ - yield_any(): yields output until the process terminates.
+ - recv_any(): reads from stdout and/or stderr with optional timeout.
+ - recv_out() & recv_err(): specialized version of recv_any().
+ """
+ # subprocess.Popen.__init__() is not threadsafe; there is a race between
+ # creating the exec-error pipe for the child and setting it to CLOEXEC during
+ # which another thread can fork and cause the pipe to be inherited by its
+ # descendents, which will cause the current Popen to hang until all those
+ # descendents exit. Protect this with a lock so that only one fork/exec can
+ # happen at a time.
+ popen_lock = threading.Lock()
+
+ def __init__(self, args, **kwargs):
+ assert 'creationflags' not in kwargs
+ assert 'preexec_fn' not in kwargs, 'Use detached=True instead'
+ self.start = time.time()
+ self.end = None
+ self.gid = None
+ self.detached = kwargs.pop('detached', False)
+ if self.detached:
+ if subprocess.mswindows:
+ kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
+ else:
+ kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)
+ with self.popen_lock:
+ super(Popen, self).__init__(args, **kwargs)
+ self.args = args
+ if self.detached and not subprocess.mswindows:
+ try:
+ self.gid = os.getpgid(self.pid)
+ except OSError:
+ # sometimes the process can run+finish before we collect its pgid. fun.
+ pass
+
+ def duration(self):
+ """Duration of the child process.
+
+    It is greater than or equal to the actual time the child process ran. It
+    can be significantly higher than the real value if neither .wait() nor
+    .poll() was used.
+ """
+ return (self.end or time.time()) - self.start
+
+ # pylint: disable=arguments-differ,redefined-builtin
+ def communicate(self, input=None, timeout=None):
+ """Implements python3's timeout support.
+
+ Unlike wait(), timeout=0 is considered the same as None.
+
+ Raises:
+ - TimeoutExpired when more than timeout seconds were spent waiting for the
+ process.
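+
+    Example (illustrative sketch; 'cmd' stands for any command list, e.g.
+    ['python', 'worker.py']):
+      proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
+      try:
+        out, err = proc.communicate(timeout=30)
+      except TimeoutExpired as e:
+        # Partial output is available on the exception.
+        proc.kill()
+        proc.wait()
+        out, err = e.output, e.stderr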
+ """
+ if not timeout:
+ return super(Popen, self).communicate(input=input)
+
+ assert isinstance(timeout, (int, float)), timeout
+
+ if self.stdin or self.stdout or self.stderr:
+ stdout = '' if self.stdout else None
+ stderr = '' if self.stderr else None
+ t = None
+ if input is not None:
+ assert self.stdin, (
+ 'Can\'t use communicate(input) if not using '
+            'Popen(stdin=subprocess42.PIPE)')
+ # TODO(maruel): Switch back to non-threading.
+ def write():
+ try:
+ self.stdin.write(input)
+ except IOError:
+ pass
+ t = threading.Thread(name='Popen.communicate', target=write)
+ t.daemon = True
+ t.start()
+
+ try:
+ if self.stdout or self.stderr:
+ start = time.time()
+ end = start + timeout
+ def remaining():
+ return max(end - time.time(), 0)
+ for pipe, data in self.yield_any(timeout=remaining):
+ if pipe is None:
+ raise TimeoutExpired(self.args, timeout, stdout, stderr)
+ assert pipe in ('stdout', 'stderr'), pipe
+ if pipe == 'stdout':
+ stdout += data
+ else:
+ stderr += data
+ else:
+ # Only stdin is piped.
+ self.wait(timeout=timeout)
+ finally:
+ if t:
+ try:
+ self.stdin.close()
+ except IOError:
+ pass
+ t.join()
+ else:
+ # No pipe. The user wanted to use wait().
+ self.wait(timeout=timeout)
+ return None, None
+
+ # Indirectly initialize self.end.
+ self.wait()
+ return stdout, stderr
+
+ def wait(self, timeout=None): # pylint: disable=arguments-differ
+ """Implements python3's timeout support.
+
+ Raises:
+ - TimeoutExpired when more than timeout seconds were spent waiting for the
+ process.
+ """
+ assert timeout is None or isinstance(timeout, (int, float)), timeout
+ if timeout is None:
+ super(Popen, self).wait()
+ elif self.returncode is None:
+ if subprocess.mswindows:
+ WAIT_TIMEOUT = 258
+ result = subprocess._subprocess.WaitForSingleObject(
+ self._handle, int(timeout * 1000))
+ if result == WAIT_TIMEOUT:
+ raise TimeoutExpired(self.args, timeout)
+ self.returncode = subprocess._subprocess.GetExitCodeProcess(
+ self._handle)
+ else:
+ # If you think the following code is horrible, it's because it is
+ # inspired by python3's stdlib.
+ end = time.time() + timeout
+ delay = 0.001
+ while True:
+ try:
+ pid, sts = subprocess._eintr_retry_call(
+ os.waitpid, self.pid, os.WNOHANG)
+ except OSError as e:
+ if e.errno != errno.ECHILD:
+ raise
+ pid = self.pid
+ sts = 0
+ if pid == self.pid:
+ # This sets self.returncode.
+ self._handle_exitstatus(sts)
+ break
+ remaining = end - time.time()
+ if remaining <= 0:
+ raise TimeoutExpired(self.args, timeout)
+ delay = min(delay * 2, remaining, .05)
+ time.sleep(delay)
+
+ if not self.end:
+ # communicate() uses wait() internally.
+ self.end = time.time()
+ return self.returncode
+
+ def poll(self):
+ ret = super(Popen, self).poll()
+ if ret is not None and not self.end:
+ self.end = time.time()
+ return ret
+
+ def yield_any(self, maxsize=None, timeout=None):
+ """Yields output until the process terminates.
+
+ Unlike wait(), does not raise TimeoutExpired.
+
+ Yields:
+      (pipename, data) where pipename is 'stdout' or 'stderr', or None in case
+      of a timeout or when the child process closed one of the pipes and all
+      pending data on the pipe has been read.
+
+ Arguments:
+ - maxsize: See recv_any(). Can be a callable function.
+ - timeout: If None, the call is blocking. If set, yields None, None if no
+ data is available within |timeout| seconds. It resets itself after
+ each yield. Can be a callable function.
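+
+    Example (illustrative sketch; 'cmd' stands for any command list):
+      proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
+      for pipe_name, data in proc.yield_any(timeout=10):
+        if pipe_name is None:
+          # No output for 10 seconds; give up on the child.
+          proc.terminate()
+          break
+        print('%s: %r' % (pipe_name, data))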
+ """
+ assert self.stdout or self.stderr
+ if timeout is not None:
+ # timeout=0 effectively means that the pipe is continuously polled.
+ if isinstance(timeout, (int, float)):
+ assert timeout >= 0, timeout
+ old_timeout = timeout
+ timeout = lambda: old_timeout
+ else:
+ assert callable(timeout), timeout
+
+ if maxsize is not None and not callable(maxsize):
+ assert isinstance(maxsize, (int, float)), maxsize
+
+ last_yield = time.time()
+ while self.poll() is None:
+ to = (None if timeout is None
+ else max(timeout() - (time.time() - last_yield), 0))
+ t, data = self.recv_any(
+ maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to)
+      if data or to == 0:
+ yield t, data
+ last_yield = time.time()
+
+ # Read all remaining output in the pipes.
+    # There are 3 cases:
+    # - pipes get closed automatically by the calling process before it exits
+    # - pipes are closed automatically by the OS
+    # - pipes are kept open due to grandchildren processes outliving the
+    #   child process.
+ while True:
+ ms = maxsize
+ if callable(maxsize):
+ ms = maxsize()
+      # timeout=0 is mainly to handle the case where a grandchild process
+      # outlives the started child process.
+ t, data = self.recv_any(maxsize=ms, timeout=0)
+ if not data:
+ break
+ yield t, data
+
+ def recv_any(self, maxsize=None, timeout=None):
+ """Reads from the first pipe available from stdout and stderr.
+
+ Unlike wait(), it does not throw TimeoutExpired.
+
+ Arguments:
+ - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
+ - timeout: If None, it is blocking. If 0 or above, will return None if no
+ data is available within |timeout| seconds.
+
+ Returns:
+ tuple(pipename or None, str(data)). pipename is one of 'stdout' or
+ 'stderr'.
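+
+    Example (illustrative sketch; assumes 'proc' has piped stdout and stderr):
+      # Wait up to 5 seconds for output from either pipe; (None, None) means
+      # nothing arrived in time.
+      pipe_name, data = proc.recv_any(timeout=5)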
+ """
+ # recv_multi_impl will early exit on a closed connection. Loop accordingly
+ # to simplify call sites.
+ while True:
+ pipes = [
+ x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
+ ]
+      # If both stdout and stderr have the exact same file handle, they are
+ # effectively the same pipe. Deduplicate it since otherwise it confuses
+ # recv_multi_impl().
+ if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
+ pipes.pop(0)
+
+ if not pipes:
+ return None, None
+ start = time.time()
+ conns, names = zip(*pipes)
+ index, data, closed = recv_multi_impl(conns, maxsize, timeout)
+ if index is None:
+ return index, data
+ if closed:
+ self._close(names[index])
+ if not data:
+ # Loop again. The other pipe may still be open.
+ if timeout:
+ timeout -= (time.time() - start)
+ continue
+
+ if self.universal_newlines and data:
+ data = self._translate_newlines(data)
+ return names[index], data
+
+ def recv_out(self, maxsize=None, timeout=None):
+ """Reads from stdout synchronously with timeout."""
+ return self._recv('stdout', maxsize, timeout)
+
+ def recv_err(self, maxsize=None, timeout=None):
+ """Reads from stderr synchronously with timeout."""
+ return self._recv('stderr', maxsize, timeout)
+
+ def terminate(self):
+ """Tries to do something saner on Windows that the stdlib.
+
+ Windows:
+ self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
+ - If set, only SIGBREAK can be sent and it is sent to a single process.
+      - If not set, in theory only SIGINT can be used and *all processes* in
+        the process group receive it. In practice, we just kill the process.
+ See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
+ The default on Windows is to call TerminateProcess() always, which is not
+ useful.
+
+ On Posix, always send SIGTERM.
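+
+    Example (illustrative sketch; 'cmd' stands for any command list):
+      # Created detached so that, on Windows, terminate() sends
+      # CTRL_BREAK_EVENT instead of TerminateProcess().
+      proc = Popen(cmd, detached=True)
+      proc.terminate()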
+ """
+ try:
+ if subprocess.mswindows and self.detached:
+ return self.send_signal(signal.CTRL_BREAK_EVENT)
+ super(Popen, self).terminate()
+ except OSError:
+ # The function will throw if the process terminated in-between. Swallow
+ # this.
+ pass
+
+ def kill(self):
+ """Kills the process and its children if possible.
+
+    Swallows exceptions and returns True on success.
+ """
+ if self.gid:
+ try:
+ os.killpg(self.gid, signal.SIGKILL)
+ except OSError:
+ return False
+ else:
+ try:
+ super(Popen, self).kill()
+ except OSError:
+ return False
+ return True
+
+ def _close(self, which):
+ """Closes either stdout or stderr."""
+ getattr(self, which).close()
+ setattr(self, which, None)
+
+ def _recv(self, which, maxsize, timeout):
+ """Reads from one of stdout or stderr synchronously with timeout."""
+ conn = getattr(self, which)
+ if conn is None:
+ return None
+ _, data, closed = recv_multi_impl([conn], maxsize, timeout)
+ if closed:
+ self._close(which)
+ if self.universal_newlines and data:
+ data = self._translate_newlines(data)
+ return data
+
+
+@contextlib.contextmanager
+def set_signal_handler(signals, handler):
+ """Temporarilly override signals handler.
+
+ Useful when waiting for a child process to handle signals like SIGTERM, so the
+ signal can be propagated to the child process.
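+
+  Example (illustrative sketch; 'cmd' stands for any command list):
+    proc = Popen(cmd)
+    with set_signal_handler(STOP_SIGNALS, lambda _s, _f: proc.terminate()):
+      proc.wait()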
+ """
+ previous = {s: signal.signal(s, handler) for s in signals}
+ try:
+ yield
+ finally:
+ for sig, h in previous.iteritems():
+ signal.signal(sig, h)
+
+
+def call(*args, **kwargs):
+ """Adds support for timeout."""
+ timeout = kwargs.pop('timeout', None)
+ return Popen(*args, **kwargs).wait(timeout)
+
+
+def check_call(*args, **kwargs):
+ """Adds support for timeout."""
+ retcode = call(*args, **kwargs)
+ if retcode:
+ raise CalledProcessError(retcode, kwargs.get('args') or args[0])
+ return 0
+
+
+def check_output(*args, **kwargs):
+ """Adds support for timeout."""
+ timeout = kwargs.pop('timeout', None)
+ if 'stdout' in kwargs:
+ raise ValueError('stdout argument not allowed, it will be overridden.')
+ process = Popen(stdout=PIPE, *args, **kwargs)
+ output, _ = process.communicate(timeout=timeout)
+ retcode = process.poll()
+ if retcode:
+ raise CalledProcessError(retcode, kwargs.get('args') or args[0], output)
+ return output
+
+
+def call_with_timeout(args, timeout, **kwargs):
+ """Runs an executable; kill it in case of timeout."""
+ proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
+ try:
+ out, err = proc.communicate(timeout=timeout)
+ except TimeoutExpired as e:
+ out = e.output
+ err = e.stderr
+ proc.kill()
+ proc.wait()
+ return out, err, proc.returncode, proc.duration()