OLD | NEW |
---|---|
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 from __future__ import with_statement | 10 from __future__ import with_statement |
11 import cStringIO | |
11 import errno | 12 import errno |
12 import logging | 13 import logging |
13 import os | 14 import os |
15 import Queue | |
14 import subprocess | 16 import subprocess |
15 import sys | 17 import sys |
16 import tempfile | |
17 import time | 18 import time |
18 import threading | 19 import threading |
19 | 20 |
20 | 21 |
21 # Constants forwarded from subprocess. | 22 # Constants forwarded from subprocess. |
22 PIPE = subprocess.PIPE | 23 PIPE = subprocess.PIPE |
23 STDOUT = subprocess.STDOUT | 24 STDOUT = subprocess.STDOUT |
24 # Sends stdout or stderr to os.devnull. | 25 # Sends stdout or stderr to os.devnull. |
25 VOID = object() | 26 VOID = object() |
26 # Error code when a process was killed because it timed out. | 27 # Error code when a process was killed because it timed out. |
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
163 if isinstance(args, basestring): | 164 if isinstance(args, basestring): |
164 tmp_str = args | 165 tmp_str = args |
165 elif isinstance(args, (list, tuple)): | 166 elif isinstance(args, (list, tuple)): |
166 tmp_str = ' '.join(args) | 167 tmp_str = ' '.join(args) |
167 else: | 168 else: |
168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) | 169 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) |
169 if kwargs.get('cwd', None): | 170 if kwargs.get('cwd', None): |
170 tmp_str += '; cwd=%s' % kwargs['cwd'] | 171 tmp_str += '; cwd=%s' % kwargs['cwd'] |
171 logging.debug(tmp_str) | 172 logging.debug(tmp_str) |
172 | 173 |
174 self.stdout_cb = None | |
175 self.stderr_cb = None | |
176 self.stdout_void = False | |
177 self.stderr_void = False | |
173 def fix(stream): | 178 def fix(stream): |
174 if kwargs.get(stream) in (VOID, os.devnull): | 179 if kwargs.get(stream) in (VOID, os.devnull): |
175 # Replaces VOID with handle to /dev/null. | 180 # Replaces VOID with handle to /dev/null. |
176 # Create a temporary file to workaround python's deadlock. | 181 # Create a temporary file to workaround python's deadlock. |
177 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 182 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
178 # When the pipe fills up, it will deadlock this process. Using a real | 183 # When the pipe fills up, it will deadlock this process. Using a real |
179 # file works around that issue. | 184 # file works around that issue. |
180 kwargs[stream] = open(os.devnull, 'w') | 185 kwargs[stream] = open(os.devnull, 'w') |
186 setattr(self, stream + '_void', True) | |
187 if callable(kwargs.get(stream)): | |
188 # Callable stdout/stderr should be used only with call() wrappers. | |
189 setattr(self, stream + '_cb', kwargs[stream]) | |
190 kwargs[stream] = PIPE | |
181 | 191 |
182 fix('stdout') | 192 fix('stdout') |
183 fix('stderr') | 193 fix('stderr') |
184 | 194 |
185 self.start = time.time() | 195 self.start = time.time() |
196 self.timeout = None | |
186 self.shell = kwargs.get('shell', None) | 197 self.shell = kwargs.get('shell', None) |
187 | 198 |
188 try: | 199 try: |
189 super(Popen, self).__init__(args, **kwargs) | 200 super(Popen, self).__init__(args, **kwargs) |
190 except OSError, e: | 201 except OSError, e: |
191 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 202 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
192 # Convert fork() emulation failure into a CygwinRebaseError(). | 203 # Convert fork() emulation failure into a CygwinRebaseError(). |
193 raise CygwinRebaseError( | 204 raise CygwinRebaseError( |
194 e.errno, | 205 e.errno, |
195 args, | 206 args, |
196 kwargs.get('cwd'), | 207 kwargs.get('cwd'), |
197 None, | 208 None, |
198 'Visit ' | 209 'Visit ' |
199 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure ' | 210 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure ' |
200 'to learn how to fix this error; you need to rebase your cygwin ' | 211 'to learn how to fix this error; you need to rebase your cygwin ' |
201 'dlls') | 212 'dlls') |
202 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go | 213 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go |
203 # through | 214 # through |
204 raise | 215 raise |
205 | 216 |
217 def _tee_threads(self, input): # pylint: disable=W0622 | |
218 """Does I/O for a process's pipes using thread. | |
219 | |
220 It's the simplest and slowest implementation. Expect very slow behavior. | |
221 | |
222 If there is a callback and it doesn't keep up with the calls, the timeout | |
223 effectiveness will be delayed accordingly. | |
224 """ | |
225 # Queue of either of <threadname> when done or (<threadname>, data). In | |
226 # theory we would like to limit to ~64kb items to not cause large memory | |
227 # usage when the callback blocks. It is not done because it slows down | |
228 # processing on OSX10.6 by a factor of 2x, making it even slower than | |
229 # Windows! Revisit this decision if it becomes a problem, e.g. crash | |
230 # because of memory exhaustion. | |
231 queue = Queue.Queue() | |
232 done = threading.Event() | |
233 | |
234 def write_stdin(): | |
235 try: | |
236 stdin_io = cStringIO.StringIO(input) | |
237 while True: | |
238 data = stdin_io.read(1024) | |
239 if data: | |
240 self.stdin.write(data) | |
241 else: | |
242 self.stdin.close() | |
243 break | |
244 finally: | |
245 queue.put('stdin') | |
246 | |
247 def _queue_pipe_read(pipe, name): | |
248 """Queues characters read from a pipe into a queue.""" | |
249 try: | |
250 while True: | |
251 data = pipe.read(1) | |
252 if not data: | |
253 break | |
254 queue.put((name, data)) | |
255 finally: | |
256 queue.put(name) | |
257 | |
258 def timeout_fn(): | |
259 try: | |
260 done.wait(self.timeout) | |
261 finally: | |
262 queue.put('timeout') | |
263 | |
264 def wait_fn(): | |
265 try: | |
266 self.wait() | |
267 finally: | |
268 queue.put('wait') | |
269 | |
270 # Starts up to 5 threads: | |
271 # Wait for the process to quit | |
272 # Read stdout | |
273 # Read stderr | |
274 # Write stdin | |
275 # Timeout | |
276 threads = { | |
277 'wait': threading.Thread(target=wait_fn), | |
M-A Ruel
2011/11/30 20:44:51
By putting wait in its own thread, I don't need to
| |
278 } | |
279 if self.timeout is not None: | |
280 threads['timeout'] = threading.Thread(target=timeout_fn) | |
281 if self.stdout_cb: | |
282 threads['stdout'] = threading.Thread( | |
283 target=_queue_pipe_read, args=(self.stdout, 'stdout')) | |
284 if self.stderr_cb: | |
285 threads['stderr'] = threading.Thread( | |
286 target=_queue_pipe_read, args=(self.stderr, 'stderr')) | |
287 if input: | |
288 threads['stdin'] = threading.Thread(target=write_stdin) | |
289 for t in threads.itervalues(): | |
290 t.daemon = True | |
291 t.start() | |
292 | |
293 timed_out = False | |
294 try: | |
295 # This thread needs to be optimized for speed. | |
296 while threads: | |
297 item = queue.get() | |
298 if item[0] is 'stdout': | |
299 self.stdout_cb(item[1]) | |
300 elif item[0] is 'stderr': | |
301 self.stderr_cb(item[1]) | |
302 else: | |
303 # A thread terminated. | |
304 threads[item].join() | |
305 del threads[item] | |
306 if item == 'wait': | |
307 # Terminate the timeout thread if necessary. | |
308 done.set() | |
309 elif item == 'timeout' and not timed_out and self.poll() is None: | |
310 logging.debug('Timed out after %fs: killing' % self.timeout) | |
311 self.kill() | |
312 timed_out = True | |
313 finally: | |
314 # Stop the threads. | |
315 done.set() | |
316 if 'wait' in threads: | |
317 # Accelerate things, otherwise it would hang until the child process is | |
318 # done. | |
319 logging.debug('Killing child because of an exception') | |
320 self.kill() | |
321 # Join threads. | |
322 for thread in threads.itervalues(): | |
323 thread.join() | |
324 if timed_out: | |
325 self.returncode = TIMED_OUT | |
326 | |
327 def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622 | |
328 """Adds timeout and callbacks support. | |
329 | |
330 Returns (stdout, stderr) like subprocess.Popen().communicate(). | |
331 | |
332 - The process will be killed after |timeout| seconds and returncode set to | |
333 TIMED_OUT. | |
334 """ | |
335 self.timeout = timeout | |
336 if not self.timeout and not self.stdout_cb and not self.stderr_cb: | |
337 return super(Popen, self).communicate(input) | |
338 | |
339 if self.timeout and self.shell: | |
340 raise TypeError( | |
341 'Using timeout and shell simultaneously will cause a process leak ' | |
342 'since the shell will be killed instead of the child process.') | |
343 | |
344 stdout = None | |
345 stderr = None | |
346 # Convert to a lambda to workaround python's deadlock. | |
347 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | |
348 # When the pipe fills up, it will deadlock this process. Using a thread | |
349 # works around that issue. No need for thread safe function since the call | |
350 # backs are guaranteed to be called from the main thread. | |
351 if self.stdout and not self.stdout_cb and not self.stdout_void: | |
352 stdout = cStringIO.StringIO() | |
353 self.stdout_cb = stdout.write | |
354 if self.stderr and not self.stderr_cb and not self.stderr_void: | |
355 stderr = cStringIO.StringIO() | |
356 self.stderr_cb = stderr.write | |
357 self._tee_threads(input) | |
358 if stdout: | |
359 stdout = stdout.getvalue() | |
360 if stderr: | |
361 stderr = stderr.getvalue() | |
362 return (stdout, stderr) | |
363 | |
206 | 364 |
207 def communicate(args, timeout=None, **kwargs): | 365 def communicate(args, timeout=None, **kwargs): |
208 """Wraps subprocess.Popen().communicate() and add timeout support. | 366 """Wraps subprocess.Popen().communicate() and add timeout support. |
209 | 367 |
210 Returns ((stdout, stderr), returncode). | 368 Returns ((stdout, stderr), returncode). |
211 | 369 |
212 - The process will be killed after |timeout| seconds and returncode set to | 370 - The process will be killed after |timeout| seconds and returncode set to |
213 TIMED_OUT. | 371 TIMED_OUT. |
214 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 372 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
215 """ | 373 """ |
216 stdin = kwargs.pop('stdin', None) | 374 stdin = kwargs.pop('stdin', None) |
217 if stdin is not None: | 375 if stdin is not None: |
218 if stdin is VOID: | 376 if stdin is VOID: |
219 kwargs['stdin'] = open(os.devnull, 'r') | 377 kwargs['stdin'] = open(os.devnull, 'r') |
220 stdin = None | 378 stdin = None |
221 else: | 379 else: |
222 assert isinstance(stdin, basestring) | 380 assert isinstance(stdin, basestring) |
223 # When stdin is passed as an argument, use it as the actual input data and | 381 # When stdin is passed as an argument, use it as the actual input data and |
224 # set the Popen() parameter accordingly. | 382 # set the Popen() parameter accordingly. |
225 kwargs['stdin'] = PIPE | 383 kwargs['stdin'] = PIPE |
226 | 384 |
227 if not timeout: | 385 proc = Popen(args, **kwargs) |
228 # Normal workflow. | 386 if stdin not in (None, VOID): |
229 proc = Popen(args, **kwargs) | 387 return proc.communicate(stdin, timeout), proc.returncode |
230 if stdin is not None: | 388 else: |
231 return proc.communicate(stdin), proc.returncode | 389 return proc.communicate(None, timeout), proc.returncode |
232 else: | |
233 return proc.communicate(), proc.returncode | |
234 | |
235 # Create a temporary file to workaround python's deadlock. | |
236 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | |
237 # When the pipe fills up, it will deadlock this process. Using a real file | |
238 # works around that issue. | |
239 with tempfile.TemporaryFile() as buff: | |
240 kwargs['stdout'] = buff | |
241 proc = Popen(args, **kwargs) | |
242 if proc.shell: | |
243 raise TypeError( | |
244 'Using timeout and shell simultaneously will cause a process leak ' | |
245 'since the shell will be killed instead of the child process.') | |
246 if stdin is not None: | |
247 proc.stdin.write(stdin) | |
248 while proc.returncode is None: | |
249 proc.poll() | |
250 if timeout and (time.time() - proc.start) > timeout: | |
251 proc.kill() | |
252 proc.wait() | |
253 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. | |
254 proc.returncode = TIMED_OUT | |
255 time.sleep(0.001) | |
256 # Now that the process died, reset the cursor and read the file. | |
257 buff.seek(0) | |
258 out = (buff.read(), None) | |
259 return out, proc.returncode | |
260 | 390 |
261 | 391 |
262 def call(args, **kwargs): | 392 def call(args, **kwargs): |
263 """Emulates subprocess.call(). | 393 """Emulates subprocess.call(). |
264 | 394 |
265 Automatically convert stdout=PIPE or stderr=PIPE to VOID. | 395 Automatically convert stdout=PIPE or stderr=PIPE to VOID. |
266 In no case they can be returned since no code path raises | 396 In no case they can be returned since no code path raises |
267 subprocess2.CalledProcessError. | 397 subprocess2.CalledProcessError. |
268 """ | 398 """ |
269 if kwargs.get('stdout') == PIPE: | 399 if kwargs.get('stdout') == PIPE: |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
312 | 442 |
313 - Throws if return code is not 0. | 443 - Throws if return code is not 0. |
314 - Works even prior to python 2.7. | 444 - Works even prior to python 2.7. |
315 - Blocks stdin by default if not specified since no output will be visible. | 445 - Blocks stdin by default if not specified since no output will be visible. |
316 - As per doc, "The stdout argument is not allowed as it is used internally." | 446 - As per doc, "The stdout argument is not allowed as it is used internally." |
317 """ | 447 """ |
318 kwargs.setdefault('stdin', VOID) | 448 kwargs.setdefault('stdin', VOID) |
319 if 'stdout' in kwargs: | 449 if 'stdout' in kwargs: |
320 raise ValueError('stdout argument not allowed, it will be overridden.') | 450 raise ValueError('stdout argument not allowed, it will be overridden.') |
321 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 451 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |