Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(141)

Side by Side Diff: client/isolateserver.py

Issue 2853413002: Add unit test to clean_caches() (Closed)
Patch Set: os.path.relpath Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cipd.py ('k') | client/named_cache.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The LUCI Authors. All rights reserved. 2 # Copyright 2013 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 """Archives a set of files or directories to an Isolate Server.""" 6 """Archives a set of files or directories to an Isolate Server."""
7 7
8 __version__ = '0.8.0' 8 __version__ = '0.8.0'
9 9
10 import errno 10 import errno
(...skipping 962 matching lines...) Expand 10 before | Expand all | Expand 10 after
973 """ 973 """
974 raise NotImplementedError() 974 raise NotImplementedError()
975 975
976 def write(self, digest, content): 976 def write(self, digest, content):
977 """Reads data from |content| generator and stores it in cache. 977 """Reads data from |content| generator and stores it in cache.
978 978
979 Returns digest to simplify chaining. 979 Returns digest to simplify chaining.
980 """ 980 """
981 raise NotImplementedError() 981 raise NotImplementedError()
982 982
983 def trim(self):
984 """Enforces cache policies.
985
986 Returns:
987 Number of items evicted.
988 """
989 raise NotImplementedError()
990
983 991
984 class MemoryCache(LocalCache): 992 class MemoryCache(LocalCache):
985 """LocalCache implementation that stores everything in memory.""" 993 """LocalCache implementation that stores everything in memory."""
986 994
987 def __init__(self, file_mode_mask=0500): 995 def __init__(self, file_mode_mask=0500):
988 """Args: 996 """Args:
989 file_mode_mask: bit mask to AND file mode with. Default value will make 997 file_mode_mask: bit mask to AND file mode with. Default value will make
990 all mapped files to be read only. 998 all mapped files to be read only.
991 """ 999 """
992 super(MemoryCache, self).__init__() 1000 super(MemoryCache, self).__init__()
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1024 return io.BytesIO(d) 1032 return io.BytesIO(d)
1025 1033
1026 def write(self, digest, content): 1034 def write(self, digest, content):
1027 # Assemble whole stream before taking the lock. 1035 # Assemble whole stream before taking the lock.
1028 data = ''.join(content) 1036 data = ''.join(content)
1029 with self._lock: 1037 with self._lock:
1030 self._contents[digest] = data 1038 self._contents[digest] = data
1031 self._added.append(len(data)) 1039 self._added.append(len(data))
1032 return digest 1040 return digest
1033 1041
1042 def trim(self):
1043 """Trimming is not implemented for MemoryCache."""
1044 return 0
1045
1034 1046
1035 class CachePolicies(object): 1047 class CachePolicies(object):
1036 def __init__(self, max_cache_size, min_free_space, max_items): 1048 def __init__(self, max_cache_size, min_free_space, max_items):
1037 """ 1049 """
1038 Arguments: 1050 Arguments:
1039 - max_cache_size: Trim if the cache gets larger than this value. If 0, the 1051 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1040 cache is effectively a leak. 1052 cache is effectively a leak.
1041 - min_free_space: Trim if disk free space becomes lower than this value. If 1053 - min_free_space: Trim if disk free space becomes lower than this value. If
 1042 0, it unconditionally fills the disk. 1054 0, it unconditionally fills the disk.
1043 - max_items: Maximum number of items to keep in the cache. If 0, do not 1055 - max_items: Maximum number of items to keep in the cache. If 0, do not
1044 enforce a limit. 1056 enforce a limit.
1045 """ 1057 """
1046 self.max_cache_size = max_cache_size 1058 self.max_cache_size = max_cache_size
1047 self.min_free_space = min_free_space 1059 self.min_free_space = min_free_space
1048 self.max_items = max_items 1060 self.max_items = max_items
1049 1061
1050 1062
1051 class DiskCache(LocalCache): 1063 class DiskCache(LocalCache):
1052 """Stateful LRU cache in a flat hash table in a directory. 1064 """Stateful LRU cache in a flat hash table in a directory.
1053 1065
1054 Saves its state as json file. 1066 Saves its state as json file.
1055 """ 1067 """
1056 STATE_FILE = u'state.json' 1068 STATE_FILE = u'state.json'
1057 1069
1058 def __init__(self, cache_dir, policies, hash_algo, trim=True): 1070 def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
1059 """ 1071 """
1060 Arguments: 1072 Arguments:
1061 cache_dir: directory where to place the cache. 1073 cache_dir: directory where to place the cache.
1062 policies: cache retention policies. 1074 policies: cache retention policies.
 1063 hash_algo: hashing algorithm used. 1075 hash_algo: hashing algorithm used.
 1064 trim: if True, enforce |policies| right away. 1076 trim: if True, enforce |policies| right away.
1065 It can be done later by calling trim() explicitly. 1077 It can be done later by calling trim() explicitly.
1066 """ 1078 """
1067 # All protected methods (starting with '_') except _path should be called 1079 # All protected methods (starting with '_') except _path should be called
1068 # with self._lock held. 1080 # with self._lock held.
1069 super(DiskCache, self).__init__() 1081 super(DiskCache, self).__init__()
1070 self.cache_dir = cache_dir 1082 self.cache_dir = cache_dir
1071 self.policies = policies 1083 self.policies = policies
1072 self.hash_algo = hash_algo 1084 self.hash_algo = hash_algo
1073 self.state_file = os.path.join(cache_dir, self.STATE_FILE) 1085 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1074 # Items in a LRU lookup dict(digest: size). 1086 # Items in a LRU lookup dict(digest: size).
1075 self._lru = lru.LRUDict() 1087 self._lru = lru.LRUDict()
1076 # Current cached free disk space. It is updated by self._trim(). 1088 # Current cached free disk space. It is updated by self._trim().
1077 file_path.ensure_tree(self.cache_dir) 1089 file_path.ensure_tree(self.cache_dir)
1078 self._free_disk = file_path.get_free_space(self.cache_dir) 1090 self._free_disk = file_path.get_free_space(self.cache_dir)
1079 # The first item in the LRU cache that must not be evicted during this run 1091 # The first item in the LRU cache that must not be evicted during this run
1080 # since it was referenced. All items more recent that _protected in the LRU 1092 # since it was referenced. All items more recent that _protected in the LRU
1081 # cache are also inherently protected. It could be a set() of all items 1093 # cache are also inherently protected. It could be a set() of all items
1082 # referenced but this increases memory usage without a use case. 1094 # referenced but this increases memory usage without a use case.
1083 self._protected = None 1095 self._protected = None
1084 # Cleanup operations done by self._load(), if any. 1096 # Cleanup operations done by self._load(), if any.
1085 self._operations = [] 1097 self._operations = []
1086 with tools.Profiler('Setup'): 1098 with tools.Profiler('Setup'):
1087 with self._lock: 1099 with self._lock:
1088 self._load(trim=trim) 1100 self._load(trim, time_fn)
1089 1101
1090 def __contains__(self, digest): 1102 def __contains__(self, digest):
1091 with self._lock: 1103 with self._lock:
1092 return digest in self._lru 1104 return digest in self._lru
1093 1105
1094 def __enter__(self): 1106 def __enter__(self):
1095 return self 1107 return self
1096 1108
1097 def __exit__(self, _exc_type, _exec_value, _traceback): 1109 def __exit__(self, _exc_type, _exec_value, _traceback):
1098 with tools.Profiler('CleanupTrimming'): 1110 with tools.Profiler('CleanupTrimming'):
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
1150 pass 1162 pass
1151 else: 1163 else:
1152 file_path.try_remove(p) 1164 file_path.try_remove(p)
1153 continue 1165 continue
1154 1166
1155 if previous: 1167 if previous:
1156 # Filter out entries that were not found. 1168 # Filter out entries that were not found.
1157 logging.warning('Removed %d lost files', len(previous)) 1169 logging.warning('Removed %d lost files', len(previous))
1158 for filename in previous: 1170 for filename in previous:
1159 self._lru.pop(filename) 1171 self._lru.pop(filename)
1172 self._save()
1160 1173
1161 # What remains to be done is to hash every single item to 1174 # What remains to be done is to hash every single item to
1162 # detect corruption, then save to ensure state.json is up to date. 1175 # detect corruption, then save to ensure state.json is up to date.
1163 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes. 1176 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1164 # TODO(maruel): Let's revisit once directory metadata is stored in 1177 # TODO(maruel): Let's revisit once directory metadata is stored in
1165 # state.json so only the files that had been mapped since the last cleanup() 1178 # state.json so only the files that had been mapped since the last cleanup()
1166 # call are manually verified. 1179 # call are manually verified.
1167 # 1180 #
1168 #with self._lock: 1181 #with self._lock:
1169 # for digest in self._lru: 1182 # for digest in self._lru:
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
1248 def get_timestamp(self, digest): 1261 def get_timestamp(self, digest):
1249 """Returns timestamp of last use of an item. 1262 """Returns timestamp of last use of an item.
1250 1263
1251 Raises KeyError if item is not found. 1264 Raises KeyError if item is not found.
1252 """ 1265 """
1253 return self._lru.get_timestamp(digest) 1266 return self._lru.get_timestamp(digest)
1254 1267
1255 def trim(self): 1268 def trim(self):
1256 """Forces retention policies.""" 1269 """Forces retention policies."""
1257 with self._lock: 1270 with self._lock:
1258 self._trim() 1271 return self._trim()
1259 1272
1260 def _load(self, trim): 1273 def _load(self, trim, time_fn):
1261 """Loads state of the cache from json file. 1274 """Loads state of the cache from json file.
1262 1275
1263 If cache_dir does not exist on disk, it is created. 1276 If cache_dir does not exist on disk, it is created.
1264 """ 1277 """
1265 self._lock.assert_locked() 1278 self._lock.assert_locked()
1266 1279
1267 if not fs.isfile(self.state_file): 1280 if not fs.isfile(self.state_file):
1268 if not os.path.isdir(self.cache_dir): 1281 if not os.path.isdir(self.cache_dir):
1269 fs.makedirs(self.cache_dir) 1282 fs.makedirs(self.cache_dir)
1270 else: 1283 else:
1271 # Load state of the cache. 1284 # Load state of the cache.
1272 try: 1285 try:
1273 self._lru = lru.LRUDict.load(self.state_file) 1286 self._lru = lru.LRUDict.load(self.state_file)
1274 except ValueError as err: 1287 except ValueError as err:
1275 logging.error('Failed to load cache state: %s' % (err,)) 1288 logging.error('Failed to load cache state: %s' % (err,))
1276 # Don't want to keep broken state file. 1289 # Don't want to keep broken state file.
1277 file_path.try_remove(self.state_file) 1290 file_path.try_remove(self.state_file)
1291 if time_fn:
1292 self._lru.time_fn = time_fn
1278 if trim: 1293 if trim:
1279 self._trim() 1294 self._trim()
1280 # We want the initial cache size after trimming, i.e. what is readily 1295 # We want the initial cache size after trimming, i.e. what is readily
 1281 # available. 1296 # available.
1282 self._initial_number_items = len(self._lru) 1297 self._initial_number_items = len(self._lru)
1283 self._initial_size = sum(self._lru.itervalues()) 1298 self._initial_size = sum(self._lru.itervalues())
1284 if self._evicted: 1299 if self._evicted:
1285 logging.info( 1300 logging.info(
1286 'Trimming evicted items with the following sizes: %s', 1301 'Trimming evicted items with the following sizes: %s',
1287 sorted(self._evicted)) 1302 sorted(self._evicted))
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
1331 1346
1332 logging.warning( 1347 logging.warning(
1333 'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,' 1348 'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
1334 ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)', 1349 ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
1335 trimmed_due_to_space, 1350 trimmed_due_to_space,
1336 self._free_disk / 1024., 1351 self._free_disk / 1024.,
1337 total_usage / 1024., 1352 total_usage / 1024.,
1338 usage_percent, 1353 usage_percent,
1339 self.policies.max_cache_size / 1024.) 1354 self.policies.max_cache_size / 1024.)
1340 self._save() 1355 self._save()
1356 return trimmed_due_to_space
1341 1357
1342 def _path(self, digest): 1358 def _path(self, digest):
1343 """Returns the path to one item.""" 1359 """Returns the path to one item."""
1344 return os.path.join(self.cache_dir, digest) 1360 return os.path.join(self.cache_dir, digest)
1345 1361
1346 def _remove_lru_file(self, allow_protected): 1362 def _remove_lru_file(self, allow_protected):
 1347 """Removes the least recently used file and returns its size.""" 1363 """Removes the least recently used file and returns its size."""
1348 self._lock.assert_locked() 1364 self._lock.assert_locked()
1349 try: 1365 try:
1350 digest, (size, _) = self._lru.get_oldest() 1366 digest, (size, _) = self._lru.get_oldest()
1351 if not allow_protected and digest == self._protected: 1367 if not allow_protected and digest == self._protected:
1352 raise Error('Not enough space to fetch the whole isolated tree') 1368 raise Error(
1369 'Not enough space to fetch the whole isolated tree; %sb free, min '
1370 'is %sb' % (self._free_disk, self.policies.min_free_space))
1353 except KeyError: 1371 except KeyError:
1354 raise Error('Nothing to remove') 1372 raise Error('Nothing to remove')
1355 digest, (size, _) = self._lru.pop_oldest() 1373 digest, (size, _) = self._lru.pop_oldest()
1356 logging.debug('Removing LRU file %s', digest) 1374 logging.debug('Removing LRU file %s', digest)
1357 self._delete_file(digest, size) 1375 self._delete_file(digest, size)
1358 return size 1376 return size
1359 1377
1360 def _add(self, digest, size=UNKNOWN_FILE_SIZE): 1378 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1361 """Adds an item into LRU cache marking it as a newest one.""" 1379 """Adds an item into LRU cache marking it as a newest one."""
1362 self._lock.assert_locked() 1380 self._lock.assert_locked()
(...skipping 622 matching lines...) Expand 10 before | Expand all | Expand 10 after
1985 cache_group.add_option( 2003 cache_group.add_option(
1986 '--max-items', 2004 '--max-items',
1987 type='int', 2005 type='int',
1988 metavar='NNN', 2006 metavar='NNN',
1989 default=100000, 2007 default=100000,
1990 help='Trim if more than this number of items are in the cache ' 2008 help='Trim if more than this number of items are in the cache '
1991 'default=%default') 2009 'default=%default')
1992 parser.add_option_group(cache_group) 2010 parser.add_option_group(cache_group)
1993 2011
1994 2012
1995 def process_cache_options(options, trim=True): 2013 def process_cache_options(options, **kwargs):
1996 if options.cache: 2014 if options.cache:
1997 policies = CachePolicies( 2015 policies = CachePolicies(
1998 options.max_cache_size, options.min_free_space, options.max_items) 2016 options.max_cache_size, options.min_free_space, options.max_items)
1999 2017
2000 # |options.cache| path may not exist until DiskCache() instance is created. 2018 # |options.cache| path may not exist until DiskCache() instance is created.
2001 return DiskCache( 2019 return DiskCache(
2002 unicode(os.path.abspath(options.cache)), 2020 unicode(os.path.abspath(options.cache)),
2003 policies, 2021 policies,
2004 isolated_format.get_hash_algo(options.namespace), 2022 isolated_format.get_hash_algo(options.namespace),
2005 trim=trim) 2023 **kwargs)
2006 else: 2024 else:
2007 return MemoryCache() 2025 return MemoryCache()
2008 2026
2009 2027
2010 class OptionParserIsolateServer(logging_utils.OptionParserWithLogging): 2028 class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
2011 def __init__(self, **kwargs): 2029 def __init__(self, **kwargs):
2012 logging_utils.OptionParserWithLogging.__init__( 2030 logging_utils.OptionParserWithLogging.__init__(
2013 self, 2031 self,
2014 version=__version__, 2032 version=__version__,
2015 prog=os.path.basename(sys.modules[__name__].__file__), 2033 prog=os.path.basename(sys.modules[__name__].__file__),
(...skipping 12 matching lines...) Expand all
2028 return dispatcher.execute(OptionParserIsolateServer(), args) 2046 return dispatcher.execute(OptionParserIsolateServer(), args)
2029 2047
2030 2048
2031 if __name__ == '__main__': 2049 if __name__ == '__main__':
2032 subprocess42.inhibit_os_error_reporting() 2050 subprocess42.inhibit_os_error_reporting()
2033 fix_encoding.fix_encoding() 2051 fix_encoding.fix_encoding()
2034 tools.disable_buffering() 2052 tools.disable_buffering()
2035 colorama.init() 2053 colorama.init()
2036 file_path.enable_symlink() 2054 file_path.enable_symlink()
2037 sys.exit(main(sys.argv[1:])) 2055 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « client/cipd.py ('k') | client/named_cache.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698