OLD | NEW |
---|---|
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 from __future__ import with_statement | 10 from __future__ import with_statement |
11 import cStringIO | |
11 import errno | 12 import errno |
12 import logging | 13 import logging |
13 import os | 14 import os |
15 import Queue | |
14 import subprocess | 16 import subprocess |
15 import sys | 17 import sys |
16 import tempfile | |
17 import time | 18 import time |
18 import threading | 19 import threading |
19 | 20 |
20 # Constants forwarded from subprocess. | 21 # Constants forwarded from subprocess. |
21 PIPE = subprocess.PIPE | 22 PIPE = subprocess.PIPE |
22 STDOUT = subprocess.STDOUT | 23 STDOUT = subprocess.STDOUT |
23 # Sends stdout or stderr to os.devnull. | 24 # Sends stdout or stderr to os.devnull. |
24 VOID = object() | 25 VOID = object() |
25 # Error code when a process was killed because it timed out. | 26 # Error code when a process was killed because it timed out. |
26 TIMED_OUT = -2001 | 27 TIMED_OUT = -2001 |
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
169 logging.debug(tmp_str) | 170 logging.debug(tmp_str) |
170 | 171 |
171 def fix(stream): | 172 def fix(stream): |
172 if kwargs.get(stream) in (VOID, os.devnull): | 173 if kwargs.get(stream) in (VOID, os.devnull): |
173 # Replaces VOID with handle to /dev/null. | 174 # Replaces VOID with handle to /dev/null. |
174 # Create a temporary file to workaround python's deadlock. | 175 # Create a temporary file to workaround python's deadlock. |
175 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 176 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
176 # When the pipe fills up, it will deadlock this process. Using a real file | 177 # When the pipe fills up, it will deadlock this process. Using a real file |
177 # works around that issue. | 178 # works around that issue. |
178 kwargs[stream] = open(os.devnull, 'w') | 179 kwargs[stream] = open(os.devnull, 'w') |
180 if callable(kwargs.get(stream)): | |
181 # Callable stdout/stderr should be used only with call() wrappers. | |
182 kwargs[stream] = PIPE | |
179 | 183 |
180 fix('stdout') | 184 fix('stdout') |
181 fix('stderr') | 185 fix('stderr') |
182 | 186 |
183 try: | 187 try: |
184 return subprocess.Popen(args, **kwargs) | 188 return subprocess.Popen(args, **kwargs) |
185 except OSError, e: | 189 except OSError, e: |
186 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 190 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
187 # Convert fork() emulation failure into a CygwinRebaseError(). | 191 # Convert fork() emulation failure into a CygwinRebaseError(). |
188 raise CygwinRebaseError( | 192 raise CygwinRebaseError( |
189 e.errno, | 193 e.errno, |
190 args, | 194 args, |
191 kwargs.get('cwd'), | 195 kwargs.get('cwd'), |
192 None, | 196 None, |
193 'Visit ' | 197 'Visit ' |
194 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to ' | 198 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure to ' |
195 'learn how to fix this error; you need to rebase your cygwin dlls') | 199 'learn how to fix this error; you need to rebase your cygwin dlls') |
196 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go | 200 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go |
197 # through | 201 # through |
198 raise | 202 raise |
199 | 203 |
200 | 204 |
205 def _queue_pipe_read(pipe, name, done, dest): | |
206 """Queue characters read from a pipe into a queue. | |
207 | |
208 Left outside the _tee_threads function to not introduce a function closure | |
209 to speed up variable lookup. | |
210 """ | |
211 while not done.isSet(): | |
212 data = pipe.read(1) | |
213 if not data: | |
214 break | |
215 dest.put((name, data)) | |
Dirk Pranke
2011/11/04 01:15:11
This timeout doesn't really work if the pipes are
M-A Ruel
2011/11/04 01:41:48
If the process is killed, the pipes are closed, wh
| |
216 dest.put(name) | |
217 | |
218 | |
219 def _tee_threads(proc, timeout, start, stdin, args, kwargs): | |
220 """Does I/O for a process's pipes using thread. | |
221 | |
222 It's the simplest and slowest implementation. Expect very slow behavior. | |
223 | |
224 If there is a callback and it doesn't keep up with the calls, the timeout | |
225 effectiveness will be delayed accordingly. | |
226 """ | |
227 # TODO(maruel): Implement a select based implementation on POSIX and a Windows | |
228 # one using WaitForMultipleObjects(). | |
229 # | |
230 # Queue of either of <threadname> when done or (<threadname>, data). | |
231 # In theory we would like to limit to ~64kb items to not cause large memory | |
232 # usage when the callback blocks. It is not done because it slows down | |
233 # processing on OSX10.6 by a factor of 2x, making it even slower than Windows! | |
234 # Revisit this decision if it becomes a problem, e.g. crash because of | |
235 # memory exhaustion. | |
236 queue = Queue.Queue() | |
237 done = threading.Event() | |
238 | |
239 def write_stdin(): | |
240 stdin_io = cStringIO.StringIO(stdin) | |
241 while not done.isSet(): | |
242 data = stdin_io.read(1024) | |
243 if data: | |
244 proc.stdin.write(data) | |
245 else: | |
246 proc.stdin.close() | |
247 break | |
248 queue.put('stdin') | |
249 | |
250 def timeout_fn(): | |
251 done.wait(timeout) | |
252 # No need to close the pipes since killing should be sufficient. | |
253 queue.put('timeout') | |
254 | |
255 # Starts up to 4 threads: | |
256 # Read stdout | |
257 # Read stderr | |
258 # Write stdin | |
259 # Timeout | |
260 threads = {} | |
261 if timeout is not None: | |
262 threads['timeout'] = threading.Thread(target=timeout_fn) | |
263 if callable(kwargs.get('stdout')): | |
264 threads['stdout'] = threading.Thread( | |
265 target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue)) | |
266 if callable(kwargs.get('stderr')): | |
267 threads['stderr'] = threading.Thread( | |
268 target=_queue_pipe_read, | |
269 args=(proc.stderr, 'stderr', done, queue)) | |
270 if isinstance(stdin, str): | |
271 threads['stdin'] = threading.Thread(target=write_stdin) | |
272 for t in threads.itervalues(): | |
273 t.daemon = True | |
274 t.start() | |
275 | |
276 timed_out = False | |
277 try: | |
278 while proc.returncode is None: | |
279 assert threads | |
280 proc.poll() | |
281 item = queue.get() | |
282 if isinstance(item, str): | |
283 threads[item].join() | |
284 del threads[item] | |
285 if item == 'timeout' and not timed_out and proc.poll() is None: | |
286 logging.debug('Timed out: killing') | |
287 proc.kill() | |
288 timed_out = True | |
289 if not threads: | |
290 # We won't be waken up anymore. Need to busy loop. | |
291 break | |
292 else: | |
293 kwargs[item[0]](item[1]) | |
294 finally: | |
295 # Stop the threads. | |
296 done.set() | |
297 # Join threads | |
298 for thread in threads.itervalues(): | |
299 thread.join() | |
300 | |
301 # Flush the queue. | |
302 try: | |
303 while True: | |
304 item = queue.get(False) | |
305 if isinstance(item, str): | |
306 if item == 'timeout': | |
307 # TODO(maruel): Does it make sense at that point? | |
308 if not timed_out and proc.poll() is None: | |
309 logging.debug('Timed out: killing') | |
310 proc.kill() | |
311 timed_out = True | |
312 else: | |
313 kwargs[item[0]](item[1]) | |
314 except Queue.Empty: | |
315 pass | |
316 | |
317 # Get the remainder. | |
318 if callable(kwargs.get('stdout')): | |
319 data = proc.stdout.read() | |
320 while data: | |
321 kwargs['stdout'](data) | |
322 data = proc.stdout.read() | |
323 if callable(kwargs.get('stderr')): | |
324 data = proc.stderr.read() | |
325 while data: | |
326 kwargs['stderr'](data) | |
327 data = proc.stderr.read() | |
328 | |
329 if proc.returncode is None: | |
330 # Usually happens when killed with timeout but not listening to pipes. | |
331 proc.wait() | |
332 | |
333 if timed_out: | |
334 return TIMED_OUT | |
335 | |
336 return proc.returncode | |
337 | |
338 | |
201 def communicate(args, timeout=None, **kwargs): | 339 def communicate(args, timeout=None, **kwargs): |
202 """Wraps subprocess.Popen().communicate(). | 340 """Wraps subprocess.Popen().communicate(). |
203 | 341 |
204 Returns ((stdout, stderr), returncode). | 342 Returns ((stdout, stderr), returncode). |
205 | 343 |
206 - The process will be killed after |timeout| seconds and returncode set to | 344 - The process will be killed after |timeout| seconds and returncode set to |
207 TIMED_OUT. | 345 TIMED_OUT. |
208 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 346 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
209 """ | 347 """ |
348 if timeout and kwargs.get('shell'): | |
349 raise TypeError( | |
350 'Using timeout and shell simultaneously will cause a process leak ' | |
351 'since the shell will be killed instead of the child process.') | |
352 | |
210 stdin = kwargs.pop('stdin', None) | 353 stdin = kwargs.pop('stdin', None) |
211 if stdin is not None: | 354 if stdin is not None: |
212 if stdin is VOID: | 355 if stdin is VOID: |
213 kwargs['stdin'] = open(os.devnull, 'r') | 356 kwargs['stdin'] = open(os.devnull, 'r') |
214 stdin = None | 357 stdin = None |
215 else: | 358 else: |
216 assert isinstance(stdin, basestring) | 359 assert isinstance(stdin, basestring) |
217 # When stdin is passed as an argument, use it as the actual input data and | 360 # When stdin is passed as an argument, use it as the actual input data and |
218 # set the Popen() parameter accordingly. | 361 # set the Popen() parameter accordingly. |
219 kwargs['stdin'] = PIPE | 362 kwargs['stdin'] = PIPE |
220 | 363 |
221 if not timeout: | 364 start = time.time() |
365 proc = Popen(args, **kwargs) | |
366 need_buffering = (timeout or | |
367 callable(kwargs.get('stdout')) or callable(kwargs.get('stderr'))) | |
368 | |
369 if not need_buffering: | |
222 # Normal workflow. | 370 # Normal workflow. |
223 proc = Popen(args, **kwargs) | 371 if stdin not in (None, VOID): |
224 if stdin is not None: | |
225 return proc.communicate(stdin), proc.returncode | 372 return proc.communicate(stdin), proc.returncode |
226 else: | 373 else: |
227 return proc.communicate(), proc.returncode | 374 return proc.communicate(), proc.returncode |
228 | 375 |
229 # Create a temporary file to workaround python's deadlock. | 376 stdout = None |
377 stderr = None | |
378 # Convert to a lambda to workaround python's deadlock. | |
230 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 379 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
231 # When the pipe fills up, it will deadlock this process. Using a real file | 380 # When the pipe fills up, it will deadlock this process. Using a thread |
232 # works around that issue. | 381 # works around that issue. No need for thread safe function since the call |
233 with tempfile.TemporaryFile() as buff: | 382 # backs are guaranteed to be called from the main thread. |
234 start = time.time() | 383 if kwargs.get('stdout') == PIPE: |
235 kwargs['stdout'] = buff | 384 stdout = [] |
236 proc = Popen(args, **kwargs) | 385 kwargs['stdout'] = stdout.append |
237 if stdin is not None: | 386 if kwargs.get('stderr') == PIPE: |
238 proc.stdin.write(stdin) | 387 stderr = [] |
239 while proc.returncode is None: | 388 kwargs['stderr'] = stderr.append |
240 proc.poll() | 389 returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) |
241 if timeout and (time.time() - start) > timeout: | 390 if not stdout is None: |
242 proc.kill() | 391 stdout = ''.join(stdout) |
243 proc.wait() | 392 if not stderr is None: |
244 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. | 393 stderr = ''.join(stderr) |
245 proc.returncode = TIMED_OUT | 394 return (stdout, stderr), returncode |
246 time.sleep(0.001) | |
247 # Now that the process died, reset the cursor and read the file. | |
248 buff.seek(0) | |
249 out = [buff.read(), None] | |
250 return out, proc.returncode | |
251 | 395 |
252 | 396 |
253 def call(args, **kwargs): | 397 def call(args, **kwargs): |
254 """Emulates subprocess.call(). | 398 """Emulates subprocess.call(). |
255 | 399 |
256 Automatically convert stdout=PIPE or stderr=PIPE to VOID. | 400 Automatically convert stdout=PIPE or stderr=PIPE to VOID. |
257 In no case they can be returned since no code path raises | 401 In no case they can be returned since no code path raises |
258 subprocess2.CalledProcessError. | 402 subprocess2.CalledProcessError. |
259 """ | 403 """ |
260 if kwargs.get('stdout') == PIPE: | 404 if kwargs.get('stdout') == PIPE: |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
301 | 445 |
302 Captures stdout of a process call and returns stdout only. | 446 Captures stdout of a process call and returns stdout only. |
303 | 447 |
304 - Throws if return code is not 0. | 448 - Throws if return code is not 0. |
305 - Works even prior to python 2.7. | 449 - Works even prior to python 2.7. |
306 - Blocks stdin by default if not specified since no output will be visible. | 450 - Blocks stdin by default if not specified since no output will be visible. |
307 - As per doc, "The stdout argument is not allowed as it is used internally." | 451 - As per doc, "The stdout argument is not allowed as it is used internally." |
308 """ | 452 """ |
309 kwargs.setdefault('stdin', VOID) | 453 kwargs.setdefault('stdin', VOID) |
310 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 454 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |