| Index: swarm_client/isolateserver.py
|
| ===================================================================
|
| --- swarm_client/isolateserver.py (revision 235167)
|
| +++ swarm_client/isolateserver.py (working copy)
|
| @@ -1,1664 +0,0 @@
|
| -#!/usr/bin/env python
|
| -# Copyright 2013 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""Archives a set of files to a server."""
|
| -
|
| -__version__ = '0.2'
|
| -
|
| -import functools
|
| -import hashlib
|
| -import json
|
| -import logging
|
| -import os
|
| -import re
|
| -import sys
|
| -import threading
|
| -import time
|
| -import urllib
|
| -import zlib
|
| -
|
| -from third_party import colorama
|
| -from third_party.depot_tools import fix_encoding
|
| -from third_party.depot_tools import subcommand
|
| -
|
| -from utils import net
|
| -from utils import threading_utils
|
| -from utils import tools
|
| -
|
| -
|
| -# Version of isolate protocol passed to the server in /handshake request.
|
| -ISOLATE_PROTOCOL_VERSION = '1.0'
|
| -
|
| -
|
| -# The number of files to check the isolate server per /pre-upload query.
|
| -# All files are sorted by likelihood of a change in the file content
|
| -# (currently file size is used to estimate this: the larger the file, the more
|
| -# likely it has changed). The first ITEMS_PER_CONTAINS_QUERIES[0] files are
|
| -# taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
|
| -# and so on. The numbers here are a trade-off; the more per request, the lower
|
| -# the effect of HTTP round trip latency and TCP-level chattiness. On the other
|
| -# hand, larger values cause longer lookups, increasing the initial latency to
|
| -# start uploading, which is especially an issue for large files. This value is
|
| -# optimized for the "few thousand files to look up with a minimal number of
|
| -# large files missing" case.
|
| -ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
|
| -
|
| -
|
| -# A list of already compressed extension types that should not receive any
|
| -# compression before being uploaded.
|
| -ALREADY_COMPRESSED_TYPES = [
|
| - '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
|
| - 'wav', 'zip'
|
| -]
|
| -
|
| -
|
| -# The file size to be used when we don't know the correct file size,
|
| -# generally used for .isolated files.
|
| -UNKNOWN_FILE_SIZE = None
|
| -
|
| -
|
| -# The size of each chunk to read when downloading and unzipping files.
|
| -ZIPPED_FILE_CHUNK = 16 * 1024
|
| -
|
| -# Chunk size to use when doing disk I/O.
|
| -DISK_FILE_CHUNK = 1024 * 1024
|
| -
|
| -# Chunk size to use when reading from network stream.
|
| -NET_IO_FILE_CHUNK = 16 * 1024
|
| -
|
| -
|
| -# Read timeout in seconds for downloads from isolate storage. If there's no
|
| -# response from the server within this timeout, the whole download is aborted.
|
| -DOWNLOAD_READ_TIMEOUT = 60
|
| -
|
| -# Maximum expected delay (in seconds) between successive file fetches
|
| -# in run_tha_test. If it takes longer than that, a deadlock might be happening
|
| -# and all stack frames for all threads are dumped to the log.
|
| -DEADLOCK_TIMEOUT = 5 * 60
|
| -
|
| -
|
| -# The delay (in seconds) to wait between logging statements when retrieving
|
| -# the required files. This is intended to let the user (or buildbot) know that
|
| -# the program is still running.
|
| -DELAY_BETWEEN_UPDATES_IN_SECS = 30
|
| -
|
| -
|
| -# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
|
| -# specify the names here.
|
| -SUPPORTED_ALGOS = {
|
| - 'md5': hashlib.md5,
|
| - 'sha-1': hashlib.sha1,
|
| - 'sha-512': hashlib.sha512,
|
| -}
|
| -
|
| -
|
| -# Used for serialization.
|
| -SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
|
| -
|
| -
|
| -class ConfigError(ValueError):
|
| - """Generic failure to load a .isolated file."""
|
| - pass
|
| -
|
| -
|
| -class MappingError(OSError):
|
| - """Failed to recreate the tree."""
|
| - pass
|
| -
|
| -
|
| -def is_valid_hash(value, algo):
|
| - """Returns if the value is a valid hash for the corresponding algorithm."""
|
| - size = 2 * algo().digest_size
|
| - return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
|
| -
|
| -
|
| -def hash_file(filepath, algo):
|
| - """Calculates the hash of a file without reading it all in memory at once.
|
| -
|
| -  |algo| should be one of the hashlib hashing algorithm classes.
|
| - """
|
| - digest = algo()
|
| - with open(filepath, 'rb') as f:
|
| - while True:
|
| - chunk = f.read(DISK_FILE_CHUNK)
|
| - if not chunk:
|
| - break
|
| - digest.update(chunk)
|
| - return digest.hexdigest()
|
| -
|
| -
|
| -def stream_read(stream, chunk_size):
|
| - """Reads chunks from |stream| and yields them."""
|
| - while True:
|
| - data = stream.read(chunk_size)
|
| - if not data:
|
| - break
|
| - yield data
|
| -
|
| -
|
| -def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
|
| - """Yields file content in chunks of given |chunk_size|."""
|
| - with open(filepath, 'rb') as f:
|
| - while True:
|
| - data = f.read(chunk_size)
|
| - if not data:
|
| - break
|
| - yield data
|
| -
|
| -
|
| -def file_write(filepath, content_generator):
|
| - """Writes file content as generated by content_generator.
|
| -
|
| -  Creates the intermediate directories as needed.
|
| -
|
| - Returns the number of bytes written.
|
| -
|
| - Meant to be mocked out in unit tests.
|
| - """
|
| - filedir = os.path.dirname(filepath)
|
| - if not os.path.isdir(filedir):
|
| - os.makedirs(filedir)
|
| - total = 0
|
| - with open(filepath, 'wb') as f:
|
| - for d in content_generator:
|
| - total += len(d)
|
| - f.write(d)
|
| - return total
|
| -
|
| -
|
| -def zip_compress(content_generator, level=7):
|
| - """Reads chunks from |content_generator| and yields zip compressed chunks."""
|
| - compressor = zlib.compressobj(level)
|
| - for chunk in content_generator:
|
| - compressed = compressor.compress(chunk)
|
| - if compressed:
|
| - yield compressed
|
| - tail = compressor.flush(zlib.Z_FINISH)
|
| - if tail:
|
| - yield tail
|
| -
|
| -
|
| -def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
|
| - """Reads zipped data from |content_generator| and yields decompressed data.
|
| -
|
| - Decompresses data in small chunks (no larger than |chunk_size|) so that
|
| -  a zip bomb doesn't cause zlib to preallocate a huge amount of memory.
|
| -
|
| - Raises IOError if data is corrupted or incomplete.
|
| - """
|
| - decompressor = zlib.decompressobj()
|
| - compressed_size = 0
|
| - try:
|
| - for chunk in content_generator:
|
| - compressed_size += len(chunk)
|
| - data = decompressor.decompress(chunk, chunk_size)
|
| - if data:
|
| - yield data
|
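| -      # decompress() emits at most |chunk_size| bytes at a time; keep feeding
|
| -      # the unconsumed tail back in until the input chunk is fully drained.
|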
| - while decompressor.unconsumed_tail:
|
| - data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
|
| - if data:
|
| - yield data
|
| - tail = decompressor.flush()
|
| - if tail:
|
| - yield tail
|
| - except zlib.error as e:
|
| - raise IOError(
|
| - 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
|
| - # Ensure all data was read and decompressed.
|
| - if decompressor.unused_data or decompressor.unconsumed_tail:
|
| - raise IOError('Not all data was decompressed')
|
| -
|
| -
|
| -def get_zip_compression_level(filename):
|
| - """Given a filename calculates the ideal zip compression level to use."""
|
| - file_ext = os.path.splitext(filename)[1].lower()
|
| - # TODO(csharp): Profile to find what compression level works best.
|
| - return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
|
| -
|
| -
|
| -def create_directories(base_directory, files):
|
| - """Creates the directory structure needed by the given list of files."""
|
| - logging.debug('create_directories(%s, %d)', base_directory, len(files))
|
| -  # Builds the set of directories that need to be created.
|
| - directories = set(os.path.dirname(f) for f in files)
|
| - for item in list(directories):
|
| - while item:
|
| - directories.add(item)
|
| - item = os.path.dirname(item)
|
| - for d in sorted(directories):
|
| - if d:
|
| - os.mkdir(os.path.join(base_directory, d))
|
| -
|
| -
|
| -def create_links(base_directory, files):
|
| - """Creates any links needed by the given set of files."""
|
| - for filepath, properties in files:
|
| - if 'l' not in properties:
|
| - continue
|
| - if sys.platform == 'win32':
|
| - # TODO(maruel): Create junctions or empty text files similar to what
|
| -      # cygwin does?
|
| - logging.warning('Ignoring symlink %s', filepath)
|
| - continue
|
| - outfile = os.path.join(base_directory, filepath)
|
| -    # symlink doesn't exist on Windows, so the 'link' property should
|
| -    # never be specified for a Windows .isolated file.
|
| - os.symlink(properties['l'], outfile) # pylint: disable=E1101
|
| - if 'm' in properties:
|
| - lchmod = getattr(os, 'lchmod', None)
|
| - if lchmod:
|
| - lchmod(outfile, properties['m'])
|
| -
|
| -
|
| -def is_valid_file(filepath, size):
|
| - """Determines if the given files appears valid.
|
| -
|
| - Currently it just checks the file's size.
|
| - """
|
| - if size == UNKNOWN_FILE_SIZE:
|
| - return os.path.isfile(filepath)
|
| - actual_size = os.stat(filepath).st_size
|
| - if size != actual_size:
|
| - logging.warning(
|
| - 'Found invalid item %s; %d != %d',
|
| - os.path.basename(filepath), actual_size, size)
|
| - return False
|
| - return True
|
| -
|
| -
|
| -class WorkerPool(threading_utils.AutoRetryThreadPool):
|
| - """Thread pool that automatically retries on IOError and runs a preconfigured
|
| - function.
|
| - """
|
| - # Initial and maximum number of worker threads.
|
| - INITIAL_WORKERS = 2
|
| - MAX_WORKERS = 16
|
| - RETRIES = 5
|
| -
|
| - def __init__(self):
|
| - super(WorkerPool, self).__init__(
|
| - [IOError],
|
| - self.RETRIES,
|
| - self.INITIAL_WORKERS,
|
| - self.MAX_WORKERS,
|
| - 0,
|
| - 'remote')
|
| -
|
| -
|
| -class Item(object):
|
| - """An item to push to Storage.
|
| -
|
| -  It starts its life in the main thread, travels to a 'contains' thread, then
|
| -  to a 'push' thread and finally back to the main thread.
|
| -
|
| - It is never used concurrently from multiple threads.
|
| - """
|
| -
|
| - def __init__(self, digest, size, is_isolated=False):
|
| - self.digest = digest
|
| - self.size = size
|
| - self.is_isolated = is_isolated
|
| - self.compression_level = 6
|
| - self.push_state = None
|
| -
|
| - def content(self, chunk_size):
|
| - """Iterable with content of this item in chunks of given size.
|
| -
|
| - Arguments:
|
| - chunk_size: preferred size of the chunk to produce, may be ignored.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| -class FileItem(Item):
|
| - """A file to push to Storage."""
|
| -
|
| - def __init__(self, path, digest, size, is_isolated):
|
| - super(FileItem, self).__init__(digest, size, is_isolated)
|
| - self.path = path
|
| - self.compression_level = get_zip_compression_level(path)
|
| -
|
| - def content(self, chunk_size):
|
| - return file_read(self.path, chunk_size)
|
| -
|
| -
|
| -class BufferItem(Item):
|
| - """A byte buffer to push to Storage."""
|
| -
|
| - def __init__(self, buf, algo, is_isolated=False):
|
| - super(BufferItem, self).__init__(
|
| - algo(buf).hexdigest(), len(buf), is_isolated)
|
| - self.buffer = buf
|
| -
|
| - def content(self, _chunk_size):
|
| - return [self.buffer]
|
| -
|
| -
|
| -class Storage(object):
|
| - """Efficiently downloads or uploads large set of files via StorageApi."""
|
| -
|
| - def __init__(self, storage_api, use_zip):
|
| - self.use_zip = use_zip
|
| - self._storage_api = storage_api
|
| - self._cpu_thread_pool = None
|
| - self._net_thread_pool = None
|
| -
|
| - @property
|
| - def cpu_thread_pool(self):
|
| - """ThreadPool for CPU-bound tasks like zipping."""
|
| - if self._cpu_thread_pool is None:
|
| - self._cpu_thread_pool = threading_utils.ThreadPool(
|
| - 2, max(threading_utils.num_processors(), 2), 0, 'zip')
|
| - return self._cpu_thread_pool
|
| -
|
| - @property
|
| - def net_thread_pool(self):
|
| - """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
|
| - if self._net_thread_pool is None:
|
| - self._net_thread_pool = WorkerPool()
|
| - return self._net_thread_pool
|
| -
|
| - def close(self):
|
| - """Waits for all pending tasks to finish."""
|
| - if self._cpu_thread_pool:
|
| - self._cpu_thread_pool.join()
|
| - self._cpu_thread_pool.close()
|
| - self._cpu_thread_pool = None
|
| - if self._net_thread_pool:
|
| - self._net_thread_pool.join()
|
| - self._net_thread_pool.close()
|
| - self._net_thread_pool = None
|
| -
|
| - def __enter__(self):
|
| - """Context manager interface."""
|
| - return self
|
| -
|
| - def __exit__(self, _exc_type, _exc_value, _traceback):
|
| - """Context manager interface."""
|
| - self.close()
|
| - return False
|
| -
|
| - def upload_tree(self, indir, infiles):
|
| - """Uploads the given tree to the isolate server.
|
| -
|
| - Arguments:
|
| - indir: root directory the infiles are based in.
|
| - infiles: dict of files to upload from |indir|.
|
| -
|
| - Returns:
|
| - List of items that were uploaded. All other items are already there.
|
| - """
|
| - logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
|
| -
|
| - # Convert |indir| + |infiles| into a list of FileItem objects.
|
| -    # Filter out symlinks, since they are not represented by items on the
|
| -    # isolate server side.
|
| - items = [
|
| - FileItem(
|
| - path=os.path.join(indir, filepath),
|
| - digest=metadata['h'],
|
| - size=metadata['s'],
|
| - is_isolated=metadata.get('priority') == '0')
|
| - for filepath, metadata in infiles.iteritems()
|
| - if 'l' not in metadata
|
| - ]
|
| -
|
| - return self.upload_items(items)
|
| -
|
| - def upload_items(self, items):
|
| - """Uploads bunch of items to the isolate server.
|
| -
|
| - Will upload only items that are missing.
|
| -
|
| - Arguments:
|
| - items: list of Item instances that represents data to upload.
|
| -
|
| - Returns:
|
| - List of items that were uploaded. All other items are already there.
|
| - """
|
| - # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
|
| - # used by swarming.py. There's no need to spawn multiple threads and try to
|
| - # do stuff in parallel: there's nothing to parallelize. 'contains' check and
|
| - # 'push' should be performed sequentially in the context of current thread.
|
| -
|
| -    # For each digest keep only the first Item that matches it. All other items
|
| -    # are just indistinguishable copies from the point of view of the isolate
|
| -    # server (it doesn't care about paths at all, only content and digests).
|
| - seen = {}
|
| - duplicates = 0
|
| - for item in items:
|
| - if seen.setdefault(item.digest, item) is not item:
|
| - duplicates += 1
|
| - items = seen.values()
|
| - if duplicates:
|
| - logging.info('Skipped %d duplicated files', duplicates)
|
| -
|
| - # Enqueue all upload tasks.
|
| - missing = set()
|
| - channel = threading_utils.TaskChannel()
|
| - for missing_item in self.get_missing_items(items):
|
| - missing.add(missing_item)
|
| - self.async_push(
|
| - channel,
|
| - WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
|
| - missing_item)
|
| -
|
| - uploaded = []
|
| - # No need to spawn deadlock detector thread if there's nothing to upload.
|
| - if missing:
|
| - with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
|
| - # Wait for all started uploads to finish.
|
| - while len(uploaded) != len(missing):
|
| - detector.ping()
|
| - item = channel.pull()
|
| - uploaded.append(item)
|
| - logging.debug(
|
| - 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
|
| - logging.info('All files are uploaded')
|
| -
|
| - # Print stats.
|
| - total = len(items)
|
| - total_size = sum(f.size for f in items)
|
| - logging.info(
|
| - 'Total: %6d, %9.1fkb',
|
| - total,
|
| - total_size / 1024.)
|
| - cache_hit = set(items) - missing
|
| - cache_hit_size = sum(f.size for f in cache_hit)
|
| - logging.info(
|
| - 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
|
| - len(cache_hit),
|
| - cache_hit_size / 1024.,
|
| - len(cache_hit) * 100. / total,
|
| - cache_hit_size * 100. / total_size if total_size else 0)
|
| - cache_miss = missing
|
| - cache_miss_size = sum(f.size for f in cache_miss)
|
| - logging.info(
|
| - 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
|
| - len(cache_miss),
|
| - cache_miss_size / 1024.,
|
| - len(cache_miss) * 100. / total,
|
| - cache_miss_size * 100. / total_size if total_size else 0)
|
| -
|
| - return uploaded
|
| -
|
| - def get_fetch_url(self, digest):
|
| - """Returns an URL that can be used to fetch an item with given digest.
|
| -
|
| - Arguments:
|
| - digest: hex digest of item to fetch.
|
| -
|
| - Returns:
|
| -      A URL, or None if the underlying protocol doesn't support this.
|
| - """
|
| - return self._storage_api.get_fetch_url(digest)
|
| -
|
| - def async_push(self, channel, priority, item):
|
| - """Starts asynchronous push to the server in a parallel thread.
|
| -
|
| - Arguments:
|
| - channel: TaskChannel that receives back |item| when upload ends.
|
| - priority: thread pool task priority for the push.
|
| - item: item to upload as instance of Item class.
|
| - """
|
| - def push(content):
|
| - """Pushes an item and returns its id, to pass as a result to |channel|."""
|
| - self._storage_api.push(item, content)
|
| - return item
|
| -
|
| - # If zipping is not required, just start a push task.
|
| - if not self.use_zip:
|
| - self.net_thread_pool.add_task_with_channel(channel, priority, push,
|
| - item.content(DISK_FILE_CHUNK))
|
| - return
|
| -
|
| - # If zipping is enabled, zip in a separate thread.
|
| - def zip_and_push():
|
| - # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
|
| -      # content right here. It will block until the whole file is zipped.
|
| - try:
|
| - stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
|
| - item.compression_level)
|
| - data = ''.join(stream)
|
| - except Exception as exc:
|
| - logging.error('Failed to zip \'%s\': %s', item, exc)
|
| - channel.send_exception(exc)
|
| - return
|
| - self.net_thread_pool.add_task_with_channel(
|
| - channel, priority, push, [data])
|
| - self.cpu_thread_pool.add_task(priority, zip_and_push)
|
| -
|
| - def async_fetch(self, channel, priority, digest, size, sink):
|
| - """Starts asynchronous fetch from the server in a parallel thread.
|
| -
|
| - Arguments:
|
| - channel: TaskChannel that receives back |digest| when download ends.
|
| - priority: thread pool task priority for the fetch.
|
| - digest: hex digest of an item to download.
|
| - size: expected size of the item (after decompression).
|
| - sink: function that will be called as sink(generator).
|
| - """
|
| - def fetch():
|
| - try:
|
| - # Prepare reading pipeline.
|
| - stream = self._storage_api.fetch(digest)
|
| - if self.use_zip:
|
| - stream = zip_decompress(stream, DISK_FILE_CHUNK)
|
| - # Run |stream| through verifier that will assert its size.
|
| - verifier = FetchStreamVerifier(stream, size)
|
| - # Verified stream goes to |sink|.
|
| - sink(verifier.run())
|
| - except Exception as err:
|
| - logging.warning('Failed to fetch %s: %s', digest, err)
|
| - raise
|
| - return digest
|
| -
|
| -    # Don't bother with the cpu_thread_pool for decompression. Decompression is
|
| - # really fast and most probably IO bound anyway.
|
| - self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
|
| -
|
| - def get_missing_items(self, items):
|
| - """Yields items that are missing from the server.
|
| -
|
| - Issues multiple parallel queries via StorageApi's 'contains' method.
|
| -
|
| - Arguments:
|
| - items: a list of Item objects to check.
|
| -
|
| - Yields:
|
| - Item objects that are missing from the server.
|
| - """
|
| - channel = threading_utils.TaskChannel()
|
| - pending = 0
|
| - # Enqueue all requests.
|
| - for batch in self.batch_items_for_check(items):
|
| - self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
|
| - self._storage_api.contains, batch)
|
| - pending += 1
|
| - # Yield results as they come in.
|
| - for _ in xrange(pending):
|
| - for missing in channel.pull():
|
| - yield missing
|
| -
|
| - @staticmethod
|
| - def batch_items_for_check(items):
|
| - """Splits list of items to check for existence on the server into batches.
|
| -
|
| - Each batch corresponds to a single 'exists?' query to the server via a call
|
| - to StorageApi's 'contains' method.
|
| -
|
| - Arguments:
|
| - items: a list of Item objects.
|
| -
|
| - Yields:
|
| - Batches of items to query for existence in a single operation,
|
| - each batch is a list of Item objects.
|
| - """
|
| - batch_count = 0
|
| - batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
|
| - next_queries = []
|
| - for item in sorted(items, key=lambda x: x.size, reverse=True):
|
| - next_queries.append(item)
|
| - if len(next_queries) == batch_size_limit:
|
| - yield next_queries
|
| - next_queries = []
|
| - batch_count += 1
|
| - batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
|
| - min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
|
| - if next_queries:
|
| - yield next_queries
|
| -
|
| -
|
| -class FetchQueue(object):
|
| - """Fetches items from Storage and places them into LocalCache.
|
| -
|
| - It manages multiple concurrent fetch operations. Acts as a bridge between
|
| - Storage and LocalCache so that Storage and LocalCache don't depend on each
|
| - other at all.
|
| - """
|
| -
|
| - def __init__(self, storage, cache):
|
| - self.storage = storage
|
| - self.cache = cache
|
| - self._channel = threading_utils.TaskChannel()
|
| - self._pending = set()
|
| - self._accessed = set()
|
| - self._fetched = cache.cached_set()
|
| -
|
| - def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
|
| - """Starts asynchronous fetch of item |digest|."""
|
| - # Fetching it now?
|
| - if digest in self._pending:
|
| - return
|
| -
|
| - # Mark this file as in use, verify_all_cached will later ensure it is still
|
| - # in cache.
|
| - self._accessed.add(digest)
|
| -
|
| - # Already fetched? Notify cache to update item's LRU position.
|
| - if digest in self._fetched:
|
| - # 'touch' returns True if item is in cache and not corrupted.
|
| - if self.cache.touch(digest, size):
|
| - return
|
| - # Item is corrupted, remove it from cache and fetch it again.
|
| - self._fetched.remove(digest)
|
| - self.cache.evict(digest)
|
| -
|
| - # TODO(maruel): It should look at the free disk space, the current cache
|
| - # size and the size of the new item on every new item:
|
| - # - Trim the cache as more entries are listed when free disk space is low,
|
| - # otherwise if the amount of data downloaded during the run > free disk
|
| - # space, it'll crash.
|
| - # - Make sure there's enough free disk space to fit all dependencies of
|
| - # this run! If not, abort early.
|
| -
|
| - # Start fetching.
|
| - self._pending.add(digest)
|
| - self.storage.async_fetch(
|
| - self._channel, priority, digest, size,
|
| - functools.partial(self.cache.write, digest))
|
| -
|
| - def wait(self, digests):
|
| - """Starts a loop that waits for at least one of |digests| to be retrieved.
|
| -
|
| - Returns the first digest retrieved.
|
| - """
|
| - # Flush any already fetched items.
|
| - for digest in digests:
|
| - if digest in self._fetched:
|
| - return digest
|
| -
|
| - # Ensure all requested items are being fetched now.
|
| - assert all(digest in self._pending for digest in digests), (
|
| - digests, self._pending)
|
| -
|
| - # Wait for some requested item to finish fetching.
|
| - while self._pending:
|
| - digest = self._channel.pull()
|
| - self._pending.remove(digest)
|
| - self._fetched.add(digest)
|
| - if digest in digests:
|
| - return digest
|
| -
|
| - # Should never reach this point due to assert above.
|
| - raise RuntimeError('Impossible state')
|
| -
|
| - def inject_local_file(self, path, algo):
|
| - """Adds local file to the cache as if it was fetched from storage."""
|
| - with open(path, 'rb') as f:
|
| - data = f.read()
|
| - digest = algo(data).hexdigest()
|
| - self.cache.write(digest, [data])
|
| - self._fetched.add(digest)
|
| - return digest
|
| -
|
| - @property
|
| - def pending_count(self):
|
| - """Returns number of items to be fetched."""
|
| - return len(self._pending)
|
| -
|
| - def verify_all_cached(self):
|
| - """True if all accessed items are in cache."""
|
| - return self._accessed.issubset(self.cache.cached_set())
|
| -
|
| -
|
| -class FetchStreamVerifier(object):
|
| - """Verifies that fetched file is valid before passing it to the LocalCache."""
|
| -
|
| - def __init__(self, stream, expected_size):
|
| - self.stream = stream
|
| - self.expected_size = expected_size
|
| - self.current_size = 0
|
| -
|
| - def run(self):
|
| - """Generator that yields same items as |stream|.
|
| -
|
| - Verifies |stream| is complete before yielding a last chunk to consumer.
|
| -
|
| - Also wraps IOError produced by consumer into MappingError exceptions since
|
| - otherwise Storage will retry fetch on unrelated local cache errors.
|
| - """
|
| - # Read one chunk ahead, keep it in |stored|.
|
| - # That way a complete stream can be verified before pushing last chunk
|
| - # to consumer.
|
| - stored = None
|
| - for chunk in self.stream:
|
| - assert chunk is not None
|
| - if stored is not None:
|
| - self._inspect_chunk(stored, is_last=False)
|
| - try:
|
| - yield stored
|
| - except IOError as exc:
|
| - raise MappingError('Failed to store an item in cache: %s' % exc)
|
| - stored = chunk
|
| - if stored is not None:
|
| - self._inspect_chunk(stored, is_last=True)
|
| - try:
|
| - yield stored
|
| - except IOError as exc:
|
| - raise MappingError('Failed to store an item in cache: %s' % exc)
|
| -
|
| - def _inspect_chunk(self, chunk, is_last):
|
| - """Called for each fetched chunk before passing it to consumer."""
|
| - self.current_size += len(chunk)
|
| - if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
|
| - (self.expected_size != self.current_size)):
|
| - raise IOError('Incorrect file size: expected %d, got %d' % (
|
| - self.expected_size, self.current_size))
|
| -
|
| -
|
| -class StorageApi(object):
|
| - """Interface for classes that implement low-level storage operations."""
|
| -
|
| - def get_fetch_url(self, digest):
|
| - """Returns an URL that can be used to fetch an item with given digest.
|
| -
|
| - Arguments:
|
| - digest: hex digest of item to fetch.
|
| -
|
| - Returns:
|
| -      A URL, or None if the protocol doesn't support this.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| - def fetch(self, digest):
|
| - """Fetches an object and yields its content.
|
| -
|
| - Arguments:
|
| - digest: hash digest of item to download.
|
| -
|
| - Yields:
|
| - Chunks of downloaded item (as str objects).
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| - def push(self, item, content):
|
| - """Uploads an |item| with content generated by |content| generator.
|
| -
|
| - Arguments:
|
| - item: Item object that holds information about an item being pushed.
|
| - content: a generator that yields chunks to push.
|
| -
|
| - Returns:
|
| - None.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| - def contains(self, items):
|
| - """Checks for existence of given |items| on the server.
|
| -
|
| -    Mutates |items| by assigning an opaque implementation-specific object to
|
| -    Item's push_state attribute on entries that are missing from the datastore.
|
| -
|
| - Arguments:
|
| - items: list of Item objects.
|
| -
|
| - Returns:
|
| - A list of items missing on server as a list of Item objects.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| -
|
| -class IsolateServer(StorageApi):
|
| - """StorageApi implementation that downloads and uploads to Isolate Server.
|
| -
|
| - It uploads and downloads directly from Google Storage whenever appropriate.
|
| - """
|
| -
|
| - class _PushState(object):
|
| - """State needed to call .push(), to be stored in Item.push_state."""
|
| - def __init__(self, upload_url, finalize_url):
|
| - self.upload_url = upload_url
|
| - self.finalize_url = finalize_url
|
| - self.uploaded = False
|
| - self.finalized = False
|
| -
|
| - def __init__(self, base_url, namespace):
|
| - super(IsolateServer, self).__init__()
|
| - assert base_url.startswith('http'), base_url
|
| - self.base_url = base_url.rstrip('/')
|
| - self.namespace = namespace
|
| - self._lock = threading.Lock()
|
| - self._server_caps = None
|
| -
|
| - @staticmethod
|
| - def _generate_handshake_request():
|
| - """Returns a dict to be sent as handshake request body."""
|
| - # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
|
| - return {
|
| - 'client_app_version': __version__,
|
| - 'fetcher': True,
|
| - 'protocol_version': ISOLATE_PROTOCOL_VERSION,
|
| - 'pusher': True,
|
| - }
|
| -
|
| - @staticmethod
|
| - def _validate_handshake_response(caps):
|
| - """Validates and normalizes handshake response."""
|
| - logging.info('Protocol version: %s', caps['protocol_version'])
|
| - logging.info('Server version: %s', caps['server_app_version'])
|
| - if caps.get('error'):
|
| - raise MappingError(caps['error'])
|
| - if not caps['access_token']:
|
| - raise ValueError('access_token is missing')
|
| - return caps
|
| -
|
| - @property
|
| - def _server_capabilities(self):
|
| - """Performs handshake with the server if not yet done.
|
| -
|
| - Returns:
|
| - Server capabilities dictionary as returned by /handshake endpoint.
|
| -
|
| - Raises:
|
| - MappingError if server rejects the handshake.
|
| - """
|
| - # TODO(maruel): Make this request much earlier asynchronously while the
|
| - # files are being enumerated.
|
| - with self._lock:
|
| - if self._server_caps is None:
|
| - request_body = json.dumps(
|
| - self._generate_handshake_request(), separators=(',', ':'))
|
| - response = net.url_read(
|
| - url=self.base_url + '/content-gs/handshake',
|
| - data=request_body,
|
| - content_type='application/json',
|
| - method='POST')
|
| - if response is None:
|
| - raise MappingError('Failed to perform handshake.')
|
| - try:
|
| - caps = json.loads(response)
|
| - if not isinstance(caps, dict):
|
| - raise ValueError('Expecting JSON dict')
|
| - self._server_caps = self._validate_handshake_response(caps)
|
| - except (ValueError, KeyError, TypeError) as exc:
|
| -          # KeyError exception has a very confusing str conversion: it's just
|
| -          # the missing key value and nothing else, so print the exception
|
| -          # class name as well.
|
| - raise MappingError('Invalid handshake response (%s): %s' % (
|
| - exc.__class__.__name__, exc))
|
| - return self._server_caps
|
| -
|
| - def get_fetch_url(self, digest):
|
| - assert isinstance(digest, basestring)
|
| - return '%s/content-gs/retrieve/%s/%s' % (
|
| - self.base_url, self.namespace, digest)
|
| -
|
| - def fetch(self, digest):
|
| - source_url = self.get_fetch_url(digest)
|
| - logging.debug('download_file(%s)', source_url)
|
| -
|
| -    # Because the App Engine DB is only eventually consistent, retry 404 errors:
|
| -    # the file might just not be visible yet (even though it has been
|
| - # uploaded).
|
| - connection = net.url_open(
|
| - source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
|
| - if not connection:
|
| - raise IOError('Unable to open connection to %s' % source_url)
|
| - return stream_read(connection, NET_IO_FILE_CHUNK)
|
| -
|
| - def push(self, item, content):
|
| - assert isinstance(item, Item)
|
| - assert isinstance(item.push_state, IsolateServer._PushState)
|
| - assert not item.push_state.finalized
|
| -
|
| - # TODO(vadimsh): Do not read from |content| generator when retrying push.
|
| -    # If |content| is indeed a generator, it cannot be rewound back
|
| -    # to the beginning of the stream. A retry will find it exhausted. A possible
|
| -    # solution is to wrap the |content| generator with some sort of caching,
|
| - # restartable generator. It should be done alongside streaming support
|
| - # implementation.
|
| -
|
| -    # This push operation may be a retry after a failed finalization call below;
|
| -    # there is no need to re-upload the contents in that case.
|
| - if not item.push_state.uploaded:
|
| -      # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
|
| - # upload support is implemented.
|
| - if isinstance(content, list) and len(content) == 1:
|
| - content = content[0]
|
| - else:
|
| - content = ''.join(content)
|
| - # PUT file to |upload_url|.
|
| - response = net.url_read(
|
| - url=item.push_state.upload_url,
|
| - data=content,
|
| - content_type='application/octet-stream',
|
| - method='PUT')
|
| - if response is None:
|
| - raise IOError('Failed to upload a file %s to %s' % (
|
| - item.digest, item.push_state.upload_url))
|
| - item.push_state.uploaded = True
|
| - else:
|
| - logging.info(
|
| - 'A file %s already uploaded, retrying finalization only', item.digest)
|
| -
|
| - # Optionally notify the server that it's done.
|
| - if item.push_state.finalize_url:
|
| -      # TODO(vadimsh): Calculate the MD5 or CRC32C sum while uploading a file
|
| -      # and send it to the isolate server. That way the isolate server can
|
| -      # verify that the data safely reached Google Storage (GS provides MD5 and
|
| -      # CRC32C of stored files).
|
| - response = net.url_read(
|
| - url=item.push_state.finalize_url,
|
| - data='',
|
| - content_type='application/json',
|
| - method='POST')
|
| - if response is None:
|
| - raise IOError('Failed to finalize an upload of %s' % item.digest)
|
| - item.push_state.finalized = True
|
| -
|
| - def contains(self, items):
|
| - logging.info('Checking existence of %d files...', len(items))
|
| -
|
| - # Request body is a json encoded list of dicts.
|
| - body = [
|
| - {
|
| - 'h': item.digest,
|
| - 's': item.size,
|
| - 'i': int(item.is_isolated),
|
| - } for item in items
|
| - ]
|
| -
|
| - query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
|
| - self.base_url,
|
| - self.namespace,
|
| - urllib.quote(self._server_capabilities['access_token']))
|
| - response_body = net.url_read(
|
| - url=query_url,
|
| - data=json.dumps(body, separators=(',', ':')),
|
| - content_type='application/json',
|
| - method='POST')
|
| - if response_body is None:
|
| - raise MappingError('Failed to execute /pre-upload query')
|
| -
|
| - # Response body is a list of push_urls (or null if file is already present).
|
| - try:
|
| - response = json.loads(response_body)
|
| - if not isinstance(response, list):
|
| - raise ValueError('Expecting response with json-encoded list')
|
| - if len(response) != len(items):
|
| - raise ValueError(
|
| - 'Incorrect number of items in the list, expected %d, '
|
| - 'but got %d' % (len(items), len(response)))
|
| - except ValueError as err:
|
| - raise MappingError(
|
| - 'Invalid response from server: %s, body is %s' % (err, response_body))
|
| -
|
| - # Pick Items that are missing, attach _PushState to them.
|
| - missing_items = []
|
| - for i, push_urls in enumerate(response):
|
| - if push_urls:
|
| - assert len(push_urls) == 2, str(push_urls)
|
| - item = items[i]
|
| - assert item.push_state is None
|
| - item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
|
| - missing_items.append(item)
|
| - logging.info('Queried %d files, %d cache hit',
|
| - len(items), len(items) - len(missing_items))
|
| - return missing_items
|
| -
|
| -
|
| -class FileSystem(StorageApi):
|
| - """StorageApi implementation that fetches data from the file system.
|
| -
|
| -  The common use case is an NFS/CIFS file server that is mounted locally and
|
| -  used to fetch files onto a local partition.
|
| - """
|
| -
|
| - def __init__(self, base_path):
|
| - super(FileSystem, self).__init__()
|
| - self.base_path = base_path
|
| -
|
| - def get_fetch_url(self, digest):
|
| - return None
|
| -
|
| - def fetch(self, digest):
|
| - assert isinstance(digest, basestring)
|
| - return file_read(os.path.join(self.base_path, digest))
|
| -
|
| - def push(self, item, content):
|
| - assert isinstance(item, Item)
|
| - file_write(os.path.join(self.base_path, item.digest), content)
|
| -
|
| - def contains(self, items):
|
| - return [
|
| - item for item in items
|
| - if not os.path.exists(os.path.join(self.base_path, item.digest))
|
| - ]
|
| -
|
| -
|
| -class LocalCache(object):
|
| - """Local cache that stores objects fetched via Storage.
|
| -
|
| - It can be accessed concurrently from multiple threads, so it should protect
|
| - its internal state with some lock.
|
| - """
|
| -
|
| - def __enter__(self):
|
| - """Context manager interface."""
|
| - return self
|
| -
|
| - def __exit__(self, _exc_type, _exec_value, _traceback):
|
| - """Context manager interface."""
|
| - return False
|
| -
|
| - def cached_set(self):
|
| - """Returns a set of all cached digests (always a new object)."""
|
| - raise NotImplementedError()
|
| -
|
| - def touch(self, digest, size):
|
| - """Ensures item is not corrupted and updates its LRU position.
|
| -
|
| - Arguments:
|
| - digest: hash digest of item to check.
|
| - size: expected size of this item.
|
| -
|
| - Returns:
|
| - True if item is in cache and not corrupted.
|
| - """
|
| - raise NotImplementedError()
|
| -
|
| - def evict(self, digest):
|
| - """Removes item from cache if it's there."""
|
| - raise NotImplementedError()
|
| -
|
| - def read(self, digest):
|
| - """Returns contents of the cached item as a single str."""
|
| - raise NotImplementedError()
|
| -
|
| - def write(self, digest, content):
|
| - """Reads data from |content| generator and stores it in cache."""
|
| - raise NotImplementedError()
|
| -
|
| - def link(self, digest, dest, file_mode=None):
|
| - """Ensures file at |dest| has same content as cached |digest|."""
|
| - raise NotImplementedError()
|
| -
|
| -
|
| -class MemoryCache(LocalCache):
|
| - """LocalCache implementation that stores everything in memory."""
|
| -
|
| - def __init__(self):
|
| - super(MemoryCache, self).__init__()
|
| - # Let's not assume dict is thread safe.
|
| - self._lock = threading.Lock()
|
| - self._contents = {}
|
| -
|
| - def cached_set(self):
|
| - with self._lock:
|
| - return set(self._contents)
|
| -
|
| - def touch(self, digest, size):
|
| - with self._lock:
|
| - return digest in self._contents
|
| -
|
| - def evict(self, digest):
|
| - with self._lock:
|
| - self._contents.pop(digest, None)
|
| -
|
| - def read(self, digest):
|
| - with self._lock:
|
| - return self._contents[digest]
|
| -
|
| - def write(self, digest, content):
|
| - # Assemble whole stream before taking the lock.
|
| - data = ''.join(content)
|
| - with self._lock:
|
| - self._contents[digest] = data
|
| -
|
| - def link(self, digest, dest, file_mode=None):
|
| - file_write(dest, [self.read(digest)])
|
| - if file_mode is not None:
|
| - os.chmod(dest, file_mode)
|
| -
|
| -
|
| -def get_hash_algo(_namespace):
|
| - """Return hash algorithm class to use when uploading to given |namespace|."""
|
| - # TODO(vadimsh): Implement this at some point.
|
| - return hashlib.sha1
|
| -
|
| -
|
| -def is_namespace_with_compression(namespace):
|
| - """Returns True if given |namespace| stores compressed objects."""
|
| - return namespace.endswith(('-gzip', '-deflate'))
|
| -
|
| -
|
| -def get_storage_api(file_or_url, namespace):
|
| - """Returns an object that implements StorageApi interface."""
|
| - if re.match(r'^https?://.+$', file_or_url):
|
| - return IsolateServer(file_or_url, namespace)
|
| - else:
|
| - return FileSystem(file_or_url)
|
| -
|
| -
|
| -def get_storage(file_or_url, namespace):
|
| - """Returns Storage class configured with appropriate StorageApi instance."""
|
| - return Storage(
|
| - get_storage_api(file_or_url, namespace),
|
| - is_namespace_with_compression(namespace))
|
| -
|
| -
|
| -def upload_tree(base_url, indir, infiles, namespace):
|
| - """Uploads the given tree to the given url.
|
| -
|
| - Arguments:
|
| -    base_url: The base url; it is assumed that |base_url|/has/ can be used to
|
| - query if an element was already uploaded, and |base_url|/store/
|
| - can be used to upload a new element.
|
| - indir: Root directory the infiles are based in.
|
| - infiles: dict of files to upload from |indir| to |base_url|.
|
| - namespace: The namespace to use on the server.
|
| - """
|
| - with get_storage(base_url, namespace) as storage:
|
| - storage.upload_tree(indir, infiles)
|
| - return 0
|
| -
|
| -
|
| -def load_isolated(content, os_flavor, algo):
|
| - """Verifies the .isolated file is valid and loads this object with the json
|
| - data.
|
| -
|
| - Arguments:
|
| - - content: raw serialized content to load.
|
| - - os_flavor: OS to load this file on. Optional.
|
| - - algo: hashlib algorithm class. Used to confirm the algorithm matches the
|
| - algorithm used on the Isolate Server.
|
| - """
|
| - try:
|
| - data = json.loads(content)
|
| - except ValueError:
|
| - raise ConfigError('Failed to parse: %s...' % content[:100])
|
| -
|
| - if not isinstance(data, dict):
|
| - raise ConfigError('Expected dict, got %r' % data)
|
| -
|
| - # Check 'version' first, since it could modify the parsing after.
|
| - value = data.get('version', '1.0')
|
| - if not isinstance(value, basestring):
|
| - raise ConfigError('Expected string, got %r' % value)
|
| - if not re.match(r'^(\d+)\.(\d+)$', value):
|
| - raise ConfigError('Expected a compatible version, got %r' % value)
|
| - if value.split('.', 1)[0] != '1':
|
| - raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)
|
| -
|
| - if algo is None:
|
| -    # Default to the algorithm used in the .isolated file itself, falling back
|
| -    # to 'sha-1' if unspecified.
|
| - algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
|
| -
|
| - for key, value in data.iteritems():
|
| - if key == 'algo':
|
| - if not isinstance(value, basestring):
|
| - raise ConfigError('Expected string, got %r' % value)
|
| - if value not in SUPPORTED_ALGOS:
|
| - raise ConfigError(
|
| - 'Expected one of \'%s\', got %r' %
|
| - (', '.join(sorted(SUPPORTED_ALGOS)), value))
|
| - if value != SUPPORTED_ALGOS_REVERSE[algo]:
|
| - raise ConfigError(
|
| - 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
|
| -
|
| - elif key == 'command':
|
| - if not isinstance(value, list):
|
| - raise ConfigError('Expected list, got %r' % value)
|
| - if not value:
|
| - raise ConfigError('Expected non-empty command')
|
| - for subvalue in value:
|
| - if not isinstance(subvalue, basestring):
|
| - raise ConfigError('Expected string, got %r' % subvalue)
|
| -
|
| - elif key == 'files':
|
| - if not isinstance(value, dict):
|
| - raise ConfigError('Expected dict, got %r' % value)
|
| - for subkey, subvalue in value.iteritems():
|
| - if not isinstance(subkey, basestring):
|
| - raise ConfigError('Expected string, got %r' % subkey)
|
| - if not isinstance(subvalue, dict):
|
| - raise ConfigError('Expected dict, got %r' % subvalue)
|
| - for subsubkey, subsubvalue in subvalue.iteritems():
|
| - if subsubkey == 'l':
|
| - if not isinstance(subsubvalue, basestring):
|
| - raise ConfigError('Expected string, got %r' % subsubvalue)
|
| - elif subsubkey == 'm':
|
| - if not isinstance(subsubvalue, int):
|
| - raise ConfigError('Expected int, got %r' % subsubvalue)
|
| - elif subsubkey == 'h':
|
| - if not is_valid_hash(subsubvalue, algo):
|
| - raise ConfigError('Expected sha-1, got %r' % subsubvalue)
|
| - elif subsubkey == 's':
|
| - if not isinstance(subsubvalue, int):
|
| - raise ConfigError('Expected int, got %r' % subsubvalue)
|
| - else:
|
| - raise ConfigError('Unknown subsubkey %s' % subsubkey)
|
| - if bool('h' in subvalue) == bool('l' in subvalue):
|
| - raise ConfigError(
|
| - 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
|
| - subvalue)
|
| - if bool('h' in subvalue) != bool('s' in subvalue):
|
| - raise ConfigError(
|
| - 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
|
| - subvalue)
|
| - if bool('s' in subvalue) == bool('l' in subvalue):
|
| - raise ConfigError(
|
| - 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
|
| - subvalue)
|
| - if bool('l' in subvalue) and bool('m' in subvalue):
|
| - raise ConfigError(
|
| - 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
|
| - subvalue)
|
| -
|
| - elif key == 'includes':
|
| - if not isinstance(value, list):
|
| - raise ConfigError('Expected list, got %r' % value)
|
| - if not value:
|
| - raise ConfigError('Expected non-empty includes list')
|
| - for subvalue in value:
|
| - if not is_valid_hash(subvalue, algo):
|
| - raise ConfigError('Expected sha-1, got %r' % subvalue)
|
| -
|
| - elif key == 'read_only':
|
| - if not isinstance(value, bool):
|
| - raise ConfigError('Expected bool, got %r' % value)
|
| -
|
| - elif key == 'relative_cwd':
|
| - if not isinstance(value, basestring):
|
| - raise ConfigError('Expected string, got %r' % value)
|
| -
|
| - elif key == 'os':
|
| - if os_flavor and value != os_flavor:
|
| - raise ConfigError(
|
| - 'Expected \'os\' to be \'%s\' but got \'%s\'' %
|
| - (os_flavor, value))
|
| -
|
| - elif key == 'version':
|
| - # Already checked above.
|
| - pass
|
| -
|
| - else:
|
| - raise ConfigError('Unknown key %r' % key)
|
| -
|
| - # Automatically fix os.path.sep if necessary. While .isolated files are always
|
| -  # in the native path format, someone could want to download a .isolated
|
| - # tree from another OS.
|
| - wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
|
| - if 'files' in data:
|
| - data['files'] = dict(
|
| - (k.replace(wrong_path_sep, os.path.sep), v)
|
| - for k, v in data['files'].iteritems())
|
| - for v in data['files'].itervalues():
|
| - if 'l' in v:
|
| - v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
|
| - if 'relative_cwd' in data:
|
| - data['relative_cwd'] = data['relative_cwd'].replace(
|
| - wrong_path_sep, os.path.sep)
|
| - return data
|
| -
|
| -
|
| -class IsolatedFile(object):
|
| - """Represents a single parsed .isolated file."""
|
| - def __init__(self, obj_hash, algo):
|
| - """|obj_hash| is really the sha-1 of the file."""
|
| - logging.debug('IsolatedFile(%s)' % obj_hash)
|
| - self.obj_hash = obj_hash
|
| - self.algo = algo
|
| -    # Set once the whole left side of the tree is parsed. 'Tree' here means the
|
| -    # .isolate and all the .isolated files recursively included by it via the
|
| -    # 'includes' key. The order of each sha-1 in 'includes', each representing a
|
| -    # .isolated file in the hash table, is important, as the later ones are not
|
| -    # processed until the first ones are retrieved and read.
|
| - self.can_fetch = False
|
| -
|
| - # Raw data.
|
| - self.data = {}
|
| - # A IsolatedFile instance, one per object in self.includes.
|
| - self.children = []
|
| -
|
| - # Set once the .isolated file is loaded.
|
| - self._is_parsed = False
|
| - # Set once the files are fetched.
|
| - self.files_fetched = False
|
| -
|
| - def load(self, os_flavor, content):
|
| - """Verifies the .isolated file is valid and loads this object with the json
|
| - data.
|
| - """
|
| - logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
|
| - assert not self._is_parsed
|
| - self.data = load_isolated(content, os_flavor, self.algo)
|
| - self.children = [
|
| - IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
|
| - ]
|
| - self._is_parsed = True
|
| -
|
| - def fetch_files(self, fetch_queue, files):
|
| - """Adds files in this .isolated file not present in |files| dictionary.
|
| -
|
| - Preemptively request files.
|
| -
|
| - Note that |files| is modified by this function.
|
| - """
|
| - assert self.can_fetch
|
| - if not self._is_parsed or self.files_fetched:
|
| - return
|
| - logging.debug('fetch_files(%s)' % self.obj_hash)
|
| - for filepath, properties in self.data.get('files', {}).iteritems():
|
| -      # The root .isolated has priority on the files being mapped. In
|
| -      # particular, overridden files must not be fetched.
|
| - if filepath not in files:
|
| - files[filepath] = properties
|
| - if 'h' in properties:
|
| - # Preemptively request files.
|
| - logging.debug('fetching %s' % filepath)
|
| - fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
|
| - self.files_fetched = True
|
| -
|
| -
|
| -class Settings(object):
|
| - """Results of a completely parsed .isolated file."""
|
| - def __init__(self):
|
| - self.command = []
|
| - self.files = {}
|
| - self.read_only = None
|
| - self.relative_cwd = None
|
| - # The main .isolated file, a IsolatedFile instance.
|
| - self.root = None
|
| -
|
| - def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
|
| - """Loads the .isolated and all the included .isolated asynchronously.
|
| -
|
| - It enables support for "included" .isolated files. They are processed in
|
| - strict order but fetched asynchronously from the cache. This is important so
|
| - that a file in an included .isolated file that is overridden by an embedding
|
| - .isolated file is not fetched needlessly. The includes are fetched in one
|
| -    pass and the files are fetched as soon as all the ones on the left side
|
| -    of the tree have been fetched.
|
| -
|
| - The prioritization is very important here for nested .isolated files.
|
| - 'includes' have the highest priority and the algorithm is optimized for both
|
| - deep and wide trees. A deep one is a long link of .isolated files referenced
|
| - one at a time by one item in 'includes'. A wide one has a large number of
|
| - 'includes' in a single .isolated file. 'left' is defined as an included
|
| - .isolated file earlier in the 'includes' list. So the order of the elements
|
| - in 'includes' is important.
|
| - """
|
| - self.root = IsolatedFile(root_isolated_hash, algo)
|
| -
|
| - # Isolated files being retrieved now: hash -> IsolatedFile instance.
|
| - pending = {}
|
| - # Set of hashes of already retrieved items to refuse recursive includes.
|
| - seen = set()
|
| -
|
| - def retrieve(isolated_file):
|
| - h = isolated_file.obj_hash
|
| - if h in seen:
|
| - raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
|
| - assert h not in pending
|
| - seen.add(h)
|
| - pending[h] = isolated_file
|
| - fetch_queue.add(WorkerPool.HIGH, h)
|
| -
|
| - retrieve(self.root)
|
| -
|
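| -    # As each .isolated file arrives, parse it and schedule its 'includes';
|
| -    # after every arrival, re-traverse the tree to see which files can now be
|
| -    # fetched.
|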
| - while pending:
|
| - item_hash = fetch_queue.wait(pending)
|
| - item = pending.pop(item_hash)
|
| - item.load(os_flavor, fetch_queue.cache.read(item_hash))
|
| - if item_hash == root_isolated_hash:
|
| - # It's the root item.
|
| - item.can_fetch = True
|
| -
|
| - for new_child in item.children:
|
| - retrieve(new_child)
|
| -
|
| - # Traverse the whole tree to see if files can now be fetched.
|
| - self._traverse_tree(fetch_queue, self.root)
|
| -
|
| - def check(n):
|
| - return all(check(x) for x in n.children) and n.files_fetched
|
| - assert check(self.root)
|
| -
|
| - self.relative_cwd = self.relative_cwd or ''
|
| - self.read_only = self.read_only or False
|
| -
|
| - def _traverse_tree(self, fetch_queue, node):
|
| - if node.can_fetch:
|
| - if not node.files_fetched:
|
| - self._update_self(fetch_queue, node)
|
| - will_break = False
|
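| -      # Unblock at most one new child per pass, left to right: mark the first
|
| -      # not-yet-fetchable child as fetchable and stop at the next such child,
|
| -      # so later siblings are unblocked on subsequent passes.
|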
| - for i in node.children:
|
| - if not i.can_fetch:
|
| - if will_break:
|
| - break
|
| -          # Automatically mark the first one as fetchable.
|
| - i.can_fetch = True
|
| - will_break = True
|
| - self._traverse_tree(fetch_queue, i)
|
| -
|
| - def _update_self(self, fetch_queue, node):
|
| - node.fetch_files(fetch_queue, self.files)
|
| - # Grabs properties.
|
| - if not self.command and node.data.get('command'):
|
| -      # Ensure paths are correctly separated on Windows.
|
| - self.command = node.data['command']
|
| - if self.command:
|
| - self.command[0] = self.command[0].replace('/', os.path.sep)
|
| - self.command = tools.fix_python_path(self.command)
|
| - if self.read_only is None and node.data.get('read_only') is not None:
|
| - self.read_only = node.data['read_only']
|
| - if (self.relative_cwd is None and
|
| - node.data.get('relative_cwd') is not None):
|
| - self.relative_cwd = node.data['relative_cwd']
|
| -
|
| -
|
| -def fetch_isolated(
|
| - isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
|
| - """Aggressively downloads the .isolated file(s), then download all the files.
|
| -
|
| - Arguments:
|
| - isolated_hash: hash of the root *.isolated file.
|
| - storage: Storage class that communicates with isolate storage.
|
| - cache: LocalCache class that knows how to store and map files locally.
|
| - algo: hash algorithm to use.
|
| - outdir: Output directory to map file tree to.
|
| - os_flavor: OS flavor to choose when reading sections of *.isolated file.
|
| - require_command: Ensure *.isolated specifies a command to run.
|
| -
|
| - Returns:
|
| - Settings object that holds details about loaded *.isolated file.
|
| - """
|
| - with cache:
|
| - fetch_queue = FetchQueue(storage, cache)
|
| - settings = Settings()
|
| -
|
| - with tools.Profiler('GetIsolateds'):
|
| - # Optionally support local files by manually adding them to cache.
|
| - if not is_valid_hash(isolated_hash, algo):
|
| - isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
|
| -
|
| - # Load all *.isolated and start loading rest of the files.
|
| - settings.load(fetch_queue, isolated_hash, os_flavor, algo)
|
| - if require_command and not settings.command:
|
| -      # TODO(vadimsh): All fetch operations are already enqueued and there's no
|
| - # easy way to cancel them.
|
| - raise ConfigError('No command to run')
|
| -
|
| - with tools.Profiler('GetRest'):
|
| - # Create file system hierarchy.
|
| - if not os.path.isdir(outdir):
|
| - os.makedirs(outdir)
|
| - create_directories(outdir, settings.files)
|
| - create_links(outdir, settings.files.iteritems())
|
| -
|
| - # Ensure working directory exists.
|
| - cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
|
| - if not os.path.isdir(cwd):
|
| - os.makedirs(cwd)
|
| -
|
| - # Multimap: digest -> list of pairs (path, props).
|
| - remaining = {}
|
| - for filepath, props in settings.files.iteritems():
|
| - if 'h' in props:
|
| - remaining.setdefault(props['h'], []).append((filepath, props))
|
| -
|
| - # Now block on the remaining files to be downloaded and mapped.
|
| - logging.info('Retrieving remaining files (%d of them)...',
|
| - fetch_queue.pending_count)
|
| - last_update = time.time()
|
| - with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
|
| - while remaining:
|
| - detector.ping()
|
| -
|
| - # Wait for any item to finish fetching to cache.
|
| - digest = fetch_queue.wait(remaining)
|
| -
|
| - # Link corresponding files to a fetched item in cache.
|
| - for filepath, props in remaining.pop(digest):
|
| - cache.link(digest, os.path.join(outdir, filepath), props.get('m'))
|
| -
|
| - # Report progress.
|
| - duration = time.time() - last_update
|
| - if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
|
| - msg = '%d files remaining...' % len(remaining)
|
| - print msg
|
| - logging.info(msg)
|
| - last_update = time.time()
|
| -
|
| -    # Cache could evict some items we just tried to fetch; it's a fatal error.
|
| - if not fetch_queue.verify_all_cached():
|
| - raise MappingError('Cache is too small to hold all requested files')
|
| - return settings
|
| -
|
| -
|
| -@subcommand.usage('<file1..fileN> or - to read from stdin')
|
| -def CMDarchive(parser, args):
|
| - """Archives data to the server."""
|
| - options, files = parser.parse_args(args)
|
| -
|
| - if files == ['-']:
|
| - files = sys.stdin.readlines()
|
| -
|
| - if not files:
|
| - parser.error('Nothing to upload')
|
| -
|
| - # Load the necessary metadata.
|
| - # TODO(maruel): Use a worker pool to upload as the hashing is being done.
|
| - infiles = dict(
|
| - (
|
| - f,
|
| - {
|
| - 's': os.stat(f).st_size,
|
| - 'h': hash_file(f, get_hash_algo(options.namespace)),
|
| - }
|
| - )
|
| - for f in files)
|
| -
|
| - with tools.Profiler('Archive'):
|
| - ret = upload_tree(
|
| - base_url=options.isolate_server,
|
| - indir=os.getcwd(),
|
| - infiles=infiles,
|
| - namespace=options.namespace)
|
| - if not ret:
|
| - print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
|
| - return ret
|
| -
|
| -
|
| -def CMDdownload(parser, args):
|
| - """Download data from the server.
|
| -
|
| - It can either download individual files or a complete tree from a .isolated
|
| - file.
|
| - """
|
| - parser.add_option(
|
| - '-i', '--isolated', metavar='HASH',
|
| - help='hash of an isolated file, .isolated file content is discarded, use '
|
| - '--file if you need it')
|
| - parser.add_option(
|
| - '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
|
| - help='hash and destination of a file, can be used multiple times')
|
| - parser.add_option(
|
| - '-t', '--target', metavar='DIR', default=os.getcwd(),
|
| - help='destination directory')
|
| - options, args = parser.parse_args(args)
|
| - if args:
|
| - parser.error('Unsupported arguments: %s' % args)
|
| - if bool(options.isolated) == bool(options.file):
|
| - parser.error('Use one of --isolated or --file, and only one.')
|
| -
|
| - options.target = os.path.abspath(options.target)
|
| - storage = get_storage(options.isolate_server, options.namespace)
|
| - cache = MemoryCache()
|
| - algo = get_hash_algo(options.namespace)
|
| -
|
| - # Fetching individual files.
|
| - if options.file:
|
| - channel = threading_utils.TaskChannel()
|
| - pending = {}
|
| - for digest, dest in options.file:
|
| - pending[digest] = dest
|
| - storage.async_fetch(
|
| - channel,
|
| - WorkerPool.MED,
|
| - digest,
|
| - UNKNOWN_FILE_SIZE,
|
| - functools.partial(file_write, os.path.join(options.target, dest)))
|
| - while pending:
|
| - fetched = channel.pull()
|
| - dest = pending.pop(fetched)
|
| - logging.info('%s: %s', fetched, dest)
|
| -
|
| - # Fetching whole isolated tree.
|
| - if options.isolated:
|
| - settings = fetch_isolated(
|
| - isolated_hash=options.isolated,
|
| - storage=storage,
|
| - cache=cache,
|
| - algo=algo,
|
| - outdir=options.target,
|
| - os_flavor=None,
|
| - require_command=False)
|
| - rel = os.path.join(options.target, settings.relative_cwd)
|
| - print('To run this test please run from the directory %s:' %
|
| -        rel)
|
| - print(' ' + ' '.join(settings.command))
|
| -
|
| - return 0
|
| -
|
| -
|
| -class OptionParserIsolateServer(tools.OptionParserWithLogging):
|
| - def __init__(self, **kwargs):
|
| - tools.OptionParserWithLogging.__init__(self, **kwargs)
|
| - self.add_option(
|
| - '-I', '--isolate-server',
|
| - metavar='URL', default='',
|
| - help='Isolate server to use')
|
| - self.add_option(
|
| - '--namespace', default='default-gzip',
|
| - help='The namespace to use on the server, default: %default')
|
| -
|
| - def parse_args(self, *args, **kwargs):
|
| - options, args = tools.OptionParserWithLogging.parse_args(
|
| - self, *args, **kwargs)
|
| - options.isolate_server = options.isolate_server.rstrip('/')
|
| - if not options.isolate_server:
|
| - self.error('--isolate-server is required.')
|
| - return options, args
|
| -
|
| -
|
| -def main(args):
|
| - dispatcher = subcommand.CommandDispatcher(__name__)
|
| - try:
|
| - return dispatcher.execute(
|
| - OptionParserIsolateServer(version=__version__), args)
|
| - except Exception as e:
|
| - tools.report_error(e)
|
| - return 1
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - fix_encoding.fix_encoding()
|
| - tools.disable_buffering()
|
| - colorama.init()
|
| - sys.exit(main(sys.argv[1:]))
|