Chromium Code Reviews

Unified Diff: swarm_client/isolateserver.py

Issue 69143004: Delete swarm_client. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/
Patch Set: Created 7 years, 1 month ago
Index: swarm_client/isolateserver.py
===================================================================
--- swarm_client/isolateserver.py (revision 235167)
+++ swarm_client/isolateserver.py (working copy)
@@ -1,1664 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2013 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Archives a set of files to a server."""
-
-__version__ = '0.2'
-
-import functools
-import hashlib
-import json
-import logging
-import os
-import re
-import sys
-import threading
-import time
-import urllib
-import zlib
-
-from third_party import colorama
-from third_party.depot_tools import fix_encoding
-from third_party.depot_tools import subcommand
-
-from utils import net
-from utils import threading_utils
-from utils import tools
-
-
-# Version of isolate protocol passed to the server in /handshake request.
-ISOLATE_PROTOCOL_VERSION = '1.0'
-
-
-# The number of files to check on the isolate server per /pre-upload query.
-# All files are sorted by likelihood of a change in the file content
-# (currently file size is used to estimate this: the larger the file, the more
-# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
-# are taken and sent to '/pre-upload', then the next
-# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
-# the more per request, the lower the effect of HTTP round trip latency and
-# TCP-level chattiness. On the other hand, larger values cause longer lookups,
-# increasing the initial latency to start uploading, which is especially an
-# issue for large files. This value is optimized for the "few thousand files to
-# look up with a minimal number of large files missing" case.
-ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
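-# For example, with 250 items to look up, batch_items_for_check() below would
-# issue queries of 20, 20, 50, 50 and 50 items, followed by one final query
-# with the remaining 60 items.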
-
-
-# A list of file extensions for already-compressed formats that should not
-# receive any further compression before being uploaded.
-ALREADY_COMPRESSED_TYPES = [
- '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
- 'wav', 'zip'
-]
-
-
-# The file size to be used when we don't know the correct file size,
-# generally used for .isolated files.
-UNKNOWN_FILE_SIZE = None
-
-
-# The size of each chunk to read when downloading and unzipping files.
-ZIPPED_FILE_CHUNK = 16 * 1024
-
-# Chunk size to use when doing disk I/O.
-DISK_FILE_CHUNK = 1024 * 1024
-
-# Chunk size to use when reading from network stream.
-NET_IO_FILE_CHUNK = 16 * 1024
-
-
-# Read timeout in seconds for downloads from isolate storage. If there's no
-# response from the server within this timeout, the whole download is aborted.
-DOWNLOAD_READ_TIMEOUT = 60
-
-# Maximum expected delay (in seconds) between successive file fetches
-# in run_tha_test. If it takes longer than that, a deadlock might be happening
-# and all stack frames for all threads are dumped to log.
-DEADLOCK_TIMEOUT = 5 * 60
-
-
-# The delay (in seconds) to wait between logging statements when retrieving
-# the required files. This is intended to let the user (or buildbot) know that
-# the program is still running.
-DELAY_BETWEEN_UPDATES_IN_SECS = 30
-
-
-# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
-# specify the names here.
-SUPPORTED_ALGOS = {
- 'md5': hashlib.md5,
- 'sha-1': hashlib.sha1,
- 'sha-512': hashlib.sha512,
-}
-
-
-# Used for serialization.
-SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
-
-
-class ConfigError(ValueError):
- """Generic failure to load a .isolated file."""
- pass
-
-
-class MappingError(OSError):
- """Failed to recreate the tree."""
- pass
-
-
-def is_valid_hash(value, algo):
- """Returns if the value is a valid hash for the corresponding algorithm."""
- size = 2 * algo().digest_size
- return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
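-# For example, with sha-1 the digest is 20 bytes, so a valid value for
-# is_valid_hash() is exactly 40 hex characters: is_valid_hash('a' * 40,
-# hashlib.sha1) is True, while any shorter or non-hex string is not.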
-
-
-def hash_file(filepath, algo):
- """Calculates the hash of a file without reading it all in memory at once.
-
- |algo| should be one of the hashlib hashing algorithms, e.g. hashlib.sha1.
- """
- digest = algo()
- with open(filepath, 'rb') as f:
- while True:
- chunk = f.read(DISK_FILE_CHUNK)
- if not chunk:
- break
- digest.update(chunk)
- return digest.hexdigest()
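-# Example usage: hash_file('some/file', hashlib.sha1) returns the file's sha-1
-# digest as a 40 character hex string, reading it in DISK_FILE_CHUNK (1 MiB)
-# chunks.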
-
-
-def stream_read(stream, chunk_size):
- """Reads chunks from |stream| and yields them."""
- while True:
- data = stream.read(chunk_size)
- if not data:
- break
- yield data
-
-
-def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
- """Yields file content in chunks of given |chunk_size|."""
- with open(filepath, 'rb') as f:
- while True:
- data = f.read(chunk_size)
- if not data:
- break
- yield data
-
-
-def file_write(filepath, content_generator):
- """Writes file content as generated by content_generator.
-
- Creates the intermediate directories as needed.
-
- Returns the number of bytes written.
-
- Meant to be mocked out in unit tests.
- """
- filedir = os.path.dirname(filepath)
- if not os.path.isdir(filedir):
- os.makedirs(filedir)
- total = 0
- with open(filepath, 'wb') as f:
- for d in content_generator:
- total += len(d)
- f.write(d)
- return total
-
-
-def zip_compress(content_generator, level=7):
- """Reads chunks from |content_generator| and yields zip compressed chunks."""
- compressor = zlib.compressobj(level)
- for chunk in content_generator:
- compressed = compressor.compress(chunk)
- if compressed:
- yield compressed
- tail = compressor.flush(zlib.Z_FINISH)
- if tail:
- yield tail
-
-
-def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
- """Reads zipped data from |content_generator| and yields decompressed data.
-
- Decompresses data in small chunks (no larger than |chunk_size|) so that a
- zip bomb doesn't cause zlib to preallocate a huge amount of memory.
-
- Raises IOError if data is corrupted or incomplete.
- """
- decompressor = zlib.decompressobj()
- compressed_size = 0
- try:
- for chunk in content_generator:
- compressed_size += len(chunk)
- data = decompressor.decompress(chunk, chunk_size)
- if data:
- yield data
- while decompressor.unconsumed_tail:
- data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
- if data:
- yield data
- tail = decompressor.flush()
- if tail:
- yield tail
- except zlib.error as e:
- raise IOError(
- 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
- # Ensure all data was read and decompressed.
- if decompressor.unused_data or decompressor.unconsumed_tail:
- raise IOError('Not all data was decompressed')
-
-
-def get_zip_compression_level(filename):
- """Given a filename calculates the ideal zip compression level to use."""
- file_ext = os.path.splitext(filename)[1].lower()
- # TODO(csharp): Profile to find what compression level works best.
- return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
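-# Example: get_zip_compression_level('logo.png') returns 0 (already
-# compressed), while get_zip_compression_level('trace.log') returns 7.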
-
-
-def create_directories(base_directory, files):
- """Creates the directory structure needed by the given list of files."""
- logging.debug('create_directories(%s, %d)', base_directory, len(files))
- # Build the full set of parent directories to create.
- directories = set(os.path.dirname(f) for f in files)
- for item in list(directories):
- while item:
- directories.add(item)
- item = os.path.dirname(item)
- for d in sorted(directories):
- if d:
- os.mkdir(os.path.join(base_directory, d))
-
-
-def create_links(base_directory, files):
- """Creates any links needed by the given set of files."""
- for filepath, properties in files:
- if 'l' not in properties:
- continue
- if sys.platform == 'win32':
- # TODO(maruel): Create junctions or empty text files similar to what
- # cygwin does?
- logging.warning('Ignoring symlink %s', filepath)
- continue
- outfile = os.path.join(base_directory, filepath)
- # os.symlink() doesn't exist on Windows, so the 'l' (link) property should
- # never be specified in a Windows .isolated file.
- os.symlink(properties['l'], outfile) # pylint: disable=E1101
- if 'm' in properties:
- lchmod = getattr(os, 'lchmod', None)
- if lchmod:
- lchmod(outfile, properties['m'])
-
-
-def is_valid_file(filepath, size):
- """Determines if the given files appears valid.
-
- Currently it just checks the file's size.
- """
- if size == UNKNOWN_FILE_SIZE:
- return os.path.isfile(filepath)
- actual_size = os.stat(filepath).st_size
- if size != actual_size:
- logging.warning(
- 'Found invalid item %s; %d != %d',
- os.path.basename(filepath), actual_size, size)
- return False
- return True
-
-
-class WorkerPool(threading_utils.AutoRetryThreadPool):
- """Thread pool that automatically retries on IOError and runs a preconfigured
- function.
- """
- # Initial and maximum number of worker threads.
- INITIAL_WORKERS = 2
- MAX_WORKERS = 16
- RETRIES = 5
-
- def __init__(self):
- super(WorkerPool, self).__init__(
- [IOError],
- self.RETRIES,
- self.INITIAL_WORKERS,
- self.MAX_WORKERS,
- 0,
- 'remote')
-
-
-class Item(object):
- """An item to push to Storage.
-
- It starts its life in a main thread, travels to 'contains' thread, then to
- 'push' thread and then finally back to the main thread.
-
- It is never used concurrently from multiple threads.
- """
-
- def __init__(self, digest, size, is_isolated=False):
- self.digest = digest
- self.size = size
- self.is_isolated = is_isolated
- self.compression_level = 6
- self.push_state = None
-
- def content(self, chunk_size):
- """Iterable with content of this item in chunks of given size.
-
- Arguments:
- chunk_size: preferred size of the chunk to produce, may be ignored.
- """
- raise NotImplementedError()
-
-
-class FileItem(Item):
- """A file to push to Storage."""
-
- def __init__(self, path, digest, size, is_isolated):
- super(FileItem, self).__init__(digest, size, is_isolated)
- self.path = path
- self.compression_level = get_zip_compression_level(path)
-
- def content(self, chunk_size):
- return file_read(self.path, chunk_size)
-
-
-class BufferItem(Item):
- """A byte buffer to push to Storage."""
-
- def __init__(self, buf, algo, is_isolated=False):
- super(BufferItem, self).__init__(
- algo(buf).hexdigest(), len(buf), is_isolated)
- self.buffer = buf
-
- def content(self, _chunk_size):
- return [self.buffer]
-
-
-class Storage(object):
- """Efficiently downloads or uploads large set of files via StorageApi."""
-
- def __init__(self, storage_api, use_zip):
- self.use_zip = use_zip
- self._storage_api = storage_api
- self._cpu_thread_pool = None
- self._net_thread_pool = None
-
- @property
- def cpu_thread_pool(self):
- """ThreadPool for CPU-bound tasks like zipping."""
- if self._cpu_thread_pool is None:
- self._cpu_thread_pool = threading_utils.ThreadPool(
- 2, max(threading_utils.num_processors(), 2), 0, 'zip')
- return self._cpu_thread_pool
-
- @property
- def net_thread_pool(self):
- """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
- if self._net_thread_pool is None:
- self._net_thread_pool = WorkerPool()
- return self._net_thread_pool
-
- def close(self):
- """Waits for all pending tasks to finish."""
- if self._cpu_thread_pool:
- self._cpu_thread_pool.join()
- self._cpu_thread_pool.close()
- self._cpu_thread_pool = None
- if self._net_thread_pool:
- self._net_thread_pool.join()
- self._net_thread_pool.close()
- self._net_thread_pool = None
-
- def __enter__(self):
- """Context manager interface."""
- return self
-
- def __exit__(self, _exc_type, _exc_value, _traceback):
- """Context manager interface."""
- self.close()
- return False
-
- def upload_tree(self, indir, infiles):
- """Uploads the given tree to the isolate server.
-
- Arguments:
- indir: root directory the infiles are based in.
- infiles: dict of files to upload from |indir|.
-
- Returns:
- List of items that were uploaded. All other items are already there.
- """
- logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
-
- # Convert |indir| + |infiles| into a list of FileItem objects.
- # Filter out symlinks, since they are not represented by items on isolate
- # server side.
- items = [
- FileItem(
- path=os.path.join(indir, filepath),
- digest=metadata['h'],
- size=metadata['s'],
- is_isolated=metadata.get('priority') == '0')
- for filepath, metadata in infiles.iteritems()
- if 'l' not in metadata
- ]
-
- return self.upload_items(items)
-
- def upload_items(self, items):
- """Uploads bunch of items to the isolate server.
-
- Will upload only items that are missing.
-
- Arguments:
- items: list of Item instances that represent the data to upload.
-
- Returns:
- List of items that were uploaded. All other items are already there.
- """
- # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
- # used by swarming.py. There's no need to spawn multiple threads and try to
- # do stuff in parallel: there's nothing to parallelize. 'contains' check and
- # 'push' should be performed sequentially in the context of current thread.
-
- # For each digest keep only first Item that matches it. All other items
- # are just indistinguishable copies from the point of view of isolate
- # server (it doesn't care about paths at all, only content and digests).
- seen = {}
- duplicates = 0
- for item in items:
- if seen.setdefault(item.digest, item) is not item:
- duplicates += 1
- items = seen.values()
- if duplicates:
- logging.info('Skipped %d duplicated files', duplicates)
-
- # Enqueue all upload tasks.
- missing = set()
- channel = threading_utils.TaskChannel()
- for missing_item in self.get_missing_items(items):
- missing.add(missing_item)
- self.async_push(
- channel,
- WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
- missing_item)
-
- uploaded = []
- # No need to spawn deadlock detector thread if there's nothing to upload.
- if missing:
- with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
- # Wait for all started uploads to finish.
- while len(uploaded) != len(missing):
- detector.ping()
- item = channel.pull()
- uploaded.append(item)
- logging.debug(
- 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
- logging.info('All files are uploaded')
-
- # Print stats.
- total = len(items)
- total_size = sum(f.size for f in items)
- logging.info(
- 'Total: %6d, %9.1fkb',
- total,
- total_size / 1024.)
- cache_hit = set(items) - missing
- cache_hit_size = sum(f.size for f in cache_hit)
- logging.info(
- 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
- len(cache_hit),
- cache_hit_size / 1024.,
- len(cache_hit) * 100. / total,
- cache_hit_size * 100. / total_size if total_size else 0)
- cache_miss = missing
- cache_miss_size = sum(f.size for f in cache_miss)
- logging.info(
- 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
- len(cache_miss),
- cache_miss_size / 1024.,
- len(cache_miss) * 100. / total,
- cache_miss_size * 100. / total_size if total_size else 0)
-
- return uploaded
-
- def get_fetch_url(self, digest):
- """Returns an URL that can be used to fetch an item with given digest.
-
- Arguments:
- digest: hex digest of item to fetch.
-
- Returns:
- A URL, or None if the underlying protocol doesn't support this.
- """
- return self._storage_api.get_fetch_url(digest)
-
- def async_push(self, channel, priority, item):
- """Starts asynchronous push to the server in a parallel thread.
-
- Arguments:
- channel: TaskChannel that receives back |item| when upload ends.
- priority: thread pool task priority for the push.
- item: item to upload as instance of Item class.
- """
- def push(content):
- """Pushes an item and returns its id, to pass as a result to |channel|."""
- self._storage_api.push(item, content)
- return item
-
- # If zipping is not required, just start a push task.
- if not self.use_zip:
- self.net_thread_pool.add_task_with_channel(channel, priority, push,
- item.content(DISK_FILE_CHUNK))
- return
-
- # If zipping is enabled, zip in a separate thread.
- def zip_and_push():
- # TODO(vadimsh): Implement streaming uploads. Until that's done, assemble
- # the content right here. This blocks until the whole file is zipped.
- try:
- stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
- item.compression_level)
- data = ''.join(stream)
- except Exception as exc:
- logging.error('Failed to zip \'%s\': %s', item, exc)
- channel.send_exception(exc)
- return
- self.net_thread_pool.add_task_with_channel(
- channel, priority, push, [data])
- self.cpu_thread_pool.add_task(priority, zip_and_push)
-
- def async_fetch(self, channel, priority, digest, size, sink):
- """Starts asynchronous fetch from the server in a parallel thread.
-
- Arguments:
- channel: TaskChannel that receives back |digest| when download ends.
- priority: thread pool task priority for the fetch.
- digest: hex digest of an item to download.
- size: expected size of the item (after decompression).
- sink: function that will be called as sink(generator).
- """
- def fetch():
- try:
- # Prepare reading pipeline.
- stream = self._storage_api.fetch(digest)
- if self.use_zip:
- stream = zip_decompress(stream, DISK_FILE_CHUNK)
- # Run |stream| through verifier that will assert its size.
- verifier = FetchStreamVerifier(stream, size)
- # Verified stream goes to |sink|.
- sink(verifier.run())
- except Exception as err:
- logging.warning('Failed to fetch %s: %s', digest, err)
- raise
- return digest
-
- # Don't bother with cpu_thread_pool for decompression. Decompression is
- # really fast and most probably IO bound anyway.
- self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
-
- def get_missing_items(self, items):
- """Yields items that are missing from the server.
-
- Issues multiple parallel queries via StorageApi's 'contains' method.
-
- Arguments:
- items: a list of Item objects to check.
-
- Yields:
- Item objects that are missing from the server.
- """
- channel = threading_utils.TaskChannel()
- pending = 0
- # Enqueue all requests.
- for batch in self.batch_items_for_check(items):
- self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
- self._storage_api.contains, batch)
- pending += 1
- # Yield results as they come in.
- for _ in xrange(pending):
- for missing in channel.pull():
- yield missing
-
- @staticmethod
- def batch_items_for_check(items):
- """Splits list of items to check for existence on the server into batches.
-
- Each batch corresponds to a single 'exists?' query to the server via a call
- to StorageApi's 'contains' method.
-
- Arguments:
- items: a list of Item objects.
-
- Yields:
- Batches of items to query for existence in a single operation,
- each batch is a list of Item objects.
- """
- batch_count = 0
- batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
- next_queries = []
- for item in sorted(items, key=lambda x: x.size, reverse=True):
- next_queries.append(item)
- if len(next_queries) == batch_size_limit:
- yield next_queries
- next_queries = []
- batch_count += 1
- batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
- min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
- if next_queries:
- yield next_queries
-
-
-class FetchQueue(object):
- """Fetches items from Storage and places them into LocalCache.
-
- It manages multiple concurrent fetch operations. Acts as a bridge between
- Storage and LocalCache so that Storage and LocalCache don't depend on each
- other at all.
- """
-
- def __init__(self, storage, cache):
- self.storage = storage
- self.cache = cache
- self._channel = threading_utils.TaskChannel()
- self._pending = set()
- self._accessed = set()
- self._fetched = cache.cached_set()
-
- def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
- """Starts asynchronous fetch of item |digest|."""
- # Fetching it now?
- if digest in self._pending:
- return
-
- # Mark this file as in use, verify_all_cached will later ensure it is still
- # in cache.
- self._accessed.add(digest)
-
- # Already fetched? Notify cache to update item's LRU position.
- if digest in self._fetched:
- # 'touch' returns True if item is in cache and not corrupted.
- if self.cache.touch(digest, size):
- return
- # Item is corrupted, remove it from cache and fetch it again.
- self._fetched.remove(digest)
- self.cache.evict(digest)
-
- # TODO(maruel): It should look at the free disk space, the current cache
- # size and the size of the new item for every new item:
- # - Trim the cache as more entries are listed when free disk space is low,
- # otherwise if the amount of data downloaded during the run > free disk
- # space, it'll crash.
- # - Make sure there's enough free disk space to fit all dependencies of
- # this run! If not, abort early.
-
- # Start fetching.
- self._pending.add(digest)
- self.storage.async_fetch(
- self._channel, priority, digest, size,
- functools.partial(self.cache.write, digest))
-
- def wait(self, digests):
- """Starts a loop that waits for at least one of |digests| to be retrieved.
-
- Returns the first digest retrieved.
- """
- # Return early if any of the requested items has already been fetched.
- for digest in digests:
- if digest in self._fetched:
- return digest
-
- # Ensure all requested items are being fetched now.
- assert all(digest in self._pending for digest in digests), (
- digests, self._pending)
-
- # Wait for some requested item to finish fetching.
- while self._pending:
- digest = self._channel.pull()
- self._pending.remove(digest)
- self._fetched.add(digest)
- if digest in digests:
- return digest
-
- # Should never reach this point due to assert above.
- raise RuntimeError('Impossible state')
-
- def inject_local_file(self, path, algo):
- """Adds local file to the cache as if it was fetched from storage."""
- with open(path, 'rb') as f:
- data = f.read()
- digest = algo(data).hexdigest()
- self.cache.write(digest, [data])
- self._fetched.add(digest)
- return digest
-
- @property
- def pending_count(self):
- """Returns number of items to be fetched."""
- return len(self._pending)
-
- def verify_all_cached(self):
- """True if all accessed items are in cache."""
- return self._accessed.issubset(self.cache.cached_set())
-
-
-class FetchStreamVerifier(object):
- """Verifies that fetched file is valid before passing it to the LocalCache."""
-
- def __init__(self, stream, expected_size):
- self.stream = stream
- self.expected_size = expected_size
- self.current_size = 0
-
- def run(self):
- """Generator that yields same items as |stream|.
-
- Verifies |stream| is complete before yielding a last chunk to consumer.
-
- Also wraps IOError produced by consumer into MappingError exceptions since
- otherwise Storage will retry fetch on unrelated local cache errors.
- """
- # Read one chunk ahead, keep it in |stored|.
- # That way a complete stream can be verified before pushing last chunk
- # to consumer.
- stored = None
- for chunk in self.stream:
- assert chunk is not None
- if stored is not None:
- self._inspect_chunk(stored, is_last=False)
- try:
- yield stored
- except IOError as exc:
- raise MappingError('Failed to store an item in cache: %s' % exc)
- stored = chunk
- if stored is not None:
- self._inspect_chunk(stored, is_last=True)
- try:
- yield stored
- except IOError as exc:
- raise MappingError('Failed to store an item in cache: %s' % exc)
-
- def _inspect_chunk(self, chunk, is_last):
- """Called for each fetched chunk before passing it to consumer."""
- self.current_size += len(chunk)
- if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
- (self.expected_size != self.current_size)):
- raise IOError('Incorrect file size: expected %d, got %d' % (
- self.expected_size, self.current_size))
-
-
-class StorageApi(object):
- """Interface for classes that implement low-level storage operations."""
-
- def get_fetch_url(self, digest):
- """Returns an URL that can be used to fetch an item with given digest.
-
- Arguments:
- digest: hex digest of item to fetch.
-
- Returns:
- A URL, or None if the protocol doesn't support this.
- """
- raise NotImplementedError()
-
- def fetch(self, digest):
- """Fetches an object and yields its content.
-
- Arguments:
- digest: hash digest of item to download.
-
- Yields:
- Chunks of downloaded item (as str objects).
- """
- raise NotImplementedError()
-
- def push(self, item, content):
- """Uploads an |item| with content generated by |content| generator.
-
- Arguments:
- item: Item object that holds information about an item being pushed.
- content: a generator that yields chunks to push.
-
- Returns:
- None.
- """
- raise NotImplementedError()
-
- def contains(self, items):
- """Checks for existence of given |items| on the server.
-
- Mutates |items| by assigning an opaque, implementation-specific object to the
- push_state attribute of each Item that is missing from the datastore.
-
- Arguments:
- items: list of Item objects.
-
- Returns:
- A list of Item objects that are missing from the server.
- """
- raise NotImplementedError()
-
-
-class IsolateServer(StorageApi):
- """StorageApi implementation that downloads and uploads to Isolate Server.
-
- It uploads and downloads directly from Google Storage whenever appropriate.
- """
-
- class _PushState(object):
- """State needed to call .push(), to be stored in Item.push_state."""
- def __init__(self, upload_url, finalize_url):
- self.upload_url = upload_url
- self.finalize_url = finalize_url
- self.uploaded = False
- self.finalized = False
-
- def __init__(self, base_url, namespace):
- super(IsolateServer, self).__init__()
- assert base_url.startswith('http'), base_url
- self.base_url = base_url.rstrip('/')
- self.namespace = namespace
- self._lock = threading.Lock()
- self._server_caps = None
-
- @staticmethod
- def _generate_handshake_request():
- """Returns a dict to be sent as handshake request body."""
- # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
- return {
- 'client_app_version': __version__,
- 'fetcher': True,
- 'protocol_version': ISOLATE_PROTOCOL_VERSION,
- 'pusher': True,
- }
-
- @staticmethod
- def _validate_handshake_response(caps):
- """Validates and normalizes handshake response."""
- logging.info('Protocol version: %s', caps['protocol_version'])
- logging.info('Server version: %s', caps['server_app_version'])
- if caps.get('error'):
- raise MappingError(caps['error'])
- if not caps['access_token']:
- raise ValueError('access_token is missing')
- return caps
-
- @property
- def _server_capabilities(self):
- """Performs handshake with the server if not yet done.
-
- Returns:
- Server capabilities dictionary as returned by /handshake endpoint.
-
- Raises:
- MappingError if server rejects the handshake.
- """
- # TODO(maruel): Make this request much earlier asynchronously while the
- # files are being enumerated.
- with self._lock:
- if self._server_caps is None:
- request_body = json.dumps(
- self._generate_handshake_request(), separators=(',', ':'))
- response = net.url_read(
- url=self.base_url + '/content-gs/handshake',
- data=request_body,
- content_type='application/json',
- method='POST')
- if response is None:
- raise MappingError('Failed to perform handshake.')
- try:
- caps = json.loads(response)
- if not isinstance(caps, dict):
- raise ValueError('Expecting JSON dict')
- self._server_caps = self._validate_handshake_response(caps)
- except (ValueError, KeyError, TypeError) as exc:
- # A KeyError has a very confusing str() conversion: it's just the
- # missing key and nothing else, so print the exception class name
- # as well.
- raise MappingError('Invalid handshake response (%s): %s' % (
- exc.__class__.__name__, exc))
- return self._server_caps
-
- def get_fetch_url(self, digest):
- assert isinstance(digest, basestring)
- return '%s/content-gs/retrieve/%s/%s' % (
- self.base_url, self.namespace, digest)
-
- def fetch(self, digest):
- source_url = self.get_fetch_url(digest)
- logging.debug('download_file(%s)', source_url)
-
- # Because the App Engine datastore is only eventually consistent, retry
- # 404 errors: the file might just not be visible yet (even though it has
- # been uploaded).
- connection = net.url_open(
- source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
- if not connection:
- raise IOError('Unable to open connection to %s' % source_url)
- return stream_read(connection, NET_IO_FILE_CHUNK)
-
- def push(self, item, content):
- assert isinstance(item, Item)
- assert isinstance(item.push_state, IsolateServer._PushState)
- assert not item.push_state.finalized
-
- # TODO(vadimsh): Do not read from |content| generator when retrying push.
- # If |content| is indeed a generator, it cannot be rewound back
- # to the beginning of the stream. A retry will find it exhausted. A possible
- # solution is to wrap the |content| generator in some sort of caching,
- # restartable generator. It should be done alongside streaming support
- # implementation.
-
- # This push operation may be a retry after a failed finalization call below;
- # no need to re-upload the contents in that case.
- if not item.push_state.uploaded:
- # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
- # upload support is implemented.
- if isinstance(content, list) and len(content) == 1:
- content = content[0]
- else:
- content = ''.join(content)
- # PUT file to |upload_url|.
- response = net.url_read(
- url=item.push_state.upload_url,
- data=content,
- content_type='application/octet-stream',
- method='PUT')
- if response is None:
- raise IOError('Failed to upload a file %s to %s' % (
- item.digest, item.push_state.upload_url))
- item.push_state.uploaded = True
- else:
- logging.info(
- 'A file %s already uploaded, retrying finalization only', item.digest)
-
- # Optionally notify the server that it's done.
- if item.push_state.finalize_url:
- # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
- # send it to the isolate server. That way the isolate server can verify that
- # the data safely reached Google Storage (GS provides MD5 and CRC32C of
- # stored files).
- response = net.url_read(
- url=item.push_state.finalize_url,
- data='',
- content_type='application/json',
- method='POST')
- if response is None:
- raise IOError('Failed to finalize an upload of %s' % item.digest)
- item.push_state.finalized = True
-
- def contains(self, items):
- logging.info('Checking existence of %d files...', len(items))
-
- # Request body is a json encoded list of dicts.
- body = [
- {
- 'h': item.digest,
- 's': item.size,
- 'i': int(item.is_isolated),
- } for item in items
- ]
-
- query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
- self.base_url,
- self.namespace,
- urllib.quote(self._server_capabilities['access_token']))
- response_body = net.url_read(
- url=query_url,
- data=json.dumps(body, separators=(',', ':')),
- content_type='application/json',
- method='POST')
- if response_body is None:
- raise MappingError('Failed to execute /pre-upload query')
-
- # Response body is a list of push_urls (or null if file is already present).
- try:
- response = json.loads(response_body)
- if not isinstance(response, list):
- raise ValueError('Expecting response with json-encoded list')
- if len(response) != len(items):
- raise ValueError(
- 'Incorrect number of items in the list, expected %d, '
- 'but got %d' % (len(items), len(response)))
- except ValueError as err:
- raise MappingError(
- 'Invalid response from server: %s, body is %s' % (err, response_body))
-
- # Pick Items that are missing, attach _PushState to them.
- missing_items = []
- for i, push_urls in enumerate(response):
- if push_urls:
- assert len(push_urls) == 2, str(push_urls)
- item = items[i]
- assert item.push_state is None
- item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
- missing_items.append(item)
- logging.info('Queried %d files, %d cache hit',
- len(items), len(items) - len(missing_items))
- return missing_items
-
-
-class FileSystem(StorageApi):
- """StorageApi implementation that fetches data from the file system.
-
- The common use case is an NFS/CIFS file server that is mounted locally and
- used to fetch files onto a local partition.
- """
-
- def __init__(self, base_path):
- super(FileSystem, self).__init__()
- self.base_path = base_path
-
- def get_fetch_url(self, digest):
- return None
-
- def fetch(self, digest):
- assert isinstance(digest, basestring)
- return file_read(os.path.join(self.base_path, digest))
-
- def push(self, item, content):
- assert isinstance(item, Item)
- file_write(os.path.join(self.base_path, item.digest), content)
-
- def contains(self, items):
- return [
- item for item in items
- if not os.path.exists(os.path.join(self.base_path, item.digest))
- ]
-
-
-class LocalCache(object):
- """Local cache that stores objects fetched via Storage.
-
- It can be accessed concurrently from multiple threads, so it should protect
- its internal state with some lock.
- """
-
- def __enter__(self):
- """Context manager interface."""
- return self
-
- def __exit__(self, _exc_type, _exec_value, _traceback):
- """Context manager interface."""
- return False
-
- def cached_set(self):
- """Returns a set of all cached digests (always a new object)."""
- raise NotImplementedError()
-
- def touch(self, digest, size):
- """Ensures item is not corrupted and updates its LRU position.
-
- Arguments:
- digest: hash digest of item to check.
- size: expected size of this item.
-
- Returns:
- True if item is in cache and not corrupted.
- """
- raise NotImplementedError()
-
- def evict(self, digest):
- """Removes item from cache if it's there."""
- raise NotImplementedError()
-
- def read(self, digest):
- """Returns contents of the cached item as a single str."""
- raise NotImplementedError()
-
- def write(self, digest, content):
- """Reads data from |content| generator and stores it in cache."""
- raise NotImplementedError()
-
- def link(self, digest, dest, file_mode=None):
- """Ensures file at |dest| has same content as cached |digest|."""
- raise NotImplementedError()
-
-
-class MemoryCache(LocalCache):
- """LocalCache implementation that stores everything in memory."""
-
- def __init__(self):
- super(MemoryCache, self).__init__()
- # Let's not assume dict is thread safe.
- self._lock = threading.Lock()
- self._contents = {}
-
- def cached_set(self):
- with self._lock:
- return set(self._contents)
-
- def touch(self, digest, size):
- with self._lock:
- return digest in self._contents
-
- def evict(self, digest):
- with self._lock:
- self._contents.pop(digest, None)
-
- def read(self, digest):
- with self._lock:
- return self._contents[digest]
-
- def write(self, digest, content):
- # Assemble whole stream before taking the lock.
- data = ''.join(content)
- with self._lock:
- self._contents[digest] = data
-
- def link(self, digest, dest, file_mode=None):
- file_write(dest, [self.read(digest)])
- if file_mode is not None:
- os.chmod(dest, file_mode)
-
-
-def get_hash_algo(_namespace):
- """Return hash algorithm class to use when uploading to given |namespace|."""
- # TODO(vadimsh): Implement this at some point.
- return hashlib.sha1
-
-
-def is_namespace_with_compression(namespace):
- """Returns True if given |namespace| stores compressed objects."""
- return namespace.endswith(('-gzip', '-deflate'))
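-# Example: 'default-gzip' and 'default-deflate' store compressed objects,
-# while plain 'default' does not.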
-
-
-def get_storage_api(file_or_url, namespace):
- """Returns an object that implements StorageApi interface."""
- if re.match(r'^https?://.+$', file_or_url):
- return IsolateServer(file_or_url, namespace)
- else:
- return FileSystem(file_or_url)
-
-
-def get_storage(file_or_url, namespace):
- """Returns Storage class configured with appropriate StorageApi instance."""
- return Storage(
- get_storage_api(file_or_url, namespace),
- is_namespace_with_compression(namespace))
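-# Illustrative example (the URL and path below are made up):
-# get_storage('https://isolate.example.com', 'default-gzip') returns a Storage
-# wrapping IsolateServer with zipping enabled, while
-# get_storage('/mnt/isolate', 'default') wraps FileSystem with zipping disabled.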
-
-
-def upload_tree(base_url, indir, infiles, namespace):
- """Uploads the given tree to the given url.
-
- Arguments:
- base_url: The base url. It is assumed that |base_url|/has/ can be used to
- query if an element was already uploaded, and |base_url|/store/
- can be used to upload a new element.
- indir: Root directory the infiles are based in.
- infiles: dict of files to upload from |indir| to |base_url|.
- namespace: The namespace to use on the server.
- """
- with get_storage(base_url, namespace) as storage:
- storage.upload_tree(indir, infiles)
- return 0
-
-
-def load_isolated(content, os_flavor, algo):
- """Verifies the .isolated file is valid and loads this object with the json
- data.
-
- Arguments:
- - content: raw serialized content to load.
- - os_flavor: OS to load this file on. Optional.
- - algo: hashlib algorithm class. Used to confirm the algorithm matches the
- algorithm used on the Isolate Server.
- """
- try:
- data = json.loads(content)
- except ValueError:
- raise ConfigError('Failed to parse: %s...' % content[:100])
-
- if not isinstance(data, dict):
- raise ConfigError('Expected dict, got %r' % data)
-
- # Check 'version' first, since it could modify the parsing after.
- value = data.get('version', '1.0')
- if not isinstance(value, basestring):
- raise ConfigError('Expected string, got %r' % value)
- if not re.match(r'^(\d+)\.(\d+)$', value):
- raise ConfigError('Expected a compatible version, got %r' % value)
- if value.split('.', 1)[0] != '1':
- raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)
-
- if algo is None:
- # Default to the algorithm used in the .isolated file itself, falling back
- # to 'sha-1' if unspecified.
- algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
-
- for key, value in data.iteritems():
- if key == 'algo':
- if not isinstance(value, basestring):
- raise ConfigError('Expected string, got %r' % value)
- if value not in SUPPORTED_ALGOS:
- raise ConfigError(
- 'Expected one of \'%s\', got %r' %
- (', '.join(sorted(SUPPORTED_ALGOS)), value))
- if value != SUPPORTED_ALGOS_REVERSE[algo]:
- raise ConfigError(
- 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
-
- elif key == 'command':
- if not isinstance(value, list):
- raise ConfigError('Expected list, got %r' % value)
- if not value:
- raise ConfigError('Expected non-empty command')
- for subvalue in value:
- if not isinstance(subvalue, basestring):
- raise ConfigError('Expected string, got %r' % subvalue)
-
- elif key == 'files':
- if not isinstance(value, dict):
- raise ConfigError('Expected dict, got %r' % value)
- for subkey, subvalue in value.iteritems():
- if not isinstance(subkey, basestring):
- raise ConfigError('Expected string, got %r' % subkey)
- if not isinstance(subvalue, dict):
- raise ConfigError('Expected dict, got %r' % subvalue)
- for subsubkey, subsubvalue in subvalue.iteritems():
- if subsubkey == 'l':
- if not isinstance(subsubvalue, basestring):
- raise ConfigError('Expected string, got %r' % subsubvalue)
- elif subsubkey == 'm':
- if not isinstance(subsubvalue, int):
- raise ConfigError('Expected int, got %r' % subsubvalue)
- elif subsubkey == 'h':
- if not is_valid_hash(subsubvalue, algo):
- raise ConfigError('Expected sha-1, got %r' % subsubvalue)
- elif subsubkey == 's':
- if not isinstance(subsubvalue, int):
- raise ConfigError('Expected int, got %r' % subsubvalue)
- else:
- raise ConfigError('Unknown subsubkey %s' % subsubkey)
- if bool('h' in subvalue) == bool('l' in subvalue):
- raise ConfigError(
- 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
- subvalue)
- if bool('h' in subvalue) != bool('s' in subvalue):
- raise ConfigError(
- 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
- subvalue)
- if bool('s' in subvalue) == bool('l' in subvalue):
- raise ConfigError(
- 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
- subvalue)
- if bool('l' in subvalue) and bool('m' in subvalue):
- raise ConfigError(
- 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
- subvalue)
-
- elif key == 'includes':
- if not isinstance(value, list):
- raise ConfigError('Expected list, got %r' % value)
- if not value:
- raise ConfigError('Expected non-empty includes list')
- for subvalue in value:
- if not is_valid_hash(subvalue, algo):
- raise ConfigError('Expected sha-1, got %r' % subvalue)
-
- elif key == 'read_only':
- if not isinstance(value, bool):
- raise ConfigError('Expected bool, got %r' % value)
-
- elif key == 'relative_cwd':
- if not isinstance(value, basestring):
- raise ConfigError('Expected string, got %r' % value)
-
- elif key == 'os':
- if os_flavor and value != os_flavor:
- raise ConfigError(
- 'Expected \'os\' to be \'%s\' but got \'%s\'' %
- (os_flavor, value))
-
- elif key == 'version':
- # Already checked above.
- pass
-
- else:
- raise ConfigError('Unknown key %r' % key)
-
- # Automatically fix os.path.sep if necessary. While .isolated files are always
- # in the native path format, someone could want to download an .isolated
- # tree from another OS.
- wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
- if 'files' in data:
- data['files'] = dict(
- (k.replace(wrong_path_sep, os.path.sep), v)
- for k, v in data['files'].iteritems())
- for v in data['files'].itervalues():
- if 'l' in v:
- v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
- if 'relative_cwd' in data:
- data['relative_cwd'] = data['relative_cwd'].replace(
- wrong_path_sep, os.path.sep)
- return data
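-# A minimal document accepted by load_isolated() looks like this (illustrative;
-# the file name and hash are placeholders):
-# {
-#   "algo": "sha-1",
-#   "command": ["python", "run_test.py"],
-#   "files": {"run_test.py": {"h": "<40 hex chars>", "s": 123, "m": 436}},
-#   "version": "1.0"
-# }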
-
-
-class IsolatedFile(object):
- """Represents a single parsed .isolated file."""
- def __init__(self, obj_hash, algo):
- """|obj_hash| is really the sha-1 of the file."""
- logging.debug('IsolatedFile(%s)' % obj_hash)
- self.obj_hash = obj_hash
- self.algo = algo
- # Set once the whole left side of the tree is parsed. 'Tree' here means this
- # .isolated file and all the .isolated files recursively included by it via
- # the 'includes' key. The order of the sha-1s in 'includes', each representing
- # a .isolated file in the hash table, is important, as the later ones are not
- # processed until the earlier ones are retrieved and read.
- self.can_fetch = False
-
- # Raw data.
- self.data = {}
- # IsolatedFile instances, one per hash in this file's 'includes' list.
- self.children = []
-
- # Set once the .isolated file is loaded.
- self._is_parsed = False
- # Set once the files are fetched.
- self.files_fetched = False
-
- def load(self, os_flavor, content):
- """Verifies the .isolated file is valid and loads this object with the json
- data.
- """
- logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
- assert not self._is_parsed
- self.data = load_isolated(content, os_flavor, self.algo)
- self.children = [
- IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
- ]
- self._is_parsed = True
-
- def fetch_files(self, fetch_queue, files):
- """Adds files in this .isolated file not present in |files| dictionary.
-
- Preemptively request files.
-
- Note that |files| is modified by this function.
- """
- assert self.can_fetch
- if not self._is_parsed or self.files_fetched:
- return
- logging.debug('fetch_files(%s)' % self.obj_hash)
- for filepath, properties in self.data.get('files', {}).iteritems():
- # The root .isolated file has priority on the files being mapped. In
- # particular, overridden files must not be fetched.
- if filepath not in files:
- files[filepath] = properties
- if 'h' in properties:
- # Preemptively request files.
- logging.debug('fetching %s' % filepath)
- fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
- self.files_fetched = True
-
-
-class Settings(object):
- """Results of a completely parsed .isolated file."""
- def __init__(self):
- self.command = []
- self.files = {}
- self.read_only = None
- self.relative_cwd = None
- # The main .isolated file, a IsolatedFile instance.
- self.root = None
-
- def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
- """Loads the .isolated and all the included .isolated asynchronously.
-
- It enables support for "included" .isolated files. They are processed in
- strict order but fetched asynchronously from the cache. This is important so
- that a file in an included .isolated file that is overridden by an embedding
- .isolated file is not fetched needlessly. The includes are fetched in one
- pass and the files are fetched as soon as all the ones on the left side
- of the tree have been fetched.
-
- The prioritization is very important here for nested .isolated files.
- 'includes' have the highest priority and the algorithm is optimized for both
- deep and wide trees. A deep one is a long chain of .isolated files referenced
- one at a time by one item in 'includes'. A wide one has a large number of
- 'includes' in a single .isolated file. 'left' is defined as an included
- .isolated file earlier in the 'includes' list. So the order of the elements
- in 'includes' is important.
- """
- self.root = IsolatedFile(root_isolated_hash, algo)
-
- # Isolated files being retrieved now: hash -> IsolatedFile instance.
- pending = {}
- # Set of hashes of already retrieved items to refuse recursive includes.
- seen = set()
-
- def retrieve(isolated_file):
- h = isolated_file.obj_hash
- if h in seen:
- raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
- assert h not in pending
- seen.add(h)
- pending[h] = isolated_file
- fetch_queue.add(WorkerPool.HIGH, h)
-
- retrieve(self.root)
-
- while pending:
- item_hash = fetch_queue.wait(pending)
- item = pending.pop(item_hash)
- item.load(os_flavor, fetch_queue.cache.read(item_hash))
- if item_hash == root_isolated_hash:
- # It's the root item.
- item.can_fetch = True
-
- for new_child in item.children:
- retrieve(new_child)
-
- # Traverse the whole tree to see if files can now be fetched.
- self._traverse_tree(fetch_queue, self.root)
-
- def check(n):
- return all(check(x) for x in n.children) and n.files_fetched
- assert check(self.root)
-
- self.relative_cwd = self.relative_cwd or ''
- self.read_only = self.read_only or False
-
- def _traverse_tree(self, fetch_queue, node):
- if node.can_fetch:
- if not node.files_fetched:
- self._update_self(fetch_queue, node)
- will_break = False
- for i in node.children:
- if not i.can_fetch:
- if will_break:
- break
- # Automatically mark the first one as fetchable.
- i.can_fetch = True
- will_break = True
- self._traverse_tree(fetch_queue, i)
-
- def _update_self(self, fetch_queue, node):
- node.fetch_files(fetch_queue, self.files)
- # Grabs properties.
- if not self.command and node.data.get('command'):
- # Ensure paths are correctly separated on Windows.
- self.command = node.data['command']
- if self.command:
- self.command[0] = self.command[0].replace('/', os.path.sep)
- self.command = tools.fix_python_path(self.command)
- if self.read_only is None and node.data.get('read_only') is not None:
- self.read_only = node.data['read_only']
- if (self.relative_cwd is None and
- node.data.get('relative_cwd') is not None):
- self.relative_cwd = node.data['relative_cwd']
-
-
-def fetch_isolated(
- isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
- """Aggressively downloads the .isolated file(s), then download all the files.
-
- Arguments:
- isolated_hash: hash of the root *.isolated file.
- storage: Storage class that communicates with isolate storage.
- cache: LocalCache class that knows how to store and map files locally.
- algo: hash algorithm to use.
- outdir: Output directory to map file tree to.
- os_flavor: OS flavor to choose when reading sections of *.isolated file.
- require_command: Ensure *.isolated specifies a command to run.
-
- Returns:
- Settings object that holds details about loaded *.isolated file.
- """
- with cache:
- fetch_queue = FetchQueue(storage, cache)
- settings = Settings()
-
- with tools.Profiler('GetIsolateds'):
- # Optionally support local files by manually adding them to cache.
- if not is_valid_hash(isolated_hash, algo):
- isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
-
- # Load all *.isolated and start loading rest of the files.
- settings.load(fetch_queue, isolated_hash, os_flavor, algo)
- if require_command and not settings.command:
- # TODO(vadimsh): All fetch operations are already enqueued and there's no
- # easy way to cancel them.
- raise ConfigError('No command to run')
-
- with tools.Profiler('GetRest'):
- # Create file system hierarchy.
- if not os.path.isdir(outdir):
- os.makedirs(outdir)
- create_directories(outdir, settings.files)
- create_links(outdir, settings.files.iteritems())
-
- # Ensure working directory exists.
- cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
- if not os.path.isdir(cwd):
- os.makedirs(cwd)
-
- # Multimap: digest -> list of pairs (path, props).
- remaining = {}
- for filepath, props in settings.files.iteritems():
- if 'h' in props:
- remaining.setdefault(props['h'], []).append((filepath, props))
-
- # Now block on the remaining files to be downloaded and mapped.
- logging.info('Retrieving remaining files (%d of them)...',
- fetch_queue.pending_count)
- last_update = time.time()
- with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
- while remaining:
- detector.ping()
-
- # Wait for any item to finish fetching to cache.
- digest = fetch_queue.wait(remaining)
-
- # Link corresponding files to a fetched item in cache.
- for filepath, props in remaining.pop(digest):
- cache.link(digest, os.path.join(outdir, filepath), props.get('m'))
-
- # Report progress.
- duration = time.time() - last_update
- if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
- msg = '%d files remaining...' % len(remaining)
- print msg
- logging.info(msg)
- last_update = time.time()
-
- # The cache could evict some items we just tried to fetch; that's a fatal error.
- if not fetch_queue.verify_all_cached():
- raise MappingError('Cache is too small to hold all requested files')
- return settings
-
-
-@subcommand.usage('<file1..fileN> or - to read from stdin')
-def CMDarchive(parser, args):
- """Archives data to the server."""
- options, files = parser.parse_args(args)
-
- if files == ['-']:
- files = sys.stdin.readlines()
-
- if not files:
- parser.error('Nothing to upload')
-
- # Load the necessary metadata.
- # TODO(maruel): Use a worker pool to upload as the hashing is being done.
- infiles = dict(
- (
- f,
- {
- 's': os.stat(f).st_size,
- 'h': hash_file(f, get_hash_algo(options.namespace)),
- }
- )
- for f in files)
-
- with tools.Profiler('Archive'):
- ret = upload_tree(
- base_url=options.isolate_server,
- indir=os.getcwd(),
- infiles=infiles,
- namespace=options.namespace)
- if not ret:
- print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
- return ret
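-# Illustrative invocation (the server URL is an example):
-#   isolateserver.py archive -I https://isolate.example.com foo.txt bar.bin
-# On success it prints one "<hash> <path>" line per archived file.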
-
-
-def CMDdownload(parser, args):
- """Download data from the server.
-
- It can either download individual files or a complete tree from a .isolated
- file.
- """
- parser.add_option(
- '-i', '--isolated', metavar='HASH',
- help='hash of an isolated file, .isolated file content is discarded, use '
- '--file if you need it')
- parser.add_option(
- '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
- help='hash and destination of a file, can be used multiple times')
- parser.add_option(
- '-t', '--target', metavar='DIR', default=os.getcwd(),
- help='destination directory')
- options, args = parser.parse_args(args)
- if args:
- parser.error('Unsupported arguments: %s' % args)
- if bool(options.isolated) == bool(options.file):
- parser.error('Use one of --isolated or --file, and only one.')
-
- options.target = os.path.abspath(options.target)
- storage = get_storage(options.isolate_server, options.namespace)
- cache = MemoryCache()
- algo = get_hash_algo(options.namespace)
-
- # Fetching individual files.
- if options.file:
- channel = threading_utils.TaskChannel()
- pending = {}
- for digest, dest in options.file:
- pending[digest] = dest
- storage.async_fetch(
- channel,
- WorkerPool.MED,
- digest,
- UNKNOWN_FILE_SIZE,
- functools.partial(file_write, os.path.join(options.target, dest)))
- while pending:
- fetched = channel.pull()
- dest = pending.pop(fetched)
- logging.info('%s: %s', fetched, dest)
-
- # Fetching whole isolated tree.
- if options.isolated:
- settings = fetch_isolated(
- isolated_hash=options.isolated,
- storage=storage,
- cache=cache,
- algo=algo,
- outdir=options.target,
- os_flavor=None,
- require_command=False)
- rel = os.path.join(options.target, settings.relative_cwd)
- print('To run this test please run from the directory %s:' % rel)
- print(' ' + ' '.join(settings.command))
-
- return 0
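-# Illustrative invocations (the server URL and hashes are examples):
-#   isolateserver.py download -I https://isolate.example.com -i <isolated hash> -t out/
-#   isolateserver.py download -I https://isolate.example.com -f <hash> dest_name.txt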
-
-
-class OptionParserIsolateServer(tools.OptionParserWithLogging):
- def __init__(self, **kwargs):
- tools.OptionParserWithLogging.__init__(self, **kwargs)
- self.add_option(
- '-I', '--isolate-server',
- metavar='URL', default='',
- help='Isolate server to use')
- self.add_option(
- '--namespace', default='default-gzip',
- help='The namespace to use on the server, default: %default')
-
- def parse_args(self, *args, **kwargs):
- options, args = tools.OptionParserWithLogging.parse_args(
- self, *args, **kwargs)
- options.isolate_server = options.isolate_server.rstrip('/')
- if not options.isolate_server:
- self.error('--isolate-server is required.')
- return options, args
-
-
-def main(args):
- dispatcher = subcommand.CommandDispatcher(__name__)
- try:
- return dispatcher.execute(
- OptionParserIsolateServer(version=__version__), args)
- except Exception as e:
- tools.report_error(e)
- return 1
-
-
-if __name__ == '__main__':
- fix_encoding.fix_encoding()
- tools.disable_buffering()
- colorama.init()
- sys.exit(main(sys.argv[1:]))
