| Index: client/tests/named_cache_test.py
|
| diff --git a/client/tests/named_cache_test.py b/client/tests/named_cache_test.py
|
| index e1330f0d604cd32202aed8448a0b8c885016fc7e..bef40ece1ffbc2b3f0c85a66871177632e4563cb 100755
|
| --- a/client/tests/named_cache_test.py
|
| +++ b/client/tests/named_cache_test.py
|
| @@ -20,6 +20,16 @@ from utils import fs
|
| import named_cache
|
|
|
|
|
| +def write_file(path, contents):
|
| + with open(path, 'wb') as f:
|
| + f.write(contents)
|
| +
|
| +
|
| +def read_file(path):
|
| + with open(path, 'rb') as f:
|
| + return f.read()
|
| +
|
| +
|
| class CacheManagerTest(unittest.TestCase):
|
| def setUp(self):
|
| self.tempdir = tempfile.mkdtemp(prefix=u'named_cache_test')
|
| @@ -31,67 +41,111 @@ class CacheManagerTest(unittest.TestCase):
|
| finally:
|
| super(CacheManagerTest, self).tearDown()
|
|
|
| - def test_request(self):
|
| - with self.assertRaises(AssertionError):
|
| - self.manager.request('foo') # manager is not open
|
| - with self.manager.open():
|
| - foo_path = self.manager.request('foo')
|
| - self.assertEqual(foo_path, self.manager.request('foo'))
|
| - bar_path = self.manager.request('bar')
|
| - self.assertEqual(
|
| - foo_path,
|
| - os.path.abspath(os.readlink(
|
| - os.path.join(self.tempdir, 'named', 'foo'))))
|
| - self.assertEqual(
|
| - bar_path,
|
| - os.path.abspath(os.readlink(
|
| - os.path.join(self.tempdir, 'named', 'bar'))))
|
| - self.assertEqual(os.path.dirname(bar_path), self.tempdir)
|
| - self.assertEqual([], os.listdir(foo_path))
|
| - self.assertEqual([], os.listdir(bar_path))
|
| + def make_caches(self, names):
|
| + dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test')
|
| + try:
|
| + names = map(unicode, names)
|
| + for n in names:
|
| + self.manager.install(os.path.join(dest_dir, n), n)
|
| + self.assertEqual(set(names), set(os.listdir(dest_dir)))
|
| + for n in names:
|
| + self.manager.uninstall(os.path.join(dest_dir, n), n)
|
| + self.assertEqual([], os.listdir(dest_dir))
|
| + self.assertTrue(self.manager.available.issuperset(names))
|
| + finally:
|
| + file_path.rmtree(dest_dir)
|
|
|
| def test_get_oldest(self):
|
| with self.manager.open():
|
| self.assertIsNone(self.manager.get_oldest())
|
| - for i in xrange(10):
|
| - self.manager.request(str(i))
|
| - self.assertEqual(self.manager.get_oldest(), '0')
|
| + self.make_caches(range(10))
|
| + self.assertEqual(self.manager.get_oldest(), u'0')
|
|
|
| def test_get_timestamp(self):
|
| now = 0
|
| time_fn = lambda: now
|
| with self.manager.open(time_fn=time_fn):
|
| for i in xrange(10):
|
| - self.manager.request(str(i))
|
| + self.make_caches([i])
|
| now += 1
|
| for i in xrange(10):
|
| self.assertEqual(i, self.manager.get_timestamp(str(i)))
|
|
|
| - def test_create_symlinks(self):
|
| + def test_clean_cache(self):
|
| dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test')
|
| with self.manager.open():
|
| - for i in xrange(10):
|
| - self.manager.request(str(i))
|
| - self.manager.create_symlinks(dest_dir, [('1', 'a'), ('3', 'c')])
|
| - self.assertEqual({'a', 'c'}, set(os.listdir(dest_dir)))
|
| - self.assertEqual(
|
| - os.readlink(os.path.join(dest_dir, 'a')), self.manager.request('1'))
|
| + self.assertEqual([], os.listdir(self.manager.root_dir))
|
| +
|
| + a_path = os.path.join(dest_dir, u'a')
|
| + b_path = os.path.join(dest_dir, u'b')
|
| +
|
| + self.manager.install(a_path, u'1')
|
| + self.manager.install(b_path, u'2')
|
| +
|
| + self.assertEqual({u'a', u'b'}, set(os.listdir(dest_dir)))
|
| + self.assertFalse(self.manager.available)
|
| + self.assertEqual([], os.listdir(self.manager.root_dir))
|
| +
|
| + write_file(os.path.join(a_path, u'x'), u'x')
|
| + write_file(os.path.join(b_path, u'y'), u'y')
|
| +
|
| + self.manager.uninstall(a_path, u'1')
|
| + self.manager.uninstall(b_path, u'2')
|
| +
|
| + self.assertEqual(3, len(os.listdir(self.manager.root_dir)))
|
| + path1 = os.path.join(self.manager.root_dir, self.manager._lru['1'])
|
| + path2 = os.path.join(self.manager.root_dir, self.manager._lru['2'])
|
| +
|
| + self.assertEqual('x', read_file(os.path.join(path1, u'x')))
|
| + self.assertEqual('y', read_file(os.path.join(path2, u'y')))
|
| + self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1)
|
| + self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2)
|
| +
|
| + def test_existing_cache(self):
|
| + dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test')
|
| + with self.manager.open():
|
| + # Assume test_clean passes.
|
| + a_path = os.path.join(dest_dir, u'a')
|
| + b_path = os.path.join(dest_dir, u'b')
|
| +
|
| + self.manager.install(a_path, u'1')
|
| + write_file(os.path.join(dest_dir, u'a', u'x'), u'x')
|
| + self.manager.uninstall(a_path, u'1')
|
| +
|
| + # Test starts here.
|
| + self.manager.install(a_path, u'1')
|
| + self.manager.install(b_path, u'2')
|
| + self.assertEqual({'a', 'b'}, set(os.listdir(dest_dir)))
|
| + self.assertFalse(self.manager.available)
|
| + self.assertEqual(['named'], os.listdir(self.manager.root_dir))
|
| +
|
| self.assertEqual(
|
| - os.readlink(os.path.join(dest_dir, 'c')), self.manager.request('3'))
|
| - self.assertEqual([], os.listdir(os.path.join(dest_dir, 'c')))
|
| + 'x', read_file(os.path.join(os.path.join(dest_dir, u'a', u'x'))))
|
| + write_file(os.path.join(a_path, 'x'), 'x2')
|
| + write_file(os.path.join(b_path, 'y'), 'y')
|
| +
|
| + self.manager.uninstall(a_path, '1')
|
| + self.manager.uninstall(b_path, '2')
|
| +
|
| + self.assertEqual(3, len(os.listdir(self.manager.root_dir)))
|
| + path1 = os.path.join(self.manager.root_dir, self.manager._lru['1'])
|
| + path2 = os.path.join(self.manager.root_dir, self.manager._lru['2'])
|
| +
|
| + self.assertEqual('x2', read_file(os.path.join(path1, 'x')))
|
| + self.assertEqual('y', read_file(os.path.join(path2, 'y')))
|
| + self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1)
|
| + self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2)
|
|
|
| def test_trim(self):
|
| with self.manager.open():
|
| item_count = named_cache.MAX_CACHE_SIZE + 10
|
| - for i in xrange(item_count):
|
| - self.manager.request(str(i))
|
| + self.make_caches(range(item_count))
|
| self.assertEqual(len(self.manager), item_count)
|
| self.manager.trim(None)
|
| self.assertEqual(len(self.manager), named_cache.MAX_CACHE_SIZE)
|
| self.assertEqual(
|
| set(map(str, xrange(10, 10 + named_cache.MAX_CACHE_SIZE))),
|
| - set(os.listdir(os.path.join(self.tempdir, 'named'))),
|
| - )
|
| + set(os.listdir(os.path.join(self.tempdir, 'named'))))
|
|
|
| def test_corrupted(self):
|
| with open(os.path.join(self.tempdir, u'state.json'), 'w') as f:
|
| @@ -99,10 +153,9 @@ class CacheManagerTest(unittest.TestCase):
|
| fs.makedirs(os.path.join(self.tempdir, 'a'), 0777)
|
| with self.manager.open():
|
| self.assertFalse(os.path.isdir(self.tempdir))
|
| - self.manager.request('a')
|
| + self.make_caches(['a'])
|
| self.assertTrue(fs.islink(os.path.join(self.tempdir, 'named', 'a')))
|
|
|
| -
|
| if __name__ == '__main__':
|
| fix_encoding.fix_encoding()
|
| VERBOSE = '-v' in sys.argv
|
|
|