Chromium Code Reviews

Side by Side Diff: swarm_client/isolateserver.py

Issue 69143004: Delete swarm_client. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/
Patch Set: Created 7 years, 1 month ago
1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Archives a set of files to a server."""
7
8 __version__ = '0.2'
9
10 import functools
11 import hashlib
12 import json
13 import logging
14 import os
15 import re
16 import sys
17 import threading
18 import time
19 import urllib
20 import zlib
21
22 from third_party import colorama
23 from third_party.depot_tools import fix_encoding
24 from third_party.depot_tools import subcommand
25
26 from utils import net
27 from utils import threading_utils
28 from utils import tools
29
30
31 # Version of isolate protocol passed to the server in /handshake request.
32 ISOLATE_PROTOCOL_VERSION = '1.0'
33
34
35 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
39 # files are sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. The numbers here are a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousand files to look up with a minimal number of large
45 # files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
47
48
49 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53 'wav', 'zip'
54 ]
55
56
57 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None
60
61
62 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024
64
65 # Chunk size to use when doing disk I/O.
66 DISK_FILE_CHUNK = 1024 * 1024
67
68 # Chunk size to use when reading from network stream.
69 NET_IO_FILE_CHUNK = 16 * 1024
70
71
72 # Read timeout in seconds for downloads from isolate storage. If there's no
73 # response from the server within this timeout, the whole download is aborted.
74 DOWNLOAD_READ_TIMEOUT = 60
75
76 # Maximum expected delay (in seconds) between successive file fetches
77 # in run_tha_test. If it takes longer than that, a deadlock might be happening
78 # and all stack frames for all threads are dumped to log.
79 DEADLOCK_TIMEOUT = 5 * 60
80
81
82 # The delay (in seconds) to wait between logging statements when retrieving
83 # the required files. This is intended to let the user (or buildbot) know that
84 # the program is still running.
85 DELAY_BETWEEN_UPDATES_IN_SECS = 30
86
87
88 # Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
89 # specify the names here.
90 SUPPORTED_ALGOS = {
91 'md5': hashlib.md5,
92 'sha-1': hashlib.sha1,
93 'sha-512': hashlib.sha512,
94 }
95
96
97 # Used for serialization.
98 SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
99
100
101 class ConfigError(ValueError):
102 """Generic failure to load a .isolated file."""
103 pass
104
105
106 class MappingError(OSError):
107 """Failed to recreate the tree."""
108 pass
109
110
111 def is_valid_hash(value, algo):
112 """Returns True if the value is a valid hash for the corresponding algorithm."""
113 size = 2 * algo().digest_size
114 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
115
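As a quick aside (an editorial sketch, not part of the original file): a sha-1 digest is 2 * 20 = 40 hexadecimal characters, so the check above accepts hexdigest() output directly.

# Sketch: validate a real digest and reject a malformed one.
digest = hashlib.sha1('some content').hexdigest()
assert is_valid_hash(digest, hashlib.sha1)
assert not is_valid_hash('not-a-hash', hashlib.sha1)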
116
117 def hash_file(filepath, algo):
118 """Calculates the hash of a file without reading it all in memory at once.
119
120 |algo| should be one of the hashlib hashing algorithm classes.
121 """
122 digest = algo()
123 with open(filepath, 'rb') as f:
124 while True:
125 chunk = f.read(DISK_FILE_CHUNK)
126 if not chunk:
127 break
128 digest.update(chunk)
129 return digest.hexdigest()
130
131
132 def stream_read(stream, chunk_size):
133 """Reads chunks from |stream| and yields them."""
134 while True:
135 data = stream.read(chunk_size)
136 if not data:
137 break
138 yield data
139
140
141 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
142 """Yields file content in chunks of given |chunk_size|."""
143 with open(filepath, 'rb') as f:
144 while True:
145 data = f.read(chunk_size)
146 if not data:
147 break
148 yield data
149
150
151 def file_write(filepath, content_generator):
152 """Writes file content as generated by content_generator.
153
154 Creates the intermediary directory as needed.
155
156 Returns the number of bytes written.
157
158 Meant to be mocked out in unit tests.
159 """
160 filedir = os.path.dirname(filepath)
161 if not os.path.isdir(filedir):
162 os.makedirs(filedir)
163 total = 0
164 with open(filepath, 'wb') as f:
165 for d in content_generator:
166 total += len(d)
167 f.write(d)
168 return total
169
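Because file_write() accepts any iterable of byte chunks, it composes directly with file_read() and the zip_* helpers below. A minimal editorial sketch, with hypothetical paths:

# Sketch: copy a file chunk by chunk; both paths are placeholders.
bytes_copied = file_write('/tmp/example-copy.bin', file_read('/tmp/example.bin'))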
170
171 def zip_compress(content_generator, level=7):
172 """Reads chunks from |content_generator| and yields zip compressed chunks."""
173 compressor = zlib.compressobj(level)
174 for chunk in content_generator:
175 compressed = compressor.compress(chunk)
176 if compressed:
177 yield compressed
178 tail = compressor.flush(zlib.Z_FINISH)
179 if tail:
180 yield tail
181
182
183 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
184 """Reads zipped data from |content_generator| and yields decompressed data.
185
186 Decompresses data in small chunks (no larger than |chunk_size|) so that a
187 zip bomb doesn't cause zlib to preallocate a huge amount of memory.
188
189 Raises IOError if data is corrupted or incomplete.
190 """
191 decompressor = zlib.decompressobj()
192 compressed_size = 0
193 try:
194 for chunk in content_generator:
195 compressed_size += len(chunk)
196 data = decompressor.decompress(chunk, chunk_size)
197 if data:
198 yield data
199 while decompressor.unconsumed_tail:
200 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
201 if data:
202 yield data
203 tail = decompressor.flush()
204 if tail:
205 yield tail
206 except zlib.error as e:
207 raise IOError(
208 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
209 # Ensure all data was read and decompressed.
210 if decompressor.unused_data or decompressor.unconsumed_tail:
211 raise IOError('Not all data was decompressed')
212
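The two generators above are meant to round-trip. An editorial sketch (not part of the original file) of compressing a chunked payload and decompressing it again:

# Sketch: zip_compress -> zip_decompress reproduces the original chunks.
payload = ['hello ' * 1000, 'world ' * 1000]
compressed = list(zip_compress(iter(payload), level=7))
restored = ''.join(zip_decompress(iter(compressed), chunk_size=1024))
assert restored == ''.join(payload)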
213
214 def get_zip_compression_level(filename):
215 """Given a filename calculates the ideal zip compression level to use."""
216 file_ext = os.path.splitext(filename)[1].lower()
217 # TODO(csharp): Profile to find what compression level works best.
218 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
219
220
221 def create_directories(base_directory, files):
222 """Creates the directory structure needed by the given list of files."""
223 logging.debug('create_directories(%s, %d)', base_directory, len(files))
224 # Creates the tree of directories to create.
225 directories = set(os.path.dirname(f) for f in files)
226 for item in list(directories):
227 while item:
228 directories.add(item)
229 item = os.path.dirname(item)
230 for d in sorted(directories):
231 if d:
232 os.mkdir(os.path.join(base_directory, d))
233
234
235 def create_links(base_directory, files):
236 """Creates any links needed by the given set of files."""
237 for filepath, properties in files:
238 if 'l' not in properties:
239 continue
240 if sys.platform == 'win32':
241 # TODO(maruel): Create junctions or empty text files similar to what
242 # cygwin does?
243 logging.warning('Ignoring symlink %s', filepath)
244 continue
245 outfile = os.path.join(base_directory, filepath)
246 # os.symlink() doesn't exist on Windows, so the 'link' property should
247 # never be specified for Windows .isolated files.
248 os.symlink(properties['l'], outfile) # pylint: disable=E1101
249 if 'm' in properties:
250 lchmod = getattr(os, 'lchmod', None)
251 if lchmod:
252 lchmod(outfile, properties['m'])
253
254
255 def is_valid_file(filepath, size):
256 """Determines if the given file appears valid.
257
258 Currently it just checks the file's size.
259 """
260 if size == UNKNOWN_FILE_SIZE:
261 return os.path.isfile(filepath)
262 actual_size = os.stat(filepath).st_size
263 if size != actual_size:
264 logging.warning(
265 'Found invalid item %s; %d != %d',
266 os.path.basename(filepath), actual_size, size)
267 return False
268 return True
269
270
271 class WorkerPool(threading_utils.AutoRetryThreadPool):
272 """Thread pool that automatically retries on IOError and runs a preconfigured
273 function.
274 """
275 # Initial and maximum number of worker threads.
276 INITIAL_WORKERS = 2
277 MAX_WORKERS = 16
278 RETRIES = 5
279
280 def __init__(self):
281 super(WorkerPool, self).__init__(
282 [IOError],
283 self.RETRIES,
284 self.INITIAL_WORKERS,
285 self.MAX_WORKERS,
286 0,
287 'remote')
288
289
290 class Item(object):
291 """An item to push to Storage.
292
293 It starts its life in the main thread, travels to a 'contains' thread, then to
294 a 'push' thread, and finally back to the main thread.
295
296 It is never used concurrently from multiple threads.
297 """
298
299 def __init__(self, digest, size, is_isolated=False):
300 self.digest = digest
301 self.size = size
302 self.is_isolated = is_isolated
303 self.compression_level = 6
304 self.push_state = None
305
306 def content(self, chunk_size):
307 """Iterable with content of this item in chunks of given size.
308
309 Arguments:
310 chunk_size: preferred size of the chunk to produce, may be ignored.
311 """
312 raise NotImplementedError()
313
314
315 class FileItem(Item):
316 """A file to push to Storage."""
317
318 def __init__(self, path, digest, size, is_isolated):
319 super(FileItem, self).__init__(digest, size, is_isolated)
320 self.path = path
321 self.compression_level = get_zip_compression_level(path)
322
323 def content(self, chunk_size):
324 return file_read(self.path, chunk_size)
325
326
327 class BufferItem(Item):
328 """A byte buffer to push to Storage."""
329
330 def __init__(self, buf, algo, is_isolated=False):
331 super(BufferItem, self).__init__(
332 algo(buf).hexdigest(), len(buf), is_isolated)
333 self.buffer = buf
334
335 def content(self, _chunk_size):
336 return [self.buffer]
337
338
339 class Storage(object):
340 """Efficiently downloads or uploads a large set of files via StorageApi."""
341
342 def __init__(self, storage_api, use_zip):
343 self.use_zip = use_zip
344 self._storage_api = storage_api
345 self._cpu_thread_pool = None
346 self._net_thread_pool = None
347
348 @property
349 def cpu_thread_pool(self):
350 """ThreadPool for CPU-bound tasks like zipping."""
351 if self._cpu_thread_pool is None:
352 self._cpu_thread_pool = threading_utils.ThreadPool(
353 2, max(threading_utils.num_processors(), 2), 0, 'zip')
354 return self._cpu_thread_pool
355
356 @property
357 def net_thread_pool(self):
358 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
359 if self._net_thread_pool is None:
360 self._net_thread_pool = WorkerPool()
361 return self._net_thread_pool
362
363 def close(self):
364 """Waits for all pending tasks to finish."""
365 if self._cpu_thread_pool:
366 self._cpu_thread_pool.join()
367 self._cpu_thread_pool.close()
368 self._cpu_thread_pool = None
369 if self._net_thread_pool:
370 self._net_thread_pool.join()
371 self._net_thread_pool.close()
372 self._net_thread_pool = None
373
374 def __enter__(self):
375 """Context manager interface."""
376 return self
377
378 def __exit__(self, _exc_type, _exc_value, _traceback):
379 """Context manager interface."""
380 self.close()
381 return False
382
383 def upload_tree(self, indir, infiles):
384 """Uploads the given tree to the isolate server.
385
386 Arguments:
387 indir: root directory the infiles are based in.
388 infiles: dict of files to upload from |indir|.
389
390 Returns:
391 List of items that were uploaded. All other items are already there.
392 """
393 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
394
395 # Convert |indir| + |infiles| into a list of FileItem objects.
396 # Filter out symlinks, since they are not represented by items on isolate
397 # server side.
398 items = [
399 FileItem(
400 path=os.path.join(indir, filepath),
401 digest=metadata['h'],
402 size=metadata['s'],
403 is_isolated=metadata.get('priority') == '0')
404 for filepath, metadata in infiles.iteritems()
405 if 'l' not in metadata
406 ]
407
408 return self.upload_items(items)
409
410 def upload_items(self, items):
411 """Uploads a bunch of items to the isolate server.
412
413 Will upload only items that are missing.
414
415 Arguments:
416 items: list of Item instances that represents data to upload.
417
418 Returns:
419 List of items that were uploaded. All other items are already there.
420 """
421 # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
422 # used by swarming.py. There's no need to spawn multiple threads and try to
423 # do stuff in parallel: there's nothing to parallelize. 'contains' check and
424 # 'push' should be performed sequentially in the context of current thread.
425
426 # For each digest keep only the first Item that matches it. All other items
427 # are just indistinguishable copies from the point of view of the isolate
428 # server (it doesn't care about paths at all, only content and digests).
429 seen = {}
430 duplicates = 0
431 for item in items:
432 if seen.setdefault(item.digest, item) is not item:
433 duplicates += 1
434 items = seen.values()
435 if duplicates:
436 logging.info('Skipped %d duplicated files', duplicates)
437
438 # Enqueue all upload tasks.
439 missing = set()
440 channel = threading_utils.TaskChannel()
441 for missing_item in self.get_missing_items(items):
442 missing.add(missing_item)
443 self.async_push(
444 channel,
445 WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
446 missing_item)
447
448 uploaded = []
449 # No need to spawn deadlock detector thread if there's nothing to upload.
450 if missing:
451 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
452 # Wait for all started uploads to finish.
453 while len(uploaded) != len(missing):
454 detector.ping()
455 item = channel.pull()
456 uploaded.append(item)
457 logging.debug(
458 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
459 logging.info('All files are uploaded')
460
461 # Print stats.
462 total = len(items)
463 total_size = sum(f.size for f in items)
464 logging.info(
465 'Total: %6d, %9.1fkb',
466 total,
467 total_size / 1024.)
468 cache_hit = set(items) - missing
469 cache_hit_size = sum(f.size for f in cache_hit)
470 logging.info(
471 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
472 len(cache_hit),
473 cache_hit_size / 1024.,
474 len(cache_hit) * 100. / total,
475 cache_hit_size * 100. / total_size if total_size else 0)
476 cache_miss = missing
477 cache_miss_size = sum(f.size for f in cache_miss)
478 logging.info(
479 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
480 len(cache_miss),
481 cache_miss_size / 1024.,
482 len(cache_miss) * 100. / total,
483 cache_miss_size * 100. / total_size if total_size else 0)
484
485 return uploaded
486
487 def get_fetch_url(self, digest):
488 """Returns a URL that can be used to fetch an item with the given digest.
489
490 Arguments:
491 digest: hex digest of item to fetch.
492
493 Returns:
494 A URL or None if the underlying protocol doesn't support this.
495 """
496 return self._storage_api.get_fetch_url(digest)
497
498 def async_push(self, channel, priority, item):
499 """Starts asynchronous push to the server in a parallel thread.
500
501 Arguments:
502 channel: TaskChannel that receives back |item| when upload ends.
503 priority: thread pool task priority for the push.
504 item: item to upload as instance of Item class.
505 """
506 def push(content):
507 """Pushes an item and returns its id, to pass as a result to |channel|."""
508 self._storage_api.push(item, content)
509 return item
510
511 # If zipping is not required, just start a push task.
512 if not self.use_zip:
513 self.net_thread_pool.add_task_with_channel(channel, priority, push,
514 item.content(DISK_FILE_CHUNK))
515 return
516
517 # If zipping is enabled, zip in a separate thread.
518 def zip_and_push():
519 # TODO(vadimsh): Implement streaming uploads. Until that's done, assemble
520 # the content right here. It will block until the whole file is zipped.
521 try:
522 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
523 item.compression_level)
524 data = ''.join(stream)
525 except Exception as exc:
526 logging.error('Failed to zip \'%s\': %s', item, exc)
527 channel.send_exception(exc)
528 return
529 self.net_thread_pool.add_task_with_channel(
530 channel, priority, push, [data])
531 self.cpu_thread_pool.add_task(priority, zip_and_push)
532
533 def async_fetch(self, channel, priority, digest, size, sink):
534 """Starts asynchronous fetch from the server in a parallel thread.
535
536 Arguments:
537 channel: TaskChannel that receives back |digest| when download ends.
538 priority: thread pool task priority for the fetch.
539 digest: hex digest of an item to download.
540 size: expected size of the item (after decompression).
541 sink: function that will be called as sink(generator).
542 """
543 def fetch():
544 try:
545 # Prepare reading pipeline.
546 stream = self._storage_api.fetch(digest)
547 if self.use_zip:
548 stream = zip_decompress(stream, DISK_FILE_CHUNK)
549 # Run |stream| through verifier that will assert its size.
550 verifier = FetchStreamVerifier(stream, size)
551 # Verified stream goes to |sink|.
552 sink(verifier.run())
553 except Exception as err:
554 logging.warning('Failed to fetch %s: %s', digest, err)
555 raise
556 return digest
557
558 # Don't bother with zip_thread_pool for decompression. Decompression is
559 # really fast and most probably IO bound anyway.
560 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
561
562 def get_missing_items(self, items):
563 """Yields items that are missing from the server.
564
565 Issues multiple parallel queries via StorageApi's 'contains' method.
566
567 Arguments:
568 items: a list of Item objects to check.
569
570 Yields:
571 Item objects that are missing from the server.
572 """
573 channel = threading_utils.TaskChannel()
574 pending = 0
575 # Enqueue all requests.
576 for batch in self.batch_items_for_check(items):
577 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
578 self._storage_api.contains, batch)
579 pending += 1
580 # Yield results as they come in.
581 for _ in xrange(pending):
582 for missing in channel.pull():
583 yield missing
584
585 @staticmethod
586 def batch_items_for_check(items):
587 """Splits list of items to check for existence on the server into batches.
588
589 Each batch corresponds to a single 'exists?' query to the server via a call
590 to StorageApi's 'contains' method.
591
592 Arguments:
593 items: a list of Item objects.
594
595 Yields:
596 Batches of items to query for existence in a single operation,
597 each batch is a list of Item objects.
598 """
599 batch_count = 0
600 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
601 next_queries = []
602 for item in sorted(items, key=lambda x: x.size, reverse=True):
603 next_queries.append(item)
604 if len(next_queries) == batch_size_limit:
605 yield next_queries
606 next_queries = []
607 batch_count += 1
608 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
609 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
610 if next_queries:
611 yield next_queries
612
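To make the batching schedule concrete, here is an editorial sketch (the _FakeItem class is hypothetical, standing in for Item): with 150 items the batch sizes follow ITEMS_PER_CONTAINS_QUERIES until the input runs out.

# Sketch: 150 items are split into batches of 20, 20, 50, 50 and a final 10.
class _FakeItem(object):
  def __init__(self, size):
    self.size = size

fake_items = [_FakeItem(size) for size in xrange(150)]
batches = list(Storage.batch_items_for_check(fake_items))
assert [len(b) for b in batches] == [20, 20, 50, 50, 10]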
613
614 class FetchQueue(object):
615 """Fetches items from Storage and places them into LocalCache.
616
617 It manages multiple concurrent fetch operations. Acts as a bridge between
618 Storage and LocalCache so that Storage and LocalCache don't depend on each
619 other at all.
620 """
621
622 def __init__(self, storage, cache):
623 self.storage = storage
624 self.cache = cache
625 self._channel = threading_utils.TaskChannel()
626 self._pending = set()
627 self._accessed = set()
628 self._fetched = cache.cached_set()
629
630 def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
631 """Starts asynchronous fetch of item |digest|."""
632 # Fetching it now?
633 if digest in self._pending:
634 return
635
636 # Mark this file as in use, verify_all_cached will later ensure it is still
637 # in cache.
638 self._accessed.add(digest)
639
640 # Already fetched? Notify cache to update item's LRU position.
641 if digest in self._fetched:
642 # 'touch' returns True if item is in cache and not corrupted.
643 if self.cache.touch(digest, size):
644 return
645 # Item is corrupted, remove it from cache and fetch it again.
646 self._fetched.remove(digest)
647 self.cache.evict(digest)
648
649 # TODO(maruel): It should look at the free disk space, the current cache
650 # size and the size of the new item on every new item:
651 # - Trim the cache as more entries are listed when free disk space is low,
652 # otherwise if the amount of data downloaded during the run > free disk
653 # space, it'll crash.
654 # - Make sure there's enough free disk space to fit all dependencies of
655 # this run! If not, abort early.
656
657 # Start fetching.
658 self._pending.add(digest)
659 self.storage.async_fetch(
660 self._channel, priority, digest, size,
661 functools.partial(self.cache.write, digest))
662
663 def wait(self, digests):
664 """Starts a loop that waits for at least one of |digests| to be retrieved.
665
666 Returns the first digest retrieved.
667 """
668 # Flush any already fetched items.
669 for digest in digests:
670 if digest in self._fetched:
671 return digest
672
673 # Ensure all requested items are being fetched now.
674 assert all(digest in self._pending for digest in digests), (
675 digests, self._pending)
676
677 # Wait for some requested item to finish fetching.
678 while self._pending:
679 digest = self._channel.pull()
680 self._pending.remove(digest)
681 self._fetched.add(digest)
682 if digest in digests:
683 return digest
684
685 # Should never reach this point due to assert above.
686 raise RuntimeError('Impossible state')
687
688 def inject_local_file(self, path, algo):
689 """Adds local file to the cache as if it was fetched from storage."""
690 with open(path, 'rb') as f:
691 data = f.read()
692 digest = algo(data).hexdigest()
693 self.cache.write(digest, [data])
694 self._fetched.add(digest)
695 return digest
696
697 @property
698 def pending_count(self):
699 """Returns number of items to be fetched."""
700 return len(self._pending)
701
702 def verify_all_cached(self):
703 """True if all accessed items are in cache."""
704 return self._accessed.issubset(self.cache.cached_set())
705
706
707 class FetchStreamVerifier(object):
708 """Verifies that a fetched file is valid before passing it to the LocalCache."""
709
710 def __init__(self, stream, expected_size):
711 self.stream = stream
712 self.expected_size = expected_size
713 self.current_size = 0
714
715 def run(self):
716 """Generator that yields the same items as |stream|.
717
718 Verifies |stream| is complete before yielding the last chunk to the consumer.
719
720 Also wraps IOError produced by the consumer into MappingError exceptions since
721 otherwise Storage would retry the fetch on unrelated local cache errors.
722 """
723 # Read one chunk ahead, keep it in |stored|.
724 # That way a complete stream can be verified before pushing last chunk
725 # to consumer.
726 stored = None
727 for chunk in self.stream:
728 assert chunk is not None
729 if stored is not None:
730 self._inspect_chunk(stored, is_last=False)
731 try:
732 yield stored
733 except IOError as exc:
734 raise MappingError('Failed to store an item in cache: %s' % exc)
735 stored = chunk
736 if stored is not None:
737 self._inspect_chunk(stored, is_last=True)
738 try:
739 yield stored
740 except IOError as exc:
741 raise MappingError('Failed to store an item in cache: %s' % exc)
742
743 def _inspect_chunk(self, chunk, is_last):
744 """Called for each fetched chunk before passing it to consumer."""
745 self.current_size += len(chunk)
746 if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
747 (self.expected_size != self.current_size)):
748 raise IOError('Incorrect file size: expected %d, got %d' % (
749 self.expected_size, self.current_size))
750
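A minimal editorial sketch of the verifier in isolation (not part of the original file): a stream whose total length matches expected_size passes through unchanged; a mismatch raises IOError on the last chunk.

# Sketch: two chunks totalling the expected 10 bytes pass verification.
verifier = FetchStreamVerifier(iter(['01234', '56789']), 10)
assert ''.join(verifier.run()) == '0123456789'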
751
752 class StorageApi(object):
753 """Interface for classes that implement low-level storage operations."""
754
755 def get_fetch_url(self, digest):
756 """Returns a URL that can be used to fetch an item with the given digest.
757
758 Arguments:
759 digest: hex digest of item to fetch.
760
761 Returns:
762 A URL or None if the protocol doesn't support this.
763 """
764 raise NotImplementedError()
765
766 def fetch(self, digest):
767 """Fetches an object and yields its content.
768
769 Arguments:
770 digest: hash digest of item to download.
771
772 Yields:
773 Chunks of downloaded item (as str objects).
774 """
775 raise NotImplementedError()
776
777 def push(self, item, content):
778 """Uploads an |item| with content generated by |content| generator.
779
780 Arguments:
781 item: Item object that holds information about an item being pushed.
782 content: a generator that yields chunks to push.
783
784 Returns:
785 None.
786 """
787 raise NotImplementedError()
788
789 def contains(self, items):
790 """Checks for the existence of the given |items| on the server.
791
792 Mutates |items| by assigning an opaque, implementation-specific object to the
793 push_state attribute of Items missing from the datastore.
794
795 Arguments:
796 items: list of Item objects.
797
798 Returns:
799 A list of items missing on server as a list of Item objects.
800 """
801 raise NotImplementedError()
802
803
804 class IsolateServer(StorageApi):
805 """StorageApi implementation that downloads and uploads to Isolate Server.
806
807 It uploads and downloads directly from Google Storage whenever appropriate.
808 """
809
810 class _PushState(object):
811 """State needed to call .push(), to be stored in Item.push_state."""
812 def __init__(self, upload_url, finalize_url):
813 self.upload_url = upload_url
814 self.finalize_url = finalize_url
815 self.uploaded = False
816 self.finalized = False
817
818 def __init__(self, base_url, namespace):
819 super(IsolateServer, self).__init__()
820 assert base_url.startswith('http'), base_url
821 self.base_url = base_url.rstrip('/')
822 self.namespace = namespace
823 self._lock = threading.Lock()
824 self._server_caps = None
825
826 @staticmethod
827 def _generate_handshake_request():
828 """Returns a dict to be sent as handshake request body."""
829 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
830 return {
831 'client_app_version': __version__,
832 'fetcher': True,
833 'protocol_version': ISOLATE_PROTOCOL_VERSION,
834 'pusher': True,
835 }
836
837 @staticmethod
838 def _validate_handshake_response(caps):
839 """Validates and normalizes handshake response."""
840 logging.info('Protocol version: %s', caps['protocol_version'])
841 logging.info('Server version: %s', caps['server_app_version'])
842 if caps.get('error'):
843 raise MappingError(caps['error'])
844 if not caps['access_token']:
845 raise ValueError('access_token is missing')
846 return caps
847
848 @property
849 def _server_capabilities(self):
850 """Performs handshake with the server if not yet done.
851
852 Returns:
853 Server capabilities dictionary as returned by /handshake endpoint.
854
855 Raises:
856 MappingError if server rejects the handshake.
857 """
858 # TODO(maruel): Make this request much earlier asynchronously while the
859 # files are being enumerated.
860 with self._lock:
861 if self._server_caps is None:
862 request_body = json.dumps(
863 self._generate_handshake_request(), separators=(',', ':'))
864 response = net.url_read(
865 url=self.base_url + '/content-gs/handshake',
866 data=request_body,
867 content_type='application/json',
868 method='POST')
869 if response is None:
870 raise MappingError('Failed to perform handshake.')
871 try:
872 caps = json.loads(response)
873 if not isinstance(caps, dict):
874 raise ValueError('Expecting JSON dict')
875 self._server_caps = self._validate_handshake_response(caps)
876 except (ValueError, KeyError, TypeError) as exc:
877 # KeyError exception has very confusing str conversion: it's just a
878 # missing key value and nothing else. So print exception class name
879 # as well.
880 raise MappingError('Invalid handshake response (%s): %s' % (
881 exc.__class__.__name__, exc))
882 return self._server_caps
883
884 def get_fetch_url(self, digest):
885 assert isinstance(digest, basestring)
886 return '%s/content-gs/retrieve/%s/%s' % (
887 self.base_url, self.namespace, digest)
888
889 def fetch(self, digest):
890 source_url = self.get_fetch_url(digest)
891 logging.debug('download_file(%s)', source_url)
892
893 # Because the app engine DB is only eventually consistent, retry 404 errors
894 # because the file might just not be visible yet (even though it has been
895 # uploaded).
896 connection = net.url_open(
897 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
898 if not connection:
899 raise IOError('Unable to open connection to %s' % source_url)
900 return stream_read(connection, NET_IO_FILE_CHUNK)
901
902 def push(self, item, content):
903 assert isinstance(item, Item)
904 assert isinstance(item.push_state, IsolateServer._PushState)
905 assert not item.push_state.finalized
906
907 # TODO(vadimsh): Do not read from |content| generator when retrying push.
908 # If |content| is indeed a generator, it cannot be rewound
909 # to the beginning of the stream. A retry will find it exhausted. A possible
910 # solution is to wrap the |content| generator with some sort of caching,
911 # restartable generator. It should be done alongside the streaming support
912 # implementation.
913
914 # This push operation may be a retry after a failed finalization call below;
915 # no need to reupload the contents in that case.
916 if not item.push_state.uploaded:
917 # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
918 # upload support is implemented.
919 if isinstance(content, list) and len(content) == 1:
920 content = content[0]
921 else:
922 content = ''.join(content)
923 # PUT file to |upload_url|.
924 response = net.url_read(
925 url=item.push_state.upload_url,
926 data=content,
927 content_type='application/octet-stream',
928 method='PUT')
929 if response is None:
930 raise IOError('Failed to upload a file %s to %s' % (
931 item.digest, item.push_state.upload_url))
932 item.push_state.uploaded = True
933 else:
934 logging.info(
935 'A file %s already uploaded, retrying finalization only', item.digest)
936
937 # Optionally notify the server that it's done.
938 if item.push_state.finalize_url:
939 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
940 # send it to the isolate server. That way the isolate server can verify that
941 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
942 # stored files).
943 response = net.url_read(
944 url=item.push_state.finalize_url,
945 data='',
946 content_type='application/json',
947 method='POST')
948 if response is None:
949 raise IOError('Failed to finalize an upload of %s' % item.digest)
950 item.push_state.finalized = True
951
952 def contains(self, items):
953 logging.info('Checking existence of %d files...', len(items))
954
955 # Request body is a json encoded list of dicts.
956 body = [
957 {
958 'h': item.digest,
959 's': item.size,
960 'i': int(item.is_isolated),
961 } for item in items
962 ]
963
964 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
965 self.base_url,
966 self.namespace,
967 urllib.quote(self._server_capabilities['access_token']))
968 response_body = net.url_read(
969 url=query_url,
970 data=json.dumps(body, separators=(',', ':')),
971 content_type='application/json',
972 method='POST')
973 if response_body is None:
974 raise MappingError('Failed to execute /pre-upload query')
975
976 # Response body is a list of push_urls (or null if file is already present).
977 try:
978 response = json.loads(response_body)
979 if not isinstance(response, list):
980 raise ValueError('Expecting response with json-encoded list')
981 if len(response) != len(items):
982 raise ValueError(
983 'Incorrect number of items in the list, expected %d, '
984 'but got %d' % (len(items), len(response)))
985 except ValueError as err:
986 raise MappingError(
987 'Invalid response from server: %s, body is %s' % (err, response_body))
988
989 # Pick Items that are missing, attach _PushState to them.
990 missing_items = []
991 for i, push_urls in enumerate(response):
992 if push_urls:
993 assert len(push_urls) == 2, str(push_urls)
994 item = items[i]
995 assert item.push_state is None
996 item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
997 missing_items.append(item)
998 logging.info('Queried %d files, %d cache hit',
999 len(items), len(items) - len(missing_items))
1000 return missing_items
1001
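An editorial sketch of how this class composes (the server URL is hypothetical, and the /pre-upload shapes are inferred from contains() above rather than from server documentation):

server = IsolateServer('https://isolateserver.appspot.com', 'default-gzip')

# get_fetch_url() is pure string composition; no network traffic is involved.
assert server.get_fetch_url('a' * 40) == (
    'https://isolateserver.appspot.com/content-gs/retrieve/default-gzip/'
    + 'a' * 40)

# contains() POSTs a JSON list of {h, s, i} dicts; the response is a parallel
# list whose entries are either null (item already stored) or an
# [upload_url, finalize_url] pair for missing items.
example_request = [{'h': 'a' * 40, 's': 1024, 'i': 0}]
example_response = [['https://storage.example/upload', '/finalize-url']]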
1002
1003 class FileSystem(StorageApi):
1004 """StorageApi implementation that fetches data from the file system.
1005
1006 The common use case is an NFS/CIFS file server that is mounted locally and
1007 used to fetch files onto a local partition.
1008 """
1009
1010 def __init__(self, base_path):
1011 super(FileSystem, self).__init__()
1012 self.base_path = base_path
1013
1014 def get_fetch_url(self, digest):
1015 return None
1016
1017 def fetch(self, digest):
1018 assert isinstance(digest, basestring)
1019 return file_read(os.path.join(self.base_path, digest))
1020
1021 def push(self, item, content):
1022 assert isinstance(item, Item)
1023 file_write(os.path.join(self.base_path, item.digest), content)
1024
1025 def contains(self, items):
1026 return [
1027 item for item in items
1028 if not os.path.exists(os.path.join(self.base_path, item.digest))
1029 ]
1030
1031
1032 class LocalCache(object):
1033 """Local cache that stores objects fetched via Storage.
1034
1035 It can be accessed concurrently from multiple threads, so it should protect
1036 its internal state with some lock.
1037 """
1038
1039 def __enter__(self):
1040 """Context manager interface."""
1041 return self
1042
1043 def __exit__(self, _exc_type, _exec_value, _traceback):
1044 """Context manager interface."""
1045 return False
1046
1047 def cached_set(self):
1048 """Returns a set of all cached digests (always a new object)."""
1049 raise NotImplementedError()
1050
1051 def touch(self, digest, size):
1052 """Ensures item is not corrupted and updates its LRU position.
1053
1054 Arguments:
1055 digest: hash digest of item to check.
1056 size: expected size of this item.
1057
1058 Returns:
1059 True if item is in cache and not corrupted.
1060 """
1061 raise NotImplementedError()
1062
1063 def evict(self, digest):
1064 """Removes item from cache if it's there."""
1065 raise NotImplementedError()
1066
1067 def read(self, digest):
1068 """Returns contents of the cached item as a single str."""
1069 raise NotImplementedError()
1070
1071 def write(self, digest, content):
1072 """Reads data from |content| generator and stores it in cache."""
1073 raise NotImplementedError()
1074
1075 def link(self, digest, dest, file_mode=None):
1076 """Ensures file at |dest| has same content as cached |digest|."""
1077 raise NotImplementedError()
1078
1079
1080 class MemoryCache(LocalCache):
1081 """LocalCache implementation that stores everything in memory."""
1082
1083 def __init__(self):
1084 super(MemoryCache, self).__init__()
1085 # Let's not assume dict is thread safe.
1086 self._lock = threading.Lock()
1087 self._contents = {}
1088
1089 def cached_set(self):
1090 with self._lock:
1091 return set(self._contents)
1092
1093 def touch(self, digest, size):
1094 with self._lock:
1095 return digest in self._contents
1096
1097 def evict(self, digest):
1098 with self._lock:
1099 self._contents.pop(digest, None)
1100
1101 def read(self, digest):
1102 with self._lock:
1103 return self._contents[digest]
1104
1105 def write(self, digest, content):
1106 # Assemble whole stream before taking the lock.
1107 data = ''.join(content)
1108 with self._lock:
1109 self._contents[digest] = data
1110
1111 def link(self, digest, dest, file_mode=None):
1112 file_write(dest, [self.read(digest)])
1113 if file_mode is not None:
1114 os.chmod(dest, file_mode)
1115
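A tiny editorial sketch of the LocalCache contract, exercised through MemoryCache (the digest is a placeholder):

# Sketch: write() consumes a chunk generator, read() returns the joined bytes.
cache = MemoryCache()
cache.write('0' * 40, ['first chunk, ', 'second chunk'])
assert cache.read('0' * 40) == 'first chunk, second chunk'
assert cache.cached_set() == set(['0' * 40])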
1116
1117 def get_hash_algo(_namespace):
1118 """Return hash algorithm class to use when uploading to given |namespace|."""
1119 # TODO(vadimsh): Implement this at some point.
1120 return hashlib.sha1
1121
1122
1123 def is_namespace_with_compression(namespace):
1124 """Returns True if given |namespace| stores compressed objects."""
1125 return namespace.endswith(('-gzip', '-deflate'))
1126
1127
1128 def get_storage_api(file_or_url, namespace):
1129 """Returns an object that implements StorageApi interface."""
1130 if re.match(r'^https?://.+$', file_or_url):
1131 return IsolateServer(file_or_url, namespace)
1132 else:
1133 return FileSystem(file_or_url)
1134
1135
1136 def get_storage(file_or_url, namespace):
1137 """Returns Storage class configured with appropriate StorageApi instance."""
1138 return Storage(
1139 get_storage_api(file_or_url, namespace),
1140 is_namespace_with_compression(namespace))
1141
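An editorial sketch of the dispatch above (the URL and path are hypothetical): an http(s) address selects IsolateServer, anything else selects FileSystem.

# Sketch: URL -> IsolateServer, local path -> FileSystem.
assert isinstance(
    get_storage_api('https://isolateserver.appspot.com', 'default-gzip'),
    IsolateServer)
assert isinstance(get_storage_api('/mnt/isolate-mirror', 'default'), FileSystem)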
1142
1143 def upload_tree(base_url, indir, infiles, namespace):
1144 """Uploads the given tree to the given url.
1145
1146 Arguments:
1147 base_url: The base URL; it is assumed that |base_url|/has/ can be used to
1148 query if an element was already uploaded, and |base_url|/store/
1149 can be used to upload a new element.
1150 indir: Root directory the infiles are based in.
1151 infiles: dict of files to upload from |indir| to |base_url|.
1152 namespace: The namespace to use on the server.
1153 """
1154 with get_storage(base_url, namespace) as storage:
1155 storage.upload_tree(indir, infiles)
1156 return 0
1157
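The same Storage machinery also handles in-memory payloads. An editorial sketch using the FileSystem backend so no server is needed (the temporary directory is purely illustrative):

# Sketch: push a BufferItem into a directory-backed store.
import tempfile
destination = tempfile.mkdtemp()
with get_storage(destination, 'default') as storage:
  item = BufferItem('hello world', hashlib.sha1)
  storage.upload_items([item])
assert os.path.exists(os.path.join(destination, item.digest))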
1158
1159 def load_isolated(content, os_flavor, algo):
1160 """Verifies the .isolated file is valid and loads this object with the json
1161 data.
1162
1163 Arguments:
1164 - content: raw serialized content to load.
1165 - os_flavor: OS to load this file on. Optional.
1166 - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1167 algorithm used on the Isolate Server.
1168 """
1169 try:
1170 data = json.loads(content)
1171 except ValueError:
1172 raise ConfigError('Failed to parse: %s...' % content[:100])
1173
1174 if not isinstance(data, dict):
1175 raise ConfigError('Expected dict, got %r' % data)
1176
1177 # Check 'version' first, since it could modify the parsing after.
1178 value = data.get('version', '1.0')
1179 if not isinstance(value, basestring):
1180 raise ConfigError('Expected string, got %r' % value)
1181 if not re.match(r'^(\d+)\.(\d+)$', value):
1182 raise ConfigError('Expected a compatible version, got %r' % value)
1183 if value.split('.', 1)[0] != '1':
1184 raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)
1185
1186 if algo is None:
1187 # Default to the algorithm named in the .isolated file itself, falling back
1188 # to 'sha-1' if unspecified.
1189 algo = SUPPORTED_ALGOS[data.get('algo', 'sha-1')]
1190
1191 for key, value in data.iteritems():
1192 if key == 'algo':
1193 if not isinstance(value, basestring):
1194 raise ConfigError('Expected string, got %r' % value)
1195 if value not in SUPPORTED_ALGOS:
1196 raise ConfigError(
1197 'Expected one of \'%s\', got %r' %
1198 (', '.join(sorted(SUPPORTED_ALGOS)), value))
1199 if value != SUPPORTED_ALGOS_REVERSE[algo]:
1200 raise ConfigError(
1201 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
1202
1203 elif key == 'command':
1204 if not isinstance(value, list):
1205 raise ConfigError('Expected list, got %r' % value)
1206 if not value:
1207 raise ConfigError('Expected non-empty command')
1208 for subvalue in value:
1209 if not isinstance(subvalue, basestring):
1210 raise ConfigError('Expected string, got %r' % subvalue)
1211
1212 elif key == 'files':
1213 if not isinstance(value, dict):
1214 raise ConfigError('Expected dict, got %r' % value)
1215 for subkey, subvalue in value.iteritems():
1216 if not isinstance(subkey, basestring):
1217 raise ConfigError('Expected string, got %r' % subkey)
1218 if not isinstance(subvalue, dict):
1219 raise ConfigError('Expected dict, got %r' % subvalue)
1220 for subsubkey, subsubvalue in subvalue.iteritems():
1221 if subsubkey == 'l':
1222 if not isinstance(subsubvalue, basestring):
1223 raise ConfigError('Expected string, got %r' % subsubvalue)
1224 elif subsubkey == 'm':
1225 if not isinstance(subsubvalue, int):
1226 raise ConfigError('Expected int, got %r' % subsubvalue)
1227 elif subsubkey == 'h':
1228 if not is_valid_hash(subsubvalue, algo):
1229 raise ConfigError('Expected sha-1, got %r' % subsubvalue)
1230 elif subsubkey == 's':
1231 if not isinstance(subsubvalue, int):
1232 raise ConfigError('Expected int, got %r' % subsubvalue)
1233 else:
1234 raise ConfigError('Unknown subsubkey %s' % subsubkey)
1235 if bool('h' in subvalue) == bool('l' in subvalue):
1236 raise ConfigError(
1237 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1238 subvalue)
1239 if bool('h' in subvalue) != bool('s' in subvalue):
1240 raise ConfigError(
1241 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1242 subvalue)
1243 if bool('s' in subvalue) == bool('l' in subvalue):
1244 raise ConfigError(
1245 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
1246 subvalue)
1247 if bool('l' in subvalue) and bool('m' in subvalue):
1248 raise ConfigError(
1249 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
1250 subvalue)
1251
1252 elif key == 'includes':
1253 if not isinstance(value, list):
1254 raise ConfigError('Expected list, got %r' % value)
1255 if not value:
1256 raise ConfigError('Expected non-empty includes list')
1257 for subvalue in value:
1258 if not is_valid_hash(subvalue, algo):
1259 raise ConfigError('Expected sha-1, got %r' % subvalue)
1260
1261 elif key == 'read_only':
1262 if not isinstance(value, bool):
1263 raise ConfigError('Expected bool, got %r' % value)
1264
1265 elif key == 'relative_cwd':
1266 if not isinstance(value, basestring):
1267 raise ConfigError('Expected string, got %r' % value)
1268
1269 elif key == 'os':
1270 if os_flavor and value != os_flavor:
1271 raise ConfigError(
1272 'Expected \'os\' to be \'%s\' but got \'%s\'' %
1273 (os_flavor, value))
1274
1275 elif key == 'version':
1276 # Already checked above.
1277 pass
1278
1279 else:
1280 raise ConfigError('Unknown key %r' % key)
1281
1282 # Automatically fix os.path.sep if necessary. While .isolated files are always
1283 # in the native path format, someone could want to download an .isolated
1284 # tree from another OS.
1285 wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1286 if 'files' in data:
1287 data['files'] = dict(
1288 (k.replace(wrong_path_sep, os.path.sep), v)
1289 for k, v in data['files'].iteritems())
1290 for v in data['files'].itervalues():
1291 if 'l' in v:
1292 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1293 if 'relative_cwd' in data:
1294 data['relative_cwd'] = data['relative_cwd'].replace(
1295 wrong_path_sep, os.path.sep)
1296 return data
1297
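For reference, a minimal .isolated document that passes the validation above (an editorial sketch; the hash value is a placeholder):

# Sketch: the smallest useful .isolated content load_isolated() accepts.
example_isolated = json.dumps({
    'algo': 'sha-1',
    'command': ['python', 'run_test.py'],
    'files': {
        'run_test.py': {'h': '0' * 40, 's': 123, 'm': 0o500},
    },
    'relative_cwd': '.',
    'version': '1.0',
})
parsed = load_isolated(example_isolated, None, hashlib.sha1)
assert parsed['command'] == ['python', 'run_test.py']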
1298
1299 class IsolatedFile(object):
1300 """Represents a single parsed .isolated file."""
1301 def __init__(self, obj_hash, algo):
1302 """|obj_hash| is really the sha-1 of the file."""
1303 logging.debug('IsolatedFile(%s)' % obj_hash)
1304 self.obj_hash = obj_hash
1305 self.algo = algo
1306 # Set once all of the left side of the tree is parsed. 'Tree' here means the
1307 # .isolate and all the .isolated files recursively included by it via the
1308 # 'includes' key. The order of each sha-1 in 'includes', each representing a
1309 # .isolated file in the hash table, is important, as the later ones are not
1310 # processed until the first ones are retrieved and read.
1311 self.can_fetch = False
1312
1313 # Raw data.
1314 self.data = {}
1315 # A IsolatedFile instance, one per object in self.includes.
1316 self.children = []
1317
1318 # Set once the .isolated file is loaded.
1319 self._is_parsed = False
1320 # Set once the files are fetched.
1321 self.files_fetched = False
1322
1323 def load(self, os_flavor, content):
1324 """Verifies the .isolated file is valid and loads this object with the json
1325 data.
1326 """
1327 logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
1328 assert not self._is_parsed
1329 self.data = load_isolated(content, os_flavor, self.algo)
1330 self.children = [
1331 IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
1332 ]
1333 self._is_parsed = True
1334
1335 def fetch_files(self, fetch_queue, files):
1336 """Adds files from this .isolated file not already in the |files| dictionary.
1337
1338 Preemptively requests the files.
1339
1340 Note that |files| is modified by this function.
1341 """
1342 assert self.can_fetch
1343 if not self._is_parsed or self.files_fetched:
1344 return
1345 logging.debug('fetch_files(%s)' % self.obj_hash)
1346 for filepath, properties in self.data.get('files', {}).iteritems():
1347 # The root isolated file has priority over the files being mapped. In
1348 # particular, overridden files must not be fetched.
1349 if filepath not in files:
1350 files[filepath] = properties
1351 if 'h' in properties:
1352 # Preemptively request files.
1353 logging.debug('fetching %s' % filepath)
1354 fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
1355 self.files_fetched = True
1356
1357
1358 class Settings(object):
1359 """Results of a completely parsed .isolated file."""
1360 def __init__(self):
1361 self.command = []
1362 self.files = {}
1363 self.read_only = None
1364 self.relative_cwd = None
1365 # The main .isolated file, a IsolatedFile instance.
1366 self.root = None
1367
1368 def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
1369 """Loads the .isolated and all the included .isolated asynchronously.
1370
1371 It enables support for "included" .isolated files. They are processed in
1372 strict order but fetched asynchronously from the cache. This is important so
1373 that a file in an included .isolated file that is overridden by an embedding
1374 .isolated file is not fetched needlessly. The includes are fetched in one
1375 pass and the files are fetched as soon as all the ones on the left side
1376 of the tree have been fetched.
1377
1378 The prioritization is very important here for nested .isolated files.
1379 'includes' have the highest priority and the algorithm is optimized for both
1380 deep and wide trees. A deep one is a long link of .isolated files referenced
1381 one at a time by one item in 'includes'. A wide one has a large number of
1382 'includes' in a single .isolated file. 'left' is defined as an included
1383 .isolated file earlier in the 'includes' list. So the order of the elements
1384 in 'includes' is important.
1385 """
1386 self.root = IsolatedFile(root_isolated_hash, algo)
1387
1388 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1389 pending = {}
1390 # Set of hashes of already retrieved items to refuse recursive includes.
1391 seen = set()
1392
1393 def retrieve(isolated_file):
1394 h = isolated_file.obj_hash
1395 if h in seen:
1396 raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
1397 assert h not in pending
1398 seen.add(h)
1399 pending[h] = isolated_file
1400 fetch_queue.add(WorkerPool.HIGH, h)
1401
1402 retrieve(self.root)
1403
1404 while pending:
1405 item_hash = fetch_queue.wait(pending)
1406 item = pending.pop(item_hash)
1407 item.load(os_flavor, fetch_queue.cache.read(item_hash))
1408 if item_hash == root_isolated_hash:
1409 # It's the root item.
1410 item.can_fetch = True
1411
1412 for new_child in item.children:
1413 retrieve(new_child)
1414
1415 # Traverse the whole tree to see if files can now be fetched.
1416 self._traverse_tree(fetch_queue, self.root)
1417
1418 def check(n):
1419 return all(check(x) for x in n.children) and n.files_fetched
1420 assert check(self.root)
1421
1422 self.relative_cwd = self.relative_cwd or ''
1423 self.read_only = self.read_only or False
1424
1425 def _traverse_tree(self, fetch_queue, node):
1426 if node.can_fetch:
1427 if not node.files_fetched:
1428 self._update_self(fetch_queue, node)
1429 will_break = False
1430 for i in node.children:
1431 if not i.can_fetch:
1432 if will_break:
1433 break
1434 # Automatically mark the first one as fetchable.
1435 i.can_fetch = True
1436 will_break = True
1437 self._traverse_tree(fetch_queue, i)
1438
1439 def _update_self(self, fetch_queue, node):
1440 node.fetch_files(fetch_queue, self.files)
1441 # Grabs properties.
1442 if not self.command and node.data.get('command'):
1443 # Ensure paths are correctly separated on windows.
1444 self.command = node.data['command']
1445 if self.command:
1446 self.command[0] = self.command[0].replace('/', os.path.sep)
1447 self.command = tools.fix_python_path(self.command)
1448 if self.read_only is None and node.data.get('read_only') is not None:
1449 self.read_only = node.data['read_only']
1450 if (self.relative_cwd is None and
1451 node.data.get('relative_cwd') is not None):
1452 self.relative_cwd = node.data['relative_cwd']
1453
1454
1455 def fetch_isolated(
1456 isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
1457 """Aggressively downloads the .isolated file(s), then downloads all the files.
1458
1459 Arguments:
1460 isolated_hash: hash of the root *.isolated file.
1461 storage: Storage class that communicates with isolate storage.
1462 cache: LocalCache class that knows how to store and map files locally.
1463 algo: hash algorithm to use.
1464 outdir: Output directory to map file tree to.
1465 os_flavor: OS flavor to choose when reading sections of *.isolated file.
1466 require_command: Ensure *.isolated specifies a command to run.
1467
1468 Returns:
1469 Settings object that holds details about loaded *.isolated file.
1470 """
1471 with cache:
1472 fetch_queue = FetchQueue(storage, cache)
1473 settings = Settings()
1474
1475 with tools.Profiler('GetIsolateds'):
1476 # Optionally support local files by manually adding them to cache.
1477 if not is_valid_hash(isolated_hash, algo):
1478 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1479
1480 # Load all *.isolated and start loading rest of the files.
1481 settings.load(fetch_queue, isolated_hash, os_flavor, algo)
1482 if require_command and not settings.command:
1483 # TODO(vadimsh): All fetch operations are already enqueued and there's no
1484 # easy way to cancel them.
1485 raise ConfigError('No command to run')
1486
1487 with tools.Profiler('GetRest'):
1488 # Create file system hierarchy.
1489 if not os.path.isdir(outdir):
1490 os.makedirs(outdir)
1491 create_directories(outdir, settings.files)
1492 create_links(outdir, settings.files.iteritems())
1493
1494 # Ensure working directory exists.
1495 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
1496 if not os.path.isdir(cwd):
1497 os.makedirs(cwd)
1498
1499 # Multimap: digest -> list of pairs (path, props).
1500 remaining = {}
1501 for filepath, props in settings.files.iteritems():
1502 if 'h' in props:
1503 remaining.setdefault(props['h'], []).append((filepath, props))
1504
1505 # Now block on the remaining files to be downloaded and mapped.
1506 logging.info('Retrieving remaining files (%d of them)...',
1507 fetch_queue.pending_count)
1508 last_update = time.time()
1509 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1510 while remaining:
1511 detector.ping()
1512
1513 # Wait for any item to finish fetching to cache.
1514 digest = fetch_queue.wait(remaining)
1515
1516 # Link corresponding files to a fetched item in cache.
1517 for filepath, props in remaining.pop(digest):
1518 cache.link(digest, os.path.join(outdir, filepath), props.get('m'))
1519
1520 # Report progress.
1521 duration = time.time() - last_update
1522 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1523 msg = '%d files remaining...' % len(remaining)
1524 print msg
1525 logging.info(msg)
1526 last_update = time.time()
1527
1528 # The cache could evict items we just tried to fetch; that's a fatal error.
1529 if not fetch_queue.verify_all_cached():
1530 raise MappingError('Cache is too small to hold all requested files')
1531 return settings
1532
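An editorial sketch of calling fetch_isolated() the way CMDdownload below does (the hash, server URL and output directory are placeholders, and a reachable server is required to actually run this):

# Sketch: map an isolated tree into a local directory via an in-memory cache.
settings = fetch_isolated(
    isolated_hash='0' * 40,
    storage=get_storage('https://isolateserver.appspot.com', 'default-gzip'),
    cache=MemoryCache(),
    algo=hashlib.sha1,
    outdir='/tmp/isolated-out',
    os_flavor=None,
    require_command=False)
# settings.command and settings.relative_cwd describe how to run the mapped tree.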
1533
1534 @subcommand.usage('<file1..fileN> or - to read from stdin')
1535 def CMDarchive(parser, args):
1536 """Archives data to the server."""
1537 options, files = parser.parse_args(args)
1538
1539 if files == ['-']:
1540 files = sys.stdin.readlines()
1541
1542 if not files:
1543 parser.error('Nothing to upload')
1544
1545 # Load the necessary metadata.
1546 # TODO(maruel): Use a worker pool to upload as the hashing is being done.
1547 infiles = dict(
1548 (
1549 f,
1550 {
1551 's': os.stat(f).st_size,
1552 'h': hash_file(f, get_hash_algo(options.namespace)),
1553 }
1554 )
1555 for f in files)
1556
1557 with tools.Profiler('Archive'):
1558 ret = upload_tree(
1559 base_url=options.isolate_server,
1560 indir=os.getcwd(),
1561 infiles=infiles,
1562 namespace=options.namespace)
1563 if not ret:
1564 print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
1565 return ret
1566
1567
1568 def CMDdownload(parser, args):
1569 """Download data from the server.
1570
1571 It can either download individual files or a complete tree from a .isolated
1572 file.
1573 """
1574 parser.add_option(
1575 '-i', '--isolated', metavar='HASH',
1576 help='hash of an isolated file, .isolated file content is discarded, use '
1577 '--file if you need it')
1578 parser.add_option(
1579 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1580 help='hash and destination of a file, can be used multiple times')
1581 parser.add_option(
1582 '-t', '--target', metavar='DIR', default=os.getcwd(),
1583 help='destination directory')
1584 options, args = parser.parse_args(args)
1585 if args:
1586 parser.error('Unsupported arguments: %s' % args)
1587 if bool(options.isolated) == bool(options.file):
1588 parser.error('Use one of --isolated or --file, and only one.')
1589
1590 options.target = os.path.abspath(options.target)
1591 storage = get_storage(options.isolate_server, options.namespace)
1592 cache = MemoryCache()
1593 algo = get_hash_algo(options.namespace)
1594
1595 # Fetching individual files.
1596 if options.file:
1597 channel = threading_utils.TaskChannel()
1598 pending = {}
1599 for digest, dest in options.file:
1600 pending[digest] = dest
1601 storage.async_fetch(
1602 channel,
1603 WorkerPool.MED,
1604 digest,
1605 UNKNOWN_FILE_SIZE,
1606 functools.partial(file_write, os.path.join(options.target, dest)))
1607 while pending:
1608 fetched = channel.pull()
1609 dest = pending.pop(fetched)
1610 logging.info('%s: %s', fetched, dest)
1611
1612 # Fetching whole isolated tree.
1613 if options.isolated:
1614 settings = fetch_isolated(
1615 isolated_hash=options.isolated,
1616 storage=storage,
1617 cache=cache,
1618 algo=algo,
1619 outdir=options.target,
1620 os_flavor=None,
1621 require_command=False)
1622 rel = os.path.join(options.target, settings.relative_cwd)
1623 print('To run this test please run from the directory %s:' %
1624 os.path.join(options.target, rel))
1625 print(' ' + ' '.join(settings.command))
1626
1627 return 0
1628
1629
1630 class OptionParserIsolateServer(tools.OptionParserWithLogging):
1631 def __init__(self, **kwargs):
1632 tools.OptionParserWithLogging.__init__(self, **kwargs)
1633 self.add_option(
1634 '-I', '--isolate-server',
1635 metavar='URL', default='',
1636 help='Isolate server to use')
1637 self.add_option(
1638 '--namespace', default='default-gzip',
1639 help='The namespace to use on the server, default: %default')
1640
1641 def parse_args(self, *args, **kwargs):
1642 options, args = tools.OptionParserWithLogging.parse_args(
1643 self, *args, **kwargs)
1644 options.isolate_server = options.isolate_server.rstrip('/')
1645 if not options.isolate_server:
1646 self.error('--isolate-server is required.')
1647 return options, args
1648
1649
1650 def main(args):
1651 dispatcher = subcommand.CommandDispatcher(__name__)
1652 try:
1653 return dispatcher.execute(
1654 OptionParserIsolateServer(version=__version__), args)
1655 except Exception as e:
1656 tools.report_error(e)
1657 return 1
1658
1659
1660 if __name__ == '__main__':
1661 fix_encoding.fix_encoding()
1662 tools.disable_buffering()
1663 colorama.init()
1664 sys.exit(main(sys.argv[1:]))