OLD | NEW |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Unit tests for subprocess2.py.""" | 6 """Unit tests for subprocess2.py.""" |
7 | 7 |
8 import logging | |
8 import optparse | 9 import optparse |
9 import os | 10 import os |
10 import sys | 11 import sys |
11 import time | 12 import time |
12 import unittest | 13 import unittest |
13 | 14 |
15 try: | |
16 import fcntl | |
17 except ImportError: | |
18 fcntl = None | |
19 | |
14 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 20 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
15 | 21 |
16 import subprocess2 | 22 import subprocess2 |
17 | 23 |
18 class Subprocess2Test(unittest.TestCase): | 24 # Method could be a function |
25 # pylint: disable=R0201 | |
26 | |
27 | |
28 def convert_to_crlf(string): | |
29 """Unconditionally convert LF to CRLF.""" | |
30 return string.replace('\n', '\r\n') | |
31 | |
32 | |
33 def convert_to_cr(string): | |
34 """Unconditionally convert LF to CR.""" | |
35 return string.replace('\n', '\r') | |
36 | |
37 | |
38 def convert_win(string): | |
39 """Converts string to CRLF on Windows only.""" | |
40 if sys.platform == 'win32': | |
41 return string.replace('\n', '\r\n') | |
42 return string | |
43 | |
44 | |
45 class DefaultsTest(unittest.TestCase): | |
19 # Can be mocked in a test. | 46 # Can be mocked in a test. |
20 TO_SAVE = { | 47 TO_SAVE = { |
21 subprocess2: [ | 48 subprocess2: [ |
22 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], | 49 'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'], |
23 subprocess2.subprocess: ['Popen'], | 50 subprocess2.subprocess: ['Popen'], |
24 } | 51 } |
25 | 52 |
26 def setUp(self): | 53 def setUp(self): |
27 self.exe_path = __file__ | |
28 self.exe = [sys.executable, self.exe_path, '--child'] | |
29 self.saved = {} | 54 self.saved = {} |
30 for module, names in self.TO_SAVE.iteritems(): | 55 for module, names in self.TO_SAVE.iteritems(): |
31 self.saved[module] = dict( | 56 self.saved[module] = dict( |
32 (name, getattr(module, name)) for name in names) | 57 (name, getattr(module, name)) for name in names) |
33 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they | 58 # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they |
34 # can be trapped in the child process for better coverage. | 59 # can be trapped in the child process for better coverage. |
35 | 60 |
36 def tearDown(self): | 61 def tearDown(self): |
37 for module, saved in self.saved.iteritems(): | 62 for module, saved in self.saved.iteritems(): |
38 for name, value in saved.iteritems(): | 63 for name, value in saved.iteritems(): |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
137 # fake_communicate() doesn't 'implement' that. | 162 # fake_communicate() doesn't 'implement' that. |
138 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) | 163 self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) |
139 expected = { | 164 expected = { |
140 'args': ['foo'], | 165 'args': ['foo'], |
141 'a':True, | 166 'a':True, |
142 'stdin': subprocess2.VOID, | 167 'stdin': subprocess2.VOID, |
143 'stdout': subprocess2.PIPE, | 168 'stdout': subprocess2.PIPE, |
144 } | 169 } |
145 self.assertEquals(expected, results) | 170 self.assertEquals(expected, results) |
146 | 171 |
172 | |
173 class S2Test(unittest.TestCase): | |
Dirk Pranke
2011/11/11 20:48:20
Nit: might be better not to abbreviate this.
M-A Ruel
2011/11/11 20:49:18
I was tired of typing it when filtering on the com
| |
174 def setUp(self): | |
175 super(S2Test, self).setUp() | |
176 self.exe_path = __file__ | |
177 self.exe = [sys.executable, self.exe_path, '--child'] | |
178 self.states = {} | |
179 if fcntl: | |
180 for v in (sys.stdin, sys.stdout, sys.stderr): | |
181 fileno = v.fileno() | |
182 self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) | |
183 | |
184 def tearDown(self): | |
185 for fileno, fl in self.states.iteritems(): | |
186 self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) | |
187 super(S2Test, self).tearDown() | |
188 | |
189 def _run_test(self, function): | |
190 """Runs tests in 6 combinations: | |
191 - LF output with universal_newlines=False | |
192 - CR output with universal_newlines=False | |
193 - CRLF output with universal_newlines=False | |
194 - LF output with universal_newlines=True | |
195 - CR output with universal_newlines=True | |
196 - CRLF output with universal_newlines=True | |
197 | |
198 First |function| argument is the convertion for the origianl expected LF | |
199 string to the right EOL. | |
200 Second |function| argument is the executable and initial flag to run, to | |
201 control what EOL is used by the child process. | |
202 Third |function| argument is universal_newlines value. | |
203 """ | |
204 noop = lambda x: x | |
205 function(noop, self.exe, False) | |
206 function(convert_to_cr, self.exe + ['--cr'], False) | |
207 function(convert_to_crlf, self.exe + ['--crlf'], False) | |
208 function(noop, self.exe, True) | |
209 function(noop, self.exe + ['--cr'], True) | |
210 function(noop, self.exe + ['--crlf'], True) | |
211 | |
147 def test_timeout(self): | 212 def test_timeout(self): |
148 # It'd be better to not discard stdout. | |
149 out, returncode = subprocess2.communicate( | 213 out, returncode = subprocess2.communicate( |
150 self.exe + ['--sleep', '--stdout'], | 214 self.exe + ['--sleep_first', '--stdout'], |
151 timeout=0.01, | 215 timeout=0.01, |
152 stdout=subprocess2.PIPE) | 216 stdout=subprocess2.PIPE, |
217 shell=False) | |
153 self.assertEquals(subprocess2.TIMED_OUT, returncode) | 218 self.assertEquals(subprocess2.TIMED_OUT, returncode) |
154 self.assertEquals(['', None], out) | 219 self.assertEquals(('', None), out) |
155 | 220 |
156 def test_check_output_no_stdout(self): | 221 def test_check_output_no_stdout(self): |
157 try: | 222 try: |
158 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) | 223 subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) |
159 self.fail() | 224 self.fail() |
160 except TypeError: | 225 except TypeError: |
161 pass | 226 pass |
162 | 227 |
163 def test_stdout_void(self): | 228 def test_stdout_void(self): |
164 (out, err), code = subprocess2.communicate( | 229 def fn(c, e, un): |
165 self.exe + ['--stdout', '--stderr'], | 230 (out, err), code = subprocess2.communicate( |
166 stdout=subprocess2.VOID, | 231 e + ['--stdout', '--stderr'], |
167 stderr=subprocess2.PIPE) | 232 stdout=subprocess2.VOID, |
168 self.assertEquals(None, out) | 233 stderr=subprocess2.PIPE, |
169 expected = 'a\nbb\nccc\n' | 234 universal_newlines=un) |
170 if sys.platform == 'win32': | 235 self.assertEquals(None, out) |
171 expected = expected.replace('\n', '\r\n') | 236 self.assertEquals(c('a\nbb\nccc\n'), err) |
172 self.assertEquals(expected, err) | 237 self.assertEquals(0, code) |
173 self.assertEquals(0, code) | 238 self._run_test(fn) |
174 | 239 |
175 def test_stderr_void(self): | 240 def test_stderr_void(self): |
176 (out, err), code = subprocess2.communicate( | 241 def fn(c, e, un): |
177 self.exe + ['--stdout', '--stderr'], | 242 (out, err), code = subprocess2.communicate( |
178 universal_newlines=True, | 243 e + ['--stdout', '--stderr'], |
179 stdout=subprocess2.PIPE, | 244 stdout=subprocess2.PIPE, |
180 stderr=subprocess2.VOID) | 245 stderr=subprocess2.VOID, |
181 self.assertEquals('A\nBB\nCCC\n', out) | 246 universal_newlines=un) |
182 self.assertEquals(None, err) | 247 self.assertEquals(c('A\nBB\nCCC\n'), out) |
183 self.assertEquals(0, code) | 248 self.assertEquals(None, err) |
249 self.assertEquals(0, code) | |
250 self._run_test(fn) | |
184 | 251 |
185 def test_check_output_throw_stdout(self): | 252 def test_check_output_throw_stdout(self): |
186 try: | 253 def fn(c, e, un): |
187 subprocess2.check_output( | 254 try: |
188 self.exe + ['--fail', '--stdout'], universal_newlines=True) | 255 subprocess2.check_output( |
189 self.fail() | 256 e + ['--fail', '--stdout'], universal_newlines=un) |
190 except subprocess2.CalledProcessError, e: | 257 self.fail() |
191 self.assertEquals('A\nBB\nCCC\n', e.stdout) | 258 except subprocess2.CalledProcessError, e: |
192 self.assertEquals(None, e.stderr) | 259 self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) |
193 self.assertEquals(64, e.returncode) | 260 self.assertEquals(None, e.stderr) |
261 self.assertEquals(64, e.returncode) | |
262 self._run_test(fn) | |
194 | 263 |
195 def test_check_output_throw_no_stderr(self): | 264 def test_check_output_throw_no_stderr(self): |
196 try: | 265 def fn(c, e, un): |
197 subprocess2.check_output( | 266 try: |
198 self.exe + ['--fail', '--stderr'], universal_newlines=True) | 267 subprocess2.check_output( |
199 self.fail() | 268 e + ['--fail', '--stderr'], universal_newlines=un) |
200 except subprocess2.CalledProcessError, e: | 269 self.fail() |
201 self.assertEquals('', e.stdout) | 270 except subprocess2.CalledProcessError, e: |
202 self.assertEquals(None, e.stderr) | 271 self.assertEquals(c(''), e.stdout) |
203 self.assertEquals(64, e.returncode) | 272 self.assertEquals(None, e.stderr) |
273 self.assertEquals(64, e.returncode) | |
274 self._run_test(fn) | |
204 | 275 |
205 def test_check_output_throw_stderr(self): | 276 def test_check_output_throw_stderr(self): |
206 try: | 277 def fn(c, e, un): |
207 subprocess2.check_output( | 278 try: |
208 self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, | 279 subprocess2.check_output( |
209 universal_newlines=True) | 280 e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, |
210 self.fail() | 281 universal_newlines=un) |
211 except subprocess2.CalledProcessError, e: | 282 self.fail() |
212 self.assertEquals('', e.stdout) | 283 except subprocess2.CalledProcessError, e: |
213 self.assertEquals('a\nbb\nccc\n', e.stderr) | 284 self.assertEquals('', e.stdout) |
214 self.assertEquals(64, e.returncode) | 285 self.assertEquals(c('a\nbb\nccc\n'), e.stderr) |
286 self.assertEquals(64, e.returncode) | |
287 self._run_test(fn) | |
215 | 288 |
216 def test_check_output_throw_stderr_stdout(self): | 289 def test_check_output_throw_stderr_stdout(self): |
217 try: | 290 def fn(c, e, un): |
218 subprocess2.check_output( | 291 try: |
219 self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, | 292 subprocess2.check_output( |
220 universal_newlines=True) | 293 e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, |
221 self.fail() | 294 universal_newlines=un) |
222 except subprocess2.CalledProcessError, e: | 295 self.fail() |
223 self.assertEquals('a\nbb\nccc\n', e.stdout) | 296 except subprocess2.CalledProcessError, e: |
224 self.assertEquals(None, e.stderr) | 297 self.assertEquals(c('a\nbb\nccc\n'), e.stdout) |
225 self.assertEquals(64, e.returncode) | 298 self.assertEquals(None, e.stderr) |
299 self.assertEquals(64, e.returncode) | |
300 self._run_test(fn) | |
226 | 301 |
227 def test_check_call_throw(self): | 302 def test_check_call_throw(self): |
228 try: | 303 try: |
229 subprocess2.check_call(self.exe + ['--fail', '--stderr']) | 304 subprocess2.check_call(self.exe + ['--fail', '--stderr']) |
230 self.fail() | 305 self.fail() |
231 except subprocess2.CalledProcessError, e: | 306 except subprocess2.CalledProcessError, e: |
232 self.assertEquals(None, e.stdout) | 307 self.assertEquals(None, e.stdout) |
233 self.assertEquals(None, e.stderr) | 308 self.assertEquals(None, e.stderr) |
234 self.assertEquals(64, e.returncode) | 309 self.assertEquals(64, e.returncode) |
235 | 310 |
236 | 311 |
237 def child_main(args): | 312 def child_main(args): |
313 if sys.platform == 'win32': | |
314 # Annoying, make sure the output is not translated on Windows. | |
315 # pylint: disable=E1101,F0401 | |
316 import msvcrt | |
317 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) | |
318 msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) | |
319 | |
238 parser = optparse.OptionParser() | 320 parser = optparse.OptionParser() |
239 parser.add_option( | 321 parser.add_option( |
240 '--fail', | 322 '--fail', |
241 dest='return_value', | 323 dest='return_value', |
242 action='store_const', | 324 action='store_const', |
243 default=0, | 325 default=0, |
244 const=64) | 326 const=64) |
327 parser.add_option( | |
328 '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') | |
329 parser.add_option( | |
330 '--cr', action='store_const', const='\r', dest='eol') | |
245 parser.add_option('--stdout', action='store_true') | 331 parser.add_option('--stdout', action='store_true') |
246 parser.add_option('--stderr', action='store_true') | 332 parser.add_option('--stderr', action='store_true') |
247 parser.add_option('--sleep', action='store_true') | 333 parser.add_option('--sleep_first', action='store_true') |
334 parser.add_option('--sleep_last', action='store_true') | |
335 parser.add_option('--large', action='store_true') | |
336 parser.add_option('--read', action='store_true') | |
248 options, args = parser.parse_args(args) | 337 options, args = parser.parse_args(args) |
249 if args: | 338 if args: |
250 parser.error('Internal error') | 339 parser.error('Internal error') |
340 if options.sleep_first: | |
341 time.sleep(10) | |
251 | 342 |
252 def do(string): | 343 def do(string): |
253 if options.stdout: | 344 if options.stdout: |
254 print >> sys.stdout, string.upper() | 345 sys.stdout.write(string.upper()) |
346 sys.stdout.write(options.eol) | |
255 if options.stderr: | 347 if options.stderr: |
256 print >> sys.stderr, string.lower() | 348 sys.stderr.write(string.lower()) |
349 sys.stderr.write(options.eol) | |
257 | 350 |
258 do('A') | 351 do('A') |
259 do('BB') | 352 do('BB') |
260 do('CCC') | 353 do('CCC') |
261 if options.sleep: | 354 if options.large: |
355 # Print 128kb. | |
356 string = '0123456789abcdef' * (8*1024) | |
357 sys.stdout.write(string) | |
358 if options.read: | |
359 try: | |
360 while sys.stdin.read(): | |
361 pass | |
362 except OSError: | |
363 pass | |
364 if options.sleep_last: | |
262 time.sleep(10) | 365 time.sleep(10) |
263 return options.return_value | 366 return options.return_value |
264 | 367 |
265 | 368 |
266 if __name__ == '__main__': | 369 if __name__ == '__main__': |
370 logging.basicConfig(level= | |
371 [logging.WARNING, logging.INFO, logging.DEBUG][ | |
372 min(2, sys.argv.count('-v'))]) | |
267 if len(sys.argv) > 1 and sys.argv[1] == '--child': | 373 if len(sys.argv) > 1 and sys.argv[1] == '--child': |
268 sys.exit(child_main(sys.argv[2:])) | 374 sys.exit(child_main(sys.argv[2:])) |
269 unittest.main() | 375 unittest.main() |
OLD | NEW |