Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Side by Side Diff: subprocess42.py

Issue 2295043003: Revert of It's time to bring subprocess42 to depot_tools. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed by the Apache v2.0 license that can be
3 # found in the LICENSE file.
4
5 # This is a direct vendored copy of the original which is located at:
6 # https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py
7 # at commit 45eab10a17a55f1978f3bfa02c35457dee1c64e4.
8
9 """subprocess42 is the answer to life the universe and everything.
10
11 It has the particularity of having a Popen implementation that can yield output
12 as it is produced while implementing a timeout and NOT requiring the use of
13 worker threads.
14
15 Example:
16 Wait for a child process with a timeout, send SIGTERM, wait a grace period
17 then send SIGKILL:
18
19 def wait_terminate_then_kill(proc, timeout, grace):
20 try:
21 return proc.wait(timeout)
22 except subprocess42.TimeoutExpired:
23 proc.terminate()
24 try:
25 return proc.wait(grace)
26 except subprocess42.TimeoutExpired:
27 proc.kill()
28 return proc.wait()
29
30
31 TODO(maruel): Add VOID support like subprocess2.
32 """
33
34 import contextlib
35 import errno
36 import os
37 import signal
38 import threading
39 import time
40
41 import subprocess
42
43 from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611
44 from subprocess import list2cmdline
45
46
47 # Default maxsize argument.
48 MAX_SIZE = 16384
49
50
51 if subprocess.mswindows:
52 import msvcrt # pylint: disable=F0401
53 from ctypes import wintypes
54 from ctypes import windll
55
56
57 # Which to be received depends on how this process was called and outside the
58 # control of this script. See Popen docstring for more details.
59 STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM)
60
61
62 def ReadFile(handle, desired_bytes):
63 """Calls kernel32.ReadFile()."""
64 c_read = wintypes.DWORD()
65 buff = wintypes.create_string_buffer(desired_bytes+1)
66 windll.kernel32.ReadFile(
67 handle, buff, desired_bytes, wintypes.byref(c_read), None)
68 # NULL terminate it.
69 buff[c_read.value] = '\x00'
70 return wintypes.GetLastError(), buff.value
71
72 def PeekNamedPipe(handle):
73 """Calls kernel32.PeekNamedPipe(). Simplified version."""
74 c_avail = wintypes.DWORD()
75 c_message = wintypes.DWORD()
76 success = windll.kernel32.PeekNamedPipe(
77 handle, None, 0, None, wintypes.byref(c_avail),
78 wintypes.byref(c_message))
79 if not success:
80 raise OSError(wintypes.GetLastError())
81 return c_avail.value
82
83 def recv_multi_impl(conns, maxsize, timeout):
84 """Reads from the first available pipe.
85
86 It will immediately return on a closed connection, independent of timeout.
87
88 Arguments:
89 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
90 - timeout: If None, it is blocking. If 0 or above, will return None if no
91 data is available within |timeout| seconds.
92
93 Returns:
94 tuple(int(index), str(data), bool(closed)).
95 """
96 assert conns
97 assert timeout is None or isinstance(timeout, (int, float)), timeout
98 maxsize = max(maxsize or MAX_SIZE, 1)
99
100 # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
101 # for proc.stdout and proc.stderr but they are implemented as named pipes on
102 # Windows. Since named pipes are not waitable object, they can't be passed
103 # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
104 # and finally WFMO(). This requires caching the events handles in the Popen
105 # object and remembering the pending ReadFile() calls. This will require
106 # some re-architecture to store the relevant event handle and OVERLAPPEDIO
107 # object in Popen or the file object.
108 start = time.time()
109 handles = [
110 (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
111 ]
112 while True:
113 for index, handle in handles:
114 try:
115 avail = min(PeekNamedPipe(handle), maxsize)
116 if avail:
117 return index, ReadFile(handle, avail)[1], False
118 except OSError:
119 # The pipe closed.
120 return index, None, True
121
122 if timeout is not None and (time.time() - start) >= timeout:
123 return None, None, False
124 # Polling rocks.
125 time.sleep(0.001)
126
127 else:
128 import fcntl # pylint: disable=F0401
129 import select
130
131
132 # Signals that mean this process should exit quickly.
133 STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)
134
135
136 def recv_multi_impl(conns, maxsize, timeout):
137 """Reads from the first available pipe.
138
139 It will immediately return on a closed connection, independent of timeout.
140
141 Arguments:
142 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
143 - timeout: If None, it is blocking. If 0 or above, will return None if no
144 data is available within |timeout| seconds.
145
146 Returns:
147 tuple(int(index), str(data), bool(closed)).
148 """
149 assert conns
150 assert timeout is None or isinstance(timeout, (int, float)), timeout
151 maxsize = max(maxsize or MAX_SIZE, 1)
152
153 # select(timeout=0) will block, it has to be a value > 0.
154 if timeout == 0:
155 timeout = 0.001
156 try:
157 r, _, _ = select.select(conns, [], [], timeout)
158 except select.error:
159 r = None
160 if not r:
161 return None, None, False
162
163 conn = r[0]
164 # Temporarily make it non-blocking.
165 # TODO(maruel): This is not very efficient when the caller is doing this in
166 # a loop. Add a mechanism to have the caller handle this.
167 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
168 if not conn.closed:
169 # pylint: disable=E1101
170 fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
171 try:
172 try:
173 data = conn.read(maxsize)
174 except IOError as e:
175 # On posix, this means the read would block.
176 if e.errno == errno.EAGAIN:
177 return conns.index(conn), None, False
178 raise e
179
180 if not data:
181 # On posix, this means the channel closed.
182 return conns.index(conn), None, True
183
184 return conns.index(conn), data, False
185 finally:
186 if not conn.closed:
187 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
188
189
190 class TimeoutExpired(Exception):
191 """Compatible with python3 subprocess."""
192 def __init__(self, cmd, timeout, output=None, stderr=None):
193 self.cmd = cmd
194 self.timeout = timeout
195 self.output = output
196 # Non-standard:
197 self.stderr = stderr
198 super(TimeoutExpired, self).__init__(str(self))
199
200 def __str__(self):
201 return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)
202
203
204 class Popen(subprocess.Popen):
205 """Adds timeout support on stdout and stderr.
206
207 Inspired by
208 http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subpro cess-use-on-win/
209
210 Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and
211 stderr once the child process closes them, after all the data is read.
212
213 Arguments:
214 - detached: If True, the process is created in a new process group. On
215 Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0).
216
217 Additional members:
218 - start: timestamp when this process started.
219 - end: timestamp when this process exited, as seen by this process.
220 - detached: If True, the child process was started as a detached process.
221 - gid: process group id, if any.
222 - duration: time in seconds the process lasted.
223
224 Additional methods:
225 - yield_any(): yields output until the process terminates.
226 - recv_any(): reads from stdout and/or stderr with optional timeout.
227 - recv_out() & recv_err(): specialized version of recv_any().
228 """
229 # subprocess.Popen.__init__() is not threadsafe; there is a race between
230 # creating the exec-error pipe for the child and setting it to CLOEXEC during
231 # which another thread can fork and cause the pipe to be inherited by its
232 # descendents, which will cause the current Popen to hang until all those
233 # descendents exit. Protect this with a lock so that only one fork/exec can
234 # happen at a time.
235 popen_lock = threading.Lock()
236
237 def __init__(self, args, **kwargs):
238 assert 'creationflags' not in kwargs
239 assert 'preexec_fn' not in kwargs, 'Use detached=True instead'
240 self.start = time.time()
241 self.end = None
242 self.gid = None
243 self.detached = kwargs.pop('detached', False)
244 if self.detached:
245 if subprocess.mswindows:
246 kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
247 else:
248 kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)
249 with self.popen_lock:
250 super(Popen, self).__init__(args, **kwargs)
251 self.args = args
252 if self.detached and not subprocess.mswindows:
253 try:
254 self.gid = os.getpgid(self.pid)
255 except OSError:
256 # sometimes the process can run+finish before we collect its pgid. fun.
257 pass
258
259 def duration(self):
260 """Duration of the child process.
261
262 It is greater or equal to the actual time the child process ran. It can be
263 significantly higher than the real value if neither .wait() nor .poll() was
264 used.
265 """
266 return (self.end or time.time()) - self.start
267
268 # pylint: disable=arguments-differ,redefined-builtin
269 def communicate(self, input=None, timeout=None):
270 """Implements python3's timeout support.
271
272 Unlike wait(), timeout=0 is considered the same as None.
273
274 Raises:
275 - TimeoutExpired when more than timeout seconds were spent waiting for the
276 process.
277 """
278 if not timeout:
279 return super(Popen, self).communicate(input=input)
280
281 assert isinstance(timeout, (int, float)), timeout
282
283 if self.stdin or self.stdout or self.stderr:
284 stdout = '' if self.stdout else None
285 stderr = '' if self.stderr else None
286 t = None
287 if input is not None:
288 assert self.stdin, (
289 'Can\'t use communicate(input) if not using '
290 'Popen(stdin=subprocess42.PIPE')
291 # TODO(maruel): Switch back to non-threading.
292 def write():
293 try:
294 self.stdin.write(input)
295 except IOError:
296 pass
297 t = threading.Thread(name='Popen.communicate', target=write)
298 t.daemon = True
299 t.start()
300
301 try:
302 if self.stdout or self.stderr:
303 start = time.time()
304 end = start + timeout
305 def remaining():
306 return max(end - time.time(), 0)
307 for pipe, data in self.yield_any(timeout=remaining):
308 if pipe is None:
309 raise TimeoutExpired(self.args, timeout, stdout, stderr)
310 assert pipe in ('stdout', 'stderr'), pipe
311 if pipe == 'stdout':
312 stdout += data
313 else:
314 stderr += data
315 else:
316 # Only stdin is piped.
317 self.wait(timeout=timeout)
318 finally:
319 if t:
320 try:
321 self.stdin.close()
322 except IOError:
323 pass
324 t.join()
325 else:
326 # No pipe. The user wanted to use wait().
327 self.wait(timeout=timeout)
328 return None, None
329
330 # Indirectly initialize self.end.
331 self.wait()
332 return stdout, stderr
333
334 def wait(self, timeout=None): # pylint: disable=arguments-differ
335 """Implements python3's timeout support.
336
337 Raises:
338 - TimeoutExpired when more than timeout seconds were spent waiting for the
339 process.
340 """
341 assert timeout is None or isinstance(timeout, (int, float)), timeout
342 if timeout is None:
343 super(Popen, self).wait()
344 elif self.returncode is None:
345 if subprocess.mswindows:
346 WAIT_TIMEOUT = 258
347 result = subprocess._subprocess.WaitForSingleObject(
348 self._handle, int(timeout * 1000))
349 if result == WAIT_TIMEOUT:
350 raise TimeoutExpired(self.args, timeout)
351 self.returncode = subprocess._subprocess.GetExitCodeProcess(
352 self._handle)
353 else:
354 # If you think the following code is horrible, it's because it is
355 # inspired by python3's stdlib.
356 end = time.time() + timeout
357 delay = 0.001
358 while True:
359 try:
360 pid, sts = subprocess._eintr_retry_call(
361 os.waitpid, self.pid, os.WNOHANG)
362 except OSError as e:
363 if e.errno != errno.ECHILD:
364 raise
365 pid = self.pid
366 sts = 0
367 if pid == self.pid:
368 # This sets self.returncode.
369 self._handle_exitstatus(sts)
370 break
371 remaining = end - time.time()
372 if remaining <= 0:
373 raise TimeoutExpired(self.args, timeout)
374 delay = min(delay * 2, remaining, .05)
375 time.sleep(delay)
376
377 if not self.end:
378 # communicate() uses wait() internally.
379 self.end = time.time()
380 return self.returncode
381
382 def poll(self):
383 ret = super(Popen, self).poll()
384 if ret is not None and not self.end:
385 self.end = time.time()
386 return ret
387
388 def yield_any(self, maxsize=None, timeout=None):
389 """Yields output until the process terminates.
390
391 Unlike wait(), does not raise TimeoutExpired.
392
393 Yields:
394 (pipename, data) where pipename is either 'stdout', 'stderr' or None in
395 case of timeout or when the child process closed one of the pipe(s) and
396 all pending data on the pipe was read.
397
398 Arguments:
399 - maxsize: See recv_any(). Can be a callable function.
400 - timeout: If None, the call is blocking. If set, yields None, None if no
401 data is available within |timeout| seconds. It resets itself after
402 each yield. Can be a callable function.
403 """
404 assert self.stdout or self.stderr
405 if timeout is not None:
406 # timeout=0 effectively means that the pipe is continuously polled.
407 if isinstance(timeout, (int, float)):
408 assert timeout >= 0, timeout
409 old_timeout = timeout
410 timeout = lambda: old_timeout
411 else:
412 assert callable(timeout), timeout
413
414 if maxsize is not None and not callable(maxsize):
415 assert isinstance(maxsize, (int, float)), maxsize
416
417 last_yield = time.time()
418 while self.poll() is None:
419 to = (None if timeout is None
420 else max(timeout() - (time.time() - last_yield), 0))
421 t, data = self.recv_any(
422 maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to)
423 if data or to is 0:
424 yield t, data
425 last_yield = time.time()
426
427 # Read all remaining output in the pipes.
428 # There is 3 cases:
429 # - pipes get closed automatically by the calling process before it exits
430 # - pipes are closed automated by the OS
431 # - pipes are kept open due to grand-children processes outliving the
432 # children process.
433 while True:
434 ms = maxsize
435 if callable(maxsize):
436 ms = maxsize()
437 # timeout=0 is mainly to handle the case where a grand-children process
438 # outlives the process started.
439 t, data = self.recv_any(maxsize=ms, timeout=0)
440 if not data:
441 break
442 yield t, data
443
444 def recv_any(self, maxsize=None, timeout=None):
445 """Reads from the first pipe available from stdout and stderr.
446
447 Unlike wait(), it does not throw TimeoutExpired.
448
449 Arguments:
450 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
451 - timeout: If None, it is blocking. If 0 or above, will return None if no
452 data is available within |timeout| seconds.
453
454 Returns:
455 tuple(pipename or None, str(data)). pipename is one of 'stdout' or
456 'stderr'.
457 """
458 # recv_multi_impl will early exit on a closed connection. Loop accordingly
459 # to simplify call sites.
460 while True:
461 pipes = [
462 x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
463 ]
464 # If both stdout and stderr have the exact file handle, they are
465 # effectively the same pipe. Deduplicate it since otherwise it confuses
466 # recv_multi_impl().
467 if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
468 pipes.pop(0)
469
470 if not pipes:
471 return None, None
472 start = time.time()
473 conns, names = zip(*pipes)
474 index, data, closed = recv_multi_impl(conns, maxsize, timeout)
475 if index is None:
476 return index, data
477 if closed:
478 self._close(names[index])
479 if not data:
480 # Loop again. The other pipe may still be open.
481 if timeout:
482 timeout -= (time.time() - start)
483 continue
484
485 if self.universal_newlines and data:
486 data = self._translate_newlines(data)
487 return names[index], data
488
489 def recv_out(self, maxsize=None, timeout=None):
490 """Reads from stdout synchronously with timeout."""
491 return self._recv('stdout', maxsize, timeout)
492
493 def recv_err(self, maxsize=None, timeout=None):
494 """Reads from stderr synchronously with timeout."""
495 return self._recv('stderr', maxsize, timeout)
496
497 def terminate(self):
498 """Tries to do something saner on Windows that the stdlib.
499
500 Windows:
501 self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
502 - If set, only SIGBREAK can be sent and it is sent to a single process.
503 - If not set, in theory only SIGINT can be used and *all processes* in
504 the processgroup receive it. In practice, we just kill the process.
505 See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
506 The default on Windows is to call TerminateProcess() always, which is not
507 useful.
508
509 On Posix, always send SIGTERM.
510 """
511 try:
512 if subprocess.mswindows and self.detached:
513 return self.send_signal(signal.CTRL_BREAK_EVENT)
514 super(Popen, self).terminate()
515 except OSError:
516 # The function will throw if the process terminated in-between. Swallow
517 # this.
518 pass
519
520 def kill(self):
521 """Kills the process and its children if possible.
522
523 Swallows exceptions and return True on success.
524 """
525 if self.gid:
526 try:
527 os.killpg(self.gid, signal.SIGKILL)
528 except OSError:
529 return False
530 else:
531 try:
532 super(Popen, self).kill()
533 except OSError:
534 return False
535 return True
536
537 def _close(self, which):
538 """Closes either stdout or stderr."""
539 getattr(self, which).close()
540 setattr(self, which, None)
541
542 def _recv(self, which, maxsize, timeout):
543 """Reads from one of stdout or stderr synchronously with timeout."""
544 conn = getattr(self, which)
545 if conn is None:
546 return None
547 _, data, closed = recv_multi_impl([conn], maxsize, timeout)
548 if closed:
549 self._close(which)
550 if self.universal_newlines and data:
551 data = self._translate_newlines(data)
552 return data
553
554
555 @contextlib.contextmanager
556 def set_signal_handler(signals, handler):
557 """Temporarilly override signals handler.
558
559 Useful when waiting for a child process to handle signals like SIGTERM, so the
560 signal can be propagated to the child process.
561 """
562 previous = {s: signal.signal(s, handler) for s in signals}
563 try:
564 yield
565 finally:
566 for sig, h in previous.iteritems():
567 signal.signal(sig, h)
568
569
570 def call(*args, **kwargs):
571 """Adds support for timeout."""
572 timeout = kwargs.pop('timeout', None)
573 return Popen(*args, **kwargs).wait(timeout)
574
575
576 def check_call(*args, **kwargs):
577 """Adds support for timeout."""
578 retcode = call(*args, **kwargs)
579 if retcode:
580 raise CalledProcessError(retcode, kwargs.get('args') or args[0])
581 return 0
582
583
584 def check_output(*args, **kwargs):
585 """Adds support for timeout."""
586 timeout = kwargs.pop('timeout', None)
587 if 'stdout' in kwargs:
588 raise ValueError('stdout argument not allowed, it will be overridden.')
589 process = Popen(stdout=PIPE, *args, **kwargs)
590 output, _ = process.communicate(timeout=timeout)
591 retcode = process.poll()
592 if retcode:
593 raise CalledProcessError(retcode, kwargs.get('args') or args[0], output)
594 return output
595
596
597 def call_with_timeout(args, timeout, **kwargs):
598 """Runs an executable; kill it in case of timeout."""
599 proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
600 try:
601 out, err = proc.communicate(timeout=timeout)
602 except TimeoutExpired as e:
603 out = e.output
604 err = e.stderr
605 proc.kill()
606 proc.wait()
607 return out, err, proc.returncode, proc.duration()
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698