OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
7 | 7 |
| 8 import cStringIO |
8 import logging | 9 import logging |
9 import optparse | 10 import optparse |
10 import os | 11 import os |
11 import sys | 12 import sys |
12 import time | 13 import time |
13 import unittest | 14 import unittest |
14 | 15 |
15 try: | 16 try: |
16 import fcntl # pylint: disable=F0401 | 17 import fcntl # pylint: disable=F0401 |
17 except ImportError: | 18 except ImportError: |
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
158 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 159 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
159 expected = { | 160 expected = { |
160 'args': ['foo'], | 161 'args': ['foo'], |
161 'a':True, | 162 'a':True, |
162 'stdin': subprocess2.VOID, | 163 'stdin': subprocess2.VOID, |
163 'stdout': subprocess2.PIPE, | 164 'stdout': subprocess2.PIPE, |
164 } | 165 } |
165 self.assertEquals(expected, results) | 166 self.assertEquals(expected, results) |
166 | 167 |
167 | 168 |
168 class BaseTestCase(unittest.TestCase): | 169 class BaseTestCase(auto_stub.TestCase): |
169 def setUp(self): | 170 def setUp(self): |
170 super(BaseTestCase, self).setUp() | 171 super(BaseTestCase, self).setUp() |
171 self.exe_path = __file__ | 172 self.exe_path = __file__ |
172 self.exe = [sys.executable, self.exe_path, '--child'] | 173 self.exe = [sys.executable, self.exe_path, '--child'] |
173 self.states = {} | 174 self.states = {} |
174 if fcntl: | 175 if fcntl: |
175 for v in (sys.stdin, sys.stdout, sys.stderr): | 176 for v in (sys.stdin, sys.stdout, sys.stderr): |
176 fileno = v.fileno() | 177 fileno = v.fileno() |
177 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) | 178 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) |
178 | 179 |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
229 self.assertEquals(stderr, e.stderr) | 230 self.assertEquals(stderr, e.stderr) |
230 else: | 231 else: |
231 self.fail() | 232 self.fail() |
232 | 233 |
233 def test_check_output_no_stdout(self): | 234 def test_check_output_no_stdout(self): |
234 try: | 235 try: |
235 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | 236 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) |
236 self.fail() | 237 self.fail() |
237 except ValueError: | 238 except ValueError: |
238 pass | 239 pass |
239 | 240 |
240 if (sys.version_info[0] * 10 + sys.version_info[1]) >= 27: | 241 if (sys.version_info[0] * 10 + sys.version_info[1]) >= 27: |
241 # python 2.7+ | 242 # python 2.7+ |
242 try: | 243 try: |
243 # pylint: disable=E1101 | 244 # pylint: disable=E1101 |
244 subprocess.check_output(self.exe, stdout=subprocess.PIPE) | 245 subprocess.check_output(self.exe, stdout=subprocess.PIPE) |
245 self.fail() | 246 self.fail() |
246 except ValueError: | 247 except ValueError: |
247 pass | 248 pass |
248 | 249 |
249 def test_check_output_throw_stdout(self): | 250 def test_check_output_throw_stdout(self): |
(...skipping 360 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
610 proc = subprocess2.Popen( | 611 proc = subprocess2.Popen( |
611 self.exe + ['--stdout', '--sleep_first'], stdout=PIPE) | 612 self.exe + ['--stdout', '--sleep_first'], stdout=PIPE) |
612 res = proc.communicate(nag_timer=3), proc.returncode | 613 res = proc.communicate(nag_timer=3), proc.returncode |
613 l.removeFilter(f) | 614 l.removeFilter(f) |
614 self._check_res(res, 'A\nBB\nCCC\n', None, 0) | 615 self._check_res(res, 'A\nBB\nCCC\n', None, 0) |
615 expected = ['No output for 3 seconds from command:', proc.cmd_str, | 616 expected = ['No output for 3 seconds from command:', proc.cmd_str, |
616 'No output for 6 seconds from command:', proc.cmd_str, | 617 'No output for 6 seconds from command:', proc.cmd_str, |
617 'No output for 9 seconds from command:', proc.cmd_str] | 618 'No output for 9 seconds from command:', proc.cmd_str] |
618 self.assertEquals(w, expected) | 619 self.assertEquals(w, expected) |
619 | 620 |
620 | 621 def test_communicate_and_stream(self): |
| 622 self.mock(sys, 'stdout', cStringIO.StringIO()) |
| 623 self.mock(sys, 'stderr', cStringIO.StringIO()) |
| 624 (stdout, stderr), returncode = subprocess2.communicate_and_stream( |
| 625 self.exe + ['--stderr', '--stdout', '--fail']) |
| 626 self.assertEquals(returncode, 64) |
| 627 self.assertEquals('A\nBB\nCCC\n', sys.stdout.getvalue()) |
| 628 self.assertEquals('A\nBB\nCCC\n', stdout) |
| 629 self.assertEquals('a\nbb\nccc\n', sys.stderr.getvalue()) |
| 630 self.assertEquals('a\nbb\nccc\n', stderr) |
| 631 |
| 632 def test_communicate_and_stream_partial_pipe(self): |
| 633 self.mock(sys, 'stdout', cStringIO.StringIO()) |
| 634 self.mock(sys, 'stderr', cStringIO.StringIO()) |
| 635 (stdout, stderr), returncode = subprocess2.communicate_and_stream( |
| 636 self.exe + ['--stderr', '--stdout'], stdout=PIPE) |
| 637 self.assertEquals(returncode, 0) |
| 638 self.assertEquals('', sys.stdout.getvalue()) |
| 639 self.assertEquals('A\nBB\nCCC\n', stdout) |
| 640 self.assertEquals('a\nbb\nccc\n', sys.stderr.getvalue()) |
| 641 self.assertEquals('a\nbb\nccc\n', stderr) |
| 642 |
| 643 def test_communicate_and_stream_partial_void(self): |
| 644 self.mock(sys, 'stdout', cStringIO.StringIO()) |
| 645 self.mock(sys, 'stderr', cStringIO.StringIO()) |
| 646 (stdout, stderr), returncode = subprocess2.communicate_and_stream( |
| 647 self.exe + ['--stderr', '--stdout', '--fail'], stderr=VOID) |
| 648 self.assertEquals(returncode, 64) |
| 649 self.assertEquals('A\nBB\nCCC\n', sys.stdout.getvalue()) |
| 650 self.assertEquals('A\nBB\nCCC\n', stdout) |
| 651 self.assertEquals('', sys.stderr.getvalue()) |
| 652 self.assertIsNone(stderr) |
| 653 |
| 654 def test_communicate_and_stream_redirect(self): |
| 655 self.mock(sys, 'stdout', cStringIO.StringIO()) |
| 656 self.mock(sys, 'stderr', cStringIO.StringIO()) |
| 657 (stdout, stderr), returncode = subprocess2.communicate_and_stream( |
| 658 self.exe + ['--stderr', '--stdout', '--fail'], stderr=STDOUT) |
| 659 self.assertEquals(returncode, 64) |
| 660 self.assertEquals('', sys.stderr.getvalue()) |
| 661 self.assertIsNone(stderr) |
| 662 self.assertEquals(sys.stdout.getvalue(), stdout) |
| 663 # Because stderr/stdout is interwoven arbitrarily, |
| 664 # just check that no output is missed, but not the order. |
| 665 self.assertEquals(sorted(stdout), sorted('A\nBB\nCCC\n' + 'a\nbb\nccc\n')) |
| 666 |
| 667 |
| 668 |
621 def child_main(args): | 669 def child_main(args): |
622 if sys.platform == 'win32': | 670 if sys.platform == 'win32': |
623 # Annoying, make sure the output is not translated on Windows. | 671 # Annoying, make sure the output is not translated on Windows. |
624 # pylint: disable=E1101,F0401 | 672 # pylint: disable=E1101,F0401 |
625 import msvcrt | 673 import msvcrt |
626 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | 674 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) |
627 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | 675 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) |
628 | 676 |
629 parser = optparse.OptionParser() | 677 parser = optparse.OptionParser() |
630 parser.add_option( | 678 parser.add_option( |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
676 return options.return_value | 724 return options.return_value |
677 | 725 |
678 | 726 |
679 if __name__ == '__main__': | 727 if __name__ == '__main__': |
680 logging.basicConfig(level= | 728 logging.basicConfig(level= |
681 [logging.WARNING, logging.INFO, logging.DEBUG][ | 729 [logging.WARNING, logging.INFO, logging.DEBUG][ |
682 min(2, sys.argv.count('-v'))]) | 730 min(2, sys.argv.count('-v'))]) |
683 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 731 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
684 sys.exit(child_main(sys.argv[2:])) | 732 sys.exit(child_main(sys.argv[2:])) |
685 unittest.main() | 733 unittest.main() |
OLD | NEW |