Index: subprocess42.py |
diff --git a/subprocess42.py b/subprocess42.py |
deleted file mode 100644 |
index ba2769636eea52be60582a5e44a43b950a4b83c9..0000000000000000000000000000000000000000 |
--- a/subprocess42.py |
+++ /dev/null |
@@ -1,607 +0,0 @@ |
-# Copyright 2013 The LUCI Authors. All rights reserved. |
-# Use of this source code is governed by the Apache v2.0 license that can be |
-# found in the LICENSE file. |
- |
-# This is a direct vendored copy of the original which is located at: |
-# https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py |
-# at commit 45eab10a17a55f1978f3bfa02c35457dee1c64e4. |
- |
-"""subprocess42 is the answer to life the universe and everything. |
- |
-It has the particularity of having a Popen implementation that can yield output |
-as it is produced while implementing a timeout and NOT requiring the use of |
-worker threads. |
- |
-Example: |
- Wait for a child process with a timeout, send SIGTERM, wait a grace period |
- then send SIGKILL: |
- |
- def wait_terminate_then_kill(proc, timeout, grace): |
- try: |
- return proc.wait(timeout) |
- except subprocess42.TimeoutExpired: |
- proc.terminate() |
- try: |
- return proc.wait(grace) |
- except subprocess42.TimeoutExpired: |
- proc.kill() |
- return proc.wait() |
- |
- |
-TODO(maruel): Add VOID support like subprocess2. |
-""" |
- |
-import contextlib |
-import errno |
-import os |
-import signal |
-import threading |
-import time |
- |
-import subprocess |
- |
-from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611 |
-from subprocess import list2cmdline |
- |
- |
-# Default maxsize argument. |
-MAX_SIZE = 16384 |
- |
- |
-if subprocess.mswindows: |
- import msvcrt # pylint: disable=F0401 |
- from ctypes import wintypes |
- from ctypes import windll |
- |
- |
- # Which to be received depends on how this process was called and outside the |
- # control of this script. See Popen docstring for more details. |
- STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM) |
- |
- |
- def ReadFile(handle, desired_bytes): |
- """Calls kernel32.ReadFile().""" |
- c_read = wintypes.DWORD() |
- buff = wintypes.create_string_buffer(desired_bytes+1) |
- windll.kernel32.ReadFile( |
- handle, buff, desired_bytes, wintypes.byref(c_read), None) |
- # NULL terminate it. |
- buff[c_read.value] = '\x00' |
- return wintypes.GetLastError(), buff.value |
- |
- def PeekNamedPipe(handle): |
- """Calls kernel32.PeekNamedPipe(). Simplified version.""" |
- c_avail = wintypes.DWORD() |
- c_message = wintypes.DWORD() |
- success = windll.kernel32.PeekNamedPipe( |
- handle, None, 0, None, wintypes.byref(c_avail), |
- wintypes.byref(c_message)) |
- if not success: |
- raise OSError(wintypes.GetLastError()) |
- return c_avail.value |
- |
- def recv_multi_impl(conns, maxsize, timeout): |
- """Reads from the first available pipe. |
- |
- It will immediately return on a closed connection, independent of timeout. |
- |
- Arguments: |
- - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. |
- - timeout: If None, it is blocking. If 0 or above, will return None if no |
- data is available within |timeout| seconds. |
- |
- Returns: |
- tuple(int(index), str(data), bool(closed)). |
- """ |
- assert conns |
- assert timeout is None or isinstance(timeout, (int, float)), timeout |
- maxsize = max(maxsize or MAX_SIZE, 1) |
- |
- # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes |
- # for proc.stdout and proc.stderr but they are implemented as named pipes on |
- # Windows. Since named pipes are not waitable object, they can't be passed |
- # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile() |
- # and finally WFMO(). This requires caching the events handles in the Popen |
- # object and remembering the pending ReadFile() calls. This will require |
- # some re-architecture to store the relevant event handle and OVERLAPPEDIO |
- # object in Popen or the file object. |
- start = time.time() |
- handles = [ |
- (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns) |
- ] |
- while True: |
- for index, handle in handles: |
- try: |
- avail = min(PeekNamedPipe(handle), maxsize) |
- if avail: |
- return index, ReadFile(handle, avail)[1], False |
- except OSError: |
- # The pipe closed. |
- return index, None, True |
- |
- if timeout is not None and (time.time() - start) >= timeout: |
- return None, None, False |
- # Polling rocks. |
- time.sleep(0.001) |
- |
-else: |
- import fcntl # pylint: disable=F0401 |
- import select |
- |
- |
- # Signals that mean this process should exit quickly. |
- STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM) |
- |
- |
- def recv_multi_impl(conns, maxsize, timeout): |
- """Reads from the first available pipe. |
- |
- It will immediately return on a closed connection, independent of timeout. |
- |
- Arguments: |
- - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. |
- - timeout: If None, it is blocking. If 0 or above, will return None if no |
- data is available within |timeout| seconds. |
- |
- Returns: |
- tuple(int(index), str(data), bool(closed)). |
- """ |
- assert conns |
- assert timeout is None or isinstance(timeout, (int, float)), timeout |
- maxsize = max(maxsize or MAX_SIZE, 1) |
- |
- # select(timeout=0) will block, it has to be a value > 0. |
- if timeout == 0: |
- timeout = 0.001 |
- try: |
- r, _, _ = select.select(conns, [], [], timeout) |
- except select.error: |
- r = None |
- if not r: |
- return None, None, False |
- |
- conn = r[0] |
- # Temporarily make it non-blocking. |
- # TODO(maruel): This is not very efficient when the caller is doing this in |
- # a loop. Add a mechanism to have the caller handle this. |
- flags = fcntl.fcntl(conn, fcntl.F_GETFL) |
- if not conn.closed: |
- # pylint: disable=E1101 |
- fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK) |
- try: |
- try: |
- data = conn.read(maxsize) |
- except IOError as e: |
- # On posix, this means the read would block. |
- if e.errno == errno.EAGAIN: |
- return conns.index(conn), None, False |
- raise e |
- |
- if not data: |
- # On posix, this means the channel closed. |
- return conns.index(conn), None, True |
- |
- return conns.index(conn), data, False |
- finally: |
- if not conn.closed: |
- fcntl.fcntl(conn, fcntl.F_SETFL, flags) |
- |
- |
-class TimeoutExpired(Exception): |
- """Compatible with python3 subprocess.""" |
- def __init__(self, cmd, timeout, output=None, stderr=None): |
- self.cmd = cmd |
- self.timeout = timeout |
- self.output = output |
- # Non-standard: |
- self.stderr = stderr |
- super(TimeoutExpired, self).__init__(str(self)) |
- |
- def __str__(self): |
- return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout) |
- |
- |
-class Popen(subprocess.Popen): |
- """Adds timeout support on stdout and stderr. |
- |
- Inspired by |
- http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/ |
- |
- Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and |
- stderr once the child process closes them, after all the data is read. |
- |
- Arguments: |
- - detached: If True, the process is created in a new process group. On |
- Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0). |
- |
- Additional members: |
- - start: timestamp when this process started. |
- - end: timestamp when this process exited, as seen by this process. |
- - detached: If True, the child process was started as a detached process. |
- - gid: process group id, if any. |
- - duration: time in seconds the process lasted. |
- |
- Additional methods: |
- - yield_any(): yields output until the process terminates. |
- - recv_any(): reads from stdout and/or stderr with optional timeout. |
- - recv_out() & recv_err(): specialized version of recv_any(). |
- """ |
- # subprocess.Popen.__init__() is not threadsafe; there is a race between |
- # creating the exec-error pipe for the child and setting it to CLOEXEC during |
- # which another thread can fork and cause the pipe to be inherited by its |
- # descendents, which will cause the current Popen to hang until all those |
- # descendents exit. Protect this with a lock so that only one fork/exec can |
- # happen at a time. |
- popen_lock = threading.Lock() |
- |
- def __init__(self, args, **kwargs): |
- assert 'creationflags' not in kwargs |
- assert 'preexec_fn' not in kwargs, 'Use detached=True instead' |
- self.start = time.time() |
- self.end = None |
- self.gid = None |
- self.detached = kwargs.pop('detached', False) |
- if self.detached: |
- if subprocess.mswindows: |
- kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP |
- else: |
- kwargs['preexec_fn'] = lambda: os.setpgid(0, 0) |
- with self.popen_lock: |
- super(Popen, self).__init__(args, **kwargs) |
- self.args = args |
- if self.detached and not subprocess.mswindows: |
- try: |
- self.gid = os.getpgid(self.pid) |
- except OSError: |
- # sometimes the process can run+finish before we collect its pgid. fun. |
- pass |
- |
- def duration(self): |
- """Duration of the child process. |
- |
- It is greater or equal to the actual time the child process ran. It can be |
- significantly higher than the real value if neither .wait() nor .poll() was |
- used. |
- """ |
- return (self.end or time.time()) - self.start |
- |
- # pylint: disable=arguments-differ,redefined-builtin |
- def communicate(self, input=None, timeout=None): |
- """Implements python3's timeout support. |
- |
- Unlike wait(), timeout=0 is considered the same as None. |
- |
- Raises: |
- - TimeoutExpired when more than timeout seconds were spent waiting for the |
- process. |
- """ |
- if not timeout: |
- return super(Popen, self).communicate(input=input) |
- |
- assert isinstance(timeout, (int, float)), timeout |
- |
- if self.stdin or self.stdout or self.stderr: |
- stdout = '' if self.stdout else None |
- stderr = '' if self.stderr else None |
- t = None |
- if input is not None: |
- assert self.stdin, ( |
- 'Can\'t use communicate(input) if not using ' |
- 'Popen(stdin=subprocess42.PIPE') |
- # TODO(maruel): Switch back to non-threading. |
- def write(): |
- try: |
- self.stdin.write(input) |
- except IOError: |
- pass |
- t = threading.Thread(name='Popen.communicate', target=write) |
- t.daemon = True |
- t.start() |
- |
- try: |
- if self.stdout or self.stderr: |
- start = time.time() |
- end = start + timeout |
- def remaining(): |
- return max(end - time.time(), 0) |
- for pipe, data in self.yield_any(timeout=remaining): |
- if pipe is None: |
- raise TimeoutExpired(self.args, timeout, stdout, stderr) |
- assert pipe in ('stdout', 'stderr'), pipe |
- if pipe == 'stdout': |
- stdout += data |
- else: |
- stderr += data |
- else: |
- # Only stdin is piped. |
- self.wait(timeout=timeout) |
- finally: |
- if t: |
- try: |
- self.stdin.close() |
- except IOError: |
- pass |
- t.join() |
- else: |
- # No pipe. The user wanted to use wait(). |
- self.wait(timeout=timeout) |
- return None, None |
- |
- # Indirectly initialize self.end. |
- self.wait() |
- return stdout, stderr |
- |
- def wait(self, timeout=None): # pylint: disable=arguments-differ |
- """Implements python3's timeout support. |
- |
- Raises: |
- - TimeoutExpired when more than timeout seconds were spent waiting for the |
- process. |
- """ |
- assert timeout is None or isinstance(timeout, (int, float)), timeout |
- if timeout is None: |
- super(Popen, self).wait() |
- elif self.returncode is None: |
- if subprocess.mswindows: |
- WAIT_TIMEOUT = 258 |
- result = subprocess._subprocess.WaitForSingleObject( |
- self._handle, int(timeout * 1000)) |
- if result == WAIT_TIMEOUT: |
- raise TimeoutExpired(self.args, timeout) |
- self.returncode = subprocess._subprocess.GetExitCodeProcess( |
- self._handle) |
- else: |
- # If you think the following code is horrible, it's because it is |
- # inspired by python3's stdlib. |
- end = time.time() + timeout |
- delay = 0.001 |
- while True: |
- try: |
- pid, sts = subprocess._eintr_retry_call( |
- os.waitpid, self.pid, os.WNOHANG) |
- except OSError as e: |
- if e.errno != errno.ECHILD: |
- raise |
- pid = self.pid |
- sts = 0 |
- if pid == self.pid: |
- # This sets self.returncode. |
- self._handle_exitstatus(sts) |
- break |
- remaining = end - time.time() |
- if remaining <= 0: |
- raise TimeoutExpired(self.args, timeout) |
- delay = min(delay * 2, remaining, .05) |
- time.sleep(delay) |
- |
- if not self.end: |
- # communicate() uses wait() internally. |
- self.end = time.time() |
- return self.returncode |
- |
- def poll(self): |
- ret = super(Popen, self).poll() |
- if ret is not None and not self.end: |
- self.end = time.time() |
- return ret |
- |
- def yield_any(self, maxsize=None, timeout=None): |
- """Yields output until the process terminates. |
- |
- Unlike wait(), does not raise TimeoutExpired. |
- |
- Yields: |
- (pipename, data) where pipename is either 'stdout', 'stderr' or None in |
- case of timeout or when the child process closed one of the pipe(s) and |
- all pending data on the pipe was read. |
- |
- Arguments: |
- - maxsize: See recv_any(). Can be a callable function. |
- - timeout: If None, the call is blocking. If set, yields None, None if no |
- data is available within |timeout| seconds. It resets itself after |
- each yield. Can be a callable function. |
- """ |
- assert self.stdout or self.stderr |
- if timeout is not None: |
- # timeout=0 effectively means that the pipe is continuously polled. |
- if isinstance(timeout, (int, float)): |
- assert timeout >= 0, timeout |
- old_timeout = timeout |
- timeout = lambda: old_timeout |
- else: |
- assert callable(timeout), timeout |
- |
- if maxsize is not None and not callable(maxsize): |
- assert isinstance(maxsize, (int, float)), maxsize |
- |
- last_yield = time.time() |
- while self.poll() is None: |
- to = (None if timeout is None |
- else max(timeout() - (time.time() - last_yield), 0)) |
- t, data = self.recv_any( |
- maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to) |
- if data or to is 0: |
- yield t, data |
- last_yield = time.time() |
- |
- # Read all remaining output in the pipes. |
- # There is 3 cases: |
- # - pipes get closed automatically by the calling process before it exits |
- # - pipes are closed automated by the OS |
- # - pipes are kept open due to grand-children processes outliving the |
- # children process. |
- while True: |
- ms = maxsize |
- if callable(maxsize): |
- ms = maxsize() |
- # timeout=0 is mainly to handle the case where a grand-children process |
- # outlives the process started. |
- t, data = self.recv_any(maxsize=ms, timeout=0) |
- if not data: |
- break |
- yield t, data |
- |
- def recv_any(self, maxsize=None, timeout=None): |
- """Reads from the first pipe available from stdout and stderr. |
- |
- Unlike wait(), it does not throw TimeoutExpired. |
- |
- Arguments: |
- - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. |
- - timeout: If None, it is blocking. If 0 or above, will return None if no |
- data is available within |timeout| seconds. |
- |
- Returns: |
- tuple(pipename or None, str(data)). pipename is one of 'stdout' or |
- 'stderr'. |
- """ |
- # recv_multi_impl will early exit on a closed connection. Loop accordingly |
- # to simplify call sites. |
- while True: |
- pipes = [ |
- x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0] |
- ] |
- # If both stdout and stderr have the exact file handle, they are |
- # effectively the same pipe. Deduplicate it since otherwise it confuses |
- # recv_multi_impl(). |
- if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno(): |
- pipes.pop(0) |
- |
- if not pipes: |
- return None, None |
- start = time.time() |
- conns, names = zip(*pipes) |
- index, data, closed = recv_multi_impl(conns, maxsize, timeout) |
- if index is None: |
- return index, data |
- if closed: |
- self._close(names[index]) |
- if not data: |
- # Loop again. The other pipe may still be open. |
- if timeout: |
- timeout -= (time.time() - start) |
- continue |
- |
- if self.universal_newlines and data: |
- data = self._translate_newlines(data) |
- return names[index], data |
- |
- def recv_out(self, maxsize=None, timeout=None): |
- """Reads from stdout synchronously with timeout.""" |
- return self._recv('stdout', maxsize, timeout) |
- |
- def recv_err(self, maxsize=None, timeout=None): |
- """Reads from stderr synchronously with timeout.""" |
- return self._recv('stderr', maxsize, timeout) |
- |
- def terminate(self): |
- """Tries to do something saner on Windows that the stdlib. |
- |
- Windows: |
- self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used: |
- - If set, only SIGBREAK can be sent and it is sent to a single process. |
- - If not set, in theory only SIGINT can be used and *all processes* in |
- the processgroup receive it. In practice, we just kill the process. |
- See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx |
- The default on Windows is to call TerminateProcess() always, which is not |
- useful. |
- |
- On Posix, always send SIGTERM. |
- """ |
- try: |
- if subprocess.mswindows and self.detached: |
- return self.send_signal(signal.CTRL_BREAK_EVENT) |
- super(Popen, self).terminate() |
- except OSError: |
- # The function will throw if the process terminated in-between. Swallow |
- # this. |
- pass |
- |
- def kill(self): |
- """Kills the process and its children if possible. |
- |
- Swallows exceptions and return True on success. |
- """ |
- if self.gid: |
- try: |
- os.killpg(self.gid, signal.SIGKILL) |
- except OSError: |
- return False |
- else: |
- try: |
- super(Popen, self).kill() |
- except OSError: |
- return False |
- return True |
- |
- def _close(self, which): |
- """Closes either stdout or stderr.""" |
- getattr(self, which).close() |
- setattr(self, which, None) |
- |
- def _recv(self, which, maxsize, timeout): |
- """Reads from one of stdout or stderr synchronously with timeout.""" |
- conn = getattr(self, which) |
- if conn is None: |
- return None |
- _, data, closed = recv_multi_impl([conn], maxsize, timeout) |
- if closed: |
- self._close(which) |
- if self.universal_newlines and data: |
- data = self._translate_newlines(data) |
- return data |
- |
- |
-@contextlib.contextmanager |
-def set_signal_handler(signals, handler): |
- """Temporarilly override signals handler. |
- |
- Useful when waiting for a child process to handle signals like SIGTERM, so the |
- signal can be propagated to the child process. |
- """ |
- previous = {s: signal.signal(s, handler) for s in signals} |
- try: |
- yield |
- finally: |
- for sig, h in previous.iteritems(): |
- signal.signal(sig, h) |
- |
- |
-def call(*args, **kwargs): |
- """Adds support for timeout.""" |
- timeout = kwargs.pop('timeout', None) |
- return Popen(*args, **kwargs).wait(timeout) |
- |
- |
-def check_call(*args, **kwargs): |
- """Adds support for timeout.""" |
- retcode = call(*args, **kwargs) |
- if retcode: |
- raise CalledProcessError(retcode, kwargs.get('args') or args[0]) |
- return 0 |
- |
- |
-def check_output(*args, **kwargs): |
- """Adds support for timeout.""" |
- timeout = kwargs.pop('timeout', None) |
- if 'stdout' in kwargs: |
- raise ValueError('stdout argument not allowed, it will be overridden.') |
- process = Popen(stdout=PIPE, *args, **kwargs) |
- output, _ = process.communicate(timeout=timeout) |
- retcode = process.poll() |
- if retcode: |
- raise CalledProcessError(retcode, kwargs.get('args') or args[0], output) |
- return output |
- |
- |
-def call_with_timeout(args, timeout, **kwargs): |
- """Runs an executable; kill it in case of timeout.""" |
- proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs) |
- try: |
- out, err = proc.communicate(timeout=timeout) |
- except TimeoutExpired as e: |
- out = e.output |
- err = e.stderr |
- proc.kill() |
- proc.wait() |
- return out, err, proc.returncode, proc.duration() |