Index: tests/subprocess2_test.py |
diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py |
index 9d30452c3118220135168a934e52d407c0305a3f..d6c3fb7004a248bc46d499dfb406f54bf198b318 100755 |
--- a/tests/subprocess2_test.py |
+++ b/tests/subprocess2_test.py |
@@ -5,6 +5,7 @@ |
"""Unit tests for subprocess2.py.""" |
+import logging |
import optparse |
import os |
import sys |
@@ -19,6 +20,14 @@ import subprocess2 |
# Method could be a function |
# pylint: disable=R0201 |
+ |
+def convert(string): |
+ """Converts string to CRLF on Windows.""" |
+ if sys.platform == 'win32': |
+ return string.replace('\n', '\r\n') |
+ return string |
+ |
+ |
class Subprocess2Test(unittest.TestCase): |
# Can be mocked in a test. |
TO_SAVE = { |
@@ -147,13 +156,24 @@ class Subprocess2Test(unittest.TestCase): |
self.assertEquals(expected, results) |
def test_timeout(self): |
- # It'd be better to not discard stdout. |
out, returncode = subprocess2.communicate( |
self.exe + ['--sleep', '--stdout'], |
timeout=0.01, |
- stdout=subprocess2.PIPE) |
+ stdout=subprocess2.PIPE, |
+ shell=False) |
self.assertEquals(subprocess2.TIMED_OUT, returncode) |
- self.assertEquals(['', None], out) |
+ self.assertEquals(('', None), out) |
+ |
+ def test_timeout_shell_throws(self): |
+ try: |
+ subprocess2.communicate( |
+ self.exe + ['--sleep', '--stdout'], |
+ timeout=0.01, |
+ stdout=subprocess2.PIPE, |
+ shell=True) |
+ self.fail() |
+ except TypeError: |
+ pass |
def test_check_output_no_stdout(self): |
try: |
@@ -168,10 +188,7 @@ class Subprocess2Test(unittest.TestCase): |
stdout=subprocess2.VOID, |
stderr=subprocess2.PIPE) |
self.assertEquals(None, out) |
- expected = 'a\nbb\nccc\n' |
- if sys.platform == 'win32': |
- expected = expected.replace('\n', '\r\n') |
- self.assertEquals(expected, err) |
+ self.assertEquals(convert('a\nbb\nccc\n'), err) |
self.assertEquals(0, code) |
def test_stderr_void(self): |
@@ -235,6 +252,66 @@ class Subprocess2Test(unittest.TestCase): |
self.assertEquals(None, e.stderr) |
self.assertEquals(64, e.returncode) |
+ def test_check_output_tee_stderr(self): |
+ stderr = [] |
+ out, returncode = subprocess2.communicate( |
+ self.exe + ['--stderr'], stderr=stderr.append) |
+ self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) |
+ self.assertEquals((None, None), out) |
+ self.assertEquals(0, returncode) |
+ |
+ def test_check_output_tee_stdout_stderr(self): |
+ stdout = [] |
+ stderr = [] |
+ out, returncode = subprocess2.communicate( |
+ self.exe + ['--stdout', '--stderr'], |
+ stdout=stdout.append, |
+ stderr=stderr.append) |
+ self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) |
+ self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) |
+ self.assertEquals((None, None), out) |
+ self.assertEquals(0, returncode) |
+ |
+ def test_check_output_tee_stdin(self): |
+ stdout = [] |
+ stdin = '0123456789' |
+ out, returncode = subprocess2.communicate( |
+ self.exe + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append) |
+ self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) |
+ self.assertEquals((None, None), out) |
+ self.assertEquals(0, returncode) |
+ |
+ def test_check_output_tee_throw(self): |
+ stderr = [] |
+ try: |
+ subprocess2.check_output( |
+ self.exe + ['--stderr', '--fail'], stderr=stderr.append) |
+ self.fail() |
+ except subprocess2.CalledProcessError, e: |
+ self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) |
+ self.assertEquals('', e.stdout) |
+ self.assertEquals(None, e.stderr) |
+ self.assertEquals(64, e.returncode) |
+ |
+ def test_check_output_tee_large(self): |
+ stdout = [] |
+ # Read 128kb. On my workstation it takes >2s. Welcome to 2011. |
+ out, returncode = subprocess2.communicate( |
+ self.exe + ['--large'], stdout=stdout.append) |
+ self.assertEquals(128*1024, len(''.join(stdout))) |
+ self.assertEquals((None, None), out) |
+ self.assertEquals(0, returncode) |
+ |
+ def test_check_output_tee_large_stdin(self): |
+ stdout = [] |
+ # Write 128kb. |
+ stdin = '0123456789abcdef' * (8*1024) |
+ out, returncode = subprocess2.communicate( |
+ self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) |
+ self.assertEquals(128*1024, len(''.join(stdout))) |
+ self.assertEquals((None, None), out) |
+ self.assertEquals(0, returncode) |
+ |
def child_main(args): |
parser = optparse.OptionParser() |
@@ -247,9 +324,13 @@ def child_main(args): |
parser.add_option('--stdout', action='store_true') |
parser.add_option('--stderr', action='store_true') |
parser.add_option('--sleep', action='store_true') |
+ parser.add_option('--large', action='store_true') |
+ parser.add_option('--read', action='store_true') |
options, args = parser.parse_args(args) |
if args: |
parser.error('Internal error') |
+ if options.sleep: |
+ time.sleep(10) |
def do(string): |
if options.stdout: |
@@ -260,12 +341,23 @@ def child_main(args): |
do('A') |
do('BB') |
do('CCC') |
- if options.sleep: |
- time.sleep(10) |
+ if options.large: |
+ # Print 128kb. |
+ string = '0123456789abcdef' * (8*1024) |
+ sys.stdout.write(string) |
+ if options.read: |
+ try: |
+ while sys.stdin.read(): |
+ pass |
+ except OSError: |
+ pass |
return options.return_value |
if __name__ == '__main__': |
+ logging.basicConfig(level= |
+ [logging.WARNING, logging.INFO, logging.DEBUG][ |
+ min(2, sys.argv.count('-v'))]) |
if len(sys.argv) > 1 and sys.argv[1] == '--child': |
sys.exit(child_main(sys.argv[2:])) |
unittest.main() |