Chromium Code Reviews| Index: isolateserver.py |
| diff --git a/isolateserver.py b/isolateserver.py |
| index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..9eb68bc91ed282891780d026fbeb6115ba8dfabf 100755 |
| --- a/isolateserver.py |
| +++ b/isolateserver.py |
| @@ -7,12 +7,10 @@ |
| __version__ = '0.2' |
| -import binascii |
| import hashlib |
| import json |
| import logging |
| import os |
| -import random |
| import re |
| import sys |
| import threading |
| @@ -29,14 +27,15 @@ from utils import threading_utils |
| from utils import tools |
| -# The minimum size of files to upload directly to the blobstore. |
| -MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 |
| +# Version of isolate protocol passed to the server in /handshake request. |
| +ISOLATE_PROTOCOL_VERSION = '1.0' |
| -# The number of files to check the isolate server per /contains query. |
| + |
| +# The number of files to check the isolate server per /pre-upload query. |
| # All files are sorted by likelihood of a change in the file content |
| # (currently file size is used to estimate this: larger the file -> larger the |
| # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files |
| -# are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], |
| +# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1], |
| # and so on. Numbers here is a trade-off; the more per request, the lower the |
| # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, |
| # larger values cause longer lookups, increasing the initial latency to start |
| @@ -62,10 +61,12 @@ UNKNOWN_FILE_SIZE = None |
| # The size of each chunk to read when downloading and unzipping files. |
| ZIPPED_FILE_CHUNK = 16 * 1024 |
| - |
| # Chunk size to use when doing disk I/O. |
| DISK_FILE_CHUNK = 1024 * 1024 |
| +# Chunk size to use when reading from network stream. |
| +NET_IO_FILE_CHUNK = 16 * 1024 |
| + |
| # Read timeout in seconds for downloads from isolate storage. If there's no |
| # response from the server within this timeout whole download will be aborted. |
| @@ -106,63 +107,6 @@ class MappingError(OSError): |
| pass |
| -def randomness(): |
| - """Generates low-entropy randomness for MIME encoding. |
| - |
| - Exists so it can be mocked out in unit tests. |
| - """ |
| - return str(time.time()) |
| - |
| - |
| -def encode_multipart_formdata(fields, files, |
| - mime_mapper=lambda _: 'application/octet-stream'): |
| - """Encodes a Multipart form data object. |
| - |
| - Args: |
| - fields: a sequence (name, value) elements for |
| - regular form fields. |
| - files: a sequence of (name, filename, value) elements for data to be |
| - uploaded as files. |
| - mime_mapper: function to return the mime type from the filename. |
| - Returns: |
| - content_type: for httplib.HTTP instance |
| - body: for httplib.HTTP instance |
| - """ |
| - boundary = hashlib.md5(randomness()).hexdigest() |
| - body_list = [] |
| - for (key, value) in fields: |
| - if isinstance(key, unicode): |
| - value = key.encode('utf-8') |
| - if isinstance(value, unicode): |
| - value = value.encode('utf-8') |
| - body_list.append('--' + boundary) |
| - body_list.append('Content-Disposition: form-data; name="%s"' % key) |
| - body_list.append('') |
| - body_list.append(value) |
| - body_list.append('--' + boundary) |
| - body_list.append('') |
| - for (key, filename, value) in files: |
| - if isinstance(key, unicode): |
| - value = key.encode('utf-8') |
| - if isinstance(filename, unicode): |
| - value = filename.encode('utf-8') |
| - if isinstance(value, unicode): |
| - value = value.encode('utf-8') |
| - body_list.append('--' + boundary) |
| - body_list.append('Content-Disposition: form-data; name="%s"; ' |
| - 'filename="%s"' % (key, filename)) |
| - body_list.append('Content-Type: %s' % mime_mapper(filename)) |
| - body_list.append('') |
| - body_list.append(value) |
| - body_list.append('--' + boundary) |
| - body_list.append('') |
| - if body_list: |
| - body_list[-2] += '--' |
| - body = '\r\n'.join(body_list) |
| - content_type = 'multipart/form-data; boundary=%s' % boundary |
| - return content_type, body |
| - |
| - |
| def is_valid_hash(value, algo): |
| """Returns if the value is a valid hash for the corresponding algorithm.""" |
| size = 2 * algo().digest_size |
| @@ -184,6 +128,15 @@ def hash_file(filepath, algo): |
| return digest.hexdigest() |
| +def stream_read(stream, chunk_size): |
| + """Reads chunks from |stream| and yields them.""" |
| + while True: |
| + data = stream.read(chunk_size) |
| + if not data: |
| + break |
| + yield data |
| + |
| + |
| def file_read(filepath, chunk_size=DISK_FILE_CHUNK): |
| """Yields file content in chunks of given |chunk_size|.""" |
| with open(filepath, 'rb') as f: |
| @@ -226,6 +179,37 @@ def zip_compress(content_generator, level=7): |
| yield tail |
| +def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK): |
| + """Reads zipped data from |content_generator| and yields decompressed data. |
| + |
| + Decompresses data in small chunks (no larger than |chunk_size|) so that |
| + zip bomb file doesn't cause zlib to preallocate huge amount of memory. |
| + |
| + Raises IOError if data is corrupted or incomplete. |
| + """ |
| + decompressor = zlib.decompressobj() |
| + compressed_size = 0 |
| + try: |
| + for chunk in content_generator: |
| + compressed_size += len(chunk) |
| + data = decompressor.decompress(chunk, chunk_size) |
| + if data: |
| + yield data |
| + while decompressor.unconsumed_tail: |
| + data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size) |
| + if data: |
| + yield data |
| + tail = decompressor.flush() |
| + if tail: |
| + yield tail |
| + except zlib.error as e: |
| + raise IOError( |
| + 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) |
| + # Ensure all data was read and decompressed. |
| + if decompressor.unused_data or decompressor.unconsumed_tail: |
| + raise IOError('Not all data was decompressed') |
| + |
| + |
| def get_zip_compression_level(filename): |
| """Given a filename calculates the ideal zip compression level to use.""" |
| file_ext = os.path.splitext(filename)[1].lower() |
| @@ -301,15 +285,6 @@ def try_remove(filepath): |
| pass |
| -def url_read(url, **kwargs): |
| - result = net.url_read(url, **kwargs) |
| - if result is None: |
| - # If we get no response from the server, assume it is down and raise an |
| - # exception. |
| - raise MappingError('Unable to connect to server %s' % url) |
| - return result |
| - |
| - |
| class Storage(object): |
| """Efficiently downloads or uploads large set of files via StorageApi.""" |
| @@ -366,9 +341,9 @@ class Storage(object): |
| # Enqueue all upload tasks. |
| channel = threading_utils.TaskChannel() |
| missing = [] |
| - for filename, metadata, push_urls in self.get_missing_files(infiles): |
| - missing.append((filename, metadata)) |
| - path = os.path.join(indir, filename) |
| + for pending_push in self.get_missing_files(infiles): |
| + missing.append(pending_push) |
| + path = os.path.join(indir, pending_push.filename) |
| if metadata.get('priority', '1') == '0': |
| priority = WorkerPool.HIGH |
| else: |
| @@ -376,8 +351,8 @@ class Storage(object): |
| compression_level = get_zip_compression_level(path) |
| chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK |
| content = file_read(path, chunk_size) |
| - self.async_push(channel, priority, metadata['h'], metadata['s'], |
| - content, compression_level, push_urls) |
| + self.async_push(channel, priority, pending_push, |
| + content, compression_level) |
| # No need to spawn deadlock detector thread if there's nothing to upload. |
| if missing: |
| @@ -415,8 +390,8 @@ class Storage(object): |
| len(cache_miss) * 100. / total, |
| cache_miss_size * 100. / total_size if total_size else 0) |
| - def async_push(self, channel, priority, item, expected_size, |
| - content_generator, compression_level, push_urls=None): |
| + def async_push(self, channel, priority, pending_push, |
| + content_generator, compression_level): |
| """Starts asynchronous push to the server in a parallel thread.""" |
| def push(content, size): |
| """Pushes an item and returns its id, to pass as a result to |channel|.""" |
| @@ -453,7 +428,7 @@ class Storage(object): |
| files: a dictionary file name -> metadata dict. |
| Yields: |
| - Triplets (file name, metadata dict, push_urls object to pass to push). |
| + PendingPush objects that represent missing files. |
| """ |
| channel = threading_utils.TaskChannel() |
| pending = 0 |
| @@ -512,14 +487,14 @@ class StorageApi(object): |
| """ |
| raise NotImplementedError() |
| - def push(self, item, expected_size, content_generator, push_urls=None): |
| - """Uploads content generated by |content_generator| as |item|. |
| + def push(self, pending_push, content_generator): |
| + """Uploads content generated by |content_generator|. |
| + |
| + Actual item being pushed is identified by |pending_push| object. |
| Arguments: |
| - item: hash digest of item to upload. |
| - expected_size: total length of the content yielded by |content_generator|. |
| + pending_push: PendingPush object returned by 'contains' call. |
| content_generator: generator that produces chunks to push. |
| - push_urls: optional URLs returned by 'contains' call for this item. |
| Returns: |
| None. |
| @@ -533,91 +508,157 @@ class StorageApi(object): |
| files: list of pairs (file name, metadata dict). |
| Returns: |
| - A list of files missing on server as a list of triplets |
| - (file name, metadata dict, push_urls object to pass to push). |
| + A list of files missing on server as a list of PendingPush objects. |
| """ |
| raise NotImplementedError() |
| +class PendingPush(object): |
|
M-A Ruel
2013/09/30 19:38:27
I prefer the class to be before Storage.
|
| + """A file that were not found on the server and that should be uploaded. |
|
M-A Ruel
2013/09/30 19:38:27
"""A file that was not ...
|
| + |
| + Returned by StorageApi's 'contains' call, passed to StorageApi's 'push'. |
|
M-A Ruel
2013/09/30 19:38:27
Returned by StorageApi.contains(), to be passed to
|
| + """ |
| + |
| + def __init__(self, filename, metadata): |
| + self.filename = filename |
| + self.metadata = metadata |
| + |
| + |
| class IsolateServer(StorageApi): |
| - """StorageApi implementation that downloads and uploads to Isolate Server.""" |
| + """StorageApi implementation that downloads and uploads to Isolate Server. |
| + |
| + It uploads and downloads directly from Google Storage whenever appropriate. |
| + """ |
| + |
| + class PendingIsolatePush(PendingPush): |
| + def __init__(self, filename, metadata, push_urls): |
| + super(IsolateServer.PendingIsolatePush, self).__init__(filename, metadata) |
| + self.push_urls = push_urls |
| + self.uploaded = False |
| + |
| def __init__(self, base_url, namespace): |
| super(IsolateServer, self).__init__() |
| assert base_url.startswith('http'), base_url |
| - self.content_url = base_url.rstrip('/') + '/content/' |
| + self.base_url = base_url.rstrip('/') |
| self.namespace = namespace |
| self.algo = get_hash_algo(namespace) |
| - self._token = None |
| + self._use_zip = is_namespace_with_compression(namespace) |
| self._lock = threading.Lock() |
| + self._server_caps = None |
| + |
| + @staticmethod |
| + def generate_handshake_request(): |
| + """Returns a dict to be sent as handshake request body.""" |
| + # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. |
| + return { |
| + 'protocol_version': ISOLATE_PROTOCOL_VERSION, |
|
M-A Ruel
2013/09/30 19:38:27
Sort here too
|
| + 'client_app_version': __version__, |
| + 'fetcher': True, |
| + 'pusher': True, |
| + } |
| + |
| + @staticmethod |
| + def validate_handshake_response(caps): |
| + """Validates and normalizes handshake response.""" |
| + logging.info('Protocol version: %s', caps['protocol_version']) |
| + logging.info('Server version: %s', caps['server_app_version']) |
| + if caps.get('error'): |
| + raise MappingError(caps['error']) |
| + if not caps['access_token']: |
| + raise ValueError('access_token is missing') |
| + return caps |
| @property |
| - def token(self): |
| + def server_capabilities(self): |
| + """Performs handshake with the server if not yet done. |
| + |
| + Returns: |
| + Server capabilities dictionary as returned by /handshake endpoint. |
| + |
| + Raises: |
| + MappingError if server rejects the handshake. |
| + """ |
| # TODO(maruel): Make this request much earlier asynchronously while the |
| # files are being enumerated. |
| with self._lock: |
| - if not self._token: |
| - self._token = urllib.quote(url_read(self.content_url + 'get_token')) |
| - return self._token |
| + if self._server_caps is None: |
| + request_body = json.dumps( |
| + self.generate_handshake_request(), separators=(',', ':')) |
| + response = net.url_read( |
| + url=self.base_url + '/content-gs/handshake', |
| + data=request_body, |
| + content_type='application/json', |
| + method='POST') |
| + if response is None: |
| + raise MappingError('Failed to perform handshake.') |
| + try: |
| + caps = json.loads(response) |
| + if not isinstance(caps, dict): |
| + raise ValueError('Expecting JSON dict') |
| + self._server_caps = self.validate_handshake_response(caps) |
| + except (ValueError, KeyError, TypeError) as exc: |
| + # KeyError exception has very confusing str conversion: it's just a |
| + # missing key value and nothing else. So print exception class name |
| + # as well. |
| + raise MappingError('Invalid handshake response (%s): %s' % ( |
| + exc.__class__.__name__, exc)) |
| + return self._server_caps |
| def fetch(self, item, expected_size): |
| assert isinstance(item, basestring) |
| - assert ( |
| - isinstance(expected_size, (int, long)) or |
| + assert (isinstance(expected_size, (int, long)) or |
| expected_size == UNKNOWN_FILE_SIZE) |
| - zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) |
| - logging.debug('download_file(%s)', zipped_url) |
| + |
| + source_url = '%s/content-gs/retrieve/%s/%s' % ( |
| + self.base_url, self.namespace, item) |
| + logging.debug('download_file(%s)', source_url) |
| # Because the app engine DB is only eventually consistent, retry 404 errors |
| # because the file might just not be visible yet (even though it has been |
| # uploaded). |
| connection = net.url_open( |
| - zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) |
| + source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) |
| if not connection: |
| - raise IOError('Unable to open connection to %s' % zipped_url) |
| + raise IOError('Unable to open connection to %s' % source_url) |
| - # TODO(maruel): Must only decompress when needed. |
| - decompressor = zlib.decompressobj() |
| try: |
| - compressed_size = 0 |
| - decompressed_size = 0 |
| - while True: |
| - chunk = connection.read(ZIPPED_FILE_CHUNK) |
| - if not chunk: |
| - break |
| - compressed_size += len(chunk) |
| - decompressed = decompressor.decompress(chunk) |
| - decompressed_size += len(decompressed) |
| - yield decompressed |
| - |
| - # Ensure that all the data was properly decompressed. |
| - uncompressed_data = decompressor.flush() |
| - if uncompressed_data: |
| - raise IOError('Decompression failed') |
| - if (expected_size != UNKNOWN_FILE_SIZE and |
| - decompressed_size != expected_size): |
| - raise IOError('File incorrect size after download of %s. Got %s and ' |
| - 'expected %s' % (item, decompressed_size, expected_size)) |
| - except zlib.error as e: |
| - msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % ( |
| - item, compressed_size, connection.content_length, e) |
| - logging.warning(msg) |
| - |
| - # Testing seems to show that if a few machines are trying to download |
| - # the same blob, they can cause each other to fail. So if we hit a zip |
| - # error, this is the most likely cause (it only downloads some of the |
| - # data). Randomly sleep for between 5 and 25 seconds to try and spread |
| - # out the downloads. |
| - sleep_duration = (random.random() * 20) + 5 |
| - time.sleep(sleep_duration) |
| - raise IOError(msg) |
| - |
| - def push(self, item, expected_size, content_generator, push_urls=None): |
| + # Prepare reading pipeline. |
| + generator = stream_read(connection, NET_IO_FILE_CHUNK) |
| + if self._use_zip: |
| + generator = zip_decompress(generator, DISK_FILE_CHUNK) |
| + |
| + # Read and yield data, calculate total length of the decompressed stream. |
| + total_size = 0 |
| + for chunk in generator: |
| + total_size += len(chunk) |
| + yield chunk |
| + |
| + # Verify data length matches expectation. |
| + if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size: |
| + raise IOError('Incorrect file size: expected %d, got %d' % ( |
| + expected_size, total_size)) |
| + |
| + except IOError as err: |
| + logging.warning('Failed to fetch %s: %s', item, err) |
| + raise |
| + |
| + def push(self, pending_push, content_generator): |
| + assert isinstance(pending_push, IsolateServer.PendingIsolatePush) |
| + # TODO: use |pending_push| below. |
| assert isinstance(item, basestring) |
| assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE |
| - item = str(item) |
| + assert push_urls and len(push_urls) == 2 |
| + upload_url, finalize_url = push_urls |
| # TODO(maruel): Support large files. This would require streaming support. |
| + # TODO(vadimsh): Do not read from |content_generator| when retrying push. |
| + # If |content_generator| is indeed a generator, it can not be re-winded back |
| + # to the beginning of the stream. A retry will find it exhausted. A possible |
| + # solution is to wrap content_generator with some sort of caching |
| + # restartable generator. It should be done alongside streaming support |
| + # implementation. |
| + |
| # A cheese way to avoid memcpy of (possibly huge) file, until streaming |
| # upload support is implemented. |
| if isinstance(content_generator, list) and len(content_generator) == 1: |
| @@ -625,64 +666,77 @@ class IsolateServer(StorageApi): |
| else: |
| content = ''.join(content_generator) |
| - if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: |
| - return self._upload_hash_content_to_blobstore(item, content) |
| - |
| - url = '%sstore/%s/%s?token=%s' % ( |
| - self.content_url, self.namespace, item, self.token) |
| - return url_read(url, data=content, content_type='application/octet-stream') |
| + # PUT file to |upload_url|. |
| + response = net.url_read( |
| + url=upload_url, |
| + data=content, |
| + content_type='application/octet-stream', |
| + method='PUT') |
| + if response is None: |
| + raise IOError('Failed to upload a file %s to %s' % (item, upload_url)) |
| + |
| + # Optionally notify the server that it's done. |
| + if finalize_url: |
| + # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to |
| + # isolated server. That way isolate server can verify that the data safely |
| + # reached Google Storage (GS provides MD5 of stored files). |
| + response = net.url_read( |
| + url=finalize_url, |
| + data='', |
| + content_type='application/json', |
| + method='POST') |
| + # TODO(vadimsh): Do not reupload item again if only finalize_url request |
| + # failed. |
| + if response is None: |
| + raise IOError('Failed to finalize an upload of %s' % item) |
| def contains(self, files): |
| logging.info('Checking existence of %d files...', len(files)) |
| - body = ''.join( |
| - (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) |
| - assert (len(body) % self.algo().digest_size) == 0, repr(body) |
| + # Request body is a json encoded list of dicts. |
| + body = [ |
| + { |
| + 'h': metadata['h'], |
| + 's': metadata['s'], |
| + 'i': 1 if metadata.get('priority') == '0' else 0, |
| + } for _, metadata in files |
| + ] |
| - query_url = '%scontains/%s?token=%s' % ( |
| - self.content_url, self.namespace, self.token) |
| - response = url_read( |
| - query_url, data=body, content_type='application/octet-stream') |
| - if len(files) != len(response): |
| + query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( |
| + self.base_url, |
| + self.namespace, |
| + urllib.quote(self.server_capabilities['access_token'])) |
| + response_body = net.url_read( |
| + url=query_url, |
| + data=json.dumps(body, separators=(',', ':')), |
| + content_type='application/json', |
| + method='POST') |
| + if response_body is None: |
| + raise MappingError('Failed to execute /pre-upload query') |
| + |
| + # Response body is a list of push_urls (or null if file is already present). |
| + try: |
| + response = json.loads(response_body) |
| + if not isinstance(response, list): |
| + raise ValueError('Expecting response with json-encoded list') |
| + if len(response) != len(files): |
| + raise ValueError( |
| + 'Incorrect number of items in the list, expected %d, ' |
| + 'but got %d' % (len(files), len(response))) |
| + except ValueError as err: |
| raise MappingError( |
| - 'Got an incorrect number of responses from the server. Expected %d, ' |
| - 'but got %d' % (len(files), len(response))) |
| + 'Invalid response from server: %s, body is %s' % (err, response_body)) |
| - # This implementation of IsolateServer doesn't use push_urls field, |
| - # set it to None. |
| + # Convert response into a list of triplets with info about missing files. |
| missing_files = [ |
| - files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' |
| + IsolateServer.PendingIsolatePush(files[0], files[1], push_urls) |
|
M-A Ruel
2013/09/30 19:38:27
You probably meant files[i][0], files[i][1] :)
|
| + for i, push_urls in enumerate(response) |
| + if push_urls |
| ] |
| logging.info('Queried %d files, %d cache hit', |
| len(files), len(files) - len(missing_files)) |
| return missing_files |
| - def _upload_hash_content_to_blobstore(self, item, content): |
| - """Uploads the content directly to the blobstore via a generated url.""" |
| - # TODO(maruel): Support large files. This would require streaming support. |
| - gen_url = '%sgenerate_blobstore_url/%s/%s' % ( |
| - self.content_url, self.namespace, item) |
| - # Token is guaranteed to be already quoted but it is unnecessary here, and |
| - # only here. |
| - data = [('token', urllib.unquote(self.token))] |
| - content_type, body = encode_multipart_formdata( |
| - data, [('content', item, content)]) |
| - last_url = gen_url |
| - for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS): |
| - # Retry HTTP 50x here but not 404. |
| - upload_url = net.url_read(gen_url, data=data) |
| - if not upload_url: |
| - raise MappingError('Unable to connect to server %s' % gen_url) |
| - last_url = upload_url |
| - |
| - # Do not retry this request on HTTP 50x. Regenerate an upload url each |
| - # time since uploading "consumes" the upload url. |
| - result = net.url_read( |
| - upload_url, data=body, content_type=content_type, retry_50x=False) |
| - if result is not None: |
| - return result |
| - raise MappingError('Unable to connect to server %s' % last_url) |
| - |
| class FileSystem(StorageApi): |
| """StorageApi implementation that fetches data from the file system. |
| @@ -690,6 +744,7 @@ class FileSystem(StorageApi): |
| The common use case is a NFS/CIFS file server that is mounted locally that is |
| used to fetch the file on a local partition. |
| """ |
| + |
| def __init__(self, base_path): |
| super(FileSystem, self).__init__() |
| self.base_path = base_path |
| @@ -715,7 +770,7 @@ class FileSystem(StorageApi): |
| def contains(self, files): |
| return [ |
| - (filename, metadata, None) |
| + PendingPush(filename, metadata) |
| for filename, metadata in files |
| if not os.path.exists(os.path.join(self.base_path, metadata['h'])) |
| ] |