| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
| 7 | 7 |
| 8 import logging | |
| 9 import optparse | 8 import optparse |
| 10 import os | 9 import os |
| 11 import sys | 10 import sys |
| 12 import time | 11 import time |
| 13 import unittest | 12 import unittest |
| 14 | 13 |
| 15 try: | |
| 16 import fcntl | |
| 17 except ImportError: | |
| 18 fcntl = None | |
| 19 | |
| 20 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | 14 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| 21 sys.path.insert(0, ROOT_DIR) | 15 sys.path.insert(0, ROOT_DIR) |
| 22 | 16 |
| 23 import subprocess2 | 17 import subprocess2 |
| 24 | 18 |
| 25 # Method could be a function | 19 # Method could be a function |
| 26 # pylint: disable=R0201 | 20 # pylint: disable=R0201 |
| 27 | 21 |
| 28 | 22 class Subprocess2Test(unittest.TestCase): |
| 29 def convert_to_crlf(string): | |
| 30 """Unconditionally convert LF to CRLF.""" | |
| 31 return string.replace('\n', '\r\n') | |
| 32 | |
| 33 | |
| 34 def convert_to_cr(string): | |
| 35 """Unconditionally convert LF to CR.""" | |
| 36 return string.replace('\n', '\r') | |
| 37 | |
| 38 | |
| 39 def convert_win(string): | |
| 40 """Converts string to CRLF on Windows only.""" | |
| 41 if sys.platform == 'win32': | |
| 42 return string.replace('\n', '\r\n') | |
| 43 return string | |
| 44 | |
| 45 | |
| 46 class DefaultsTest(unittest.TestCase): | |
| 47 # Can be mocked in a test. | 23 # Can be mocked in a test. |
| 48 TO_SAVE = { | 24 TO_SAVE = { |
| 49 subprocess2: [ | 25 subprocess2: [ |
| 50 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], | 26 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], |
| 51 subprocess2.subprocess: ['Popen'], | 27 subprocess2.subprocess: ['Popen'], |
| 52 } | 28 } |
| 53 | 29 |
| 54 def setUp(self): | 30 def setUp(self): |
| 31 self.exe_path = __file__ |
| 32 self.exe = [sys.executable, self.exe_path, '--child'] |
| 55 self.saved = {} | 33 self.saved = {} |
| 56 for module, names in self.TO_SAVE.iteritems(): | 34 for module, names in self.TO_SAVE.iteritems(): |
| 57 self.saved[module] = dict( | 35 self.saved[module] = dict( |
| 58 (name, getattr(module, name)) for name in names) | 36 (name, getattr(module, name)) for name in names) |
| 59 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they | 37 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they |
| 60 # can be trapped in the child process for better coverage. | 38 # can be trapped in the child process for better coverage. |
| 61 | 39 |
| 62 def tearDown(self): | 40 def tearDown(self): |
| 63 for module, saved in self.saved.iteritems(): | 41 for module, saved in self.saved.iteritems(): |
| 64 for name, value in saved.iteritems(): | 42 for name, value in saved.iteritems(): |
| (...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 161 # fake_communicate() doesn't 'implement' that. | 139 # fake_communicate() doesn't 'implement' that. |
| 162 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 140 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
| 163 expected = { | 141 expected = { |
| 164 'args': ['foo'], | 142 'args': ['foo'], |
| 165 'a':True, | 143 'a':True, |
| 166 'stdin': subprocess2.VOID, | 144 'stdin': subprocess2.VOID, |
| 167 'stdout': subprocess2.PIPE, | 145 'stdout': subprocess2.PIPE, |
| 168 } | 146 } |
| 169 self.assertEquals(expected, results) | 147 self.assertEquals(expected, results) |
| 170 | 148 |
| 171 def test_timeout_shell_throws(self): | |
| 172 # Never called. | |
| 173 _ = self._fake_Popen() | |
| 174 try: | |
| 175 subprocess2.communicate( | |
| 176 sys.executable, | |
| 177 timeout=0.01, | |
| 178 stdout=subprocess2.PIPE, | |
| 179 shell=True) | |
| 180 self.fail() | |
| 181 except TypeError: | |
| 182 pass | |
| 183 | |
| 184 | |
| 185 class S2Test(unittest.TestCase): | |
| 186 def setUp(self): | |
| 187 super(S2Test, self).setUp() | |
| 188 self.exe_path = __file__ | |
| 189 self.exe = [sys.executable, self.exe_path, '--child'] | |
| 190 self.states = {} | |
| 191 if fcntl: | |
| 192 for v in (sys.stdin, sys.stdout, sys.stderr): | |
| 193 fileno = v.fileno() | |
| 194 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) | |
| 195 | |
| 196 def tearDown(self): | |
| 197 for fileno, fl in self.states.iteritems(): | |
| 198 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) | |
| 199 super(S2Test, self).tearDown() | |
| 200 | |
| 201 def _run_test(self, function): | |
| 202 """Runs tests in 6 combinations: | |
| 203 - LF output with universal_newlines=False | |
| 204 - CR output with universal_newlines=False | |
| 205 - CRLF output with universal_newlines=False | |
| 206 - LF output with universal_newlines=True | |
| 207 - CR output with universal_newlines=True | |
| 208 - CRLF output with universal_newlines=True | |
| 209 | |
| 210 First |function| argument is the convertion for the origianl expected LF | |
| 211 string to the right EOL. | |
| 212 Second |function| argument is the executable and initial flag to run, to | |
| 213 control what EOL is used by the child process. | |
| 214 Third |function| argument is universal_newlines value. | |
| 215 """ | |
| 216 noop = lambda x: x | |
| 217 function(noop, self.exe, False) | |
| 218 function(convert_to_cr, self.exe + ['--cr'], False) | |
| 219 function(convert_to_crlf, self.exe + ['--crlf'], False) | |
| 220 function(noop, self.exe, True) | |
| 221 function(noop, self.exe + ['--cr'], True) | |
| 222 function(noop, self.exe + ['--crlf'], True) | |
| 223 | |
| 224 def test_timeout(self): | 149 def test_timeout(self): |
| 150 # It'd be better to not discard stdout. |
| 225 out, returncode = subprocess2.communicate( | 151 out, returncode = subprocess2.communicate( |
| 226 self.exe + ['--sleep_first', '--stdout'], | 152 self.exe + ['--sleep', '--stdout'], |
| 227 timeout=0.01, | 153 timeout=0.01, |
| 228 stdout=subprocess2.PIPE, | 154 stdout=subprocess2.PIPE) |
| 229 shell=False) | |
| 230 self.assertEquals(subprocess2.TIMED_OUT, returncode) | 155 self.assertEquals(subprocess2.TIMED_OUT, returncode) |
| 231 self.assertEquals(('', None), out) | 156 self.assertEquals(['', None], out) |
| 232 | 157 |
| 233 def test_check_output_no_stdout(self): | 158 def test_check_output_no_stdout(self): |
| 234 try: | 159 try: |
| 235 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | 160 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) |
| 236 self.fail() | 161 self.fail() |
| 237 except TypeError: | 162 except TypeError: |
| 238 pass | 163 pass |
| 239 | 164 |
| 240 def test_stdout_void(self): | 165 def test_stdout_void(self): |
| 241 def fn(c, e, un): | 166 (out, err), code = subprocess2.communicate( |
| 242 (out, err), code = subprocess2.communicate( | 167 self.exe + ['--stdout', '--stderr'], |
| 243 e + ['--stdout', '--stderr'], | 168 stdout=subprocess2.VOID, |
| 244 stdout=subprocess2.VOID, | 169 stderr=subprocess2.PIPE) |
| 245 stderr=subprocess2.PIPE, | 170 self.assertEquals(None, out) |
| 246 universal_newlines=un) | 171 expected = 'a\nbb\nccc\n' |
| 247 self.assertEquals(None, out) | 172 if sys.platform == 'win32': |
| 248 self.assertEquals(c('a\nbb\nccc\n'), err) | 173 expected = expected.replace('\n', '\r\n') |
| 249 self.assertEquals(0, code) | 174 self.assertEquals(expected, err) |
| 250 self._run_test(fn) | 175 self.assertEquals(0, code) |
| 251 | 176 |
| 252 def test_stderr_void(self): | 177 def test_stderr_void(self): |
| 253 def fn(c, e, un): | 178 (out, err), code = subprocess2.communicate( |
| 254 (out, err), code = subprocess2.communicate( | 179 self.exe + ['--stdout', '--stderr'], |
| 255 e + ['--stdout', '--stderr'], | 180 universal_newlines=True, |
| 256 stdout=subprocess2.PIPE, | 181 stdout=subprocess2.PIPE, |
| 257 stderr=subprocess2.VOID, | 182 stderr=subprocess2.VOID) |
| 258 universal_newlines=un) | 183 self.assertEquals('A\nBB\nCCC\n', out) |
| 259 self.assertEquals(c('A\nBB\nCCC\n'), out) | 184 self.assertEquals(None, err) |
| 260 self.assertEquals(None, err) | 185 self.assertEquals(0, code) |
| 261 self.assertEquals(0, code) | |
| 262 self._run_test(fn) | |
| 263 | 186 |
| 264 def test_check_output_throw_stdout(self): | 187 def test_check_output_throw_stdout(self): |
| 265 def fn(c, e, un): | 188 try: |
| 266 try: | 189 subprocess2.check_output( |
| 267 subprocess2.check_output( | 190 self.exe + ['--fail', '--stdout'], universal_newlines=True) |
| 268 e + ['--fail', '--stdout'], universal_newlines=un) | 191 self.fail() |
| 269 self.fail() | 192 except subprocess2.CalledProcessError, e: |
| 270 except subprocess2.CalledProcessError, e: | 193 self.assertEquals('A\nBB\nCCC\n', e.stdout) |
| 271 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) | 194 self.assertEquals(None, e.stderr) |
| 272 self.assertEquals(None, e.stderr) | 195 self.assertEquals(64, e.returncode) |
| 273 self.assertEquals(64, e.returncode) | |
| 274 self._run_test(fn) | |
| 275 | 196 |
| 276 def test_check_output_throw_no_stderr(self): | 197 def test_check_output_throw_no_stderr(self): |
| 277 def fn(c, e, un): | 198 try: |
| 278 try: | 199 subprocess2.check_output( |
| 279 subprocess2.check_output( | 200 self.exe + ['--fail', '--stderr'], universal_newlines=True) |
| 280 e + ['--fail', '--stderr'], universal_newlines=un) | 201 self.fail() |
| 281 self.fail() | 202 except subprocess2.CalledProcessError, e: |
| 282 except subprocess2.CalledProcessError, e: | 203 self.assertEquals('', e.stdout) |
| 283 self.assertEquals(c(''), e.stdout) | 204 self.assertEquals(None, e.stderr) |
| 284 self.assertEquals(None, e.stderr) | 205 self.assertEquals(64, e.returncode) |
| 285 self.assertEquals(64, e.returncode) | |
| 286 self._run_test(fn) | |
| 287 | 206 |
| 288 def test_check_output_throw_stderr(self): | 207 def test_check_output_throw_stderr(self): |
| 289 def fn(c, e, un): | 208 try: |
| 290 try: | 209 subprocess2.check_output( |
| 291 subprocess2.check_output( | 210 self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, |
| 292 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, | 211 universal_newlines=True) |
| 293 universal_newlines=un) | 212 self.fail() |
| 294 self.fail() | 213 except subprocess2.CalledProcessError, e: |
| 295 except subprocess2.CalledProcessError, e: | 214 self.assertEquals('', e.stdout) |
| 296 self.assertEquals('', e.stdout) | 215 self.assertEquals('a\nbb\nccc\n', e.stderr) |
| 297 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) | 216 self.assertEquals(64, e.returncode) |
| 298 self.assertEquals(64, e.returncode) | |
| 299 self._run_test(fn) | |
| 300 | 217 |
| 301 def test_check_output_throw_stderr_stdout(self): | 218 def test_check_output_throw_stderr_stdout(self): |
| 302 def fn(c, e, un): | 219 try: |
| 303 try: | 220 subprocess2.check_output( |
| 304 subprocess2.check_output( | 221 self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, |
| 305 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, | 222 universal_newlines=True) |
| 306 universal_newlines=un) | 223 self.fail() |
| 307 self.fail() | 224 except subprocess2.CalledProcessError, e: |
| 308 except subprocess2.CalledProcessError, e: | 225 self.assertEquals('a\nbb\nccc\n', e.stdout) |
| 309 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) | 226 self.assertEquals(None, e.stderr) |
| 310 self.assertEquals(None, e.stderr) | 227 self.assertEquals(64, e.returncode) |
| 311 self.assertEquals(64, e.returncode) | |
| 312 self._run_test(fn) | |
| 313 | 228 |
| 314 def test_check_call_throw(self): | 229 def test_check_call_throw(self): |
| 315 try: | 230 try: |
| 316 subprocess2.check_call(self.exe + ['--fail', '--stderr']) | 231 subprocess2.check_call(self.exe + ['--fail', '--stderr']) |
| 317 self.fail() | 232 self.fail() |
| 318 except subprocess2.CalledProcessError, e: | 233 except subprocess2.CalledProcessError, e: |
| 319 self.assertEquals(None, e.stdout) | 234 self.assertEquals(None, e.stdout) |
| 320 self.assertEquals(None, e.stderr) | 235 self.assertEquals(None, e.stderr) |
| 321 self.assertEquals(64, e.returncode) | 236 self.assertEquals(64, e.returncode) |
| 322 | 237 |
| 323 def test_check_output_tee_stderr(self): | |
| 324 def fn(c, e, un): | |
| 325 stderr = [] | |
| 326 out, returncode = subprocess2.communicate( | |
| 327 e + ['--stderr'], stderr=stderr.append, | |
| 328 universal_newlines=un) | |
| 329 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) | |
| 330 self.assertEquals((None, None), out) | |
| 331 self.assertEquals(0, returncode) | |
| 332 self._run_test(fn) | |
| 333 | |
| 334 def test_check_output_tee_stdout_stderr(self): | |
| 335 def fn(c, e, un): | |
| 336 stdout = [] | |
| 337 stderr = [] | |
| 338 out, returncode = subprocess2.communicate( | |
| 339 e + ['--stdout', '--stderr'], | |
| 340 stdout=stdout.append, | |
| 341 stderr=stderr.append, | |
| 342 universal_newlines=un) | |
| 343 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) | |
| 344 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) | |
| 345 self.assertEquals((None, None), out) | |
| 346 self.assertEquals(0, returncode) | |
| 347 self._run_test(fn) | |
| 348 | |
| 349 def test_check_output_tee_stdin(self): | |
| 350 def fn(c, e, un): | |
| 351 stdout = [] | |
| 352 stdin = '0123456789' | |
| 353 out, returncode = subprocess2.communicate( | |
| 354 e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append, | |
| 355 universal_newlines=un) | |
| 356 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) | |
| 357 self.assertEquals((None, None), out) | |
| 358 self.assertEquals(0, returncode) | |
| 359 self._run_test(fn) | |
| 360 | |
| 361 def test_check_output_tee_throw(self): | |
| 362 def fn(c, e, un): | |
| 363 stderr = [] | |
| 364 try: | |
| 365 subprocess2.check_output( | |
| 366 e + ['--stderr', '--fail'], stderr=stderr.append, | |
| 367 universal_newlines=un) | |
| 368 self.fail() | |
| 369 except subprocess2.CalledProcessError, e: | |
| 370 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) | |
| 371 self.assertEquals('', e.stdout) | |
| 372 self.assertEquals(None, e.stderr) | |
| 373 self.assertEquals(64, e.returncode) | |
| 374 self._run_test(fn) | |
| 375 | |
| 376 def test_check_output_tee_large(self): | |
| 377 stdout = [] | |
| 378 # Read 128kb. On my workstation it takes >2s. Welcome to 2011. | |
| 379 out, returncode = subprocess2.communicate( | |
| 380 self.exe + ['--large'], stdout=stdout.append) | |
| 381 self.assertEquals(128*1024, len(''.join(stdout))) | |
| 382 self.assertEquals((None, None), out) | |
| 383 self.assertEquals(0, returncode) | |
| 384 | |
| 385 def test_check_output_tee_large_stdin(self): | |
| 386 stdout = [] | |
| 387 # Write 128kb. | |
| 388 stdin = '0123456789abcdef' * (8*1024) | |
| 389 out, returncode = subprocess2.communicate( | |
| 390 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) | |
| 391 self.assertEquals(128*1024, len(''.join(stdout))) | |
| 392 self.assertEquals((None, None), out) | |
| 393 self.assertEquals(0, returncode) | |
| 394 | |
| 395 | 238 |
| 396 def child_main(args): | 239 def child_main(args): |
| 397 if sys.platform == 'win32': | |
| 398 # Annoying, make sure the output is not translated on Windows. | |
| 399 # pylint: disable=E1101,F0401 | |
| 400 import msvcrt | |
| 401 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | |
| 402 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | |
| 403 | |
| 404 parser = optparse.OptionParser() | 240 parser = optparse.OptionParser() |
| 405 parser.add_option( | 241 parser.add_option( |
| 406 '--fail', | 242 '--fail', |
| 407 dest='return_value', | 243 dest='return_value', |
| 408 action='store_const', | 244 action='store_const', |
| 409 default=0, | 245 default=0, |
| 410 const=64) | 246 const=64) |
| 411 parser.add_option( | |
| 412 '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') | |
| 413 parser.add_option( | |
| 414 '--cr', action='store_const', const='\r', dest='eol') | |
| 415 parser.add_option('--stdout', action='store_true') | 247 parser.add_option('--stdout', action='store_true') |
| 416 parser.add_option('--stderr', action='store_true') | 248 parser.add_option('--stderr', action='store_true') |
| 417 parser.add_option('--sleep_first', action='store_true') | 249 parser.add_option('--sleep', action='store_true') |
| 418 parser.add_option('--sleep_last', action='store_true') | |
| 419 parser.add_option('--large', action='store_true') | |
| 420 parser.add_option('--read', action='store_true') | |
| 421 options, args = parser.parse_args(args) | 250 options, args = parser.parse_args(args) |
| 422 if args: | 251 if args: |
| 423 parser.error('Internal error') | 252 parser.error('Internal error') |
| 424 if options.sleep_first: | |
| 425 time.sleep(10) | |
| 426 | 253 |
| 427 def do(string): | 254 def do(string): |
| 428 if options.stdout: | 255 if options.stdout: |
| 429 sys.stdout.write(string.upper()) | 256 print >> sys.stdout, string.upper() |
| 430 sys.stdout.write(options.eol) | |
| 431 if options.stderr: | 257 if options.stderr: |
| 432 sys.stderr.write(string.lower()) | 258 print >> sys.stderr, string.lower() |
| 433 sys.stderr.write(options.eol) | |
| 434 | 259 |
| 435 do('A') | 260 do('A') |
| 436 do('BB') | 261 do('BB') |
| 437 do('CCC') | 262 do('CCC') |
| 438 if options.large: | 263 if options.sleep: |
| 439 # Print 128kb. | |
| 440 string = '0123456789abcdef' * (8*1024) | |
| 441 sys.stdout.write(string) | |
| 442 if options.read: | |
| 443 try: | |
| 444 while sys.stdin.read(): | |
| 445 pass | |
| 446 except OSError: | |
| 447 pass | |
| 448 if options.sleep_last: | |
| 449 time.sleep(10) | 264 time.sleep(10) |
| 450 return options.return_value | 265 return options.return_value |
| 451 | 266 |
| 452 | 267 |
| 453 if __name__ == '__main__': | 268 if __name__ == '__main__': |
| 454 logging.basicConfig(level= | |
| 455 [logging.WARNING, logging.INFO, logging.DEBUG][ | |
| 456 min(2, sys.argv.count('-v'))]) | |
| 457 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 269 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
| 458 sys.exit(child_main(sys.argv[2:])) | 270 sys.exit(child_main(sys.argv[2:])) |
| 459 unittest.main() | 271 unittest.main() |
| OLD | NEW |