Index: tests/gclient_utils_test.py |
diff --git a/tests/gclient_utils_test.py b/tests/gclient_utils_test.py |
index 64a98362559761d04c6b2c5e1881be817cdac581..828bea4a191a3eb6865a6531e9626ee43de25dc0 100755 |
--- a/tests/gclient_utils_test.py |
+++ b/tests/gclient_utils_test.py |
@@ -7,6 +7,7 @@ import os |
import StringIO |
import sys |
import threading |
+import time |
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
@@ -15,6 +16,7 @@ from testing_support import trial_dir |
import gclient_utils |
import subprocess2 |
+import subprocess42 |
class GclientUtilBase(SuperMoxTestBase): |
@@ -23,6 +25,7 @@ class GclientUtilBase(SuperMoxTestBase): |
gclient_utils.sys.stdout.flush = lambda: None |
self.mox.StubOutWithMock(subprocess2, 'Popen') |
self.mox.StubOutWithMock(subprocess2, 'communicate') |
+ self.mox.StubOutWithMock(subprocess42, 'Popen') |
class CheckCallAndFilterTestCase(GclientUtilBase): |
@@ -139,6 +142,93 @@ class CheckCallAndFilterTestCase(GclientUtilBase): |
self.assertEquals(order, ['unblock']) |
+class CheckCallAndFilter42TestCase(GclientUtilBase): |
+ class ProcessIdMock(object): |
+ def __init__(self, test_string, block_for=None): |
+ self._test_string = test_string |
+ self._block_for = block_for |
+ self._killed = threading.Event() |
+ self._started = time.time() |
+ self.pid = 9284 |
+ # pylint: disable=R0201 |
+ def wait(self): |
+ return 0 |
+ def kill(self): |
+ self._killed.set() |
+ def duration(self): |
+ return time.time() - self._started |
+ def yield_any(self, maxsize, timeout): |
+ for c in self._test_string: |
+ yield 'stdout', c |
+ if self._block_for: |
+ while not self._killed.is_set(): |
+ left = self._started + self._block_for - time.time() |
+ if left < 0: |
+ break |
+ # ignore timeout argument for faster test. |
+ self._killed.wait(min(left, 0.01)) |
+ yield None, None |
+ |
+ def _inner(self, args, test_string, block_for=None, kill_timeout=None): |
+ cwd = 'bleh' |
+ gclient_utils.sys.stdout.write( |
+ '\n________ running \'boo foo bar\' in \'bleh\'\n') |
+ for i in test_string: |
+ gclient_utils.sys.stdout.write(i) |
+ # pylint: disable=E1101 |
+ subprocess42.Popen( |
+ args, |
+ cwd=cwd, |
+ stdout=subprocess42.PIPE, |
+ stderr=subprocess42.STDOUT, |
+ bufsize=0).AndReturn(self.ProcessIdMock(test_string, block_for)) |
+ |
+ os.getcwd() |
+ self.mox.ReplayAll() |
+ compiled_pattern = gclient_utils.re.compile(r'a(.*)b') |
+ line_list = [] |
+ capture_list = [] |
+ def FilterLines(line): |
+ line_list.append(line) |
+ assert isinstance(line, str), type(line) |
+ match = compiled_pattern.search(line) |
+ if match: |
+ capture_list.append(match.group(1)) |
+ gclient_utils.CheckCallAndFilterAndHeader( |
+ args, cwd=cwd, always=True, filter_fn=FilterLines, use_v42=True, |
+ kill_timeout=kill_timeout) |
+ self.assertEquals(line_list, ['ahah', 'accb', 'allo', 'addb']) |
+ self.assertEquals(capture_list, ['cc', 'dd']) |
+ |
+ def testCheckCallAndFilter(self): |
+ args = ['boo', 'foo', 'bar'] |
+ test_string = 'ahah\naccb\nallo\naddb\n' |
+ self._inner(args, test_string) |
+ self.checkstdout('\n________ running \'boo foo bar\' in \'bleh\'\n' |
+ 'ahah\naccb\nallo\naddb\n\n' |
+ '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb' |
+ '\n') |
+ |
+ def testNoLF(self): |
+ # Exactly as testCheckCallAndFilterAndHeader without trailing \n |
+ args = ['boo', 'foo', 'bar'] |
+ test_string = 'ahah\naccb\nallo\naddb' |
+ self._inner(args, test_string) |
+ self.checkstdout('\n________ running \'boo foo bar\' in \'bleh\'\n' |
+ 'ahah\naccb\nallo\naddb\n' |
+ '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb') |
+ |
+ def testKilled(self): |
+ args = ['boo', 'foo', 'bar'] |
+ test_string = 'ahah\naccb\nallo\naddb' |
+ self._inner(args, test_string, block_for=3, kill_timeout=1) |
+ self.checkstdout('\n________ running \'boo foo bar\' in \'bleh\'\n' |
+ 'ahah\naccb\nallo\naddb\n' |
+ '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb' |
+ 'ERROR: killing process \'"boo" "foo" "bar"\' in bleh running for 1s ' |
+ '(timeout: 1s)\n') |
+ |
+ |
class SplitUrlRevisionTestCase(GclientUtilBase): |
def testSSHUrl(self): |
url = "ssh://test@example.com/test.git" |