Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
| 7 | 7 |
| 8 import logging | 8 import logging |
| 9 import optparse | 9 import optparse |
| 10 import os | 10 import os |
| 11 import sys | 11 import sys |
| 12 import time | 12 import time |
| 13 import unittest | 13 import unittest |
| 14 | 14 |
| 15 try: | 15 try: |
| 16 import fcntl | 16 import fcntl |
| 17 except ImportError: | 17 except ImportError: |
| 18 fcntl = None | 18 fcntl = None |
| 19 | 19 |
| 20 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 20 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| 21 | 21 |
| 22 import subprocess | |
| 22 import subprocess2 | 23 import subprocess2 |
| 23 | 24 |
| 24 from testing_support import auto_stub | 25 from testing_support import auto_stub |
| 25 | 26 |
| 26 # Method could be a function | 27 # Method could be a function |
| 27 # pylint: disable=R0201 | 28 # pylint: disable=R0201 |
| 28 | 29 |
| 29 | 30 |
| 30 def convert_to_crlf(string): | 31 def convert_to_crlf(string): |
| 31 """Unconditionally convert LF to CRLF.""" | 32 """Unconditionally convert LF to CRLF.""" |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 75 | 76 |
| 76 def _fake_subprocess_Popen(self): | 77 def _fake_subprocess_Popen(self): |
| 77 """Mocks the base class subprocess.Popen only.""" | 78 """Mocks the base class subprocess.Popen only.""" |
| 78 results = {} | 79 results = {} |
| 79 def __init__(self, args, **kwargs): | 80 def __init__(self, args, **kwargs): |
| 80 assert not results | 81 assert not results |
| 81 results.update(kwargs) | 82 results.update(kwargs) |
| 82 results['args'] = args | 83 results['args'] = args |
| 83 def communicate(): | 84 def communicate(): |
| 84 return None, None | 85 return None, None |
| 85 self.mock(subprocess2.subprocess.Popen, '__init__', __init__) | 86 self.mock(subprocess.Popen, '__init__', __init__) |
| 86 self.mock(subprocess2.subprocess.Popen, 'communicate', communicate) | 87 self.mock(subprocess.Popen, 'communicate', communicate) |
| 87 return results | 88 return results |
| 88 | 89 |
| 89 def test_check_call_defaults(self): | 90 def test_check_call_defaults(self): |
| 90 results = self._fake_communicate() | 91 results = self._fake_communicate() |
| 91 self.assertEquals( | 92 self.assertEquals( |
| 92 ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) | 93 ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) |
| 93 expected = { | 94 expected = { |
| 94 'args': ['foo'], | 95 'args': ['foo'], |
| 95 'a':True, | 96 'a':True, |
| 96 } | 97 } |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 151 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 152 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
| 152 expected = { | 153 expected = { |
| 153 'args': ['foo'], | 154 'args': ['foo'], |
| 154 'a':True, | 155 'a':True, |
| 155 'stdin': subprocess2.VOID, | 156 'stdin': subprocess2.VOID, |
| 156 'stdout': subprocess2.PIPE, | 157 'stdout': subprocess2.PIPE, |
| 157 } | 158 } |
| 158 self.assertEquals(expected, results) | 159 self.assertEquals(expected, results) |
| 159 | 160 |
| 160 | 161 |
| 161 class S2Test(unittest.TestCase): | 162 class BaseTestCase(unittest.TestCase): |
| 162 def setUp(self): | 163 def setUp(self): |
| 163 super(S2Test, self).setUp() | 164 super(BaseTestCase, self).setUp() |
| 164 self.exe_path = __file__ | 165 self.exe_path = __file__ |
| 165 self.exe = [sys.executable, self.exe_path, '--child'] | 166 self.exe = [sys.executable, self.exe_path, '--child'] |
| 166 self.states = {} | 167 self.states = {} |
| 167 if fcntl: | 168 if fcntl: |
| 168 for v in (sys.stdin, sys.stdout, sys.stderr): | 169 for v in (sys.stdin, sys.stdout, sys.stderr): |
| 169 fileno = v.fileno() | 170 fileno = v.fileno() |
| 170 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) | 171 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) |
| 171 | 172 |
| 172 def tearDown(self): | 173 def tearDown(self): |
| 173 for fileno, fl in self.states.iteritems(): | 174 for fileno, fl in self.states.iteritems(): |
| 174 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) | 175 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) |
| 175 super(S2Test, self).tearDown() | 176 super(BaseTestCase, self).tearDown() |
| 176 | 177 |
| 178 | |
| 179 class RegressionTest(BaseTestCase): | |
| 180 # Regression tests to ensure that subprocess and subprocess2 have the same | |
| 181 # behavior. | |
| 182 def _run_test(self, function): | |
| 183 """Runs tests in 12 combinations: | |
| 184 - LF output with universal_newlines=False | |
| 185 - CR output with universal_newlines=False | |
| 186 - CRLF output with universal_newlines=False | |
| 187 - LF output with universal_newlines=True | |
| 188 - CR output with universal_newlines=True | |
| 189 - CRLF output with universal_newlines=True | |
| 190 | |
| 191 Once with subprocess, once with subprocess2. | |
| 192 | |
| 193 First |function| argument is the convertion for the origianl expected LF | |
|
Dirk Pranke
2011/11/28 22:31:50
typos: "conversion" and "original".
| |
| 194 string to the right EOL. | |
| 195 Second |function| argument is the executable and initial flag to run, to | |
| 196 control what EOL is used by the child process. | |
| 197 Third |function| argument is universal_newlines value. | |
| 198 """ | |
| 199 noop = lambda x: x | |
| 200 for subp in (subprocess, subprocess2): | |
| 201 function(noop, self.exe, False, subp) | |
| 202 function(convert_to_cr, self.exe + ['--cr'], False, subp) | |
| 203 function(convert_to_crlf, self.exe + ['--crlf'], False, subp) | |
| 204 function(noop, self.exe, True, subp) | |
| 205 function(noop, self.exe + ['--cr'], True, subp) | |
| 206 function(noop, self.exe + ['--crlf'], True, subp) | |
| 207 | |
| 208 def _check_pipes(self, subp, e, stdout, stderr): | |
| 209 """On exception, look if the exception members are set correctly.""" | |
| 210 if subp is subprocess: | |
| 211 # subprocess never save the output. | |
| 212 self.assertFalse(hasattr(e, 'stdout')) | |
| 213 self.assertFalse(hasattr(e, 'stderr')) | |
| 214 elif subp is subprocess2: | |
| 215 self.assertEquals(stdout, e.stdout) | |
| 216 self.assertEquals(stderr, e.stderr) | |
| 217 else: | |
| 218 self.fail() | |
| 219 | |
| 220 def test_check_output_no_stdout(self): | |
| 221 for subp in (subprocess, subprocess2): | |
| 222 try: | |
| 223 subp.check_output(self.exe, stdout=subp.PIPE) | |
| 224 self.fail() | |
| 225 except ValueError: | |
| 226 pass | |
| 227 | |
| 228 def test_check_output_throw_stdout(self): | |
| 229 def fn(c, e, un, subp): | |
| 230 try: | |
| 231 subp.check_output( | |
| 232 e + ['--fail', '--stdout'], universal_newlines=un) | |
| 233 self.fail() | |
| 234 except subp.CalledProcessError, e: | |
| 235 self._check_pipes(subp, e, c('A\nBB\nCCC\n'), None) | |
| 236 self.assertEquals(64, e.returncode) | |
| 237 self._run_test(fn) | |
| 238 | |
| 239 def test_check_output_throw_no_stderr(self): | |
| 240 def fn(c, e, un, subp): | |
| 241 try: | |
| 242 subp.check_output( | |
| 243 e + ['--fail', '--stderr'], universal_newlines=un) | |
| 244 self.fail() | |
| 245 except subp.CalledProcessError, e: | |
| 246 self._check_pipes(subp, e, c(''), None) | |
| 247 self.assertEquals(64, e.returncode) | |
| 248 self._run_test(fn) | |
| 249 | |
| 250 def test_check_output_throw_stderr(self): | |
| 251 def fn(c, e, un, subp): | |
| 252 try: | |
| 253 subp.check_output( | |
| 254 e + ['--fail', '--stderr'], | |
| 255 stderr=subp.PIPE, | |
| 256 universal_newlines=un) | |
| 257 self.fail() | |
| 258 except subp.CalledProcessError, e: | |
| 259 self._check_pipes(subp, e, '', c('a\nbb\nccc\n')) | |
| 260 self.assertEquals(64, e.returncode) | |
| 261 self._run_test(fn) | |
| 262 | |
| 263 def test_check_output_throw_stderr_stdout(self): | |
| 264 def fn(c, e, un, subp): | |
| 265 try: | |
| 266 subp.check_output( | |
| 267 e + ['--fail', '--stderr'], | |
| 268 stderr=subp.STDOUT, | |
| 269 universal_newlines=un) | |
| 270 self.fail() | |
| 271 except subp.CalledProcessError, e: | |
| 272 self._check_pipes(subp, e, c('a\nbb\nccc\n'), None) | |
| 273 self.assertEquals(64, e.returncode) | |
| 274 self._run_test(fn) | |
| 275 | |
| 276 def test_check_call_throw(self): | |
| 277 for subp in (subprocess, subprocess2): | |
| 278 try: | |
| 279 subp.check_call(self.exe + ['--fail', '--stderr']) | |
| 280 self.fail() | |
| 281 except subp.CalledProcessError, e: | |
| 282 self._check_pipes(subp, e, None, None) | |
| 283 self.assertEquals(64, e.returncode) | |
| 284 | |
| 285 | |
| 286 class S2Test(BaseTestCase): | |
| 287 # Tests that can only run in subprocess2, e.g. new functionalities. | |
| 288 # In particular, subprocess2.communicate() doesn't exist in subprocess. | |
| 177 def _run_test(self, function): | 289 def _run_test(self, function): |
| 178 """Runs tests in 6 combinations: | 290 """Runs tests in 6 combinations: |
| 179 - LF output with universal_newlines=False | 291 - LF output with universal_newlines=False |
| 180 - CR output with universal_newlines=False | 292 - CR output with universal_newlines=False |
| 181 - CRLF output with universal_newlines=False | 293 - CRLF output with universal_newlines=False |
| 182 - LF output with universal_newlines=True | 294 - LF output with universal_newlines=True |
| 183 - CR output with universal_newlines=True | 295 - CR output with universal_newlines=True |
| 184 - CRLF output with universal_newlines=True | 296 - CRLF output with universal_newlines=True |
| 185 | 297 |
| 186 First |function| argument is the convertion for the origianl expected LF | 298 First |function| argument is the convertion for the origianl expected LF |
| 187 string to the right EOL. | 299 string to the right EOL. |
| 188 Second |function| argument is the executable and initial flag to run, to | 300 Second |function| argument is the executable and initial flag to run, to |
| 189 control what EOL is used by the child process. | 301 control what EOL is used by the child process. |
| 190 Third |function| argument is universal_newlines value. | 302 Third |function| argument is universal_newlines value. |
| 191 """ | 303 """ |
| 192 noop = lambda x: x | 304 noop = lambda x: x |
| 193 function(noop, self.exe, False) | 305 function(noop, self.exe, False) |
| 194 function(convert_to_cr, self.exe + ['--cr'], False) | 306 function(convert_to_cr, self.exe + ['--cr'], False) |
| 195 function(convert_to_crlf, self.exe + ['--crlf'], False) | 307 function(convert_to_crlf, self.exe + ['--crlf'], False) |
| 196 function(noop, self.exe, True) | 308 function(noop, self.exe, True) |
| 197 function(noop, self.exe + ['--cr'], True) | 309 function(noop, self.exe + ['--cr'], True) |
| 198 function(noop, self.exe + ['--crlf'], True) | 310 function(noop, self.exe + ['--crlf'], True) |
| 199 | 311 |
| 200 def test_timeout(self): | 312 def test_timeout(self): |
| 201 out, returncode = subprocess2.communicate( | 313 # timeout doesn't exist in subprocess. |
| 202 self.exe + ['--sleep_first', '--stdout'], | 314 def fn(c, e, un): |
| 203 timeout=0.01, | 315 out, returncode = subprocess2.communicate( |
| 204 stdout=subprocess2.PIPE, | 316 self.exe + ['--sleep_first', '--stdout'], |
| 205 shell=False) | 317 timeout=0.01, |
| 206 self.assertEquals(subprocess2.TIMED_OUT, returncode) | 318 stdout=subprocess2.PIPE, |
| 207 self.assertEquals(('', None), out) | 319 shell=False) |
| 208 | 320 self.assertEquals(subprocess2.TIMED_OUT, returncode) |
| 209 def test_check_output_no_stdout(self): | 321 self.assertEquals(('', None), out) |
| 210 try: | 322 self._run_test(fn) |
| 211 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | |
| 212 self.fail() | |
| 213 except TypeError: | |
| 214 pass | |
| 215 | 323 |
| 216 def test_stdout_void(self): | 324 def test_stdout_void(self): |
| 217 def fn(c, e, un): | 325 def fn(c, e, un): |
| 218 (out, err), code = subprocess2.communicate( | 326 (out, err), code = subprocess2.communicate( |
| 219 e + ['--stdout', '--stderr'], | 327 e + ['--stdout', '--stderr'], |
| 220 stdout=subprocess2.VOID, | 328 stdout=subprocess2.VOID, |
| 221 stderr=subprocess2.PIPE, | 329 stderr=subprocess2.PIPE, |
| 222 universal_newlines=un) | 330 universal_newlines=un) |
| 223 self.assertEquals(None, out) | 331 self.assertEquals(None, out) |
| 224 self.assertEquals(c('a\nbb\nccc\n'), err) | 332 self.assertEquals(c('a\nbb\nccc\n'), err) |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 255 (out, err), code = subprocess2.communicate( | 363 (out, err), code = subprocess2.communicate( |
| 256 e + ['--stderr'], | 364 e + ['--stderr'], |
| 257 stderr=subprocess2.STDOUT, | 365 stderr=subprocess2.STDOUT, |
| 258 universal_newlines=un) | 366 universal_newlines=un) |
| 259 # stderr output into stdout but stdout is not piped. | 367 # stderr output into stdout but stdout is not piped. |
| 260 self.assertEquals(None, out) | 368 self.assertEquals(None, out) |
| 261 self.assertEquals(None, err) | 369 self.assertEquals(None, err) |
| 262 self.assertEquals(0, code) | 370 self.assertEquals(0, code) |
| 263 self._run_test(fn) | 371 self._run_test(fn) |
| 264 | 372 |
| 265 def test_check_output_throw_stdout(self): | |
| 266 def fn(c, e, un): | |
| 267 try: | |
| 268 subprocess2.check_output( | |
| 269 e + ['--fail', '--stdout'], universal_newlines=un) | |
| 270 self.fail() | |
| 271 except subprocess2.CalledProcessError, e: | |
| 272 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) | |
| 273 self.assertEquals(None, e.stderr) | |
| 274 self.assertEquals(64, e.returncode) | |
| 275 self._run_test(fn) | |
| 276 | |
| 277 def test_check_output_throw_no_stderr(self): | |
| 278 def fn(c, e, un): | |
| 279 try: | |
| 280 subprocess2.check_output( | |
| 281 e + ['--fail', '--stderr'], universal_newlines=un) | |
| 282 self.fail() | |
| 283 except subprocess2.CalledProcessError, e: | |
| 284 self.assertEquals(c(''), e.stdout) | |
| 285 self.assertEquals(None, e.stderr) | |
| 286 self.assertEquals(64, e.returncode) | |
| 287 self._run_test(fn) | |
| 288 | |
| 289 def test_check_output_throw_stderr(self): | |
| 290 def fn(c, e, un): | |
| 291 try: | |
| 292 subprocess2.check_output( | |
| 293 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, | |
| 294 universal_newlines=un) | |
| 295 self.fail() | |
| 296 except subprocess2.CalledProcessError, e: | |
| 297 self.assertEquals('', e.stdout) | |
| 298 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) | |
| 299 self.assertEquals(64, e.returncode) | |
| 300 self._run_test(fn) | |
| 301 | |
| 302 def test_check_output_throw_stderr_stdout(self): | |
| 303 def fn(c, e, un): | |
| 304 try: | |
| 305 subprocess2.check_output( | |
| 306 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, | |
| 307 universal_newlines=un) | |
| 308 self.fail() | |
| 309 except subprocess2.CalledProcessError, e: | |
| 310 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) | |
| 311 self.assertEquals(None, e.stderr) | |
| 312 self.assertEquals(64, e.returncode) | |
| 313 self._run_test(fn) | |
| 314 | |
| 315 def test_check_call_throw(self): | |
| 316 try: | |
| 317 subprocess2.check_call(self.exe + ['--fail', '--stderr']) | |
| 318 self.fail() | |
| 319 except subprocess2.CalledProcessError, e: | |
| 320 self.assertEquals(None, e.stdout) | |
| 321 self.assertEquals(None, e.stderr) | |
| 322 self.assertEquals(64, e.returncode) | |
| 323 | |
| 324 | 373 |
| 325 def child_main(args): | 374 def child_main(args): |
| 326 if sys.platform == 'win32': | 375 if sys.platform == 'win32': |
| 327 # Annoying, make sure the output is not translated on Windows. | 376 # Annoying, make sure the output is not translated on Windows. |
| 328 # pylint: disable=E1101,F0401 | 377 # pylint: disable=E1101,F0401 |
| 329 import msvcrt | 378 import msvcrt |
| 330 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | 379 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) |
| 331 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | 380 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) |
| 332 | 381 |
| 333 parser = optparse.OptionParser() | 382 parser = optparse.OptionParser() |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 379 return options.return_value | 428 return options.return_value |
| 380 | 429 |
| 381 | 430 |
| 382 if __name__ == '__main__': | 431 if __name__ == '__main__': |
| 383 logging.basicConfig(level= | 432 logging.basicConfig(level= |
| 384 [logging.WARNING, logging.INFO, logging.DEBUG][ | 433 [logging.WARNING, logging.INFO, logging.DEBUG][ |
| 385 min(2, sys.argv.count('-v'))]) | 434 min(2, sys.argv.count('-v'))]) |
| 386 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 435 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
| 387 sys.exit(child_main(sys.argv[2:])) | 436 sys.exit(child_main(sys.argv[2:])) |
| 388 unittest.main() | 437 unittest.main() |
| OLD | NEW |