Index: swarm_client/isolateserver.py |
=================================================================== |
--- swarm_client/isolateserver.py (revision 235167) |
+++ swarm_client/isolateserver.py (working copy) |
@@ -1,1664 +0,0 @@ |
-#!/usr/bin/env python |
-# Copyright 2013 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-"""Archives a set of files to a server.""" |
- |
-__version__ = '0.2' |
- |
-import functools |
-import hashlib |
-import json |
-import logging |
-import os |
-import re |
-import sys |
-import threading |
-import time |
-import urllib |
-import zlib |
- |
-from third_party import colorama |
-from third_party.depot_tools import fix_encoding |
-from third_party.depot_tools import subcommand |
- |
-from utils import net |
-from utils import threading_utils |
-from utils import tools |
- |
- |
-# Version of isolate protocol passed to the server in /handshake request. |
-ISOLATE_PROTOCOL_VERSION = '1.0' |
- |
- |
-# The number of files to check the isolate server per /pre-upload query. |
-# All files are sorted by likelihood of a change in the file content |
-# (currently file size is used to estimate this: the larger the file, the more |
-# likely it has changed). The first ITEMS_PER_CONTAINS_QUERIES[0] files are |
-# taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1], |
-# and so on. The numbers here are a trade-off; the more per request, the lower |
-# the effect of HTTP round trip latency and TCP-level chattiness. On the other |
-# hand, larger values cause longer lookups, increasing the initial latency |
-# before uploading starts, which is especially an issue for large files. This |
-# value is optimized for the "a few thousand files to look up with a minimal |
-# number of large missing files" case. |
-ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] |
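For a rough sense of scale, a worked example (not part of the original file), assuming every batch is filled completely:

# The first six queries cover 20 + 20 + 50 + 50 + 50 + 100 = 290 items; every
# later query reuses the last value (100), so checking roughly 3,000 files
# costs about 6 + 28 = 34 '/pre-upload' round trips.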
- |
- |
-# A list of already compressed extension types that should not receive any |
-# compression before being uploaded. |
-ALREADY_COMPRESSED_TYPES = [ |
- '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', |
- 'wav', 'zip' |
-] |
- |
- |
-# The file size to be used when we don't know the correct file size, |
-# generally used for .isolated files. |
-UNKNOWN_FILE_SIZE = None |
- |
- |
-# The size of each chunk to read when downloading and unzipping files. |
-ZIPPED_FILE_CHUNK = 16 * 1024 |
- |
-# Chunk size to use when doing disk I/O. |
-DISK_FILE_CHUNK = 1024 * 1024 |
- |
-# Chunk size to use when reading from network stream. |
-NET_IO_FILE_CHUNK = 16 * 1024 |
- |
- |
-# Read timeout in seconds for downloads from isolate storage. If there's no |
-# response from the server within this timeout, the whole download is aborted. |
-DOWNLOAD_READ_TIMEOUT = 60 |
- |
-# Maximum expected delay (in seconds) between successive file fetches |
-# in run_tha_test. If it takes longer than that, a deadlock might be happening |
-# and all stack frames for all threads are dumped to log. |
-DEADLOCK_TIMEOUT = 5 * 60 |
- |
- |
-# The delay (in seconds) to wait between logging statements when retrieving |
-# the required files. This is intended to let the user (or buildbot) know that |
-# the program is still running. |
-DELAY_BETWEEN_UPDATES_IN_SECS = 30 |
- |
- |
-# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly |
-# specify the names here. |
-SUPPORTED_ALGOS = { |
- 'md5': hashlib.md5, |
- 'sha-1': hashlib.sha1, |
- 'sha-512': hashlib.sha512, |
-} |
- |
- |
-# Used for serialization. |
-SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems()) |
- |
- |
-class ConfigError(ValueError): |
- """Generic failure to load a .isolated file.""" |
- pass |
- |
- |
-class MappingError(OSError): |
- """Failed to recreate the tree.""" |
- pass |
- |
- |
-def is_valid_hash(value, algo): |
- """Returns if the value is a valid hash for the corresponding algorithm.""" |
- size = 2 * algo().digest_size |
- return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) |
- |
- |
-def hash_file(filepath, algo): |
- """Calculates the hash of a file without reading it all in memory at once. |
- |
-  |algo| should be one of the hashlib hashing algorithms. |
- """ |
- digest = algo() |
- with open(filepath, 'rb') as f: |
- while True: |
- chunk = f.read(DISK_FILE_CHUNK) |
- if not chunk: |
- break |
- digest.update(chunk) |
- return digest.hexdigest() |
- |
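A minimal usage sketch (not in the original file); the file path is hypothetical and only for illustration:

# Hash a local file with the algorithm registered under 'sha-1' above.
algo = SUPPORTED_ALGOS['sha-1']
digest = hash_file('/tmp/example.bin', algo)  # hypothetical path
assert is_valid_hash(digest, algo)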
- |
-def stream_read(stream, chunk_size): |
- """Reads chunks from |stream| and yields them.""" |
- while True: |
- data = stream.read(chunk_size) |
- if not data: |
- break |
- yield data |
- |
- |
-def file_read(filepath, chunk_size=DISK_FILE_CHUNK): |
- """Yields file content in chunks of given |chunk_size|.""" |
- with open(filepath, 'rb') as f: |
- while True: |
- data = f.read(chunk_size) |
- if not data: |
- break |
- yield data |
- |
- |
-def file_write(filepath, content_generator): |
- """Writes file content as generated by content_generator. |
- |
- Creates the intermediary directory as needed. |
- |
- Returns the number of bytes written. |
- |
- Meant to be mocked out in unit tests. |
- """ |
- filedir = os.path.dirname(filepath) |
- if not os.path.isdir(filedir): |
- os.makedirs(filedir) |
- total = 0 |
- with open(filepath, 'wb') as f: |
- for d in content_generator: |
- total += len(d) |
- f.write(d) |
- return total |
- |
- |
-def zip_compress(content_generator, level=7): |
- """Reads chunks from |content_generator| and yields zip compressed chunks.""" |
- compressor = zlib.compressobj(level) |
- for chunk in content_generator: |
- compressed = compressor.compress(chunk) |
- if compressed: |
- yield compressed |
- tail = compressor.flush(zlib.Z_FINISH) |
- if tail: |
- yield tail |
- |
- |
-def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK): |
- """Reads zipped data from |content_generator| and yields decompressed data. |
- |
- Decompresses data in small chunks (no larger than |chunk_size|) so that |
-  a zip bomb doesn't cause zlib to preallocate a huge amount of memory. |
- |
- Raises IOError if data is corrupted or incomplete. |
- """ |
- decompressor = zlib.decompressobj() |
- compressed_size = 0 |
- try: |
- for chunk in content_generator: |
- compressed_size += len(chunk) |
- data = decompressor.decompress(chunk, chunk_size) |
- if data: |
- yield data |
- while decompressor.unconsumed_tail: |
- data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size) |
- if data: |
- yield data |
- tail = decompressor.flush() |
- if tail: |
- yield tail |
- except zlib.error as e: |
- raise IOError( |
- 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) |
- # Ensure all data was read and decompressed. |
- if decompressor.unused_data or decompressor.unconsumed_tail: |
- raise IOError('Not all data was decompressed') |
- |
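A small round-trip sketch (illustrative only, not from the original file) showing how zip_compress() and zip_decompress() compose over chunked content:

# Compress a chunked payload and decompress it back; the chunks are arbitrary.
payload = ['hello ' * 1000, 'world ' * 1000]
compressed = list(zip_compress(payload, level=7))
restored = ''.join(zip_decompress(compressed))
assert restored == ''.join(payload)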
- |
-def get_zip_compression_level(filename): |
- """Given a filename calculates the ideal zip compression level to use.""" |
- file_ext = os.path.splitext(filename)[1].lower() |
- # TODO(csharp): Profile to find what compression level works best. |
- return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 |
- |
- |
-def create_directories(base_directory, files): |
- """Creates the directory structure needed by the given list of files.""" |
- logging.debug('create_directories(%s, %d)', base_directory, len(files)) |
- # Creates the tree of directories to create. |
- directories = set(os.path.dirname(f) for f in files) |
- for item in list(directories): |
- while item: |
- directories.add(item) |
- item = os.path.dirname(item) |
- for d in sorted(directories): |
- if d: |
- os.mkdir(os.path.join(base_directory, d)) |
- |
- |
-def create_links(base_directory, files): |
- """Creates any links needed by the given set of files.""" |
- for filepath, properties in files: |
- if 'l' not in properties: |
- continue |
- if sys.platform == 'win32': |
- # TODO(maruel): Create junctions or empty text files similar to what |
-      # cygwin does? |
- logging.warning('Ignoring symlink %s', filepath) |
- continue |
- outfile = os.path.join(base_directory, filepath) |
-    # symlink doesn't exist on Windows, so the 'link' property should |
-    # never be specified for a Windows .isolated file. |
- os.symlink(properties['l'], outfile) # pylint: disable=E1101 |
- if 'm' in properties: |
- lchmod = getattr(os, 'lchmod', None) |
- if lchmod: |
- lchmod(outfile, properties['m']) |
- |
- |
-def is_valid_file(filepath, size): |
- """Determines if the given files appears valid. |
- |
- Currently it just checks the file's size. |
- """ |
- if size == UNKNOWN_FILE_SIZE: |
- return os.path.isfile(filepath) |
- actual_size = os.stat(filepath).st_size |
- if size != actual_size: |
- logging.warning( |
- 'Found invalid item %s; %d != %d', |
- os.path.basename(filepath), actual_size, size) |
- return False |
- return True |
- |
- |
-class WorkerPool(threading_utils.AutoRetryThreadPool): |
- """Thread pool that automatically retries on IOError and runs a preconfigured |
- function. |
- """ |
- # Initial and maximum number of worker threads. |
- INITIAL_WORKERS = 2 |
- MAX_WORKERS = 16 |
- RETRIES = 5 |
- |
- def __init__(self): |
- super(WorkerPool, self).__init__( |
- [IOError], |
- self.RETRIES, |
- self.INITIAL_WORKERS, |
- self.MAX_WORKERS, |
- 0, |
- 'remote') |
- |
- |
-class Item(object): |
- """An item to push to Storage. |
- |
-  It starts its life in the main thread, travels to the 'contains' thread, then |
-  to the 'push' thread and finally returns to the main thread. |
- |
- It is never used concurrently from multiple threads. |
- """ |
- |
- def __init__(self, digest, size, is_isolated=False): |
- self.digest = digest |
- self.size = size |
- self.is_isolated = is_isolated |
- self.compression_level = 6 |
- self.push_state = None |
- |
- def content(self, chunk_size): |
- """Iterable with content of this item in chunks of given size. |
- |
- Arguments: |
- chunk_size: preferred size of the chunk to produce, may be ignored. |
- """ |
- raise NotImplementedError() |
- |
- |
-class FileItem(Item): |
- """A file to push to Storage.""" |
- |
- def __init__(self, path, digest, size, is_isolated): |
- super(FileItem, self).__init__(digest, size, is_isolated) |
- self.path = path |
- self.compression_level = get_zip_compression_level(path) |
- |
- def content(self, chunk_size): |
- return file_read(self.path, chunk_size) |
- |
- |
-class BufferItem(Item): |
- """A byte buffer to push to Storage.""" |
- |
- def __init__(self, buf, algo, is_isolated=False): |
- super(BufferItem, self).__init__( |
- algo(buf).hexdigest(), len(buf), is_isolated) |
- self.buffer = buf |
- |
- def content(self, _chunk_size): |
- return [self.buffer] |
- |
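A brief sketch (illustrative, not in the original module) of wrapping an in-memory blob as an Item that Storage can push:

# Wrap a serialized body in a BufferItem; the content itself is made up.
body = json.dumps({'version': '1.0'})
item = BufferItem(body, algo=hashlib.sha1, is_isolated=True)
# item.digest is hashlib.sha1(body).hexdigest() and item.size == len(body).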
- |
-class Storage(object): |
- """Efficiently downloads or uploads large set of files via StorageApi.""" |
- |
- def __init__(self, storage_api, use_zip): |
- self.use_zip = use_zip |
- self._storage_api = storage_api |
- self._cpu_thread_pool = None |
- self._net_thread_pool = None |
- |
- @property |
- def cpu_thread_pool(self): |
- """ThreadPool for CPU-bound tasks like zipping.""" |
- if self._cpu_thread_pool is None: |
- self._cpu_thread_pool = threading_utils.ThreadPool( |
- 2, max(threading_utils.num_processors(), 2), 0, 'zip') |
- return self._cpu_thread_pool |
- |
- @property |
- def net_thread_pool(self): |
- """AutoRetryThreadPool for IO-bound tasks, retries IOError.""" |
- if self._net_thread_pool is None: |
- self._net_thread_pool = WorkerPool() |
- return self._net_thread_pool |
- |
- def close(self): |
- """Waits for all pending tasks to finish.""" |
- if self._cpu_thread_pool: |
- self._cpu_thread_pool.join() |
- self._cpu_thread_pool.close() |
- self._cpu_thread_pool = None |
- if self._net_thread_pool: |
- self._net_thread_pool.join() |
- self._net_thread_pool.close() |
- self._net_thread_pool = None |
- |
- def __enter__(self): |
- """Context manager interface.""" |
- return self |
- |
- def __exit__(self, _exc_type, _exc_value, _traceback): |
- """Context manager interface.""" |
- self.close() |
- return False |
- |
- def upload_tree(self, indir, infiles): |
- """Uploads the given tree to the isolate server. |
- |
- Arguments: |
- indir: root directory the infiles are based in. |
- infiles: dict of files to upload from |indir|. |
- |
- Returns: |
- List of items that were uploaded. All other items are already there. |
- """ |
- logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) |
- |
- # Convert |indir| + |infiles| into a list of FileItem objects. |
- # Filter out symlinks, since they are not represented by items on isolate |
- # server side. |
- items = [ |
- FileItem( |
- path=os.path.join(indir, filepath), |
- digest=metadata['h'], |
- size=metadata['s'], |
- is_isolated=metadata.get('priority') == '0') |
- for filepath, metadata in infiles.iteritems() |
- if 'l' not in metadata |
- ] |
- |
- return self.upload_items(items) |
- |
- def upload_items(self, items): |
- """Uploads bunch of items to the isolate server. |
- |
- Will upload only items that are missing. |
- |
- Arguments: |
- items: list of Item instances that represents data to upload. |
- |
- Returns: |
- List of items that were uploaded. All other items are already there. |
- """ |
- # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently |
- # used by swarming.py. There's no need to spawn multiple threads and try to |
- # do stuff in parallel: there's nothing to parallelize. 'contains' check and |
- # 'push' should be performed sequentially in the context of current thread. |
- |
-    # For each digest keep only the first Item that matches it. All other items |
-    # are just indistinguishable copies from the point of view of the isolate |
-    # server (it doesn't care about paths at all, only content and digests). |
- seen = {} |
- duplicates = 0 |
- for item in items: |
- if seen.setdefault(item.digest, item) is not item: |
- duplicates += 1 |
- items = seen.values() |
- if duplicates: |
- logging.info('Skipped %d duplicated files', duplicates) |
- |
- # Enqueue all upload tasks. |
- missing = set() |
- channel = threading_utils.TaskChannel() |
- for missing_item in self.get_missing_items(items): |
- missing.add(missing_item) |
- self.async_push( |
- channel, |
- WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED, |
- missing_item) |
- |
- uploaded = [] |
- # No need to spawn deadlock detector thread if there's nothing to upload. |
- if missing: |
- with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: |
- # Wait for all started uploads to finish. |
- while len(uploaded) != len(missing): |
- detector.ping() |
- item = channel.pull() |
- uploaded.append(item) |
- logging.debug( |
- 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest) |
- logging.info('All files are uploaded') |
- |
- # Print stats. |
- total = len(items) |
- total_size = sum(f.size for f in items) |
- logging.info( |
- 'Total: %6d, %9.1fkb', |
- total, |
- total_size / 1024.) |
- cache_hit = set(items) - missing |
- cache_hit_size = sum(f.size for f in cache_hit) |
- logging.info( |
- 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', |
- len(cache_hit), |
- cache_hit_size / 1024., |
- len(cache_hit) * 100. / total, |
- cache_hit_size * 100. / total_size if total_size else 0) |
- cache_miss = missing |
- cache_miss_size = sum(f.size for f in cache_miss) |
- logging.info( |
- 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', |
- len(cache_miss), |
- cache_miss_size / 1024., |
- len(cache_miss) * 100. / total, |
- cache_miss_size * 100. / total_size if total_size else 0) |
- |
- return uploaded |
- |
- def get_fetch_url(self, digest): |
- """Returns an URL that can be used to fetch an item with given digest. |
- |
- Arguments: |
- digest: hex digest of item to fetch. |
- |
- Returns: |
-      A URL or None if the underlying protocol doesn't support this. |
- """ |
- return self._storage_api.get_fetch_url(digest) |
- |
- def async_push(self, channel, priority, item): |
- """Starts asynchronous push to the server in a parallel thread. |
- |
- Arguments: |
- channel: TaskChannel that receives back |item| when upload ends. |
- priority: thread pool task priority for the push. |
- item: item to upload as instance of Item class. |
- """ |
- def push(content): |
- """Pushes an item and returns its id, to pass as a result to |channel|.""" |
- self._storage_api.push(item, content) |
- return item |
- |
- # If zipping is not required, just start a push task. |
- if not self.use_zip: |
- self.net_thread_pool.add_task_with_channel(channel, priority, push, |
- item.content(DISK_FILE_CHUNK)) |
- return |
- |
- # If zipping is enabled, zip in a separate thread. |
- def zip_and_push(): |
- # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble |
-      # content right here. It will block until the whole file is zipped. |
- try: |
- stream = zip_compress(item.content(ZIPPED_FILE_CHUNK), |
- item.compression_level) |
- data = ''.join(stream) |
- except Exception as exc: |
- logging.error('Failed to zip \'%s\': %s', item, exc) |
- channel.send_exception(exc) |
- return |
- self.net_thread_pool.add_task_with_channel( |
- channel, priority, push, [data]) |
- self.cpu_thread_pool.add_task(priority, zip_and_push) |
- |
- def async_fetch(self, channel, priority, digest, size, sink): |
- """Starts asynchronous fetch from the server in a parallel thread. |
- |
- Arguments: |
- channel: TaskChannel that receives back |digest| when download ends. |
- priority: thread pool task priority for the fetch. |
- digest: hex digest of an item to download. |
- size: expected size of the item (after decompression). |
- sink: function that will be called as sink(generator). |
- """ |
- def fetch(): |
- try: |
- # Prepare reading pipeline. |
- stream = self._storage_api.fetch(digest) |
- if self.use_zip: |
- stream = zip_decompress(stream, DISK_FILE_CHUNK) |
- # Run |stream| through verifier that will assert its size. |
- verifier = FetchStreamVerifier(stream, size) |
- # Verified stream goes to |sink|. |
- sink(verifier.run()) |
- except Exception as err: |
- logging.warning('Failed to fetch %s: %s', digest, err) |
- raise |
- return digest |
- |
-    # Don't bother with cpu_thread_pool for decompression. Decompression is |
- # really fast and most probably IO bound anyway. |
- self.net_thread_pool.add_task_with_channel(channel, priority, fetch) |
- |
- def get_missing_items(self, items): |
- """Yields items that are missing from the server. |
- |
- Issues multiple parallel queries via StorageApi's 'contains' method. |
- |
- Arguments: |
- items: a list of Item objects to check. |
- |
- Yields: |
- Item objects that are missing from the server. |
- """ |
- channel = threading_utils.TaskChannel() |
- pending = 0 |
- # Enqueue all requests. |
- for batch in self.batch_items_for_check(items): |
- self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, |
- self._storage_api.contains, batch) |
- pending += 1 |
- # Yield results as they come in. |
- for _ in xrange(pending): |
- for missing in channel.pull(): |
- yield missing |
- |
- @staticmethod |
- def batch_items_for_check(items): |
- """Splits list of items to check for existence on the server into batches. |
- |
- Each batch corresponds to a single 'exists?' query to the server via a call |
- to StorageApi's 'contains' method. |
- |
- Arguments: |
- items: a list of Item objects. |
- |
- Yields: |
- Batches of items to query for existence in a single operation, |
- each batch is a list of Item objects. |
- """ |
- batch_count = 0 |
- batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] |
- next_queries = [] |
- for item in sorted(items, key=lambda x: x.size, reverse=True): |
- next_queries.append(item) |
- if len(next_queries) == batch_size_limit: |
- yield next_queries |
- next_queries = [] |
- batch_count += 1 |
- batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[ |
- min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)] |
- if next_queries: |
- yield next_queries |
- |
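For illustration (not part of the original file), feeding 100 items through batch_items_for_check() follows the ITEMS_PER_CONTAINS_QUERIES schedule:

# Item instances only need a digest and a size for batching purposes.
items = [Item(digest='%040x' % i, size=i) for i in xrange(100)]
batches = [len(batch) for batch in Storage.batch_items_for_check(items)]
# batches == [20, 20, 50, 10]: the first three limits are 20, 20 and 50, and
# the remaining 10 items form the final partial batch.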
- |
-class FetchQueue(object): |
- """Fetches items from Storage and places them into LocalCache. |
- |
- It manages multiple concurrent fetch operations. Acts as a bridge between |
- Storage and LocalCache so that Storage and LocalCache don't depend on each |
- other at all. |
- """ |
- |
- def __init__(self, storage, cache): |
- self.storage = storage |
- self.cache = cache |
- self._channel = threading_utils.TaskChannel() |
- self._pending = set() |
- self._accessed = set() |
- self._fetched = cache.cached_set() |
- |
- def add(self, priority, digest, size=UNKNOWN_FILE_SIZE): |
- """Starts asynchronous fetch of item |digest|.""" |
- # Fetching it now? |
- if digest in self._pending: |
- return |
- |
- # Mark this file as in use, verify_all_cached will later ensure it is still |
- # in cache. |
- self._accessed.add(digest) |
- |
- # Already fetched? Notify cache to update item's LRU position. |
- if digest in self._fetched: |
- # 'touch' returns True if item is in cache and not corrupted. |
- if self.cache.touch(digest, size): |
- return |
- # Item is corrupted, remove it from cache and fetch it again. |
- self._fetched.remove(digest) |
- self.cache.evict(digest) |
- |
- # TODO(maruel): It should look at the free disk space, the current cache |
- # size and the size of the new item on every new item: |
- # - Trim the cache as more entries are listed when free disk space is low, |
- # otherwise if the amount of data downloaded during the run > free disk |
- # space, it'll crash. |
- # - Make sure there's enough free disk space to fit all dependencies of |
- # this run! If not, abort early. |
- |
- # Start fetching. |
- self._pending.add(digest) |
- self.storage.async_fetch( |
- self._channel, priority, digest, size, |
- functools.partial(self.cache.write, digest)) |
- |
- def wait(self, digests): |
- """Starts a loop that waits for at least one of |digests| to be retrieved. |
- |
- Returns the first digest retrieved. |
- """ |
- # Flush any already fetched items. |
- for digest in digests: |
- if digest in self._fetched: |
- return digest |
- |
- # Ensure all requested items are being fetched now. |
- assert all(digest in self._pending for digest in digests), ( |
- digests, self._pending) |
- |
- # Wait for some requested item to finish fetching. |
- while self._pending: |
- digest = self._channel.pull() |
- self._pending.remove(digest) |
- self._fetched.add(digest) |
- if digest in digests: |
- return digest |
- |
- # Should never reach this point due to assert above. |
- raise RuntimeError('Impossible state') |
- |
- def inject_local_file(self, path, algo): |
- """Adds local file to the cache as if it was fetched from storage.""" |
- with open(path, 'rb') as f: |
- data = f.read() |
- digest = algo(data).hexdigest() |
- self.cache.write(digest, [data]) |
- self._fetched.add(digest) |
- return digest |
- |
- @property |
- def pending_count(self): |
- """Returns number of items to be fetched.""" |
- return len(self._pending) |
- |
- def verify_all_cached(self): |
- """True if all accessed items are in cache.""" |
- return self._accessed.issubset(self.cache.cached_set()) |
- |
- |
-class FetchStreamVerifier(object): |
- """Verifies that fetched file is valid before passing it to the LocalCache.""" |
- |
- def __init__(self, stream, expected_size): |
- self.stream = stream |
- self.expected_size = expected_size |
- self.current_size = 0 |
- |
- def run(self): |
- """Generator that yields same items as |stream|. |
- |
- Verifies |stream| is complete before yielding a last chunk to consumer. |
- |
-    Also wraps IOError produced by the consumer into MappingError, since |
-    otherwise Storage would retry the fetch on unrelated local cache errors. |
- """ |
- # Read one chunk ahead, keep it in |stored|. |
- # That way a complete stream can be verified before pushing last chunk |
- # to consumer. |
- stored = None |
- for chunk in self.stream: |
- assert chunk is not None |
- if stored is not None: |
- self._inspect_chunk(stored, is_last=False) |
- try: |
- yield stored |
- except IOError as exc: |
- raise MappingError('Failed to store an item in cache: %s' % exc) |
- stored = chunk |
- if stored is not None: |
- self._inspect_chunk(stored, is_last=True) |
- try: |
- yield stored |
- except IOError as exc: |
- raise MappingError('Failed to store an item in cache: %s' % exc) |
- |
- def _inspect_chunk(self, chunk, is_last): |
- """Called for each fetched chunk before passing it to consumer.""" |
- self.current_size += len(chunk) |
- if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and |
- (self.expected_size != self.current_size)): |
- raise IOError('Incorrect file size: expected %d, got %d' % ( |
- self.expected_size, self.current_size)) |
- |
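A minimal sketch (illustrative only) of how the verifier passes a complete stream through and rejects a short read only at the very end:

# A 10-byte payload declared as 10 bytes passes; declared as 11 bytes it fails
# when the last chunk is about to be yielded.
chunks = ['01234', '56789']
assert ''.join(FetchStreamVerifier(chunks, 10).run()) == '0123456789'
try:
  ''.join(FetchStreamVerifier(chunks, 11).run())
except IOError:
  pass  # 'Incorrect file size: expected 11, got 10'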
- |
-class StorageApi(object): |
- """Interface for classes that implement low-level storage operations.""" |
- |
- def get_fetch_url(self, digest): |
- """Returns an URL that can be used to fetch an item with given digest. |
- |
- Arguments: |
- digest: hex digest of item to fetch. |
- |
- Returns: |
-      A URL or None if the protocol doesn't support this. |
- """ |
- raise NotImplementedError() |
- |
- def fetch(self, digest): |
- """Fetches an object and yields its content. |
- |
- Arguments: |
- digest: hash digest of item to download. |
- |
- Yields: |
- Chunks of downloaded item (as str objects). |
- """ |
- raise NotImplementedError() |
- |
- def push(self, item, content): |
- """Uploads an |item| with content generated by |content| generator. |
- |
- Arguments: |
- item: Item object that holds information about an item being pushed. |
- content: a generator that yields chunks to push. |
- |
- Returns: |
- None. |
- """ |
- raise NotImplementedError() |
- |
- def contains(self, items): |
- """Checks for existence of given |items| on the server. |
- |
-    Mutates |items| by assigning an opaque implementation-specific object to |
-    Item's push_state attribute on entries missing from the datastore. |
- |
- Arguments: |
- items: list of Item objects. |
- |
- Returns: |
- A list of items missing on server as a list of Item objects. |
- """ |
- raise NotImplementedError() |
- |
- |
-class IsolateServer(StorageApi): |
- """StorageApi implementation that downloads and uploads to Isolate Server. |
- |
- It uploads and downloads directly from Google Storage whenever appropriate. |
- """ |
- |
- class _PushState(object): |
- """State needed to call .push(), to be stored in Item.push_state.""" |
- def __init__(self, upload_url, finalize_url): |
- self.upload_url = upload_url |
- self.finalize_url = finalize_url |
- self.uploaded = False |
- self.finalized = False |
- |
- def __init__(self, base_url, namespace): |
- super(IsolateServer, self).__init__() |
- assert base_url.startswith('http'), base_url |
- self.base_url = base_url.rstrip('/') |
- self.namespace = namespace |
- self._lock = threading.Lock() |
- self._server_caps = None |
- |
- @staticmethod |
- def _generate_handshake_request(): |
- """Returns a dict to be sent as handshake request body.""" |
- # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. |
- return { |
- 'client_app_version': __version__, |
- 'fetcher': True, |
- 'protocol_version': ISOLATE_PROTOCOL_VERSION, |
- 'pusher': True, |
- } |
- |
- @staticmethod |
- def _validate_handshake_response(caps): |
- """Validates and normalizes handshake response.""" |
- logging.info('Protocol version: %s', caps['protocol_version']) |
- logging.info('Server version: %s', caps['server_app_version']) |
- if caps.get('error'): |
- raise MappingError(caps['error']) |
- if not caps['access_token']: |
- raise ValueError('access_token is missing') |
- return caps |
- |
- @property |
- def _server_capabilities(self): |
- """Performs handshake with the server if not yet done. |
- |
- Returns: |
- Server capabilities dictionary as returned by /handshake endpoint. |
- |
- Raises: |
- MappingError if server rejects the handshake. |
- """ |
- # TODO(maruel): Make this request much earlier asynchronously while the |
- # files are being enumerated. |
- with self._lock: |
- if self._server_caps is None: |
- request_body = json.dumps( |
- self._generate_handshake_request(), separators=(',', ':')) |
- response = net.url_read( |
- url=self.base_url + '/content-gs/handshake', |
- data=request_body, |
- content_type='application/json', |
- method='POST') |
- if response is None: |
- raise MappingError('Failed to perform handshake.') |
- try: |
- caps = json.loads(response) |
- if not isinstance(caps, dict): |
- raise ValueError('Expecting JSON dict') |
- self._server_caps = self._validate_handshake_response(caps) |
- except (ValueError, KeyError, TypeError) as exc: |
- # KeyError exception has very confusing str conversion: it's just a |
- # missing key value and nothing else. So print exception class name |
- # as well. |
- raise MappingError('Invalid handshake response (%s): %s' % ( |
- exc.__class__.__name__, exc)) |
- return self._server_caps |
- |
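For reference, an illustrative handshake exchange; the field names follow _generate_handshake_request() and _validate_handshake_response() above, while every value here is made up:

# Request body POSTed to <base_url>/content-gs/handshake.
example_handshake_request = {
    'client_app_version': '0.2',
    'fetcher': True,
    'protocol_version': '1.0',
    'pusher': True,
}
# Response shape expected by _validate_handshake_response().
example_handshake_response = {
    'protocol_version': '1.0',
    'server_app_version': '162-abcdef0',  # hypothetical value
    'access_token': 'AXSq...',            # opaque token, truncated here
    'error': None,
}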
- def get_fetch_url(self, digest): |
- assert isinstance(digest, basestring) |
- return '%s/content-gs/retrieve/%s/%s' % ( |
- self.base_url, self.namespace, digest) |
- |
- def fetch(self, digest): |
- source_url = self.get_fetch_url(digest) |
- logging.debug('download_file(%s)', source_url) |
- |
-    # Because the App Engine datastore is only eventually consistent, retry on |
-    # 404 errors: the file might just not be visible yet (even though it has |
-    # been uploaded). |
- connection = net.url_open( |
- source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) |
- if not connection: |
- raise IOError('Unable to open connection to %s' % source_url) |
- return stream_read(connection, NET_IO_FILE_CHUNK) |
- |
- def push(self, item, content): |
- assert isinstance(item, Item) |
- assert isinstance(item.push_state, IsolateServer._PushState) |
- assert not item.push_state.finalized |
- |
- # TODO(vadimsh): Do not read from |content| generator when retrying push. |
-    # If |content| is indeed a generator, it cannot be rewound back |
- # to the beginning of the stream. A retry will find it exhausted. A possible |
- # solution is to wrap |content| generator with some sort of caching |
- # restartable generator. It should be done alongside streaming support |
- # implementation. |
- |
-    # This push operation may be a retry after a failed finalization call |
-    # below; there is no need to re-upload the contents in that case. |
- if not item.push_state.uploaded: |
-      # A cheesy way to avoid a memcpy of a (possibly huge) file, until |
-      # streaming upload support is implemented. |
- if isinstance(content, list) and len(content) == 1: |
- content = content[0] |
- else: |
- content = ''.join(content) |
- # PUT file to |upload_url|. |
- response = net.url_read( |
- url=item.push_state.upload_url, |
- data=content, |
- content_type='application/octet-stream', |
- method='PUT') |
- if response is None: |
- raise IOError('Failed to upload a file %s to %s' % ( |
- item.digest, item.push_state.upload_url)) |
- item.push_state.uploaded = True |
- else: |
- logging.info( |
-          'File %s is already uploaded, retrying finalization only', item.digest) |
- |
- # Optionally notify the server that it's done. |
- if item.push_state.finalize_url: |
- # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and |
-      # send it to the isolate server. That way the isolate server can verify |
-      # that the data safely reached Google Storage (GS provides MD5 and CRC32C |
-      # of stored files). |
- response = net.url_read( |
- url=item.push_state.finalize_url, |
- data='', |
- content_type='application/json', |
- method='POST') |
- if response is None: |
- raise IOError('Failed to finalize an upload of %s' % item.digest) |
- item.push_state.finalized = True |
- |
- def contains(self, items): |
- logging.info('Checking existence of %d files...', len(items)) |
- |
- # Request body is a json encoded list of dicts. |
- body = [ |
- { |
- 'h': item.digest, |
- 's': item.size, |
- 'i': int(item.is_isolated), |
- } for item in items |
- ] |
- |
- query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( |
- self.base_url, |
- self.namespace, |
- urllib.quote(self._server_capabilities['access_token'])) |
- response_body = net.url_read( |
- url=query_url, |
- data=json.dumps(body, separators=(',', ':')), |
- content_type='application/json', |
- method='POST') |
- if response_body is None: |
- raise MappingError('Failed to execute /pre-upload query') |
- |
- # Response body is a list of push_urls (or null if file is already present). |
- try: |
- response = json.loads(response_body) |
- if not isinstance(response, list): |
- raise ValueError('Expecting response with json-encoded list') |
- if len(response) != len(items): |
- raise ValueError( |
- 'Incorrect number of items in the list, expected %d, ' |
- 'but got %d' % (len(items), len(response))) |
- except ValueError as err: |
- raise MappingError( |
- 'Invalid response from server: %s, body is %s' % (err, response_body)) |
- |
- # Pick Items that are missing, attach _PushState to them. |
- missing_items = [] |
- for i, push_urls in enumerate(response): |
- if push_urls: |
- assert len(push_urls) == 2, str(push_urls) |
- item = items[i] |
- assert item.push_state is None |
- item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1]) |
- missing_items.append(item) |
- logging.info('Queried %d files, %d cache hit', |
- len(items), len(items) - len(missing_items)) |
- return missing_items |
- |
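An illustrative /pre-upload exchange; digests and URLs are placeholders, the shapes follow contains() above:

# Request body: one dict per item, in the order the items are passed in.
example_preupload_request = [
    {'h': 'a' * 40, 's': 123, 'i': 0},
    {'h': 'b' * 40, 's': 456, 'i': 1},
]
# Response body: one entry per item; null means the item is already stored,
# otherwise a pair [upload_url, finalize_url] is returned.
example_preupload_response = [
    None,
    ['https://storage.example.com/upload/...',     # hypothetical upload_url
     'https://isolate.example.com/finalize/...'],  # hypothetical finalize_url
]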
- |
-class FileSystem(StorageApi): |
- """StorageApi implementation that fetches data from the file system. |
- |
-  The common use case is an NFS/CIFS file server that is mounted locally and |
-  used to fetch files onto a local partition. |
- """ |
- |
- def __init__(self, base_path): |
- super(FileSystem, self).__init__() |
- self.base_path = base_path |
- |
- def get_fetch_url(self, digest): |
- return None |
- |
- def fetch(self, digest): |
- assert isinstance(digest, basestring) |
- return file_read(os.path.join(self.base_path, digest)) |
- |
- def push(self, item, content): |
- assert isinstance(item, Item) |
- file_write(os.path.join(self.base_path, item.digest), content) |
- |
- def contains(self, items): |
- return [ |
- item for item in items |
- if not os.path.exists(os.path.join(self.base_path, item.digest)) |
- ] |
- |
- |
-class LocalCache(object): |
- """Local cache that stores objects fetched via Storage. |
- |
- It can be accessed concurrently from multiple threads, so it should protect |
- its internal state with some lock. |
- """ |
- |
- def __enter__(self): |
- """Context manager interface.""" |
- return self |
- |
- def __exit__(self, _exc_type, _exec_value, _traceback): |
- """Context manager interface.""" |
- return False |
- |
- def cached_set(self): |
- """Returns a set of all cached digests (always a new object).""" |
- raise NotImplementedError() |
- |
- def touch(self, digest, size): |
- """Ensures item is not corrupted and updates its LRU position. |
- |
- Arguments: |
- digest: hash digest of item to check. |
- size: expected size of this item. |
- |
- Returns: |
- True if item is in cache and not corrupted. |
- """ |
- raise NotImplementedError() |
- |
- def evict(self, digest): |
- """Removes item from cache if it's there.""" |
- raise NotImplementedError() |
- |
- def read(self, digest): |
- """Returns contents of the cached item as a single str.""" |
- raise NotImplementedError() |
- |
- def write(self, digest, content): |
- """Reads data from |content| generator and stores it in cache.""" |
- raise NotImplementedError() |
- |
- def link(self, digest, dest, file_mode=None): |
- """Ensures file at |dest| has same content as cached |digest|.""" |
- raise NotImplementedError() |
- |
- |
-class MemoryCache(LocalCache): |
- """LocalCache implementation that stores everything in memory.""" |
- |
- def __init__(self): |
- super(MemoryCache, self).__init__() |
- # Let's not assume dict is thread safe. |
- self._lock = threading.Lock() |
- self._contents = {} |
- |
- def cached_set(self): |
- with self._lock: |
- return set(self._contents) |
- |
- def touch(self, digest, size): |
- with self._lock: |
- return digest in self._contents |
- |
- def evict(self, digest): |
- with self._lock: |
- self._contents.pop(digest, None) |
- |
- def read(self, digest): |
- with self._lock: |
- return self._contents[digest] |
- |
- def write(self, digest, content): |
- # Assemble whole stream before taking the lock. |
- data = ''.join(content) |
- with self._lock: |
- self._contents[digest] = data |
- |
- def link(self, digest, dest, file_mode=None): |
- file_write(dest, [self.read(digest)]) |
- if file_mode is not None: |
- os.chmod(dest, file_mode) |
- |
- |
-def get_hash_algo(_namespace): |
- """Return hash algorithm class to use when uploading to given |namespace|.""" |
- # TODO(vadimsh): Implement this at some point. |
- return hashlib.sha1 |
- |
- |
-def is_namespace_with_compression(namespace): |
- """Returns True if given |namespace| stores compressed objects.""" |
- return namespace.endswith(('-gzip', '-deflate')) |
- |
- |
-def get_storage_api(file_or_url, namespace): |
- """Returns an object that implements StorageApi interface.""" |
- if re.match(r'^https?://.+$', file_or_url): |
- return IsolateServer(file_or_url, namespace) |
- else: |
- return FileSystem(file_or_url) |
- |
- |
-def get_storage(file_or_url, namespace): |
- """Returns Storage class configured with appropriate StorageApi instance.""" |
- return Storage( |
- get_storage_api(file_or_url, namespace), |
- is_namespace_with_compression(namespace)) |
- |
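A short usage sketch (hypothetical server URL) tying get_storage() to the upload path defined earlier:

# Push a single in-memory item; upload_items() returns the items that were
# actually missing from the server.
with get_storage('https://isolate.example.com', 'default-gzip') as storage:
  item = BufferItem('file body goes here', get_hash_algo('default-gzip'))
  uploaded = storage.upload_items([item])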
- |
-def upload_tree(base_url, indir, infiles, namespace): |
- """Uploads the given tree to the given url. |
- |
- Arguments: |
-    base_url: The base url. It is assumed that |base_url|/has/ can be used to |
- query if an element was already uploaded, and |base_url|/store/ |
- can be used to upload a new element. |
- indir: Root directory the infiles are based in. |
- infiles: dict of files to upload from |indir| to |base_url|. |
- namespace: The namespace to use on the server. |
- """ |
- with get_storage(base_url, namespace) as storage: |
- storage.upload_tree(indir, infiles) |
- return 0 |
- |
- |
-def load_isolated(content, os_flavor, algo): |
- """Verifies the .isolated file is valid and loads this object with the json |
- data. |
- |
- Arguments: |
- - content: raw serialized content to load. |
- - os_flavor: OS to load this file on. Optional. |
- - algo: hashlib algorithm class. Used to confirm the algorithm matches the |
- algorithm used on the Isolate Server. |
- """ |
- try: |
- data = json.loads(content) |
- except ValueError: |
- raise ConfigError('Failed to parse: %s...' % content[:100]) |
- |
- if not isinstance(data, dict): |
- raise ConfigError('Expected dict, got %r' % data) |
- |
- # Check 'version' first, since it could modify the parsing after. |
- value = data.get('version', '1.0') |
- if not isinstance(value, basestring): |
- raise ConfigError('Expected string, got %r' % value) |
- if not re.match(r'^(\d+)\.(\d+)$', value): |
- raise ConfigError('Expected a compatible version, got %r' % value) |
- if value.split('.', 1)[0] != '1': |
- raise ConfigError('Expected compatible \'1.x\' version, got %r' % value) |
- |
- if algo is None: |
-    # Default to the algorithm used in the .isolated file itself; fall back to |
-    # 'sha-1' if unspecified. |
-    algo = SUPPORTED_ALGOS[data.get('algo', 'sha-1')] |
- |
- for key, value in data.iteritems(): |
- if key == 'algo': |
- if not isinstance(value, basestring): |
- raise ConfigError('Expected string, got %r' % value) |
- if value not in SUPPORTED_ALGOS: |
- raise ConfigError( |
- 'Expected one of \'%s\', got %r' % |
- (', '.join(sorted(SUPPORTED_ALGOS)), value)) |
- if value != SUPPORTED_ALGOS_REVERSE[algo]: |
- raise ConfigError( |
- 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value)) |
- |
- elif key == 'command': |
- if not isinstance(value, list): |
- raise ConfigError('Expected list, got %r' % value) |
- if not value: |
- raise ConfigError('Expected non-empty command') |
- for subvalue in value: |
- if not isinstance(subvalue, basestring): |
- raise ConfigError('Expected string, got %r' % subvalue) |
- |
- elif key == 'files': |
- if not isinstance(value, dict): |
- raise ConfigError('Expected dict, got %r' % value) |
- for subkey, subvalue in value.iteritems(): |
- if not isinstance(subkey, basestring): |
- raise ConfigError('Expected string, got %r' % subkey) |
- if not isinstance(subvalue, dict): |
- raise ConfigError('Expected dict, got %r' % subvalue) |
- for subsubkey, subsubvalue in subvalue.iteritems(): |
- if subsubkey == 'l': |
- if not isinstance(subsubvalue, basestring): |
- raise ConfigError('Expected string, got %r' % subsubvalue) |
- elif subsubkey == 'm': |
- if not isinstance(subsubvalue, int): |
- raise ConfigError('Expected int, got %r' % subsubvalue) |
- elif subsubkey == 'h': |
- if not is_valid_hash(subsubvalue, algo): |
- raise ConfigError('Expected sha-1, got %r' % subsubvalue) |
- elif subsubkey == 's': |
- if not isinstance(subsubvalue, int): |
- raise ConfigError('Expected int, got %r' % subsubvalue) |
- else: |
- raise ConfigError('Unknown subsubkey %s' % subsubkey) |
- if bool('h' in subvalue) == bool('l' in subvalue): |
- raise ConfigError( |
- 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' % |
- subvalue) |
- if bool('h' in subvalue) != bool('s' in subvalue): |
- raise ConfigError( |
- 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' % |
- subvalue) |
- if bool('s' in subvalue) == bool('l' in subvalue): |
- raise ConfigError( |
- 'Need only one of \'s\' (size) or \'l\' (link), got: %r' % |
- subvalue) |
- if bool('l' in subvalue) and bool('m' in subvalue): |
- raise ConfigError( |
- 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' % |
- subvalue) |
- |
- elif key == 'includes': |
- if not isinstance(value, list): |
- raise ConfigError('Expected list, got %r' % value) |
- if not value: |
- raise ConfigError('Expected non-empty includes list') |
- for subvalue in value: |
- if not is_valid_hash(subvalue, algo): |
- raise ConfigError('Expected sha-1, got %r' % subvalue) |
- |
- elif key == 'read_only': |
- if not isinstance(value, bool): |
- raise ConfigError('Expected bool, got %r' % value) |
- |
- elif key == 'relative_cwd': |
- if not isinstance(value, basestring): |
- raise ConfigError('Expected string, got %r' % value) |
- |
- elif key == 'os': |
- if os_flavor and value != os_flavor: |
- raise ConfigError( |
- 'Expected \'os\' to be \'%s\' but got \'%s\'' % |
- (os_flavor, value)) |
- |
- elif key == 'version': |
- # Already checked above. |
- pass |
- |
- else: |
- raise ConfigError('Unknown key %r' % key) |
- |
- # Automatically fix os.path.sep if necessary. While .isolated files are always |
-  # in the native path format, someone could want to download an .isolated |
- # tree from another OS. |
- wrong_path_sep = '/' if os.path.sep == '\\' else '\\' |
- if 'files' in data: |
- data['files'] = dict( |
- (k.replace(wrong_path_sep, os.path.sep), v) |
- for k, v in data['files'].iteritems()) |
- for v in data['files'].itervalues(): |
- if 'l' in v: |
- v['l'] = v['l'].replace(wrong_path_sep, os.path.sep) |
- if 'relative_cwd' in data: |
- data['relative_cwd'] = data['relative_cwd'].replace( |
- wrong_path_sep, os.path.sep) |
- return data |
- |
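A minimal .isolated document (digests are made up, the layout follows the checks in load_isolated() above):

example_isolated = {
    'algo': 'sha-1',
    'command': ['python', 'run_test.py'],
    'files': {
        'run_test.py': {'h': 'a' * 40, 's': 123, 'm': 0o500},
        'data/link': {'l': '../run_test.py'},
    },
    'includes': ['b' * 40],
    'read_only': False,
    'relative_cwd': '.',
    'version': '1.0',
}
# load_isolated(json.dumps(example_isolated), None, hashlib.sha1) returns an
# equivalent dict with path separators adjusted for the local OS.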
- |
-class IsolatedFile(object): |
- """Represents a single parsed .isolated file.""" |
- def __init__(self, obj_hash, algo): |
- """|obj_hash| is really the sha-1 of the file.""" |
- logging.debug('IsolatedFile(%s)' % obj_hash) |
- self.obj_hash = obj_hash |
- self.algo = algo |
-    # Set once all the left side of the tree is parsed. 'Tree' here means the |
-    # .isolated file and all the .isolated files recursively included by it via |
-    # the 'includes' key. The order of the sha-1s in 'includes', each |
-    # representing a .isolated file in the hash table, is important, as the |
-    # later ones are not processed until the earlier ones have been retrieved |
-    # and read. |
- self.can_fetch = False |
- |
- # Raw data. |
- self.data = {} |
- # A IsolatedFile instance, one per object in self.includes. |
- self.children = [] |
- |
- # Set once the .isolated file is loaded. |
- self._is_parsed = False |
- # Set once the files are fetched. |
- self.files_fetched = False |
- |
- def load(self, os_flavor, content): |
- """Verifies the .isolated file is valid and loads this object with the json |
- data. |
- """ |
- logging.debug('IsolatedFile.load(%s)' % self.obj_hash) |
- assert not self._is_parsed |
- self.data = load_isolated(content, os_flavor, self.algo) |
- self.children = [ |
- IsolatedFile(i, self.algo) for i in self.data.get('includes', []) |
- ] |
- self._is_parsed = True |
- |
- def fetch_files(self, fetch_queue, files): |
- """Adds files in this .isolated file not present in |files| dictionary. |
- |
- Preemptively request files. |
- |
- Note that |files| is modified by this function. |
- """ |
- assert self.can_fetch |
- if not self._is_parsed or self.files_fetched: |
- return |
- logging.debug('fetch_files(%s)' % self.obj_hash) |
- for filepath, properties in self.data.get('files', {}).iteritems(): |
-      # The root .isolated has priority over the files being mapped. In |
-      # particular, overridden files must not be fetched. |
- if filepath not in files: |
- files[filepath] = properties |
- if 'h' in properties: |
- # Preemptively request files. |
- logging.debug('fetching %s' % filepath) |
- fetch_queue.add(WorkerPool.MED, properties['h'], properties['s']) |
- self.files_fetched = True |
- |
- |
-class Settings(object): |
- """Results of a completely parsed .isolated file.""" |
- def __init__(self): |
- self.command = [] |
- self.files = {} |
- self.read_only = None |
- self.relative_cwd = None |
- # The main .isolated file, a IsolatedFile instance. |
- self.root = None |
- |
- def load(self, fetch_queue, root_isolated_hash, os_flavor, algo): |
- """Loads the .isolated and all the included .isolated asynchronously. |
- |
- It enables support for "included" .isolated files. They are processed in |
- strict order but fetched asynchronously from the cache. This is important so |
- that a file in an included .isolated file that is overridden by an embedding |
- .isolated file is not fetched needlessly. The includes are fetched in one |
-    pass and the files are fetched as soon as all the ones on the left side |
-    of the tree have been fetched. |
- |
- The prioritization is very important here for nested .isolated files. |
- 'includes' have the highest priority and the algorithm is optimized for both |
- deep and wide trees. A deep one is a long link of .isolated files referenced |
- one at a time by one item in 'includes'. A wide one has a large number of |
- 'includes' in a single .isolated file. 'left' is defined as an included |
- .isolated file earlier in the 'includes' list. So the order of the elements |
- in 'includes' is important. |
- """ |
- self.root = IsolatedFile(root_isolated_hash, algo) |
- |
- # Isolated files being retrieved now: hash -> IsolatedFile instance. |
- pending = {} |
- # Set of hashes of already retrieved items to refuse recursive includes. |
- seen = set() |
- |
- def retrieve(isolated_file): |
- h = isolated_file.obj_hash |
- if h in seen: |
- raise ConfigError('IsolatedFile %s is retrieved recursively' % h) |
- assert h not in pending |
- seen.add(h) |
- pending[h] = isolated_file |
- fetch_queue.add(WorkerPool.HIGH, h) |
- |
- retrieve(self.root) |
- |
- while pending: |
- item_hash = fetch_queue.wait(pending) |
- item = pending.pop(item_hash) |
- item.load(os_flavor, fetch_queue.cache.read(item_hash)) |
- if item_hash == root_isolated_hash: |
- # It's the root item. |
- item.can_fetch = True |
- |
- for new_child in item.children: |
- retrieve(new_child) |
- |
- # Traverse the whole tree to see if files can now be fetched. |
- self._traverse_tree(fetch_queue, self.root) |
- |
- def check(n): |
- return all(check(x) for x in n.children) and n.files_fetched |
- assert check(self.root) |
- |
- self.relative_cwd = self.relative_cwd or '' |
- self.read_only = self.read_only or False |
- |
- def _traverse_tree(self, fetch_queue, node): |
- if node.can_fetch: |
- if not node.files_fetched: |
- self._update_self(fetch_queue, node) |
- will_break = False |
- for i in node.children: |
- if not i.can_fetch: |
- if will_break: |
- break |
-        # Automatically mark the first one as fetchable. |
- i.can_fetch = True |
- will_break = True |
- self._traverse_tree(fetch_queue, i) |
- |
- def _update_self(self, fetch_queue, node): |
- node.fetch_files(fetch_queue, self.files) |
- # Grabs properties. |
- if not self.command and node.data.get('command'): |
-      # Ensure paths are correctly separated on Windows. |
- self.command = node.data['command'] |
- if self.command: |
- self.command[0] = self.command[0].replace('/', os.path.sep) |
- self.command = tools.fix_python_path(self.command) |
- if self.read_only is None and node.data.get('read_only') is not None: |
- self.read_only = node.data['read_only'] |
- if (self.relative_cwd is None and |
- node.data.get('relative_cwd') is not None): |
- self.relative_cwd = node.data['relative_cwd'] |
- |
- |
-def fetch_isolated( |
- isolated_hash, storage, cache, algo, outdir, os_flavor, require_command): |
- """Aggressively downloads the .isolated file(s), then download all the files. |
- |
- Arguments: |
- isolated_hash: hash of the root *.isolated file. |
- storage: Storage class that communicates with isolate storage. |
- cache: LocalCache class that knows how to store and map files locally. |
- algo: hash algorithm to use. |
- outdir: Output directory to map file tree to. |
- os_flavor: OS flavor to choose when reading sections of *.isolated file. |
- require_command: Ensure *.isolated specifies a command to run. |
- |
- Returns: |
- Settings object that holds details about loaded *.isolated file. |
- """ |
- with cache: |
- fetch_queue = FetchQueue(storage, cache) |
- settings = Settings() |
- |
- with tools.Profiler('GetIsolateds'): |
- # Optionally support local files by manually adding them to cache. |
- if not is_valid_hash(isolated_hash, algo): |
- isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo) |
- |
- # Load all *.isolated and start loading rest of the files. |
- settings.load(fetch_queue, isolated_hash, os_flavor, algo) |
- if require_command and not settings.command: |
-      # TODO(vadimsh): All fetch operations are already enqueued and there's no |
- # easy way to cancel them. |
- raise ConfigError('No command to run') |
- |
- with tools.Profiler('GetRest'): |
- # Create file system hierarchy. |
- if not os.path.isdir(outdir): |
- os.makedirs(outdir) |
- create_directories(outdir, settings.files) |
- create_links(outdir, settings.files.iteritems()) |
- |
- # Ensure working directory exists. |
- cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd)) |
- if not os.path.isdir(cwd): |
- os.makedirs(cwd) |
- |
- # Multimap: digest -> list of pairs (path, props). |
- remaining = {} |
- for filepath, props in settings.files.iteritems(): |
- if 'h' in props: |
- remaining.setdefault(props['h'], []).append((filepath, props)) |
- |
- # Now block on the remaining files to be downloaded and mapped. |
- logging.info('Retrieving remaining files (%d of them)...', |
- fetch_queue.pending_count) |
- last_update = time.time() |
- with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: |
- while remaining: |
- detector.ping() |
- |
- # Wait for any item to finish fetching to cache. |
- digest = fetch_queue.wait(remaining) |
- |
- # Link corresponding files to a fetched item in cache. |
- for filepath, props in remaining.pop(digest): |
- cache.link(digest, os.path.join(outdir, filepath), props.get('m')) |
- |
- # Report progress. |
- duration = time.time() - last_update |
- if duration > DELAY_BETWEEN_UPDATES_IN_SECS: |
- msg = '%d files remaining...' % len(remaining) |
- print msg |
- logging.info(msg) |
- last_update = time.time() |
- |
-    # The cache may evict items we just tried to fetch; that's a fatal error. |
- if not fetch_queue.verify_all_cached(): |
- raise MappingError('Cache is too small to hold all requested files') |
- return settings |
- |
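A usage sketch (hypothetical hash, server and output directory) showing how the pieces above combine to materialize a tree:

storage = get_storage('https://isolate.example.com', 'default-gzip')
with storage:
  settings = fetch_isolated(
      isolated_hash='a' * 40,             # placeholder digest
      storage=storage,
      cache=MemoryCache(),
      algo=get_hash_algo('default-gzip'),
      outdir='/tmp/out',                  # assumed output directory
      os_flavor=None,
      require_command=False)
# settings.command and settings.relative_cwd describe how to run the mapped tree.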
- |
-@subcommand.usage('<file1..fileN> or - to read from stdin') |
-def CMDarchive(parser, args): |
- """Archives data to the server.""" |
- options, files = parser.parse_args(args) |
- |
- if files == ['-']: |
- files = sys.stdin.readlines() |
- |
- if not files: |
- parser.error('Nothing to upload') |
- |
- # Load the necessary metadata. |
- # TODO(maruel): Use a worker pool to upload as the hashing is being done. |
- infiles = dict( |
- ( |
- f, |
- { |
- 's': os.stat(f).st_size, |
- 'h': hash_file(f, get_hash_algo(options.namespace)), |
- } |
- ) |
- for f in files) |
- |
- with tools.Profiler('Archive'): |
- ret = upload_tree( |
- base_url=options.isolate_server, |
- indir=os.getcwd(), |
- infiles=infiles, |
- namespace=options.namespace) |
- if not ret: |
- print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles)) |
- return ret |
- |
- |
-def CMDdownload(parser, args): |
- """Download data from the server. |
- |
- It can either download individual files or a complete tree from a .isolated |
- file. |
- """ |
- parser.add_option( |
- '-i', '--isolated', metavar='HASH', |
- help='hash of an isolated file, .isolated file content is discarded, use ' |
- '--file if you need it') |
- parser.add_option( |
- '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2, |
- help='hash and destination of a file, can be used multiple times') |
- parser.add_option( |
- '-t', '--target', metavar='DIR', default=os.getcwd(), |
- help='destination directory') |
- options, args = parser.parse_args(args) |
- if args: |
- parser.error('Unsupported arguments: %s' % args) |
- if bool(options.isolated) == bool(options.file): |
- parser.error('Use one of --isolated or --file, and only one.') |
- |
- options.target = os.path.abspath(options.target) |
- storage = get_storage(options.isolate_server, options.namespace) |
- cache = MemoryCache() |
- algo = get_hash_algo(options.namespace) |
- |
- # Fetching individual files. |
- if options.file: |
- channel = threading_utils.TaskChannel() |
- pending = {} |
- for digest, dest in options.file: |
- pending[digest] = dest |
- storage.async_fetch( |
- channel, |
- WorkerPool.MED, |
- digest, |
- UNKNOWN_FILE_SIZE, |
- functools.partial(file_write, os.path.join(options.target, dest))) |
- while pending: |
- fetched = channel.pull() |
- dest = pending.pop(fetched) |
- logging.info('%s: %s', fetched, dest) |
- |
- # Fetching whole isolated tree. |
- if options.isolated: |
- settings = fetch_isolated( |
- isolated_hash=options.isolated, |
- storage=storage, |
- cache=cache, |
- algo=algo, |
- outdir=options.target, |
- os_flavor=None, |
- require_command=False) |
- rel = os.path.join(options.target, settings.relative_cwd) |
- print('To run this test please run from the directory %s:' % |
-          rel) |
- print(' ' + ' '.join(settings.command)) |
- |
- return 0 |
- |
- |
-class OptionParserIsolateServer(tools.OptionParserWithLogging): |
- def __init__(self, **kwargs): |
- tools.OptionParserWithLogging.__init__(self, **kwargs) |
- self.add_option( |
- '-I', '--isolate-server', |
- metavar='URL', default='', |
- help='Isolate server to use') |
- self.add_option( |
- '--namespace', default='default-gzip', |
- help='The namespace to use on the server, default: %default') |
- |
- def parse_args(self, *args, **kwargs): |
- options, args = tools.OptionParserWithLogging.parse_args( |
- self, *args, **kwargs) |
- options.isolate_server = options.isolate_server.rstrip('/') |
- if not options.isolate_server: |
- self.error('--isolate-server is required.') |
- return options, args |
- |
- |
-def main(args): |
- dispatcher = subcommand.CommandDispatcher(__name__) |
- try: |
- return dispatcher.execute( |
- OptionParserIsolateServer(version=__version__), args) |
- except Exception as e: |
- tools.report_error(e) |
- return 1 |
- |
- |
-if __name__ == '__main__': |
- fix_encoding.fix_encoding() |
- tools.disable_buffering() |
- colorama.init() |
- sys.exit(main(sys.argv[1:])) |