| Index: tests/subprocess2_test.py
|
| diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py
|
| index cb390751e2c9b47e35c7f42e5ee57394f7371e81..9d30452c3118220135168a934e52d407c0305a3f 100755
|
| --- a/tests/subprocess2_test.py
|
| +++ b/tests/subprocess2_test.py
|
| @@ -5,18 +5,12 @@
|
|
|
| """Unit tests for subprocess2.py."""
|
|
|
| -import logging
|
| import optparse
|
| import os
|
| import sys
|
| import time
|
| import unittest
|
|
|
| -try:
|
| - import fcntl
|
| -except ImportError:
|
| - fcntl = None
|
| -
|
| ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
| sys.path.insert(0, ROOT_DIR)
|
|
|
| @@ -25,25 +19,7 @@ import subprocess2
|
| # Method could be a function
|
| # pylint: disable=R0201
|
|
|
| -
|
| -def convert_to_crlf(string):
|
| - """Unconditionally convert LF to CRLF."""
|
| - return string.replace('\n', '\r\n')
|
| -
|
| -
|
| -def convert_to_cr(string):
|
| - """Unconditionally convert LF to CR."""
|
| - return string.replace('\n', '\r')
|
| -
|
| -
|
| -def convert_win(string):
|
| - """Converts string to CRLF on Windows only."""
|
| - if sys.platform == 'win32':
|
| - return string.replace('\n', '\r\n')
|
| - return string
|
| -
|
| -
|
| -class DefaultsTest(unittest.TestCase):
|
| +class Subprocess2Test(unittest.TestCase):
|
| # Can be mocked in a test.
|
| TO_SAVE = {
|
| subprocess2: [
|
| @@ -52,6 +28,8 @@ class DefaultsTest(unittest.TestCase):
|
| }
|
|
|
| def setUp(self):
|
| + self.exe_path = __file__
|
| + self.exe = [sys.executable, self.exe_path, '--child']
|
| self.saved = {}
|
| for module, names in self.TO_SAVE.iteritems():
|
| self.saved[module] = dict(
|
| @@ -168,67 +146,14 @@ class DefaultsTest(unittest.TestCase):
|
| }
|
| self.assertEquals(expected, results)
|
|
|
| - def test_timeout_shell_throws(self):
|
| - # Never called.
|
| - _ = self._fake_Popen()
|
| - try:
|
| - subprocess2.communicate(
|
| - sys.executable,
|
| - timeout=0.01,
|
| - stdout=subprocess2.PIPE,
|
| - shell=True)
|
| - self.fail()
|
| - except TypeError:
|
| - pass
|
| -
|
| -
|
| -class S2Test(unittest.TestCase):
|
| - def setUp(self):
|
| - super(S2Test, self).setUp()
|
| - self.exe_path = __file__
|
| - self.exe = [sys.executable, self.exe_path, '--child']
|
| - self.states = {}
|
| - if fcntl:
|
| - for v in (sys.stdin, sys.stdout, sys.stderr):
|
| - fileno = v.fileno()
|
| - self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
|
| -
|
| - def tearDown(self):
|
| - for fileno, fl in self.states.iteritems():
|
| - self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
|
| - super(S2Test, self).tearDown()
|
| -
|
| - def _run_test(self, function):
|
| - """Runs tests in 6 combinations:
|
| - - LF output with universal_newlines=False
|
| - - CR output with universal_newlines=False
|
| - - CRLF output with universal_newlines=False
|
| - - LF output with universal_newlines=True
|
| - - CR output with universal_newlines=True
|
| - - CRLF output with universal_newlines=True
|
| -
|
| - First |function| argument is the convertion for the origianl expected LF
|
| - string to the right EOL.
|
| - Second |function| argument is the executable and initial flag to run, to
|
| - control what EOL is used by the child process.
|
| - Third |function| argument is universal_newlines value.
|
| - """
|
| - noop = lambda x: x
|
| - function(noop, self.exe, False)
|
| - function(convert_to_cr, self.exe + ['--cr'], False)
|
| - function(convert_to_crlf, self.exe + ['--crlf'], False)
|
| - function(noop, self.exe, True)
|
| - function(noop, self.exe + ['--cr'], True)
|
| - function(noop, self.exe + ['--crlf'], True)
|
| -
|
| def test_timeout(self):
|
| + # It'd be better to not discard stdout.
|
| out, returncode = subprocess2.communicate(
|
| - self.exe + ['--sleep_first', '--stdout'],
|
| + self.exe + ['--sleep', '--stdout'],
|
| timeout=0.01,
|
| - stdout=subprocess2.PIPE,
|
| - shell=False)
|
| + stdout=subprocess2.PIPE)
|
| self.assertEquals(subprocess2.TIMED_OUT, returncode)
|
| - self.assertEquals(('', None), out)
|
| + self.assertEquals(['', None], out)
|
|
|
| def test_check_output_no_stdout(self):
|
| try:
|
| @@ -238,78 +163,68 @@ class S2Test(unittest.TestCase):
|
| pass
|
|
|
| def test_stdout_void(self):
|
| - def fn(c, e, un):
|
| - (out, err), code = subprocess2.communicate(
|
| - e + ['--stdout', '--stderr'],
|
| - stdout=subprocess2.VOID,
|
| - stderr=subprocess2.PIPE,
|
| - universal_newlines=un)
|
| - self.assertEquals(None, out)
|
| - self.assertEquals(c('a\nbb\nccc\n'), err)
|
| - self.assertEquals(0, code)
|
| - self._run_test(fn)
|
| + (out, err), code = subprocess2.communicate(
|
| + self.exe + ['--stdout', '--stderr'],
|
| + stdout=subprocess2.VOID,
|
| + stderr=subprocess2.PIPE)
|
| + self.assertEquals(None, out)
|
| + expected = 'a\nbb\nccc\n'
|
| + if sys.platform == 'win32':
|
| + expected = expected.replace('\n', '\r\n')
|
| + self.assertEquals(expected, err)
|
| + self.assertEquals(0, code)
|
|
|
| def test_stderr_void(self):
|
| - def fn(c, e, un):
|
| - (out, err), code = subprocess2.communicate(
|
| - e + ['--stdout', '--stderr'],
|
| - stdout=subprocess2.PIPE,
|
| - stderr=subprocess2.VOID,
|
| - universal_newlines=un)
|
| - self.assertEquals(c('A\nBB\nCCC\n'), out)
|
| - self.assertEquals(None, err)
|
| - self.assertEquals(0, code)
|
| - self._run_test(fn)
|
| + (out, err), code = subprocess2.communicate(
|
| + self.exe + ['--stdout', '--stderr'],
|
| + universal_newlines=True,
|
| + stdout=subprocess2.PIPE,
|
| + stderr=subprocess2.VOID)
|
| + self.assertEquals('A\nBB\nCCC\n', out)
|
| + self.assertEquals(None, err)
|
| + self.assertEquals(0, code)
|
|
|
| def test_check_output_throw_stdout(self):
|
| - def fn(c, e, un):
|
| - try:
|
| - subprocess2.check_output(
|
| - e + ['--fail', '--stdout'], universal_newlines=un)
|
| - self.fail()
|
| - except subprocess2.CalledProcessError, e:
|
| - self.assertEquals(c('A\nBB\nCCC\n'), e.stdout)
|
| - self.assertEquals(None, e.stderr)
|
| - self.assertEquals(64, e.returncode)
|
| - self._run_test(fn)
|
| + try:
|
| + subprocess2.check_output(
|
| + self.exe + ['--fail', '--stdout'], universal_newlines=True)
|
| + self.fail()
|
| + except subprocess2.CalledProcessError, e:
|
| + self.assertEquals('A\nBB\nCCC\n', e.stdout)
|
| + self.assertEquals(None, e.stderr)
|
| + self.assertEquals(64, e.returncode)
|
|
|
| def test_check_output_throw_no_stderr(self):
|
| - def fn(c, e, un):
|
| - try:
|
| - subprocess2.check_output(
|
| - e + ['--fail', '--stderr'], universal_newlines=un)
|
| - self.fail()
|
| - except subprocess2.CalledProcessError, e:
|
| - self.assertEquals(c(''), e.stdout)
|
| - self.assertEquals(None, e.stderr)
|
| - self.assertEquals(64, e.returncode)
|
| - self._run_test(fn)
|
| + try:
|
| + subprocess2.check_output(
|
| + self.exe + ['--fail', '--stderr'], universal_newlines=True)
|
| + self.fail()
|
| + except subprocess2.CalledProcessError, e:
|
| + self.assertEquals('', e.stdout)
|
| + self.assertEquals(None, e.stderr)
|
| + self.assertEquals(64, e.returncode)
|
|
|
| def test_check_output_throw_stderr(self):
|
| - def fn(c, e, un):
|
| - try:
|
| - subprocess2.check_output(
|
| - e + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
|
| - universal_newlines=un)
|
| - self.fail()
|
| - except subprocess2.CalledProcessError, e:
|
| - self.assertEquals('', e.stdout)
|
| - self.assertEquals(c('a\nbb\nccc\n'), e.stderr)
|
| - self.assertEquals(64, e.returncode)
|
| - self._run_test(fn)
|
| + try:
|
| + subprocess2.check_output(
|
| + self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
|
| + universal_newlines=True)
|
| + self.fail()
|
| + except subprocess2.CalledProcessError, e:
|
| + self.assertEquals('', e.stdout)
|
| + self.assertEquals('a\nbb\nccc\n', e.stderr)
|
| + self.assertEquals(64, e.returncode)
|
|
|
| def test_check_output_throw_stderr_stdout(self):
|
| - def fn(c, e, un):
|
| - try:
|
| - subprocess2.check_output(
|
| - e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
|
| - universal_newlines=un)
|
| - self.fail()
|
| - except subprocess2.CalledProcessError, e:
|
| - self.assertEquals(c('a\nbb\nccc\n'), e.stdout)
|
| - self.assertEquals(None, e.stderr)
|
| - self.assertEquals(64, e.returncode)
|
| - self._run_test(fn)
|
| + try:
|
| + subprocess2.check_output(
|
| + self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
|
| + universal_newlines=True)
|
| + self.fail()
|
| + except subprocess2.CalledProcessError, e:
|
| + self.assertEquals('a\nbb\nccc\n', e.stdout)
|
| + self.assertEquals(None, e.stderr)
|
| + self.assertEquals(64, e.returncode)
|
|
|
| def test_check_call_throw(self):
|
| try:
|
| @@ -320,87 +235,8 @@ class S2Test(unittest.TestCase):
|
| self.assertEquals(None, e.stderr)
|
| self.assertEquals(64, e.returncode)
|
|
|
| - def test_check_output_tee_stderr(self):
|
| - def fn(c, e, un):
|
| - stderr = []
|
| - out, returncode = subprocess2.communicate(
|
| - e + ['--stderr'], stderr=stderr.append,
|
| - universal_newlines=un)
|
| - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
|
| - self.assertEquals((None, None), out)
|
| - self.assertEquals(0, returncode)
|
| - self._run_test(fn)
|
| -
|
| - def test_check_output_tee_stdout_stderr(self):
|
| - def fn(c, e, un):
|
| - stdout = []
|
| - stderr = []
|
| - out, returncode = subprocess2.communicate(
|
| - e + ['--stdout', '--stderr'],
|
| - stdout=stdout.append,
|
| - stderr=stderr.append,
|
| - universal_newlines=un)
|
| - self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
|
| - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
|
| - self.assertEquals((None, None), out)
|
| - self.assertEquals(0, returncode)
|
| - self._run_test(fn)
|
| -
|
| - def test_check_output_tee_stdin(self):
|
| - def fn(c, e, un):
|
| - stdout = []
|
| - stdin = '0123456789'
|
| - out, returncode = subprocess2.communicate(
|
| - e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append,
|
| - universal_newlines=un)
|
| - self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
|
| - self.assertEquals((None, None), out)
|
| - self.assertEquals(0, returncode)
|
| - self._run_test(fn)
|
| -
|
| - def test_check_output_tee_throw(self):
|
| - def fn(c, e, un):
|
| - stderr = []
|
| - try:
|
| - subprocess2.check_output(
|
| - e + ['--stderr', '--fail'], stderr=stderr.append,
|
| - universal_newlines=un)
|
| - self.fail()
|
| - except subprocess2.CalledProcessError, e:
|
| - self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
|
| - self.assertEquals('', e.stdout)
|
| - self.assertEquals(None, e.stderr)
|
| - self.assertEquals(64, e.returncode)
|
| - self._run_test(fn)
|
| -
|
| - def test_check_output_tee_large(self):
|
| - stdout = []
|
| - # Read 128kb. On my workstation it takes >2s. Welcome to 2011.
|
| - out, returncode = subprocess2.communicate(
|
| - self.exe + ['--large'], stdout=stdout.append)
|
| - self.assertEquals(128*1024, len(''.join(stdout)))
|
| - self.assertEquals((None, None), out)
|
| - self.assertEquals(0, returncode)
|
| -
|
| - def test_check_output_tee_large_stdin(self):
|
| - stdout = []
|
| - # Write 128kb.
|
| - stdin = '0123456789abcdef' * (8*1024)
|
| - out, returncode = subprocess2.communicate(
|
| - self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
|
| - self.assertEquals(128*1024, len(''.join(stdout)))
|
| - self.assertEquals((None, None), out)
|
| - self.assertEquals(0, returncode)
|
| -
|
|
|
| def child_main(args):
|
| - if sys.platform == 'win32':
|
| - # Annoying, make sure the output is not translated on Windows.
|
| - # pylint: disable=E1101,F0401
|
| - import msvcrt
|
| - msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
|
| - msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
|
| -
|
| parser = optparse.OptionParser()
|
| parser.add_option(
|
| '--fail',
|
| @@ -408,52 +244,28 @@ def child_main(args):
|
| action='store_const',
|
| default=0,
|
| const=64)
|
| - parser.add_option(
|
| - '--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
|
| - parser.add_option(
|
| - '--cr', action='store_const', const='\r', dest='eol')
|
| parser.add_option('--stdout', action='store_true')
|
| parser.add_option('--stderr', action='store_true')
|
| - parser.add_option('--sleep_first', action='store_true')
|
| - parser.add_option('--sleep_last', action='store_true')
|
| - parser.add_option('--large', action='store_true')
|
| - parser.add_option('--read', action='store_true')
|
| + parser.add_option('--sleep', action='store_true')
|
| options, args = parser.parse_args(args)
|
| if args:
|
| parser.error('Internal error')
|
| - if options.sleep_first:
|
| - time.sleep(10)
|
|
|
| def do(string):
|
| if options.stdout:
|
| - sys.stdout.write(string.upper())
|
| - sys.stdout.write(options.eol)
|
| + print >> sys.stdout, string.upper()
|
| if options.stderr:
|
| - sys.stderr.write(string.lower())
|
| - sys.stderr.write(options.eol)
|
| + print >> sys.stderr, string.lower()
|
|
|
| do('A')
|
| do('BB')
|
| do('CCC')
|
| - if options.large:
|
| - # Print 128kb.
|
| - string = '0123456789abcdef' * (8*1024)
|
| - sys.stdout.write(string)
|
| - if options.read:
|
| - try:
|
| - while sys.stdin.read():
|
| - pass
|
| - except OSError:
|
| - pass
|
| - if options.sleep_last:
|
| + if options.sleep:
|
| time.sleep(10)
|
| return options.return_value
|
|
|
|
|
| if __name__ == '__main__':
|
| - logging.basicConfig(level=
|
| - [logging.WARNING, logging.INFO, logging.DEBUG][
|
| - min(2, sys.argv.count('-v'))])
|
| if len(sys.argv) > 1 and sys.argv[1] == '--child':
|
| sys.exit(child_main(sys.argv[2:]))
|
| unittest.main()
|
|
|