OLD | NEW |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
7 | 7 |
8 import logging | 8 import logging |
9 import optparse | 9 import optparse |
10 import os | 10 import os |
11 import sys | 11 import sys |
12 import time | 12 import time |
13 import unittest | 13 import unittest |
14 | 14 |
15 try: | 15 try: |
16 import fcntl | 16 import fcntl |
17 except ImportError: | 17 except ImportError: |
18 fcntl = None | 18 fcntl = None |
19 | 19 |
20 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 20 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
21 | 21 |
22 import subprocess | |
22 import subprocess2 | 23 import subprocess2 |
23 | 24 |
24 from testing_support import auto_stub | 25 from testing_support import auto_stub |
25 | 26 |
26 # Method could be a function | 27 # Method could be a function |
27 # pylint: disable=R0201 | 28 # pylint: disable=R0201 |
28 | 29 |
29 | 30 |
30 def convert_to_crlf(string): | 31 def convert_to_crlf(string): |
31 """Unconditionally convert LF to CRLF.""" | 32 """Unconditionally convert LF to CRLF.""" |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
75 | 76 |
76 def _fake_subprocess_Popen(self): | 77 def _fake_subprocess_Popen(self): |
77 """Mocks the base class subprocess.Popen only.""" | 78 """Mocks the base class subprocess.Popen only.""" |
78 results = {} | 79 results = {} |
79 def __init__(self, args, **kwargs): | 80 def __init__(self, args, **kwargs): |
80 assert not results | 81 assert not results |
81 results.update(kwargs) | 82 results.update(kwargs) |
82 results['args'] = args | 83 results['args'] = args |
83 def communicate(): | 84 def communicate(): |
84 return None, None | 85 return None, None |
85 self.mock(subprocess2.subprocess.Popen, '__init__', __init__) | 86 self.mock(subprocess.Popen, '__init__', __init__) |
86 self.mock(subprocess2.subprocess.Popen, 'communicate', communicate) | 87 self.mock(subprocess.Popen, 'communicate', communicate) |
87 return results | 88 return results |
88 | 89 |
89 def test_check_call_defaults(self): | 90 def test_check_call_defaults(self): |
90 results = self._fake_communicate() | 91 results = self._fake_communicate() |
91 self.assertEquals( | 92 self.assertEquals( |
92 ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) | 93 ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) |
93 expected = { | 94 expected = { |
94 'args': ['foo'], | 95 'args': ['foo'], |
95 'a':True, | 96 'a':True, |
96 } | 97 } |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
151 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 152 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
152 expected = { | 153 expected = { |
153 'args': ['foo'], | 154 'args': ['foo'], |
154 'a':True, | 155 'a':True, |
155 'stdin': subprocess2.VOID, | 156 'stdin': subprocess2.VOID, |
156 'stdout': subprocess2.PIPE, | 157 'stdout': subprocess2.PIPE, |
157 } | 158 } |
158 self.assertEquals(expected, results) | 159 self.assertEquals(expected, results) |
159 | 160 |
160 | 161 |
161 class S2Test(unittest.TestCase): | 162 class BaseTestCase(unittest.TestCase): |
162 def setUp(self): | 163 def setUp(self): |
163 super(S2Test, self).setUp() | 164 super(BaseTestCase, self).setUp() |
164 self.exe_path = __file__ | 165 self.exe_path = __file__ |
165 self.exe = [sys.executable, self.exe_path, '--child'] | 166 self.exe = [sys.executable, self.exe_path, '--child'] |
166 self.states = {} | 167 self.states = {} |
167 if fcntl: | 168 if fcntl: |
168 for v in (sys.stdin, sys.stdout, sys.stderr): | 169 for v in (sys.stdin, sys.stdout, sys.stderr): |
169 fileno = v.fileno() | 170 fileno = v.fileno() |
170 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) | 171 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) |
171 | 172 |
172 def tearDown(self): | 173 def tearDown(self): |
173 for fileno, fl in self.states.iteritems(): | 174 for fileno, fl in self.states.iteritems(): |
174 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) | 175 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) |
175 super(S2Test, self).tearDown() | 176 super(BaseTestCase, self).tearDown() |
176 | 177 |
178 | |
179 class RegressionTest(BaseTestCase): | |
180 # Regression tests to ensure that subprocess and subprocess2 have the same | |
181 # behavior. | |
182 def _run_test(self, function): | |
183 """Runs tests in 12 combinations: | |
184 - LF output with universal_newlines=False | |
185 - CR output with universal_newlines=False | |
186 - CRLF output with universal_newlines=False | |
187 - LF output with universal_newlines=True | |
188 - CR output with universal_newlines=True | |
189 - CRLF output with universal_newlines=True | |
190 | |
191 Once with subprocess, once with subprocess2. | |
192 | |
193 First |function| argument is the convertion for the origianl expected LF | |
Dirk Pranke
2011/11/28 22:31:50
typos: "conversion" and "original".
| |
194 string to the right EOL. | |
195 Second |function| argument is the executable and initial flag to run, to | |
196 control what EOL is used by the child process. | |
197 Third |function| argument is universal_newlines value. | |
198 """ | |
199 noop = lambda x: x | |
200 for subp in (subprocess, subprocess2): | |
201 function(noop, self.exe, False, subp) | |
202 function(convert_to_cr, self.exe + ['--cr'], False, subp) | |
203 function(convert_to_crlf, self.exe + ['--crlf'], False, subp) | |
204 function(noop, self.exe, True, subp) | |
205 function(noop, self.exe + ['--cr'], True, subp) | |
206 function(noop, self.exe + ['--crlf'], True, subp) | |
207 | |
208 def _check_pipes(self, subp, e, stdout, stderr): | |
209 """On exception, look if the exception members are set correctly.""" | |
210 if subp is subprocess: | |
211 # subprocess never save the output. | |
212 self.assertFalse(hasattr(e, 'stdout')) | |
213 self.assertFalse(hasattr(e, 'stderr')) | |
214 elif subp is subprocess2: | |
215 self.assertEquals(stdout, e.stdout) | |
216 self.assertEquals(stderr, e.stderr) | |
217 else: | |
218 self.fail() | |
219 | |
220 def test_check_output_no_stdout(self): | |
221 for subp in (subprocess, subprocess2): | |
222 try: | |
223 subp.check_output(self.exe, stdout=subp.PIPE) | |
224 self.fail() | |
225 except ValueError: | |
226 pass | |
227 | |
228 def test_check_output_throw_stdout(self): | |
229 def fn(c, e, un, subp): | |
230 try: | |
231 subp.check_output( | |
232 e + ['--fail', '--stdout'], universal_newlines=un) | |
233 self.fail() | |
234 except subp.CalledProcessError, e: | |
235 self._check_pipes(subp, e, c('A\nBB\nCCC\n'), None) | |
236 self.assertEquals(64, e.returncode) | |
237 self._run_test(fn) | |
238 | |
239 def test_check_output_throw_no_stderr(self): | |
240 def fn(c, e, un, subp): | |
241 try: | |
242 subp.check_output( | |
243 e + ['--fail', '--stderr'], universal_newlines=un) | |
244 self.fail() | |
245 except subp.CalledProcessError, e: | |
246 self._check_pipes(subp, e, c(''), None) | |
247 self.assertEquals(64, e.returncode) | |
248 self._run_test(fn) | |
249 | |
250 def test_check_output_throw_stderr(self): | |
251 def fn(c, e, un, subp): | |
252 try: | |
253 subp.check_output( | |
254 e + ['--fail', '--stderr'], | |
255 stderr=subp.PIPE, | |
256 universal_newlines=un) | |
257 self.fail() | |
258 except subp.CalledProcessError, e: | |
259 self._check_pipes(subp, e, '', c('a\nbb\nccc\n')) | |
260 self.assertEquals(64, e.returncode) | |
261 self._run_test(fn) | |
262 | |
263 def test_check_output_throw_stderr_stdout(self): | |
264 def fn(c, e, un, subp): | |
265 try: | |
266 subp.check_output( | |
267 e + ['--fail', '--stderr'], | |
268 stderr=subp.STDOUT, | |
269 universal_newlines=un) | |
270 self.fail() | |
271 except subp.CalledProcessError, e: | |
272 self._check_pipes(subp, e, c('a\nbb\nccc\n'), None) | |
273 self.assertEquals(64, e.returncode) | |
274 self._run_test(fn) | |
275 | |
276 def test_check_call_throw(self): | |
277 for subp in (subprocess, subprocess2): | |
278 try: | |
279 subp.check_call(self.exe + ['--fail', '--stderr']) | |
280 self.fail() | |
281 except subp.CalledProcessError, e: | |
282 self._check_pipes(subp, e, None, None) | |
283 self.assertEquals(64, e.returncode) | |
284 | |
285 | |
286 class S2Test(BaseTestCase): | |
287 # Tests that can only run in subprocess2, e.g. new functionalities. | |
288 # In particular, subprocess2.communicate() doesn't exist in subprocess. | |
177 def _run_test(self, function): | 289 def _run_test(self, function): |
178 """Runs tests in 6 combinations: | 290 """Runs tests in 6 combinations: |
179 - LF output with universal_newlines=False | 291 - LF output with universal_newlines=False |
180 - CR output with universal_newlines=False | 292 - CR output with universal_newlines=False |
181 - CRLF output with universal_newlines=False | 293 - CRLF output with universal_newlines=False |
182 - LF output with universal_newlines=True | 294 - LF output with universal_newlines=True |
183 - CR output with universal_newlines=True | 295 - CR output with universal_newlines=True |
184 - CRLF output with universal_newlines=True | 296 - CRLF output with universal_newlines=True |
185 | 297 |
186 First |function| argument is the convertion for the origianl expected LF | 298 First |function| argument is the convertion for the origianl expected LF |
187 string to the right EOL. | 299 string to the right EOL. |
188 Second |function| argument is the executable and initial flag to run, to | 300 Second |function| argument is the executable and initial flag to run, to |
189 control what EOL is used by the child process. | 301 control what EOL is used by the child process. |
190 Third |function| argument is universal_newlines value. | 302 Third |function| argument is universal_newlines value. |
191 """ | 303 """ |
192 noop = lambda x: x | 304 noop = lambda x: x |
193 function(noop, self.exe, False) | 305 function(noop, self.exe, False) |
194 function(convert_to_cr, self.exe + ['--cr'], False) | 306 function(convert_to_cr, self.exe + ['--cr'], False) |
195 function(convert_to_crlf, self.exe + ['--crlf'], False) | 307 function(convert_to_crlf, self.exe + ['--crlf'], False) |
196 function(noop, self.exe, True) | 308 function(noop, self.exe, True) |
197 function(noop, self.exe + ['--cr'], True) | 309 function(noop, self.exe + ['--cr'], True) |
198 function(noop, self.exe + ['--crlf'], True) | 310 function(noop, self.exe + ['--crlf'], True) |
199 | 311 |
200 def test_timeout(self): | 312 def test_timeout(self): |
201 out, returncode = subprocess2.communicate( | 313 # timeout doesn't exist in subprocess. |
202 self.exe + ['--sleep_first', '--stdout'], | 314 def fn(c, e, un): |
203 timeout=0.01, | 315 out, returncode = subprocess2.communicate( |
204 stdout=subprocess2.PIPE, | 316 self.exe + ['--sleep_first', '--stdout'], |
205 shell=False) | 317 timeout=0.01, |
206 self.assertEquals(subprocess2.TIMED_OUT, returncode) | 318 stdout=subprocess2.PIPE, |
207 self.assertEquals(('', None), out) | 319 shell=False) |
208 | 320 self.assertEquals(subprocess2.TIMED_OUT, returncode) |
209 def test_check_output_no_stdout(self): | 321 self.assertEquals(('', None), out) |
210 try: | 322 self._run_test(fn) |
211 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | |
212 self.fail() | |
213 except TypeError: | |
214 pass | |
215 | 323 |
216 def test_stdout_void(self): | 324 def test_stdout_void(self): |
217 def fn(c, e, un): | 325 def fn(c, e, un): |
218 (out, err), code = subprocess2.communicate( | 326 (out, err), code = subprocess2.communicate( |
219 e + ['--stdout', '--stderr'], | 327 e + ['--stdout', '--stderr'], |
220 stdout=subprocess2.VOID, | 328 stdout=subprocess2.VOID, |
221 stderr=subprocess2.PIPE, | 329 stderr=subprocess2.PIPE, |
222 universal_newlines=un) | 330 universal_newlines=un) |
223 self.assertEquals(None, out) | 331 self.assertEquals(None, out) |
224 self.assertEquals(c('a\nbb\nccc\n'), err) | 332 self.assertEquals(c('a\nbb\nccc\n'), err) |
(...skipping 30 matching lines...) Expand all Loading... | |
255 (out, err), code = subprocess2.communicate( | 363 (out, err), code = subprocess2.communicate( |
256 e + ['--stderr'], | 364 e + ['--stderr'], |
257 stderr=subprocess2.STDOUT, | 365 stderr=subprocess2.STDOUT, |
258 universal_newlines=un) | 366 universal_newlines=un) |
259 # stderr output into stdout but stdout is not piped. | 367 # stderr output into stdout but stdout is not piped. |
260 self.assertEquals(None, out) | 368 self.assertEquals(None, out) |
261 self.assertEquals(None, err) | 369 self.assertEquals(None, err) |
262 self.assertEquals(0, code) | 370 self.assertEquals(0, code) |
263 self._run_test(fn) | 371 self._run_test(fn) |
264 | 372 |
265 def test_check_output_throw_stdout(self): | |
266 def fn(c, e, un): | |
267 try: | |
268 subprocess2.check_output( | |
269 e + ['--fail', '--stdout'], universal_newlines=un) | |
270 self.fail() | |
271 except subprocess2.CalledProcessError, e: | |
272 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) | |
273 self.assertEquals(None, e.stderr) | |
274 self.assertEquals(64, e.returncode) | |
275 self._run_test(fn) | |
276 | |
277 def test_check_output_throw_no_stderr(self): | |
278 def fn(c, e, un): | |
279 try: | |
280 subprocess2.check_output( | |
281 e + ['--fail', '--stderr'], universal_newlines=un) | |
282 self.fail() | |
283 except subprocess2.CalledProcessError, e: | |
284 self.assertEquals(c(''), e.stdout) | |
285 self.assertEquals(None, e.stderr) | |
286 self.assertEquals(64, e.returncode) | |
287 self._run_test(fn) | |
288 | |
289 def test_check_output_throw_stderr(self): | |
290 def fn(c, e, un): | |
291 try: | |
292 subprocess2.check_output( | |
293 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, | |
294 universal_newlines=un) | |
295 self.fail() | |
296 except subprocess2.CalledProcessError, e: | |
297 self.assertEquals('', e.stdout) | |
298 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) | |
299 self.assertEquals(64, e.returncode) | |
300 self._run_test(fn) | |
301 | |
302 def test_check_output_throw_stderr_stdout(self): | |
303 def fn(c, e, un): | |
304 try: | |
305 subprocess2.check_output( | |
306 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, | |
307 universal_newlines=un) | |
308 self.fail() | |
309 except subprocess2.CalledProcessError, e: | |
310 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) | |
311 self.assertEquals(None, e.stderr) | |
312 self.assertEquals(64, e.returncode) | |
313 self._run_test(fn) | |
314 | |
315 def test_check_call_throw(self): | |
316 try: | |
317 subprocess2.check_call(self.exe + ['--fail', '--stderr']) | |
318 self.fail() | |
319 except subprocess2.CalledProcessError, e: | |
320 self.assertEquals(None, e.stdout) | |
321 self.assertEquals(None, e.stderr) | |
322 self.assertEquals(64, e.returncode) | |
323 | |
324 | 373 |
325 def child_main(args): | 374 def child_main(args): |
326 if sys.platform == 'win32': | 375 if sys.platform == 'win32': |
327 # Annoying, make sure the output is not translated on Windows. | 376 # Annoying, make sure the output is not translated on Windows. |
328 # pylint: disable=E1101,F0401 | 377 # pylint: disable=E1101,F0401 |
329 import msvcrt | 378 import msvcrt |
330 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | 379 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) |
331 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | 380 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) |
332 | 381 |
333 parser = optparse.OptionParser() | 382 parser = optparse.OptionParser() |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
379 return options.return_value | 428 return options.return_value |
380 | 429 |
381 | 430 |
382 if __name__ == '__main__': | 431 if __name__ == '__main__': |
383 logging.basicConfig(level= | 432 logging.basicConfig(level= |
384 [logging.WARNING, logging.INFO, logging.DEBUG][ | 433 [logging.WARNING, logging.INFO, logging.DEBUG][ |
385 min(2, sys.argv.count('-v'))]) | 434 min(2, sys.argv.count('-v'))]) |
386 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 435 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
387 sys.exit(child_main(sys.argv[2:])) | 436 sys.exit(child_main(sys.argv[2:])) |
388 unittest.main() | 437 unittest.main() |
OLD | NEW |