Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2013 The Chromium Authors. All rights reserved. | 2 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 """Archives a set of files to a server.""" | 6 """Archives a set of files to a server.""" |
| 7 | 7 |
| 8 __version__ = '0.2' | 8 __version__ = '0.2' |
| 9 | 9 |
| 10 import binascii | |
| 11 import hashlib | 10 import hashlib |
| 12 import json | 11 import json |
| 13 import logging | 12 import logging |
| 14 import os | 13 import os |
| 15 import random | |
| 16 import re | 14 import re |
| 17 import sys | 15 import sys |
| 18 import threading | 16 import threading |
| 19 import time | 17 import time |
| 20 import urllib | 18 import urllib |
| 21 import zlib | 19 import zlib |
| 22 | 20 |
| 23 from third_party import colorama | 21 from third_party import colorama |
| 24 from third_party.depot_tools import fix_encoding | 22 from third_party.depot_tools import fix_encoding |
| 25 from third_party.depot_tools import subcommand | 23 from third_party.depot_tools import subcommand |
| 26 | 24 |
| 27 from utils import net | 25 from utils import net |
| 28 from utils import threading_utils | 26 from utils import threading_utils |
| 29 from utils import tools | 27 from utils import tools |
| 30 | 28 |
| 31 | 29 |
| 32 # The minimum size of files to upload directly to the blobstore. | 30 # Version of isolate protocol passed to the server in /handshake request. |
| 33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 | 31 ISOLATE_PROTOCOL_VERSION = '1.0' |
| 34 | 32 |
| 35 # The number of files to check the isolate server per /contains query. | 33 |
| 34 # The number of files to check the isolate server per /pre-upload query. | |
| 36 # All files are sorted by likelihood of a change in the file content | 35 # All files are sorted by likelihood of a change in the file content |
| 37 # (currently file size is used to estimate this: larger the file -> larger the | 36 # (currently file size is used to estimate this: larger the file -> larger the |
| 38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files | 37 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files |
| 39 # are taken and sent to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], | 38 # are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1], |
| 40 # and so on. Numbers here are a trade-off; the more per request, the lower the | 39 # and so on. Numbers here are a trade-off; the more per request, the lower the |
| 41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, | 40 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, |
| 42 # larger values cause longer lookups, increasing the initial latency to start | 41 # larger values cause longer lookups, increasing the initial latency to start |
| 43 # uploading, which is especially an issue for large files. This value is | 42 # uploading, which is especially an issue for large files. This value is |
| 44 # optimized for the "few thousands files to look up with minimal number of large | 43 # optimized for the "few thousands files to look up with minimal number of large |
| 45 # files missing" case. | 44 # files missing" case. |
| 46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] | 45 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] |
| 47 | 46 |
| 48 | 47 |
| 49 # A list of already compressed extension types that should not receive any | 48 # A list of already compressed extension types that should not receive any |
| 50 # compression before being uploaded. | 49 # compression before being uploaded. |
| 51 ALREADY_COMPRESSED_TYPES = [ | 50 ALREADY_COMPRESSED_TYPES = [ |
| 52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', | 51 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', |
| 53 'wav', 'zip' | 52 'wav', 'zip' |
| 54 ] | 53 ] |
| 55 | 54 |
| 56 | 55 |
| 57 # The file size to be used when we don't know the correct file size, | 56 # The file size to be used when we don't know the correct file size, |
| 58 # generally used for .isolated files. | 57 # generally used for .isolated files. |
| 59 UNKNOWN_FILE_SIZE = None | 58 UNKNOWN_FILE_SIZE = None |
| 60 | 59 |
| 61 | 60 |
| 62 # The size of each chunk to read when downloading and unzipping files. | 61 # The size of each chunk to read when downloading and unzipping files. |
| 63 ZIPPED_FILE_CHUNK = 16 * 1024 | 62 ZIPPED_FILE_CHUNK = 16 * 1024 |
| 64 | 63 |
| 65 | |
| 66 # Chunk size to use when doing disk I/O. | 64 # Chunk size to use when doing disk I/O. |
| 67 DISK_FILE_CHUNK = 1024 * 1024 | 65 DISK_FILE_CHUNK = 1024 * 1024 |
| 68 | 66 |
| 67 # Chunk size to use when reading from network stream. | |
| 68 NET_IO_FILE_CHUNK = 16 * 1024 | |
| 69 | |
| 69 | 70 |
| 70 # Read timeout in seconds for downloads from isolate storage. If there's no | 71 # Read timeout in seconds for downloads from isolate storage. If there's no |
| 71 # response from the server within this timeout whole download will be aborted. | 72 # response from the server within this timeout whole download will be aborted. |
| 72 DOWNLOAD_READ_TIMEOUT = 60 | 73 DOWNLOAD_READ_TIMEOUT = 60 |
| 73 | 74 |
| 74 # Maximum expected delay (in seconds) between successive file fetches | 75 # Maximum expected delay (in seconds) between successive file fetches |
| 75 # in run_tha_test. If it takes longer than that, a deadlock might be happening | 76 # in run_tha_test. If it takes longer than that, a deadlock might be happening |
| 76 # and all stack frames for all threads are dumped to log. | 77 # and all stack frames for all threads are dumped to log. |
| 77 DEADLOCK_TIMEOUT = 5 * 60 | 78 DEADLOCK_TIMEOUT = 5 * 60 |
| 78 | 79 |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 99 class ConfigError(ValueError): | 100 class ConfigError(ValueError): |
| 100 """Generic failure to load a .isolated file.""" | 101 """Generic failure to load a .isolated file.""" |
| 101 pass | 102 pass |
| 102 | 103 |
| 103 | 104 |
| 104 class MappingError(OSError): | 105 class MappingError(OSError): |
| 105 """Failed to recreate the tree.""" | 106 """Failed to recreate the tree.""" |
| 106 pass | 107 pass |
| 107 | 108 |
| 108 | 109 |
| 109 def randomness(): | |
| 110 """Generates low-entropy randomness for MIME encoding. | |
| 111 | |
| 112 Exists so it can be mocked out in unit tests. | |
| 113 """ | |
| 114 return str(time.time()) | |
| 115 | |
| 116 | |
| 117 def encode_multipart_formdata(fields, files, | |
| 118 mime_mapper=lambda _: 'application/octet-stream'): | |
| 119 """Encodes a Multipart form data object. | |
| 120 | |
| 121 Args: | |
| 122 fields: a sequence (name, value) elements for | |
| 123 regular form fields. | |
| 124 files: a sequence of (name, filename, value) elements for data to be | |
| 125 uploaded as files. | |
| 126 mime_mapper: function to return the mime type from the filename. | |
| 127 Returns: | |
| 128 content_type: for httplib.HTTP instance | |
| 129 body: for httplib.HTTP instance | |
| 130 """ | |
| 131 boundary = hashlib.md5(randomness()).hexdigest() | |
| 132 body_list = [] | |
| 133 for (key, value) in fields: | |
| 134 if isinstance(key, unicode): | |
| 135 value = key.encode('utf-8') | |
| 136 if isinstance(value, unicode): | |
| 137 value = value.encode('utf-8') | |
| 138 body_list.append('--' + boundary) | |
| 139 body_list.append('Content-Disposition: form-data; name="%s"' % key) | |
| 140 body_list.append('') | |
| 141 body_list.append(value) | |
| 142 body_list.append('--' + boundary) | |
| 143 body_list.append('') | |
| 144 for (key, filename, value) in files: | |
| 145 if isinstance(key, unicode): | |
| 146 value = key.encode('utf-8') | |
| 147 if isinstance(filename, unicode): | |
| 148 value = filename.encode('utf-8') | |
| 149 if isinstance(value, unicode): | |
| 150 value = value.encode('utf-8') | |
| 151 body_list.append('--' + boundary) | |
| 152 body_list.append('Content-Disposition: form-data; name="%s"; ' | |
| 153 'filename="%s"' % (key, filename)) | |
| 154 body_list.append('Content-Type: %s' % mime_mapper(filename)) | |
| 155 body_list.append('') | |
| 156 body_list.append(value) | |
| 157 body_list.append('--' + boundary) | |
| 158 body_list.append('') | |
| 159 if body_list: | |
| 160 body_list[-2] += '--' | |
| 161 body = '\r\n'.join(body_list) | |
| 162 content_type = 'multipart/form-data; boundary=%s' % boundary | |
| 163 return content_type, body | |
| 164 | |
| 165 | |
| 166 def is_valid_hash(value, algo): | 110 def is_valid_hash(value, algo): |
| 167 """Returns if the value is a valid hash for the corresponding algorithm.""" | 111 """Returns if the value is a valid hash for the corresponding algorithm.""" |
| 168 size = 2 * algo().digest_size | 112 size = 2 * algo().digest_size |
| 169 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) | 113 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) |
| 170 | 114 |
| 171 | 115 |
| 172 def hash_file(filepath, algo): | 116 def hash_file(filepath, algo): |
| 173 """Calculates the hash of a file without reading it all in memory at once. | 117 """Calculates the hash of a file without reading it all in memory at once. |
| 174 | 118 |
| 175 |algo| should be one of the hashlib hashing algorithms. | 119 |algo| should be one of the hashlib hashing algorithms. |
| 176 """ | 120 """ |
| 177 digest = algo() | 121 digest = algo() |
| 178 with open(filepath, 'rb') as f: | 122 with open(filepath, 'rb') as f: |
| 179 while True: | 123 while True: |
| 180 chunk = f.read(DISK_FILE_CHUNK) | 124 chunk = f.read(DISK_FILE_CHUNK) |
| 181 if not chunk: | 125 if not chunk: |
| 182 break | 126 break |
| 183 digest.update(chunk) | 127 digest.update(chunk) |
| 184 return digest.hexdigest() | 128 return digest.hexdigest() |
| 185 | 129 |
| 186 | 130 |
| 131 def stream_read(stream, chunk_size): | |
| 132 """Reads chunks from |stream| and yields them.""" | |
| 133 while True: | |
| 134 data = stream.read(chunk_size) | |
| 135 if not data: | |
| 136 break | |
| 137 yield data | |
| 138 | |
| 139 | |
| 187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): | 140 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): |
| 188 """Yields file content in chunks of given |chunk_size|.""" | 141 """Yields file content in chunks of given |chunk_size|.""" |
| 189 with open(filepath, 'rb') as f: | 142 with open(filepath, 'rb') as f: |
| 190 while True: | 143 while True: |
| 191 data = f.read(chunk_size) | 144 data = f.read(chunk_size) |
| 192 if not data: | 145 if not data: |
| 193 break | 146 break |
| 194 yield data | 147 yield data |
| 195 | 148 |
| 196 | 149 |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 219 compressor = zlib.compressobj(level) | 172 compressor = zlib.compressobj(level) |
| 220 for chunk in content_generator: | 173 for chunk in content_generator: |
| 221 compressed = compressor.compress(chunk) | 174 compressed = compressor.compress(chunk) |
| 222 if compressed: | 175 if compressed: |
| 223 yield compressed | 176 yield compressed |
| 224 tail = compressor.flush(zlib.Z_FINISH) | 177 tail = compressor.flush(zlib.Z_FINISH) |
| 225 if tail: | 178 if tail: |
| 226 yield tail | 179 yield tail |
| 227 | 180 |
| 228 | 181 |
| 182 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK): | |
| 183 """Reads zipped data from |content_generator| and yields decompressed data. | |
| 184 | |
| 185 Decompresses data in small chunks (no larger than |chunk_size|) so that | |
| 186 a zip bomb file doesn't cause zlib to preallocate a huge amount of memory. | |
| 187 | |
| 188 Raises IOError if data is corrupted or incomplete. | |
| 189 """ | |
| 190 decompressor = zlib.decompressobj() | |
| 191 compressed_size = 0 | |
| 192 try: | |
| 193 for chunk in content_generator: | |
| 194 compressed_size += len(chunk) | |
| 195 data = decompressor.decompress(chunk, chunk_size) | |
| 196 if data: | |
| 197 yield data | |
| 198 while decompressor.unconsumed_tail: | |
| 199 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size) | |
| 200 if data: | |
| 201 yield data | |
| 202 tail = decompressor.flush() | |
| 203 if tail: | |
| 204 yield tail | |
| 205 except zlib.error as e: | |
| 206 raise IOError( | |
| 207 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) | |
| 208 # Ensure all data was read and decompressed. | |
| 209 if decompressor.unused_data or decompressor.unconsumed_tail: | |
| 210 raise IOError('Not all data was decompressed') | |
| 211 | |
| 212 | |
| 229 def get_zip_compression_level(filename): | 213 def get_zip_compression_level(filename): |
| 230 """Given a filename calculates the ideal zip compression level to use.""" | 214 """Given a filename calculates the ideal zip compression level to use.""" |
| 231 file_ext = os.path.splitext(filename)[1].lower() | 215 file_ext = os.path.splitext(filename)[1].lower() |
| 232 # TODO(csharp): Profile to find what compression level works best. | 216 # TODO(csharp): Profile to find what compression level works best. |
| 233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 | 217 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 |
| 234 | 218 |
| 235 | 219 |
| 236 def create_directories(base_directory, files): | 220 def create_directories(base_directory, files): |
| 237 """Creates the directory structure needed by the given list of files.""" | 221 """Creates the directory structure needed by the given list of files.""" |
| 238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) | 222 logging.debug('create_directories(%s, %d)', base_directory, len(files)) |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 294 | 278 |
| 295 | 279 |
| 296 def try_remove(filepath): | 280 def try_remove(filepath): |
| 297 """Removes a file without crashing even if it doesn't exist.""" | 281 """Removes a file without crashing even if it doesn't exist.""" |
| 298 try: | 282 try: |
| 299 os.remove(filepath) | 283 os.remove(filepath) |
| 300 except OSError: | 284 except OSError: |
| 301 pass | 285 pass |
| 302 | 286 |
| 303 | 287 |
| 304 def url_read(url, **kwargs): | 288 class Item(object): |
| 305 result = net.url_read(url, **kwargs) | 289 """An item to push to Storage. |
| 306 if result is None: | 290 |
| 307 # If we get no response from the server, assume it is down and raise an | 291 It starts its life in a main thread, travels to 'contains' thread, then to |
| 308 # exception. | 292 'push' thread and then finally back to the main thread. |
| 309 raise MappingError('Unable to connect to server %s' % url) | 293 |
| 310 return result | 294 It is never used concurrently from multiple threads. |
| 295 """ | |
| 296 | |
| 297 def __init__(self, digest, size, is_isolated): | |
| 298 self.digest = digest | |
| 299 self.size = size | |
| 300 self.is_isolated = is_isolated | |
| 301 self.compression_level = 6 | |
| 302 self.push_state = None | |
| 303 | |
| 304 def content(self, chunk_size): | |
| 305 """Generator that produces content of this item in chunks of given size.""" | |
| 306 raise NotImplementedError() | |
| 307 | |
| 308 | |
| 309 class FileItem(Item): | |
| 310 """A file to push to Storage.""" | |
| 311 | |
| 312 def __init__(self, path, digest, size, is_isolated): | |
| 313 super(FileItem, self).__init__(digest, size, is_isolated) | |
| 314 self.path = path | |
| 315 self.compression_level = get_zip_compression_level(path) | |
| 316 | |
| 317 def content(self, chunk_size): | |
| 318 return file_read(self.path, chunk_size) | |
| 311 | 319 |
| 312 | 320 |
| 313 class Storage(object): | 321 class Storage(object): |
| 314 """Efficiently downloads or uploads large set of files via StorageApi.""" | 322 """Efficiently downloads or uploads large set of files via StorageApi.""" |
| 315 | 323 |
| 316 def __init__(self, storage_api, use_zip): | 324 def __init__(self, storage_api, use_zip): |
| 317 self.use_zip = use_zip | 325 self.use_zip = use_zip |
| 318 self._storage_api = storage_api | 326 self._storage_api = storage_api |
| 319 self._cpu_thread_pool = None | 327 self._cpu_thread_pool = None |
| 320 self._net_thread_pool = None | 328 self._net_thread_pool = None |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 356 | 364 |
| 357 def upload_tree(self, indir, infiles): | 365 def upload_tree(self, indir, infiles): |
| 358 """Uploads the given tree to the isolate server. | 366 """Uploads the given tree to the isolate server. |
| 359 | 367 |
| 360 Arguments: | 368 Arguments: |
| 361 indir: root directory the infiles are based in. | 369 indir: root directory the infiles are based in. |
| 362 infiles: dict of files to upload from |indir|. | 370 infiles: dict of files to upload from |indir|. |
| 363 """ | 371 """ |
| 364 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) | 372 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) |
| 365 | 373 |
| 374 # TODO(vadimsh): Introduce Item as a part of the public interface? | |
|
M-A Ruel
2013/09/30 22:36:33
I think it'd be better if the caller created the i
Vadim Sh.
2013/09/30 23:07:25
Yes. I agree. I left it for future CLs.
| |
| 375 | |
| 376 # Convert |indir| + |infiles| into a list of FileItem objects. | |
| 377 # Filter out symlinks, since they are not represented by items on isolate | |
| 378 # server side. | |
| 379 items = [ | |
| 380 FileItem( | |
| 381 path=os.path.join(indir, filepath), | |
| 382 digest=metadata['h'], | |
| 383 size=metadata['s'], | |
| 384 is_isolated=metadata.get('priority') == '0') | |
| 385 for filepath, metadata in infiles.iteritems() | |
| 386 if 'l' not in metadata | |
| 387 ] | |
| 388 | |
| 366 # Enqueue all upload tasks. | 389 # Enqueue all upload tasks. |
| 390 missing = set() | |
| 367 channel = threading_utils.TaskChannel() | 391 channel = threading_utils.TaskChannel() |
| 368 missing = [] | 392 for missing_item in self.get_missing_items(items): |
| 369 for filename, metadata, push_urls in self.get_missing_files(infiles): | 393 missing.add(missing_item) |
| 370 missing.append((filename, metadata)) | 394 self.async_push( |
| 371 path = os.path.join(indir, filename) | 395 channel, |
| 372 if metadata.get('priority', '1') == '0': | 396 WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED, |
| 373 priority = WorkerPool.HIGH | 397 missing_item) |
| 374 else: | |
| 375 priority = WorkerPool.MED | |
| 376 compression_level = get_zip_compression_level(path) | |
| 377 chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK | |
| 378 content = file_read(path, chunk_size) | |
| 379 self.async_push(channel, priority, metadata['h'], metadata['s'], | |
| 380 content, compression_level, push_urls) | |
| 381 | 398 |
| 382 # No need to spawn deadlock detector thread if there's nothing to upload. | 399 # No need to spawn deadlock detector thread if there's nothing to upload. |
| 383 if missing: | 400 if missing: |
| 384 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: | 401 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: |
| 385 # Wait for all started uploads to finish. | 402 # Wait for all started uploads to finish. |
| 386 uploaded = 0 | 403 uploaded = 0 |
| 387 while uploaded != len(missing): | 404 while uploaded != len(missing): |
| 388 detector.ping() | 405 detector.ping() |
| 389 item = channel.pull() | 406 item = channel.pull() |
| 390 uploaded += 1 | 407 uploaded += 1 |
| 391 logging.debug('Uploaded %d / %d: %s', uploaded, len(missing), item) | 408 logging.debug( |
| 409 'Uploaded %d / %d: %s', uploaded, len(missing), item.digest) | |
| 392 logging.info('All files are uploaded') | 410 logging.info('All files are uploaded') |
| 393 | 411 |
| 394 # Print stats. | 412 # Print stats. |
| 395 total = len(infiles) | 413 total = len(items) |
| 396 total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues()) | 414 total_size = sum(f.size for f in items) |
| 397 logging.info( | 415 logging.info( |
| 398 'Total: %6d, %9.1fkb', | 416 'Total: %6d, %9.1fkb', |
| 399 total, | 417 total, |
| 400 sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.) | 418 total_size / 1024.) |
| 401 cache_hit = set(infiles.iterkeys()) - set(x[0] for x in missing) | 419 cache_hit = set(items) - missing |
| 402 cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit) | 420 cache_hit_size = sum(f.size for f in cache_hit) |
| 403 logging.info( | 421 logging.info( |
| 404 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', | 422 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', |
| 405 len(cache_hit), | 423 len(cache_hit), |
| 406 cache_hit_size / 1024., | 424 cache_hit_size / 1024., |
| 407 len(cache_hit) * 100. / total, | 425 len(cache_hit) * 100. / total, |
| 408 cache_hit_size * 100. / total_size if total_size else 0) | 426 cache_hit_size * 100. / total_size if total_size else 0) |
| 409 cache_miss = missing | 427 cache_miss = missing |
| 410 cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss) | 428 cache_miss_size = sum(f.size for f in cache_miss) |
| 411 logging.info( | 429 logging.info( |
| 412 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', | 430 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', |
| 413 len(cache_miss), | 431 len(cache_miss), |
| 414 cache_miss_size / 1024., | 432 cache_miss_size / 1024., |
| 415 len(cache_miss) * 100. / total, | 433 len(cache_miss) * 100. / total, |
| 416 cache_miss_size * 100. / total_size if total_size else 0) | 434 cache_miss_size * 100. / total_size if total_size else 0) |
| 417 | 435 |
| 418 def async_push(self, channel, priority, item, expected_size, | 436 def async_push(self, channel, priority, item): |
| 419 content_generator, compression_level, push_urls=None): | 437 """Starts asynchronous push to the server in a parallel thread. |
| 420 """Starts asynchronous push to the server in a parallel thread.""" | 438 |
| 439 Arguments: | |
| 440 channel: TaskChannel object that receives back |item| when upload ends. | |
| 441 priority: thread pool task priority for the push. | |
| 442 item: item to upload as instance of Item class. | |
| 443 """ | |
| 421 def push(content, size): | 444 def push(content, size): |
| 422 """Pushes an item and returns its id, to pass as a result to |channel|.""" | 445 """Pushes an item and returns its id, to pass as a result to |channel|.""" |
| 423 self._storage_api.push(item, size, content, push_urls) | 446 self._storage_api.push(item, content, size) |
| 424 return item | 447 return item |
| 425 | 448 |
| 426 # If zipping is not required, just start a push task. | 449 # If zipping is not required, just start a push task. |
| 427 if not self.use_zip: | 450 if not self.use_zip: |
| 428 self.net_thread_pool.add_task_with_channel(channel, priority, push, | 451 self.net_thread_pool.add_task_with_channel(channel, item.priority, push, |
| 429 content_generator, expected_size) | 452 item.content(DISK_FILE_CHUNK), item.size) |
| 430 return | 453 return |
| 431 | 454 |
| 432 # If zipping is enabled, zip in a separate thread. | 455 # If zipping is enabled, zip in a separate thread. |
| 433 def zip_and_push(): | 456 def zip_and_push(): |
| 434 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble | 457 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble |
| 435 # content right here. It will block until all file is zipped. | 458 # content right here. It will block until all file is zipped. |
| 436 try: | 459 try: |
| 437 stream = zip_compress(content_generator, compression_level) | 460 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK), |
| 461 item.compression_level) | |
| 438 data = ''.join(stream) | 462 data = ''.join(stream) |
| 439 except Exception as exc: | 463 except Exception as exc: |
| 440 logging.error('Failed to zip \'%s\': %s', item, exc) | 464 logging.error('Failed to zip \'%s\': %s', item, exc) |
| 441 channel.send_exception(exc) | 465 channel.send_exception(exc) |
| 442 return | 466 return |
| 443 self.net_thread_pool.add_task_with_channel(channel, priority, push, | 467 self.net_thread_pool.add_task_with_channel(channel, priority, push, |
| 444 [data], UNKNOWN_FILE_SIZE) | 468 [data], UNKNOWN_FILE_SIZE) |
| 445 self.cpu_thread_pool.add_task(0, zip_and_push) | 469 self.cpu_thread_pool.add_task(priority, zip_and_push) |
| 446 | 470 |
| 447 def get_missing_files(self, files): | 471 def get_missing_items(self, items): |
| 448 """Yields files that are missing from the server. | 472 """Yields items that are missing from the server. |
| 449 | 473 |
| 450 Issues multiple parallel queries via StorageApi's 'contains' method. | 474 Issues multiple parallel queries via StorageApi's 'contains' method. |
| 451 | 475 |
| 452 Arguments: | 476 Arguments: |
| 453 files: a dictionary file name -> metadata dict. | 477 items: a list of Item objects to check. |
| 454 | 478 |
| 455 Yields: | 479 Yields: |
| 456 Triplets (file name, metadata dict, push_urls object to pass to push). | 480 Item objects that are missing from the server. |
| 457 """ | 481 """ |
| 458 channel = threading_utils.TaskChannel() | 482 channel = threading_utils.TaskChannel() |
| 459 pending = 0 | 483 pending = 0 |
| 460 # Enqueue all requests. | 484 # Enqueue all requests. |
| 461 for batch in self.batch_files_for_check(files): | 485 for batch in self.batch_items_for_check(items): |
| 462 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, | 486 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, |
| 463 self._storage_api.contains, batch) | 487 self._storage_api.contains, batch) |
| 464 pending += 1 | 488 pending += 1 |
| 465 # Yield results as they come in. | 489 # Yield results as they come in. |
| 466 for _ in xrange(pending): | 490 for _ in xrange(pending): |
| 467 for missing in channel.pull(): | 491 for missing in channel.pull(): |
| 468 yield missing | 492 yield missing |
| 469 | 493 |
| 470 @staticmethod | 494 @staticmethod |
| 471 def batch_files_for_check(files): | 495 def batch_items_for_check(items): |
| 472 """Splits list of files to check for existence on the server into batches. | 496 """Splits list of items to check for existence on the server into batches. |
| 473 | 497 |
| 474 Each batch corresponds to a single 'exists?' query to the server via a call | 498 Each batch corresponds to a single 'exists?' query to the server via a call |
| 475 to StorageApi's 'contains' method. | 499 to StorageApi's 'contains' method. |
| 476 | 500 |
| 477 Arguments: | 501 Arguments: |
| 478 files: a dictionary file name -> metadata dict. | 502 items: a list of Item objects. |
| 479 | 503 |
| 480 Yields: | 504 Yields: |
| 481 Batches of files to query for existence in a single operation, | 505 Batches of items to query for existence in a single operation, |
| 482 each batch is a list of pairs: (file name, metadata dict). | 506 each batch is a list of Item objects. |
| 483 """ | 507 """ |
| 484 batch_count = 0 | 508 batch_count = 0 |
| 485 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] | 509 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] |
| 486 next_queries = [] | 510 next_queries = [] |
| 487 items = ((k, v) for k, v in files.iteritems() if 's' in v) | 511 for item in sorted(items, key=lambda x: x.size, reverse=True): |
| 488 for filename, metadata in sorted(items, key=lambda x: -x[1]['s']): | 512 next_queries.append(item) |
| 489 next_queries.append((filename, metadata)) | |
| 490 if len(next_queries) == batch_size_limit: | 513 if len(next_queries) == batch_size_limit: |
| 491 yield next_queries | 514 yield next_queries |
| 492 next_queries = [] | 515 next_queries = [] |
| 493 batch_count += 1 | 516 batch_count += 1 |
| 494 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[ | 517 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[ |
| 495 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)] | 518 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)] |
| 496 if next_queries: | 519 if next_queries: |
| 497 yield next_queries | 520 yield next_queries |
| 498 | 521 |
| 499 | 522 |
| 500 class StorageApi(object): | 523 class StorageApi(object): |
| 501 """Interface for classes that implement low-level storage operations.""" | 524 """Interface for classes that implement low-level storage operations.""" |
| 502 | 525 |
| 503 def fetch(self, item, expected_size): | 526 # TODO(vadimsh): Make 'fetch' use Item abstraction as well. |
|
M-A Ruel
2013/09/30 22:36:33
I don't think it's necessary. Personally I'd remov
Vadim Sh.
2013/09/30 23:07:25
Done.
| |
| 527 | |
| 528 def fetch(self, digest, size): | |
| 504 """Fetches an object and yields its content. | 529 """Fetches an object and yields its content. |
| 505 | 530 |
| 506 Arguments: | 531 Arguments: |
| 507 item: hash digest of item to download. | 532 digest: hash digest of item to download. |
| 508 expected_size: expected size of the item, to validate it. | 533 size: expected size of the item, to validate it. |
| 509 | 534 |
| 510 Yields: | 535 Yields: |
| 511 Chunks of downloaded item (as str objects). | 536 Chunks of downloaded item (as str objects). |
| 512 """ | 537 """ |
| 513 raise NotImplementedError() | 538 raise NotImplementedError() |
| 514 | 539 |
| 515 def push(self, item, expected_size, content_generator, push_urls=None): | 540 def push(self, item, content, size): |
| 516 """Uploads content generated by |content_generator| as |item|. | 541 """Uploads an |item| with content generated by |content| generator. |
| 517 | 542 |
| 518 Arguments: | 543 Arguments: |
| 519 item: hash digest of item to upload. | 544 item: Item object that holds information about an item being pushed. |
| 520 expected_size: total length of the content yielded by |content_generator|. | 545 content: a generator that yields chunks to push. |
| 521 content_generator: generator that produces chunks to push. | 546 size: expected size of stream produced by |content|. |
| 522 push_urls: optional URLs returned by 'contains' call for this item. | |
| 523 | 547 |
| 524 Returns: | 548 Returns: |
| 525 None. | 549 None. |
| 526 """ | 550 """ |
| 527 raise NotImplementedError() | 551 raise NotImplementedError() |
| 528 | 552 |
| 529 def contains(self, files): | 553 def contains(self, items): |
| 530 """Checks for existence of given |files| on the server. | 554 """Checks for existence of given |items| on the server. |
| 555 | |
| 556 Can also mutate the state of the items by assigning an opaque implementation | |
| 557 specific object to Item's push_state attribute. | |
| 531 | 558 |
| 532 Arguments: | 559 Arguments: |
| 533 files: list of pairs (file name, metadata dict). | 560 items: list of Item objects. |
| 534 | 561 |
| 535 Returns: | 562 Returns: |
| 536 A list of files missing on server as a list of triplets | 563 A list of items missing on server as a list of Item objects. |
| 537 (file name, metadata dict, push_urls object to pass to push). | |
| 538 """ | 564 """ |
| 539 raise NotImplementedError() | 565 raise NotImplementedError() |
| 540 | 566 |
| 541 | 567 |
| 542 class IsolateServer(StorageApi): | 568 class IsolateServer(StorageApi): |
| 543 """StorageApi implementation that downloads and uploads to Isolate Server.""" | 569 """StorageApi implementation that downloads and uploads to Isolate Server. |
| 570 | |
| 571 It uploads and downloads directly from Google Storage whenever appropriate. | |
| 572 """ | |
| 573 | |
| 574 class PushState(object): | |
| 575 """A state of push operation carried along with Item in push_state.""" | |
| 576 def __init__(self, upload_url, finalize_url): | |
| 577 self.upload_url = upload_url | |
| 578 self.finalize_url = finalize_url | |
| 579 self.uploaded = False | |
| 580 self.finalized = False | |
| 581 | |
| 544 def __init__(self, base_url, namespace): | 582 def __init__(self, base_url, namespace): |
| 545 super(IsolateServer, self).__init__() | 583 super(IsolateServer, self).__init__() |
| 546 assert base_url.startswith('http'), base_url | 584 assert base_url.startswith('http'), base_url |
| 547 self.content_url = base_url.rstrip('/') + '/content/' | 585 self.base_url = base_url.rstrip('/') |
| 548 self.namespace = namespace | 586 self.namespace = namespace |
| 549 self.algo = get_hash_algo(namespace) | 587 self.algo = get_hash_algo(namespace) |
| 550 self._token = None | 588 self._use_zip = is_namespace_with_compression(namespace) |
| 551 self._lock = threading.Lock() | 589 self._lock = threading.Lock() |
| 590 self._server_caps = None | |
| 591 | |
| 592 @staticmethod | |
| 593 def generate_handshake_request(): | |
| 594 """Returns a dict to be sent as handshake request body.""" | |
| 595 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. | |
| 596 return { | |
| 597 'client_app_version': __version__, | |
| 598 'fetcher': True, | |
| 599 'protocol_version': ISOLATE_PROTOCOL_VERSION, | |
| 600 'pusher': True, | |
| 601 } | |
| 602 | |
| 603 @staticmethod | |
| 604 def validate_handshake_response(caps): | |
| 605 """Validates and normalizes handshake response.""" | |
| 606 logging.info('Protocol version: %s', caps['protocol_version']) | |
| 607 logging.info('Server version: %s', caps['server_app_version']) | |
| 608 if caps.get('error'): | |
| 609 raise MappingError(caps['error']) | |
| 610 if not caps['access_token']: | |
| 611 raise ValueError('access_token is missing') | |
| 612 return caps | |
| 552 | 613 |
| 553 @property | 614 @property |
| 554 def token(self): | 615 def server_capabilities(self): |
| 616 """Performs handshake with the server if not yet done. | |
| 617 | |
| 618 Returns: | |
| 619 Server capabilities dictionary as returned by /handshake endpoint. | |
| 620 | |
| 621 Raises: | |
| 622 MappingError if server rejects the handshake. | |
| 623 """ | |
| 555 # TODO(maruel): Make this request much earlier asynchronously while the | 624 # TODO(maruel): Make this request much earlier asynchronously while the |
| 556 # files are being enumerated. | 625 # files are being enumerated. |
| 557 with self._lock: | 626 with self._lock: |
| 558 if not self._token: | 627 if self._server_caps is None: |
| 559 self._token = urllib.quote(url_read(self.content_url + 'get_token')) | 628 request_body = json.dumps( |
| 560 return self._token | 629 self.generate_handshake_request(), separators=(',', ':')) |
| 561 | 630 response = net.url_read( |
| 562 def fetch(self, item, expected_size): | 631 url=self.base_url + '/content-gs/handshake', |
| 563 assert isinstance(item, basestring) | 632 data=request_body, |
| 564 assert ( | 633 content_type='application/json', |
| 565 isinstance(expected_size, (int, long)) or | 634 method='POST') |
|
M-A Ruel
2013/09/30 22:36:33
optional nit: method is not strictly necessary
Vadim Sh.
2013/09/30 23:07:25
I keep it here for uniformity with rest of calls i
| |
| 566 expected_size == UNKNOWN_FILE_SIZE) | 635 if response is None: |
| 567 zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) | 636 raise MappingError('Failed to perform handshake.') |
| 568 logging.debug('download_file(%s)', zipped_url) | 637 try: |
| 638 caps = json.loads(response) | |
| 639 if not isinstance(caps, dict): | |
| 640 raise ValueError('Expecting JSON dict') | |
| 641 self._server_caps = self.validate_handshake_response(caps) | |
| 642 except (ValueError, KeyError, TypeError) as exc: | |
| 643 # KeyError exception has very confusing str conversion: it's just a | |
| 644 # missing key value and nothing else. So print exception class name | |
| 645 # as well. | |
| 646 raise MappingError('Invalid handshake response (%s): %s' % ( | |
| 647 exc.__class__.__name__, exc)) | |
| 648 return self._server_caps | |
| 649 | |
| 650 def fetch(self, digest, size): | |
| 651 assert isinstance(digest, basestring) | |
| 652 assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE) | |
| 653 | |
| 654 source_url = '%s/content-gs/retrieve/%s/%s' % ( | |
| 655 self.base_url, self.namespace, digest) | |
| 656 logging.debug('download_file(%s)', source_url) | |
| 569 | 657 |
| 570 # Because the app engine DB is only eventually consistent, retry 404 errors | 658 # Because the app engine DB is only eventually consistent, retry 404 errors |
| 571 # because the file might just not be visible yet (even though it has been | 659 # because the file might just not be visible yet (even though it has been |
| 572 # uploaded). | 660 # uploaded). |
| 573 connection = net.url_open( | 661 connection = net.url_open( |
| 574 zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) | 662 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) |
| 575 if not connection: | 663 if not connection: |
| 576 raise IOError('Unable to open connection to %s' % zipped_url) | 664 raise IOError('Unable to open connection to %s' % source_url) |
| 577 | 665 |
| 578 # TODO(maruel): Must only decompress when needed. | |
| 579 decompressor = zlib.decompressobj() | |
| 580 try: | 666 try: |
| 581 compressed_size = 0 | 667 # Prepare reading pipeline. |
| 582 decompressed_size = 0 | 668 generator = stream_read(connection, NET_IO_FILE_CHUNK) |
| 583 while True: | 669 if self._use_zip: |
| 584 chunk = connection.read(ZIPPED_FILE_CHUNK) | 670 generator = zip_decompress(generator, DISK_FILE_CHUNK) |
| 585 if not chunk: | 671 |
| 586 break | 672 # Read and yield data, calculate total length of the decompressed stream. |
| 587 compressed_size += len(chunk) | 673 total_size = 0 |
| 588 decompressed = decompressor.decompress(chunk) | 674 for chunk in generator: |
| 589 decompressed_size += len(decompressed) | 675 total_size += len(chunk) |
| 590 yield decompressed | 676 yield chunk |
| 591 | 677 |
| 592 # Ensure that all the data was properly decompressed. | 678 # Verify data length matches expectation. |
| 593 uncompressed_data = decompressor.flush() | 679 if size != UNKNOWN_FILE_SIZE and total_size != size: |
| 594 if uncompressed_data: | 680 raise IOError('Incorrect file size: expected %d, got %d' % ( |
| 595 raise IOError('Decompression failed') | 681 size, total_size)) |
| 596 if (expected_size != UNKNOWN_FILE_SIZE and | 682 |
| 597 decompressed_size != expected_size): | 683 except IOError as err: |
| 598 raise IOError('File incorrect size after download of %s. Got %s and ' | 684 logging.warning('Failed to fetch %s: %s', digest, err) |
| 599 'expected %s' % (item, decompressed_size, expected_size)) | 685 raise |
| 600 except zlib.error as e: | 686 |
| 601 msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % ( | 687 def push(self, item, content, size): |
| 602 item, compressed_size, connection.content_length, e) | 688 assert isinstance(item, Item) |
| 603 logging.warning(msg) | 689 assert isinstance(item.push_state, IsolateServer.PushState) |
| 604 | 690 assert not item.push_state.finalized |
| 605 # Testing seems to show that if a few machines are trying to download | |
| 606 # the same blob, they can cause each other to fail. So if we hit a zip | |
| 607 # error, this is the most likely cause (it only downloads some of the | |
| 608 # data). Randomly sleep for between 5 and 25 seconds to try and spread | |
| 609 # out the downloads. | |
| 610 sleep_duration = (random.random() * 20) + 5 | |
| 611 time.sleep(sleep_duration) | |
| 612 raise IOError(msg) | |
| 613 | |
| 614 def push(self, item, expected_size, content_generator, push_urls=None): | |
| 615 assert isinstance(item, basestring) | |
| 616 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE | |
| 617 item = str(item) | |
| 618 | 691 |
| 619 # TODO(maruel): Support large files. This would require streaming support. | 692 # TODO(maruel): Support large files. This would require streaming support. |
|
M-A Ruel
2013/09/30 22:36:33
I think you can remove the todo.
Vadim Sh.
2013/09/30 23:07:25
Done.
| |
| 620 | 693 |
| 621 # A cheese way to avoid memcpy of (possibly huge) file, until streaming | 694 # TODO(vadimsh): Do not read from |content_generator| when retrying push. |
| 622 # upload support is implemented. | 695 # If |content_generator| is indeed a generator, it cannot be rewound back |
| 623 if isinstance(content_generator, list) and len(content_generator) == 1: | 696 # to the beginning of the stream. A retry will find it exhausted. A possible |
| 624 content = content_generator[0] | 697 # solution is to wrap content_generator with some sort of caching |
| 698 # restartable generator. It should be done alongside streaming support | |
| 699 # implementation. | |
| 700 | |
| 701 # This push operation may be a retry after failed finalization call below, | |
| 702 # no need to reupload contents in that case. | |
| 703 if not item.push_state.uploaded: | |
| 704 # A cheese way to avoid memcpy of (possibly huge) file, until streaming | |
|
M-A Ruel
2013/09/30 22:36:33
s/cheese/cheezy/ :)
Vadim Sh.
2013/09/30 23:07:25
Done.
| |
| 705 # upload support is implemented. | |
| 706 if isinstance(content, list) and len(content) == 1: | |
| 707 content = content[0] | |
| 708 else: | |
| 709 content = ''.join(content) | |
| 710 # PUT file to |upload_url|. | |
| 711 response = net.url_read( | |
| 712 url=item.push_state.upload_url, | |
| 713 data=content, | |
| 714 content_type='application/octet-stream', | |
| 715 method='PUT') | |
| 716 if response is None: | |
| 717 raise IOError('Failed to upload a file %s to %s' % ( | |
| 718 item.digest, item.push_state.upload_url)) | |
| 719 item.push_state.uploaded = True | |
| 625 else: | 720 else: |
| 626 content = ''.join(content_generator) | 721 logging.info( |
| 627 | 722 'A file %s already uploaded, retrying finalization only', item.digest) |
| 628 if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: | 723 |
| 629 return self._upload_hash_content_to_blobstore(item, content) | 724 # Optionally notify the server that it's done. |
| 630 | 725 if item.push_state.finalize_url: |
| 631 url = '%sstore/%s/%s?token=%s' % ( | 726 # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to |
|
M-A Ruel
2013/09/30 22:36:33
IIRC, Google Storage only calculates the MD5 for s
Vadim Sh.
2013/09/30 23:07:25
Hmm... Docs say CRC32 is available for all objects
| |
| 632 self.content_url, self.namespace, item, self.token) | 727 # isolated server. That way isolate server can verify that the data safely |
| 633 return url_read(url, data=content, content_type='application/octet-stream') | 728 # reached Google Storage (GS provides MD5 of stored files). |
| 634 | 729 response = net.url_read( |
| 635 def contains(self, files): | 730 url=item.push_state.finalize_url, |
| 636 logging.info('Checking existence of %d files...', len(files)) | 731 data='', |
| 637 | 732 content_type='application/json', |
| 638 body = ''.join( | 733 method='POST') |
| 639 (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) | 734 if response is None: |
| 640 assert (len(body) % self.algo().digest_size) == 0, repr(body) | 735 raise IOError('Failed to finalize an upload of %s' % item.digest) |
| 641 | 736 item.push_state.finalized = True |
| 642 query_url = '%scontains/%s?token=%s' % ( | 737 |
| 643 self.content_url, self.namespace, self.token) | 738 def contains(self, items): |
| 644 response = url_read( | 739 logging.info('Checking existence of %d files...', len(items)) |
| 645 query_url, data=body, content_type='application/octet-stream') | 740 |
| 646 if len(files) != len(response): | 741 # Request body is a json encoded list of dicts. |
| 742 body = [ | |
| 743 { | |
| 744 'h': item.digest, | |
| 745 's': item.size, | |
| 746 'i': int(item.is_isolated), | |
| 747 } for item in items | |
| 748 ] | |
| 749 | |
| 750 query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( | |
| 751 self.base_url, | |
| 752 self.namespace, | |
| 753 urllib.quote(self.server_capabilities['access_token'])) | |
| 754 response_body = net.url_read( | |
| 755 url=query_url, | |
| 756 data=json.dumps(body, separators=(',', ':')), | |
| 757 content_type='application/json', | |
| 758 method='POST') | |
| 759 if response_body is None: | |
| 760 raise MappingError('Failed to execute /pre-upload query') | |
| 761 | |
| 762 # Response body is a list of push_urls (or null if file is already present). | |
| 763 try: | |
| 764 response = json.loads(response_body) | |
| 765 if not isinstance(response, list): | |
| 766 raise ValueError('Expecting response with json-encoded list') | |
| 767 if len(response) != len(items): | |
| 768 raise ValueError( | |
| 769 'Incorrect number of items in the list, expected %d, ' | |
| 770 'but got %d' % (len(files), len(response))) | |
| 771 except ValueError as err: | |
| 647 raise MappingError( | 772 raise MappingError( |
| 648 'Got an incorrect number of responses from the server. Expected %d, ' | 773 'Invalid response from server: %s, body is %s' % (err, response_body)) |
| 649 'but got %d' % (len(files), len(response))) | 774 |
| 650 | 775 # Pick Items that are missing, attach PushState to them. |
| 651 # This implementation of IsolateServer doesn't use push_urls field, | 776 missing_items = [] |
| 652 # set it to None. | 777 for i, push_urls in enumerate(response): |
| 653 missing_files = [ | 778 if push_urls: |
| 654 files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' | 779 assert len(push_urls) == 2, str(push_urls) |
| 655 ] | 780 item = items[i] |
| 781 assert item.push_state is None | |
| 782 item.push_state = IsolateServer.PushState(push_urls[0], push_urls[1]) | |
| 783 missing_items.append(item) | |
| 656 logging.info('Queried %d files, %d cache hit', | 784 logging.info('Queried %d files, %d cache hit', |
| 657 len(files), len(files) - len(missing_files)) | 785 len(items), len(items) - len(missing_items)) |
| 658 return missing_files | 786 return missing_items |
| 659 | |
| 660 def _upload_hash_content_to_blobstore(self, item, content): | |
| 661 """Uploads the content directly to the blobstore via a generated url.""" | |
| 662 # TODO(maruel): Support large files. This would require streaming support. | |
| 663 gen_url = '%sgenerate_blobstore_url/%s/%s' % ( | |
| 664 self.content_url, self.namespace, item) | |
| 665 # Token is guaranteed to be already quoted but it is unnecessary here, and | |
| 666 # only here. | |
| 667 data = [('token', urllib.unquote(self.token))] | |
| 668 content_type, body = encode_multipart_formdata( | |
| 669 data, [('content', item, content)]) | |
| 670 last_url = gen_url | |
| 671 for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS): | |
| 672 # Retry HTTP 50x here but not 404. | |
| 673 upload_url = net.url_read(gen_url, data=data) | |
| 674 if not upload_url: | |
| 675 raise MappingError('Unable to connect to server %s' % gen_url) | |
| 676 last_url = upload_url | |
| 677 | |
| 678 # Do not retry this request on HTTP 50x. Regenerate an upload url each | |
| 679 # time since uploading "consumes" the upload url. | |
| 680 result = net.url_read( | |
| 681 upload_url, data=body, content_type=content_type, retry_50x=False) | |
| 682 if result is not None: | |
| 683 return result | |
| 684 raise MappingError('Unable to connect to server %s' % last_url) | |
| 685 | 787 |
| 686 | 788 |
| 687 class FileSystem(StorageApi): | 789 class FileSystem(StorageApi): |
| 688 """StorageApi implementation that fetches data from the file system. | 790 """StorageApi implementation that fetches data from the file system. |
| 689 | 791 |
| 690 The common use case is a NFS/CIFS file server that is mounted locally that is | 792 The common use case is a NFS/CIFS file server that is mounted locally that is |
| 691 used to fetch the file on a local partition. | 793 used to fetch the file on a local partition. |
| 692 """ | 794 """ |
| 795 | |
| 693 def __init__(self, base_path): | 796 def __init__(self, base_path): |
| 694 super(FileSystem, self).__init__() | 797 super(FileSystem, self).__init__() |
| 695 self.base_path = base_path | 798 self.base_path = base_path |
| 696 | 799 |
| 697 def fetch(self, item, expected_size): | 800 def fetch(self, digest, size): |
| 698 assert isinstance(item, basestring) | 801 assert isinstance(digest, basestring) |
| 699 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE | 802 assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE |
| 700 source = os.path.join(self.base_path, item) | 803 source = os.path.join(self.base_path, digest) |
| 701 if (expected_size != UNKNOWN_FILE_SIZE and | 804 if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size): |
| 702 not is_valid_file(source, expected_size)): | 805 raise IOError('Invalid file %s' % digest) |
| 703 raise IOError('Invalid file %s' % item) | |
| 704 return file_read(source) | 806 return file_read(source) |
| 705 | 807 |
| 706 def push(self, item, expected_size, content_generator, push_urls=None): | 808 def push(self, item, content, size): |
| 707 assert isinstance(item, basestring) | 809 assert isinstance(item, Item) |
| 708 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE | 810 assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE |
| 709 dest = os.path.join(self.base_path, item) | 811 dest = os.path.join(self.base_path, item.digest) |
| 710 total = file_write(dest, content_generator) | 812 total = file_write(dest, content) |
| 711 if expected_size != UNKNOWN_FILE_SIZE and total != expected_size: | 813 if size != UNKNOWN_FILE_SIZE and total != size: |
| 712 os.remove(dest) | 814 os.remove(dest) |
| 713 raise IOError( | 815 raise IOError('Invalid file %s, %d != %d' % (item.digest, total, size)) |
| 714 'Invalid file %s, %d != %d' % (item, total, expected_size)) | |
| 715 | 816 |
| 716 def contains(self, files): | 817 def contains(self, items): |
| 717 return [ | 818 return [ |
| 718 (filename, metadata, None) | 819 item for item in items |
| 719 for filename, metadata in files | 820 if not os.path.exists(os.path.join(self.base_path, item.digest)) |
| 720 if not os.path.exists(os.path.join(self.base_path, metadata['h'])) | |
| 721 ] | 821 ] |
| 722 | 822 |
| 723 | 823 |
| 724 def get_hash_algo(_namespace): | 824 def get_hash_algo(_namespace): |
| 725 """Return hash algorithm class to use when uploading to given |namespace|.""" | 825 """Return hash algorithm class to use when uploading to given |namespace|.""" |
| 726 # TODO(vadimsh): Implement this at some point. | 826 # TODO(vadimsh): Implement this at some point. |
| 727 return hashlib.sha1 | 827 return hashlib.sha1 |
| 728 | 828 |
| 729 | 829 |
| 730 def is_namespace_with_compression(namespace): | 830 def is_namespace_with_compression(namespace): |
| (...skipping 29 matching lines...) Expand all Loading... | |
| 760 | 860 |
| 761 | 861 |
| 762 def upload_tree(base_url, indir, infiles, namespace): | 862 def upload_tree(base_url, indir, infiles, namespace): |
| 763 """Uploads the given tree to the given url. | 863 """Uploads the given tree to the given url. |
| 764 | 864 |
| 765 Arguments: | 865 Arguments: |
| 766 base_url: The base url, it is assumed that |base_url|/has/ can be used to | 866 base_url: The base url, it is assumed that |base_url|/has/ can be used to |
| 767 query if an element was already uploaded, and |base_url|/store/ | 867 query if an element was already uploaded, and |base_url|/store/ |
| 768 can be used to upload a new element. | 868 can be used to upload a new element. |
| 769 indir: Root directory the infiles are based in. | 869 indir: Root directory the infiles are based in. |
| 770 infiles: dict of files to upload files from |indir| to |base_url|. | 870 infiles: dict of files to upload from |indir| to |base_url|. |
| 771 namespace: The namespace to use on the server. | 871 namespace: The namespace to use on the server. |
| 772 """ | 872 """ |
| 773 remote = get_storage_api(base_url, namespace) | 873 remote = get_storage_api(base_url, namespace) |
| 774 with Storage(remote, is_namespace_with_compression(namespace)) as storage: | 874 with Storage(remote, is_namespace_with_compression(namespace)) as storage: |
| 775 storage.upload_tree(indir, infiles) | 875 storage.upload_tree(indir, infiles) |
| 776 return 0 | 876 return 0 |
| 777 | 877 |
| 778 | 878 |
| 779 class MemoryCache(object): | 879 class MemoryCache(object): |
| 780 """This class is intended to be usable everywhere the Cache class is. | 880 """This class is intended to be usable everywhere the Cache class is. |
| (...skipping 521 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1302 sys.stderr.write(str(e)) | 1402 sys.stderr.write(str(e)) |
| 1303 sys.stderr.write('\n') | 1403 sys.stderr.write('\n') |
| 1304 return 1 | 1404 return 1 |
| 1305 | 1405 |
| 1306 | 1406 |
| 1307 if __name__ == '__main__': | 1407 if __name__ == '__main__': |
| 1308 fix_encoding.fix_encoding() | 1408 fix_encoding.fix_encoding() |
| 1309 tools.disable_buffering() | 1409 tools.disable_buffering() |
| 1310 colorama.init() | 1410 colorama.init() |
| 1311 sys.exit(main(sys.argv[1:])) | 1411 sys.exit(main(sys.argv[1:])) |
| OLD | NEW |