Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(401)

Side by Side Diff: client/isolateserver.py

Issue 2060983006: luci-py/isolateserver.py: Add archive support when downloading. (Closed) Base URL: https://github.com/luci/luci-py.git@master
Patch Set: Tests now pass. Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The LUCI Authors. All rights reserved. 2 # Copyright 2013 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 """Archives a set of files or directories to an Isolate Server.""" 6 """Archives a set of files or directories to an Isolate Server."""
7 7
8 __version__ = '0.4.9' 8 __version__ = '0.4.9'
9 9
10 import base64 10 import base64
11 import functools 11 import functools
12 import errno 12 import errno
13 import logging 13 import logging
14 import optparse 14 import optparse
15 import os 15 import os
16 import re 16 import re
17 import stat
17 import signal 18 import signal
18 import sys 19 import sys
19 import tempfile 20 import tempfile
20 import threading 21 import threading
21 import time 22 import time
22 import types 23 import types
23 import zlib 24 import zlib
24 25
26 import cStringIO as StringIO
27
25 from third_party import colorama 28 from third_party import colorama
26 from third_party.depot_tools import fix_encoding 29 from third_party.depot_tools import fix_encoding
27 from third_party.depot_tools import subcommand 30 from third_party.depot_tools import subcommand
28 31
32 from libs import arfile
29 from utils import file_path 33 from utils import file_path
30 from utils import fs 34 from utils import fs
31 from utils import logging_utils 35 from utils import logging_utils
32 from utils import lru 36 from utils import lru
33 from utils import net 37 from utils import net
34 from utils import on_error 38 from utils import on_error
35 from utils import subprocess42 39 from utils import subprocess42
36 from utils import threading_utils 40 from utils import threading_utils
37 from utils import tools 41 from utils import tools
38 42
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 """ 146 """
143 file_path.ensure_tree(os.path.dirname(path)) 147 file_path.ensure_tree(os.path.dirname(path))
144 total = 0 148 total = 0
145 with fs.open(path, 'wb') as f: 149 with fs.open(path, 'wb') as f:
146 for d in content_generator: 150 for d in content_generator:
147 total += len(d) 151 total += len(d)
148 f.write(d) 152 f.write(d)
149 return total 153 return total
150 154
151 155
def fileobj_path(fileobj):
  """Returns the file system path backing |fileobj|, or None.

  Only returns a path when the object exposes a usable .name attribute and
  the named file actually exists on disk.
  """
  path = getattr(fileobj, "name", None)
  if path is None:
    return None

  # File names coming from the OS may be byte strings; normalize to unicode.
  if not isinstance(path, unicode):
    path = path.decode(sys.getfilesystemencoding())

  # The name attribute can be stale (e.g. a deleted temp file); verify it.
  if fs.exists(path):
    return path
  return None
166
167
def putfile(srcfileobj, dstpath, file_mode=None):
  """Materializes the content of |srcfileobj| at |dstpath|.

  If |srcfileobj| is backed by an existing on-disk file, the file is
  hardlinked (read-only content) or copied (writable content) into place;
  otherwise the data is streamed out.

  Arguments:
    srcfileobj: file-like object to read from.
    dstpath: destination path to create.
    file_mode: if not None, mode to chmod |dstpath| to. 0 is a valid mode.
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath:
    # Only hardlink when the destination must NOT be writable: writing
    # through a hardlink would modify the shared (cached) file node. The
    # original expression was inverted (missing 'not'), which hardlinked
    # exactly the writable files.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))
    if readonly:
      # We can link the file.
      file_path.link_file(dstpath, srcpath, file_path.HARDLINK_WITH_FALLBACK)
    else:
      # We must copy the file so the destination is independently writable.
      file_path.link_file(dstpath, srcpath, file_path.COPY)
  else:
    # No backing file; need to write out the content.
    with fs.open(dstpath, "wb") as dstfileobj:
      fs.copyfileobj(srcfileobj, dstfileobj)

  # file_mode of 0 is actually valid, so need explicit check against None.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
187
188
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the chunks read from
  |content_generator|.

  Empty compressed chunks are suppressed; the compressor's buffered tail is
  flushed and yielded after the input is exhausted.
  """
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  # Emit whatever the compressor still buffers (header/trailer included).
  remainder = deflater.flush(zlib.Z_FINISH)
  if remainder:
    yield remainder
(...skipping 1054 matching lines...) Expand 10 before | Expand all | Expand 10 after
1216 """ 1253 """
1217 cache_dir = None 1254 cache_dir = None
1218 1255
1219 def __init__(self): 1256 def __init__(self):
1220 self._lock = threading_utils.LockWithAssert() 1257 self._lock = threading_utils.LockWithAssert()
1221 # Profiling values. 1258 # Profiling values.
1222 self._added = [] 1259 self._added = []
1223 self._initial_number_items = 0 1260 self._initial_number_items = 0
1224 self._initial_size = 0 1261 self._initial_size = 0
1225 self._evicted = [] 1262 self._evicted = []
1226 self._linked = [] 1263 self._used = []
1227 1264
1228 def __contains__(self, digest): 1265 def __contains__(self, digest):
1229 raise NotImplementedError() 1266 raise NotImplementedError()
1230 1267
1231 def __enter__(self): 1268 def __enter__(self):
1232 """Context manager interface.""" 1269 """Context manager interface."""
1233 return self 1270 return self
1234 1271
1235 def __exit__(self, _exc_type, _exec_value, _traceback): 1272 def __exit__(self, _exc_type, _exec_value, _traceback):
1236 """Context manager interface.""" 1273 """Context manager interface."""
1237 return False 1274 return False
1238 1275
1239 @property 1276 @property
1240 def added(self): 1277 def added(self):
1241 return self._added[:] 1278 return self._added[:]
1242 1279
1243 @property 1280 @property
1244 def evicted(self): 1281 def evicted(self):
1245 return self._evicted[:] 1282 return self._evicted[:]
1246 1283
1247 @property 1284 @property
1285 def used(self):
1286 return self._used[:]
1287
1288 @property
1248 def initial_number_items(self): 1289 def initial_number_items(self):
1249 return self._initial_number_items 1290 return self._initial_number_items
1250 1291
1251 @property 1292 @property
1252 def initial_size(self): 1293 def initial_size(self):
1253 return self._initial_size 1294 return self._initial_size
1254 1295
1255 @property
1256 def linked(self):
1257 return self._linked[:]
1258
1259 def cached_set(self): 1296 def cached_set(self):
1260 """Returns a set of all cached digests (always a new object).""" 1297 """Returns a set of all cached digests (always a new object)."""
1261 raise NotImplementedError() 1298 raise NotImplementedError()
1262 1299
1263 def cleanup(self): 1300 def cleanup(self):
1264 """Deletes any corrupted item from the cache and trims it if necessary.""" 1301 """Deletes any corrupted item from the cache and trims it if necessary."""
1265 raise NotImplementedError() 1302 raise NotImplementedError()
1266 1303
1267 def touch(self, digest, size): 1304 def touch(self, digest, size):
1268 """Ensures item is not corrupted and updates its LRU position. 1305 """Ensures item is not corrupted and updates its LRU position.
1269 1306
1270 Arguments: 1307 Arguments:
1271 digest: hash digest of item to check. 1308 digest: hash digest of item to check.
1272 size: expected size of this item. 1309 size: expected size of this item.
1273 1310
1274 Returns: 1311 Returns:
1275 True if item is in cache and not corrupted. 1312 True if item is in cache and not corrupted.
1276 """ 1313 """
1277 raise NotImplementedError() 1314 raise NotImplementedError()
1278 1315
1279 def evict(self, digest): 1316 def evict(self, digest):
1280 """Removes item from cache if it's there.""" 1317 """Removes item from cache if it's there."""
1281 raise NotImplementedError() 1318 raise NotImplementedError()
1282 1319
1283 def read(self, digest): 1320 def getfileobj(self, digest):
1284 """Returns contents of the cached item as a single str.""" 1321 """Returns a file like object.
1322
1323 If file exists on the file system it will have a .name attribute with an
1324 absolute path to the file.
1325 """
1285 raise NotImplementedError() 1326 raise NotImplementedError()
1286 1327
1287 def write(self, digest, content): 1328 def write(self, digest, content):
1288 """Reads data from |content| generator and stores it in cache. 1329 """Reads data from |content| generator and stores it in cache.
1289 1330
1290 Returns digest to simplify chaining. 1331 Returns digest to simplify chaining.
1291 """ 1332 """
1292 raise NotImplementedError() 1333 raise NotImplementedError()
1293 1334
1294 def hardlink(self, digest, dest, file_mode):
1295 """Ensures file at |dest| has same content as cached |digest|.
1296
1297 If file_mode is provided, it is used to set the executable bit if
1298 applicable.
1299 """
1300 raise NotImplementedError()
1301
1302 1335
1303 class MemoryCache(LocalCache): 1336 class MemoryCache(LocalCache):
1304 """LocalCache implementation that stores everything in memory.""" 1337 """LocalCache implementation that stores everything in memory."""
1305 1338
1306 def __init__(self, file_mode_mask=0500): 1339 def __init__(self, file_mode_mask=0500):
1307 """Args: 1340 """Args:
1308 file_mode_mask: bit mask to AND file mode with. Default value will make 1341 file_mode_mask: bit mask to AND file mode with. Default value will make
1309 all mapped files to be read only. 1342 all mapped files to be read only.
1310 """ 1343 """
1311 super(MemoryCache, self).__init__() 1344 super(MemoryCache, self).__init__()
(...skipping 14 matching lines...) Expand all
1326 def touch(self, digest, size): 1359 def touch(self, digest, size):
1327 with self._lock: 1360 with self._lock:
1328 return digest in self._contents 1361 return digest in self._contents
1329 1362
1330 def evict(self, digest): 1363 def evict(self, digest):
1331 with self._lock: 1364 with self._lock:
1332 v = self._contents.pop(digest, None) 1365 v = self._contents.pop(digest, None)
1333 if v is not None: 1366 if v is not None:
1334 self._evicted.add(v) 1367 self._evicted.add(v)
1335 1368
1336 def read(self, digest): 1369 def getfileobj(self, digest):
1337 with self._lock: 1370 with self._lock:
1338 try: 1371 try:
1339 return self._contents[digest] 1372 d = self._contents[digest]
1373 self._used.append(len(d))
1374 return StringIO.StringIO(d)
1340 except KeyError: 1375 except KeyError:
1341 raise CacheMiss(digest) 1376 raise CacheMiss(digest)
1342 1377
1343 def write(self, digest, content): 1378 def write(self, digest, content):
1344 # Assemble whole stream before taking the lock. 1379 # Assemble whole stream before taking the lock.
1345 data = ''.join(content) 1380 data = ''.join(content)
1346 with self._lock: 1381 with self._lock:
1347 self._contents[digest] = data 1382 self._contents[digest] = data
1348 self._added.append(len(data)) 1383 self._added.append(len(data))
1349 return digest 1384 return digest
1350 1385
1351 def hardlink(self, digest, dest, file_mode):
1352 """Since data is kept in memory, there is no filenode to hardlink."""
1353 data = self.read(digest)
1354 file_write(dest, [data])
1355 if file_mode is not None:
1356 fs.chmod(dest, file_mode & self._file_mode_mask)
1357 with self._lock:
1358 self._linked.append(len(data))
1359
1360 1386
1361 class CachePolicies(object): 1387 class CachePolicies(object):
1362 def __init__(self, max_cache_size, min_free_space, max_items): 1388 def __init__(self, max_cache_size, min_free_space, max_items):
1363 """ 1389 """
1364 Arguments: 1390 Arguments:
1365 - max_cache_size: Trim if the cache gets larger than this value. If 0, the 1391 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1366 cache is effectively a leak. 1392 cache is effectively a leak.
1367 - min_free_space: Trim if disk free space becomes lower than this value. If 1393 - min_free_space: Trim if disk free space becomes lower than this value. If
1368 0, it unconditionally fill the disk. 1394 0, it unconditionally fill the disk.
1369 - max_items: Maximum number of items to keep in the cache. If 0, do not 1395 - max_items: Maximum number of items to keep in the cache. If 0, do not
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
1481 self._protected.add(digest) 1507 self._protected.add(digest)
1482 return True 1508 return True
1483 1509
1484 def evict(self, digest): 1510 def evict(self, digest):
1485 with self._lock: 1511 with self._lock:
1486 # Do not check for 'digest in self._protected' since it could be because 1512 # Do not check for 'digest in self._protected' since it could be because
1487 # the object is corrupted. 1513 # the object is corrupted.
1488 self._lru.pop(digest) 1514 self._lru.pop(digest)
1489 self._delete_file(digest, UNKNOWN_FILE_SIZE) 1515 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1490 1516
1491 def read(self, digest): 1517 def getfileobj(self, digest):
1492 try: 1518 try:
1493 with fs.open(self._path(digest), 'rb') as f: 1519 f = fs.open(self._path(digest), 'rb')
1494 return f.read() 1520 with self._lock:
1521 self._used.append(self._lru[digest])
1522 return f
1495 except IOError: 1523 except IOError:
1496 raise CacheMiss(digest) 1524 raise CacheMiss(digest)
1497 1525
1498 def write(self, digest, content): 1526 def write(self, digest, content):
1499 assert content is not None 1527 assert content is not None
1500 with self._lock: 1528 with self._lock:
1501 self._protected.add(digest) 1529 self._protected.add(digest)
1502 path = self._path(digest) 1530 path = self._path(digest)
1503 # A stale broken file may remain. It is possible for the file to have write 1531 # A stale broken file may remain. It is possible for the file to have write
1504 # access bit removed which would cause the file_write() call to fail to open 1532 # access bit removed which would cause the file_write() call to fail to open
(...skipping 10 matching lines...) Expand all
1515 file_path.try_remove(path) 1543 file_path.try_remove(path)
1516 raise 1544 raise
1517 # Make the file read-only in the cache. This has a few side-effects since 1545 # Make the file read-only in the cache. This has a few side-effects since
1518 # the file node is modified, so every directory entries to this file becomes 1546 # the file node is modified, so every directory entries to this file becomes
1519 # read-only. It's fine here because it is a new file. 1547 # read-only. It's fine here because it is a new file.
1520 file_path.set_read_only(path, True) 1548 file_path.set_read_only(path, True)
1521 with self._lock: 1549 with self._lock:
1522 self._add(digest, size) 1550 self._add(digest, size)
1523 return digest 1551 return digest
1524 1552
1525 def hardlink(self, digest, dest, file_mode):
1526 """Hardlinks the file to |dest|.
1527
1528 Note that the file permission bits are on the file node, not the directory
1529 entry, so changing the access bit on any of the directory entries for the
1530 file node will affect them all.
1531 """
1532 path = self._path(digest)
1533 if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
1534 # Report to the server that it failed with more details. We'll want to
1535 # squash them all.
1536 on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))
1537
1538 if file_mode is not None:
1539 # Ignores all other bits.
1540 fs.chmod(dest, file_mode & 0500)
1541 with self._lock:
1542 self._linked.append(self._lru[digest])
1543
1544 def _load(self): 1553 def _load(self):
1545 """Loads state of the cache from json file.""" 1554 """Loads state of the cache from json file."""
1546 self._lock.assert_locked() 1555 self._lock.assert_locked()
1547 1556
1548 if not os.path.isdir(self.cache_dir): 1557 if not os.path.isdir(self.cache_dir):
1549 fs.makedirs(self.cache_dir) 1558 fs.makedirs(self.cache_dir)
1550 else: 1559 else:
1551 # Make sure the cache is read-only. 1560 # Make sure the cache is read-only.
1552 # TODO(maruel): Calculate the cost and optimize the performance 1561 # TODO(maruel): Calculate the cost and optimize the performance
1553 # accordingly. 1562 # accordingly.
(...skipping 210 matching lines...) Expand 10 before | Expand all | Expand 10 after
1764 pending[h] = isolated_file 1773 pending[h] = isolated_file
1765 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH) 1774 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1766 1775
1767 # Start fetching root *.isolated file (single file, not the whole bundle). 1776 # Start fetching root *.isolated file (single file, not the whole bundle).
1768 retrieve_async(self.root) 1777 retrieve_async(self.root)
1769 1778
1770 while pending: 1779 while pending:
1771 # Wait until some *.isolated file is fetched, parse it. 1780 # Wait until some *.isolated file is fetched, parse it.
1772 item_hash = fetch_queue.wait(pending) 1781 item_hash = fetch_queue.wait(pending)
1773 item = pending.pop(item_hash) 1782 item = pending.pop(item_hash)
1774 item.load(fetch_queue.cache.read(item_hash)) 1783 item.load(fetch_queue.cache.getfileobj(item_hash).read())
1775 1784
1776 # Start fetching included *.isolated files. 1785 # Start fetching included *.isolated files.
1777 for new_child in item.children: 1786 for new_child in item.children:
1778 retrieve_async(new_child) 1787 retrieve_async(new_child)
1779 1788
1780 # Always fetch *.isolated files in traversal order, waiting if necessary 1789 # Always fetch *.isolated files in traversal order, waiting if necessary
1781 # until next to-be-processed node loads. "Waiting" is done by yielding 1790 # until next to-be-processed node loads. "Waiting" is done by yielding
1782 # back to the outer loop, that waits until some *.isolated is loaded. 1791 # back to the outer loop, that waits until some *.isolated is loaded.
1783 for node in isolated_format.walk_includes(self.root): 1792 for node in isolated_format.walk_includes(self.root):
1784 if node not in processed: 1793 if node not in processed:
(...skipping 17 matching lines...) Expand all
1802 """Starts fetching files from |isolated| that are not yet being fetched. 1811 """Starts fetching files from |isolated| that are not yet being fetched.
1803 1812
1804 Modifies self.files. 1813 Modifies self.files.
1805 """ 1814 """
1806 logging.debug('fetch_files(%s)', isolated.obj_hash) 1815 logging.debug('fetch_files(%s)', isolated.obj_hash)
1807 for filepath, properties in isolated.data.get('files', {}).iteritems(): 1816 for filepath, properties in isolated.data.get('files', {}).iteritems():
1808 # Root isolated has priority on the files being mapped. In particular, 1817 # Root isolated has priority on the files being mapped. In particular,
1809 # overridden files must not be fetched. 1818 # overridden files must not be fetched.
1810 if filepath not in self.files: 1819 if filepath not in self.files:
1811 self.files[filepath] = properties 1820 self.files[filepath] = properties
1821
1822 # Make sure if the isolated is read only, the mode doesn't have write
1823 # bits.
1824 if 'm' in properties and self.read_only:
1825 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1826
1827 # Preemptively request hashed files.
1812 if 'h' in properties: 1828 if 'h' in properties:
1813 # Preemptively request files.
1814 logging.debug('fetching %s', filepath) 1829 logging.debug('fetching %s', filepath)
1815 fetch_queue.add( 1830 fetch_queue.add(
1816 properties['h'], properties['s'], threading_utils.PRIORITY_MED) 1831 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1817 1832
1818 def _update_self(self, node): 1833 def _update_self(self, node):
1819 """Extracts bundle global parameters from loaded *.isolated file. 1834 """Extracts bundle global parameters from loaded *.isolated file.
1820 1835
1821 Will be called with each loaded *.isolated file in order of traversal of 1836 Will be called with each loaded *.isolated file in order of traversal of
1822 isolated include graph (see isolated_format.walk_includes). 1837 isolated include graph (see isolated_format.walk_includes).
1823 """ 1838 """
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after
1965 logging.info('Retrieving remaining files (%d of them)...', 1980 logging.info('Retrieving remaining files (%d of them)...',
1966 fetch_queue.pending_count) 1981 fetch_queue.pending_count)
1967 last_update = time.time() 1982 last_update = time.time()
1968 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: 1983 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1969 while remaining: 1984 while remaining:
1970 detector.ping() 1985 detector.ping()
1971 1986
1972 # Wait for any item to finish fetching to cache. 1987 # Wait for any item to finish fetching to cache.
1973 digest = fetch_queue.wait(remaining) 1988 digest = fetch_queue.wait(remaining)
1974 1989
1975 # Link corresponding files to a fetched item in cache. 1990 # Create the files in the destination using item in cache as the
1991 # source.
1976 for filepath, props in remaining.pop(digest): 1992 for filepath, props in remaining.pop(digest):
1977 dest = os.path.join(outdir, filepath) 1993 srcfileobj = cache.getfileobj(digest)
1978 if os.path.exists(dest): 1994
1979 raise AlreadyExists('File %s already exists' % dest) 1995 isarchive = props.get('a')
1980 cache.hardlink(digest, dest, props.get('m')) 1996 if isarchive:
1997 basedir = os.path.dirname(filepath)
1998
1999 extractor = None
2000 if isarchive == 'ar':
2001 extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
2002 else:
2003 raise isolated_format.IsolatedError(
2004 'Unknown archive format %r', isarchive)
2005
2006 for ai, ifd in extractor:
2007 fp = os.path.normpath(os.path.join(basedir, ai.name))
2008 file_path.ensure_tree(os.path.dirname(fp))
2009 putfile(ifd, fp, ai.mode)
2010
2011 else:
2012 # Ignore all bits apart from the user
2013 putfile(
2014 srcfileobj, os.path.join(outdir, filepath), props.get('m'))
1981 2015
1982 # Report progress. 2016 # Report progress.
1983 duration = time.time() - last_update 2017 duration = time.time() - last_update
1984 if duration > DELAY_BETWEEN_UPDATES_IN_SECS: 2018 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1985 msg = '%d files remaining...' % len(remaining) 2019 msg = '%d files remaining...' % len(remaining)
1986 print msg 2020 print msg
1987 logging.info(msg) 2021 logging.info(msg)
1988 last_update = time.time() 2022 last_update = time.time()
1989 2023
1990 # Cache could evict some items we just tried to fetch, it's a fatal error. 2024 # Cache could evict some items we just tried to fetch, it's a fatal error.
(...skipping 316 matching lines...) Expand 10 before | Expand all | Expand 10 after
2307 dispatcher = subcommand.CommandDispatcher(__name__) 2341 dispatcher = subcommand.CommandDispatcher(__name__)
2308 return dispatcher.execute(OptionParserIsolateServer(), args) 2342 return dispatcher.execute(OptionParserIsolateServer(), args)
2309 2343
2310 2344
if __name__ == '__main__':
  # Keep OS-level crash dialogs from blocking automated runs.
  subprocess42.inhibit_os_error_reporting()
  # Console/encoding setup must happen before any output is produced.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « client/isolated_format.py ('k') | client/run_isolated.py » ('j') | client/run_isolated.py » ('J')

Powered by Google App Engine
This is Rietveld 408576698