Index: client/tests/named_cache_test.py |
diff --git a/client/tests/named_cache_test.py b/client/tests/named_cache_test.py |
index e1330f0d604cd32202aed8448a0b8c885016fc7e..bef40ece1ffbc2b3f0c85a66871177632e4563cb 100755 |
--- a/client/tests/named_cache_test.py |
+++ b/client/tests/named_cache_test.py |
@@ -20,6 +20,16 @@ from utils import fs |
import named_cache |
+def write_file(path, contents): |
+ with open(path, 'wb') as f: |
+ f.write(contents) |
+ |
+ |
+def read_file(path): |
+ with open(path, 'rb') as f: |
+ return f.read() |
+ |
+ |
class CacheManagerTest(unittest.TestCase): |
def setUp(self): |
self.tempdir = tempfile.mkdtemp(prefix=u'named_cache_test') |
@@ -31,67 +41,111 @@ class CacheManagerTest(unittest.TestCase): |
finally: |
super(CacheManagerTest, self).tearDown() |
- def test_request(self): |
- with self.assertRaises(AssertionError): |
- self.manager.request('foo') # manager is not open |
- with self.manager.open(): |
- foo_path = self.manager.request('foo') |
- self.assertEqual(foo_path, self.manager.request('foo')) |
- bar_path = self.manager.request('bar') |
- self.assertEqual( |
- foo_path, |
- os.path.abspath(os.readlink( |
- os.path.join(self.tempdir, 'named', 'foo')))) |
- self.assertEqual( |
- bar_path, |
- os.path.abspath(os.readlink( |
- os.path.join(self.tempdir, 'named', 'bar')))) |
- self.assertEqual(os.path.dirname(bar_path), self.tempdir) |
- self.assertEqual([], os.listdir(foo_path)) |
- self.assertEqual([], os.listdir(bar_path)) |
+ def make_caches(self, names): |
+ dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test') |
+ try: |
+ names = map(unicode, names) |
+ for n in names: |
+ self.manager.install(os.path.join(dest_dir, n), n) |
+ self.assertEqual(set(names), set(os.listdir(dest_dir))) |
+ for n in names: |
+ self.manager.uninstall(os.path.join(dest_dir, n), n) |
+ self.assertEqual([], os.listdir(dest_dir)) |
+ self.assertTrue(self.manager.available.issuperset(names)) |
+ finally: |
+ file_path.rmtree(dest_dir) |
def test_get_oldest(self): |
with self.manager.open(): |
self.assertIsNone(self.manager.get_oldest()) |
- for i in xrange(10): |
- self.manager.request(str(i)) |
- self.assertEqual(self.manager.get_oldest(), '0') |
+ self.make_caches(range(10)) |
+ self.assertEqual(self.manager.get_oldest(), u'0') |
def test_get_timestamp(self): |
now = 0 |
time_fn = lambda: now |
with self.manager.open(time_fn=time_fn): |
for i in xrange(10): |
- self.manager.request(str(i)) |
+ self.make_caches([i]) |
now += 1 |
for i in xrange(10): |
self.assertEqual(i, self.manager.get_timestamp(str(i))) |
- def test_create_symlinks(self): |
+ def test_clean_cache(self): |
dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test') |
with self.manager.open(): |
- for i in xrange(10): |
- self.manager.request(str(i)) |
- self.manager.create_symlinks(dest_dir, [('1', 'a'), ('3', 'c')]) |
- self.assertEqual({'a', 'c'}, set(os.listdir(dest_dir))) |
- self.assertEqual( |
- os.readlink(os.path.join(dest_dir, 'a')), self.manager.request('1')) |
+ self.assertEqual([], os.listdir(self.manager.root_dir)) |
+ |
+ a_path = os.path.join(dest_dir, u'a') |
+ b_path = os.path.join(dest_dir, u'b') |
+ |
+ self.manager.install(a_path, u'1') |
+ self.manager.install(b_path, u'2') |
+ |
+ self.assertEqual({u'a', u'b'}, set(os.listdir(dest_dir))) |
+ self.assertFalse(self.manager.available) |
+ self.assertEqual([], os.listdir(self.manager.root_dir)) |
+ |
+ write_file(os.path.join(a_path, u'x'), u'x') |
+ write_file(os.path.join(b_path, u'y'), u'y') |
+ |
+ self.manager.uninstall(a_path, u'1') |
+ self.manager.uninstall(b_path, u'2') |
+ |
+ self.assertEqual(3, len(os.listdir(self.manager.root_dir))) |
+ path1 = os.path.join(self.manager.root_dir, self.manager._lru['1']) |
+ path2 = os.path.join(self.manager.root_dir, self.manager._lru['2']) |
+ |
+ self.assertEqual('x', read_file(os.path.join(path1, u'x'))) |
+ self.assertEqual('y', read_file(os.path.join(path2, u'y'))) |
+ self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1) |
+ self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2) |
+ |
+ def test_existing_cache(self): |
+ dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test') |
+ with self.manager.open(): |
+ # Assume test_clean passes. |
+ a_path = os.path.join(dest_dir, u'a') |
+ b_path = os.path.join(dest_dir, u'b') |
+ |
+ self.manager.install(a_path, u'1') |
+ write_file(os.path.join(dest_dir, u'a', u'x'), u'x') |
+ self.manager.uninstall(a_path, u'1') |
+ |
+ # Test starts here. |
+ self.manager.install(a_path, u'1') |
+ self.manager.install(b_path, u'2') |
+ self.assertEqual({'a', 'b'}, set(os.listdir(dest_dir))) |
+ self.assertFalse(self.manager.available) |
+ self.assertEqual(['named'], os.listdir(self.manager.root_dir)) |
+ |
self.assertEqual( |
- os.readlink(os.path.join(dest_dir, 'c')), self.manager.request('3')) |
- self.assertEqual([], os.listdir(os.path.join(dest_dir, 'c'))) |
+ 'x', read_file(os.path.join(os.path.join(dest_dir, u'a', u'x')))) |
+ write_file(os.path.join(a_path, 'x'), 'x2') |
+ write_file(os.path.join(b_path, 'y'), 'y') |
+ |
+ self.manager.uninstall(a_path, '1') |
+ self.manager.uninstall(b_path, '2') |
+ |
+ self.assertEqual(3, len(os.listdir(self.manager.root_dir))) |
+ path1 = os.path.join(self.manager.root_dir, self.manager._lru['1']) |
+ path2 = os.path.join(self.manager.root_dir, self.manager._lru['2']) |
+ |
+ self.assertEqual('x2', read_file(os.path.join(path1, 'x'))) |
+ self.assertEqual('y', read_file(os.path.join(path2, 'y'))) |
+ self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1) |
+ self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2) |
def test_trim(self): |
with self.manager.open(): |
item_count = named_cache.MAX_CACHE_SIZE + 10 |
- for i in xrange(item_count): |
- self.manager.request(str(i)) |
+ self.make_caches(range(item_count)) |
self.assertEqual(len(self.manager), item_count) |
self.manager.trim(None) |
self.assertEqual(len(self.manager), named_cache.MAX_CACHE_SIZE) |
self.assertEqual( |
set(map(str, xrange(10, 10 + named_cache.MAX_CACHE_SIZE))), |
- set(os.listdir(os.path.join(self.tempdir, 'named'))), |
- ) |
+ set(os.listdir(os.path.join(self.tempdir, 'named')))) |
def test_corrupted(self): |
with open(os.path.join(self.tempdir, u'state.json'), 'w') as f: |
@@ -99,10 +153,9 @@ class CacheManagerTest(unittest.TestCase): |
fs.makedirs(os.path.join(self.tempdir, 'a'), 0777) |
with self.manager.open(): |
self.assertFalse(os.path.isdir(self.tempdir)) |
- self.manager.request('a') |
+ self.make_caches(['a']) |
self.assertTrue(fs.islink(os.path.join(self.tempdir, 'named', 'a'))) |
- |
if __name__ == '__main__': |
fix_encoding.fix_encoding() |
VERBOSE = '-v' in sys.argv |