OLD | NEW |
| (Empty) |
1 # Copyright 2013 The LUCI Authors. All rights reserved. | |
2 # Use of this source code is governed by the Apache v2.0 license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 # This is a direct vendored copy of the original which is located at: | |
6 # https://github.com/luci/luci-py/blob/master/client/utils/subprocess42.py | |
7 # at commit 45eab10a17a55f1978f3bfa02c35457dee1c64e4. | |
8 | |
9 """subprocess42 is the answer to life the universe and everything. | |
10 | |
11 It has the particularity of having a Popen implementation that can yield output | |
12 as it is produced while implementing a timeout and NOT requiring the use of | |
13 worker threads. | |
14 | |
15 Example: | |
16 Wait for a child process with a timeout, send SIGTERM, wait a grace period | |
17 then send SIGKILL: | |
18 | |
19 def wait_terminate_then_kill(proc, timeout, grace): | |
20 try: | |
21 return proc.wait(timeout) | |
22 except subprocess42.TimeoutExpired: | |
23 proc.terminate() | |
24 try: | |
25 return proc.wait(grace) | |
26 except subprocess42.TimeoutExpired: | |
27 proc.kill() | |
28 return proc.wait() | |
29 | |
30 | |
31 TODO(maruel): Add VOID support like subprocess2. | |
32 """ | |
33 | |
34 import contextlib | |
35 import errno | |
36 import os | |
37 import signal | |
38 import threading | |
39 import time | |
40 | |
41 import subprocess | |
42 | |
43 from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611 | |
44 from subprocess import list2cmdline | |
45 | |
46 | |
47 # Default maxsize argument. | |
48 MAX_SIZE = 16384 | |
49 | |
50 | |
51 if subprocess.mswindows: | |
52 import msvcrt # pylint: disable=F0401 | |
53 from ctypes import wintypes | |
54 from ctypes import windll | |
55 | |
56 | |
57 # Which one is received depends on how this process was called and is outside | |
58 # the control of this script. See Popen docstring for more details. | |
59 STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM) | |
60 | |
61 | |
def ReadFile(handle, desired_bytes):
  """Calls kernel32.ReadFile() and returns the bytes that were read.

  Arguments:
  - handle: OS file handle to read from.
  - desired_bytes: maximum number of bytes to read.

  Returns:
    tuple(int(last_error), str(data)) where data contains exactly the number
    of bytes the kernel reported as read, embedded NUL bytes included.
  """
  c_read = wintypes.DWORD()
  buff = wintypes.create_string_buffer(desired_bytes)
  windll.kernel32.ReadFile(
      handle, buff, desired_bytes, wintypes.byref(c_read), None)
  # Slice the raw buffer by the reported byte count. Reading buff.value
  # instead would stop at the first NUL byte and silently truncate binary
  # output.
  return wintypes.GetLastError(), buff.raw[:c_read.value]
71 | |
def PeekNamedPipe(handle):
  """Calls kernel32.PeekNamedPipe() to query a pipe. Simplified version.

  No buffer is supplied, so no data is copied out of the pipe; only the
  available byte count is requested.

  Returns:
    The available byte count reported by the kernel.

  Raises:
  - OSError carrying GetLastError() when the call fails.
  """
  total_avail = wintypes.DWORD()
  msg_left = wintypes.DWORD()
  ok = windll.kernel32.PeekNamedPipe(
      handle,
      None,
      0,
      None,
      wintypes.byref(total_avail),
      wintypes.byref(msg_left))
  if ok:
    return total_avail.value
  raise OSError(wintypes.GetLastError())
82 | |
def recv_multi_impl(conns, maxsize, timeout):
  """Reads from the first pipe that has data, by polling each one in turn.

  A closed connection is reported immediately, regardless of timeout.

  Arguments:
  - conns: non-empty sequence of file objects wrapping pipes.
  - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
  - timeout: If None, it is blocking. If 0 or above, gives up once no data
        showed up within |timeout| seconds.

  Returns:
    tuple(int(index), str(data), bool(closed)). On timeout the tuple is
    (None, None, False).
  """
  assert conns
  assert timeout is None or isinstance(timeout, (int, float)), timeout
  maxsize = max(maxsize or MAX_SIZE, 1)

  # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
  # for proc.stdout and proc.stderr but they are implemented as named pipes on
  # Windows. Since named pipes are not waitable object, they can't be passed
  # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
  # and finally WFMO(). This requires caching the events handles in the Popen
  # object and remembering the pending ReadFile() calls. This will require
  # some re-architecture to store the relevant event handle and OVERLAPPEDIO
  # object in Popen or the file object.
  began = time.time()
  os_handles = [msvcrt.get_osfhandle(conn.fileno()) for conn in conns]
  while True:
    for position, os_handle in enumerate(os_handles):
      try:
        pending = min(PeekNamedPipe(os_handle), maxsize)
        if pending:
          payload = ReadFile(os_handle, pending)[1]
          return position, payload, False
      except OSError:
        # PeekNamedPipe() failed: the pipe closed.
        return position, None, True

    timed_out = timeout is not None and (time.time() - began) >= timeout
    if timed_out:
      return None, None, False
    # Nothing available anywhere yet; poll again shortly.
    time.sleep(0.001)
126 | |
127 else: | |
128 import fcntl # pylint: disable=F0401 | |
129 import select | |
130 | |
131 | |
132 # Signals that mean this process should exit quickly. | |
133 STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM) | |
134 | |
135 | |
def recv_multi_impl(conns, maxsize, timeout):
  """Reads from the first available pipe.

  It will immediately return on a closed connection, independent of timeout.

  Arguments:
  - conns: non-empty sequence of file objects usable with select().
  - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
  - timeout: If None, it is blocking. If 0 or above, will return None if no
        data is available within |timeout| seconds.

  Returns:
    tuple(int(index), str(data), bool(closed)). The tuple is
    (None, None, False) when nothing became readable in time (or select()
    failed), and (index, None, False) when the selected pipe turned out to
    have nothing to read after all.
  """
  assert conns
  assert timeout is None or isinstance(timeout, (int, float)), timeout
  maxsize = max(maxsize or MAX_SIZE, 1)

  # A zero timeout is never handed to select(); a tiny positive wait is used
  # in its place.
  if timeout == 0:
    timeout = 0.001
  try:
    readable, _, _ = select.select(conns, [], [], timeout)
  except select.error:
    readable = None
  if not readable:
    return None, None, False

  conn = readable[0]
  index = conns.index(conn)
  # Temporarily make it non-blocking.
  # TODO(maruel): This is not very efficient when the caller is doing this in
  # a loop. Add a mechanism to have the caller handle this.
  flags = fcntl.fcntl(conn, fcntl.F_GETFL)
  if not conn.closed:
    # pylint: disable=E1101
    fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
  try:
    try:
      data = conn.read(maxsize)
    except IOError as e:
      # On posix, this means the read would block.
      if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
        return index, None, False
      # Bare raise keeps the original traceback.
      raise

    if data is None:
      # Some file objects signal "would block" by returning None instead of
      # raising; that is not a closed pipe.
      return index, None, False

    if not data:
      # On posix, an empty read means the channel closed.
      return index, None, True

    return index, data, False
  finally:
    # Restore the original blocking mode for the next caller.
    if not conn.closed:
      fcntl.fcntl(conn, fcntl.F_SETFL, flags)
188 | |
189 | |
class TimeoutExpired(Exception):
  """Raised when a child process outlives its timeout.

  Compatible with python3 subprocess.TimeoutExpired.

  Attributes:
  - cmd: the command that was run.
  - timeout: the timeout in seconds that elapsed.
  - output: stdout data collected before the timeout, if any.
  - stdout: alias for output, as in python3.
  - stderr: stderr data collected before the timeout, if any.
  """
  def __init__(self, cmd, timeout, output=None, stderr=None):
    self.cmd = cmd
    self.timeout = timeout
    self.output = output
    self.stderr = stderr
    # Attributes must be set first: __str__ reads them to build the message.
    super(TimeoutExpired, self).__init__(str(self))

  def __str__(self):
    return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)

  @property
  def stdout(self):
    """Alias for output, matching python3's TimeoutExpired."""
    return self.output

  @stdout.setter
  def stdout(self, value):
    self.output = value
202 | |
203 | |
204 class Popen(subprocess.Popen): | |
205 """Adds timeout support on stdout and stderr. | |
206 | |
207 Inspired by | |
208 http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/ | |
209 | |
210 Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and | |
211 stderr once the child process closes them, after all the data is read. | |
212 | |
213 Arguments: | |
214 - detached: If True, the process is created in a new process group. On | |
215 Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0). | |
216 | |
217 Additional members: | |
218 - start: timestamp when this process started. | |
219 - end: timestamp when this process exited, as seen by this process. | |
220 - detached: If True, the child process was started as a detached process. | |
221 - gid: process group id, if any. | |
222 - duration: time in seconds the process lasted. | |
223 | |
224 Additional methods: | |
225 - yield_any(): yields output until the process terminates. | |
226 - recv_any(): reads from stdout and/or stderr with optional timeout. | |
227 - recv_out() & recv_err(): specialized version of recv_any(). | |
228 """ | |
229 # subprocess.Popen.__init__() is not threadsafe; there is a race between | |
230 # creating the exec-error pipe for the child and setting it to CLOEXEC during | |
231 # which another thread can fork and cause the pipe to be inherited by its | |
232 # descendents, which will cause the current Popen to hang until all those | |
233 # descendents exit. Protect this with a lock so that only one fork/exec can | |
234 # happen at a time. | |
235 popen_lock = threading.Lock() | |
236 | |
def __init__(self, args, **kwargs):
  """Starts the child process and records bookkeeping members.

  Arguments:
  - args: command to run, forwarded to subprocess.Popen.
  - detached: optional keyword; if True the child gets its own process group.
  - any other keyword is forwarded to subprocess.Popen. 'creationflags' and
    'preexec_fn' are rejected since detached= sets them.
  """
  assert 'creationflags' not in kwargs
  assert 'preexec_fn' not in kwargs, 'Use detached=True instead'
  self.start = time.time()
  self.end = None
  self.gid = None
  self.detached = kwargs.pop('detached', False)
  if self.detached and subprocess.mswindows:
    kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
  elif self.detached:
    kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)

  # Only one fork/exec at a time; see the comment on popen_lock.
  with self.popen_lock:
    super(Popen, self).__init__(args, **kwargs)
  self.args = args

  if not self.detached or subprocess.mswindows:
    return
  try:
    self.gid = os.getpgid(self.pid)
  except OSError:
    # The process can run and finish before its pgid is collected; gid then
    # stays None.
    pass
258 | |
def duration(self):
  """Returns for how long the child process has run, in seconds.

  The value is at least the real run time of the child. When neither .wait()
  nor .poll() has recorded the end timestamp yet, the current time is used
  instead, so the result can be significantly larger than the real value.
  """
  finished_at = self.end or time.time()
  return finished_at - self.start
267 | |
268 # pylint: disable=arguments-differ,redefined-builtin | |
def communicate(self, input=None, timeout=None):
  """Implements python3's timeout support for communicate().

  Unlike wait(), timeout=0 is considered the same as None.

  Arguments:
  - input: optional data to write to the child's stdin; requires stdin=PIPE.
  - timeout: maximum number of seconds to wait; falsy means no limit.

  Returns:
    tuple(stdout, stderr). Each item is None when the matching pipe was not
    requested.

  Raises:
  - TimeoutExpired when more than timeout seconds were spent waiting for the
    process. It carries the output gathered so far.
  """
  if not timeout:
    return super(Popen, self).communicate(input=input)

  assert isinstance(timeout, (int, float)), timeout

  if not (self.stdin or self.stdout or self.stderr):
    # No pipe. The user wanted to use wait().
    self.wait(timeout=timeout)
    return None, None

  out = '' if self.stdout else None
  err = '' if self.stderr else None
  writer = None
  if input is not None:
    assert self.stdin, (
        'Can\'t use communicate(input) if not using '
        'Popen(stdin=subprocess42.PIPE')
    # TODO(maruel): Switch back to non-threading.
    def feed_stdin():
      try:
        self.stdin.write(input)
      except IOError:
        pass
    writer = threading.Thread(name='Popen.communicate', target=feed_stdin)
    writer.daemon = True
    writer.start()

  try:
    if self.stdout or self.stderr:
      deadline = time.time() + timeout
      def time_left():
        return max(deadline - time.time(), 0)
      for pipe, data in self.yield_any(timeout=time_left):
        if pipe is None:
          raise TimeoutExpired(self.args, timeout, out, err)
        assert pipe in ('stdout', 'stderr'), pipe
        if pipe == 'stdout':
          out += data
        else:
          err += data
    else:
      # Only stdin is piped.
      self.wait(timeout=timeout)
  finally:
    if writer:
      try:
        self.stdin.close()
      except IOError:
        pass
      writer.join()

  # Indirectly initialize self.end.
  self.wait()
  return out, err
333 | |
def wait(self, timeout=None):  # pylint: disable=arguments-differ
  """Implements python3's timeout support for wait().

  Arguments:
  - timeout: None to block until the child exits, otherwise the maximum
    number of seconds to wait.

  Returns:
    The child's return code.

  Raises:
  - TimeoutExpired when more than timeout seconds were spent waiting for the
    process.
  """
  assert timeout is None or isinstance(timeout, (int, float)), timeout
  if timeout is None:
    super(Popen, self).wait()
  elif self.returncode is None and subprocess.mswindows:
    WAIT_TIMEOUT = 258
    outcome = subprocess._subprocess.WaitForSingleObject(
        self._handle, int(timeout * 1000))
    if outcome == WAIT_TIMEOUT:
      raise TimeoutExpired(self.args, timeout)
    self.returncode = subprocess._subprocess.GetExitCodeProcess(
        self._handle)
  elif self.returncode is None:
    # If you think the following code is horrible, it's because it is
    # inspired by python3's stdlib.
    deadline = time.time() + timeout
    nap = 0.001
    while True:
      try:
        pid, status = subprocess._eintr_retry_call(
            os.waitpid, self.pid, os.WNOHANG)
      except OSError as e:
        if e.errno != errno.ECHILD:
          raise
        # ECHILD is handled as if the child had exited with status 0.
        pid, status = self.pid, 0
      if pid == self.pid:
        # This sets self.returncode.
        self._handle_exitstatus(status)
        break
      left = deadline - time.time()
      if left <= 0:
        raise TimeoutExpired(self.args, timeout)
      # Back off exponentially, capped at 50ms and at the time left.
      nap = min(nap * 2, left, .05)
      time.sleep(nap)

  if not self.end:
    # communicate() uses wait() internally.
    self.end = time.time()
  return self.returncode
381 | |
def poll(self):
  """Polls the child and records the end timestamp once it has exited.

  Returns:
    Whatever subprocess.Popen.poll() returns.
  """
  code = super(Popen, self).poll()
  if code is None or self.end:
    return code
  self.end = time.time()
  return code
387 | |
def yield_any(self, maxsize=None, timeout=None):
  """Yields output until the process terminates.

  Unlike wait(), does not raise TimeoutExpired.

  Yields:
    (pipename, data) where pipename is either 'stdout', 'stderr' or None in
    case of timeout or when the child process closed one of the pipe(s) and
    all pending data on the pipe was read.

  Arguments:
  - maxsize: See recv_any(). Can be a callable function.
  - timeout: If None, the call is blocking. If set, yields None, None if no
        data is available within |timeout| seconds. It resets itself after
        each yield. Can be a callable function.
  """
  assert self.stdout or self.stderr
  if timeout is not None:
    # timeout=0 effectively means that the pipe is continuously polled.
    if isinstance(timeout, (int, float)):
      assert timeout >= 0, timeout
      old_timeout = timeout
      timeout = lambda: old_timeout
    else:
      assert callable(timeout), timeout

  if maxsize is not None and not callable(maxsize):
    assert isinstance(maxsize, (int, float)), maxsize

  last_yield = time.time()
  while self.poll() is None:
    to = (None if timeout is None
          else max(timeout() - (time.time() - last_yield), 0))
    t, data = self.recv_any(
        maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to)
    # Compare by value, not identity: the remaining time can be 0.0 (a float)
    # as well as the int 0, and both mean the timeout expired.
    if data or to == 0:
      yield t, data
      last_yield = time.time()

  # Read all remaining output in the pipes.
  # There are 3 cases:
  # - pipes get closed automatically by the calling process before it exits
  # - pipes are closed automatically by the OS
  # - pipes are kept open due to grand-children processes outliving the
  #   children process.
  while True:
    ms = maxsize() if callable(maxsize) else maxsize
    # timeout=0 is mainly to handle the case where a grand-children process
    # outlives the process started.
    t, data = self.recv_any(maxsize=ms, timeout=0)
    if not data:
      break
    yield t, data
443 | |
def recv_any(self, maxsize=None, timeout=None):
  """Reads from the first pipe available from stdout and stderr.

  Unlike wait(), it does not throw TimeoutExpired.

  Arguments:
  - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
  - timeout: If None, it is blocking. If 0 or above, will return None if no
        data is available within |timeout| seconds.

  Returns:
    tuple(pipename or None, str(data)). pipename is one of 'stdout' or
    'stderr'. (None, None) is returned when no pipe is left open or when
    nothing could be read in time.
  """
  # recv_multi_impl will early exit on a closed connection. Loop accordingly
  # to simplify call sites.
  while True:
    pipes = [
      x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
    ]
    # If both stdout and stderr have the exact file handle, they are
    # effectively the same pipe. Deduplicate it since otherwise it confuses
    # recv_multi_impl().
    if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
      pipes.pop(0)

    if not pipes:
      return None, None
    start = time.time()
    conns, names = zip(*pipes)
    index, data, closed = recv_multi_impl(conns, maxsize, timeout)
    if index is None:
      return index, data
    if closed:
      self._close(names[index])
      if not data:
        # Loop again. The other pipe may still be open.
        if timeout:
          # Clamp at zero: a negative timeout must not be handed to the next
          # recv_multi_impl() call.
          timeout = max(timeout - (time.time() - start), 0)
        continue

    if self.universal_newlines and data:
      data = self._translate_newlines(data)
    return names[index], data
488 | |
def recv_out(self, maxsize=None, timeout=None):
  """Reads from stdout synchronously with timeout; see _recv()."""
  data = self._recv('stdout', maxsize, timeout)
  return data
492 | |
def recv_err(self, maxsize=None, timeout=None):
  """Reads from stderr synchronously with timeout; see _recv()."""
  data = self._recv('stderr', maxsize, timeout)
  return data
496 | |
def terminate(self):
  """Tries to do something saner on Windows than the stdlib.

  Windows:
    self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
    - If set, only SIGBREAK can be sent and it is sent to a single process.
    - If not set, in theory only SIGINT can be used and *all processes* in
       the processgroup receive it. In practice, we just kill the process.
    See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
    The default on Windows is to call TerminateProcess() always, which is not
    useful.

  On Posix, always send SIGTERM.
  """
  use_ctrl_break = subprocess.mswindows and self.detached
  try:
    if use_ctrl_break:
      return self.send_signal(signal.CTRL_BREAK_EVENT)
    super(Popen, self).terminate()
  except OSError:
    # The function will throw if the process terminated in-between. Swallow
    # this.
    pass
519 | |
def kill(self):
  """Kills the process and its children if possible.

  When a process group id was recorded, SIGKILL is sent to the whole group;
  otherwise only the child itself is killed.

  Swallows OSError and returns True on success, False otherwise.
  """
  try:
    if self.gid:
      os.killpg(self.gid, signal.SIGKILL)
    else:
      super(Popen, self).kill()
  except OSError:
    return False
  return True
536 | |
def _close(self, which):
  """Closes either stdout or stderr and clears the matching attribute.

  Arguments:
  - which: attribute name of the pipe to close, 'stdout' or 'stderr'.
  """
  pipe = getattr(self, which)
  pipe.close()
  setattr(self, which, None)
541 | |
def _recv(self, which, maxsize, timeout):
  """Reads from one of stdout or stderr synchronously with timeout.

  Arguments:
  - which: attribute name of the pipe to read, 'stdout' or 'stderr'.
  - maxsize, timeout: forwarded to recv_multi_impl().

  Returns:
    The data read, or None when the pipe is already gone or nothing was read.
    The pipe is closed once recv_multi_impl() reports it as closed.
  """
  pipe = getattr(self, which)
  if pipe is None:
    return None
  result = recv_multi_impl([pipe], maxsize, timeout)
  payload, is_closed = result[1], result[2]
  if is_closed:
    self._close(which)
  if self.universal_newlines and payload:
    payload = self._translate_newlines(payload)
  return payload
553 | |
554 | |
@contextlib.contextmanager
def set_signal_handler(signals, handler):
  """Temporarily overrides signal handlers.

  Useful when waiting for a child process to handle signals like SIGTERM, so the
  signal can be propagated to the child process.

  Arguments:
  - signals: iterable of signal numbers to override.
  - handler: handler installed for each of them while the context is active.

  The handlers that were in place before are restored on exit, including when
  installing one of the handlers fails part way through.
  """
  previous = {}
  try:
    for sig in signals:
      # Record only the first handler seen per signal: on a duplicate entry
      # signal.signal() would return |handler| itself, not the original.
      if sig not in previous:
        previous[sig] = signal.signal(sig, handler)
    yield
  finally:
    for sig, old in previous.items():
      # None means the previous handler was not installed from Python; it
      # cannot be passed back to signal.signal().
      if old is not None:
        signal.signal(sig, old)
568 | |
569 | |
def call(*args, **kwargs):
  """Runs a command and returns its return code; adds support for timeout.

  The optional 'timeout' keyword is handed to Popen.wait(); every other
  argument is forwarded to Popen.
  """
  timeout = kwargs.pop('timeout', None)
  proc = Popen(*args, **kwargs)
  return proc.wait(timeout)
574 | |
575 | |
def check_call(*args, **kwargs):
  """Runs a command through call(); adds support for timeout.

  Returns:
    0 when the command succeeded.

  Raises:
  - CalledProcessError when the return code is non-zero.
  """
  retcode = call(*args, **kwargs)
  if not retcode:
    return 0
  cmd = kwargs.get('args') or args[0]
  raise CalledProcessError(retcode, cmd)
582 | |
583 | |
def check_output(*args, **kwargs):
  """Runs a command and returns its stdout; adds support for timeout.

  The optional 'timeout' keyword is handed to Popen.communicate().

  Raises:
  - ValueError when a 'stdout' keyword is supplied.
  - CalledProcessError, carrying the output, when the return code is
    non-zero.
  """
  timeout = kwargs.pop('timeout', None)
  if 'stdout' in kwargs:
    raise ValueError('stdout argument not allowed, it will be overridden.')
  kwargs['stdout'] = PIPE
  process = Popen(*args, **kwargs)
  output = process.communicate(timeout=timeout)[0]
  retcode = process.poll()
  if not retcode:
    return output
  raise CalledProcessError(retcode, kwargs.get('args') or args[0], output)
595 | |
596 | |
def call_with_timeout(args, timeout, **kwargs):
  """Runs an executable; kill it in case of timeout.

  stdin and stdout are piped. On timeout the output gathered so far is taken
  from the TimeoutExpired exception and the process is killed and reaped.

  Returns:
    tuple(stdout, stderr, returncode, duration in seconds).
  """
  proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
  try:
    out, err = proc.communicate(timeout=timeout)
  except TimeoutExpired as e:
    out, err = e.output, e.stderr
    proc.kill()
    proc.wait()
  return out, err, proc.returncode, proc.duration()
OLD | NEW |