Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(333)

Side by Side Diff: subprocess2.py

Issue 14826003: Refactor nag functionality in to NagTimer class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Simplify sleep_time calculation Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gclient_utils.py ('k') | tests/gclient_scm_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding=utf8 1 # coding=utf8
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 """Collection of subprocess wrapper functions. 5 """Collection of subprocess wrapper functions.
6 6
7 In theory you shouldn't need anything else in subprocess, or this module failed. 7 In theory you shouldn't need anything else in subprocess, or this module failed.
8 """ 8 """
9 9
10 import cStringIO 10 import cStringIO
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
125 # Requires modifications. 125 # Requires modifications.
126 env = env.copy() 126 env = env.copy()
127 def fix_lang(name): 127 def fix_lang(name):
128 if not is_english(name): 128 if not is_english(name):
129 env[name] = 'en_US.UTF-8' 129 env[name] = 'en_US.UTF-8'
130 fix_lang('LANG') 130 fix_lang('LANG')
131 fix_lang('LANGUAGE') 131 fix_lang('LANGUAGE')
132 return env 132 return env
133 133
134 134
135 class NagTimer(object):
136 """
137 Triggers a callback when a time interval passes without an event being fired.
138
139 For example, the event could be receiving terminal output from a subprocess;
140 and the callback could print a warning to stderr that the subprocess appeared
141 to be hung.
142 """
143 def __init__(self, interval, cb):
144 self.interval = interval
145 self.cb = cb
146 self.timer = threading.Timer(self.interval, self.fn)
147 self.last_output = self.previous_last_output = 0
148
149 def start(self):
150 self.last_output = self.previous_last_output = time.time()
151 self.timer.start()
152
153 def event(self):
154 self.last_output = time.time()
155
156 def fn(self):
157 now = time.time()
158 if self.last_output == self.previous_last_output:
159 self.cb(now - self.previous_last_output)
160 # Use 0.1 fudge factor, just in case
161 # (self.last_output - now) is very close to zero.
162 sleep_time = (self.last_output - now - 0.1) % self.interval
M-A Ruel 2013/05/03 16:42:58 % with float? You should use an explicit cast in t
szager1 2013/05/03 18:16:58 Can you explain why? I can't think of a good reas
M-A Ruel 2013/05/03 18:32:34 Ah ok I hadn't realized that.
163 self.previous_last_output = self.last_output
164 self.timer = threading.Timer(sleep_time + 0.1, self.fn)
165 self.timer.start()
166
167 def cancel(self):
168 self.timer.cancel()
169
170
135 class Popen(subprocess.Popen): 171 class Popen(subprocess.Popen):
136 """Wraps subprocess.Popen() with various workarounds. 172 """Wraps subprocess.Popen() with various workarounds.
137 173
138 - Forces English output since it's easier to parse the stdout if it is always 174 - Forces English output since it's easier to parse the stdout if it is always
139 in English. 175 in English.
140 - Sets shell=True on windows by default. You can override this by forcing 176 - Sets shell=True on windows by default. You can override this by forcing
141 shell parameter to a value. 177 shell parameter to a value.
142 - Adds support for VOID to not buffer when not needed. 178 - Adds support for VOID to not buffer when not needed.
143 - Adds self.start property. 179 - Adds self.start property.
144 180
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
223 effectiveness will be delayed accordingly. 259 effectiveness will be delayed accordingly.
224 """ 260 """
225 # Queue of either of <threadname> when done or (<threadname>, data). In 261 # Queue of either of <threadname> when done or (<threadname>, data). In
226 # theory we would like to limit to ~64kb items to not cause large memory 262 # theory we would like to limit to ~64kb items to not cause large memory
227 # usage when the callback blocks. It is not done because it slows down 263 # usage when the callback blocks. It is not done because it slows down
228 # processing on OSX10.6 by a factor of 2x, making it even slower than 264 # processing on OSX10.6 by a factor of 2x, making it even slower than
229 # Windows! Revisit this decision if it becomes a problem, e.g. crash 265 # Windows! Revisit this decision if it becomes a problem, e.g. crash
230 # because of memory exhaustion. 266 # because of memory exhaustion.
231 queue = Queue.Queue() 267 queue = Queue.Queue()
232 done = threading.Event() 268 done = threading.Event()
233 timer = [] 269 nag = None
234 last_output = [time.time()] * 2
235 270
236 def write_stdin(): 271 def write_stdin():
237 try: 272 try:
238 stdin_io = cStringIO.StringIO(input) 273 stdin_io = cStringIO.StringIO(input)
239 while True: 274 while True:
240 data = stdin_io.read(1024) 275 data = stdin_io.read(1024)
241 if data: 276 if data:
242 self.stdin.write(data) 277 self.stdin.write(data)
243 else: 278 else:
244 self.stdin.close() 279 self.stdin.close()
245 break 280 break
246 finally: 281 finally:
247 queue.put('stdin') 282 queue.put('stdin')
248 283
249 def _queue_pipe_read(pipe, name): 284 def _queue_pipe_read(pipe, name):
250 """Queues characters read from a pipe into a queue.""" 285 """Queues characters read from a pipe into a queue."""
251 try: 286 try:
252 while True: 287 while True:
253 data = pipe.read(1) 288 data = pipe.read(1)
254 if not data: 289 if not data:
255 break 290 break
256 last_output[0] = time.time() 291 if nag:
292 nag.event()
257 queue.put((name, data)) 293 queue.put((name, data))
258 finally: 294 finally:
259 queue.put(name) 295 queue.put(name)
260 296
261 def nag_fn():
262 now = time.time()
263 if done.is_set():
264 return
265 if last_output[0] == last_output[1]:
266 logging.warn(' No output for %.0f seconds from command:' % (
267 now - last_output[1]))
268 logging.warn(' %s' % self.cmd_str)
269 # Use 0.1 fudge factor in case:
270 # now ~= last_output[0] + self.nag_timer
271 sleep_time = self.nag_timer + last_output[0] - now - 0.1
272 while sleep_time < 0:
273 sleep_time += self.nag_timer
274 last_output[1] = last_output[0]
275 timer[0] = threading.Timer(sleep_time, nag_fn)
276 timer[0].start()
277
278 def timeout_fn(): 297 def timeout_fn():
279 try: 298 try:
280 done.wait(self.timeout) 299 done.wait(self.timeout)
281 finally: 300 finally:
282 queue.put('timeout') 301 queue.put('timeout')
283 302
284 def wait_fn(): 303 def wait_fn():
285 try: 304 try:
286 self.wait() 305 self.wait()
287 finally: 306 finally:
(...skipping 18 matching lines...) Expand all
306 target=_queue_pipe_read, args=(self.stderr, 'stderr')) 325 target=_queue_pipe_read, args=(self.stderr, 'stderr'))
307 if input: 326 if input:
308 threads['stdin'] = threading.Thread(target=write_stdin) 327 threads['stdin'] = threading.Thread(target=write_stdin)
309 elif self.stdin: 328 elif self.stdin:
310 # Pipe but no input, make sure it's closed. 329 # Pipe but no input, make sure it's closed.
311 self.stdin.close() 330 self.stdin.close()
312 for t in threads.itervalues(): 331 for t in threads.itervalues():
313 t.start() 332 t.start()
314 333
315 if self.nag_timer: 334 if self.nag_timer:
316 timer.append(threading.Timer(self.nag_timer, nag_fn)) 335 def _nag_cb(elapsed):
317 timer[0].start() 336 logging.warn(' No output for %.0f seconds from command:' % elapsed)
337 logging.warn(' %s' % self.cmd_str)
338 nag = NagTimer(self.nag_timer, _nag_cb)
339 nag.start()
318 340
319 timed_out = False 341 timed_out = False
320 try: 342 try:
321 # This thread needs to be optimized for speed. 343 # This thread needs to be optimized for speed.
322 while threads: 344 while threads:
323 item = queue.get() 345 item = queue.get()
324 if item[0] == 'stdout': 346 if item[0] == 'stdout':
325 self.stdout_cb(item[1]) 347 self.stdout_cb(item[1])
326 elif item[0] == 'stderr': 348 elif item[0] == 'stderr':
327 self.stderr_cb(item[1]) 349 self.stderr_cb(item[1])
328 else: 350 else:
329 # A thread terminated. 351 # A thread terminated.
330 threads[item].join() 352 threads[item].join()
331 del threads[item] 353 del threads[item]
332 if item == 'wait': 354 if item == 'wait':
333 # Terminate the timeout thread if necessary. 355 # Terminate the timeout thread if necessary.
334 done.set() 356 done.set()
335 elif item == 'timeout' and not timed_out and self.poll() is None: 357 elif item == 'timeout' and not timed_out and self.poll() is None:
336 logging.debug('Timed out after %fs: killing' % self.timeout) 358 logging.debug('Timed out after %fs: killing' % self.timeout)
337 self.kill() 359 self.kill()
338 timed_out = True 360 timed_out = True
339 finally: 361 finally:
340 # Stop the threads. 362 # Stop the threads.
341 done.set() 363 done.set()
342 if timer: 364 if nag:
343 timer[0].cancel() 365 nag.cancel()
344 if 'wait' in threads: 366 if 'wait' in threads:
345 # Accelerate things, otherwise it would hang until the child process is 367 # Accelerate things, otherwise it would hang until the child process is
346 # done. 368 # done.
347 logging.debug('Killing child because of an exception') 369 logging.debug('Killing child because of an exception')
348 self.kill() 370 self.kill()
349 # Join threads. 371 # Join threads.
350 for thread in threads.itervalues(): 372 for thread in threads.itervalues():
351 thread.join() 373 thread.join()
352 if timed_out: 374 if timed_out:
353 self.returncode = TIMED_OUT 375 self.returncode = TIMED_OUT
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
474 496
475 - Throws if return code is not 0. 497 - Throws if return code is not 0.
476 - Works even prior to python 2.7. 498 - Works even prior to python 2.7.
477 - Blocks stdin by default if not specified since no output will be visible. 499 - Blocks stdin by default if not specified since no output will be visible.
478 - As per doc, "The stdout argument is not allowed as it is used internally." 500 - As per doc, "The stdout argument is not allowed as it is used internally."
479 """ 501 """
480 kwargs.setdefault('stdin', VOID) 502 kwargs.setdefault('stdin', VOID)
481 if 'stdout' in kwargs: 503 if 'stdout' in kwargs:
482 raise ValueError('stdout argument not allowed, it will be overridden.') 504 raise ValueError('stdout argument not allowed, it will be overridden.')
483 return check_call_out(args, stdout=PIPE, **kwargs)[0] 505 return check_call_out(args, stdout=PIPE, **kwargs)[0]
OLDNEW
« no previous file with comments | « gclient_utils.py ('k') | tests/gclient_scm_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698