Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: subprocess2.py

Issue 8374026: Add callback support for stdout and stderr. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Cleanup test Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding=utf8 1 # coding=utf8
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 """Collection of subprocess wrapper functions. 5 """Collection of subprocess wrapper functions.
6 6
7 In theory you shouldn't need anything else in subprocess, or this module failed. 7 In theory you shouldn't need anything else in subprocess, or this module failed.
8 """ 8 """
9 9
10 from __future__ import with_statement 10 from __future__ import with_statement
11 import cStringIO
11 import errno 12 import errno
12 import logging 13 import logging
13 import os 14 import os
15 import Queue
14 import subprocess 16 import subprocess
15 import sys 17 import sys
16 import tempfile
17 import time 18 import time
18 import threading 19 import threading
19 20
20 # Constants forwarded from subprocess. 21 # Constants forwarded from subprocess.
21 PIPE = subprocess.PIPE 22 PIPE = subprocess.PIPE
22 STDOUT = subprocess.STDOUT 23 STDOUT = subprocess.STDOUT
23 # Sends stdout or stderr to os.devnull. 24 # Sends stdout or stderr to os.devnull.
24 VOID = object() 25 VOID = object()
25 # Error code when a process was killed because it timed out. 26 # Error code when a process was killed because it timed out.
26 TIMED_OUT = -2001 27 TIMED_OUT = -2001
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after
169 logging.debug(tmp_str) 170 logging.debug(tmp_str)
170 171
171 def fix(stream): 172 def fix(stream):
172 if kwargs.get(stream) in (VOID, os.devnull): 173 if kwargs.get(stream) in (VOID, os.devnull):
173 # Replaces VOID with handle to /dev/null. 174 # Replaces VOID with handle to /dev/null.
174 # Create a temporary file to workaround python's deadlock. 175 # Create a temporary file to workaround python's deadlock.
175 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait 176 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
176 # When the pipe fills up, it will deadlock this process. Using a real file 177 # When the pipe fills up, it will deadlock this process. Using a real file
177 # works around that issue. 178 # works around that issue.
178 kwargs[stream] = open(os.devnull, 'w') 179 kwargs[stream] = open(os.devnull, 'w')
180 if callable(kwargs.get(stream)):
181 # Callable stdout/stderr should be used only with call() wrappers.
182 kwargs[stream] = PIPE
179 183
180 fix('stdout') 184 fix('stdout')
181 fix('stderr') 185 fix('stderr')
182 186
183 try: 187 try:
184 return subprocess.Popen(args, **kwargs) 188 return subprocess.Popen(args, **kwargs)
185 except OSError, e: 189 except OSError, e:
186 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': 190 if e.errno == errno.EAGAIN and sys.platform == 'cygwin':
187 # Convert fork() emulation failure into a CygwinRebaseError(). 191 # Convert fork() emulation failure into a CygwinRebaseError().
188 raise CygwinRebaseError( 192 raise CygwinRebaseError(
189 e.errno, 193 e.errno,
190 args, 194 args,
191 kwargs.get('cwd'), 195 kwargs.get('cwd'),
192 None, 196 None,
193 'Visit ' 197 'Visit '
194 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to ' 198 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to '
195 'learn how to fix this error; you need to rebase your cygwin dlls') 199 'learn how to fix this error; you need to rebase your cygwin dlls')
196 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go 200 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go
197 # through 201 # through
198 raise 202 raise
199 203
200 204
205 def _queue_pipe_read(pipe, name, done, dest):
206 """Queue characters read from a pipe into a queue.
207
208 Left outside the _tee_threads function to not introduce a function closure
209 to speed up variable lookup.
210 """
211 while not done.isSet():
212 data = pipe.read(1)
213 if not data:
214 break
215 dest.put((name, data))
Dirk Pranke 2011/11/04 01:15:11 This timeout doesn't really work if the pipes are
M-A Ruel 2011/11/04 01:41:48 If the process is killed, the pipes are closed, wh
216 dest.put(name)
217
218
219 def _tee_threads(proc, timeout, start, stdin, args, kwargs):
220 """Does I/O for a process's pipes using thread.
221
222 It's the simplest and slowest implementation. Expect very slow behavior.
223
224 If there is a callback and it doesn't keep up with the calls, the timeout
225 effectiveness will be delayed accordingly.
226 """
227 # TODO(maruel): Implement a select based implementation on POSIX and a Windows
228 # one using WaitForMultipleObjects().
229 #
230 # Queue of either of <threadname> when done or (<threadname>, data).
231 # In theory we would like to limit to ~64kb items to not cause large memory
232 # usage when the callback blocks. It is not done because it slows down
233 # processing on OSX10.6 by a factor of 2x, making it even slower than Windows!
234 # Revisit this decision if it becomes a problem, e.g. crash because of
235 # memory exhaustion.
236 queue = Queue.Queue()
237 done = threading.Event()
238
239 def write_stdin():
240 stdin_io = cStringIO.StringIO(stdin)
241 while not done.isSet():
242 data = stdin_io.read(1024)
243 if data:
244 proc.stdin.write(data)
245 else:
246 proc.stdin.close()
247 break
248 queue.put('stdin')
249
250 def timeout_fn():
251 done.wait(timeout)
252 # No need to close the pipes since killing should be sufficient.
253 queue.put('timeout')
254
255 # Starts up to 4 threads:
256 # Read stdout
257 # Read stderr
258 # Write stdin
259 # Timeout
260 threads = {}
261 if timeout is not None:
262 threads['timeout'] = threading.Thread(target=timeout_fn)
263 if callable(kwargs.get('stdout')):
264 threads['stdout'] = threading.Thread(
265 target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue))
266 if callable(kwargs.get('stderr')):
267 threads['stderr'] = threading.Thread(
268 target=_queue_pipe_read,
269 args=(proc.stderr, 'stderr', done, queue))
270 if isinstance(stdin, str):
271 threads['stdin'] = threading.Thread(target=write_stdin)
272 for t in threads.itervalues():
273 t.daemon = True
274 t.start()
275
276 timed_out = False
277 try:
278 while proc.returncode is None:
279 assert threads
280 proc.poll()
281 item = queue.get()
282 if isinstance(item, str):
283 threads[item].join()
284 del threads[item]
285 if item == 'timeout' and not timed_out and proc.poll() is None:
286 logging.debug('Timed out: killing')
287 proc.kill()
288 timed_out = True
289 if not threads:
290 # We won't be waken up anymore. Need to busy loop.
291 break
292 else:
293 kwargs[item[0]](item[1])
294 finally:
295 # Stop the threads.
296 done.set()
297 # Join threads
298 for thread in threads.itervalues():
299 thread.join()
300
301 # Flush the queue.
302 try:
303 while True:
304 item = queue.get(False)
305 if isinstance(item, str):
306 if item == 'timeout':
307 # TODO(maruel): Does it make sense at that point?
308 if not timed_out and proc.poll() is None:
309 logging.debug('Timed out: killing')
310 proc.kill()
311 timed_out = True
312 else:
313 kwargs[item[0]](item[1])
314 except Queue.Empty:
315 pass
316
317 # Get the remainder.
318 if callable(kwargs.get('stdout')):
319 data = proc.stdout.read()
320 while data:
321 kwargs['stdout'](data)
322 data = proc.stdout.read()
323 if callable(kwargs.get('stderr')):
324 data = proc.stderr.read()
325 while data:
326 kwargs['stderr'](data)
327 data = proc.stderr.read()
328
329 if proc.returncode is None:
330 # Usually happens when killed with timeout but not listening to pipes.
331 proc.wait()
332
333 if timed_out:
334 return TIMED_OUT
335
336 return proc.returncode
337
338
201 def communicate(args, timeout=None, **kwargs): 339 def communicate(args, timeout=None, **kwargs):
202 """Wraps subprocess.Popen().communicate(). 340 """Wraps subprocess.Popen().communicate().
203 341
204 Returns ((stdout, stderr), returncode). 342 Returns ((stdout, stderr), returncode).
205 343
206 - The process will be killed after |timeout| seconds and returncode set to 344 - The process will be killed after |timeout| seconds and returncode set to
207 TIMED_OUT. 345 TIMED_OUT.
208 - Automatically passes stdin content as input so do not specify stdin=PIPE. 346 - Automatically passes stdin content as input so do not specify stdin=PIPE.
209 """ 347 """
348 if timeout and kwargs.get('shell'):
349 raise TypeError(
350 'Using timeout and shell simultaneously will cause a process leak '
351 'since the shell will be killed instead of the child process.')
352
210 stdin = kwargs.pop('stdin', None) 353 stdin = kwargs.pop('stdin', None)
211 if stdin is not None: 354 if stdin is not None:
212 if stdin is VOID: 355 if stdin is VOID:
213 kwargs['stdin'] = open(os.devnull, 'r') 356 kwargs['stdin'] = open(os.devnull, 'r')
214 stdin = None 357 stdin = None
215 else: 358 else:
216 assert isinstance(stdin, basestring) 359 assert isinstance(stdin, basestring)
217 # When stdin is passed as an argument, use it as the actual input data and 360 # When stdin is passed as an argument, use it as the actual input data and
218 # set the Popen() parameter accordingly. 361 # set the Popen() parameter accordingly.
219 kwargs['stdin'] = PIPE 362 kwargs['stdin'] = PIPE
220 363
221 if not timeout: 364 start = time.time()
365 proc = Popen(args, **kwargs)
366 need_buffering = (timeout or
367 callable(kwargs.get('stdout')) or callable(kwargs.get('stderr')))
368
369 if not need_buffering:
222 # Normal workflow. 370 # Normal workflow.
223 proc = Popen(args, **kwargs) 371 if stdin not in (None, VOID):
224 if stdin is not None:
225 return proc.communicate(stdin), proc.returncode 372 return proc.communicate(stdin), proc.returncode
226 else: 373 else:
227 return proc.communicate(), proc.returncode 374 return proc.communicate(), proc.returncode
228 375
229 # Create a temporary file to workaround python's deadlock. 376 stdout = None
377 stderr = None
378 # Convert to a lambda to workaround python's deadlock.
230 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait 379 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
231 # When the pipe fills up, it will deadlock this process. Using a real file 380 # When the pipe fills up, it will deadlock this process. Using a thread
232 # works around that issue. 381 # works around that issue. No need for thread safe function since the call
233 with tempfile.TemporaryFile() as buff: 382 # backs are guaranteed to be called from the main thread.
234 start = time.time() 383 if kwargs.get('stdout') == PIPE:
235 kwargs['stdout'] = buff 384 stdout = []
236 proc = Popen(args, **kwargs) 385 kwargs['stdout'] = stdout.append
237 if stdin is not None: 386 if kwargs.get('stderr') == PIPE:
238 proc.stdin.write(stdin) 387 stderr = []
239 while proc.returncode is None: 388 kwargs['stderr'] = stderr.append
240 proc.poll() 389 returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
241 if timeout and (time.time() - start) > timeout: 390 if not stdout is None:
242 proc.kill() 391 stdout = ''.join(stdout)
243 proc.wait() 392 if not stderr is None:
244 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. 393 stderr = ''.join(stderr)
245 proc.returncode = TIMED_OUT 394 return (stdout, stderr), returncode
246 time.sleep(0.001)
247 # Now that the process died, reset the cursor and read the file.
248 buff.seek(0)
249 out = [buff.read(), None]
250 return out, proc.returncode
251 395
252 396
253 def call(args, **kwargs): 397 def call(args, **kwargs):
254 """Emulates subprocess.call(). 398 """Emulates subprocess.call().
255 399
256 Automatically convert stdout=PIPE or stderr=PIPE to VOID. 400 Automatically convert stdout=PIPE or stderr=PIPE to VOID.
257 In no case they can be returned since no code path raises 401 In no case they can be returned since no code path raises
258 subprocess2.CalledProcessError. 402 subprocess2.CalledProcessError.
259 """ 403 """
260 if kwargs.get('stdout') == PIPE: 404 if kwargs.get('stdout') == PIPE:
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
301 445
302 Captures stdout of a process call and returns stdout only. 446 Captures stdout of a process call and returns stdout only.
303 447
304 - Throws if return code is not 0. 448 - Throws if return code is not 0.
305 - Works even prior to python 2.7. 449 - Works even prior to python 2.7.
306 - Blocks stdin by default if not specified since no output will be visible. 450 - Blocks stdin by default if not specified since no output will be visible.
307 - As per doc, "The stdout argument is not allowed as it is used internally." 451 - As per doc, "The stdout argument is not allowed as it is used internally."
308 """ 452 """
309 kwargs.setdefault('stdin', VOID) 453 kwargs.setdefault('stdin', VOID)
310 return check_call_out(args, stdout=PIPE, **kwargs)[0] 454 return check_call_out(args, stdout=PIPE, **kwargs)[0]
OLDNEW
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698