Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: subprocess2.py

Issue 8749015: Reimplement r109239 but using Popen.communicate() instead. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Address comments Created 9 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding=utf8 1 # coding=utf8
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 """Collection of subprocess wrapper functions. 5 """Collection of subprocess wrapper functions.
6 6
7 In theory you shouldn't need anything else in subprocess, or this module failed. 7 In theory you shouldn't need anything else in subprocess, or this module failed.
8 """ 8 """
9 9
10 from __future__ import with_statement 10 from __future__ import with_statement
11 import cStringIO
11 import errno 12 import errno
12 import logging 13 import logging
13 import os 14 import os
15 import Queue
14 import subprocess 16 import subprocess
15 import sys 17 import sys
16 import tempfile
17 import time 18 import time
18 import threading 19 import threading
19 20
20 21
21 # Constants forwarded from subprocess. 22 # Constants forwarded from subprocess.
22 PIPE = subprocess.PIPE 23 PIPE = subprocess.PIPE
23 STDOUT = subprocess.STDOUT 24 STDOUT = subprocess.STDOUT
24 # Sends stdout or stderr to os.devnull. 25 # Sends stdout or stderr to os.devnull.
25 VOID = object() 26 VOID = object()
26 # Error code when a process was killed because it timed out. 27 # Error code when a process was killed because it timed out.
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 if isinstance(args, basestring): 164 if isinstance(args, basestring):
164 tmp_str = args 165 tmp_str = args
165 elif isinstance(args, (list, tuple)): 166 elif isinstance(args, (list, tuple)):
166 tmp_str = ' '.join(args) 167 tmp_str = ' '.join(args)
167 else: 168 else:
168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) 169 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None)
169 if kwargs.get('cwd', None): 170 if kwargs.get('cwd', None):
170 tmp_str += '; cwd=%s' % kwargs['cwd'] 171 tmp_str += '; cwd=%s' % kwargs['cwd']
171 logging.debug(tmp_str) 172 logging.debug(tmp_str)
172 173
174 self.stdout_cb = None
175 self.stderr_cb = None
176 self.stdout_void = False
177 self.stderr_void = False
173 def fix(stream): 178 def fix(stream):
174 if kwargs.get(stream) in (VOID, os.devnull): 179 if kwargs.get(stream) in (VOID, os.devnull):
175 # Replaces VOID with handle to /dev/null. 180 # Replaces VOID with handle to /dev/null.
176 # Create a temporary file to workaround python's deadlock. 181 # Create a temporary file to workaround python's deadlock.
177 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait 182 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
178 # When the pipe fills up, it will deadlock this process. Using a real 183 # When the pipe fills up, it will deadlock this process. Using a real
179 # file works around that issue. 184 # file works around that issue.
180 kwargs[stream] = open(os.devnull, 'w') 185 kwargs[stream] = open(os.devnull, 'w')
186 setattr(self, stream + '_void', True)
187 if callable(kwargs.get(stream)):
188 # Callable stdout/stderr should be used only with call() wrappers.
189 setattr(self, stream + '_cb', kwargs[stream])
190 kwargs[stream] = PIPE
181 191
182 fix('stdout') 192 fix('stdout')
183 fix('stderr') 193 fix('stderr')
184 194
185 self.start = time.time() 195 self.start = time.time()
196 self.timeout = None
186 self.shell = kwargs.get('shell', None) 197 self.shell = kwargs.get('shell', None)
187 # Silence pylint on MacOSX 198 # Silence pylint on MacOSX
188 self.returncode = None 199 self.returncode = None
189 200
190 try: 201 try:
191 super(Popen, self).__init__(args, **kwargs) 202 super(Popen, self).__init__(args, **kwargs)
192 except OSError, e: 203 except OSError, e:
193 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': 204 if e.errno == errno.EAGAIN and sys.platform == 'cygwin':
194 # Convert fork() emulation failure into a CygwinRebaseError(). 205 # Convert fork() emulation failure into a CygwinRebaseError().
195 raise CygwinRebaseError( 206 raise CygwinRebaseError(
196 e.errno, 207 e.errno,
197 args, 208 args,
198 kwargs.get('cwd'), 209 kwargs.get('cwd'),
199 None, 210 None,
200 'Visit ' 211 'Visit '
201 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure ' 212 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure '
202 'to learn how to fix this error; you need to rebase your cygwin ' 213 'to learn how to fix this error; you need to rebase your cygwin '
203 'dlls') 214 'dlls')
204 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go 215 # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go
205 # through 216 # through
206 raise 217 raise
207 218
219 def _tee_threads(self, input): # pylint: disable=W0622
220 """Does I/O for a process's pipes using threads.
221
222 It's the simplest and slowest implementation. Expect very slow behavior.
223
224 If there is a callback and it doesn't keep up with the calls, the timeout
225 effectiveness will be delayed accordingly.
226 """
227 # Queue of either of <threadname> when done or (<threadname>, data). In
228 # theory we would like to limit to ~64kb items to not cause large memory
229 # usage when the callback blocks. It is not done because it slows down
230 # processing on OSX10.6 by a factor of 2x, making it even slower than
231 # Windows! Revisit this decision if it becomes a problem, e.g. crash
232 # because of memory exhaustion.
233 queue = Queue.Queue()
234 done = threading.Event()
235
236 def write_stdin():
237 try:
238 stdin_io = cStringIO.StringIO(input)
239 while True:
240 data = stdin_io.read(1024)
241 if data:
242 self.stdin.write(data)
243 else:
244 self.stdin.close()
245 break
246 finally:
247 queue.put('stdin')
248
249 def _queue_pipe_read(pipe, name):
250 """Queues characters read from a pipe into a queue."""
251 try:
252 while True:
253 data = pipe.read(1)
254 if not data:
255 break
256 queue.put((name, data))
257 finally:
258 queue.put(name)
259
260 def timeout_fn():
261 try:
262 done.wait(self.timeout)
263 finally:
264 queue.put('timeout')
265
266 def wait_fn():
267 try:
268 self.wait()
269 finally:
270 queue.put('wait')
271
272 # Starts up to 5 threads:
273 # Wait for the process to quit
274 # Read stdout
275 # Read stderr
276 # Write stdin
277 # Timeout
278 threads = {
279 'wait': threading.Thread(target=wait_fn),
280 }
281 if self.timeout is not None:
282 threads['timeout'] = threading.Thread(target=timeout_fn)
283 if self.stdout_cb:
284 threads['stdout'] = threading.Thread(
285 target=_queue_pipe_read, args=(self.stdout, 'stdout'))
286 if self.stderr_cb:
287 threads['stderr'] = threading.Thread(
288 target=_queue_pipe_read, args=(self.stderr, 'stderr'))
289 if input:
290 threads['stdin'] = threading.Thread(target=write_stdin)
291 for t in threads.itervalues():
292 t.start()
293
294 timed_out = False
295 try:
296 # This thread needs to be optimized for speed.
297 while threads:
298 item = queue.get()
299 if item[0] is 'stdout':
300 self.stdout_cb(item[1])
301 elif item[0] is 'stderr':
302 self.stderr_cb(item[1])
303 else:
304 # A thread terminated.
305 threads[item].join()
306 del threads[item]
307 if item == 'wait':
308 # Terminate the timeout thread if necessary.
309 done.set()
310 elif item == 'timeout' and not timed_out and self.poll() is None:
311 logging.debug('Timed out after %fs: killing' % self.timeout)
312 self.kill()
313 timed_out = True
314 finally:
315 # Stop the threads.
316 done.set()
317 if 'wait' in threads:
318 # Accelerate things, otherwise it would hang until the child process is
319 # done.
320 logging.debug('Killing child because of an exception')
321 self.kill()
322 # Join threads.
323 for thread in threads.itervalues():
324 thread.join()
325 if timed_out:
326 self.returncode = TIMED_OUT
327
328 def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622
329 """Adds timeout and callbacks support.
330
331 Returns (stdout, stderr) like subprocess.Popen().communicate().
332
333 - The process will be killed after |timeout| seconds and returncode set to
334 TIMED_OUT.
335 """
336 self.timeout = timeout
337 if not self.timeout and not self.stdout_cb and not self.stderr_cb:
338 return super(Popen, self).communicate(input)
339
340 if self.timeout and self.shell:
341 raise TypeError(
342 'Using timeout and shell simultaneously will cause a process leak '
343 'since the shell will be killed instead of the child process.')
344
345 stdout = None
346 stderr = None
347 # Convert to a lambda to workaround python's deadlock.
348 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
349 # When the pipe fills up, it will deadlock this process. Using a thread
350 # works around that issue. No need for thread safe function since the call
351 # backs are guaranteed to be called from the main thread.
352 if self.stdout and not self.stdout_cb and not self.stdout_void:
353 stdout = cStringIO.StringIO()
354 self.stdout_cb = stdout.write
355 if self.stderr and not self.stderr_cb and not self.stderr_void:
356 stderr = cStringIO.StringIO()
357 self.stderr_cb = stderr.write
358 self._tee_threads(input)
359 if stdout:
360 stdout = stdout.getvalue()
361 if stderr:
362 stderr = stderr.getvalue()
363 return (stdout, stderr)
364
208 365
209 def communicate(args, timeout=None, **kwargs): 366 def communicate(args, timeout=None, **kwargs):
210 """Wraps subprocess.Popen().communicate() and add timeout support. 367 """Wraps subprocess.Popen().communicate() and add timeout support.
211 368
212 Returns ((stdout, stderr), returncode). 369 Returns ((stdout, stderr), returncode).
213 370
214 - The process will be killed after |timeout| seconds and returncode set to 371 - The process will be killed after |timeout| seconds and returncode set to
215 TIMED_OUT. 372 TIMED_OUT.
216 - Automatically passes stdin content as input so do not specify stdin=PIPE. 373 - Automatically passes stdin content as input so do not specify stdin=PIPE.
217 """ 374 """
218 stdin = kwargs.pop('stdin', None) 375 stdin = kwargs.pop('stdin', None)
219 if stdin is not None: 376 if stdin is not None:
220 if stdin is VOID: 377 if stdin is VOID:
221 kwargs['stdin'] = open(os.devnull, 'r') 378 kwargs['stdin'] = open(os.devnull, 'r')
222 stdin = None 379 stdin = None
223 else: 380 else:
224 assert isinstance(stdin, basestring) 381 assert isinstance(stdin, basestring)
225 # When stdin is passed as an argument, use it as the actual input data and 382 # When stdin is passed as an argument, use it as the actual input data and
226 # set the Popen() parameter accordingly. 383 # set the Popen() parameter accordingly.
227 kwargs['stdin'] = PIPE 384 kwargs['stdin'] = PIPE
228 385
229 if not timeout: 386 proc = Popen(args, **kwargs)
230 # Normal workflow. 387 if stdin not in (None, VOID):
231 proc = Popen(args, **kwargs) 388 return proc.communicate(stdin, timeout), proc.returncode
232 if stdin is not None: 389 else:
233 return proc.communicate(stdin), proc.returncode 390 return proc.communicate(None, timeout), proc.returncode
234 else:
235 return proc.communicate(), proc.returncode
236
237 # Create a temporary file to workaround python's deadlock.
238 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
239 # When the pipe fills up, it will deadlock this process. Using a real file
240 # works around that issue.
241 with tempfile.TemporaryFile() as buff:
242 kwargs['stdout'] = buff
243 proc = Popen(args, **kwargs)
244 if proc.shell:
245 raise TypeError(
246 'Using timeout and shell simultaneously will cause a process leak '
247 'since the shell will be killed instead of the child process.')
248 if stdin is not None:
249 proc.stdin.write(stdin)
250 while proc.returncode is None:
251 proc.poll()
252 if timeout and (time.time() - proc.start) > timeout:
253 proc.kill()
254 proc.wait()
255 # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
256 proc.returncode = TIMED_OUT
257 time.sleep(0.001)
258 # Now that the process died, reset the cursor and read the file.
259 buff.seek(0)
260 out = (buff.read(), None)
261 return out, proc.returncode
262 391
263 392
264 def call(args, **kwargs): 393 def call(args, **kwargs):
265 """Emulates subprocess.call(). 394 """Emulates subprocess.call().
266 395
267 Automatically convert stdout=PIPE or stderr=PIPE to VOID. 396 Automatically convert stdout=PIPE or stderr=PIPE to VOID.
268 In no case they can be returned since no code path raises 397 In no case they can be returned since no code path raises
269 subprocess2.CalledProcessError. 398 subprocess2.CalledProcessError.
270 """ 399 """
271 if kwargs.get('stdout') == PIPE: 400 if kwargs.get('stdout') == PIPE:
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
314 443
315 - Throws if return code is not 0. 444 - Throws if return code is not 0.
316 - Works even prior to python 2.7. 445 - Works even prior to python 2.7.
317 - Blocks stdin by default if not specified since no output will be visible. 446 - Blocks stdin by default if not specified since no output will be visible.
318 - As per doc, "The stdout argument is not allowed as it is used internally." 447 - As per doc, "The stdout argument is not allowed as it is used internally."
319 """ 448 """
320 kwargs.setdefault('stdin', VOID) 449 kwargs.setdefault('stdin', VOID)
321 if 'stdout' in kwargs: 450 if 'stdout' in kwargs:
322 raise ValueError('stdout argument not allowed, it will be overridden.') 451 raise ValueError('stdout argument not allowed, it will be overridden.')
323 return check_call_out(args, stdout=PIPE, **kwargs)[0] 452 return check_call_out(args, stdout=PIPE, **kwargs)[0]
OLDNEW
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698