Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(294)

Side by Side Diff: client/isolateserver.py

Issue 2186263002: luci-py: Refactor file writing code to allow file objects. (Closed) Base URL: https://github.com/luci/luci-py.git@master
Patch Set: Rebase Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cipd.py ('k') | client/run_isolated.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The LUCI Authors. All rights reserved. 2 # Copyright 2013 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 """Archives a set of files or directories to an Isolate Server.""" 6 """Archives a set of files or directories to an Isolate Server."""
7 7
8 __version__ = '0.5.1' 8 __version__ = '0.6.0'
9 9
10 import base64 10 import base64
11 import errno
11 import functools 12 import functools
12 import errno 13 import io
13 import logging 14 import logging
14 import optparse 15 import optparse
15 import os 16 import os
16 import re 17 import re
17 import signal 18 import signal
19 import stat
18 import sys 20 import sys
19 import tempfile 21 import tempfile
20 import threading 22 import threading
21 import time 23 import time
22 import types 24 import types
23 import zlib 25 import zlib
24 26
25 from third_party import colorama 27 from third_party import colorama
26 from third_party.depot_tools import fix_encoding 28 from third_party.depot_tools import fix_encoding
27 from third_party.depot_tools import subcommand 29 from third_party.depot_tools import subcommand
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 """ 144 """
143 file_path.ensure_tree(os.path.dirname(path)) 145 file_path.ensure_tree(os.path.dirname(path))
144 total = 0 146 total = 0
145 with fs.open(path, 'wb') as f: 147 with fs.open(path, 'wb') as f:
146 for d in content_generator: 148 for d in content_generator:
147 total += len(d) 149 total += len(d)
148 f.write(d) 150 f.write(d)
149 return total 151 return total
150 152
151 153
def fileobj_path(fileobj):
  """Returns the file system path backing a file like object, or None.

  A path is only returned when it currently exists on disk, so the result is
  safe to hand to file system operations such as copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is not None:
    # open("test.txt") and similar standard library helpers store the name as
    # a str; every path handled here must be unicode, so decode when needed.
    if not isinstance(name, unicode):
      name = name.decode(sys.getfilesystemencoding())
    if fs.exists(name):
      return name
174
# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj into dstfileobj.

  When |size| is given, exactly that many bytes are transferred; hitting EOF
  earlier raises IOError. With the default size of -1, everything up to the
  EOF marker is copied, and the source must be positioned at offset 0.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    want = chunk_size
    if size > 0:
      want = min(want, size - copied)
    data = srcfileobj.read(want)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (copied, size))
    dstfileobj.write(data)
    copied += len(data)
203
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # A file is read only iff *none* of the user/group/other write bits are
    # set. The previous expression was missing the 'not', so writable files
    # were hardlinked into place (letting a task corrupt the cache copy) and
    # read only files were needlessly copied.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
266
267
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  trailer = deflater.flush(zlib.Z_FINISH)
  if trailer:
    yield trailer
(...skipping 1054 matching lines...) Expand 10 before | Expand all | Expand 10 after
1216 """ 1332 """
1217 cache_dir = None 1333 cache_dir = None
1218 1334
1219 def __init__(self): 1335 def __init__(self):
1220 self._lock = threading_utils.LockWithAssert() 1336 self._lock = threading_utils.LockWithAssert()
1221 # Profiling values. 1337 # Profiling values.
1222 self._added = [] 1338 self._added = []
1223 self._initial_number_items = 0 1339 self._initial_number_items = 0
1224 self._initial_size = 0 1340 self._initial_size = 0
1225 self._evicted = [] 1341 self._evicted = []
1226 self._linked = [] 1342 self._used = []
1227 1343
1228 def __contains__(self, digest): 1344 def __contains__(self, digest):
1229 raise NotImplementedError() 1345 raise NotImplementedError()
1230 1346
1231 def __enter__(self): 1347 def __enter__(self):
1232 """Context manager interface.""" 1348 """Context manager interface."""
1233 return self 1349 return self
1234 1350
1235 def __exit__(self, _exc_type, _exec_value, _traceback): 1351 def __exit__(self, _exc_type, _exec_value, _traceback):
1236 """Context manager interface.""" 1352 """Context manager interface."""
1237 return False 1353 return False
1238 1354
1239 @property 1355 @property
1240 def added(self): 1356 def added(self):
1241 return self._added[:] 1357 return self._added[:]
1242 1358
1243 @property 1359 @property
1244 def evicted(self): 1360 def evicted(self):
1245 return self._evicted[:] 1361 return self._evicted[:]
1246 1362
1247 @property 1363 @property
1364 def used(self):
1365 return self._used[:]
1366
1367 @property
1248 def initial_number_items(self): 1368 def initial_number_items(self):
1249 return self._initial_number_items 1369 return self._initial_number_items
1250 1370
1251 @property 1371 @property
1252 def initial_size(self): 1372 def initial_size(self):
1253 return self._initial_size 1373 return self._initial_size
1254 1374
1255 @property
1256 def linked(self):
1257 return self._linked[:]
1258
1259 def cached_set(self): 1375 def cached_set(self):
1260 """Returns a set of all cached digests (always a new object).""" 1376 """Returns a set of all cached digests (always a new object)."""
1261 raise NotImplementedError() 1377 raise NotImplementedError()
1262 1378
1263 def cleanup(self): 1379 def cleanup(self):
1264 """Deletes any corrupted item from the cache and trims it if necessary.""" 1380 """Deletes any corrupted item from the cache and trims it if necessary."""
1265 raise NotImplementedError() 1381 raise NotImplementedError()
1266 1382
1267 def touch(self, digest, size): 1383 def touch(self, digest, size):
1268 """Ensures item is not corrupted and updates its LRU position. 1384 """Ensures item is not corrupted and updates its LRU position.
1269 1385
1270 Arguments: 1386 Arguments:
1271 digest: hash digest of item to check. 1387 digest: hash digest of item to check.
1272 size: expected size of this item. 1388 size: expected size of this item.
1273 1389
1274 Returns: 1390 Returns:
1275 True if item is in cache and not corrupted. 1391 True if item is in cache and not corrupted.
1276 """ 1392 """
1277 raise NotImplementedError() 1393 raise NotImplementedError()
1278 1394
1279 def evict(self, digest): 1395 def evict(self, digest):
1280 """Removes item from cache if it's there.""" 1396 """Removes item from cache if it's there."""
1281 raise NotImplementedError() 1397 raise NotImplementedError()
1282 1398
1283 def read(self, digest): 1399 def getfileobj(self, digest):
1284 """Returns contents of the cached item as a single str.""" 1400 """Returns a readable file like object.
1401
1402 If file exists on the file system it will have a .name attribute with an
1403 absolute path to the file.
1404 """
1285 raise NotImplementedError() 1405 raise NotImplementedError()
1286 1406
1287 def write(self, digest, content): 1407 def write(self, digest, content):
1288 """Reads data from |content| generator and stores it in cache. 1408 """Reads data from |content| generator and stores it in cache.
1289 1409
1290 Returns digest to simplify chaining. 1410 Returns digest to simplify chaining.
1291 """ 1411 """
1292 raise NotImplementedError() 1412 raise NotImplementedError()
1293 1413
1294 def link(self, digest, dest, file_mode, use_symlink):
1295 """Ensures file at |dest| has same content as cached |digest|.
1296
1297 If file_mode is provided, it is used to set the executable bit if
1298 applicable.
1299
1300 The function may copy the content, create a hardlink and if use_symlink is
1301 True, create a symlink if possible.
1302
1303 Creating a tree of hardlinks has a few drawbacks:
1304 - tmpfs cannot be used for the scratch space. The tree has to be on the same
1305 partition as the cache.
1306 - involves a write to the inode, which advances ctime, cause a metadata
1307 writeback (causing disk seeking).
1308 - cache ctime cannot be used to detect modifications / corruption.
1309 - Some file systems (NTFS) have a 64k limit on the number of hardlink per
1310 partition. This is why the function automatically fallbacks to copying the
1311 file content.
1312 - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
1313 same owner is for all hardlinks.
1314 - Anecdotal report that ext2 is known to be potentially faulty on high rate
1315 of hardlink creation.
1316
1317 Creating a tree of symlinks has a few drawbacks:
1318 - Tasks running the equivalent of os.path.realpath() will get the naked path
1319 and may fail.
1320 - Windows:
1321 - Symlinks are reparse points:
1322 https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
1323 https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
1324 - Symbolic links are Win32 paths, not NT paths.
1325 https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-w in32-to-nt.html
1326 - Symbolic links are supported on Windows 7 and later only.
1327 - SeCreateSymbolicLinkPrivilege is needed, which is not present by
1328 default.
1329 - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
1330 RID is present in the token;
1331 https://msdn.microsoft.com/en-us/library/bb530410.aspx
1332 """
1333 raise NotImplementedError()
1334
1335 1414
1336 class MemoryCache(LocalCache): 1415 class MemoryCache(LocalCache):
1337 """LocalCache implementation that stores everything in memory.""" 1416 """LocalCache implementation that stores everything in memory."""
1338 1417
1339 def __init__(self, file_mode_mask=0500): 1418 def __init__(self, file_mode_mask=0500):
1340 """Args: 1419 """Args:
1341 file_mode_mask: bit mask to AND file mode with. Default value will make 1420 file_mode_mask: bit mask to AND file mode with. Default value will make
1342 all mapped files to be read only. 1421 all mapped files to be read only.
1343 """ 1422 """
1344 super(MemoryCache, self).__init__() 1423 super(MemoryCache, self).__init__()
(...skipping 14 matching lines...) Expand all
1359 def touch(self, digest, size): 1438 def touch(self, digest, size):
1360 with self._lock: 1439 with self._lock:
1361 return digest in self._contents 1440 return digest in self._contents
1362 1441
1363 def evict(self, digest): 1442 def evict(self, digest):
1364 with self._lock: 1443 with self._lock:
1365 v = self._contents.pop(digest, None) 1444 v = self._contents.pop(digest, None)
1366 if v is not None: 1445 if v is not None:
1367 self._evicted.add(v) 1446 self._evicted.add(v)
1368 1447
1369 def read(self, digest): 1448 def getfileobj(self, digest):
1370 with self._lock: 1449 with self._lock:
1371 try: 1450 try:
1372 return self._contents[digest] 1451 d = self._contents[digest]
1373 except KeyError: 1452 except KeyError:
1374 raise CacheMiss(digest) 1453 raise CacheMiss(digest)
1454 self._used.append(len(d))
1455 return io.BytesIO(d)
1375 1456
1376 def write(self, digest, content): 1457 def write(self, digest, content):
1377 # Assemble whole stream before taking the lock. 1458 # Assemble whole stream before taking the lock.
1378 data = ''.join(content) 1459 data = ''.join(content)
1379 with self._lock: 1460 with self._lock:
1380 self._contents[digest] = data 1461 self._contents[digest] = data
1381 self._added.append(len(data)) 1462 self._added.append(len(data))
1382 return digest 1463 return digest
1383 1464
1384 def link(self, digest, dest, file_mode, use_symlink):
1385 """Since data is kept in memory, there is no filenode to hardlink/symlink.
1386 """
1387 data = self.read(digest)
1388 file_write(dest, [data])
1389 if file_mode is not None:
1390 fs.chmod(dest, file_mode & self._file_mode_mask)
1391 with self._lock:
1392 self._linked.append(len(data))
1393
1394 1465
1395 class CachePolicies(object): 1466 class CachePolicies(object):
1396 def __init__(self, max_cache_size, min_free_space, max_items): 1467 def __init__(self, max_cache_size, min_free_space, max_items):
1397 """ 1468 """
1398 Arguments: 1469 Arguments:
1399 - max_cache_size: Trim if the cache gets larger than this value. If 0, the 1470 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1400 cache is effectively a leak. 1471 cache is effectively a leak.
1401 - min_free_space: Trim if disk free space becomes lower than this value. If 1472 - min_free_space: Trim if disk free space becomes lower than this value. If
1402 0, it unconditionally fill the disk. 1473 0, it unconditionally fill the disk.
1403 - max_items: Maximum number of items to keep in the cache. If 0, do not 1474 - max_items: Maximum number of items to keep in the cache. If 0, do not
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after
1550 self._protected = self._protected or digest 1621 self._protected = self._protected or digest
1551 return True 1622 return True
1552 1623
1553 def evict(self, digest): 1624 def evict(self, digest):
1554 with self._lock: 1625 with self._lock:
1555 # Do not check for 'digest == self._protected' since it could be because 1626 # Do not check for 'digest == self._protected' since it could be because
1556 # the object is corrupted. 1627 # the object is corrupted.
1557 self._lru.pop(digest) 1628 self._lru.pop(digest)
1558 self._delete_file(digest, UNKNOWN_FILE_SIZE) 1629 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1559 1630
1560 def read(self, digest): 1631 def getfileobj(self, digest):
1561 try: 1632 try:
1562 with fs.open(self._path(digest), 'rb') as f: 1633 f = fs.open(self._path(digest), 'rb')
1563 return f.read() 1634 with self._lock:
1635 self._used.append(self._lru[digest])
1636 return f
1564 except IOError: 1637 except IOError:
1565 raise CacheMiss(digest) 1638 raise CacheMiss(digest)
1566 1639
1567 def write(self, digest, content): 1640 def write(self, digest, content):
1568 assert content is not None 1641 assert content is not None
1569 with self._lock: 1642 with self._lock:
1570 self._protected = self._protected or digest 1643 self._protected = self._protected or digest
1571 path = self._path(digest) 1644 path = self._path(digest)
1572 # A stale broken file may remain. It is possible for the file to have write 1645 # A stale broken file may remain. It is possible for the file to have write
1573 # access bit removed which would cause the file_write() call to fail to open 1646 # access bit removed which would cause the file_write() call to fail to open
(...skipping 10 matching lines...) Expand all
1584 file_path.try_remove(path) 1657 file_path.try_remove(path)
1585 raise 1658 raise
1586 # Make the file read-only in the cache. This has a few side-effects since 1659 # Make the file read-only in the cache. This has a few side-effects since
1587 # the file node is modified, so every directory entries to this file becomes 1660 # the file node is modified, so every directory entries to this file becomes
1588 # read-only. It's fine here because it is a new file. 1661 # read-only. It's fine here because it is a new file.
1589 file_path.set_read_only(path, True) 1662 file_path.set_read_only(path, True)
1590 with self._lock: 1663 with self._lock:
1591 self._add(digest, size) 1664 self._add(digest, size)
1592 return digest 1665 return digest
1593 1666
1594 def link(self, digest, dest, file_mode, use_symlink):
1595 """Links the file to |dest|.
1596
1597 Note that the file permission bits are on the file node, not the directory
1598 entry, so changing the access bit on any of the directory entries for the
1599 file node will affect them all.
1600 """
1601 path = self._path(digest)
1602 mode = (
1603 file_path.SYMLINK_WITH_FALLBACK if use_symlink
1604 else file_path.HARDLINK_WITH_FALLBACK)
1605 if not file_path.link_file(dest, path, mode):
1606 # Report to the server that it failed with more details. We'll want to
1607 # squash them all.
1608 on_error.report('Failed to link\n%s -> %s' % (path, dest))
1609
1610 if file_mode is not None:
1611 # Ignores all other bits.
1612 fs.chmod(dest, file_mode & 0500)
1613 with self._lock:
1614 self._linked.append(self._lru[digest])
1615
1616 def _load(self): 1667 def _load(self):
1617 """Loads state of the cache from json file. 1668 """Loads state of the cache from json file.
1618 1669
1619 If cache_dir does not exist on disk, it is created. 1670 If cache_dir does not exist on disk, it is created.
1620 """ 1671 """
1621 self._lock.assert_locked() 1672 self._lock.assert_locked()
1622 1673
1623 if not fs.isfile(self.state_file): 1674 if not fs.isfile(self.state_file):
1624 if not os.path.isdir(self.cache_dir): 1675 if not os.path.isdir(self.cache_dir):
1625 fs.makedirs(self.cache_dir) 1676 fs.makedirs(self.cache_dir)
(...skipping 171 matching lines...) Expand 10 before | Expand all | Expand 10 after
1797 pending[h] = isolated_file 1848 pending[h] = isolated_file
1798 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH) 1849 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1799 1850
1800 # Start fetching root *.isolated file (single file, not the whole bundle). 1851 # Start fetching root *.isolated file (single file, not the whole bundle).
1801 retrieve_async(self.root) 1852 retrieve_async(self.root)
1802 1853
1803 while pending: 1854 while pending:
1804 # Wait until some *.isolated file is fetched, parse it. 1855 # Wait until some *.isolated file is fetched, parse it.
1805 item_hash = fetch_queue.wait(pending) 1856 item_hash = fetch_queue.wait(pending)
1806 item = pending.pop(item_hash) 1857 item = pending.pop(item_hash)
1807 item.load(fetch_queue.cache.read(item_hash)) 1858 with fetch_queue.cache.getfileobj(item_hash) as f:
1859 item.load(f.read())
1808 1860
1809 # Start fetching included *.isolated files. 1861 # Start fetching included *.isolated files.
1810 for new_child in item.children: 1862 for new_child in item.children:
1811 retrieve_async(new_child) 1863 retrieve_async(new_child)
1812 1864
1813 # Always fetch *.isolated files in traversal order, waiting if necessary 1865 # Always fetch *.isolated files in traversal order, waiting if necessary
1814 # until next to-be-processed node loads. "Waiting" is done by yielding 1866 # until next to-be-processed node loads. "Waiting" is done by yielding
1815 # back to the outer loop, that waits until some *.isolated is loaded. 1867 # back to the outer loop, that waits until some *.isolated is loaded.
1816 for node in isolated_format.walk_includes(self.root): 1868 for node in isolated_format.walk_includes(self.root):
1817 if node not in processed: 1869 if node not in processed:
(...skipping 17 matching lines...) Expand all
1835 """Starts fetching files from |isolated| that are not yet being fetched. 1887 """Starts fetching files from |isolated| that are not yet being fetched.
1836 1888
1837 Modifies self.files. 1889 Modifies self.files.
1838 """ 1890 """
1839 logging.debug('fetch_files(%s)', isolated.obj_hash) 1891 logging.debug('fetch_files(%s)', isolated.obj_hash)
1840 for filepath, properties in isolated.data.get('files', {}).iteritems(): 1892 for filepath, properties in isolated.data.get('files', {}).iteritems():
1841 # Root isolated has priority on the files being mapped. In particular, 1893 # Root isolated has priority on the files being mapped. In particular,
1842 # overridden files must not be fetched. 1894 # overridden files must not be fetched.
1843 if filepath not in self.files: 1895 if filepath not in self.files:
1844 self.files[filepath] = properties 1896 self.files[filepath] = properties
1897
1898 # Make sure if the isolated is read only, the mode doesn't have write
1899 # bits.
1900 if 'm' in properties and self.read_only:
1901 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1902
1903 # Preemptively request hashed files.
1845 if 'h' in properties: 1904 if 'h' in properties:
1846 # Preemptively request files.
1847 logging.debug('fetching %s', filepath) 1905 logging.debug('fetching %s', filepath)
1848 fetch_queue.add( 1906 fetch_queue.add(
1849 properties['h'], properties['s'], threading_utils.PRIORITY_MED) 1907 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1850 1908
1851 def _update_self(self, node): 1909 def _update_self(self, node):
1852 """Extracts bundle global parameters from loaded *.isolated file. 1910 """Extracts bundle global parameters from loaded *.isolated file.
1853 1911
1854 Will be called with each loaded *.isolated file in order of traversal of 1912 Will be called with each loaded *.isolated file in order of traversal of
1855 isolated include graph (see isolated_format.walk_includes). 1913 isolated include graph (see isolated_format.walk_includes).
1856 """ 1914 """
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after
2000 logging.info('Retrieving remaining files (%d of them)...', 2058 logging.info('Retrieving remaining files (%d of them)...',
2001 fetch_queue.pending_count) 2059 fetch_queue.pending_count)
2002 last_update = time.time() 2060 last_update = time.time()
2003 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: 2061 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
2004 while remaining: 2062 while remaining:
2005 detector.ping() 2063 detector.ping()
2006 2064
2007 # Wait for any item to finish fetching to cache. 2065 # Wait for any item to finish fetching to cache.
2008 digest = fetch_queue.wait(remaining) 2066 digest = fetch_queue.wait(remaining)
2009 2067
2010 # Link corresponding files to a fetched item in cache. 2068 # Create the files in the destination using item in cache as the
2069 # source.
2011 for filepath, props in remaining.pop(digest): 2070 for filepath, props in remaining.pop(digest):
2012 dest = os.path.join(outdir, filepath) 2071 fullpath = os.path.join(outdir, filepath)
2013 if os.path.exists(dest): 2072
2014 raise AlreadyExists('File %s already exists' % dest) 2073 with cache.getfileobj(digest) as srcfileobj:
2015 cache.link(digest, dest, props.get('m'), use_symlinks) 2074 file_mode = props.get('m')
2075 if file_mode:
2076 # Ignore all bits apart from the user
2077 file_mode &= 0700
2078 putfile(
2079 srcfileobj, fullpath, file_mode,
2080 use_symlink=use_symlinks)
2016 2081
2017 # Report progress. 2082 # Report progress.
2018 duration = time.time() - last_update 2083 duration = time.time() - last_update
2019 if duration > DELAY_BETWEEN_UPDATES_IN_SECS: 2084 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
2020 msg = '%d files remaining...' % len(remaining) 2085 msg = '%d files remaining...' % len(remaining)
2021 print msg 2086 print msg
2022 logging.info(msg) 2087 logging.info(msg)
2023 last_update = time.time() 2088 last_update = time.time()
2024 2089
2025 # Cache could evict some items we just tried to fetch, it's a fatal error. 2090 # Cache could evict some items we just tried to fetch, it's a fatal error.
(...skipping 324 matching lines...) Expand 10 before | Expand all | Expand 10 after
2350 return dispatcher.execute(OptionParserIsolateServer(), args) 2415 return dispatcher.execute(OptionParserIsolateServer(), args)
2351 2416
2352 2417
2353 if __name__ == '__main__': 2418 if __name__ == '__main__':
2354 subprocess42.inhibit_os_error_reporting() 2419 subprocess42.inhibit_os_error_reporting()
2355 fix_encoding.fix_encoding() 2420 fix_encoding.fix_encoding()
2356 tools.disable_buffering() 2421 tools.disable_buffering()
2357 colorama.init() 2422 colorama.init()
2358 file_path.enable_symlink() 2423 file_path.enable_symlink()
2359 sys.exit(main(sys.argv[1:])) 2424 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « client/cipd.py ('k') | client/run_isolated.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698