Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(689)

Side by Side Diff: recipe_engine/third_party/subprocess42.py

Issue 1959563002: Use subprocess42 in recipe_engine to avoid threading madness. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/recipes-py@master
Patch Set: *sigh* presubmit Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/step_runner.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed by the Apache v2.0 license that can be
3 # found in the LICENSE file.
4
5 # This is a direct vendored copy of the original which is located at:
6 # https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py
7
8 """subprocess42 is the answer to life the universe and everything.
9
10 It has the particularity of having a Popen implementation that can yield output
11 as it is produced while implementing a timeout and NOT requiring the use of
12 worker threads.
13
14 Example:
15 Wait for a child process with a timeout, send SIGTERM, wait a grace period
16 then send SIGKILL:
17
18 def wait_terminate_then_kill(proc, timeout, grace):
19 try:
20 return proc.wait(timeout)
21 except subprocess42.TimeoutExpired:
22 proc.terminate()
23 try:
24 return proc.wait(grace)
25 except subprocess42.TimeoutExpired:
26 proc.kill()
27 return proc.wait()
28
29
30 TODO(maruel): Add VOID support like subprocess2.
31 """
32
33 import contextlib
34 import errno
35 import os
36 import signal
37 import threading
38 import time
39
40 import subprocess
41
42 from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611
43 from subprocess import list2cmdline
44
45
46 # Default maxsize argument.
47 MAX_SIZE = 16384
48
49
50 if subprocess.mswindows:
51 import msvcrt # pylint: disable=F0401
52 from ctypes import wintypes
53 from ctypes import windll
54
55
56 # Which to be received depends on how this process was called and outside the
57 # control of this script. See Popen docstring for more details.
58 STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM)
59
60
61 def ReadFile(handle, desired_bytes):
62 """Calls kernel32.ReadFile()."""
63 c_read = wintypes.DWORD()
64 buff = wintypes.create_string_buffer(desired_bytes+1)
65 windll.kernel32.ReadFile(
66 handle, buff, desired_bytes, wintypes.byref(c_read), None)
67 # NULL terminate it.
68 buff[c_read.value] = '\x00'
69 return wintypes.GetLastError(), buff.value
70
71 def PeekNamedPipe(handle):
72 """Calls kernel32.PeekNamedPipe(). Simplified version."""
73 c_avail = wintypes.DWORD()
74 c_message = wintypes.DWORD()
75 success = windll.kernel32.PeekNamedPipe(
76 handle, None, 0, None, wintypes.byref(c_avail),
77 wintypes.byref(c_message))
78 if not success:
79 raise OSError(wintypes.GetLastError())
80 return c_avail.value
81
82 def recv_multi_impl(conns, maxsize, timeout):
83 """Reads from the first available pipe.
84
85 It will immediately return on a closed connection, independent of timeout.
86
87 Arguments:
88 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
89 - timeout: If None, it is blocking. If 0 or above, will return None if no
90 data is available within |timeout| seconds.
91
92 Returns:
93 tuple(int(index), str(data), bool(closed)).
94 """
95 assert conns
96 assert timeout is None or isinstance(timeout, (int, float)), timeout
97 maxsize = max(maxsize or MAX_SIZE, 1)
98
99 # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
100 # for proc.stdout and proc.stderr but they are implemented as named pipes on
101 # Windows. Since named pipes are not waitable object, they can't be passed
102 # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
103 # and finally WFMO(). This requires caching the events handles in the Popen
104 # object and remembering the pending ReadFile() calls. This will require
105 # some re-architecture to store the relevant event handle and OVERLAPPEDIO
106 # object in Popen or the file object.
107 start = time.time()
108 handles = [
109 (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
110 ]
111 while True:
112 for index, handle in handles:
113 try:
114 avail = min(PeekNamedPipe(handle), maxsize)
115 if avail:
116 return index, ReadFile(handle, avail)[1], False
117 except OSError:
118 # The pipe closed.
119 return index, None, True
120
121 if timeout is not None and (time.time() - start) >= timeout:
122 return None, None, False
123 # Polling rocks.
124 time.sleep(0.001)
125
126 else:
127 import fcntl # pylint: disable=F0401
128 import select
129
130
131 # Signals that mean this process should exit quickly.
132 STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)
133
134
135 def recv_multi_impl(conns, maxsize, timeout):
136 """Reads from the first available pipe.
137
138 It will immediately return on a closed connection, independent of timeout.
139
140 Arguments:
141 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
142 - timeout: If None, it is blocking. If 0 or above, will return None if no
143 data is available within |timeout| seconds.
144
145 Returns:
146 tuple(int(index), str(data), bool(closed)).
147 """
148 assert conns
149 assert timeout is None or isinstance(timeout, (int, float)), timeout
150 maxsize = max(maxsize or MAX_SIZE, 1)
151
152 # select(timeout=0) will block, it has to be a value > 0.
153 if timeout == 0:
154 timeout = 0.001
155 try:
156 r, _, _ = select.select(conns, [], [], timeout)
157 except select.error:
158 r = None
159 if not r:
160 return None, None, False
161
162 conn = r[0]
163 # Temporarily make it non-blocking.
164 # TODO(maruel): This is not very efficient when the caller is doing this in
165 # a loop. Add a mechanism to have the caller handle this.
166 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
167 if not conn.closed:
168 # pylint: disable=E1101
169 fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
170 try:
171 data = conn.read(maxsize)
172 if not data:
173 # On posix, this means the channel closed.
174 return conns.index(conn), None, True
175 return conns.index(conn), data, False
176 finally:
177 if not conn.closed:
178 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
179
180
181 class TimeoutExpired(Exception):
182 """Compatible with python3 subprocess."""
183 def __init__(self, cmd, timeout, output=None, stderr=None):
184 self.cmd = cmd
185 self.timeout = timeout
186 self.output = output
187 # Non-standard:
188 self.stderr = stderr
189 super(TimeoutExpired, self).__init__(str(self))
190
191 def __str__(self):
192 return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)
193
194
195 class Popen(subprocess.Popen):
196 """Adds timeout support on stdout and stderr.
197
198 Inspired by
199 http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subpro cess-use-on-win/
200
201 Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and
202 stderr once the child process closes them, after all the data is read.
203
204 Arguments:
205 - detached: If True, the process is created in a new process group. On
206 Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0).
207
208 Additional members:
209 - start: timestamp when this process started.
210 - end: timestamp when this process exited, as seen by this process.
211 - detached: If True, the child process was started as a detached process.
212 - gid: process group id, if any.
213 - duration: time in seconds the process lasted.
214
215 Additional methods:
216 - yield_any(): yields output until the process terminates.
217 - recv_any(): reads from stdout and/or stderr with optional timeout.
218 - recv_out() & recv_err(): specialized version of recv_any().
219 """
220 # subprocess.Popen.__init__() is not threadsafe; there is a race between
221 # creating the exec-error pipe for the child and setting it to CLOEXEC during
222 # which another thread can fork and cause the pipe to be inherited by its
223 # descendents, which will cause the current Popen to hang until all those
224 # descendents exit. Protect this with a lock so that only one fork/exec can
225 # happen at a time.
226 popen_lock = threading.Lock()
227
228 def __init__(self, args, **kwargs):
229 assert 'creationflags' not in kwargs
230 assert 'preexec_fn' not in kwargs, 'Use detached=True instead'
231 self.start = time.time()
232 self.end = None
233 self.gid = None
234 self.detached = kwargs.pop('detached', False)
235 if self.detached:
236 if subprocess.mswindows:
237 kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
238 else:
239 kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)
240 with self.popen_lock:
241 super(Popen, self).__init__(args, **kwargs)
242 self.args = args
243 if self.detached and not subprocess.mswindows:
244 try:
245 self.gid = os.getpgid(self.pid)
246 except OSError:
247 # sometimes the process can run+finish before we collect its pgid. fun.
248 pass
249
250 def duration(self):
251 """Duration of the child process.
252
253 It is greater or equal to the actual time the child process ran. It can be
254 significantly higher than the real value if neither .wait() nor .poll() was
255 used.
256 """
257 return (self.end or time.time()) - self.start
258
259 # pylint: disable=arguments-differ,redefined-builtin
260 def communicate(self, input=None, timeout=None):
261 """Implements python3's timeout support.
262
263 Unlike wait(), timeout=0 is considered the same as None.
264
265 Raises:
266 - TimeoutExpired when more than timeout seconds were spent waiting for the
267 process.
268 """
269 if not timeout:
270 return super(Popen, self).communicate(input=input)
271
272 assert isinstance(timeout, (int, float)), timeout
273
274 if self.stdin or self.stdout or self.stderr:
275 stdout = '' if self.stdout else None
276 stderr = '' if self.stderr else None
277 t = None
278 if input is not None:
279 assert self.stdin, (
280 'Can\'t use communicate(input) if not using '
281 'Popen(stdin=subprocess42.PIPE')
282 # TODO(maruel): Switch back to non-threading.
283 def write():
284 try:
285 self.stdin.write(input)
286 except IOError:
287 pass
288 t = threading.Thread(name='Popen.communicate', target=write)
289 t.daemon = True
290 t.start()
291
292 try:
293 if self.stdout or self.stderr:
294 start = time.time()
295 end = start + timeout
296 def remaining():
297 return max(end - time.time(), 0)
298 for pipe, data in self.yield_any(timeout=remaining):
299 if pipe is None:
300 raise TimeoutExpired(self.args, timeout, stdout, stderr)
301 assert pipe in ('stdout', 'stderr'), pipe
302 if pipe == 'stdout':
303 stdout += data
304 else:
305 stderr += data
306 else:
307 # Only stdin is piped.
308 self.wait(timeout=timeout)
309 finally:
310 if t:
311 try:
312 self.stdin.close()
313 except IOError:
314 pass
315 t.join()
316 else:
317 # No pipe. The user wanted to use wait().
318 self.wait(timeout=timeout)
319 return None, None
320
321 # Indirectly initialize self.end.
322 self.wait()
323 return stdout, stderr
324
325 def wait(self, timeout=None): # pylint: disable=arguments-differ
326 """Implements python3's timeout support.
327
328 Raises:
329 - TimeoutExpired when more than timeout seconds were spent waiting for the
330 process.
331 """
332 assert timeout is None or isinstance(timeout, (int, float)), timeout
333 if timeout is None:
334 super(Popen, self).wait()
335 elif self.returncode is None:
336 if subprocess.mswindows:
337 WAIT_TIMEOUT = 258
338 result = subprocess._subprocess.WaitForSingleObject(
339 self._handle, int(timeout * 1000))
340 if result == WAIT_TIMEOUT:
341 raise TimeoutExpired(self.args, timeout)
342 self.returncode = subprocess._subprocess.GetExitCodeProcess(
343 self._handle)
344 else:
345 # If you think the following code is horrible, it's because it is
346 # inspired by python3's stdlib.
347 end = time.time() + timeout
348 delay = 0.001
349 while True:
350 try:
351 pid, sts = subprocess._eintr_retry_call(
352 os.waitpid, self.pid, os.WNOHANG)
353 except OSError as e:
354 if e.errno != errno.ECHILD:
355 raise
356 pid = self.pid
357 sts = 0
358 if pid == self.pid:
359 # This sets self.returncode.
360 self._handle_exitstatus(sts)
361 break
362 remaining = end - time.time()
363 if remaining <= 0:
364 raise TimeoutExpired(self.args, timeout)
365 delay = min(delay * 2, remaining, .05)
366 time.sleep(delay)
367
368 if not self.end:
369 # communicate() uses wait() internally.
370 self.end = time.time()
371 return self.returncode
372
373 def poll(self):
374 ret = super(Popen, self).poll()
375 if ret is not None and not self.end:
376 self.end = time.time()
377 return ret
378
379 def yield_any(self, maxsize=None, timeout=None):
380 """Yields output until the process terminates.
381
382 Unlike wait(), does not raise TimeoutExpired.
383
384 Yields:
385 (pipename, data) where pipename is either 'stdout', 'stderr' or None in
386 case of timeout or when the child process closed one of the pipe(s) and
387 all pending data on the pipe was read.
388
389 Arguments:
390 - maxsize: See recv_any(). Can be a callable function.
391 - timeout: If None, the call is blocking. If set, yields None, None if no
392 data is available within |timeout| seconds. It resets itself after
393 each yield. Can be a callable function.
394 """
395 assert self.stdout or self.stderr
396 if timeout is not None:
397 # timeout=0 effectively means that the pipe is continuously polled.
398 if isinstance(timeout, (int, float)):
399 assert timeout >= 0, timeout
400 old_timeout = timeout
401 timeout = lambda: old_timeout
402 else:
403 assert callable(timeout), timeout
404
405 if maxsize is not None and not callable(maxsize):
406 assert isinstance(maxsize, (int, float)), maxsize
407
408 last_yield = time.time()
409 while self.poll() is None:
410 to = (None if timeout is None
411 else max(timeout() - (time.time() - last_yield), 0))
412 t, data = self.recv_any(
413 maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to)
414 if data or to is 0:
415 yield t, data
416 last_yield = time.time()
417
418 # Read all remaining output in the pipes.
419 # There is 3 cases:
420 # - pipes get closed automatically by the calling process before it exits
421 # - pipes are closed automated by the OS
422 # - pipes are kept open due to grand-children processes outliving the
423 # children process.
424 while True:
425 ms = maxsize
426 if callable(maxsize):
427 ms = maxsize()
428 # timeout=0 is mainly to handle the case where a grand-children process
429 # outlives the process started.
430 t, data = self.recv_any(maxsize=ms, timeout=0)
431 if not data:
432 break
433 yield t, data
434
435 def recv_any(self, maxsize=None, timeout=None):
436 """Reads from the first pipe available from stdout and stderr.
437
438 Unlike wait(), it does not throw TimeoutExpired.
439
440 Arguments:
441 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
442 - timeout: If None, it is blocking. If 0 or above, will return None if no
443 data is available within |timeout| seconds.
444
445 Returns:
446 tuple(pipename or None, str(data)). pipename is one of 'stdout' or
447 'stderr'.
448 """
449 # recv_multi_impl will early exit on a closed connection. Loop accordingly
450 # to simplify call sites.
451 while True:
452 pipes = [
453 x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
454 ]
455 # If both stdout and stderr have the exact file handle, they are
456 # effectively the same pipe. Deduplicate it since otherwise it confuses
457 # recv_multi_impl().
458 if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
459 pipes.pop(0)
460
461 if not pipes:
462 return None, None
463 start = time.time()
464 conns, names = zip(*pipes)
465 index, data, closed = recv_multi_impl(conns, maxsize, timeout)
466 if index is None:
467 return index, data
468 if closed:
469 self._close(names[index])
470 if not data:
471 # Loop again. The other pipe may still be open.
472 if timeout:
473 timeout -= (time.time() - start)
474 continue
475
476 if self.universal_newlines:
477 data = self._translate_newlines(data)
478 return names[index], data
479
480 def recv_out(self, maxsize=None, timeout=None):
481 """Reads from stdout synchronously with timeout."""
482 return self._recv('stdout', maxsize, timeout)
483
484 def recv_err(self, maxsize=None, timeout=None):
485 """Reads from stderr synchronously with timeout."""
486 return self._recv('stderr', maxsize, timeout)
487
488 def terminate(self):
489 """Tries to do something saner on Windows that the stdlib.
490
491 Windows:
492 self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
493 - If set, only SIGBREAK can be sent and it is sent to a single process.
494 - If not set, in theory only SIGINT can be used and *all processes* in
495 the processgroup receive it. In practice, we just kill the process.
496 See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
497 The default on Windows is to call TerminateProcess() always, which is not
498 useful.
499
500 On Posix, always send SIGTERM.
501 """
502 try:
503 if subprocess.mswindows and self.detached:
504 return self.send_signal(signal.CTRL_BREAK_EVENT)
505 super(Popen, self).terminate()
506 except OSError:
507 # The function will throw if the process terminated in-between. Swallow
508 # this.
509 pass
510
511 def kill(self):
512 """Kills the process and its children if possible.
513
514 Swallows exceptions and return True on success.
515 """
516 if self.gid:
517 try:
518 os.killpg(self.gid, signal.SIGKILL)
519 except OSError:
520 return False
521 else:
522 try:
523 super(Popen, self).kill()
524 except OSError:
525 return False
526 return True
527
528 def _close(self, which):
529 """Closes either stdout or stderr."""
530 getattr(self, which).close()
531 setattr(self, which, None)
532
533 def _recv(self, which, maxsize, timeout):
534 """Reads from one of stdout or stderr synchronously with timeout."""
535 conn = getattr(self, which)
536 if conn is None:
537 return None
538 _, data, closed = recv_multi_impl([conn], maxsize, timeout)
539 if closed:
540 self._close(which)
541 if self.universal_newlines and data:
542 data = self._translate_newlines(data)
543 return data
544
545
546 @contextlib.contextmanager
547 def set_signal_handler(signals, handler):
548 """Temporarilly override signals handler.
549
550 Useful when waiting for a child process to handle signals like SIGTERM, so the
551 signal can be propagated to the child process.
552 """
553 previous = {s: signal.signal(s, handler) for s in signals}
554 try:
555 yield
556 finally:
557 for sig, h in previous.iteritems():
558 signal.signal(sig, h)
559
560
561 def call(*args, **kwargs):
562 """Adds support for timeout."""
563 timeout = kwargs.pop('timeout', None)
564 return Popen(*args, **kwargs).wait(timeout)
565
566
567 def check_call(*args, **kwargs):
568 """Adds support for timeout."""
569 retcode = call(*args, **kwargs)
570 if retcode:
571 raise CalledProcessError(retcode, kwargs.get('args') or args[0])
572 return 0
573
574
575 def check_output(*args, **kwargs):
576 """Adds support for timeout."""
577 timeout = kwargs.pop('timeout', None)
578 if 'stdout' in kwargs:
579 raise ValueError('stdout argument not allowed, it will be overridden.')
580 process = Popen(stdout=PIPE, *args, **kwargs)
581 output, _ = process.communicate(timeout=timeout)
582 retcode = process.poll()
583 if retcode:
584 raise CalledProcessError(retcode, kwargs.get('args') or args[0], output)
585 return output
586
587
588 def call_with_timeout(args, timeout, **kwargs):
589 """Runs an executable; kill it in case of timeout."""
590 proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
591 try:
592 out, err = proc.communicate(timeout=timeout)
593 except TimeoutExpired as e:
594 out = e.output
595 err = e.stderr
596 proc.kill()
597 proc.wait()
598 return out, err, proc.returncode, proc.duration()
OLDNEW
« no previous file with comments | « recipe_engine/step_runner.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698