Chromium Code Reviews| Index: tests/subprocess2_test.py |
| diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py |
| index 54e4e15124ceed59d7622869aa379a52679e50ef..f6c23ed55b13999335adf99ef6b1d1cdfb25f043 100755 |
| --- a/tests/subprocess2_test.py |
| +++ b/tests/subprocess2_test.py |
| @@ -5,17 +5,44 @@ |
| """Unit tests for subprocess2.py.""" |
| +import logging |
| import optparse |
| import os |
| import sys |
| import time |
| import unittest |
| +try: |
| + import fcntl |
| +except ImportError: |
| + fcntl = None |
| + |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| import subprocess2 |
| -class Subprocess2Test(unittest.TestCase): |
| +# Method could be a function |
| +# pylint: disable=R0201 |
| + |
| + |
| +def convert_to_crlf(string): |
| + """Unconditionally convert LF to CRLF.""" |
| + return string.replace('\n', '\r\n') |
| + |
| + |
| +def convert_to_cr(string): |
| + """Unconditionally convert LF to CR.""" |
| + return string.replace('\n', '\r') |
| + |
| + |
| +def convert_win(string): |
| + """Converts string to CRLF on Windows only.""" |
| + if sys.platform == 'win32': |
| + return string.replace('\n', '\r\n') |
| + return string |
| + |
| + |
| +class DefaultsTest(unittest.TestCase): |
| # Can be mocked in a test. |
| TO_SAVE = { |
| subprocess2: [ |
| @@ -24,8 +51,6 @@ class Subprocess2Test(unittest.TestCase): |
| } |
| def setUp(self): |
| - self.exe_path = __file__ |
| - self.exe = [sys.executable, self.exe_path, '--child'] |
| self.saved = {} |
| for module, names in self.TO_SAVE.iteritems(): |
| self.saved[module] = dict( |
| @@ -144,14 +169,54 @@ class Subprocess2Test(unittest.TestCase): |
| } |
| self.assertEquals(expected, results) |
| + |
| +class S2Test(unittest.TestCase): |
|
Dirk Pranke
2011/11/11 20:48:20
Nit: might be better not to abbreviate this.
M-A Ruel
2011/11/11 20:49:18
I was tired of typing it when filtering on the com
|
| + def setUp(self): |
| + super(S2Test, self).setUp() |
| + self.exe_path = __file__ |
| + self.exe = [sys.executable, self.exe_path, '--child'] |
| + self.states = {} |
| + if fcntl: |
| + for v in (sys.stdin, sys.stdout, sys.stderr): |
| + fileno = v.fileno() |
| + self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) |
| + |
| + def tearDown(self): |
| + for fileno, fl in self.states.iteritems(): |
| + self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) |
| + super(S2Test, self).tearDown() |
| + |
| + def _run_test(self, function): |
| + """Runs tests in 6 combinations: |
| + - LF output with universal_newlines=False |
| + - CR output with universal_newlines=False |
| + - CRLF output with universal_newlines=False |
| + - LF output with universal_newlines=True |
| + - CR output with universal_newlines=True |
| + - CRLF output with universal_newlines=True |
| + |
| + First |function| argument is the convertion for the origianl expected LF |
| + string to the right EOL. |
| + Second |function| argument is the executable and initial flag to run, to |
| + control what EOL is used by the child process. |
| + Third |function| argument is universal_newlines value. |
| + """ |
| + noop = lambda x: x |
| + function(noop, self.exe, False) |
| + function(convert_to_cr, self.exe + ['--cr'], False) |
| + function(convert_to_crlf, self.exe + ['--crlf'], False) |
| + function(noop, self.exe, True) |
| + function(noop, self.exe + ['--cr'], True) |
| + function(noop, self.exe + ['--crlf'], True) |
| + |
| def test_timeout(self): |
| - # It'd be better to not discard stdout. |
| out, returncode = subprocess2.communicate( |
| - self.exe + ['--sleep', '--stdout'], |
| + self.exe + ['--sleep_first', '--stdout'], |
| timeout=0.01, |
| - stdout=subprocess2.PIPE) |
| + stdout=subprocess2.PIPE, |
| + shell=False) |
| self.assertEquals(subprocess2.TIMED_OUT, returncode) |
| - self.assertEquals(['', None], out) |
| + self.assertEquals(('', None), out) |
| def test_check_output_no_stdout(self): |
| try: |
| @@ -161,68 +226,78 @@ class Subprocess2Test(unittest.TestCase): |
| pass |
| def test_stdout_void(self): |
| - (out, err), code = subprocess2.communicate( |
| - self.exe + ['--stdout', '--stderr'], |
| - stdout=subprocess2.VOID, |
| - stderr=subprocess2.PIPE) |
| - self.assertEquals(None, out) |
| - expected = 'a\nbb\nccc\n' |
| - if sys.platform == 'win32': |
| - expected = expected.replace('\n', '\r\n') |
| - self.assertEquals(expected, err) |
| - self.assertEquals(0, code) |
| + def fn(c, e, un): |
| + (out, err), code = subprocess2.communicate( |
| + e + ['--stdout', '--stderr'], |
| + stdout=subprocess2.VOID, |
| + stderr=subprocess2.PIPE, |
| + universal_newlines=un) |
| + self.assertEquals(None, out) |
| + self.assertEquals(c('a\nbb\nccc\n'), err) |
| + self.assertEquals(0, code) |
| + self._run_test(fn) |
| def test_stderr_void(self): |
| - (out, err), code = subprocess2.communicate( |
| - self.exe + ['--stdout', '--stderr'], |
| - universal_newlines=True, |
| - stdout=subprocess2.PIPE, |
| - stderr=subprocess2.VOID) |
| - self.assertEquals('A\nBB\nCCC\n', out) |
| - self.assertEquals(None, err) |
| - self.assertEquals(0, code) |
| + def fn(c, e, un): |
| + (out, err), code = subprocess2.communicate( |
| + e + ['--stdout', '--stderr'], |
| + stdout=subprocess2.PIPE, |
| + stderr=subprocess2.VOID, |
| + universal_newlines=un) |
| + self.assertEquals(c('A\nBB\nCCC\n'), out) |
| + self.assertEquals(None, err) |
| + self.assertEquals(0, code) |
| + self._run_test(fn) |
| def test_check_output_throw_stdout(self): |
| - try: |
| - subprocess2.check_output( |
| - self.exe + ['--fail', '--stdout'], universal_newlines=True) |
| - self.fail() |
| - except subprocess2.CalledProcessError, e: |
| - self.assertEquals('A\nBB\nCCC\n', e.stdout) |
| - self.assertEquals(None, e.stderr) |
| - self.assertEquals(64, e.returncode) |
| + def fn(c, e, un): |
| + try: |
| + subprocess2.check_output( |
| + e + ['--fail', '--stdout'], universal_newlines=un) |
| + self.fail() |
| + except subprocess2.CalledProcessError, e: |
| + self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) |
| + self.assertEquals(None, e.stderr) |
| + self.assertEquals(64, e.returncode) |
| + self._run_test(fn) |
| def test_check_output_throw_no_stderr(self): |
| - try: |
| - subprocess2.check_output( |
| - self.exe + ['--fail', '--stderr'], universal_newlines=True) |
| - self.fail() |
| - except subprocess2.CalledProcessError, e: |
| - self.assertEquals('', e.stdout) |
| - self.assertEquals(None, e.stderr) |
| - self.assertEquals(64, e.returncode) |
| + def fn(c, e, un): |
| + try: |
| + subprocess2.check_output( |
| + e + ['--fail', '--stderr'], universal_newlines=un) |
| + self.fail() |
| + except subprocess2.CalledProcessError, e: |
| + self.assertEquals(c(''), e.stdout) |
| + self.assertEquals(None, e.stderr) |
| + self.assertEquals(64, e.returncode) |
| + self._run_test(fn) |
| def test_check_output_throw_stderr(self): |
| - try: |
| - subprocess2.check_output( |
| - self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, |
| - universal_newlines=True) |
| - self.fail() |
| - except subprocess2.CalledProcessError, e: |
| - self.assertEquals('', e.stdout) |
| - self.assertEquals('a\nbb\nccc\n', e.stderr) |
| - self.assertEquals(64, e.returncode) |
| + def fn(c, e, un): |
| + try: |
| + subprocess2.check_output( |
| + e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, |
| + universal_newlines=un) |
| + self.fail() |
| + except subprocess2.CalledProcessError, e: |
| + self.assertEquals('', e.stdout) |
| + self.assertEquals(c('a\nbb\nccc\n'), e.stderr) |
| + self.assertEquals(64, e.returncode) |
| + self._run_test(fn) |
| def test_check_output_throw_stderr_stdout(self): |
| - try: |
| - subprocess2.check_output( |
| - self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, |
| - universal_newlines=True) |
| - self.fail() |
| - except subprocess2.CalledProcessError, e: |
| - self.assertEquals('a\nbb\nccc\n', e.stdout) |
| - self.assertEquals(None, e.stderr) |
| - self.assertEquals(64, e.returncode) |
| + def fn(c, e, un): |
| + try: |
| + subprocess2.check_output( |
| + e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, |
| + universal_newlines=un) |
| + self.fail() |
| + except subprocess2.CalledProcessError, e: |
| + self.assertEquals(c('a\nbb\nccc\n'), e.stdout) |
| + self.assertEquals(None, e.stderr) |
| + self.assertEquals(64, e.returncode) |
| + self._run_test(fn) |
| def test_check_call_throw(self): |
| try: |
| @@ -235,6 +310,13 @@ class Subprocess2Test(unittest.TestCase): |
| def child_main(args): |
| + if sys.platform == 'win32': |
| + # Annoying, make sure the output is not translated on Windows. |
| + # pylint: disable=E1101,F0401 |
| + import msvcrt |
| + msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) |
| + msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) |
| + |
| parser = optparse.OptionParser() |
| parser.add_option( |
| '--fail', |
| @@ -242,28 +324,52 @@ def child_main(args): |
| action='store_const', |
| default=0, |
| const=64) |
| + parser.add_option( |
| + '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') |
| + parser.add_option( |
| + '--cr', action='store_const', const='\r', dest='eol') |
| parser.add_option('--stdout', action='store_true') |
| parser.add_option('--stderr', action='store_true') |
| - parser.add_option('--sleep', action='store_true') |
| + parser.add_option('--sleep_first', action='store_true') |
| + parser.add_option('--sleep_last', action='store_true') |
| + parser.add_option('--large', action='store_true') |
| + parser.add_option('--read', action='store_true') |
| options, args = parser.parse_args(args) |
| if args: |
| parser.error('Internal error') |
| + if options.sleep_first: |
| + time.sleep(10) |
| def do(string): |
| if options.stdout: |
| - print >> sys.stdout, string.upper() |
| + sys.stdout.write(string.upper()) |
| + sys.stdout.write(options.eol) |
| if options.stderr: |
| - print >> sys.stderr, string.lower() |
| + sys.stderr.write(string.lower()) |
| + sys.stderr.write(options.eol) |
| do('A') |
| do('BB') |
| do('CCC') |
| - if options.sleep: |
| + if options.large: |
| + # Print 128kb. |
| + string = '0123456789abcdef' * (8*1024) |
| + sys.stdout.write(string) |
| + if options.read: |
| + try: |
| + while sys.stdin.read(): |
| + pass |
| + except OSError: |
| + pass |
| + if options.sleep_last: |
| time.sleep(10) |
| return options.return_value |
| if __name__ == '__main__': |
| + logging.basicConfig(level= |
| + [logging.WARNING, logging.INFO, logging.DEBUG][ |
| + min(2, sys.argv.count('-v'))]) |
| if len(sys.argv) > 1 and sys.argv[1] == '--child': |
| sys.exit(child_main(sys.argv[2:])) |
| unittest.main() |