OLD | NEW |
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 from __future__ import with_statement | 10 from __future__ import with_statement |
11 import cStringIO | |
12 import errno | 11 import errno |
13 import logging | 12 import logging |
14 import os | 13 import os |
15 import Queue | |
16 import select | |
17 import subprocess | 14 import subprocess |
18 import sys | 15 import sys |
| 16 import tempfile |
19 import time | 17 import time |
20 import threading | 18 import threading |
21 | 19 |
22 if sys.platform != 'win32': | |
23 import fcntl | |
24 | |
25 | |
26 # Constants forwarded from subprocess. | 20 # Constants forwarded from subprocess. |
27 PIPE = subprocess.PIPE | 21 PIPE = subprocess.PIPE |
28 STDOUT = subprocess.STDOUT | 22 STDOUT = subprocess.STDOUT |
29 # Sends stdout or stderr to os.devnull. | 23 # Sends stdout or stderr to os.devnull. |
30 VOID = object() | 24 VOID = object() |
31 # Error code when a process was killed because it timed out. | 25 # Error code when a process was killed because it timed out. |
32 TIMED_OUT = -2001 | 26 TIMED_OUT = -2001 |
33 | 27 |
34 # Globals. | 28 # Globals. |
35 # Set to True if you somehow need to disable this hack. | 29 # Set to True if you somehow need to disable this hack. |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
175 logging.debug(tmp_str) | 169 logging.debug(tmp_str) |
176 | 170 |
177 def fix(stream): | 171 def fix(stream): |
178 if kwargs.get(stream) in (VOID, os.devnull): | 172 if kwargs.get(stream) in (VOID, os.devnull): |
179 # Replaces VOID with handle to /dev/null. | 173 # Replaces VOID with handle to /dev/null. |
180 # Create a temporary file to workaround python's deadlock. | 174 # Create a temporary file to workaround python's deadlock. |
181 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 175 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
182 # When the pipe fills up, it will deadlock this process. Using a real file | 176 # When the pipe fills up, it will deadlock this process. Using a real file |
183 # works around that issue. | 177 # works around that issue. |
184 kwargs[stream] = open(os.devnull, 'w') | 178 kwargs[stream] = open(os.devnull, 'w') |
185 if callable(kwargs.get(stream)): | |
186 # Callable stdout/stderr should be used only with call() wrappers. | |
187 kwargs[stream] = PIPE | |
188 | 179 |
189 fix('stdout') | 180 fix('stdout') |
190 fix('stderr') | 181 fix('stderr') |
191 | 182 |
192 try: | 183 try: |
193 return subprocess.Popen(args, **kwargs) | 184 return subprocess.Popen(args, **kwargs) |
194 except OSError, e: | 185 except OSError, e: |
195 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 186 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
196 # Convert fork() emulation failure into a CygwinRebaseError(). | 187 # Convert fork() emulation failure into a CygwinRebaseError(). |
197 raise CygwinRebaseError( | 188 raise CygwinRebaseError( |
198 e.errno, | 189 e.errno, |
199 args, | 190 args, |
200 kwargs.get('cwd'), | 191 kwargs.get('cwd'), |
201 None, | 192 None, |
202 'Visit ' | 193 'Visit ' |
203 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to ' | 194 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to ' |
204 'learn how to fix this error; you need to rebase your cygwin dlls') | 195 'learn how to fix this error; you need to rebase your cygwin dlls') |
205 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go | 196 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go |
206 # through | 197 # through |
207 raise | 198 raise |
208 | 199 |
209 | 200 |
210 def _queue_pipe_read(pipe, name, done, dest): | |
211 """Queues characters read from a pipe into a queue. | |
212 | |
213 Left outside the _tee_threads function to not introduce a function closure | |
214 to speed up variable lookup. | |
215 """ | |
216 while not done.isSet(): | |
217 data = pipe.read(1) | |
218 if not data: | |
219 break | |
220 dest.put((name, data)) | |
221 dest.put(name) | |
222 | |
223 | |
224 def _tee_threads(proc, timeout, start, stdin, args, kwargs): | |
225 """Does I/O for a process's pipes using thread. | |
226 | |
227 It's the simplest and slowest implementation. Expect very slow behavior. | |
228 | |
229 If there is a callback and it doesn't keep up with the calls, the timeout | |
230 effectiveness will be delayed accordingly. | |
231 """ | |
232 # TODO(maruel): Implement a select based implementation on POSIX and a Windows | |
233 # one using WaitForMultipleObjects(). | |
234 # | |
235 # Queue of either of <threadname> when done or (<threadname>, data). | |
236 # In theory we would like to limit to ~64kb items to not cause large memory | |
237 # usage when the callback blocks. It is not done because it slows down | |
238 # processing on OSX10.6 by a factor of 2x, making it even slower than Windows! | |
239 # Revisit this decision if it becomes a problem, e.g. crash because of | |
240 # memory exhaustion. | |
241 queue = Queue.Queue() | |
242 done = threading.Event() | |
243 | |
244 def write_stdin(): | |
245 stdin_io = cStringIO.StringIO(stdin) | |
246 while not done.isSet(): | |
247 data = stdin_io.read(1024) | |
248 if data: | |
249 proc.stdin.write(data) | |
250 else: | |
251 proc.stdin.close() | |
252 break | |
253 queue.put('stdin') | |
254 | |
255 def timeout_fn(): | |
256 done.wait(timeout) | |
257 # No need to close the pipes since killing should be sufficient. | |
258 queue.put('timeout') | |
259 | |
260 # Starts up to 4 threads: | |
261 # Read stdout | |
262 # Read stderr | |
263 # Write stdin | |
264 # Timeout | |
265 threads = {} | |
266 if timeout is not None: | |
267 threads['timeout'] = threading.Thread(target=timeout_fn) | |
268 if callable(kwargs.get('stdout')): | |
269 threads['stdout'] = threading.Thread( | |
270 target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) | |
271 if callable(kwargs.get('stderr')): | |
272 threads['stderr'] = threading.Thread( | |
273 target=_queue_pipe_read, | |
274 args=(proc.stderr, 'stderr', done, queue)) | |
275 if isinstance(stdin, str): | |
276 threads['stdin'] = threading.Thread(target=write_stdin) | |
277 for t in threads.itervalues(): | |
278 t.daemon = True | |
279 t.start() | |
280 | |
281 timed_out = False | |
282 try: | |
283 while proc.returncode is None: | |
284 assert threads | |
285 proc.poll() | |
286 item = queue.get() | |
287 if isinstance(item, str): | |
288 threads[item].join() | |
289 del threads[item] | |
290 if item == 'timeout' and not timed_out and proc.poll() is None: | |
291 logging.debug('Timed out: killing') | |
292 proc.kill() | |
293 timed_out = True | |
294 if not threads: | |
295 # We won't be waken up anymore. Need to busy loop. | |
296 break | |
297 else: | |
298 kwargs[item[0]](item[1]) | |
299 finally: | |
300 # Stop the threads. | |
301 done.set() | |
302 # Join threads | |
303 for thread in threads.itervalues(): | |
304 thread.join() | |
305 | |
306 # Flush the queue. | |
307 try: | |
308 while True: | |
309 item = queue.get(False) | |
310 if isinstance(item, str): | |
311 if item == 'timeout': | |
312 # TODO(maruel): Does it make sense at that point? | |
313 if not timed_out and proc.poll() is None: | |
314 logging.debug('Timed out: killing') | |
315 proc.kill() | |
316 timed_out = True | |
317 else: | |
318 kwargs[item[0]](item[1]) | |
319 except Queue.Empty: | |
320 pass | |
321 | |
322 # Get the remainder. | |
323 if callable(kwargs.get('stdout')): | |
324 data = proc.stdout.read() | |
325 while data: | |
326 kwargs['stdout'](data) | |
327 data = proc.stdout.read() | |
328 if callable(kwargs.get('stderr')): | |
329 data = proc.stderr.read() | |
330 while data: | |
331 kwargs['stderr'](data) | |
332 data = proc.stderr.read() | |
333 | |
334 if proc.returncode is None: | |
335 # Usually happens when killed with timeout but not listening to pipes. | |
336 proc.wait() | |
337 | |
338 if timed_out: | |
339 return TIMED_OUT | |
340 | |
341 return proc.returncode | |
342 | |
343 | |
344 def _read_pipe(handles, pipe, out_fn): | |
345 """Reads bytes from a pipe and calls the output callback.""" | |
346 data = pipe.read() | |
347 if not data: | |
348 del handles[pipe] | |
349 else: | |
350 out_fn(data) | |
351 | |
352 | |
353 def _tee_posix(proc, timeout, start, stdin, args, kwargs): | |
354 """Polls a process and its pipe using select.select(). | |
355 | |
356 TODO(maruel): Implement a non-polling method for OSes that support it. | |
357 """ | |
358 handles_r = {} | |
359 if callable(kwargs.get('stdout')): | |
360 handles_r[proc.stdout] = lambda: _read_pipe( | |
361 handles_r, proc.stdout, kwargs['stdout']) | |
362 if callable(kwargs.get('stderr')): | |
363 handles_r[proc.stderr] = lambda: _read_pipe( | |
364 handles_r, proc.stderr, kwargs['stderr']) | |
365 | |
366 handles_w = {} | |
367 if isinstance(stdin, str): | |
368 stdin_io = cStringIO.StringIO(stdin) | |
369 def write_stdin(): | |
370 data = stdin_io.read(1) | |
371 if data: | |
372 proc.stdin.write(data) | |
373 else: | |
374 del handles_w[proc.stdin] | |
375 proc.stdin.close() | |
376 handles_w[proc.stdin] = write_stdin | |
377 else: | |
378 # TODO(maruel): Fix me, it could be VOID. | |
379 assert stdin is None | |
380 | |
381 # Make all the file objects of the child process non-blocking file. | |
382 # TODO(maruel): Test if a pipe is handed to the child process. | |
383 for pipe in (proc.stdin, proc.stdout, proc.stderr): | |
384 fileno = pipe and getattr(pipe, 'fileno', lambda: None)() | |
385 if fileno: | |
386 # Note: making a pipe non-blocking means the C stdio could act wrong. In | |
387 # particular, readline() cannot be used. Work around is to use os.read(). | |
388 fl = fcntl.fcntl(fileno, fcntl.F_GETFL) | |
389 fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK) | |
390 | |
391 timed_out = False | |
392 while handles_r or handles_w or (timeout and proc.poll() is None): | |
393 period = None | |
394 if timeout: | |
395 period = max(0, timeout - (time.time() - start)) | |
396 if not period and not timed_out: | |
397 proc.kill() | |
398 timed_out = True | |
399 if timed_out: | |
400 period = 0.001 | |
401 | |
402 # It reconstructs objects on each loop, not very efficient. | |
403 reads, writes, _, = select.select( | |
404 handles_r.keys(), handles_w.keys(), [], period) | |
405 for read in reads: | |
406 handles_r[read]() | |
407 for write in writes: | |
408 handles_w[write]() | |
409 | |
410 # No pipe open anymore and if there was a time out, the child process was | |
411 # killed already. | |
412 proc.wait() | |
413 if timed_out: | |
414 return TIMED_OUT | |
415 return proc.returncode | |
416 | |
417 | |
418 def communicate(args, timeout=None, **kwargs): | 201 def communicate(args, timeout=None, **kwargs): |
419 """Wraps subprocess.Popen().communicate(). | 202 """Wraps subprocess.Popen().communicate(). |
420 | 203 |
421 Returns ((stdout, stderr), returncode). | 204 Returns ((stdout, stderr), returncode). |
422 | 205 |
423 - The process will be killed after |timeout| seconds and returncode set to | 206 - The process will be killed after |timeout| seconds and returncode set to |
424 TIMED_OUT. | 207 TIMED_OUT. |
425 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 208 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
426 """ | 209 """ |
427 if timeout and kwargs.get('shell'): | |
428 raise TypeError( | |
429 'Using timeout and shell simultaneously will cause a process leak ' | |
430 'since the shell will be killed instead of the child process.') | |
431 | |
432 stdin = kwargs.pop('stdin', None) | 210 stdin = kwargs.pop('stdin', None) |
433 if stdin is not None: | 211 if stdin is not None: |
434 if stdin is VOID: | 212 if stdin is VOID: |
435 kwargs['stdin'] = open(os.devnull, 'r') | 213 kwargs['stdin'] = open(os.devnull, 'r') |
436 stdin = None | 214 stdin = None |
437 else: | 215 else: |
438 assert isinstance(stdin, basestring) | 216 assert isinstance(stdin, basestring) |
439 # When stdin is passed as an argument, use it as the actual input data and | 217 # When stdin is passed as an argument, use it as the actual input data and |
440 # set the Popen() parameter accordingly. | 218 # set the Popen() parameter accordingly. |
441 kwargs['stdin'] = PIPE | 219 kwargs['stdin'] = PIPE |
442 | 220 |
443 start = time.time() | 221 if not timeout: |
444 proc = Popen(args, **kwargs) | |
445 need_buffering = (timeout or | |
446 callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) | |
447 | |
448 if not need_buffering: | |
449 # Normal workflow. | 222 # Normal workflow. |
450 if stdin not in (None, VOID): | 223 proc = Popen(args, **kwargs) |
| 224 if stdin is not None: |
451 return proc.communicate(stdin), proc.returncode | 225 return proc.communicate(stdin), proc.returncode |
452 else: | 226 else: |
453 return proc.communicate(), proc.returncode | 227 return proc.communicate(), proc.returncode |
454 | 228 |
455 stdout = None | 229 # Create a temporary file to workaround python's deadlock. |
456 stderr = None | |
457 # Convert to a lambda to workaround python's deadlock. | |
458 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 230 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
459 # When the pipe fills up, it will deadlock this process. Using a thread | 231 # When the pipe fills up, it will deadlock this process. Using a real file |
460 # works around that issue. No need for thread safe function since the call | 232 # works around that issue. |
461 # backs are guaranteed to be called from the main thread. | 233 with tempfile.TemporaryFile() as buff: |
462 if kwargs.get('stdout') == PIPE: | 234 start = time.time() |
463 stdout = [] | 235 kwargs['stdout'] = buff |
464 kwargs['stdout'] = stdout.append | 236 proc = Popen(args, **kwargs) |
465 if kwargs.get('stderr') == PIPE: | 237 if stdin is not None: |
466 stderr = [] | 238 proc.stdin.write(stdin) |
467 kwargs['stderr'] = stderr.append | 239 while proc.returncode is None: |
468 if sys.platform == 'win32': | 240 proc.poll() |
469 # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE, | 241 if timeout and (time.time() - start) > timeout: |
470 # doesn't exist so _tee_win() cannot be used yet. | 242 proc.kill() |
471 returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) | 243 proc.wait() |
472 else: | 244 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. |
473 returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs) | 245 proc.returncode = TIMED_OUT |
474 if not stdout is None: | 246 time.sleep(0.001) |
475 stdout = ''.join(stdout) | 247 # Now that the process died, reset the cursor and read the file. |
476 if not stderr is None: | 248 buff.seek(0) |
477 stderr = ''.join(stderr) | 249 out = [buff.read(), None] |
478 return (stdout, stderr), returncode | 250 return out, proc.returncode |
479 | 251 |
480 | 252 |
481 def call(args, **kwargs): | 253 def call(args, **kwargs): |
482 """Emulates subprocess.call(). | 254 """Emulates subprocess.call(). |
483 | 255 |
484 Automatically convert stdout=PIPE or stderr=PIPE to VOID. | 256 Automatically convert stdout=PIPE or stderr=PIPE to VOID. |
485 In no case they can be returned since no code path raises | 257 In no case they can be returned since no code path raises |
486 subprocess2.CalledProcessError. | 258 subprocess2.CalledProcessError. |
487 """ | 259 """ |
488 if kwargs.get('stdout') == PIPE: | 260 if kwargs.get('stdout') == PIPE: |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
529 | 301 |
530 Captures stdout of a process call and returns stdout only. | 302 Captures stdout of a process call and returns stdout only. |
531 | 303 |
532 - Throws if return code is not 0. | 304 - Throws if return code is not 0. |
533 - Works even prior to python 2.7. | 305 - Works even prior to python 2.7. |
534 - Blocks stdin by default if not specified since no output will be visible. | 306 - Blocks stdin by default if not specified since no output will be visible. |
535 - As per doc, "The stdout argument is not allowed as it is used internally." | 307 - As per doc, "The stdout argument is not allowed as it is used internally." |
536 """ | 308 """ |
537 kwargs.setdefault('stdin', VOID) | 309 kwargs.setdefault('stdin', VOID) |
538 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 310 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |