| Index: third_party/pexpect/tests/test_run.py
 | 
| diff --git a/third_party/pexpect/tests/test_run.py b/third_party/pexpect/tests/test_run.py
 | 
| new file mode 100644
 | 
| index 0000000000000000000000000000000000000000..1b3c92ff6020a7c668082b75e35b506ca91b7fb7
 | 
| --- /dev/null
 | 
| +++ b/third_party/pexpect/tests/test_run.py
 | 
| @@ -0,0 +1,191 @@
 | 
| +#!/usr/bin/env python
 | 
| +# encoding: utf-8
 | 
| +'''
 | 
| +PEXPECT LICENSE
 | 
| +
 | 
| +    This license is approved by the OSI and FSF as GPL-compatible.
 | 
| +        http://opensource.org/licenses/isc-license.txt
 | 
| +
 | 
| +    Copyright (c) 2012, Noah Spurrier <noah@noah.org>
 | 
| +    PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
 | 
| +    PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
 | 
| +    COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
 | 
| +    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
| +    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 | 
| +    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
| +    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
| +    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
| +    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
| +    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
| +
 | 
| +'''
 | 
| +import pexpect
 | 
| +import unittest
 | 
| +import subprocess
 | 
| +import tempfile
 | 
| +import sys
 | 
| +import os
 | 
| +from . import PexpectTestCase
 | 
| +
 | 
| +unicode_type = str if pexpect.PY3 else unicode
 | 
| +
 | 
| +
 | 
| +def timeout_callback(values):
 | 
| +    if values["event_count"] > 3:
 | 
| +        return 1
 | 
| +    return 0
 | 
| +
 | 
| +
 | 
| +def function_events_callback(values):
 | 
| +    try:
 | 
| +        previous_echoed = (values["child_result_list"][-1]
 | 
| +                           .decode().split("\n")[-2].strip())
 | 
| +        if previous_echoed.endswith("stage-1"):
 | 
| +            return "echo stage-2\n"
 | 
| +        elif previous_echoed.endswith("stage-2"):
 | 
| +            return "echo stage-3\n"
 | 
| +        elif previous_echoed.endswith("stage-3"):
 | 
| +            return "exit\n"
 | 
| +        else:
 | 
| +            raise Exception("Unexpected output {0}".format(previous_echoed))
 | 
| +    except IndexError:
 | 
| +        return "echo stage-1\n"
 | 
| +
 | 
| +
 | 
| +class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
 | 
| +    runfunc = staticmethod(pexpect.run)
 | 
| +    cr = b'\r'
 | 
| +    empty = b''
 | 
| +    prep_subprocess_out = staticmethod(lambda x: x)
 | 
| +
 | 
| +    def setUp(self):
 | 
| +        fd, self.rcfile = tempfile.mkstemp()
 | 
| +        os.write(fd, b'PS1=GO: \n')
 | 
| +        os.close(fd)
 | 
| +        super(RunFuncTestCase, self).setUp()
 | 
| +
 | 
| +    def tearDown(self):
 | 
| +        os.unlink(self.rcfile)
 | 
| +        super(RunFuncTestCase, self).tearDown()
 | 
| +
 | 
| +    def test_run_exit(self):
 | 
| +        (data, exitstatus) = self.runfunc('python exit1.py', withexitstatus=1)
 | 
| +        assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1."
 | 
| +
 | 
| +    def test_run(self):
 | 
| +        the_old_way = subprocess.Popen(
 | 
| +            args=['uname', '-m', '-n'],
 | 
| +            stdout=subprocess.PIPE
 | 
| +        ).communicate()[0].rstrip()
 | 
| +
 | 
| +        (the_new_way, exitstatus) = self.runfunc(
 | 
| +            'uname -m -n', withexitstatus=1)
 | 
| +        the_new_way = the_new_way.replace(self.cr, self.empty).rstrip()
 | 
| +
 | 
| +        self.assertEqual(self.prep_subprocess_out(the_old_way), the_new_way)
 | 
| +        self.assertEqual(exitstatus, 0)
 | 
| +
 | 
| +    def test_run_callback(self):
 | 
| +        # TODO it seems like this test could block forever if run fails...
 | 
| +        events = {pexpect.TIMEOUT: timeout_callback}
 | 
| +        self.runfunc("cat", timeout=1, events=events)
 | 
| +
 | 
| +    def test_run_bad_exitstatus(self):
 | 
| +        (the_new_way, exitstatus) = self.runfunc(
 | 
| +            'ls -l /najoeufhdnzkxjd', withexitstatus=1)
 | 
| +        assert exitstatus != 0
 | 
| +
 | 
| +    def test_run_event_as_string(self):
 | 
| +        events = [
 | 
| +            # second match on 'abc', echo 'def'
 | 
| +            ('abc\r\n.*GO:', 'echo "def"\n'),
 | 
| +            # final match on 'def': exit
 | 
| +            ('def\r\n.*GO:', 'exit\n'),
 | 
| +            # first match on 'GO:' prompt, echo 'abc'
 | 
| +            ('GO:', 'echo "abc"\n')
 | 
| +        ]
 | 
| +
 | 
| +        (data, exitstatus) = pexpect.run(
 | 
| +            'bash --rcfile {0}'.format(self.rcfile),
 | 
| +            withexitstatus=True,
 | 
| +            events=events,
 | 
| +            timeout=10)
 | 
| +        assert exitstatus == 0
 | 
| +
 | 
| +    def test_run_event_as_function(self):
 | 
| +        events = [
 | 
| +            ('GO:', function_events_callback)
 | 
| +        ]
 | 
| +
 | 
| +        (data, exitstatus) = pexpect.run(
 | 
| +            'bash --rcfile {0}'.format(self.rcfile),
 | 
| +            withexitstatus=True,
 | 
| +            events=events,
 | 
| +            timeout=10)
 | 
| +        assert exitstatus == 0
 | 
| +
 | 
| +    def test_run_event_as_method(self):
 | 
| +        events = [
 | 
| +            ('GO:', self._method_events_callback)
 | 
| +        ]
 | 
| +
 | 
| +        (data, exitstatus) = pexpect.run(
 | 
| +            'bash --rcfile {0}'.format(self.rcfile),
 | 
| +            withexitstatus=True,
 | 
| +            events=events,
 | 
| +            timeout=10)
 | 
| +        assert exitstatus == 0
 | 
| +
 | 
| +    def test_run_event_typeerror(self):
 | 
| +        events = [('GO:', -1)]
 | 
| +        with self.assertRaises(TypeError):
 | 
| +            pexpect.run('bash --rcfile {0}'.format(self.rcfile),
 | 
| +                        withexitstatus=True,
 | 
| +                        events=events,
 | 
| +                        timeout=10)
 | 
| +
 | 
| +    def _method_events_callback(self, values):
 | 
| +        try:
 | 
| +            previous_echoed = (values["child_result_list"][-1].decode()
 | 
| +                               .split("\n")[-2].strip())
 | 
| +            if previous_echoed.endswith("foo1"):
 | 
| +                return "echo foo2\n"
 | 
| +            elif previous_echoed.endswith("foo2"):
 | 
| +                return "echo foo3\n"
 | 
| +            elif previous_echoed.endswith("foo3"):
 | 
| +                return "exit\n"
 | 
| +            else:
 | 
| +                raise Exception("Unexpected output {0!r}"
 | 
| +                                .format(previous_echoed))
 | 
| +        except IndexError:
 | 
| +            return "echo foo1\n"
 | 
| +
 | 
| +
 | 
| +class RunUnicodeFuncTestCase(RunFuncTestCase):
 | 
| +    runfunc = staticmethod(pexpect.runu)
 | 
| +    cr = b'\r'.decode('ascii')
 | 
| +    empty = b''.decode('ascii')
 | 
| +    prep_subprocess_out = staticmethod(lambda x: x.decode('utf-8', 'replace'))
 | 
| +
 | 
| +    def test_run_unicode(self):
 | 
| +        if pexpect.PY3:
 | 
| +            char = chr(254)   # รพ
 | 
| +            pattern = '<in >'
 | 
| +        else:
 | 
| +            char = unichr(254)  # analysis:ignore
 | 
| +            pattern = '<in >'.decode('ascii')
 | 
| +
 | 
| +        def callback(values):
 | 
| +            if values['event_count'] == 0:
 | 
| +                return char + '\n'
 | 
| +            else:
 | 
| +                return True  # Stop the child process
 | 
| +
 | 
| +        output = pexpect.runu(sys.executable + ' echo_w_prompt.py',
 | 
| +                              env={'PYTHONIOENCODING': 'utf-8'},
 | 
| +                              events={pattern: callback})
 | 
| +        assert isinstance(output, unicode_type), type(output)
 | 
| +        assert ('<out>' + char) in output, output
 | 
| +
 | 
| +if __name__ == '__main__':
 | 
| +    unittest.main()
 | 
| 
 |