| Index: subprocess42.py
|
| diff --git a/subprocess42.py b/subprocess42.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ba2769636eea52be60582a5e44a43b950a4b83c9
|
| --- /dev/null
|
| +++ b/subprocess42.py
|
| @@ -0,0 +1,607 @@
|
| +# Copyright 2013 The LUCI Authors. All rights reserved.
|
| +# Use of this source code is governed by the Apache v2.0 license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +# This is a direct vendored copy of the original which is located at:
|
| +# https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py
|
| +# at commit 45eab10a17a55f1978f3bfa02c35457dee1c64e4.
|
| +
|
| +"""subprocess42 is the answer to life the universe and everything.
|
| +
|
| +It has the particularity of having a Popen implementation that can yield output
|
| +as it is produced while implementing a timeout and NOT requiring the use of
|
| +worker threads.
|
| +
|
| +Example:
|
| + Wait for a child process with a timeout, send SIGTERM, wait a grace period
|
| + then send SIGKILL:
|
| +
|
| + def wait_terminate_then_kill(proc, timeout, grace):
|
| + try:
|
| + return proc.wait(timeout)
|
| + except subprocess42.TimeoutExpired:
|
| + proc.terminate()
|
| + try:
|
| + return proc.wait(grace)
|
| + except subprocess42.TimeoutExpired:
|
| + proc.kill()
|
| + return proc.wait()
|
| +
|
| +
|
| +TODO(maruel): Add VOID support like subprocess2.
|
| +"""
|
| +
|
| +import contextlib
|
| +import errno
|
| +import os
|
| +import signal
|
| +import threading
|
| +import time
|
| +
|
| +import subprocess
|
| +
|
| +from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611
|
| +from subprocess import list2cmdline
|
| +
|
| +
|
| +# Default maxsize argument.
|
| +MAX_SIZE = 16384
|
| +
|
| +
|
| +if subprocess.mswindows:
|
| + import msvcrt # pylint: disable=F0401
|
| + from ctypes import wintypes
|
| + from ctypes import windll
|
| +
|
| +
|
| + # Which to be received depends on how this process was called and outside the
|
| + # control of this script. See Popen docstring for more details.
|
| + STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM)
|
| +
|
| +
|
| + def ReadFile(handle, desired_bytes):
|
| + """Calls kernel32.ReadFile()."""
|
| + c_read = wintypes.DWORD()
|
| + buff = wintypes.create_string_buffer(desired_bytes+1)
|
| + windll.kernel32.ReadFile(
|
| + handle, buff, desired_bytes, wintypes.byref(c_read), None)
|
| + # NULL terminate it.
|
| + buff[c_read.value] = '\x00'
|
| + return wintypes.GetLastError(), buff.value
|
| +
|
| + def PeekNamedPipe(handle):
|
| + """Calls kernel32.PeekNamedPipe(). Simplified version."""
|
| + c_avail = wintypes.DWORD()
|
| + c_message = wintypes.DWORD()
|
| + success = windll.kernel32.PeekNamedPipe(
|
| + handle, None, 0, None, wintypes.byref(c_avail),
|
| + wintypes.byref(c_message))
|
| + if not success:
|
| + raise OSError(wintypes.GetLastError())
|
| + return c_avail.value
|
| +
|
| + def recv_multi_impl(conns, maxsize, timeout):
|
| + """Reads from the first available pipe.
|
| +
|
| + It will immediately return on a closed connection, independent of timeout.
|
| +
|
| + Arguments:
|
| + - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
|
| + - timeout: If None, it is blocking. If 0 or above, will return None if no
|
| + data is available within |timeout| seconds.
|
| +
|
| + Returns:
|
| + tuple(int(index), str(data), bool(closed)).
|
| + """
|
| + assert conns
|
| + assert timeout is None or isinstance(timeout, (int, float)), timeout
|
| + maxsize = max(maxsize or MAX_SIZE, 1)
|
| +
|
| + # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
|
| + # for proc.stdout and proc.stderr but they are implemented as named pipes on
|
| + # Windows. Since named pipes are not waitable object, they can't be passed
|
| + # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
|
| + # and finally WFMO(). This requires caching the events handles in the Popen
|
| + # object and remembering the pending ReadFile() calls. This will require
|
| + # some re-architecture to store the relevant event handle and OVERLAPPEDIO
|
| + # object in Popen or the file object.
|
| + start = time.time()
|
| + handles = [
|
| + (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
|
| + ]
|
| + while True:
|
| + for index, handle in handles:
|
| + try:
|
| + avail = min(PeekNamedPipe(handle), maxsize)
|
| + if avail:
|
| + return index, ReadFile(handle, avail)[1], False
|
| + except OSError:
|
| + # The pipe closed.
|
| + return index, None, True
|
| +
|
| + if timeout is not None and (time.time() - start) >= timeout:
|
| + return None, None, False
|
| + # Polling rocks.
|
| + time.sleep(0.001)
|
| +
|
| +else:
|
| + import fcntl # pylint: disable=F0401
|
| + import select
|
| +
|
| +
|
| + # Signals that mean this process should exit quickly.
|
| + STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)
|
| +
|
| +
|
| + def recv_multi_impl(conns, maxsize, timeout):
|
| + """Reads from the first available pipe.
|
| +
|
| + It will immediately return on a closed connection, independent of timeout.
|
| +
|
| + Arguments:
|
| + - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
|
| + - timeout: If None, it is blocking. If 0 or above, will return None if no
|
| + data is available within |timeout| seconds.
|
| +
|
| + Returns:
|
| + tuple(int(index), str(data), bool(closed)).
|
| + """
|
| + assert conns
|
| + assert timeout is None or isinstance(timeout, (int, float)), timeout
|
| + maxsize = max(maxsize or MAX_SIZE, 1)
|
| +
|
| + # select(timeout=0) will block, it has to be a value > 0.
|
| + if timeout == 0:
|
| + timeout = 0.001
|
| + try:
|
| + r, _, _ = select.select(conns, [], [], timeout)
|
| + except select.error:
|
| + r = None
|
| + if not r:
|
| + return None, None, False
|
| +
|
| + conn = r[0]
|
| + # Temporarily make it non-blocking.
|
| + # TODO(maruel): This is not very efficient when the caller is doing this in
|
| + # a loop. Add a mechanism to have the caller handle this.
|
| + flags = fcntl.fcntl(conn, fcntl.F_GETFL)
|
| + if not conn.closed:
|
| + # pylint: disable=E1101
|
| + fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
| + try:
|
| + try:
|
| + data = conn.read(maxsize)
|
| + except IOError as e:
|
| + # On posix, this means the read would block.
|
| + if e.errno == errno.EAGAIN:
|
| + return conns.index(conn), None, False
|
| + raise e
|
| +
|
| + if not data:
|
| + # On posix, this means the channel closed.
|
| + return conns.index(conn), None, True
|
| +
|
| + return conns.index(conn), data, False
|
| + finally:
|
| + if not conn.closed:
|
| + fcntl.fcntl(conn, fcntl.F_SETFL, flags)
|
| +
|
| +
|
| +class TimeoutExpired(Exception):
|
| + """Compatible with python3 subprocess."""
|
| + def __init__(self, cmd, timeout, output=None, stderr=None):
|
| + self.cmd = cmd
|
| + self.timeout = timeout
|
| + self.output = output
|
| + # Non-standard:
|
| + self.stderr = stderr
|
| + super(TimeoutExpired, self).__init__(str(self))
|
| +
|
| + def __str__(self):
|
| + return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)
|
| +
|
| +
|
| +class Popen(subprocess.Popen):
|
| + """Adds timeout support on stdout and stderr.
|
| +
|
| + Inspired by
|
| + http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
|
| +
|
| + Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and
|
| + stderr once the child process closes them, after all the data is read.
|
| +
|
| + Arguments:
|
| + - detached: If True, the process is created in a new process group. On
|
| + Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0).
|
| +
|
| + Additional members:
|
| + - start: timestamp when this process started.
|
| + - end: timestamp when this process exited, as seen by this process.
|
| + - detached: If True, the child process was started as a detached process.
|
| + - gid: process group id, if any.
|
| + - duration: time in seconds the process lasted.
|
| +
|
| + Additional methods:
|
| + - yield_any(): yields output until the process terminates.
|
| + - recv_any(): reads from stdout and/or stderr with optional timeout.
|
| + - recv_out() & recv_err(): specialized version of recv_any().
|
| + """
|
| + # subprocess.Popen.__init__() is not threadsafe; there is a race between
|
| + # creating the exec-error pipe for the child and setting it to CLOEXEC during
|
| + # which another thread can fork and cause the pipe to be inherited by its
|
| + # descendents, which will cause the current Popen to hang until all those
|
| + # descendents exit. Protect this with a lock so that only one fork/exec can
|
| + # happen at a time.
|
| + popen_lock = threading.Lock()
|
| +
|
| + def __init__(self, args, **kwargs):
|
| + assert 'creationflags' not in kwargs
|
| + assert 'preexec_fn' not in kwargs, 'Use detached=True instead'
|
| + self.start = time.time()
|
| + self.end = None
|
| + self.gid = None
|
| + self.detached = kwargs.pop('detached', False)
|
| + if self.detached:
|
| + if subprocess.mswindows:
|
| + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
|
| + else:
|
| + kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)
|
| + with self.popen_lock:
|
| + super(Popen, self).__init__(args, **kwargs)
|
| + self.args = args
|
| + if self.detached and not subprocess.mswindows:
|
| + try:
|
| + self.gid = os.getpgid(self.pid)
|
| + except OSError:
|
| + # sometimes the process can run+finish before we collect its pgid. fun.
|
| + pass
|
| +
|
| + def duration(self):
|
| + """Duration of the child process.
|
| +
|
| + It is greater or equal to the actual time the child process ran. It can be
|
| + significantly higher than the real value if neither .wait() nor .poll() was
|
| + used.
|
| + """
|
| + return (self.end or time.time()) - self.start
|
| +
|
| + # pylint: disable=arguments-differ,redefined-builtin
|
| + def communicate(self, input=None, timeout=None):
|
| + """Implements python3's timeout support.
|
| +
|
| + Unlike wait(), timeout=0 is considered the same as None.
|
| +
|
| + Raises:
|
| + - TimeoutExpired when more than timeout seconds were spent waiting for the
|
| + process.
|
| + """
|
| + if not timeout:
|
| + return super(Popen, self).communicate(input=input)
|
| +
|
| + assert isinstance(timeout, (int, float)), timeout
|
| +
|
| + if self.stdin or self.stdout or self.stderr:
|
| + stdout = '' if self.stdout else None
|
| + stderr = '' if self.stderr else None
|
| + t = None
|
| + if input is not None:
|
| + assert self.stdin, (
|
| + 'Can\'t use communicate(input) if not using '
|
| + 'Popen(stdin=subprocess42.PIPE')
|
| + # TODO(maruel): Switch back to non-threading.
|
| + def write():
|
| + try:
|
| + self.stdin.write(input)
|
| + except IOError:
|
| + pass
|
| + t = threading.Thread(name='Popen.communicate', target=write)
|
| + t.daemon = True
|
| + t.start()
|
| +
|
| + try:
|
| + if self.stdout or self.stderr:
|
| + start = time.time()
|
| + end = start + timeout
|
| + def remaining():
|
| + return max(end - time.time(), 0)
|
| + for pipe, data in self.yield_any(timeout=remaining):
|
| + if pipe is None:
|
| + raise TimeoutExpired(self.args, timeout, stdout, stderr)
|
| + assert pipe in ('stdout', 'stderr'), pipe
|
| + if pipe == 'stdout':
|
| + stdout += data
|
| + else:
|
| + stderr += data
|
| + else:
|
| + # Only stdin is piped.
|
| + self.wait(timeout=timeout)
|
| + finally:
|
| + if t:
|
| + try:
|
| + self.stdin.close()
|
| + except IOError:
|
| + pass
|
| + t.join()
|
| + else:
|
| + # No pipe. The user wanted to use wait().
|
| + self.wait(timeout=timeout)
|
| + return None, None
|
| +
|
| + # Indirectly initialize self.end.
|
| + self.wait()
|
| + return stdout, stderr
|
| +
|
| + def wait(self, timeout=None): # pylint: disable=arguments-differ
|
| + """Implements python3's timeout support.
|
| +
|
| + Raises:
|
| + - TimeoutExpired when more than timeout seconds were spent waiting for the
|
| + process.
|
| + """
|
| + assert timeout is None or isinstance(timeout, (int, float)), timeout
|
| + if timeout is None:
|
| + super(Popen, self).wait()
|
| + elif self.returncode is None:
|
| + if subprocess.mswindows:
|
| + WAIT_TIMEOUT = 258
|
| + result = subprocess._subprocess.WaitForSingleObject(
|
| + self._handle, int(timeout * 1000))
|
| + if result == WAIT_TIMEOUT:
|
| + raise TimeoutExpired(self.args, timeout)
|
| + self.returncode = subprocess._subprocess.GetExitCodeProcess(
|
| + self._handle)
|
| + else:
|
| + # If you think the following code is horrible, it's because it is
|
| + # inspired by python3's stdlib.
|
| + end = time.time() + timeout
|
| + delay = 0.001
|
| + while True:
|
| + try:
|
| + pid, sts = subprocess._eintr_retry_call(
|
| + os.waitpid, self.pid, os.WNOHANG)
|
| + except OSError as e:
|
| + if e.errno != errno.ECHILD:
|
| + raise
|
| + pid = self.pid
|
| + sts = 0
|
| + if pid == self.pid:
|
| + # This sets self.returncode.
|
| + self._handle_exitstatus(sts)
|
| + break
|
| + remaining = end - time.time()
|
| + if remaining <= 0:
|
| + raise TimeoutExpired(self.args, timeout)
|
| + delay = min(delay * 2, remaining, .05)
|
| + time.sleep(delay)
|
| +
|
| + if not self.end:
|
| + # communicate() uses wait() internally.
|
| + self.end = time.time()
|
| + return self.returncode
|
| +
|
| + def poll(self):
|
| + ret = super(Popen, self).poll()
|
| + if ret is not None and not self.end:
|
| + self.end = time.time()
|
| + return ret
|
| +
|
| + def yield_any(self, maxsize=None, timeout=None):
|
| + """Yields output until the process terminates.
|
| +
|
| + Unlike wait(), does not raise TimeoutExpired.
|
| +
|
| + Yields:
|
| + (pipename, data) where pipename is either 'stdout', 'stderr' or None in
|
| + case of timeout or when the child process closed one of the pipe(s) and
|
| + all pending data on the pipe was read.
|
| +
|
| + Arguments:
|
| + - maxsize: See recv_any(). Can be a callable function.
|
| + - timeout: If None, the call is blocking. If set, yields None, None if no
|
| + data is available within |timeout| seconds. It resets itself after
|
| + each yield. Can be a callable function.
|
| + """
|
| + assert self.stdout or self.stderr
|
| + if timeout is not None:
|
| + # timeout=0 effectively means that the pipe is continuously polled.
|
| + if isinstance(timeout, (int, float)):
|
| + assert timeout >= 0, timeout
|
| + old_timeout = timeout
|
| + timeout = lambda: old_timeout
|
| + else:
|
| + assert callable(timeout), timeout
|
| +
|
| + if maxsize is not None and not callable(maxsize):
|
| + assert isinstance(maxsize, (int, float)), maxsize
|
| +
|
| + last_yield = time.time()
|
| + while self.poll() is None:
|
| + to = (None if timeout is None
|
| + else max(timeout() - (time.time() - last_yield), 0))
|
| + t, data = self.recv_any(
|
| + maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to)
|
| + if data or to is 0:
|
| + yield t, data
|
| + last_yield = time.time()
|
| +
|
| + # Read all remaining output in the pipes.
|
| + # There is 3 cases:
|
| + # - pipes get closed automatically by the calling process before it exits
|
| + # - pipes are closed automated by the OS
|
| + # - pipes are kept open due to grand-children processes outliving the
|
| + # children process.
|
| + while True:
|
| + ms = maxsize
|
| + if callable(maxsize):
|
| + ms = maxsize()
|
| + # timeout=0 is mainly to handle the case where a grand-children process
|
| + # outlives the process started.
|
| + t, data = self.recv_any(maxsize=ms, timeout=0)
|
| + if not data:
|
| + break
|
| + yield t, data
|
| +
|
| + def recv_any(self, maxsize=None, timeout=None):
|
| + """Reads from the first pipe available from stdout and stderr.
|
| +
|
| + Unlike wait(), it does not throw TimeoutExpired.
|
| +
|
| + Arguments:
|
| + - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
|
| + - timeout: If None, it is blocking. If 0 or above, will return None if no
|
| + data is available within |timeout| seconds.
|
| +
|
| + Returns:
|
| + tuple(pipename or None, str(data)). pipename is one of 'stdout' or
|
| + 'stderr'.
|
| + """
|
| + # recv_multi_impl will early exit on a closed connection. Loop accordingly
|
| + # to simplify call sites.
|
| + while True:
|
| + pipes = [
|
| + x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
|
| + ]
|
| + # If both stdout and stderr have the exact file handle, they are
|
| + # effectively the same pipe. Deduplicate it since otherwise it confuses
|
| + # recv_multi_impl().
|
| + if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
|
| + pipes.pop(0)
|
| +
|
| + if not pipes:
|
| + return None, None
|
| + start = time.time()
|
| + conns, names = zip(*pipes)
|
| + index, data, closed = recv_multi_impl(conns, maxsize, timeout)
|
| + if index is None:
|
| + return index, data
|
| + if closed:
|
| + self._close(names[index])
|
| + if not data:
|
| + # Loop again. The other pipe may still be open.
|
| + if timeout:
|
| + timeout -= (time.time() - start)
|
| + continue
|
| +
|
| + if self.universal_newlines and data:
|
| + data = self._translate_newlines(data)
|
| + return names[index], data
|
| +
|
| + def recv_out(self, maxsize=None, timeout=None):
|
| + """Reads from stdout synchronously with timeout."""
|
| + return self._recv('stdout', maxsize, timeout)
|
| +
|
| + def recv_err(self, maxsize=None, timeout=None):
|
| + """Reads from stderr synchronously with timeout."""
|
| + return self._recv('stderr', maxsize, timeout)
|
| +
|
| + def terminate(self):
|
| + """Tries to do something saner on Windows that the stdlib.
|
| +
|
| + Windows:
|
| + self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
|
| + - If set, only SIGBREAK can be sent and it is sent to a single process.
|
| + - If not set, in theory only SIGINT can be used and *all processes* in
|
| + the processgroup receive it. In practice, we just kill the process.
|
| + See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
|
| + The default on Windows is to call TerminateProcess() always, which is not
|
| + useful.
|
| +
|
| + On Posix, always send SIGTERM.
|
| + """
|
| + try:
|
| + if subprocess.mswindows and self.detached:
|
| + return self.send_signal(signal.CTRL_BREAK_EVENT)
|
| + super(Popen, self).terminate()
|
| + except OSError:
|
| + # The function will throw if the process terminated in-between. Swallow
|
| + # this.
|
| + pass
|
| +
|
| + def kill(self):
|
| + """Kills the process and its children if possible.
|
| +
|
| + Swallows exceptions and return True on success.
|
| + """
|
| + if self.gid:
|
| + try:
|
| + os.killpg(self.gid, signal.SIGKILL)
|
| + except OSError:
|
| + return False
|
| + else:
|
| + try:
|
| + super(Popen, self).kill()
|
| + except OSError:
|
| + return False
|
| + return True
|
| +
|
| + def _close(self, which):
|
| + """Closes either stdout or stderr."""
|
| + getattr(self, which).close()
|
| + setattr(self, which, None)
|
| +
|
| + def _recv(self, which, maxsize, timeout):
|
| + """Reads from one of stdout or stderr synchronously with timeout."""
|
| + conn = getattr(self, which)
|
| + if conn is None:
|
| + return None
|
| + _, data, closed = recv_multi_impl([conn], maxsize, timeout)
|
| + if closed:
|
| + self._close(which)
|
| + if self.universal_newlines and data:
|
| + data = self._translate_newlines(data)
|
| + return data
|
| +
|
| +
|
| +@contextlib.contextmanager
|
| +def set_signal_handler(signals, handler):
|
| + """Temporarilly override signals handler.
|
| +
|
| + Useful when waiting for a child process to handle signals like SIGTERM, so the
|
| + signal can be propagated to the child process.
|
| + """
|
| + previous = {s: signal.signal(s, handler) for s in signals}
|
| + try:
|
| + yield
|
| + finally:
|
| + for sig, h in previous.iteritems():
|
| + signal.signal(sig, h)
|
| +
|
| +
|
| +def call(*args, **kwargs):
|
| + """Adds support for timeout."""
|
| + timeout = kwargs.pop('timeout', None)
|
| + return Popen(*args, **kwargs).wait(timeout)
|
| +
|
| +
|
| +def check_call(*args, **kwargs):
|
| + """Adds support for timeout."""
|
| + retcode = call(*args, **kwargs)
|
| + if retcode:
|
| + raise CalledProcessError(retcode, kwargs.get('args') or args[0])
|
| + return 0
|
| +
|
| +
|
| +def check_output(*args, **kwargs):
|
| + """Adds support for timeout."""
|
| + timeout = kwargs.pop('timeout', None)
|
| + if 'stdout' in kwargs:
|
| + raise ValueError('stdout argument not allowed, it will be overridden.')
|
| + process = Popen(stdout=PIPE, *args, **kwargs)
|
| + output, _ = process.communicate(timeout=timeout)
|
| + retcode = process.poll()
|
| + if retcode:
|
| + raise CalledProcessError(retcode, kwargs.get('args') or args[0], output)
|
| + return output
|
| +
|
| +
|
| +def call_with_timeout(args, timeout, **kwargs):
|
| + """Runs an executable; kill it in case of timeout."""
|
| + proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
|
| + try:
|
| + out, err = proc.communicate(timeout=timeout)
|
| + except TimeoutExpired as e:
|
| + out = e.output
|
| + err = e.stderr
|
| + proc.kill()
|
| + proc.wait()
|
| + return out, err, proc.returncode, proc.duration()
|
|
|