| Index: tests/subprocess2_test.py
 | 
| diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py
 | 
| index cb390751e2c9b47e35c7f42e5ee57394f7371e81..9d30452c3118220135168a934e52d407c0305a3f 100755
 | 
| --- a/tests/subprocess2_test.py
 | 
| +++ b/tests/subprocess2_test.py
 | 
| @@ -5,18 +5,12 @@
 | 
|  
 | 
|  """Unit tests for subprocess2.py."""
 | 
|  
 | 
| -import logging
 | 
|  import optparse
 | 
|  import os
 | 
|  import sys
 | 
|  import time
 | 
|  import unittest
 | 
|  
 | 
| -try:
 | 
| -  import fcntl
 | 
| -except ImportError:
 | 
| -  fcntl = None
 | 
| -
 | 
|  ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 | 
|  sys.path.insert(0, ROOT_DIR)
 | 
|  
 | 
| @@ -25,25 +19,7 @@ import subprocess2
 | 
|  # Method could be a function
 | 
|  # pylint: disable=R0201
 | 
|  
 | 
| -
 | 
| -def convert_to_crlf(string):
 | 
| -  """Unconditionally convert LF to CRLF."""
 | 
| -  return string.replace('\n', '\r\n')
 | 
| -
 | 
| -
 | 
| -def convert_to_cr(string):
 | 
| -  """Unconditionally convert LF to CR."""
 | 
| -  return string.replace('\n', '\r')
 | 
| -
 | 
| -
 | 
| -def convert_win(string):
 | 
| -  """Converts string to CRLF on Windows only."""
 | 
| -  if sys.platform == 'win32':
 | 
| -    return string.replace('\n', '\r\n')
 | 
| -  return string
 | 
| -
 | 
| -
 | 
| -class DefaultsTest(unittest.TestCase):
 | 
| +class Subprocess2Test(unittest.TestCase):
 | 
|    # Can be mocked in a test.
 | 
|    TO_SAVE = {
 | 
|      subprocess2: [
 | 
| @@ -52,6 +28,8 @@ class DefaultsTest(unittest.TestCase):
 | 
|    }
 | 
|  
 | 
|    def setUp(self):
 | 
| +    self.exe_path = __file__
 | 
| +    self.exe = [sys.executable, self.exe_path, '--child']
 | 
|      self.saved = {}
 | 
|      for module, names in self.TO_SAVE.iteritems():
 | 
|        self.saved[module] = dict(
 | 
| @@ -168,67 +146,14 @@ class DefaultsTest(unittest.TestCase):
 | 
|      }
 | 
|      self.assertEquals(expected, results)
 | 
|  
 | 
| -  def test_timeout_shell_throws(self):
 | 
| -    # Never called.
 | 
| -    _ = self._fake_Popen()
 | 
| -    try:
 | 
| -      subprocess2.communicate(
 | 
| -          sys.executable,
 | 
| -          timeout=0.01,
 | 
| -          stdout=subprocess2.PIPE,
 | 
| -          shell=True)
 | 
| -      self.fail()
 | 
| -    except TypeError:
 | 
| -      pass
 | 
| -
 | 
| -
 | 
| -class S2Test(unittest.TestCase):
 | 
| -  def setUp(self):
 | 
| -    super(S2Test, self).setUp()
 | 
| -    self.exe_path = __file__
 | 
| -    self.exe = [sys.executable, self.exe_path, '--child']
 | 
| -    self.states = {}
 | 
| -    if fcntl:
 | 
| -      for v in (sys.stdin, sys.stdout, sys.stderr):
 | 
| -        fileno = v.fileno()
 | 
| -        self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
 | 
| -
 | 
| -  def tearDown(self):
 | 
| -    for fileno, fl in self.states.iteritems():
 | 
| -      self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
 | 
| -    super(S2Test, self).tearDown()
 | 
| -
 | 
| -  def _run_test(self, function):
 | 
| -    """Runs tests in 6 combinations:
 | 
| -    - LF output with universal_newlines=False
 | 
| -    - CR output with universal_newlines=False
 | 
| -    - CRLF output with universal_newlines=False
 | 
| -    - LF output with universal_newlines=True
 | 
| -    - CR output with universal_newlines=True
 | 
| -    - CRLF output with universal_newlines=True
 | 
| -
 | 
| -    First |function| argument is the convertion for the origianl expected LF
 | 
| -    string to the right EOL.
 | 
| -    Second |function| argument is the executable and initial flag to run, to
 | 
| -    control what EOL is used by the child process.
 | 
| -    Third |function| argument is universal_newlines value.
 | 
| -    """
 | 
| -    noop = lambda x: x
 | 
| -    function(noop, self.exe, False)
 | 
| -    function(convert_to_cr, self.exe + ['--cr'], False)
 | 
| -    function(convert_to_crlf, self.exe + ['--crlf'], False)
 | 
| -    function(noop, self.exe, True)
 | 
| -    function(noop, self.exe + ['--cr'], True)
 | 
| -    function(noop, self.exe + ['--crlf'], True)
 | 
| -
 | 
|    def test_timeout(self):
 | 
| +    # It'd be better to not discard stdout.
 | 
|      out, returncode = subprocess2.communicate(
 | 
| -        self.exe + ['--sleep_first', '--stdout'],
 | 
| +        self.exe + ['--sleep', '--stdout'],
 | 
|          timeout=0.01,
 | 
| -        stdout=subprocess2.PIPE,
 | 
| -        shell=False)
 | 
| +        stdout=subprocess2.PIPE)
 | 
|      self.assertEquals(subprocess2.TIMED_OUT, returncode)
 | 
| -    self.assertEquals(('', None), out)
 | 
| +    self.assertEquals(['', None], out)
 | 
|  
 | 
|    def test_check_output_no_stdout(self):
 | 
|      try:
 | 
| @@ -238,78 +163,68 @@ class S2Test(unittest.TestCase):
 | 
|        pass
 | 
|  
 | 
|    def test_stdout_void(self):
 | 
| -    def fn(c, e, un):
 | 
| -      (out, err), code = subprocess2.communicate(
 | 
| -          e + ['--stdout', '--stderr'],
 | 
| -          stdout=subprocess2.VOID,
 | 
| -          stderr=subprocess2.PIPE,
 | 
| -          universal_newlines=un)
 | 
| -      self.assertEquals(None, out)
 | 
| -      self.assertEquals(c('a\nbb\nccc\n'), err)
 | 
| -      self.assertEquals(0, code)
 | 
| -    self._run_test(fn)
 | 
| +    (out, err), code = subprocess2.communicate(
 | 
| +         self.exe + ['--stdout', '--stderr'],
 | 
| +         stdout=subprocess2.VOID,
 | 
| +         stderr=subprocess2.PIPE)
 | 
| +    self.assertEquals(None, out)
 | 
| +    expected = 'a\nbb\nccc\n'
 | 
| +    if sys.platform == 'win32':
 | 
| +      expected = expected.replace('\n', '\r\n')
 | 
| +    self.assertEquals(expected, err)
 | 
| +    self.assertEquals(0, code)
 | 
|  
 | 
|    def test_stderr_void(self):
 | 
| -    def fn(c, e, un):
 | 
| -      (out, err), code = subprocess2.communicate(
 | 
| -          e + ['--stdout', '--stderr'],
 | 
| -          stdout=subprocess2.PIPE,
 | 
| -          stderr=subprocess2.VOID,
 | 
| -          universal_newlines=un)
 | 
| -      self.assertEquals(c('A\nBB\nCCC\n'), out)
 | 
| -      self.assertEquals(None, err)
 | 
| -      self.assertEquals(0, code)
 | 
| -    self._run_test(fn)
 | 
| +    (out, err), code = subprocess2.communicate(
 | 
| +         self.exe + ['--stdout', '--stderr'],
 | 
| +         universal_newlines=True,
 | 
| +         stdout=subprocess2.PIPE,
 | 
| +         stderr=subprocess2.VOID)
 | 
| +    self.assertEquals('A\nBB\nCCC\n', out)
 | 
| +    self.assertEquals(None, err)
 | 
| +    self.assertEquals(0, code)
 | 
|  
 | 
|    def test_check_output_throw_stdout(self):
 | 
| -    def fn(c, e, un):
 | 
| -      try:
 | 
| -        subprocess2.check_output(
 | 
| -            e + ['--fail', '--stdout'], universal_newlines=un)
 | 
| -        self.fail()
 | 
| -      except subprocess2.CalledProcessError, e:
 | 
| -        self.assertEquals(c('A\nBB\nCCC\n'), e.stdout)
 | 
| -        self.assertEquals(None, e.stderr)
 | 
| -        self.assertEquals(64, e.returncode)
 | 
| -    self._run_test(fn)
 | 
| +    try:
 | 
| +      subprocess2.check_output(
 | 
| +          self.exe + ['--fail', '--stdout'], universal_newlines=True)
 | 
| +      self.fail()
 | 
| +    except subprocess2.CalledProcessError, e:
 | 
| +      self.assertEquals('A\nBB\nCCC\n', e.stdout)
 | 
| +      self.assertEquals(None, e.stderr)
 | 
| +      self.assertEquals(64, e.returncode)
 | 
|  
 | 
|    def test_check_output_throw_no_stderr(self):
 | 
| -    def fn(c, e, un):
 | 
| -      try:
 | 
| -        subprocess2.check_output(
 | 
| -            e + ['--fail', '--stderr'], universal_newlines=un)
 | 
| -        self.fail()
 | 
| -      except subprocess2.CalledProcessError, e:
 | 
| -        self.assertEquals(c(''), e.stdout)
 | 
| -        self.assertEquals(None, e.stderr)
 | 
| -        self.assertEquals(64, e.returncode)
 | 
| -    self._run_test(fn)
 | 
| +    try:
 | 
| +      subprocess2.check_output(
 | 
| +          self.exe + ['--fail', '--stderr'], universal_newlines=True)
 | 
| +      self.fail()
 | 
| +    except subprocess2.CalledProcessError, e:
 | 
| +      self.assertEquals('', e.stdout)
 | 
| +      self.assertEquals(None, e.stderr)
 | 
| +      self.assertEquals(64, e.returncode)
 | 
|  
 | 
|    def test_check_output_throw_stderr(self):
 | 
| -    def fn(c, e, un):
 | 
| -      try:
 | 
| -        subprocess2.check_output(
 | 
| -            e + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
 | 
| -            universal_newlines=un)
 | 
| -        self.fail()
 | 
| -      except subprocess2.CalledProcessError, e:
 | 
| -        self.assertEquals('', e.stdout)
 | 
| -        self.assertEquals(c('a\nbb\nccc\n'), e.stderr)
 | 
| -        self.assertEquals(64, e.returncode)
 | 
| -    self._run_test(fn)
 | 
| +    try:
 | 
| +      subprocess2.check_output(
 | 
| +          self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
 | 
| +          universal_newlines=True)
 | 
| +      self.fail()
 | 
| +    except subprocess2.CalledProcessError, e:
 | 
| +      self.assertEquals('', e.stdout)
 | 
| +      self.assertEquals('a\nbb\nccc\n', e.stderr)
 | 
| +      self.assertEquals(64, e.returncode)
 | 
|  
 | 
|    def test_check_output_throw_stderr_stdout(self):
 | 
| -    def fn(c, e, un):
 | 
| -      try:
 | 
| -        subprocess2.check_output(
 | 
| -            e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
 | 
| -            universal_newlines=un)
 | 
| -        self.fail()
 | 
| -      except subprocess2.CalledProcessError, e:
 | 
| -        self.assertEquals(c('a\nbb\nccc\n'), e.stdout)
 | 
| -        self.assertEquals(None, e.stderr)
 | 
| -        self.assertEquals(64, e.returncode)
 | 
| -    self._run_test(fn)
 | 
| +    try:
 | 
| +      subprocess2.check_output(
 | 
| +          self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
 | 
| +          universal_newlines=True)
 | 
| +      self.fail()
 | 
| +    except subprocess2.CalledProcessError, e:
 | 
| +      self.assertEquals('a\nbb\nccc\n', e.stdout)
 | 
| +      self.assertEquals(None, e.stderr)
 | 
| +      self.assertEquals(64, e.returncode)
 | 
|  
 | 
|    def test_check_call_throw(self):
 | 
|      try:
 | 
| @@ -320,87 +235,8 @@ class S2Test(unittest.TestCase):
 | 
|        self.assertEquals(None, e.stderr)
 | 
|        self.assertEquals(64, e.returncode)
 | 
|  
 | 
| -  def test_check_output_tee_stderr(self):
 | 
| -    def fn(c, e, un):
 | 
| -      stderr = []
 | 
| -      out, returncode = subprocess2.communicate(
 | 
| -          e + ['--stderr'], stderr=stderr.append,
 | 
| -          universal_newlines=un)
 | 
| -      self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
 | 
| -      self.assertEquals((None, None), out)
 | 
| -      self.assertEquals(0, returncode)
 | 
| -    self._run_test(fn)
 | 
| -
 | 
| -  def test_check_output_tee_stdout_stderr(self):
 | 
| -    def fn(c, e, un):
 | 
| -      stdout = []
 | 
| -      stderr = []
 | 
| -      out, returncode = subprocess2.communicate(
 | 
| -          e + ['--stdout', '--stderr'],
 | 
| -          stdout=stdout.append,
 | 
| -          stderr=stderr.append,
 | 
| -          universal_newlines=un)
 | 
| -      self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
 | 
| -      self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
 | 
| -      self.assertEquals((None, None), out)
 | 
| -      self.assertEquals(0, returncode)
 | 
| -    self._run_test(fn)
 | 
| -
 | 
| -  def test_check_output_tee_stdin(self):
 | 
| -    def fn(c, e, un):
 | 
| -      stdout = []
 | 
| -      stdin = '0123456789'
 | 
| -      out, returncode = subprocess2.communicate(
 | 
| -          e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append,
 | 
| -          universal_newlines=un)
 | 
| -      self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
 | 
| -      self.assertEquals((None, None), out)
 | 
| -      self.assertEquals(0, returncode)
 | 
| -    self._run_test(fn)
 | 
| -
 | 
| -  def test_check_output_tee_throw(self):
 | 
| -    def fn(c, e, un):
 | 
| -      stderr = []
 | 
| -      try:
 | 
| -        subprocess2.check_output(
 | 
| -            e + ['--stderr', '--fail'], stderr=stderr.append,
 | 
| -            universal_newlines=un)
 | 
| -        self.fail()
 | 
| -      except subprocess2.CalledProcessError, e:
 | 
| -        self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
 | 
| -        self.assertEquals('', e.stdout)
 | 
| -        self.assertEquals(None, e.stderr)
 | 
| -        self.assertEquals(64, e.returncode)
 | 
| -    self._run_test(fn)
 | 
| -
 | 
| -  def test_check_output_tee_large(self):
 | 
| -    stdout = []
 | 
| -    # Read 128kb. On my workstation it takes >2s. Welcome to 2011.
 | 
| -    out, returncode = subprocess2.communicate(
 | 
| -        self.exe + ['--large'], stdout=stdout.append)
 | 
| -    self.assertEquals(128*1024, len(''.join(stdout)))
 | 
| -    self.assertEquals((None, None), out)
 | 
| -    self.assertEquals(0, returncode)
 | 
| -
 | 
| -  def test_check_output_tee_large_stdin(self):
 | 
| -    stdout = []
 | 
| -    # Write 128kb.
 | 
| -    stdin = '0123456789abcdef' * (8*1024)
 | 
| -    out, returncode = subprocess2.communicate(
 | 
| -        self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
 | 
| -    self.assertEquals(128*1024, len(''.join(stdout)))
 | 
| -    self.assertEquals((None, None), out)
 | 
| -    self.assertEquals(0, returncode)
 | 
| -
 | 
|  
 | 
|  def child_main(args):
 | 
| -  if sys.platform == 'win32':
 | 
| -    # Annoying, make sure the output is not translated on Windows.
 | 
| -    # pylint: disable=E1101,F0401
 | 
| -    import msvcrt
 | 
| -    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
 | 
| -    msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
 | 
| -
 | 
|    parser = optparse.OptionParser()
 | 
|    parser.add_option(
 | 
|        '--fail',
 | 
| @@ -408,52 +244,28 @@ def child_main(args):
 | 
|        action='store_const',
 | 
|        default=0,
 | 
|        const=64)
 | 
| -  parser.add_option(
 | 
| -      '--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
 | 
| -  parser.add_option(
 | 
| -      '--cr', action='store_const', const='\r', dest='eol')
 | 
|    parser.add_option('--stdout', action='store_true')
 | 
|    parser.add_option('--stderr', action='store_true')
 | 
| -  parser.add_option('--sleep_first', action='store_true')
 | 
| -  parser.add_option('--sleep_last', action='store_true')
 | 
| -  parser.add_option('--large', action='store_true')
 | 
| -  parser.add_option('--read', action='store_true')
 | 
| +  parser.add_option('--sleep', action='store_true')
 | 
|    options, args = parser.parse_args(args)
 | 
|    if args:
 | 
|      parser.error('Internal error')
 | 
| -  if options.sleep_first:
 | 
| -    time.sleep(10)
 | 
|  
 | 
|    def do(string):
 | 
|      if options.stdout:
 | 
| -      sys.stdout.write(string.upper())
 | 
| -      sys.stdout.write(options.eol)
 | 
| +      print >> sys.stdout, string.upper()
 | 
|      if options.stderr:
 | 
| -      sys.stderr.write(string.lower())
 | 
| -      sys.stderr.write(options.eol)
 | 
| +      print >> sys.stderr, string.lower()
 | 
|  
 | 
|    do('A')
 | 
|    do('BB')
 | 
|    do('CCC')
 | 
| -  if options.large:
 | 
| -    # Print 128kb.
 | 
| -    string = '0123456789abcdef' * (8*1024)
 | 
| -    sys.stdout.write(string)
 | 
| -  if options.read:
 | 
| -    try:
 | 
| -      while sys.stdin.read():
 | 
| -        pass
 | 
| -    except OSError:
 | 
| -      pass
 | 
| -  if options.sleep_last:
 | 
| +  if options.sleep:
 | 
|      time.sleep(10)
 | 
|    return options.return_value
 | 
|  
 | 
|  
 | 
|  if __name__ == '__main__':
 | 
| -  logging.basicConfig(level=
 | 
| -      [logging.WARNING, logging.INFO, logging.DEBUG][
 | 
| -        min(2, sys.argv.count('-v'))])
 | 
|    if len(sys.argv) > 1 and sys.argv[1] == '--child':
 | 
|      sys.exit(child_main(sys.argv[2:]))
 | 
|    unittest.main()
 | 
| 
 |