Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(407)

Unified Diff: isolateserver.py

Issue 25093003: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: PendingPush stuff Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: isolateserver.py
diff --git a/isolateserver.py b/isolateserver.py
index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..9eb68bc91ed282891780d026fbeb6115ba8dfabf 100755
--- a/isolateserver.py
+++ b/isolateserver.py
@@ -7,12 +7,10 @@
__version__ = '0.2'
-import binascii
import hashlib
import json
import logging
import os
-import random
import re
import sys
import threading
@@ -29,14 +27,15 @@ from utils import threading_utils
from utils import tools
-# The minimum size of files to upload directly to the blobstore.
-MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
+# Version of isolate protocol passed to the server in /handshake request.
+ISOLATE_PROTOCOL_VERSION = '1.0'
-# The number of files to check the isolate server per /contains query.
+
+# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
-# are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1],
+# are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
@@ -62,10 +61,12 @@ UNKNOWN_FILE_SIZE = None
# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024
-
# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024
+# Chunk size to use when reading from network stream.
+NET_IO_FILE_CHUNK = 16 * 1024
+
# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
@@ -106,63 +107,6 @@ class MappingError(OSError):
pass
-def randomness():
- """Generates low-entropy randomness for MIME encoding.
-
- Exists so it can be mocked out in unit tests.
- """
- return str(time.time())
-
-
-def encode_multipart_formdata(fields, files,
- mime_mapper=lambda _: 'application/octet-stream'):
- """Encodes a Multipart form data object.
-
- Args:
- fields: a sequence (name, value) elements for
- regular form fields.
- files: a sequence of (name, filename, value) elements for data to be
- uploaded as files.
- mime_mapper: function to return the mime type from the filename.
- Returns:
- content_type: for httplib.HTTP instance
- body: for httplib.HTTP instance
- """
- boundary = hashlib.md5(randomness()).hexdigest()
- body_list = []
- for (key, value) in fields:
- if isinstance(key, unicode):
- value = key.encode('utf-8')
- if isinstance(value, unicode):
- value = value.encode('utf-8')
- body_list.append('--' + boundary)
- body_list.append('Content-Disposition: form-data; name="%s"' % key)
- body_list.append('')
- body_list.append(value)
- body_list.append('--' + boundary)
- body_list.append('')
- for (key, filename, value) in files:
- if isinstance(key, unicode):
- value = key.encode('utf-8')
- if isinstance(filename, unicode):
- value = filename.encode('utf-8')
- if isinstance(value, unicode):
- value = value.encode('utf-8')
- body_list.append('--' + boundary)
- body_list.append('Content-Disposition: form-data; name="%s"; '
- 'filename="%s"' % (key, filename))
- body_list.append('Content-Type: %s' % mime_mapper(filename))
- body_list.append('')
- body_list.append(value)
- body_list.append('--' + boundary)
- body_list.append('')
- if body_list:
- body_list[-2] += '--'
- body = '\r\n'.join(body_list)
- content_type = 'multipart/form-data; boundary=%s' % boundary
- return content_type, body
-
-
def is_valid_hash(value, algo):
"""Returns if the value is a valid hash for the corresponding algorithm."""
size = 2 * algo().digest_size
@@ -184,6 +128,15 @@ def hash_file(filepath, algo):
return digest.hexdigest()
+def stream_read(stream, chunk_size):
+ """Reads chunks from |stream| and yields them."""
+ while True:
+ data = stream.read(chunk_size)
+ if not data:
+ break
+ yield data
+
+
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
"""Yields file content in chunks of given |chunk_size|."""
with open(filepath, 'rb') as f:
@@ -226,6 +179,37 @@ def zip_compress(content_generator, level=7):
yield tail
+def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
+ """Reads zipped data from |content_generator| and yields decompressed data.
+
+ Decompresses data in small chunks (no larger than |chunk_size|) so that
+ a zip bomb file doesn't cause zlib to preallocate a huge amount of memory.
+
+ Raises IOError if data is corrupted or incomplete.
+ """
+ decompressor = zlib.decompressobj()
+ compressed_size = 0
+ try:
+ for chunk in content_generator:
+ compressed_size += len(chunk)
+ data = decompressor.decompress(chunk, chunk_size)
+ if data:
+ yield data
+ while decompressor.unconsumed_tail:
+ data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
+ if data:
+ yield data
+ tail = decompressor.flush()
+ if tail:
+ yield tail
+ except zlib.error as e:
+ raise IOError(
+ 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
+ # Ensure all data was read and decompressed.
+ if decompressor.unused_data or decompressor.unconsumed_tail:
+ raise IOError('Not all data was decompressed')
+
+
def get_zip_compression_level(filename):
"""Given a filename calculates the ideal zip compression level to use."""
file_ext = os.path.splitext(filename)[1].lower()
@@ -301,15 +285,6 @@ def try_remove(filepath):
pass
-def url_read(url, **kwargs):
- result = net.url_read(url, **kwargs)
- if result is None:
- # If we get no response from the server, assume it is down and raise an
- # exception.
- raise MappingError('Unable to connect to server %s' % url)
- return result
-
-
class Storage(object):
"""Efficiently downloads or uploads large set of files via StorageApi."""
@@ -366,9 +341,9 @@ class Storage(object):
# Enqueue all upload tasks.
channel = threading_utils.TaskChannel()
missing = []
- for filename, metadata, push_urls in self.get_missing_files(infiles):
- missing.append((filename, metadata))
- path = os.path.join(indir, filename)
+ for pending_push in self.get_missing_files(infiles):
+ missing.append(pending_push)
+ path = os.path.join(indir, pending_push.filename)
if metadata.get('priority', '1') == '0':
priority = WorkerPool.HIGH
else:
@@ -376,8 +351,8 @@ class Storage(object):
compression_level = get_zip_compression_level(path)
chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK
content = file_read(path, chunk_size)
- self.async_push(channel, priority, metadata['h'], metadata['s'],
- content, compression_level, push_urls)
+ self.async_push(channel, priority, pending_push,
+ content, compression_level)
# No need to spawn deadlock detector thread if there's nothing to upload.
if missing:
@@ -415,8 +390,8 @@ class Storage(object):
len(cache_miss) * 100. / total,
cache_miss_size * 100. / total_size if total_size else 0)
- def async_push(self, channel, priority, item, expected_size,
- content_generator, compression_level, push_urls=None):
+ def async_push(self, channel, priority, pending_push,
+ content_generator, compression_level):
"""Starts asynchronous push to the server in a parallel thread."""
def push(content, size):
"""Pushes an item and returns its id, to pass as a result to |channel|."""
@@ -453,7 +428,7 @@ class Storage(object):
files: a dictionary file name -> metadata dict.
Yields:
- Triplets (file name, metadata dict, push_urls object to pass to push).
+ PendingPush objects that represent missing files.
"""
channel = threading_utils.TaskChannel()
pending = 0
@@ -512,14 +487,14 @@ class StorageApi(object):
"""
raise NotImplementedError()
- def push(self, item, expected_size, content_generator, push_urls=None):
- """Uploads content generated by |content_generator| as |item|.
+ def push(self, pending_push, content_generator):
+ """Uploads content generated by |content_generator|.
+
+ Actual item being pushed is identified by |pending_push| object.
Arguments:
- item: hash digest of item to upload.
- expected_size: total length of the content yielded by |content_generator|.
+ pending_push: PendingPush object returned by 'contains' call.
content_generator: generator that produces chunks to push.
- push_urls: optional URLs returned by 'contains' call for this item.
Returns:
None.
@@ -533,91 +508,157 @@ class StorageApi(object):
files: list of pairs (file name, metadata dict).
Returns:
- A list of files missing on server as a list of triplets
- (file name, metadata dict, push_urls object to pass to push).
+ A list of files missing on server as a list of PendingPush objects.
"""
raise NotImplementedError()
+class PendingPush(object):
M-A Ruel 2013/09/30 19:38:27 I prefer the class to be before Storage.
+ """A file that was not found on the server and that should be uploaded.
M-A Ruel 2013/09/30 19:38:27 """A file that was not ...
+
+ Returned by StorageApi's 'contains' call, passed to StorageApi's 'push'.
M-A Ruel 2013/09/30 19:38:27 Returned by StorageApi.contains(), to be passed to
+ """
+
+ def __init__(self, filename, metadata):
+ self.filename = filename
+ self.metadata = metadata
+
+
class IsolateServer(StorageApi):
- """StorageApi implementation that downloads and uploads to Isolate Server."""
+ """StorageApi implementation that downloads and uploads to Isolate Server.
+
+ It uploads and downloads directly from Google Storage whenever appropriate.
+ """
+
+ class PendingIsolatePush(PendingPush):
+ def __init__(self, filename, metadata, push_urls):
+ super(IsolateServer.PendingIsolatePush, self).__init__(filename, metadata)
+ self.push_urls = push_urls
+ self.uploaded = False
+
def __init__(self, base_url, namespace):
super(IsolateServer, self).__init__()
assert base_url.startswith('http'), base_url
- self.content_url = base_url.rstrip('/') + '/content/'
+ self.base_url = base_url.rstrip('/')
self.namespace = namespace
self.algo = get_hash_algo(namespace)
- self._token = None
+ self._use_zip = is_namespace_with_compression(namespace)
self._lock = threading.Lock()
+ self._server_caps = None
+
+ @staticmethod
+ def generate_handshake_request():
+ """Returns a dict to be sent as handshake request body."""
+ # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
+ return {
+ 'protocol_version': ISOLATE_PROTOCOL_VERSION,
M-A Ruel 2013/09/30 19:38:27 Sort here too
+ 'client_app_version': __version__,
+ 'fetcher': True,
+ 'pusher': True,
+ }
+
+ @staticmethod
+ def validate_handshake_response(caps):
+ """Validates and normalizes handshake response."""
+ logging.info('Protocol version: %s', caps['protocol_version'])
+ logging.info('Server version: %s', caps['server_app_version'])
+ if caps.get('error'):
+ raise MappingError(caps['error'])
+ if not caps['access_token']:
+ raise ValueError('access_token is missing')
+ return caps
@property
- def token(self):
+ def server_capabilities(self):
+ """Performs handshake with the server if not yet done.
+
+ Returns:
+ Server capabilities dictionary as returned by /handshake endpoint.
+
+ Raises:
+ MappingError if server rejects the handshake.
+ """
# TODO(maruel): Make this request much earlier asynchronously while the
# files are being enumerated.
with self._lock:
- if not self._token:
- self._token = urllib.quote(url_read(self.content_url + 'get_token'))
- return self._token
+ if self._server_caps is None:
+ request_body = json.dumps(
+ self.generate_handshake_request(), separators=(',', ':'))
+ response = net.url_read(
+ url=self.base_url + '/content-gs/handshake',
+ data=request_body,
+ content_type='application/json',
+ method='POST')
+ if response is None:
+ raise MappingError('Failed to perform handshake.')
+ try:
+ caps = json.loads(response)
+ if not isinstance(caps, dict):
+ raise ValueError('Expecting JSON dict')
+ self._server_caps = self.validate_handshake_response(caps)
+ except (ValueError, KeyError, TypeError) as exc:
+ # KeyError exception has very confusing str conversion: it's just a
+ # missing key value and nothing else. So print exception class name
+ # as well.
+ raise MappingError('Invalid handshake response (%s): %s' % (
+ exc.__class__.__name__, exc))
+ return self._server_caps
def fetch(self, item, expected_size):
assert isinstance(item, basestring)
- assert (
- isinstance(expected_size, (int, long)) or
+ assert (isinstance(expected_size, (int, long)) or
expected_size == UNKNOWN_FILE_SIZE)
- zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item)
- logging.debug('download_file(%s)', zipped_url)
+
+ source_url = '%s/content-gs/retrieve/%s/%s' % (
+ self.base_url, self.namespace, item)
+ logging.debug('download_file(%s)', source_url)
# Because the app engine DB is only eventually consistent, retry 404 errors
# because the file might just not be visible yet (even though it has been
# uploaded).
connection = net.url_open(
- zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
+ source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
if not connection:
- raise IOError('Unable to open connection to %s' % zipped_url)
+ raise IOError('Unable to open connection to %s' % source_url)
- # TODO(maruel): Must only decompress when needed.
- decompressor = zlib.decompressobj()
try:
- compressed_size = 0
- decompressed_size = 0
- while True:
- chunk = connection.read(ZIPPED_FILE_CHUNK)
- if not chunk:
- break
- compressed_size += len(chunk)
- decompressed = decompressor.decompress(chunk)
- decompressed_size += len(decompressed)
- yield decompressed
-
- # Ensure that all the data was properly decompressed.
- uncompressed_data = decompressor.flush()
- if uncompressed_data:
- raise IOError('Decompression failed')
- if (expected_size != UNKNOWN_FILE_SIZE and
- decompressed_size != expected_size):
- raise IOError('File incorrect size after download of %s. Got %s and '
- 'expected %s' % (item, decompressed_size, expected_size))
- except zlib.error as e:
- msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
- item, compressed_size, connection.content_length, e)
- logging.warning(msg)
-
- # Testing seems to show that if a few machines are trying to download
- # the same blob, they can cause each other to fail. So if we hit a zip
- # error, this is the most likely cause (it only downloads some of the
- # data). Randomly sleep for between 5 and 25 seconds to try and spread
- # out the downloads.
- sleep_duration = (random.random() * 20) + 5
- time.sleep(sleep_duration)
- raise IOError(msg)
-
- def push(self, item, expected_size, content_generator, push_urls=None):
+ # Prepare reading pipeline.
+ generator = stream_read(connection, NET_IO_FILE_CHUNK)
+ if self._use_zip:
+ generator = zip_decompress(generator, DISK_FILE_CHUNK)
+
+ # Read and yield data, calculate total length of the decompressed stream.
+ total_size = 0
+ for chunk in generator:
+ total_size += len(chunk)
+ yield chunk
+
+ # Verify data length matches expectation.
+ if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size:
+ raise IOError('Incorrect file size: expected %d, got %d' % (
+ expected_size, total_size))
+
+ except IOError as err:
+ logging.warning('Failed to fetch %s: %s', item, err)
+ raise
+
+ def push(self, pending_push, content_generator):
+ assert isinstance(pending_push, IsolateServer.PendingIsolatePush)
+ # TODO: use |pending_push| below.
assert isinstance(item, basestring)
assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
- item = str(item)
+ assert push_urls and len(push_urls) == 2
+ upload_url, finalize_url = push_urls
# TODO(maruel): Support large files. This would require streaming support.
+ # TODO(vadimsh): Do not read from |content_generator| when retrying push.
+ # If |content_generator| is indeed a generator, it cannot be rewound back
+ # to the beginning of the stream. A retry will find it exhausted. A possible
+ # solution is to wrap content_generator with some sort of caching
+ # restartable generator. It should be done alongside streaming support
+ # implementation.
+
# A cheese way to avoid memcpy of (possibly huge) file, until streaming
# upload support is implemented.
if isinstance(content_generator, list) and len(content_generator) == 1:
@@ -625,64 +666,77 @@ class IsolateServer(StorageApi):
else:
content = ''.join(content_generator)
- if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
- return self._upload_hash_content_to_blobstore(item, content)
-
- url = '%sstore/%s/%s?token=%s' % (
- self.content_url, self.namespace, item, self.token)
- return url_read(url, data=content, content_type='application/octet-stream')
+ # PUT file to |upload_url|.
+ response = net.url_read(
+ url=upload_url,
+ data=content,
+ content_type='application/octet-stream',
+ method='PUT')
+ if response is None:
+ raise IOError('Failed to upload a file %s to %s' % (item, upload_url))
+
+ # Optionally notify the server that it's done.
+ if finalize_url:
+ # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to
+ # isolated server. That way isolate server can verify that the data safely
+ # reached Google Storage (GS provides MD5 of stored files).
+ response = net.url_read(
+ url=finalize_url,
+ data='',
+ content_type='application/json',
+ method='POST')
+ # TODO(vadimsh): Do not reupload item again if only finalize_url request
+ # failed.
+ if response is None:
+ raise IOError('Failed to finalize an upload of %s' % item)
def contains(self, files):
logging.info('Checking existence of %d files...', len(files))
- body = ''.join(
- (binascii.unhexlify(metadata['h']) for (_, metadata) in files))
- assert (len(body) % self.algo().digest_size) == 0, repr(body)
+ # Request body is a json encoded list of dicts.
+ body = [
+ {
+ 'h': metadata['h'],
+ 's': metadata['s'],
+ 'i': 1 if metadata.get('priority') == '0' else 0,
+ } for _, metadata in files
+ ]
- query_url = '%scontains/%s?token=%s' % (
- self.content_url, self.namespace, self.token)
- response = url_read(
- query_url, data=body, content_type='application/octet-stream')
- if len(files) != len(response):
+ query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
+ self.base_url,
+ self.namespace,
+ urllib.quote(self.server_capabilities['access_token']))
+ response_body = net.url_read(
+ url=query_url,
+ data=json.dumps(body, separators=(',', ':')),
+ content_type='application/json',
+ method='POST')
+ if response_body is None:
+ raise MappingError('Failed to execute /pre-upload query')
+
+ # Response body is a list of push_urls (or null if file is already present).
+ try:
+ response = json.loads(response_body)
+ if not isinstance(response, list):
+ raise ValueError('Expecting response with json-encoded list')
+ if len(response) != len(files):
+ raise ValueError(
+ 'Incorrect number of items in the list, expected %d, '
+ 'but got %d' % (len(files), len(response)))
+ except ValueError as err:
raise MappingError(
- 'Got an incorrect number of responses from the server. Expected %d, '
- 'but got %d' % (len(files), len(response)))
+ 'Invalid response from server: %s, body is %s' % (err, response_body))
- # This implementation of IsolateServer doesn't use push_urls field,
- # set it to None.
+ # Convert response into a list of PendingIsolatePush for missing files.
missing_files = [
- files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00'
+ IsolateServer.PendingIsolatePush(files[0], files[1], push_urls)
M-A Ruel 2013/09/30 19:38:27 You probably meant files[i][0], files[i][1] :)
+ for i, push_urls in enumerate(response)
+ if push_urls
]
logging.info('Queried %d files, %d cache hit',
len(files), len(files) - len(missing_files))
return missing_files
- def _upload_hash_content_to_blobstore(self, item, content):
- """Uploads the content directly to the blobstore via a generated url."""
- # TODO(maruel): Support large files. This would require streaming support.
- gen_url = '%sgenerate_blobstore_url/%s/%s' % (
- self.content_url, self.namespace, item)
- # Token is guaranteed to be already quoted but it is unnecessary here, and
- # only here.
- data = [('token', urllib.unquote(self.token))]
- content_type, body = encode_multipart_formdata(
- data, [('content', item, content)])
- last_url = gen_url
- for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
- # Retry HTTP 50x here but not 404.
- upload_url = net.url_read(gen_url, data=data)
- if not upload_url:
- raise MappingError('Unable to connect to server %s' % gen_url)
- last_url = upload_url
-
- # Do not retry this request on HTTP 50x. Regenerate an upload url each
- # time since uploading "consumes" the upload url.
- result = net.url_read(
- upload_url, data=body, content_type=content_type, retry_50x=False)
- if result is not None:
- return result
- raise MappingError('Unable to connect to server %s' % last_url)
-
class FileSystem(StorageApi):
"""StorageApi implementation that fetches data from the file system.
@@ -690,6 +744,7 @@ class FileSystem(StorageApi):
The common use case is a NFS/CIFS file server that is mounted locally that is
used to fetch the file on a local partition.
"""
+
def __init__(self, base_path):
super(FileSystem, self).__init__()
self.base_path = base_path
@@ -715,7 +770,7 @@ class FileSystem(StorageApi):
def contains(self, files):
return [
- (filename, metadata, None)
+ PendingPush(filename, metadata)
for filename, metadata in files
if not os.path.exists(os.path.join(self.base_path, metadata['h']))
]
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698