Index: subprocess42.py |
diff --git a/subprocess42.py b/subprocess42.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ba2769636eea52be60582a5e44a43b950a4b83c9 |
--- /dev/null |
+++ b/subprocess42.py |
@@ -0,0 +1,607 @@ |
+# Copyright 2013 The LUCI Authors. All rights reserved. |
+# Use of this source code is governed by the Apache v2.0 license that can be |
+# found in the LICENSE file. |
+ |
+# This is a direct vendored copy of the original which is located at: |
+# https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py |
+# at commit 45eab10a17a55f1978f3bfa02c35457dee1c64e4. |
+ |
+"""subprocess42 is the answer to life the universe and everything. |
+ |
+It has the particularity of having a Popen implementation that can yield output |
+as it is produced while implementing a timeout and NOT requiring the use of |
+worker threads. |
+ |
+Example: |
+ Wait for a child process with a timeout, send SIGTERM, wait a grace period |
+ then send SIGKILL: |
+ |
+ def wait_terminate_then_kill(proc, timeout, grace): |
+ try: |
+ return proc.wait(timeout) |
+ except subprocess42.TimeoutExpired: |
+ proc.terminate() |
+ try: |
+ return proc.wait(grace) |
+ except subprocess42.TimeoutExpired: |
+ proc.kill() |
+ return proc.wait() |
+ |
+ |
+TODO(maruel): Add VOID support like subprocess2. |
+""" |
+ |
+import contextlib |
+import errno |
+import os |
+import signal |
+import threading |
+import time |
+ |
+import subprocess |
+ |
+from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611 |
+from subprocess import list2cmdline |
+ |
+ |
+# Default maxsize argument. |
+MAX_SIZE = 16384 |
+ |
+ |
+if subprocess.mswindows: |
+ import msvcrt # pylint: disable=F0401 |
+ from ctypes import wintypes |
+ from ctypes import windll |
+ |
+ |
+ # Which to be received depends on how this process was called and outside the |
+ # control of this script. See Popen docstring for more details. |
+ STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM) |
+ |
+ |
+ def ReadFile(handle, desired_bytes): |
+ """Calls kernel32.ReadFile().""" |
+ c_read = wintypes.DWORD() |
+ buff = wintypes.create_string_buffer(desired_bytes+1) |
+ windll.kernel32.ReadFile( |
+ handle, buff, desired_bytes, wintypes.byref(c_read), None) |
+ # NULL terminate it. |
+ buff[c_read.value] = '\x00' |
+ return wintypes.GetLastError(), buff.value |
+ |
+ def PeekNamedPipe(handle): |
+ """Calls kernel32.PeekNamedPipe(). Simplified version.""" |
+ c_avail = wintypes.DWORD() |
+ c_message = wintypes.DWORD() |
+ success = windll.kernel32.PeekNamedPipe( |
+ handle, None, 0, None, wintypes.byref(c_avail), |
+ wintypes.byref(c_message)) |
+ if not success: |
+ raise OSError(wintypes.GetLastError()) |
+ return c_avail.value |
+ |
+ def recv_multi_impl(conns, maxsize, timeout): |
+ """Reads from the first available pipe. |
+ |
+ It will immediately return on a closed connection, independent of timeout. |
+ |
+ Arguments: |
+ - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. |
+ - timeout: If None, it is blocking. If 0 or above, will return None if no |
+ data is available within |timeout| seconds. |
+ |
+ Returns: |
+ tuple(int(index), str(data), bool(closed)). |
+ """ |
+ assert conns |
+ assert timeout is None or isinstance(timeout, (int, float)), timeout |
+ maxsize = max(maxsize or MAX_SIZE, 1) |
+ |
+ # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes |
+ # for proc.stdout and proc.stderr but they are implemented as named pipes on |
+ # Windows. Since named pipes are not waitable object, they can't be passed |
+ # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile() |
+ # and finally WFMO(). This requires caching the events handles in the Popen |
+ # object and remembering the pending ReadFile() calls. This will require |
+ # some re-architecture to store the relevant event handle and OVERLAPPEDIO |
+ # object in Popen or the file object. |
+ start = time.time() |
+ handles = [ |
+ (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns) |
+ ] |
+ while True: |
+ for index, handle in handles: |
+ try: |
+ avail = min(PeekNamedPipe(handle), maxsize) |
+ if avail: |
+ return index, ReadFile(handle, avail)[1], False |
+ except OSError: |
+ # The pipe closed. |
+ return index, None, True |
+ |
+ if timeout is not None and (time.time() - start) >= timeout: |
+ return None, None, False |
+ # Polling rocks. |
+ time.sleep(0.001) |
+ |
+else: |
+ import fcntl # pylint: disable=F0401 |
+ import select |
+ |
+ |
+ # Signals that mean this process should exit quickly. |
+ STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM) |
+ |
+ |
+ def recv_multi_impl(conns, maxsize, timeout): |
+ """Reads from the first available pipe. |
+ |
+ It will immediately return on a closed connection, independent of timeout. |
+ |
+ Arguments: |
+ - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. |
+ - timeout: If None, it is blocking. If 0 or above, will return None if no |
+ data is available within |timeout| seconds. |
+ |
+ Returns: |
+ tuple(int(index), str(data), bool(closed)). |
+ """ |
+ assert conns |
+ assert timeout is None or isinstance(timeout, (int, float)), timeout |
+ maxsize = max(maxsize or MAX_SIZE, 1) |
+ |
+ # select(timeout=0) will block, it has to be a value > 0. |
+ if timeout == 0: |
+ timeout = 0.001 |
+ try: |
+ r, _, _ = select.select(conns, [], [], timeout) |
+ except select.error: |
+ r = None |
+ if not r: |
+ return None, None, False |
+ |
+ conn = r[0] |
+ # Temporarily make it non-blocking. |
+ # TODO(maruel): This is not very efficient when the caller is doing this in |
+ # a loop. Add a mechanism to have the caller handle this. |
+ flags = fcntl.fcntl(conn, fcntl.F_GETFL) |
+ if not conn.closed: |
+ # pylint: disable=E1101 |
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK) |
+ try: |
+ try: |
+ data = conn.read(maxsize) |
+ except IOError as e: |
+ # On posix, this means the read would block. |
+ if e.errno == errno.EAGAIN: |
+ return conns.index(conn), None, False |
+ raise e |
+ |
+ if not data: |
+ # On posix, this means the channel closed. |
+ return conns.index(conn), None, True |
+ |
+ return conns.index(conn), data, False |
+ finally: |
+ if not conn.closed: |
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags) |
+ |
+ |
+class TimeoutExpired(Exception): |
+ """Compatible with python3 subprocess.""" |
+ def __init__(self, cmd, timeout, output=None, stderr=None): |
+ self.cmd = cmd |
+ self.timeout = timeout |
+ self.output = output |
+ # Non-standard: |
+ self.stderr = stderr |
+ super(TimeoutExpired, self).__init__(str(self)) |
+ |
+ def __str__(self): |
+ return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout) |
+ |
+ |
+class Popen(subprocess.Popen): |
+ """Adds timeout support on stdout and stderr. |
+ |
+ Inspired by |
+ http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/ |
+ |
+ Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and |
+ stderr once the child process closes them, after all the data is read. |
+ |
+ Arguments: |
+ - detached: If True, the process is created in a new process group. On |
+ Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0). |
+ |
+ Additional members: |
+ - start: timestamp when this process started. |
+ - end: timestamp when this process exited, as seen by this process. |
+ - detached: If True, the child process was started as a detached process. |
+ - gid: process group id, if any. |
+ - duration: time in seconds the process lasted. |
+ |
+ Additional methods: |
+ - yield_any(): yields output until the process terminates. |
+ - recv_any(): reads from stdout and/or stderr with optional timeout. |
+ - recv_out() & recv_err(): specialized version of recv_any(). |
+ """ |
+ # subprocess.Popen.__init__() is not threadsafe; there is a race between |
+ # creating the exec-error pipe for the child and setting it to CLOEXEC during |
+ # which another thread can fork and cause the pipe to be inherited by its |
+ # descendents, which will cause the current Popen to hang until all those |
+ # descendents exit. Protect this with a lock so that only one fork/exec can |
+ # happen at a time. |
+ popen_lock = threading.Lock() |
+ |
+ def __init__(self, args, **kwargs): |
+ assert 'creationflags' not in kwargs |
+ assert 'preexec_fn' not in kwargs, 'Use detached=True instead' |
+ self.start = time.time() |
+ self.end = None |
+ self.gid = None |
+ self.detached = kwargs.pop('detached', False) |
+ if self.detached: |
+ if subprocess.mswindows: |
+ kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP |
+ else: |
+ kwargs['preexec_fn'] = lambda: os.setpgid(0, 0) |
+ with self.popen_lock: |
+ super(Popen, self).__init__(args, **kwargs) |
+ self.args = args |
+ if self.detached and not subprocess.mswindows: |
+ try: |
+ self.gid = os.getpgid(self.pid) |
+ except OSError: |
+ # sometimes the process can run+finish before we collect its pgid. fun. |
+ pass |
+ |
+ def duration(self): |
+ """Duration of the child process. |
+ |
+ It is greater or equal to the actual time the child process ran. It can be |
+ significantly higher than the real value if neither .wait() nor .poll() was |
+ used. |
+ """ |
+ return (self.end or time.time()) - self.start |
+ |
+ # pylint: disable=arguments-differ,redefined-builtin |
+ def communicate(self, input=None, timeout=None): |
+ """Implements python3's timeout support. |
+ |
+ Unlike wait(), timeout=0 is considered the same as None. |
+ |
+ Raises: |
+ - TimeoutExpired when more than timeout seconds were spent waiting for the |
+ process. |
+ """ |
+ if not timeout: |
+ return super(Popen, self).communicate(input=input) |
+ |
+ assert isinstance(timeout, (int, float)), timeout |
+ |
+ if self.stdin or self.stdout or self.stderr: |
+ stdout = '' if self.stdout else None |
+ stderr = '' if self.stderr else None |
+ t = None |
+ if input is not None: |
+ assert self.stdin, ( |
+ 'Can\'t use communicate(input) if not using ' |
+ 'Popen(stdin=subprocess42.PIPE') |
+ # TODO(maruel): Switch back to non-threading. |
+ def write(): |
+ try: |
+ self.stdin.write(input) |
+ except IOError: |
+ pass |
+ t = threading.Thread(name='Popen.communicate', target=write) |
+ t.daemon = True |
+ t.start() |
+ |
+ try: |
+ if self.stdout or self.stderr: |
+ start = time.time() |
+ end = start + timeout |
+ def remaining(): |
+ return max(end - time.time(), 0) |
+ for pipe, data in self.yield_any(timeout=remaining): |
+ if pipe is None: |
+ raise TimeoutExpired(self.args, timeout, stdout, stderr) |
+ assert pipe in ('stdout', 'stderr'), pipe |
+ if pipe == 'stdout': |
+ stdout += data |
+ else: |
+ stderr += data |
+ else: |
+ # Only stdin is piped. |
+ self.wait(timeout=timeout) |
+ finally: |
+ if t: |
+ try: |
+ self.stdin.close() |
+ except IOError: |
+ pass |
+ t.join() |
+ else: |
+ # No pipe. The user wanted to use wait(). |
+ self.wait(timeout=timeout) |
+ return None, None |
+ |
+ # Indirectly initialize self.end. |
+ self.wait() |
+ return stdout, stderr |
+ |
+ def wait(self, timeout=None): # pylint: disable=arguments-differ |
+ """Implements python3's timeout support. |
+ |
+ Raises: |
+ - TimeoutExpired when more than timeout seconds were spent waiting for the |
+ process. |
+ """ |
+ assert timeout is None or isinstance(timeout, (int, float)), timeout |
+ if timeout is None: |
+ super(Popen, self).wait() |
+ elif self.returncode is None: |
+ if subprocess.mswindows: |
+ WAIT_TIMEOUT = 258 |
+ result = subprocess._subprocess.WaitForSingleObject( |
+ self._handle, int(timeout * 1000)) |
+ if result == WAIT_TIMEOUT: |
+ raise TimeoutExpired(self.args, timeout) |
+ self.returncode = subprocess._subprocess.GetExitCodeProcess( |
+ self._handle) |
+ else: |
+ # If you think the following code is horrible, it's because it is |
+ # inspired by python3's stdlib. |
+ end = time.time() + timeout |
+ delay = 0.001 |
+ while True: |
+ try: |
+ pid, sts = subprocess._eintr_retry_call( |
+ os.waitpid, self.pid, os.WNOHANG) |
+ except OSError as e: |
+ if e.errno != errno.ECHILD: |
+ raise |
+ pid = self.pid |
+ sts = 0 |
+ if pid == self.pid: |
+ # This sets self.returncode. |
+ self._handle_exitstatus(sts) |
+ break |
+ remaining = end - time.time() |
+ if remaining <= 0: |
+ raise TimeoutExpired(self.args, timeout) |
+ delay = min(delay * 2, remaining, .05) |
+ time.sleep(delay) |
+ |
+ if not self.end: |
+ # communicate() uses wait() internally. |
+ self.end = time.time() |
+ return self.returncode |
+ |
+ def poll(self): |
+ ret = super(Popen, self).poll() |
+ if ret is not None and not self.end: |
+ self.end = time.time() |
+ return ret |
+ |
+ def yield_any(self, maxsize=None, timeout=None): |
+ """Yields output until the process terminates. |
+ |
+ Unlike wait(), does not raise TimeoutExpired. |
+ |
+ Yields: |
+ (pipename, data) where pipename is either 'stdout', 'stderr' or None in |
+ case of timeout or when the child process closed one of the pipe(s) and |
+ all pending data on the pipe was read. |
+ |
+ Arguments: |
+ - maxsize: See recv_any(). Can be a callable function. |
+ - timeout: If None, the call is blocking. If set, yields None, None if no |
+ data is available within |timeout| seconds. It resets itself after |
+ each yield. Can be a callable function. |
+ """ |
+ assert self.stdout or self.stderr |
+ if timeout is not None: |
+ # timeout=0 effectively means that the pipe is continuously polled. |
+ if isinstance(timeout, (int, float)): |
+ assert timeout >= 0, timeout |
+ old_timeout = timeout |
+ timeout = lambda: old_timeout |
+ else: |
+ assert callable(timeout), timeout |
+ |
+ if maxsize is not None and not callable(maxsize): |
+ assert isinstance(maxsize, (int, float)), maxsize |
+ |
+ last_yield = time.time() |
+ while self.poll() is None: |
+ to = (None if timeout is None |
+ else max(timeout() - (time.time() - last_yield), 0)) |
+ t, data = self.recv_any( |
+ maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to) |
+ if data or to is 0: |
+ yield t, data |
+ last_yield = time.time() |
+ |
+ # Read all remaining output in the pipes. |
+ # There is 3 cases: |
+ # - pipes get closed automatically by the calling process before it exits |
+ # - pipes are closed automated by the OS |
+ # - pipes are kept open due to grand-children processes outliving the |
+ # children process. |
+ while True: |
+ ms = maxsize |
+ if callable(maxsize): |
+ ms = maxsize() |
+ # timeout=0 is mainly to handle the case where a grand-children process |
+ # outlives the process started. |
+ t, data = self.recv_any(maxsize=ms, timeout=0) |
+ if not data: |
+ break |
+ yield t, data |
+ |
+ def recv_any(self, maxsize=None, timeout=None): |
+ """Reads from the first pipe available from stdout and stderr. |
+ |
+ Unlike wait(), it does not throw TimeoutExpired. |
+ |
+ Arguments: |
+ - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. |
+ - timeout: If None, it is blocking. If 0 or above, will return None if no |
+ data is available within |timeout| seconds. |
+ |
+ Returns: |
+ tuple(pipename or None, str(data)). pipename is one of 'stdout' or |
+ 'stderr'. |
+ """ |
+ # recv_multi_impl will early exit on a closed connection. Loop accordingly |
+ # to simplify call sites. |
+ while True: |
+ pipes = [ |
+ x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0] |
+ ] |
+ # If both stdout and stderr have the exact file handle, they are |
+ # effectively the same pipe. Deduplicate it since otherwise it confuses |
+ # recv_multi_impl(). |
+ if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno(): |
+ pipes.pop(0) |
+ |
+ if not pipes: |
+ return None, None |
+ start = time.time() |
+ conns, names = zip(*pipes) |
+ index, data, closed = recv_multi_impl(conns, maxsize, timeout) |
+ if index is None: |
+ return index, data |
+ if closed: |
+ self._close(names[index]) |
+ if not data: |
+ # Loop again. The other pipe may still be open. |
+ if timeout: |
+ timeout -= (time.time() - start) |
+ continue |
+ |
+ if self.universal_newlines and data: |
+ data = self._translate_newlines(data) |
+ return names[index], data |
+ |
+ def recv_out(self, maxsize=None, timeout=None): |
+ """Reads from stdout synchronously with timeout.""" |
+ return self._recv('stdout', maxsize, timeout) |
+ |
+ def recv_err(self, maxsize=None, timeout=None): |
+ """Reads from stderr synchronously with timeout.""" |
+ return self._recv('stderr', maxsize, timeout) |
+ |
+ def terminate(self): |
+ """Tries to do something saner on Windows that the stdlib. |
+ |
+ Windows: |
+ self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used: |
+ - If set, only SIGBREAK can be sent and it is sent to a single process. |
+ - If not set, in theory only SIGINT can be used and *all processes* in |
+ the processgroup receive it. In practice, we just kill the process. |
+ See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx |
+ The default on Windows is to call TerminateProcess() always, which is not |
+ useful. |
+ |
+ On Posix, always send SIGTERM. |
+ """ |
+ try: |
+ if subprocess.mswindows and self.detached: |
+ return self.send_signal(signal.CTRL_BREAK_EVENT) |
+ super(Popen, self).terminate() |
+ except OSError: |
+ # The function will throw if the process terminated in-between. Swallow |
+ # this. |
+ pass |
+ |
+ def kill(self): |
+ """Kills the process and its children if possible. |
+ |
+ Swallows exceptions and return True on success. |
+ """ |
+ if self.gid: |
+ try: |
+ os.killpg(self.gid, signal.SIGKILL) |
+ except OSError: |
+ return False |
+ else: |
+ try: |
+ super(Popen, self).kill() |
+ except OSError: |
+ return False |
+ return True |
+ |
+ def _close(self, which): |
+ """Closes either stdout or stderr.""" |
+ getattr(self, which).close() |
+ setattr(self, which, None) |
+ |
+ def _recv(self, which, maxsize, timeout): |
+ """Reads from one of stdout or stderr synchronously with timeout.""" |
+ conn = getattr(self, which) |
+ if conn is None: |
+ return None |
+ _, data, closed = recv_multi_impl([conn], maxsize, timeout) |
+ if closed: |
+ self._close(which) |
+ if self.universal_newlines and data: |
+ data = self._translate_newlines(data) |
+ return data |
+ |
+ |
+@contextlib.contextmanager |
+def set_signal_handler(signals, handler): |
+ """Temporarilly override signals handler. |
+ |
+ Useful when waiting for a child process to handle signals like SIGTERM, so the |
+ signal can be propagated to the child process. |
+ """ |
+ previous = {s: signal.signal(s, handler) for s in signals} |
+ try: |
+ yield |
+ finally: |
+ for sig, h in previous.iteritems(): |
+ signal.signal(sig, h) |
+ |
+ |
+def call(*args, **kwargs): |
+ """Adds support for timeout.""" |
+ timeout = kwargs.pop('timeout', None) |
+ return Popen(*args, **kwargs).wait(timeout) |
+ |
+ |
+def check_call(*args, **kwargs): |
+ """Adds support for timeout.""" |
+ retcode = call(*args, **kwargs) |
+ if retcode: |
+ raise CalledProcessError(retcode, kwargs.get('args') or args[0]) |
+ return 0 |
+ |
+ |
+def check_output(*args, **kwargs): |
+ """Adds support for timeout.""" |
+ timeout = kwargs.pop('timeout', None) |
+ if 'stdout' in kwargs: |
+ raise ValueError('stdout argument not allowed, it will be overridden.') |
+ process = Popen(stdout=PIPE, *args, **kwargs) |
+ output, _ = process.communicate(timeout=timeout) |
+ retcode = process.poll() |
+ if retcode: |
+ raise CalledProcessError(retcode, kwargs.get('args') or args[0], output) |
+ return output |
+ |
+ |
+def call_with_timeout(args, timeout, **kwargs): |
+ """Runs an executable; kill it in case of timeout.""" |
+ proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs) |
+ try: |
+ out, err = proc.communicate(timeout=timeout) |
+ except TimeoutExpired as e: |
+ out = e.output |
+ err = e.stderr |
+ proc.kill() |
+ proc.wait() |
+ return out, err, proc.returncode, proc.duration() |