OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 import os | 6 import os |
7 import StringIO | 7 import StringIO |
8 import sys | 8 import sys |
| 9 import threading |
9 | 10 |
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 11 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
11 | 12 |
12 from testing_support.super_mox import SuperMoxTestBase | 13 from testing_support.super_mox import SuperMoxTestBase |
13 from testing_support import trial_dir | 14 from testing_support import trial_dir |
14 | 15 |
15 import gclient_utils | 16 import gclient_utils |
16 import subprocess2 | 17 import subprocess2 |
17 | 18 |
18 | 19 |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
74 | 75 |
75 def testNoLF(self): | 76 def testNoLF(self): |
76 # Exactly as testCheckCallAndFilterAndHeader without trailing \n | 77 # Exactly as testCheckCallAndFilterAndHeader without trailing \n |
77 args = ['boo', 'foo', 'bar'] | 78 args = ['boo', 'foo', 'bar'] |
78 test_string = 'ahah\naccb\nallo\naddb' | 79 test_string = 'ahah\naccb\nallo\naddb' |
79 self._inner(args, test_string) | 80 self._inner(args, test_string) |
80 self.checkstdout('\n________ running \'boo foo bar\' in \'bleh\'\n' | 81 self.checkstdout('\n________ running \'boo foo bar\' in \'bleh\'\n' |
81 'ahah\naccb\nallo\naddb\n' | 82 'ahah\naccb\nallo\naddb\n' |
82 '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb') | 83 '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb') |
83 | 84 |
| 85 def _checkKillTimeout(self, output_block_for=0, kill_raises=False): |
| 86 cv = threading.Condition() |
| 87 order = [] |
| 88 |
| 89 output = list(reversed('output')) |
| 90 def kid_stdout_read(_): |
| 91 if output: |
| 92 return output.pop() |
| 93 else: |
| 94 with cv: |
| 95 cv.wait(timeout=output_block_for) |
| 96 order.append('unblock') |
| 97 return '' |
| 98 def kid_kill(): |
| 99 with cv: |
| 100 order.append('killed') |
| 101 cv.notify() |
| 102 if kill_raises: |
| 103 raise OSError('somethign went wrong') |
| 104 |
| 105 kid = self.ProcessIdMock('') |
| 106 kid.stdout.read = kid_stdout_read |
| 107 kid.kill = kid_kill # pylint: disable=W0201 |
| 108 cwd = 'bleh' |
| 109 args = ['ar', 'gs'] |
| 110 gclient_utils.sys.stdout.write( |
| 111 '\n________ running \'ar gs\' in \'bleh\'\noutput') |
| 112 os.getcwd() |
| 113 subprocess2.Popen( |
| 114 args, cwd=cwd, |
| 115 stdout=subprocess2.PIPE, |
| 116 stderr=subprocess2.STDOUT, |
| 117 bufsize=0).AndReturn(kid) |
| 118 self.mox.ReplayAll() |
| 119 |
| 120 # This test relies on the testing machine's ability to process 1 char |
| 121 # of output in <0.01 seconds set in kill_timeout. |
| 122 gclient_utils.CheckCallAndFilterAndHeader( |
| 123 args, cwd=cwd, always=True, kill_timeout=0.01) |
| 124 self.checkstdout( |
| 125 '\n________ running \'ar gs\' in \'bleh\'\noutput\n' |
| 126 '________ running \'ar gs\' in \'bleh\'\noutput') |
| 127 return order |
| 128 |
| 129 def testKillTimeout(self): |
| 130 order = self._checkKillTimeout(output_block_for=1.0) |
| 131 self.assertEquals(order, ['killed', 'unblock']) |
| 132 |
| 133 def testKillRaise(self): |
| 134 order = self._checkKillTimeout(output_block_for=1.0, kill_raises=True) |
| 135 self.assertEquals(order, ['killed', 'unblock']) |
| 136 |
| 137 def testNoKill(self): |
| 138 order = self._checkKillTimeout(output_block_for=0.0) |
| 139 self.assertEquals(order, ['unblock']) |
| 140 |
84 | 141 |
85 class SplitUrlRevisionTestCase(GclientUtilBase): | 142 class SplitUrlRevisionTestCase(GclientUtilBase): |
86 def testSSHUrl(self): | 143 def testSSHUrl(self): |
87 url = "ssh://test@example.com/test.git" | 144 url = "ssh://test@example.com/test.git" |
88 rev = "ac345e52dc" | 145 rev = "ac345e52dc" |
89 out_url, out_rev = gclient_utils.SplitUrlRevision(url) | 146 out_url, out_rev = gclient_utils.SplitUrlRevision(url) |
90 self.assertEquals(out_rev, None) | 147 self.assertEquals(out_rev, None) |
91 self.assertEquals(out_url, url) | 148 self.assertEquals(out_url, url) |
92 out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev)) | 149 out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev)) |
93 self.assertEquals(out_rev, rev) | 150 self.assertEquals(out_rev, rev) |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
196 ], | 253 ], |
197 ['VIEW_VC:http://r/s', {'VIEW_VC': 'https://r/s'}], | 254 ['VIEW_VC:http://r/s', {'VIEW_VC': 'https://r/s'}], |
198 ] | 255 ] |
199 for content, expected in values: | 256 for content, expected in values: |
200 self.assertEquals( | 257 self.assertEquals( |
201 expected, gclient_utils.ParseCodereviewSettingsContent(content)) | 258 expected, gclient_utils.ParseCodereviewSettingsContent(content)) |
202 | 259 |
203 | 260 |
204 if __name__ == '__main__': | 261 if __name__ == '__main__': |
205 import unittest | 262 import unittest |
| 263 import logging |
| 264 level = logging.DEBUG if '-v' in sys.argv else logging.FATAL |
| 265 logging.basicConfig( |
| 266 level=level, |
| 267 format='%(asctime).19s %(levelname)s %(filename)s:' |
| 268 '%(lineno)s %(message)s') |
206 unittest.main() | 269 unittest.main() |
207 | 270 |
208 # vim: ts=2:sw=2:tw=80:et: | 271 # vim: ts=2:sw=2:tw=80:et: |
OLD | NEW |