Index: third_party/pexpect/tests/test_misc.py |
diff --git a/third_party/pexpect/tests/test_misc.py b/third_party/pexpect/tests/test_misc.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..16bdfc2636afc8e1d513358984219a42767a5960 |
--- /dev/null |
+++ b/third_party/pexpect/tests/test_misc.py |
@@ -0,0 +1,353 @@ |
+#!/usr/bin/env python |
+''' |
+PEXPECT LICENSE |
+ |
+ This license is approved by the OSI and FSF as GPL-compatible. |
+ http://opensource.org/licenses/isc-license.txt |
+ |
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org> |
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY |
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE |
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. |
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES |
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF |
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR |
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES |
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN |
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF |
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
+ |
+''' |
+import unittest |
+import sys |
+import re |
+import signal |
+import time |
+import tempfile |
+import os |
+ |
+import pexpect |
+from . import PexpectTestCase |
+ |
+# the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent |
+_CAT_EOF = b'^D\x08\x08' |
+ |
+ |
+if (sys.version_info[0] >= 3): |
+ def _u(s): |
+ return s.decode('utf-8') |
+else: |
+ def _u(s): |
+ return s |
+ |
+ |
+class TestCaseMisc(PexpectTestCase.PexpectTestCase): |
+ |
+ def test_isatty(self): |
+ " Test isatty() is True after spawning process on most platforms. " |
+ child = pexpect.spawn('cat') |
+ if not child.isatty() and sys.platform.lower().startswith('sunos'): |
+ if hasattr(unittest, 'SkipTest'): |
+ raise unittest.SkipTest("Not supported on this platform.") |
+ return 'skip' |
+ assert child.isatty() |
+ |
+ def test_read(self): |
+ " Test spawn.read by calls of various size. " |
+ child = pexpect.spawn('cat') |
+ child.sendline("abc") |
+ child.sendeof() |
+ self.assertEqual(child.read(0), b'') |
+ self.assertEqual(child.read(1), b'a') |
+ self.assertEqual(child.read(1), b'b') |
+ self.assertEqual(child.read(1), b'c') |
+ self.assertEqual(child.read(2), b'\r\n') |
+ remaining = child.read().replace(_CAT_EOF, b'') |
+ self.assertEqual(remaining, b'abc\r\n') |
+ |
+ def test_readline_bin_echo(self): |
+ " Test spawn('echo'). " |
+ # given, |
+ child = pexpect.spawn('echo', ['alpha', 'beta']) |
+ |
+ # exercise, |
+ assert child.readline() == b'alpha beta' + child.crlf |
+ |
+ def test_readline(self): |
+ " Test spawn.readline(). " |
+ # when argument 0 is sent, nothing is returned. |
+ # Otherwise the argument value is meaningless. |
+ child = pexpect.spawn('cat', echo=False) |
+ child.sendline("alpha") |
+ child.sendline("beta") |
+ child.sendline("gamma") |
+ child.sendline("delta") |
+ child.sendeof() |
+ assert child.readline(0) == b'' |
+ assert child.readline().rstrip() == b'alpha' |
+ assert child.readline(1).rstrip() == b'beta' |
+ assert child.readline(2).rstrip() == b'gamma' |
+ assert child.readline().rstrip() == b'delta' |
+ child.expect(pexpect.EOF) |
+ assert not child.isalive() |
+ assert child.exitstatus == 0 |
+ |
+ def test_iter(self): |
+ " iterating over lines of spawn.__iter__(). " |
+ child = pexpect.spawn('cat', echo=False) |
+ child.sendline("abc") |
+ child.sendline("123") |
+ child.sendeof() |
+ # Don't use ''.join() because we want to test __iter__(). |
+ page = b'' |
+ for line in child: |
+ page += line |
+ page = page.replace(_CAT_EOF, b'') |
+ assert page == b'abc\r\n123\r\n' |
+ |
+ def test_readlines(self): |
+ " reading all lines of spawn.readlines(). " |
+ child = pexpect.spawn('cat', echo=False) |
+ child.sendline("abc") |
+ child.sendline("123") |
+ child.sendeof() |
+ page = b''.join(child.readlines()).replace(_CAT_EOF, b'') |
+ assert page == b'abc\r\n123\r\n' |
+ child.expect(pexpect.EOF) |
+ assert not child.isalive() |
+ assert child.exitstatus == 0 |
+ |
+ def test_write(self): |
+ " write a character and return it in return. " |
+ child = pexpect.spawn('cat', echo=False) |
+ child.write('a') |
+ child.write('\r') |
+ self.assertEqual(child.readline(), b'a\r\n') |
+ |
+ def test_writelines(self): |
+ " spawn.writelines() " |
+ child = pexpect.spawn('cat') |
+ # notice that much like file.writelines, we do not delimit by newline |
+ # -- it is equivalent to calling write(''.join([args,])) |
+ child.writelines(['abc', '123', 'xyz', '\r']) |
+ child.sendeof() |
+ line = child.readline() |
+ assert line == b'abc123xyz\r\n' |
+ |
+ def test_eof(self): |
+ " call to expect() after EOF is received raises pexpect.EOF " |
+ child = pexpect.spawn('cat') |
+ child.sendeof() |
+ with self.assertRaises(pexpect.EOF): |
+ child.expect('the unexpected') |
+ |
+ def test_with(self): |
+ "spawn can be used as a context manager" |
+ with pexpect.spawn(sys.executable + ' echo_w_prompt.py') as p: |
+ p.expect('<in >') |
+ p.sendline(b'alpha') |
+ p.expect(b'<out>alpha') |
+ assert p.isalive() |
+ |
+ assert not p.isalive() |
+ |
+ def test_terminate(self): |
+ " test force terminate always succeeds (SIGKILL). " |
+ child = pexpect.spawn('cat') |
+ child.terminate(force=1) |
+ assert child.terminated |
+ |
+ def test_sighup(self): |
+ " validate argument `ignore_sighup=True` and `ignore_sighup=False`. " |
+ getch = sys.executable + ' getch.py' |
+ child = pexpect.spawn(getch, ignore_sighup=True) |
+ child.expect('READY') |
+ child.kill(signal.SIGHUP) |
+ for _ in range(10): |
+ if not child.isalive(): |
+ self.fail('Child process should not have exited.') |
+ time.sleep(0.1) |
+ |
+ child = pexpect.spawn(getch, ignore_sighup=False) |
+ child.expect('READY') |
+ child.kill(signal.SIGHUP) |
+ for _ in range(10): |
+ if not child.isalive(): |
+ break |
+ time.sleep(0.1) |
+ else: |
+ self.fail('Child process should have exited.') |
+ |
+ def test_bad_child_pid(self): |
+ " assert bad condition error in isalive(). " |
+ expect_errmsg = re.escape("isalive() encountered condition where ") |
+ child = pexpect.spawn('cat') |
+ child.terminate(force=1) |
+ # Force an invalid state to test isalive |
+ child.ptyproc.terminated = 0 |
+ try: |
+ with self.assertRaisesRegexp(pexpect.ExceptionPexpect, |
+ ".*" + expect_errmsg): |
+ child.isalive() |
+ finally: |
+ # Force valid state for child for __del__ |
+ child.terminated = 1 |
+ |
+ def test_bad_arguments_suggest_fdpsawn(self): |
+ " assert custom exception for spawn(int). " |
+ expect_errmsg = "maybe you want to use fdpexpect.fdspawn" |
+ with self.assertRaisesRegexp(pexpect.ExceptionPexpect, |
+ ".*" + expect_errmsg): |
+ pexpect.spawn(1) |
+ |
+ def test_bad_arguments_second_arg_is_list(self): |
+ " Second argument to spawn, if used, must be only a list." |
+ with self.assertRaises(TypeError): |
+ pexpect.spawn('ls', '-la') |
+ |
+ with self.assertRaises(TypeError): |
+ # not even a tuple, |
+ pexpect.spawn('ls', ('-la',)) |
+ |
+ def test_read_after_close_raises_value_error(self): |
+ " Calling read_nonblocking after close raises ValueError. " |
+ # as read_nonblocking underlies all other calls to read, |
+ # ValueError should be thrown for all forms of read. |
+ with self.assertRaises(ValueError): |
+ p = pexpect.spawn('cat') |
+ p.close() |
+ p.read_nonblocking() |
+ |
+ with self.assertRaises(ValueError): |
+ p = pexpect.spawn('cat') |
+ p.close() |
+ p.read() |
+ |
+ with self.assertRaises(ValueError): |
+ p = pexpect.spawn('cat') |
+ p.close() |
+ p.readline() |
+ |
+ with self.assertRaises(ValueError): |
+ p = pexpect.spawn('cat') |
+ p.close() |
+ p.readlines() |
+ |
+ def test_isalive(self): |
+ " check isalive() before and after EOF. (True, False) " |
+ child = pexpect.spawn('cat') |
+ assert child.isalive() is True |
+ child.sendeof() |
+ child.expect(pexpect.EOF) |
+ assert child.isalive() is False |
+ |
+ def test_bad_type_in_expect(self): |
+ " expect() does not accept dictionary arguments. " |
+ child = pexpect.spawn('cat') |
+ with self.assertRaises(TypeError): |
+ child.expect({}) |
+ |
+ def test_env(self): |
+ " check keyword argument `env=' of pexpect.run() " |
+ default_env_output = pexpect.run('env') |
+ custom_env_output = pexpect.run('env', env={'_key': '_value'}) |
+ assert custom_env_output != default_env_output |
+ assert b'_key=_value' in custom_env_output |
+ |
+ def test_cwd(self): |
+ " check keyword argument `cwd=' of pexpect.run() " |
+ tmp_dir = os.path.realpath(tempfile.gettempdir()) |
+ default = pexpect.run('pwd') |
+ pwd_tmp = pexpect.run('pwd', cwd=tmp_dir).rstrip() |
+ assert default != pwd_tmp |
+ assert tmp_dir == _u(pwd_tmp) |
+ |
+ def _test_searcher_as(self, searcher, plus=None): |
+ # given, |
+ given_words = ['alpha', 'beta', 'gamma', 'delta', ] |
+ given_search = given_words |
+ if searcher == pexpect.searcher_re: |
+ given_search = [re.compile(word) for word in given_words] |
+ if plus is not None: |
+ given_search = given_search + [plus] |
+ search_string = searcher(given_search) |
+ basic_fmt = '\n {0}: {1}' |
+ fmt = basic_fmt |
+ if searcher is pexpect.searcher_re: |
+ fmt = '\n {0}: re.compile({1})' |
+ expected_output = '{0}:'.format(searcher.__name__) |
+ idx = 0 |
+ for word in given_words: |
+ expected_output += fmt.format(idx, '"{0}"'.format(word)) |
+ idx += 1 |
+ if plus is not None: |
+ if plus == pexpect.EOF: |
+ expected_output += basic_fmt.format(idx, 'EOF') |
+ elif plus == pexpect.TIMEOUT: |
+ expected_output += basic_fmt.format(idx, 'TIMEOUT') |
+ |
+ # exercise, |
+ assert search_string.__str__() == expected_output |
+ |
+ def test_searcher_as_string(self): |
+ " check searcher_string(..).__str__() " |
+ self._test_searcher_as(pexpect.searcher_string) |
+ |
+ def test_searcher_as_string_with_EOF(self): |
+ " check searcher_string(..).__str__() that includes EOF " |
+ self._test_searcher_as(pexpect.searcher_string, plus=pexpect.EOF) |
+ |
+ def test_searcher_as_string_with_TIMEOUT(self): |
+ " check searcher_string(..).__str__() that includes TIMEOUT " |
+ self._test_searcher_as(pexpect.searcher_string, plus=pexpect.TIMEOUT) |
+ |
+ def test_searcher_re_as_string(self): |
+ " check searcher_re(..).__str__() " |
+ self._test_searcher_as(pexpect.searcher_re) |
+ |
+ def test_searcher_re_as_string_with_EOF(self): |
+ " check searcher_re(..).__str__() that includes EOF " |
+ self._test_searcher_as(pexpect.searcher_re, plus=pexpect.EOF) |
+ |
+ def test_searcher_re_as_string_with_TIMEOUT(self): |
+ " check searcher_re(..).__str__() that includes TIMEOUT " |
+ self._test_searcher_as(pexpect.searcher_re, plus=pexpect.TIMEOUT) |
+ |
+ def test_nonnative_pty_fork(self): |
+ " test forced self.__fork_pty() and __pty_make_controlling_tty " |
+ # given, |
+ class spawn_ourptyfork(pexpect.spawn): |
+ def _spawn(self, command, args=[], preexec_fn=None, |
+ dimensions=None): |
+ self.use_native_pty_fork = False |
+ pexpect.spawn._spawn(self, command, args, preexec_fn, |
+ dimensions) |
+ |
+ # exercise, |
+ p = spawn_ourptyfork('cat', echo=False) |
+ # verify, |
+ p.sendline('abc') |
+ p.expect('abc') |
+ p.sendeof() |
+ p.expect(pexpect.EOF) |
+ assert not p.isalive() |
+ |
+ def test_exception_tb(self): |
+ " test get_trace() filters away pexpect/__init__.py calls. " |
+ p = pexpect.spawn('sleep 1') |
+ try: |
+ p.expect('BLAH') |
+ except pexpect.ExceptionPexpect as e: |
+ # get_trace should filter out frames in pexpect's own code |
+ tb = e.get_trace() |
+ # exercise, |
+ assert 'raise ' not in tb |
+ assert 'pexpect/__init__.py' not in tb |
+ else: |
+ assert False, "Should have raised an exception." |
+ |
+if __name__ == '__main__': |
+ unittest.main() |
+ |
+suite = unittest.makeSuite(TestCaseMisc,'test') |
+ |