Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: tests/subprocess2_test.py

Issue 8505046: Revert r109283, r109282 and r109239. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « subprocess2.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Unit tests for subprocess2.py.""" 6 """Unit tests for subprocess2.py."""
7 7
8 import logging
9 import optparse 8 import optparse
10 import os 9 import os
11 import sys 10 import sys
12 import time 11 import time
13 import unittest 12 import unittest
14 13
15 try:
16 import fcntl
17 except ImportError:
18 fcntl = None
19
20 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 14 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
21 sys.path.insert(0, ROOT_DIR) 15 sys.path.insert(0, ROOT_DIR)
22 16
23 import subprocess2 17 import subprocess2
24 18
25 # Method could be a function 19 # Method could be a function
26 # pylint: disable=R0201 20 # pylint: disable=R0201
27 21
28 22 class Subprocess2Test(unittest.TestCase):
29 def convert_to_crlf(string):
30 """Unconditionally convert LF to CRLF."""
31 return string.replace('\n', '\r\n')
32
33
34 def convert_to_cr(string):
35 """Unconditionally convert LF to CR."""
36 return string.replace('\n', '\r')
37
38
39 def convert_win(string):
40 """Converts string to CRLF on Windows only."""
41 if sys.platform == 'win32':
42 return string.replace('\n', '\r\n')
43 return string
44
45
46 class DefaultsTest(unittest.TestCase):
47 # Can be mocked in a test. 23 # Can be mocked in a test.
48 TO_SAVE = { 24 TO_SAVE = {
49 subprocess2: [ 25 subprocess2: [
50 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], 26 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'],
51 subprocess2.subprocess: ['Popen'], 27 subprocess2.subprocess: ['Popen'],
52 } 28 }
53 29
54 def setUp(self): 30 def setUp(self):
31 self.exe_path = __file__
32 self.exe = [sys.executable, self.exe_path, '--child']
55 self.saved = {} 33 self.saved = {}
56 for module, names in self.TO_SAVE.iteritems(): 34 for module, names in self.TO_SAVE.iteritems():
57 self.saved[module] = dict( 35 self.saved[module] = dict(
58 (name, getattr(module, name)) for name in names) 36 (name, getattr(module, name)) for name in names)
59 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they 37 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
60 # can be trapped in the child process for better coverage. 38 # can be trapped in the child process for better coverage.
61 39
62 def tearDown(self): 40 def tearDown(self):
63 for module, saved in self.saved.iteritems(): 41 for module, saved in self.saved.iteritems():
64 for name, value in saved.iteritems(): 42 for name, value in saved.iteritems():
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
161 # fake_communicate() doesn't 'implement' that. 139 # fake_communicate() doesn't 'implement' that.
162 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) 140 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True))
163 expected = { 141 expected = {
164 'args': ['foo'], 142 'args': ['foo'],
165 'a':True, 143 'a':True,
166 'stdin': subprocess2.VOID, 144 'stdin': subprocess2.VOID,
167 'stdout': subprocess2.PIPE, 145 'stdout': subprocess2.PIPE,
168 } 146 }
169 self.assertEquals(expected, results) 147 self.assertEquals(expected, results)
170 148
171 def test_timeout_shell_throws(self):
172 # Never called.
173 _ = self._fake_Popen()
174 try:
175 subprocess2.communicate(
176 sys.executable,
177 timeout=0.01,
178 stdout=subprocess2.PIPE,
179 shell=True)
180 self.fail()
181 except TypeError:
182 pass
183
184
185 class S2Test(unittest.TestCase):
186 def setUp(self):
187 super(S2Test, self).setUp()
188 self.exe_path = __file__
189 self.exe = [sys.executable, self.exe_path, '--child']
190 self.states = {}
191 if fcntl:
192 for v in (sys.stdin, sys.stdout, sys.stderr):
193 fileno = v.fileno()
194 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
195
196 def tearDown(self):
197 for fileno, fl in self.states.iteritems():
198 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
199 super(S2Test, self).tearDown()
200
201 def _run_test(self, function):
202 """Runs tests in 6 combinations:
203 - LF output with universal_newlines=False
204 - CR output with universal_newlines=False
205 - CRLF output with universal_newlines=False
206 - LF output with universal_newlines=True
207 - CR output with universal_newlines=True
208 - CRLF output with universal_newlines=True
209
210 First |function| argument is the convertion for the origianl expected LF
211 string to the right EOL.
212 Second |function| argument is the executable and initial flag to run, to
213 control what EOL is used by the child process.
214 Third |function| argument is universal_newlines value.
215 """
216 noop = lambda x: x
217 function(noop, self.exe, False)
218 function(convert_to_cr, self.exe + ['--cr'], False)
219 function(convert_to_crlf, self.exe + ['--crlf'], False)
220 function(noop, self.exe, True)
221 function(noop, self.exe + ['--cr'], True)
222 function(noop, self.exe + ['--crlf'], True)
223
224 def test_timeout(self): 149 def test_timeout(self):
150 # It'd be better to not discard stdout.
225 out, returncode = subprocess2.communicate( 151 out, returncode = subprocess2.communicate(
226 self.exe + ['--sleep_first', '--stdout'], 152 self.exe + ['--sleep', '--stdout'],
227 timeout=0.01, 153 timeout=0.01,
228 stdout=subprocess2.PIPE, 154 stdout=subprocess2.PIPE)
229 shell=False)
230 self.assertEquals(subprocess2.TIMED_OUT, returncode) 155 self.assertEquals(subprocess2.TIMED_OUT, returncode)
231 self.assertEquals(('', None), out) 156 self.assertEquals(['', None], out)
232 157
233 def test_check_output_no_stdout(self): 158 def test_check_output_no_stdout(self):
234 try: 159 try:
235 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) 160 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE)
236 self.fail() 161 self.fail()
237 except TypeError: 162 except TypeError:
238 pass 163 pass
239 164
240 def test_stdout_void(self): 165 def test_stdout_void(self):
241 def fn(c, e, un): 166 (out, err), code = subprocess2.communicate(
242 (out, err), code = subprocess2.communicate( 167 self.exe + ['--stdout', '--stderr'],
243 e + ['--stdout', '--stderr'], 168 stdout=subprocess2.VOID,
244 stdout=subprocess2.VOID, 169 stderr=subprocess2.PIPE)
245 stderr=subprocess2.PIPE, 170 self.assertEquals(None, out)
246 universal_newlines=un) 171 expected = 'a\nbb\nccc\n'
247 self.assertEquals(None, out) 172 if sys.platform == 'win32':
248 self.assertEquals(c('a\nbb\nccc\n'), err) 173 expected = expected.replace('\n', '\r\n')
249 self.assertEquals(0, code) 174 self.assertEquals(expected, err)
250 self._run_test(fn) 175 self.assertEquals(0, code)
251 176
252 def test_stderr_void(self): 177 def test_stderr_void(self):
253 def fn(c, e, un): 178 (out, err), code = subprocess2.communicate(
254 (out, err), code = subprocess2.communicate( 179 self.exe + ['--stdout', '--stderr'],
255 e + ['--stdout', '--stderr'], 180 universal_newlines=True,
256 stdout=subprocess2.PIPE, 181 stdout=subprocess2.PIPE,
257 stderr=subprocess2.VOID, 182 stderr=subprocess2.VOID)
258 universal_newlines=un) 183 self.assertEquals('A\nBB\nCCC\n', out)
259 self.assertEquals(c('A\nBB\nCCC\n'), out) 184 self.assertEquals(None, err)
260 self.assertEquals(None, err) 185 self.assertEquals(0, code)
261 self.assertEquals(0, code)
262 self._run_test(fn)
263 186
264 def test_check_output_throw_stdout(self): 187 def test_check_output_throw_stdout(self):
265 def fn(c, e, un): 188 try:
266 try: 189 subprocess2.check_output(
267 subprocess2.check_output( 190 self.exe + ['--fail', '--stdout'], universal_newlines=True)
268 e + ['--fail', '--stdout'], universal_newlines=un) 191 self.fail()
269 self.fail() 192 except subprocess2.CalledProcessError, e:
270 except subprocess2.CalledProcessError, e: 193 self.assertEquals('A\nBB\nCCC\n', e.stdout)
271 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) 194 self.assertEquals(None, e.stderr)
272 self.assertEquals(None, e.stderr) 195 self.assertEquals(64, e.returncode)
273 self.assertEquals(64, e.returncode)
274 self._run_test(fn)
275 196
276 def test_check_output_throw_no_stderr(self): 197 def test_check_output_throw_no_stderr(self):
277 def fn(c, e, un): 198 try:
278 try: 199 subprocess2.check_output(
279 subprocess2.check_output( 200 self.exe + ['--fail', '--stderr'], universal_newlines=True)
280 e + ['--fail', '--stderr'], universal_newlines=un) 201 self.fail()
281 self.fail() 202 except subprocess2.CalledProcessError, e:
282 except subprocess2.CalledProcessError, e: 203 self.assertEquals('', e.stdout)
283 self.assertEquals(c(''), e.stdout) 204 self.assertEquals(None, e.stderr)
284 self.assertEquals(None, e.stderr) 205 self.assertEquals(64, e.returncode)
285 self.assertEquals(64, e.returncode)
286 self._run_test(fn)
287 206
288 def test_check_output_throw_stderr(self): 207 def test_check_output_throw_stderr(self):
289 def fn(c, e, un): 208 try:
290 try: 209 subprocess2.check_output(
291 subprocess2.check_output( 210 self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
292 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, 211 universal_newlines=True)
293 universal_newlines=un) 212 self.fail()
294 self.fail() 213 except subprocess2.CalledProcessError, e:
295 except subprocess2.CalledProcessError, e: 214 self.assertEquals('', e.stdout)
296 self.assertEquals('', e.stdout) 215 self.assertEquals('a\nbb\nccc\n', e.stderr)
297 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) 216 self.assertEquals(64, e.returncode)
298 self.assertEquals(64, e.returncode)
299 self._run_test(fn)
300 217
301 def test_check_output_throw_stderr_stdout(self): 218 def test_check_output_throw_stderr_stdout(self):
302 def fn(c, e, un): 219 try:
303 try: 220 subprocess2.check_output(
304 subprocess2.check_output( 221 self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
305 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, 222 universal_newlines=True)
306 universal_newlines=un) 223 self.fail()
307 self.fail() 224 except subprocess2.CalledProcessError, e:
308 except subprocess2.CalledProcessError, e: 225 self.assertEquals('a\nbb\nccc\n', e.stdout)
309 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) 226 self.assertEquals(None, e.stderr)
310 self.assertEquals(None, e.stderr) 227 self.assertEquals(64, e.returncode)
311 self.assertEquals(64, e.returncode)
312 self._run_test(fn)
313 228
314 def test_check_call_throw(self): 229 def test_check_call_throw(self):
315 try: 230 try:
316 subprocess2.check_call(self.exe + ['--fail', '--stderr']) 231 subprocess2.check_call(self.exe + ['--fail', '--stderr'])
317 self.fail() 232 self.fail()
318 except subprocess2.CalledProcessError, e: 233 except subprocess2.CalledProcessError, e:
319 self.assertEquals(None, e.stdout) 234 self.assertEquals(None, e.stdout)
320 self.assertEquals(None, e.stderr) 235 self.assertEquals(None, e.stderr)
321 self.assertEquals(64, e.returncode) 236 self.assertEquals(64, e.returncode)
322 237
323 def test_check_output_tee_stderr(self):
324 def fn(c, e, un):
325 stderr = []
326 out, returncode = subprocess2.communicate(
327 e + ['--stderr'], stderr=stderr.append,
328 universal_newlines=un)
329 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
330 self.assertEquals((None, None), out)
331 self.assertEquals(0, returncode)
332 self._run_test(fn)
333
334 def test_check_output_tee_stdout_stderr(self):
335 def fn(c, e, un):
336 stdout = []
337 stderr = []
338 out, returncode = subprocess2.communicate(
339 e + ['--stdout', '--stderr'],
340 stdout=stdout.append,
341 stderr=stderr.append,
342 universal_newlines=un)
343 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
344 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
345 self.assertEquals((None, None), out)
346 self.assertEquals(0, returncode)
347 self._run_test(fn)
348
349 def test_check_output_tee_stdin(self):
350 def fn(c, e, un):
351 stdout = []
352 stdin = '0123456789'
353 out, returncode = subprocess2.communicate(
354 e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append,
355 universal_newlines=un)
356 self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
357 self.assertEquals((None, None), out)
358 self.assertEquals(0, returncode)
359 self._run_test(fn)
360
361 def test_check_output_tee_throw(self):
362 def fn(c, e, un):
363 stderr = []
364 try:
365 subprocess2.check_output(
366 e + ['--stderr', '--fail'], stderr=stderr.append,
367 universal_newlines=un)
368 self.fail()
369 except subprocess2.CalledProcessError, e:
370 self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
371 self.assertEquals('', e.stdout)
372 self.assertEquals(None, e.stderr)
373 self.assertEquals(64, e.returncode)
374 self._run_test(fn)
375
376 def test_check_output_tee_large(self):
377 stdout = []
378 # Read 128kb. On my workstation it takes >2s. Welcome to 2011.
379 out, returncode = subprocess2.communicate(
380 self.exe + ['--large'], stdout=stdout.append)
381 self.assertEquals(128*1024, len(''.join(stdout)))
382 self.assertEquals((None, None), out)
383 self.assertEquals(0, returncode)
384
385 def test_check_output_tee_large_stdin(self):
386 stdout = []
387 # Write 128kb.
388 stdin = '0123456789abcdef' * (8*1024)
389 out, returncode = subprocess2.communicate(
390 self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
391 self.assertEquals(128*1024, len(''.join(stdout)))
392 self.assertEquals((None, None), out)
393 self.assertEquals(0, returncode)
394
395 238
396 def child_main(args): 239 def child_main(args):
397 if sys.platform == 'win32':
398 # Annoying, make sure the output is not translated on Windows.
399 # pylint: disable=E1101,F0401
400 import msvcrt
401 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
402 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
403
404 parser = optparse.OptionParser() 240 parser = optparse.OptionParser()
405 parser.add_option( 241 parser.add_option(
406 '--fail', 242 '--fail',
407 dest='return_value', 243 dest='return_value',
408 action='store_const', 244 action='store_const',
409 default=0, 245 default=0,
410 const=64) 246 const=64)
411 parser.add_option(
412 '--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
413 parser.add_option(
414 '--cr', action='store_const', const='\r', dest='eol')
415 parser.add_option('--stdout', action='store_true') 247 parser.add_option('--stdout', action='store_true')
416 parser.add_option('--stderr', action='store_true') 248 parser.add_option('--stderr', action='store_true')
417 parser.add_option('--sleep_first', action='store_true') 249 parser.add_option('--sleep', action='store_true')
418 parser.add_option('--sleep_last', action='store_true')
419 parser.add_option('--large', action='store_true')
420 parser.add_option('--read', action='store_true')
421 options, args = parser.parse_args(args) 250 options, args = parser.parse_args(args)
422 if args: 251 if args:
423 parser.error('Internal error') 252 parser.error('Internal error')
424 if options.sleep_first:
425 time.sleep(10)
426 253
427 def do(string): 254 def do(string):
428 if options.stdout: 255 if options.stdout:
429 sys.stdout.write(string.upper()) 256 print >> sys.stdout, string.upper()
430 sys.stdout.write(options.eol)
431 if options.stderr: 257 if options.stderr:
432 sys.stderr.write(string.lower()) 258 print >> sys.stderr, string.lower()
433 sys.stderr.write(options.eol)
434 259
435 do('A') 260 do('A')
436 do('BB') 261 do('BB')
437 do('CCC') 262 do('CCC')
438 if options.large: 263 if options.sleep:
439 # Print 128kb.
440 string = '0123456789abcdef' * (8*1024)
441 sys.stdout.write(string)
442 if options.read:
443 try:
444 while sys.stdin.read():
445 pass
446 except OSError:
447 pass
448 if options.sleep_last:
449 time.sleep(10) 264 time.sleep(10)
450 return options.return_value 265 return options.return_value
451 266
452 267
453 if __name__ == '__main__': 268 if __name__ == '__main__':
454 logging.basicConfig(level=
455 [logging.WARNING, logging.INFO, logging.DEBUG][
456 min(2, sys.argv.count('-v'))])
457 if len(sys.argv) > 1 and sys.argv[1] == '--child': 269 if len(sys.argv) > 1 and sys.argv[1] == '--child':
458 sys.exit(child_main(sys.argv[2:])) 270 sys.exit(child_main(sys.argv[2:]))
459 unittest.main() 271 unittest.main()
OLDNEW
« no previous file with comments | « subprocess2.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698