Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(504)

Unified Diff: isolateserver.py

Issue 24578004: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: isolateserver.py
diff --git a/isolateserver.py b/isolateserver.py
index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..f0ae85107f9c66c5196adcb93e063dae22cdc6e6 100755
--- a/isolateserver.py
+++ b/isolateserver.py
@@ -29,6 +29,13 @@ from utils import threading_utils
from utils import tools
+# Switch that controls what IsolateServer implementation to use by default.
+ENABLE_GS_ISOLATE_API = False
Vadim Sh. 2013/09/26 01:14:19 I need your advice on how to deal with conditional
M-A Ruel 2013/09/26 01:21:47 3) Let it bake on the canary, then roll deps and
Vadim Sh. 2013/09/27 20:28:12 Sweet. It simplifies a lot.
+
+# Version of isolate protocol passed to the server in /handshake request.
+ISOLATE_PROTOCOL_VERSION = '1.0'
+
+
# The minimum size of files to upload directly to the blobstore.
MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
@@ -62,10 +69,12 @@ UNKNOWN_FILE_SIZE = None
# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024
-
# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024
+# Chunk size to use when reading from network stream.
+NET_IO_FILE_CHUNK = 16 * 1024
+
# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
@@ -184,6 +193,15 @@ def hash_file(filepath, algo):
return digest.hexdigest()
+def stream_read(stream, chunk_size):
+ """Reads chunks from |stream| and yields them."""
+ while True:
+ data = stream.read(chunk_size)
+ if not data:
+ break
+ yield data
+
+
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
"""Yields file content in chunks of given |chunk_size|."""
with open(filepath, 'rb') as f:
@@ -226,6 +244,33 @@ def zip_compress(content_generator, level=7):
yield tail
+def zip_decompress(content_generator):
+ """Reads zipped data from |content_generator| and yields decompressed data.
+
+ |content_generator| should yield an entire zipped stream, this function will
+ ensure all passed data is decompressed.
+
+ Raises IOError if data is corrupted or incomplete.
+ """
+ decompressor = zlib.decompressobj()
+ compressed_size = 0
+ try:
+ for chunk in content_generator:
+ compressed_size += len(chunk)
+ decompressed = decompressor.decompress(chunk)
M-A Ruel 2013/09/26 01:21:47 You should create an inner loop here, so that if a
Vadim Sh. 2013/09/27 20:28:12 Done.
+ if decompressed:
+ yield decompressed
+ tail = decompressor.flush()
+ if tail:
+ yield tail
+ except zlib.error as e:
+ raise IOError(
+ 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
+ # Ensure all data was read and decompressed.
+ if decompressor.unused_data or decompressor.unconsumed_tail:
+ raise IOError('Not all data was decompressed')
+
+
def get_zip_compression_level(filename):
"""Given a filename calculates the ideal zip compression level to use."""
file_ext = os.path.splitext(filename)[1].lower()
@@ -684,6 +729,198 @@ class IsolateServer(StorageApi):
raise MappingError('Unable to connect to server %s' % last_url)
+class IsolateServerGS(StorageApi):
+ """StorageApi implementation that downloads and uploads to Isolate Server.
+
+ It uploads and downloads directly from Google Storage whenever appropriate.
+ """
+
+ def __init__(self, base_url, namespace):
+ super(IsolateServerGS, self).__init__()
+ assert base_url.startswith('http'), base_url
+ self.base_url = base_url.rstrip('/')
+ self.namespace = namespace
+ self.algo = get_hash_algo(namespace)
+ self._use_zip = is_namespace_with_compression(namespace)
+ self._lock = threading.Lock()
+ self._server_caps = None
+
+ @staticmethod
+ def generate_handshake_request():
+ """Returns a dict to be sent as handshake request body."""
+ # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
+ return {
+ 'protocol_version': ISOLATE_PROTOCOL_VERSION,
+ 'client_app_version': __version__,
+ 'fetcher': True,
+ 'pusher': True,
+ }
+
+ @staticmethod
+ def validate_handshake_response(caps):
+ """Validates and normalizes handshake response."""
+ logging.info('Protocol version: %s', caps['protocol_version'])
+ logging.info('Server version: %s', caps['server_app_version'])
+ if "error" in caps:
M-A Ruel 2013/09/26 01:21:47 'error'
Vadim Sh. 2013/09/27 20:28:12 Done.
+ raise MappingError(caps['error'])
+ if not caps['access_token']:
+ raise ValueError('access_token is missing')
+ return caps
+
+ @property
+ def server_capabilities(self):
+ """Performs handshake with the server if not yet done.
+
+ Returns:
+ Server capabilities dictionary as returned by /handshake endpoint.
+
+ Raises:
+ MappingError if server rejects the handshake.
+ """
+ # TODO(maruel): Make this request much earlier asynchronously while the
+ # files are being enumerated.
+ with self._lock:
+ if self._server_caps is None:
+ response = net.url_read(
+ url=self.base_url + '/content-gs/handshake',
+ data=json.dumps(self.generate_handshake_request()),
+ content_type='application/json')
+ try:
+ caps = json.loads(response)
+ if not isinstance(caps, dict):
+ raise ValueError('Expecting JSON dict')
+ self._server_caps = self.validate_handshake_response(caps)
+ except (ValueError, KeyError, TypeError) as exc:
+ raise MappingError('Invalid handshake response: %s' % exc)
+ return self._server_caps
+
+ @property
+ def token(self):
+ """Returns access token received through handshake process."""
+ return self.server_capabilities['access_token']
+
+ def fetch(self, item, expected_size):
+ assert isinstance(item, basestring)
+ assert (isinstance(expected_size, (int, long)) or
+ expected_size == UNKNOWN_FILE_SIZE)
+
+ source_url = '%s/content-gs/retrieve/%s/%s' % (
+ self.base_url, self.namespace, item)
+ logging.debug('download_file(%s)', source_url)
+
+ # Because the app engine DB is only eventually consistent, retry 404 errors
+ # because the file might just not be visible yet (even though it has been
+ # uploaded).
+ connection = net.url_open(
+ source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
+ if not connection:
+ raise IOError('Unable to open connection to %s' % source_url)
+
+ try:
+ # Prepare reading pipeline.
+ generator = stream_read(connection, NET_IO_FILE_CHUNK)
+ if self._use_zip:
+ generator = zip_decompress(generator)
+
+ # Read and yield data, calculate total length of the decompressed stream.
+ total_size = 0
+ for chunk in generator:
+ total_size += len(chunk)
+ yield chunk
+
+ # Verify data length matches expectation.
+ if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size:
+ raise IOError('Incorrect file size: expected %d, got %d' % (
+ expected_size, total_size))
+
+ except IOError as err:
+ logging.warning('Failed to fetch %s: %s', item, err)
+
+ # Testing seems to show that if a few machines are trying to download
M-A Ruel 2013/09/26 01:21:47 This likely doesn't apply here.
Vadim Sh. 2013/09/27 20:28:12 Done.
+ # the same blob, they can cause each other to fail. So if we hit a zip
+ # error, this is the most likely cause (it only downloads some of the
+ # data). Randomly sleep for between 5 and 25 seconds to try and spread
+ # out the downloads.
+ sleep_duration = (random.random() * 20) + 5
+ time.sleep(sleep_duration)
+ raise
+
+ def push(self, item, expected_size, content_generator, push_urls=None):
+ assert isinstance(item, basestring)
+ assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
+ assert push_urls and len(push_urls) == 2
+ upload_url, finalize_url = push_urls
+
+ # TODO(maruel): Support large files. This would require streaming support.
+
+ # TODO(vadimsh): Do not read from |content_generator| when retrying push.
+ # If |content_generator| is indeed a generator, it can not be re-winded back
+ # to the beginning of the stream. A retry will find it exhausted. A possible
+ # solution is to wrap content_generator with some sort of caching
+ # restartable generator. It should be done alongside streaming support
+ # implementation.
+
+ # A cheese way to avoid memcpy of (possibly huge) file, until streaming
+ # upload support is implemented.
+ if isinstance(content_generator, list) and len(content_generator) == 1:
+ content = content_generator[0]
+ else:
+ content = ''.join(content_generator)
+
+ # PUT file to |upload_url|.
+ response = net.url_read(upload_url, data=content,
+ content_type='application/octet-stream', method='PUT')
+ if response is None:
+ raise IOError('Failed to upload a file %s to %s' % (item, upload_url))
+
+ # Optionally notify the server that it's done.
+ if finalize_url:
+ response = net.url_read(finalize_url, data={}, method='POST')
+ if response is None:
+ raise IOError('Failed to finalize an upload of %s' % item)
+
+ def contains(self, files):
+ logging.info('Checking existence of %d files...', len(files))
+
+ # Request body is a json encoded list of dicts.
+ body = [
+ {
+ 'h': metadata['h'],
+ 's': metadata['s'],
+ 'i': 1 if metadata.get('priority') == '0' else 0,
+ } for _, metadata in files
+ ]
+
+ query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
+ self.base_url, self.namespace, self.token)
+ response_body = url_read(
+ query_url,
+ data=json.dumps(body, separators=(',', ':')),
+ content_type='application/json')
+
+ # Response body is a list of push_urls (or null if file is already present).
+ try:
+ response = json.loads(response_body)
+ if not isinstance(response, list):
+ raise ValueError('Expecting response with json-encoded list')
+ if len(response) != len(files):
+ raise ValueError(
+ 'Incorrect number of items in the list, expected %d, '
+ 'but got %d' % (len(files), len(response)))
+ except ValueError as err:
+ raise MappingError(
+ 'Invalid response from server: %s, body is %s' % (err, response_body))
+
+ # Convert response into a list of triplets with info about missing files.
+ missing_files = [
+ files[i] + (push_urls,) for i, push_urls in enumerate(response)
+ if push_urls is not None
M-A Ruel 2013/09/26 01:21:47 is "if push_urls" sufficient?
Vadim Sh. 2013/09/27 20:28:12 Done.
+ ]
+ logging.info('Queried %d files, %d cache hit',
+ len(files), len(files) - len(missing_files))
+ return missing_files
+
+
class FileSystem(StorageApi):
"""StorageApi implementation that fetches data from the file system.
@@ -735,7 +972,10 @@ def is_namespace_with_compression(namespace):
def get_storage_api(file_or_url, namespace):
"""Returns an object that implements StorageApi interface."""
if re.match(r'^https?://.+$', file_or_url):
- return IsolateServer(file_or_url, namespace)
+ if ENABLE_GS_ISOLATE_API:
+ return IsolateServerGS(file_or_url, namespace)
+ else:
+ return IsolateServer(file_or_url, namespace)
else:
return FileSystem(file_or_url)
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698