Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(365)

Side by Side Diff: tests/subprocess2_test.py

Issue 8472002: Add tests for both universal_newlines=True and False. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Unit tests for subprocess2.py.""" 6 """Unit tests for subprocess2.py."""
7 7
8 import logging 8 import logging
9 import optparse 9 import optparse
10 import os 10 import os
11 import sys 11 import sys
12 import time 12 import time
13 import unittest 13 import unittest
14 14
15 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 15 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
16 sys.path.insert(0, ROOT_DIR) 16 sys.path.insert(0, ROOT_DIR)
17 17
18 import subprocess2 18 import subprocess2
19 19
20 # Method could be a function 20 # Method could be a function
21 # pylint: disable=R0201 21 # pylint: disable=R0201
22 22
23 23
24 def convert(string): 24 def convert_to_crlf(string):
25 """Converts string to CRLF on Windows.""" 25 """Unconditionally convert LF to CRLF."""
26 return string.replace('\n', '\r\n')
27
28
29 def convert_to_cr(string):
30 """Unconditionally convert LF to CR."""
31 return string.replace('\n', '\r')
32
33
34 def convert_win(string):
35 """Converts string to CRLF on Windows only."""
26 if sys.platform == 'win32': 36 if sys.platform == 'win32':
27 return string.replace('\n', '\r\n') 37 return string.replace('\n', '\r\n')
28 return string 38 return string
29 39
30 40
31 class Subprocess2Test(unittest.TestCase): 41 class DefaultsTest(unittest.TestCase):
32 # Can be mocked in a test. 42 # Can be mocked in a test.
33 TO_SAVE = { 43 TO_SAVE = {
34 subprocess2: [ 44 subprocess2: [
35 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], 45 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'],
36 subprocess2.subprocess: ['Popen'], 46 subprocess2.subprocess: ['Popen'],
37 } 47 }
38 48
39 def setUp(self): 49 def setUp(self):
40 self.exe_path = __file__
41 self.exe = [sys.executable, self.exe_path, '--child']
42 self.saved = {} 50 self.saved = {}
43 for module, names in self.TO_SAVE.iteritems(): 51 for module, names in self.TO_SAVE.iteritems():
44 self.saved[module] = dict( 52 self.saved[module] = dict(
45 (name, getattr(module, name)) for name in names) 53 (name, getattr(module, name)) for name in names)
46 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they 54 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
47 # can be trapped in the child process for better coverage. 55 # can be trapped in the child process for better coverage.
48 56
49 def tearDown(self): 57 def tearDown(self):
50 for module, saved in self.saved.iteritems(): 58 for module, saved in self.saved.iteritems():
51 for name, value in saved.iteritems(): 59 for name, value in saved.iteritems():
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
148 # fake_communicate() doesn't 'implement' that. 156 # fake_communicate() doesn't 'implement' that.
149 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) 157 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True))
150 expected = { 158 expected = {
151 'args': ['foo'], 159 'args': ['foo'],
152 'a':True, 160 'a':True,
153 'stdin': subprocess2.VOID, 161 'stdin': subprocess2.VOID,
154 'stdout': subprocess2.PIPE, 162 'stdout': subprocess2.PIPE,
155 } 163 }
156 self.assertEquals(expected, results) 164 self.assertEquals(expected, results)
157 165
158 def test_timeout(self):
159 out, returncode = subprocess2.communicate(
160 self.exe + ['--sleep', '--stdout'],
161 timeout=0.01,
162 stdout=subprocess2.PIPE,
163 shell=False)
164 self.assertEquals(subprocess2.TIMED_OUT, returncode)
165 self.assertEquals(('', None), out)
166
167 def test_timeout_shell_throws(self): 166 def test_timeout_shell_throws(self):
167 # Never called.
168 _ = self._fake_Popen()
168 try: 169 try:
169 subprocess2.communicate( 170 subprocess2.communicate(
170 self.exe + ['--sleep', '--stdout'], 171 sys.executable,
171 timeout=0.01, 172 timeout=0.01,
172 stdout=subprocess2.PIPE, 173 stdout=subprocess2.PIPE,
173 shell=True) 174 shell=True)
174 self.fail() 175 self.fail()
175 except TypeError: 176 except TypeError:
176 pass 177 pass
177 178
179
180 class S2Test(unittest.TestCase):
181 def setUp(self):
182 super(S2Test, self).setUp()
183 self.exe_path = __file__
184 self.exe = [sys.executable, self.exe_path, '--child']
185
186 def _run_test(self, function):
187 """Runs tests in 6 combinations:
188 - LF output with universal_newlines=False
189 - CR output with universal_newlines=False
190 - CRLF output with universal_newlines=False
191 - LF output with universal_newlines=True
192 - CR output with universal_newlines=True
193 - CRLF output with universal_newlines=True
194
195 First |function| argument is the convertion for the origianl expected LF
196 string to the right EOL.
197 Second |function| argument is the executable and initial flag to run, to
198 control what EOL is used by the child process.
199 Third |function| argument is universal_newlines value.
200 """
201 noop = lambda x: x
202 function(noop, self.exe, False)
203 function(convert_to_cr, self.exe + ['--cr'], False)
204 function(convert_to_crlf, self.exe + ['--crlf'], False)
205 function(noop, self.exe, True)
206 function(noop, self.exe + ['--cr'], True)
207 function(noop, self.exe + ['--crlf'], True)
208
209 def test_timeout(self):
210 out, returncode = subprocess2.communicate(
211 self.exe + ['--sleep_first', '--stdout'],
212 timeout=0.01,
213 stdout=subprocess2.PIPE,
214 shell=False)
215 self.assertEquals(subprocess2.TIMED_OUT, returncode)
216 self.assertEquals(('', None), out)
217
178 def test_check_output_no_stdout(self): 218 def test_check_output_no_stdout(self):
179 try: 219 try:
180 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) 220 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE)
181 self.fail() 221 self.fail()
182 except TypeError: 222 except TypeError:
183 pass 223 pass
184 224
185 def test_stdout_void(self): 225 def test_stdout_void(self):
186 (out, err), code = subprocess2.communicate( 226 def fn(c, e, un):
187 self.exe + ['--stdout', '--stderr'], 227 (out, err), code = subprocess2.communicate(
188 stdout=subprocess2.VOID, 228 e + ['--stdout', '--stderr'],
189 stderr=subprocess2.PIPE) 229 stdout=subprocess2.VOID,
190 self.assertEquals(None, out) 230 stderr=subprocess2.PIPE,
191 self.assertEquals(convert('a\nbb\nccc\n'), err) 231 universal_newlines=un)
192 self.assertEquals(0, code) 232 self.assertEquals(None, out)
233 self.assertEquals(c('a\nbb\nccc\n'), err)
234 self.assertEquals(0, code)
235 self._run_test(fn)
193 236
194 def test_stderr_void(self): 237 def test_stderr_void(self):
195 (out, err), code = subprocess2.communicate( 238 def fn(c, e, un):
196 self.exe + ['--stdout', '--stderr'], 239 (out, err), code = subprocess2.communicate(
197 universal_newlines=True, 240 e + ['--stdout', '--stderr'],
198 stdout=subprocess2.PIPE, 241 stdout=subprocess2.PIPE,
199 stderr=subprocess2.VOID) 242 stderr=subprocess2.VOID,
200 self.assertEquals('A\nBB\nCCC\n', out) 243 universal_newlines=un)
201 self.assertEquals(None, err) 244 self.assertEquals(c('A\nBB\nCCC\n'), out)
202 self.assertEquals(0, code) 245 self.assertEquals(None, err)
246 self.assertEquals(0, code)
247 self._run_test(fn)
203 248
204 def test_check_output_throw_stdout(self): 249 def test_check_output_throw_stdout(self):
205 try: 250 def fn(c, e, un):
206 subprocess2.check_output( 251 try:
207 self.exe + ['--fail', '--stdout'], universal_newlines=True) 252 subprocess2.check_output(
208 self.fail() 253 e + ['--fail', '--stdout'], universal_newlines=un)
209 except subprocess2.CalledProcessError, e: 254 self.fail()
210 self.assertEquals('A\nBB\nCCC\n', e.stdout) 255 except subprocess2.CalledProcessError, e:
211 self.assertEquals(None, e.stderr) 256 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout)
212 self.assertEquals(64, e.returncode) 257 self.assertEquals(None, e.stderr)
258 self.assertEquals(64, e.returncode)
259 self._run_test(fn)
213 260
214 def test_check_output_throw_no_stderr(self): 261 def test_check_output_throw_no_stderr(self):
215 try: 262 def fn(c, e, un):
216 subprocess2.check_output( 263 try:
217 self.exe + ['--fail', '--stderr'], universal_newlines=True) 264 subprocess2.check_output(
218 self.fail() 265 e + ['--fail', '--stderr'], universal_newlines=un)
219 except subprocess2.CalledProcessError, e: 266 self.fail()
220 self.assertEquals('', e.stdout) 267 except subprocess2.CalledProcessError, e:
221 self.assertEquals(None, e.stderr) 268 self.assertEquals(c(''), e.stdout)
222 self.assertEquals(64, e.returncode) 269 self.assertEquals(None, e.stderr)
270 self.assertEquals(64, e.returncode)
271 self._run_test(fn)
223 272
224 def test_check_output_throw_stderr(self): 273 def test_check_output_throw_stderr(self):
225 try: 274 def fn(c, e, un):
226 subprocess2.check_output( 275 try:
227 self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, 276 subprocess2.check_output(
228 universal_newlines=True) 277 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
229 self.fail() 278 universal_newlines=un)
230 except subprocess2.CalledProcessError, e: 279 self.fail()
231 self.assertEquals('', e.stdout) 280 except subprocess2.CalledProcessError, e:
232 self.assertEquals('a\nbb\nccc\n', e.stderr) 281 self.assertEquals('', e.stdout)
233 self.assertEquals(64, e.returncode) 282 self.assertEquals(c('a\nbb\nccc\n'), e.stderr)
283 self.assertEquals(64, e.returncode)
284 self._run_test(fn)
234 285
235 def test_check_output_throw_stderr_stdout(self): 286 def test_check_output_throw_stderr_stdout(self):
236 try: 287 def fn(c, e, un):
237 subprocess2.check_output( 288 try:
238 self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, 289 subprocess2.check_output(
239 universal_newlines=True) 290 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
240 self.fail() 291 universal_newlines=un)
241 except subprocess2.CalledProcessError, e: 292 self.fail()
242 self.assertEquals('a\nbb\nccc\n', e.stdout) 293 except subprocess2.CalledProcessError, e:
243 self.assertEquals(None, e.stderr) 294 self.assertEquals(c('a\nbb\nccc\n'), e.stdout)
244 self.assertEquals(64, e.returncode) 295 self.assertEquals(None, e.stderr)
296 self.assertEquals(64, e.returncode)
297 self._run_test(fn)
245 298
246 def test_check_call_throw(self): 299 def test_check_call_throw(self):
247 try: 300 try:
248 subprocess2.check_call(self.exe + ['--fail', '--stderr']) 301 subprocess2.check_call(self.exe + ['--fail', '--stderr'])
249 self.fail() 302 self.fail()
250 except subprocess2.CalledProcessError, e: 303 except subprocess2.CalledProcessError, e:
251 self.assertEquals(None, e.stdout) 304 self.assertEquals(None, e.stdout)
252 self.assertEquals(None, e.stderr) 305 self.assertEquals(None, e.stderr)
253 self.assertEquals(64, e.returncode) 306 self.assertEquals(64, e.returncode)
254 307
255 def test_check_output_tee_stderr(self): 308 def test_check_output_tee_stderr(self):
256 stderr = [] 309 def fn(c, e, un):
257 out, returncode = subprocess2.communicate( 310 stderr = []
258 self.exe + ['--stderr'], stderr=stderr.append) 311 out, returncode = subprocess2.communicate(
259 self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) 312 e + ['--stderr'], stderr=stderr.append,
260 self.assertEquals((None, None), out) 313 universal_newlines=un)
261 self.assertEquals(0, returncode) 314 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
315 self.assertEquals((None, None), out)
316 self.assertEquals(0, returncode)
317 self._run_test(fn)
262 318
263 def test_check_output_tee_stdout_stderr(self): 319 def test_check_output_tee_stdout_stderr(self):
264 stdout = [] 320 def fn(c, e, un):
265 stderr = [] 321 stdout = []
266 out, returncode = subprocess2.communicate( 322 stderr = []
267 self.exe + ['--stdout', '--stderr'], 323 out, returncode = subprocess2.communicate(
268 stdout=stdout.append, 324 e + ['--stdout', '--stderr'],
269 stderr=stderr.append) 325 stdout=stdout.append,
270 self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) 326 stderr=stderr.append,
271 self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) 327 universal_newlines=un)
272 self.assertEquals((None, None), out) 328 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
273 self.assertEquals(0, returncode) 329 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
330 self.assertEquals((None, None), out)
331 self.assertEquals(0, returncode)
332 self._run_test(fn)
274 333
275 def test_check_output_tee_stdin(self): 334 def test_check_output_tee_stdin(self):
276 stdout = [] 335 def fn(c, e, un):
277 stdin = '0123456789' 336 stdout = []
278 out, returncode = subprocess2.communicate( 337 stdin = '0123456789'
279 self.exe + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append) 338 out, returncode = subprocess2.communicate(
280 self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) 339 e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append,
281 self.assertEquals((None, None), out) 340 universal_newlines=un)
282 self.assertEquals(0, returncode) 341 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
342 self.assertEquals((None, None), out)
343 self.assertEquals(0, returncode)
344 self._run_test(fn)
283 345
284 def test_check_output_tee_throw(self): 346 def test_check_output_tee_throw(self):
285 stderr = [] 347 def fn(c, e, un):
286 try: 348 stderr = []
287 subprocess2.check_output( 349 try:
288 self.exe + ['--stderr', '--fail'], stderr=stderr.append) 350 subprocess2.check_output(
289 self.fail() 351 e + ['--stderr', '--fail'], stderr=stderr.append,
290 except subprocess2.CalledProcessError, e: 352 universal_newlines=un)
291 self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) 353 self.fail()
292 self.assertEquals('', e.stdout) 354 except subprocess2.CalledProcessError, e:
293 self.assertEquals(None, e.stderr) 355 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
294 self.assertEquals(64, e.returncode) 356 self.assertEquals('', e.stdout)
357 self.assertEquals(None, e.stderr)
358 self.assertEquals(64, e.returncode)
359 self._run_test(fn)
295 360
296 def test_check_output_tee_large(self): 361 def test_check_output_tee_large(self):
297 stdout = [] 362 stdout = []
298 # Read 128kb. On my workstation it takes >2s. Welcome to 2011. 363 # Read 128kb. On my workstation it takes >2s. Welcome to 2011.
299 out, returncode = subprocess2.communicate( 364 out, returncode = subprocess2.communicate(
300 self.exe + ['--large'], stdout=stdout.append) 365 self.exe + ['--large'], stdout=stdout.append)
301 self.assertEquals(128*1024, len(''.join(stdout))) 366 self.assertEquals(128*1024, len(''.join(stdout)))
302 self.assertEquals((None, None), out) 367 self.assertEquals((None, None), out)
303 self.assertEquals(0, returncode) 368 self.assertEquals(0, returncode)
304 369
305 def test_check_output_tee_large_stdin(self): 370 def test_check_output_tee_large_stdin(self):
306 stdout = [] 371 stdout = []
307 # Write 128kb. 372 # Write 128kb.
308 stdin = '0123456789abcdef' * (8*1024) 373 stdin = '0123456789abcdef' * (8*1024)
309 out, returncode = subprocess2.communicate( 374 out, returncode = subprocess2.communicate(
310 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append) 375 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
311 self.assertEquals(128*1024, len(''.join(stdout))) 376 self.assertEquals(128*1024, len(''.join(stdout)))
312 self.assertEquals((None, None), out) 377 self.assertEquals((None, None), out)
313 self.assertEquals(0, returncode) 378 self.assertEquals(0, returncode)
314 379
315 380
316 def child_main(args): 381 def child_main(args):
382 if sys.platform == 'win32':
383 # Annoying, make sure the output is not translated on Windows.
384 # pylint: disable=E1101,F0401
385 import msvcrt
386 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
387 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
388
317 parser = optparse.OptionParser() 389 parser = optparse.OptionParser()
318 parser.add_option( 390 parser.add_option(
319 '--fail', 391 '--fail',
320 dest='return_value', 392 dest='return_value',
321 action='store_const', 393 action='store_const',
322 default=0, 394 default=0,
323 const=64) 395 const=64)
396 parser.add_option(
397 '--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
398 parser.add_option(
399 '--cr', action='store_const', const='\r', dest='eol')
324 parser.add_option('--stdout', action='store_true') 400 parser.add_option('--stdout', action='store_true')
325 parser.add_option('--stderr', action='store_true') 401 parser.add_option('--stderr', action='store_true')
326 parser.add_option('--sleep', action='store_true') 402 parser.add_option('--sleep_first', action='store_true')
403 parser.add_option('--sleep_last', action='store_true')
327 parser.add_option('--large', action='store_true') 404 parser.add_option('--large', action='store_true')
328 parser.add_option('--read', action='store_true') 405 parser.add_option('--read', action='store_true')
329 options, args = parser.parse_args(args) 406 options, args = parser.parse_args(args)
330 if args: 407 if args:
331 parser.error('Internal error') 408 parser.error('Internal error')
332 if options.sleep: 409 if options.sleep_first:
333 time.sleep(10) 410 time.sleep(10)
334 411
335 def do(string): 412 def do(string):
336 if options.stdout: 413 if options.stdout:
337 print >> sys.stdout, string.upper() 414 sys.stdout.write(string.upper())
415 sys.stdout.write(options.eol)
338 if options.stderr: 416 if options.stderr:
339 print >> sys.stderr, string.lower() 417 sys.stderr.write(string.lower())
418 sys.stderr.write(options.eol)
340 419
341 do('A') 420 do('A')
342 do('BB') 421 do('BB')
343 do('CCC') 422 do('CCC')
344 if options.large: 423 if options.large:
345 # Print 128kb. 424 # Print 128kb.
346 string = '0123456789abcdef' * (8*1024) 425 string = '0123456789abcdef' * (8*1024)
347 sys.stdout.write(string) 426 sys.stdout.write(string)
348 if options.read: 427 if options.read:
349 try: 428 try:
350 while sys.stdin.read(): 429 while sys.stdin.read():
351 pass 430 pass
352 except OSError: 431 except OSError:
353 pass 432 pass
433 if options.sleep_last:
434 time.sleep(10)
354 return options.return_value 435 return options.return_value
355 436
356 437
357 if __name__ == '__main__': 438 if __name__ == '__main__':
358 logging.basicConfig(level= 439 logging.basicConfig(level=
359 [logging.WARNING, logging.INFO, logging.DEBUG][ 440 [logging.WARNING, logging.INFO, logging.DEBUG][
360 min(2, sys.argv.count('-v'))]) 441 min(2, sys.argv.count('-v'))])
361 if len(sys.argv) > 1 and sys.argv[1] == '--child': 442 if len(sys.argv) > 1 and sys.argv[1] == '--child':
362 sys.exit(child_main(sys.argv[2:])) 443 sys.exit(child_main(sys.argv[2:]))
363 unittest.main() 444 unittest.main()
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698