OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
7 | 7 |
8 import logging | |
9 import optparse | 8 import optparse |
10 import os | 9 import os |
11 import sys | 10 import sys |
12 import time | 11 import time |
13 import unittest | 12 import unittest |
14 | 13 |
15 try: | |
16 import fcntl | |
17 except ImportError: | |
18 fcntl = None | |
19 | |
20 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | 14 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
21 sys.path.insert(0, ROOT_DIR) | 15 sys.path.insert(0, ROOT_DIR) |
22 | 16 |
23 import subprocess2 | 17 import subprocess2 |
24 | 18 |
25 # Method could be a function | 19 # Method could be a function |
26 # pylint: disable=R0201 | 20 # pylint: disable=R0201 |
27 | 21 |
28 | 22 class Subprocess2Test(unittest.TestCase): |
29 def convert_to_crlf(string): | |
30 """Unconditionally convert LF to CRLF.""" | |
31 return string.replace('\n', '\r\n') | |
32 | |
33 | |
34 def convert_to_cr(string): | |
35 """Unconditionally convert LF to CR.""" | |
36 return string.replace('\n', '\r') | |
37 | |
38 | |
39 def convert_win(string): | |
40 """Converts string to CRLF on Windows only.""" | |
41 if sys.platform == 'win32': | |
42 return string.replace('\n', '\r\n') | |
43 return string | |
44 | |
45 | |
46 class DefaultsTest(unittest.TestCase): | |
47 # Can be mocked in a test. | 23 # Can be mocked in a test. |
48 TO_SAVE = { | 24 TO_SAVE = { |
49 subprocess2: [ | 25 subprocess2: [ |
50 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], | 26 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], |
51 subprocess2.subprocess: ['Popen'], | 27 subprocess2.subprocess: ['Popen'], |
52 } | 28 } |
53 | 29 |
54 def setUp(self): | 30 def setUp(self): |
| 31 self.exe_path = __file__ |
| 32 self.exe = [sys.executable, self.exe_path, '--child'] |
55 self.saved = {} | 33 self.saved = {} |
56 for module, names in self.TO_SAVE.iteritems(): | 34 for module, names in self.TO_SAVE.iteritems(): |
57 self.saved[module] = dict( | 35 self.saved[module] = dict( |
58 (name, getattr(module, name)) for name in names) | 36 (name, getattr(module, name)) for name in names) |
59 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they | 37 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they |
60 # can be trapped in the child process for better coverage. | 38 # can be trapped in the child process for better coverage. |
61 | 39 |
62 def tearDown(self): | 40 def tearDown(self): |
63 for module, saved in self.saved.iteritems(): | 41 for module, saved in self.saved.iteritems(): |
64 for name, value in saved.iteritems(): | 42 for name, value in saved.iteritems(): |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
161 # fake_communicate() doesn't 'implement' that. | 139 # fake_communicate() doesn't 'implement' that. |
162 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 140 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
163 expected = { | 141 expected = { |
164 'args': ['foo'], | 142 'args': ['foo'], |
165 'a':True, | 143 'a':True, |
166 'stdin': subprocess2.VOID, | 144 'stdin': subprocess2.VOID, |
167 'stdout': subprocess2.PIPE, | 145 'stdout': subprocess2.PIPE, |
168 } | 146 } |
169 self.assertEquals(expected, results) | 147 self.assertEquals(expected, results) |
170 | 148 |
171 def test_timeout_shell_throws(self): | |
172 # Never called. | |
173 _ = self._fake_Popen() | |
174 try: | |
175 subprocess2.communicate( | |
176 sys.executable, | |
177 timeout=0.01, | |
178 stdout=subprocess2.PIPE, | |
179 shell=True) | |
180 self.fail() | |
181 except TypeError: | |
182 pass | |
183 | |
184 | |
185 class S2Test(unittest.TestCase): | |
186 def setUp(self): | |
187 super(S2Test, self).setUp() | |
188 self.exe_path = __file__ | |
189 self.exe = [sys.executable, self.exe_path, '--child'] | |
190 self.states = {} | |
191 if fcntl: | |
192 for v in (sys.stdin, sys.stdout, sys.stderr): | |
193 fileno = v.fileno() | |
194 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) | |
195 | |
196 def tearDown(self): | |
197 for fileno, fl in self.states.iteritems(): | |
198 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) | |
199 super(S2Test, self).tearDown() | |
200 | |
201 def _run_test(self, function): | |
202 """Runs tests in 6 combinations: | |
203 - LF output with universal_newlines=False | |
204 - CR output with universal_newlines=False | |
205 - CRLF output with universal_newlines=False | |
206 - LF output with universal_newlines=True | |
207 - CR output with universal_newlines=True | |
208 - CRLF output with universal_newlines=True | |
209 | |
210 First |function| argument is the convertion for the origianl expected LF | |
211 string to the right EOL. | |
212 Second |function| argument is the executable and initial flag to run, to | |
213 control what EOL is used by the child process. | |
214 Third |function| argument is universal_newlines value. | |
215 """ | |
216 noop = lambda x: x | |
217 function(noop, self.exe, False) | |
218 function(convert_to_cr, self.exe + ['--cr'], False) | |
219 function(convert_to_crlf, self.exe + ['--crlf'], False) | |
220 function(noop, self.exe, True) | |
221 function(noop, self.exe + ['--cr'], True) | |
222 function(noop, self.exe + ['--crlf'], True) | |
223 | |
224 def test_timeout(self): | 149 def test_timeout(self): |
| 150 # It'd be better to not discard stdout. |
225 out, returncode = subprocess2.communicate( | 151 out, returncode = subprocess2.communicate( |
226 self.exe + ['--sleep_first', '--stdout'], | 152 self.exe + ['--sleep', '--stdout'], |
227 timeout=0.01, | 153 timeout=0.01, |
228 stdout=subprocess2.PIPE, | 154 stdout=subprocess2.PIPE) |
229 shell=False) | |
230 self.assertEquals(subprocess2.TIMED_OUT, returncode) | 155 self.assertEquals(subprocess2.TIMED_OUT, returncode) |
231 self.assertEquals(('', None), out) | 156 self.assertEquals(['', None], out) |
232 | 157 |
233 def test_check_output_no_stdout(self): | 158 def test_check_output_no_stdout(self): |
234 try: | 159 try: |
235 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | 160 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) |
236 self.fail() | 161 self.fail() |
237 except TypeError: | 162 except TypeError: |
238 pass | 163 pass |
239 | 164 |
240 def test_stdout_void(self): | 165 def test_stdout_void(self): |
241 def fn(c, e, un): | 166 (out, err), code = subprocess2.communicate( |
242 (out, err), code = subprocess2.communicate( | 167 self.exe + ['--stdout', '--stderr'], |
243 e + ['--stdout', '--stderr'], | 168 stdout=subprocess2.VOID, |
244 stdout=subprocess2.VOID, | 169 stderr=subprocess2.PIPE) |
245 stderr=subprocess2.PIPE, | 170 self.assertEquals(None, out) |
246 universal_newlines=un) | 171 expected = 'a\nbb\nccc\n' |
247 self.assertEquals(None, out) | 172 if sys.platform == 'win32': |
248 self.assertEquals(c('a\nbb\nccc\n'), err) | 173 expected = expected.replace('\n', '\r\n') |
249 self.assertEquals(0, code) | 174 self.assertEquals(expected, err) |
250 self._run_test(fn) | 175 self.assertEquals(0, code) |
251 | 176 |
252 def test_stderr_void(self): | 177 def test_stderr_void(self): |
253 def fn(c, e, un): | 178 (out, err), code = subprocess2.communicate( |
254 (out, err), code = subprocess2.communicate( | 179 self.exe + ['--stdout', '--stderr'], |
255 e + ['--stdout', '--stderr'], | 180 universal_newlines=True, |
256 stdout=subprocess2.PIPE, | 181 stdout=subprocess2.PIPE, |
257 stderr=subprocess2.VOID, | 182 stderr=subprocess2.VOID) |
258 universal_newlines=un) | 183 self.assertEquals('A\nBB\nCCC\n', out) |
259 self.assertEquals(c('A\nBB\nCCC\n'), out) | 184 self.assertEquals(None, err) |
260 self.assertEquals(None, err) | 185 self.assertEquals(0, code) |
261 self.assertEquals(0, code) | |
262 self._run_test(fn) | |
263 | 186 |
264 def test_check_output_throw_stdout(self): | 187 def test_check_output_throw_stdout(self): |
265 def fn(c, e, un): | 188 try: |
266 try: | 189 subprocess2.check_output( |
267 subprocess2.check_output( | 190 self.exe + ['--fail', '--stdout'], universal_newlines=True) |
268 e + ['--fail', '--stdout'], universal_newlines=un) | 191 self.fail() |
269 self.fail() | 192 except subprocess2.CalledProcessError, e: |
270 except subprocess2.CalledProcessError, e: | 193 self.assertEquals('A\nBB\nCCC\n', e.stdout) |
271 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) | 194 self.assertEquals(None, e.stderr) |
272 self.assertEquals(None, e.stderr) | 195 self.assertEquals(64, e.returncode) |
273 self.assertEquals(64, e.returncode) | |
274 self._run_test(fn) | |
275 | 196 |
276 def test_check_output_throw_no_stderr(self): | 197 def test_check_output_throw_no_stderr(self): |
277 def fn(c, e, un): | 198 try: |
278 try: | 199 subprocess2.check_output( |
279 subprocess2.check_output( | 200 self.exe + ['--fail', '--stderr'], universal_newlines=True) |
280 e + ['--fail', '--stderr'], universal_newlines=un) | 201 self.fail() |
281 self.fail() | 202 except subprocess2.CalledProcessError, e: |
282 except subprocess2.CalledProcessError, e: | 203 self.assertEquals('', e.stdout) |
283 self.assertEquals(c(''), e.stdout) | 204 self.assertEquals(None, e.stderr) |
284 self.assertEquals(None, e.stderr) | 205 self.assertEquals(64, e.returncode) |
285 self.assertEquals(64, e.returncode) | |
286 self._run_test(fn) | |
287 | 206 |
288 def test_check_output_throw_stderr(self): | 207 def test_check_output_throw_stderr(self): |
289 def fn(c, e, un): | 208 try: |
290 try: | 209 subprocess2.check_output( |
291 subprocess2.check_output( | 210 self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, |
292 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, | 211 universal_newlines=True) |
293 universal_newlines=un) | 212 self.fail() |
294 self.fail() | 213 except subprocess2.CalledProcessError, e: |
295 except subprocess2.CalledProcessError, e: | 214 self.assertEquals('', e.stdout) |
296 self.assertEquals('', e.stdout) | 215 self.assertEquals('a\nbb\nccc\n', e.stderr) |
297 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) | 216 self.assertEquals(64, e.returncode) |
298 self.assertEquals(64, e.returncode) | |
299 self._run_test(fn) | |
300 | 217 |
301 def test_check_output_throw_stderr_stdout(self): | 218 def test_check_output_throw_stderr_stdout(self): |
302 def fn(c, e, un): | 219 try: |
303 try: | 220 subprocess2.check_output( |
304 subprocess2.check_output( | 221 self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, |
305 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, | 222 universal_newlines=True) |
306 universal_newlines=un) | 223 self.fail() |
307 self.fail() | 224 except subprocess2.CalledProcessError, e: |
308 except subprocess2.CalledProcessError, e: | 225 self.assertEquals('a\nbb\nccc\n', e.stdout) |
309 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) | 226 self.assertEquals(None, e.stderr) |
310 self.assertEquals(None, e.stderr) | 227 self.assertEquals(64, e.returncode) |
311 self.assertEquals(64, e.returncode) | |
312 self._run_test(fn) | |
313 | 228 |
314 def test_check_call_throw(self): | 229 def test_check_call_throw(self): |
315 try: | 230 try: |
316 subprocess2.check_call(self.exe + ['--fail', '--stderr']) | 231 subprocess2.check_call(self.exe + ['--fail', '--stderr']) |
317 self.fail() | 232 self.fail() |
318 except subprocess2.CalledProcessError, e: | 233 except subprocess2.CalledProcessError, e: |
319 self.assertEquals(None, e.stdout) | 234 self.assertEquals(None, e.stdout) |
320 self.assertEquals(None, e.stderr) | 235 self.assertEquals(None, e.stderr) |
321 self.assertEquals(64, e.returncode) | 236 self.assertEquals(64, e.returncode) |
322 | 237 |
323 def test_check_output_tee_stderr(self): | |
324 def fn(c, e, un): | |
325 stderr = [] | |
326 out, returncode = subprocess2.communicate( | |
327 e + ['--stderr'], stderr=stderr.append, | |
328 universal_newlines=un) | |
329 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) | |
330 self.assertEquals((None, None), out) | |
331 self.assertEquals(0, returncode) | |
332 self._run_test(fn) | |
333 | |
334 def test_check_output_tee_stdout_stderr(self): | |
335 def fn(c, e, un): | |
336 stdout = [] | |
337 stderr = [] | |
338 out, returncode = subprocess2.communicate( | |
339 e + ['--stdout', '--stderr'], | |
340 stdout=stdout.append, | |
341 stderr=stderr.append, | |
342 universal_newlines=un) | |
343 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) | |
344 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) | |
345 self.assertEquals((None, None), out) | |
346 self.assertEquals(0, returncode) | |
347 self._run_test(fn) | |
348 | |
349 def test_check_output_tee_stdin(self): | |
350 def fn(c, e, un): | |
351 stdout = [] | |
352 stdin = '0123456789' | |
353 out, returncode = subprocess2.communicate( | |
354 e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append, | |
355 universal_newlines=un) | |
356 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) | |
357 self.assertEquals((None, None), out) | |
358 self.assertEquals(0, returncode) | |
359 self._run_test(fn) | |
360 | |
361 def test_check_output_tee_throw(self): | |
362 def fn(c, e, un): | |
363 stderr = [] | |
364 try: | |
365 subprocess2.check_output( | |
366 e + ['--stderr', '--fail'], stderr=stderr.append, | |
367 universal_newlines=un) | |
368 self.fail() | |
369 except subprocess2.CalledProcessError, e: | |
370 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) | |
371 self.assertEquals('', e.stdout) | |
372 self.assertEquals(None, e.stderr) | |
373 self.assertEquals(64, e.returncode) | |
374 self._run_test(fn) | |
375 | |
376 def test_check_output_tee_large(self): | |
377 stdout = [] | |
378 # Read 128kb. On my workstation it takes >2s. Welcome to 2011. | |
379 out, returncode = subprocess2.communicate( | |
380 self.exe + ['--large'], stdout=stdout.append) | |
381 self.assertEquals(128*1024, len(''.join(stdout))) | |
382 self.assertEquals((None, None), out) | |
383 self.assertEquals(0, returncode) | |
384 | |
385 def test_check_output_tee_large_stdin(self): | |
386 stdout = [] | |
387 # Write 128kb. | |
388 stdin = '0123456789abcdef' * (8*1024) | |
389 out, returncode = subprocess2.communicate( | |
390 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) | |
391 self.assertEquals(128*1024, len(''.join(stdout))) | |
392 self.assertEquals((None, None), out) | |
393 self.assertEquals(0, returncode) | |
394 | |
395 | 238 |
396 def child_main(args): | 239 def child_main(args): |
397 if sys.platform == 'win32': | |
398 # Annoying, make sure the output is not translated on Windows. | |
399 # pylint: disable=E1101,F0401 | |
400 import msvcrt | |
401 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | |
402 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | |
403 | |
404 parser = optparse.OptionParser() | 240 parser = optparse.OptionParser() |
405 parser.add_option( | 241 parser.add_option( |
406 '--fail', | 242 '--fail', |
407 dest='return_value', | 243 dest='return_value', |
408 action='store_const', | 244 action='store_const', |
409 default=0, | 245 default=0, |
410 const=64) | 246 const=64) |
411 parser.add_option( | |
412 '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') | |
413 parser.add_option( | |
414 '--cr', action='store_const', const='\r', dest='eol') | |
415 parser.add_option('--stdout', action='store_true') | 247 parser.add_option('--stdout', action='store_true') |
416 parser.add_option('--stderr', action='store_true') | 248 parser.add_option('--stderr', action='store_true') |
417 parser.add_option('--sleep_first', action='store_true') | 249 parser.add_option('--sleep', action='store_true') |
418 parser.add_option('--sleep_last', action='store_true') | |
419 parser.add_option('--large', action='store_true') | |
420 parser.add_option('--read', action='store_true') | |
421 options, args = parser.parse_args(args) | 250 options, args = parser.parse_args(args) |
422 if args: | 251 if args: |
423 parser.error('Internal error') | 252 parser.error('Internal error') |
424 if options.sleep_first: | |
425 time.sleep(10) | |
426 | 253 |
427 def do(string): | 254 def do(string): |
428 if options.stdout: | 255 if options.stdout: |
429 sys.stdout.write(string.upper()) | 256 print >> sys.stdout, string.upper() |
430 sys.stdout.write(options.eol) | |
431 if options.stderr: | 257 if options.stderr: |
432 sys.stderr.write(string.lower()) | 258 print >> sys.stderr, string.lower() |
433 sys.stderr.write(options.eol) | |
434 | 259 |
435 do('A') | 260 do('A') |
436 do('BB') | 261 do('BB') |
437 do('CCC') | 262 do('CCC') |
438 if options.large: | 263 if options.sleep: |
439 # Print 128kb. | |
440 string = '0123456789abcdef' * (8*1024) | |
441 sys.stdout.write(string) | |
442 if options.read: | |
443 try: | |
444 while sys.stdin.read(): | |
445 pass | |
446 except OSError: | |
447 pass | |
448 if options.sleep_last: | |
449 time.sleep(10) | 264 time.sleep(10) |
450 return options.return_value | 265 return options.return_value |
451 | 266 |
452 | 267 |
453 if __name__ == '__main__': | 268 if __name__ == '__main__': |
454 logging.basicConfig(level= | |
455 [logging.WARNING, logging.INFO, logging.DEBUG][ | |
456 min(2, sys.argv.count('-v'))]) | |
457 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 269 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
458 sys.exit(child_main(sys.argv[2:])) | 270 sys.exit(child_main(sys.argv[2:])) |
459 unittest.main() | 271 unittest.main() |
OLD | NEW |