OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
7 | 7 |
8 import logging | 8 import logging |
9 import optparse | 9 import optparse |
10 import os | 10 import os |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
70 def _fake_Popen(self): | 70 def _fake_Popen(self): |
71 """Mocks the whole subprocess2.Popen class.""" | 71 """Mocks the whole subprocess2.Popen class.""" |
72 results = {} | 72 results = {} |
73 class fake_Popen(object): | 73 class fake_Popen(object): |
74 returncode = -8 | 74 returncode = -8 |
75 def __init__(self, args, **kwargs): | 75 def __init__(self, args, **kwargs): |
76 assert not results | 76 assert not results |
77 results.update(kwargs) | 77 results.update(kwargs) |
78 results['args'] = args | 78 results['args'] = args |
79 @staticmethod | 79 @staticmethod |
80 def communicate(): | 80 def communicate(input=None, timeout=None): # pylint: disable=W0622 |
81 return None, None | 81 return None, None |
82 self.mock(subprocess2, 'Popen', fake_Popen) | 82 self.mock(subprocess2, 'Popen', fake_Popen) |
83 return results | 83 return results |
84 | 84 |
85 def _fake_subprocess_Popen(self): | 85 def _fake_subprocess_Popen(self): |
86 """Mocks the base class subprocess.Popen only.""" | 86 """Mocks the base class subprocess.Popen only.""" |
87 results = {} | 87 results = {} |
88 def __init__(self, args, **kwargs): | 88 def __init__(self, args, **kwargs): |
89 assert not results | 89 assert not results |
90 results.update(kwargs) | 90 results.update(kwargs) |
(...skipping 237 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
328 function(noop, self.exe, True) | 328 function(noop, self.exe, True) |
329 function(noop, self.exe + ['--cr'], True) | 329 function(noop, self.exe + ['--cr'], True) |
330 function(noop, self.exe + ['--crlf'], True) | 330 function(noop, self.exe + ['--crlf'], True) |
331 | 331 |
332 def _check_res(self, res, stdout, stderr, returncode): | 332 def _check_res(self, res, stdout, stderr, returncode): |
333 (out, err), code = res | 333 (out, err), code = res |
334 self.assertEquals(stdout, out) | 334 self.assertEquals(stdout, out) |
335 self.assertEquals(stderr, err) | 335 self.assertEquals(stderr, err) |
336 self.assertEquals(returncode, code) | 336 self.assertEquals(returncode, code) |
337 | 337 |
| 338 def _check_exception(self, e, stdout, stderr, returncode): |
| 339 """On exception, look if the exception members are set correctly.""" |
| 340 self.assertEquals(returncode, e.returncode) |
| 341 self.assertEquals(stdout, e.stdout) |
| 342 self.assertEquals(stderr, e.stderr) |
| 343 |
338 def test_timeout(self): | 344 def test_timeout(self): |
339 # timeout doesn't exist in subprocess. | 345 # timeout doesn't exist in subprocess. |
340 def fn(c, e, un): | 346 def fn(c, e, un): |
341 res = subprocess2.communicate( | 347 res = subprocess2.communicate( |
342 self.exe + ['--sleep_first', '--stdout'], | 348 self.exe + ['--sleep_first', '--stdout'], |
343 timeout=0.01, | 349 timeout=0.01, |
344 stdout=PIPE, | 350 stdout=PIPE, |
345 shell=False) | 351 shell=False) |
346 self._check_res(res, '', None, TIMED_OUT) | 352 self._check_res(res, '', None, TIMED_OUT) |
347 self._run_test(fn) | 353 self._run_test(fn) |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
398 self._run_test(fn) | 404 self._run_test(fn) |
399 | 405 |
400 def test_check_output_redirect_stderr_to_stdout(self): | 406 def test_check_output_redirect_stderr_to_stdout(self): |
401 def fn(c, e, un): | 407 def fn(c, e, un): |
402 # stderr output into stdout but stdout is not piped. | 408 # stderr output into stdout but stdout is not piped. |
403 res = subprocess2.communicate( | 409 res = subprocess2.communicate( |
404 e + ['--stderr'], stderr=STDOUT, universal_newlines=un) | 410 e + ['--stderr'], stderr=STDOUT, universal_newlines=un) |
405 self._check_res(res, None, None, 0) | 411 self._check_res(res, None, None, 0) |
406 self._run_test(fn) | 412 self._run_test(fn) |
407 | 413 |
| 414 def test_check_output_tee_stderr(self): |
| 415 def fn(c, e, un): |
| 416 stderr = [] |
| 417 res = subprocess2.communicate( |
| 418 e + ['--stderr'], stderr=stderr.append, universal_newlines=un) |
| 419 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) |
| 420 self._check_res(res, None, None, 0) |
| 421 self._run_test(fn) |
| 422 |
| 423 def test_check_output_tee_stdout_stderr(self): |
| 424 def fn(c, e, un): |
| 425 stdout = [] |
| 426 stderr = [] |
| 427 res = subprocess2.communicate( |
| 428 e + ['--stdout', '--stderr'], |
| 429 stdout=stdout.append, |
| 430 stderr=stderr.append, |
| 431 universal_newlines=un) |
| 432 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) |
| 433 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) |
| 434 self._check_res(res, None, None, 0) |
| 435 self._run_test(fn) |
| 436 |
| 437 def test_check_output_tee_stdin(self): |
| 438 def fn(c, e, un): |
| 439 stdout = [] |
| 440 stdin = '0123456789' |
| 441 res = subprocess2.communicate( |
| 442 e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append, |
| 443 universal_newlines=un) |
| 444 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) |
| 445 self._check_res(res, None, None, 0) |
| 446 self._run_test(fn) |
| 447 |
| 448 def test_check_output_tee_throw(self): |
| 449 def fn(c, e, un): |
| 450 stderr = [] |
| 451 try: |
| 452 subprocess2.check_output( |
| 453 e + ['--stderr', '--fail'], stderr=stderr.append, |
| 454 universal_newlines=un) |
| 455 self.fail() |
| 456 except subprocess2.CalledProcessError, e: |
| 457 self._check_exception(e, '', None, 64) |
| 458 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) |
| 459 self._run_test(fn) |
| 460 |
| 461 def test_tee_timeout_stdout_void(self): |
| 462 def fn(c, e, un): |
| 463 stderr = [] |
| 464 res = subprocess2.communicate( |
| 465 e + ['--stdout', '--stderr', '--fail'], |
| 466 stdout=VOID, |
| 467 stderr=stderr.append, |
| 468 timeout=10, |
| 469 universal_newlines=un) |
| 470 self._check_res(res, None, None, 64) |
| 471 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) |
| 472 self._run_test(fn) |
| 473 |
| 474 def test_tee_timeout_stderr_void(self): |
| 475 def fn(c, e, un): |
| 476 stdout = [] |
| 477 res = subprocess2.communicate( |
| 478 e + ['--stdout', '--stderr', '--fail'], |
| 479 stdout=stdout.append, |
| 480 stderr=VOID, |
| 481 timeout=10, |
| 482 universal_newlines=un) |
| 483 self._check_res(res, None, None, 64) |
| 484 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) |
| 485 self._run_test(fn) |
| 486 |
| 487 def test_tee_timeout_stderr_stdout(self): |
| 488 def fn(c, e, un): |
| 489 stdout = [] |
| 490 res = subprocess2.communicate( |
| 491 e + ['--stdout', '--stderr', '--fail'], |
| 492 stdout=stdout.append, |
| 493 stderr=STDOUT, |
| 494 timeout=10, |
| 495 universal_newlines=un) |
| 496 self._check_res(res, None, None, 64) |
| 497 # Ordering is wrong due to buffering. |
| 498 self.assertEquals(c('a\nbb\nccc\nA\nBB\nCCC\n'), ''.join(stdout)) |
| 499 self._run_test(fn) |
| 500 |
| 501 def test_tee_large(self): |
| 502 stdout = [] |
| 503 # Read 128kb. On my workstation it takes >2s. Welcome to 2011. |
| 504 res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append) |
| 505 self.assertEquals(128*1024, len(''.join(stdout))) |
| 506 self._check_res(res, None, None, 0) |
| 507 |
| 508 def test_tee_large_stdin(self): |
| 509 stdout = [] |
| 510 # Write 128kb. |
| 511 stdin = '0123456789abcdef' * (8*1024) |
| 512 res = subprocess2.communicate( |
| 513 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) |
| 514 self.assertEquals(128*1024, len(''.join(stdout))) |
| 515 self._check_res(res, None, None, 0) |
| 516 |
| 517 def test_tee_throw(self): |
| 518 # Having a callback throwing up should not cause side-effects. It's a bit |
| 519 # hard to measure. |
| 520 class Blow(Exception): |
| 521 pass |
| 522 def blow(_): |
| 523 raise Blow() |
| 524 proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow) |
| 525 try: |
| 526 proc.communicate() |
| 527 self.fail() |
| 528 except Blow: |
| 529 self.assertNotEquals(0, proc.returncode) |
| 530 |
408 | 531 |
409 def child_main(args): | 532 def child_main(args): |
410 if sys.platform == 'win32': | 533 if sys.platform == 'win32': |
411 # Annoying, make sure the output is not translated on Windows. | 534 # Annoying, make sure the output is not translated on Windows. |
412 # pylint: disable=E1101,F0401 | 535 # pylint: disable=E1101,F0401 |
413 import msvcrt | 536 import msvcrt |
414 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | 537 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) |
415 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | 538 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) |
416 | 539 |
417 parser = optparse.OptionParser() | 540 parser = optparse.OptionParser() |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
463 return options.return_value | 586 return options.return_value |
464 | 587 |
465 | 588 |
466 if __name__ == '__main__': | 589 if __name__ == '__main__': |
467 logging.basicConfig(level= | 590 logging.basicConfig(level= |
468 [logging.WARNING, logging.INFO, logging.DEBUG][ | 591 [logging.WARNING, logging.INFO, logging.DEBUG][ |
469 min(2, sys.argv.count('-v'))]) | 592 min(2, sys.argv.count('-v'))]) |
470 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 593 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
471 sys.exit(child_main(sys.argv[2:])) | 594 sys.exit(child_main(sys.argv[2:])) |
472 unittest.main() | 595 unittest.main() |
OLD | NEW |