| OLD | NEW |
| 1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """subprocess42 is the answer to life the universe and everything. | 5 """subprocess42 is the answer to life the universe and everything. |
| 6 | 6 |
| 7 It has the particularity of having a Popen implementation that can yield output | 7 It has the particularity of having a Popen implementation that can yield output |
| 8 as it is produced while implementing a timeout and NOT requiring the use of | 8 as it is produced while implementing a timeout and NOT requiring the use of |
| 9 worker threads. | 9 worker threads. |
| 10 | 10 |
| 11 Example: | 11 Example: |
| 12 Wait for a child process with a timeout, send SIGTERM, wait a grace period | 12 Wait for a child process with a timeout, send SIGTERM, wait a grace period |
| 13 then send SIGKILL: | 13 then send SIGKILL: |
| 14 | 14 |
| 15 def wait_terminate_then_kill(proc, timeout, grace): | 15 def wait_terminate_then_kill(proc, timeout, grace): |
| 16 try: | 16 try: |
| 17 return proc.wait(timeout) | 17 return proc.wait(timeout) |
| 18 except subprocess42.TimeoutExpired: | 18 except subprocess42.TimeoutExpired: |
| 19 proc.terminate() | 19 proc.terminate() |
| 20 try: | 20 try: |
| 21 return proc.wait(grace) | 21 return proc.wait(grace) |
| 22 except subprocess42.TimeoutExpired: | 22 except subprocess42.TimeoutExpired: |
| 23 proc.kill() | 23 proc.kill() |
| 24 return proc.wait() | 24 return proc.wait() |
| 25 | 25 |
| 26 | 26 |
| 27 TODO(maruel): Add VOID support like subprocess2. | 27 TODO(maruel): Add VOID support like subprocess2. |
| 28 """ | 28 """ |
| 29 | 29 |
| 30 import collections |
| 30 import contextlib | 31 import contextlib |
| 31 import errno | 32 import errno |
| 32 import os | 33 import os |
| 33 import signal | 34 import signal |
| 34 import sys | 35 import sys |
| 35 import threading | 36 import threading |
| 36 import time | 37 import time |
| 37 | 38 |
| 38 import subprocess | 39 import subprocess |
| 39 | 40 |
| (...skipping 340 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 380 # communicate() uses wait() internally. | 381 # communicate() uses wait() internally. |
| 381 self.end = time.time() | 382 self.end = time.time() |
| 382 return self.returncode | 383 return self.returncode |
| 383 | 384 |
| 384 def poll(self): | 385 def poll(self): |
| 385 ret = super(Popen, self).poll() | 386 ret = super(Popen, self).poll() |
| 386 if ret is not None and not self.end: | 387 if ret is not None and not self.end: |
| 387 self.end = time.time() | 388 self.end = time.time() |
| 388 return ret | 389 return ret |
| 389 | 390 |
| 391 def yield_any_line(self, **kwargs): |
| 392 """Yields lines until the process terminates. |
| 393 |
| 394 Like yield_any, but yields lines. |
| 395 """ |
| 396 return split(self.yield_any(**kwargs)) |
| 397 |
| 390 def yield_any(self, maxsize=None, timeout=None): | 398 def yield_any(self, maxsize=None, timeout=None): |
| 391 """Yields output until the process terminates. | 399 """Yields output until the process terminates. |
| 392 | 400 |
| 393 Unlike wait(), does not raise TimeoutExpired. | 401 Unlike wait(), does not raise TimeoutExpired. |
| 394 | 402 |
| 395 Yields: | 403 Yields: |
| 396 (pipename, data) where pipename is either 'stdout', 'stderr' or None in | 404 (pipename, data) where pipename is either 'stdout', 'stderr' or None in |
| 397 case of timeout or when the child process closed one of the pipe(s) and | 405 case of timeout or when the child process closed one of the pipe(s) and |
| 398 all pending data on the pipe was read. | 406 all pending data on the pipe was read. |
| 399 | 407 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 411 old_timeout = timeout | 419 old_timeout = timeout |
| 412 timeout = lambda: old_timeout | 420 timeout = lambda: old_timeout |
| 413 else: | 421 else: |
| 414 assert callable(timeout), timeout | 422 assert callable(timeout), timeout |
| 415 | 423 |
| 416 if maxsize is not None and not callable(maxsize): | 424 if maxsize is not None and not callable(maxsize): |
| 417 assert isinstance(maxsize, (int, float)), maxsize | 425 assert isinstance(maxsize, (int, float)), maxsize |
| 418 | 426 |
| 419 last_yield = time.time() | 427 last_yield = time.time() |
| 420 while self.poll() is None: | 428 while self.poll() is None: |
| 421 to = (None if timeout is None | 429 to = timeout() if timeout else None |
| 422 else max(timeout() - (time.time() - last_yield), 0)) | 430 if to is not None: |
| 431 to = max(to - (time.time() - last_yield), 0) |
| 423 t, data = self.recv_any( | 432 t, data = self.recv_any( |
| 424 maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to) | 433 maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to) |
| 425 if data or to is 0: | 434 if data or to is 0: |
| 426 yield t, data | 435 yield t, data |
| 427 last_yield = time.time() | 436 last_yield = time.time() |
| 428 | 437 |
| 429 # Read all remaining output in the pipes. | 438 # Read all remaining output in the pipes. |
| 430 # There is 3 cases: | 439 # There is 3 cases: |
| 431 # - pipes get closed automatically by the calling process before it exits | 440 # - pipes get closed automatically by the calling process before it exits |
| 432 # - pipes are closed automated by the OS | 441 # - pipes are closed automated by the OS |
| 433 # - pipes are kept open due to grand-children processes outliving the | 442 # - pipes are kept open due to grand-children processes outliving the |
| 434 # children process. | 443 # children process. |
| 435 while True: | 444 while True: |
| 436 ms = maxsize | 445 ms = maxsize |
| 437 if callable(maxsize): | 446 if callable(maxsize): |
| 438 ms = maxsize() | 447 ms = maxsize() |
| 439 # timeout=0 is mainly to handle the case where a grand-children process | 448 # timeout=0 is mainly to handle the case where a grand-children process |
| 440 # outlives the process started. | 449 # outlives the process started. |
| 441 t, data = self.recv_any(maxsize=ms, timeout=0) | 450 t, data = self.recv_any(maxsize=ms, timeout=0) |
| 442 if not data: | 451 if not data: |
| 443 break | 452 break |
| 444 yield t, data | 453 yield t, data |
| 445 | 454 |
| 446 def recv_any(self, maxsize=None, timeout=None): | 455 def recv_any(self, maxsize=None, timeout=None): |
| 447 """Reads from the first pipe available from stdout and stderr. | 456 """Reads from the first pipe available from stdout and stderr. |
| 448 | 457 |
| 449 Unlike wait(), it does not throw TimeoutExpired. | 458 Unlike wait(), does not throw TimeoutExpired. |
| 450 | 459 |
| 451 Arguments: | 460 Arguments: |
| 452 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. | 461 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE. |
| 453 - timeout: If None, it is blocking. If 0 or above, will return None if no | 462 - timeout: If None, it is blocking. If 0 or above, will return None if no |
| 454 data is available within |timeout| seconds. | 463 data is available within |timeout| seconds. |
| 455 | 464 |
| 456 Returns: | 465 Returns: |
| 457 tuple(pipename or None, str(data)). pipename is one of 'stdout' or | 466 tuple(pipename or None, str(data)). pipename is one of 'stdout' or |
| 458 'stderr'. | 467 'stderr'. |
| 459 """ | 468 """ |
| (...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 628 SEM_NOGPFAULTERRORBOX = 2 | 637 SEM_NOGPFAULTERRORBOX = 2 |
| 629 SEM_NOALIGNMENTFAULTEXCEPT = 0x8000 | 638 SEM_NOALIGNMENTFAULTEXCEPT = 0x8000 |
| 630 ctypes.windll.kernel32.SetErrorMode( | 639 ctypes.windll.kernel32.SetErrorMode( |
| 631 SEM_FAILCRITICALERRORS|SEM_NOGPFAULTERRORBOX| | 640 SEM_FAILCRITICALERRORS|SEM_NOGPFAULTERRORBOX| |
| 632 SEM_NOALIGNMENTFAULTEXCEPT) | 641 SEM_NOALIGNMENTFAULTEXCEPT) |
| 633 # TODO(maruel): Other OSes. | 642 # TODO(maruel): Other OSes. |
| 634 # - OSX, need to figure out a way to make the following process tree local: | 643 # - OSX, need to figure out a way to make the following process tree local: |
| 635 # defaults write com.apple.CrashReporter UseUNC 1 | 644 # defaults write com.apple.CrashReporter UseUNC 1 |
| 636 # defaults write com.apple.CrashReporter DialogType none | 645 # defaults write com.apple.CrashReporter DialogType none |
| 637 # - Ubuntu, disable apport if needed. | 646 # - Ubuntu, disable apport if needed. |
| 647 |
| 648 |
| 649 def split(data, sep='\n'): |
| 650 """Splits pipe data by |sep|. Does some buffering. |
| 651 |
| 652 For example, [('stdout', 'a\nb'), ('stdout', '\n'), ('stderr', 'c\n')] -> |
| 653 [('stdout', 'a'), ('stdout', 'b'), ('stderr', 'c')]. |
| 654 |
| 655 Args: |
| 656 data: iterable of tuples (pipe_name, bytes). |
| 657 |
| 658 Returns: |
| 659 An iterator of tuples (pipe_name, bytes) where bytes is the input data |
| 660 but split by sep into separate tuples. |
| 661 """ |
| 662 # A dict {pipe_name -> list of pending chunks without separators} |
| 663 pending_chunks = collections.defaultdict(list) |
| 664 for pipe_name, chunk in data: |
| 665 if chunk is None: |
| 666 # Happens if a pipe is closed. |
| 667 continue |
| 668 |
| 669 pending = pending_chunks[pipe_name] |
| 670 start = 0 # offset in chunk to start |sep| search from |
| 671 while start < len(chunk): |
| 672 j = chunk.find(sep, start) |
| 673 if j == -1: |
| 674 pending_chunks[pipe_name].append(chunk[start:]) |
| 675 break |
| 676 |
| 677 to_emit = chunk[start:j] |
| 678 start = j + 1 |
| 679 if pending: |
| 680 # prepend and forget |
| 681 to_emit = ''.join(pending) + to_emit |
| 682 pending = [] |
| 683 pending_chunks[pipe_name] = pending |
| 684 yield pipe_name, to_emit |
| 685 |
| 686 # Emit remaining chunks that don't end with separators as is. |
| 687 for pipe_name, chunks in sorted(pending_chunks.iteritems()): |
| 688 if chunks: |
| 689 yield pipe_name, ''.join(chunks) |
| OLD | NEW |