| Index: subprocess2.py
|
| diff --git a/subprocess2.py b/subprocess2.py
|
| index e81798613cf6946789841df1dd4fce44417d9897..ac44555dcf5a038478ca172669035d974910382d 100644
|
| --- a/subprocess2.py
|
| +++ b/subprocess2.py
|
| @@ -132,6 +132,42 @@ def get_english_env(env):
|
| return env
|
|
|
|
|
| +class NagTimer(object):
|
| + """
|
| + Triggers a callback when a time interval passes without an event being fired.
|
| +
|
| + For example, the event could be receiving terminal output from a subprocess;
|
| + and the callback could print a warning to stderr that the subprocess appeared
|
| + to be hung.
|
| + """
|
| + def __init__(self, interval, cb):
|
| + self.interval = interval
|
| + self.cb = cb
|
| + self.timer = threading.Timer(self.interval, self.fn)
|
| + self.last_output = self.previous_last_output = 0
|
| +
|
| + def start(self):
|
| + self.last_output = self.previous_last_output = time.time()
|
| + self.timer.start()
|
| +
|
| + def event(self):
|
| + self.last_output = time.time()
|
| +
|
| + def fn(self):
|
| + now = time.time()
|
| + if self.last_output == self.previous_last_output:
|
| + self.cb(now - self.previous_last_output)
|
| + # Use 0.1 fudge factor, just in case
|
| + # (self.last_output - now) is very close to zero.
|
| + sleep_time = (self.last_output - now - 0.1) % self.interval
|
| + self.previous_last_output = self.last_output
|
| + self.timer = threading.Timer(sleep_time + 0.1, self.fn)
|
| + self.timer.start()
|
| +
|
| + def cancel(self):
|
| + self.timer.cancel()
|
| +
|
| +
|
| class Popen(subprocess.Popen):
|
| """Wraps subprocess.Popen() with various workarounds.
|
|
|
| @@ -192,6 +228,7 @@ class Popen(subprocess.Popen):
|
| self.start = time.time()
|
| self.timeout = None
|
| self.nag_timer = None
|
| + self.nag_max = None
|
| self.shell = kwargs.get('shell', None)
|
| # Silence pylint on MacOSX
|
| self.returncode = None
|
| @@ -230,8 +267,7 @@ class Popen(subprocess.Popen):
|
| # because of memory exhaustion.
|
| queue = Queue.Queue()
|
| done = threading.Event()
|
| - timer = []
|
| - last_output = [time.time()] * 2
|
| + nag = None
|
|
|
| def write_stdin():
|
| try:
|
| @@ -253,28 +289,12 @@ class Popen(subprocess.Popen):
|
| data = pipe.read(1)
|
| if not data:
|
| break
|
| - last_output[0] = time.time()
|
| + if nag:
|
| + nag.event()
|
| queue.put((name, data))
|
| finally:
|
| queue.put(name)
|
|
|
| - def nag_fn():
|
| - now = time.time()
|
| - if done.is_set():
|
| - return
|
| - if last_output[0] == last_output[1]:
|
| - logging.warn(' No output for %.0f seconds from command:' % (
|
| - now - last_output[1]))
|
| - logging.warn(' %s' % self.cmd_str)
|
| - # Use 0.1 fudge factor in case:
|
| - # now ~= last_output[0] + self.nag_timer
|
| - sleep_time = self.nag_timer + last_output[0] - now - 0.1
|
| - while sleep_time < 0:
|
| - sleep_time += self.nag_timer
|
| - last_output[1] = last_output[0]
|
| - timer[0] = threading.Timer(sleep_time, nag_fn)
|
| - timer[0].start()
|
| -
|
| def timeout_fn():
|
| try:
|
| done.wait(self.timeout)
|
| @@ -313,8 +333,15 @@ class Popen(subprocess.Popen):
|
| t.start()
|
|
|
| if self.nag_timer:
|
| - timer.append(threading.Timer(self.nag_timer, nag_fn))
|
| - timer[0].start()
|
| + def _nag_cb(elapsed):
|
| + logging.warn(' No output for %.0f seconds from command:' % elapsed)
|
| + logging.warn(' %s' % self.cmd_str)
|
| + if (self.nag_max and
|
| + int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max):
|
| + queue.put('timeout')
|
| + done.set() # Must do this so that timeout thread stops waiting.
|
| + nag = NagTimer(self.nag_timer, _nag_cb)
|
| + nag.start()
|
|
|
| timed_out = False
|
| try:
|
| @@ -327,20 +354,22 @@ class Popen(subprocess.Popen):
|
| self.stderr_cb(item[1])
|
| else:
|
| # A thread terminated.
|
| - threads[item].join()
|
| - del threads[item]
|
| + if item in threads:
|
| + threads[item].join()
|
| + del threads[item]
|
| if item == 'wait':
|
| # Terminate the timeout thread if necessary.
|
| done.set()
|
| elif item == 'timeout' and not timed_out and self.poll() is None:
|
| - logging.debug('Timed out after %fs: killing' % self.timeout)
|
| + logging.debug('Timed out after %.0fs: killing' % (
|
| + time.time() - self.start))
|
| self.kill()
|
| timed_out = True
|
| finally:
|
| # Stop the threads.
|
| done.set()
|
| - if timer:
|
| - timer[0].cancel()
|
| + if nag:
|
| + nag.cancel()
|
| if 'wait' in threads:
|
| # Accelerate things, otherwise it would hang until the child process is
|
| # done.
|
| @@ -353,7 +382,8 @@ class Popen(subprocess.Popen):
|
| self.returncode = TIMED_OUT
|
|
|
| # pylint: disable=W0221,W0622
|
| - def communicate(self, input=None, timeout=None, nag_timer=None):
|
| + def communicate(self, input=None, timeout=None, nag_timer=None,
|
| + nag_max=None):
|
| """Adds timeout and callbacks support.
|
|
|
| Returns (stdout, stderr) like subprocess.Popen().communicate().
|
| @@ -365,6 +395,7 @@ class Popen(subprocess.Popen):
|
| """
|
| self.timeout = timeout
|
| self.nag_timer = nag_timer
|
| + self.nag_max = nag_max
|
| if (not self.timeout and not self.nag_timer and
|
| not self.stdout_cb and not self.stderr_cb):
|
| return super(Popen, self).communicate(input)
|
| @@ -393,7 +424,7 @@ class Popen(subprocess.Popen):
|
| return (stdout, stderr)
|
|
|
|
|
| -def communicate(args, timeout=None, nag_timer=None, **kwargs):
|
| +def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs):
|
| """Wraps subprocess.Popen().communicate() and add timeout support.
|
|
|
| Returns ((stdout, stderr), returncode).
|
|
|