Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 import logging | 6 import logging |
| 7 import os | 7 import os |
| 8 import sys | 8 import sys |
| 9 import tempfile | 9 import tempfile |
| 10 import unittest | 10 import unittest |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 24 def setUp(self): | 24 def setUp(self): |
| 25 self.tempdir = tempfile.mkdtemp(prefix=u'named_cache_test') | 25 self.tempdir = tempfile.mkdtemp(prefix=u'named_cache_test') |
| 26 self.manager = named_cache.CacheManager(self.tempdir) | 26 self.manager = named_cache.CacheManager(self.tempdir) |
| 27 | 27 |
| 28 def tearDown(self): | 28 def tearDown(self): |
| 29 try: | 29 try: |
| 30 file_path.rmtree(self.tempdir) | 30 file_path.rmtree(self.tempdir) |
| 31 finally: | 31 finally: |
| 32 super(CacheManagerTest, self).tearDown() | 32 super(CacheManagerTest, self).tearDown() |
| 33 | 33 |
| 34 def test_request(self): | 34 def make_caches(self, names): |
| 35 with self.assertRaises(AssertionError): | 35 dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test') |
| 36 self.manager.request('foo') # manager is not open | 36 try: |
| 37 with self.manager.open(): | 37 names = map(str, names) |
| 38 foo_path = self.manager.request('foo') | 38 caches = [(n, n) for n in names] |
| 39 self.assertEqual(foo_path, self.manager.request('foo')) | 39 self.manager.install(dest_dir, caches) |
| 40 bar_path = self.manager.request('bar') | 40 self.assertEqual(set(names), set(os.listdir(dest_dir))) |
| 41 self.assertEqual( | 41 self.manager.uninstall(dest_dir, caches) |
| 42 foo_path, | 42 self.assertEqual([], os.listdir(dest_dir)) |
| 43 os.path.abspath(os.readlink( | 43 self.assertTrue(self.manager.available.issuperset(names)) |
| 44 os.path.join(self.tempdir, 'named', 'foo')))) | 44 finally: |
| 45 self.assertEqual( | 45 file_path.rmtree(dest_dir) |
| 46 bar_path, | |
| 47 os.path.abspath(os.readlink( | |
| 48 os.path.join(self.tempdir, 'named', 'bar')))) | |
| 49 self.assertEqual(os.path.dirname(bar_path), self.tempdir) | |
| 50 self.assertEqual([], os.listdir(foo_path)) | |
| 51 self.assertEqual([], os.listdir(bar_path)) | |
| 52 | 46 |
| 53 def test_get_oldest(self): | 47 def test_get_oldest(self): |
| 54 with self.manager.open(): | 48 with self.manager.open(): |
| 55 self.assertIsNone(self.manager.get_oldest()) | 49 self.assertIsNone(self.manager.get_oldest()) |
| 56 for i in xrange(10): | 50 self.make_caches(range(10)) |
| 57 self.manager.request(str(i)) | |
| 58 self.assertEqual(self.manager.get_oldest(), '0') | 51 self.assertEqual(self.manager.get_oldest(), '0') |
| 59 | 52 |
| 60 def test_get_timestamp(self): | 53 def test_get_timestamp(self): |
| 61 now = 0 | 54 now = 0 |
| 62 time_fn = lambda: now | 55 time_fn = lambda: now |
| 63 with self.manager.open(time_fn=time_fn): | 56 with self.manager.open(time_fn=time_fn): |
| 64 for i in xrange(10): | 57 for i in xrange(10): |
| 65 self.manager.request(str(i)) | 58 self.make_caches([i]) |
| 66 now += 1 | 59 now += 1 |
| 67 for i in xrange(10): | 60 for i in xrange(10): |
| 68 self.assertEqual(i, self.manager.get_timestamp(str(i))) | 61 self.assertEqual(i, self.manager.get_timestamp(str(i))) |
| 69 | 62 |
| 70 def test_create_symlinks(self): | 63 def test_clean_cache(self): |
| 71 dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test') | 64 dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test') |
| 72 with self.manager.open(): | 65 with self.manager.open(): |
| 73 for i in xrange(10): | 66 caches = [('1', 'a'), ('2', 'b')] |
| 74 self.manager.request(str(i)) | 67 self.assertEqual([], os.listdir(self.manager.root_dir)) |
| 75 self.manager.create_symlinks(dest_dir, [('1', 'a'), ('3', 'c')]) | 68 |
| 76 self.assertEqual({'a', 'c'}, set(os.listdir(dest_dir))) | 69 self.manager.install(dest_dir, caches) |
| 70 self.assertEqual({'a', 'b'}, set(os.listdir(dest_dir))) | |
| 71 self.assertFalse(self.manager.available) | |
| 72 self.assertEqual([], os.listdir(self.manager.root_dir)) | |
| 73 | |
| 74 write_file(os.path.join(dest_dir, 'a', 'x'), 'x') | |
| 75 write_file(os.path.join(dest_dir, 'b', 'y'), 'y') | |
| 76 | |
| 77 self.manager.uninstall(dest_dir, caches) | |
| 78 | |
| 79 self.assertEqual(3, len(os.listdir(self.manager.root_dir))) | |
| 80 path1 = os.path.join(self.manager.root_dir, self.manager._lru['1']) | |
| 81 path2 = os.path.join(self.manager.root_dir, self.manager._lru['2']) | |
| 82 | |
| 83 self.assertEqual('x', read_file(os.path.join(path1, 'x'))) | |
| 84 self.assertEqual('y', read_file(os.path.join(path2, 'y'))) | |
| 85 self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1) | |
| 86 self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2) | |
| 87 | |
| 88 def test_existing_cache(self): | |
| 89 dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test') | |
| 90 with self.manager.open(): | |
| 91 # Assume test_clean passes. | |
| 92 self.manager.install(dest_dir, [('1', 'a')]) | |
| 93 write_file(os.path.join(dest_dir, 'a', 'x'), 'x') | |
| 94 self.manager.uninstall(dest_dir, [('1', 'a')]) | |
| 95 | |
| 96 # Test starts here. | |
| 97 caches = [('1', 'a'), ('2', 'b')] | |
| 98 self.manager.install(dest_dir, caches) | |
| 99 self.assertEqual({'a', 'b'}, set(os.listdir(dest_dir))) | |
| 100 self.assertFalse(self.manager.available) | |
| 101 self.assertEqual(['named'], os.listdir(self.manager.root_dir)) | |
| 102 | |
| 77 self.assertEqual( | 103 self.assertEqual( |
| 78 os.readlink(os.path.join(dest_dir, 'a')), self.manager.request('1')) | 104 'x', read_file(os.path.join(os.path.join(dest_dir, 'a', 'x')))) |
| 79 self.assertEqual( | 105 write_file(os.path.join(dest_dir, 'a', 'x'), 'x2') |
| 80 os.readlink(os.path.join(dest_dir, 'c')), self.manager.request('3')) | 106 write_file(os.path.join(dest_dir, 'b', 'y'), 'y') |
| 81 self.assertEqual([], os.listdir(os.path.join(dest_dir, 'c'))) | 107 |
| 108 self.manager.uninstall(dest_dir, caches) | |
| 109 | |
| 110 self.assertEqual(3, len(os.listdir(self.manager.root_dir))) | |
| 111 path1 = os.path.join(self.manager.root_dir, self.manager._lru['1']) | |
| 112 path2 = os.path.join(self.manager.root_dir, self.manager._lru['2']) | |
| 113 | |
| 114 self.assertEqual('x2', read_file(os.path.join(path1, 'x'))) | |
| 115 self.assertEqual('y', read_file(os.path.join(path2, 'y'))) | |
| 116 self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1) | |
| 117 self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2) | |
| 82 | 118 |
| 83 def test_trim(self): | 119 def test_trim(self): |
| 84 with self.manager.open(): | 120 with self.manager.open(): |
| 85 item_count = named_cache.MAX_CACHE_SIZE + 10 | 121 item_count = named_cache.MAX_CACHE_SIZE + 10 |
| 86 for i in xrange(item_count): | 122 self.make_caches(range(item_count)) |
| 87 self.manager.request(str(i)) | |
| 88 self.assertEqual(len(self.manager), item_count) | 123 self.assertEqual(len(self.manager), item_count) |
| 89 self.manager.trim(None) | 124 self.manager.trim(None) |
| 90 self.assertEqual(len(self.manager), named_cache.MAX_CACHE_SIZE) | 125 self.assertEqual(len(self.manager), named_cache.MAX_CACHE_SIZE) |
| 91 self.assertEqual( | 126 self.assertEqual( |
| 92 set(map(str, xrange(10, 10 + named_cache.MAX_CACHE_SIZE))), | 127 set(map(str, xrange(10, 10 + named_cache.MAX_CACHE_SIZE))), |
| 93 set(os.listdir(os.path.join(self.tempdir, 'named'))), | 128 set(os.listdir(os.path.join(self.tempdir, 'named')))) |
| 94 ) | |
| 95 | 129 |
| 96 def test_corrupted(self): | 130 def test_corrupted(self): |
| 97 with open(os.path.join(self.tempdir, u'state.json'), 'w') as f: | 131 with open(os.path.join(self.tempdir, u'state.json'), 'w') as f: |
| 98 f.write('}}}}') | 132 f.write('}}}}') |
| 99 fs.makedirs(os.path.join(self.tempdir, 'a'), 0777) | 133 fs.makedirs(os.path.join(self.tempdir, 'a'), 0777) |
| 100 with self.manager.open(): | 134 with self.manager.open(): |
| 101 self.assertFalse(os.path.isdir(self.tempdir)) | 135 self.assertFalse(os.path.isdir(self.tempdir)) |
| 102 self.manager.request('a') | 136 self.make_caches(['a']) |
| 103 self.assertTrue(fs.islink(os.path.join(self.tempdir, 'named', 'a'))) | 137 self.assertTrue(fs.islink(os.path.join(self.tempdir, 'named', 'a'))) |
| 104 | 138 |
| 105 | 139 |
| 140 def write_file(path, text): | |
|
M-A Ruel
2017/05/09 18:35:14
keep utility functions at the top.
| |
| 141 with open(path, 'w') as f: | |
|
M-A Ruel
2017/05/09 18:35:14
'wb' and 'rb' ?
nodir
2017/05/09 23:20:09
dunno, I intentionally called the parameter text.
| |
| 142 f.write(text) | |
| 143 | |
| 144 | |
| 145 def read_file(path): | |
| 146 with open(path, 'r') as f: | |
| 147 return f.read() | |
| 148 | |
| 149 | |
| 106 if __name__ == '__main__': | 150 if __name__ == '__main__': |
| 107 fix_encoding.fix_encoding() | 151 fix_encoding.fix_encoding() |
| 108 VERBOSE = '-v' in sys.argv | 152 VERBOSE = '-v' in sys.argv |
| 109 logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) | 153 logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) |
| 110 unittest.main() | 154 unittest.main() |
| OLD | NEW |