Chromium Code Reviews| Index: isolateserver.py |
| diff --git a/isolateserver.py b/isolateserver.py |
| index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..f0ae85107f9c66c5196adcb93e063dae22cdc6e6 100755 |
| --- a/isolateserver.py |
| +++ b/isolateserver.py |
| @@ -29,6 +29,13 @@ from utils import threading_utils |
| from utils import tools |
| +# Switch that controls what IsolateServer implementation to use by default. |
| +ENABLE_GS_ISOLATE_API = False |
|
Vadim Sh.
2013/09/26 01:14:19
I need your advice on how to deal with conditional
M-A Ruel
2013/09/26 01:21:47
3)
Let it bake on the canary, then roll deps and
Vadim Sh.
2013/09/27 20:28:12
Sweet. It simplifies a lot.
|
| + |
| +# Version of isolate protocol passed to the server in /handshake request. |
| +ISOLATE_PROTOCOL_VERSION = '1.0' |
| + |
| + |
| # The minimum size of files to upload directly to the blobstore. |
| MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 |
| @@ -62,10 +69,12 @@ UNKNOWN_FILE_SIZE = None |
| # The size of each chunk to read when downloading and unzipping files. |
| ZIPPED_FILE_CHUNK = 16 * 1024 |
| - |
| # Chunk size to use when doing disk I/O. |
| DISK_FILE_CHUNK = 1024 * 1024 |
| +# Chunk size to use when reading from network stream. |
| +NET_IO_FILE_CHUNK = 16 * 1024 |
| + |
| # Read timeout in seconds for downloads from isolate storage. If there's no |
| # response from the server within this timeout whole download will be aborted. |
| @@ -184,6 +193,15 @@ def hash_file(filepath, algo): |
| return digest.hexdigest() |
| +def stream_read(stream, chunk_size): |
| + """Reads chunks from |stream| and yields them.""" |
| + while True: |
| + data = stream.read(chunk_size) |
| + if not data: |
| + break |
| + yield data |
| + |
| + |
| def file_read(filepath, chunk_size=DISK_FILE_CHUNK): |
| """Yields file content in chunks of given |chunk_size|.""" |
| with open(filepath, 'rb') as f: |
| @@ -226,6 +244,33 @@ def zip_compress(content_generator, level=7): |
| yield tail |
| +def zip_decompress(content_generator): |
| + """Reads zipped data from |content_generator| and yields decompressed data. |
| + |
| + |content_generator| should yield an entire zipped stream, this function will |
| + ensure all passed data is decompressed. |
| + |
| + Raises IOError if data is corrupted or incomplete. |
| + """ |
| + decompressor = zlib.decompressobj() |
| + compressed_size = 0 |
| + try: |
| + for chunk in content_generator: |
| + compressed_size += len(chunk) |
| + decompressed = decompressor.decompress(chunk) |
|
M-A Ruel
2013/09/26 01:21:47
You should create an inner loop here, so that if a
Vadim Sh.
2013/09/27 20:28:12
Done.
|
| + if decompressed: |
| + yield decompressed |
| + tail = decompressor.flush() |
| + if tail: |
| + yield tail |
| + except zlib.error as e: |
| + raise IOError( |
| + 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) |
| + # Ensure all data was read and decompressed. |
| + if decompressor.unused_data or decompressor.unconsumed_tail: |
| + raise IOError('Not all data was decompressed') |
| + |
| + |
| def get_zip_compression_level(filename): |
| """Given a filename calculates the ideal zip compression level to use.""" |
| file_ext = os.path.splitext(filename)[1].lower() |
| @@ -684,6 +729,198 @@ class IsolateServer(StorageApi): |
| raise MappingError('Unable to connect to server %s' % last_url) |
| +class IsolateServerGS(StorageApi): |
| + """StorageApi implementation that downloads and uploads to Isolate Server. |
| + |
| + It uploads and downloads directly from Google Storage whenever appropriate. |
| + """ |
| + |
| + def __init__(self, base_url, namespace): |
| + super(IsolateServerGS, self).__init__() |
| + assert base_url.startswith('http'), base_url |
| + self.base_url = base_url.rstrip('/') |
| + self.namespace = namespace |
| + self.algo = get_hash_algo(namespace) |
| + self._use_zip = is_namespace_with_compression(namespace) |
| + self._lock = threading.Lock() |
| + self._server_caps = None |
| + |
| + @staticmethod |
| + def generate_handshake_request(): |
| + """Returns a dict to be sent as handshake request body.""" |
| + # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. |
| + return { |
| + 'protocol_version': ISOLATE_PROTOCOL_VERSION, |
| + 'client_app_version': __version__, |
| + 'fetcher': True, |
| + 'pusher': True, |
| + } |
| + |
| + @staticmethod |
| + def validate_handshake_response(caps): |
| + """Validates and normalizes handshake response.""" |
| + logging.info('Protocol version: %s', caps['protocol_version']) |
| + logging.info('Server version: %s', caps['server_app_version']) |
| + if "error" in caps: |
|
M-A Ruel
2013/09/26 01:21:47
'error'
Vadim Sh.
2013/09/27 20:28:12
Done.
|
| + raise MappingError(caps['error']) |
| + if not caps['access_token']: |
| + raise ValueError('access_token is missing') |
| + return caps |
| + |
| + @property |
| + def server_capabilities(self): |
| + """Performs handshake with the server if not yet done. |
| + |
| + Returns: |
| + Server capabilities dictionary as returned by /handshake endpoint. |
| + |
| + Raises: |
| + MappingError if server rejects the handshake. |
| + """ |
| + # TODO(maruel): Make this request much earlier asynchronously while the |
| + # files are being enumerated. |
| + with self._lock: |
| + if self._server_caps is None: |
| + response = net.url_read( |
| + url=self.base_url + '/content-gs/handshake', |
| + data=json.dumps(self.generate_handshake_request()), |
| + content_type='application/json') |
| + try: |
| + caps = json.loads(response) |
| + if not isinstance(caps, dict): |
| + raise ValueError('Expecting JSON dict') |
| + self._server_caps = self.validate_handshake_response(caps) |
| + except (ValueError, KeyError, TypeError) as exc: |
| + raise MappingError('Invalid handshake response: %s' % exc) |
| + return self._server_caps |
| + |
| + @property |
| + def token(self): |
| + """Returns access token received through handshake process.""" |
| + return self.server_capabilities['access_token'] |
| + |
| + def fetch(self, item, expected_size): |
| + assert isinstance(item, basestring) |
| + assert (isinstance(expected_size, (int, long)) or |
| + expected_size == UNKNOWN_FILE_SIZE) |
| + |
| + source_url = '%s/content-gs/retrieve/%s/%s' % ( |
| + self.base_url, self.namespace, item) |
| + logging.debug('download_file(%s)', source_url) |
| + |
| + # Because the app engine DB is only eventually consistent, retry 404 errors |
| + # because the file might just not be visible yet (even though it has been |
| + # uploaded). |
| + connection = net.url_open( |
| + source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) |
| + if not connection: |
| + raise IOError('Unable to open connection to %s' % source_url) |
| + |
| + try: |
| + # Prepare reading pipeline. |
| + generator = stream_read(connection, NET_IO_FILE_CHUNK) |
| + if self._use_zip: |
| + generator = zip_decompress(generator) |
| + |
| + # Read and yield data, calculate total length of the decompressed stream. |
| + total_size = 0 |
| + for chunk in generator: |
| + total_size += len(chunk) |
| + yield chunk |
| + |
| + # Verify data length matches expectation. |
| + if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size: |
| + raise IOError('Incorrect file size: expected %d, got %d' % ( |
| + expected_size, total_size)) |
| + |
| + except IOError as err: |
| + logging.warning('Failed to fetch %s: %s', item, err) |
| + |
| + # Testing seems to show that if a few machines are trying to download |
|
M-A Ruel
2013/09/26 01:21:47
This likely doesn't apply here.
Vadim Sh.
2013/09/27 20:28:12
Done.
|
| + # the same blob, they can cause each other to fail. So if we hit a zip |
| + # error, this is the most likely cause (it only downloads some of the |
| + # data). Randomly sleep for between 5 and 25 seconds to try and spread |
| + # out the downloads. |
| + sleep_duration = (random.random() * 20) + 5 |
| + time.sleep(sleep_duration) |
| + raise |
| + |
| + def push(self, item, expected_size, content_generator, push_urls=None): |
| + assert isinstance(item, basestring) |
| + assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE |
| + assert push_urls and len(push_urls) == 2 |
| + upload_url, finalize_url = push_urls |
| + |
| + # TODO(maruel): Support large files. This would require streaming support. |
| + |
| + # TODO(vadimsh): Do not read from |content_generator| when retrying push. |
| + # If |content_generator| is indeed a generator, it can not be re-winded back |
| + # to the beginning of the stream. A retry will find it exhausted. A possible |
| + # solution is to wrap content_generator with some sort of caching |
| + # restartable generator. It should be done alongside streaming support |
| + # implementation. |
| + |
| + # A cheese way to avoid memcpy of (possibly huge) file, until streaming |
| + # upload support is implemented. |
| + if isinstance(content_generator, list) and len(content_generator) == 1: |
| + content = content_generator[0] |
| + else: |
| + content = ''.join(content_generator) |
| + |
| + # PUT file to |upload_url|. |
| + response = net.url_read(upload_url, data=content, |
| + content_type='application/octet-stream', method='PUT') |
| + if response is None: |
| + raise IOError('Failed to upload a file %s to %s' % (item, upload_url)) |
| + |
| + # Optionally notify the server that it's done. |
| + if finalize_url: |
| + response = net.url_read(finalize_url, data={}, method='POST') |
| + if response is None: |
| + raise IOError('Failed to finalize an upload of %s' % item) |
| + |
| + def contains(self, files): |
| + logging.info('Checking existence of %d files...', len(files)) |
| + |
| + # Request body is a json encoded list of dicts. |
| + body = [ |
| + { |
| + 'h': metadata['h'], |
| + 's': metadata['s'], |
| + 'i': 1 if metadata.get('priority') == '0' else 0, |
| + } for _, metadata in files |
| + ] |
| + |
| + query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( |
| + self.base_url, self.namespace, self.token) |
| + response_body = url_read( |
| + query_url, |
| + data=json.dumps(body, separators=(',', ':')), |
| + content_type='application/json') |
| + |
| + # Response body is a list of push_urls (or null if file is already present). |
| + try: |
| + response = json.loads(response_body) |
| + if not isinstance(response, list): |
| + raise ValueError('Expecting response with json-encoded list') |
| + if len(response) != len(files): |
| + raise ValueError( |
| + 'Incorrect number of items in the list, expected %d, ' |
| + 'but got %d' % (len(files), len(response))) |
| + except ValueError as err: |
| + raise MappingError( |
| + 'Invalid response from server: %s, body is %s' % (err, response_body)) |
| + |
| + # Convert response into a list of triplets with info about missing files. |
| + missing_files = [ |
| + files[i] + (push_urls,) for i, push_urls in enumerate(response) |
| + if push_urls is not None |
|
M-A Ruel
2013/09/26 01:21:47
is "if push_urls" sufficient?
Vadim Sh.
2013/09/27 20:28:12
Done.
|
| + ] |
| + logging.info('Queried %d files, %d cache hit', |
| + len(files), len(files) - len(missing_files)) |
| + return missing_files |
| + |
| + |
| class FileSystem(StorageApi): |
| """StorageApi implementation that fetches data from the file system. |
| @@ -735,7 +972,10 @@ def is_namespace_with_compression(namespace): |
| def get_storage_api(file_or_url, namespace): |
| """Returns an object that implements StorageApi interface.""" |
| if re.match(r'^https?://.+$', file_or_url): |
| - return IsolateServer(file_or_url, namespace) |
| + if ENABLE_GS_ISOLATE_API: |
| + return IsolateServerGS(file_or_url, namespace) |
| + else: |
| + return IsolateServer(file_or_url, namespace) |
| else: |
| return FileSystem(file_or_url) |