OLD | NEW |
---|---|
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 from __future__ import with_statement | 10 from __future__ import with_statement |
11 import cStringIO | |
11 import errno | 12 import errno |
12 import logging | 13 import logging |
13 import os | 14 import os |
15 import Queue | |
14 import subprocess | 16 import subprocess |
15 import sys | 17 import sys |
16 import tempfile | |
17 import time | 18 import time |
18 import threading | 19 import threading |
19 | 20 |
20 | 21 |
21 # Constants forwarded from subprocess. | 22 # Constants forwarded from subprocess. |
22 PIPE = subprocess.PIPE | 23 PIPE = subprocess.PIPE |
23 STDOUT = subprocess.STDOUT | 24 STDOUT = subprocess.STDOUT |
24 # Sends stdout or stderr to os.devnull. | 25 # Sends stdout or stderr to os.devnull. |
25 VOID = object() | 26 VOID = object() |
26 # Error code when a process was killed because it timed out. | 27 # Error code when a process was killed because it timed out. |
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
163 if isinstance(args, basestring): | 164 if isinstance(args, basestring): |
164 tmp_str = args | 165 tmp_str = args |
165 elif isinstance(args, (list, tuple)): | 166 elif isinstance(args, (list, tuple)): |
166 tmp_str = ' '.join(args) | 167 tmp_str = ' '.join(args) |
167 else: | 168 else: |
168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) | 169 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) |
169 if kwargs.get('cwd', None): | 170 if kwargs.get('cwd', None): |
170 tmp_str += '; cwd=%s' % kwargs['cwd'] | 171 tmp_str += '; cwd=%s' % kwargs['cwd'] |
171 logging.debug(tmp_str) | 172 logging.debug(tmp_str) |
172 | 173 |
174 self.stdout_cb = None | |
175 self.stderr_cb = None | |
176 self.stdout_void = False | |
177 self.stderr_void = False | |
173 def fix(stream): | 178 def fix(stream): |
174 if kwargs.get(stream) in (VOID, os.devnull): | 179 if kwargs.get(stream) in (VOID, os.devnull): |
175 # Replaces VOID with handle to /dev/null. | 180 # Replaces VOID with handle to /dev/null. |
176 # Create a temporary file to workaround python's deadlock. | 181 # Create a temporary file to workaround python's deadlock. |
177 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 182 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
178 # When the pipe fills up, it will deadlock this process. Using a real | 183 # When the pipe fills up, it will deadlock this process. Using a real |
179 # file works around that issue. | 184 # file works around that issue. |
180 kwargs[stream] = open(os.devnull, 'w') | 185 kwargs[stream] = open(os.devnull, 'w') |
186 setattr(self, stream + '_void', True) | |
187 if callable(kwargs.get(stream)): | |
188 # Callable stdout/stderr should be used only with call() wrappers. | |
189 setattr(self, stream + '_cb', kwargs[stream]) | |
190 kwargs[stream] = PIPE | |
181 | 191 |
182 fix('stdout') | 192 fix('stdout') |
183 fix('stderr') | 193 fix('stderr') |
184 | 194 |
185 self.start = time.time() | 195 self.start = time.time() |
196 self.timeout = None | |
186 self.shell = kwargs.get('shell', None) | 197 self.shell = kwargs.get('shell', None) |
187 # Silence pylint on MacOSX | 198 # Silence pylint on MacOSX |
188 self.returncode = None | 199 self.returncode = None |
189 | 200 |
190 try: | 201 try: |
191 super(Popen, self).__init__(args, **kwargs) | 202 super(Popen, self).__init__(args, **kwargs) |
192 except OSError, e: | 203 except OSError, e: |
193 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 204 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
194 # Convert fork() emulation failure into a CygwinRebaseError(). | 205 # Convert fork() emulation failure into a CygwinRebaseError(). |
195 raise CygwinRebaseError( | 206 raise CygwinRebaseError( |
196 e.errno, | 207 e.errno, |
197 args, | 208 args, |
198 kwargs.get('cwd'), | 209 kwargs.get('cwd'), |
199 None, | 210 None, |
200 'Visit ' | 211 'Visit ' |
201 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure ' | 212 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure ' |
202 'to learn how to fix this error; you need to rebase your cygwin ' | 213 'to learn how to fix this error; you need to rebase your cygwin ' |
203 'dlls') | 214 'dlls') |
204 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go | 215 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go |
205 # through | 216 # through |
206 raise | 217 raise |
207 | 218 |
219 def _tee_threads(self, input): # pylint: disable=W0622 | |
220 """Does I/O for a process's pipes using thread. | |
Dirk Pranke
2011/11/30 21:06:44
Nit: "threads".
| |
221 | |
222 It's the simplest and slowest implementation. Expect very slow behavior. | |
223 | |
224 If there is a callback and it doesn't keep up with the calls, the timeout | |
225 effectiveness will be delayed accordingly. | |
226 """ | |
227 # Queue of either of <threadname> when done or (<threadname>, data). In | |
228 # theory we would like to limit to ~64kb items to not cause large memory | |
229 # usage when the callback blocks. It is not done because it slows down | |
230 # processing on OSX10.6 by a factor of 2x, making it even slower than | |
231 # Windows! Revisit this decision if it becomes a problem, e.g. crash | |
232 # because of memory exhaustion. | |
233 queue = Queue.Queue() | |
234 done = threading.Event() | |
235 | |
236 def write_stdin(): | |
237 try: | |
238 stdin_io = cStringIO.StringIO(input) | |
239 while True: | |
240 data = stdin_io.read(1024) | |
241 if data: | |
242 self.stdin.write(data) | |
243 else: | |
244 self.stdin.close() | |
245 break | |
246 finally: | |
247 queue.put('stdin') | |
248 | |
249 def _queue_pipe_read(pipe, name): | |
250 """Queues characters read from a pipe into a queue.""" | |
251 try: | |
252 while True: | |
253 data = pipe.read(1) | |
254 if not data: | |
255 break | |
256 queue.put((name, data)) | |
257 finally: | |
258 queue.put(name) | |
259 | |
260 def timeout_fn(): | |
261 try: | |
262 done.wait(self.timeout) | |
263 finally: | |
264 queue.put('timeout') | |
265 | |
266 def wait_fn(): | |
267 try: | |
268 self.wait() | |
269 finally: | |
270 queue.put('wait') | |
271 | |
272 # Starts up to 5 threads: | |
273 # Wait for the process to quit | |
274 # Read stdout | |
275 # Read stderr | |
276 # Write stdin | |
277 # Timeout | |
278 threads = { | |
279 'wait': threading.Thread(target=wait_fn), | |
Dirk Pranke
2011/11/30 21:06:44
Why do you need this one? Shouldn't the other thre
M-A Ruel
2011/11/30 21:26:04
(Argh, rietveld ate my comment)
In the previous c
Dirk Pranke
2011/11/30 22:15:54
Hm. Okay, I can see that. I think you could get ri
| |
280 } | |
281 if self.timeout is not None: | |
282 threads['timeout'] = threading.Thread(target=timeout_fn) | |
283 if self.stdout_cb: | |
284 threads['stdout'] = threading.Thread( | |
285 target=_queue_pipe_read, args=(self.stdout, 'stdout')) | |
286 if self.stderr_cb: | |
287 threads['stderr'] = threading.Thread( | |
288 target=_queue_pipe_read, args=(self.stderr, 'stderr')) | |
289 if input: | |
290 threads['stdin'] = threading.Thread(target=write_stdin) | |
291 for t in threads.itervalues(): | |
292 t.daemon = True | |
Dirk Pranke
2011/11/30 21:06:44
Are you sure you want these to be daemons? I supp
M-A Ruel
2011/11/30 21:26:04
Right, it's not needed anymore. Removed.
| |
293 t.start() | |
294 | |
295 timed_out = False | |
296 try: | |
297 # This thread needs to be optimized for speed. | |
298 while threads: | |
299 item = queue.get() | |
300 if item[0] is 'stdout': | |
301 self.stdout_cb(item[1]) | |
302 elif item[0] is 'stderr': | |
303 self.stderr_cb(item[1]) | |
304 else: | |
305 # A thread terminated. | |
306 threads[item].join() | |
307 del threads[item] | |
308 if item == 'wait': | |
309 # Terminate the timeout thread if necessary. | |
310 done.set() | |
311 elif item == 'timeout' and not timed_out and self.poll() is None: | |
312 logging.debug('Timed out after %fs: killing' % self.timeout) | |
313 self.kill() | |
314 timed_out = True | |
315 finally: | |
316 # Stop the threads. | |
317 done.set() | |
318 if 'wait' in threads: | |
319 # Accelerate things, otherwise it would hang until the child process is | |
320 # done. | |
321 logging.debug('Killing child because of an exception') | |
322 self.kill() | |
323 # Join threads. | |
324 for thread in threads.itervalues(): | |
325 thread.join() | |
326 if timed_out: | |
327 self.returncode = TIMED_OUT | |
328 | |
329 def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622 | |
330 """Adds timeout and callbacks support. | |
331 | |
332 Returns (stdout, stderr) like subprocess.Popen().communicate(). | |
333 | |
334 - The process will be killed after |timeout| seconds and returncode set to | |
335 TIMED_OUT. | |
336 """ | |
337 self.timeout = timeout | |
338 if not self.timeout and not self.stdout_cb and not self.stderr_cb: | |
339 return super(Popen, self).communicate(input) | |
340 | |
341 if self.timeout and self.shell: | |
342 raise TypeError( | |
343 'Using timeout and shell simultaneously will cause a process leak ' | |
344 'since the shell will be killed instead of the child process.') | |
345 | |
346 stdout = None | |
347 stderr = None | |
348 # Convert to a lambda to workaround python's deadlock. | |
349 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | |
350 # When the pipe fills up, it will deadlock this process. Using a thread | |
351 # works around that issue. No need for thread safe function since the call | |
352 # backs are guaranteed to be called from the main thread. | |
353 if self.stdout and not self.stdout_cb and not self.stdout_void: | |
354 stdout = cStringIO.StringIO() | |
355 self.stdout_cb = stdout.write | |
356 if self.stderr and not self.stderr_cb and not self.stderr_void: | |
357 stderr = cStringIO.StringIO() | |
358 self.stderr_cb = stderr.write | |
359 self._tee_threads(input) | |
360 if stdout: | |
361 stdout = stdout.getvalue() | |
362 if stderr: | |
363 stderr = stderr.getvalue() | |
364 return (stdout, stderr) | |
365 | |
208 | 366 |
209 def communicate(args, timeout=None, **kwargs): | 367 def communicate(args, timeout=None, **kwargs): |
210 """Wraps subprocess.Popen().communicate() and add timeout support. | 368 """Wraps subprocess.Popen().communicate() and add timeout support. |
211 | 369 |
212 Returns ((stdout, stderr), returncode). | 370 Returns ((stdout, stderr), returncode). |
213 | 371 |
214 - The process will be killed after |timeout| seconds and returncode set to | 372 - The process will be killed after |timeout| seconds and returncode set to |
215 TIMED_OUT. | 373 TIMED_OUT. |
216 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 374 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
217 """ | 375 """ |
218 stdin = kwargs.pop('stdin', None) | 376 stdin = kwargs.pop('stdin', None) |
219 if stdin is not None: | 377 if stdin is not None: |
220 if stdin is VOID: | 378 if stdin is VOID: |
221 kwargs['stdin'] = open(os.devnull, 'r') | 379 kwargs['stdin'] = open(os.devnull, 'r') |
222 stdin = None | 380 stdin = None |
223 else: | 381 else: |
224 assert isinstance(stdin, basestring) | 382 assert isinstance(stdin, basestring) |
225 # When stdin is passed as an argument, use it as the actual input data and | 383 # When stdin is passed as an argument, use it as the actual input data and |
226 # set the Popen() parameter accordingly. | 384 # set the Popen() parameter accordingly. |
227 kwargs['stdin'] = PIPE | 385 kwargs['stdin'] = PIPE |
228 | 386 |
229 if not timeout: | 387 proc = Popen(args, **kwargs) |
230 # Normal workflow. | 388 if stdin not in (None, VOID): |
231 proc = Popen(args, **kwargs) | 389 return proc.communicate(stdin, timeout), proc.returncode |
232 if stdin is not None: | 390 else: |
233 return proc.communicate(stdin), proc.returncode | 391 return proc.communicate(None, timeout), proc.returncode |
234 else: | |
235 return proc.communicate(), proc.returncode | |
236 | |
237 # Create a temporary file to workaround python's deadlock. | |
238 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | |
239 # When the pipe fills up, it will deadlock this process. Using a real file | |
240 # works around that issue. | |
241 with tempfile.TemporaryFile() as buff: | |
242 kwargs['stdout'] = buff | |
243 proc = Popen(args, **kwargs) | |
244 if proc.shell: | |
245 raise TypeError( | |
246 'Using timeout and shell simultaneously will cause a process leak ' | |
247 'since the shell will be killed instead of the child process.') | |
248 if stdin is not None: | |
249 proc.stdin.write(stdin) | |
250 while proc.returncode is None: | |
251 proc.poll() | |
252 if timeout and (time.time() - proc.start) > timeout: | |
253 proc.kill() | |
254 proc.wait() | |
255 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT. | |
256 proc.returncode = TIMED_OUT | |
257 time.sleep(0.001) | |
258 # Now that the process died, reset the cursor and read the file. | |
259 buff.seek(0) | |
260 out = (buff.read(), None) | |
261 return out, proc.returncode | |
262 | 392 |
263 | 393 |
264 def call(args, **kwargs): | 394 def call(args, **kwargs): |
265 """Emulates subprocess.call(). | 395 """Emulates subprocess.call(). |
266 | 396 |
267 Automatically convert stdout=PIPE or stderr=PIPE to VOID. | 397 Automatically convert stdout=PIPE or stderr=PIPE to VOID. |
268 In no case they can be returned since no code path raises | 398 In no case they can be returned since no code path raises |
269 subprocess2.CalledProcessError. | 399 subprocess2.CalledProcessError. |
270 """ | 400 """ |
271 if kwargs.get('stdout') == PIPE: | 401 if kwargs.get('stdout') == PIPE: |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
314 | 444 |
315 - Throws if return code is not 0. | 445 - Throws if return code is not 0. |
316 - Works even prior to python 2.7. | 446 - Works even prior to python 2.7. |
317 - Blocks stdin by default if not specified since no output will be visible. | 447 - Blocks stdin by default if not specified since no output will be visible. |
318 - As per doc, "The stdout argument is not allowed as it is used internally." | 448 - As per doc, "The stdout argument is not allowed as it is used internally." |
319 """ | 449 """ |
320 kwargs.setdefault('stdin', VOID) | 450 kwargs.setdefault('stdin', VOID) |
321 if 'stdout' in kwargs: | 451 if 'stdout' in kwargs: |
322 raise ValueError('stdout argument not allowed, it will be overridden.') | 452 raise ValueError('stdout argument not allowed, it will be overridden.') |
323 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 453 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |