Chromium Code Reviews| Index: isolateserver.py |
| diff --git a/isolateserver.py b/isolateserver.py |
| index 2205a182ea81eedf2dd366438ddc8ae5a12d4e25..5c58b5fe6f648ffa01f2712fe3fe1ba3a7141174 100755 |
| --- a/isolateserver.py |
| +++ b/isolateserver.py |
| @@ -7,12 +7,10 @@ |
| __version__ = '0.2' |
| -import binascii |
| import hashlib |
| import json |
| import logging |
| import os |
| -import random |
| import re |
| import sys |
| import threading |
| @@ -29,14 +27,15 @@ from utils import threading_utils |
| from utils import tools |
| -# The minimum size of files to upload directly to the blobstore. |
| -MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 |
| +# Version of isolate protocol passed to the server in /handshake request. |
| +ISOLATE_PROTOCOL_VERSION = '1.0' |
| -# The number of files to check the isolate server per /contains query. |
| + |
| +# The number of files to check the isolate server per /pre-upload query. |
| # All files are sorted by likelihood of a change in the file content |
| # (currently file size is used to estimate this: larger the file -> larger the |
| # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files |
| -# are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], |
| +# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1], |
| # and so on. Numbers here is a trade-off; the more per request, the lower the |
| # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, |
| # larger values cause longer lookups, increasing the initial latency to start |
| @@ -62,10 +61,12 @@ UNKNOWN_FILE_SIZE = None |
| # The size of each chunk to read when downloading and unzipping files. |
| ZIPPED_FILE_CHUNK = 16 * 1024 |
| - |
| # Chunk size to use when doing disk I/O. |
| DISK_FILE_CHUNK = 1024 * 1024 |
| +# Chunk size to use when reading from network stream. |
| +NET_IO_FILE_CHUNK = 16 * 1024 |
| + |
| # Read timeout in seconds for downloads from isolate storage. If there's no |
| # response from the server within this timeout whole download will be aborted. |
| @@ -106,63 +107,6 @@ class MappingError(OSError): |
| pass |
| -def randomness(): |
| - """Generates low-entropy randomness for MIME encoding. |
| - |
| - Exists so it can be mocked out in unit tests. |
| - """ |
| - return str(time.time()) |
| - |
| - |
| -def encode_multipart_formdata(fields, files, |
| - mime_mapper=lambda _: 'application/octet-stream'): |
| - """Encodes a Multipart form data object. |
| - |
| - Args: |
| - fields: a sequence (name, value) elements for |
| - regular form fields. |
| - files: a sequence of (name, filename, value) elements for data to be |
| - uploaded as files. |
| - mime_mapper: function to return the mime type from the filename. |
| - Returns: |
| - content_type: for httplib.HTTP instance |
| - body: for httplib.HTTP instance |
| - """ |
| - boundary = hashlib.md5(randomness()).hexdigest() |
| - body_list = [] |
| - for (key, value) in fields: |
| - if isinstance(key, unicode): |
| - value = key.encode('utf-8') |
| - if isinstance(value, unicode): |
| - value = value.encode('utf-8') |
| - body_list.append('--' + boundary) |
| - body_list.append('Content-Disposition: form-data; name="%s"' % key) |
| - body_list.append('') |
| - body_list.append(value) |
| - body_list.append('--' + boundary) |
| - body_list.append('') |
| - for (key, filename, value) in files: |
| - if isinstance(key, unicode): |
| - value = key.encode('utf-8') |
| - if isinstance(filename, unicode): |
| - value = filename.encode('utf-8') |
| - if isinstance(value, unicode): |
| - value = value.encode('utf-8') |
| - body_list.append('--' + boundary) |
| - body_list.append('Content-Disposition: form-data; name="%s"; ' |
| - 'filename="%s"' % (key, filename)) |
| - body_list.append('Content-Type: %s' % mime_mapper(filename)) |
| - body_list.append('') |
| - body_list.append(value) |
| - body_list.append('--' + boundary) |
| - body_list.append('') |
| - if body_list: |
| - body_list[-2] += '--' |
| - body = '\r\n'.join(body_list) |
| - content_type = 'multipart/form-data; boundary=%s' % boundary |
| - return content_type, body |
| - |
| - |
| def is_valid_hash(value, algo): |
| """Returns if the value is a valid hash for the corresponding algorithm.""" |
| size = 2 * algo().digest_size |
| @@ -184,6 +128,15 @@ def hash_file(filepath, algo): |
| return digest.hexdigest() |
| +def stream_read(stream, chunk_size): |
| + """Reads chunks from |stream| and yields them.""" |
| + while True: |
| + data = stream.read(chunk_size) |
| + if not data: |
| + break |
| + yield data |
| + |
| + |
| def file_read(filepath, chunk_size=DISK_FILE_CHUNK): |
| """Yields file content in chunks of given |chunk_size|.""" |
| with open(filepath, 'rb') as f: |
| @@ -226,6 +179,37 @@ def zip_compress(content_generator, level=7): |
| yield tail |
| +def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK): |
| + """Reads zipped data from |content_generator| and yields decompressed data. |
| + |
| + Decompresses data in small chunks (no larger than |chunk_size|) so that |
| + zip bomb file doesn't cause zlib to preallocate huge amount of memory. |
| + |
| + Raises IOError if data is corrupted or incomplete. |
| + """ |
| + decompressor = zlib.decompressobj() |
| + compressed_size = 0 |
| + try: |
| + for chunk in content_generator: |
| + compressed_size += len(chunk) |
| + data = decompressor.decompress(chunk, chunk_size) |
| + if data: |
| + yield data |
| + while decompressor.unconsumed_tail: |
| + data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size) |
| + if data: |
| + yield data |
| + tail = decompressor.flush() |
| + if tail: |
| + yield tail |
| + except zlib.error as e: |
| + raise IOError( |
| + 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) |
| + # Ensure all data was read and decompressed. |
| + if decompressor.unused_data or decompressor.unconsumed_tail: |
| + raise IOError('Not all data was decompressed') |
| + |
| + |
| def get_zip_compression_level(filename): |
| """Given a filename calculates the ideal zip compression level to use.""" |
| file_ext = os.path.splitext(filename)[1].lower() |
| @@ -301,13 +285,49 @@ def try_remove(filepath): |
| pass |
| -def url_read(url, **kwargs): |
| - result = net.url_read(url, **kwargs) |
| - if result is None: |
| - # If we get no response from the server, assume it is down and raise an |
| - # exception. |
| - raise MappingError('Unable to connect to server %s' % url) |
| - return result |
| +class Item(object): |
|
Vadim Sh.
2013/09/30 21:35:21
I decided to call it Item instead of File for two
M-A Ruel
2013/09/30 22:36:33
I like not using File but Item is a tad generic, b
|
| + """An item to push to Storage. |
| + |
| + It starts its life in a main thread, travels to 'contains' thread, then to |
| + 'push' thread and then finally back to the main thread. |
| + |
| + It never used concurrently from multiple threads. |
| + """ |
| + |
| + def __init__(self, digest, size, is_isolated): |
| + self.digest = digest |
| + self.size = size |
| + self.is_isolated = is_isolated |
| + self.priority = WorkerPool.HIGH if self.is_isolated else WorkerPool.MED |
| + self.compression_level = 6 |
| + self._push_state = None |
| + |
| + @property |
| + def push_state(self): |
| + """StorageApi specific push state information.""" |
| + return self._push_state |
| + |
| + @push_state.setter |
| + def push_state(self, push_state): |
| + """StorageApi specific push state information.""" |
| + assert self._push_state is None |
| + self._push_state = push_state |
| + |
| + def content(self, chunk_size): |
| + """Generator that produces content of this item in chunks of given size.""" |
| + raise NotImplementedError() |
| + |
| + |
| +class FileItem(Item): |
| + """A file to push to Storage.""" |
| + |
| + def __init__(self, path, digest, size, is_isolated): |
| + super(FileItem, self).__init__(digest, size, is_isolated) |
| + self.path = path |
| + self.compression_level = get_zip_compression_level(path) |
| + |
| + def content(self, chunk_size): |
| + return file_read(self.path, chunk_size) |
| class Storage(object): |
| @@ -363,21 +383,27 @@ class Storage(object): |
| """ |
| logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) |
| + # TODO(vadimsh): Introduce Item as a part of the public interface? |
| + |
| + # Convert |indir| + |infiles| into a list of FileItem objects. |
| + # Filter out symlinks, since they are not represented by items on isolate |
| + # server side. |
| + items = [ |
| + FileItem( |
| + path=os.path.join(indir, filepath), |
| + digest=metadata['h'], |
| + size=metadata['s'], |
| + is_isolated=metadata.get('priority') == '0') |
| + for filepath, metadata in infiles.iteritems() |
| + if 'l' not in metadata |
| + ] |
| + |
| # Enqueue all upload tasks. |
| + missing = set() |
| channel = threading_utils.TaskChannel() |
| - missing = [] |
| - for filename, metadata, push_urls in self.get_missing_files(infiles): |
| - missing.append((filename, metadata)) |
| - path = os.path.join(indir, filename) |
| - if metadata.get('priority', '1') == '0': |
| - priority = WorkerPool.HIGH |
| - else: |
| - priority = WorkerPool.MED |
| - compression_level = get_zip_compression_level(path) |
| - chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK |
| - content = file_read(path, chunk_size) |
| - self.async_push(channel, priority, metadata['h'], metadata['s'], |
| - content, compression_level, push_urls) |
| + for missing_item in self.get_missing_items(items): |
| + missing.add(missing_item) |
| + self.async_push(channel, missing_item) |
| # No need to spawn deadlock detector thread if there's nothing to upload. |
| if missing: |
| @@ -388,18 +414,19 @@ class Storage(object): |
| detector.ping() |
| item = channel.pull() |
| uploaded += 1 |
| - logging.debug('Uploaded %d / %d: %s', uploaded, len(missing), item) |
| + logging.debug( |
| + 'Uploaded %d / %d: %s', uploaded, len(missing), item.digest) |
| logging.info('All files are uploaded') |
| # Print stats. |
| - total = len(infiles) |
| - total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues()) |
| + total = len(items) |
| + total_size = sum(f.size for f in items) |
| logging.info( |
| 'Total: %6d, %9.1fkb', |
| total, |
| - sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.) |
| - cache_hit = set(infiles.iterkeys()) - set(x[0] for x in missing) |
| - cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit) |
| + total_size / 1024.) |
| + cache_hit = set(items) - missing |
| + cache_hit_size = sum(f.size for f in cache_hit) |
| logging.info( |
| 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', |
| len(cache_hit), |
| @@ -407,7 +434,7 @@ class Storage(object): |
| len(cache_hit) * 100. / total, |
| cache_hit_size * 100. / total_size if total_size else 0) |
| cache_miss = missing |
| - cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss) |
| + cache_miss_size = sum(f.size for f in cache_miss) |
| logging.info( |
| 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', |
| len(cache_miss), |
| @@ -415,18 +442,22 @@ class Storage(object): |
| len(cache_miss) * 100. / total, |
| cache_miss_size * 100. / total_size if total_size else 0) |
| - def async_push(self, channel, priority, item, expected_size, |
| - content_generator, compression_level, push_urls=None): |
| - """Starts asynchronous push to the server in a parallel thread.""" |
| + def async_push(self, channel, item): |
| + """Starts asynchronous push to the server in a parallel thread. |
| + |
| + Arguments: |
| + channel: TaskChannel object that receives back |item| when upload ends. |
| + item: item to upload as instance of Item class. |
| + """ |
| def push(content, size): |
| """Pushes an item and returns its id, to pass as a result to |channel|.""" |
| - self._storage_api.push(item, size, content, push_urls) |
| + self._storage_api.push(item, content, size) |
| return item |
| # If zipping is not required, just start a push task. |
| if not self.use_zip: |
| - self.net_thread_pool.add_task_with_channel(channel, priority, push, |
| - content_generator, expected_size) |
| + self.net_thread_pool.add_task_with_channel(channel, item.priority, push, |
| + item.content(DISK_FILE_CHUNK), item.size) |
| return |
| # If zipping is enabled, zip in a separate thread. |
| @@ -434,31 +465,32 @@ class Storage(object): |
| # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble |
| # content right here. It will block until all file is zipped. |
| try: |
| - stream = zip_compress(content_generator, compression_level) |
| + stream = zip_compress(item.content(ZIPPED_FILE_CHUNK), |
| + item.compression_level) |
| data = ''.join(stream) |
| except Exception as exc: |
| logging.error('Failed to zip \'%s\': %s', item, exc) |
| channel.send_exception(exc) |
| return |
| - self.net_thread_pool.add_task_with_channel(channel, priority, push, |
| + self.net_thread_pool.add_task_with_channel(channel, item.priority, push, |
| [data], UNKNOWN_FILE_SIZE) |
| - self.cpu_thread_pool.add_task(0, zip_and_push) |
| + self.cpu_thread_pool.add_task(item.priority, zip_and_push) |
| - def get_missing_files(self, files): |
| - """Yields files that are missing from the server. |
| + def get_missing_items(self, items): |
| + """Yields items that are missing from the server. |
| Issues multiple parallel queries via StorageApi's 'contains' method. |
| Arguments: |
| - files: a dictionary file name -> metadata dict. |
| + items: a list of Item objects to check. |
| Yields: |
| - Triplets (file name, metadata dict, push_urls object to pass to push). |
| + Item objects that are missing from the server. |
| """ |
| channel = threading_utils.TaskChannel() |
| pending = 0 |
| # Enqueue all requests. |
| - for batch in self.batch_files_for_check(files): |
| + for batch in self.batch_items_for_check(items): |
| self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, |
| self._storage_api.contains, batch) |
| pending += 1 |
| @@ -468,25 +500,24 @@ class Storage(object): |
| yield missing |
| @staticmethod |
| - def batch_files_for_check(files): |
| - """Splits list of files to check for existence on the server into batches. |
| + def batch_items_for_check(items): |
| + """Splits list of items to check for existence on the server into batches. |
| Each batch corresponds to a single 'exists?' query to the server via a call |
| to StorageApi's 'contains' method. |
| Arguments: |
| - files: a dictionary file name -> metadata dict. |
| + items: a list of Item objects. |
| Yields: |
| - Batches of files to query for existence in a single operation, |
| - each batch is a list of pairs: (file name, metadata dict). |
| + Batches of items to query for existence in a single operation, |
| + each batch is a list of Item objects. |
| """ |
| batch_count = 0 |
| batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] |
| next_queries = [] |
| - items = ((k, v) for k, v in files.iteritems() if 's' in v) |
| - for filename, metadata in sorted(items, key=lambda x: -x[1]['s']): |
| - next_queries.append((filename, metadata)) |
| + for item in sorted(items, key=lambda x: x.size, reverse=True): |
| + next_queries.append(item) |
| if len(next_queries) == batch_size_limit: |
| yield next_queries |
| next_queries = [] |
| @@ -500,188 +531,266 @@ class Storage(object): |
| class StorageApi(object): |
| """Interface for classes that implement low-level storage operations.""" |
| - def fetch(self, item, expected_size): |
| + # TODO(vadimsh): Make 'fetch' use Item abstraction as well. |
| + |
| + def fetch(self, digest, size): |
| """Fetches an object and yields its content. |
| Arguments: |
| - item: hash digest of item to download. |
| - expected_size: expected size of the item, to validate it. |
| + digest: hash digest of item to download. |
| + size: expected size of the item, to validate it. |
| Yields: |
| Chunks of downloaded item (as str objects). |
| """ |
| raise NotImplementedError() |
| - def push(self, item, expected_size, content_generator, push_urls=None): |
| - """Uploads content generated by |content_generator| as |item|. |
| + def push(self, item, content, size): |
| + """Uploads an |item| with content generated by |content| generator. |
| Arguments: |
| - item: hash digest of item to upload. |
| - expected_size: total length of the content yielded by |content_generator|. |
| - content_generator: generator that produces chunks to push. |
| - push_urls: optional URLs returned by 'contains' call for this item. |
| + item: Item object that holds information about an item being pushed. |
| + content: a generator that yields chunks to push. |
| + size: expected size of stream produced by |content|. |
| Returns: |
| None. |
| """ |
| raise NotImplementedError() |
| - def contains(self, files): |
| - """Checks for existence of given |files| on the server. |
| + def contains(self, items): |
| + """Checks for existence of given |items| on the server. |
| + |
| + Can also mutated state of the items by assigning an opaque implementation |
| + specific object to Item's push_state attribute. |
| Arguments: |
| - files: list of pairs (file name, metadata dict). |
| + items: list of Item objects. |
| Returns: |
| - A list of files missing on server as a list of triplets |
| - (file name, metadata dict, push_urls object to pass to push). |
| + A list of items missing on server as a list of Item objects. |
| """ |
| raise NotImplementedError() |
| class IsolateServer(StorageApi): |
| - """StorageApi implementation that downloads and uploads to Isolate Server.""" |
| + """StorageApi implementation that downloads and uploads to Isolate Server. |
| + |
| + It uploads and downloads directly from Google Storage whenever appropriate. |
| + """ |
| + |
| + class PushState(object): |
| + """A state of push operation carried along with Item in push_state.""" |
| + def __init__(self, upload_url, finalize_url): |
| + self.upload_url = upload_url |
| + self.finalize_url = finalize_url |
| + self.uploaded = False |
| + self.finalized = False |
| + |
| def __init__(self, base_url, namespace): |
| super(IsolateServer, self).__init__() |
| assert base_url.startswith('http'), base_url |
| - self.content_url = base_url.rstrip('/') + '/content/' |
| + self.base_url = base_url.rstrip('/') |
| self.namespace = namespace |
| self.algo = get_hash_algo(namespace) |
| - self._token = None |
| + self._use_zip = is_namespace_with_compression(namespace) |
| self._lock = threading.Lock() |
| + self._server_caps = None |
| + |
| + @staticmethod |
| + def generate_handshake_request(): |
| + """Returns a dict to be sent as handshake request body.""" |
| + # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. |
| + return { |
| + 'client_app_version': __version__, |
| + 'fetcher': True, |
| + 'protocol_version': ISOLATE_PROTOCOL_VERSION, |
| + 'pusher': True, |
| + } |
| + |
| + @staticmethod |
| + def validate_handshake_response(caps): |
| + """Validates and normalizes handshake response.""" |
| + logging.info('Protocol version: %s', caps['protocol_version']) |
| + logging.info('Server version: %s', caps['server_app_version']) |
| + if caps.get('error'): |
| + raise MappingError(caps['error']) |
| + if not caps['access_token']: |
| + raise ValueError('access_token is missing') |
| + return caps |
| @property |
| - def token(self): |
| + def server_capabilities(self): |
| + """Performs handshake with the server if not yet done. |
| + |
| + Returns: |
| + Server capabilities dictionary as returned by /handshake endpoint. |
| + |
| + Raises: |
| + MappingError if server rejects the handshake. |
| + """ |
| # TODO(maruel): Make this request much earlier asynchronously while the |
| # files are being enumerated. |
| with self._lock: |
| - if not self._token: |
| - self._token = urllib.quote(url_read(self.content_url + 'get_token')) |
| - return self._token |
| - |
| - def fetch(self, item, expected_size): |
| - assert isinstance(item, basestring) |
| - assert ( |
| - isinstance(expected_size, (int, long)) or |
| - expected_size == UNKNOWN_FILE_SIZE) |
| - zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) |
| - logging.debug('download_file(%s)', zipped_url) |
| + if self._server_caps is None: |
| + request_body = json.dumps( |
| + self.generate_handshake_request(), separators=(',', ':')) |
| + response = net.url_read( |
| + url=self.base_url + '/content-gs/handshake', |
| + data=request_body, |
| + content_type='application/json', |
| + method='POST') |
| + if response is None: |
| + raise MappingError('Failed to perform handshake.') |
| + try: |
| + caps = json.loads(response) |
| + if not isinstance(caps, dict): |
| + raise ValueError('Expecting JSON dict') |
| + self._server_caps = self.validate_handshake_response(caps) |
| + except (ValueError, KeyError, TypeError) as exc: |
| + # KeyError exception has very confusing str conversion: it's just a |
| + # missing key value and nothing else. So print exception class name |
| + # as well. |
| + raise MappingError('Invalid handshake response (%s): %s' % ( |
| + exc.__class__.__name__, exc)) |
| + return self._server_caps |
| + |
| + def fetch(self, digest, size): |
| + assert isinstance(digest, basestring) |
| + assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE) |
| + |
| + source_url = '%s/content-gs/retrieve/%s/%s' % ( |
| + self.base_url, self.namespace, digest) |
| + logging.debug('download_file(%s)', source_url) |
| # Because the app engine DB is only eventually consistent, retry 404 errors |
| # because the file might just not be visible yet (even though it has been |
| # uploaded). |
| connection = net.url_open( |
| - zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) |
| + source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) |
| if not connection: |
| - raise IOError('Unable to open connection to %s' % zipped_url) |
| + raise IOError('Unable to open connection to %s' % source_url) |
| - # TODO(maruel): Must only decompress when needed. |
| - decompressor = zlib.decompressobj() |
| try: |
| - compressed_size = 0 |
| - decompressed_size = 0 |
| - while True: |
| - chunk = connection.read(ZIPPED_FILE_CHUNK) |
| - if not chunk: |
| - break |
| - compressed_size += len(chunk) |
| - decompressed = decompressor.decompress(chunk) |
| - decompressed_size += len(decompressed) |
| - yield decompressed |
| - |
| - # Ensure that all the data was properly decompressed. |
| - uncompressed_data = decompressor.flush() |
| - if uncompressed_data: |
| - raise IOError('Decompression failed') |
| - if (expected_size != UNKNOWN_FILE_SIZE and |
| - decompressed_size != expected_size): |
| - raise IOError('File incorrect size after download of %s. Got %s and ' |
| - 'expected %s' % (item, decompressed_size, expected_size)) |
| - except zlib.error as e: |
| - msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % ( |
| - item, compressed_size, connection.content_length, e) |
| - logging.warning(msg) |
| - |
| - # Testing seems to show that if a few machines are trying to download |
| - # the same blob, they can cause each other to fail. So if we hit a zip |
| - # error, this is the most likely cause (it only downloads some of the |
| - # data). Randomly sleep for between 5 and 25 seconds to try and spread |
| - # out the downloads. |
| - sleep_duration = (random.random() * 20) + 5 |
| - time.sleep(sleep_duration) |
| - raise IOError(msg) |
| - |
| - def push(self, item, expected_size, content_generator, push_urls=None): |
| - assert isinstance(item, basestring) |
| - assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE |
| - item = str(item) |
| + # Prepare reading pipeline. |
| + generator = stream_read(connection, NET_IO_FILE_CHUNK) |
| + if self._use_zip: |
| + generator = zip_decompress(generator, DISK_FILE_CHUNK) |
| + |
| + # Read and yield data, calculate total length of the decompressed stream. |
| + total_size = 0 |
| + for chunk in generator: |
| + total_size += len(chunk) |
| + yield chunk |
| + |
| + # Verify data length matches expectation. |
| + if size != UNKNOWN_FILE_SIZE and total_size != size: |
| + raise IOError('Incorrect file size: expected %d, got %d' % ( |
| + size, total_size)) |
| + |
| + except IOError as err: |
| + logging.warning('Failed to fetch %s: %s', digest, err) |
| + raise |
| + |
| + def push(self, item, content, size): |
| + assert isinstance(item, Item) |
| + assert isinstance(item.push_state, IsolateServer.PushState) |
| + assert not item.push_state.finalized |
| # TODO(maruel): Support large files. This would require streaming support. |
| - # A cheese way to avoid memcpy of (possibly huge) file, until streaming |
| - # upload support is implemented. |
| - if isinstance(content_generator, list) and len(content_generator) == 1: |
| - content = content_generator[0] |
| + # TODO(vadimsh): Do not read from |content_generator| when retrying push. |
| + # If |content_generator| is indeed a generator, it can not be re-winded back |
| + # to the beginning of the stream. A retry will find it exhausted. A possible |
| + # solution is to wrap content_generator with some sort of caching |
| + # restartable generator. It should be done alongside streaming support |
| + # implementation. |
| + |
| + # This push operation may be a retry after failed finalization call below, |
| + # no need to reupload contents in that case. |
| + if not item.push_state.uploaded: |
| + # A cheese way to avoid memcpy of (possibly huge) file, until streaming |
| + # upload support is implemented. |
| + if isinstance(content, list) and len(content) == 1: |
| + content = content[0] |
| + else: |
| + content = ''.join(content) |
| + # PUT file to |upload_url|. |
| + response = net.url_read( |
| + url=item.push_state.upload_url, |
| + data=content, |
| + content_type='application/octet-stream', |
| + method='PUT') |
| + if response is None: |
| + raise IOError('Failed to upload a file %s to %s' % ( |
| + item.digest, item.push_state.upload_url)) |
| + item.push_state.uploaded = True |
| else: |
| - content = ''.join(content_generator) |
| - |
| - if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: |
| - return self._upload_hash_content_to_blobstore(item, content) |
| - |
| - url = '%sstore/%s/%s?token=%s' % ( |
| - self.content_url, self.namespace, item, self.token) |
| - return url_read(url, data=content, content_type='application/octet-stream') |
| - |
| - def contains(self, files): |
| - logging.info('Checking existence of %d files...', len(files)) |
| - |
| - body = ''.join( |
| - (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) |
| - assert (len(body) % self.algo().digest_size) == 0, repr(body) |
| + logging.info( |
| + 'A file %s already uploaded, retrying finalization only', item.digest) |
| + |
| + # Optionally notify the server that it's done. |
| + if item.push_state.finalize_url: |
| + # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to |
| + # isolated server. That way isolate server can verify that the data safely |
| + # reached Google Storage (GS provides MD5 of stored files). |
| + response = net.url_read( |
| + url=item.push_state.finalize_url, |
| + data='', |
| + content_type='application/json', |
| + method='POST') |
| + if response is None: |
| + raise IOError('Failed to finalize an upload of %s' % item.digest) |
| + item.push_state.finalized = True |
| + |
| + def contains(self, items): |
| + logging.info('Checking existence of %d files...', len(items)) |
| + |
| + # Request body is a json encoded list of dicts. |
| + body = [ |
| + { |
| + 'h': item.digest, |
| + 's': item.size, |
| + 'i': int(item.is_isolated), |
| + } for item in items |
| + ] |
| - query_url = '%scontains/%s?token=%s' % ( |
| - self.content_url, self.namespace, self.token) |
| - response = url_read( |
| - query_url, data=body, content_type='application/octet-stream') |
| - if len(files) != len(response): |
| + query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( |
| + self.base_url, |
| + self.namespace, |
| + urllib.quote(self.server_capabilities['access_token'])) |
| + response_body = net.url_read( |
| + url=query_url, |
| + data=json.dumps(body, separators=(',', ':')), |
| + content_type='application/json', |
| + method='POST') |
| + if response_body is None: |
| + raise MappingError('Failed to execute /pre-upload query') |
| + |
| + # Response body is a list of push_urls (or null if file is already present). |
| + try: |
| + response = json.loads(response_body) |
| + if not isinstance(response, list): |
| + raise ValueError('Expecting response with json-encoded list') |
| + if len(response) != len(items): |
| + raise ValueError( |
| + 'Incorrect number of items in the list, expected %d, ' |
| + 'but got %d' % (len(files), len(response))) |
| + except ValueError as err: |
| raise MappingError( |
| - 'Got an incorrect number of responses from the server. Expected %d, ' |
| - 'but got %d' % (len(files), len(response))) |
| - |
| - # This implementation of IsolateServer doesn't use push_urls field, |
| - # set it to None. |
| - missing_files = [ |
| - files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' |
| - ] |
| + 'Invalid response from server: %s, body is %s' % (err, response_body)) |
| + |
| + # Pick Items that are missing, attach PushState to them. |
| + missing_items = [] |
| + for i, push_urls in enumerate(response): |
| + if push_urls: |
| + assert len(push_urls) == 2, str(push_urls) |
| + item = items[i] |
| + item.push_state = IsolateServer.PushState(push_urls[0], push_urls[1]) |
| + missing_items.append(item) |
| logging.info('Queried %d files, %d cache hit', |
| - len(files), len(files) - len(missing_files)) |
| - return missing_files |
| - |
| - def _upload_hash_content_to_blobstore(self, item, content): |
| - """Uploads the content directly to the blobstore via a generated url.""" |
| - # TODO(maruel): Support large files. This would require streaming support. |
| - gen_url = '%sgenerate_blobstore_url/%s/%s' % ( |
| - self.content_url, self.namespace, item) |
| - # Token is guaranteed to be already quoted but it is unnecessary here, and |
| - # only here. |
| - data = [('token', urllib.unquote(self.token))] |
| - content_type, body = encode_multipart_formdata( |
| - data, [('content', item, content)]) |
| - last_url = gen_url |
| - for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS): |
| - # Retry HTTP 50x here but not 404. |
| - upload_url = net.url_read(gen_url, data=data) |
| - if not upload_url: |
| - raise MappingError('Unable to connect to server %s' % gen_url) |
| - last_url = upload_url |
| - |
| - # Do not retry this request on HTTP 50x. Regenerate an upload url each |
| - # time since uploading "consumes" the upload url. |
| - result = net.url_read( |
| - upload_url, data=body, content_type=content_type, retry_50x=False) |
| - if result is not None: |
| - return result |
| - raise MappingError('Unable to connect to server %s' % last_url) |
| + len(items), len(items) - len(missing_items)) |
| + return missing_items |
| class FileSystem(StorageApi): |
| @@ -690,34 +799,32 @@ class FileSystem(StorageApi): |
| The common use case is a NFS/CIFS file server that is mounted locally that is |
| used to fetch the file on a local partition. |
| """ |
| + |
| def __init__(self, base_path): |
| super(FileSystem, self).__init__() |
| self.base_path = base_path |
| - def fetch(self, item, expected_size): |
| - assert isinstance(item, basestring) |
| - assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE |
| - source = os.path.join(self.base_path, item) |
| - if (expected_size != UNKNOWN_FILE_SIZE and |
| - not is_valid_file(source, expected_size)): |
| - raise IOError('Invalid file %s' % item) |
| + def fetch(self, digest, size): |
| + assert isinstance(digest, basestring) |
| + assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE |
| + source = os.path.join(self.base_path, digest) |
| + if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size): |
| + raise IOError('Invalid file %s' % digest) |
| return file_read(source) |
| - def push(self, item, expected_size, content_generator, push_urls=None): |
| - assert isinstance(item, basestring) |
| - assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE |
| - dest = os.path.join(self.base_path, item) |
| - total = file_write(dest, content_generator) |
| - if expected_size != UNKNOWN_FILE_SIZE and total != expected_size: |
| + def push(self, item, content, size): |
| + assert isinstance(item, Item) |
| + assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE |
| + dest = os.path.join(self.base_path, item.digest) |
| + total = file_write(dest, content) |
| + if size != UNKNOWN_FILE_SIZE and total != size: |
| os.remove(dest) |
| - raise IOError( |
| - 'Invalid file %s, %d != %d' % (item, total, expected_size)) |
| + raise IOError('Invalid file %s, %d != %d' % (item.digest, total, size)) |
| - def contains(self, files): |
| + def contains(self, items): |
| return [ |
| - (filename, metadata, None) |
| - for filename, metadata in files |
| - if not os.path.exists(os.path.join(self.base_path, metadata['h'])) |
| + item for item in items |
| + if not os.path.exists(os.path.join(self.base_path, item.digest)) |
| ] |
| @@ -767,7 +874,7 @@ def upload_tree(base_url, indir, infiles, namespace): |
| query if an element was already uploaded, and |base_url|/store/ |
| can be used to upload a new element. |
| indir: Root directory the infiles are based in. |
| - infiles: dict of files to upload files from |indir| to |base_url|. |
| + infiles: dict of files to upload from |indir| to |base_url|. |
| namespace: The namespace to use on the server. |
| """ |
| remote = get_storage_api(base_url, namespace) |