Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: tools/telemetry/third_party/subprocess32/test_subprocess32.py

Issue 1323303002: [Telemetry] Add timeout to telemetry run_tests (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 import unittest
2 from test import test_support
3 import subprocess32
4 subprocess = subprocess32
5 import sys
6 import signal
7 import os
8 import errno
9 import tempfile
10 import time
11 try:
12 import threading
13 except ImportError:
14 threading = None
15 import re
16 #import sysconfig
17 import select
18 import shutil
19 try:
20 import gc
21 except ImportError:
22 gc = None
23
24 mswindows = (sys.platform == "win32")
25
26 #
27 # Depends on the following external programs: Python
28 #
29
30 if mswindows:
31 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
32 'os.O_BINARY);')
33 else:
34 SETBINARY = ''
35
36
37 try:
38 mkstemp = tempfile.mkstemp
39 except AttributeError:
40 # tempfile.mkstemp is not available
41 def mkstemp():
42 """Replacement for mkstemp, calling mktemp."""
43 fname = tempfile.mktemp()
44 return os.open(fname, os.O_RDWR|os.O_CREAT), fname
45
46 try:
47 strip_python_stderr = test_support.strip_python_stderr
48 except AttributeError:
49 # Copied from the test.test_support module in 2.7.
50 def strip_python_stderr(stderr):
51 """Strip the stderr of a Python process from potential debug output
52 emitted by the interpreter.
53
54 This will typically be run on the result of the communicate() method
55 of a subprocess.Popen object.
56 """
57 stderr = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr).strip()
58 return stderr
59
60 class BaseTestCase(unittest.TestCase):
61 def setUp(self):
62 # Try to minimize the number of children we have so this test
63 # doesn't crash on some buildbots (Alphas in particular).
64 reap_children()
65 if not hasattr(unittest.TestCase, 'addCleanup'):
66 self._cleanups = []
67
68 def tearDown(self):
69 try:
70 for inst in subprocess._active:
71 inst.wait()
72 subprocess._cleanup()
73 self.assertFalse(subprocess._active, "subprocess._active not empty")
74 finally:
75 if self._use_our_own_cleanup_implementation:
76 self._doCleanups()
77
78 if not hasattr(unittest.TestCase, 'assertIn'):
79 def assertIn(self, a, b, msg=None):
80 self.assert_((a in b), msg or ('%r not in %r' % (a, b)))
81 def assertNotIn(self, a, b, msg=None):
82 self.assert_((a not in b), msg or ('%r in %r' % (a, b)))
83
84 if not hasattr(unittest.TestCase, 'skipTest'):
85 def skipTest(self, message):
86 """These will still fail but it'll be clear that it is okay."""
87 self.fail('SKIPPED - %s\n' % (message,))
88
89 def _addCleanup(self, function, *args, **kwargs):
90 """Add a function, with arguments, to be called when the test is
91 completed. Functions added are called on a LIFO basis and are
92 called after tearDown on test failure or success.
93
94 Unlike unittest2 or python 2.7, cleanups are not if setUp fails.
95 That is easier to implement in this subclass and is all we need.
96 """
97 self._cleanups.append((function, args, kwargs))
98
99 def _doCleanups(self):
100 """Execute all cleanup functions. Normally called for you after
101 tearDown."""
102 while self._cleanups:
103 function, args, kwargs = self._cleanups.pop()
104 try:
105 function(*args, **kwargs)
106 except KeyboardInterrupt:
107 raise
108 except:
109 pass
110
111 _use_our_own_cleanup_implementation = False
112 if not hasattr(unittest.TestCase, 'addCleanup'):
113 _use_our_own_cleanup_implementation = True
114 addCleanup = _addCleanup
115
116 def assertStderrEqual(self, stderr, expected, msg=None):
117 # In a debug build, stuff like "[6580 refs]" is printed to stderr at
118 # shutdown time. That frustrates tests trying to check stderr produced
119 # from a spawned Python process.
120 actual = strip_python_stderr(stderr)
121 # strip_python_stderr also strips whitespace, so we do too.
122 expected = expected.strip()
123 self.assertEqual(actual, expected, msg)
124
125
126 class PopenTestException(Exception):
127 pass
128
129
130 class PopenExecuteChildRaises(subprocess32.Popen):
131 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
132 _execute_child fails.
133 """
134 def _execute_child(self, *args, **kwargs):
135 raise PopenTestException("Forced Exception for Test")
136
137
138 class ProcessTestCase(BaseTestCase):
139
140 def test_call_seq(self):
141 # call() function with sequence argument
142 rc = subprocess.call([sys.executable, "-c",
143 "import sys; sys.exit(47)"])
144 self.assertEqual(rc, 47)
145
146 def test_call_timeout(self):
147 # call() function with timeout argument; we want to test that the child
148 # process gets killed when the timeout expires. If the child isn't
149 # killed, this call will deadlock since subprocess.call waits for the
150 # child.
151 self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
152 [sys.executable, "-c", "while True: pass"],
153 timeout=0.1)
154
155 def test_check_call_zero(self):
156 # check_call() function with zero return code
157 rc = subprocess.check_call([sys.executable, "-c",
158 "import sys; sys.exit(0)"])
159 self.assertEqual(rc, 0)
160
161 def test_check_call_nonzero(self):
162 # check_call() function with non-zero return code
163 try:
164 subprocess.check_call([sys.executable, "-c",
165 "import sys; sys.exit(47)"])
166 except subprocess.CalledProcessError, c:
167 self.assertEqual(c.returncode, 47)
168
169 def test_check_output(self):
170 # check_output() function with zero return code
171 output = subprocess.check_output(
172 [sys.executable, "-c", "print 'BDFL'"])
173 self.assertIn('BDFL', output)
174
175 def test_check_output_nonzero(self):
176 # check_call() function with non-zero return code
177 try:
178 subprocess.check_output(
179 [sys.executable, "-c", "import sys; sys.exit(5)"])
180 except subprocess.CalledProcessError, c:
181 self.assertEqual(c.returncode, 5)
182
183 def test_check_output_stderr(self):
184 # check_output() function stderr redirected to stdout
185 output = subprocess.check_output(
186 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
187 stderr=subprocess.STDOUT)
188 self.assertIn('BDFL', output)
189
190 def test_check_output_stdout_arg(self):
191 # check_output() function stderr redirected to stdout
192 try:
193 output = subprocess.check_output(
194 [sys.executable, "-c", "print 'will not be run'"],
195 stdout=sys.stdout)
196 self.fail("Expected ValueError when stdout arg supplied.")
197 except ValueError, c:
198 self.assertIn('stdout', c.args[0])
199
200 def test_check_output_timeout(self):
201 # check_output() function with timeout arg
202 try:
203 output = subprocess.check_output(
204 [sys.executable, "-c",
205 "import sys; sys.stdout.write('BDFL')\n"
206 "sys.stdout.flush()\n"
207 "while True: pass"],
208 timeout=0.5)
209 except subprocess.TimeoutExpired, exception:
210 self.assertEqual(exception.output, 'BDFL')
211 else:
212 self.fail("Expected TimeoutExpired.")
213
214 def test_call_kwargs(self):
215 # call() function with keyword args
216 newenv = os.environ.copy()
217 newenv["FRUIT"] = "banana"
218 rc = subprocess.call([sys.executable, "-c",
219 'import sys, os;'
220 'sys.exit(os.getenv("FRUIT")=="banana")'],
221 env=newenv)
222 self.assertEqual(rc, 1)
223
224 def test_stdin_none(self):
225 # .stdin is None when not redirected
226 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
227 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
228 p.wait()
229 self.assertEqual(p.stdin, None)
230
231 def test_stdout_none(self):
232 # .stdout is None when not redirected, and the child's stdout will
233 # be inherited from the parent. In order to test this we run a
234 # subprocess in a subprocess:
235 # this_test
236 # \-- subprocess created by this test (parent)
237 # \-- subprocess created by the parent subprocess (child)
238 # The parent doesn't specify stdout, so the child will use the
239 # parent's stdout. This test checks that the message printed by the
240 # child goes to the parent stdout. The parent also checks that the
241 # child's stdout is None. See #11963.
242 code = ('import sys; from subprocess32 import Popen, PIPE;'
243 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"], '
244 ' stdin=PIPE, stderr=PIPE);'
245 'p.wait(); assert p.stdout is None;')
246 p = subprocess.Popen([sys.executable, "-c", code],
247 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
248 self.addCleanup(p.stdout.close)
249 self.addCleanup(p.stderr.close)
250 out, err = p.communicate()
251 self.assertEqual(p.returncode, 0, err)
252 self.assertEqual(out.rstrip(), 'test_stdout_none')
253
254 def test_stderr_none(self):
255 # .stderr is None when not redirected
256 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
257 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
258 p.wait()
259 self.assertEqual(p.stderr, None)
260
261 # For use in the test_cwd* tests below.
262 def _normalize_cwd(self, cwd):
263 # Normalize an expected cwd (for Tru64 support).
264 # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
265 # strings. See bug #1063571.
266 original_cwd = os.getcwd()
267 os.chdir(cwd)
268 cwd = os.getcwd()
269 os.chdir(original_cwd)
270 return cwd
271
272 # For use in the test_cwd* tests below.
273 def _split_python_path(self):
274 # Return normalized (python_dir, python_base).
275 python_path = os.path.realpath(sys.executable)
276 return os.path.split(python_path)
277
278 # For use in the test_cwd* tests below.
279 def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
280 # Invoke Python via Popen, and assert that (1) the call succeeds,
281 # and that (2) the current working directory of the child process
282 # matches *expected_cwd*.
283 p = subprocess.Popen([python_arg, "-c",
284 "import os, sys; "
285 "sys.stdout.write(os.getcwd()); "
286 "sys.exit(47)"],
287 stdout=subprocess.PIPE,
288 **kwargs)
289 self.addCleanup(p.stdout.close)
290 p.wait()
291 self.assertEqual(47, p.returncode)
292 normcase = os.path.normcase
293 self.assertEqual(normcase(expected_cwd),
294 normcase(p.stdout.read().decode("utf-8")))
295
296 def test_cwd(self):
297 # Check that cwd changes the cwd for the child process.
298 temp_dir = tempfile.gettempdir()
299 temp_dir = self._normalize_cwd(temp_dir)
300 self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
301
302 #@unittest.skipIf(mswindows, "pending resolution of issue #15533")
303 def test_cwd_with_relative_arg(self):
304 # Check that Popen looks for args[0] relative to cwd if args[0]
305 # is relative.
306 python_dir, python_base = self._split_python_path()
307 rel_python = os.path.join(os.curdir, python_base)
308
309 path = 'tempcwd'
310 saved_dir = os.getcwd()
311 os.mkdir(path)
312 try:
313 os.chdir(path)
314 wrong_dir = os.getcwd()
315 # Before calling with the correct cwd, confirm that the call fails
316 # without cwd and with the wrong cwd.
317 self.assertRaises(OSError, subprocess.Popen,
318 [rel_python])
319 self.assertRaises(OSError, subprocess.Popen,
320 [rel_python], cwd=wrong_dir)
321 python_dir = self._normalize_cwd(python_dir)
322 self._assert_cwd(python_dir, rel_python, cwd=python_dir)
323 finally:
324 os.chdir(saved_dir)
325 shutil.rmtree(path)
326
327 #@unittest.skipIf(mswindows, "pending resolution of issue #15533")
328 def test_cwd_with_relative_executable(self):
329 # Check that Popen looks for executable relative to cwd if executable
330 # is relative (and that executable takes precedence over args[0]).
331 python_dir, python_base = self._split_python_path()
332 rel_python = os.path.join(os.curdir, python_base)
333 doesntexist = "somethingyoudonthave"
334
335 path = 'tempcwd'
336 saved_dir = os.getcwd()
337 os.mkdir(path)
338 try:
339 os.chdir(path)
340 wrong_dir = os.getcwd()
341 # Before calling with the correct cwd, confirm that the call fails
342 # without cwd and with the wrong cwd.
343 self.assertRaises(OSError, subprocess.Popen,
344 [doesntexist], executable=rel_python)
345 self.assertRaises(OSError, subprocess.Popen,
346 [doesntexist], executable=rel_python,
347 cwd=wrong_dir)
348 python_dir = self._normalize_cwd(python_dir)
349 self._assert_cwd(python_dir, doesntexist, executable=rel_python,
350 cwd=python_dir)
351 finally:
352 os.chdir(saved_dir)
353 shutil.rmtree(path)
354
355 def test_cwd_with_absolute_arg(self):
356 # Check that Popen can find the executable when the cwd is wrong
357 # if args[0] is an absolute path.
358 python_dir, python_base = self._split_python_path()
359 abs_python = os.path.join(python_dir, python_base)
360 rel_python = os.path.join(os.curdir, python_base)
361 wrong_dir = tempfile.mkdtemp()
362 wrong_dir = os.path.realpath(wrong_dir)
363 try:
364 # Before calling with an absolute path, confirm that using a
365 # relative path fails.
366 self.assertRaises(OSError, subprocess.Popen,
367 [rel_python], cwd=wrong_dir)
368 wrong_dir = self._normalize_cwd(wrong_dir)
369 self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
370 finally:
371 shutil.rmtree(wrong_dir)
372
373 def test_executable_with_cwd(self):
374 python_dir, python_base = self._split_python_path()
375 python_dir = self._normalize_cwd(python_dir)
376 self._assert_cwd(python_dir, "somethingyoudonthave",
377 executable=sys.executable, cwd=python_dir)
378
379 #@unittest.skipIf(sysconfig.is_python_build(),
380 # "need an installed Python. See #7774")
381 #def test_executable_without_cwd(self):
382 # # For a normal installation, it should work without 'cwd'
383 # # argument. For test runs in the build directory, see #7774.
384 # self._assert_cwd('', "somethingyoudonthave", executable=sys.executable)
385
386 def test_stdin_pipe(self):
387 # stdin redirection
388 p = subprocess.Popen([sys.executable, "-c",
389 'import sys; sys.exit(sys.stdin.read() == "pear")'],
390 stdin=subprocess.PIPE)
391 p.stdin.write("pear")
392 p.stdin.close()
393 p.wait()
394 self.assertEqual(p.returncode, 1)
395
396 def test_stdin_filedes(self):
397 # stdin is set to open file descriptor
398 tf = tempfile.TemporaryFile()
399 d = tf.fileno()
400 os.write(d, "pear")
401 os.lseek(d, 0, 0)
402 p = subprocess.Popen([sys.executable, "-c",
403 'import sys; sys.exit(sys.stdin.read() == "pear")'],
404 stdin=d)
405 p.wait()
406 self.assertEqual(p.returncode, 1)
407
408 def test_stdin_fileobj(self):
409 # stdin is set to open file object
410 tf = tempfile.TemporaryFile()
411 tf.write("pear")
412 tf.seek(0)
413 p = subprocess.Popen([sys.executable, "-c",
414 'import sys; sys.exit(sys.stdin.read() == "pear")'],
415 stdin=tf)
416 p.wait()
417 self.assertEqual(p.returncode, 1)
418
419 def test_stdout_pipe(self):
420 # stdout redirection
421 p = subprocess.Popen([sys.executable, "-c",
422 'import sys; sys.stdout.write("orange")'],
423 stdout=subprocess.PIPE)
424 self.assertEqual(p.stdout.read(), "orange")
425
426 def test_stdout_filedes(self):
427 # stdout is set to open file descriptor
428 tf = tempfile.TemporaryFile()
429 d = tf.fileno()
430 p = subprocess.Popen([sys.executable, "-c",
431 'import sys; sys.stdout.write("orange")'],
432 stdout=d)
433 p.wait()
434 os.lseek(d, 0, 0)
435 self.assertEqual(os.read(d, 1024), "orange")
436
437 def test_stdout_fileobj(self):
438 # stdout is set to open file object
439 tf = tempfile.TemporaryFile()
440 p = subprocess.Popen([sys.executable, "-c",
441 'import sys; sys.stdout.write("orange")'],
442 stdout=tf)
443 p.wait()
444 tf.seek(0)
445 self.assertEqual(tf.read(), "orange")
446
447 def test_stderr_pipe(self):
448 # stderr redirection
449 p = subprocess.Popen([sys.executable, "-c",
450 'import sys; sys.stderr.write("strawberry")'],
451 stderr=subprocess.PIPE)
452 self.assertStderrEqual(p.stderr.read(), "strawberry")
453
454 def test_stderr_filedes(self):
455 # stderr is set to open file descriptor
456 tf = tempfile.TemporaryFile()
457 d = tf.fileno()
458 p = subprocess.Popen([sys.executable, "-c",
459 'import sys; sys.stderr.write("strawberry")'],
460 stderr=d)
461 p.wait()
462 os.lseek(d, 0, 0)
463 self.assertStderrEqual(os.read(d, 1024), "strawberry")
464
465 def test_stderr_fileobj(self):
466 # stderr is set to open file object
467 tf = tempfile.TemporaryFile()
468 p = subprocess.Popen([sys.executable, "-c",
469 'import sys; sys.stderr.write("strawberry")'],
470 stderr=tf)
471 p.wait()
472 tf.seek(0)
473 self.assertStderrEqual(tf.read(), "strawberry")
474
475 def test_stdout_stderr_pipe(self):
476 # capture stdout and stderr to the same pipe
477 p = subprocess.Popen([sys.executable, "-c",
478 'import sys;'
479 'sys.stdout.write("apple");'
480 'sys.stdout.flush();'
481 'sys.stderr.write("orange")'],
482 stdout=subprocess.PIPE,
483 stderr=subprocess.STDOUT)
484 self.assertStderrEqual(p.stdout.read(), "appleorange")
485
486 def test_stdout_stderr_file(self):
487 # capture stdout and stderr to the same open file
488 tf = tempfile.TemporaryFile()
489 p = subprocess.Popen([sys.executable, "-c",
490 'import sys;'
491 'sys.stdout.write("apple");'
492 'sys.stdout.flush();'
493 'sys.stderr.write("orange")'],
494 stdout=tf,
495 stderr=tf)
496 p.wait()
497 tf.seek(0)
498 self.assertStderrEqual(tf.read(), "appleorange")
499
500 def test_stdout_filedes_of_stdout(self):
501 # stdout is set to 1 (#1531862).
502 # To avoid printing the text on stdout, we do something similar to
503 # test_stdout_none (see above). The parent subprocess calls the child
504 # subprocess passing stdout=1, and this test uses stdout=PIPE in
505 # order to capture and check the output of the parent. See #11963.
506 code = ('import sys, subprocess32; '
507 'rc = subprocess32.call([sys.executable, "-c", '
508 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
509 '\'test with stdout=1\'))"], stdout=1); '
510 'assert rc == 18')
511 p = subprocess.Popen([sys.executable, "-c", code],
512 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
513 self.addCleanup(p.stdout.close)
514 self.addCleanup(p.stderr.close)
515 out, err = p.communicate()
516 self.assertEqual(p.returncode, 0, err)
517 self.assertEqual(out.rstrip(), 'test with stdout=1')
518
519 def test_env(self):
520 newenv = os.environ.copy()
521 newenv["FRUIT"] = "orange"
522 p = subprocess.Popen([sys.executable, "-c",
523 'import sys,os;'
524 'sys.stdout.write(os.getenv("FRUIT"))'],
525 stdout=subprocess.PIPE,
526 env=newenv)
527 try:
528 stdout, stderr = p.communicate()
529 self.assertEqual(stdout, "orange")
530 finally:
531 p.__exit__(None, None, None)
532
533 def test_empty_env(self):
534 # Note: This excludes some __CF_* and VERSIONER_* keys OS X insists
535 # on adding even when the environment in exec is empty.
536 p = subprocess.Popen(
537 [sys.executable, "-c",
538 'import os; '
539 'print([k for k in os.environ.keys() '
540 ' if ("VERSIONER" not in k and "__CF" not in k)])'],
541 stdout=subprocess.PIPE, env={})
542 try:
543 stdout, stderr = p.communicate()
544 self.assertEqual(stdout.strip(), "[]")
545 finally:
546 p.__exit__(None, None, None)
547
548 def test_communicate_stdin(self):
549 p = subprocess.Popen([sys.executable, "-c",
550 'import sys;'
551 'sys.exit(sys.stdin.read() == "pear")'],
552 stdin=subprocess.PIPE)
553 p.communicate("pear")
554 self.assertEqual(p.returncode, 1)
555
556 def test_communicate_stdout(self):
557 p = subprocess.Popen([sys.executable, "-c",
558 'import sys; sys.stdout.write("pineapple")'],
559 stdout=subprocess.PIPE)
560 (stdout, stderr) = p.communicate()
561 self.assertEqual(stdout, "pineapple")
562 self.assertEqual(stderr, None)
563
564 def test_communicate_stderr(self):
565 p = subprocess.Popen([sys.executable, "-c",
566 'import sys; sys.stderr.write("pineapple")'],
567 stderr=subprocess.PIPE)
568 (stdout, stderr) = p.communicate()
569 self.assertEqual(stdout, None)
570 self.assertStderrEqual(stderr, "pineapple")
571
572 def test_communicate(self):
573 p = subprocess.Popen([sys.executable, "-c",
574 'import sys,os;'
575 'sys.stderr.write("pineapple");'
576 'sys.stdout.write(sys.stdin.read())'],
577 stdin=subprocess.PIPE,
578 stdout=subprocess.PIPE,
579 stderr=subprocess.PIPE)
580 (stdout, stderr) = p.communicate("banana")
581 self.assertEqual(stdout, "banana")
582 self.assertStderrEqual(stderr, "pineapple")
583
584 def test_communicate_timeout(self):
585 p = subprocess.Popen([sys.executable, "-c",
586 'import sys,os,time;'
587 'sys.stderr.write("pineapple\\n");'
588 'time.sleep(1);'
589 'sys.stderr.write("pear\\n");'
590 'sys.stdout.write(sys.stdin.read())'],
591 universal_newlines=True,
592 stdin=subprocess.PIPE,
593 stdout=subprocess.PIPE,
594 stderr=subprocess.PIPE)
595 self.assertRaises(subprocess.TimeoutExpired, p.communicate, u"banana",
596 timeout=0.3)
597 # Make sure we can keep waiting for it, and that we get the whole output
598 # after it completes.
599 (stdout, stderr) = p.communicate()
600 self.assertEqual(stdout, "banana")
601 self.assertStderrEqual(stderr.encode(), "pineapple\npear\n")
602
603 def test_communicate_timeout_large_ouput(self):
604 # Test a expring timeout while the child is outputting lots of data.
605 p = subprocess.Popen([sys.executable, "-c",
606 'import sys,os,time;'
607 'sys.stdout.write("a" * (64 * 1024));'
608 'time.sleep(0.2);'
609 'sys.stdout.write("a" * (64 * 1024));'
610 'time.sleep(0.2);'
611 'sys.stdout.write("a" * (64 * 1024));'
612 'time.sleep(0.2);'
613 'sys.stdout.write("a" * (64 * 1024));'],
614 stdout=subprocess.PIPE)
615 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
616 (stdout, _) = p.communicate()
617 self.assertEqual(len(stdout), 4 * 64 * 1024)
618
619 # Test for the fd leak reported in http://bugs.python.org/issue2791.
620 def test_communicate_pipe_fd_leak(self):
621 for stdin_pipe in (False, True):
622 for stdout_pipe in (False, True):
623 for stderr_pipe in (False, True):
624 options = {}
625 if stdin_pipe:
626 options['stdin'] = subprocess.PIPE
627 if stdout_pipe:
628 options['stdout'] = subprocess.PIPE
629 if stderr_pipe:
630 options['stderr'] = subprocess.PIPE
631 if not options:
632 continue
633 p = subprocess.Popen((sys.executable, "-c", "pass"), **optio ns)
634 p.communicate()
635 if p.stdin is not None:
636 self.assertTrue(p.stdin.closed)
637 if p.stdout is not None:
638 self.assertTrue(p.stdout.closed)
639 if p.stderr is not None:
640 self.assertTrue(p.stderr.closed)
641
642 def test_communicate_returns(self):
643 # communicate() should return None if no redirection is active
644 p = subprocess.Popen([sys.executable, "-c",
645 "import sys; sys.exit(47)"])
646 (stdout, stderr) = p.communicate()
647 self.assertEqual(stdout, None)
648 self.assertEqual(stderr, None)
649
650 def test_communicate_pipe_buf(self):
651 # communicate() with writes larger than pipe_buf
652 # This test will probably deadlock rather than fail, if
653 # communicate() does not work properly.
654 x, y = os.pipe()
655 if mswindows:
656 pipe_buf = 512
657 else:
658 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
659 os.close(x)
660 os.close(y)
661 p = subprocess.Popen([sys.executable, "-c",
662 'import sys,os;'
663 'sys.stdout.write(sys.stdin.read(47));'
664 'sys.stderr.write("xyz"*%d);'
665 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
666 stdin=subprocess.PIPE,
667 stdout=subprocess.PIPE,
668 stderr=subprocess.PIPE)
669 string_to_write = "abc"*pipe_buf
670 (stdout, stderr) = p.communicate(string_to_write)
671 self.assertEqual(stdout, string_to_write)
672
673 def test_writes_before_communicate(self):
674 # stdin.write before communicate()
675 p = subprocess.Popen([sys.executable, "-c",
676 'import sys,os;'
677 'sys.stdout.write(sys.stdin.read())'],
678 stdin=subprocess.PIPE,
679 stdout=subprocess.PIPE,
680 stderr=subprocess.PIPE)
681 p.stdin.write("banana")
682 (stdout, stderr) = p.communicate("split")
683 self.assertEqual(stdout, "bananasplit")
684 self.assertStderrEqual(stderr, "")
685
686 def test_universal_newlines(self):
687 p = subprocess.Popen([sys.executable, "-c",
688 'import sys,os;' + SETBINARY +
689 'sys.stdout.write("line1\\n");'
690 'sys.stdout.flush();'
691 'sys.stdout.write("line2\\r");'
692 'sys.stdout.flush();'
693 'sys.stdout.write("line3\\r\\n");'
694 'sys.stdout.flush();'
695 'sys.stdout.write("line4\\r");'
696 'sys.stdout.flush();'
697 'sys.stdout.write("\\nline5");'
698 'sys.stdout.flush();'
699 'sys.stdout.write("\\nline6");'],
700 stdout=subprocess.PIPE,
701 universal_newlines=1)
702 stdout = p.stdout.read()
703 if hasattr(file, 'newlines'):
704 # Interpreter with universal newline support
705 self.assertEqual(stdout,
706 "line1\nline2\nline3\nline4\nline5\nline6")
707 else:
708 # Interpreter without universal newline support
709 self.assertEqual(stdout,
710 "line1\nline2\rline3\r\nline4\r\nline5\nline6")
711
712 def test_universal_newlines_communicate(self):
713 # universal newlines through communicate()
714 p = subprocess.Popen([sys.executable, "-c",
715 'import sys,os;' + SETBINARY +
716 'sys.stdout.write("line1\\n");'
717 'sys.stdout.flush();'
718 'sys.stdout.write("line2\\r");'
719 'sys.stdout.flush();'
720 'sys.stdout.write("line3\\r\\n");'
721 'sys.stdout.flush();'
722 'sys.stdout.write("line4\\r");'
723 'sys.stdout.flush();'
724 'sys.stdout.write("\\nline5");'
725 'sys.stdout.flush();'
726 'sys.stdout.write("\\nline6");'],
727 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
728 universal_newlines=1)
729 (stdout, stderr) = p.communicate()
730 if hasattr(file, 'newlines'):
731 # Interpreter with universal newline support
732 self.assertEqual(stdout,
733 "line1\nline2\nline3\nline4\nline5\nline6")
734 else:
735 # Interpreter without universal newline support
736 self.assertEqual(stdout,
737 "line1\nline2\rline3\r\nline4\r\nline5\nline6")
738
739 def test_universal_newlines_communicate_input_none(self):
740 # Test communicate(input=None) with universal newlines.
741 #
742 # We set stdout to PIPE because, as of this writing, a different
743 # code path is tested when the number of pipes is zero or one.
744 p = subprocess.Popen([sys.executable, "-c", "pass"],
745 stdin=subprocess.PIPE,
746 stdout=subprocess.PIPE,
747 universal_newlines=True)
748 p.communicate()
749 self.assertEqual(p.returncode, 0)
750
751 def test_no_leaking(self):
752 # Make sure we leak no resources
753 if not hasattr(test_support, "is_resource_enabled") \
754 or test_support.is_resource_enabled("subprocess") and not mswindo ws:
755 max_handles = 1026 # too much for most UNIX systems
756 else:
757 max_handles = 65
758 for i in range(max_handles):
759 p = subprocess.Popen([sys.executable, "-c",
760 "import sys;sys.stdout.write(sys.stdin.read())"],
761 stdin=subprocess.PIPE,
762 stdout=subprocess.PIPE,
763 stderr=subprocess.PIPE)
764 data = p.communicate("lime")[0]
765 self.assertEqual(data, "lime")
766
767 def test_universal_newlines_communicate_stdin_stdout_stderr(self):
768 # universal newlines through communicate(), with stdin, stdout, stderr
769 p = subprocess.Popen([sys.executable, "-c",
770 'import sys,os;' + SETBINARY + '''\nif True:
771 s = sys.stdin.readline()
772 sys.stdout.write(s)
773 sys.stdout.write("line2\\r")
774 sys.stderr.write("eline2\\n")
775 s = sys.stdin.read()
776 sys.stdout.write(s)
777 sys.stdout.write("line4\\n")
778 sys.stdout.write("line5\\r\\n")
779 sys.stderr.write("eline6\\r")
780 sys.stderr.write("eline7\\r\\nz")
781 '''],
782 stdin=subprocess.PIPE,
783 stderr=subprocess.PIPE,
784 stdout=subprocess.PIPE,
785 universal_newlines=True)
786 self.addCleanup(p.stdout.close)
787 self.addCleanup(p.stderr.close)
788 (stdout, stderr) = p.communicate(u"line1\nline3\n")
789 self.assertEqual(p.returncode, 0)
790 self.assertEqual(u"line1\nline2\nline3\nline4\nline5\n", stdout)
791 # Python debug build push something like "[42442 refs]\n"
792 # to stderr at exit of subprocess.
793 # Don't use assertStderrEqual because it strips CR and LF from output.
794 self.assertTrue(stderr.startswith(u"eline2\neline6\neline7\n"))
795
796 def test_list2cmdline(self):
797 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
798 '"a b c" d e')
799 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
800 'ab\\"c \\ d')
801 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
802 'ab\\"c " \\\\" d')
803 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
804 'a\\\\\\b "de fg" h')
805 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
806 'a\\\\\\"b c d')
807 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
808 '"a\\\\b c" d e')
809 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
810 '"a\\\\b\\ c" d e')
811 self.assertEqual(subprocess.list2cmdline(['ab', '']),
812 'ab ""')
813
814
815 def test_poll(self):
816 p = subprocess.Popen([sys.executable,
817 "-c", "import time; time.sleep(1)"])
818 count = 0
819 while p.poll() is None:
820 time.sleep(0.1)
821 count += 1
822 # We expect that the poll loop probably went around about 10 times,
823 # but, based on system scheduling we can't control, it's possible
824 # poll() never returned None. It "should be" very rare that it
825 # didn't go around at least twice.
826 self.assert_(count >= 2)
827 # Subsequent invocations should just return the returncode
828 self.assertEqual(p.poll(), 0)
829
830
831 def test_wait(self):
832 p = subprocess.Popen([sys.executable,
833 "-c", "import time; time.sleep(2)"])
834 self.assertEqual(p.wait(), 0)
835 # Subsequent invocations should just return the returncode
836 self.assertEqual(p.wait(), 0)
837
838
839 def test_wait_timeout(self):
840 p = subprocess.Popen([sys.executable,
841 "-c", "import time; time.sleep(0.1)"])
842 try:
843 p.wait(timeout=0.01)
844 except subprocess.TimeoutExpired, e:
845 self.assertIn("0.01", str(e)) # For coverage of __str__.
846 else:
847 self.fail("subprocess.TimeoutExpired expected but not raised.")
848 self.assertEqual(p.wait(timeout=2), 0)
849
850
851 def test_invalid_bufsize(self):
852 # an invalid type of the bufsize argument should raise
853 # TypeError.
854 try:
855 subprocess.Popen([sys.executable, "-c", "pass"], "orange")
856 except TypeError:
857 pass
858
859 def test_leaking_fds_on_error(self):
860 # see bug #5179: Popen leaks file descriptors to PIPEs if
861 # the child fails to execute; this will eventually exhaust
862 # the maximum number of open fds. 1024 seems a very common
863 # value for that limit, but Windows has 2048, so we loop
864 # 1024 times (each call leaked two fds).
865 for i in range(1024):
866 # Windows raises IOError. Others raise OSError.
867 try:
868 subprocess.Popen(['nonexisting_i_hope'],
869 stdout=subprocess.PIPE,
870 stderr=subprocess.PIPE)
871 except EnvironmentError, c:
872 if c.errno != 2: # ignore "no such file"
873 raise
874
875 #@unittest.skipIf(threading is None, "threading required")
876 def test_threadsafe_wait(self):
877 """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
878 proc = subprocess.Popen([sys.executable, '-c',
879 'import time; time.sleep(12)'])
880 self.assertEqual(proc.returncode, None)
881 results = []
882
883 def kill_proc_timer_thread():
884 results.append(('thread-start-poll-result', proc.poll()))
885 # terminate it from the thread and wait for the result.
886 proc.kill()
887 proc.wait()
888 results.append(('thread-after-kill-and-wait', proc.returncode))
889 # this wait should be a no-op given the above.
890 proc.wait()
891 results.append(('thread-after-second-wait', proc.returncode))
892
893 # This is a timing sensitive test, the failure mode is
894 # triggered when both the main thread and this thread are in
895 # the wait() call at once. The delay here is to allow the
896 # main thread to most likely be blocked in its wait() call.
897 t = threading.Timer(0.2, kill_proc_timer_thread)
898 t.start()
899
900 # Wait for the process to finish; the thread should kill it
901 # long before it finishes on its own. Supplying a timeout
902 # triggers a different code path for better coverage.
903 proc.wait(timeout=20)
904 # Should be -9 because of the proc.kill() from the thread.
905 self.assertEqual(proc.returncode, -9,
906 msg="unexpected result in wait from main thread")
907
908 # This should be a no-op with no change in returncode.
909 proc.wait()
910 self.assertEqual(proc.returncode, -9,
911 msg="unexpected result in second main wait.")
912
913 t.join()
914 # Ensure that all of the thread results are as expected.
915 # When a race condition occurs in wait(), the returncode could
916 # be set by the wrong thread that doesn't actually have it
917 # leading to an incorrect value.
918 self.assertEqual([('thread-start-poll-result', None),
919 ('thread-after-kill-and-wait', -9),
920 ('thread-after-second-wait', -9)],
921 results)
922
923 def test_issue8780(self):
924 # Ensure that stdout is inherited from the parent
925 # if stdout=PIPE is not used
926 code = ';'.join((
927 'import subprocess32, sys',
928 'retcode = subprocess32.call('
929 "[sys.executable, '-c', 'print(\"Hello World!\")'])",
930 'assert retcode == 0'))
931 output = subprocess.check_output([sys.executable, '-c', code])
932 self.assert_(output.startswith('Hello World!'), output)
933
934 def test_communicate_eintr(self):
935 # Issue #12493: communicate() should handle EINTR
936 def handler(signum, frame):
937 pass
938 old_handler = signal.signal(signal.SIGALRM, handler)
939 self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
940
941 # the process is running for 2 seconds
942 args = [sys.executable, "-c", 'import time; time.sleep(2)']
943 for stream in ('stdout', 'stderr'):
944 kw = {stream: subprocess.PIPE}
945 process = subprocess.Popen(args, **kw)
946 try:
947 signal.alarm(1)
948 # communicate() will be interrupted by SIGALRM
949 process.communicate()
950 finally:
951 process.__exit__(None, None, None)
952
953
954 # This test is Linux-ish specific for simplicity to at least have
955 # some coverage. It is not a platform specific bug.
956 #@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
957 # "Linux specific")
958 def test_failed_child_execute_fd_leak(self):
959 """Test for the fork() failure fd leak reported in issue16327."""
960 if not os.path.isdir('/proc/%d/fd' % os.getpid()):
961 self.skipTest("Linux specific")
962 fd_directory = '/proc/%d/fd' % os.getpid()
963 fds_before_popen = os.listdir(fd_directory)
964 try:
965 PopenExecuteChildRaises(
966 [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
967 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
968 except PopenTestException:
969 pass # Yay! Because 2.4 doesn't support with statements.
970 else:
971 self.fail("PopenTestException expected but not raised.")
972
973 # NOTE: This test doesn't verify that the real _execute_child
974 # does not close the file descriptors itself on the way out
975 # during an exception. Code inspection has confirmed that.
976
977 fds_after_exception = os.listdir(fd_directory)
978 self.assertEqual(fds_before_popen, fds_after_exception)
979
980
981 # context manager
982 class _SuppressCoreFiles(object):
983 """Try to prevent core files from being created."""
984 old_limit = None
985
986 def __enter__(self):
987 """Try to save previous ulimit, then set it to (0, 0)."""
988 try:
989 import resource
990 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
991 resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
992 except (ImportError, ValueError, resource.error):
993 pass
994
995 def __exit__(self, *args):
996 """Return core file behavior to default."""
997 if self.old_limit is None:
998 return
999 try:
1000 import resource
1001 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
1002 except (ImportError, ValueError, resource.error):
1003 pass
1004
1005
1006 #@unittest.skipIf(mswindows, "POSIX specific tests")
1007 class POSIXProcessTestCase(BaseTestCase):
1008
1009 def setUp(self):
1010 BaseTestCase.setUp(self)
1011 self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1012
1013 def _get_chdir_exception(self):
1014 try:
1015 os.chdir(self._nonexistent_dir)
1016 except OSError, e:
1017 # This avoids hard coding the errno value or the OS perror()
1018 # string and instead capture the exception that we want to see
1019 # below for comparison.
1020 desired_exception = e
1021 desired_exception.strerror += ': ' + repr(self._nonexistent_dir)
1022 else:
1023 self.fail("chdir to nonexistant directory %s succeeded." %
1024 self._nonexistent_dir)
1025 return desired_exception
1026
1027 def test_exception_cwd(self):
1028 """Test error in the child raised in the parent for a bad cwd."""
1029 desired_exception = self._get_chdir_exception()
1030 try:
1031 p = subprocess.Popen([sys.executable, "-c", ""],
1032 cwd=self._nonexistent_dir)
1033 except OSError, e:
1034 # Test that the child process chdir failure actually makes
1035 # it up to the parent process as the correct exception.
1036 self.assertEqual(desired_exception.errno, e.errno)
1037 self.assertEqual(desired_exception.strerror, e.strerror)
1038 else:
1039 self.fail("Expected OSError: %s" % desired_exception)
1040
1041 def test_exception_bad_executable(self):
1042 """Test error in the child raised in the parent for a bad executable."""
1043 desired_exception = self._get_chdir_exception()
1044 try:
1045 p = subprocess.Popen([sys.executable, "-c", ""],
1046 executable=self._nonexistent_dir)
1047 except OSError, e:
1048 # Test that the child process exec failure actually makes
1049 # it up to the parent process as the correct exception.
1050 self.assertEqual(desired_exception.errno, e.errno)
1051 self.assertEqual(desired_exception.strerror, e.strerror)
1052 else:
1053 self.fail("Expected OSError: %s" % desired_exception)
1054
1055 def test_exception_bad_args_0(self):
1056 """Test error in the child raised in the parent for a bad args[0]."""
1057 desired_exception = self._get_chdir_exception()
1058 try:
1059 p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1060 except OSError, e:
1061 # Test that the child process exec failure actually makes
1062 # it up to the parent process as the correct exception.
1063 self.assertEqual(desired_exception.errno, e.errno)
1064 self.assertEqual(desired_exception.strerror, e.strerror)
1065 else:
1066 self.fail("Expected OSError: %s" % desired_exception)
1067
1068 def test_restore_signals(self):
1069 # Code coverage for both values of restore_signals to make sure it
1070 # at least does not blow up.
1071 # A test for behavior would be complex. Contributions welcome.
1072 subprocess.call([sys.executable, "-c", ""], restore_signals=True)
1073 subprocess.call([sys.executable, "-c", ""], restore_signals=False)
1074
1075 def test_start_new_session(self):
1076 # For code coverage of calling setsid(). We don't care if we get an
1077 # EPERM error from it depending on the test execution environment, that
1078 # still indicates that it was called.
1079 try:
1080 output = subprocess.check_output(
1081 [sys.executable, "-c",
1082 "import os; print(os.getpgid(os.getpid()))"],
1083 start_new_session=True)
1084 except OSError, e:
1085 if e.errno != errno.EPERM:
1086 raise
1087 else:
1088 parent_pgid = os.getpgid(os.getpid())
1089 child_pgid = int(output)
1090 self.assertNotEqual(parent_pgid, child_pgid)
1091
1092 def test_run_abort(self):
1093 # returncode handles signal termination
1094 scf = _SuppressCoreFiles()
1095 scf.__enter__()
1096 try:
1097 p = subprocess.Popen([sys.executable, "-c",
1098 "import os; os.abort()"])
1099 p.wait()
1100 finally:
1101 scf.__exit__()
1102 self.assertEqual(-p.returncode, signal.SIGABRT)
1103
1104 def test_preexec(self):
1105 # DISCLAIMER: Setting environment variables is *not* a good use
1106 # of a preexec_fn. This is merely a test.
1107 p = subprocess.Popen([sys.executable, "-c",
1108 "import sys, os;"
1109 "sys.stdout.write(os.getenv('FRUIT'))"],
1110 stdout=subprocess.PIPE,
1111 preexec_fn=lambda: os.putenv("FRUIT", "apple"))
1112 self.assertEqual(p.stdout.read(), "apple")
1113
1114 def test_preexec_exception(self):
1115 def raise_it():
1116 raise ValueError("What if two swallows carried a coconut?")
1117 try:
1118 p = subprocess.Popen([sys.executable, "-c", ""],
1119 preexec_fn=raise_it)
1120 except RuntimeError, e:
1121 self.assertTrue(
1122 subprocess._posixsubprocess,
1123 "Expected a ValueError from the preexec_fn")
1124 except ValueError, e:
1125 self.assertIn("coconut", e.args[0])
1126 else:
1127 self.fail("Exception raised by preexec_fn did not make it "
1128 "to the parent process.")
1129
1130 class _TestExecuteChildPopen(subprocess.Popen):
1131 """Used to test behavior at the end of _execute_child."""
1132 def __init__(self, testcase, *args, **kwargs):
1133 self._testcase = testcase
1134 subprocess.Popen.__init__(self, *args, **kwargs)
1135
1136 def _execute_child(self, *args, **kwargs):
1137 try:
1138 subprocess.Popen._execute_child(self, *args, **kwargs)
1139 finally:
1140 # Open a bunch of file descriptors and verify that
1141 # none of them are the same as the ones the Popen
1142 # instance is using for stdin/stdout/stderr.
1143 devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
1144 for _ in range(8)]
1145 try:
1146 for fd in devzero_fds:
1147 self._testcase.assertNotIn(
1148 fd, (self.stdin.fileno(), self.stdout.fileno(),
1149 self.stderr.fileno()),
1150 msg="At least one fd was closed early.")
1151 finally:
1152 map(os.close, devzero_fds)
1153
1154 #@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
1155 def test_preexec_errpipe_does_not_double_close_pipes(self):
1156 """Issue16140: Don't double close pipes on preexec error."""
1157
1158 def raise_it():
1159 raise RuntimeError("force the _execute_child() errpipe_data path.")
1160
1161 try:
1162 self._TestExecuteChildPopen(
1163 self, [sys.executable, "-c", "pass"],
1164 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1165 stderr=subprocess.PIPE, preexec_fn=raise_it)
1166 except RuntimeError:
1167 pass # Yay! Because 2.4 doesn't support with statements.
1168 else:
1169 self.fail("RuntimeError expected but not raised.")
1170
1171 #@unittest.skipUnless(gc, "Requires a gc module.")
1172 def test_preexec_gc_module_failure(self):
1173 # This tests the code that disables garbage collection if the child
1174 # process will execute any Python.
1175 def raise_runtime_error():
1176 raise RuntimeError("this shouldn't escape")
1177 enabled = gc.isenabled()
1178 orig_gc_disable = gc.disable
1179 orig_gc_isenabled = gc.isenabled
1180 try:
1181 gc.disable()
1182 self.assertFalse(gc.isenabled())
1183 subprocess.call([sys.executable, '-c', ''],
1184 preexec_fn=lambda: None)
1185 self.assertFalse(gc.isenabled(),
1186 "Popen enabled gc when it shouldn't.")
1187
1188 gc.enable()
1189 self.assertTrue(gc.isenabled())
1190 subprocess.call([sys.executable, '-c', ''],
1191 preexec_fn=lambda: None)
1192 self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
1193
1194 gc.disable = raise_runtime_error
1195 self.assertRaises(RuntimeError, subprocess.Popen,
1196 [sys.executable, '-c', ''],
1197 preexec_fn=lambda: None)
1198
1199 del gc.isenabled # force an AttributeError
1200 self.assertRaises(AttributeError, subprocess.Popen,
1201 [sys.executable, '-c', ''],
1202 preexec_fn=lambda: None)
1203 finally:
1204 gc.disable = orig_gc_disable
1205 gc.isenabled = orig_gc_isenabled
1206 if not enabled:
1207 gc.disable()
1208
1209 def test_args_string(self):
1210 # args is a string
1211 f, fname = mkstemp()
1212 os.write(f, "#!/bin/sh\n")
1213 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
1214 sys.executable)
1215 os.close(f)
1216 os.chmod(fname, 0700)
1217 p = subprocess.Popen(fname)
1218 p.wait()
1219 os.remove(fname)
1220 self.assertEqual(p.returncode, 47)
1221
1222 def test_invalid_args(self):
1223 # invalid arguments should raise ValueError
1224 self.assertRaises(ValueError, subprocess.call,
1225 [sys.executable, "-c",
1226 "import sys; sys.exit(47)"],
1227 startupinfo=47)
1228 self.assertRaises(ValueError, subprocess.call,
1229 [sys.executable, "-c",
1230 "import sys; sys.exit(47)"],
1231 creationflags=47)
1232
1233 def test_shell_sequence(self):
1234 # Run command through the shell (sequence)
1235 newenv = os.environ.copy()
1236 newenv["FRUIT"] = "apple"
1237 p = subprocess.Popen(["echo $FRUIT"], shell=1,
1238 stdout=subprocess.PIPE,
1239 env=newenv)
1240 self.assertEqual(p.stdout.read().strip(), "apple")
1241
1242 def test_shell_string(self):
1243 # Run command through the shell (string)
1244 newenv = os.environ.copy()
1245 newenv["FRUIT"] = "apple"
1246 p = subprocess.Popen("echo $FRUIT", shell=1,
1247 stdout=subprocess.PIPE,
1248 env=newenv)
1249 self.assertEqual(p.stdout.read().strip(), "apple")
1250
1251 def test_call_string(self):
1252 # call() function with string argument on UNIX
1253 f, fname = mkstemp()
1254 os.write(f, "#!/bin/sh\n")
1255 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
1256 sys.executable)
1257 os.close(f)
1258 os.chmod(fname, 0700)
1259 rc = subprocess.call(fname)
1260 os.remove(fname)
1261 self.assertEqual(rc, 47)
1262
1263 def test_specific_shell(self):
1264 # Issue #9265: Incorrect name passed as arg[0].
1265 shells = []
1266 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
1267 for name in ['bash', 'ksh']:
1268 sh = os.path.join(prefix, name)
1269 if os.path.isfile(sh):
1270 shells.append(sh)
1271 if not shells: # Will probably work for any shell but csh.
1272 self.skipTest("bash or ksh required for this test")
1273 sh = '/bin/sh'
1274 if os.path.isfile(sh) and not os.path.islink(sh):
1275 # Test will fail if /bin/sh is a symlink to csh.
1276 shells.append(sh)
1277 for sh in shells:
1278 p = subprocess.Popen("echo $0", executable=sh, shell=True,
1279 stdout=subprocess.PIPE)
1280 self.assertEqual(p.stdout.read().strip(), sh)
1281
1282 def _kill_process(self, method, *args):
1283 # Do not inherit file handles from the parent.
1284 # It should fix failures on some platforms.
1285 p = subprocess.Popen([sys.executable, "-c", """if 1:
1286 import sys, time
1287 sys.stdout.write('x\\n')
1288 sys.stdout.flush()
1289 time.sleep(30)
1290 """],
1291 close_fds=True,
1292 stdin=subprocess.PIPE,
1293 stdout=subprocess.PIPE,
1294 stderr=subprocess.PIPE)
1295 # Wait for the interpreter to be completely initialized before
1296 # sending any signal.
1297 p.stdout.read(1)
1298 getattr(p, method)(*args)
1299 return p
1300
1301 def test_send_signal(self):
1302 p = self._kill_process('send_signal', signal.SIGINT)
1303 _, stderr = p.communicate()
1304 self.assertIn('KeyboardInterrupt', stderr)
1305 self.assertNotEqual(p.wait(), 0)
1306
1307 def test_kill(self):
1308 p = self._kill_process('kill')
1309 _, stderr = p.communicate()
1310 self.assertStderrEqual(stderr, '')
1311 self.assertEqual(p.wait(), -signal.SIGKILL)
1312
1313 def test_terminate(self):
1314 p = self._kill_process('terminate')
1315 _, stderr = p.communicate()
1316 self.assertStderrEqual(stderr, '')
1317 self.assertEqual(p.wait(), -signal.SIGTERM)
1318
1319 def check_close_std_fds(self, fds):
1320 # Issue #9905: test that subprocess pipes still work properly with
1321 # some standard fds closed
1322 stdin = 0
1323 newfds = []
1324 for a in fds:
1325 b = os.dup(a)
1326 newfds.append(b)
1327 if a == 0:
1328 stdin = b
1329 try:
1330 for fd in fds:
1331 os.close(fd)
1332 out, err = subprocess.Popen([sys.executable, "-c",
1333 'import sys;'
1334 'sys.stdout.write("apple");'
1335 'sys.stdout.flush();'
1336 'sys.stderr.write("orange")'],
1337 stdin=stdin,
1338 stdout=subprocess.PIPE,
1339 stderr=subprocess.PIPE).communicate()
1340 err = strip_python_stderr(err)
1341 self.assertEqual((out, err), ('apple', 'orange'))
1342 finally:
1343 for b, a in zip(newfds, fds):
1344 os.dup2(b, a)
1345 for b in newfds:
1346 os.close(b)
1347
1348 def test_close_fd_0(self):
1349 self.check_close_std_fds([0])
1350
1351 def test_close_fd_1(self):
1352 self.check_close_std_fds([1])
1353
1354 def test_close_fd_2(self):
1355 self.check_close_std_fds([2])
1356
1357 def test_close_fds_0_1(self):
1358 self.check_close_std_fds([0, 1])
1359
1360 def test_close_fds_0_2(self):
1361 self.check_close_std_fds([0, 2])
1362
1363 def test_close_fds_1_2(self):
1364 self.check_close_std_fds([1, 2])
1365
1366 def test_close_fds_0_1_2(self):
1367 # Issue #10806: test that subprocess pipes still work properly with
1368 # all standard fds closed.
1369 self.check_close_std_fds([0, 1, 2])
1370
1371 def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
1372 # open up some temporary files
1373 temps = [mkstemp() for i in range(3)]
1374 temp_fds = [fd for fd, fname in temps]
1375 try:
1376 # unlink the files -- we won't need to reopen them
1377 for fd, fname in temps:
1378 os.unlink(fname)
1379
1380 # save a copy of the standard file descriptors
1381 saved_fds = [os.dup(fd) for fd in range(3)]
1382 try:
1383 # duplicate the temp files over the standard fd's 0, 1, 2
1384 for fd, temp_fd in enumerate(temp_fds):
1385 os.dup2(temp_fd, fd)
1386
1387 # write some data to what will become stdin, and rewind
1388 os.write(stdin_no, "STDIN")
1389 os.lseek(stdin_no, 0, 0)
1390
1391 # now use those files in the given order, so that subprocess
1392 # has to rearrange them in the child
1393 p = subprocess.Popen([sys.executable, "-c",
1394 'import sys; got = sys.stdin.read();'
1395 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
1396 stdin=stdin_no,
1397 stdout=stdout_no,
1398 stderr=stderr_no)
1399 p.wait()
1400
1401 for fd in temp_fds:
1402 os.lseek(fd, 0, 0)
1403
1404 out = os.read(stdout_no, 1024)
1405 err = os.read(stderr_no, 1024)
1406 finally:
1407 for std, saved in enumerate(saved_fds):
1408 os.dup2(saved, std)
1409 os.close(saved)
1410
1411 self.assertEqual(out, "got STDIN")
1412 self.assertStderrEqual(err, "err")
1413
1414 finally:
1415 for fd in temp_fds:
1416 os.close(fd)
1417
1418 # When duping fds, if there arises a situation where one of the fds is
1419 # either 0, 1 or 2, it is possible that it is overwritten (#12607).
1420 # This tests all combinations of this.
1421 def test_swap_fds(self):
1422 self.check_swap_fds(0, 1, 2)
1423 self.check_swap_fds(0, 2, 1)
1424 self.check_swap_fds(1, 0, 2)
1425 self.check_swap_fds(1, 2, 0)
1426 self.check_swap_fds(2, 0, 1)
1427 self.check_swap_fds(2, 1, 0)
1428
1429 def test_small_errpipe_write_fd(self):
1430 """Issue #15798: Popen should work when stdio fds are available."""
1431 new_stdin = os.dup(0)
1432 new_stdout = os.dup(1)
1433 try:
1434 os.close(0)
1435 os.close(1)
1436
1437 subprocess.Popen([
1438 sys.executable, "-c", "pass"]).wait()
1439 finally:
1440 # Restore original stdin and stdout
1441 os.dup2(new_stdin, 0)
1442 os.dup2(new_stdout, 1)
1443 os.close(new_stdin)
1444 os.close(new_stdout)
1445
1446 def test_remapping_std_fds(self):
1447 # open up some temporary files
1448 temps = [mkstemp() for i in range(3)]
1449 try:
1450 temp_fds = [fd for fd, fname in temps]
1451
1452 # unlink the files -- we won't need to reopen them
1453 for fd, fname in temps:
1454 os.unlink(fname)
1455
1456 # write some data to what will become stdin, and rewind
1457 os.write(temp_fds[1], "STDIN")
1458 os.lseek(temp_fds[1], 0, 0)
1459
1460 # move the standard file descriptors out of the way
1461 saved_fds = [os.dup(fd) for fd in range(3)]
1462 try:
1463 # duplicate the file objects over the standard fd's
1464 for fd, temp_fd in enumerate(temp_fds):
1465 os.dup2(temp_fd, fd)
1466
1467 # now use those files in the "wrong" order, so that subprocess
1468 # has to rearrange them in the child
1469 p = subprocess.Popen([sys.executable, "-c",
1470 'import sys; got = sys.stdin.read();'
1471 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
1472 stdin=temp_fds[1],
1473 stdout=temp_fds[2],
1474 stderr=temp_fds[0])
1475 p.wait()
1476 finally:
1477 # restore the original fd's underneath sys.stdin, etc.
1478 for std, saved in enumerate(saved_fds):
1479 os.dup2(saved, std)
1480 os.close(saved)
1481
1482 for fd in temp_fds:
1483 os.lseek(fd, 0, 0)
1484
1485 out = os.read(temp_fds[2], 1024)
1486 err = os.read(temp_fds[0], 1024)
1487 self.assertEqual(out, "got STDIN")
1488 self.assertStderrEqual(err, "err")
1489
1490 finally:
1491 for fd in temp_fds:
1492 os.close(fd)
1493
1494 # NOTE: test_surrogates_error_message makes no sense on python 2.x. omitted.
1495 # NOTE: test_undecodable_env makes no sense on python 2.x. omitted.
1496 # NOTE: test_bytes_program makes no sense on python 2.x. omitted.
1497
1498 def test_fs_encode_unicode_error(self):
1499 fs_encoding = sys.getfilesystemencoding()
1500 if fs_encoding.upper() not in ("ANSI_X3.4-1968", "ASCII"):
1501 self.skipTest(
1502 "Requires a restictive sys.filesystemencoding(), "
1503 "not %s. Run python with LANG=C" % fs_encoding)
1504 highbit_executable_name = os.path.join(
1505 test_support.findfile("testdata"), u"Does\\Not\uDCff\\Exist")
1506 try:
1507 subprocess.call([highbit_executable_name])
1508 except UnicodeEncodeError:
1509 return
1510 except RuntimeError, e:
1511 # The ProcessTestCasePOSIXPurePython version ends up here. It
1512 # can't re-construct the unicode error from the child because it
1513 # doesn't have all the arguments. BFD. One doesn't use
1514 # subprocess32 for the old pure python implementation...
1515 if "UnicodeEncodeError" not in str(e):
1516 self.fail("Expected a RuntimeError whining about how a "
1517 "UnicodeEncodeError from the child could not "
1518 "be reraised. Not: %s" % e)
1519 return
1520 self.fail("Expected a UnicodeEncodeError to be raised.")
1521
1522 def test_pipe_cloexec(self):
1523 sleeper = test_support.findfile("testdata/input_reader.py")
1524 fd_status = test_support.findfile("testdata/fd_status.py")
1525
1526 p1 = subprocess.Popen([sys.executable, sleeper],
1527 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1528 stderr=subprocess.PIPE, close_fds=False)
1529
1530 self.addCleanup(p1.communicate, '')
1531
1532 p2 = subprocess.Popen([sys.executable, fd_status],
1533 stdout=subprocess.PIPE, close_fds=False)
1534
1535 output, error = p2.communicate()
1536 result_fds = set(map(int, output.split(',')))
1537 unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
1538 p1.stderr.fileno()])
1539
1540 self.assertFalse(result_fds & unwanted_fds,
1541 "Expected no fds from %r to be open in child, "
1542 "found %r" %
1543 (unwanted_fds, result_fds & unwanted_fds))
1544
1545 def test_pipe_cloexec_real_tools(self):
1546 qcat = test_support.findfile("testdata/qcat.py")
1547 qgrep = test_support.findfile("testdata/qgrep.py")
1548
1549 subdata = 'zxcvbn'
1550 data = subdata * 4 + '\n'
1551
1552 p1 = subprocess.Popen([sys.executable, qcat],
1553 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1554 close_fds=False)
1555
1556 p2 = subprocess.Popen([sys.executable, qgrep, subdata],
1557 stdin=p1.stdout, stdout=subprocess.PIPE,
1558 close_fds=False)
1559
1560 self.addCleanup(p1.wait)
1561 self.addCleanup(p2.wait)
1562 def kill_p1():
1563 try:
1564 p1.terminate()
1565 except ProcessLookupError:
1566 pass
1567 def kill_p2():
1568 try:
1569 p2.terminate()
1570 except ProcessLookupError:
1571 pass
1572 self.addCleanup(kill_p1)
1573 self.addCleanup(kill_p2)
1574
1575 p1.stdin.write(data)
1576 p1.stdin.close()
1577
1578 readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
1579
1580 self.assertTrue(readfiles, "The child hung")
1581 self.assertEqual(p2.stdout.read(), data)
1582
1583 p1.stdout.close()
1584 p2.stdout.close()
1585
1586 def test_close_fds(self):
1587 fd_status = test_support.findfile("testdata/fd_status.py")
1588
1589 fds = os.pipe()
1590 self.addCleanup(os.close, fds[0])
1591 self.addCleanup(os.close, fds[1])
1592
1593 open_fds = set(fds)
1594 # add a bunch more fds
1595 for _ in range(9):
1596 fd = os.open("/dev/null", os.O_RDONLY)
1597 self.addCleanup(os.close, fd)
1598 open_fds.add(fd)
1599
1600 p = subprocess.Popen([sys.executable, fd_status],
1601 stdout=subprocess.PIPE, close_fds=False)
1602 output, ignored = p.communicate()
1603 remaining_fds = set(map(int, output.split(',')))
1604
1605 self.assertEqual(remaining_fds & open_fds, open_fds,
1606 "Some fds were closed")
1607
1608 p = subprocess.Popen([sys.executable, fd_status],
1609 stdout=subprocess.PIPE, close_fds=True)
1610 output, ignored = p.communicate()
1611 remaining_fds = set(map(int, output.split(',')))
1612
1613 self.assertFalse(remaining_fds & open_fds,
1614 "Some fds were left open")
1615 self.assertIn(1, remaining_fds, "Subprocess failed")
1616
1617 # Keep some of the fd's we opened open in the subprocess.
1618 # This tests _posixsubprocess.c's proper handling of fds_to_keep.
1619 fds_to_keep = set(open_fds.pop() for _ in range(8))
1620 p = subprocess.Popen([sys.executable, fd_status],
1621 stdout=subprocess.PIPE, close_fds=True,
1622 pass_fds=())
1623 output, ignored = p.communicate()
1624 remaining_fds = set(map(int, output.split(',')))
1625
1626 self.assertFalse(remaining_fds & fds_to_keep & open_fds,
1627 "Some fds not in pass_fds were left open")
1628 self.assertIn(1, remaining_fds, "Subprocess failed")
1629
1630 def test_pass_fds(self):
1631 fd_status = test_support.findfile("testdata/fd_status.py")
1632
1633 open_fds = set()
1634
1635 for x in range(5):
1636 fds = os.pipe()
1637 self.addCleanup(os.close, fds[0])
1638 self.addCleanup(os.close, fds[1])
1639 open_fds.update(fds)
1640
1641 for fd in open_fds:
1642 p = subprocess.Popen([sys.executable, fd_status],
1643 stdout=subprocess.PIPE, close_fds=True,
1644 pass_fds=(fd, ))
1645 output, ignored = p.communicate()
1646
1647 remaining_fds = set(map(int, output.split(',')))
1648 to_be_closed = open_fds - set((fd,))
1649
1650 self.assertIn(fd, remaining_fds, "fd to be passed not passed")
1651 self.assertFalse(remaining_fds & to_be_closed,
1652 "fd to be closed passed")
1653
1654 # Syntax requires Python 2.5, assertWarns requires Python 2.7.
1655 #with self.assertWarns(RuntimeWarning) as context:
1656 # self.assertFalse(subprocess.call(
1657 # [sys.executable, "-c", "import sys; sys.exit(0)"],
1658 # close_fds=False, pass_fds=(fd, )))
1659 #self.assertIn('overriding close_fds', str(context.warning))
1660
1661 def test_stdout_stdin_are_single_inout_fd(self):
1662 inout = open(os.devnull, "r+")
1663 try:
1664 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0) "],
1665 stdout=inout, stdin=inout)
1666 p.wait()
1667 finally:
1668 inout.close()
1669
1670 def test_stdout_stderr_are_single_inout_fd(self):
1671 inout = open(os.devnull, "r+")
1672 try:
1673 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0) "],
1674 stdout=inout, stderr=inout)
1675 p.wait()
1676 finally:
1677 inout.close()
1678
1679 def test_stderr_stdin_are_single_inout_fd(self):
1680 inout = open(os.devnull, "r+")
1681 try:
1682 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0) "],
1683 stderr=inout, stdin=inout)
1684 p.wait()
1685 finally:
1686 inout.close()
1687
1688 def test_wait_when_sigchild_ignored(self):
1689 # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
1690 sigchild_ignore = test_support.findfile("testdata/sigchild_ignore.py")
1691 p = subprocess.Popen([sys.executable, sigchild_ignore],
1692 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1693 stdout, stderr = p.communicate()
1694 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
1695 " non-zero with this error:\n%s" % stderr)
1696
1697 def test_select_unbuffered(self):
1698 # Issue #11459: bufsize=0 should really set the pipes as
1699 # unbuffered (and therefore let select() work properly).
1700 p = subprocess.Popen([sys.executable, "-c",
1701 'import sys;'
1702 'sys.stdout.write("apple")'],
1703 stdout=subprocess.PIPE,
1704 bufsize=0)
1705 f = p.stdout
1706 self.addCleanup(f.close)
1707 try:
1708 self.assertEqual(f.read(4), "appl")
1709 self.assertIn(f, select.select([f], [], [], 0.0)[0])
1710 finally:
1711 p.wait()
1712
1713 def test_zombie_fast_process_del(self):
1714 # Issue #12650: on Unix, if Popen.__del__() was called before the
1715 # process exited, it wouldn't be added to subprocess._active, and would
1716 # remain a zombie.
1717 # spawn a Popen, and delete its reference before it exits
1718 p = subprocess.Popen([sys.executable, "-c",
1719 'import sys, time;'
1720 'time.sleep(0.2)'],
1721 stdout=subprocess.PIPE,
1722 stderr=subprocess.PIPE)
1723 self.addCleanup(p.stdout.close)
1724 self.addCleanup(p.stderr.close)
1725 ident = id(p)
1726 pid = p.pid
1727 del p
1728 # check that p is in the active processes list
1729 self.assertIn(ident, [id(o) for o in subprocess._active])
1730
1731 def test_leak_fast_process_del_killed(self):
1732 # Issue #12650: on Unix, if Popen.__del__() was called before the
1733 # process exited, and the process got killed by a signal, it would never
1734 # be removed from subprocess._active, which triggered a FD and memory
1735 # leak.
1736 # spawn a Popen, delete its reference and kill it
1737 p = subprocess.Popen([sys.executable, "-c",
1738 'import time;'
1739 'time.sleep(3)'],
1740 stdout=subprocess.PIPE,
1741 stderr=subprocess.PIPE)
1742 self.addCleanup(p.stdout.close)
1743 self.addCleanup(p.stderr.close)
1744 ident = id(p)
1745 pid = p.pid
1746 del p
1747 os.kill(pid, signal.SIGKILL)
1748 # check that p is in the active processes list
1749 self.assertIn(ident, [id(o) for o in subprocess._active])
1750
1751 # let some time for the process to exit, and create a new Popen: this
1752 # should trigger the wait() of p
1753 time.sleep(0.2)
1754 try:
1755 proc = subprocess.Popen(['nonexisting_i_hope'],
1756 stdout=subprocess.PIPE,
1757 stderr=subprocess.PIPE)
1758 proc.__exit__(None, None, None)
1759 except EnvironmentError:
1760 pass
1761 else:
1762 self.fail("EnvironmentError not raised.")
1763 # p should have been wait()ed on, and removed from the _active list
1764 self.assertRaises(OSError, os.waitpid, pid, 0)
1765 self.assertNotIn(ident, [id(o) for o in subprocess._active])
1766
1767 def test_close_fds_after_preexec(self):
1768 fd_status = test_support.findfile("testdata/fd_status.py")
1769
1770 # this FD is used as dup2() target by preexec_fn, and should be closed
1771 # in the child process
1772 fd = os.dup(1)
1773 self.addCleanup(os.close, fd)
1774
1775 p = subprocess.Popen([sys.executable, fd_status],
1776 stdout=subprocess.PIPE, close_fds=True,
1777 preexec_fn=lambda: os.dup2(1, fd))
1778 output, ignored = p.communicate()
1779
1780 remaining_fds = set(map(int, output.split(',')))
1781
1782 self.assertNotIn(fd, remaining_fds)
1783
1784
1785 if mswindows:
1786 class POSIXProcessTestCase(unittest.TestCase): pass
1787
1788
1789 #@unittest.skipUnless(mswindows, "Windows specific tests")
1790 class Win32ProcessTestCase(BaseTestCase):
1791
1792 def test_startupinfo(self):
1793 # startupinfo argument
1794 # We uses hardcoded constants, because we do not want to
1795 # depend on win32all.
1796 STARTF_USESHOWWINDOW = 1
1797 SW_MAXIMIZE = 3
1798 startupinfo = subprocess.STARTUPINFO()
1799 startupinfo.dwFlags = STARTF_USESHOWWINDOW
1800 startupinfo.wShowWindow = SW_MAXIMIZE
1801 # Since Python is a console process, it won't be affected
1802 # by wShowWindow, but the argument should be silently
1803 # ignored
1804 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
1805 startupinfo=startupinfo)
1806
1807 def test_creationflags(self):
1808 # creationflags argument
1809 CREATE_NEW_CONSOLE = 16
1810 sys.stderr.write(" a DOS box should flash briefly ...\n")
1811 subprocess.call(sys.executable +
1812 ' -c "import time; time.sleep(0.25)"',
1813 creationflags=CREATE_NEW_CONSOLE)
1814
1815 def test_invalid_args(self):
1816 # invalid arguments should raise ValueError
1817 self.assertRaises(ValueError, subprocess.call,
1818 [sys.executable, "-c",
1819 "import sys; sys.exit(47)"],
1820 preexec_fn=lambda: 1)
1821 self.assertRaises(ValueError, subprocess.call,
1822 [sys.executable, "-c",
1823 "import sys; sys.exit(47)"],
1824 stdout=subprocess.PIPE,
1825 close_fds=True)
1826
1827 def test_close_fds(self):
1828 # close file descriptors
1829 rc = subprocess.call([sys.executable, "-c",
1830 "import sys; sys.exit(47)"],
1831 close_fds=True)
1832 self.assertEqual(rc, 47)
1833
1834 def test_shell_sequence(self):
1835 # Run command through the shell (sequence)
1836 newenv = os.environ.copy()
1837 newenv["FRUIT"] = "physalis"
1838 p = subprocess.Popen(["set"], shell=1,
1839 stdout=subprocess.PIPE,
1840 env=newenv)
1841 self.assertIn("physalis", p.stdout.read())
1842
1843 def test_shell_string(self):
1844 # Run command through the shell (string)
1845 newenv = os.environ.copy()
1846 newenv["FRUIT"] = "physalis"
1847 p = subprocess.Popen("set", shell=1,
1848 stdout=subprocess.PIPE,
1849 env=newenv)
1850 self.assertIn("physalis", p.stdout.read())
1851
1852 def test_call_string(self):
1853 # call() function with string argument on Windows
1854 rc = subprocess.call(sys.executable +
1855 ' -c "import sys; sys.exit(47)"')
1856 self.assertEqual(rc, 47)
1857
1858 def _kill_process(self, method, *args):
1859 # Some win32 buildbot raises EOFError if stdin is inherited
1860 p = subprocess.Popen([sys.executable, "-c", "input()"],
1861 stdin=subprocess.PIPE, stderr=subprocess.PIPE)
1862
1863 # Let the process initialize (Issue #3137)
1864 time.sleep(0.1)
1865 # The process should not terminate prematurely
1866 self.assert_(p.poll() is None)
1867 # Retry if the process do not receive the signal.
1868 count, maxcount = 0, 3
1869 while count < maxcount and p.poll() is None:
1870 getattr(p, method)(*args)
1871 time.sleep(0.1)
1872 count += 1
1873
1874 returncode = p.poll()
1875 self.assert_(returncode is not None, "the subprocess did not terminate")
1876 if count > 1:
1877 print >>sys.stderr, ("p.{}{} succeeded after "
1878 "{} attempts".format(method, args, count))
1879 _, stderr = p.communicate()
1880 self.assertStderrEqual(stderr, '')
1881 self.assertEqual(p.wait(), returncode)
1882 self.assertNotEqual(returncode, 0)
1883
1884 def test_send_signal(self):
1885 self._kill_process('send_signal', signal.SIGTERM)
1886
1887 def test_kill(self):
1888 self._kill_process('kill')
1889
1890 def test_terminate(self):
1891 self._kill_process('terminate')
1892
1893
1894 if not mswindows:
1895 class Win32ProcessTestCase(unittest.TestCase): pass
1896
1897
1898 #@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
1899 # "poll system call not supported")
1900 class ProcessTestCaseNoPoll(ProcessTestCase):
1901 def setUp(self):
1902 subprocess._has_poll = False
1903 ProcessTestCase.setUp(self)
1904
1905 def tearDown(self):
1906 subprocess._has_poll = True
1907 ProcessTestCase.tearDown(self)
1908
1909
1910 if not getattr(subprocess, '_has_poll', False):
1911 class ProcessTestCaseNoPoll(unittest.TestCase): pass
1912
1913
1914 #@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False),
1915 # "_posixsubprocess extension module not found.")
1916 class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase):
1917 def setUp(self):
1918 subprocess._posixsubprocess = None
1919 ProcessTestCase.setUp(self)
1920 POSIXProcessTestCase.setUp(self)
1921
1922 def tearDown(self):
1923 subprocess._posixsubprocess = sys.modules['_posixsubprocess']
1924 POSIXProcessTestCase.tearDown(self)
1925 ProcessTestCase.tearDown(self)
1926
1927
1928 if not getattr(subprocess, '_posixsubprocess', False):
1929 print >>sys.stderr, "_posixsubprocess extension module not found."
1930 class ProcessTestCasePOSIXPurePython(unittest.TestCase): pass
1931
1932
1933 class HelperFunctionTests(unittest.TestCase):
1934 #@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
1935 def test_eintr_retry_call(self):
1936 record_calls = []
1937 def fake_os_func(*args):
1938 record_calls.append(args)
1939 if len(record_calls) == 2:
1940 raise OSError(errno.EINTR, "fake interrupted system call")
1941 return tuple(reversed(args))
1942
1943 self.assertEqual((999, 256),
1944 subprocess._eintr_retry_call(fake_os_func, 256, 999))
1945 self.assertEqual([(256, 999)], record_calls)
1946 # This time there will be an EINTR so it will loop once.
1947 self.assertEqual((666,),
1948 subprocess._eintr_retry_call(fake_os_func, 666))
1949 self.assertEqual([(256, 999), (666,), (666,)], record_calls)
1950
1951 if mswindows:
1952 del test_eintr_retry_call
1953
1954 if not hasattr(unittest.TestCase, 'assertSequenceEqual'):
1955 def assertSequenceEqual(self, seq1, seq2):
1956 self.assertEqual(list(seq1), list(seq2))
1957
1958 def test_get_exec_path(self):
1959 defpath_list = os.defpath.split(os.pathsep)
1960 test_path = ['/monty', '/python', '', '/flying/circus']
1961 test_env = {'PATH': os.pathsep.join(test_path)}
1962
1963 get_exec_path = subprocess._get_exec_path
1964 saved_environ = os.environ
1965 try:
1966 os.environ = dict(test_env)
1967 # Test that defaulting to os.environ works.
1968 self.assertSequenceEqual(test_path, get_exec_path())
1969 self.assertSequenceEqual(test_path, get_exec_path(env=None))
1970 finally:
1971 os.environ = saved_environ
1972
1973 # No PATH environment variable
1974 self.assertSequenceEqual(defpath_list, get_exec_path({}))
1975 # Empty PATH environment variable
1976 self.assertSequenceEqual(('',), get_exec_path({'PATH':''}))
1977 # Supplied PATH environment variable
1978 self.assertSequenceEqual(test_path, get_exec_path(test_env))
1979
1980
1981 def reap_children():
1982 """Use this function at the end of test_main() whenever sub-processes
1983 are started. This will help ensure that no extra children (zombies)
1984 stick around to hog resources and create problems when looking
1985 for refleaks.
1986 """
1987
1988 # Reap all our dead child processes so we don't leave zombies around.
1989 # These hog resources and might be causing some of the buildbots to die.
1990 if hasattr(os, 'waitpid'):
1991 any_process = -1
1992 while True:
1993 try:
1994 # This will raise an exception on Windows. That's ok.
1995 pid, status = os.waitpid(any_process, os.WNOHANG)
1996 if pid == 0:
1997 break
1998 except:
1999 break
2000
2001
2002
2003 class ContextManagerTests(BaseTestCase):
2004
2005 def test_pipe(self):
2006 proc = subprocess.Popen([sys.executable, "-c",
2007 "import sys;"
2008 "sys.stdout.write('stdout');"
2009 "sys.stderr.write('stderr');"],
2010 stdout=subprocess.PIPE,
2011 stderr=subprocess.PIPE)
2012 try:
2013 self.assertEqual(proc.stdout.read(), "stdout")
2014 self.assertStderrEqual(proc.stderr.read(), "stderr")
2015 finally:
2016 proc.__exit__(None, None, None)
2017
2018 self.assertTrue(proc.stdout.closed)
2019 self.assertTrue(proc.stderr.closed)
2020
2021 def test_returncode(self):
2022 proc = subprocess.Popen([sys.executable, "-c",
2023 "import sys; sys.exit(100)"])
2024 proc.__exit__(None, None, None)
2025 # __exit__ calls wait(), so the returncode should be set
2026 self.assertEqual(proc.returncode, 100)
2027
2028 def test_communicate_stdin(self):
2029 proc = subprocess.Popen([sys.executable, "-c",
2030 "import sys;"
2031 "sys.exit(sys.stdin.read() == 'context')"],
2032 stdin=subprocess.PIPE)
2033 try:
2034 proc.communicate("context")
2035 self.assertEqual(proc.returncode, 1)
2036 finally:
2037 proc.__exit__(None, None, None)
2038
2039 def test_invalid_args(self):
2040 try:
2041 proc = subprocess.Popen(['nonexisting_i_hope'],
2042 stdout=subprocess.PIPE,
2043 stderr=subprocess.PIPE)
2044 proc.__exit__(None, None, None)
2045 except EnvironmentError, exception:
2046 # ignore errors that indicate the command was not found
2047 if exception.errno not in (errno.ENOENT, errno.EACCES):
2048 raise
2049 else:
2050 self.fail("Expected an EnvironmentError exception.")
2051
2052
2053 if sys.version_info[:2] <= (2,4):
2054 # The test suite hangs during the pure python test on 2.4. No idea why.
2055 # That is not the implementation anyone is using this module for anyways.
2056 class ProcessTestCasePOSIXPurePython(unittest.TestCase): pass
2057
2058
2059 def test_main():
2060 unit_tests = (ProcessTestCase,
2061 POSIXProcessTestCase,
2062 Win32ProcessTestCase,
2063 ProcessTestCasePOSIXPurePython,
2064 ProcessTestCaseNoPoll,
2065 HelperFunctionTests,
2066 ContextManagerTests)
2067
2068 test_support.run_unittest(*unit_tests)
2069 reap_children()
2070
2071 if __name__ == "__main__":
2072 test_main()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698