OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
7 | 7 |
8 import logging | 8 import logging |
9 import optparse | 9 import optparse |
10 import os | 10 import os |
11 import sys | 11 import sys |
12 import time | 12 import time |
13 import unittest | 13 import unittest |
14 | 14 |
15 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | 15 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
16 sys.path.insert(0, ROOT_DIR) | 16 sys.path.insert(0, ROOT_DIR) |
17 | 17 |
18 import subprocess2 | 18 import subprocess2 |
19 | 19 |
20 # Method could be a function | 20 # Method could be a function |
21 # pylint: disable=R0201 | 21 # pylint: disable=R0201 |
22 | 22 |
23 | 23 |
24 def convert(string): | 24 def convert_to_crlf(string): |
25 """Converts string to CRLF on Windows.""" | 25 """Unconditionally convert LF to CRLF.""" |
| 26 return string.replace('\n', '\r\n') |
| 27 |
| 28 |
| 29 def convert_to_cr(string): |
| 30 """Unconditionally convert LF to CR.""" |
| 31 return string.replace('\n', '\r') |
| 32 |
| 33 |
| 34 def convert_win(string): |
| 35 """Converts string to CRLF on Windows only.""" |
26 if sys.platform == 'win32': | 36 if sys.platform == 'win32': |
27 return string.replace('\n', '\r\n') | 37 return string.replace('\n', '\r\n') |
28 return string | 38 return string |
29 | 39 |
30 | 40 |
31 class Subprocess2Test(unittest.TestCase): | 41 class DefaultsTest(unittest.TestCase): |
32 # Can be mocked in a test. | 42 # Can be mocked in a test. |
33 TO_SAVE = { | 43 TO_SAVE = { |
34 subprocess2: [ | 44 subprocess2: [ |
35 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], | 45 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], |
36 subprocess2.subprocess: ['Popen'], | 46 subprocess2.subprocess: ['Popen'], |
37 } | 47 } |
38 | 48 |
39 def setUp(self): | 49 def setUp(self): |
40 self.exe_path = __file__ | |
41 self.exe = [sys.executable, self.exe_path, '--child'] | |
42 self.saved = {} | 50 self.saved = {} |
43 for module, names in self.TO_SAVE.iteritems(): | 51 for module, names in self.TO_SAVE.iteritems(): |
44 self.saved[module] = dict( | 52 self.saved[module] = dict( |
45 (name, getattr(module, name)) for name in names) | 53 (name, getattr(module, name)) for name in names) |
46 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they | 54 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they |
47 # can be trapped in the child process for better coverage. | 55 # can be trapped in the child process for better coverage. |
48 | 56 |
49 def tearDown(self): | 57 def tearDown(self): |
50 for module, saved in self.saved.iteritems(): | 58 for module, saved in self.saved.iteritems(): |
51 for name, value in saved.iteritems(): | 59 for name, value in saved.iteritems(): |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
148 # fake_communicate() doesn't 'implement' that. | 156 # fake_communicate() doesn't 'implement' that. |
149 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 157 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
150 expected = { | 158 expected = { |
151 'args': ['foo'], | 159 'args': ['foo'], |
152 'a':True, | 160 'a':True, |
153 'stdin': subprocess2.VOID, | 161 'stdin': subprocess2.VOID, |
154 'stdout': subprocess2.PIPE, | 162 'stdout': subprocess2.PIPE, |
155 } | 163 } |
156 self.assertEquals(expected, results) | 164 self.assertEquals(expected, results) |
157 | 165 |
158 def test_timeout(self): | |
159 out, returncode = subprocess2.communicate( | |
160 self.exe + ['--sleep', '--stdout'], | |
161 timeout=0.01, | |
162 stdout=subprocess2.PIPE, | |
163 shell=False) | |
164 self.assertEquals(subprocess2.TIMED_OUT, returncode) | |
165 self.assertEquals(('', None), out) | |
166 | |
167 def test_timeout_shell_throws(self): | 166 def test_timeout_shell_throws(self): |
| 167 # Never called. |
| 168 _ = self._fake_Popen() |
168 try: | 169 try: |
169 subprocess2.communicate( | 170 subprocess2.communicate( |
170 self.exe + ['--sleep', '--stdout'], | 171 sys.executable, |
171 timeout=0.01, | 172 timeout=0.01, |
172 stdout=subprocess2.PIPE, | 173 stdout=subprocess2.PIPE, |
173 shell=True) | 174 shell=True) |
174 self.fail() | 175 self.fail() |
175 except TypeError: | 176 except TypeError: |
176 pass | 177 pass |
177 | 178 |
| 179 |
| 180 class S2Test(unittest.TestCase): |
| 181 def setUp(self): |
| 182 super(S2Test, self).setUp() |
| 183 self.exe_path = __file__ |
| 184 self.exe = [sys.executable, self.exe_path, '--child'] |
| 185 |
| 186 def _run_test(self, function): |
| 187 """Runs tests in 6 combinations: |
| 188 - LF output with universal_newlines=False |
| 189 - CR output with universal_newlines=False |
| 190 - CRLF output with universal_newlines=False |
| 191 - LF output with universal_newlines=True |
| 192 - CR output with universal_newlines=True |
| 193 - CRLF output with universal_newlines=True |
| 194 |
| 195 First |function| argument is the convertion for the origianl expected LF |
| 196 string to the right EOL. |
| 197 Second |function| argument is the executable and initial flag to run, to |
| 198 control what EOL is used by the child process. |
| 199 Third |function| argument is universal_newlines value. |
| 200 """ |
| 201 noop = lambda x: x |
| 202 function(noop, self.exe, False) |
| 203 function(convert_to_cr, self.exe + ['--cr'], False) |
| 204 function(convert_to_crlf, self.exe + ['--crlf'], False) |
| 205 function(noop, self.exe, True) |
| 206 function(noop, self.exe + ['--cr'], True) |
| 207 function(noop, self.exe + ['--crlf'], True) |
| 208 |
| 209 def test_timeout(self): |
| 210 out, returncode = subprocess2.communicate( |
| 211 self.exe + ['--sleep_first', '--stdout'], |
| 212 timeout=0.01, |
| 213 stdout=subprocess2.PIPE, |
| 214 shell=False) |
| 215 self.assertEquals(subprocess2.TIMED_OUT, returncode) |
| 216 self.assertEquals(('', None), out) |
| 217 |
178 def test_check_output_no_stdout(self): | 218 def test_check_output_no_stdout(self): |
179 try: | 219 try: |
180 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | 220 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) |
181 self.fail() | 221 self.fail() |
182 except TypeError: | 222 except TypeError: |
183 pass | 223 pass |
184 | 224 |
185 def test_stdout_void(self): | 225 def test_stdout_void(self): |
186 (out, err), code = subprocess2.communicate( | 226 def fn(c, e, un): |
187 self.exe + ['--stdout', '--stderr'], | 227 (out, err), code = subprocess2.communicate( |
188 stdout=subprocess2.VOID, | 228 e + ['--stdout', '--stderr'], |
189 stderr=subprocess2.PIPE) | 229 stdout=subprocess2.VOID, |
190 self.assertEquals(None, out) | 230 stderr=subprocess2.PIPE, |
191 self.assertEquals(convert('a\nbb\nccc\n'), err) | 231 universal_newlines=un) |
192 self.assertEquals(0, code) | 232 self.assertEquals(None, out) |
| 233 self.assertEquals(c('a\nbb\nccc\n'), err) |
| 234 self.assertEquals(0, code) |
| 235 self._run_test(fn) |
193 | 236 |
194 def test_stderr_void(self): | 237 def test_stderr_void(self): |
195 (out, err), code = subprocess2.communicate( | 238 def fn(c, e, un): |
196 self.exe + ['--stdout', '--stderr'], | 239 (out, err), code = subprocess2.communicate( |
197 universal_newlines=True, | 240 e + ['--stdout', '--stderr'], |
198 stdout=subprocess2.PIPE, | 241 stdout=subprocess2.PIPE, |
199 stderr=subprocess2.VOID) | 242 stderr=subprocess2.VOID, |
200 self.assertEquals('A\nBB\nCCC\n', out) | 243 universal_newlines=un) |
201 self.assertEquals(None, err) | 244 self.assertEquals(c('A\nBB\nCCC\n'), out) |
202 self.assertEquals(0, code) | 245 self.assertEquals(None, err) |
| 246 self.assertEquals(0, code) |
| 247 self._run_test(fn) |
203 | 248 |
204 def test_check_output_throw_stdout(self): | 249 def test_check_output_throw_stdout(self): |
205 try: | 250 def fn(c, e, un): |
206 subprocess2.check_output( | 251 try: |
207 self.exe + ['--fail', '--stdout'], universal_newlines=True) | 252 subprocess2.check_output( |
208 self.fail() | 253 e + ['--fail', '--stdout'], universal_newlines=un) |
209 except subprocess2.CalledProcessError, e: | 254 self.fail() |
210 self.assertEquals('A\nBB\nCCC\n', e.stdout) | 255 except subprocess2.CalledProcessError, e: |
211 self.assertEquals(None, e.stderr) | 256 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) |
212 self.assertEquals(64, e.returncode) | 257 self.assertEquals(None, e.stderr) |
| 258 self.assertEquals(64, e.returncode) |
| 259 self._run_test(fn) |
213 | 260 |
214 def test_check_output_throw_no_stderr(self): | 261 def test_check_output_throw_no_stderr(self): |
215 try: | 262 def fn(c, e, un): |
216 subprocess2.check_output( | 263 try: |
217 self.exe + ['--fail', '--stderr'], universal_newlines=True) | 264 subprocess2.check_output( |
218 self.fail() | 265 e + ['--fail', '--stderr'], universal_newlines=un) |
219 except subprocess2.CalledProcessError, e: | 266 self.fail() |
220 self.assertEquals('', e.stdout) | 267 except subprocess2.CalledProcessError, e: |
221 self.assertEquals(None, e.stderr) | 268 self.assertEquals(c(''), e.stdout) |
222 self.assertEquals(64, e.returncode) | 269 self.assertEquals(None, e.stderr) |
| 270 self.assertEquals(64, e.returncode) |
| 271 self._run_test(fn) |
223 | 272 |
224 def test_check_output_throw_stderr(self): | 273 def test_check_output_throw_stderr(self): |
225 try: | 274 def fn(c, e, un): |
226 subprocess2.check_output( | 275 try: |
227 self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, | 276 subprocess2.check_output( |
228 universal_newlines=True) | 277 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, |
229 self.fail() | 278 universal_newlines=un) |
230 except subprocess2.CalledProcessError, e: | 279 self.fail() |
231 self.assertEquals('', e.stdout) | 280 except subprocess2.CalledProcessError, e: |
232 self.assertEquals('a\nbb\nccc\n', e.stderr) | 281 self.assertEquals('', e.stdout) |
233 self.assertEquals(64, e.returncode) | 282 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) |
| 283 self.assertEquals(64, e.returncode) |
| 284 self._run_test(fn) |
234 | 285 |
235 def test_check_output_throw_stderr_stdout(self): | 286 def test_check_output_throw_stderr_stdout(self): |
236 try: | 287 def fn(c, e, un): |
237 subprocess2.check_output( | 288 try: |
238 self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, | 289 subprocess2.check_output( |
239 universal_newlines=True) | 290 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, |
240 self.fail() | 291 universal_newlines=un) |
241 except subprocess2.CalledProcessError, e: | 292 self.fail() |
242 self.assertEquals('a\nbb\nccc\n', e.stdout) | 293 except subprocess2.CalledProcessError, e: |
243 self.assertEquals(None, e.stderr) | 294 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) |
244 self.assertEquals(64, e.returncode) | 295 self.assertEquals(None, e.stderr) |
| 296 self.assertEquals(64, e.returncode) |
| 297 self._run_test(fn) |
245 | 298 |
246 def test_check_call_throw(self): | 299 def test_check_call_throw(self): |
247 try: | 300 try: |
248 subprocess2.check_call(self.exe + ['--fail', '--stderr']) | 301 subprocess2.check_call(self.exe + ['--fail', '--stderr']) |
249 self.fail() | 302 self.fail() |
250 except subprocess2.CalledProcessError, e: | 303 except subprocess2.CalledProcessError, e: |
251 self.assertEquals(None, e.stdout) | 304 self.assertEquals(None, e.stdout) |
252 self.assertEquals(None, e.stderr) | 305 self.assertEquals(None, e.stderr) |
253 self.assertEquals(64, e.returncode) | 306 self.assertEquals(64, e.returncode) |
254 | 307 |
255 def test_check_output_tee_stderr(self): | 308 def test_check_output_tee_stderr(self): |
256 stderr = [] | 309 def fn(c, e, un): |
257 out, returncode = subprocess2.communicate( | 310 stderr = [] |
258 self.exe + ['--stderr'], stderr=stderr.append) | 311 out, returncode = subprocess2.communicate( |
259 self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) | 312 e + ['--stderr'], stderr=stderr.append, |
260 self.assertEquals((None, None), out) | 313 universal_newlines=un) |
261 self.assertEquals(0, returncode) | 314 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) |
| 315 self.assertEquals((None, None), out) |
| 316 self.assertEquals(0, returncode) |
| 317 self._run_test(fn) |
262 | 318 |
263 def test_check_output_tee_stdout_stderr(self): | 319 def test_check_output_tee_stdout_stderr(self): |
264 stdout = [] | 320 def fn(c, e, un): |
265 stderr = [] | 321 stdout = [] |
266 out, returncode = subprocess2.communicate( | 322 stderr = [] |
267 self.exe + ['--stdout', '--stderr'], | 323 out, returncode = subprocess2.communicate( |
268 stdout=stdout.append, | 324 e + ['--stdout', '--stderr'], |
269 stderr=stderr.append) | 325 stdout=stdout.append, |
270 self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) | 326 stderr=stderr.append, |
271 self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) | 327 universal_newlines=un) |
272 self.assertEquals((None, None), out) | 328 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) |
273 self.assertEquals(0, returncode) | 329 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) |
| 330 self.assertEquals((None, None), out) |
| 331 self.assertEquals(0, returncode) |
| 332 self._run_test(fn) |
274 | 333 |
275 def test_check_output_tee_stdin(self): | 334 def test_check_output_tee_stdin(self): |
276 stdout = [] | 335 def fn(c, e, un): |
277 stdin = '0123456789' | 336 stdout = [] |
278 out, returncode = subprocess2.communicate( | 337 stdin = '0123456789' |
279 self.exe + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append) | 338 out, returncode = subprocess2.communicate( |
280 self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) | 339 e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append, |
281 self.assertEquals((None, None), out) | 340 universal_newlines=un) |
282 self.assertEquals(0, returncode) | 341 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) |
| 342 self.assertEquals((None, None), out) |
| 343 self.assertEquals(0, returncode) |
| 344 self._run_test(fn) |
283 | 345 |
284 def test_check_output_tee_throw(self): | 346 def test_check_output_tee_throw(self): |
285 stderr = [] | 347 def fn(c, e, un): |
286 try: | 348 stderr = [] |
287 subprocess2.check_output( | 349 try: |
288 self.exe + ['--stderr', '--fail'], stderr=stderr.append) | 350 subprocess2.check_output( |
289 self.fail() | 351 e + ['--stderr', '--fail'], stderr=stderr.append, |
290 except subprocess2.CalledProcessError, e: | 352 universal_newlines=un) |
291 self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) | 353 self.fail() |
292 self.assertEquals('', e.stdout) | 354 except subprocess2.CalledProcessError, e: |
293 self.assertEquals(None, e.stderr) | 355 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) |
294 self.assertEquals(64, e.returncode) | 356 self.assertEquals('', e.stdout) |
| 357 self.assertEquals(None, e.stderr) |
| 358 self.assertEquals(64, e.returncode) |
| 359 self._run_test(fn) |
295 | 360 |
296 def test_check_output_tee_large(self): | 361 def test_check_output_tee_large(self): |
297 stdout = [] | 362 stdout = [] |
298 # Read 128kb. On my workstation it takes >2s. Welcome to 2011. | 363 # Read 128kb. On my workstation it takes >2s. Welcome to 2011. |
299 out, returncode = subprocess2.communicate( | 364 out, returncode = subprocess2.communicate( |
300 self.exe + ['--large'], stdout=stdout.append) | 365 self.exe + ['--large'], stdout=stdout.append) |
301 self.assertEquals(128*1024, len(''.join(stdout))) | 366 self.assertEquals(128*1024, len(''.join(stdout))) |
302 self.assertEquals((None, None), out) | 367 self.assertEquals((None, None), out) |
303 self.assertEquals(0, returncode) | 368 self.assertEquals(0, returncode) |
304 | 369 |
305 def test_check_output_tee_large_stdin(self): | 370 def test_check_output_tee_large_stdin(self): |
306 stdout = [] | 371 stdout = [] |
307 # Write 128kb. | 372 # Write 128kb. |
308 stdin = '0123456789abcdef' * (8*1024) | 373 stdin = '0123456789abcdef' * (8*1024) |
309 out, returncode = subprocess2.communicate( | 374 out, returncode = subprocess2.communicate( |
310 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) | 375 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) |
311 self.assertEquals(128*1024, len(''.join(stdout))) | 376 self.assertEquals(128*1024, len(''.join(stdout))) |
312 self.assertEquals((None, None), out) | 377 self.assertEquals((None, None), out) |
313 self.assertEquals(0, returncode) | 378 self.assertEquals(0, returncode) |
314 | 379 |
315 | 380 |
316 def child_main(args): | 381 def child_main(args): |
| 382 if sys.platform == 'win32': |
| 383 # Annoying, make sure the output is not translated on Windows. |
| 384 # pylint: disable=E1101,F0401 |
| 385 import msvcrt |
| 386 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) |
| 387 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) |
| 388 |
317 parser = optparse.OptionParser() | 389 parser = optparse.OptionParser() |
318 parser.add_option( | 390 parser.add_option( |
319 '--fail', | 391 '--fail', |
320 dest='return_value', | 392 dest='return_value', |
321 action='store_const', | 393 action='store_const', |
322 default=0, | 394 default=0, |
323 const=64) | 395 const=64) |
| 396 parser.add_option( |
| 397 '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') |
| 398 parser.add_option( |
| 399 '--cr', action='store_const', const='\r', dest='eol') |
324 parser.add_option('--stdout', action='store_true') | 400 parser.add_option('--stdout', action='store_true') |
325 parser.add_option('--stderr', action='store_true') | 401 parser.add_option('--stderr', action='store_true') |
326 parser.add_option('--sleep', action='store_true') | 402 parser.add_option('--sleep_first', action='store_true') |
| 403 parser.add_option('--sleep_last', action='store_true') |
327 parser.add_option('--large', action='store_true') | 404 parser.add_option('--large', action='store_true') |
328 parser.add_option('--read', action='store_true') | 405 parser.add_option('--read', action='store_true') |
329 options, args = parser.parse_args(args) | 406 options, args = parser.parse_args(args) |
330 if args: | 407 if args: |
331 parser.error('Internal error') | 408 parser.error('Internal error') |
332 if options.sleep: | 409 if options.sleep_first: |
333 time.sleep(10) | 410 time.sleep(10) |
334 | 411 |
335 def do(string): | 412 def do(string): |
336 if options.stdout: | 413 if options.stdout: |
337 print >> sys.stdout, string.upper() | 414 sys.stdout.write(string.upper()) |
| 415 sys.stdout.write(options.eol) |
338 if options.stderr: | 416 if options.stderr: |
339 print >> sys.stderr, string.lower() | 417 sys.stderr.write(string.lower()) |
| 418 sys.stderr.write(options.eol) |
340 | 419 |
341 do('A') | 420 do('A') |
342 do('BB') | 421 do('BB') |
343 do('CCC') | 422 do('CCC') |
344 if options.large: | 423 if options.large: |
345 # Print 128kb. | 424 # Print 128kb. |
346 string = '0123456789abcdef' * (8*1024) | 425 string = '0123456789abcdef' * (8*1024) |
347 sys.stdout.write(string) | 426 sys.stdout.write(string) |
348 if options.read: | 427 if options.read: |
349 try: | 428 try: |
350 while sys.stdin.read(): | 429 while sys.stdin.read(): |
351 pass | 430 pass |
352 except OSError: | 431 except OSError: |
353 pass | 432 pass |
| 433 if options.sleep_last: |
| 434 time.sleep(10) |
354 return options.return_value | 435 return options.return_value |
355 | 436 |
356 | 437 |
357 if __name__ == '__main__': | 438 if __name__ == '__main__': |
358 logging.basicConfig(level= | 439 logging.basicConfig(level= |
359 [logging.WARNING, logging.INFO, logging.DEBUG][ | 440 [logging.WARNING, logging.INFO, logging.DEBUG][ |
360 min(2, sys.argv.count('-v'))]) | 441 min(2, sys.argv.count('-v'))]) |
361 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 442 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
362 sys.exit(child_main(sys.argv[2:])) | 443 sys.exit(child_main(sys.argv[2:])) |
363 unittest.main() | 444 unittest.main() |
OLD | NEW |