OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 | 2 |
3 import os, unittest, StringIO, socket, urllib2, shutil, subprocess, logging | 3 import os, unittest, StringIO, socket, urllib2, shutil, subprocess, logging |
4 | 4 |
5 import common | 5 import common |
6 from autotest_lib.client.common_lib import utils, autotemp | 6 from autotest_lib.client.common_lib import base_utils, autotemp |
7 from autotest_lib.client.common_lib.test_utils import mock | 7 from autotest_lib.client.common_lib.test_utils import mock |
8 | 8 |
9 | 9 |
10 class test_read_one_line(unittest.TestCase): | 10 class test_read_one_line(unittest.TestCase): |
11 def setUp(self): | 11 def setUp(self): |
12 self.god = mock.mock_god() | 12 self.god = mock.mock_god(ut=self) |
13 self.god.stub_function(utils, "open") | 13 self.god.stub_function(base_utils, "open") |
14 | 14 |
15 | 15 |
16 def tearDown(self): | 16 def tearDown(self): |
17 self.god.unstub_all() | 17 self.god.unstub_all() |
18 | 18 |
19 | 19 |
20 def test_ip_to_long(self): | 20 def test_ip_to_long(self): |
21 self.assertEqual(utils.ip_to_long('0.0.0.0'), 0) | 21 self.assertEqual(base_utils.ip_to_long('0.0.0.0'), 0) |
22 self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295) | 22 self.assertEqual(base_utils.ip_to_long('255.255.255.255'), 4294967295) |
23 self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521) | 23 self.assertEqual(base_utils.ip_to_long('192.168.0.1'), 3232235521) |
24 self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320) | 24 self.assertEqual(base_utils.ip_to_long('1.2.4.8'), 16909320) |
25 | 25 |
26 | 26 |
27 def test_long_to_ip(self): | 27 def test_long_to_ip(self): |
28 self.assertEqual(utils.long_to_ip(0), '0.0.0.0') | 28 self.assertEqual(base_utils.long_to_ip(0), '0.0.0.0') |
29 self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255') | 29 self.assertEqual(base_utils.long_to_ip(4294967295), '255.255.255.255') |
30 self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1') | 30 self.assertEqual(base_utils.long_to_ip(3232235521), '192.168.0.1') |
31 self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8') | 31 self.assertEqual(base_utils.long_to_ip(16909320), '1.2.4.8') |
32 | 32 |
33 | 33 |
34 def test_create_subnet_mask(self): | 34 def test_create_subnet_mask(self): |
35 self.assertEqual(utils.create_subnet_mask(0), 0) | 35 self.assertEqual(base_utils.create_subnet_mask(0), 0) |
36 self.assertEqual(utils.create_subnet_mask(32), 4294967295) | 36 self.assertEqual(base_utils.create_subnet_mask(32), 4294967295) |
37 self.assertEqual(utils.create_subnet_mask(25), 4294967168) | 37 self.assertEqual(base_utils.create_subnet_mask(25), 4294967168) |
38 | 38 |
39 | 39 |
40 def test_format_ip_with_mask(self): | 40 def test_format_ip_with_mask(self): |
41 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0), | 41 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 0), |
42 '0.0.0.0/0') | 42 '0.0.0.0/0') |
43 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32), | 43 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 32), |
44 '192.168.0.1/32') | 44 '192.168.0.1/32') |
45 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26), | 45 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 26), |
46 '192.168.0.0/26') | 46 '192.168.0.0/26') |
47 self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26), | 47 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.255', 26), |
48 '192.168.0.192/26') | 48 '192.168.0.192/26') |
49 | 49 |
50 | 50 |
51 def create_test_file(self, contents): | 51 def create_test_file(self, contents): |
52 test_file = StringIO.StringIO(contents) | 52 test_file = StringIO.StringIO(contents) |
53 utils.open.expect_call("filename", "r").and_return(test_file) | 53 base_utils.open.expect_call("filename", "r").and_return(test_file) |
54 | 54 |
55 | 55 |
56 def test_reads_one_line_file(self): | 56 def test_reads_one_line_file(self): |
57 self.create_test_file("abc\n") | 57 self.create_test_file("abc\n") |
58 self.assertEqual("abc", utils.read_one_line("filename")) | 58 self.assertEqual("abc", base_utils.read_one_line("filename")) |
59 self.god.check_playback() | 59 self.god.check_playback() |
60 | 60 |
61 | 61 |
62 def test_strips_read_lines(self): | 62 def test_strips_read_lines(self): |
63 self.create_test_file("abc \n") | 63 self.create_test_file("abc \n") |
64 self.assertEqual("abc ", utils.read_one_line("filename")) | 64 self.assertEqual("abc ", base_utils.read_one_line("filename")) |
65 self.god.check_playback() | 65 self.god.check_playback() |
66 | 66 |
67 | 67 |
68 def test_drops_extra_lines(self): | 68 def test_drops_extra_lines(self): |
69 self.create_test_file("line 1\nline 2\nline 3\n") | 69 self.create_test_file("line 1\nline 2\nline 3\n") |
70 self.assertEqual("line 1", utils.read_one_line("filename")) | 70 self.assertEqual("line 1", base_utils.read_one_line("filename")) |
71 self.god.check_playback() | 71 self.god.check_playback() |
72 | 72 |
73 | 73 |
74 def test_works_on_empty_file(self): | 74 def test_works_on_empty_file(self): |
75 self.create_test_file("") | 75 self.create_test_file("") |
76 self.assertEqual("", utils.read_one_line("filename")) | 76 self.assertEqual("", base_utils.read_one_line("filename")) |
77 self.god.check_playback() | 77 self.god.check_playback() |
78 | 78 |
79 | 79 |
80 def test_works_on_file_with_no_newlines(self): | 80 def test_works_on_file_with_no_newlines(self): |
81 self.create_test_file("line but no newline") | 81 self.create_test_file("line but no newline") |
82 self.assertEqual("line but no newline", | 82 self.assertEqual("line but no newline", |
83 utils.read_one_line("filename")) | 83 base_utils.read_one_line("filename")) |
84 self.god.check_playback() | 84 self.god.check_playback() |
85 | 85 |
86 | 86 |
87 def test_preserves_leading_whitespace(self): | 87 def test_preserves_leading_whitespace(self): |
88 self.create_test_file(" has leading whitespace") | 88 self.create_test_file(" has leading whitespace") |
89 self.assertEqual(" has leading whitespace", | 89 self.assertEqual(" has leading whitespace", |
90 utils.read_one_line("filename")) | 90 base_utils.read_one_line("filename")) |
91 | 91 |
92 | 92 |
93 class test_write_one_line(unittest.TestCase): | 93 class test_write_one_line(unittest.TestCase): |
94 def setUp(self): | 94 def setUp(self): |
95 self.god = mock.mock_god() | 95 self.god = mock.mock_god(ut=self) |
96 self.god.stub_function(utils, "open") | 96 self.god.stub_function(base_utils, "open") |
97 | 97 |
98 | 98 |
99 def tearDown(self): | 99 def tearDown(self): |
100 self.god.unstub_all() | 100 self.god.unstub_all() |
101 | 101 |
102 | 102 |
103 def get_write_one_line_output(self, content): | 103 def get_write_one_line_output(self, content): |
104 test_file = mock.SaveDataAfterCloseStringIO() | 104 test_file = mock.SaveDataAfterCloseStringIO() |
105 utils.open.expect_call("filename", "w").and_return(test_file) | 105 base_utils.open.expect_call("filename", "w").and_return(test_file) |
106 utils.write_one_line("filename", content) | 106 base_utils.write_one_line("filename", content) |
107 self.god.check_playback() | 107 self.god.check_playback() |
108 return test_file.final_data | 108 return test_file.final_data |
109 | 109 |
110 | 110 |
111 def test_writes_one_line_file(self): | 111 def test_writes_one_line_file(self): |
112 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) | 112 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) |
113 | 113 |
114 | 114 |
115 def test_preserves_existing_newline(self): | 115 def test_preserves_existing_newline(self): |
116 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) | 116 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) |
117 | 117 |
118 | 118 |
119 def test_preserves_leading_whitespace(self): | 119 def test_preserves_leading_whitespace(self): |
120 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) | 120 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) |
121 | 121 |
122 | 122 |
123 def test_preserves_trailing_whitespace(self): | 123 def test_preserves_trailing_whitespace(self): |
124 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) | 124 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) |
125 | 125 |
126 | 126 |
127 def test_handles_empty_input(self): | 127 def test_handles_empty_input(self): |
128 self.assertEqual("\n", self.get_write_one_line_output("")) | 128 self.assertEqual("\n", self.get_write_one_line_output("")) |
129 | 129 |
130 | 130 |
131 class test_open_write_close(unittest.TestCase): | 131 class test_open_write_close(unittest.TestCase): |
132 def setUp(self): | 132 def setUp(self): |
133 self.god = mock.mock_god() | 133 self.god = mock.mock_god(ut=self) |
134 self.god.stub_function(utils, "open") | 134 self.god.stub_function(base_utils, "open") |
135 | 135 |
136 | 136 |
137 def tearDown(self): | 137 def tearDown(self): |
138 self.god.unstub_all() | 138 self.god.unstub_all() |
139 | 139 |
140 | 140 |
141 def test_simple_functionality(self): | 141 def test_simple_functionality(self): |
142 data = "\n\nwhee\n" | 142 data = "\n\nwhee\n" |
143 test_file = mock.SaveDataAfterCloseStringIO() | 143 test_file = mock.SaveDataAfterCloseStringIO() |
144 utils.open.expect_call("filename", "w").and_return(test_file) | 144 base_utils.open.expect_call("filename", "w").and_return(test_file) |
145 utils.open_write_close("filename", data) | 145 base_utils.open_write_close("filename", data) |
146 self.god.check_playback() | 146 self.god.check_playback() |
147 self.assertEqual(data, test_file.final_data) | 147 self.assertEqual(data, test_file.final_data) |
148 | 148 |
149 | 149 |
150 class test_read_keyval(unittest.TestCase): | 150 class test_read_keyval(unittest.TestCase): |
151 def setUp(self): | 151 def setUp(self): |
152 self.god = mock.mock_god() | 152 self.god = mock.mock_god(ut=self) |
153 self.god.stub_function(utils, "open") | 153 self.god.stub_function(base_utils, "open") |
154 self.god.stub_function(os.path, "isdir") | 154 self.god.stub_function(os.path, "isdir") |
155 self.god.stub_function(os.path, "exists") | 155 self.god.stub_function(os.path, "exists") |
156 | 156 |
157 | 157 |
158 def tearDown(self): | 158 def tearDown(self): |
159 self.god.unstub_all() | 159 self.god.unstub_all() |
160 | 160 |
161 | 161 |
162 def create_test_file(self, filename, contents): | 162 def create_test_file(self, filename, contents): |
163 test_file = StringIO.StringIO(contents) | 163 test_file = StringIO.StringIO(contents) |
164 os.path.exists.expect_call(filename).and_return(True) | 164 os.path.exists.expect_call(filename).and_return(True) |
165 utils.open.expect_call(filename).and_return(test_file) | 165 base_utils.open.expect_call(filename).and_return(test_file) |
166 | 166 |
167 | 167 |
168 def read_keyval(self, contents): | 168 def read_keyval(self, contents): |
169 os.path.isdir.expect_call("file").and_return(False) | 169 os.path.isdir.expect_call("file").and_return(False) |
170 self.create_test_file("file", contents) | 170 self.create_test_file("file", contents) |
171 keyval = utils.read_keyval("file") | 171 keyval = base_utils.read_keyval("file") |
172 self.god.check_playback() | 172 self.god.check_playback() |
173 return keyval | 173 return keyval |
174 | 174 |
175 | 175 |
176 def test_returns_empty_when_file_doesnt_exist(self): | 176 def test_returns_empty_when_file_doesnt_exist(self): |
177 os.path.isdir.expect_call("file").and_return(False) | 177 os.path.isdir.expect_call("file").and_return(False) |
178 os.path.exists.expect_call("file").and_return(False) | 178 os.path.exists.expect_call("file").and_return(False) |
179 self.assertEqual({}, utils.read_keyval("file")) | 179 self.assertEqual({}, base_utils.read_keyval("file")) |
180 self.god.check_playback() | 180 self.god.check_playback() |
181 | 181 |
182 | 182 |
183 def test_accesses_files_directly(self): | 183 def test_accesses_files_directly(self): |
184 os.path.isdir.expect_call("file").and_return(False) | 184 os.path.isdir.expect_call("file").and_return(False) |
185 self.create_test_file("file", "") | 185 self.create_test_file("file", "") |
186 utils.read_keyval("file") | 186 base_utils.read_keyval("file") |
187 self.god.check_playback() | 187 self.god.check_playback() |
188 | 188 |
189 | 189 |
190 def test_accesses_directories_through_keyval_file(self): | 190 def test_accesses_directories_through_keyval_file(self): |
191 os.path.isdir.expect_call("dir").and_return(True) | 191 os.path.isdir.expect_call("dir").and_return(True) |
192 self.create_test_file("dir/keyval", "") | 192 self.create_test_file("dir/keyval", "") |
193 utils.read_keyval("dir") | 193 base_utils.read_keyval("dir") |
194 self.god.check_playback() | 194 self.god.check_playback() |
195 | 195 |
196 | 196 |
197 def test_values_are_rstripped(self): | 197 def test_values_are_rstripped(self): |
198 keyval = self.read_keyval("a=b \n") | 198 keyval = self.read_keyval("a=b \n") |
199 self.assertEquals(keyval, {"a": "b"}) | 199 self.assertEquals(keyval, {"a": "b"}) |
200 | 200 |
201 | 201 |
202 def test_comments_are_ignored(self): | 202 def test_comments_are_ignored(self): |
203 keyval = self.read_keyval("a=b # a comment\n") | 203 keyval = self.read_keyval("a=b # a comment\n") |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
240 self.assertEquals(keyval, {"a_b": "value"}) | 240 self.assertEquals(keyval, {"a_b": "value"}) |
241 | 241 |
242 | 242 |
243 def test_dashes_are_allowed_in_key_names(self): | 243 def test_dashes_are_allowed_in_key_names(self): |
244 keyval = self.read_keyval("a-b=value\n") | 244 keyval = self.read_keyval("a-b=value\n") |
245 self.assertEquals(keyval, {"a-b": "value"}) | 245 self.assertEquals(keyval, {"a-b": "value"}) |
246 | 246 |
247 | 247 |
248 class test_write_keyval(unittest.TestCase): | 248 class test_write_keyval(unittest.TestCase): |
249 def setUp(self): | 249 def setUp(self): |
250 self.god = mock.mock_god() | 250 self.god = mock.mock_god(ut=self) |
251 self.god.stub_function(utils, "open") | 251 self.god.stub_function(base_utils, "open") |
252 self.god.stub_function(os.path, "isdir") | 252 self.god.stub_function(os.path, "isdir") |
253 | 253 |
254 | 254 |
255 def tearDown(self): | 255 def tearDown(self): |
256 self.god.unstub_all() | 256 self.god.unstub_all() |
257 | 257 |
258 | 258 |
259 def assertHasLines(self, value, lines): | 259 def assertHasLines(self, value, lines): |
260 vlines = value.splitlines() | 260 vlines = value.splitlines() |
261 vlines.sort() | 261 vlines.sort() |
262 self.assertEquals(vlines, sorted(lines)) | 262 self.assertEquals(vlines, sorted(lines)) |
263 | 263 |
264 | 264 |
265 def write_keyval(self, filename, dictionary, expected_filename=None, | 265 def write_keyval(self, filename, dictionary, expected_filename=None, |
266 type_tag=None): | 266 type_tag=None): |
267 if expected_filename is None: | 267 if expected_filename is None: |
268 expected_filename = filename | 268 expected_filename = filename |
269 test_file = StringIO.StringIO() | 269 test_file = StringIO.StringIO() |
270 self.god.stub_function(test_file, "close") | 270 self.god.stub_function(test_file, "close") |
271 utils.open.expect_call(expected_filename, "a").and_return(test_file) | 271 base_utils.open.expect_call(expected_filename, "a").and_return(test_file
) |
272 test_file.close.expect_call() | 272 test_file.close.expect_call() |
273 if type_tag is None: | 273 if type_tag is None: |
274 utils.write_keyval(filename, dictionary) | 274 base_utils.write_keyval(filename, dictionary) |
275 else: | 275 else: |
276 utils.write_keyval(filename, dictionary, type_tag) | 276 base_utils.write_keyval(filename, dictionary, type_tag) |
277 return test_file.getvalue() | 277 return test_file.getvalue() |
278 | 278 |
279 | 279 |
280 def write_keyval_file(self, dictionary, type_tag=None): | 280 def write_keyval_file(self, dictionary, type_tag=None): |
281 os.path.isdir.expect_call("file").and_return(False) | 281 os.path.isdir.expect_call("file").and_return(False) |
282 return self.write_keyval("file", dictionary, type_tag=type_tag) | 282 return self.write_keyval("file", dictionary, type_tag=type_tag) |
283 | 283 |
284 | 284 |
285 def test_accesses_files_directly(self): | 285 def test_accesses_files_directly(self): |
286 os.path.isdir.expect_call("file").and_return(False) | 286 os.path.isdir.expect_call("file").and_return(False) |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
323 self.assertEquals(result, "a_b=value\n") | 323 self.assertEquals(result, "a_b=value\n") |
324 | 324 |
325 | 325 |
326 def test_dashes_are_allowed_in_key_names(self): | 326 def test_dashes_are_allowed_in_key_names(self): |
327 result = self.write_keyval_file({"a-b": "value"}) | 327 result = self.write_keyval_file({"a-b": "value"}) |
328 self.assertEquals(result, "a-b=value\n") | 328 self.assertEquals(result, "a-b=value\n") |
329 | 329 |
330 | 330 |
331 class test_is_url(unittest.TestCase): | 331 class test_is_url(unittest.TestCase): |
332 def test_accepts_http(self): | 332 def test_accepts_http(self): |
333 self.assertTrue(utils.is_url("http://example.com")) | 333 self.assertTrue(base_utils.is_url("http://example.com")) |
334 | 334 |
335 | 335 |
336 def test_accepts_ftp(self): | 336 def test_accepts_ftp(self): |
337 self.assertTrue(utils.is_url("ftp://ftp.example.com")) | 337 self.assertTrue(base_utils.is_url("ftp://ftp.example.com")) |
338 | 338 |
339 | 339 |
340 def test_rejects_local_path(self): | 340 def test_rejects_local_path(self): |
341 self.assertFalse(utils.is_url("/home/username/file")) | 341 self.assertFalse(base_utils.is_url("/home/username/file")) |
342 | 342 |
343 | 343 |
344 def test_rejects_local_filename(self): | 344 def test_rejects_local_filename(self): |
345 self.assertFalse(utils.is_url("filename")) | 345 self.assertFalse(base_utils.is_url("filename")) |
346 | 346 |
347 | 347 |
348 def test_rejects_relative_local_path(self): | 348 def test_rejects_relative_local_path(self): |
349 self.assertFalse(utils.is_url("somedir/somesubdir/file")) | 349 self.assertFalse(base_utils.is_url("somedir/somesubdir/file")) |
350 | 350 |
351 | 351 |
352 def test_rejects_local_path_containing_url(self): | 352 def test_rejects_local_path_containing_url(self): |
353 self.assertFalse(utils.is_url("somedir/http://path/file")) | 353 self.assertFalse(base_utils.is_url("somedir/http://path/file")) |
354 | 354 |
355 | 355 |
356 class test_urlopen(unittest.TestCase): | 356 class test_urlopen(unittest.TestCase): |
357 def setUp(self): | 357 def setUp(self): |
358 self.god = mock.mock_god() | 358 self.god = mock.mock_god(ut=self) |
359 | 359 |
360 | 360 |
361 def tearDown(self): | 361 def tearDown(self): |
362 self.god.unstub_all() | 362 self.god.unstub_all() |
363 | 363 |
364 | 364 |
365 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, | 365 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, |
366 *expected_args): | 366 *expected_args): |
367 expected_args += (None,) * (2 - len(expected_args)) | 367 expected_args += (None,) * (2 - len(expected_args)) |
368 def urlopen(url, data=None): | 368 def urlopen(url, data=None): |
369 self.assertEquals(expected_args, (url,data)) | 369 self.assertEquals(expected_args, (url,data)) |
370 test_func(socket.getdefaulttimeout()) | 370 test_func(socket.getdefaulttimeout()) |
371 return expected_return | 371 return expected_return |
372 self.god.stub_with(urllib2, "urlopen", urlopen) | 372 self.god.stub_with(urllib2, "urlopen", urlopen) |
373 | 373 |
374 | 374 |
375 def stub_urlopen_with_timeout_check(self, expected_timeout, | 375 def stub_urlopen_with_timeout_check(self, expected_timeout, |
376 expected_return, *expected_args): | 376 expected_return, *expected_args): |
377 def test_func(timeout): | 377 def test_func(timeout): |
378 self.assertEquals(timeout, expected_timeout) | 378 self.assertEquals(timeout, expected_timeout) |
379 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, | 379 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, |
380 *expected_args) | 380 *expected_args) |
381 | 381 |
382 | 382 |
383 def test_timeout_set_during_call(self): | 383 def test_timeout_set_during_call(self): |
384 self.stub_urlopen_with_timeout_check(30, "retval", "url") | 384 self.stub_urlopen_with_timeout_check(30, "retval", "url") |
385 retval = utils.urlopen("url", timeout=30) | 385 retval = base_utils.urlopen("url", timeout=30) |
386 self.assertEquals(retval, "retval") | 386 self.assertEquals(retval, "retval") |
387 | 387 |
388 | 388 |
389 def test_timeout_reset_after_call(self): | 389 def test_timeout_reset_after_call(self): |
390 old_timeout = socket.getdefaulttimeout() | 390 old_timeout = socket.getdefaulttimeout() |
391 self.stub_urlopen_with_timeout_check(30, None, "url") | 391 self.stub_urlopen_with_timeout_check(30, None, "url") |
392 try: | 392 try: |
393 socket.setdefaulttimeout(1234) | 393 socket.setdefaulttimeout(1234) |
394 utils.urlopen("url", timeout=30) | 394 base_utils.urlopen("url", timeout=30) |
395 self.assertEquals(1234, socket.getdefaulttimeout()) | 395 self.assertEquals(1234, socket.getdefaulttimeout()) |
396 finally: | 396 finally: |
397 socket.setdefaulttimeout(old_timeout) | 397 socket.setdefaulttimeout(old_timeout) |
398 | 398 |
399 | 399 |
400 def test_timeout_set_by_default(self): | 400 def test_timeout_set_by_default(self): |
401 def test_func(timeout): | 401 def test_func(timeout): |
402 self.assertTrue(timeout is not None) | 402 self.assertTrue(timeout is not None) |
403 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") | 403 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") |
404 utils.urlopen("url") | 404 base_utils.urlopen("url") |
405 | 405 |
406 | 406 |
407 def test_args_are_untouched(self): | 407 def test_args_are_untouched(self): |
408 self.stub_urlopen_with_timeout_check(30, None, "http://url", | 408 self.stub_urlopen_with_timeout_check(30, None, "http://url", |
409 "POST data") | 409 "POST data") |
410 utils.urlopen("http://url", timeout=30, data="POST data") | 410 base_utils.urlopen("http://url", timeout=30, data="POST data") |
411 | 411 |
412 | 412 |
413 class test_urlretrieve(unittest.TestCase): | 413 class test_urlretrieve(unittest.TestCase): |
414 def setUp(self): | 414 def setUp(self): |
415 self.god = mock.mock_god() | 415 self.god = mock.mock_god(ut=self) |
416 | 416 |
417 | 417 |
418 def tearDown(self): | 418 def tearDown(self): |
419 self.god.unstub_all() | 419 self.god.unstub_all() |
420 | 420 |
421 | 421 |
422 def test_urlopen_passed_arguments(self): | 422 def test_urlopen_passed_arguments(self): |
423 self.god.stub_function(utils, "urlopen") | 423 self.god.stub_function(base_utils, "urlopen") |
424 self.god.stub_function(utils.shutil, "copyfileobj") | 424 self.god.stub_function(base_utils.shutil, "copyfileobj") |
425 self.god.stub_function(utils, "open") | 425 self.god.stub_function(base_utils, "open") |
426 | 426 |
427 url = "url" | 427 url = "url" |
428 dest = "somefile" | 428 dest = "somefile" |
429 data = object() | 429 data = object() |
430 timeout = 10 | 430 timeout = 10 |
431 | 431 |
432 src_file = self.god.create_mock_class(file, "file") | 432 src_file = self.god.create_mock_class(file, "file") |
433 dest_file = self.god.create_mock_class(file, "file") | 433 dest_file = self.god.create_mock_class(file, "file") |
434 | 434 |
435 (utils.urlopen.expect_call(url, data=data, timeout=timeout) | 435 (base_utils.urlopen.expect_call(url, data=data, timeout=timeout) |
436 .and_return(src_file)) | 436 .and_return(src_file)) |
437 utils.open.expect_call(dest, "wb").and_return(dest_file) | 437 base_utils.open.expect_call(dest, "wb").and_return(dest_file) |
438 utils.shutil.copyfileobj.expect_call(src_file, dest_file) | 438 base_utils.shutil.copyfileobj.expect_call(src_file, dest_file) |
439 dest_file.close.expect_call() | 439 dest_file.close.expect_call() |
440 src_file.close.expect_call() | 440 src_file.close.expect_call() |
441 | 441 |
442 utils.urlretrieve(url, dest, data=data, timeout=timeout) | 442 base_utils.urlretrieve(url, dest, data=data, timeout=timeout) |
443 self.god.check_playback() | 443 self.god.check_playback() |
444 | 444 |
445 | 445 |
446 class test_merge_trees(unittest.TestCase): | 446 class test_merge_trees(unittest.TestCase): |
447 # a some path-handling helper functions | 447 # a some path-handling helper functions |
448 def src(self, *path_segments): | 448 def src(self, *path_segments): |
449 return os.path.join(self.src_tree.name, *path_segments) | 449 return os.path.join(self.src_tree.name, *path_segments) |
450 | 450 |
451 | 451 |
452 def dest(self, *path_segments): | 452 def dest(self, *path_segments): |
(...skipping 27 matching lines...) Expand all Loading... |
480 os.mkdir(self.src("empty")) | 480 os.mkdir(self.src("empty")) |
481 os.mkdir(self.dest("empty")) | 481 os.mkdir(self.dest("empty")) |
482 | 482 |
483 | 483 |
484 def tearDown(self): | 484 def tearDown(self): |
485 self.src_tree.clean() | 485 self.src_tree.clean() |
486 self.dest_tree.clean() | 486 self.dest_tree.clean() |
487 | 487 |
488 | 488 |
489 def test_both_dont_exist(self): | 489 def test_both_dont_exist(self): |
490 utils.merge_trees(*self.paths("empty")) | 490 base_utils.merge_trees(*self.paths("empty")) |
491 | 491 |
492 | 492 |
493 def test_file_only_at_src(self): | 493 def test_file_only_at_src(self): |
494 print >> open(self.src("src_only"), "w"), "line 1" | 494 print >> open(self.src("src_only"), "w"), "line 1" |
495 utils.merge_trees(*self.paths("src_only")) | 495 base_utils.merge_trees(*self.paths("src_only")) |
496 self.assertFileEqual("src_only") | 496 self.assertFileEqual("src_only") |
497 | 497 |
498 | 498 |
499 def test_file_only_at_dest(self): | 499 def test_file_only_at_dest(self): |
500 print >> open(self.dest("dest_only"), "w"), "line 1" | 500 print >> open(self.dest("dest_only"), "w"), "line 1" |
501 utils.merge_trees(*self.paths("dest_only")) | 501 base_utils.merge_trees(*self.paths("dest_only")) |
502 self.assertEqual(False, os.path.exists(self.src("dest_only"))) | 502 self.assertEqual(False, os.path.exists(self.src("dest_only"))) |
503 self.assertFileContents("line 1\n", "dest_only") | 503 self.assertFileContents("line 1\n", "dest_only") |
504 | 504 |
505 | 505 |
506 def test_file_at_both(self): | 506 def test_file_at_both(self): |
507 print >> open(self.dest("in_both"), "w"), "line 1" | 507 print >> open(self.dest("in_both"), "w"), "line 1" |
508 print >> open(self.src("in_both"), "w"), "line 2" | 508 print >> open(self.src("in_both"), "w"), "line 2" |
509 utils.merge_trees(*self.paths("in_both")) | 509 base_utils.merge_trees(*self.paths("in_both")) |
510 self.assertFileContents("line 1\nline 2\n", "in_both") | 510 self.assertFileContents("line 1\nline 2\n", "in_both") |
511 | 511 |
512 | 512 |
513 def test_directory_with_files_in_both(self): | 513 def test_directory_with_files_in_both(self): |
514 print >> open(self.dest("in_both"), "w"), "line 1" | 514 print >> open(self.dest("in_both"), "w"), "line 1" |
515 print >> open(self.src("in_both"), "w"), "line 3" | 515 print >> open(self.src("in_both"), "w"), "line 3" |
516 utils.merge_trees(*self.paths()) | 516 base_utils.merge_trees(*self.paths()) |
517 self.assertFileContents("line 1\nline 3\n", "in_both") | 517 self.assertFileContents("line 1\nline 3\n", "in_both") |
518 | 518 |
519 | 519 |
520 def test_directory_with_mix_of_files(self): | 520 def test_directory_with_mix_of_files(self): |
521 print >> open(self.dest("in_dest"), "w"), "dest line" | 521 print >> open(self.dest("in_dest"), "w"), "dest line" |
522 print >> open(self.src("in_src"), "w"), "src line" | 522 print >> open(self.src("in_src"), "w"), "src line" |
523 utils.merge_trees(*self.paths()) | 523 base_utils.merge_trees(*self.paths()) |
524 self.assertFileContents("dest line\n", "in_dest") | 524 self.assertFileContents("dest line\n", "in_dest") |
525 self.assertFileContents("src line\n", "in_src") | 525 self.assertFileContents("src line\n", "in_src") |
526 | 526 |
527 | 527 |
528 def test_directory_with_subdirectories(self): | 528 def test_directory_with_subdirectories(self): |
529 os.mkdir(self.src("src_subdir")) | 529 os.mkdir(self.src("src_subdir")) |
530 print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line" | 530 print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line" |
531 os.mkdir(self.src("both_subdir")) | 531 os.mkdir(self.src("both_subdir")) |
532 os.mkdir(self.dest("both_subdir")) | 532 os.mkdir(self.dest("both_subdir")) |
533 print >> open(self.src("both_subdir", "subfile"), "w"), "src line" | 533 print >> open(self.src("both_subdir", "subfile"), "w"), "src line" |
534 print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line" | 534 print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line" |
535 utils.merge_trees(*self.paths()) | 535 base_utils.merge_trees(*self.paths()) |
536 self.assertFileContents("subdir line\n", "src_subdir", "subfile") | 536 self.assertFileContents("subdir line\n", "src_subdir", "subfile") |
537 self.assertFileContents("dest line\nsrc line\n", "both_subdir", | 537 self.assertFileContents("dest line\nsrc line\n", "both_subdir", |
538 "subfile") | 538 "subfile") |
539 | 539 |
540 | 540 |
541 class test_get_relative_path(unittest.TestCase): | 541 class test_get_relative_path(unittest.TestCase): |
542 def test_not_absolute(self): | 542 def test_not_absolute(self): |
543 self.assertRaises(AssertionError, utils.get_relative_path, "a", "b") | 543 self.assertRaises(AssertionError, base_utils.get_relative_path, "a", "b"
) |
544 | 544 |
545 def test_same_dir(self): | 545 def test_same_dir(self): |
546 self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c") | 546 self.assertEqual(base_utils.get_relative_path("/a/b/c", "/a/b"), "c") |
547 | 547 |
548 def test_forward_dir(self): | 548 def test_forward_dir(self): |
549 self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d") | 549 self.assertEqual(base_utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d"
) |
550 | 550 |
551 def test_previous_dir(self): | 551 def test_previous_dir(self): |
552 self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..") | 552 self.assertEqual(base_utils.get_relative_path("/a/b", "/a/b/c/d"), "../.
.") |
553 | 553 |
554 def test_parallel_dir(self): | 554 def test_parallel_dir(self): |
555 self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"), | 555 self.assertEqual(base_utils.get_relative_path("/a/c/d", "/a/b/c/d"), |
556 "../../../c/d") | 556 "../../../c/d") |
557 | 557 |
558 | 558 |
559 class test_sh_escape(unittest.TestCase): | 559 class test_sh_escape(unittest.TestCase): |
560 def _test_in_shell(self, text): | 560 def _test_in_shell(self, text): |
561 escaped_text = utils.sh_escape(text) | 561 escaped_text = base_utils.sh_escape(text) |
562 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, | 562 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, |
563 stdin=open(os.devnull, 'r'), | 563 stdin=open(os.devnull, 'r'), |
564 stdout=subprocess.PIPE, | 564 stdout=subprocess.PIPE, |
565 stderr=open(os.devnull, 'w')) | 565 stderr=open(os.devnull, 'w')) |
566 stdout, _ = proc.communicate() | 566 stdout, _ = proc.communicate() |
567 self.assertEqual(proc.returncode, 0) | 567 self.assertEqual(proc.returncode, 0) |
568 self.assertEqual(stdout[:-1], text) | 568 self.assertEqual(stdout[:-1], text) |
569 | 569 |
570 | 570 |
571 def test_normal_string(self): | 571 def test_normal_string(self): |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
620 self._test_in_shell('\\r') | 620 self._test_in_shell('\\r') |
621 self._test_in_shell('\\t') | 621 self._test_in_shell('\\t') |
622 self._test_in_shell('\\v') | 622 self._test_in_shell('\\v') |
623 self._test_in_shell('\\b') | 623 self._test_in_shell('\\b') |
624 self._test_in_shell('\\a') | 624 self._test_in_shell('\\a') |
625 self._test_in_shell('\\000') | 625 self._test_in_shell('\\000') |
626 | 626 |
627 | 627 |
628 class test_run(unittest.TestCase): | 628 class test_run(unittest.TestCase): |
629 """ | 629 """ |
630 Test the utils.run() function. | 630 Test the base_utils.run() function. |
631 | 631 |
632 Note: This test runs simple external commands to test the utils.run() | 632 Note: This test runs simple external commands to test the base_utils.run() |
633 API without assuming implementation details. | 633 API without assuming implementation details. |
634 """ | 634 """ |
635 def setUp(self): | 635 def setUp(self): |
636 self.god = mock.mock_god() | 636 self.god = mock.mock_god(ut=self) |
637 self.god.stub_function(utils.logging, 'warn') | 637 self.god.stub_function(base_utils.logging, 'warn') |
638 self.god.stub_function(utils.logging, 'debug') | 638 self.god.stub_function(base_utils.logging, 'debug') |
639 | 639 |
640 | 640 |
641 def tearDown(self): | 641 def tearDown(self): |
642 self.god.unstub_all() | 642 self.god.unstub_all() |
643 | 643 |
644 | 644 |
645 def __check_result(self, result, command, exit_status=0, stdout='', | 645 def __check_result(self, result, command, exit_status=0, stdout='', |
646 stderr=''): | 646 stderr=''): |
647 self.assertEquals(result.command, command) | 647 self.assertEquals(result.command, command) |
648 self.assertEquals(result.exit_status, exit_status) | 648 self.assertEquals(result.exit_status, exit_status) |
649 self.assertEquals(result.stdout, stdout) | 649 self.assertEquals(result.stdout, stdout) |
650 self.assertEquals(result.stderr, stderr) | 650 self.assertEquals(result.stderr, stderr) |
651 | 651 |
652 | 652 |
653 def test_default_simple(self): | 653 def test_default_simple(self): |
654 cmd = 'echo "hello world"' | 654 cmd = 'echo "hello world"' |
655 # expect some king of logging.debug() call but don't care about args | 655 # expect some king of logging.debug() call but don't care about args |
656 utils.logging.debug.expect_any_call() | 656 base_utils.logging.debug.expect_any_call() |
657 self.__check_result(utils.run(cmd), cmd, stdout='hello world\n') | 657 self.__check_result(base_utils.run(cmd), cmd, stdout='hello world\n') |
658 | 658 |
659 | 659 |
660 def test_default_failure(self): | 660 def test_default_failure(self): |
661 cmd = 'exit 11' | 661 cmd = 'exit 11' |
662 try: | 662 try: |
663 utils.run(cmd, verbose=False) | 663 base_utils.run(cmd, verbose=False) |
664 except utils.error.CmdError, err: | 664 except base_utils.error.CmdError, err: |
665 self.__check_result(err.result_obj, cmd, exit_status=11) | 665 self.__check_result(err.result_obj, cmd, exit_status=11) |
666 | 666 |
667 | 667 |
668 def test_ignore_status(self): | 668 def test_ignore_status(self): |
669 cmd = 'echo error >&2 && exit 11' | 669 cmd = 'echo error >&2 && exit 11' |
670 self.__check_result(utils.run(cmd, ignore_status=True, verbose=False), | 670 self.__check_result(base_utils.run(cmd, ignore_status=True, verbose=Fals
e), |
671 cmd, exit_status=11, stderr='error\n') | 671 cmd, exit_status=11, stderr='error\n') |
672 | 672 |
673 | 673 |
674 def test_timeout(self): | 674 def test_timeout(self): |
675 # we expect a logging.warn() message, don't care about the contents | 675 # we expect a logging.warn() message, don't care about the contents |
676 utils.logging.warn.expect_any_call() | 676 base_utils.logging.warn.expect_any_call() |
677 try: | 677 try: |
678 utils.run('echo -n output && sleep 10', timeout=1, verbose=False) | 678 base_utils.run('echo -n output && sleep 10', timeout=1, verbose=Fals
e) |
679 except utils.error.CmdError, err: | 679 except base_utils.error.CmdError, err: |
680 self.assertEquals(err.result_obj.stdout, 'output') | 680 self.assertEquals(err.result_obj.stdout, 'output') |
681 | 681 |
682 | 682 |
683 def test_stdout_stderr_tee(self): | 683 def test_stdout_stderr_tee(self): |
684 cmd = 'echo output && echo error >&2' | 684 cmd = 'echo output && echo error >&2' |
685 stdout_tee = StringIO.StringIO() | 685 stdout_tee = StringIO.StringIO() |
686 stderr_tee = StringIO.StringIO() | 686 stderr_tee = StringIO.StringIO() |
687 | 687 |
688 self.__check_result(utils.run( | 688 self.__check_result(base_utils.run( |
689 cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee, | 689 cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee, |
690 verbose=False), cmd, stdout='output\n', stderr='error\n') | 690 verbose=False), cmd, stdout='output\n', stderr='error\n') |
691 self.assertEqual(stdout_tee.getvalue(), 'output\n') | 691 self.assertEqual(stdout_tee.getvalue(), 'output\n') |
692 self.assertEqual(stderr_tee.getvalue(), 'error\n') | 692 self.assertEqual(stderr_tee.getvalue(), 'error\n') |
693 | 693 |
694 | 694 |
695 def test_stdin_string(self): | 695 def test_stdin_string(self): |
696 cmd = 'cat' | 696 cmd = 'cat' |
697 self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'), | 697 self.__check_result(base_utils.run(cmd, verbose=False, stdin='hi!\n'), |
698 cmd, stdout='hi!\n') | 698 cmd, stdout='hi!\n') |
699 | 699 |
700 | 700 |
701 def test_safe_args(self): | 701 def test_safe_args(self): |
702 cmd = 'echo "hello \\"world" "again"' | 702 cmd = 'echo "hello \\"world" "again"' |
703 self.__check_result(utils.run( | 703 self.__check_result(base_utils.run( |
704 'echo', verbose=False, args=('hello "world', 'again')), cmd, | 704 'echo', verbose=False, args=('hello "world', 'again')), cmd, |
705 stdout='hello "world again\n') | 705 stdout='hello "world again\n') |
706 | 706 |
707 | 707 |
708 def test_safe_args_given_string(self): | 708 def test_safe_args_given_string(self): |
709 cmd = 'echo "hello \\"world" "again"' | 709 cmd = 'echo "hello \\"world" "again"' |
710 self.assertRaises(TypeError, utils.run, 'echo', args='hello') | 710 self.assertRaises(TypeError, base_utils.run, 'echo', args='hello') |
711 | 711 |
712 | 712 |
713 class test_compare_versions(unittest.TestCase): | 713 class test_compare_versions(unittest.TestCase): |
714 def test_zerofill(self): | 714 def test_zerofill(self): |
715 self.assertEqual(utils.compare_versions('1.7', '1.10'), -1) | 715 self.assertEqual(base_utils.compare_versions('1.7', '1.10'), -1) |
716 self.assertEqual(utils.compare_versions('1.222', '1.3'), 1) | 716 self.assertEqual(base_utils.compare_versions('1.222', '1.3'), 1) |
717 self.assertEqual(utils.compare_versions('1.03', '1.3'), 0) | 717 self.assertEqual(base_utils.compare_versions('1.03', '1.3'), 0) |
718 | 718 |
719 | 719 |
720 def test_unequal_len(self): | 720 def test_unequal_len(self): |
721 self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1) | 721 self.assertEqual(base_utils.compare_versions('1.3', '1.3.4'), -1) |
722 self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1) | 722 self.assertEqual(base_utils.compare_versions('1.3.1', '1.3'), 1) |
723 | 723 |
724 | 724 |
725 def test_dash_delimited(self): | 725 def test_dash_delimited(self): |
726 self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1) | 726 self.assertEqual(base_utils.compare_versions('1-2-3', '1-5-1'), -1) |
727 self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1) | 727 self.assertEqual(base_utils.compare_versions('1-2-1', '1-1-1'), 1) |
728 self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0) | 728 self.assertEqual(base_utils.compare_versions('1-2-4', '1-2-4'), 0) |
729 | 729 |
730 | 730 |
731 def test_alphabets(self): | 731 def test_alphabets(self): |
732 self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1) | 732 self.assertEqual(base_utils.compare_versions('m.l.b', 'n.b.a'), -1) |
733 self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1) | 733 self.assertEqual(base_utils.compare_versions('n.b.a', 'm.l.b'), 1) |
734 self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0) | 734 self.assertEqual(base_utils.compare_versions('abc.e', 'abc.e'), 0) |
735 | 735 |
736 | 736 |
737 def test_mix_symbols(self): | 737 def test_mix_symbols(self): |
738 self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1) | 738 self.assertEqual(base_utils.compare_versions('k-320.1', 'k-320.3'), -1) |
739 self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1) | 739 self.assertEqual(base_utils.compare_versions('k-231.5', 'k-231.1'), 1) |
740 self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0) | 740 self.assertEqual(base_utils.compare_versions('k-231.1', 'k-231.1'), 0) |
741 | 741 |
742 self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1) | 742 self.assertEqual(base_utils.compare_versions('k.320-1', 'k.320-3'), -1) |
743 self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1) | 743 self.assertEqual(base_utils.compare_versions('k.231-5', 'k.231-1'), 1) |
744 self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0) | 744 self.assertEqual(base_utils.compare_versions('k.231-1', 'k.231-1'), 0) |
745 | 745 |
746 | 746 |
747 class test_args_to_dict(unittest.TestCase): | 747 class test_args_to_dict(unittest.TestCase): |
748 def test_no_args(self): | 748 def test_no_args(self): |
749 result = utils.args_to_dict([]) | 749 result = base_utils.args_to_dict([]) |
750 self.assertEqual({}, result) | 750 self.assertEqual({}, result) |
751 | 751 |
752 | 752 |
753 def test_matches(self): | 753 def test_matches(self): |
754 result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', | 754 result = base_utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', |
755 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) | 755 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) |
756 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', | 756 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', |
757 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) | 757 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) |
758 | 758 |
759 | 759 |
760 def test_unmatches(self): | 760 def test_unmatches(self): |
761 # Temporarily shut warning messages from args_to_dict() when an argument | 761 # Temporarily shut warning messages from args_to_dict() when an argument |
762 # doesn't match its pattern. | 762 # doesn't match its pattern. |
763 logger = logging.getLogger() | 763 logger = logging.getLogger() |
764 saved_level = logger.level | 764 saved_level = logger.level |
765 logger.setLevel(logging.ERROR) | 765 logger.setLevel(logging.ERROR) |
766 | 766 |
767 try: | 767 try: |
768 result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b', | 768 result = base_utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', '
a*b', |
769 ':VAL', '=VVV', 'WORD']) | 769 ':VAL', '=VVV', 'WORD']) |
770 self.assertEqual({}, result) | 770 self.assertEqual({}, result) |
771 finally: | 771 finally: |
772 # Restore level. | 772 # Restore level. |
773 logger.setLevel(saved_level) | 773 logger.setLevel(saved_level) |
774 | 774 |
775 | 775 |
776 class test_get_random_port(unittest.TestCase): | 776 class test_get_random_port(unittest.TestCase): |
777 def do_bind(self, port, socket_type, socket_proto): | 777 def do_bind(self, port, socket_type, socket_proto): |
778 s = socket.socket(socket.AF_INET, socket_type, socket_proto) | 778 s = socket.socket(socket.AF_INET, socket_type, socket_proto) |
779 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 779 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
780 s.bind(('', port)) | 780 s.bind(('', port)) |
781 return s | 781 return s |
782 | 782 |
783 | 783 |
784 def test_get_port(self): | 784 def test_get_port(self): |
785 for _ in xrange(100): | 785 for _ in xrange(100): |
786 p = utils.get_unused_port() | 786 p = base_utils.get_unused_port() |
787 s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP) | 787 s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP) |
788 self.assert_(s.getsockname()) | 788 self.assert_(s.getsockname()) |
789 s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | 789 s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP) |
790 self.assert_(s.getsockname()) | 790 self.assert_(s.getsockname()) |
791 | 791 |
792 | 792 |
793 if __name__ == "__main__": | 793 if __name__ == "__main__": |
794 unittest.main() | 794 unittest.main() |
OLD | NEW |