Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(121)

Unified Diff: subprocess42.py

Issue 2295043003: Revert of It's time to bring subprocess42 to depot_tools. (Closed)
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: subprocess42.py
diff --git a/subprocess42.py b/subprocess42.py
deleted file mode 100644
index ba2769636eea52be60582a5e44a43b950a4b83c9..0000000000000000000000000000000000000000
--- a/subprocess42.py
+++ /dev/null
@@ -1,607 +0,0 @@
-# Copyright 2013 The LUCI Authors. All rights reserved.
-# Use of this source code is governed by the Apache v2.0 license that can be
-# found in the LICENSE file.
-
-# This is a direct vendored copy of the original which is located at:
-# https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py
-# at commit 45eab10a17a55f1978f3bfa02c35457dee1c64e4.
-
-"""subprocess42 is the answer to life the universe and everything.
-
-It has the particularity of having a Popen implementation that can yield output
-as it is produced while implementing a timeout and NOT requiring the use of
-worker threads.
-
-Example:
- Wait for a child process with a timeout, send SIGTERM, wait a grace period
- then send SIGKILL:
-
- def wait_terminate_then_kill(proc, timeout, grace):
- try:
- return proc.wait(timeout)
- except subprocess42.TimeoutExpired:
- proc.terminate()
- try:
- return proc.wait(grace)
- except subprocess42.TimeoutExpired:
- proc.kill()
- return proc.wait()
-
-
-TODO(maruel): Add VOID support like subprocess2.
-"""
-
-import contextlib
-import errno
-import os
-import signal
-import threading
-import time
-
-import subprocess
-
-from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611
-from subprocess import list2cmdline
-
-
-# Default maxsize argument.
-MAX_SIZE = 16384
-
-
-if subprocess.mswindows:
- import msvcrt # pylint: disable=F0401
- from ctypes import wintypes
- from ctypes import windll
-
-
- # Which to be received depends on how this process was called and outside the
- # control of this script. See Popen docstring for more details.
- STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM)
-
-
- def ReadFile(handle, desired_bytes):
- """Calls kernel32.ReadFile()."""
- c_read = wintypes.DWORD()
- buff = wintypes.create_string_buffer(desired_bytes+1)
- windll.kernel32.ReadFile(
- handle, buff, desired_bytes, wintypes.byref(c_read), None)
- # NULL terminate it.
- buff[c_read.value] = '\x00'
- return wintypes.GetLastError(), buff.value
-
- def PeekNamedPipe(handle):
- """Calls kernel32.PeekNamedPipe(). Simplified version."""
- c_avail = wintypes.DWORD()
- c_message = wintypes.DWORD()
- success = windll.kernel32.PeekNamedPipe(
- handle, None, 0, None, wintypes.byref(c_avail),
- wintypes.byref(c_message))
- if not success:
- raise OSError(wintypes.GetLastError())
- return c_avail.value
-
- def recv_multi_impl(conns, maxsize, timeout):
- """Reads from the first available pipe.
-
- It will immediately return on a closed connection, independent of timeout.
-
- Arguments:
- - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
- - timeout: If None, it is blocking. If 0 or above, will return None if no
- data is available within |timeout| seconds.
-
- Returns:
- tuple(int(index), str(data), bool(closed)).
- """
- assert conns
- assert timeout is None or isinstance(timeout, (int, float)), timeout
- maxsize = max(maxsize or MAX_SIZE, 1)
-
- # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
- # for proc.stdout and proc.stderr but they are implemented as named pipes on
- # Windows. Since named pipes are not waitable object, they can't be passed
- # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
- # and finally WFMO(). This requires caching the events handles in the Popen
- # object and remembering the pending ReadFile() calls. This will require
- # some re-architecture to store the relevant event handle and OVERLAPPEDIO
- # object in Popen or the file object.
- start = time.time()
- handles = [
- (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
- ]
- while True:
- for index, handle in handles:
- try:
- avail = min(PeekNamedPipe(handle), maxsize)
- if avail:
- return index, ReadFile(handle, avail)[1], False
- except OSError:
- # The pipe closed.
- return index, None, True
-
- if timeout is not None and (time.time() - start) >= timeout:
- return None, None, False
- # Polling rocks.
- time.sleep(0.001)
-
-else:
- import fcntl # pylint: disable=F0401
- import select
-
-
- # Signals that mean this process should exit quickly.
- STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)
-
-
- def recv_multi_impl(conns, maxsize, timeout):
- """Reads from the first available pipe.
-
- It will immediately return on a closed connection, independent of timeout.
-
- Arguments:
- - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
- - timeout: If None, it is blocking. If 0 or above, will return None if no
- data is available within |timeout| seconds.
-
- Returns:
- tuple(int(index), str(data), bool(closed)).
- """
- assert conns
- assert timeout is None or isinstance(timeout, (int, float)), timeout
- maxsize = max(maxsize or MAX_SIZE, 1)
-
- # select(timeout=0) will block, it has to be a value > 0.
- if timeout == 0:
- timeout = 0.001
- try:
- r, _, _ = select.select(conns, [], [], timeout)
- except select.error:
- r = None
- if not r:
- return None, None, False
-
- conn = r[0]
- # Temporarily make it non-blocking.
- # TODO(maruel): This is not very efficient when the caller is doing this in
- # a loop. Add a mechanism to have the caller handle this.
- flags = fcntl.fcntl(conn, fcntl.F_GETFL)
- if not conn.closed:
- # pylint: disable=E1101
- fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
- try:
- try:
- data = conn.read(maxsize)
- except IOError as e:
- # On posix, this means the read would block.
- if e.errno == errno.EAGAIN:
- return conns.index(conn), None, False
- raise e
-
- if not data:
- # On posix, this means the channel closed.
- return conns.index(conn), None, True
-
- return conns.index(conn), data, False
- finally:
- if not conn.closed:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags)
-
-
-class TimeoutExpired(Exception):
- """Compatible with python3 subprocess."""
- def __init__(self, cmd, timeout, output=None, stderr=None):
- self.cmd = cmd
- self.timeout = timeout
- self.output = output
- # Non-standard:
- self.stderr = stderr
- super(TimeoutExpired, self).__init__(str(self))
-
- def __str__(self):
- return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)
-
-
-class Popen(subprocess.Popen):
- """Adds timeout support on stdout and stderr.
-
- Inspired by
- http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
-
- Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and
- stderr once the child process closes them, after all the data is read.
-
- Arguments:
- - detached: If True, the process is created in a new process group. On
- Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0).
-
- Additional members:
- - start: timestamp when this process started.
- - end: timestamp when this process exited, as seen by this process.
- - detached: If True, the child process was started as a detached process.
- - gid: process group id, if any.
- - duration: time in seconds the process lasted.
-
- Additional methods:
- - yield_any(): yields output until the process terminates.
- - recv_any(): reads from stdout and/or stderr with optional timeout.
- - recv_out() & recv_err(): specialized version of recv_any().
- """
- # subprocess.Popen.__init__() is not threadsafe; there is a race between
- # creating the exec-error pipe for the child and setting it to CLOEXEC during
- # which another thread can fork and cause the pipe to be inherited by its
- # descendents, which will cause the current Popen to hang until all those
- # descendents exit. Protect this with a lock so that only one fork/exec can
- # happen at a time.
- popen_lock = threading.Lock()
-
- def __init__(self, args, **kwargs):
- assert 'creationflags' not in kwargs
- assert 'preexec_fn' not in kwargs, 'Use detached=True instead'
- self.start = time.time()
- self.end = None
- self.gid = None
- self.detached = kwargs.pop('detached', False)
- if self.detached:
- if subprocess.mswindows:
- kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
- else:
- kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)
- with self.popen_lock:
- super(Popen, self).__init__(args, **kwargs)
- self.args = args
- if self.detached and not subprocess.mswindows:
- try:
- self.gid = os.getpgid(self.pid)
- except OSError:
- # sometimes the process can run+finish before we collect its pgid. fun.
- pass
-
- def duration(self):
- """Duration of the child process.
-
- It is greater or equal to the actual time the child process ran. It can be
- significantly higher than the real value if neither .wait() nor .poll() was
- used.
- """
- return (self.end or time.time()) - self.start
-
- # pylint: disable=arguments-differ,redefined-builtin
- def communicate(self, input=None, timeout=None):
- """Implements python3's timeout support.
-
- Unlike wait(), timeout=0 is considered the same as None.
-
- Raises:
- - TimeoutExpired when more than timeout seconds were spent waiting for the
- process.
- """
- if not timeout:
- return super(Popen, self).communicate(input=input)
-
- assert isinstance(timeout, (int, float)), timeout
-
- if self.stdin or self.stdout or self.stderr:
- stdout = '' if self.stdout else None
- stderr = '' if self.stderr else None
- t = None
- if input is not None:
- assert self.stdin, (
- 'Can\'t use communicate(input) if not using '
- 'Popen(stdin=subprocess42.PIPE')
- # TODO(maruel): Switch back to non-threading.
- def write():
- try:
- self.stdin.write(input)
- except IOError:
- pass
- t = threading.Thread(name='Popen.communicate', target=write)
- t.daemon = True
- t.start()
-
- try:
- if self.stdout or self.stderr:
- start = time.time()
- end = start + timeout
- def remaining():
- return max(end - time.time(), 0)
- for pipe, data in self.yield_any(timeout=remaining):
- if pipe is None:
- raise TimeoutExpired(self.args, timeout, stdout, stderr)
- assert pipe in ('stdout', 'stderr'), pipe
- if pipe == 'stdout':
- stdout += data
- else:
- stderr += data
- else:
- # Only stdin is piped.
- self.wait(timeout=timeout)
- finally:
- if t:
- try:
- self.stdin.close()
- except IOError:
- pass
- t.join()
- else:
- # No pipe. The user wanted to use wait().
- self.wait(timeout=timeout)
- return None, None
-
- # Indirectly initialize self.end.
- self.wait()
- return stdout, stderr
-
- def wait(self, timeout=None): # pylint: disable=arguments-differ
- """Implements python3's timeout support.
-
- Raises:
- - TimeoutExpired when more than timeout seconds were spent waiting for the
- process.
- """
- assert timeout is None or isinstance(timeout, (int, float)), timeout
- if timeout is None:
- super(Popen, self).wait()
- elif self.returncode is None:
- if subprocess.mswindows:
- WAIT_TIMEOUT = 258
- result = subprocess._subprocess.WaitForSingleObject(
- self._handle, int(timeout * 1000))
- if result == WAIT_TIMEOUT:
- raise TimeoutExpired(self.args, timeout)
- self.returncode = subprocess._subprocess.GetExitCodeProcess(
- self._handle)
- else:
- # If you think the following code is horrible, it's because it is
- # inspired by python3's stdlib.
- end = time.time() + timeout
- delay = 0.001
- while True:
- try:
- pid, sts = subprocess._eintr_retry_call(
- os.waitpid, self.pid, os.WNOHANG)
- except OSError as e:
- if e.errno != errno.ECHILD:
- raise
- pid = self.pid
- sts = 0
- if pid == self.pid:
- # This sets self.returncode.
- self._handle_exitstatus(sts)
- break
- remaining = end - time.time()
- if remaining <= 0:
- raise TimeoutExpired(self.args, timeout)
- delay = min(delay * 2, remaining, .05)
- time.sleep(delay)
-
- if not self.end:
- # communicate() uses wait() internally.
- self.end = time.time()
- return self.returncode
-
- def poll(self):
- ret = super(Popen, self).poll()
- if ret is not None and not self.end:
- self.end = time.time()
- return ret
-
- def yield_any(self, maxsize=None, timeout=None):
- """Yields output until the process terminates.
-
- Unlike wait(), does not raise TimeoutExpired.
-
- Yields:
- (pipename, data) where pipename is either 'stdout', 'stderr' or None in
- case of timeout or when the child process closed one of the pipe(s) and
- all pending data on the pipe was read.
-
- Arguments:
- - maxsize: See recv_any(). Can be a callable function.
- - timeout: If None, the call is blocking. If set, yields None, None if no
- data is available within |timeout| seconds. It resets itself after
- each yield. Can be a callable function.
- """
- assert self.stdout or self.stderr
- if timeout is not None:
- # timeout=0 effectively means that the pipe is continuously polled.
- if isinstance(timeout, (int, float)):
- assert timeout >= 0, timeout
- old_timeout = timeout
- timeout = lambda: old_timeout
- else:
- assert callable(timeout), timeout
-
- if maxsize is not None and not callable(maxsize):
- assert isinstance(maxsize, (int, float)), maxsize
-
- last_yield = time.time()
- while self.poll() is None:
- to = (None if timeout is None
- else max(timeout() - (time.time() - last_yield), 0))
- t, data = self.recv_any(
- maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to)
- if data or to is 0:
- yield t, data
- last_yield = time.time()
-
- # Read all remaining output in the pipes.
- # There is 3 cases:
- # - pipes get closed automatically by the calling process before it exits
- # - pipes are closed automated by the OS
- # - pipes are kept open due to grand-children processes outliving the
- # children process.
- while True:
- ms = maxsize
- if callable(maxsize):
- ms = maxsize()
- # timeout=0 is mainly to handle the case where a grand-children process
- # outlives the process started.
- t, data = self.recv_any(maxsize=ms, timeout=0)
- if not data:
- break
- yield t, data
-
- def recv_any(self, maxsize=None, timeout=None):
- """Reads from the first pipe available from stdout and stderr.
-
- Unlike wait(), it does not throw TimeoutExpired.
-
- Arguments:
- - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
- - timeout: If None, it is blocking. If 0 or above, will return None if no
- data is available within |timeout| seconds.
-
- Returns:
- tuple(pipename or None, str(data)). pipename is one of 'stdout' or
- 'stderr'.
- """
- # recv_multi_impl will early exit on a closed connection. Loop accordingly
- # to simplify call sites.
- while True:
- pipes = [
- x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
- ]
- # If both stdout and stderr have the exact file handle, they are
- # effectively the same pipe. Deduplicate it since otherwise it confuses
- # recv_multi_impl().
- if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
- pipes.pop(0)
-
- if not pipes:
- return None, None
- start = time.time()
- conns, names = zip(*pipes)
- index, data, closed = recv_multi_impl(conns, maxsize, timeout)
- if index is None:
- return index, data
- if closed:
- self._close(names[index])
- if not data:
- # Loop again. The other pipe may still be open.
- if timeout:
- timeout -= (time.time() - start)
- continue
-
- if self.universal_newlines and data:
- data = self._translate_newlines(data)
- return names[index], data
-
- def recv_out(self, maxsize=None, timeout=None):
- """Reads from stdout synchronously with timeout."""
- return self._recv('stdout', maxsize, timeout)
-
- def recv_err(self, maxsize=None, timeout=None):
- """Reads from stderr synchronously with timeout."""
- return self._recv('stderr', maxsize, timeout)
-
- def terminate(self):
- """Tries to do something saner on Windows that the stdlib.
-
- Windows:
- self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
- - If set, only SIGBREAK can be sent and it is sent to a single process.
- - If not set, in theory only SIGINT can be used and *all processes* in
- the processgroup receive it. In practice, we just kill the process.
- See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
- The default on Windows is to call TerminateProcess() always, which is not
- useful.
-
- On Posix, always send SIGTERM.
- """
- try:
- if subprocess.mswindows and self.detached:
- return self.send_signal(signal.CTRL_BREAK_EVENT)
- super(Popen, self).terminate()
- except OSError:
- # The function will throw if the process terminated in-between. Swallow
- # this.
- pass
-
- def kill(self):
- """Kills the process and its children if possible.
-
- Swallows exceptions and return True on success.
- """
- if self.gid:
- try:
- os.killpg(self.gid, signal.SIGKILL)
- except OSError:
- return False
- else:
- try:
- super(Popen, self).kill()
- except OSError:
- return False
- return True
-
- def _close(self, which):
- """Closes either stdout or stderr."""
- getattr(self, which).close()
- setattr(self, which, None)
-
- def _recv(self, which, maxsize, timeout):
- """Reads from one of stdout or stderr synchronously with timeout."""
- conn = getattr(self, which)
- if conn is None:
- return None
- _, data, closed = recv_multi_impl([conn], maxsize, timeout)
- if closed:
- self._close(which)
- if self.universal_newlines and data:
- data = self._translate_newlines(data)
- return data
-
-
-@contextlib.contextmanager
-def set_signal_handler(signals, handler):
- """Temporarilly override signals handler.
-
- Useful when waiting for a child process to handle signals like SIGTERM, so the
- signal can be propagated to the child process.
- """
- previous = {s: signal.signal(s, handler) for s in signals}
- try:
- yield
- finally:
- for sig, h in previous.iteritems():
- signal.signal(sig, h)
-
-
-def call(*args, **kwargs):
- """Adds support for timeout."""
- timeout = kwargs.pop('timeout', None)
- return Popen(*args, **kwargs).wait(timeout)
-
-
-def check_call(*args, **kwargs):
- """Adds support for timeout."""
- retcode = call(*args, **kwargs)
- if retcode:
- raise CalledProcessError(retcode, kwargs.get('args') or args[0])
- return 0
-
-
-def check_output(*args, **kwargs):
- """Adds support for timeout."""
- timeout = kwargs.pop('timeout', None)
- if 'stdout' in kwargs:
- raise ValueError('stdout argument not allowed, it will be overridden.')
- process = Popen(stdout=PIPE, *args, **kwargs)
- output, _ = process.communicate(timeout=timeout)
- retcode = process.poll()
- if retcode:
- raise CalledProcessError(retcode, kwargs.get('args') or args[0], output)
- return output
-
-
-def call_with_timeout(args, timeout, **kwargs):
- """Runs an executable; kill it in case of timeout."""
- proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
- try:
- out, err = proc.communicate(timeout=timeout)
- except TimeoutExpired as e:
- out = e.output
- err = e.stderr
- proc.kill()
- proc.wait()
- return out, err, proc.returncode, proc.duration()
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698