| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 # Copyright 2013 The Chromium Authors. All rights reserved. | |
| 3 # Use of this source code is governed by a BSD-style license that can be | |
| 4 # found in the LICENSE file. | |
| 5 | |
| 6 """Archives a set of files to a server.""" | |
| 7 | |
| 8 __version__ = '0.2' | |
| 9 | |
| 10 import functools | |
| 11 import hashlib | |
| 12 import json | |
| 13 import logging | |
| 14 import os | |
| 15 import re | |
| 16 import sys | |
| 17 import threading | |
| 18 import time | |
| 19 import urllib | |
| 20 import zlib | |
| 21 | |
| 22 from third_party import colorama | |
| 23 from third_party.depot_tools import fix_encoding | |
| 24 from third_party.depot_tools import subcommand | |
| 25 | |
| 26 from utils import net | |
| 27 from utils import threading_utils | |
| 28 from utils import tools | |
| 29 | |
| 30 | |
| 31 # Version of isolate protocol passed to the server in /handshake request. | |
| 32 ISOLATE_PROTOCOL_VERSION = '1.0' | |
| 33 | |
| 34 | |
| 35 # The number of files to check the isolate server per /pre-upload query. | |
| 36 # All files are sorted by likelihood of a change in the file content | |
| 37 # (currently file size is used to estimate this: larger the file -> larger the | |
| 38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files | |
| 39 # are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1], | |
| 40 # and so on. Numbers here is a trade-off; the more per request, the lower the | |
| 41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, | |
| 42 # larger values cause longer lookups, increasing the initial latency to start | |
| 43 # uploading, which is especially an issue for large files. This value is | |
| 44 # optimized for the "few thousands files to look up with minimal number of large | |
| 45 # files missing" case. | |
| 46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] | |
| 47 | |
| 48 | |
| 49 # A list of already compressed extension types that should not receive any | |
| 50 # compression before being uploaded. | |
| 51 ALREADY_COMPRESSED_TYPES = [ | |
| 52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', | |
| 53 'wav', 'zip' | |
| 54 ] | |
| 55 | |
| 56 | |
| 57 # The file size to be used when we don't know the correct file size, | |
| 58 # generally used for .isolated files. | |
| 59 UNKNOWN_FILE_SIZE = None | |
| 60 | |
| 61 | |
| 62 # The size of each chunk to read when downloading and unzipping files. | |
| 63 ZIPPED_FILE_CHUNK = 16 * 1024 | |
| 64 | |
| 65 # Chunk size to use when doing disk I/O. | |
| 66 DISK_FILE_CHUNK = 1024 * 1024 | |
| 67 | |
| 68 # Chunk size to use when reading from network stream. | |
| 69 NET_IO_FILE_CHUNK = 16 * 1024 | |
| 70 | |
| 71 | |
| 72 # Read timeout in seconds for downloads from isolate storage. If there's no | |
| 73 # response from the server within this timeout whole download will be aborted. | |
| 74 DOWNLOAD_READ_TIMEOUT = 60 | |
| 75 | |
| 76 # Maximum expected delay (in seconds) between successive file fetches | |
| 77 # in run_tha_test. If it takes longer than that, a deadlock might be happening | |
| 78 # and all stack frames for all threads are dumped to log. | |
| 79 DEADLOCK_TIMEOUT = 5 * 60 | |
| 80 | |
| 81 | |
| 82 # The delay (in seconds) to wait between logging statements when retrieving | |
| 83 # the required files. This is intended to let the user (or buildbot) know that | |
| 84 # the program is still running. | |
| 85 DELAY_BETWEEN_UPDATES_IN_SECS = 30 | |
| 86 | |
| 87 | |
| 88 # Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly | |
| 89 # specify the names here. | |
| 90 SUPPORTED_ALGOS = { | |
| 91 'md5': hashlib.md5, | |
| 92 'sha-1': hashlib.sha1, | |
| 93 'sha-512': hashlib.sha512, | |
| 94 } | |
| 95 | |
| 96 | |
| 97 # Used for serialization. | |
| 98 SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems()) | |
| 99 | |
| 100 | |
| 101 class ConfigError(ValueError): | |
| 102 """Generic failure to load a .isolated file.""" | |
| 103 pass | |
| 104 | |
| 105 | |
| 106 class MappingError(OSError): | |
| 107 """Failed to recreate the tree.""" | |
| 108 pass | |
| 109 | |
| 110 | |
| 111 def is_valid_hash(value, algo): | |
| 112 """Returns if the value is a valid hash for the corresponding algorithm.""" | |
| 113 size = 2 * algo().digest_size | |
| 114 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) | |
| 115 | |
| 116 | |
| 117 def hash_file(filepath, algo): | |
| 118 """Calculates the hash of a file without reading it all in memory at once. | |
| 119 | |
| 120 |algo| should be one of hashlib hashing algorithm. | |
| 121 """ | |
| 122 digest = algo() | |
| 123 with open(filepath, 'rb') as f: | |
| 124 while True: | |
| 125 chunk = f.read(DISK_FILE_CHUNK) | |
| 126 if not chunk: | |
| 127 break | |
| 128 digest.update(chunk) | |
| 129 return digest.hexdigest() | |
| 130 | |
| 131 | |
| 132 def stream_read(stream, chunk_size): | |
| 133 """Reads chunks from |stream| and yields them.""" | |
| 134 while True: | |
| 135 data = stream.read(chunk_size) | |
| 136 if not data: | |
| 137 break | |
| 138 yield data | |
| 139 | |
| 140 | |
| 141 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): | |
| 142 """Yields file content in chunks of given |chunk_size|.""" | |
| 143 with open(filepath, 'rb') as f: | |
| 144 while True: | |
| 145 data = f.read(chunk_size) | |
| 146 if not data: | |
| 147 break | |
| 148 yield data | |
| 149 | |
| 150 | |
| 151 def file_write(filepath, content_generator): | |
| 152 """Writes file content as generated by content_generator. | |
| 153 | |
| 154 Creates the intermediary directory as needed. | |
| 155 | |
| 156 Returns the number of bytes written. | |
| 157 | |
| 158 Meant to be mocked out in unit tests. | |
| 159 """ | |
| 160 filedir = os.path.dirname(filepath) | |
| 161 if not os.path.isdir(filedir): | |
| 162 os.makedirs(filedir) | |
| 163 total = 0 | |
| 164 with open(filepath, 'wb') as f: | |
| 165 for d in content_generator: | |
| 166 total += len(d) | |
| 167 f.write(d) | |
| 168 return total | |
| 169 | |
| 170 | |
| 171 def zip_compress(content_generator, level=7): | |
| 172 """Reads chunks from |content_generator| and yields zip compressed chunks.""" | |
| 173 compressor = zlib.compressobj(level) | |
| 174 for chunk in content_generator: | |
| 175 compressed = compressor.compress(chunk) | |
| 176 if compressed: | |
| 177 yield compressed | |
| 178 tail = compressor.flush(zlib.Z_FINISH) | |
| 179 if tail: | |
| 180 yield tail | |
| 181 | |
| 182 | |
| 183 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK): | |
| 184 """Reads zipped data from |content_generator| and yields decompressed data. | |
| 185 | |
| 186 Decompresses data in small chunks (no larger than |chunk_size|) so that | |
| 187 zip bomb file doesn't cause zlib to preallocate huge amount of memory. | |
| 188 | |
| 189 Raises IOError if data is corrupted or incomplete. | |
| 190 """ | |
| 191 decompressor = zlib.decompressobj() | |
| 192 compressed_size = 0 | |
| 193 try: | |
| 194 for chunk in content_generator: | |
| 195 compressed_size += len(chunk) | |
| 196 data = decompressor.decompress(chunk, chunk_size) | |
| 197 if data: | |
| 198 yield data | |
| 199 while decompressor.unconsumed_tail: | |
| 200 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size) | |
| 201 if data: | |
| 202 yield data | |
| 203 tail = decompressor.flush() | |
| 204 if tail: | |
| 205 yield tail | |
| 206 except zlib.error as e: | |
| 207 raise IOError( | |
| 208 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) | |
| 209 # Ensure all data was read and decompressed. | |
| 210 if decompressor.unused_data or decompressor.unconsumed_tail: | |
| 211 raise IOError('Not all data was decompressed') | |
| 212 | |
| 213 | |
| 214 def get_zip_compression_level(filename): | |
| 215 """Given a filename calculates the ideal zip compression level to use.""" | |
| 216 file_ext = os.path.splitext(filename)[1].lower() | |
| 217 # TODO(csharp): Profile to find what compression level works best. | |
| 218 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 | |
| 219 | |
| 220 | |
| 221 def create_directories(base_directory, files): | |
| 222 """Creates the directory structure needed by the given list of files.""" | |
| 223 logging.debug('create_directories(%s, %d)', base_directory, len(files)) | |
| 224 # Creates the tree of directories to create. | |
| 225 directories = set(os.path.dirname(f) for f in files) | |
| 226 for item in list(directories): | |
| 227 while item: | |
| 228 directories.add(item) | |
| 229 item = os.path.dirname(item) | |
| 230 for d in sorted(directories): | |
| 231 if d: | |
| 232 os.mkdir(os.path.join(base_directory, d)) | |
| 233 | |
| 234 | |
| 235 def create_links(base_directory, files): | |
| 236 """Creates any links needed by the given set of files.""" | |
| 237 for filepath, properties in files: | |
| 238 if 'l' not in properties: | |
| 239 continue | |
| 240 if sys.platform == 'win32': | |
| 241 # TODO(maruel): Create junctions or empty text files similar to what | |
| 242 # cygwin do? | |
| 243 logging.warning('Ignoring symlink %s', filepath) | |
| 244 continue | |
| 245 outfile = os.path.join(base_directory, filepath) | |
| 246 # symlink doesn't exist on Windows. So the 'link' property should | |
| 247 # never be specified for windows .isolated file. | |
| 248 os.symlink(properties['l'], outfile) # pylint: disable=E1101 | |
| 249 if 'm' in properties: | |
| 250 lchmod = getattr(os, 'lchmod', None) | |
| 251 if lchmod: | |
| 252 lchmod(outfile, properties['m']) | |
| 253 | |
| 254 | |
| 255 def is_valid_file(filepath, size): | |
| 256 """Determines if the given files appears valid. | |
| 257 | |
| 258 Currently it just checks the file's size. | |
| 259 """ | |
| 260 if size == UNKNOWN_FILE_SIZE: | |
| 261 return os.path.isfile(filepath) | |
| 262 actual_size = os.stat(filepath).st_size | |
| 263 if size != actual_size: | |
| 264 logging.warning( | |
| 265 'Found invalid item %s; %d != %d', | |
| 266 os.path.basename(filepath), actual_size, size) | |
| 267 return False | |
| 268 return True | |
| 269 | |
| 270 | |
| 271 class WorkerPool(threading_utils.AutoRetryThreadPool): | |
| 272 """Thread pool that automatically retries on IOError and runs a preconfigured | |
| 273 function. | |
| 274 """ | |
| 275 # Initial and maximum number of worker threads. | |
| 276 INITIAL_WORKERS = 2 | |
| 277 MAX_WORKERS = 16 | |
| 278 RETRIES = 5 | |
| 279 | |
| 280 def __init__(self): | |
| 281 super(WorkerPool, self).__init__( | |
| 282 [IOError], | |
| 283 self.RETRIES, | |
| 284 self.INITIAL_WORKERS, | |
| 285 self.MAX_WORKERS, | |
| 286 0, | |
| 287 'remote') | |
| 288 | |
| 289 | |
| 290 class Item(object): | |
| 291 """An item to push to Storage. | |
| 292 | |
| 293 It starts its life in a main thread, travels to 'contains' thread, then to | |
| 294 'push' thread and then finally back to the main thread. | |
| 295 | |
| 296 It is never used concurrently from multiple threads. | |
| 297 """ | |
| 298 | |
| 299 def __init__(self, digest, size, is_isolated=False): | |
| 300 self.digest = digest | |
| 301 self.size = size | |
| 302 self.is_isolated = is_isolated | |
| 303 self.compression_level = 6 | |
| 304 self.push_state = None | |
| 305 | |
| 306 def content(self, chunk_size): | |
| 307 """Iterable with content of this item in chunks of given size. | |
| 308 | |
| 309 Arguments: | |
| 310 chunk_size: preferred size of the chunk to produce, may be ignored. | |
| 311 """ | |
| 312 raise NotImplementedError() | |
| 313 | |
| 314 | |
| 315 class FileItem(Item): | |
| 316 """A file to push to Storage.""" | |
| 317 | |
| 318 def __init__(self, path, digest, size, is_isolated): | |
| 319 super(FileItem, self).__init__(digest, size, is_isolated) | |
| 320 self.path = path | |
| 321 self.compression_level = get_zip_compression_level(path) | |
| 322 | |
| 323 def content(self, chunk_size): | |
| 324 return file_read(self.path, chunk_size) | |
| 325 | |
| 326 | |
| 327 class BufferItem(Item): | |
| 328 """A byte buffer to push to Storage.""" | |
| 329 | |
| 330 def __init__(self, buf, algo, is_isolated=False): | |
| 331 super(BufferItem, self).__init__( | |
| 332 algo(buf).hexdigest(), len(buf), is_isolated) | |
| 333 self.buffer = buf | |
| 334 | |
| 335 def content(self, _chunk_size): | |
| 336 return [self.buffer] | |
| 337 | |
| 338 | |
| 339 class Storage(object): | |
| 340 """Efficiently downloads or uploads large set of files via StorageApi.""" | |
| 341 | |
| 342 def __init__(self, storage_api, use_zip): | |
| 343 self.use_zip = use_zip | |
| 344 self._storage_api = storage_api | |
| 345 self._cpu_thread_pool = None | |
| 346 self._net_thread_pool = None | |
| 347 | |
| 348 @property | |
| 349 def cpu_thread_pool(self): | |
| 350 """ThreadPool for CPU-bound tasks like zipping.""" | |
| 351 if self._cpu_thread_pool is None: | |
| 352 self._cpu_thread_pool = threading_utils.ThreadPool( | |
| 353 2, max(threading_utils.num_processors(), 2), 0, 'zip') | |
| 354 return self._cpu_thread_pool | |
| 355 | |
| 356 @property | |
| 357 def net_thread_pool(self): | |
| 358 """AutoRetryThreadPool for IO-bound tasks, retries IOError.""" | |
| 359 if self._net_thread_pool is None: | |
| 360 self._net_thread_pool = WorkerPool() | |
| 361 return self._net_thread_pool | |
| 362 | |
| 363 def close(self): | |
| 364 """Waits for all pending tasks to finish.""" | |
| 365 if self._cpu_thread_pool: | |
| 366 self._cpu_thread_pool.join() | |
| 367 self._cpu_thread_pool.close() | |
| 368 self._cpu_thread_pool = None | |
| 369 if self._net_thread_pool: | |
| 370 self._net_thread_pool.join() | |
| 371 self._net_thread_pool.close() | |
| 372 self._net_thread_pool = None | |
| 373 | |
| 374 def __enter__(self): | |
| 375 """Context manager interface.""" | |
| 376 return self | |
| 377 | |
| 378 def __exit__(self, _exc_type, _exc_value, _traceback): | |
| 379 """Context manager interface.""" | |
| 380 self.close() | |
| 381 return False | |
| 382 | |
| 383 def upload_tree(self, indir, infiles): | |
| 384 """Uploads the given tree to the isolate server. | |
| 385 | |
| 386 Arguments: | |
| 387 indir: root directory the infiles are based in. | |
| 388 infiles: dict of files to upload from |indir|. | |
| 389 | |
| 390 Returns: | |
| 391 List of items that were uploaded. All other items are already there. | |
| 392 """ | |
| 393 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) | |
| 394 | |
| 395 # Convert |indir| + |infiles| into a list of FileItem objects. | |
| 396 # Filter out symlinks, since they are not represented by items on isolate | |
| 397 # server side. | |
| 398 items = [ | |
| 399 FileItem( | |
| 400 path=os.path.join(indir, filepath), | |
| 401 digest=metadata['h'], | |
| 402 size=metadata['s'], | |
| 403 is_isolated=metadata.get('priority') == '0') | |
| 404 for filepath, metadata in infiles.iteritems() | |
| 405 if 'l' not in metadata | |
| 406 ] | |
| 407 | |
| 408 return self.upload_items(items) | |
| 409 | |
| 410 def upload_items(self, items): | |
| 411 """Uploads bunch of items to the isolate server. | |
| 412 | |
| 413 Will upload only items that are missing. | |
| 414 | |
| 415 Arguments: | |
| 416 items: list of Item instances that represents data to upload. | |
| 417 | |
| 418 Returns: | |
| 419 List of items that were uploaded. All other items are already there. | |
| 420 """ | |
| 421 # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently | |
| 422 # used by swarming.py. There's no need to spawn multiple threads and try to | |
| 423 # do stuff in parallel: there's nothing to parallelize. 'contains' check and | |
| 424 # 'push' should be performed sequentially in the context of current thread. | |
| 425 | |
| 426 # For each digest keep only first Item that matches it. All other items | |
| 427 # are just indistinguishable copies from the point of view of isolate | |
| 428 # server (it doesn't care about paths at all, only content and digests). | |
| 429 seen = {} | |
| 430 duplicates = 0 | |
| 431 for item in items: | |
| 432 if seen.setdefault(item.digest, item) is not item: | |
| 433 duplicates += 1 | |
| 434 items = seen.values() | |
| 435 if duplicates: | |
| 436 logging.info('Skipped %d duplicated files', duplicates) | |
| 437 | |
| 438 # Enqueue all upload tasks. | |
| 439 missing = set() | |
| 440 channel = threading_utils.TaskChannel() | |
| 441 for missing_item in self.get_missing_items(items): | |
| 442 missing.add(missing_item) | |
| 443 self.async_push( | |
| 444 channel, | |
| 445 WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED, | |
| 446 missing_item) | |
| 447 | |
| 448 uploaded = [] | |
| 449 # No need to spawn deadlock detector thread if there's nothing to upload. | |
| 450 if missing: | |
| 451 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: | |
| 452 # Wait for all started uploads to finish. | |
| 453 while len(uploaded) != len(missing): | |
| 454 detector.ping() | |
| 455 item = channel.pull() | |
| 456 uploaded.append(item) | |
| 457 logging.debug( | |
| 458 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest) | |
| 459 logging.info('All files are uploaded') | |
| 460 | |
| 461 # Print stats. | |
| 462 total = len(items) | |
| 463 total_size = sum(f.size for f in items) | |
| 464 logging.info( | |
| 465 'Total: %6d, %9.1fkb', | |
| 466 total, | |
| 467 total_size / 1024.) | |
| 468 cache_hit = set(items) - missing | |
| 469 cache_hit_size = sum(f.size for f in cache_hit) | |
| 470 logging.info( | |
| 471 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', | |
| 472 len(cache_hit), | |
| 473 cache_hit_size / 1024., | |
| 474 len(cache_hit) * 100. / total, | |
| 475 cache_hit_size * 100. / total_size if total_size else 0) | |
| 476 cache_miss = missing | |
| 477 cache_miss_size = sum(f.size for f in cache_miss) | |
| 478 logging.info( | |
| 479 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', | |
| 480 len(cache_miss), | |
| 481 cache_miss_size / 1024., | |
| 482 len(cache_miss) * 100. / total, | |
| 483 cache_miss_size * 100. / total_size if total_size else 0) | |
| 484 | |
| 485 return uploaded | |
| 486 | |
| 487 def get_fetch_url(self, digest): | |
| 488 """Returns an URL that can be used to fetch an item with given digest. | |
| 489 | |
| 490 Arguments: | |
| 491 digest: hex digest of item to fetch. | |
| 492 | |
| 493 Returns: | |
| 494 An URL or None if underlying protocol doesn't support this. | |
| 495 """ | |
| 496 return self._storage_api.get_fetch_url(digest) | |
| 497 | |
| 498 def async_push(self, channel, priority, item): | |
| 499 """Starts asynchronous push to the server in a parallel thread. | |
| 500 | |
| 501 Arguments: | |
| 502 channel: TaskChannel that receives back |item| when upload ends. | |
| 503 priority: thread pool task priority for the push. | |
| 504 item: item to upload as instance of Item class. | |
| 505 """ | |
| 506 def push(content): | |
| 507 """Pushes an item and returns its id, to pass as a result to |channel|.""" | |
| 508 self._storage_api.push(item, content) | |
| 509 return item | |
| 510 | |
| 511 # If zipping is not required, just start a push task. | |
| 512 if not self.use_zip: | |
| 513 self.net_thread_pool.add_task_with_channel(channel, priority, push, | |
| 514 item.content(DISK_FILE_CHUNK)) | |
| 515 return | |
| 516 | |
| 517 # If zipping is enabled, zip in a separate thread. | |
| 518 def zip_and_push(): | |
| 519 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble | |
| 520 # content right here. It will block until all file is zipped. | |
| 521 try: | |
| 522 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK), | |
| 523 item.compression_level) | |
| 524 data = ''.join(stream) | |
| 525 except Exception as exc: | |
| 526 logging.error('Failed to zip \'%s\': %s', item, exc) | |
| 527 channel.send_exception(exc) | |
| 528 return | |
| 529 self.net_thread_pool.add_task_with_channel( | |
| 530 channel, priority, push, [data]) | |
| 531 self.cpu_thread_pool.add_task(priority, zip_and_push) | |
| 532 | |
| 533 def async_fetch(self, channel, priority, digest, size, sink): | |
| 534 """Starts asynchronous fetch from the server in a parallel thread. | |
| 535 | |
| 536 Arguments: | |
| 537 channel: TaskChannel that receives back |digest| when download ends. | |
| 538 priority: thread pool task priority for the fetch. | |
| 539 digest: hex digest of an item to download. | |
| 540 size: expected size of the item (after decompression). | |
| 541 sink: function that will be called as sink(generator). | |
| 542 """ | |
| 543 def fetch(): | |
| 544 try: | |
| 545 # Prepare reading pipeline. | |
| 546 stream = self._storage_api.fetch(digest) | |
| 547 if self.use_zip: | |
| 548 stream = zip_decompress(stream, DISK_FILE_CHUNK) | |
| 549 # Run |stream| through verifier that will assert its size. | |
| 550 verifier = FetchStreamVerifier(stream, size) | |
| 551 # Verified stream goes to |sink|. | |
| 552 sink(verifier.run()) | |
| 553 except Exception as err: | |
| 554 logging.warning('Failed to fetch %s: %s', digest, err) | |
| 555 raise | |
| 556 return digest | |
| 557 | |
| 558 # Don't bother with zip_thread_pool for decompression. Decompression is | |
| 559 # really fast and most probably IO bound anyway. | |
| 560 self.net_thread_pool.add_task_with_channel(channel, priority, fetch) | |
| 561 | |
| 562 def get_missing_items(self, items): | |
| 563 """Yields items that are missing from the server. | |
| 564 | |
| 565 Issues multiple parallel queries via StorageApi's 'contains' method. | |
| 566 | |
| 567 Arguments: | |
| 568 items: a list of Item objects to check. | |
| 569 | |
| 570 Yields: | |
| 571 Item objects that are missing from the server. | |
| 572 """ | |
| 573 channel = threading_utils.TaskChannel() | |
| 574 pending = 0 | |
| 575 # Enqueue all requests. | |
| 576 for batch in self.batch_items_for_check(items): | |
| 577 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, | |
| 578 self._storage_api.contains, batch) | |
| 579 pending += 1 | |
| 580 # Yield results as they come in. | |
| 581 for _ in xrange(pending): | |
| 582 for missing in channel.pull(): | |
| 583 yield missing | |
| 584 | |
| 585 @staticmethod | |
| 586 def batch_items_for_check(items): | |
| 587 """Splits list of items to check for existence on the server into batches. | |
| 588 | |
| 589 Each batch corresponds to a single 'exists?' query to the server via a call | |
| 590 to StorageApi's 'contains' method. | |
| 591 | |
| 592 Arguments: | |
| 593 items: a list of Item objects. | |
| 594 | |
| 595 Yields: | |
| 596 Batches of items to query for existence in a single operation, | |
| 597 each batch is a list of Item objects. | |
| 598 """ | |
| 599 batch_count = 0 | |
| 600 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] | |
| 601 next_queries = [] | |
| 602 for item in sorted(items, key=lambda x: x.size, reverse=True): | |
| 603 next_queries.append(item) | |
| 604 if len(next_queries) == batch_size_limit: | |
| 605 yield next_queries | |
| 606 next_queries = [] | |
| 607 batch_count += 1 | |
| 608 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[ | |
| 609 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)] | |
| 610 if next_queries: | |
| 611 yield next_queries | |
| 612 | |
| 613 | |
| 614 class FetchQueue(object): | |
| 615 """Fetches items from Storage and places them into LocalCache. | |
| 616 | |
| 617 It manages multiple concurrent fetch operations. Acts as a bridge between | |
| 618 Storage and LocalCache so that Storage and LocalCache don't depend on each | |
| 619 other at all. | |
| 620 """ | |
| 621 | |
| 622 def __init__(self, storage, cache): | |
| 623 self.storage = storage | |
| 624 self.cache = cache | |
| 625 self._channel = threading_utils.TaskChannel() | |
| 626 self._pending = set() | |
| 627 self._accessed = set() | |
| 628 self._fetched = cache.cached_set() | |
| 629 | |
| 630 def add(self, priority, digest, size=UNKNOWN_FILE_SIZE): | |
| 631 """Starts asynchronous fetch of item |digest|.""" | |
| 632 # Fetching it now? | |
| 633 if digest in self._pending: | |
| 634 return | |
| 635 | |
| 636 # Mark this file as in use, verify_all_cached will later ensure it is still | |
| 637 # in cache. | |
| 638 self._accessed.add(digest) | |
| 639 | |
| 640 # Already fetched? Notify cache to update item's LRU position. | |
| 641 if digest in self._fetched: | |
| 642 # 'touch' returns True if item is in cache and not corrupted. | |
| 643 if self.cache.touch(digest, size): | |
| 644 return | |
| 645 # Item is corrupted, remove it from cache and fetch it again. | |
| 646 self._fetched.remove(digest) | |
| 647 self.cache.evict(digest) | |
| 648 | |
| 649 # TODO(maruel): It should look at the free disk space, the current cache | |
| 650 # size and the size of the new item on every new item: | |
| 651 # - Trim the cache as more entries are listed when free disk space is low, | |
| 652 # otherwise if the amount of data downloaded during the run > free disk | |
| 653 # space, it'll crash. | |
| 654 # - Make sure there's enough free disk space to fit all dependencies of | |
| 655 # this run! If not, abort early. | |
| 656 | |
| 657 # Start fetching. | |
| 658 self._pending.add(digest) | |
| 659 self.storage.async_fetch( | |
| 660 self._channel, priority, digest, size, | |
| 661 functools.partial(self.cache.write, digest)) | |
| 662 | |
| 663 def wait(self, digests): | |
| 664 """Starts a loop that waits for at least one of |digests| to be retrieved. | |
| 665 | |
| 666 Returns the first digest retrieved. | |
| 667 """ | |
| 668 # Flush any already fetched items. | |
| 669 for digest in digests: | |
| 670 if digest in self._fetched: | |
| 671 return digest | |
| 672 | |
| 673 # Ensure all requested items are being fetched now. | |
| 674 assert all(digest in self._pending for digest in digests), ( | |
| 675 digests, self._pending) | |
| 676 | |
| 677 # Wait for some requested item to finish fetching. | |
| 678 while self._pending: | |
| 679 digest = self._channel.pull() | |
| 680 self._pending.remove(digest) | |
| 681 self._fetched.add(digest) | |
| 682 if digest in digests: | |
| 683 return digest | |
| 684 | |
| 685 # Should never reach this point due to assert above. | |
| 686 raise RuntimeError('Impossible state') | |
| 687 | |
| 688 def inject_local_file(self, path, algo): | |
| 689 """Adds local file to the cache as if it was fetched from storage.""" | |
| 690 with open(path, 'rb') as f: | |
| 691 data = f.read() | |
| 692 digest = algo(data).hexdigest() | |
| 693 self.cache.write(digest, [data]) | |
| 694 self._fetched.add(digest) | |
| 695 return digest | |
| 696 | |
| 697 @property | |
| 698 def pending_count(self): | |
| 699 """Returns number of items to be fetched.""" | |
| 700 return len(self._pending) | |
| 701 | |
| 702 def verify_all_cached(self): | |
| 703 """True if all accessed items are in cache.""" | |
| 704 return self._accessed.issubset(self.cache.cached_set()) | |
| 705 | |
| 706 | |
| 707 class FetchStreamVerifier(object): | |
| 708 """Verifies that fetched file is valid before passing it to the LocalCache.""" | |
| 709 | |
| 710 def __init__(self, stream, expected_size): | |
| 711 self.stream = stream | |
| 712 self.expected_size = expected_size | |
| 713 self.current_size = 0 | |
| 714 | |
| 715 def run(self): | |
| 716 """Generator that yields same items as |stream|. | |
| 717 | |
| 718 Verifies |stream| is complete before yielding a last chunk to consumer. | |
| 719 | |
| 720 Also wraps IOError produced by consumer into MappingError exceptions since | |
| 721 otherwise Storage will retry fetch on unrelated local cache errors. | |
| 722 """ | |
| 723 # Read one chunk ahead, keep it in |stored|. | |
| 724 # That way a complete stream can be verified before pushing last chunk | |
| 725 # to consumer. | |
| 726 stored = None | |
| 727 for chunk in self.stream: | |
| 728 assert chunk is not None | |
| 729 if stored is not None: | |
| 730 self._inspect_chunk(stored, is_last=False) | |
| 731 try: | |
| 732 yield stored | |
| 733 except IOError as exc: | |
| 734 raise MappingError('Failed to store an item in cache: %s' % exc) | |
| 735 stored = chunk | |
| 736 if stored is not None: | |
| 737 self._inspect_chunk(stored, is_last=True) | |
| 738 try: | |
| 739 yield stored | |
| 740 except IOError as exc: | |
| 741 raise MappingError('Failed to store an item in cache: %s' % exc) | |
| 742 | |
| 743 def _inspect_chunk(self, chunk, is_last): | |
| 744 """Called for each fetched chunk before passing it to consumer.""" | |
| 745 self.current_size += len(chunk) | |
| 746 if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and | |
| 747 (self.expected_size != self.current_size)): | |
| 748 raise IOError('Incorrect file size: expected %d, got %d' % ( | |
| 749 self.expected_size, self.current_size)) | |
| 750 | |
| 751 | |
| 752 class StorageApi(object): | |
| 753 """Interface for classes that implement low-level storage operations.""" | |
| 754 | |
| 755 def get_fetch_url(self, digest): | |
| 756 """Returns an URL that can be used to fetch an item with given digest. | |
| 757 | |
| 758 Arguments: | |
| 759 digest: hex digest of item to fetch. | |
| 760 | |
| 761 Returns: | |
| 762 An URL or None if the protocol doesn't support this. | |
| 763 """ | |
| 764 raise NotImplementedError() | |
| 765 | |
| 766 def fetch(self, digest): | |
| 767 """Fetches an object and yields its content. | |
| 768 | |
| 769 Arguments: | |
| 770 digest: hash digest of item to download. | |
| 771 | |
| 772 Yields: | |
| 773 Chunks of downloaded item (as str objects). | |
| 774 """ | |
| 775 raise NotImplementedError() | |
| 776 | |
| 777 def push(self, item, content): | |
| 778 """Uploads an |item| with content generated by |content| generator. | |
| 779 | |
| 780 Arguments: | |
| 781 item: Item object that holds information about an item being pushed. | |
| 782 content: a generator that yields chunks to push. | |
| 783 | |
| 784 Returns: | |
| 785 None. | |
| 786 """ | |
| 787 raise NotImplementedError() | |
| 788 | |
| 789 def contains(self, items): | |
| 790 """Checks for existence of given |items| on the server. | |
| 791 | |
| 792 Mutates |items| by assigning opaque implement specific object to Item's | |
| 793 push_state attribute on missing entries in the datastore. | |
| 794 | |
| 795 Arguments: | |
| 796 items: list of Item objects. | |
| 797 | |
| 798 Returns: | |
| 799 A list of items missing on server as a list of Item objects. | |
| 800 """ | |
| 801 raise NotImplementedError() | |
| 802 | |
| 803 | |
| 804 class IsolateServer(StorageApi): | |
| 805 """StorageApi implementation that downloads and uploads to Isolate Server. | |
| 806 | |
| 807 It uploads and downloads directly from Google Storage whenever appropriate. | |
| 808 """ | |
| 809 | |
| 810 class _PushState(object): | |
| 811 """State needed to call .push(), to be stored in Item.push_state.""" | |
| 812 def __init__(self, upload_url, finalize_url): | |
| 813 self.upload_url = upload_url | |
| 814 self.finalize_url = finalize_url | |
| 815 self.uploaded = False | |
| 816 self.finalized = False | |
| 817 | |
| 818 def __init__(self, base_url, namespace): | |
| 819 super(IsolateServer, self).__init__() | |
| 820 assert base_url.startswith('http'), base_url | |
| 821 self.base_url = base_url.rstrip('/') | |
| 822 self.namespace = namespace | |
| 823 self._lock = threading.Lock() | |
| 824 self._server_caps = None | |
| 825 | |
| 826 @staticmethod | |
| 827 def _generate_handshake_request(): | |
| 828 """Returns a dict to be sent as handshake request body.""" | |
| 829 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. | |
| 830 return { | |
| 831 'client_app_version': __version__, | |
| 832 'fetcher': True, | |
| 833 'protocol_version': ISOLATE_PROTOCOL_VERSION, | |
| 834 'pusher': True, | |
| 835 } | |
| 836 | |
| 837 @staticmethod | |
| 838 def _validate_handshake_response(caps): | |
| 839 """Validates and normalizes handshake response.""" | |
| 840 logging.info('Protocol version: %s', caps['protocol_version']) | |
| 841 logging.info('Server version: %s', caps['server_app_version']) | |
| 842 if caps.get('error'): | |
| 843 raise MappingError(caps['error']) | |
| 844 if not caps['access_token']: | |
| 845 raise ValueError('access_token is missing') | |
| 846 return caps | |
| 847 | |
| 848 @property | |
| 849 def _server_capabilities(self): | |
| 850 """Performs handshake with the server if not yet done. | |
| 851 | |
| 852 Returns: | |
| 853 Server capabilities dictionary as returned by /handshake endpoint. | |
| 854 | |
| 855 Raises: | |
| 856 MappingError if server rejects the handshake. | |
| 857 """ | |
| 858 # TODO(maruel): Make this request much earlier asynchronously while the | |
| 859 # files are being enumerated. | |
| 860 with self._lock: | |
| 861 if self._server_caps is None: | |
| 862 request_body = json.dumps( | |
| 863 self._generate_handshake_request(), separators=(',', ':')) | |
| 864 response = net.url_read( | |
| 865 url=self.base_url + '/content-gs/handshake', | |
| 866 data=request_body, | |
| 867 content_type='application/json', | |
| 868 method='POST') | |
| 869 if response is None: | |
| 870 raise MappingError('Failed to perform handshake.') | |
| 871 try: | |
| 872 caps = json.loads(response) | |
| 873 if not isinstance(caps, dict): | |
| 874 raise ValueError('Expecting JSON dict') | |
| 875 self._server_caps = self._validate_handshake_response(caps) | |
| 876 except (ValueError, KeyError, TypeError) as exc: | |
| 877 # KeyError exception has very confusing str conversion: it's just a | |
| 878 # missing key value and nothing else. So print exception class name | |
| 879 # as well. | |
| 880 raise MappingError('Invalid handshake response (%s): %s' % ( | |
| 881 exc.__class__.__name__, exc)) | |
| 882 return self._server_caps | |
| 883 | |
| 884 def get_fetch_url(self, digest): | |
| 885 assert isinstance(digest, basestring) | |
| 886 return '%s/content-gs/retrieve/%s/%s' % ( | |
| 887 self.base_url, self.namespace, digest) | |
| 888 | |
| 889 def fetch(self, digest): | |
| 890 source_url = self.get_fetch_url(digest) | |
| 891 logging.debug('download_file(%s)', source_url) | |
| 892 | |
| 893 # Because the app engine DB is only eventually consistent, retry 404 errors | |
| 894 # because the file might just not be visible yet (even though it has been | |
| 895 # uploaded). | |
| 896 connection = net.url_open( | |
| 897 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) | |
| 898 if not connection: | |
| 899 raise IOError('Unable to open connection to %s' % source_url) | |
| 900 return stream_read(connection, NET_IO_FILE_CHUNK) | |
| 901 | |
| 902 def push(self, item, content): | |
| 903 assert isinstance(item, Item) | |
| 904 assert isinstance(item.push_state, IsolateServer._PushState) | |
| 905 assert not item.push_state.finalized | |
| 906 | |
| 907 # TODO(vadimsh): Do not read from |content| generator when retrying push. | |
| 908 # If |content| is indeed a generator, it can not be re-winded back | |
| 909 # to the beginning of the stream. A retry will find it exhausted. A possible | |
| 910 # solution is to wrap |content| generator with some sort of caching | |
| 911 # restartable generator. It should be done alongside streaming support | |
| 912 # implementation. | |
| 913 | |
| 914 # This push operation may be a retry after failed finalization call below, | |
| 915 # no need to reupload contents in that case. | |
| 916 if not item.push_state.uploaded: | |
| 917 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming | |
| 918 # upload support is implemented. | |
| 919 if isinstance(content, list) and len(content) == 1: | |
| 920 content = content[0] | |
| 921 else: | |
| 922 content = ''.join(content) | |
| 923 # PUT file to |upload_url|. | |
| 924 response = net.url_read( | |
| 925 url=item.push_state.upload_url, | |
| 926 data=content, | |
| 927 content_type='application/octet-stream', | |
| 928 method='PUT') | |
| 929 if response is None: | |
| 930 raise IOError('Failed to upload a file %s to %s' % ( | |
| 931 item.digest, item.push_state.upload_url)) | |
| 932 item.push_state.uploaded = True | |
| 933 else: | |
| 934 logging.info( | |
| 935 'A file %s already uploaded, retrying finalization only', item.digest) | |
| 936 | |
| 937 # Optionally notify the server that it's done. | |
| 938 if item.push_state.finalize_url: | |
| 939 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and | |
| 940 # send it to isolated server. That way isolate server can verify that | |
| 941 # the data safely reached Google Storage (GS provides MD5 and CRC32C of | |
| 942 # stored files). | |
| 943 response = net.url_read( | |
| 944 url=item.push_state.finalize_url, | |
| 945 data='', | |
| 946 content_type='application/json', | |
| 947 method='POST') | |
| 948 if response is None: | |
| 949 raise IOError('Failed to finalize an upload of %s' % item.digest) | |
| 950 item.push_state.finalized = True | |
| 951 | |
| 952 def contains(self, items): | |
| 953 logging.info('Checking existence of %d files...', len(items)) | |
| 954 | |
| 955 # Request body is a json encoded list of dicts. | |
| 956 body = [ | |
| 957 { | |
| 958 'h': item.digest, | |
| 959 's': item.size, | |
| 960 'i': int(item.is_isolated), | |
| 961 } for item in items | |
| 962 ] | |
| 963 | |
| 964 query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( | |
| 965 self.base_url, | |
| 966 self.namespace, | |
| 967 urllib.quote(self._server_capabilities['access_token'])) | |
| 968 response_body = net.url_read( | |
| 969 url=query_url, | |
| 970 data=json.dumps(body, separators=(',', ':')), | |
| 971 content_type='application/json', | |
| 972 method='POST') | |
| 973 if response_body is None: | |
| 974 raise MappingError('Failed to execute /pre-upload query') | |
| 975 | |
| 976 # Response body is a list of push_urls (or null if file is already present). | |
| 977 try: | |
| 978 response = json.loads(response_body) | |
| 979 if not isinstance(response, list): | |
| 980 raise ValueError('Expecting response with json-encoded list') | |
| 981 if len(response) != len(items): | |
| 982 raise ValueError( | |
| 983 'Incorrect number of items in the list, expected %d, ' | |
| 984 'but got %d' % (len(items), len(response))) | |
| 985 except ValueError as err: | |
| 986 raise MappingError( | |
| 987 'Invalid response from server: %s, body is %s' % (err, response_body)) | |
| 988 | |
| 989 # Pick Items that are missing, attach _PushState to them. | |
| 990 missing_items = [] | |
| 991 for i, push_urls in enumerate(response): | |
| 992 if push_urls: | |
| 993 assert len(push_urls) == 2, str(push_urls) | |
| 994 item = items[i] | |
| 995 assert item.push_state is None | |
| 996 item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1]) | |
| 997 missing_items.append(item) | |
| 998 logging.info('Queried %d files, %d cache hit', | |
| 999 len(items), len(items) - len(missing_items)) | |
| 1000 return missing_items | |
| 1001 | |
| 1002 | |
| 1003 class FileSystem(StorageApi): | |
| 1004 """StorageApi implementation that fetches data from the file system. | |
| 1005 | |
| 1006 The common use case is a NFS/CIFS file server that is mounted locally that is | |
| 1007 used to fetch the file on a local partition. | |
| 1008 """ | |
| 1009 | |
| 1010 def __init__(self, base_path): | |
| 1011 super(FileSystem, self).__init__() | |
| 1012 self.base_path = base_path | |
| 1013 | |
| 1014 def get_fetch_url(self, digest): | |
| 1015 return None | |
| 1016 | |
| 1017 def fetch(self, digest): | |
| 1018 assert isinstance(digest, basestring) | |
| 1019 return file_read(os.path.join(self.base_path, digest)) | |
| 1020 | |
| 1021 def push(self, item, content): | |
| 1022 assert isinstance(item, Item) | |
| 1023 file_write(os.path.join(self.base_path, item.digest), content) | |
| 1024 | |
| 1025 def contains(self, items): | |
| 1026 return [ | |
| 1027 item for item in items | |
| 1028 if not os.path.exists(os.path.join(self.base_path, item.digest)) | |
| 1029 ] | |
| 1030 | |
| 1031 | |
| 1032 class LocalCache(object): | |
| 1033 """Local cache that stores objects fetched via Storage. | |
| 1034 | |
| 1035 It can be accessed concurrently from multiple threads, so it should protect | |
| 1036 its internal state with some lock. | |
| 1037 """ | |
| 1038 | |
| 1039 def __enter__(self): | |
| 1040 """Context manager interface.""" | |
| 1041 return self | |
| 1042 | |
| 1043 def __exit__(self, _exc_type, _exec_value, _traceback): | |
| 1044 """Context manager interface.""" | |
| 1045 return False | |
| 1046 | |
| 1047 def cached_set(self): | |
| 1048 """Returns a set of all cached digests (always a new object).""" | |
| 1049 raise NotImplementedError() | |
| 1050 | |
| 1051 def touch(self, digest, size): | |
| 1052 """Ensures item is not corrupted and updates its LRU position. | |
| 1053 | |
| 1054 Arguments: | |
| 1055 digest: hash digest of item to check. | |
| 1056 size: expected size of this item. | |
| 1057 | |
| 1058 Returns: | |
| 1059 True if item is in cache and not corrupted. | |
| 1060 """ | |
| 1061 raise NotImplementedError() | |
| 1062 | |
| 1063 def evict(self, digest): | |
| 1064 """Removes item from cache if it's there.""" | |
| 1065 raise NotImplementedError() | |
| 1066 | |
| 1067 def read(self, digest): | |
| 1068 """Returns contents of the cached item as a single str.""" | |
| 1069 raise NotImplementedError() | |
| 1070 | |
| 1071 def write(self, digest, content): | |
| 1072 """Reads data from |content| generator and stores it in cache.""" | |
| 1073 raise NotImplementedError() | |
| 1074 | |
| 1075 def link(self, digest, dest, file_mode=None): | |
| 1076 """Ensures file at |dest| has same content as cached |digest|.""" | |
| 1077 raise NotImplementedError() | |
| 1078 | |
| 1079 | |
| 1080 class MemoryCache(LocalCache): | |
| 1081 """LocalCache implementation that stores everything in memory.""" | |
| 1082 | |
| 1083 def __init__(self): | |
| 1084 super(MemoryCache, self).__init__() | |
| 1085 # Let's not assume dict is thread safe. | |
| 1086 self._lock = threading.Lock() | |
| 1087 self._contents = {} | |
| 1088 | |
| 1089 def cached_set(self): | |
| 1090 with self._lock: | |
| 1091 return set(self._contents) | |
| 1092 | |
| 1093 def touch(self, digest, size): | |
| 1094 with self._lock: | |
| 1095 return digest in self._contents | |
| 1096 | |
| 1097 def evict(self, digest): | |
| 1098 with self._lock: | |
| 1099 self._contents.pop(digest, None) | |
| 1100 | |
| 1101 def read(self, digest): | |
| 1102 with self._lock: | |
| 1103 return self._contents[digest] | |
| 1104 | |
| 1105 def write(self, digest, content): | |
| 1106 # Assemble whole stream before taking the lock. | |
| 1107 data = ''.join(content) | |
| 1108 with self._lock: | |
| 1109 self._contents[digest] = data | |
| 1110 | |
| 1111 def link(self, digest, dest, file_mode=None): | |
| 1112 file_write(dest, [self.read(digest)]) | |
| 1113 if file_mode is not None: | |
| 1114 os.chmod(dest, file_mode) | |
| 1115 | |
| 1116 | |
| 1117 def get_hash_algo(_namespace): | |
| 1118 """Return hash algorithm class to use when uploading to given |namespace|.""" | |
| 1119 # TODO(vadimsh): Implement this at some point. | |
| 1120 return hashlib.sha1 | |
| 1121 | |
| 1122 | |
| 1123 def is_namespace_with_compression(namespace): | |
| 1124 """Returns True if given |namespace| stores compressed objects.""" | |
| 1125 return namespace.endswith(('-gzip', '-deflate')) | |
| 1126 | |
| 1127 | |
| 1128 def get_storage_api(file_or_url, namespace): | |
| 1129 """Returns an object that implements StorageApi interface.""" | |
| 1130 if re.match(r'^https?://.+$', file_or_url): | |
| 1131 return IsolateServer(file_or_url, namespace) | |
| 1132 else: | |
| 1133 return FileSystem(file_or_url) | |
| 1134 | |
| 1135 | |
| 1136 def get_storage(file_or_url, namespace): | |
| 1137 """Returns Storage class configured with appropriate StorageApi instance.""" | |
| 1138 return Storage( | |
| 1139 get_storage_api(file_or_url, namespace), | |
| 1140 is_namespace_with_compression(namespace)) | |
| 1141 | |
| 1142 | |
| 1143 def upload_tree(base_url, indir, infiles, namespace): | |
| 1144 """Uploads the given tree to the given url. | |
| 1145 | |
| 1146 Arguments: | |
| 1147 base_url: The base url, it is assume that |base_url|/has/ can be used to | |
| 1148 query if an element was already uploaded, and |base_url|/store/ | |
| 1149 can be used to upload a new element. | |
| 1150 indir: Root directory the infiles are based in. | |
| 1151 infiles: dict of files to upload from |indir| to |base_url|. | |
| 1152 namespace: The namespace to use on the server. | |
| 1153 """ | |
| 1154 with get_storage(base_url, namespace) as storage: | |
| 1155 storage.upload_tree(indir, infiles) | |
| 1156 return 0 | |
| 1157 | |
| 1158 | |
| 1159 def load_isolated(content, os_flavor, algo): | |
| 1160 """Verifies the .isolated file is valid and loads this object with the json | |
| 1161 data. | |
| 1162 | |
| 1163 Arguments: | |
| 1164 - content: raw serialized content to load. | |
| 1165 - os_flavor: OS to load this file on. Optional. | |
| 1166 - algo: hashlib algorithm class. Used to confirm the algorithm matches the | |
| 1167 algorithm used on the Isolate Server. | |
| 1168 """ | |
| 1169 try: | |
| 1170 data = json.loads(content) | |
| 1171 except ValueError: | |
| 1172 raise ConfigError('Failed to parse: %s...' % content[:100]) | |
| 1173 | |
| 1174 if not isinstance(data, dict): | |
| 1175 raise ConfigError('Expected dict, got %r' % data) | |
| 1176 | |
| 1177 # Check 'version' first, since it could modify the parsing after. | |
| 1178 value = data.get('version', '1.0') | |
| 1179 if not isinstance(value, basestring): | |
| 1180 raise ConfigError('Expected string, got %r' % value) | |
| 1181 if not re.match(r'^(\d+)\.(\d+)$', value): | |
| 1182 raise ConfigError('Expected a compatible version, got %r' % value) | |
| 1183 if value.split('.', 1)[0] != '1': | |
| 1184 raise ConfigError('Expected compatible \'1.x\' version, got %r' % value) | |
| 1185 | |
| 1186 if algo is None: | |
| 1187 # Default the algorithm used in the .isolated file itself, falls back to | |
| 1188 # 'sha-1' if unspecified. | |
| 1189 algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')] | |
| 1190 | |
| 1191 for key, value in data.iteritems(): | |
| 1192 if key == 'algo': | |
| 1193 if not isinstance(value, basestring): | |
| 1194 raise ConfigError('Expected string, got %r' % value) | |
| 1195 if value not in SUPPORTED_ALGOS: | |
| 1196 raise ConfigError( | |
| 1197 'Expected one of \'%s\', got %r' % | |
| 1198 (', '.join(sorted(SUPPORTED_ALGOS)), value)) | |
| 1199 if value != SUPPORTED_ALGOS_REVERSE[algo]: | |
| 1200 raise ConfigError( | |
| 1201 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value)) | |
| 1202 | |
| 1203 elif key == 'command': | |
| 1204 if not isinstance(value, list): | |
| 1205 raise ConfigError('Expected list, got %r' % value) | |
| 1206 if not value: | |
| 1207 raise ConfigError('Expected non-empty command') | |
| 1208 for subvalue in value: | |
| 1209 if not isinstance(subvalue, basestring): | |
| 1210 raise ConfigError('Expected string, got %r' % subvalue) | |
| 1211 | |
| 1212 elif key == 'files': | |
| 1213 if not isinstance(value, dict): | |
| 1214 raise ConfigError('Expected dict, got %r' % value) | |
| 1215 for subkey, subvalue in value.iteritems(): | |
| 1216 if not isinstance(subkey, basestring): | |
| 1217 raise ConfigError('Expected string, got %r' % subkey) | |
| 1218 if not isinstance(subvalue, dict): | |
| 1219 raise ConfigError('Expected dict, got %r' % subvalue) | |
| 1220 for subsubkey, subsubvalue in subvalue.iteritems(): | |
| 1221 if subsubkey == 'l': | |
| 1222 if not isinstance(subsubvalue, basestring): | |
| 1223 raise ConfigError('Expected string, got %r' % subsubvalue) | |
| 1224 elif subsubkey == 'm': | |
| 1225 if not isinstance(subsubvalue, int): | |
| 1226 raise ConfigError('Expected int, got %r' % subsubvalue) | |
| 1227 elif subsubkey == 'h': | |
| 1228 if not is_valid_hash(subsubvalue, algo): | |
| 1229 raise ConfigError('Expected sha-1, got %r' % subsubvalue) | |
| 1230 elif subsubkey == 's': | |
| 1231 if not isinstance(subsubvalue, int): | |
| 1232 raise ConfigError('Expected int, got %r' % subsubvalue) | |
| 1233 else: | |
| 1234 raise ConfigError('Unknown subsubkey %s' % subsubkey) | |
| 1235 if bool('h' in subvalue) == bool('l' in subvalue): | |
| 1236 raise ConfigError( | |
| 1237 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' % | |
| 1238 subvalue) | |
| 1239 if bool('h' in subvalue) != bool('s' in subvalue): | |
| 1240 raise ConfigError( | |
| 1241 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' % | |
| 1242 subvalue) | |
| 1243 if bool('s' in subvalue) == bool('l' in subvalue): | |
| 1244 raise ConfigError( | |
| 1245 'Need only one of \'s\' (size) or \'l\' (link), got: %r' % | |
| 1246 subvalue) | |
| 1247 if bool('l' in subvalue) and bool('m' in subvalue): | |
| 1248 raise ConfigError( | |
| 1249 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' % | |
| 1250 subvalue) | |
| 1251 | |
| 1252 elif key == 'includes': | |
| 1253 if not isinstance(value, list): | |
| 1254 raise ConfigError('Expected list, got %r' % value) | |
| 1255 if not value: | |
| 1256 raise ConfigError('Expected non-empty includes list') | |
| 1257 for subvalue in value: | |
| 1258 if not is_valid_hash(subvalue, algo): | |
| 1259 raise ConfigError('Expected sha-1, got %r' % subvalue) | |
| 1260 | |
| 1261 elif key == 'read_only': | |
| 1262 if not isinstance(value, bool): | |
| 1263 raise ConfigError('Expected bool, got %r' % value) | |
| 1264 | |
| 1265 elif key == 'relative_cwd': | |
| 1266 if not isinstance(value, basestring): | |
| 1267 raise ConfigError('Expected string, got %r' % value) | |
| 1268 | |
| 1269 elif key == 'os': | |
| 1270 if os_flavor and value != os_flavor: | |
| 1271 raise ConfigError( | |
| 1272 'Expected \'os\' to be \'%s\' but got \'%s\'' % | |
| 1273 (os_flavor, value)) | |
| 1274 | |
| 1275 elif key == 'version': | |
| 1276 # Already checked above. | |
| 1277 pass | |
| 1278 | |
| 1279 else: | |
| 1280 raise ConfigError('Unknown key %r' % key) | |
| 1281 | |
| 1282 # Automatically fix os.path.sep if necessary. While .isolated files are always | |
| 1283 # in the the native path format, someone could want to download an .isolated | |
| 1284 # tree from another OS. | |
| 1285 wrong_path_sep = '/' if os.path.sep == '\\' else '\\' | |
| 1286 if 'files' in data: | |
| 1287 data['files'] = dict( | |
| 1288 (k.replace(wrong_path_sep, os.path.sep), v) | |
| 1289 for k, v in data['files'].iteritems()) | |
| 1290 for v in data['files'].itervalues(): | |
| 1291 if 'l' in v: | |
| 1292 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep) | |
| 1293 if 'relative_cwd' in data: | |
| 1294 data['relative_cwd'] = data['relative_cwd'].replace( | |
| 1295 wrong_path_sep, os.path.sep) | |
| 1296 return data | |
| 1297 | |
| 1298 | |
| 1299 class IsolatedFile(object): | |
| 1300 """Represents a single parsed .isolated file.""" | |
| 1301 def __init__(self, obj_hash, algo): | |
| 1302 """|obj_hash| is really the sha-1 of the file.""" | |
| 1303 logging.debug('IsolatedFile(%s)' % obj_hash) | |
| 1304 self.obj_hash = obj_hash | |
| 1305 self.algo = algo | |
| 1306 # Set once all the left-side of the tree is parsed. 'Tree' here means the | |
| 1307 # .isolate and all the .isolated files recursively included by it with | |
| 1308 # 'includes' key. The order of each sha-1 in 'includes', each representing a | |
| 1309 # .isolated file in the hash table, is important, as the later ones are not | |
| 1310 # processed until the firsts are retrieved and read. | |
| 1311 self.can_fetch = False | |
| 1312 | |
| 1313 # Raw data. | |
| 1314 self.data = {} | |
| 1315 # A IsolatedFile instance, one per object in self.includes. | |
| 1316 self.children = [] | |
| 1317 | |
| 1318 # Set once the .isolated file is loaded. | |
| 1319 self._is_parsed = False | |
| 1320 # Set once the files are fetched. | |
| 1321 self.files_fetched = False | |
| 1322 | |
| 1323 def load(self, os_flavor, content): | |
| 1324 """Verifies the .isolated file is valid and loads this object with the json | |
| 1325 data. | |
| 1326 """ | |
| 1327 logging.debug('IsolatedFile.load(%s)' % self.obj_hash) | |
| 1328 assert not self._is_parsed | |
| 1329 self.data = load_isolated(content, os_flavor, self.algo) | |
| 1330 self.children = [ | |
| 1331 IsolatedFile(i, self.algo) for i in self.data.get('includes', []) | |
| 1332 ] | |
| 1333 self._is_parsed = True | |
| 1334 | |
| 1335 def fetch_files(self, fetch_queue, files): | |
| 1336 """Adds files in this .isolated file not present in |files| dictionary. | |
| 1337 | |
| 1338 Preemptively request files. | |
| 1339 | |
| 1340 Note that |files| is modified by this function. | |
| 1341 """ | |
| 1342 assert self.can_fetch | |
| 1343 if not self._is_parsed or self.files_fetched: | |
| 1344 return | |
| 1345 logging.debug('fetch_files(%s)' % self.obj_hash) | |
| 1346 for filepath, properties in self.data.get('files', {}).iteritems(): | |
| 1347 # Root isolated has priority on the files being mapped. In particular, | |
| 1348 # overriden files must not be fetched. | |
| 1349 if filepath not in files: | |
| 1350 files[filepath] = properties | |
| 1351 if 'h' in properties: | |
| 1352 # Preemptively request files. | |
| 1353 logging.debug('fetching %s' % filepath) | |
| 1354 fetch_queue.add(WorkerPool.MED, properties['h'], properties['s']) | |
| 1355 self.files_fetched = True | |
| 1356 | |
| 1357 | |
| 1358 class Settings(object): | |
| 1359 """Results of a completely parsed .isolated file.""" | |
| 1360 def __init__(self): | |
| 1361 self.command = [] | |
| 1362 self.files = {} | |
| 1363 self.read_only = None | |
| 1364 self.relative_cwd = None | |
| 1365 # The main .isolated file, a IsolatedFile instance. | |
| 1366 self.root = None | |
| 1367 | |
| 1368 def load(self, fetch_queue, root_isolated_hash, os_flavor, algo): | |
| 1369 """Loads the .isolated and all the included .isolated asynchronously. | |
| 1370 | |
| 1371 It enables support for "included" .isolated files. They are processed in | |
| 1372 strict order but fetched asynchronously from the cache. This is important so | |
| 1373 that a file in an included .isolated file that is overridden by an embedding | |
| 1374 .isolated file is not fetched needlessly. The includes are fetched in one | |
| 1375 pass and the files are fetched as soon as all the ones on the left-side | |
| 1376 of the tree were fetched. | |
| 1377 | |
| 1378 The prioritization is very important here for nested .isolated files. | |
| 1379 'includes' have the highest priority and the algorithm is optimized for both | |
| 1380 deep and wide trees. A deep one is a long link of .isolated files referenced | |
| 1381 one at a time by one item in 'includes'. A wide one has a large number of | |
| 1382 'includes' in a single .isolated file. 'left' is defined as an included | |
| 1383 .isolated file earlier in the 'includes' list. So the order of the elements | |
| 1384 in 'includes' is important. | |
| 1385 """ | |
| 1386 self.root = IsolatedFile(root_isolated_hash, algo) | |
| 1387 | |
| 1388 # Isolated files being retrieved now: hash -> IsolatedFile instance. | |
| 1389 pending = {} | |
| 1390 # Set of hashes of already retrieved items to refuse recursive includes. | |
| 1391 seen = set() | |
| 1392 | |
| 1393 def retrieve(isolated_file): | |
| 1394 h = isolated_file.obj_hash | |
| 1395 if h in seen: | |
| 1396 raise ConfigError('IsolatedFile %s is retrieved recursively' % h) | |
| 1397 assert h not in pending | |
| 1398 seen.add(h) | |
| 1399 pending[h] = isolated_file | |
| 1400 fetch_queue.add(WorkerPool.HIGH, h) | |
| 1401 | |
| 1402 retrieve(self.root) | |
| 1403 | |
| 1404 while pending: | |
| 1405 item_hash = fetch_queue.wait(pending) | |
| 1406 item = pending.pop(item_hash) | |
| 1407 item.load(os_flavor, fetch_queue.cache.read(item_hash)) | |
| 1408 if item_hash == root_isolated_hash: | |
| 1409 # It's the root item. | |
| 1410 item.can_fetch = True | |
| 1411 | |
| 1412 for new_child in item.children: | |
| 1413 retrieve(new_child) | |
| 1414 | |
| 1415 # Traverse the whole tree to see if files can now be fetched. | |
| 1416 self._traverse_tree(fetch_queue, self.root) | |
| 1417 | |
| 1418 def check(n): | |
| 1419 return all(check(x) for x in n.children) and n.files_fetched | |
| 1420 assert check(self.root) | |
| 1421 | |
| 1422 self.relative_cwd = self.relative_cwd or '' | |
| 1423 self.read_only = self.read_only or False | |
| 1424 | |
| 1425 def _traverse_tree(self, fetch_queue, node): | |
| 1426 if node.can_fetch: | |
| 1427 if not node.files_fetched: | |
| 1428 self._update_self(fetch_queue, node) | |
| 1429 will_break = False | |
| 1430 for i in node.children: | |
| 1431 if not i.can_fetch: | |
| 1432 if will_break: | |
| 1433 break | |
| 1434 # Automatically mark the first one as fetcheable. | |
| 1435 i.can_fetch = True | |
| 1436 will_break = True | |
| 1437 self._traverse_tree(fetch_queue, i) | |
| 1438 | |
| 1439 def _update_self(self, fetch_queue, node): | |
| 1440 node.fetch_files(fetch_queue, self.files) | |
| 1441 # Grabs properties. | |
| 1442 if not self.command and node.data.get('command'): | |
| 1443 # Ensure paths are correctly separated on windows. | |
| 1444 self.command = node.data['command'] | |
| 1445 if self.command: | |
| 1446 self.command[0] = self.command[0].replace('/', os.path.sep) | |
| 1447 self.command = tools.fix_python_path(self.command) | |
| 1448 if self.read_only is None and node.data.get('read_only') is not None: | |
| 1449 self.read_only = node.data['read_only'] | |
| 1450 if (self.relative_cwd is None and | |
| 1451 node.data.get('relative_cwd') is not None): | |
| 1452 self.relative_cwd = node.data['relative_cwd'] | |
| 1453 | |
| 1454 | |
| 1455 def fetch_isolated( | |
| 1456 isolated_hash, storage, cache, algo, outdir, os_flavor, require_command): | |
| 1457 """Aggressively downloads the .isolated file(s), then download all the files. | |
| 1458 | |
| 1459 Arguments: | |
| 1460 isolated_hash: hash of the root *.isolated file. | |
| 1461 storage: Storage class that communicates with isolate storage. | |
| 1462 cache: LocalCache class that knows how to store and map files locally. | |
| 1463 algo: hash algorithm to use. | |
| 1464 outdir: Output directory to map file tree to. | |
| 1465 os_flavor: OS flavor to choose when reading sections of *.isolated file. | |
| 1466 require_command: Ensure *.isolated specifies a command to run. | |
| 1467 | |
| 1468 Returns: | |
| 1469 Settings object that holds details about loaded *.isolated file. | |
| 1470 """ | |
| 1471 with cache: | |
| 1472 fetch_queue = FetchQueue(storage, cache) | |
| 1473 settings = Settings() | |
| 1474 | |
| 1475 with tools.Profiler('GetIsolateds'): | |
| 1476 # Optionally support local files by manually adding them to cache. | |
| 1477 if not is_valid_hash(isolated_hash, algo): | |
| 1478 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo) | |
| 1479 | |
| 1480 # Load all *.isolated and start loading rest of the files. | |
| 1481 settings.load(fetch_queue, isolated_hash, os_flavor, algo) | |
| 1482 if require_command and not settings.command: | |
| 1483 # TODO(vadimsh): All fetch operations are already enqueue and there's no | |
| 1484 # easy way to cancel them. | |
| 1485 raise ConfigError('No command to run') | |
| 1486 | |
| 1487 with tools.Profiler('GetRest'): | |
| 1488 # Create file system hierarchy. | |
| 1489 if not os.path.isdir(outdir): | |
| 1490 os.makedirs(outdir) | |
| 1491 create_directories(outdir, settings.files) | |
| 1492 create_links(outdir, settings.files.iteritems()) | |
| 1493 | |
| 1494 # Ensure working directory exists. | |
| 1495 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd)) | |
| 1496 if not os.path.isdir(cwd): | |
| 1497 os.makedirs(cwd) | |
| 1498 | |
| 1499 # Multimap: digest -> list of pairs (path, props). | |
| 1500 remaining = {} | |
| 1501 for filepath, props in settings.files.iteritems(): | |
| 1502 if 'h' in props: | |
| 1503 remaining.setdefault(props['h'], []).append((filepath, props)) | |
| 1504 | |
| 1505 # Now block on the remaining files to be downloaded and mapped. | |
| 1506 logging.info('Retrieving remaining files (%d of them)...', | |
| 1507 fetch_queue.pending_count) | |
| 1508 last_update = time.time() | |
| 1509 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: | |
| 1510 while remaining: | |
| 1511 detector.ping() | |
| 1512 | |
| 1513 # Wait for any item to finish fetching to cache. | |
| 1514 digest = fetch_queue.wait(remaining) | |
| 1515 | |
| 1516 # Link corresponding files to a fetched item in cache. | |
| 1517 for filepath, props in remaining.pop(digest): | |
| 1518 cache.link(digest, os.path.join(outdir, filepath), props.get('m')) | |
| 1519 | |
| 1520 # Report progress. | |
| 1521 duration = time.time() - last_update | |
| 1522 if duration > DELAY_BETWEEN_UPDATES_IN_SECS: | |
| 1523 msg = '%d files remaining...' % len(remaining) | |
| 1524 print msg | |
| 1525 logging.info(msg) | |
| 1526 last_update = time.time() | |
| 1527 | |
| 1528 # Cache could evict some items we just tried to fetch, it's a fatal error. | |
| 1529 if not fetch_queue.verify_all_cached(): | |
| 1530 raise MappingError('Cache is too small to hold all requested files') | |
| 1531 return settings | |
| 1532 | |
| 1533 | |
| 1534 @subcommand.usage('<file1..fileN> or - to read from stdin') | |
| 1535 def CMDarchive(parser, args): | |
| 1536 """Archives data to the server.""" | |
| 1537 options, files = parser.parse_args(args) | |
| 1538 | |
| 1539 if files == ['-']: | |
| 1540 files = sys.stdin.readlines() | |
| 1541 | |
| 1542 if not files: | |
| 1543 parser.error('Nothing to upload') | |
| 1544 | |
| 1545 # Load the necessary metadata. | |
| 1546 # TODO(maruel): Use a worker pool to upload as the hashing is being done. | |
| 1547 infiles = dict( | |
| 1548 ( | |
| 1549 f, | |
| 1550 { | |
| 1551 's': os.stat(f).st_size, | |
| 1552 'h': hash_file(f, get_hash_algo(options.namespace)), | |
| 1553 } | |
| 1554 ) | |
| 1555 for f in files) | |
| 1556 | |
| 1557 with tools.Profiler('Archive'): | |
| 1558 ret = upload_tree( | |
| 1559 base_url=options.isolate_server, | |
| 1560 indir=os.getcwd(), | |
| 1561 infiles=infiles, | |
| 1562 namespace=options.namespace) | |
| 1563 if not ret: | |
| 1564 print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles)) | |
| 1565 return ret | |
| 1566 | |
| 1567 | |
| 1568 def CMDdownload(parser, args): | |
| 1569 """Download data from the server. | |
| 1570 | |
| 1571 It can either download individual files or a complete tree from a .isolated | |
| 1572 file. | |
| 1573 """ | |
| 1574 parser.add_option( | |
| 1575 '-i', '--isolated', metavar='HASH', | |
| 1576 help='hash of an isolated file, .isolated file content is discarded, use ' | |
| 1577 '--file if you need it') | |
| 1578 parser.add_option( | |
| 1579 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2, | |
| 1580 help='hash and destination of a file, can be used multiple times') | |
| 1581 parser.add_option( | |
| 1582 '-t', '--target', metavar='DIR', default=os.getcwd(), | |
| 1583 help='destination directory') | |
| 1584 options, args = parser.parse_args(args) | |
| 1585 if args: | |
| 1586 parser.error('Unsupported arguments: %s' % args) | |
| 1587 if bool(options.isolated) == bool(options.file): | |
| 1588 parser.error('Use one of --isolated or --file, and only one.') | |
| 1589 | |
| 1590 options.target = os.path.abspath(options.target) | |
| 1591 storage = get_storage(options.isolate_server, options.namespace) | |
| 1592 cache = MemoryCache() | |
| 1593 algo = get_hash_algo(options.namespace) | |
| 1594 | |
| 1595 # Fetching individual files. | |
| 1596 if options.file: | |
| 1597 channel = threading_utils.TaskChannel() | |
| 1598 pending = {} | |
| 1599 for digest, dest in options.file: | |
| 1600 pending[digest] = dest | |
| 1601 storage.async_fetch( | |
| 1602 channel, | |
| 1603 WorkerPool.MED, | |
| 1604 digest, | |
| 1605 UNKNOWN_FILE_SIZE, | |
| 1606 functools.partial(file_write, os.path.join(options.target, dest))) | |
| 1607 while pending: | |
| 1608 fetched = channel.pull() | |
| 1609 dest = pending.pop(fetched) | |
| 1610 logging.info('%s: %s', fetched, dest) | |
| 1611 | |
| 1612 # Fetching whole isolated tree. | |
| 1613 if options.isolated: | |
| 1614 settings = fetch_isolated( | |
| 1615 isolated_hash=options.isolated, | |
| 1616 storage=storage, | |
| 1617 cache=cache, | |
| 1618 algo=algo, | |
| 1619 outdir=options.target, | |
| 1620 os_flavor=None, | |
| 1621 require_command=False) | |
| 1622 rel = os.path.join(options.target, settings.relative_cwd) | |
| 1623 print('To run this test please run from the directory %s:' % | |
| 1624 os.path.join(options.target, rel)) | |
| 1625 print(' ' + ' '.join(settings.command)) | |
| 1626 | |
| 1627 return 0 | |
| 1628 | |
| 1629 | |
| 1630 class OptionParserIsolateServer(tools.OptionParserWithLogging): | |
| 1631 def __init__(self, **kwargs): | |
| 1632 tools.OptionParserWithLogging.__init__(self, **kwargs) | |
| 1633 self.add_option( | |
| 1634 '-I', '--isolate-server', | |
| 1635 metavar='URL', default='', | |
| 1636 help='Isolate server to use') | |
| 1637 self.add_option( | |
| 1638 '--namespace', default='default-gzip', | |
| 1639 help='The namespace to use on the server, default: %default') | |
| 1640 | |
| 1641 def parse_args(self, *args, **kwargs): | |
| 1642 options, args = tools.OptionParserWithLogging.parse_args( | |
| 1643 self, *args, **kwargs) | |
| 1644 options.isolate_server = options.isolate_server.rstrip('/') | |
| 1645 if not options.isolate_server: | |
| 1646 self.error('--isolate-server is required.') | |
| 1647 return options, args | |
| 1648 | |
| 1649 | |
| 1650 def main(args): | |
| 1651 dispatcher = subcommand.CommandDispatcher(__name__) | |
| 1652 try: | |
| 1653 return dispatcher.execute( | |
| 1654 OptionParserIsolateServer(version=__version__), args) | |
| 1655 except Exception as e: | |
| 1656 tools.report_error(e) | |
| 1657 return 1 | |
| 1658 | |
| 1659 | |
| 1660 if __name__ == '__main__': | |
| 1661 fix_encoding.fix_encoding() | |
| 1662 tools.disable_buffering() | |
| 1663 colorama.init() | |
| 1664 sys.exit(main(sys.argv[1:])) | |
| OLD | NEW |