| OLD | NEW |
| 1 #!/usr/bin/python | 1 #!/usr/bin/python |
| 2 | 2 |
| 3 import os, unittest, StringIO, socket, urllib2, shutil, subprocess, logging | 3 import os, unittest, StringIO, socket, urllib2, shutil, subprocess, logging |
| 4 | 4 |
| 5 import common | 5 import common |
| 6 from autotest_lib.client.common_lib import utils, autotemp | 6 from autotest_lib.client.common_lib import base_utils, autotemp |
| 7 from autotest_lib.client.common_lib.test_utils import mock | 7 from autotest_lib.client.common_lib.test_utils import mock |
| 8 | 8 |
| 9 | 9 |
| 10 class test_read_one_line(unittest.TestCase): | 10 class test_read_one_line(unittest.TestCase): |
| 11 def setUp(self): | 11 def setUp(self): |
| 12 self.god = mock.mock_god() | 12 self.god = mock.mock_god(ut=self) |
| 13 self.god.stub_function(utils, "open") | 13 self.god.stub_function(base_utils, "open") |
| 14 | 14 |
| 15 | 15 |
| 16 def tearDown(self): | 16 def tearDown(self): |
| 17 self.god.unstub_all() | 17 self.god.unstub_all() |
| 18 | 18 |
| 19 | 19 |
| 20 def test_ip_to_long(self): | 20 def test_ip_to_long(self): |
| 21 self.assertEqual(utils.ip_to_long('0.0.0.0'), 0) | 21 self.assertEqual(base_utils.ip_to_long('0.0.0.0'), 0) |
| 22 self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295) | 22 self.assertEqual(base_utils.ip_to_long('255.255.255.255'), 4294967295) |
| 23 self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521) | 23 self.assertEqual(base_utils.ip_to_long('192.168.0.1'), 3232235521) |
| 24 self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320) | 24 self.assertEqual(base_utils.ip_to_long('1.2.4.8'), 16909320) |
| 25 | 25 |
| 26 | 26 |
| 27 def test_long_to_ip(self): | 27 def test_long_to_ip(self): |
| 28 self.assertEqual(utils.long_to_ip(0), '0.0.0.0') | 28 self.assertEqual(base_utils.long_to_ip(0), '0.0.0.0') |
| 29 self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255') | 29 self.assertEqual(base_utils.long_to_ip(4294967295), '255.255.255.255') |
| 30 self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1') | 30 self.assertEqual(base_utils.long_to_ip(3232235521), '192.168.0.1') |
| 31 self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8') | 31 self.assertEqual(base_utils.long_to_ip(16909320), '1.2.4.8') |
| 32 | 32 |
| 33 | 33 |
| 34 def test_create_subnet_mask(self): | 34 def test_create_subnet_mask(self): |
| 35 self.assertEqual(utils.create_subnet_mask(0), 0) | 35 self.assertEqual(base_utils.create_subnet_mask(0), 0) |
| 36 self.assertEqual(utils.create_subnet_mask(32), 4294967295) | 36 self.assertEqual(base_utils.create_subnet_mask(32), 4294967295) |
| 37 self.assertEqual(utils.create_subnet_mask(25), 4294967168) | 37 self.assertEqual(base_utils.create_subnet_mask(25), 4294967168) |
| 38 | 38 |
| 39 | 39 |
| 40 def test_format_ip_with_mask(self): | 40 def test_format_ip_with_mask(self): |
| 41 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0), | 41 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 0), |
| 42 '0.0.0.0/0') | 42 '0.0.0.0/0') |
| 43 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32), | 43 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 32), |
| 44 '192.168.0.1/32') | 44 '192.168.0.1/32') |
| 45 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26), | 45 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 26), |
| 46 '192.168.0.0/26') | 46 '192.168.0.0/26') |
| 47 self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26), | 47 self.assertEqual(base_utils.format_ip_with_mask('192.168.0.255', 26), |
| 48 '192.168.0.192/26') | 48 '192.168.0.192/26') |
| 49 | 49 |
| 50 | 50 |
| 51 def create_test_file(self, contents): | 51 def create_test_file(self, contents): |
| 52 test_file = StringIO.StringIO(contents) | 52 test_file = StringIO.StringIO(contents) |
| 53 utils.open.expect_call("filename", "r").and_return(test_file) | 53 base_utils.open.expect_call("filename", "r").and_return(test_file) |
| 54 | 54 |
| 55 | 55 |
| 56 def test_reads_one_line_file(self): | 56 def test_reads_one_line_file(self): |
| 57 self.create_test_file("abc\n") | 57 self.create_test_file("abc\n") |
| 58 self.assertEqual("abc", utils.read_one_line("filename")) | 58 self.assertEqual("abc", base_utils.read_one_line("filename")) |
| 59 self.god.check_playback() | 59 self.god.check_playback() |
| 60 | 60 |
| 61 | 61 |
| 62 def test_strips_read_lines(self): | 62 def test_strips_read_lines(self): |
| 63 self.create_test_file("abc \n") | 63 self.create_test_file("abc \n") |
| 64 self.assertEqual("abc ", utils.read_one_line("filename")) | 64 self.assertEqual("abc ", base_utils.read_one_line("filename")) |
| 65 self.god.check_playback() | 65 self.god.check_playback() |
| 66 | 66 |
| 67 | 67 |
| 68 def test_drops_extra_lines(self): | 68 def test_drops_extra_lines(self): |
| 69 self.create_test_file("line 1\nline 2\nline 3\n") | 69 self.create_test_file("line 1\nline 2\nline 3\n") |
| 70 self.assertEqual("line 1", utils.read_one_line("filename")) | 70 self.assertEqual("line 1", base_utils.read_one_line("filename")) |
| 71 self.god.check_playback() | 71 self.god.check_playback() |
| 72 | 72 |
| 73 | 73 |
| 74 def test_works_on_empty_file(self): | 74 def test_works_on_empty_file(self): |
| 75 self.create_test_file("") | 75 self.create_test_file("") |
| 76 self.assertEqual("", utils.read_one_line("filename")) | 76 self.assertEqual("", base_utils.read_one_line("filename")) |
| 77 self.god.check_playback() | 77 self.god.check_playback() |
| 78 | 78 |
| 79 | 79 |
| 80 def test_works_on_file_with_no_newlines(self): | 80 def test_works_on_file_with_no_newlines(self): |
| 81 self.create_test_file("line but no newline") | 81 self.create_test_file("line but no newline") |
| 82 self.assertEqual("line but no newline", | 82 self.assertEqual("line but no newline", |
| 83 utils.read_one_line("filename")) | 83 base_utils.read_one_line("filename")) |
| 84 self.god.check_playback() | 84 self.god.check_playback() |
| 85 | 85 |
| 86 | 86 |
| 87 def test_preserves_leading_whitespace(self): | 87 def test_preserves_leading_whitespace(self): |
| 88 self.create_test_file(" has leading whitespace") | 88 self.create_test_file(" has leading whitespace") |
| 89 self.assertEqual(" has leading whitespace", | 89 self.assertEqual(" has leading whitespace", |
| 90 utils.read_one_line("filename")) | 90 base_utils.read_one_line("filename")) |
| 91 | 91 |
| 92 | 92 |
| 93 class test_write_one_line(unittest.TestCase): | 93 class test_write_one_line(unittest.TestCase): |
| 94 def setUp(self): | 94 def setUp(self): |
| 95 self.god = mock.mock_god() | 95 self.god = mock.mock_god(ut=self) |
| 96 self.god.stub_function(utils, "open") | 96 self.god.stub_function(base_utils, "open") |
| 97 | 97 |
| 98 | 98 |
| 99 def tearDown(self): | 99 def tearDown(self): |
| 100 self.god.unstub_all() | 100 self.god.unstub_all() |
| 101 | 101 |
| 102 | 102 |
| 103 def get_write_one_line_output(self, content): | 103 def get_write_one_line_output(self, content): |
| 104 test_file = mock.SaveDataAfterCloseStringIO() | 104 test_file = mock.SaveDataAfterCloseStringIO() |
| 105 utils.open.expect_call("filename", "w").and_return(test_file) | 105 base_utils.open.expect_call("filename", "w").and_return(test_file) |
| 106 utils.write_one_line("filename", content) | 106 base_utils.write_one_line("filename", content) |
| 107 self.god.check_playback() | 107 self.god.check_playback() |
| 108 return test_file.final_data | 108 return test_file.final_data |
| 109 | 109 |
| 110 | 110 |
| 111 def test_writes_one_line_file(self): | 111 def test_writes_one_line_file(self): |
| 112 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) | 112 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) |
| 113 | 113 |
| 114 | 114 |
| 115 def test_preserves_existing_newline(self): | 115 def test_preserves_existing_newline(self): |
| 116 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) | 116 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) |
| 117 | 117 |
| 118 | 118 |
| 119 def test_preserves_leading_whitespace(self): | 119 def test_preserves_leading_whitespace(self): |
| 120 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) | 120 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) |
| 121 | 121 |
| 122 | 122 |
| 123 def test_preserves_trailing_whitespace(self): | 123 def test_preserves_trailing_whitespace(self): |
| 124 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) | 124 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) |
| 125 | 125 |
| 126 | 126 |
| 127 def test_handles_empty_input(self): | 127 def test_handles_empty_input(self): |
| 128 self.assertEqual("\n", self.get_write_one_line_output("")) | 128 self.assertEqual("\n", self.get_write_one_line_output("")) |
| 129 | 129 |
| 130 | 130 |
| 131 class test_open_write_close(unittest.TestCase): | 131 class test_open_write_close(unittest.TestCase): |
| 132 def setUp(self): | 132 def setUp(self): |
| 133 self.god = mock.mock_god() | 133 self.god = mock.mock_god(ut=self) |
| 134 self.god.stub_function(utils, "open") | 134 self.god.stub_function(base_utils, "open") |
| 135 | 135 |
| 136 | 136 |
| 137 def tearDown(self): | 137 def tearDown(self): |
| 138 self.god.unstub_all() | 138 self.god.unstub_all() |
| 139 | 139 |
| 140 | 140 |
| 141 def test_simple_functionality(self): | 141 def test_simple_functionality(self): |
| 142 data = "\n\nwhee\n" | 142 data = "\n\nwhee\n" |
| 143 test_file = mock.SaveDataAfterCloseStringIO() | 143 test_file = mock.SaveDataAfterCloseStringIO() |
| 144 utils.open.expect_call("filename", "w").and_return(test_file) | 144 base_utils.open.expect_call("filename", "w").and_return(test_file) |
| 145 utils.open_write_close("filename", data) | 145 base_utils.open_write_close("filename", data) |
| 146 self.god.check_playback() | 146 self.god.check_playback() |
| 147 self.assertEqual(data, test_file.final_data) | 147 self.assertEqual(data, test_file.final_data) |
| 148 | 148 |
| 149 | 149 |
| 150 class test_read_keyval(unittest.TestCase): | 150 class test_read_keyval(unittest.TestCase): |
| 151 def setUp(self): | 151 def setUp(self): |
| 152 self.god = mock.mock_god() | 152 self.god = mock.mock_god(ut=self) |
| 153 self.god.stub_function(utils, "open") | 153 self.god.stub_function(base_utils, "open") |
| 154 self.god.stub_function(os.path, "isdir") | 154 self.god.stub_function(os.path, "isdir") |
| 155 self.god.stub_function(os.path, "exists") | 155 self.god.stub_function(os.path, "exists") |
| 156 | 156 |
| 157 | 157 |
| 158 def tearDown(self): | 158 def tearDown(self): |
| 159 self.god.unstub_all() | 159 self.god.unstub_all() |
| 160 | 160 |
| 161 | 161 |
| 162 def create_test_file(self, filename, contents): | 162 def create_test_file(self, filename, contents): |
| 163 test_file = StringIO.StringIO(contents) | 163 test_file = StringIO.StringIO(contents) |
| 164 os.path.exists.expect_call(filename).and_return(True) | 164 os.path.exists.expect_call(filename).and_return(True) |
| 165 utils.open.expect_call(filename).and_return(test_file) | 165 base_utils.open.expect_call(filename).and_return(test_file) |
| 166 | 166 |
| 167 | 167 |
| 168 def read_keyval(self, contents): | 168 def read_keyval(self, contents): |
| 169 os.path.isdir.expect_call("file").and_return(False) | 169 os.path.isdir.expect_call("file").and_return(False) |
| 170 self.create_test_file("file", contents) | 170 self.create_test_file("file", contents) |
| 171 keyval = utils.read_keyval("file") | 171 keyval = base_utils.read_keyval("file") |
| 172 self.god.check_playback() | 172 self.god.check_playback() |
| 173 return keyval | 173 return keyval |
| 174 | 174 |
| 175 | 175 |
| 176 def test_returns_empty_when_file_doesnt_exist(self): | 176 def test_returns_empty_when_file_doesnt_exist(self): |
| 177 os.path.isdir.expect_call("file").and_return(False) | 177 os.path.isdir.expect_call("file").and_return(False) |
| 178 os.path.exists.expect_call("file").and_return(False) | 178 os.path.exists.expect_call("file").and_return(False) |
| 179 self.assertEqual({}, utils.read_keyval("file")) | 179 self.assertEqual({}, base_utils.read_keyval("file")) |
| 180 self.god.check_playback() | 180 self.god.check_playback() |
| 181 | 181 |
| 182 | 182 |
| 183 def test_accesses_files_directly(self): | 183 def test_accesses_files_directly(self): |
| 184 os.path.isdir.expect_call("file").and_return(False) | 184 os.path.isdir.expect_call("file").and_return(False) |
| 185 self.create_test_file("file", "") | 185 self.create_test_file("file", "") |
| 186 utils.read_keyval("file") | 186 base_utils.read_keyval("file") |
| 187 self.god.check_playback() | 187 self.god.check_playback() |
| 188 | 188 |
| 189 | 189 |
| 190 def test_accesses_directories_through_keyval_file(self): | 190 def test_accesses_directories_through_keyval_file(self): |
| 191 os.path.isdir.expect_call("dir").and_return(True) | 191 os.path.isdir.expect_call("dir").and_return(True) |
| 192 self.create_test_file("dir/keyval", "") | 192 self.create_test_file("dir/keyval", "") |
| 193 utils.read_keyval("dir") | 193 base_utils.read_keyval("dir") |
| 194 self.god.check_playback() | 194 self.god.check_playback() |
| 195 | 195 |
| 196 | 196 |
| 197 def test_values_are_rstripped(self): | 197 def test_values_are_rstripped(self): |
| 198 keyval = self.read_keyval("a=b \n") | 198 keyval = self.read_keyval("a=b \n") |
| 199 self.assertEquals(keyval, {"a": "b"}) | 199 self.assertEquals(keyval, {"a": "b"}) |
| 200 | 200 |
| 201 | 201 |
| 202 def test_comments_are_ignored(self): | 202 def test_comments_are_ignored(self): |
| 203 keyval = self.read_keyval("a=b # a comment\n") | 203 keyval = self.read_keyval("a=b # a comment\n") |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 240 self.assertEquals(keyval, {"a_b": "value"}) | 240 self.assertEquals(keyval, {"a_b": "value"}) |
| 241 | 241 |
| 242 | 242 |
| 243 def test_dashes_are_allowed_in_key_names(self): | 243 def test_dashes_are_allowed_in_key_names(self): |
| 244 keyval = self.read_keyval("a-b=value\n") | 244 keyval = self.read_keyval("a-b=value\n") |
| 245 self.assertEquals(keyval, {"a-b": "value"}) | 245 self.assertEquals(keyval, {"a-b": "value"}) |
| 246 | 246 |
| 247 | 247 |
| 248 class test_write_keyval(unittest.TestCase): | 248 class test_write_keyval(unittest.TestCase): |
| 249 def setUp(self): | 249 def setUp(self): |
| 250 self.god = mock.mock_god() | 250 self.god = mock.mock_god(ut=self) |
| 251 self.god.stub_function(utils, "open") | 251 self.god.stub_function(base_utils, "open") |
| 252 self.god.stub_function(os.path, "isdir") | 252 self.god.stub_function(os.path, "isdir") |
| 253 | 253 |
| 254 | 254 |
| 255 def tearDown(self): | 255 def tearDown(self): |
| 256 self.god.unstub_all() | 256 self.god.unstub_all() |
| 257 | 257 |
| 258 | 258 |
| 259 def assertHasLines(self, value, lines): | 259 def assertHasLines(self, value, lines): |
| 260 vlines = value.splitlines() | 260 vlines = value.splitlines() |
| 261 vlines.sort() | 261 vlines.sort() |
| 262 self.assertEquals(vlines, sorted(lines)) | 262 self.assertEquals(vlines, sorted(lines)) |
| 263 | 263 |
| 264 | 264 |
| 265 def write_keyval(self, filename, dictionary, expected_filename=None, | 265 def write_keyval(self, filename, dictionary, expected_filename=None, |
| 266 type_tag=None): | 266 type_tag=None): |
| 267 if expected_filename is None: | 267 if expected_filename is None: |
| 268 expected_filename = filename | 268 expected_filename = filename |
| 269 test_file = StringIO.StringIO() | 269 test_file = StringIO.StringIO() |
| 270 self.god.stub_function(test_file, "close") | 270 self.god.stub_function(test_file, "close") |
| 271 utils.open.expect_call(expected_filename, "a").and_return(test_file) | 271 base_utils.open.expect_call(expected_filename, "a").and_return(test_file
) |
| 272 test_file.close.expect_call() | 272 test_file.close.expect_call() |
| 273 if type_tag is None: | 273 if type_tag is None: |
| 274 utils.write_keyval(filename, dictionary) | 274 base_utils.write_keyval(filename, dictionary) |
| 275 else: | 275 else: |
| 276 utils.write_keyval(filename, dictionary, type_tag) | 276 base_utils.write_keyval(filename, dictionary, type_tag) |
| 277 return test_file.getvalue() | 277 return test_file.getvalue() |
| 278 | 278 |
| 279 | 279 |
| 280 def write_keyval_file(self, dictionary, type_tag=None): | 280 def write_keyval_file(self, dictionary, type_tag=None): |
| 281 os.path.isdir.expect_call("file").and_return(False) | 281 os.path.isdir.expect_call("file").and_return(False) |
| 282 return self.write_keyval("file", dictionary, type_tag=type_tag) | 282 return self.write_keyval("file", dictionary, type_tag=type_tag) |
| 283 | 283 |
| 284 | 284 |
| 285 def test_accesses_files_directly(self): | 285 def test_accesses_files_directly(self): |
| 286 os.path.isdir.expect_call("file").and_return(False) | 286 os.path.isdir.expect_call("file").and_return(False) |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 323 self.assertEquals(result, "a_b=value\n") | 323 self.assertEquals(result, "a_b=value\n") |
| 324 | 324 |
| 325 | 325 |
| 326 def test_dashes_are_allowed_in_key_names(self): | 326 def test_dashes_are_allowed_in_key_names(self): |
| 327 result = self.write_keyval_file({"a-b": "value"}) | 327 result = self.write_keyval_file({"a-b": "value"}) |
| 328 self.assertEquals(result, "a-b=value\n") | 328 self.assertEquals(result, "a-b=value\n") |
| 329 | 329 |
| 330 | 330 |
| 331 class test_is_url(unittest.TestCase): | 331 class test_is_url(unittest.TestCase): |
| 332 def test_accepts_http(self): | 332 def test_accepts_http(self): |
| 333 self.assertTrue(utils.is_url("http://example.com")) | 333 self.assertTrue(base_utils.is_url("http://example.com")) |
| 334 | 334 |
| 335 | 335 |
| 336 def test_accepts_ftp(self): | 336 def test_accepts_ftp(self): |
| 337 self.assertTrue(utils.is_url("ftp://ftp.example.com")) | 337 self.assertTrue(base_utils.is_url("ftp://ftp.example.com")) |
| 338 | 338 |
| 339 | 339 |
| 340 def test_rejects_local_path(self): | 340 def test_rejects_local_path(self): |
| 341 self.assertFalse(utils.is_url("/home/username/file")) | 341 self.assertFalse(base_utils.is_url("/home/username/file")) |
| 342 | 342 |
| 343 | 343 |
| 344 def test_rejects_local_filename(self): | 344 def test_rejects_local_filename(self): |
| 345 self.assertFalse(utils.is_url("filename")) | 345 self.assertFalse(base_utils.is_url("filename")) |
| 346 | 346 |
| 347 | 347 |
| 348 def test_rejects_relative_local_path(self): | 348 def test_rejects_relative_local_path(self): |
| 349 self.assertFalse(utils.is_url("somedir/somesubdir/file")) | 349 self.assertFalse(base_utils.is_url("somedir/somesubdir/file")) |
| 350 | 350 |
| 351 | 351 |
| 352 def test_rejects_local_path_containing_url(self): | 352 def test_rejects_local_path_containing_url(self): |
| 353 self.assertFalse(utils.is_url("somedir/http://path/file")) | 353 self.assertFalse(base_utils.is_url("somedir/http://path/file")) |
| 354 | 354 |
| 355 | 355 |
| 356 class test_urlopen(unittest.TestCase): | 356 class test_urlopen(unittest.TestCase): |
| 357 def setUp(self): | 357 def setUp(self): |
| 358 self.god = mock.mock_god() | 358 self.god = mock.mock_god(ut=self) |
| 359 | 359 |
| 360 | 360 |
| 361 def tearDown(self): | 361 def tearDown(self): |
| 362 self.god.unstub_all() | 362 self.god.unstub_all() |
| 363 | 363 |
| 364 | 364 |
| 365 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, | 365 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, |
| 366 *expected_args): | 366 *expected_args): |
| 367 expected_args += (None,) * (2 - len(expected_args)) | 367 expected_args += (None,) * (2 - len(expected_args)) |
| 368 def urlopen(url, data=None): | 368 def urlopen(url, data=None): |
| 369 self.assertEquals(expected_args, (url,data)) | 369 self.assertEquals(expected_args, (url,data)) |
| 370 test_func(socket.getdefaulttimeout()) | 370 test_func(socket.getdefaulttimeout()) |
| 371 return expected_return | 371 return expected_return |
| 372 self.god.stub_with(urllib2, "urlopen", urlopen) | 372 self.god.stub_with(urllib2, "urlopen", urlopen) |
| 373 | 373 |
| 374 | 374 |
| 375 def stub_urlopen_with_timeout_check(self, expected_timeout, | 375 def stub_urlopen_with_timeout_check(self, expected_timeout, |
| 376 expected_return, *expected_args): | 376 expected_return, *expected_args): |
| 377 def test_func(timeout): | 377 def test_func(timeout): |
| 378 self.assertEquals(timeout, expected_timeout) | 378 self.assertEquals(timeout, expected_timeout) |
| 379 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, | 379 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, |
| 380 *expected_args) | 380 *expected_args) |
| 381 | 381 |
| 382 | 382 |
| 383 def test_timeout_set_during_call(self): | 383 def test_timeout_set_during_call(self): |
| 384 self.stub_urlopen_with_timeout_check(30, "retval", "url") | 384 self.stub_urlopen_with_timeout_check(30, "retval", "url") |
| 385 retval = utils.urlopen("url", timeout=30) | 385 retval = base_utils.urlopen("url", timeout=30) |
| 386 self.assertEquals(retval, "retval") | 386 self.assertEquals(retval, "retval") |
| 387 | 387 |
| 388 | 388 |
| 389 def test_timeout_reset_after_call(self): | 389 def test_timeout_reset_after_call(self): |
| 390 old_timeout = socket.getdefaulttimeout() | 390 old_timeout = socket.getdefaulttimeout() |
| 391 self.stub_urlopen_with_timeout_check(30, None, "url") | 391 self.stub_urlopen_with_timeout_check(30, None, "url") |
| 392 try: | 392 try: |
| 393 socket.setdefaulttimeout(1234) | 393 socket.setdefaulttimeout(1234) |
| 394 utils.urlopen("url", timeout=30) | 394 base_utils.urlopen("url", timeout=30) |
| 395 self.assertEquals(1234, socket.getdefaulttimeout()) | 395 self.assertEquals(1234, socket.getdefaulttimeout()) |
| 396 finally: | 396 finally: |
| 397 socket.setdefaulttimeout(old_timeout) | 397 socket.setdefaulttimeout(old_timeout) |
| 398 | 398 |
| 399 | 399 |
| 400 def test_timeout_set_by_default(self): | 400 def test_timeout_set_by_default(self): |
| 401 def test_func(timeout): | 401 def test_func(timeout): |
| 402 self.assertTrue(timeout is not None) | 402 self.assertTrue(timeout is not None) |
| 403 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") | 403 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") |
| 404 utils.urlopen("url") | 404 base_utils.urlopen("url") |
| 405 | 405 |
| 406 | 406 |
| 407 def test_args_are_untouched(self): | 407 def test_args_are_untouched(self): |
| 408 self.stub_urlopen_with_timeout_check(30, None, "http://url", | 408 self.stub_urlopen_with_timeout_check(30, None, "http://url", |
| 409 "POST data") | 409 "POST data") |
| 410 utils.urlopen("http://url", timeout=30, data="POST data") | 410 base_utils.urlopen("http://url", timeout=30, data="POST data") |
| 411 | 411 |
| 412 | 412 |
| 413 class test_urlretrieve(unittest.TestCase): | 413 class test_urlretrieve(unittest.TestCase): |
| 414 def setUp(self): | 414 def setUp(self): |
| 415 self.god = mock.mock_god() | 415 self.god = mock.mock_god(ut=self) |
| 416 | 416 |
| 417 | 417 |
| 418 def tearDown(self): | 418 def tearDown(self): |
| 419 self.god.unstub_all() | 419 self.god.unstub_all() |
| 420 | 420 |
| 421 | 421 |
| 422 def test_urlopen_passed_arguments(self): | 422 def test_urlopen_passed_arguments(self): |
| 423 self.god.stub_function(utils, "urlopen") | 423 self.god.stub_function(base_utils, "urlopen") |
| 424 self.god.stub_function(utils.shutil, "copyfileobj") | 424 self.god.stub_function(base_utils.shutil, "copyfileobj") |
| 425 self.god.stub_function(utils, "open") | 425 self.god.stub_function(base_utils, "open") |
| 426 | 426 |
| 427 url = "url" | 427 url = "url" |
| 428 dest = "somefile" | 428 dest = "somefile" |
| 429 data = object() | 429 data = object() |
| 430 timeout = 10 | 430 timeout = 10 |
| 431 | 431 |
| 432 src_file = self.god.create_mock_class(file, "file") | 432 src_file = self.god.create_mock_class(file, "file") |
| 433 dest_file = self.god.create_mock_class(file, "file") | 433 dest_file = self.god.create_mock_class(file, "file") |
| 434 | 434 |
| 435 (utils.urlopen.expect_call(url, data=data, timeout=timeout) | 435 (base_utils.urlopen.expect_call(url, data=data, timeout=timeout) |
| 436 .and_return(src_file)) | 436 .and_return(src_file)) |
| 437 utils.open.expect_call(dest, "wb").and_return(dest_file) | 437 base_utils.open.expect_call(dest, "wb").and_return(dest_file) |
| 438 utils.shutil.copyfileobj.expect_call(src_file, dest_file) | 438 base_utils.shutil.copyfileobj.expect_call(src_file, dest_file) |
| 439 dest_file.close.expect_call() | 439 dest_file.close.expect_call() |
| 440 src_file.close.expect_call() | 440 src_file.close.expect_call() |
| 441 | 441 |
| 442 utils.urlretrieve(url, dest, data=data, timeout=timeout) | 442 base_utils.urlretrieve(url, dest, data=data, timeout=timeout) |
| 443 self.god.check_playback() | 443 self.god.check_playback() |
| 444 | 444 |
| 445 | 445 |
| 446 class test_merge_trees(unittest.TestCase): | 446 class test_merge_trees(unittest.TestCase): |
| 447 # a some path-handling helper functions | 447 # a some path-handling helper functions |
| 448 def src(self, *path_segments): | 448 def src(self, *path_segments): |
| 449 return os.path.join(self.src_tree.name, *path_segments) | 449 return os.path.join(self.src_tree.name, *path_segments) |
| 450 | 450 |
| 451 | 451 |
| 452 def dest(self, *path_segments): | 452 def dest(self, *path_segments): |
| (...skipping 27 matching lines...) Expand all Loading... |
| 480 os.mkdir(self.src("empty")) | 480 os.mkdir(self.src("empty")) |
| 481 os.mkdir(self.dest("empty")) | 481 os.mkdir(self.dest("empty")) |
| 482 | 482 |
| 483 | 483 |
| 484 def tearDown(self): | 484 def tearDown(self): |
| 485 self.src_tree.clean() | 485 self.src_tree.clean() |
| 486 self.dest_tree.clean() | 486 self.dest_tree.clean() |
| 487 | 487 |
| 488 | 488 |
| 489 def test_both_dont_exist(self): | 489 def test_both_dont_exist(self): |
| 490 utils.merge_trees(*self.paths("empty")) | 490 base_utils.merge_trees(*self.paths("empty")) |
| 491 | 491 |
| 492 | 492 |
| 493 def test_file_only_at_src(self): | 493 def test_file_only_at_src(self): |
| 494 print >> open(self.src("src_only"), "w"), "line 1" | 494 print >> open(self.src("src_only"), "w"), "line 1" |
| 495 utils.merge_trees(*self.paths("src_only")) | 495 base_utils.merge_trees(*self.paths("src_only")) |
| 496 self.assertFileEqual("src_only") | 496 self.assertFileEqual("src_only") |
| 497 | 497 |
| 498 | 498 |
| 499 def test_file_only_at_dest(self): | 499 def test_file_only_at_dest(self): |
| 500 print >> open(self.dest("dest_only"), "w"), "line 1" | 500 print >> open(self.dest("dest_only"), "w"), "line 1" |
| 501 utils.merge_trees(*self.paths("dest_only")) | 501 base_utils.merge_trees(*self.paths("dest_only")) |
| 502 self.assertEqual(False, os.path.exists(self.src("dest_only"))) | 502 self.assertEqual(False, os.path.exists(self.src("dest_only"))) |
| 503 self.assertFileContents("line 1\n", "dest_only") | 503 self.assertFileContents("line 1\n", "dest_only") |
| 504 | 504 |
| 505 | 505 |
| 506 def test_file_at_both(self): | 506 def test_file_at_both(self): |
| 507 print >> open(self.dest("in_both"), "w"), "line 1" | 507 print >> open(self.dest("in_both"), "w"), "line 1" |
| 508 print >> open(self.src("in_both"), "w"), "line 2" | 508 print >> open(self.src("in_both"), "w"), "line 2" |
| 509 utils.merge_trees(*self.paths("in_both")) | 509 base_utils.merge_trees(*self.paths("in_both")) |
| 510 self.assertFileContents("line 1\nline 2\n", "in_both") | 510 self.assertFileContents("line 1\nline 2\n", "in_both") |
| 511 | 511 |
| 512 | 512 |
| 513 def test_directory_with_files_in_both(self): | 513 def test_directory_with_files_in_both(self): |
| 514 print >> open(self.dest("in_both"), "w"), "line 1" | 514 print >> open(self.dest("in_both"), "w"), "line 1" |
| 515 print >> open(self.src("in_both"), "w"), "line 3" | 515 print >> open(self.src("in_both"), "w"), "line 3" |
| 516 utils.merge_trees(*self.paths()) | 516 base_utils.merge_trees(*self.paths()) |
| 517 self.assertFileContents("line 1\nline 3\n", "in_both") | 517 self.assertFileContents("line 1\nline 3\n", "in_both") |
| 518 | 518 |
| 519 | 519 |
| 520 def test_directory_with_mix_of_files(self): | 520 def test_directory_with_mix_of_files(self): |
| 521 print >> open(self.dest("in_dest"), "w"), "dest line" | 521 print >> open(self.dest("in_dest"), "w"), "dest line" |
| 522 print >> open(self.src("in_src"), "w"), "src line" | 522 print >> open(self.src("in_src"), "w"), "src line" |
| 523 utils.merge_trees(*self.paths()) | 523 base_utils.merge_trees(*self.paths()) |
| 524 self.assertFileContents("dest line\n", "in_dest") | 524 self.assertFileContents("dest line\n", "in_dest") |
| 525 self.assertFileContents("src line\n", "in_src") | 525 self.assertFileContents("src line\n", "in_src") |
| 526 | 526 |
| 527 | 527 |
| 528 def test_directory_with_subdirectories(self): | 528 def test_directory_with_subdirectories(self): |
| 529 os.mkdir(self.src("src_subdir")) | 529 os.mkdir(self.src("src_subdir")) |
| 530 print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line" | 530 print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line" |
| 531 os.mkdir(self.src("both_subdir")) | 531 os.mkdir(self.src("both_subdir")) |
| 532 os.mkdir(self.dest("both_subdir")) | 532 os.mkdir(self.dest("both_subdir")) |
| 533 print >> open(self.src("both_subdir", "subfile"), "w"), "src line" | 533 print >> open(self.src("both_subdir", "subfile"), "w"), "src line" |
| 534 print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line" | 534 print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line" |
| 535 utils.merge_trees(*self.paths()) | 535 base_utils.merge_trees(*self.paths()) |
| 536 self.assertFileContents("subdir line\n", "src_subdir", "subfile") | 536 self.assertFileContents("subdir line\n", "src_subdir", "subfile") |
| 537 self.assertFileContents("dest line\nsrc line\n", "both_subdir", | 537 self.assertFileContents("dest line\nsrc line\n", "both_subdir", |
| 538 "subfile") | 538 "subfile") |
| 539 | 539 |
| 540 | 540 |
| 541 class test_get_relative_path(unittest.TestCase): | 541 class test_get_relative_path(unittest.TestCase): |
| 542 def test_not_absolute(self): | 542 def test_not_absolute(self): |
| 543 self.assertRaises(AssertionError, utils.get_relative_path, "a", "b") | 543 self.assertRaises(AssertionError, base_utils.get_relative_path, "a", "b"
) |
| 544 | 544 |
| 545 def test_same_dir(self): | 545 def test_same_dir(self): |
| 546 self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c") | 546 self.assertEqual(base_utils.get_relative_path("/a/b/c", "/a/b"), "c") |
| 547 | 547 |
| 548 def test_forward_dir(self): | 548 def test_forward_dir(self): |
| 549 self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d") | 549 self.assertEqual(base_utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d"
) |
| 550 | 550 |
| 551 def test_previous_dir(self): | 551 def test_previous_dir(self): |
| 552 self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..") | 552 self.assertEqual(base_utils.get_relative_path("/a/b", "/a/b/c/d"), "../.
.") |
| 553 | 553 |
| 554 def test_parallel_dir(self): | 554 def test_parallel_dir(self): |
| 555 self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"), | 555 self.assertEqual(base_utils.get_relative_path("/a/c/d", "/a/b/c/d"), |
| 556 "../../../c/d") | 556 "../../../c/d") |
| 557 | 557 |
| 558 | 558 |
| 559 class test_sh_escape(unittest.TestCase): | 559 class test_sh_escape(unittest.TestCase): |
| 560 def _test_in_shell(self, text): | 560 def _test_in_shell(self, text): |
| 561 escaped_text = utils.sh_escape(text) | 561 escaped_text = base_utils.sh_escape(text) |
| 562 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, | 562 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, |
| 563 stdin=open(os.devnull, 'r'), | 563 stdin=open(os.devnull, 'r'), |
| 564 stdout=subprocess.PIPE, | 564 stdout=subprocess.PIPE, |
| 565 stderr=open(os.devnull, 'w')) | 565 stderr=open(os.devnull, 'w')) |
| 566 stdout, _ = proc.communicate() | 566 stdout, _ = proc.communicate() |
| 567 self.assertEqual(proc.returncode, 0) | 567 self.assertEqual(proc.returncode, 0) |
| 568 self.assertEqual(stdout[:-1], text) | 568 self.assertEqual(stdout[:-1], text) |
| 569 | 569 |
| 570 | 570 |
| 571 def test_normal_string(self): | 571 def test_normal_string(self): |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 620 self._test_in_shell('\\r') | 620 self._test_in_shell('\\r') |
| 621 self._test_in_shell('\\t') | 621 self._test_in_shell('\\t') |
| 622 self._test_in_shell('\\v') | 622 self._test_in_shell('\\v') |
| 623 self._test_in_shell('\\b') | 623 self._test_in_shell('\\b') |
| 624 self._test_in_shell('\\a') | 624 self._test_in_shell('\\a') |
| 625 self._test_in_shell('\\000') | 625 self._test_in_shell('\\000') |
| 626 | 626 |
| 627 | 627 |
| 628 class test_run(unittest.TestCase): | 628 class test_run(unittest.TestCase): |
| 629 """ | 629 """ |
| 630 Test the utils.run() function. | 630 Test the base_utils.run() function. |
| 631 | 631 |
| 632 Note: This test runs simple external commands to test the utils.run() | 632 Note: This test runs simple external commands to test the base_utils.run() |
| 633 API without assuming implementation details. | 633 API without assuming implementation details. |
| 634 """ | 634 """ |
| 635 def setUp(self): | 635 def setUp(self): |
| 636 self.god = mock.mock_god() | 636 self.god = mock.mock_god(ut=self) |
| 637 self.god.stub_function(utils.logging, 'warn') | 637 self.god.stub_function(base_utils.logging, 'warn') |
| 638 self.god.stub_function(utils.logging, 'debug') | 638 self.god.stub_function(base_utils.logging, 'debug') |
| 639 | 639 |
| 640 | 640 |
| 641 def tearDown(self): | 641 def tearDown(self): |
| 642 self.god.unstub_all() | 642 self.god.unstub_all() |
| 643 | 643 |
| 644 | 644 |
| 645 def __check_result(self, result, command, exit_status=0, stdout='', | 645 def __check_result(self, result, command, exit_status=0, stdout='', |
| 646 stderr=''): | 646 stderr=''): |
| 647 self.assertEquals(result.command, command) | 647 self.assertEquals(result.command, command) |
| 648 self.assertEquals(result.exit_status, exit_status) | 648 self.assertEquals(result.exit_status, exit_status) |
| 649 self.assertEquals(result.stdout, stdout) | 649 self.assertEquals(result.stdout, stdout) |
| 650 self.assertEquals(result.stderr, stderr) | 650 self.assertEquals(result.stderr, stderr) |
| 651 | 651 |
| 652 | 652 |
| 653 def test_default_simple(self): | 653 def test_default_simple(self): |
| 654 cmd = 'echo "hello world"' | 654 cmd = 'echo "hello world"' |
| 655 # expect some king of logging.debug() call but don't care about args | 655 # expect some king of logging.debug() call but don't care about args |
| 656 utils.logging.debug.expect_any_call() | 656 base_utils.logging.debug.expect_any_call() |
| 657 self.__check_result(utils.run(cmd), cmd, stdout='hello world\n') | 657 self.__check_result(base_utils.run(cmd), cmd, stdout='hello world\n') |
| 658 | 658 |
| 659 | 659 |
| 660 def test_default_failure(self): | 660 def test_default_failure(self): |
| 661 cmd = 'exit 11' | 661 cmd = 'exit 11' |
| 662 try: | 662 try: |
| 663 utils.run(cmd, verbose=False) | 663 base_utils.run(cmd, verbose=False) |
| 664 except utils.error.CmdError, err: | 664 except base_utils.error.CmdError, err: |
| 665 self.__check_result(err.result_obj, cmd, exit_status=11) | 665 self.__check_result(err.result_obj, cmd, exit_status=11) |
| 666 | 666 |
| 667 | 667 |
| 668 def test_ignore_status(self): | 668 def test_ignore_status(self): |
| 669 cmd = 'echo error >&2 && exit 11' | 669 cmd = 'echo error >&2 && exit 11' |
| 670 self.__check_result(utils.run(cmd, ignore_status=True, verbose=False), | 670 self.__check_result(base_utils.run(cmd, ignore_status=True, verbose=Fals
e), |
| 671 cmd, exit_status=11, stderr='error\n') | 671 cmd, exit_status=11, stderr='error\n') |
| 672 | 672 |
| 673 | 673 |
| 674 def test_timeout(self): | 674 def test_timeout(self): |
| 675 # we expect a logging.warn() message, don't care about the contents | 675 # we expect a logging.warn() message, don't care about the contents |
| 676 utils.logging.warn.expect_any_call() | 676 base_utils.logging.warn.expect_any_call() |
| 677 try: | 677 try: |
| 678 utils.run('echo -n output && sleep 10', timeout=1, verbose=False) | 678 base_utils.run('echo -n output && sleep 10', timeout=1, verbose=Fals
e) |
| 679 except utils.error.CmdError, err: | 679 except base_utils.error.CmdError, err: |
| 680 self.assertEquals(err.result_obj.stdout, 'output') | 680 self.assertEquals(err.result_obj.stdout, 'output') |
| 681 | 681 |
| 682 | 682 |
| 683 def test_stdout_stderr_tee(self): | 683 def test_stdout_stderr_tee(self): |
| 684 cmd = 'echo output && echo error >&2' | 684 cmd = 'echo output && echo error >&2' |
| 685 stdout_tee = StringIO.StringIO() | 685 stdout_tee = StringIO.StringIO() |
| 686 stderr_tee = StringIO.StringIO() | 686 stderr_tee = StringIO.StringIO() |
| 687 | 687 |
| 688 self.__check_result(utils.run( | 688 self.__check_result(base_utils.run( |
| 689 cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee, | 689 cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee, |
| 690 verbose=False), cmd, stdout='output\n', stderr='error\n') | 690 verbose=False), cmd, stdout='output\n', stderr='error\n') |
| 691 self.assertEqual(stdout_tee.getvalue(), 'output\n') | 691 self.assertEqual(stdout_tee.getvalue(), 'output\n') |
| 692 self.assertEqual(stderr_tee.getvalue(), 'error\n') | 692 self.assertEqual(stderr_tee.getvalue(), 'error\n') |
| 693 | 693 |
| 694 | 694 |
| 695 def test_stdin_string(self): | 695 def test_stdin_string(self): |
| 696 cmd = 'cat' | 696 cmd = 'cat' |
| 697 self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'), | 697 self.__check_result(base_utils.run(cmd, verbose=False, stdin='hi!\n'), |
| 698 cmd, stdout='hi!\n') | 698 cmd, stdout='hi!\n') |
| 699 | 699 |
| 700 | 700 |
| 701 def test_safe_args(self): | 701 def test_safe_args(self): |
| 702 cmd = 'echo "hello \\"world" "again"' | 702 cmd = 'echo "hello \\"world" "again"' |
| 703 self.__check_result(utils.run( | 703 self.__check_result(base_utils.run( |
| 704 'echo', verbose=False, args=('hello "world', 'again')), cmd, | 704 'echo', verbose=False, args=('hello "world', 'again')), cmd, |
| 705 stdout='hello "world again\n') | 705 stdout='hello "world again\n') |
| 706 | 706 |
| 707 | 707 |
| 708 def test_safe_args_given_string(self): | 708 def test_safe_args_given_string(self): |
| 709 cmd = 'echo "hello \\"world" "again"' | 709 cmd = 'echo "hello \\"world" "again"' |
| 710 self.assertRaises(TypeError, utils.run, 'echo', args='hello') | 710 self.assertRaises(TypeError, base_utils.run, 'echo', args='hello') |
| 711 | 711 |
| 712 | 712 |
| 713 class test_compare_versions(unittest.TestCase): | 713 class test_compare_versions(unittest.TestCase): |
| 714 def test_zerofill(self): | 714 def test_zerofill(self): |
| 715 self.assertEqual(utils.compare_versions('1.7', '1.10'), -1) | 715 self.assertEqual(base_utils.compare_versions('1.7', '1.10'), -1) |
| 716 self.assertEqual(utils.compare_versions('1.222', '1.3'), 1) | 716 self.assertEqual(base_utils.compare_versions('1.222', '1.3'), 1) |
| 717 self.assertEqual(utils.compare_versions('1.03', '1.3'), 0) | 717 self.assertEqual(base_utils.compare_versions('1.03', '1.3'), 0) |
| 718 | 718 |
| 719 | 719 |
| 720 def test_unequal_len(self): | 720 def test_unequal_len(self): |
| 721 self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1) | 721 self.assertEqual(base_utils.compare_versions('1.3', '1.3.4'), -1) |
| 722 self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1) | 722 self.assertEqual(base_utils.compare_versions('1.3.1', '1.3'), 1) |
| 723 | 723 |
| 724 | 724 |
| 725 def test_dash_delimited(self): | 725 def test_dash_delimited(self): |
| 726 self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1) | 726 self.assertEqual(base_utils.compare_versions('1-2-3', '1-5-1'), -1) |
| 727 self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1) | 727 self.assertEqual(base_utils.compare_versions('1-2-1', '1-1-1'), 1) |
| 728 self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0) | 728 self.assertEqual(base_utils.compare_versions('1-2-4', '1-2-4'), 0) |
| 729 | 729 |
| 730 | 730 |
| 731 def test_alphabets(self): | 731 def test_alphabets(self): |
| 732 self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1) | 732 self.assertEqual(base_utils.compare_versions('m.l.b', 'n.b.a'), -1) |
| 733 self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1) | 733 self.assertEqual(base_utils.compare_versions('n.b.a', 'm.l.b'), 1) |
| 734 self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0) | 734 self.assertEqual(base_utils.compare_versions('abc.e', 'abc.e'), 0) |
| 735 | 735 |
| 736 | 736 |
| 737 def test_mix_symbols(self): | 737 def test_mix_symbols(self): |
| 738 self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1) | 738 self.assertEqual(base_utils.compare_versions('k-320.1', 'k-320.3'), -1) |
| 739 self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1) | 739 self.assertEqual(base_utils.compare_versions('k-231.5', 'k-231.1'), 1) |
| 740 self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0) | 740 self.assertEqual(base_utils.compare_versions('k-231.1', 'k-231.1'), 0) |
| 741 | 741 |
| 742 self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1) | 742 self.assertEqual(base_utils.compare_versions('k.320-1', 'k.320-3'), -1) |
| 743 self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1) | 743 self.assertEqual(base_utils.compare_versions('k.231-5', 'k.231-1'), 1) |
| 744 self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0) | 744 self.assertEqual(base_utils.compare_versions('k.231-1', 'k.231-1'), 0) |
| 745 | 745 |
| 746 | 746 |
| 747 class test_args_to_dict(unittest.TestCase): | 747 class test_args_to_dict(unittest.TestCase): |
| 748 def test_no_args(self): | 748 def test_no_args(self): |
| 749 result = utils.args_to_dict([]) | 749 result = base_utils.args_to_dict([]) |
| 750 self.assertEqual({}, result) | 750 self.assertEqual({}, result) |
| 751 | 751 |
| 752 | 752 |
| 753 def test_matches(self): | 753 def test_matches(self): |
| 754 result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', | 754 result = base_utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', |
| 755 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) | 755 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) |
| 756 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', | 756 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', |
| 757 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) | 757 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) |
| 758 | 758 |
| 759 | 759 |
| 760 def test_unmatches(self): | 760 def test_unmatches(self): |
| 761 # Temporarily shut warning messages from args_to_dict() when an argument | 761 # Temporarily shut warning messages from args_to_dict() when an argument |
| 762 # doesn't match its pattern. | 762 # doesn't match its pattern. |
| 763 logger = logging.getLogger() | 763 logger = logging.getLogger() |
| 764 saved_level = logger.level | 764 saved_level = logger.level |
| 765 logger.setLevel(logging.ERROR) | 765 logger.setLevel(logging.ERROR) |
| 766 | 766 |
| 767 try: | 767 try: |
| 768 result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b', | 768 result = base_utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', '
a*b', |
| 769 ':VAL', '=VVV', 'WORD']) | 769 ':VAL', '=VVV', 'WORD']) |
| 770 self.assertEqual({}, result) | 770 self.assertEqual({}, result) |
| 771 finally: | 771 finally: |
| 772 # Restore level. | 772 # Restore level. |
| 773 logger.setLevel(saved_level) | 773 logger.setLevel(saved_level) |
| 774 | 774 |
| 775 | 775 |
| 776 class test_get_random_port(unittest.TestCase): | 776 class test_get_random_port(unittest.TestCase): |
| 777 def do_bind(self, port, socket_type, socket_proto): | 777 def do_bind(self, port, socket_type, socket_proto): |
| 778 s = socket.socket(socket.AF_INET, socket_type, socket_proto) | 778 s = socket.socket(socket.AF_INET, socket_type, socket_proto) |
| 779 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 779 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 780 s.bind(('', port)) | 780 s.bind(('', port)) |
| 781 return s | 781 return s |
| 782 | 782 |
| 783 | 783 |
| 784 def test_get_port(self): | 784 def test_get_port(self): |
| 785 for _ in xrange(100): | 785 for _ in xrange(100): |
| 786 p = utils.get_unused_port() | 786 p = base_utils.get_unused_port() |
| 787 s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP) | 787 s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP) |
| 788 self.assert_(s.getsockname()) | 788 self.assert_(s.getsockname()) |
| 789 s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | 789 s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP) |
| 790 self.assert_(s.getsockname()) | 790 self.assert_(s.getsockname()) |
| 791 | 791 |
| 792 | 792 |
| 793 if __name__ == "__main__": | 793 if __name__ == "__main__": |
| 794 unittest.main() | 794 unittest.main() |
| OLD | NEW |