OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
7 | 7 |
8 import logging | 8 import logging |
9 import optparse | 9 import optparse |
10 import os | 10 import os |
11 import sys | 11 import sys |
12 import time | 12 import time |
13 import unittest | 13 import unittest |
14 | 14 |
15 try: | 15 try: |
16 import fcntl | 16 import fcntl |
17 except ImportError: | 17 except ImportError: |
18 fcntl = None | 18 fcntl = None |
19 | 19 |
20 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 20 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
21 | 21 |
| 22 import subprocess |
22 import subprocess2 | 23 import subprocess2 |
23 | 24 |
24 from testing_support import auto_stub | 25 from testing_support import auto_stub |
25 | 26 |
26 # Method could be a function | 27 # Method could be a function |
27 # pylint: disable=R0201 | 28 # pylint: disable=R0201 |
28 | 29 |
29 | 30 |
30 def convert_to_crlf(string): | 31 def convert_to_crlf(string): |
31 """Unconditionally convert LF to CRLF.""" | 32 """Unconditionally convert LF to CRLF.""" |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
75 | 76 |
76 def _fake_subprocess_Popen(self): | 77 def _fake_subprocess_Popen(self): |
77 """Mocks the base class subprocess.Popen only.""" | 78 """Mocks the base class subprocess.Popen only.""" |
78 results = {} | 79 results = {} |
79 def __init__(self, args, **kwargs): | 80 def __init__(self, args, **kwargs): |
80 assert not results | 81 assert not results |
81 results.update(kwargs) | 82 results.update(kwargs) |
82 results['args'] = args | 83 results['args'] = args |
83 def communicate(): | 84 def communicate(): |
84 return None, None | 85 return None, None |
85 self.mock(subprocess2.subprocess.Popen, '__init__', __init__) | 86 self.mock(subprocess.Popen, '__init__', __init__) |
86 self.mock(subprocess2.subprocess.Popen, 'communicate', communicate) | 87 self.mock(subprocess.Popen, 'communicate', communicate) |
87 return results | 88 return results |
88 | 89 |
89 def test_check_call_defaults(self): | 90 def test_check_call_defaults(self): |
90 results = self._fake_communicate() | 91 results = self._fake_communicate() |
91 self.assertEquals( | 92 self.assertEquals( |
92 ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) | 93 ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) |
93 expected = { | 94 expected = { |
94 'args': ['foo'], | 95 'args': ['foo'], |
95 'a':True, | 96 'a':True, |
96 } | 97 } |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
151 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 152 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
152 expected = { | 153 expected = { |
153 'args': ['foo'], | 154 'args': ['foo'], |
154 'a':True, | 155 'a':True, |
155 'stdin': subprocess2.VOID, | 156 'stdin': subprocess2.VOID, |
156 'stdout': subprocess2.PIPE, | 157 'stdout': subprocess2.PIPE, |
157 } | 158 } |
158 self.assertEquals(expected, results) | 159 self.assertEquals(expected, results) |
159 | 160 |
160 | 161 |
161 class S2Test(unittest.TestCase): | 162 class BaseTestCase(unittest.TestCase): |
162 def setUp(self): | 163 def setUp(self): |
163 super(S2Test, self).setUp() | 164 super(BaseTestCase, self).setUp() |
164 self.exe_path = __file__ | 165 self.exe_path = __file__ |
165 self.exe = [sys.executable, self.exe_path, '--child'] | 166 self.exe = [sys.executable, self.exe_path, '--child'] |
166 self.states = {} | 167 self.states = {} |
167 if fcntl: | 168 if fcntl: |
168 for v in (sys.stdin, sys.stdout, sys.stderr): | 169 for v in (sys.stdin, sys.stdout, sys.stderr): |
169 fileno = v.fileno() | 170 fileno = v.fileno() |
170 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) | 171 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) |
171 | 172 |
172 def tearDown(self): | 173 def tearDown(self): |
173 for fileno, fl in self.states.iteritems(): | 174 for fileno, fl in self.states.iteritems(): |
174 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) | 175 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) |
175 super(S2Test, self).tearDown() | 176 super(BaseTestCase, self).tearDown() |
176 | 177 |
| 178 |
| 179 class RegressionTest(BaseTestCase): |
| 180 # Regression tests to ensure that subprocess and subprocess2 have the same |
| 181 # behavior. |
| 182 def _run_test(self, function): |
| 183 """Runs tests in 12 combinations: |
| 184 - LF output with universal_newlines=False |
| 185 - CR output with universal_newlines=False |
| 186 - CRLF output with universal_newlines=False |
| 187 - LF output with universal_newlines=True |
| 188 - CR output with universal_newlines=True |
| 189 - CRLF output with universal_newlines=True |
| 190 |
| 191 Once with subprocess, once with subprocess2. |
| 192 |
| 193 First |function| argument is the conversion for the original expected LF |
| 194 string to the right EOL. |
| 195 Second |function| argument is the executable and initial flag to run, to |
| 196 control what EOL is used by the child process. |
| 197 Third |function| argument is universal_newlines value. |
| 198 """ |
| 199 noop = lambda x: x |
| 200 for subp in (subprocess, subprocess2): |
| 201 function(noop, self.exe, False, subp) |
| 202 function(convert_to_cr, self.exe + ['--cr'], False, subp) |
| 203 function(convert_to_crlf, self.exe + ['--crlf'], False, subp) |
| 204 function(noop, self.exe, True, subp) |
| 205 function(noop, self.exe + ['--cr'], True, subp) |
| 206 function(noop, self.exe + ['--crlf'], True, subp) |
| 207 |
| 208 def _check_pipes(self, subp, e, stdout, stderr): |
| 209 """On exception, look if the exception members are set correctly.""" |
| 210 if subp is subprocess: |
| 211 # subprocess never save the output. |
| 212 self.assertFalse(hasattr(e, 'stdout')) |
| 213 self.assertFalse(hasattr(e, 'stderr')) |
| 214 elif subp is subprocess2: |
| 215 self.assertEquals(stdout, e.stdout) |
| 216 self.assertEquals(stderr, e.stderr) |
| 217 else: |
| 218 self.fail() |
| 219 |
| 220 def test_check_output_no_stdout(self): |
| 221 try: |
| 222 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) |
| 223 self.fail() |
| 224 except ValueError: |
| 225 pass |
| 226 |
| 227 if (sys.version_info[0] * 10 + sys.version_info[1]) >= 27: |
| 228 # python 2.7+ |
| 229 try: |
| 230 # pylint: disable=E1101 |
| 231 subprocess.check_output(self.exe, stdout=subprocess.PIPE) |
| 232 self.fail() |
| 233 except ValueError: |
| 234 pass |
| 235 |
| 236 def test_check_output_throw_stdout(self): |
| 237 def fn(c, e, un, subp): |
| 238 if not hasattr(subp, 'check_output'): |
| 239 return |
| 240 try: |
| 241 subp.check_output( |
| 242 e + ['--fail', '--stdout'], universal_newlines=un) |
| 243 self.fail() |
| 244 except subp.CalledProcessError, e: |
| 245 self._check_pipes(subp, e, c('A\nBB\nCCC\n'), None) |
| 246 self.assertEquals(64, e.returncode) |
| 247 self._run_test(fn) |
| 248 |
| 249 def test_check_output_throw_no_stderr(self): |
| 250 def fn(c, e, un, subp): |
| 251 if not hasattr(subp, 'check_output'): |
| 252 return |
| 253 try: |
| 254 subp.check_output( |
| 255 e + ['--fail', '--stderr'], universal_newlines=un) |
| 256 self.fail() |
| 257 except subp.CalledProcessError, e: |
| 258 self._check_pipes(subp, e, c(''), None) |
| 259 self.assertEquals(64, e.returncode) |
| 260 self._run_test(fn) |
| 261 |
| 262 def test_check_output_throw_stderr(self): |
| 263 def fn(c, e, un, subp): |
| 264 if not hasattr(subp, 'check_output'): |
| 265 return |
| 266 try: |
| 267 subp.check_output( |
| 268 e + ['--fail', '--stderr'], |
| 269 stderr=subp.PIPE, |
| 270 universal_newlines=un) |
| 271 self.fail() |
| 272 except subp.CalledProcessError, e: |
| 273 self._check_pipes(subp, e, '', c('a\nbb\nccc\n')) |
| 274 self.assertEquals(64, e.returncode) |
| 275 self._run_test(fn) |
| 276 |
| 277 def test_check_output_throw_stderr_stdout(self): |
| 278 def fn(c, e, un, subp): |
| 279 if not hasattr(subp, 'check_output'): |
| 280 return |
| 281 try: |
| 282 subp.check_output( |
| 283 e + ['--fail', '--stderr'], |
| 284 stderr=subp.STDOUT, |
| 285 universal_newlines=un) |
| 286 self.fail() |
| 287 except subp.CalledProcessError, e: |
| 288 self._check_pipes(subp, e, c('a\nbb\nccc\n'), None) |
| 289 self.assertEquals(64, e.returncode) |
| 290 self._run_test(fn) |
| 291 |
| 292 def test_check_call_throw(self): |
| 293 for subp in (subprocess, subprocess2): |
| 294 try: |
| 295 subp.check_call(self.exe + ['--fail', '--stderr']) |
| 296 self.fail() |
| 297 except subp.CalledProcessError, e: |
| 298 self._check_pipes(subp, e, None, None) |
| 299 self.assertEquals(64, e.returncode) |
| 300 |
| 301 |
| 302 class S2Test(BaseTestCase): |
| 303 # Tests that can only run in subprocess2, e.g. new functionalities. |
| 304 # In particular, subprocess2.communicate() doesn't exist in subprocess. |
177 def _run_test(self, function): | 305 def _run_test(self, function): |
178 """Runs tests in 6 combinations: | 306 """Runs tests in 6 combinations: |
179 - LF output with universal_newlines=False | 307 - LF output with universal_newlines=False |
180 - CR output with universal_newlines=False | 308 - CR output with universal_newlines=False |
181 - CRLF output with universal_newlines=False | 309 - CRLF output with universal_newlines=False |
182 - LF output with universal_newlines=True | 310 - LF output with universal_newlines=True |
183 - CR output with universal_newlines=True | 311 - CR output with universal_newlines=True |
184 - CRLF output with universal_newlines=True | 312 - CRLF output with universal_newlines=True |
185 | 313 |
186 First |function| argument is the convertion for the origianl expected LF | 314 First |function| argument is the convertion for the origianl expected LF |
187 string to the right EOL. | 315 string to the right EOL. |
188 Second |function| argument is the executable and initial flag to run, to | 316 Second |function| argument is the executable and initial flag to run, to |
189 control what EOL is used by the child process. | 317 control what EOL is used by the child process. |
190 Third |function| argument is universal_newlines value. | 318 Third |function| argument is universal_newlines value. |
191 """ | 319 """ |
192 noop = lambda x: x | 320 noop = lambda x: x |
193 function(noop, self.exe, False) | 321 function(noop, self.exe, False) |
194 function(convert_to_cr, self.exe + ['--cr'], False) | 322 function(convert_to_cr, self.exe + ['--cr'], False) |
195 function(convert_to_crlf, self.exe + ['--crlf'], False) | 323 function(convert_to_crlf, self.exe + ['--crlf'], False) |
196 function(noop, self.exe, True) | 324 function(noop, self.exe, True) |
197 function(noop, self.exe + ['--cr'], True) | 325 function(noop, self.exe + ['--cr'], True) |
198 function(noop, self.exe + ['--crlf'], True) | 326 function(noop, self.exe + ['--crlf'], True) |
199 | 327 |
200 def test_timeout(self): | 328 def test_timeout(self): |
201 out, returncode = subprocess2.communicate( | 329 # timeout doesn't exist in subprocess. |
202 self.exe + ['--sleep_first', '--stdout'], | 330 def fn(c, e, un): |
203 timeout=0.01, | 331 out, returncode = subprocess2.communicate( |
204 stdout=subprocess2.PIPE, | 332 self.exe + ['--sleep_first', '--stdout'], |
205 shell=False) | 333 timeout=0.01, |
206 self.assertEquals(subprocess2.TIMED_OUT, returncode) | 334 stdout=subprocess2.PIPE, |
207 self.assertEquals(('', None), out) | 335 shell=False) |
208 | 336 self.assertEquals(subprocess2.TIMED_OUT, returncode) |
209 def test_check_output_no_stdout(self): | 337 self.assertEquals(('', None), out) |
210 try: | 338 self._run_test(fn) |
211 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | |
212 self.fail() | |
213 except TypeError: | |
214 pass | |
215 | 339 |
216 def test_stdout_void(self): | 340 def test_stdout_void(self): |
217 def fn(c, e, un): | 341 def fn(c, e, un): |
218 (out, err), code = subprocess2.communicate( | 342 (out, err), code = subprocess2.communicate( |
219 e + ['--stdout', '--stderr'], | 343 e + ['--stdout', '--stderr'], |
220 stdout=subprocess2.VOID, | 344 stdout=subprocess2.VOID, |
221 stderr=subprocess2.PIPE, | 345 stderr=subprocess2.PIPE, |
222 universal_newlines=un) | 346 universal_newlines=un) |
223 self.assertEquals(None, out) | 347 self.assertEquals(None, out) |
224 self.assertEquals(c('a\nbb\nccc\n'), err) | 348 self.assertEquals(c('a\nbb\nccc\n'), err) |
(...skipping 30 matching lines...) Expand all Loading... |
255 (out, err), code = subprocess2.communicate( | 379 (out, err), code = subprocess2.communicate( |
256 e + ['--stderr'], | 380 e + ['--stderr'], |
257 stderr=subprocess2.STDOUT, | 381 stderr=subprocess2.STDOUT, |
258 universal_newlines=un) | 382 universal_newlines=un) |
259 # stderr output into stdout but stdout is not piped. | 383 # stderr output into stdout but stdout is not piped. |
260 self.assertEquals(None, out) | 384 self.assertEquals(None, out) |
261 self.assertEquals(None, err) | 385 self.assertEquals(None, err) |
262 self.assertEquals(0, code) | 386 self.assertEquals(0, code) |
263 self._run_test(fn) | 387 self._run_test(fn) |
264 | 388 |
265 def test_check_output_throw_stdout(self): | |
266 def fn(c, e, un): | |
267 try: | |
268 subprocess2.check_output( | |
269 e + ['--fail', '--stdout'], universal_newlines=un) | |
270 self.fail() | |
271 except subprocess2.CalledProcessError, e: | |
272 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) | |
273 self.assertEquals(None, e.stderr) | |
274 self.assertEquals(64, e.returncode) | |
275 self._run_test(fn) | |
276 | |
277 def test_check_output_throw_no_stderr(self): | |
278 def fn(c, e, un): | |
279 try: | |
280 subprocess2.check_output( | |
281 e + ['--fail', '--stderr'], universal_newlines=un) | |
282 self.fail() | |
283 except subprocess2.CalledProcessError, e: | |
284 self.assertEquals(c(''), e.stdout) | |
285 self.assertEquals(None, e.stderr) | |
286 self.assertEquals(64, e.returncode) | |
287 self._run_test(fn) | |
288 | |
289 def test_check_output_throw_stderr(self): | |
290 def fn(c, e, un): | |
291 try: | |
292 subprocess2.check_output( | |
293 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, | |
294 universal_newlines=un) | |
295 self.fail() | |
296 except subprocess2.CalledProcessError, e: | |
297 self.assertEquals('', e.stdout) | |
298 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) | |
299 self.assertEquals(64, e.returncode) | |
300 self._run_test(fn) | |
301 | |
302 def test_check_output_throw_stderr_stdout(self): | |
303 def fn(c, e, un): | |
304 try: | |
305 subprocess2.check_output( | |
306 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, | |
307 universal_newlines=un) | |
308 self.fail() | |
309 except subprocess2.CalledProcessError, e: | |
310 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) | |
311 self.assertEquals(None, e.stderr) | |
312 self.assertEquals(64, e.returncode) | |
313 self._run_test(fn) | |
314 | |
315 def test_check_call_throw(self): | |
316 try: | |
317 subprocess2.check_call(self.exe + ['--fail', '--stderr']) | |
318 self.fail() | |
319 except subprocess2.CalledProcessError, e: | |
320 self.assertEquals(None, e.stdout) | |
321 self.assertEquals(None, e.stderr) | |
322 self.assertEquals(64, e.returncode) | |
323 | |
324 | 389 |
325 def child_main(args): | 390 def child_main(args): |
326 if sys.platform == 'win32': | 391 if sys.platform == 'win32': |
327 # Annoying, make sure the output is not translated on Windows. | 392 # Annoying, make sure the output is not translated on Windows. |
328 # pylint: disable=E1101,F0401 | 393 # pylint: disable=E1101,F0401 |
329 import msvcrt | 394 import msvcrt |
330 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | 395 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) |
331 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | 396 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) |
332 | 397 |
333 parser = optparse.OptionParser() | 398 parser = optparse.OptionParser() |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
379 return options.return_value | 444 return options.return_value |
380 | 445 |
381 | 446 |
382 if __name__ == '__main__': | 447 if __name__ == '__main__': |
383 logging.basicConfig(level= | 448 logging.basicConfig(level= |
384 [logging.WARNING, logging.INFO, logging.DEBUG][ | 449 [logging.WARNING, logging.INFO, logging.DEBUG][ |
385 min(2, sys.argv.count('-v'))]) | 450 min(2, sys.argv.count('-v'))]) |
386 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 451 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
387 sys.exit(child_main(sys.argv[2:])) | 452 sys.exit(child_main(sys.argv[2:])) |
388 unittest.main() | 453 unittest.main() |
OLD | NEW |