| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/python | |
| 2 | |
| 3 import os, unittest, StringIO, socket, urllib2, shutil, subprocess, logging | |
| 4 | |
| 5 import common | |
| 6 from autotest_lib.client.common_lib import base_utils, autotemp | |
| 7 from autotest_lib.client.common_lib.test_utils import mock | |
| 8 | |
| 9 | |
| 10 class test_read_one_line(unittest.TestCase): | |
| 11 def setUp(self): | |
| 12 self.god = mock.mock_god() | |
| 13 self.god.stub_function(base_utils, "open") | |
| 14 | |
| 15 | |
| 16 def tearDown(self): | |
| 17 self.god.unstub_all() | |
| 18 | |
| 19 | |
| 20 def test_ip_to_long(self): | |
| 21 self.assertEqual(base_utils.ip_to_long('0.0.0.0'), 0) | |
| 22 self.assertEqual(base_utils.ip_to_long('255.255.255.255'), 4294967295) | |
| 23 self.assertEqual(base_utils.ip_to_long('192.168.0.1'), 3232235521) | |
| 24 self.assertEqual(base_utils.ip_to_long('1.2.4.8'), 16909320) | |
| 25 | |
| 26 | |
| 27 def test_long_to_ip(self): | |
| 28 self.assertEqual(base_utils.long_to_ip(0), '0.0.0.0') | |
| 29 self.assertEqual(base_utils.long_to_ip(4294967295), '255.255.255.255') | |
| 30 self.assertEqual(base_utils.long_to_ip(3232235521), '192.168.0.1') | |
| 31 self.assertEqual(base_utils.long_to_ip(16909320), '1.2.4.8') | |
| 32 | |
| 33 | |
| 34 def test_create_subnet_mask(self): | |
| 35 self.assertEqual(base_utils.create_subnet_mask(0), 0) | |
| 36 self.assertEqual(base_utils.create_subnet_mask(32), 4294967295) | |
| 37 self.assertEqual(base_utils.create_subnet_mask(25), 4294967168) | |
| 38 | |
| 39 | |
| 40 def test_format_ip_with_mask(self): | |
| 41 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 0), | |
| 42 '0.0.0.0/0') | |
| 43 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 32), | |
| 44 '192.168.0.1/32') | |
| 45 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 26), | |
| 46 '192.168.0.0/26') | |
| 47 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.255', 26), | |
| 48 '192.168.0.192/26') | |
| 49 | |
| 50 | |
| 51 def create_test_file(self, contents): | |
| 52 test_file = StringIO.StringIO(contents) | |
| 53 base_utils.open.expect_call("filename", "r").and_return(test_file) | |
| 54 | |
| 55 | |
| 56 def test_reads_one_line_file(self): | |
| 57 self.create_test_file("abc\n") | |
| 58 self.assertEqual("abc", base_utils.read_one_line("filename")) | |
| 59 self.god.check_playback() | |
| 60 | |
| 61 | |
| 62 def test_strips_read_lines(self): | |
| 63 self.create_test_file("abc \n") | |
| 64 self.assertEqual("abc ", base_utils.read_one_line("filename")) | |
| 65 self.god.check_playback() | |
| 66 | |
| 67 | |
| 68 def test_drops_extra_lines(self): | |
| 69 self.create_test_file("line 1\nline 2\nline 3\n") | |
| 70 self.assertEqual("line 1", base_utils.read_one_line("filename")) | |
| 71 self.god.check_playback() | |
| 72 | |
| 73 | |
| 74 def test_works_on_empty_file(self): | |
| 75 self.create_test_file("") | |
| 76 self.assertEqual("", base_utils.read_one_line("filename")) | |
| 77 self.god.check_playback() | |
| 78 | |
| 79 | |
| 80 def test_works_on_file_with_no_newlines(self): | |
| 81 self.create_test_file("line but no newline") | |
| 82 self.assertEqual("line but no newline", | |
| 83 base_utils.read_one_line("filename")) | |
| 84 self.god.check_playback() | |
| 85 | |
| 86 | |
| 87 def test_preserves_leading_whitespace(self): | |
| 88 self.create_test_file(" has leading whitespace") | |
| 89 self.assertEqual(" has leading whitespace", | |
| 90 base_utils.read_one_line("filename")) | |
| 91 | |
| 92 | |
| 93 class test_write_one_line(unittest.TestCase): | |
| 94 def setUp(self): | |
| 95 self.god = mock.mock_god() | |
| 96 self.god.stub_function(base_utils, "open") | |
| 97 | |
| 98 | |
| 99 def tearDown(self): | |
| 100 self.god.unstub_all() | |
| 101 | |
| 102 | |
| 103 def get_write_one_line_output(self, content): | |
| 104 test_file = mock.SaveDataAfterCloseStringIO() | |
| 105 base_utils.open.expect_call("filename", "w").and_return(test_file) | |
| 106 base_utils.write_one_line("filename", content) | |
| 107 self.god.check_playback() | |
| 108 return test_file.final_data | |
| 109 | |
| 110 | |
| 111 def test_writes_one_line_file(self): | |
| 112 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) | |
| 113 | |
| 114 | |
| 115 def test_preserves_existing_newline(self): | |
| 116 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) | |
| 117 | |
| 118 | |
| 119 def test_preserves_leading_whitespace(self): | |
| 120 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) | |
| 121 | |
| 122 | |
| 123 def test_preserves_trailing_whitespace(self): | |
| 124 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) | |
| 125 | |
| 126 | |
| 127 def test_handles_empty_input(self): | |
| 128 self.assertEqual("\n", self.get_write_one_line_output("")) | |
| 129 | |
| 130 | |
| 131 class test_open_write_close(unittest.TestCase): | |
| 132 def setUp(self): | |
| 133 self.god = mock.mock_god() | |
| 134 self.god.stub_function(base_utils, "open") | |
| 135 | |
| 136 | |
| 137 def tearDown(self): | |
| 138 self.god.unstub_all() | |
| 139 | |
| 140 | |
| 141 def test_simple_functionality(self): | |
| 142 data = "\n\nwhee\n" | |
| 143 test_file = mock.SaveDataAfterCloseStringIO() | |
| 144 base_utils.open.expect_call("filename", "w").and_return(test_file) | |
| 145 base_utils.open_write_close("filename", data) | |
| 146 self.god.check_playback() | |
| 147 self.assertEqual(data, test_file.final_data) | |
| 148 | |
| 149 | |
| 150 class test_read_keyval(unittest.TestCase): | |
| 151 def setUp(self): | |
| 152 self.god = mock.mock_god() | |
| 153 self.god.stub_function(base_utils, "open") | |
| 154 self.god.stub_function(os.path, "isdir") | |
| 155 self.god.stub_function(os.path, "exists") | |
| 156 | |
| 157 | |
| 158 def tearDown(self): | |
| 159 self.god.unstub_all() | |
| 160 | |
| 161 | |
| 162 def create_test_file(self, filename, contents): | |
| 163 test_file = StringIO.StringIO(contents) | |
| 164 os.path.exists.expect_call(filename).and_return(True) | |
| 165 base_utils.open.expect_call(filename).and_return(test_file) | |
| 166 | |
| 167 | |
| 168 def read_keyval(self, contents): | |
| 169 os.path.isdir.expect_call("file").and_return(False) | |
| 170 self.create_test_file("file", contents) | |
| 171 keyval = base_utils.read_keyval("file") | |
| 172 self.god.check_playback() | |
| 173 return keyval | |
| 174 | |
| 175 | |
| 176 def test_returns_empty_when_file_doesnt_exist(self): | |
| 177 os.path.isdir.expect_call("file").and_return(False) | |
| 178 os.path.exists.expect_call("file").and_return(False) | |
| 179 self.assertEqual({}, base_utils.read_keyval("file")) | |
| 180 self.god.check_playback() | |
| 181 | |
| 182 | |
| 183 def test_accesses_files_directly(self): | |
| 184 os.path.isdir.expect_call("file").and_return(False) | |
| 185 self.create_test_file("file", "") | |
| 186 base_utils.read_keyval("file") | |
| 187 self.god.check_playback() | |
| 188 | |
| 189 | |
| 190 def test_accesses_directories_through_keyval_file(self): | |
| 191 os.path.isdir.expect_call("dir").and_return(True) | |
| 192 self.create_test_file("dir/keyval", "") | |
| 193 base_utils.read_keyval("dir") | |
| 194 self.god.check_playback() | |
| 195 | |
| 196 | |
| 197 def test_values_are_rstripped(self): | |
| 198 keyval = self.read_keyval("a=b \n") | |
| 199 self.assertEquals(keyval, {"a": "b"}) | |
| 200 | |
| 201 | |
| 202 def test_comments_are_ignored(self): | |
| 203 keyval = self.read_keyval("a=b # a comment\n") | |
| 204 self.assertEquals(keyval, {"a": "b"}) | |
| 205 | |
| 206 | |
| 207 def test_integers_become_ints(self): | |
| 208 keyval = self.read_keyval("a=1\n") | |
| 209 self.assertEquals(keyval, {"a": 1}) | |
| 210 self.assertEquals(int, type(keyval["a"])) | |
| 211 | |
| 212 | |
| 213 def test_float_values_become_floats(self): | |
| 214 keyval = self.read_keyval("a=1.5\n") | |
| 215 self.assertEquals(keyval, {"a": 1.5}) | |
| 216 self.assertEquals(float, type(keyval["a"])) | |
| 217 | |
| 218 | |
| 219 def test_multiple_lines(self): | |
| 220 keyval = self.read_keyval("a=one\nb=two\n") | |
| 221 self.assertEquals(keyval, {"a": "one", "b": "two"}) | |
| 222 | |
| 223 | |
| 224 def test_the_last_duplicate_line_is_used(self): | |
| 225 keyval = self.read_keyval("a=one\nb=two\na=three\n") | |
| 226 self.assertEquals(keyval, {"a": "three", "b": "two"}) | |
| 227 | |
| 228 | |
| 229 def test_extra_equals_are_included_in_values(self): | |
| 230 keyval = self.read_keyval("a=b=c\n") | |
| 231 self.assertEquals(keyval, {"a": "b=c"}) | |
| 232 | |
| 233 | |
| 234 def test_non_alphanumeric_keynames_are_rejected(self): | |
| 235 self.assertRaises(ValueError, self.read_keyval, "a$=one\n") | |
| 236 | |
| 237 | |
| 238 def test_underscores_are_allowed_in_key_names(self): | |
| 239 keyval = self.read_keyval("a_b=value\n") | |
| 240 self.assertEquals(keyval, {"a_b": "value"}) | |
| 241 | |
| 242 | |
| 243 def test_dashes_are_allowed_in_key_names(self): | |
| 244 keyval = self.read_keyval("a-b=value\n") | |
| 245 self.assertEquals(keyval, {"a-b": "value"}) | |
| 246 | |
| 247 | |
| 248 class test_write_keyval(unittest.TestCase): | |
| 249 def setUp(self): | |
| 250 self.god = mock.mock_god() | |
| 251 self.god.stub_function(base_utils, "open") | |
| 252 self.god.stub_function(os.path, "isdir") | |
| 253 | |
| 254 | |
| 255 def tearDown(self): | |
| 256 self.god.unstub_all() | |
| 257 | |
| 258 | |
| 259 def assertHasLines(self, value, lines): | |
| 260 vlines = value.splitlines() | |
| 261 vlines.sort() | |
| 262 self.assertEquals(vlines, sorted(lines)) | |
| 263 | |
| 264 | |
| 265 def write_keyval(self, filename, dictionary, expected_filename=None, | |
| 266 type_tag=None): | |
| 267 if expected_filename is None: | |
| 268 expected_filename = filename | |
| 269 test_file = StringIO.StringIO() | |
| 270 self.god.stub_function(test_file, "close") | |
| 271 base_utils.open.expect_call(expected_filename, | |
| 272 "a").and_return(test_file) | |
| 273 test_file.close.expect_call() | |
| 274 if type_tag is None: | |
| 275 base_utils.write_keyval(filename, dictionary) | |
| 276 else: | |
| 277 base_utils.write_keyval(filename, dictionary, type_tag) | |
| 278 return test_file.getvalue() | |
| 279 | |
| 280 | |
| 281 def write_keyval_file(self, dictionary, type_tag=None): | |
| 282 os.path.isdir.expect_call("file").and_return(False) | |
| 283 return self.write_keyval("file", dictionary, type_tag=type_tag) | |
| 284 | |
| 285 | |
| 286 def test_accesses_files_directly(self): | |
| 287 os.path.isdir.expect_call("file").and_return(False) | |
| 288 result = self.write_keyval("file", {"a": "1"}) | |
| 289 self.assertEquals(result, "a=1\n") | |
| 290 | |
| 291 | |
| 292 def test_accesses_directories_through_keyval_file(self): | |
| 293 os.path.isdir.expect_call("dir").and_return(True) | |
| 294 result = self.write_keyval("dir", {"b": "2"}, "dir/keyval") | |
| 295 self.assertEquals(result, "b=2\n") | |
| 296 | |
| 297 | |
| 298 def test_numbers_are_stringified(self): | |
| 299 result = self.write_keyval_file({"c": 3}) | |
| 300 self.assertEquals(result, "c=3\n") | |
| 301 | |
| 302 | |
| 303 def test_type_tags_are_excluded_by_default(self): | |
| 304 result = self.write_keyval_file({"d": "a string"}) | |
| 305 self.assertEquals(result, "d=a string\n") | |
| 306 self.assertRaises(ValueError, self.write_keyval_file, | |
| 307 {"d{perf}": "a string"}) | |
| 308 | |
| 309 | |
| 310 def test_perf_tags_are_allowed(self): | |
| 311 result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2}, | |
| 312 type_tag="perf") | |
| 313 self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"]) | |
| 314 self.assertRaises(ValueError, self.write_keyval_file, | |
| 315 {"a": 1, "b": 2}, type_tag="perf") | |
| 316 | |
| 317 | |
| 318 def test_non_alphanumeric_keynames_are_rejected(self): | |
| 319 self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0}) | |
| 320 | |
| 321 | |
| 322 def test_underscores_are_allowed_in_key_names(self): | |
| 323 result = self.write_keyval_file({"a_b": "value"}) | |
| 324 self.assertEquals(result, "a_b=value\n") | |
| 325 | |
| 326 | |
| 327 def test_dashes_are_allowed_in_key_names(self): | |
| 328 result = self.write_keyval_file({"a-b": "value"}) | |
| 329 self.assertEquals(result, "a-b=value\n") | |
| 330 | |
| 331 | |
| 332 class test_is_url(unittest.TestCase): | |
| 333 def test_accepts_http(self): | |
| 334 self.assertTrue(base_utils.is_url("http://example.com")) | |
| 335 | |
| 336 | |
| 337 def test_accepts_ftp(self): | |
| 338 self.assertTrue(base_utils.is_url("ftp://ftp.example.com")) | |
| 339 | |
| 340 | |
| 341 def test_rejects_local_path(self): | |
| 342 self.assertFalse(base_utils.is_url("/home/username/file")) | |
| 343 | |
| 344 | |
| 345 def test_rejects_local_filename(self): | |
| 346 self.assertFalse(base_utils.is_url("filename")) | |
| 347 | |
| 348 | |
| 349 def test_rejects_relative_local_path(self): | |
| 350 self.assertFalse(base_utils.is_url("somedir/somesubdir/file")) | |
| 351 | |
| 352 | |
| 353 def test_rejects_local_path_containing_url(self): | |
| 354 self.assertFalse(base_utils.is_url("somedir/http://path/file")) | |
| 355 | |
| 356 | |
| 357 class test_urlopen(unittest.TestCase): | |
| 358 def setUp(self): | |
| 359 self.god = mock.mock_god() | |
| 360 | |
| 361 | |
| 362 def tearDown(self): | |
| 363 self.god.unstub_all() | |
| 364 | |
| 365 | |
| 366 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, | |
| 367 *expected_args): | |
| 368 expected_args += (None,) * (2 - len(expected_args)) | |
| 369 def urlopen(url, data=None): | |
| 370 self.assertEquals(expected_args, (url,data)) | |
| 371 test_func(socket.getdefaulttimeout()) | |
| 372 return expected_return | |
| 373 self.god.stub_with(urllib2, "urlopen", urlopen) | |
| 374 | |
| 375 | |
| 376 def stub_urlopen_with_timeout_check(self, expected_timeout, | |
| 377 expected_return, *expected_args): | |
| 378 def test_func(timeout): | |
| 379 self.assertEquals(timeout, expected_timeout) | |
| 380 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, | |
| 381 *expected_args) | |
| 382 | |
| 383 | |
| 384 def test_timeout_set_during_call(self): | |
| 385 self.stub_urlopen_with_timeout_check(30, "retval", "url") | |
| 386 retval = base_utils.urlopen("url", timeout=30) | |
| 387 self.assertEquals(retval, "retval") | |
| 388 | |
| 389 | |
| 390 def test_timeout_reset_after_call(self): | |
| 391 old_timeout = socket.getdefaulttimeout() | |
| 392 self.stub_urlopen_with_timeout_check(30, None, "url") | |
| 393 try: | |
| 394 socket.setdefaulttimeout(1234) | |
| 395 base_utils.urlopen("url", timeout=30) | |
| 396 self.assertEquals(1234, socket.getdefaulttimeout()) | |
| 397 finally: | |
| 398 socket.setdefaulttimeout(old_timeout) | |
| 399 | |
| 400 | |
| 401 def test_timeout_set_by_default(self): | |
| 402 def test_func(timeout): | |
| 403 self.assertTrue(timeout is not None) | |
| 404 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") | |
| 405 base_utils.urlopen("url") | |
| 406 | |
| 407 | |
| 408 def test_args_are_untouched(self): | |
| 409 self.stub_urlopen_with_timeout_check(30, None, "http://url", | |
| 410 "POST data") | |
| 411 base_utils.urlopen("http://url", timeout=30, data="POST data") | |
| 412 | |
| 413 | |
| 414 class test_urlretrieve(unittest.TestCase): | |
| 415 def setUp(self): | |
| 416 self.god = mock.mock_god() | |
| 417 | |
| 418 | |
| 419 def tearDown(self): | |
| 420 self.god.unstub_all() | |
| 421 | |
| 422 | |
| 423 def test_urlopen_passed_arguments(self): | |
| 424 self.god.stub_function(base_utils, "urlopen") | |
| 425 self.god.stub_function(base_utils.shutil, "copyfileobj") | |
| 426 self.god.stub_function(base_utils, "open") | |
| 427 | |
| 428 url = "url" | |
| 429 dest = "somefile" | |
| 430 data = object() | |
| 431 timeout = 10 | |
| 432 | |
| 433 src_file = self.god.create_mock_class(file, "file") | |
| 434 dest_file = self.god.create_mock_class(file, "file") | |
| 435 | |
| 436 (base_utils.urlopen.expect_call(url, data=data, timeout=timeout) | |
| 437 .and_return(src_file)) | |
| 438 base_utils.open.expect_call(dest, "wb").and_return(dest_file) | |
| 439 base_utils.shutil.copyfileobj.expect_call(src_file, dest_file) | |
| 440 dest_file.close.expect_call() | |
| 441 src_file.close.expect_call() | |
| 442 | |
| 443 base_utils.urlretrieve(url, dest, data=data, timeout=timeout) | |
| 444 self.god.check_playback() | |
| 445 | |
| 446 | |
| 447 class test_merge_trees(unittest.TestCase): | |
| 448 # a some path-handling helper functions | |
| 449 def src(self, *path_segments): | |
| 450 return os.path.join(self.src_tree.name, *path_segments) | |
| 451 | |
| 452 | |
| 453 def dest(self, *path_segments): | |
| 454 return os.path.join(self.dest_tree.name, *path_segments) | |
| 455 | |
| 456 | |
| 457 def paths(self, *path_segments): | |
| 458 return self.src(*path_segments), self.dest(*path_segments) | |
| 459 | |
| 460 | |
| 461 def assertFileEqual(self, *path_segments): | |
| 462 src, dest = self.paths(*path_segments) | |
| 463 self.assertEqual(True, os.path.isfile(src)) | |
| 464 self.assertEqual(True, os.path.isfile(dest)) | |
| 465 self.assertEqual(os.path.getsize(src), os.path.getsize(dest)) | |
| 466 self.assertEqual(open(src).read(), open(dest).read()) | |
| 467 | |
| 468 | |
| 469 def assertFileContents(self, contents, *path_segments): | |
| 470 dest = self.dest(*path_segments) | |
| 471 self.assertEqual(True, os.path.isfile(dest)) | |
| 472 self.assertEqual(os.path.getsize(dest), len(contents)) | |
| 473 self.assertEqual(contents, open(dest).read()) | |
| 474 | |
| 475 | |
| 476 def setUp(self): | |
| 477 self.src_tree = autotemp.tempdir(unique_id='utilsrc') | |
| 478 self.dest_tree = autotemp.tempdir(unique_id='utilsdest') | |
| 479 | |
| 480 # empty subdirs | |
| 481 os.mkdir(self.src("empty")) | |
| 482 os.mkdir(self.dest("empty")) | |
| 483 | |
| 484 | |
| 485 def tearDown(self): | |
| 486 self.src_tree.clean() | |
| 487 self.dest_tree.clean() | |
| 488 | |
| 489 | |
| 490 def test_both_dont_exist(self): | |
| 491 base_utils.merge_trees(*self.paths("empty")) | |
| 492 | |
| 493 | |
| 494 def test_file_only_at_src(self): | |
| 495 print >> open(self.src("src_only"), "w"), "line 1" | |
| 496 base_utils.merge_trees(*self.paths("src_only")) | |
| 497 self.assertFileEqual("src_only") | |
| 498 | |
| 499 | |
| 500 def test_file_only_at_dest(self): | |
| 501 print >> open(self.dest("dest_only"), "w"), "line 1" | |
| 502 base_utils.merge_trees(*self.paths("dest_only")) | |
| 503 self.assertEqual(False, os.path.exists(self.src("dest_only"))) | |
| 504 self.assertFileContents("line 1\n", "dest_only") | |
| 505 | |
| 506 | |
| 507 def test_file_at_both(self): | |
| 508 print >> open(self.dest("in_both"), "w"), "line 1" | |
| 509 print >> open(self.src("in_both"), "w"), "line 2" | |
| 510 base_utils.merge_trees(*self.paths("in_both")) | |
| 511 self.assertFileContents("line 1\nline 2\n", "in_both") | |
| 512 | |
| 513 | |
| 514 def test_directory_with_files_in_both(self): | |
| 515 print >> open(self.dest("in_both"), "w"), "line 1" | |
| 516 print >> open(self.src("in_both"), "w"), "line 3" | |
| 517 base_utils.merge_trees(*self.paths()) | |
| 518 self.assertFileContents("line 1\nline 3\n", "in_both") | |
| 519 | |
| 520 | |
| 521 def test_directory_with_mix_of_files(self): | |
| 522 print >> open(self.dest("in_dest"), "w"), "dest line" | |
| 523 print >> open(self.src("in_src"), "w"), "src line" | |
| 524 base_utils.merge_trees(*self.paths()) | |
| 525 self.assertFileContents("dest line\n", "in_dest") | |
| 526 self.assertFileContents("src line\n", "in_src") | |
| 527 | |
| 528 | |
| 529 def test_directory_with_subdirectories(self): | |
| 530 os.mkdir(self.src("src_subdir")) | |
| 531 print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line" | |
| 532 os.mkdir(self.src("both_subdir")) | |
| 533 os.mkdir(self.dest("both_subdir")) | |
| 534 print >> open(self.src("both_subdir", "subfile"), "w"), "src line" | |
| 535 print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line" | |
| 536 base_utils.merge_trees(*self.paths()) | |
| 537 self.assertFileContents("subdir line\n", "src_subdir", "subfile") | |
| 538 self.assertFileContents("dest line\nsrc line\n", "both_subdir", | |
| 539 "subfile") | |
| 540 | |
| 541 | |
| 542 class test_get_relative_path(unittest.TestCase): | |
| 543 def test_not_absolute(self): | |
| 544 self.assertRaises(AssertionError, | |
| 545 base_utils.get_relative_path, "a", "b") | |
| 546 | |
| 547 def test_same_dir(self): | |
| 548 self.assertEqual(base_utils.get_relative_path("/a/b/c", "/a/b"), "c") | |
| 549 | |
| 550 def test_forward_dir(self): | |
| 551 self.assertEqual(base_utils.get_relative_path("/a/b/c/d", "/a/b"), | |
| 552 "c/d") | |
| 553 | |
| 554 def test_previous_dir(self): | |
| 555 self.assertEqual(base_utils.get_relative_path("/a/b", "/a/b/c/d"), | |
| 556 "../..") | |
| 557 | |
| 558 def test_parallel_dir(self): | |
| 559 self.assertEqual(base_utils.get_relative_path("/a/c/d", "/a/b/c/d"), | |
| 560 "../../../c/d") | |
| 561 | |
| 562 | |
| 563 class test_sh_escape(unittest.TestCase): | |
| 564 def _test_in_shell(self, text): | |
| 565 escaped_text = base_utils.sh_escape(text) | |
| 566 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, | |
| 567 stdin=open(os.devnull, 'r'), | |
| 568 stdout=subprocess.PIPE, | |
| 569 stderr=open(os.devnull, 'w')) | |
| 570 stdout, _ = proc.communicate() | |
| 571 self.assertEqual(proc.returncode, 0) | |
| 572 self.assertEqual(stdout[:-1], text) | |
| 573 | |
| 574 | |
| 575 def test_normal_string(self): | |
| 576 self._test_in_shell('abcd') | |
| 577 | |
| 578 | |
| 579 def test_spaced_string(self): | |
| 580 self._test_in_shell('abcd efgh') | |
| 581 | |
| 582 | |
| 583 def test_dollar(self): | |
| 584 self._test_in_shell('$') | |
| 585 | |
| 586 | |
| 587 def test_single_quote(self): | |
| 588 self._test_in_shell('\'') | |
| 589 | |
| 590 | |
| 591 def test_single_quoted_string(self): | |
| 592 self._test_in_shell('\'efgh\'') | |
| 593 | |
| 594 | |
| 595 def test_double_quote(self): | |
| 596 self._test_in_shell('"') | |
| 597 | |
| 598 | |
| 599 def test_double_quoted_string(self): | |
| 600 self._test_in_shell('"abcd"') | |
| 601 | |
| 602 | |
| 603 def test_backtick(self): | |
| 604 self._test_in_shell('`') | |
| 605 | |
| 606 | |
| 607 def test_backticked_string(self): | |
| 608 self._test_in_shell('`jklm`') | |
| 609 | |
| 610 | |
| 611 def test_backslash(self): | |
| 612 self._test_in_shell('\\') | |
| 613 | |
| 614 | |
| 615 def test_backslashed_special_characters(self): | |
| 616 self._test_in_shell('\\$') | |
| 617 self._test_in_shell('\\"') | |
| 618 self._test_in_shell('\\\'') | |
| 619 self._test_in_shell('\\`') | |
| 620 | |
| 621 | |
| 622 def test_backslash_codes(self): | |
| 623 self._test_in_shell('\\n') | |
| 624 self._test_in_shell('\\r') | |
| 625 self._test_in_shell('\\t') | |
| 626 self._test_in_shell('\\v') | |
| 627 self._test_in_shell('\\b') | |
| 628 self._test_in_shell('\\a') | |
| 629 self._test_in_shell('\\000') | |
| 630 | |
| 631 | |
| 632 class test_run(unittest.TestCase): | |
| 633 """ | |
| 634 Test the base_utils.run() function. | |
| 635 | |
| 636 Note: This test runs simple external commands to test the base_utils.run() | |
| 637 API without assuming implementation details. | |
| 638 """ | |
| 639 def setUp(self): | |
| 640 self.god = mock.mock_god() | |
| 641 self.god.stub_function(base_utils.logging, 'warn') | |
| 642 self.god.stub_function(base_utils.logging, 'debug') | |
| 643 | |
| 644 | |
| 645 def tearDown(self): | |
| 646 self.god.unstub_all() | |
| 647 | |
| 648 | |
| 649 def __check_result(self, result, command, exit_status=0, stdout='', | |
| 650 stderr=''): | |
| 651 self.assertEquals(result.command, command) | |
| 652 self.assertEquals(result.exit_status, exit_status) | |
| 653 self.assertEquals(result.stdout, stdout) | |
| 654 self.assertEquals(result.stderr, stderr) | |
| 655 | |
| 656 | |
| 657 def test_default_simple(self): | |
| 658 cmd = 'echo "hello world"' | |
| 659 # expect some king of logging.debug() call but don't care about args | |
| 660 base_utils.logging.debug.expect_any_call() | |
| 661 self.__check_result(base_utils.run(cmd), cmd, stdout='hello world\n') | |
| 662 | |
| 663 | |
| 664 def test_default_failure(self): | |
| 665 cmd = 'exit 11' | |
| 666 try: | |
| 667 base_utils.run(cmd, verbose=False) | |
| 668 except base_utils.error.CmdError, err: | |
| 669 self.__check_result(err.result_obj, cmd, exit_status=11) | |
| 670 | |
| 671 | |
| 672 def test_ignore_status(self): | |
| 673 cmd = 'echo error >&2 && exit 11' | |
| 674 self.__check_result(base_utils.run(cmd, ignore_status=True, | |
| 675 verbose=False), | |
| 676 cmd, exit_status=11, stderr='error\n') | |
| 677 | |
| 678 | |
| 679 def test_timeout(self): | |
| 680 # we expect a logging.warn() message, don't care about the contents | |
| 681 base_utils.logging.warn.expect_any_call() | |
| 682 try: | |
| 683 base_utils.run('echo -n output && sleep 10', | |
| 684 timeout=1, verbose=False) | |
| 685 except base_utils.error.CmdError, err: | |
| 686 self.assertEquals(err.result_obj.stdout, 'output') | |
| 687 | |
| 688 | |
| 689 def test_stdout_stderr_tee(self): | |
| 690 cmd = 'echo output && echo error >&2' | |
| 691 stdout_tee = StringIO.StringIO() | |
| 692 stderr_tee = StringIO.StringIO() | |
| 693 | |
| 694 self.__check_result(base_utils.run( | |
| 695 cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee, | |
| 696 verbose=False), cmd, stdout='output\n', stderr='error\n') | |
| 697 self.assertEqual(stdout_tee.getvalue(), 'output\n') | |
| 698 self.assertEqual(stderr_tee.getvalue(), 'error\n') | |
| 699 | |
| 700 | |
| 701 def test_stdin_string(self): | |
| 702 cmd = 'cat' | |
| 703 self.__check_result(base_utils.run(cmd, verbose=False, stdin='hi!\n'), | |
| 704 cmd, stdout='hi!\n') | |
| 705 | |
| 706 | |
| 707 def test_safe_args(self): | |
| 708 cmd = 'echo "hello \\"world" "again"' | |
| 709 self.__check_result(base_utils.run( | |
| 710 'echo', verbose=False, args=('hello "world', 'again')), cmd, | |
| 711 stdout='hello "world again\n') | |
| 712 | |
| 713 | |
| 714 def test_safe_args_given_string(self): | |
| 715 cmd = 'echo "hello \\"world" "again"' | |
| 716 self.assertRaises(TypeError, base_utils.run, 'echo', args='hello') | |
| 717 | |
| 718 | |
| 719 class test_compare_versions(unittest.TestCase): | |
| 720 def test_zerofill(self): | |
| 721 self.assertEqual(base_utils.compare_versions('1.7', '1.10'), -1) | |
| 722 self.assertEqual(base_utils.compare_versions('1.222', '1.3'), 1) | |
| 723 self.assertEqual(base_utils.compare_versions('1.03', '1.3'), 0) | |
| 724 | |
| 725 | |
| 726 def test_unequal_len(self): | |
| 727 self.assertEqual(base_utils.compare_versions('1.3', '1.3.4'), -1) | |
| 728 self.assertEqual(base_utils.compare_versions('1.3.1', '1.3'), 1) | |
| 729 | |
| 730 | |
| 731 def test_dash_delimited(self): | |
| 732 self.assertEqual(base_utils.compare_versions('1-2-3', '1-5-1'), -1) | |
| 733 self.assertEqual(base_utils.compare_versions('1-2-1', '1-1-1'), 1) | |
| 734 self.assertEqual(base_utils.compare_versions('1-2-4', '1-2-4'), 0) | |
| 735 | |
| 736 | |
| 737 def test_alphabets(self): | |
| 738 self.assertEqual(base_utils.compare_versions('m.l.b', 'n.b.a'), -1) | |
| 739 self.assertEqual(base_utils.compare_versions('n.b.a', 'm.l.b'), 1) | |
| 740 self.assertEqual(base_utils.compare_versions('abc.e', 'abc.e'), 0) | |
| 741 | |
| 742 | |
| 743 def test_mix_symbols(self): | |
| 744 self.assertEqual(base_utils.compare_versions('k-320.1', 'k-320.3'), -1) | |
| 745 self.assertEqual(base_utils.compare_versions('k-231.5', 'k-231.1'), 1) | |
| 746 self.assertEqual(base_utils.compare_versions('k-231.1', 'k-231.1'), 0) | |
| 747 | |
| 748 self.assertEqual(base_utils.compare_versions('k.320-1', 'k.320-3'), -1) | |
| 749 self.assertEqual(base_utils.compare_versions('k.231-5', 'k.231-1'), 1) | |
| 750 self.assertEqual(base_utils.compare_versions('k.231-1', 'k.231-1'), 0) | |
| 751 | |
| 752 | |
| 753 class test_args_to_dict(unittest.TestCase): | |
| 754 def test_no_args(self): | |
| 755 result = base_utils.args_to_dict([]) | |
| 756 self.assertEqual({}, result) | |
| 757 | |
| 758 | |
| 759 def test_matches(self): | |
| 760 result = base_utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', | |
| 761 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) | |
| 762 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', | |
| 763 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) | |
| 764 | |
| 765 | |
| 766 def test_unmatches(self): | |
| 767 # Temporarily shut warning messages from args_to_dict() when an argument | |
| 768 # doesn't match its pattern. | |
| 769 logger = logging.getLogger() | |
| 770 saved_level = logger.level | |
| 771 logger.setLevel(logging.ERROR) | |
| 772 | |
| 773 try: | |
| 774 result = base_utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', | |
| 775 'a*b', ':VAL', '=VVV', 'WORD']) | |
| 776 self.assertEqual({}, result) | |
| 777 finally: | |
| 778 # Restore level. | |
| 779 logger.setLevel(saved_level) | |
| 780 | |
| 781 | |
| 782 class test_get_random_port(unittest.TestCase): | |
| 783 def do_bind(self, port, socket_type, socket_proto): | |
| 784 s = socket.socket(socket.AF_INET, socket_type, socket_proto) | |
| 785 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| 786 s.bind(('', port)) | |
| 787 return s | |
| 788 | |
| 789 | |
| 790 def test_get_port(self): | |
| 791 for _ in xrange(100): | |
| 792 p = base_utils.get_unused_port() | |
| 793 s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP) | |
| 794 self.assert_(s.getsockname()) | |
| 795 s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
| 796 self.assert_(s.getsockname()) | |
| 797 | |
| 798 if __name__ == "__main__": | |
| 799 unittest.main() | |
| OLD | NEW |