OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 import os | 6 import os |
7 import StringIO | 7 import StringIO |
8 import sys | 8 import sys |
9 import threading | |
10 | 9 |
11 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
12 | 11 |
13 from testing_support.super_mox import SuperMoxTestBase | 12 from testing_support.super_mox import SuperMoxTestBase |
14 from testing_support import trial_dir | 13 from testing_support import trial_dir |
15 | 14 |
16 import gclient_utils | 15 import gclient_utils |
17 import subprocess2 | 16 import subprocess2 |
18 | 17 |
19 | 18 |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
75 | 74 |
76 def testNoLF(self): | 75 def testNoLF(self): |
77 # Exactly as testCheckCallAndFilterAndHeader without trailing \n | 76 # Exactly as testCheckCallAndFilterAndHeader without trailing \n |
78 args = ['boo', 'foo', 'bar'] | 77 args = ['boo', 'foo', 'bar'] |
79 test_string = 'ahah\naccb\nallo\naddb' | 78 test_string = 'ahah\naccb\nallo\naddb' |
80 self._inner(args, test_string) | 79 self._inner(args, test_string) |
81 self.checkstdout('\n________ running \'boo foo bar\' in \'bleh\'\n' | 80 self.checkstdout('\n________ running \'boo foo bar\' in \'bleh\'\n' |
82 'ahah\naccb\nallo\naddb\n' | 81 'ahah\naccb\nallo\naddb\n' |
83 '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb') | 82 '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb') |
84 | 83 |
85 def _checkKillTimeout(self, output_block_for=0, kill_raises=False): | |
86 cv = threading.Condition() | |
87 order = [] | |
88 | |
89 output = list(reversed('output')) | |
90 def kid_stdout_read(_): | |
91 if output: | |
92 return output.pop() | |
93 else: | |
94 with cv: | |
95 cv.wait(timeout=output_block_for) | |
96 order.append('unblock') | |
97 return '' | |
98 def kid_kill(): | |
99 with cv: | |
100 order.append('killed') | |
101 cv.notify() | |
102 if kill_raises: | |
103 raise OSError('somethign went wrong') | |
104 | |
105 kid = self.ProcessIdMock('') | |
106 kid.stdout.read = kid_stdout_read | |
107 kid.kill = kid_kill # pylint: disable=W0201 | |
108 cwd = 'bleh' | |
109 args = ['ar', 'gs'] | |
110 gclient_utils.sys.stdout.write( | |
111 '\n________ running \'ar gs\' in \'bleh\'\noutput') | |
112 os.getcwd() | |
113 subprocess2.Popen( | |
114 args, cwd=cwd, | |
115 stdout=subprocess2.PIPE, | |
116 stderr=subprocess2.STDOUT, | |
117 bufsize=0).AndReturn(kid) | |
118 self.mox.ReplayAll() | |
119 | |
120 # This test relies on the testing machine's ability to process 1 char | |
121 # of output in <0.01 seconds set in kill_timeout. | |
122 gclient_utils.CheckCallAndFilterAndHeader( | |
123 args, cwd=cwd, always=True, kill_timeout=0.01) | |
124 self.checkstdout( | |
125 '\n________ running \'ar gs\' in \'bleh\'\noutput\n' | |
126 '________ running \'ar gs\' in \'bleh\'\noutput') | |
127 return order | |
128 | |
129 def testKillTimeout(self): | |
130 order = self._checkKillTimeout(output_block_for=1.0) | |
131 self.assertEquals(order, ['killed', 'unblock']) | |
132 | |
133 def testKillRaise(self): | |
134 order = self._checkKillTimeout(output_block_for=1.0, kill_raises=True) | |
135 self.assertEquals(order, ['killed', 'unblock']) | |
136 | |
137 def testNoKill(self): | |
138 order = self._checkKillTimeout(output_block_for=0.0) | |
139 self.assertEquals(order, ['unblock']) | |
140 | |
141 | 84 |
142 class SplitUrlRevisionTestCase(GclientUtilBase): | 85 class SplitUrlRevisionTestCase(GclientUtilBase): |
143 def testSSHUrl(self): | 86 def testSSHUrl(self): |
144 url = "ssh://test@example.com/test.git" | 87 url = "ssh://test@example.com/test.git" |
145 rev = "ac345e52dc" | 88 rev = "ac345e52dc" |
146 out_url, out_rev = gclient_utils.SplitUrlRevision(url) | 89 out_url, out_rev = gclient_utils.SplitUrlRevision(url) |
147 self.assertEquals(out_rev, None) | 90 self.assertEquals(out_rev, None) |
148 self.assertEquals(out_url, url) | 91 self.assertEquals(out_url, url) |
149 out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev)) | 92 out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev)) |
150 self.assertEquals(out_rev, rev) | 93 self.assertEquals(out_rev, rev) |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
253 ], | 196 ], |
254 ['VIEW_VC:http://r/s', {'VIEW_VC': 'https://r/s'}], | 197 ['VIEW_VC:http://r/s', {'VIEW_VC': 'https://r/s'}], |
255 ] | 198 ] |
256 for content, expected in values: | 199 for content, expected in values: |
257 self.assertEquals( | 200 self.assertEquals( |
258 expected, gclient_utils.ParseCodereviewSettingsContent(content)) | 201 expected, gclient_utils.ParseCodereviewSettingsContent(content)) |
259 | 202 |
260 | 203 |
261 if __name__ == '__main__': | 204 if __name__ == '__main__': |
262 import unittest | 205 import unittest |
263 import logging | |
264 level = logging.DEBUG if '-v' in sys.argv else logging.FATAL | |
265 logging.basicConfig( | |
266 level=level, | |
267 format='%(asctime).19s %(levelname)s %(filename)s:' | |
268 '%(lineno)s %(message)s') | |
269 unittest.main() | 206 unittest.main() |
270 | 207 |
271 # vim: ts=2:sw=2:tw=80:et: | 208 # vim: ts=2:sw=2:tw=80:et: |
OLD | NEW |