Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: subprocess2.py

Issue 8505046: Revert r109283, r109282 and r109239. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding=utf8 1 # coding=utf8
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 """Collection of subprocess wrapper functions. 5 """Collection of subprocess wrapper functions.
6 6
7 In theory you shouldn't need anything else in subprocess, or this module failed. 7 In theory you shouldn't need anything else in subprocess, or this module failed.
8 """ 8 """
9 9
10 from __future__ import with_statement 10 from __future__ import with_statement
11 import cStringIO
12 import errno 11 import errno
13 import logging 12 import logging
14 import os 13 import os
15 import Queue
16 import select
17 import subprocess 14 import subprocess
18 import sys 15 import sys
16 import tempfile
19 import time 17 import time
20 import threading 18 import threading
21 19
22 if sys.platform != 'win32':
23 import fcntl
24
25
26 # Constants forwarded from subprocess. 20 # Constants forwarded from subprocess.
27 PIPE = subprocess.PIPE 21 PIPE = subprocess.PIPE
28 STDOUT = subprocess.STDOUT 22 STDOUT = subprocess.STDOUT
29 # Sends stdout or stderr to os.devnull. 23 # Sends stdout or stderr to os.devnull.
30 VOID = object() 24 VOID = object()
31 # Error code when a process was killed because it timed out. 25 # Error code when a process was killed because it timed out.
32 TIMED_OUT = -2001 26 TIMED_OUT = -2001
33 27
34 # Globals. 28 # Globals.
35 # Set to True if you somehow need to disable this hack. 29 # Set to True if you somehow need to disable this hack.
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
175 logging.debug(tmp_str) 169 logging.debug(tmp_str)
176 170
177 def fix(stream): 171 def fix(stream):
178 if kwargs.get(stream) in (VOID, os.devnull): 172 if kwargs.get(stream) in (VOID, os.devnull):
179 # Replaces VOID with handle to /dev/null. 173 # Replaces VOID with handle to /dev/null.
180 # Create a temporary file to workaround python's deadlock. 174 # Create a temporary file to workaround python's deadlock.
181 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait 175 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
182 # When the pipe fills up, it will deadlock this process. Using a real file 176 # When the pipe fills up, it will deadlock this process. Using a real file
183 # works around that issue. 177 # works around that issue.
184 kwargs[stream] = open(os.devnull, 'w') 178 kwargs[stream] = open(os.devnull, 'w')
185 if callable(kwargs.get(stream)):
186 # Callable stdout/stderr should be used only with call() wrappers.
187 kwargs[stream] = PIPE
188 179
189 fix('stdout') 180 fix('stdout')
190 fix('stderr') 181 fix('stderr')
191 182
192 try: 183 try:
193 return subprocess.Popen(args, **kwargs) 184 return subprocess.Popen(args, **kwargs)
194 except OSError, e: 185 except OSError, e:
195 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': 186 if e.errno == errno.EAGAIN and sys.platform == 'cygwin':
196 # Convert fork() emulation failure into a CygwinRebaseError(). 187 # Convert fork() emulation failure into a CygwinRebaseError().
197 raise CygwinRebaseError( 188 raise CygwinRebaseError(
198 e.errno, 189 e.errno,
199 args, 190 args,
200 kwargs.get('cwd'), 191 kwargs.get('cwd'),
201 None, 192 None,
202 'Visit ' 193 'Visit '
203 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to ' 194 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to '
204 'learn how to fix this error; you need to rebase your cygwin dlls') 195 'learn how to fix this error; you need to rebase your cygwin dlls')
205 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go 196 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go
206 # through 197 # through
207 raise 198 raise
208 199
209 200
210 def _queue_pipe_read(pipe, name, done, dest):
211 """Queues characters read from a pipe into a queue.
212
213 Left outside the _tee_threads function to not introduce a function closure
214 to speed up variable lookup.
215 """
216 while not done.isSet():
217 data = pipe.read(1)
218 if not data:
219 break
220 dest.put((name, data))
221 dest.put(name)
222
223
224 def _tee_threads(proc, timeout, start, stdin, args, kwargs):
225 """Does I/O for a process's pipes using thread.
226
227 It's the simplest and slowest implementation. Expect very slow behavior.
228
229 If there is a callback and it doesn't keep up with the calls, the timeout
230 effectiveness will be delayed accordingly.
231 """
232 # TODO(maruel): Implement a select based implementation on POSIX and a Windows
233 # one using WaitForMultipleObjects().
234 #
235 # Queue of either of <threadname> when done or (<threadname>, data).
236 # In theory we would like to limit to ~64kb items to not cause large memory
237 # usage when the callback blocks. It is not done because it slows down
238 # processing on OSX10.6 by a factor of 2x, making it even slower than Windows!
239 # Revisit this decision if it becomes a problem, e.g. crash because of
240 # memory exhaustion.
241 queue = Queue.Queue()
242 done = threading.Event()
243
244 def write_stdin():
245 stdin_io = cStringIO.StringIO(stdin)
246 while not done.isSet():
247 data = stdin_io.read(1024)
248 if data:
249 proc.stdin.write(data)
250 else:
251 proc.stdin.close()
252 break
253 queue.put('stdin')
254
255 def timeout_fn():
256 done.wait(timeout)
257 # No need to close the pipes since killing should be sufficient.
258 queue.put('timeout')
259
260 # Starts up to 4 threads:
261 # Read stdout
262 # Read stderr
263 # Write stdin
264 # Timeout
265 threads = {}
266 if timeout is not None:
267 threads['timeout'] = threading.Thread(target=timeout_fn)
268 if callable(kwargs.get('stdout')):
269 threads['stdout'] = threading.Thread(
270 target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue))
271 if callable(kwargs.get('stderr')):
272 threads['stderr'] = threading.Thread(
273 target=_queue_pipe_read,
274 args=(proc.stderr, 'stderr', done, queue))
275 if isinstance(stdin, str):
276 threads['stdin'] = threading.Thread(target=write_stdin)
277 for t in threads.itervalues():
278 t.daemon = True
279 t.start()
280
281 timed_out = False
282 try:
283 while proc.returncode is None:
284 assert threads
285 proc.poll()
286 item = queue.get()
287 if isinstance(item, str):
288 threads[item].join()
289 del threads[item]
290 if item == 'timeout' and not timed_out and proc.poll() is None:
291 logging.debug('Timed out: killing')
292 proc.kill()
293 timed_out = True
294 if not threads:
295 # We won't be waken up anymore. Need to busy loop.
296 break
297 else:
298 kwargs[item[0]](item[1])
299 finally:
300 # Stop the threads.
301 done.set()
302 # Join threads
303 for thread in threads.itervalues():
304 thread.join()
305
306 # Flush the queue.
307 try:
308 while True:
309 item = queue.get(False)
310 if isinstance(item, str):
311 if item == 'timeout':
312 # TODO(maruel): Does it make sense at that point?
313 if not timed_out and proc.poll() is None:
314 logging.debug('Timed out: killing')
315 proc.kill()
316 timed_out = True
317 else:
318 kwargs[item[0]](item[1])
319 except Queue.Empty:
320 pass
321
322 # Get the remainder.
323 if callable(kwargs.get('stdout')):
324 data = proc.stdout.read()
325 while data:
326 kwargs['stdout'](data)
327 data = proc.stdout.read()
328 if callable(kwargs.get('stderr')):
329 data = proc.stderr.read()
330 while data:
331 kwargs['stderr'](data)
332 data = proc.stderr.read()
333
334 if proc.returncode is None:
335 # Usually happens when killed with timeout but not listening to pipes.
336 proc.wait()
337
338 if timed_out:
339 return TIMED_OUT
340
341 return proc.returncode
342
343
344 def _read_pipe(handles, pipe, out_fn):
345 """Reads bytes from a pipe and calls the output callback."""
346 data = pipe.read()
347 if not data:
348 del handles[pipe]
349 else:
350 out_fn(data)
351
352
353 def _tee_posix(proc, timeout, start, stdin, args, kwargs):
354 """Polls a process and its pipe using select.select().
355
356 TODO(maruel): Implement a non-polling method for OSes that support it.
357 """
358 handles_r = {}
359 if callable(kwargs.get('stdout')):
360 handles_r[proc.stdout] = lambda: _read_pipe(
361 handles_r, proc.stdout, kwargs['stdout'])
362 if callable(kwargs.get('stderr')):
363 handles_r[proc.stderr] = lambda: _read_pipe(
364 handles_r, proc.stderr, kwargs['stderr'])
365
366 handles_w = {}
367 if isinstance(stdin, str):
368 stdin_io = cStringIO.StringIO(stdin)
369 def write_stdin():
370 data = stdin_io.read(1)
371 if data:
372 proc.stdin.write(data)
373 else:
374 del handles_w[proc.stdin]
375 proc.stdin.close()
376 handles_w[proc.stdin] = write_stdin
377 else:
378 # TODO(maruel): Fix me, it could be VOID.
379 assert stdin is None
380
381 # Make all the file objects of the child process non-blocking file.
382 # TODO(maruel): Test if a pipe is handed to the child process.
383 for pipe in (proc.stdin, proc.stdout, proc.stderr):
384 fileno = pipe and getattr(pipe, 'fileno', lambda: None)()
385 if fileno:
386 # Note: making a pipe non-blocking means the C stdio could act wrong. In
387 # particular, readline() cannot be used. Work around is to use os.read().
388 fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
389 fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)
390
391 timed_out = False
392 while handles_r or handles_w or (timeout and proc.poll() is None):
393 period = None
394 if timeout:
395 period = max(0, timeout - (time.time() - start))
396 if not period and not timed_out:
397 proc.kill()
398 timed_out = True
399 if timed_out:
400 period = 0.001
401
402 # It reconstructs objects on each loop, not very efficient.
403 reads, writes, _, = select.select(
404 handles_r.keys(), handles_w.keys(), [], period)
405 for read in reads:
406 handles_r[read]()
407 for write in writes:
408 handles_w[write]()
409
410 # No pipe open anymore and if there was a time out, the child process was
411 # killed already.
412 proc.wait()
413 if timed_out:
414 return TIMED_OUT
415 return proc.returncode
416
417
418 def communicate(args, timeout=None, **kwargs): 201 def communicate(args, timeout=None, **kwargs):
419 """Wraps subprocess.Popen().communicate(). 202 """Wraps subprocess.Popen().communicate().
420 203
421 Returns ((stdout, stderr), returncode). 204 Returns ((stdout, stderr), returncode).
422 205
423 - The process will be killed after |timeout| seconds and returncode set to 206 - The process will be killed after |timeout| seconds and returncode set to
424 TIMED_OUT. 207 TIMED_OUT.
425 - Automatically passes stdin content as input so do not specify stdin=PIPE. 208 - Automatically passes stdin content as input so do not specify stdin=PIPE.
426 """ 209 """
427 if timeout and kwargs.get('shell'):
428 raise TypeError(
429 'Using timeout and shell simultaneously will cause a process leak '
430 'since the shell will be killed instead of the child process.')
431
432 stdin = kwargs.pop('stdin', None) 210 stdin = kwargs.pop('stdin', None)
433 if stdin is not None: 211 if stdin is not None:
434 if stdin is VOID: 212 if stdin is VOID:
435 kwargs['stdin'] = open(os.devnull, 'r') 213 kwargs['stdin'] = open(os.devnull, 'r')
436 stdin = None 214 stdin = None
437 else: 215 else:
438 assert isinstance(stdin, basestring) 216 assert isinstance(stdin, basestring)
439 # When stdin is passed as an argument, use it as the actual input data and 217 # When stdin is passed as an argument, use it as the actual input data and
440 # set the Popen() parameter accordingly. 218 # set the Popen() parameter accordingly.
441 kwargs['stdin'] = PIPE 219 kwargs['stdin'] = PIPE
442 220
443 start = time.time() 221 if not timeout:
444 proc = Popen(args, **kwargs)
445 need_buffering = (timeout or
446 callable(kwargs.get('stdout')) or callable(kwargs.get('stderr')))
447
448 if not need_buffering:
449 # Normal workflow. 222 # Normal workflow.
450 if stdin not in (None, VOID): 223 proc = Popen(args, **kwargs)
224 if stdin is not None:
451 return proc.communicate(stdin), proc.returncode 225 return proc.communicate(stdin), proc.returncode
452 else: 226 else:
453 return proc.communicate(), proc.returncode 227 return proc.communicate(), proc.returncode
454 228
455 stdout = None 229 # Create a temporary file to workaround python's deadlock.
456 stderr = None
457 # Convert to a lambda to workaround python's deadlock.
458 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait 230 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
459 # When the pipe fills up, it will deadlock this process. Using a thread 231 # When the pipe fills up, it will deadlock this process. Using a real file
460 # works around that issue. No need for thread safe function since the call 232 # works around that issue.
461 # backs are guaranteed to be called from the main thread. 233 with tempfile.TemporaryFile() as buff:
462 if kwargs.get('stdout') == PIPE: 234 start = time.time()
463 stdout = [] 235 kwargs['stdout'] = buff
464 kwargs['stdout'] = stdout.append 236 proc = Popen(args, **kwargs)
465 if kwargs.get('stderr') == PIPE: 237 if stdin is not None:
466 stderr = [] 238 proc.stdin.write(stdin)
467 kwargs['stderr'] = stderr.append 239 while proc.returncode is None:
468 if sys.platform == 'win32': 240 proc.poll()
469 # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE, 241 if timeout and (time.time() - start) > timeout:
470 # doesn't exist so _tee_win() cannot be used yet. 242 proc.kill()
471 returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) 243 proc.wait()
472 else: 244 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
473 returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs) 245 proc.returncode = TIMED_OUT
474 if not stdout is None: 246 time.sleep(0.001)
475 stdout = ''.join(stdout) 247 # Now that the process died, reset the cursor and read the file.
476 if not stderr is None: 248 buff.seek(0)
477 stderr = ''.join(stderr) 249 out = [buff.read(), None]
478 return (stdout, stderr), returncode 250 return out, proc.returncode
479 251
480 252
481 def call(args, **kwargs): 253 def call(args, **kwargs):
482 """Emulates subprocess.call(). 254 """Emulates subprocess.call().
483 255
484 Automatically convert stdout=PIPE or stderr=PIPE to VOID. 256 Automatically convert stdout=PIPE or stderr=PIPE to VOID.
485 In no case they can be returned since no code path raises 257 In no case they can be returned since no code path raises
486 subprocess2.CalledProcessError. 258 subprocess2.CalledProcessError.
487 """ 259 """
488 if kwargs.get('stdout') == PIPE: 260 if kwargs.get('stdout') == PIPE:
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
529 301
530 Captures stdout of a process call and returns stdout only. 302 Captures stdout of a process call and returns stdout only.
531 303
532 - Throws if return code is not 0. 304 - Throws if return code is not 0.
533 - Works even prior to python 2.7. 305 - Works even prior to python 2.7.
534 - Blocks stdin by default if not specified since no output will be visible. 306 - Blocks stdin by default if not specified since no output will be visible.
535 - As per doc, "The stdout argument is not allowed as it is used internally." 307 - As per doc, "The stdout argument is not allowed as it is used internally."
536 """ 308 """
537 kwargs.setdefault('stdin', VOID) 309 kwargs.setdefault('stdin', VOID)
538 return check_call_out(args, stdout=PIPE, **kwargs)[0] 310 return check_call_out(args, stdout=PIPE, **kwargs)[0]
OLDNEW
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698