OLD | NEW |
---|---|
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 import cStringIO | 10 import cStringIO |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
125 # Requires modifications. | 125 # Requires modifications. |
126 env = env.copy() | 126 env = env.copy() |
127 def fix_lang(name): | 127 def fix_lang(name): |
128 if not is_english(name): | 128 if not is_english(name): |
129 env[name] = 'en_US.UTF-8' | 129 env[name] = 'en_US.UTF-8' |
130 fix_lang('LANG') | 130 fix_lang('LANG') |
131 fix_lang('LANGUAGE') | 131 fix_lang('LANGUAGE') |
132 return env | 132 return env |
133 | 133 |
134 | 134 |
135 class NagTimer(object): | |
136 """ | |
137 Triggers a callback when a time interval passes without an event being fired. | |
138 | |
139 For example, the event could be receiving terminal output from a subprocess; | |
140 and the callback could print a warning to stderr that the subprocess appeared | |
141 to be hung. | |
142 """ | |
143 def __init__(self, interval, cb): | |
144 self.interval = interval | |
145 self.cb = cb | |
146 self.timer = threading.Timer(self.interval, self.fn) | |
147 self.last_output = self.previous_last_output = 0 | |
148 | |
149 def start(self): | |
150 self.last_output = self.previous_last_output = time.time() | |
151 self.timer.start() | |
152 | |
153 def event(self): | |
154 self.last_output = time.time() | |
155 | |
156 def fn(self): | |
157 now = time.time() | |
158 if self.last_output == self.previous_last_output: | |
159 self.cb(now - self.previous_last_output) | |
160 # Use 0.1 fudge factor, just in case | |
161 # (self.last_output - now) is very close to zero. | |
162 sleep_time = (self.last_output - now - 0.1) % self.interval | |
M-A Ruel
2013/05/03 16:42:58
% with float? You should use an explicit cast in t
szager1
2013/05/03 18:16:58
Can you explain why? I can't think of a good reas
M-A Ruel
2013/05/03 18:32:34
Ah ok I hadn't realized that.
| |
163 self.previous_last_output = self.last_output | |
164 self.timer = threading.Timer(sleep_time + 0.1, self.fn) | |
165 self.timer.start() | |
166 | |
167 def cancel(self): | |
168 self.timer.cancel() | |
169 | |
170 | |
135 class Popen(subprocess.Popen): | 171 class Popen(subprocess.Popen): |
136 """Wraps subprocess.Popen() with various workarounds. | 172 """Wraps subprocess.Popen() with various workarounds. |
137 | 173 |
138 - Forces English output since it's easier to parse the stdout if it is always | 174 - Forces English output since it's easier to parse the stdout if it is always |
139 in English. | 175 in English. |
140 - Sets shell=True on windows by default. You can override this by forcing | 176 - Sets shell=True on windows by default. You can override this by forcing |
141 shell parameter to a value. | 177 shell parameter to a value. |
142 - Adds support for VOID to not buffer when not needed. | 178 - Adds support for VOID to not buffer when not needed. |
143 - Adds self.start property. | 179 - Adds self.start property. |
144 | 180 |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
223 effectiveness will be delayed accordingly. | 259 effectiveness will be delayed accordingly. |
224 """ | 260 """ |
225 # Queue of either of <threadname> when done or (<threadname>, data). In | 261 # Queue of either of <threadname> when done or (<threadname>, data). In |
226 # theory we would like to limit to ~64kb items to not cause large memory | 262 # theory we would like to limit to ~64kb items to not cause large memory |
227 # usage when the callback blocks. It is not done because it slows down | 263 # usage when the callback blocks. It is not done because it slows down |
228 # processing on OSX10.6 by a factor of 2x, making it even slower than | 264 # processing on OSX10.6 by a factor of 2x, making it even slower than |
229 # Windows! Revisit this decision if it becomes a problem, e.g. crash | 265 # Windows! Revisit this decision if it becomes a problem, e.g. crash |
230 # because of memory exhaustion. | 266 # because of memory exhaustion. |
231 queue = Queue.Queue() | 267 queue = Queue.Queue() |
232 done = threading.Event() | 268 done = threading.Event() |
233 timer = [] | 269 nag = None |
234 last_output = [time.time()] * 2 | |
235 | 270 |
236 def write_stdin(): | 271 def write_stdin(): |
237 try: | 272 try: |
238 stdin_io = cStringIO.StringIO(input) | 273 stdin_io = cStringIO.StringIO(input) |
239 while True: | 274 while True: |
240 data = stdin_io.read(1024) | 275 data = stdin_io.read(1024) |
241 if data: | 276 if data: |
242 self.stdin.write(data) | 277 self.stdin.write(data) |
243 else: | 278 else: |
244 self.stdin.close() | 279 self.stdin.close() |
245 break | 280 break |
246 finally: | 281 finally: |
247 queue.put('stdin') | 282 queue.put('stdin') |
248 | 283 |
249 def _queue_pipe_read(pipe, name): | 284 def _queue_pipe_read(pipe, name): |
250 """Queues characters read from a pipe into a queue.""" | 285 """Queues characters read from a pipe into a queue.""" |
251 try: | 286 try: |
252 while True: | 287 while True: |
253 data = pipe.read(1) | 288 data = pipe.read(1) |
254 if not data: | 289 if not data: |
255 break | 290 break |
256 last_output[0] = time.time() | 291 if nag: |
292 nag.event() | |
257 queue.put((name, data)) | 293 queue.put((name, data)) |
258 finally: | 294 finally: |
259 queue.put(name) | 295 queue.put(name) |
260 | 296 |
261 def nag_fn(): | |
262 now = time.time() | |
263 if done.is_set(): | |
264 return | |
265 if last_output[0] == last_output[1]: | |
266 logging.warn(' No output for %.0f seconds from command:' % ( | |
267 now - last_output[1])) | |
268 logging.warn(' %s' % self.cmd_str) | |
269 # Use 0.1 fudge factor in case: | |
270 # now ~= last_output[0] + self.nag_timer | |
271 sleep_time = self.nag_timer + last_output[0] - now - 0.1 | |
272 while sleep_time < 0: | |
273 sleep_time += self.nag_timer | |
274 last_output[1] = last_output[0] | |
275 timer[0] = threading.Timer(sleep_time, nag_fn) | |
276 timer[0].start() | |
277 | |
278 def timeout_fn(): | 297 def timeout_fn(): |
279 try: | 298 try: |
280 done.wait(self.timeout) | 299 done.wait(self.timeout) |
281 finally: | 300 finally: |
282 queue.put('timeout') | 301 queue.put('timeout') |
283 | 302 |
284 def wait_fn(): | 303 def wait_fn(): |
285 try: | 304 try: |
286 self.wait() | 305 self.wait() |
287 finally: | 306 finally: |
(...skipping 18 matching lines...) Expand all Loading... | |
306 target=_queue_pipe_read, args=(self.stderr, 'stderr')) | 325 target=_queue_pipe_read, args=(self.stderr, 'stderr')) |
307 if input: | 326 if input: |
308 threads['stdin'] = threading.Thread(target=write_stdin) | 327 threads['stdin'] = threading.Thread(target=write_stdin) |
309 elif self.stdin: | 328 elif self.stdin: |
310 # Pipe but no input, make sure it's closed. | 329 # Pipe but no input, make sure it's closed. |
311 self.stdin.close() | 330 self.stdin.close() |
312 for t in threads.itervalues(): | 331 for t in threads.itervalues(): |
313 t.start() | 332 t.start() |
314 | 333 |
315 if self.nag_timer: | 334 if self.nag_timer: |
316 timer.append(threading.Timer(self.nag_timer, nag_fn)) | 335 def _nag_cb(elapsed): |
317 timer[0].start() | 336 logging.warn(' No output for %.0f seconds from command:' % elapsed) |
337 logging.warn(' %s' % self.cmd_str) | |
338 nag = NagTimer(self.nag_timer, _nag_cb) | |
339 nag.start() | |
318 | 340 |
319 timed_out = False | 341 timed_out = False |
320 try: | 342 try: |
321 # This thread needs to be optimized for speed. | 343 # This thread needs to be optimized for speed. |
322 while threads: | 344 while threads: |
323 item = queue.get() | 345 item = queue.get() |
324 if item[0] == 'stdout': | 346 if item[0] == 'stdout': |
325 self.stdout_cb(item[1]) | 347 self.stdout_cb(item[1]) |
326 elif item[0] == 'stderr': | 348 elif item[0] == 'stderr': |
327 self.stderr_cb(item[1]) | 349 self.stderr_cb(item[1]) |
328 else: | 350 else: |
329 # A thread terminated. | 351 # A thread terminated. |
330 threads[item].join() | 352 threads[item].join() |
331 del threads[item] | 353 del threads[item] |
332 if item == 'wait': | 354 if item == 'wait': |
333 # Terminate the timeout thread if necessary. | 355 # Terminate the timeout thread if necessary. |
334 done.set() | 356 done.set() |
335 elif item == 'timeout' and not timed_out and self.poll() is None: | 357 elif item == 'timeout' and not timed_out and self.poll() is None: |
336 logging.debug('Timed out after %fs: killing' % self.timeout) | 358 logging.debug('Timed out after %fs: killing' % self.timeout) |
337 self.kill() | 359 self.kill() |
338 timed_out = True | 360 timed_out = True |
339 finally: | 361 finally: |
340 # Stop the threads. | 362 # Stop the threads. |
341 done.set() | 363 done.set() |
342 if timer: | 364 if nag: |
343 timer[0].cancel() | 365 nag.cancel() |
344 if 'wait' in threads: | 366 if 'wait' in threads: |
345 # Accelerate things, otherwise it would hang until the child process is | 367 # Accelerate things, otherwise it would hang until the child process is |
346 # done. | 368 # done. |
347 logging.debug('Killing child because of an exception') | 369 logging.debug('Killing child because of an exception') |
348 self.kill() | 370 self.kill() |
349 # Join threads. | 371 # Join threads. |
350 for thread in threads.itervalues(): | 372 for thread in threads.itervalues(): |
351 thread.join() | 373 thread.join() |
352 if timed_out: | 374 if timed_out: |
353 self.returncode = TIMED_OUT | 375 self.returncode = TIMED_OUT |
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
474 | 496 |
475 - Throws if return code is not 0. | 497 - Throws if return code is not 0. |
476 - Works even prior to python 2.7. | 498 - Works even prior to python 2.7. |
477 - Blocks stdin by default if not specified since no output will be visible. | 499 - Blocks stdin by default if not specified since no output will be visible. |
478 - As per doc, "The stdout argument is not allowed as it is used internally." | 500 - As per doc, "The stdout argument is not allowed as it is used internally." |
479 """ | 501 """ |
480 kwargs.setdefault('stdin', VOID) | 502 kwargs.setdefault('stdin', VOID) |
481 if 'stdout' in kwargs: | 503 if 'stdout' in kwargs: |
482 raise ValueError('stdout argument not allowed, it will be overridden.') | 504 raise ValueError('stdout argument not allowed, it will be overridden.') |
483 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 505 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |