| Index: isolateserver.py
|
| diff --git a/isolateserver.py b/isolateserver.py
|
| index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..c11fa7f0117a3ed01bf69b399338a7913b66c957 100755
|
| --- a/isolateserver.py
|
| +++ b/isolateserver.py
|
| @@ -7,12 +7,10 @@
|
|
|
| __version__ = '0.2'
|
|
|
| -import binascii
|
| import hashlib
|
| import json
|
| import logging
|
| import os
|
| -import random
|
| import re
|
| import sys
|
| import threading
|
| @@ -29,14 +27,15 @@ from utils import threading_utils
|
| from utils import tools
|
|
|
|
|
| -# The minimum size of files to upload directly to the blobstore.
|
| -MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
|
| +# Version of isolate protocol passed to the server in /handshake request.
|
| +ISOLATE_PROTOCOL_VERSION = '1.0'
|
|
|
| -# The number of files to check the isolate server per /contains query.
|
| +
|
| +# The number of files to check the isolate server per /pre-upload query.
|
| # All files are sorted by likelihood of a change in the file content
|
| # (currently file size is used to estimate this: larger the file -> larger the
|
| # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
|
| -# are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1],
|
| +# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
|
| # and so on. Numbers here is a trade-off; the more per request, the lower the
|
| # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
|
| # larger values cause longer lookups, increasing the initial latency to start
|
| @@ -62,10 +61,12 @@ UNKNOWN_FILE_SIZE = None
|
| # The size of each chunk to read when downloading and unzipping files.
|
| ZIPPED_FILE_CHUNK = 16 * 1024
|
|
|
| -
|
| # Chunk size to use when doing disk I/O.
|
| DISK_FILE_CHUNK = 1024 * 1024
|
|
|
| +# Chunk size to use when reading from network stream.
|
| +NET_IO_FILE_CHUNK = 16 * 1024
|
| +
|
|
|
| # Read timeout in seconds for downloads from isolate storage. If there's no
|
| # response from the server within this timeout whole download will be aborted.
|
| @@ -106,63 +107,6 @@ class MappingError(OSError):
|
| pass
|
|
|
|
|
| -def randomness():
|
| - """Generates low-entropy randomness for MIME encoding.
|
| -
|
| - Exists so it can be mocked out in unit tests.
|
| - """
|
| - return str(time.time())
|
| -
|
| -
|
| -def encode_multipart_formdata(fields, files,
|
| - mime_mapper=lambda _: 'application/octet-stream'):
|
| - """Encodes a Multipart form data object.
|
| -
|
| - Args:
|
| - fields: a sequence (name, value) elements for
|
| - regular form fields.
|
| - files: a sequence of (name, filename, value) elements for data to be
|
| - uploaded as files.
|
| - mime_mapper: function to return the mime type from the filename.
|
| - Returns:
|
| - content_type: for httplib.HTTP instance
|
| - body: for httplib.HTTP instance
|
| - """
|
| - boundary = hashlib.md5(randomness()).hexdigest()
|
| - body_list = []
|
| - for (key, value) in fields:
|
| - if isinstance(key, unicode):
|
| - value = key.encode('utf-8')
|
| - if isinstance(value, unicode):
|
| - value = value.encode('utf-8')
|
| - body_list.append('--' + boundary)
|
| - body_list.append('Content-Disposition: form-data; name="%s"' % key)
|
| - body_list.append('')
|
| - body_list.append(value)
|
| - body_list.append('--' + boundary)
|
| - body_list.append('')
|
| - for (key, filename, value) in files:
|
| - if isinstance(key, unicode):
|
| - value = key.encode('utf-8')
|
| - if isinstance(filename, unicode):
|
| - value = filename.encode('utf-8')
|
| - if isinstance(value, unicode):
|
| - value = value.encode('utf-8')
|
| - body_list.append('--' + boundary)
|
| - body_list.append('Content-Disposition: form-data; name="%s"; '
|
| - 'filename="%s"' % (key, filename))
|
| - body_list.append('Content-Type: %s' % mime_mapper(filename))
|
| - body_list.append('')
|
| - body_list.append(value)
|
| - body_list.append('--' + boundary)
|
| - body_list.append('')
|
| - if body_list:
|
| - body_list[-2] += '--'
|
| - body = '\r\n'.join(body_list)
|
| - content_type = 'multipart/form-data; boundary=%s' % boundary
|
| - return content_type, body
|
| -
|
| -
|
| def is_valid_hash(value, algo):
|
| """Returns if the value is a valid hash for the corresponding algorithm."""
|
| size = 2 * algo().digest_size
|
| @@ -184,6 +128,15 @@ def hash_file(filepath, algo):
|
| return digest.hexdigest()
|
|
|
|
|
| +def stream_read(stream, chunk_size):
|
| + """Reads chunks from |stream| and yields them."""
|
| + while True:
|
| + data = stream.read(chunk_size)
|
| + if not data:
|
| + break
|
| + yield data
|
| +
|
| +
|
| def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
|
| """Yields file content in chunks of given |chunk_size|."""
|
| with open(filepath, 'rb') as f:
|
| @@ -226,6 +179,37 @@ def zip_compress(content_generator, level=7):
|
| yield tail
|
|
|
|
|
| +def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
|
| + """Reads zipped data from |content_generator| and yields decompressed data.
|
| +
|
| + Decompresses data in small chunks (no larger than |chunk_size|) so that
|
| + zip bomb file doesn't cause zlib to preallocate huge amount of memory.
|
| +
|
| + Raises IOError if data is corrupted or incomplete.
|
| + """
|
| + decompressor = zlib.decompressobj()
|
| + compressed_size = 0
|
| + try:
|
| + for chunk in content_generator:
|
| + compressed_size += len(chunk)
|
| + data = decompressor.decompress(chunk, chunk_size)
|
| + if data:
|
| + yield data
|
| + while decompressor.unconsumed_tail:
|
| + data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
|
| + if data:
|
| + yield data
|
| + tail = decompressor.flush()
|
| + if tail:
|
| + yield tail
|
| + except zlib.error as e:
|
| + raise IOError(
|
| + 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
|
| + # Ensure all data was read and decompressed.
|
| + if decompressor.unused_data or decompressor.unconsumed_tail:
|
| + raise IOError('Not all data was decompressed')
|
| +
|
| +
|
| def get_zip_compression_level(filename):
|
| """Given a filename calculates the ideal zip compression level to use."""
|
| file_ext = os.path.splitext(filename)[1].lower()
|
| @@ -301,15 +285,6 @@ def try_remove(filepath):
|
| pass
|
|
|
|
|
| -def url_read(url, **kwargs):
|
| - result = net.url_read(url, **kwargs)
|
| - if result is None:
|
| - # If we get no response from the server, assume it is down and raise an
|
| - # exception.
|
| - raise MappingError('Unable to connect to server %s' % url)
|
| - return result
|
| -
|
| -
|
| class Storage(object):
|
| """Efficiently downloads or uploads large set of files via StorageApi."""
|
|
|
| @@ -540,84 +515,132 @@ class StorageApi(object):
|
|
|
|
|
| class IsolateServer(StorageApi):
|
| - """StorageApi implementation that downloads and uploads to Isolate Server."""
|
| + """StorageApi implementation that downloads and uploads to Isolate Server.
|
| +
|
| + It uploads and downloads directly from Google Storage whenever appropriate.
|
| + """
|
| +
|
| def __init__(self, base_url, namespace):
|
| super(IsolateServer, self).__init__()
|
| assert base_url.startswith('http'), base_url
|
| - self.content_url = base_url.rstrip('/') + '/content/'
|
| + self.base_url = base_url.rstrip('/')
|
| self.namespace = namespace
|
| self.algo = get_hash_algo(namespace)
|
| - self._token = None
|
| + self._use_zip = is_namespace_with_compression(namespace)
|
| self._lock = threading.Lock()
|
| + self._server_caps = None
|
| +
|
| + @staticmethod
|
| + def generate_handshake_request():
|
| + """Returns a dict to be sent as handshake request body."""
|
| + # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
|
| + return {
|
| + 'protocol_version': ISOLATE_PROTOCOL_VERSION,
|
| + 'client_app_version': __version__,
|
| + 'fetcher': True,
|
| + 'pusher': True,
|
| + }
|
| +
|
| + @staticmethod
|
| + def validate_handshake_response(caps):
|
| + """Validates and normalizes handshake response."""
|
| + logging.info('Protocol version: %s', caps['protocol_version'])
|
| + logging.info('Server version: %s', caps['server_app_version'])
|
| + if caps.get('error'):
|
| + raise MappingError(caps['error'])
|
| + if not caps['access_token']:
|
| + raise ValueError('access_token is missing')
|
| + return caps
|
|
|
| @property
|
| - def token(self):
|
| + def server_capabilities(self):
|
| + """Performs handshake with the server if not yet done.
|
| +
|
| + Returns:
|
| + Server capabilities dictionary as returned by /handshake endpoint.
|
| +
|
| + Raises:
|
| + MappingError if server rejects the handshake.
|
| + """
|
| # TODO(maruel): Make this request much earlier asynchronously while the
|
| # files are being enumerated.
|
| with self._lock:
|
| - if not self._token:
|
| - self._token = urllib.quote(url_read(self.content_url + 'get_token'))
|
| - return self._token
|
| + if self._server_caps is None:
|
| + request_body = json.dumps(
|
| + self.generate_handshake_request(), separators=(',', ':'))
|
| + response = net.url_read(
|
| + url=self.base_url + '/content-gs/handshake',
|
| + data=request_body,
|
| + content_type='application/json',
|
| + method='POST')
|
| + if response is None:
|
| + raise MappingError('Failed to perform handshake.')
|
| + try:
|
| + caps = json.loads(response)
|
| + if not isinstance(caps, dict):
|
| + raise ValueError('Expecting JSON dict')
|
| + self._server_caps = self.validate_handshake_response(caps)
|
| + except (ValueError, KeyError, TypeError) as exc:
|
| + # KeyError exception has very confusing str conversion: it's just a
|
| + # missing key value and nothing else. So print exception class name
|
| + # as well.
|
| + raise MappingError('Invalid handshake response (%s): %s' % (
|
| + exc.__class__.__name__, exc))
|
| + return self._server_caps
|
|
|
| def fetch(self, item, expected_size):
|
| assert isinstance(item, basestring)
|
| - assert (
|
| - isinstance(expected_size, (int, long)) or
|
| + assert (isinstance(expected_size, (int, long)) or
|
| expected_size == UNKNOWN_FILE_SIZE)
|
| - zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item)
|
| - logging.debug('download_file(%s)', zipped_url)
|
| +
|
| + source_url = '%s/content-gs/retrieve/%s/%s' % (
|
| + self.base_url, self.namespace, item)
|
| + logging.debug('download_file(%s)', source_url)
|
|
|
| # Because the app engine DB is only eventually consistent, retry 404 errors
|
| # because the file might just not be visible yet (even though it has been
|
| # uploaded).
|
| connection = net.url_open(
|
| - zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
|
| + source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
|
| if not connection:
|
| - raise IOError('Unable to open connection to %s' % zipped_url)
|
| + raise IOError('Unable to open connection to %s' % source_url)
|
|
|
| - # TODO(maruel): Must only decompress when needed.
|
| - decompressor = zlib.decompressobj()
|
| try:
|
| - compressed_size = 0
|
| - decompressed_size = 0
|
| - while True:
|
| - chunk = connection.read(ZIPPED_FILE_CHUNK)
|
| - if not chunk:
|
| - break
|
| - compressed_size += len(chunk)
|
| - decompressed = decompressor.decompress(chunk)
|
| - decompressed_size += len(decompressed)
|
| - yield decompressed
|
| -
|
| - # Ensure that all the data was properly decompressed.
|
| - uncompressed_data = decompressor.flush()
|
| - if uncompressed_data:
|
| - raise IOError('Decompression failed')
|
| - if (expected_size != UNKNOWN_FILE_SIZE and
|
| - decompressed_size != expected_size):
|
| - raise IOError('File incorrect size after download of %s. Got %s and '
|
| - 'expected %s' % (item, decompressed_size, expected_size))
|
| - except zlib.error as e:
|
| - msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
|
| - item, compressed_size, connection.content_length, e)
|
| - logging.warning(msg)
|
| -
|
| - # Testing seems to show that if a few machines are trying to download
|
| - # the same blob, they can cause each other to fail. So if we hit a zip
|
| - # error, this is the most likely cause (it only downloads some of the
|
| - # data). Randomly sleep for between 5 and 25 seconds to try and spread
|
| - # out the downloads.
|
| - sleep_duration = (random.random() * 20) + 5
|
| - time.sleep(sleep_duration)
|
| - raise IOError(msg)
|
| + # Prepare reading pipeline.
|
| + generator = stream_read(connection, NET_IO_FILE_CHUNK)
|
| + if self._use_zip:
|
| + generator = zip_decompress(generator, DISK_FILE_CHUNK)
|
| +
|
| + # Read and yield data, calculate total length of the decompressed stream.
|
| + total_size = 0
|
| + for chunk in generator:
|
| + total_size += len(chunk)
|
| + yield chunk
|
| +
|
| + # Verify data length matches expectation.
|
| + if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size:
|
| + raise IOError('Incorrect file size: expected %d, got %d' % (
|
| + expected_size, total_size))
|
| +
|
| + except IOError as err:
|
| + logging.warning('Failed to fetch %s: %s', item, err)
|
| + raise
|
|
|
| def push(self, item, expected_size, content_generator, push_urls=None):
|
| assert isinstance(item, basestring)
|
| assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
|
| - item = str(item)
|
| + assert push_urls and len(push_urls) == 2
|
| + upload_url, finalize_url = push_urls
|
|
|
| # TODO(maruel): Support large files. This would require streaming support.
|
|
|
| + # TODO(vadimsh): Do not read from |content_generator| when retrying push.
|
| + # If |content_generator| is indeed a generator, it can not be re-winded back
|
| + # to the beginning of the stream. A retry will find it exhausted. A possible
|
| + # solution is to wrap content_generator with some sort of caching
|
| + # restartable generator. It should be done alongside streaming support
|
| + # implementation.
|
| +
|
| # A cheese way to avoid memcpy of (possibly huge) file, until streaming
|
| # upload support is implemented.
|
| if isinstance(content_generator, list) and len(content_generator) == 1:
|
| @@ -625,64 +648,76 @@ class IsolateServer(StorageApi):
|
| else:
|
| content = ''.join(content_generator)
|
|
|
| - if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
|
| - return self._upload_hash_content_to_blobstore(item, content)
|
| -
|
| - url = '%sstore/%s/%s?token=%s' % (
|
| - self.content_url, self.namespace, item, self.token)
|
| - return url_read(url, data=content, content_type='application/octet-stream')
|
| + # PUT file to |upload_url|.
|
| + response = net.url_read(
|
| + url=upload_url,
|
| + data=content,
|
| + content_type='application/octet-stream',
|
| + method='PUT')
|
| + if response is None:
|
| + raise IOError('Failed to upload a file %s to %s' % (item, upload_url))
|
| +
|
| + # Optionally notify the server that it's done.
|
| + if finalize_url:
|
| + # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to
|
| + # isolated server. That way isolate server can verify that the data safely
|
| + # reached Google Storage (GS provides MD5 of stored files).
|
| + response = net.url_read(
|
| + url=finalize_url,
|
| + data='',
|
| + content_type='application/json',
|
| + method='POST')
|
| + # TODO(vadimsh): Do not reupload item again if only finalize_url request
|
| + # failed.
|
| + if response is None:
|
| + raise IOError('Failed to finalize an upload of %s' % item)
|
|
|
| def contains(self, files):
|
| logging.info('Checking existence of %d files...', len(files))
|
|
|
| - body = ''.join(
|
| - (binascii.unhexlify(metadata['h']) for (_, metadata) in files))
|
| - assert (len(body) % self.algo().digest_size) == 0, repr(body)
|
| + # Request body is a json encoded list of dicts.
|
| + body = [
|
| + {
|
| + 'h': metadata['h'],
|
| + 's': metadata['s'],
|
| + 'i': 1 if metadata.get('priority') == '0' else 0,
|
| + } for _, metadata in files
|
| + ]
|
|
|
| - query_url = '%scontains/%s?token=%s' % (
|
| - self.content_url, self.namespace, self.token)
|
| - response = url_read(
|
| - query_url, data=body, content_type='application/octet-stream')
|
| - if len(files) != len(response):
|
| + query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
|
| + self.base_url,
|
| + self.namespace,
|
| + urllib.quote(self.server_capabilities['access_token']))
|
| + response_body = net.url_read(
|
| + url=query_url,
|
| + data=json.dumps(body, separators=(',', ':')),
|
| + content_type='application/json',
|
| + method='POST')
|
| + if response_body is None:
|
| + raise MappingError('Failed to execute /pre-upload query')
|
| +
|
| + # Response body is a list of push_urls (or null if file is already present).
|
| + try:
|
| + response = json.loads(response_body)
|
| + if not isinstance(response, list):
|
| + raise ValueError('Expecting response with json-encoded list')
|
| + if len(response) != len(files):
|
| + raise ValueError(
|
| + 'Incorrect number of items in the list, expected %d, '
|
| + 'but got %d' % (len(files), len(response)))
|
| + except ValueError as err:
|
| raise MappingError(
|
| - 'Got an incorrect number of responses from the server. Expected %d, '
|
| - 'but got %d' % (len(files), len(response)))
|
| + 'Invalid response from server: %s, body is %s' % (err, response_body))
|
|
|
| - # This implementation of IsolateServer doesn't use push_urls field,
|
| - # set it to None.
|
| + # Convert response into a list of triplets with info about missing files.
|
| missing_files = [
|
| - files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00'
|
| + files[i] + (push_urls,) for i, push_urls in enumerate(response)
|
| + if push_urls
|
| ]
|
| logging.info('Queried %d files, %d cache hit',
|
| len(files), len(files) - len(missing_files))
|
| return missing_files
|
|
|
| - def _upload_hash_content_to_blobstore(self, item, content):
|
| - """Uploads the content directly to the blobstore via a generated url."""
|
| - # TODO(maruel): Support large files. This would require streaming support.
|
| - gen_url = '%sgenerate_blobstore_url/%s/%s' % (
|
| - self.content_url, self.namespace, item)
|
| - # Token is guaranteed to be already quoted but it is unnecessary here, and
|
| - # only here.
|
| - data = [('token', urllib.unquote(self.token))]
|
| - content_type, body = encode_multipart_formdata(
|
| - data, [('content', item, content)])
|
| - last_url = gen_url
|
| - for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
|
| - # Retry HTTP 50x here but not 404.
|
| - upload_url = net.url_read(gen_url, data=data)
|
| - if not upload_url:
|
| - raise MappingError('Unable to connect to server %s' % gen_url)
|
| - last_url = upload_url
|
| -
|
| - # Do not retry this request on HTTP 50x. Regenerate an upload url each
|
| - # time since uploading "consumes" the upload url.
|
| - result = net.url_read(
|
| - upload_url, data=body, content_type=content_type, retry_50x=False)
|
| - if result is not None:
|
| - return result
|
| - raise MappingError('Unable to connect to server %s' % last_url)
|
| -
|
|
|
| class FileSystem(StorageApi):
|
| """StorageApi implementation that fetches data from the file system.
|
|
|