1 #!/usr/bin/env python | |
2 # Copyright 2013 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """Archives a set of files to a server.""" | |
7 | |
8 __version__ = '0.2' | |
9 | |
10 import functools | |
11 import hashlib | |
12 import json | |
13 import logging | |
14 import os | |
15 import re | |
16 import sys | |
17 import threading | |
18 import time | |
19 import urllib | |
20 import zlib | |
21 | |
22 from third_party import colorama | |
23 from third_party.depot_tools import fix_encoding | |
24 from third_party.depot_tools import subcommand | |
25 | |
26 from utils import net | |
27 from utils import threading_utils | |
28 from utils import tools | |
29 | |
30 | |
31 # Version of isolate protocol passed to the server in /handshake request. | |
32 ISOLATE_PROTOCOL_VERSION = '1.0' | |
33 | |
34 | |
35 # The number of files to check the isolate server per /pre-upload query. | |
36 # All files are sorted by likelihood of a change in the file content | |
37 # (currently file size is used to estimate this: the larger the file, the more | |
38 # likely it has changed). The first ITEMS_PER_CONTAINS_QUERIES[0] files are | |
39 # taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1], | |
40 # and so on. The numbers here are a trade-off: the more items per request, the | |
41 # lower the effect of HTTP round trip latency and TCP-level chattiness. On the | |
42 # other hand, larger values cause longer lookups, increasing the initial latency | |
43 # before uploading starts, which is especially an issue for large files. These | |
44 # values are optimized for the "few thousand files to look up with a minimal | |
45 # number of large files missing" case. | |
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] | |
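# As a concrete example of this schedule: checking 150 items via
# batch_items_for_check() below issues '/pre-upload' queries of 20, 20, 50 and
# 50 items, with the remaining 10 sent as a final, smaller batch.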
47 | |
48 | |
49 # A list of already compressed extension types that should not receive any | |
50 # compression before being uploaded. | |
51 ALREADY_COMPRESSED_TYPES = [ | |
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', | |
53 'wav', 'zip' | |
54 ] | |
55 | |
56 | |
57 # The file size to be used when we don't know the correct file size, | |
58 # generally used for .isolated files. | |
59 UNKNOWN_FILE_SIZE = None | |
60 | |
61 | |
62 # The size of each chunk to read when downloading and unzipping files. | |
63 ZIPPED_FILE_CHUNK = 16 * 1024 | |
64 | |
65 # Chunk size to use when doing disk I/O. | |
66 DISK_FILE_CHUNK = 1024 * 1024 | |
67 | |
68 # Chunk size to use when reading from network stream. | |
69 NET_IO_FILE_CHUNK = 16 * 1024 | |
70 | |
71 | |
72 # Read timeout in seconds for downloads from isolate storage. If there's no | |
73 # response from the server within this timeout, the whole download is aborted. | |
74 DOWNLOAD_READ_TIMEOUT = 60 | |
75 | |
76 # Maximum expected delay (in seconds) between successive file fetches | |
77 # in run_tha_test. If it takes longer than that, a deadlock might be happening | |
78 # and all stack frames for all threads are dumped to log. | |
79 DEADLOCK_TIMEOUT = 5 * 60 | |
80 | |
81 | |
82 # The delay (in seconds) to wait between logging statements when retrieving | |
83 # the required files. This is intended to let the user (or buildbot) know that | |
84 # the program is still running. | |
85 DELAY_BETWEEN_UPDATES_IN_SECS = 30 | |
86 | |
87 | |
88 # Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly | |
89 # specify the names here. | |
90 SUPPORTED_ALGOS = { | |
91 'md5': hashlib.md5, | |
92 'sha-1': hashlib.sha1, | |
93 'sha-512': hashlib.sha512, | |
94 } | |
95 | |
96 | |
97 # Used for serialization. | |
98 SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems()) | |
99 | |
100 | |
101 class ConfigError(ValueError): | |
102 """Generic failure to load a .isolated file.""" | |
103 pass | |
104 | |
105 | |
106 class MappingError(OSError): | |
107 """Failed to recreate the tree.""" | |
108 pass | |
109 | |
110 | |
111 def is_valid_hash(value, algo): | |
112   """Returns True if the value is a valid hash for the given algorithm.""" | |
113 size = 2 * algo().digest_size | |
114 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) | |
115 | |
116 | |
117 def hash_file(filepath, algo): | |
118 """Calculates the hash of a file without reading it all in memory at once. | |
119 | |
120   |algo| should be one of the hashlib hashing algorithms. | |
121 """ | |
122 digest = algo() | |
123 with open(filepath, 'rb') as f: | |
124 while True: | |
125 chunk = f.read(DISK_FILE_CHUNK) | |
126 if not chunk: | |
127 break | |
128 digest.update(chunk) | |
129 return digest.hexdigest() | |
130 | |
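# Example (the path is hypothetical): hashing in streaming fashion matches
# hashing the whole content at once, without reading the file into memory.
#
#   hash_file('/tmp/payload.bin', hashlib.sha1) ==
#       hashlib.sha1(open('/tmp/payload.bin', 'rb').read()).hexdigest()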
131 | |
132 def stream_read(stream, chunk_size): | |
133 """Reads chunks from |stream| and yields them.""" | |
134 while True: | |
135 data = stream.read(chunk_size) | |
136 if not data: | |
137 break | |
138 yield data | |
139 | |
140 | |
141 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): | |
142 """Yields file content in chunks of given |chunk_size|.""" | |
143 with open(filepath, 'rb') as f: | |
144 while True: | |
145 data = f.read(chunk_size) | |
146 if not data: | |
147 break | |
148 yield data | |
149 | |
150 | |
151 def file_write(filepath, content_generator): | |
152 """Writes file content as generated by content_generator. | |
153 | |
154   Creates the intermediate directories as needed. | |
155 | |
156 Returns the number of bytes written. | |
157 | |
158 Meant to be mocked out in unit tests. | |
159 """ | |
160 filedir = os.path.dirname(filepath) | |
161 if not os.path.isdir(filedir): | |
162 os.makedirs(filedir) | |
163 total = 0 | |
164 with open(filepath, 'wb') as f: | |
165 for d in content_generator: | |
166 total += len(d) | |
167 f.write(d) | |
168 return total | |
169 | |
170 | |
171 def zip_compress(content_generator, level=7): | |
172 """Reads chunks from |content_generator| and yields zip compressed chunks.""" | |
173 compressor = zlib.compressobj(level) | |
174 for chunk in content_generator: | |
175 compressed = compressor.compress(chunk) | |
176 if compressed: | |
177 yield compressed | |
178 tail = compressor.flush(zlib.Z_FINISH) | |
179 if tail: | |
180 yield tail | |
181 | |
182 | |
183 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK): | |
184 """Reads zipped data from |content_generator| and yields decompressed data. | |
185 | |
186   Decompresses data in small chunks (no larger than |chunk_size|) so that a | |
187   zip bomb doesn't cause zlib to preallocate a huge amount of memory. | |
188 | |
189 Raises IOError if data is corrupted or incomplete. | |
190 """ | |
191 decompressor = zlib.decompressobj() | |
192 compressed_size = 0 | |
193 try: | |
194 for chunk in content_generator: | |
195 compressed_size += len(chunk) | |
196 data = decompressor.decompress(chunk, chunk_size) | |
197 if data: | |
198 yield data | |
199 while decompressor.unconsumed_tail: | |
200 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size) | |
201 if data: | |
202 yield data | |
203 tail = decompressor.flush() | |
204 if tail: | |
205 yield tail | |
206 except zlib.error as e: | |
207 raise IOError( | |
208 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) | |
209 # Ensure all data was read and decompressed. | |
210 if decompressor.unused_data or decompressor.unconsumed_tail: | |
211 raise IOError('Not all data was decompressed') | |
212 | |
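# A minimal round-trip sketch for the two helpers above (illustrative only):
# feeding the output of zip_compress() into zip_decompress() reproduces the
# original chunk data.
#
#   chunks = ['hello ', 'world']
#   assert ''.join(zip_decompress(zip_compress(chunks))) == 'hello world'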
213 | |
214 def get_zip_compression_level(filename): | |
215 """Given a filename calculates the ideal zip compression level to use.""" | |
216 file_ext = os.path.splitext(filename)[1].lower() | |
217 # TODO(csharp): Profile to find what compression level works best. | |
218 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 | |
219 | |
220 | |
221 def create_directories(base_directory, files): | |
222 """Creates the directory structure needed by the given list of files.""" | |
223 logging.debug('create_directories(%s, %d)', base_directory, len(files)) | |
224   # Build the set of directories that need to be created. | |
225 directories = set(os.path.dirname(f) for f in files) | |
226 for item in list(directories): | |
227 while item: | |
228 directories.add(item) | |
229 item = os.path.dirname(item) | |
230 for d in sorted(directories): | |
231 if d: | |
232 os.mkdir(os.path.join(base_directory, d)) | |
233 | |
234 | |
235 def create_links(base_directory, files): | |
236 """Creates any links needed by the given set of files.""" | |
237 for filepath, properties in files: | |
238 if 'l' not in properties: | |
239 continue | |
240 if sys.platform == 'win32': | |
241 # TODO(maruel): Create junctions or empty text files similar to what | |
242       # cygwin does? | |
243 logging.warning('Ignoring symlink %s', filepath) | |
244 continue | |
245 outfile = os.path.join(base_directory, filepath) | |
246     # symlink doesn't exist on Windows, so the 'l' (link) property should | |
247     # never be specified in a Windows .isolated file. | |
248 os.symlink(properties['l'], outfile) # pylint: disable=E1101 | |
249 if 'm' in properties: | |
250 lchmod = getattr(os, 'lchmod', None) | |
251 if lchmod: | |
252 lchmod(outfile, properties['m']) | |
253 | |
254 | |
255 def is_valid_file(filepath, size): | |
256   """Determines if the given file appears valid. | |
257 | |
258 Currently it just checks the file's size. | |
259 """ | |
260 if size == UNKNOWN_FILE_SIZE: | |
261 return os.path.isfile(filepath) | |
262 actual_size = os.stat(filepath).st_size | |
263 if size != actual_size: | |
264 logging.warning( | |
265 'Found invalid item %s; %d != %d', | |
266 os.path.basename(filepath), actual_size, size) | |
267 return False | |
268 return True | |
269 | |
270 | |
271 class WorkerPool(threading_utils.AutoRetryThreadPool): | |
272 """Thread pool that automatically retries on IOError and runs a preconfigured | |
273 function. | |
274 """ | |
275 # Initial and maximum number of worker threads. | |
276 INITIAL_WORKERS = 2 | |
277 MAX_WORKERS = 16 | |
278 RETRIES = 5 | |
279 | |
280 def __init__(self): | |
281 super(WorkerPool, self).__init__( | |
282 [IOError], | |
283 self.RETRIES, | |
284 self.INITIAL_WORKERS, | |
285 self.MAX_WORKERS, | |
286 0, | |
287 'remote') | |
288 | |
289 | |
290 class Item(object): | |
291 """An item to push to Storage. | |
292 | |
293   It starts its life in the main thread, travels to the 'contains' thread, then | |
294   to the 'push' thread, and finally back to the main thread. | |
295 | |
296 It is never used concurrently from multiple threads. | |
297 """ | |
298 | |
299 def __init__(self, digest, size, is_isolated=False): | |
300 self.digest = digest | |
301 self.size = size | |
302 self.is_isolated = is_isolated | |
303 self.compression_level = 6 | |
304 self.push_state = None | |
305 | |
306 def content(self, chunk_size): | |
307 """Iterable with content of this item in chunks of given size. | |
308 | |
309 Arguments: | |
310 chunk_size: preferred size of the chunk to produce, may be ignored. | |
311 """ | |
312 raise NotImplementedError() | |
313 | |
314 | |
315 class FileItem(Item): | |
316 """A file to push to Storage.""" | |
317 | |
318 def __init__(self, path, digest, size, is_isolated): | |
319 super(FileItem, self).__init__(digest, size, is_isolated) | |
320 self.path = path | |
321 self.compression_level = get_zip_compression_level(path) | |
322 | |
323 def content(self, chunk_size): | |
324 return file_read(self.path, chunk_size) | |
325 | |
326 | |
327 class BufferItem(Item): | |
328 """A byte buffer to push to Storage.""" | |
329 | |
330 def __init__(self, buf, algo, is_isolated=False): | |
331 super(BufferItem, self).__init__( | |
332 algo(buf).hexdigest(), len(buf), is_isolated) | |
333 self.buffer = buf | |
334 | |
335 def content(self, _chunk_size): | |
336 return [self.buffer] | |
337 | |
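# Illustrative example of the Item subclasses above (the payload is made up):
# a BufferItem hashes its buffer at construction time, so its digest can be
# compared directly against hashlib.
#
#   item = BufferItem('{"version": "1.0"}', hashlib.sha1, is_isolated=True)
#   assert item.digest == hashlib.sha1('{"version": "1.0"}').hexdigest()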
338 | |
339 class Storage(object): | |
340 """Efficiently downloads or uploads large set of files via StorageApi.""" | |
341 | |
342 def __init__(self, storage_api, use_zip): | |
343 self.use_zip = use_zip | |
344 self._storage_api = storage_api | |
345 self._cpu_thread_pool = None | |
346 self._net_thread_pool = None | |
347 | |
348 @property | |
349 def cpu_thread_pool(self): | |
350 """ThreadPool for CPU-bound tasks like zipping.""" | |
351 if self._cpu_thread_pool is None: | |
352 self._cpu_thread_pool = threading_utils.ThreadPool( | |
353 2, max(threading_utils.num_processors(), 2), 0, 'zip') | |
354 return self._cpu_thread_pool | |
355 | |
356 @property | |
357 def net_thread_pool(self): | |
358 """AutoRetryThreadPool for IO-bound tasks, retries IOError.""" | |
359 if self._net_thread_pool is None: | |
360 self._net_thread_pool = WorkerPool() | |
361 return self._net_thread_pool | |
362 | |
363 def close(self): | |
364 """Waits for all pending tasks to finish.""" | |
365 if self._cpu_thread_pool: | |
366 self._cpu_thread_pool.join() | |
367 self._cpu_thread_pool.close() | |
368 self._cpu_thread_pool = None | |
369 if self._net_thread_pool: | |
370 self._net_thread_pool.join() | |
371 self._net_thread_pool.close() | |
372 self._net_thread_pool = None | |
373 | |
374 def __enter__(self): | |
375 """Context manager interface.""" | |
376 return self | |
377 | |
378 def __exit__(self, _exc_type, _exc_value, _traceback): | |
379 """Context manager interface.""" | |
380 self.close() | |
381 return False | |
382 | |
383 def upload_tree(self, indir, infiles): | |
384 """Uploads the given tree to the isolate server. | |
385 | |
386 Arguments: | |
387 indir: root directory the infiles are based in. | |
388 infiles: dict of files to upload from |indir|. | |
389 | |
390 Returns: | |
391 List of items that were uploaded. All other items are already there. | |
392 """ | |
393 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) | |
394 | |
395 # Convert |indir| + |infiles| into a list of FileItem objects. | |
396 # Filter out symlinks, since they are not represented by items on isolate | |
397 # server side. | |
398 items = [ | |
399 FileItem( | |
400 path=os.path.join(indir, filepath), | |
401 digest=metadata['h'], | |
402 size=metadata['s'], | |
403 is_isolated=metadata.get('priority') == '0') | |
404 for filepath, metadata in infiles.iteritems() | |
405 if 'l' not in metadata | |
406 ] | |
407 | |
408 return self.upload_items(items) | |
409 | |
410 def upload_items(self, items): | |
411     """Uploads a bunch of items to the isolate server. | |
412 | |
413 Will upload only items that are missing. | |
414 | |
415 Arguments: | |
416 items: list of Item instances that represents data to upload. | |
417 | |
418 Returns: | |
419 List of items that were uploaded. All other items are already there. | |
420 """ | |
421     # TODO(vadimsh): Optimize the special case of len(items) == 1, which is | |
422     # frequently used by swarming.py. There's no need to spawn multiple | |
423     # threads and do stuff in parallel: there's nothing to parallelize. The | |
424     # 'contains' check and 'push' should run sequentially in the current thread. | |
425 | |
426 # For each digest keep only first Item that matches it. All other items | |
427 # are just indistinguishable copies from the point of view of isolate | |
428 # server (it doesn't care about paths at all, only content and digests). | |
429 seen = {} | |
430 duplicates = 0 | |
431 for item in items: | |
432 if seen.setdefault(item.digest, item) is not item: | |
433 duplicates += 1 | |
434 items = seen.values() | |
435 if duplicates: | |
436 logging.info('Skipped %d duplicated files', duplicates) | |
437 | |
438 # Enqueue all upload tasks. | |
439 missing = set() | |
440 channel = threading_utils.TaskChannel() | |
441 for missing_item in self.get_missing_items(items): | |
442 missing.add(missing_item) | |
443 self.async_push( | |
444 channel, | |
445 WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED, | |
446 missing_item) | |
447 | |
448 uploaded = [] | |
449 # No need to spawn deadlock detector thread if there's nothing to upload. | |
450 if missing: | |
451 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: | |
452 # Wait for all started uploads to finish. | |
453 while len(uploaded) != len(missing): | |
454 detector.ping() | |
455 item = channel.pull() | |
456 uploaded.append(item) | |
457 logging.debug( | |
458 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest) | |
459 logging.info('All files are uploaded') | |
460 | |
461 # Print stats. | |
462 total = len(items) | |
463 total_size = sum(f.size for f in items) | |
464 logging.info( | |
465 'Total: %6d, %9.1fkb', | |
466 total, | |
467 total_size / 1024.) | |
468 cache_hit = set(items) - missing | |
469 cache_hit_size = sum(f.size for f in cache_hit) | |
470 logging.info( | |
471 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', | |
472 len(cache_hit), | |
473 cache_hit_size / 1024., | |
474 len(cache_hit) * 100. / total, | |
475 cache_hit_size * 100. / total_size if total_size else 0) | |
476 cache_miss = missing | |
477 cache_miss_size = sum(f.size for f in cache_miss) | |
478 logging.info( | |
479 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', | |
480 len(cache_miss), | |
481 cache_miss_size / 1024., | |
482 len(cache_miss) * 100. / total, | |
483 cache_miss_size * 100. / total_size if total_size else 0) | |
484 | |
485 return uploaded | |
486 | |
487 def get_fetch_url(self, digest): | |
488 """Returns an URL that can be used to fetch an item with given digest. | |
489 | |
490 Arguments: | |
491 digest: hex digest of item to fetch. | |
492 | |
493 Returns: | |
494 An URL or None if underlying protocol doesn't support this. | |
495 """ | |
496 return self._storage_api.get_fetch_url(digest) | |
497 | |
498 def async_push(self, channel, priority, item): | |
499 """Starts asynchronous push to the server in a parallel thread. | |
500 | |
501 Arguments: | |
502 channel: TaskChannel that receives back |item| when upload ends. | |
503 priority: thread pool task priority for the push. | |
504 item: item to upload as instance of Item class. | |
505 """ | |
506 def push(content): | |
507       """Pushes an item and returns it, to pass as a result to |channel|.""" | |
508 self._storage_api.push(item, content) | |
509 return item | |
510 | |
511 # If zipping is not required, just start a push task. | |
512 if not self.use_zip: | |
513 self.net_thread_pool.add_task_with_channel(channel, priority, push, | |
514 item.content(DISK_FILE_CHUNK)) | |
515 return | |
516 | |
517 # If zipping is enabled, zip in a separate thread. | |
518 def zip_and_push(): | |
519       # TODO(vadimsh): Implement streaming uploads. Until it's done, assemble | |
520       # the content right here. It blocks until the whole file is zipped. | |
521 try: | |
522 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK), | |
523 item.compression_level) | |
524 data = ''.join(stream) | |
525 except Exception as exc: | |
526 logging.error('Failed to zip \'%s\': %s', item, exc) | |
527 channel.send_exception(exc) | |
528 return | |
529 self.net_thread_pool.add_task_with_channel( | |
530 channel, priority, push, [data]) | |
531 self.cpu_thread_pool.add_task(priority, zip_and_push) | |
532 | |
533 def async_fetch(self, channel, priority, digest, size, sink): | |
534 """Starts asynchronous fetch from the server in a parallel thread. | |
535 | |
536 Arguments: | |
537 channel: TaskChannel that receives back |digest| when download ends. | |
538 priority: thread pool task priority for the fetch. | |
539 digest: hex digest of an item to download. | |
540 size: expected size of the item (after decompression). | |
541 sink: function that will be called as sink(generator). | |
542 """ | |
543 def fetch(): | |
544 try: | |
545 # Prepare reading pipeline. | |
546 stream = self._storage_api.fetch(digest) | |
547 if self.use_zip: | |
548 stream = zip_decompress(stream, DISK_FILE_CHUNK) | |
549 # Run |stream| through verifier that will assert its size. | |
550 verifier = FetchStreamVerifier(stream, size) | |
551 # Verified stream goes to |sink|. | |
552 sink(verifier.run()) | |
553 except Exception as err: | |
554 logging.warning('Failed to fetch %s: %s', digest, err) | |
555 raise | |
556 return digest | |
557 | |
558     # Don't bother with cpu_thread_pool for decompression. Decompression is | |
559 # really fast and most probably IO bound anyway. | |
560 self.net_thread_pool.add_task_with_channel(channel, priority, fetch) | |
561 | |
562 def get_missing_items(self, items): | |
563 """Yields items that are missing from the server. | |
564 | |
565 Issues multiple parallel queries via StorageApi's 'contains' method. | |
566 | |
567 Arguments: | |
568 items: a list of Item objects to check. | |
569 | |
570 Yields: | |
571 Item objects that are missing from the server. | |
572 """ | |
573 channel = threading_utils.TaskChannel() | |
574 pending = 0 | |
575 # Enqueue all requests. | |
576 for batch in self.batch_items_for_check(items): | |
577 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, | |
578 self._storage_api.contains, batch) | |
579 pending += 1 | |
580 # Yield results as they come in. | |
581 for _ in xrange(pending): | |
582 for missing in channel.pull(): | |
583 yield missing | |
584 | |
585 @staticmethod | |
586 def batch_items_for_check(items): | |
587 """Splits list of items to check for existence on the server into batches. | |
588 | |
589 Each batch corresponds to a single 'exists?' query to the server via a call | |
590 to StorageApi's 'contains' method. | |
591 | |
592 Arguments: | |
593 items: a list of Item objects. | |
594 | |
595 Yields: | |
596 Batches of items to query for existence in a single operation, | |
597 each batch is a list of Item objects. | |
598 """ | |
599 batch_count = 0 | |
600 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] | |
601 next_queries = [] | |
602 for item in sorted(items, key=lambda x: x.size, reverse=True): | |
603 next_queries.append(item) | |
604 if len(next_queries) == batch_size_limit: | |
605 yield next_queries | |
606 next_queries = [] | |
607 batch_count += 1 | |
608 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[ | |
609 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)] | |
610 if next_queries: | |
611 yield next_queries | |
612 | |
613 | |
614 class FetchQueue(object): | |
615 """Fetches items from Storage and places them into LocalCache. | |
616 | |
617 It manages multiple concurrent fetch operations. Acts as a bridge between | |
618 Storage and LocalCache so that Storage and LocalCache don't depend on each | |
619 other at all. | |
620 """ | |
621 | |
622 def __init__(self, storage, cache): | |
623 self.storage = storage | |
624 self.cache = cache | |
625 self._channel = threading_utils.TaskChannel() | |
626 self._pending = set() | |
627 self._accessed = set() | |
628 self._fetched = cache.cached_set() | |
629 | |
630 def add(self, priority, digest, size=UNKNOWN_FILE_SIZE): | |
631 """Starts asynchronous fetch of item |digest|.""" | |
632 # Fetching it now? | |
633 if digest in self._pending: | |
634 return | |
635 | |
636 # Mark this file as in use, verify_all_cached will later ensure it is still | |
637 # in cache. | |
638 self._accessed.add(digest) | |
639 | |
640 # Already fetched? Notify cache to update item's LRU position. | |
641 if digest in self._fetched: | |
642 # 'touch' returns True if item is in cache and not corrupted. | |
643 if self.cache.touch(digest, size): | |
644 return | |
645 # Item is corrupted, remove it from cache and fetch it again. | |
646 self._fetched.remove(digest) | |
647 self.cache.evict(digest) | |
648 | |
649 # TODO(maruel): It should look at the free disk space, the current cache | |
650 # size and the size of the new item on every new item: | |
651 # - Trim the cache as more entries are listed when free disk space is low, | |
652 # otherwise if the amount of data downloaded during the run > free disk | |
653 # space, it'll crash. | |
654 # - Make sure there's enough free disk space to fit all dependencies of | |
655 # this run! If not, abort early. | |
656 | |
657 # Start fetching. | |
658 self._pending.add(digest) | |
659 self.storage.async_fetch( | |
660 self._channel, priority, digest, size, | |
661 functools.partial(self.cache.write, digest)) | |
662 | |
663 def wait(self, digests): | |
664 """Starts a loop that waits for at least one of |digests| to be retrieved. | |
665 | |
666 Returns the first digest retrieved. | |
667 """ | |
668 # Flush any already fetched items. | |
669 for digest in digests: | |
670 if digest in self._fetched: | |
671 return digest | |
672 | |
673 # Ensure all requested items are being fetched now. | |
674 assert all(digest in self._pending for digest in digests), ( | |
675 digests, self._pending) | |
676 | |
677 # Wait for some requested item to finish fetching. | |
678 while self._pending: | |
679 digest = self._channel.pull() | |
680 self._pending.remove(digest) | |
681 self._fetched.add(digest) | |
682 if digest in digests: | |
683 return digest | |
684 | |
685 # Should never reach this point due to assert above. | |
686 raise RuntimeError('Impossible state') | |
687 | |
688 def inject_local_file(self, path, algo): | |
689 """Adds local file to the cache as if it was fetched from storage.""" | |
690 with open(path, 'rb') as f: | |
691 data = f.read() | |
692 digest = algo(data).hexdigest() | |
693 self.cache.write(digest, [data]) | |
694 self._fetched.add(digest) | |
695 return digest | |
696 | |
697 @property | |
698 def pending_count(self): | |
699 """Returns number of items to be fetched.""" | |
700 return len(self._pending) | |
701 | |
702 def verify_all_cached(self): | |
703 """True if all accessed items are in cache.""" | |
704 return self._accessed.issubset(self.cache.cached_set()) | |
705 | |
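# Typical use of FetchQueue (a sketch; fetch_isolated() below shows the real
# flow): digests are enqueued with add() and the caller blocks in wait() until
# one of them has been written to the local cache.
#
#   fetch_queue = FetchQueue(storage, cache)
#   fetch_queue.add(WorkerPool.MED, digest, size)
#   fetched = fetch_queue.wait([digest])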
706 | |
707 class FetchStreamVerifier(object): | |
708 """Verifies that fetched file is valid before passing it to the LocalCache.""" | |
709 | |
710 def __init__(self, stream, expected_size): | |
711 self.stream = stream | |
712 self.expected_size = expected_size | |
713 self.current_size = 0 | |
714 | |
715 def run(self): | |
716 """Generator that yields same items as |stream|. | |
717 | |
718 Verifies |stream| is complete before yielding a last chunk to consumer. | |
719 | |
720     Also wraps IOError produced by the consumer into MappingError exceptions, | |
721     since otherwise Storage would retry fetches on unrelated local cache errors. | |
722 """ | |
723 # Read one chunk ahead, keep it in |stored|. | |
724 # That way a complete stream can be verified before pushing last chunk | |
725 # to consumer. | |
726 stored = None | |
727 for chunk in self.stream: | |
728 assert chunk is not None | |
729 if stored is not None: | |
730 self._inspect_chunk(stored, is_last=False) | |
731 try: | |
732 yield stored | |
733 except IOError as exc: | |
734 raise MappingError('Failed to store an item in cache: %s' % exc) | |
735 stored = chunk | |
736 if stored is not None: | |
737 self._inspect_chunk(stored, is_last=True) | |
738 try: | |
739 yield stored | |
740 except IOError as exc: | |
741 raise MappingError('Failed to store an item in cache: %s' % exc) | |
742 | |
743 def _inspect_chunk(self, chunk, is_last): | |
744 """Called for each fetched chunk before passing it to consumer.""" | |
745 self.current_size += len(chunk) | |
746 if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and | |
747 (self.expected_size != self.current_size)): | |
748 raise IOError('Incorrect file size: expected %d, got %d' % ( | |
749 self.expected_size, self.current_size)) | |
750 | |
751 | |
752 class StorageApi(object): | |
753 """Interface for classes that implement low-level storage operations.""" | |
754 | |
755 def get_fetch_url(self, digest): | |
756 """Returns an URL that can be used to fetch an item with given digest. | |
757 | |
758 Arguments: | |
759 digest: hex digest of item to fetch. | |
760 | |
761 Returns: | |
762 An URL or None if the protocol doesn't support this. | |
763 """ | |
764 raise NotImplementedError() | |
765 | |
766 def fetch(self, digest): | |
767 """Fetches an object and yields its content. | |
768 | |
769 Arguments: | |
770 digest: hash digest of item to download. | |
771 | |
772 Yields: | |
773 Chunks of downloaded item (as str objects). | |
774 """ | |
775 raise NotImplementedError() | |
776 | |
777 def push(self, item, content): | |
778 """Uploads an |item| with content generated by |content| generator. | |
779 | |
780 Arguments: | |
781 item: Item object that holds information about an item being pushed. | |
782 content: a generator that yields chunks to push. | |
783 | |
784 Returns: | |
785 None. | |
786 """ | |
787 raise NotImplementedError() | |
788 | |
789 def contains(self, items): | |
790 """Checks for existence of given |items| on the server. | |
791 | |
792     Mutates |items| by assigning an opaque implementation-specific object to | |
793     Item's push_state attribute for entries missing from the datastore. | |
794 | |
795 Arguments: | |
796 items: list of Item objects. | |
797 | |
798 Returns: | |
799 A list of items missing on server as a list of Item objects. | |
800 """ | |
801 raise NotImplementedError() | |
802 | |
803 | |
804 class IsolateServer(StorageApi): | |
805 """StorageApi implementation that downloads and uploads to Isolate Server. | |
806 | |
807 It uploads and downloads directly from Google Storage whenever appropriate. | |
808 """ | |
809 | |
810 class _PushState(object): | |
811 """State needed to call .push(), to be stored in Item.push_state.""" | |
812 def __init__(self, upload_url, finalize_url): | |
813 self.upload_url = upload_url | |
814 self.finalize_url = finalize_url | |
815 self.uploaded = False | |
816 self.finalized = False | |
817 | |
818 def __init__(self, base_url, namespace): | |
819 super(IsolateServer, self).__init__() | |
820 assert base_url.startswith('http'), base_url | |
821 self.base_url = base_url.rstrip('/') | |
822 self.namespace = namespace | |
823 self._lock = threading.Lock() | |
824 self._server_caps = None | |
825 | |
826 @staticmethod | |
827 def _generate_handshake_request(): | |
828 """Returns a dict to be sent as handshake request body.""" | |
829 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. | |
830 return { | |
831 'client_app_version': __version__, | |
832 'fetcher': True, | |
833 'protocol_version': ISOLATE_PROTOCOL_VERSION, | |
834 'pusher': True, | |
835 } | |
836 | |
837 @staticmethod | |
838 def _validate_handshake_response(caps): | |
839 """Validates and normalizes handshake response.""" | |
840 logging.info('Protocol version: %s', caps['protocol_version']) | |
841 logging.info('Server version: %s', caps['server_app_version']) | |
842 if caps.get('error'): | |
843 raise MappingError(caps['error']) | |
844 if not caps['access_token']: | |
845 raise ValueError('access_token is missing') | |
846 return caps | |
847 | |
848 @property | |
849 def _server_capabilities(self): | |
850 """Performs handshake with the server if not yet done. | |
851 | |
852 Returns: | |
853 Server capabilities dictionary as returned by /handshake endpoint. | |
854 | |
855 Raises: | |
856 MappingError if server rejects the handshake. | |
857 """ | |
858 # TODO(maruel): Make this request much earlier asynchronously while the | |
859 # files are being enumerated. | |
860 with self._lock: | |
861 if self._server_caps is None: | |
862 request_body = json.dumps( | |
863 self._generate_handshake_request(), separators=(',', ':')) | |
864 response = net.url_read( | |
865 url=self.base_url + '/content-gs/handshake', | |
866 data=request_body, | |
867 content_type='application/json', | |
868 method='POST') | |
869 if response is None: | |
870 raise MappingError('Failed to perform handshake.') | |
871 try: | |
872 caps = json.loads(response) | |
873 if not isinstance(caps, dict): | |
874 raise ValueError('Expecting JSON dict') | |
875 self._server_caps = self._validate_handshake_response(caps) | |
876 except (ValueError, KeyError, TypeError) as exc: | |
877 # KeyError exception has very confusing str conversion: it's just a | |
878 # missing key value and nothing else. So print exception class name | |
879 # as well. | |
880 raise MappingError('Invalid handshake response (%s): %s' % ( | |
881 exc.__class__.__name__, exc)) | |
882 return self._server_caps | |
883 | |
884 def get_fetch_url(self, digest): | |
885 assert isinstance(digest, basestring) | |
886 return '%s/content-gs/retrieve/%s/%s' % ( | |
887 self.base_url, self.namespace, digest) | |
888 | |
889 def fetch(self, digest): | |
890 source_url = self.get_fetch_url(digest) | |
891 logging.debug('download_file(%s)', source_url) | |
892 | |
893     # Because the App Engine datastore is only eventually consistent, retry | |
894     # 404 errors: the file might just not be visible yet (even though it has | |
895     # been uploaded). | |
896 connection = net.url_open( | |
897 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) | |
898 if not connection: | |
899 raise IOError('Unable to open connection to %s' % source_url) | |
900 return stream_read(connection, NET_IO_FILE_CHUNK) | |
901 | |
902 def push(self, item, content): | |
903 assert isinstance(item, Item) | |
904 assert isinstance(item.push_state, IsolateServer._PushState) | |
905 assert not item.push_state.finalized | |
906 | |
907 # TODO(vadimsh): Do not read from |content| generator when retrying push. | |
908     # If |content| is indeed a generator, it cannot be rewound back | |
909 # to the beginning of the stream. A retry will find it exhausted. A possible | |
910 # solution is to wrap |content| generator with some sort of caching | |
911 # restartable generator. It should be done alongside streaming support | |
912 # implementation. | |
913 | |
914     # This push operation may be a retry after a failed finalization call | |
915     # below; no need to reupload the contents in that case. | |
916 if not item.push_state.uploaded: | |
917       # A cheesy way to avoid a memcpy of the (possibly huge) file, until | |
918       # streaming upload support is implemented. | |
919 if isinstance(content, list) and len(content) == 1: | |
920 content = content[0] | |
921 else: | |
922 content = ''.join(content) | |
923 # PUT file to |upload_url|. | |
924 response = net.url_read( | |
925 url=item.push_state.upload_url, | |
926 data=content, | |
927 content_type='application/octet-stream', | |
928 method='PUT') | |
929 if response is None: | |
930 raise IOError('Failed to upload a file %s to %s' % ( | |
931 item.digest, item.push_state.upload_url)) | |
932 item.push_state.uploaded = True | |
933 else: | |
934 logging.info( | |
935 'A file %s already uploaded, retrying finalization only', item.digest) | |
936 | |
937 # Optionally notify the server that it's done. | |
938 if item.push_state.finalize_url: | |
939 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and | |
940 # send it to isolated server. That way isolate server can verify that | |
941 # the data safely reached Google Storage (GS provides MD5 and CRC32C of | |
942 # stored files). | |
943 response = net.url_read( | |
944 url=item.push_state.finalize_url, | |
945 data='', | |
946 content_type='application/json', | |
947 method='POST') | |
948 if response is None: | |
949 raise IOError('Failed to finalize an upload of %s' % item.digest) | |
950 item.push_state.finalized = True | |
951 | |
952 def contains(self, items): | |
953 logging.info('Checking existence of %d files...', len(items)) | |
954 | |
955 # Request body is a json encoded list of dicts. | |
956 body = [ | |
957 { | |
958 'h': item.digest, | |
959 's': item.size, | |
960 'i': int(item.is_isolated), | |
961 } for item in items | |
962 ] | |
963 | |
964 query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( | |
965 self.base_url, | |
966 self.namespace, | |
967 urllib.quote(self._server_capabilities['access_token'])) | |
968 response_body = net.url_read( | |
969 url=query_url, | |
970 data=json.dumps(body, separators=(',', ':')), | |
971 content_type='application/json', | |
972 method='POST') | |
973 if response_body is None: | |
974 raise MappingError('Failed to execute /pre-upload query') | |
975 | |
976 # Response body is a list of push_urls (or null if file is already present). | |
977 try: | |
978 response = json.loads(response_body) | |
979 if not isinstance(response, list): | |
980 raise ValueError('Expecting response with json-encoded list') | |
981 if len(response) != len(items): | |
982 raise ValueError( | |
983 'Incorrect number of items in the list, expected %d, ' | |
984 'but got %d' % (len(items), len(response))) | |
985 except ValueError as err: | |
986 raise MappingError( | |
987 'Invalid response from server: %s, body is %s' % (err, response_body)) | |
988 | |
989 # Pick Items that are missing, attach _PushState to them. | |
990 missing_items = [] | |
991 for i, push_urls in enumerate(response): | |
992 if push_urls: | |
993 assert len(push_urls) == 2, str(push_urls) | |
994 item = items[i] | |
995 assert item.push_state is None | |
996 item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1]) | |
997 missing_items.append(item) | |
998 logging.info('Queried %d files, %d cache hit', | |
999 len(items), len(items) - len(missing_items)) | |
1000 return missing_items | |
1001 | |
1002 | |
1003 class FileSystem(StorageApi): | |
1004 """StorageApi implementation that fetches data from the file system. | |
1005 | |
1006   The common use case is an NFS/CIFS file server that is mounted locally and | |
1007   used to fetch files onto a local partition. | |
1008 """ | |
1009 | |
1010 def __init__(self, base_path): | |
1011 super(FileSystem, self).__init__() | |
1012 self.base_path = base_path | |
1013 | |
1014 def get_fetch_url(self, digest): | |
1015 return None | |
1016 | |
1017 def fetch(self, digest): | |
1018 assert isinstance(digest, basestring) | |
1019 return file_read(os.path.join(self.base_path, digest)) | |
1020 | |
1021 def push(self, item, content): | |
1022 assert isinstance(item, Item) | |
1023 file_write(os.path.join(self.base_path, item.digest), content) | |
1024 | |
1025 def contains(self, items): | |
1026 return [ | |
1027 item for item in items | |
1028 if not os.path.exists(os.path.join(self.base_path, item.digest)) | |
1029 ] | |
1030 | |
1031 | |
1032 class LocalCache(object): | |
1033 """Local cache that stores objects fetched via Storage. | |
1034 | |
1035 It can be accessed concurrently from multiple threads, so it should protect | |
1036 its internal state with some lock. | |
1037 """ | |
1038 | |
1039 def __enter__(self): | |
1040 """Context manager interface.""" | |
1041 return self | |
1042 | |
1043 def __exit__(self, _exc_type, _exec_value, _traceback): | |
1044 """Context manager interface.""" | |
1045 return False | |
1046 | |
1047 def cached_set(self): | |
1048 """Returns a set of all cached digests (always a new object).""" | |
1049 raise NotImplementedError() | |
1050 | |
1051 def touch(self, digest, size): | |
1052 """Ensures item is not corrupted and updates its LRU position. | |
1053 | |
1054 Arguments: | |
1055 digest: hash digest of item to check. | |
1056 size: expected size of this item. | |
1057 | |
1058 Returns: | |
1059 True if item is in cache and not corrupted. | |
1060 """ | |
1061 raise NotImplementedError() | |
1062 | |
1063 def evict(self, digest): | |
1064 """Removes item from cache if it's there.""" | |
1065 raise NotImplementedError() | |
1066 | |
1067 def read(self, digest): | |
1068 """Returns contents of the cached item as a single str.""" | |
1069 raise NotImplementedError() | |
1070 | |
1071 def write(self, digest, content): | |
1072 """Reads data from |content| generator and stores it in cache.""" | |
1073 raise NotImplementedError() | |
1074 | |
1075 def link(self, digest, dest, file_mode=None): | |
1076 """Ensures file at |dest| has same content as cached |digest|.""" | |
1077 raise NotImplementedError() | |
1078 | |
1079 | |
1080 class MemoryCache(LocalCache): | |
1081 """LocalCache implementation that stores everything in memory.""" | |
1082 | |
1083 def __init__(self): | |
1084 super(MemoryCache, self).__init__() | |
1085 # Let's not assume dict is thread safe. | |
1086 self._lock = threading.Lock() | |
1087 self._contents = {} | |
1088 | |
1089 def cached_set(self): | |
1090 with self._lock: | |
1091 return set(self._contents) | |
1092 | |
1093 def touch(self, digest, size): | |
1094 with self._lock: | |
1095 return digest in self._contents | |
1096 | |
1097 def evict(self, digest): | |
1098 with self._lock: | |
1099 self._contents.pop(digest, None) | |
1100 | |
1101 def read(self, digest): | |
1102 with self._lock: | |
1103 return self._contents[digest] | |
1104 | |
1105 def write(self, digest, content): | |
1106 # Assemble whole stream before taking the lock. | |
1107 data = ''.join(content) | |
1108 with self._lock: | |
1109 self._contents[digest] = data | |
1110 | |
1111 def link(self, digest, dest, file_mode=None): | |
1112 file_write(dest, [self.read(digest)]) | |
1113 if file_mode is not None: | |
1114 os.chmod(dest, file_mode) | |
1115 | |
1116 | |
1117 def get_hash_algo(_namespace): | |
1118 """Return hash algorithm class to use when uploading to given |namespace|.""" | |
1119 # TODO(vadimsh): Implement this at some point. | |
1120 return hashlib.sha1 | |
1121 | |
1122 | |
1123 def is_namespace_with_compression(namespace): | |
1124 """Returns True if given |namespace| stores compressed objects.""" | |
1125 return namespace.endswith(('-gzip', '-deflate')) | |
1126 | |
1127 | |
1128 def get_storage_api(file_or_url, namespace): | |
1129 """Returns an object that implements StorageApi interface.""" | |
1130 if re.match(r'^https?://.+$', file_or_url): | |
1131 return IsolateServer(file_or_url, namespace) | |
1132 else: | |
1133 return FileSystem(file_or_url) | |
1134 | |
1135 | |
1136 def get_storage(file_or_url, namespace): | |
1137 """Returns Storage class configured with appropriate StorageApi instance.""" | |
1138 return Storage( | |
1139 get_storage_api(file_or_url, namespace), | |
1140 is_namespace_with_compression(namespace)) | |
1141 | |
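# Example usage (the server URL and local path are made up): an http(s) URL
# selects the IsolateServer backend, anything else is treated as a directory
# served by the FileSystem backend.
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     uploaded = storage.upload_items(items)
#
#   local_storage = get_storage('/mnt/isolate-mirror', 'default')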
1142 | |
1143 def upload_tree(base_url, indir, infiles, namespace): | |
1144 """Uploads the given tree to the given url. | |
1145 | |
1146 Arguments: | |
1147     base_url: The base url; it is assumed that |base_url|/has/ can be used to | |
1148 query if an element was already uploaded, and |base_url|/store/ | |
1149 can be used to upload a new element. | |
1150 indir: Root directory the infiles are based in. | |
1151 infiles: dict of files to upload from |indir| to |base_url|. | |
1152 namespace: The namespace to use on the server. | |
1153 """ | |
1154 with get_storage(base_url, namespace) as storage: | |
1155 storage.upload_tree(indir, infiles) | |
1156 return 0 | |
1157 | |
1158 | |
1159 def load_isolated(content, os_flavor, algo): | |
1160   """Verifies the .isolated file content is valid and returns the parsed json | |
1161   data. | |
1162 | |
1163 Arguments: | |
1164 - content: raw serialized content to load. | |
1165 - os_flavor: OS to load this file on. Optional. | |
1166 - algo: hashlib algorithm class. Used to confirm the algorithm matches the | |
1167 algorithm used on the Isolate Server. | |
1168 """ | |
1169 try: | |
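  # A minimal document that passes the checks below (illustrative; the 'h' value
  # must be a real hex digest of the declared algorithm, 40 hex characters for
  # sha-1):
  #
  #   {
  #     "algo": "sha-1",
  #     "command": ["python", "run_test.py"],
  #     "files": {
  #       "run_test.py": {"h": "<40 hex chars>", "s": 123, "m": 488}
  #     },
  #     "relative_cwd": ".",
  #     "version": "1.0"
  #   }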
1170 data = json.loads(content) | |
1171 except ValueError: | |
1172 raise ConfigError('Failed to parse: %s...' % content[:100]) | |
1173 | |
1174 if not isinstance(data, dict): | |
1175 raise ConfigError('Expected dict, got %r' % data) | |
1176 | |
1177 # Check 'version' first, since it could modify the parsing after. | |
1178 value = data.get('version', '1.0') | |
1179 if not isinstance(value, basestring): | |
1180 raise ConfigError('Expected string, got %r' % value) | |
1181 if not re.match(r'^(\d+)\.(\d+)$', value): | |
1182 raise ConfigError('Expected a compatible version, got %r' % value) | |
1183 if value.split('.', 1)[0] != '1': | |
1184 raise ConfigError('Expected compatible \'1.x\' version, got %r' % value) | |
1185 | |
1186 if algo is None: | |
1187     # Default to the algorithm used in the .isolated file itself, falling back | |
1188     # to 'sha-1' if unspecified. | |
1189     algo = SUPPORTED_ALGOS[data.get('algo', 'sha-1')] | |
1190 | |
1191 for key, value in data.iteritems(): | |
1192 if key == 'algo': | |
1193 if not isinstance(value, basestring): | |
1194 raise ConfigError('Expected string, got %r' % value) | |
1195 if value not in SUPPORTED_ALGOS: | |
1196 raise ConfigError( | |
1197 'Expected one of \'%s\', got %r' % | |
1198 (', '.join(sorted(SUPPORTED_ALGOS)), value)) | |
1199 if value != SUPPORTED_ALGOS_REVERSE[algo]: | |
1200 raise ConfigError( | |
1201 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value)) | |
1202 | |
1203 elif key == 'command': | |
1204 if not isinstance(value, list): | |
1205 raise ConfigError('Expected list, got %r' % value) | |
1206 if not value: | |
1207 raise ConfigError('Expected non-empty command') | |
1208 for subvalue in value: | |
1209 if not isinstance(subvalue, basestring): | |
1210 raise ConfigError('Expected string, got %r' % subvalue) | |
1211 | |
1212 elif key == 'files': | |
1213 if not isinstance(value, dict): | |
1214 raise ConfigError('Expected dict, got %r' % value) | |
1215 for subkey, subvalue in value.iteritems(): | |
1216 if not isinstance(subkey, basestring): | |
1217 raise ConfigError('Expected string, got %r' % subkey) | |
1218 if not isinstance(subvalue, dict): | |
1219 raise ConfigError('Expected dict, got %r' % subvalue) | |
1220 for subsubkey, subsubvalue in subvalue.iteritems(): | |
1221 if subsubkey == 'l': | |
1222 if not isinstance(subsubvalue, basestring): | |
1223 raise ConfigError('Expected string, got %r' % subsubvalue) | |
1224 elif subsubkey == 'm': | |
1225 if not isinstance(subsubvalue, int): | |
1226 raise ConfigError('Expected int, got %r' % subsubvalue) | |
1227 elif subsubkey == 'h': | |
1228 if not is_valid_hash(subsubvalue, algo): | |
1229 raise ConfigError('Expected sha-1, got %r' % subsubvalue) | |
1230 elif subsubkey == 's': | |
1231 if not isinstance(subsubvalue, int): | |
1232 raise ConfigError('Expected int, got %r' % subsubvalue) | |
1233 else: | |
1234 raise ConfigError('Unknown subsubkey %s' % subsubkey) | |
1235 if bool('h' in subvalue) == bool('l' in subvalue): | |
1236 raise ConfigError( | |
1237 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' % | |
1238 subvalue) | |
1239 if bool('h' in subvalue) != bool('s' in subvalue): | |
1240 raise ConfigError( | |
1241 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' % | |
1242 subvalue) | |
1243 if bool('s' in subvalue) == bool('l' in subvalue): | |
1244 raise ConfigError( | |
1245 'Need only one of \'s\' (size) or \'l\' (link), got: %r' % | |
1246 subvalue) | |
1247 if bool('l' in subvalue) and bool('m' in subvalue): | |
1248 raise ConfigError( | |
1249 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' % | |
1250 subvalue) | |
1251 | |
1252 elif key == 'includes': | |
1253 if not isinstance(value, list): | |
1254 raise ConfigError('Expected list, got %r' % value) | |
1255 if not value: | |
1256 raise ConfigError('Expected non-empty includes list') | |
1257 for subvalue in value: | |
1258 if not is_valid_hash(subvalue, algo): | |
1259 raise ConfigError('Expected sha-1, got %r' % subvalue) | |
1260 | |
1261 elif key == 'read_only': | |
1262 if not isinstance(value, bool): | |
1263 raise ConfigError('Expected bool, got %r' % value) | |
1264 | |
1265 elif key == 'relative_cwd': | |
1266 if not isinstance(value, basestring): | |
1267 raise ConfigError('Expected string, got %r' % value) | |
1268 | |
1269 elif key == 'os': | |
1270 if os_flavor and value != os_flavor: | |
1271 raise ConfigError( | |
1272 'Expected \'os\' to be \'%s\' but got \'%s\'' % | |
1273 (os_flavor, value)) | |
1274 | |
1275 elif key == 'version': | |
1276 # Already checked above. | |
1277 pass | |
1278 | |
1279 else: | |
1280 raise ConfigError('Unknown key %r' % key) | |
1281 | |
1282 # Automatically fix os.path.sep if necessary. While .isolated files are always | |
1283   # in the native path format, someone could want to download an .isolated | |
1284 # tree from another OS. | |
1285 wrong_path_sep = '/' if os.path.sep == '\\' else '\\' | |
1286 if 'files' in data: | |
1287 data['files'] = dict( | |
1288 (k.replace(wrong_path_sep, os.path.sep), v) | |
1289 for k, v in data['files'].iteritems()) | |
1290 for v in data['files'].itervalues(): | |
1291 if 'l' in v: | |
1292 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep) | |
1293 if 'relative_cwd' in data: | |
1294 data['relative_cwd'] = data['relative_cwd'].replace( | |
1295 wrong_path_sep, os.path.sep) | |
1296 return data | |
1297 | |
1298 | |
1299 class IsolatedFile(object): | |
1300 """Represents a single parsed .isolated file.""" | |
1301 def __init__(self, obj_hash, algo): | |
1302 """|obj_hash| is really the sha-1 of the file.""" | |
1303 logging.debug('IsolatedFile(%s)' % obj_hash) | |
1304 self.obj_hash = obj_hash | |
1305 self.algo = algo | |
1306     # Set once all the left side of the tree is parsed. 'Tree' here means the | |
1307     # .isolate file and all the .isolated files recursively included by it via | |
1308     # the 'includes' key. The order of the sha-1s in 'includes', each | |
1309     # representing a .isolated file in the hash table, is important: later ones | |
1310     # are not processed until the earlier ones are retrieved and read. | |
1311 self.can_fetch = False | |
1312 | |
1313 # Raw data. | |
1314 self.data = {} | |
1315     # An IsolatedFile instance per hash in the 'includes' list. | |
1316 self.children = [] | |
1317 | |
1318 # Set once the .isolated file is loaded. | |
1319 self._is_parsed = False | |
1320 # Set once the files are fetched. | |
1321 self.files_fetched = False | |
1322 | |
1323 def load(self, os_flavor, content): | |
1324 """Verifies the .isolated file is valid and loads this object with the json | |
1325 data. | |
1326 """ | |
1327 logging.debug('IsolatedFile.load(%s)' % self.obj_hash) | |
1328 assert not self._is_parsed | |
1329 self.data = load_isolated(content, os_flavor, self.algo) | |
1330 self.children = [ | |
1331 IsolatedFile(i, self.algo) for i in self.data.get('includes', []) | |
1332 ] | |
1333 self._is_parsed = True | |
1334 | |
1335 def fetch_files(self, fetch_queue, files): | |
1336 """Adds files in this .isolated file not present in |files| dictionary. | |
1337 | |
1338 Preemptively request files. | |
1339 | |
1340 Note that |files| is modified by this function. | |
1341 """ | |
1342 assert self.can_fetch | |
1343 if not self._is_parsed or self.files_fetched: | |
1344 return | |
1345 logging.debug('fetch_files(%s)' % self.obj_hash) | |
1346 for filepath, properties in self.data.get('files', {}).iteritems(): | |
1347 # Root isolated has priority on the files being mapped. In particular, | |
1348       # overridden files must not be fetched. | |
1349 if filepath not in files: | |
1350 files[filepath] = properties | |
1351 if 'h' in properties: | |
1352 # Preemptively request files. | |
1353 logging.debug('fetching %s' % filepath) | |
1354 fetch_queue.add(WorkerPool.MED, properties['h'], properties['s']) | |
1355 self.files_fetched = True | |
1356 | |
1357 | |
1358 class Settings(object): | |
1359 """Results of a completely parsed .isolated file.""" | |
1360 def __init__(self): | |
1361 self.command = [] | |
1362 self.files = {} | |
1363 self.read_only = None | |
1364 self.relative_cwd = None | |
1365     # The main .isolated file, an IsolatedFile instance. | |
1366 self.root = None | |
1367 | |
1368 def load(self, fetch_queue, root_isolated_hash, os_flavor, algo): | |
1369 """Loads the .isolated and all the included .isolated asynchronously. | |
1370 | |
1371 It enables support for "included" .isolated files. They are processed in | |
1372 strict order but fetched asynchronously from the cache. This is important so | |
1373 that a file in an included .isolated file that is overridden by an embedding | |
1374 .isolated file is not fetched needlessly. The includes are fetched in one | |
1375 pass and the files are fetched as soon as all the ones on the left-side | |
1376 of the tree were fetched. | |
1377 | |
1378 The prioritization is very important here for nested .isolated files. | |
1379 'includes' have the highest priority and the algorithm is optimized for both | |
1380 deep and wide trees. A deep one is a long link of .isolated files referenced | |
1381 one at a time by one item in 'includes'. A wide one has a large number of | |
1382 'includes' in a single .isolated file. 'left' is defined as an included | |
1383 .isolated file earlier in the 'includes' list. So the order of the elements | |
1384 in 'includes' is important. | |
1385 """ | |
1386 self.root = IsolatedFile(root_isolated_hash, algo) | |
1387 | |
1388 # Isolated files being retrieved now: hash -> IsolatedFile instance. | |
1389 pending = {} | |
1390 # Set of hashes of already retrieved items to refuse recursive includes. | |
1391 seen = set() | |
1392 | |
1393 def retrieve(isolated_file): | |
1394 h = isolated_file.obj_hash | |
1395 if h in seen: | |
1396 raise ConfigError('IsolatedFile %s is retrieved recursively' % h) | |
1397 assert h not in pending | |
1398 seen.add(h) | |
1399 pending[h] = isolated_file | |
1400 fetch_queue.add(WorkerPool.HIGH, h) | |
1401 | |
1402 retrieve(self.root) | |
1403 | |
1404 while pending: | |
1405 item_hash = fetch_queue.wait(pending) | |
1406 item = pending.pop(item_hash) | |
1407 item.load(os_flavor, fetch_queue.cache.read(item_hash)) | |
1408 if item_hash == root_isolated_hash: | |
1409 # It's the root item. | |
1410 item.can_fetch = True | |
1411 | |
1412 for new_child in item.children: | |
1413 retrieve(new_child) | |
1414 | |
1415 # Traverse the whole tree to see if files can now be fetched. | |
1416 self._traverse_tree(fetch_queue, self.root) | |
1417 | |
1418 def check(n): | |
1419 return all(check(x) for x in n.children) and n.files_fetched | |
1420 assert check(self.root) | |
1421 | |
1422 self.relative_cwd = self.relative_cwd or '' | |
1423 self.read_only = self.read_only or False | |
1424 | |
1425 def _traverse_tree(self, fetch_queue, node): | |
1426 if node.can_fetch: | |
1427 if not node.files_fetched: | |
1428 self._update_self(fetch_queue, node) | |
1429 will_break = False | |
1430 for i in node.children: | |
1431 if not i.can_fetch: | |
1432 if will_break: | |
1433 break | |
1434           # Automatically mark the first one as fetchable. | |
1435 i.can_fetch = True | |
1436 will_break = True | |
1437 self._traverse_tree(fetch_queue, i) | |
1438 | |
1439 def _update_self(self, fetch_queue, node): | |
1440 node.fetch_files(fetch_queue, self.files) | |
1441 # Grabs properties. | |
1442 if not self.command and node.data.get('command'): | |
1443 # Ensure paths are correctly separated on windows. | |
1444 self.command = node.data['command'] | |
1445 if self.command: | |
1446 self.command[0] = self.command[0].replace('/', os.path.sep) | |
1447 self.command = tools.fix_python_path(self.command) | |
1448 if self.read_only is None and node.data.get('read_only') is not None: | |
1449 self.read_only = node.data['read_only'] | |
1450 if (self.relative_cwd is None and | |
1451 node.data.get('relative_cwd') is not None): | |
1452 self.relative_cwd = node.data['relative_cwd'] | |
1453 | |
1454 | |
1455 def fetch_isolated( | |
1456 isolated_hash, storage, cache, algo, outdir, os_flavor, require_command): | |
1457   """Aggressively downloads the .isolated file(s), then downloads all the files. | |
1458 | |
1459 Arguments: | |
1460 isolated_hash: hash of the root *.isolated file. | |
1461 storage: Storage class that communicates with isolate storage. | |
1462 cache: LocalCache class that knows how to store and map files locally. | |
1463 algo: hash algorithm to use. | |
1464 outdir: Output directory to map file tree to. | |
1465 os_flavor: OS flavor to choose when reading sections of *.isolated file. | |
1466 require_command: Ensure *.isolated specifies a command to run. | |
1467 | |
1468 Returns: | |
1469 Settings object that holds details about loaded *.isolated file. | |
1470 """ | |
1471 with cache: | |
1472 fetch_queue = FetchQueue(storage, cache) | |
1473 settings = Settings() | |
1474 | |
1475 with tools.Profiler('GetIsolateds'): | |
1476 # Optionally support local files by manually adding them to cache. | |
1477 if not is_valid_hash(isolated_hash, algo): | |
1478 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo) | |
1479 | |
1480 # Load all *.isolated and start loading rest of the files. | |
1481 settings.load(fetch_queue, isolated_hash, os_flavor, algo) | |
1482 if require_command and not settings.command: | |
1483       # TODO(vadimsh): All fetch operations are already enqueued and there's no | |
1484 # easy way to cancel them. | |
1485 raise ConfigError('No command to run') | |
1486 | |
1487 with tools.Profiler('GetRest'): | |
1488 # Create file system hierarchy. | |
1489 if not os.path.isdir(outdir): | |
1490 os.makedirs(outdir) | |
1491 create_directories(outdir, settings.files) | |
1492 create_links(outdir, settings.files.iteritems()) | |
1493 | |
1494 # Ensure working directory exists. | |
1495 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd)) | |
1496 if not os.path.isdir(cwd): | |
1497 os.makedirs(cwd) | |
1498 | |
1499 # Multimap: digest -> list of pairs (path, props). | |
1500 remaining = {} | |
1501 for filepath, props in settings.files.iteritems(): | |
1502 if 'h' in props: | |
1503 remaining.setdefault(props['h'], []).append((filepath, props)) | |
1504 | |
1505 # Now block on the remaining files to be downloaded and mapped. | |
1506 logging.info('Retrieving remaining files (%d of them)...', | |
1507 fetch_queue.pending_count) | |
1508 last_update = time.time() | |
1509 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: | |
1510 while remaining: | |
1511 detector.ping() | |
1512 | |
1513 # Wait for any item to finish fetching to cache. | |
1514 digest = fetch_queue.wait(remaining) | |
1515 | |
1516 # Link corresponding files to a fetched item in cache. | |
1517 for filepath, props in remaining.pop(digest): | |
1518 cache.link(digest, os.path.join(outdir, filepath), props.get('m')) | |
1519 | |
1520 # Report progress. | |
1521 duration = time.time() - last_update | |
1522 if duration > DELAY_BETWEEN_UPDATES_IN_SECS: | |
1523 msg = '%d files remaining...' % len(remaining) | |
1524 print msg | |
1525 logging.info(msg) | |
1526 last_update = time.time() | |
1527 | |
1528   # Cache could evict some items we just tried to fetch; that's a fatal error. | |
1529 if not fetch_queue.verify_all_cached(): | |
1530 raise MappingError('Cache is too small to hold all requested files') | |
1531 return settings | |
1532 | |
1533 | |
1534 @subcommand.usage('<file1..fileN> or - to read from stdin') | |
1535 def CMDarchive(parser, args): | |
1536 """Archives data to the server.""" | |
1537 options, files = parser.parse_args(args) | |
1538 | |
1539 if files == ['-']: | |
1540 files = sys.stdin.readlines() | |
1541 | |
1542 if not files: | |
1543 parser.error('Nothing to upload') | |
1544 | |
1545 # Load the necessary metadata. | |
1546 # TODO(maruel): Use a worker pool to upload as the hashing is being done. | |
1547 infiles = dict( | |
1548 ( | |
1549 f, | |
1550 { | |
1551 's': os.stat(f).st_size, | |
1552 'h': hash_file(f, get_hash_algo(options.namespace)), | |
1553 } | |
1554 ) | |
1555 for f in files) | |
1556 | |
1557 with tools.Profiler('Archive'): | |
1558 ret = upload_tree( | |
1559 base_url=options.isolate_server, | |
1560 indir=os.getcwd(), | |
1561 infiles=infiles, | |
1562 namespace=options.namespace) | |
1563 if not ret: | |
1564 print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles)) | |
1565 return ret | |
1566 | |
1567 | |
1568 def CMDdownload(parser, args): | |
1569 """Download data from the server. | |
1570 | |
1571 It can either download individual files or a complete tree from a .isolated | |
1572 file. | |
1573 """ | |
1574 parser.add_option( | |
1575 '-i', '--isolated', metavar='HASH', | |
1576 help='hash of an isolated file, .isolated file content is discarded, use ' | |
1577 '--file if you need it') | |
1578 parser.add_option( | |
1579 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2, | |
1580 help='hash and destination of a file, can be used multiple times') | |
1581 parser.add_option( | |
1582 '-t', '--target', metavar='DIR', default=os.getcwd(), | |
1583 help='destination directory') | |
1584 options, args = parser.parse_args(args) | |
1585 if args: | |
1586 parser.error('Unsupported arguments: %s' % args) | |
1587 if bool(options.isolated) == bool(options.file): | |
1588 parser.error('Use one of --isolated or --file, and only one.') | |
1589 | |
1590 options.target = os.path.abspath(options.target) | |
1591 storage = get_storage(options.isolate_server, options.namespace) | |
1592 cache = MemoryCache() | |
1593 algo = get_hash_algo(options.namespace) | |
1594 | |
1595 # Fetching individual files. | |
1596 if options.file: | |
1597 channel = threading_utils.TaskChannel() | |
1598 pending = {} | |
1599 for digest, dest in options.file: | |
1600 pending[digest] = dest | |
1601 storage.async_fetch( | |
1602 channel, | |
1603 WorkerPool.MED, | |
1604 digest, | |
1605 UNKNOWN_FILE_SIZE, | |
1606 functools.partial(file_write, os.path.join(options.target, dest))) | |
1607 while pending: | |
1608 fetched = channel.pull() | |
1609 dest = pending.pop(fetched) | |
1610 logging.info('%s: %s', fetched, dest) | |
1611 | |
1612 # Fetching whole isolated tree. | |
1613 if options.isolated: | |
1614 settings = fetch_isolated( | |
1615 isolated_hash=options.isolated, | |
1616 storage=storage, | |
1617 cache=cache, | |
1618 algo=algo, | |
1619 outdir=options.target, | |
1620 os_flavor=None, | |
1621 require_command=False) | |
1622 rel = os.path.join(options.target, settings.relative_cwd) | |
1623     print('To run this test please run from the directory %s:' % rel) | |
1625 print(' ' + ' '.join(settings.command)) | |
1626 | |
1627 return 0 | |
1628 | |
1629 | |
1630 class OptionParserIsolateServer(tools.OptionParserWithLogging): | |
1631 def __init__(self, **kwargs): | |
1632 tools.OptionParserWithLogging.__init__(self, **kwargs) | |
1633 self.add_option( | |
1634 '-I', '--isolate-server', | |
1635 metavar='URL', default='', | |
1636 help='Isolate server to use') | |
1637 self.add_option( | |
1638 '--namespace', default='default-gzip', | |
1639 help='The namespace to use on the server, default: %default') | |
1640 | |
1641 def parse_args(self, *args, **kwargs): | |
1642 options, args = tools.OptionParserWithLogging.parse_args( | |
1643 self, *args, **kwargs) | |
1644 options.isolate_server = options.isolate_server.rstrip('/') | |
1645 if not options.isolate_server: | |
1646 self.error('--isolate-server is required.') | |
1647 return options, args | |
1648 | |
1649 | |
1650 def main(args): | |
1651 dispatcher = subcommand.CommandDispatcher(__name__) | |
1652 try: | |
1653 return dispatcher.execute( | |
1654 OptionParserIsolateServer(version=__version__), args) | |
1655 except Exception as e: | |
1656 tools.report_error(e) | |
1657 return 1 | |
1658 | |
1659 | |
1660 if __name__ == '__main__': | |
1661 fix_encoding.fix_encoding() | |
1662 tools.disable_buffering() | |
1663 colorama.init() | |
1664 sys.exit(main(sys.argv[1:])) | |