| OLD | NEW |
| (Empty) | |
| 1 import unittest |
| 2 from test import test_support |
| 3 import subprocess32 |
| 4 subprocess = subprocess32 |
| 5 import sys |
| 6 import signal |
| 7 import os |
| 8 import errno |
| 9 import tempfile |
| 10 import time |
| 11 try: |
| 12 import threading |
| 13 except ImportError: |
| 14 threading = None |
| 15 import re |
| 16 #import sysconfig |
| 17 import select |
| 18 import shutil |
| 19 try: |
| 20 import gc |
| 21 except ImportError: |
| 22 gc = None |
| 23 |
| 24 mswindows = (sys.platform == "win32") |
| 25 |
| 26 # |
| 27 # Depends on the following external programs: Python |
| 28 # |
| 29 |
| 30 if mswindows: |
| 31 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' |
| 32 'os.O_BINARY);') |
| 33 else: |
| 34 SETBINARY = '' |
| 35 |
| 36 |
| 37 try: |
| 38 mkstemp = tempfile.mkstemp |
| 39 except AttributeError: |
| 40 # tempfile.mkstemp is not available |
| 41 def mkstemp(): |
| 42 """Replacement for mkstemp, calling mktemp.""" |
| 43 fname = tempfile.mktemp() |
| 44 return os.open(fname, os.O_RDWR|os.O_CREAT), fname |
| 45 |
| 46 try: |
| 47 strip_python_stderr = test_support.strip_python_stderr |
| 48 except AttributeError: |
| 49 # Copied from the test.test_support module in 2.7. |
| 50 def strip_python_stderr(stderr): |
| 51 """Strip the stderr of a Python process from potential debug output |
| 52 emitted by the interpreter. |
| 53 |
| 54 This will typically be run on the result of the communicate() method |
| 55 of a subprocess.Popen object. |
| 56 """ |
| 57 stderr = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr).strip() |
| 58 return stderr |
| 59 |
| 60 class BaseTestCase(unittest.TestCase): |
| 61 def setUp(self): |
| 62 # Try to minimize the number of children we have so this test |
| 63 # doesn't crash on some buildbots (Alphas in particular). |
| 64 reap_children() |
| 65 if not hasattr(unittest.TestCase, 'addCleanup'): |
| 66 self._cleanups = [] |
| 67 |
| 68 def tearDown(self): |
| 69 try: |
| 70 for inst in subprocess._active: |
| 71 inst.wait() |
| 72 subprocess._cleanup() |
| 73 self.assertFalse(subprocess._active, "subprocess._active not empty") |
| 74 finally: |
| 75 if self._use_our_own_cleanup_implementation: |
| 76 self._doCleanups() |
| 77 |
| 78 if not hasattr(unittest.TestCase, 'assertIn'): |
| 79 def assertIn(self, a, b, msg=None): |
| 80 self.assert_((a in b), msg or ('%r not in %r' % (a, b))) |
| 81 def assertNotIn(self, a, b, msg=None): |
| 82 self.assert_((a not in b), msg or ('%r in %r' % (a, b))) |
| 83 |
| 84 if not hasattr(unittest.TestCase, 'skipTest'): |
| 85 def skipTest(self, message): |
| 86 """These will still fail but it'll be clear that it is okay.""" |
| 87 self.fail('SKIPPED - %s\n' % (message,)) |
| 88 |
| 89 def _addCleanup(self, function, *args, **kwargs): |
| 90 """Add a function, with arguments, to be called when the test is |
| 91 completed. Functions added are called on a LIFO basis and are |
| 92 called after tearDown on test failure or success. |
| 93 |
| 94 Unlike unittest2 or python 2.7, cleanups are not if setUp fails. |
| 95 That is easier to implement in this subclass and is all we need. |
| 96 """ |
| 97 self._cleanups.append((function, args, kwargs)) |
| 98 |
| 99 def _doCleanups(self): |
| 100 """Execute all cleanup functions. Normally called for you after |
| 101 tearDown.""" |
| 102 while self._cleanups: |
| 103 function, args, kwargs = self._cleanups.pop() |
| 104 try: |
| 105 function(*args, **kwargs) |
| 106 except KeyboardInterrupt: |
| 107 raise |
| 108 except: |
| 109 pass |
| 110 |
| 111 _use_our_own_cleanup_implementation = False |
| 112 if not hasattr(unittest.TestCase, 'addCleanup'): |
| 113 _use_our_own_cleanup_implementation = True |
| 114 addCleanup = _addCleanup |
| 115 |
| 116 def assertStderrEqual(self, stderr, expected, msg=None): |
| 117 # In a debug build, stuff like "[6580 refs]" is printed to stderr at |
| 118 # shutdown time. That frustrates tests trying to check stderr produced |
| 119 # from a spawned Python process. |
| 120 actual = strip_python_stderr(stderr) |
| 121 # strip_python_stderr also strips whitespace, so we do too. |
| 122 expected = expected.strip() |
| 123 self.assertEqual(actual, expected, msg) |
| 124 |
| 125 |
| 126 class PopenTestException(Exception): |
| 127 pass |
| 128 |
| 129 |
| 130 class PopenExecuteChildRaises(subprocess32.Popen): |
| 131 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when |
| 132 _execute_child fails. |
| 133 """ |
| 134 def _execute_child(self, *args, **kwargs): |
| 135 raise PopenTestException("Forced Exception for Test") |
| 136 |
| 137 |
| 138 class ProcessTestCase(BaseTestCase): |
| 139 |
| 140 def test_call_seq(self): |
| 141 # call() function with sequence argument |
| 142 rc = subprocess.call([sys.executable, "-c", |
| 143 "import sys; sys.exit(47)"]) |
| 144 self.assertEqual(rc, 47) |
| 145 |
| 146 def test_call_timeout(self): |
| 147 # call() function with timeout argument; we want to test that the child |
| 148 # process gets killed when the timeout expires. If the child isn't |
| 149 # killed, this call will deadlock since subprocess.call waits for the |
| 150 # child. |
| 151 self.assertRaises(subprocess.TimeoutExpired, subprocess.call, |
| 152 [sys.executable, "-c", "while True: pass"], |
| 153 timeout=0.1) |
| 154 |
| 155 def test_check_call_zero(self): |
| 156 # check_call() function with zero return code |
| 157 rc = subprocess.check_call([sys.executable, "-c", |
| 158 "import sys; sys.exit(0)"]) |
| 159 self.assertEqual(rc, 0) |
| 160 |
| 161 def test_check_call_nonzero(self): |
| 162 # check_call() function with non-zero return code |
| 163 try: |
| 164 subprocess.check_call([sys.executable, "-c", |
| 165 "import sys; sys.exit(47)"]) |
| 166 except subprocess.CalledProcessError, c: |
| 167 self.assertEqual(c.returncode, 47) |
| 168 |
| 169 def test_check_output(self): |
| 170 # check_output() function with zero return code |
| 171 output = subprocess.check_output( |
| 172 [sys.executable, "-c", "print 'BDFL'"]) |
| 173 self.assertIn('BDFL', output) |
| 174 |
| 175 def test_check_output_nonzero(self): |
| 176 # check_call() function with non-zero return code |
| 177 try: |
| 178 subprocess.check_output( |
| 179 [sys.executable, "-c", "import sys; sys.exit(5)"]) |
| 180 except subprocess.CalledProcessError, c: |
| 181 self.assertEqual(c.returncode, 5) |
| 182 |
| 183 def test_check_output_stderr(self): |
| 184 # check_output() function stderr redirected to stdout |
| 185 output = subprocess.check_output( |
| 186 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], |
| 187 stderr=subprocess.STDOUT) |
| 188 self.assertIn('BDFL', output) |
| 189 |
| 190 def test_check_output_stdout_arg(self): |
| 191 # check_output() function stderr redirected to stdout |
| 192 try: |
| 193 output = subprocess.check_output( |
| 194 [sys.executable, "-c", "print 'will not be run'"], |
| 195 stdout=sys.stdout) |
| 196 self.fail("Expected ValueError when stdout arg supplied.") |
| 197 except ValueError, c: |
| 198 self.assertIn('stdout', c.args[0]) |
| 199 |
| 200 def test_check_output_timeout(self): |
| 201 # check_output() function with timeout arg |
| 202 try: |
| 203 output = subprocess.check_output( |
| 204 [sys.executable, "-c", |
| 205 "import sys; sys.stdout.write('BDFL')\n" |
| 206 "sys.stdout.flush()\n" |
| 207 "while True: pass"], |
| 208 timeout=0.5) |
| 209 except subprocess.TimeoutExpired, exception: |
| 210 self.assertEqual(exception.output, 'BDFL') |
| 211 else: |
| 212 self.fail("Expected TimeoutExpired.") |
| 213 |
| 214 def test_call_kwargs(self): |
| 215 # call() function with keyword args |
| 216 newenv = os.environ.copy() |
| 217 newenv["FRUIT"] = "banana" |
| 218 rc = subprocess.call([sys.executable, "-c", |
| 219 'import sys, os;' |
| 220 'sys.exit(os.getenv("FRUIT")=="banana")'], |
| 221 env=newenv) |
| 222 self.assertEqual(rc, 1) |
| 223 |
| 224 def test_stdin_none(self): |
| 225 # .stdin is None when not redirected |
| 226 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], |
| 227 stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 228 p.wait() |
| 229 self.assertEqual(p.stdin, None) |
| 230 |
| 231 def test_stdout_none(self): |
| 232 # .stdout is None when not redirected, and the child's stdout will |
| 233 # be inherited from the parent. In order to test this we run a |
| 234 # subprocess in a subprocess: |
| 235 # this_test |
| 236 # \-- subprocess created by this test (parent) |
| 237 # \-- subprocess created by the parent subprocess (child) |
| 238 # The parent doesn't specify stdout, so the child will use the |
| 239 # parent's stdout. This test checks that the message printed by the |
| 240 # child goes to the parent stdout. The parent also checks that the |
| 241 # child's stdout is None. See #11963. |
| 242 code = ('import sys; from subprocess32 import Popen, PIPE;' |
| 243 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],
' |
| 244 ' stdin=PIPE, stderr=PIPE);' |
| 245 'p.wait(); assert p.stdout is None;') |
| 246 p = subprocess.Popen([sys.executable, "-c", code], |
| 247 stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 248 self.addCleanup(p.stdout.close) |
| 249 self.addCleanup(p.stderr.close) |
| 250 out, err = p.communicate() |
| 251 self.assertEqual(p.returncode, 0, err) |
| 252 self.assertEqual(out.rstrip(), 'test_stdout_none') |
| 253 |
| 254 def test_stderr_none(self): |
| 255 # .stderr is None when not redirected |
| 256 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], |
| 257 stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
| 258 p.wait() |
| 259 self.assertEqual(p.stderr, None) |
| 260 |
| 261 # For use in the test_cwd* tests below. |
| 262 def _normalize_cwd(self, cwd): |
| 263 # Normalize an expected cwd (for Tru64 support). |
| 264 # We can't use os.path.realpath since it doesn't expand Tru64 {memb} |
| 265 # strings. See bug #1063571. |
| 266 original_cwd = os.getcwd() |
| 267 os.chdir(cwd) |
| 268 cwd = os.getcwd() |
| 269 os.chdir(original_cwd) |
| 270 return cwd |
| 271 |
| 272 # For use in the test_cwd* tests below. |
| 273 def _split_python_path(self): |
| 274 # Return normalized (python_dir, python_base). |
| 275 python_path = os.path.realpath(sys.executable) |
| 276 return os.path.split(python_path) |
| 277 |
| 278 # For use in the test_cwd* tests below. |
| 279 def _assert_cwd(self, expected_cwd, python_arg, **kwargs): |
| 280 # Invoke Python via Popen, and assert that (1) the call succeeds, |
| 281 # and that (2) the current working directory of the child process |
| 282 # matches *expected_cwd*. |
| 283 p = subprocess.Popen([python_arg, "-c", |
| 284 "import os, sys; " |
| 285 "sys.stdout.write(os.getcwd()); " |
| 286 "sys.exit(47)"], |
| 287 stdout=subprocess.PIPE, |
| 288 **kwargs) |
| 289 self.addCleanup(p.stdout.close) |
| 290 p.wait() |
| 291 self.assertEqual(47, p.returncode) |
| 292 normcase = os.path.normcase |
| 293 self.assertEqual(normcase(expected_cwd), |
| 294 normcase(p.stdout.read().decode("utf-8"))) |
| 295 |
| 296 def test_cwd(self): |
| 297 # Check that cwd changes the cwd for the child process. |
| 298 temp_dir = tempfile.gettempdir() |
| 299 temp_dir = self._normalize_cwd(temp_dir) |
| 300 self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir) |
| 301 |
| 302 #@unittest.skipIf(mswindows, "pending resolution of issue #15533") |
| 303 def test_cwd_with_relative_arg(self): |
| 304 # Check that Popen looks for args[0] relative to cwd if args[0] |
| 305 # is relative. |
| 306 python_dir, python_base = self._split_python_path() |
| 307 rel_python = os.path.join(os.curdir, python_base) |
| 308 |
| 309 path = 'tempcwd' |
| 310 saved_dir = os.getcwd() |
| 311 os.mkdir(path) |
| 312 try: |
| 313 os.chdir(path) |
| 314 wrong_dir = os.getcwd() |
| 315 # Before calling with the correct cwd, confirm that the call fails |
| 316 # without cwd and with the wrong cwd. |
| 317 self.assertRaises(OSError, subprocess.Popen, |
| 318 [rel_python]) |
| 319 self.assertRaises(OSError, subprocess.Popen, |
| 320 [rel_python], cwd=wrong_dir) |
| 321 python_dir = self._normalize_cwd(python_dir) |
| 322 self._assert_cwd(python_dir, rel_python, cwd=python_dir) |
| 323 finally: |
| 324 os.chdir(saved_dir) |
| 325 shutil.rmtree(path) |
| 326 |
| 327 #@unittest.skipIf(mswindows, "pending resolution of issue #15533") |
| 328 def test_cwd_with_relative_executable(self): |
| 329 # Check that Popen looks for executable relative to cwd if executable |
| 330 # is relative (and that executable takes precedence over args[0]). |
| 331 python_dir, python_base = self._split_python_path() |
| 332 rel_python = os.path.join(os.curdir, python_base) |
| 333 doesntexist = "somethingyoudonthave" |
| 334 |
| 335 path = 'tempcwd' |
| 336 saved_dir = os.getcwd() |
| 337 os.mkdir(path) |
| 338 try: |
| 339 os.chdir(path) |
| 340 wrong_dir = os.getcwd() |
| 341 # Before calling with the correct cwd, confirm that the call fails |
| 342 # without cwd and with the wrong cwd. |
| 343 self.assertRaises(OSError, subprocess.Popen, |
| 344 [doesntexist], executable=rel_python) |
| 345 self.assertRaises(OSError, subprocess.Popen, |
| 346 [doesntexist], executable=rel_python, |
| 347 cwd=wrong_dir) |
| 348 python_dir = self._normalize_cwd(python_dir) |
| 349 self._assert_cwd(python_dir, doesntexist, executable=rel_python, |
| 350 cwd=python_dir) |
| 351 finally: |
| 352 os.chdir(saved_dir) |
| 353 shutil.rmtree(path) |
| 354 |
| 355 def test_cwd_with_absolute_arg(self): |
| 356 # Check that Popen can find the executable when the cwd is wrong |
| 357 # if args[0] is an absolute path. |
| 358 python_dir, python_base = self._split_python_path() |
| 359 abs_python = os.path.join(python_dir, python_base) |
| 360 rel_python = os.path.join(os.curdir, python_base) |
| 361 wrong_dir = tempfile.mkdtemp() |
| 362 wrong_dir = os.path.realpath(wrong_dir) |
| 363 try: |
| 364 # Before calling with an absolute path, confirm that using a |
| 365 # relative path fails. |
| 366 self.assertRaises(OSError, subprocess.Popen, |
| 367 [rel_python], cwd=wrong_dir) |
| 368 wrong_dir = self._normalize_cwd(wrong_dir) |
| 369 self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) |
| 370 finally: |
| 371 shutil.rmtree(wrong_dir) |
| 372 |
| 373 def test_executable_with_cwd(self): |
| 374 python_dir, python_base = self._split_python_path() |
| 375 python_dir = self._normalize_cwd(python_dir) |
| 376 self._assert_cwd(python_dir, "somethingyoudonthave", |
| 377 executable=sys.executable, cwd=python_dir) |
| 378 |
| 379 #@unittest.skipIf(sysconfig.is_python_build(), |
| 380 # "need an installed Python. See #7774") |
| 381 #def test_executable_without_cwd(self): |
| 382 # # For a normal installation, it should work without 'cwd' |
| 383 # # argument. For test runs in the build directory, see #7774. |
| 384 # self._assert_cwd('', "somethingyoudonthave", executable=sys.executable) |
| 385 |
| 386 def test_stdin_pipe(self): |
| 387 # stdin redirection |
| 388 p = subprocess.Popen([sys.executable, "-c", |
| 389 'import sys; sys.exit(sys.stdin.read() == "pear")'], |
| 390 stdin=subprocess.PIPE) |
| 391 p.stdin.write("pear") |
| 392 p.stdin.close() |
| 393 p.wait() |
| 394 self.assertEqual(p.returncode, 1) |
| 395 |
| 396 def test_stdin_filedes(self): |
| 397 # stdin is set to open file descriptor |
| 398 tf = tempfile.TemporaryFile() |
| 399 d = tf.fileno() |
| 400 os.write(d, "pear") |
| 401 os.lseek(d, 0, 0) |
| 402 p = subprocess.Popen([sys.executable, "-c", |
| 403 'import sys; sys.exit(sys.stdin.read() == "pear")'], |
| 404 stdin=d) |
| 405 p.wait() |
| 406 self.assertEqual(p.returncode, 1) |
| 407 |
| 408 def test_stdin_fileobj(self): |
| 409 # stdin is set to open file object |
| 410 tf = tempfile.TemporaryFile() |
| 411 tf.write("pear") |
| 412 tf.seek(0) |
| 413 p = subprocess.Popen([sys.executable, "-c", |
| 414 'import sys; sys.exit(sys.stdin.read() == "pear")'], |
| 415 stdin=tf) |
| 416 p.wait() |
| 417 self.assertEqual(p.returncode, 1) |
| 418 |
| 419 def test_stdout_pipe(self): |
| 420 # stdout redirection |
| 421 p = subprocess.Popen([sys.executable, "-c", |
| 422 'import sys; sys.stdout.write("orange")'], |
| 423 stdout=subprocess.PIPE) |
| 424 self.assertEqual(p.stdout.read(), "orange") |
| 425 |
| 426 def test_stdout_filedes(self): |
| 427 # stdout is set to open file descriptor |
| 428 tf = tempfile.TemporaryFile() |
| 429 d = tf.fileno() |
| 430 p = subprocess.Popen([sys.executable, "-c", |
| 431 'import sys; sys.stdout.write("orange")'], |
| 432 stdout=d) |
| 433 p.wait() |
| 434 os.lseek(d, 0, 0) |
| 435 self.assertEqual(os.read(d, 1024), "orange") |
| 436 |
| 437 def test_stdout_fileobj(self): |
| 438 # stdout is set to open file object |
| 439 tf = tempfile.TemporaryFile() |
| 440 p = subprocess.Popen([sys.executable, "-c", |
| 441 'import sys; sys.stdout.write("orange")'], |
| 442 stdout=tf) |
| 443 p.wait() |
| 444 tf.seek(0) |
| 445 self.assertEqual(tf.read(), "orange") |
| 446 |
| 447 def test_stderr_pipe(self): |
| 448 # stderr redirection |
| 449 p = subprocess.Popen([sys.executable, "-c", |
| 450 'import sys; sys.stderr.write("strawberry")'], |
| 451 stderr=subprocess.PIPE) |
| 452 self.assertStderrEqual(p.stderr.read(), "strawberry") |
| 453 |
| 454 def test_stderr_filedes(self): |
| 455 # stderr is set to open file descriptor |
| 456 tf = tempfile.TemporaryFile() |
| 457 d = tf.fileno() |
| 458 p = subprocess.Popen([sys.executable, "-c", |
| 459 'import sys; sys.stderr.write("strawberry")'], |
| 460 stderr=d) |
| 461 p.wait() |
| 462 os.lseek(d, 0, 0) |
| 463 self.assertStderrEqual(os.read(d, 1024), "strawberry") |
| 464 |
| 465 def test_stderr_fileobj(self): |
| 466 # stderr is set to open file object |
| 467 tf = tempfile.TemporaryFile() |
| 468 p = subprocess.Popen([sys.executable, "-c", |
| 469 'import sys; sys.stderr.write("strawberry")'], |
| 470 stderr=tf) |
| 471 p.wait() |
| 472 tf.seek(0) |
| 473 self.assertStderrEqual(tf.read(), "strawberry") |
| 474 |
| 475 def test_stdout_stderr_pipe(self): |
| 476 # capture stdout and stderr to the same pipe |
| 477 p = subprocess.Popen([sys.executable, "-c", |
| 478 'import sys;' |
| 479 'sys.stdout.write("apple");' |
| 480 'sys.stdout.flush();' |
| 481 'sys.stderr.write("orange")'], |
| 482 stdout=subprocess.PIPE, |
| 483 stderr=subprocess.STDOUT) |
| 484 self.assertStderrEqual(p.stdout.read(), "appleorange") |
| 485 |
| 486 def test_stdout_stderr_file(self): |
| 487 # capture stdout and stderr to the same open file |
| 488 tf = tempfile.TemporaryFile() |
| 489 p = subprocess.Popen([sys.executable, "-c", |
| 490 'import sys;' |
| 491 'sys.stdout.write("apple");' |
| 492 'sys.stdout.flush();' |
| 493 'sys.stderr.write("orange")'], |
| 494 stdout=tf, |
| 495 stderr=tf) |
| 496 p.wait() |
| 497 tf.seek(0) |
| 498 self.assertStderrEqual(tf.read(), "appleorange") |
| 499 |
| 500 def test_stdout_filedes_of_stdout(self): |
| 501 # stdout is set to 1 (#1531862). |
| 502 # To avoid printing the text on stdout, we do something similar to |
| 503 # test_stdout_none (see above). The parent subprocess calls the child |
| 504 # subprocess passing stdout=1, and this test uses stdout=PIPE in |
| 505 # order to capture and check the output of the parent. See #11963. |
| 506 code = ('import sys, subprocess32; ' |
| 507 'rc = subprocess32.call([sys.executable, "-c", ' |
| 508 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' |
| 509 '\'test with stdout=1\'))"], stdout=1); ' |
| 510 'assert rc == 18') |
| 511 p = subprocess.Popen([sys.executable, "-c", code], |
| 512 stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 513 self.addCleanup(p.stdout.close) |
| 514 self.addCleanup(p.stderr.close) |
| 515 out, err = p.communicate() |
| 516 self.assertEqual(p.returncode, 0, err) |
| 517 self.assertEqual(out.rstrip(), 'test with stdout=1') |
| 518 |
| 519 def test_env(self): |
| 520 newenv = os.environ.copy() |
| 521 newenv["FRUIT"] = "orange" |
| 522 p = subprocess.Popen([sys.executable, "-c", |
| 523 'import sys,os;' |
| 524 'sys.stdout.write(os.getenv("FRUIT"))'], |
| 525 stdout=subprocess.PIPE, |
| 526 env=newenv) |
| 527 try: |
| 528 stdout, stderr = p.communicate() |
| 529 self.assertEqual(stdout, "orange") |
| 530 finally: |
| 531 p.__exit__(None, None, None) |
| 532 |
| 533 def test_empty_env(self): |
| 534 # Note: This excludes some __CF_* and VERSIONER_* keys OS X insists |
| 535 # on adding even when the environment in exec is empty. |
| 536 p = subprocess.Popen( |
| 537 [sys.executable, "-c", |
| 538 'import os; ' |
| 539 'print([k for k in os.environ.keys() ' |
| 540 ' if ("VERSIONER" not in k and "__CF" not in k)])'], |
| 541 stdout=subprocess.PIPE, env={}) |
| 542 try: |
| 543 stdout, stderr = p.communicate() |
| 544 self.assertEqual(stdout.strip(), "[]") |
| 545 finally: |
| 546 p.__exit__(None, None, None) |
| 547 |
| 548 def test_communicate_stdin(self): |
| 549 p = subprocess.Popen([sys.executable, "-c", |
| 550 'import sys;' |
| 551 'sys.exit(sys.stdin.read() == "pear")'], |
| 552 stdin=subprocess.PIPE) |
| 553 p.communicate("pear") |
| 554 self.assertEqual(p.returncode, 1) |
| 555 |
| 556 def test_communicate_stdout(self): |
| 557 p = subprocess.Popen([sys.executable, "-c", |
| 558 'import sys; sys.stdout.write("pineapple")'], |
| 559 stdout=subprocess.PIPE) |
| 560 (stdout, stderr) = p.communicate() |
| 561 self.assertEqual(stdout, "pineapple") |
| 562 self.assertEqual(stderr, None) |
| 563 |
| 564 def test_communicate_stderr(self): |
| 565 p = subprocess.Popen([sys.executable, "-c", |
| 566 'import sys; sys.stderr.write("pineapple")'], |
| 567 stderr=subprocess.PIPE) |
| 568 (stdout, stderr) = p.communicate() |
| 569 self.assertEqual(stdout, None) |
| 570 self.assertStderrEqual(stderr, "pineapple") |
| 571 |
| 572 def test_communicate(self): |
| 573 p = subprocess.Popen([sys.executable, "-c", |
| 574 'import sys,os;' |
| 575 'sys.stderr.write("pineapple");' |
| 576 'sys.stdout.write(sys.stdin.read())'], |
| 577 stdin=subprocess.PIPE, |
| 578 stdout=subprocess.PIPE, |
| 579 stderr=subprocess.PIPE) |
| 580 (stdout, stderr) = p.communicate("banana") |
| 581 self.assertEqual(stdout, "banana") |
| 582 self.assertStderrEqual(stderr, "pineapple") |
| 583 |
| 584 def test_communicate_timeout(self): |
| 585 p = subprocess.Popen([sys.executable, "-c", |
| 586 'import sys,os,time;' |
| 587 'sys.stderr.write("pineapple\\n");' |
| 588 'time.sleep(1);' |
| 589 'sys.stderr.write("pear\\n");' |
| 590 'sys.stdout.write(sys.stdin.read())'], |
| 591 universal_newlines=True, |
| 592 stdin=subprocess.PIPE, |
| 593 stdout=subprocess.PIPE, |
| 594 stderr=subprocess.PIPE) |
| 595 self.assertRaises(subprocess.TimeoutExpired, p.communicate, u"banana", |
| 596 timeout=0.3) |
| 597 # Make sure we can keep waiting for it, and that we get the whole output |
| 598 # after it completes. |
| 599 (stdout, stderr) = p.communicate() |
| 600 self.assertEqual(stdout, "banana") |
| 601 self.assertStderrEqual(stderr.encode(), "pineapple\npear\n") |
| 602 |
| 603 def test_communicate_timeout_large_ouput(self): |
| 604 # Test a expring timeout while the child is outputting lots of data. |
| 605 p = subprocess.Popen([sys.executable, "-c", |
| 606 'import sys,os,time;' |
| 607 'sys.stdout.write("a" * (64 * 1024));' |
| 608 'time.sleep(0.2);' |
| 609 'sys.stdout.write("a" * (64 * 1024));' |
| 610 'time.sleep(0.2);' |
| 611 'sys.stdout.write("a" * (64 * 1024));' |
| 612 'time.sleep(0.2);' |
| 613 'sys.stdout.write("a" * (64 * 1024));'], |
| 614 stdout=subprocess.PIPE) |
| 615 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) |
| 616 (stdout, _) = p.communicate() |
| 617 self.assertEqual(len(stdout), 4 * 64 * 1024) |
| 618 |
| 619 # Test for the fd leak reported in http://bugs.python.org/issue2791. |
| 620 def test_communicate_pipe_fd_leak(self): |
| 621 for stdin_pipe in (False, True): |
| 622 for stdout_pipe in (False, True): |
| 623 for stderr_pipe in (False, True): |
| 624 options = {} |
| 625 if stdin_pipe: |
| 626 options['stdin'] = subprocess.PIPE |
| 627 if stdout_pipe: |
| 628 options['stdout'] = subprocess.PIPE |
| 629 if stderr_pipe: |
| 630 options['stderr'] = subprocess.PIPE |
| 631 if not options: |
| 632 continue |
| 633 p = subprocess.Popen((sys.executable, "-c", "pass"), **optio
ns) |
| 634 p.communicate() |
| 635 if p.stdin is not None: |
| 636 self.assertTrue(p.stdin.closed) |
| 637 if p.stdout is not None: |
| 638 self.assertTrue(p.stdout.closed) |
| 639 if p.stderr is not None: |
| 640 self.assertTrue(p.stderr.closed) |
| 641 |
| 642 def test_communicate_returns(self): |
| 643 # communicate() should return None if no redirection is active |
| 644 p = subprocess.Popen([sys.executable, "-c", |
| 645 "import sys; sys.exit(47)"]) |
| 646 (stdout, stderr) = p.communicate() |
| 647 self.assertEqual(stdout, None) |
| 648 self.assertEqual(stderr, None) |
| 649 |
| 650 def test_communicate_pipe_buf(self): |
| 651 # communicate() with writes larger than pipe_buf |
| 652 # This test will probably deadlock rather than fail, if |
| 653 # communicate() does not work properly. |
| 654 x, y = os.pipe() |
| 655 if mswindows: |
| 656 pipe_buf = 512 |
| 657 else: |
| 658 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") |
| 659 os.close(x) |
| 660 os.close(y) |
| 661 p = subprocess.Popen([sys.executable, "-c", |
| 662 'import sys,os;' |
| 663 'sys.stdout.write(sys.stdin.read(47));' |
| 664 'sys.stderr.write("xyz"*%d);' |
| 665 'sys.stdout.write(sys.stdin.read())' % pipe_buf], |
| 666 stdin=subprocess.PIPE, |
| 667 stdout=subprocess.PIPE, |
| 668 stderr=subprocess.PIPE) |
| 669 string_to_write = "abc"*pipe_buf |
| 670 (stdout, stderr) = p.communicate(string_to_write) |
| 671 self.assertEqual(stdout, string_to_write) |
| 672 |
| 673 def test_writes_before_communicate(self): |
| 674 # stdin.write before communicate() |
| 675 p = subprocess.Popen([sys.executable, "-c", |
| 676 'import sys,os;' |
| 677 'sys.stdout.write(sys.stdin.read())'], |
| 678 stdin=subprocess.PIPE, |
| 679 stdout=subprocess.PIPE, |
| 680 stderr=subprocess.PIPE) |
| 681 p.stdin.write("banana") |
| 682 (stdout, stderr) = p.communicate("split") |
| 683 self.assertEqual(stdout, "bananasplit") |
| 684 self.assertStderrEqual(stderr, "") |
| 685 |
| 686 def test_universal_newlines(self): |
| 687 p = subprocess.Popen([sys.executable, "-c", |
| 688 'import sys,os;' + SETBINARY + |
| 689 'sys.stdout.write("line1\\n");' |
| 690 'sys.stdout.flush();' |
| 691 'sys.stdout.write("line2\\r");' |
| 692 'sys.stdout.flush();' |
| 693 'sys.stdout.write("line3\\r\\n");' |
| 694 'sys.stdout.flush();' |
| 695 'sys.stdout.write("line4\\r");' |
| 696 'sys.stdout.flush();' |
| 697 'sys.stdout.write("\\nline5");' |
| 698 'sys.stdout.flush();' |
| 699 'sys.stdout.write("\\nline6");'], |
| 700 stdout=subprocess.PIPE, |
| 701 universal_newlines=1) |
| 702 stdout = p.stdout.read() |
| 703 if hasattr(file, 'newlines'): |
| 704 # Interpreter with universal newline support |
| 705 self.assertEqual(stdout, |
| 706 "line1\nline2\nline3\nline4\nline5\nline6") |
| 707 else: |
| 708 # Interpreter without universal newline support |
| 709 self.assertEqual(stdout, |
| 710 "line1\nline2\rline3\r\nline4\r\nline5\nline6") |
| 711 |
| 712 def test_universal_newlines_communicate(self): |
| 713 # universal newlines through communicate() |
| 714 p = subprocess.Popen([sys.executable, "-c", |
| 715 'import sys,os;' + SETBINARY + |
| 716 'sys.stdout.write("line1\\n");' |
| 717 'sys.stdout.flush();' |
| 718 'sys.stdout.write("line2\\r");' |
| 719 'sys.stdout.flush();' |
| 720 'sys.stdout.write("line3\\r\\n");' |
| 721 'sys.stdout.flush();' |
| 722 'sys.stdout.write("line4\\r");' |
| 723 'sys.stdout.flush();' |
| 724 'sys.stdout.write("\\nline5");' |
| 725 'sys.stdout.flush();' |
| 726 'sys.stdout.write("\\nline6");'], |
| 727 stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
| 728 universal_newlines=1) |
| 729 (stdout, stderr) = p.communicate() |
| 730 if hasattr(file, 'newlines'): |
| 731 # Interpreter with universal newline support |
| 732 self.assertEqual(stdout, |
| 733 "line1\nline2\nline3\nline4\nline5\nline6") |
| 734 else: |
| 735 # Interpreter without universal newline support |
| 736 self.assertEqual(stdout, |
| 737 "line1\nline2\rline3\r\nline4\r\nline5\nline6") |
| 738 |
| 739 def test_universal_newlines_communicate_input_none(self): |
| 740 # Test communicate(input=None) with universal newlines. |
| 741 # |
| 742 # We set stdout to PIPE because, as of this writing, a different |
| 743 # code path is tested when the number of pipes is zero or one. |
| 744 p = subprocess.Popen([sys.executable, "-c", "pass"], |
| 745 stdin=subprocess.PIPE, |
| 746 stdout=subprocess.PIPE, |
| 747 universal_newlines=True) |
| 748 p.communicate() |
| 749 self.assertEqual(p.returncode, 0) |
| 750 |
| 751 def test_no_leaking(self): |
| 752 # Make sure we leak no resources |
| 753 if not hasattr(test_support, "is_resource_enabled") \ |
| 754 or test_support.is_resource_enabled("subprocess") and not mswindo
ws: |
| 755 max_handles = 1026 # too much for most UNIX systems |
| 756 else: |
| 757 max_handles = 65 |
| 758 for i in range(max_handles): |
| 759 p = subprocess.Popen([sys.executable, "-c", |
| 760 "import sys;sys.stdout.write(sys.stdin.read())"], |
| 761 stdin=subprocess.PIPE, |
| 762 stdout=subprocess.PIPE, |
| 763 stderr=subprocess.PIPE) |
| 764 data = p.communicate("lime")[0] |
| 765 self.assertEqual(data, "lime") |
| 766 |
| 767 def test_universal_newlines_communicate_stdin_stdout_stderr(self): |
| 768 # universal newlines through communicate(), with stdin, stdout, stderr |
| 769 p = subprocess.Popen([sys.executable, "-c", |
| 770 'import sys,os;' + SETBINARY + '''\nif True: |
| 771 s = sys.stdin.readline() |
| 772 sys.stdout.write(s) |
| 773 sys.stdout.write("line2\\r") |
| 774 sys.stderr.write("eline2\\n") |
| 775 s = sys.stdin.read() |
| 776 sys.stdout.write(s) |
| 777 sys.stdout.write("line4\\n") |
| 778 sys.stdout.write("line5\\r\\n") |
| 779 sys.stderr.write("eline6\\r") |
| 780 sys.stderr.write("eline7\\r\\nz") |
| 781 '''], |
| 782 stdin=subprocess.PIPE, |
| 783 stderr=subprocess.PIPE, |
| 784 stdout=subprocess.PIPE, |
| 785 universal_newlines=True) |
| 786 self.addCleanup(p.stdout.close) |
| 787 self.addCleanup(p.stderr.close) |
| 788 (stdout, stderr) = p.communicate(u"line1\nline3\n") |
| 789 self.assertEqual(p.returncode, 0) |
| 790 self.assertEqual(u"line1\nline2\nline3\nline4\nline5\n", stdout) |
| 791 # Python debug build push something like "[42442 refs]\n" |
| 792 # to stderr at exit of subprocess. |
| 793 # Don't use assertStderrEqual because it strips CR and LF from output. |
| 794 self.assertTrue(stderr.startswith(u"eline2\neline6\neline7\n")) |
| 795 |
| 796 def test_list2cmdline(self): |
| 797 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), |
| 798 '"a b c" d e') |
| 799 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), |
| 800 'ab\\"c \\ d') |
| 801 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), |
| 802 'ab\\"c " \\\\" d') |
| 803 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), |
| 804 'a\\\\\\b "de fg" h') |
| 805 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), |
| 806 'a\\\\\\"b c d') |
| 807 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), |
| 808 '"a\\\\b c" d e') |
| 809 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), |
| 810 '"a\\\\b\\ c" d e') |
| 811 self.assertEqual(subprocess.list2cmdline(['ab', '']), |
| 812 'ab ""') |
| 813 |
| 814 |
| 815 def test_poll(self): |
| 816 p = subprocess.Popen([sys.executable, |
| 817 "-c", "import time; time.sleep(1)"]) |
| 818 count = 0 |
| 819 while p.poll() is None: |
| 820 time.sleep(0.1) |
| 821 count += 1 |
| 822 # We expect that the poll loop probably went around about 10 times, |
| 823 # but, based on system scheduling we can't control, it's possible |
| 824 # poll() never returned None. It "should be" very rare that it |
| 825 # didn't go around at least twice. |
| 826 self.assert_(count >= 2) |
| 827 # Subsequent invocations should just return the returncode |
| 828 self.assertEqual(p.poll(), 0) |
| 829 |
| 830 |
| 831 def test_wait(self): |
| 832 p = subprocess.Popen([sys.executable, |
| 833 "-c", "import time; time.sleep(2)"]) |
| 834 self.assertEqual(p.wait(), 0) |
| 835 # Subsequent invocations should just return the returncode |
| 836 self.assertEqual(p.wait(), 0) |
| 837 |
| 838 |
| 839 def test_wait_timeout(self): |
| 840 p = subprocess.Popen([sys.executable, |
| 841 "-c", "import time; time.sleep(0.1)"]) |
| 842 try: |
| 843 p.wait(timeout=0.01) |
| 844 except subprocess.TimeoutExpired, e: |
| 845 self.assertIn("0.01", str(e)) # For coverage of __str__. |
| 846 else: |
| 847 self.fail("subprocess.TimeoutExpired expected but not raised.") |
| 848 self.assertEqual(p.wait(timeout=2), 0) |
| 849 |
| 850 |
| 851 def test_invalid_bufsize(self): |
| 852 # an invalid type of the bufsize argument should raise |
| 853 # TypeError. |
| 854 try: |
| 855 subprocess.Popen([sys.executable, "-c", "pass"], "orange") |
| 856 except TypeError: |
| 857 pass |
| 858 |
| 859 def test_leaking_fds_on_error(self): |
| 860 # see bug #5179: Popen leaks file descriptors to PIPEs if |
| 861 # the child fails to execute; this will eventually exhaust |
| 862 # the maximum number of open fds. 1024 seems a very common |
| 863 # value for that limit, but Windows has 2048, so we loop |
| 864 # 1024 times (each call leaked two fds). |
| 865 for i in range(1024): |
| 866 # Windows raises IOError. Others raise OSError. |
| 867 try: |
| 868 subprocess.Popen(['nonexisting_i_hope'], |
| 869 stdout=subprocess.PIPE, |
| 870 stderr=subprocess.PIPE) |
| 871 except EnvironmentError, c: |
| 872 if c.errno != 2: # ignore "no such file" |
| 873 raise |
| 874 |
| 875 #@unittest.skipIf(threading is None, "threading required") |
| 876 def test_threadsafe_wait(self): |
| 877 """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" |
| 878 proc = subprocess.Popen([sys.executable, '-c', |
| 879 'import time; time.sleep(12)']) |
| 880 self.assertEqual(proc.returncode, None) |
| 881 results = [] |
| 882 |
| 883 def kill_proc_timer_thread(): |
| 884 results.append(('thread-start-poll-result', proc.poll())) |
| 885 # terminate it from the thread and wait for the result. |
| 886 proc.kill() |
| 887 proc.wait() |
| 888 results.append(('thread-after-kill-and-wait', proc.returncode)) |
| 889 # this wait should be a no-op given the above. |
| 890 proc.wait() |
| 891 results.append(('thread-after-second-wait', proc.returncode)) |
| 892 |
| 893 # This is a timing sensitive test, the failure mode is |
| 894 # triggered when both the main thread and this thread are in |
| 895 # the wait() call at once. The delay here is to allow the |
| 896 # main thread to most likely be blocked in its wait() call. |
| 897 t = threading.Timer(0.2, kill_proc_timer_thread) |
| 898 t.start() |
| 899 |
| 900 # Wait for the process to finish; the thread should kill it |
| 901 # long before it finishes on its own. Supplying a timeout |
| 902 # triggers a different code path for better coverage. |
| 903 proc.wait(timeout=20) |
| 904 # Should be -9 because of the proc.kill() from the thread. |
| 905 self.assertEqual(proc.returncode, -9, |
| 906 msg="unexpected result in wait from main thread") |
| 907 |
| 908 # This should be a no-op with no change in returncode. |
| 909 proc.wait() |
| 910 self.assertEqual(proc.returncode, -9, |
| 911 msg="unexpected result in second main wait.") |
| 912 |
| 913 t.join() |
| 914 # Ensure that all of the thread results are as expected. |
| 915 # When a race condition occurs in wait(), the returncode could |
| 916 # be set by the wrong thread that doesn't actually have it |
| 917 # leading to an incorrect value. |
| 918 self.assertEqual([('thread-start-poll-result', None), |
| 919 ('thread-after-kill-and-wait', -9), |
| 920 ('thread-after-second-wait', -9)], |
| 921 results) |
| 922 |
| 923 def test_issue8780(self): |
| 924 # Ensure that stdout is inherited from the parent |
| 925 # if stdout=PIPE is not used |
| 926 code = ';'.join(( |
| 927 'import subprocess32, sys', |
| 928 'retcode = subprocess32.call(' |
| 929 "[sys.executable, '-c', 'print(\"Hello World!\")'])", |
| 930 'assert retcode == 0')) |
| 931 output = subprocess.check_output([sys.executable, '-c', code]) |
| 932 self.assert_(output.startswith('Hello World!'), output) |
| 933 |
| 934 def test_communicate_eintr(self): |
| 935 # Issue #12493: communicate() should handle EINTR |
| 936 def handler(signum, frame): |
| 937 pass |
| 938 old_handler = signal.signal(signal.SIGALRM, handler) |
| 939 self.addCleanup(signal.signal, signal.SIGALRM, old_handler) |
| 940 |
| 941 # the process is running for 2 seconds |
| 942 args = [sys.executable, "-c", 'import time; time.sleep(2)'] |
| 943 for stream in ('stdout', 'stderr'): |
| 944 kw = {stream: subprocess.PIPE} |
| 945 process = subprocess.Popen(args, **kw) |
| 946 try: |
| 947 signal.alarm(1) |
| 948 # communicate() will be interrupted by SIGALRM |
| 949 process.communicate() |
| 950 finally: |
| 951 process.__exit__(None, None, None) |
| 952 |
| 953 |
| 954 # This test is Linux-ish specific for simplicity to at least have |
| 955 # some coverage. It is not a platform specific bug. |
| 956 #@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), |
| 957 # "Linux specific") |
| 958 def test_failed_child_execute_fd_leak(self): |
| 959 """Test for the fork() failure fd leak reported in issue16327.""" |
| 960 if not os.path.isdir('/proc/%d/fd' % os.getpid()): |
| 961 self.skipTest("Linux specific") |
| 962 fd_directory = '/proc/%d/fd' % os.getpid() |
| 963 fds_before_popen = os.listdir(fd_directory) |
| 964 try: |
| 965 PopenExecuteChildRaises( |
| 966 [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE, |
| 967 stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 968 except PopenTestException: |
| 969 pass # Yay! Because 2.4 doesn't support with statements. |
| 970 else: |
| 971 self.fail("PopenTestException expected but not raised.") |
| 972 |
| 973 # NOTE: This test doesn't verify that the real _execute_child |
| 974 # does not close the file descriptors itself on the way out |
| 975 # during an exception. Code inspection has confirmed that. |
| 976 |
| 977 fds_after_exception = os.listdir(fd_directory) |
| 978 self.assertEqual(fds_before_popen, fds_after_exception) |
| 979 |
| 980 |
| 981 # context manager |
| 982 class _SuppressCoreFiles(object): |
| 983 """Try to prevent core files from being created.""" |
| 984 old_limit = None |
| 985 |
| 986 def __enter__(self): |
| 987 """Try to save previous ulimit, then set it to (0, 0).""" |
| 988 try: |
| 989 import resource |
| 990 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) |
| 991 resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) |
| 992 except (ImportError, ValueError, resource.error): |
| 993 pass |
| 994 |
| 995 def __exit__(self, *args): |
| 996 """Return core file behavior to default.""" |
| 997 if self.old_limit is None: |
| 998 return |
| 999 try: |
| 1000 import resource |
| 1001 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) |
| 1002 except (ImportError, ValueError, resource.error): |
| 1003 pass |
| 1004 |
| 1005 |
| 1006 #@unittest.skipIf(mswindows, "POSIX specific tests") |
| 1007 class POSIXProcessTestCase(BaseTestCase): |
| 1008 |
| 1009 def setUp(self): |
| 1010 BaseTestCase.setUp(self) |
| 1011 self._nonexistent_dir = "/_this/pa.th/does/not/exist" |
| 1012 |
| 1013 def _get_chdir_exception(self): |
| 1014 try: |
| 1015 os.chdir(self._nonexistent_dir) |
| 1016 except OSError, e: |
| 1017 # This avoids hard coding the errno value or the OS perror() |
| 1018 # string and instead capture the exception that we want to see |
| 1019 # below for comparison. |
| 1020 desired_exception = e |
| 1021 desired_exception.strerror += ': ' + repr(self._nonexistent_dir) |
| 1022 else: |
| 1023 self.fail("chdir to nonexistant directory %s succeeded." % |
| 1024 self._nonexistent_dir) |
| 1025 return desired_exception |
| 1026 |
| 1027 def test_exception_cwd(self): |
| 1028 """Test error in the child raised in the parent for a bad cwd.""" |
| 1029 desired_exception = self._get_chdir_exception() |
| 1030 try: |
| 1031 p = subprocess.Popen([sys.executable, "-c", ""], |
| 1032 cwd=self._nonexistent_dir) |
| 1033 except OSError, e: |
| 1034 # Test that the child process chdir failure actually makes |
| 1035 # it up to the parent process as the correct exception. |
| 1036 self.assertEqual(desired_exception.errno, e.errno) |
| 1037 self.assertEqual(desired_exception.strerror, e.strerror) |
| 1038 else: |
| 1039 self.fail("Expected OSError: %s" % desired_exception) |
| 1040 |
| 1041 def test_exception_bad_executable(self): |
| 1042 """Test error in the child raised in the parent for a bad executable.""" |
| 1043 desired_exception = self._get_chdir_exception() |
| 1044 try: |
| 1045 p = subprocess.Popen([sys.executable, "-c", ""], |
| 1046 executable=self._nonexistent_dir) |
| 1047 except OSError, e: |
| 1048 # Test that the child process exec failure actually makes |
| 1049 # it up to the parent process as the correct exception. |
| 1050 self.assertEqual(desired_exception.errno, e.errno) |
| 1051 self.assertEqual(desired_exception.strerror, e.strerror) |
| 1052 else: |
| 1053 self.fail("Expected OSError: %s" % desired_exception) |
| 1054 |
| 1055 def test_exception_bad_args_0(self): |
| 1056 """Test error in the child raised in the parent for a bad args[0].""" |
| 1057 desired_exception = self._get_chdir_exception() |
| 1058 try: |
| 1059 p = subprocess.Popen([self._nonexistent_dir, "-c", ""]) |
| 1060 except OSError, e: |
| 1061 # Test that the child process exec failure actually makes |
| 1062 # it up to the parent process as the correct exception. |
| 1063 self.assertEqual(desired_exception.errno, e.errno) |
| 1064 self.assertEqual(desired_exception.strerror, e.strerror) |
| 1065 else: |
| 1066 self.fail("Expected OSError: %s" % desired_exception) |
| 1067 |
| 1068 def test_restore_signals(self): |
| 1069 # Code coverage for both values of restore_signals to make sure it |
| 1070 # at least does not blow up. |
| 1071 # A test for behavior would be complex. Contributions welcome. |
| 1072 subprocess.call([sys.executable, "-c", ""], restore_signals=True) |
| 1073 subprocess.call([sys.executable, "-c", ""], restore_signals=False) |
| 1074 |
| 1075 def test_start_new_session(self): |
| 1076 # For code coverage of calling setsid(). We don't care if we get an |
| 1077 # EPERM error from it depending on the test execution environment, that |
| 1078 # still indicates that it was called. |
| 1079 try: |
| 1080 output = subprocess.check_output( |
| 1081 [sys.executable, "-c", |
| 1082 "import os; print(os.getpgid(os.getpid()))"], |
| 1083 start_new_session=True) |
| 1084 except OSError, e: |
| 1085 if e.errno != errno.EPERM: |
| 1086 raise |
| 1087 else: |
| 1088 parent_pgid = os.getpgid(os.getpid()) |
| 1089 child_pgid = int(output) |
| 1090 self.assertNotEqual(parent_pgid, child_pgid) |
| 1091 |
| 1092 def test_run_abort(self): |
| 1093 # returncode handles signal termination |
| 1094 scf = _SuppressCoreFiles() |
| 1095 scf.__enter__() |
| 1096 try: |
| 1097 p = subprocess.Popen([sys.executable, "-c", |
| 1098 "import os; os.abort()"]) |
| 1099 p.wait() |
| 1100 finally: |
| 1101 scf.__exit__() |
| 1102 self.assertEqual(-p.returncode, signal.SIGABRT) |
| 1103 |
| 1104 def test_preexec(self): |
| 1105 # DISCLAIMER: Setting environment variables is *not* a good use |
| 1106 # of a preexec_fn. This is merely a test. |
| 1107 p = subprocess.Popen([sys.executable, "-c", |
| 1108 "import sys, os;" |
| 1109 "sys.stdout.write(os.getenv('FRUIT'))"], |
| 1110 stdout=subprocess.PIPE, |
| 1111 preexec_fn=lambda: os.putenv("FRUIT", "apple")) |
| 1112 self.assertEqual(p.stdout.read(), "apple") |
| 1113 |
| 1114 def test_preexec_exception(self): |
| 1115 def raise_it(): |
| 1116 raise ValueError("What if two swallows carried a coconut?") |
| 1117 try: |
| 1118 p = subprocess.Popen([sys.executable, "-c", ""], |
| 1119 preexec_fn=raise_it) |
| 1120 except RuntimeError, e: |
| 1121 self.assertTrue( |
| 1122 subprocess._posixsubprocess, |
| 1123 "Expected a ValueError from the preexec_fn") |
| 1124 except ValueError, e: |
| 1125 self.assertIn("coconut", e.args[0]) |
| 1126 else: |
| 1127 self.fail("Exception raised by preexec_fn did not make it " |
| 1128 "to the parent process.") |
| 1129 |
| 1130 class _TestExecuteChildPopen(subprocess.Popen): |
| 1131 """Used to test behavior at the end of _execute_child.""" |
| 1132 def __init__(self, testcase, *args, **kwargs): |
| 1133 self._testcase = testcase |
| 1134 subprocess.Popen.__init__(self, *args, **kwargs) |
| 1135 |
| 1136 def _execute_child(self, *args, **kwargs): |
| 1137 try: |
| 1138 subprocess.Popen._execute_child(self, *args, **kwargs) |
| 1139 finally: |
| 1140 # Open a bunch of file descriptors and verify that |
| 1141 # none of them are the same as the ones the Popen |
| 1142 # instance is using for stdin/stdout/stderr. |
| 1143 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) |
| 1144 for _ in range(8)] |
| 1145 try: |
| 1146 for fd in devzero_fds: |
| 1147 self._testcase.assertNotIn( |
| 1148 fd, (self.stdin.fileno(), self.stdout.fileno(), |
| 1149 self.stderr.fileno()), |
| 1150 msg="At least one fd was closed early.") |
| 1151 finally: |
| 1152 map(os.close, devzero_fds) |
| 1153 |
| 1154 #@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") |
| 1155 def test_preexec_errpipe_does_not_double_close_pipes(self): |
| 1156 """Issue16140: Don't double close pipes on preexec error.""" |
| 1157 |
| 1158 def raise_it(): |
| 1159 raise RuntimeError("force the _execute_child() errpipe_data path.") |
| 1160 |
| 1161 try: |
| 1162 self._TestExecuteChildPopen( |
| 1163 self, [sys.executable, "-c", "pass"], |
| 1164 stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
| 1165 stderr=subprocess.PIPE, preexec_fn=raise_it) |
| 1166 except RuntimeError: |
| 1167 pass # Yay! Because 2.4 doesn't support with statements. |
| 1168 else: |
| 1169 self.fail("RuntimeError expected but not raised.") |
| 1170 |
| 1171 #@unittest.skipUnless(gc, "Requires a gc module.") |
| 1172 def test_preexec_gc_module_failure(self): |
| 1173 # This tests the code that disables garbage collection if the child |
| 1174 # process will execute any Python. |
| 1175 def raise_runtime_error(): |
| 1176 raise RuntimeError("this shouldn't escape") |
| 1177 enabled = gc.isenabled() |
| 1178 orig_gc_disable = gc.disable |
| 1179 orig_gc_isenabled = gc.isenabled |
| 1180 try: |
| 1181 gc.disable() |
| 1182 self.assertFalse(gc.isenabled()) |
| 1183 subprocess.call([sys.executable, '-c', ''], |
| 1184 preexec_fn=lambda: None) |
| 1185 self.assertFalse(gc.isenabled(), |
| 1186 "Popen enabled gc when it shouldn't.") |
| 1187 |
| 1188 gc.enable() |
| 1189 self.assertTrue(gc.isenabled()) |
| 1190 subprocess.call([sys.executable, '-c', ''], |
| 1191 preexec_fn=lambda: None) |
| 1192 self.assertTrue(gc.isenabled(), "Popen left gc disabled.") |
| 1193 |
| 1194 gc.disable = raise_runtime_error |
| 1195 self.assertRaises(RuntimeError, subprocess.Popen, |
| 1196 [sys.executable, '-c', ''], |
| 1197 preexec_fn=lambda: None) |
| 1198 |
| 1199 del gc.isenabled # force an AttributeError |
| 1200 self.assertRaises(AttributeError, subprocess.Popen, |
| 1201 [sys.executable, '-c', ''], |
| 1202 preexec_fn=lambda: None) |
| 1203 finally: |
| 1204 gc.disable = orig_gc_disable |
| 1205 gc.isenabled = orig_gc_isenabled |
| 1206 if not enabled: |
| 1207 gc.disable() |
| 1208 |
| 1209 def test_args_string(self): |
| 1210 # args is a string |
| 1211 f, fname = mkstemp() |
| 1212 os.write(f, "#!/bin/sh\n") |
| 1213 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % |
| 1214 sys.executable) |
| 1215 os.close(f) |
| 1216 os.chmod(fname, 0700) |
| 1217 p = subprocess.Popen(fname) |
| 1218 p.wait() |
| 1219 os.remove(fname) |
| 1220 self.assertEqual(p.returncode, 47) |
| 1221 |
| 1222 def test_invalid_args(self): |
| 1223 # invalid arguments should raise ValueError |
| 1224 self.assertRaises(ValueError, subprocess.call, |
| 1225 [sys.executable, "-c", |
| 1226 "import sys; sys.exit(47)"], |
| 1227 startupinfo=47) |
| 1228 self.assertRaises(ValueError, subprocess.call, |
| 1229 [sys.executable, "-c", |
| 1230 "import sys; sys.exit(47)"], |
| 1231 creationflags=47) |
| 1232 |
| 1233 def test_shell_sequence(self): |
| 1234 # Run command through the shell (sequence) |
| 1235 newenv = os.environ.copy() |
| 1236 newenv["FRUIT"] = "apple" |
| 1237 p = subprocess.Popen(["echo $FRUIT"], shell=1, |
| 1238 stdout=subprocess.PIPE, |
| 1239 env=newenv) |
| 1240 self.assertEqual(p.stdout.read().strip(), "apple") |
| 1241 |
| 1242 def test_shell_string(self): |
| 1243 # Run command through the shell (string) |
| 1244 newenv = os.environ.copy() |
| 1245 newenv["FRUIT"] = "apple" |
| 1246 p = subprocess.Popen("echo $FRUIT", shell=1, |
| 1247 stdout=subprocess.PIPE, |
| 1248 env=newenv) |
| 1249 self.assertEqual(p.stdout.read().strip(), "apple") |
| 1250 |
| 1251 def test_call_string(self): |
| 1252 # call() function with string argument on UNIX |
| 1253 f, fname = mkstemp() |
| 1254 os.write(f, "#!/bin/sh\n") |
| 1255 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % |
| 1256 sys.executable) |
| 1257 os.close(f) |
| 1258 os.chmod(fname, 0700) |
| 1259 rc = subprocess.call(fname) |
| 1260 os.remove(fname) |
| 1261 self.assertEqual(rc, 47) |
| 1262 |
| 1263 def test_specific_shell(self): |
| 1264 # Issue #9265: Incorrect name passed as arg[0]. |
| 1265 shells = [] |
| 1266 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: |
| 1267 for name in ['bash', 'ksh']: |
| 1268 sh = os.path.join(prefix, name) |
| 1269 if os.path.isfile(sh): |
| 1270 shells.append(sh) |
| 1271 if not shells: # Will probably work for any shell but csh. |
| 1272 self.skipTest("bash or ksh required for this test") |
| 1273 sh = '/bin/sh' |
| 1274 if os.path.isfile(sh) and not os.path.islink(sh): |
| 1275 # Test will fail if /bin/sh is a symlink to csh. |
| 1276 shells.append(sh) |
| 1277 for sh in shells: |
| 1278 p = subprocess.Popen("echo $0", executable=sh, shell=True, |
| 1279 stdout=subprocess.PIPE) |
| 1280 self.assertEqual(p.stdout.read().strip(), sh) |
| 1281 |
| 1282 def _kill_process(self, method, *args): |
| 1283 # Do not inherit file handles from the parent. |
| 1284 # It should fix failures on some platforms. |
| 1285 p = subprocess.Popen([sys.executable, "-c", """if 1: |
| 1286 import sys, time |
| 1287 sys.stdout.write('x\\n') |
| 1288 sys.stdout.flush() |
| 1289 time.sleep(30) |
| 1290 """], |
| 1291 close_fds=True, |
| 1292 stdin=subprocess.PIPE, |
| 1293 stdout=subprocess.PIPE, |
| 1294 stderr=subprocess.PIPE) |
| 1295 # Wait for the interpreter to be completely initialized before |
| 1296 # sending any signal. |
| 1297 p.stdout.read(1) |
| 1298 getattr(p, method)(*args) |
| 1299 return p |
| 1300 |
| 1301 def test_send_signal(self): |
| 1302 p = self._kill_process('send_signal', signal.SIGINT) |
| 1303 _, stderr = p.communicate() |
| 1304 self.assertIn('KeyboardInterrupt', stderr) |
| 1305 self.assertNotEqual(p.wait(), 0) |
| 1306 |
| 1307 def test_kill(self): |
| 1308 p = self._kill_process('kill') |
| 1309 _, stderr = p.communicate() |
| 1310 self.assertStderrEqual(stderr, '') |
| 1311 self.assertEqual(p.wait(), -signal.SIGKILL) |
| 1312 |
| 1313 def test_terminate(self): |
| 1314 p = self._kill_process('terminate') |
| 1315 _, stderr = p.communicate() |
| 1316 self.assertStderrEqual(stderr, '') |
| 1317 self.assertEqual(p.wait(), -signal.SIGTERM) |
| 1318 |
| 1319 def check_close_std_fds(self, fds): |
| 1320 # Issue #9905: test that subprocess pipes still work properly with |
| 1321 # some standard fds closed |
| 1322 stdin = 0 |
| 1323 newfds = [] |
| 1324 for a in fds: |
| 1325 b = os.dup(a) |
| 1326 newfds.append(b) |
| 1327 if a == 0: |
| 1328 stdin = b |
| 1329 try: |
| 1330 for fd in fds: |
| 1331 os.close(fd) |
| 1332 out, err = subprocess.Popen([sys.executable, "-c", |
| 1333 'import sys;' |
| 1334 'sys.stdout.write("apple");' |
| 1335 'sys.stdout.flush();' |
| 1336 'sys.stderr.write("orange")'], |
| 1337 stdin=stdin, |
| 1338 stdout=subprocess.PIPE, |
| 1339 stderr=subprocess.PIPE).communicate() |
| 1340 err = strip_python_stderr(err) |
| 1341 self.assertEqual((out, err), ('apple', 'orange')) |
| 1342 finally: |
| 1343 for b, a in zip(newfds, fds): |
| 1344 os.dup2(b, a) |
| 1345 for b in newfds: |
| 1346 os.close(b) |
| 1347 |
| 1348 def test_close_fd_0(self): |
| 1349 self.check_close_std_fds([0]) |
| 1350 |
| 1351 def test_close_fd_1(self): |
| 1352 self.check_close_std_fds([1]) |
| 1353 |
| 1354 def test_close_fd_2(self): |
| 1355 self.check_close_std_fds([2]) |
| 1356 |
| 1357 def test_close_fds_0_1(self): |
| 1358 self.check_close_std_fds([0, 1]) |
| 1359 |
| 1360 def test_close_fds_0_2(self): |
| 1361 self.check_close_std_fds([0, 2]) |
| 1362 |
| 1363 def test_close_fds_1_2(self): |
| 1364 self.check_close_std_fds([1, 2]) |
| 1365 |
| 1366 def test_close_fds_0_1_2(self): |
| 1367 # Issue #10806: test that subprocess pipes still work properly with |
| 1368 # all standard fds closed. |
| 1369 self.check_close_std_fds([0, 1, 2]) |
| 1370 |
| 1371 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): |
| 1372 # open up some temporary files |
| 1373 temps = [mkstemp() for i in range(3)] |
| 1374 temp_fds = [fd for fd, fname in temps] |
| 1375 try: |
| 1376 # unlink the files -- we won't need to reopen them |
| 1377 for fd, fname in temps: |
| 1378 os.unlink(fname) |
| 1379 |
| 1380 # save a copy of the standard file descriptors |
| 1381 saved_fds = [os.dup(fd) for fd in range(3)] |
| 1382 try: |
| 1383 # duplicate the temp files over the standard fd's 0, 1, 2 |
| 1384 for fd, temp_fd in enumerate(temp_fds): |
| 1385 os.dup2(temp_fd, fd) |
| 1386 |
| 1387 # write some data to what will become stdin, and rewind |
| 1388 os.write(stdin_no, "STDIN") |
| 1389 os.lseek(stdin_no, 0, 0) |
| 1390 |
| 1391 # now use those files in the given order, so that subprocess |
| 1392 # has to rearrange them in the child |
| 1393 p = subprocess.Popen([sys.executable, "-c", |
| 1394 'import sys; got = sys.stdin.read();' |
| 1395 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], |
| 1396 stdin=stdin_no, |
| 1397 stdout=stdout_no, |
| 1398 stderr=stderr_no) |
| 1399 p.wait() |
| 1400 |
| 1401 for fd in temp_fds: |
| 1402 os.lseek(fd, 0, 0) |
| 1403 |
| 1404 out = os.read(stdout_no, 1024) |
| 1405 err = os.read(stderr_no, 1024) |
| 1406 finally: |
| 1407 for std, saved in enumerate(saved_fds): |
| 1408 os.dup2(saved, std) |
| 1409 os.close(saved) |
| 1410 |
| 1411 self.assertEqual(out, "got STDIN") |
| 1412 self.assertStderrEqual(err, "err") |
| 1413 |
| 1414 finally: |
| 1415 for fd in temp_fds: |
| 1416 os.close(fd) |
| 1417 |
| 1418 # When duping fds, if there arises a situation where one of the fds is |
| 1419 # either 0, 1 or 2, it is possible that it is overwritten (#12607). |
| 1420 # This tests all combinations of this. |
| 1421 def test_swap_fds(self): |
| 1422 self.check_swap_fds(0, 1, 2) |
| 1423 self.check_swap_fds(0, 2, 1) |
| 1424 self.check_swap_fds(1, 0, 2) |
| 1425 self.check_swap_fds(1, 2, 0) |
| 1426 self.check_swap_fds(2, 0, 1) |
| 1427 self.check_swap_fds(2, 1, 0) |
| 1428 |
| 1429 def test_small_errpipe_write_fd(self): |
| 1430 """Issue #15798: Popen should work when stdio fds are available.""" |
| 1431 new_stdin = os.dup(0) |
| 1432 new_stdout = os.dup(1) |
| 1433 try: |
| 1434 os.close(0) |
| 1435 os.close(1) |
| 1436 |
| 1437 subprocess.Popen([ |
| 1438 sys.executable, "-c", "pass"]).wait() |
| 1439 finally: |
| 1440 # Restore original stdin and stdout |
| 1441 os.dup2(new_stdin, 0) |
| 1442 os.dup2(new_stdout, 1) |
| 1443 os.close(new_stdin) |
| 1444 os.close(new_stdout) |
| 1445 |
| 1446 def test_remapping_std_fds(self): |
| 1447 # open up some temporary files |
| 1448 temps = [mkstemp() for i in range(3)] |
| 1449 try: |
| 1450 temp_fds = [fd for fd, fname in temps] |
| 1451 |
| 1452 # unlink the files -- we won't need to reopen them |
| 1453 for fd, fname in temps: |
| 1454 os.unlink(fname) |
| 1455 |
| 1456 # write some data to what will become stdin, and rewind |
| 1457 os.write(temp_fds[1], "STDIN") |
| 1458 os.lseek(temp_fds[1], 0, 0) |
| 1459 |
| 1460 # move the standard file descriptors out of the way |
| 1461 saved_fds = [os.dup(fd) for fd in range(3)] |
| 1462 try: |
| 1463 # duplicate the file objects over the standard fd's |
| 1464 for fd, temp_fd in enumerate(temp_fds): |
| 1465 os.dup2(temp_fd, fd) |
| 1466 |
| 1467 # now use those files in the "wrong" order, so that subprocess |
| 1468 # has to rearrange them in the child |
| 1469 p = subprocess.Popen([sys.executable, "-c", |
| 1470 'import sys; got = sys.stdin.read();' |
| 1471 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], |
| 1472 stdin=temp_fds[1], |
| 1473 stdout=temp_fds[2], |
| 1474 stderr=temp_fds[0]) |
| 1475 p.wait() |
| 1476 finally: |
| 1477 # restore the original fd's underneath sys.stdin, etc. |
| 1478 for std, saved in enumerate(saved_fds): |
| 1479 os.dup2(saved, std) |
| 1480 os.close(saved) |
| 1481 |
| 1482 for fd in temp_fds: |
| 1483 os.lseek(fd, 0, 0) |
| 1484 |
| 1485 out = os.read(temp_fds[2], 1024) |
| 1486 err = os.read(temp_fds[0], 1024) |
| 1487 self.assertEqual(out, "got STDIN") |
| 1488 self.assertStderrEqual(err, "err") |
| 1489 |
| 1490 finally: |
| 1491 for fd in temp_fds: |
| 1492 os.close(fd) |
| 1493 |
| 1494 # NOTE: test_surrogates_error_message makes no sense on python 2.x. omitted. |
| 1495 # NOTE: test_undecodable_env makes no sense on python 2.x. omitted. |
| 1496 # NOTE: test_bytes_program makes no sense on python 2.x. omitted. |
| 1497 |
| 1498 def test_fs_encode_unicode_error(self): |
| 1499 fs_encoding = sys.getfilesystemencoding() |
| 1500 if fs_encoding.upper() not in ("ANSI_X3.4-1968", "ASCII"): |
| 1501 self.skipTest( |
| 1502 "Requires a restictive sys.filesystemencoding(), " |
| 1503 "not %s. Run python with LANG=C" % fs_encoding) |
| 1504 highbit_executable_name = os.path.join( |
| 1505 test_support.findfile("testdata"), u"Does\\Not\uDCff\\Exist") |
| 1506 try: |
| 1507 subprocess.call([highbit_executable_name]) |
| 1508 except UnicodeEncodeError: |
| 1509 return |
| 1510 except RuntimeError, e: |
| 1511 # The ProcessTestCasePOSIXPurePython version ends up here. It |
| 1512 # can't re-construct the unicode error from the child because it |
| 1513 # doesn't have all the arguments. BFD. One doesn't use |
| 1514 # subprocess32 for the old pure python implementation... |
| 1515 if "UnicodeEncodeError" not in str(e): |
| 1516 self.fail("Expected a RuntimeError whining about how a " |
| 1517 "UnicodeEncodeError from the child could not " |
| 1518 "be reraised. Not: %s" % e) |
| 1519 return |
| 1520 self.fail("Expected a UnicodeEncodeError to be raised.") |
| 1521 |
| 1522 def test_pipe_cloexec(self): |
| 1523 sleeper = test_support.findfile("testdata/input_reader.py") |
| 1524 fd_status = test_support.findfile("testdata/fd_status.py") |
| 1525 |
| 1526 p1 = subprocess.Popen([sys.executable, sleeper], |
| 1527 stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
| 1528 stderr=subprocess.PIPE, close_fds=False) |
| 1529 |
| 1530 self.addCleanup(p1.communicate, '') |
| 1531 |
| 1532 p2 = subprocess.Popen([sys.executable, fd_status], |
| 1533 stdout=subprocess.PIPE, close_fds=False) |
| 1534 |
| 1535 output, error = p2.communicate() |
| 1536 result_fds = set(map(int, output.split(','))) |
| 1537 unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(), |
| 1538 p1.stderr.fileno()]) |
| 1539 |
| 1540 self.assertFalse(result_fds & unwanted_fds, |
| 1541 "Expected no fds from %r to be open in child, " |
| 1542 "found %r" % |
| 1543 (unwanted_fds, result_fds & unwanted_fds)) |
| 1544 |
| 1545 def test_pipe_cloexec_real_tools(self): |
| 1546 qcat = test_support.findfile("testdata/qcat.py") |
| 1547 qgrep = test_support.findfile("testdata/qgrep.py") |
| 1548 |
| 1549 subdata = 'zxcvbn' |
| 1550 data = subdata * 4 + '\n' |
| 1551 |
| 1552 p1 = subprocess.Popen([sys.executable, qcat], |
| 1553 stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
| 1554 close_fds=False) |
| 1555 |
| 1556 p2 = subprocess.Popen([sys.executable, qgrep, subdata], |
| 1557 stdin=p1.stdout, stdout=subprocess.PIPE, |
| 1558 close_fds=False) |
| 1559 |
| 1560 self.addCleanup(p1.wait) |
| 1561 self.addCleanup(p2.wait) |
| 1562 def kill_p1(): |
| 1563 try: |
| 1564 p1.terminate() |
| 1565 except ProcessLookupError: |
| 1566 pass |
| 1567 def kill_p2(): |
| 1568 try: |
| 1569 p2.terminate() |
| 1570 except ProcessLookupError: |
| 1571 pass |
| 1572 self.addCleanup(kill_p1) |
| 1573 self.addCleanup(kill_p2) |
| 1574 |
| 1575 p1.stdin.write(data) |
| 1576 p1.stdin.close() |
| 1577 |
| 1578 readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10) |
| 1579 |
| 1580 self.assertTrue(readfiles, "The child hung") |
| 1581 self.assertEqual(p2.stdout.read(), data) |
| 1582 |
| 1583 p1.stdout.close() |
| 1584 p2.stdout.close() |
| 1585 |
| 1586 def test_close_fds(self): |
| 1587 fd_status = test_support.findfile("testdata/fd_status.py") |
| 1588 |
| 1589 fds = os.pipe() |
| 1590 self.addCleanup(os.close, fds[0]) |
| 1591 self.addCleanup(os.close, fds[1]) |
| 1592 |
| 1593 open_fds = set(fds) |
| 1594 # add a bunch more fds |
| 1595 for _ in range(9): |
| 1596 fd = os.open("/dev/null", os.O_RDONLY) |
| 1597 self.addCleanup(os.close, fd) |
| 1598 open_fds.add(fd) |
| 1599 |
| 1600 p = subprocess.Popen([sys.executable, fd_status], |
| 1601 stdout=subprocess.PIPE, close_fds=False) |
| 1602 output, ignored = p.communicate() |
| 1603 remaining_fds = set(map(int, output.split(','))) |
| 1604 |
| 1605 self.assertEqual(remaining_fds & open_fds, open_fds, |
| 1606 "Some fds were closed") |
| 1607 |
| 1608 p = subprocess.Popen([sys.executable, fd_status], |
| 1609 stdout=subprocess.PIPE, close_fds=True) |
| 1610 output, ignored = p.communicate() |
| 1611 remaining_fds = set(map(int, output.split(','))) |
| 1612 |
| 1613 self.assertFalse(remaining_fds & open_fds, |
| 1614 "Some fds were left open") |
| 1615 self.assertIn(1, remaining_fds, "Subprocess failed") |
| 1616 |
| 1617 # Keep some of the fd's we opened open in the subprocess. |
| 1618 # This tests _posixsubprocess.c's proper handling of fds_to_keep. |
| 1619 fds_to_keep = set(open_fds.pop() for _ in range(8)) |
| 1620 p = subprocess.Popen([sys.executable, fd_status], |
| 1621 stdout=subprocess.PIPE, close_fds=True, |
| 1622 pass_fds=()) |
| 1623 output, ignored = p.communicate() |
| 1624 remaining_fds = set(map(int, output.split(','))) |
| 1625 |
| 1626 self.assertFalse(remaining_fds & fds_to_keep & open_fds, |
| 1627 "Some fds not in pass_fds were left open") |
| 1628 self.assertIn(1, remaining_fds, "Subprocess failed") |
| 1629 |
| 1630 def test_pass_fds(self): |
| 1631 fd_status = test_support.findfile("testdata/fd_status.py") |
| 1632 |
| 1633 open_fds = set() |
| 1634 |
| 1635 for x in range(5): |
| 1636 fds = os.pipe() |
| 1637 self.addCleanup(os.close, fds[0]) |
| 1638 self.addCleanup(os.close, fds[1]) |
| 1639 open_fds.update(fds) |
| 1640 |
| 1641 for fd in open_fds: |
| 1642 p = subprocess.Popen([sys.executable, fd_status], |
| 1643 stdout=subprocess.PIPE, close_fds=True, |
| 1644 pass_fds=(fd, )) |
| 1645 output, ignored = p.communicate() |
| 1646 |
| 1647 remaining_fds = set(map(int, output.split(','))) |
| 1648 to_be_closed = open_fds - set((fd,)) |
| 1649 |
| 1650 self.assertIn(fd, remaining_fds, "fd to be passed not passed") |
| 1651 self.assertFalse(remaining_fds & to_be_closed, |
| 1652 "fd to be closed passed") |
| 1653 |
| 1654 # Syntax requires Python 2.5, assertWarns requires Python 2.7. |
| 1655 #with self.assertWarns(RuntimeWarning) as context: |
| 1656 # self.assertFalse(subprocess.call( |
| 1657 # [sys.executable, "-c", "import sys; sys.exit(0)"], |
| 1658 # close_fds=False, pass_fds=(fd, ))) |
| 1659 #self.assertIn('overriding close_fds', str(context.warning)) |
| 1660 |
| 1661 def test_stdout_stdin_are_single_inout_fd(self): |
| 1662 inout = open(os.devnull, "r+") |
| 1663 try: |
| 1664 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)
"], |
| 1665 stdout=inout, stdin=inout) |
| 1666 p.wait() |
| 1667 finally: |
| 1668 inout.close() |
| 1669 |
| 1670 def test_stdout_stderr_are_single_inout_fd(self): |
| 1671 inout = open(os.devnull, "r+") |
| 1672 try: |
| 1673 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)
"], |
| 1674 stdout=inout, stderr=inout) |
| 1675 p.wait() |
| 1676 finally: |
| 1677 inout.close() |
| 1678 |
| 1679 def test_stderr_stdin_are_single_inout_fd(self): |
| 1680 inout = open(os.devnull, "r+") |
| 1681 try: |
| 1682 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)
"], |
| 1683 stderr=inout, stdin=inout) |
| 1684 p.wait() |
| 1685 finally: |
| 1686 inout.close() |
| 1687 |
| 1688 def test_wait_when_sigchild_ignored(self): |
| 1689 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. |
| 1690 sigchild_ignore = test_support.findfile("testdata/sigchild_ignore.py") |
| 1691 p = subprocess.Popen([sys.executable, sigchild_ignore], |
| 1692 stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 1693 stdout, stderr = p.communicate() |
| 1694 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" |
| 1695 " non-zero with this error:\n%s" % stderr) |
| 1696 |
| 1697 def test_select_unbuffered(self): |
| 1698 # Issue #11459: bufsize=0 should really set the pipes as |
| 1699 # unbuffered (and therefore let select() work properly). |
| 1700 p = subprocess.Popen([sys.executable, "-c", |
| 1701 'import sys;' |
| 1702 'sys.stdout.write("apple")'], |
| 1703 stdout=subprocess.PIPE, |
| 1704 bufsize=0) |
| 1705 f = p.stdout |
| 1706 self.addCleanup(f.close) |
| 1707 try: |
| 1708 self.assertEqual(f.read(4), "appl") |
| 1709 self.assertIn(f, select.select([f], [], [], 0.0)[0]) |
| 1710 finally: |
| 1711 p.wait() |
| 1712 |
| 1713 def test_zombie_fast_process_del(self): |
| 1714 # Issue #12650: on Unix, if Popen.__del__() was called before the |
| 1715 # process exited, it wouldn't be added to subprocess._active, and would |
| 1716 # remain a zombie. |
| 1717 # spawn a Popen, and delete its reference before it exits |
| 1718 p = subprocess.Popen([sys.executable, "-c", |
| 1719 'import sys, time;' |
| 1720 'time.sleep(0.2)'], |
| 1721 stdout=subprocess.PIPE, |
| 1722 stderr=subprocess.PIPE) |
| 1723 self.addCleanup(p.stdout.close) |
| 1724 self.addCleanup(p.stderr.close) |
| 1725 ident = id(p) |
| 1726 pid = p.pid |
| 1727 del p |
| 1728 # check that p is in the active processes list |
| 1729 self.assertIn(ident, [id(o) for o in subprocess._active]) |
| 1730 |
| 1731 def test_leak_fast_process_del_killed(self): |
| 1732 # Issue #12650: on Unix, if Popen.__del__() was called before the |
| 1733 # process exited, and the process got killed by a signal, it would never |
| 1734 # be removed from subprocess._active, which triggered a FD and memory |
| 1735 # leak. |
| 1736 # spawn a Popen, delete its reference and kill it |
| 1737 p = subprocess.Popen([sys.executable, "-c", |
| 1738 'import time;' |
| 1739 'time.sleep(3)'], |
| 1740 stdout=subprocess.PIPE, |
| 1741 stderr=subprocess.PIPE) |
| 1742 self.addCleanup(p.stdout.close) |
| 1743 self.addCleanup(p.stderr.close) |
| 1744 ident = id(p) |
| 1745 pid = p.pid |
| 1746 del p |
| 1747 os.kill(pid, signal.SIGKILL) |
| 1748 # check that p is in the active processes list |
| 1749 self.assertIn(ident, [id(o) for o in subprocess._active]) |
| 1750 |
| 1751 # let some time for the process to exit, and create a new Popen: this |
| 1752 # should trigger the wait() of p |
| 1753 time.sleep(0.2) |
| 1754 try: |
| 1755 proc = subprocess.Popen(['nonexisting_i_hope'], |
| 1756 stdout=subprocess.PIPE, |
| 1757 stderr=subprocess.PIPE) |
| 1758 proc.__exit__(None, None, None) |
| 1759 except EnvironmentError: |
| 1760 pass |
| 1761 else: |
| 1762 self.fail("EnvironmentError not raised.") |
| 1763 # p should have been wait()ed on, and removed from the _active list |
| 1764 self.assertRaises(OSError, os.waitpid, pid, 0) |
| 1765 self.assertNotIn(ident, [id(o) for o in subprocess._active]) |
| 1766 |
| 1767 def test_close_fds_after_preexec(self): |
| 1768 fd_status = test_support.findfile("testdata/fd_status.py") |
| 1769 |
| 1770 # this FD is used as dup2() target by preexec_fn, and should be closed |
| 1771 # in the child process |
| 1772 fd = os.dup(1) |
| 1773 self.addCleanup(os.close, fd) |
| 1774 |
| 1775 p = subprocess.Popen([sys.executable, fd_status], |
| 1776 stdout=subprocess.PIPE, close_fds=True, |
| 1777 preexec_fn=lambda: os.dup2(1, fd)) |
| 1778 output, ignored = p.communicate() |
| 1779 |
| 1780 remaining_fds = set(map(int, output.split(','))) |
| 1781 |
| 1782 self.assertNotIn(fd, remaining_fds) |
| 1783 |
| 1784 |
| 1785 if mswindows: |
| 1786 class POSIXProcessTestCase(unittest.TestCase): pass |
| 1787 |
| 1788 |
| 1789 #@unittest.skipUnless(mswindows, "Windows specific tests") |
| 1790 class Win32ProcessTestCase(BaseTestCase): |
| 1791 |
| 1792 def test_startupinfo(self): |
| 1793 # startupinfo argument |
| 1794 # We uses hardcoded constants, because we do not want to |
| 1795 # depend on win32all. |
| 1796 STARTF_USESHOWWINDOW = 1 |
| 1797 SW_MAXIMIZE = 3 |
| 1798 startupinfo = subprocess.STARTUPINFO() |
| 1799 startupinfo.dwFlags = STARTF_USESHOWWINDOW |
| 1800 startupinfo.wShowWindow = SW_MAXIMIZE |
| 1801 # Since Python is a console process, it won't be affected |
| 1802 # by wShowWindow, but the argument should be silently |
| 1803 # ignored |
| 1804 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], |
| 1805 startupinfo=startupinfo) |
| 1806 |
| 1807 def test_creationflags(self): |
| 1808 # creationflags argument |
| 1809 CREATE_NEW_CONSOLE = 16 |
| 1810 sys.stderr.write(" a DOS box should flash briefly ...\n") |
| 1811 subprocess.call(sys.executable + |
| 1812 ' -c "import time; time.sleep(0.25)"', |
| 1813 creationflags=CREATE_NEW_CONSOLE) |
| 1814 |
| 1815 def test_invalid_args(self): |
| 1816 # invalid arguments should raise ValueError |
| 1817 self.assertRaises(ValueError, subprocess.call, |
| 1818 [sys.executable, "-c", |
| 1819 "import sys; sys.exit(47)"], |
| 1820 preexec_fn=lambda: 1) |
| 1821 self.assertRaises(ValueError, subprocess.call, |
| 1822 [sys.executable, "-c", |
| 1823 "import sys; sys.exit(47)"], |
| 1824 stdout=subprocess.PIPE, |
| 1825 close_fds=True) |
| 1826 |
| 1827 def test_close_fds(self): |
| 1828 # close file descriptors |
| 1829 rc = subprocess.call([sys.executable, "-c", |
| 1830 "import sys; sys.exit(47)"], |
| 1831 close_fds=True) |
| 1832 self.assertEqual(rc, 47) |
| 1833 |
| 1834 def test_shell_sequence(self): |
| 1835 # Run command through the shell (sequence) |
| 1836 newenv = os.environ.copy() |
| 1837 newenv["FRUIT"] = "physalis" |
| 1838 p = subprocess.Popen(["set"], shell=1, |
| 1839 stdout=subprocess.PIPE, |
| 1840 env=newenv) |
| 1841 self.assertIn("physalis", p.stdout.read()) |
| 1842 |
| 1843 def test_shell_string(self): |
| 1844 # Run command through the shell (string) |
| 1845 newenv = os.environ.copy() |
| 1846 newenv["FRUIT"] = "physalis" |
| 1847 p = subprocess.Popen("set", shell=1, |
| 1848 stdout=subprocess.PIPE, |
| 1849 env=newenv) |
| 1850 self.assertIn("physalis", p.stdout.read()) |
| 1851 |
| 1852 def test_call_string(self): |
| 1853 # call() function with string argument on Windows |
| 1854 rc = subprocess.call(sys.executable + |
| 1855 ' -c "import sys; sys.exit(47)"') |
| 1856 self.assertEqual(rc, 47) |
| 1857 |
| 1858 def _kill_process(self, method, *args): |
| 1859 # Some win32 buildbot raises EOFError if stdin is inherited |
| 1860 p = subprocess.Popen([sys.executable, "-c", "input()"], |
| 1861 stdin=subprocess.PIPE, stderr=subprocess.PIPE) |
| 1862 |
| 1863 # Let the process initialize (Issue #3137) |
| 1864 time.sleep(0.1) |
| 1865 # The process should not terminate prematurely |
| 1866 self.assert_(p.poll() is None) |
| 1867 # Retry if the process do not receive the signal. |
| 1868 count, maxcount = 0, 3 |
| 1869 while count < maxcount and p.poll() is None: |
| 1870 getattr(p, method)(*args) |
| 1871 time.sleep(0.1) |
| 1872 count += 1 |
| 1873 |
| 1874 returncode = p.poll() |
| 1875 self.assert_(returncode is not None, "the subprocess did not terminate") |
| 1876 if count > 1: |
| 1877 print >>sys.stderr, ("p.{}{} succeeded after " |
| 1878 "{} attempts".format(method, args, count)) |
| 1879 _, stderr = p.communicate() |
| 1880 self.assertStderrEqual(stderr, '') |
| 1881 self.assertEqual(p.wait(), returncode) |
| 1882 self.assertNotEqual(returncode, 0) |
| 1883 |
| 1884 def test_send_signal(self): |
| 1885 self._kill_process('send_signal', signal.SIGTERM) |
| 1886 |
| 1887 def test_kill(self): |
| 1888 self._kill_process('kill') |
| 1889 |
| 1890 def test_terminate(self): |
| 1891 self._kill_process('terminate') |
| 1892 |
| 1893 |
| 1894 if not mswindows: |
| 1895 class Win32ProcessTestCase(unittest.TestCase): pass |
| 1896 |
| 1897 |
| 1898 #@unittest.skipUnless(getattr(subprocess, '_has_poll', False), |
| 1899 # "poll system call not supported") |
| 1900 class ProcessTestCaseNoPoll(ProcessTestCase): |
| 1901 def setUp(self): |
| 1902 subprocess._has_poll = False |
| 1903 ProcessTestCase.setUp(self) |
| 1904 |
| 1905 def tearDown(self): |
| 1906 subprocess._has_poll = True |
| 1907 ProcessTestCase.tearDown(self) |
| 1908 |
| 1909 |
| 1910 if not getattr(subprocess, '_has_poll', False): |
| 1911 class ProcessTestCaseNoPoll(unittest.TestCase): pass |
| 1912 |
| 1913 |
| 1914 #@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False), |
| 1915 # "_posixsubprocess extension module not found.") |
| 1916 class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase): |
| 1917 def setUp(self): |
| 1918 subprocess._posixsubprocess = None |
| 1919 ProcessTestCase.setUp(self) |
| 1920 POSIXProcessTestCase.setUp(self) |
| 1921 |
| 1922 def tearDown(self): |
| 1923 subprocess._posixsubprocess = sys.modules['_posixsubprocess'] |
| 1924 POSIXProcessTestCase.tearDown(self) |
| 1925 ProcessTestCase.tearDown(self) |
| 1926 |
| 1927 |
| 1928 if not getattr(subprocess, '_posixsubprocess', False): |
| 1929 print >>sys.stderr, "_posixsubprocess extension module not found." |
| 1930 class ProcessTestCasePOSIXPurePython(unittest.TestCase): pass |
| 1931 |
| 1932 |
| 1933 class HelperFunctionTests(unittest.TestCase): |
| 1934 #@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") |
| 1935 def test_eintr_retry_call(self): |
| 1936 record_calls = [] |
| 1937 def fake_os_func(*args): |
| 1938 record_calls.append(args) |
| 1939 if len(record_calls) == 2: |
| 1940 raise OSError(errno.EINTR, "fake interrupted system call") |
| 1941 return tuple(reversed(args)) |
| 1942 |
| 1943 self.assertEqual((999, 256), |
| 1944 subprocess._eintr_retry_call(fake_os_func, 256, 999)) |
| 1945 self.assertEqual([(256, 999)], record_calls) |
| 1946 # This time there will be an EINTR so it will loop once. |
| 1947 self.assertEqual((666,), |
| 1948 subprocess._eintr_retry_call(fake_os_func, 666)) |
| 1949 self.assertEqual([(256, 999), (666,), (666,)], record_calls) |
| 1950 |
| 1951 if mswindows: |
| 1952 del test_eintr_retry_call |
| 1953 |
| 1954 if not hasattr(unittest.TestCase, 'assertSequenceEqual'): |
| 1955 def assertSequenceEqual(self, seq1, seq2): |
| 1956 self.assertEqual(list(seq1), list(seq2)) |
| 1957 |
| 1958 def test_get_exec_path(self): |
| 1959 defpath_list = os.defpath.split(os.pathsep) |
| 1960 test_path = ['/monty', '/python', '', '/flying/circus'] |
| 1961 test_env = {'PATH': os.pathsep.join(test_path)} |
| 1962 |
| 1963 get_exec_path = subprocess._get_exec_path |
| 1964 saved_environ = os.environ |
| 1965 try: |
| 1966 os.environ = dict(test_env) |
| 1967 # Test that defaulting to os.environ works. |
| 1968 self.assertSequenceEqual(test_path, get_exec_path()) |
| 1969 self.assertSequenceEqual(test_path, get_exec_path(env=None)) |
| 1970 finally: |
| 1971 os.environ = saved_environ |
| 1972 |
| 1973 # No PATH environment variable |
| 1974 self.assertSequenceEqual(defpath_list, get_exec_path({})) |
| 1975 # Empty PATH environment variable |
| 1976 self.assertSequenceEqual(('',), get_exec_path({'PATH':''})) |
| 1977 # Supplied PATH environment variable |
| 1978 self.assertSequenceEqual(test_path, get_exec_path(test_env)) |
| 1979 |
| 1980 |
| 1981 def reap_children(): |
| 1982 """Use this function at the end of test_main() whenever sub-processes |
| 1983 are started. This will help ensure that no extra children (zombies) |
| 1984 stick around to hog resources and create problems when looking |
| 1985 for refleaks. |
| 1986 """ |
| 1987 |
| 1988 # Reap all our dead child processes so we don't leave zombies around. |
| 1989 # These hog resources and might be causing some of the buildbots to die. |
| 1990 if hasattr(os, 'waitpid'): |
| 1991 any_process = -1 |
| 1992 while True: |
| 1993 try: |
| 1994 # This will raise an exception on Windows. That's ok. |
| 1995 pid, status = os.waitpid(any_process, os.WNOHANG) |
| 1996 if pid == 0: |
| 1997 break |
| 1998 except: |
| 1999 break |
| 2000 |
| 2001 |
| 2002 |
| 2003 class ContextManagerTests(BaseTestCase): |
| 2004 |
| 2005 def test_pipe(self): |
| 2006 proc = subprocess.Popen([sys.executable, "-c", |
| 2007 "import sys;" |
| 2008 "sys.stdout.write('stdout');" |
| 2009 "sys.stderr.write('stderr');"], |
| 2010 stdout=subprocess.PIPE, |
| 2011 stderr=subprocess.PIPE) |
| 2012 try: |
| 2013 self.assertEqual(proc.stdout.read(), "stdout") |
| 2014 self.assertStderrEqual(proc.stderr.read(), "stderr") |
| 2015 finally: |
| 2016 proc.__exit__(None, None, None) |
| 2017 |
| 2018 self.assertTrue(proc.stdout.closed) |
| 2019 self.assertTrue(proc.stderr.closed) |
| 2020 |
| 2021 def test_returncode(self): |
| 2022 proc = subprocess.Popen([sys.executable, "-c", |
| 2023 "import sys; sys.exit(100)"]) |
| 2024 proc.__exit__(None, None, None) |
| 2025 # __exit__ calls wait(), so the returncode should be set |
| 2026 self.assertEqual(proc.returncode, 100) |
| 2027 |
| 2028 def test_communicate_stdin(self): |
| 2029 proc = subprocess.Popen([sys.executable, "-c", |
| 2030 "import sys;" |
| 2031 "sys.exit(sys.stdin.read() == 'context')"], |
| 2032 stdin=subprocess.PIPE) |
| 2033 try: |
| 2034 proc.communicate("context") |
| 2035 self.assertEqual(proc.returncode, 1) |
| 2036 finally: |
| 2037 proc.__exit__(None, None, None) |
| 2038 |
| 2039 def test_invalid_args(self): |
| 2040 try: |
| 2041 proc = subprocess.Popen(['nonexisting_i_hope'], |
| 2042 stdout=subprocess.PIPE, |
| 2043 stderr=subprocess.PIPE) |
| 2044 proc.__exit__(None, None, None) |
| 2045 except EnvironmentError, exception: |
| 2046 # ignore errors that indicate the command was not found |
| 2047 if exception.errno not in (errno.ENOENT, errno.EACCES): |
| 2048 raise |
| 2049 else: |
| 2050 self.fail("Expected an EnvironmentError exception.") |
| 2051 |
| 2052 |
| 2053 if sys.version_info[:2] <= (2,4): |
| 2054 # The test suite hangs during the pure python test on 2.4. No idea why. |
| 2055 # That is not the implementation anyone is using this module for anyways. |
| 2056 class ProcessTestCasePOSIXPurePython(unittest.TestCase): pass |
| 2057 |
| 2058 |
| 2059 def test_main(): |
| 2060 unit_tests = (ProcessTestCase, |
| 2061 POSIXProcessTestCase, |
| 2062 Win32ProcessTestCase, |
| 2063 ProcessTestCasePOSIXPurePython, |
| 2064 ProcessTestCaseNoPoll, |
| 2065 HelperFunctionTests, |
| 2066 ContextManagerTests) |
| 2067 |
| 2068 test_support.run_unittest(*unit_tests) |
| 2069 reap_children() |
| 2070 |
| 2071 if __name__ == "__main__": |
| 2072 test_main() |
| OLD | NEW |