#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Reads a manifest, creates a tree of hardlinks and runs the test.

Keeps a local cache.
"""

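# Example invocations (illustrative; the script name, the sha-1 and the remote
# below are assumptions, not part of the original file):
#   python run_test_from_archive.py --hash <40-char sha-1> \
#       --remote http://example.com/hashtable
#   python run_test_from_archive.py --manifest foo.results \
#       --remote /path/to/hashtable
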
import ctypes
import hashlib
import json
import logging
import optparse
import os
import Queue
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import threading
import time
import urllib


# Types of action accepted by link_file().
HARDLINK, SYMLINK, COPY = range(1, 4)

RE_IS_SHA1 = re.compile(r'^[a-fA-F0-9]{40}$')


class ConfigError(ValueError):
  """Generic failure to load a manifest."""
  pass


class MappingError(OSError):
  """Failed to recreate the tree."""
  pass


def get_flavor():
  """Returns the system default flavor. Copied from gyp/pylib/gyp/common.py."""
  flavors = {
    'cygwin': 'win',
    'win32': 'win',
    'darwin': 'mac',
    'sunos5': 'solaris',
    'freebsd7': 'freebsd',
    'freebsd8': 'freebsd',
  }
  return flavors.get(sys.platform, 'linux')


def os_link(source, link_name):
  """Add support for os.link() on Windows."""
  if sys.platform == 'win32':
    if not ctypes.windll.kernel32.CreateHardLinkW(
        unicode(link_name), unicode(source), 0):
      raise OSError()
  else:
    os.link(source, link_name)


def readable_copy(outfile, infile):
  """Makes a copy of the file that is readable by everyone."""
  shutil.copy(infile, outfile)
  read_enabled_mode = (os.stat(outfile).st_mode | stat.S_IRUSR |
                       stat.S_IRGRP | stat.S_IROTH)
  os.chmod(outfile, read_enabled_mode)


def link_file(outfile, infile, action):
  """Links a file. The type of link depends on |action|."""
  logging.debug('Mapping %s to %s' % (infile, outfile))
  if action not in (HARDLINK, SYMLINK, COPY):
    raise ValueError('Unknown mapping action %s' % action)
  if not os.path.isfile(infile):
    raise MappingError('%s is missing' % infile)
  if os.path.isfile(outfile):
    raise MappingError(
        '%s already exists; insize:%d; outsize:%d' %
        (outfile, os.stat(infile).st_size, os.stat(outfile).st_size))

  if action == COPY:
    readable_copy(outfile, infile)
  elif action == SYMLINK and sys.platform != 'win32':
    # On Windows, symlinks are converted to hardlinks, falling back to copy on
    # failure.
    os.symlink(infile, outfile)
  else:
    try:
      os_link(infile, outfile)
    except OSError:
      # Probably a different file system.
      logging.warn(
          'Failed to hardlink, falling back to copy %s to %s' % (
              infile, outfile))
      readable_copy(outfile, infile)


def _set_write_bit(path, read_only):
  """Sets or resets the write bit on a file or directory."""
  mode = os.lstat(path).st_mode
  if read_only:
    mode = mode & 0500
  else:
    mode = mode | 0200
  if hasattr(os, 'lchmod'):
    os.lchmod(path, mode)  # pylint: disable=E1101
  else:
    if stat.S_ISLNK(mode):
      # Skip symlink without lchmod() support.
      logging.debug('Can\'t change +w bit on symlink %s' % path)
      return

    # TODO(maruel): Implement proper DACL modification on Windows.
    os.chmod(path, mode)


def make_writable(root, read_only):
  """Toggle the writable bit on a directory tree."""
  root = os.path.abspath(root)
  for dirpath, dirnames, filenames in os.walk(root, topdown=True):
    for filename in filenames:
      _set_write_bit(os.path.join(dirpath, filename), read_only)

    for dirname in dirnames:
      _set_write_bit(os.path.join(dirpath, dirname), read_only)


def rmtree(root):
  """Wrapper around shutil.rmtree() to retry automatically on Windows."""
  make_writable(root, False)
  if sys.platform == 'win32':
    for i in range(3):
      try:
        shutil.rmtree(root)
        break
      except WindowsError:  # pylint: disable=E0602
        delay = (i+1)*2
        print >> sys.stderr, (
            'The test has a subprocess outliving it. Sleeping %d seconds.' %
            delay)
        time.sleep(delay)
  else:
    shutil.rmtree(root)


def is_same_filesystem(path1, path2):
  """Returns True if both paths are on the same filesystem.

  This is required to enable the use of hardlinks.
  """
  assert os.path.isabs(path1), path1
  assert os.path.isabs(path2), path2
  if sys.platform == 'win32':
    # If the drive letter mismatches, assume it's a separate partition.
    # TODO(maruel): It should look at the underlying drive, a drive letter
    # could be a mount point to a directory on another drive.
    assert re.match(r'^[a-zA-Z]\:\\.*', path1), path1
    assert re.match(r'^[a-zA-Z]\:\\.*', path2), path2
    if path1[0].lower() != path2[0].lower():
      return False
  return os.stat(path1).st_dev == os.stat(path2).st_dev


def get_free_space(path):
  """Returns the number of free bytes."""
  if sys.platform == 'win32':
    free_bytes = ctypes.c_ulonglong(0)
    ctypes.windll.kernel32.GetDiskFreeSpaceExW(
        ctypes.c_wchar_p(path), None, None, ctypes.pointer(free_bytes))
    return free_bytes.value
  f = os.statvfs(path)
  return f.f_bfree * f.f_frsize


def make_temp_dir(prefix, root_dir):
  """Returns a temporary directory on the same file system as root_dir."""
  base_temp_dir = None
  if not is_same_filesystem(root_dir, tempfile.gettempdir()):
    base_temp_dir = os.path.dirname(root_dir)
  return tempfile.mkdtemp(prefix=prefix, dir=base_temp_dir)


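# A manifest is a JSON dict. An illustrative example of what load_manifest()
# accepts (the values below are made up):
#
#   {
#     "command": ["python", "my_test.py"],
#     "files": {
#       "data/file.txt": {
#         "sha-1": "0123456789012345678901234567890123456789",
#         "size": 123,
#         "mode": 420,
#         "timestamp": 1330000000
#       },
#       "data/sym": {"link": "file.txt"}
#     },
#     "includes": ["abcdefabcdefabcdefabcdefabcdefabcdefabcd"],
#     "read_only": false,
#     "relative_cwd": "data",
#     "os": "linux"
#   }
#
# A file entry may also carry "touched_only" (bool) and must not specify both
# "sha-1" and "link".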
def load_manifest(content):
  """Verifies the manifest is valid and returns the parsed json data."""
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  for key, value in data.iteritems():
    if key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'link':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'mode':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'sha-1':
            if not RE_IS_SHA1.match(subsubvalue):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 'size':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'timestamp':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'touched_only':
            if not isinstance(subsubvalue, bool):
              raise ConfigError('Expected bool, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        if bool('sha-1' in subvalue) and bool('link' in subvalue):
          raise ConfigError(
              'Did not expect both \'sha-1\' and \'link\', got: %r' % subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      for subvalue in value:
        if not RE_IS_SHA1.match(subvalue):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if value != get_flavor():
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (get_flavor(), value))

    else:
      raise ConfigError('Unknown key %s' % key)

  return data


def fix_python_path(cmd):
  """Returns the fixed command line to call the right python executable."""
  out = cmd[:]
  if out[0] == 'python':
    out[0] = sys.executable
  elif out[0].endswith('.py'):
    out.insert(0, sys.executable)
  return out

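# For example (illustrative):
#   fix_python_path(['python', 'foo.py']) -> [sys.executable, 'foo.py']
#   fix_python_path(['foo.py', '--bar']) -> [sys.executable, 'foo.py', '--bar']
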

class Profiler(object):
  def __init__(self, name):
    self.name = name
    self.start_time = None

  def __enter__(self):
    self.start_time = time.time()
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    time_taken = time.time() - self.start_time
    logging.info('Profiling: Section %s took %3.3f seconds',
                 self.name, time_taken)

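# Typical usage (illustrative; do_something_slow() is a made-up placeholder):
#   with Profiler('MyStep'):
#     do_something_slow()
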

class Remote(object):
  """Priority based worker queue to fetch or upload files from a
  content-addressed server. Any function may be given as the fetcher/uploader,
  as long as it takes two inputs (the item contents, and their relative
  destination).

  Supports local file system, CIFS or http remotes.

  When the priority of items is equal, works in strict FIFO mode.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Priorities.
  LOW, MED, HIGH = (1<<8, 2<<8, 3<<8)
  INTERNAL_PRIORITY_BITS = (1<<8) - 1
  RETRIES = 5
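  # For example (illustrative): priorities are multiples of 256 and the low 8
  # bits count retries, so an item queued at 512 that failed once is re-queued
  # at 513; it sorts after fresh 512 items but before the 768 band.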

  def __init__(self, destination_root):
    # Function to fetch a remote object or upload to a remote location.
    self._do_item = self.get_file_handler(destination_root)
    # Contains tuple(priority, index, obj, destination).
    self._queue = Queue.PriorityQueue()
    # Contains tuple(priority, index, obj).
    self._done = Queue.PriorityQueue()

    # Contains generated exceptions that haven't been handled yet.
    self._exceptions = Queue.Queue()

    # To keep FIFO ordering in self._queue. It is assumed xrange's iterator is
    # thread-safe.
    self._next_index = xrange(0, 1<<30).__iter__().next

    # Control access to the following member.
    self._ready_lock = threading.Lock()
    # Number of threads in wait state.
    self._ready = 0

    # Control access to the following member.
    self._workers_lock = threading.Lock()
    self._workers = []
    for _ in range(self.INITIAL_WORKERS):
      self._add_worker()

  def join(self):
    """Blocks until the queue is empty."""
    self._queue.join()

  def next_exception(self):
    """Returns the next unhandled exception, or None if there is
    no exception."""
    try:
      return self._exceptions.get_nowait()
    except Queue.Empty:
      return None

  def add_item(self, priority, obj, dest):
    """Retrieves an object from the remote data store.

    The smaller |priority| gets fetched first.

    Thread-safe.
    """
    assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
    self._add_to_queue(priority, obj, dest)

  def get_result(self):
    """Returns the next file that was successfully fetched."""
    r = self._done.get()
    if r[0] == -1:
      # It's an exception.
      raise r[2][0], r[2][1], r[2][2]
    return r[2]

  def _add_to_queue(self, priority, obj, dest):
    with self._ready_lock:
      start_new_worker = not self._ready
    self._queue.put((priority, self._next_index(), obj, dest))
    if start_new_worker:
      self._add_worker()

  def _add_worker(self):
    """Adds one worker thread if there aren't too many. Thread-safe."""
    with self._workers_lock:
      if len(self._workers) >= self.MAX_WORKERS:
        return False
      worker = threading.Thread(target=self._run)
      self._workers.append(worker)
      worker.daemon = True
      worker.start()

  def _step_done(self, result):
    """Worker helper function."""
    self._done.put(result)
    self._queue.task_done()
    if result[0] == -1:
      self._exceptions.put(sys.exc_info())

  def _run(self):
    """Worker thread loop."""
    while True:
      try:
        with self._ready_lock:
          self._ready += 1
        item = self._queue.get()
      finally:
        with self._ready_lock:
          self._ready -= 1
      if not item:
        return
      priority, index, obj, dest = item
      try:
        self._do_item(obj, dest)
      except IOError:
        # Retry a few times, lowering the priority.
        if (priority & self.INTERNAL_PRIORITY_BITS) < self.RETRIES:
          self._add_to_queue(priority + 1, obj, dest)
          self._queue.task_done()
          continue
        # Transfers the exception back. It has maximum priority.
        self._step_done((-1, 0, sys.exc_info()))
      except:
        # Transfers the exception back. It has maximum priority.
        self._step_done((-1, 0, sys.exc_info()))
      else:
        self._step_done((priority, index, obj))

  @staticmethod
  def get_file_handler(file_or_url):
    """Returns a function to retrieve objects from a remote."""
    if re.match(r'^https?://.+$', file_or_url):
      file_or_url = file_or_url.rstrip('/') + '/'
      def download_file(item, dest):
        # TODO(maruel): Reuse HTTP connections. The stdlib doesn't make this
        # easy.
        source = file_or_url + item
        logging.debug('download_file(%s, %s)', source, dest)
        urllib.urlretrieve(source, dest)
      return download_file

    def copy_file(item, dest):
      source = os.path.join(file_or_url, item)
      logging.debug('copy_file(%s, %s)', source, dest)
      shutil.copy(source, dest)
    return copy_file

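# Minimal usage sketch (illustrative; the URL, hash and path are made up):
#   remote = Remote('http://example.com/hashtable')
#   remote.add_item(Remote.MED, '<40-char sha-1>', '/tmp/dest')
#   remote.join()
#   if remote.next_exception():
#     ...  # A fetch failed even after retries.
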

class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


class Cache(object):
  """Stateful LRU cache.

  Saves its state as a json file.
  """
  STATE_FILE = 'state.json'

  def __init__(self, cache_dir, remote, policies):
    """
    Arguments:
    - cache_dir: Directory where to place the cache.
    - remote: Remote where to fetch items from.
    - policies: cache retention policies.
    """
    self.cache_dir = cache_dir
    self.remote = remote
    self.policies = policies
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # The (file, size) tuples are kept as an array in LRU style; e.g.
    # self.state[0] is the oldest item.
    self.state = []
    # A lookup map to speed up searching.
    self._lookup = {}
    self._dirty = False

    # Items currently being fetched. Keep it local to reduce lock contention.
    self._pending_queue = set()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    if not os.path.isdir(self.cache_dir):
      os.makedirs(self.cache_dir)
    if os.path.isfile(self.state_file):
      try:
        self.state = json.load(open(self.state_file, 'r'))
      except (IOError, ValueError), e:
        # Too bad. The file will be overwritten and the cache cleared.
        logging.error(
            'Broken state file %s, ignoring.\n%s' % (self.STATE_FILE, e))
      if (not isinstance(self.state, list) or
          not all(
            isinstance(i, (list, tuple)) and len(i) == 2 for i in self.state)):
        # Discard.
        self.state = []
        self._dirty = True

    # Ensure that all files listed in the state still exist and add new ones.
    previous = set(filename for filename, _ in self.state)
    if len(previous) != len(self.state):
      logging.warn('Cache state is corrupted')
      self._dirty = True
      self.state = []
    else:
      added = 0
      for filename in os.listdir(self.cache_dir):
        if filename == self.STATE_FILE:
          continue
        if filename in previous:
          previous.remove(filename)
          continue
        # An untracked file.
        self._dirty = True
        if not RE_IS_SHA1.match(filename):
          logging.warn('Removing unknown file %s from cache', filename)
          os.remove(self.path(filename))
        else:
          # Insert as the oldest file. It will be deleted eventually if not
          # accessed.
          self._add(filename, False)
          added += 1
      if added:
        logging.warn('Added back %d unknown files', added)
      self.state = [
        (filename, size) for filename, size in self.state
        if filename not in previous
      ]
      self._update_lookup()

    with Profiler('SetupTrimming'):
      self.trim()

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    with Profiler('CleanupTrimming'):
      self.trim()

    logging.info(
        '%4d (%7dkb) added', len(self._added), sum(self._added) / 1024)
    logging.info(
        '%4d (%7dkb) current',
        len(self.state),
        sum(i[1] for i in self.state) / 1024)
    logging.info(
        '%4d (%7dkb) removed', len(self._removed), sum(self._removed) / 1024)
    logging.info('%7dkb free', self._free_disk / 1024)

  def remove_lru_file(self):
    """Removes the least recently used file."""
    try:
      filename, size = self.state.pop(0)
      del self._lookup[filename]
      self._removed.append(size)
      os.remove(self.path(filename))
      self._dirty = True
    except OSError as e:
      logging.error('Error attempting to delete a file\n%s' % e)

  def trim(self):
    """Trims the cache to honor the policies and ensure enough free space."""
    # Ensure maximum cache size.
    if self.policies.max_cache_size and self.state:
      while sum(i[1] for i in self.state) > self.policies.max_cache_size:
        self.remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and self.state:
      while len(self.state) > self.policies.max_items:
        self.remove_lru_file()

    # Ensure enough free space.
    self._free_disk = get_free_space(self.cache_dir)
    while (
        self.policies.min_free_space and
        self.state and
        self._free_disk < self.policies.min_free_space):
      self.remove_lru_file()
      self._free_disk = get_free_space(self.cache_dir)

    self.save()

  def retrieve(self, priority, item):
    """Retrieves a file from the remote, if not already cached, and adds it to
    the cache.
    """
    assert '/' not in item
    path = self.path(item)
    index = self._lookup.get(item)
    if index is None:
      if item in self._pending_queue:
        # Already pending. The same object could be referenced multiple times.
        return
      self.remote.add_item(priority, item, path)
      self._pending_queue.add(item)
    else:
      if index != len(self.state) - 1:
        # Was already in cache. Update its LRU value by putting it at the end.
        self.state.append(self.state.pop(index))
        self._dirty = True
        self._update_lookup()

  def add(self, filepath, obj):
    """Forcibly adds a file to the cache."""
    if obj not in self._lookup:
      link_file(self.path(obj), filepath, HARDLINK)
      self._add(obj, True)

  def path(self, item):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, item)

  def save(self):
    """Saves the LRU ordering."""
    json.dump(self.state, open(self.state_file, 'wb'), separators=(',',':'))

  def wait_for(self, items):
    """Starts a loop that waits for at least one of |items| to be retrieved.

    Returns the first item retrieved.
    """
    # Flush items already present.
    for item in items:
      if item in self._lookup:
        return item

    assert all(i in self._pending_queue for i in items), (
        items, self._pending_queue)
    # Note that:
    #   len(self._pending_queue) ==
    #       (len(self.remote._workers) - self.remote._ready +
    #        len(self.remote._queue) + len(self.remote._done))
    # There is no lock-free way to verify that.
    while self._pending_queue:
      item = self.remote.get_result()
      self._pending_queue.remove(item)
      self._add(item, True)
      if item in items:
        return item

  def _add(self, item, at_end):
    """Adds an item to the internal state.

    If |at_end| is False, self._lookup becomes inconsistent and
    self._update_lookup() must be called.
    """
    size = os.stat(self.path(item)).st_size
    self._added.append(size)
    if at_end:
      self.state.append((item, size))
      self._lookup[item] = len(self.state) - 1
    else:
      self.state.insert(0, (item, size))
    self._dirty = True

  def _update_lookup(self):
    self._lookup = dict(
        (filename, index) for index, (filename, _) in enumerate(self.state))


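# Minimal usage sketch (illustrative; the values below are made up):
#   policies = CachePolicies(20*1024*1024*1024, 1024*1024*1024, 100000)
#   with Cache('cache', Remote('http://example.com/hashtable'), policies) as c:
#     c.retrieve(Remote.MED, '<40-char sha-1>')
#     c.wait_for(['<40-char sha-1>'])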

class Manifest(object):
  """Represents a single parsed manifest, e.g. a .results file."""
  def __init__(self, obj_hash):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('Manifest(%s)' % obj_hash)
    self.obj_hash = obj_hash
    # Set once all the left side of the tree is parsed. 'Tree' here means the
    # manifest and all the manifests recursively included by it via the
    # 'includes' key. The order of the manifest sha-1s in 'includes' is
    # important, as later ones are not processed until the earlier ones are
    # retrieved and read.
    self.can_fetch = False

    # Raw data.
    self.data = {}
    # A Manifest instance, one per object in self.includes.
    self.children = []

    # Set once the manifest is loaded.
    self._manifest_parsed = False
    # Set once the files are fetched.
    self.files_fetched = False

  def load(self, content):
    """Verifies the manifest is valid and loads this object with the json data.
    """
    logging.debug('Manifest.load(%s)' % self.obj_hash)
    assert not self._manifest_parsed
    self.data = load_manifest(content)
    self.children = [Manifest(i) for i in self.data.get('includes', [])]
    self._manifest_parsed = True

  def fetch_files(self, cache, files):
    """Adds files from this manifest that are not yet present in the |files|
    dictionary.

    Preemptively requests files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._manifest_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # The root manifest has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in files:
        files[filepath] = properties
        if 'sha-1' in properties:
          # Preemptively request files.
          logging.debug('fetching %s' % filepath)
          cache.retrieve(Remote.MED, properties['sha-1'])
    self.files_fetched = True


class Settings(object):
  """Results of a completely parsed manifest."""
  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main manifest.
    self.root = None
    logging.debug('Settings')

  def load(self, cache, root_manifest_hash):
    """Loads the manifest and all the included manifests asynchronously.

    It enables support for included manifests. They are processed in strict
    order but fetched asynchronously from the cache. This is important so that
    a file in an included manifest that is overridden by an embedding manifest
    is not fetched needlessly. The includes are fetched in one pass and the
    files are fetched as soon as all the manifests on the left side of the
    tree have been fetched.

    The prioritization is very important here for nested manifests. 'includes'
    have the highest priority and the algorithm is optimized for both deep and
    wide manifests. A deep one is a long chain of manifests referenced one at
    a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single manifest. 'left' is defined as an included manifest
    earlier in the 'includes' list. So the order of the elements in 'includes'
    is important.
    """
    self.root = Manifest(root_manifest_hash)
    cache.retrieve(Remote.HIGH, root_manifest_hash)
    pending = {root_manifest_hash: self.root}
    # Keeps the list of retrieved items to refuse recursive includes.
    retrieved = [root_manifest_hash]

    def update_self(node):
      node.fetch_files(cache, self.files)
      # Grabs properties.
      if not self.command and node.data.get('command'):
        self.command = node.data['command']
      if self.read_only is None and node.data.get('read_only') is not None:
        self.read_only = node.data['read_only']
      if (self.relative_cwd is None and
          node.data.get('relative_cwd') is not None):
        self.relative_cwd = node.data['relative_cwd']

    def traverse_tree(node):
      if node.can_fetch:
        if not node.files_fetched:
          update_self(node)
        will_break = False
        for i in node.children:
          if not i.can_fetch:
            if will_break:
              break
            # Automatically mark the first one as fetchable.
            i.can_fetch = True
            will_break = True
          traverse_tree(i)

    while pending:
      item_hash = cache.wait_for(pending)
      item = pending.pop(item_hash)
      item.load(open(cache.path(item_hash), 'r').read())
      if item_hash == root_manifest_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        h = new_child.obj_hash
        if h in retrieved:
          raise ConfigError('Manifest %s is retrieved recursively' % h)
        pending[h] = new_child
        cache.retrieve(Remote.HIGH, h)

      # Traverse the whole tree to see if files can now be fetched.
      traverse_tree(self.root)
    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)
    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False


def run_tha_test(manifest_hash, cache_dir, remote, policies):
  """Downloads the dependencies in the cache, hardlinks them into a temporary
  directory and runs the executable.
  """
  settings = Settings()
  with Cache(cache_dir, Remote(remote), policies) as cache:
    outdir = make_temp_dir('run_tha_test', cache_dir)
    try:
      # Initiate all the files download.
      with Profiler('GetManifests') as _prof:
        # Optionally support local files.
        if not RE_IS_SHA1.match(manifest_hash):
          # Adds it to the cache. While not strictly necessary, this simplifies
          # the rest.
          h = hashlib.sha1(open(manifest_hash, 'r').read()).hexdigest()
          cache.add(manifest_hash, h)
          manifest_hash = h
        settings.load(cache, manifest_hash)

      if not settings.command:
        print >> sys.stderr, 'No command to run'
        return 1

      with Profiler('GetRest') as _prof:
        logging.debug('Creating directories')
        # Builds the set of directories to create.
        directories = set(os.path.dirname(f) for f in settings.files)
        for item in list(directories):
          while item:
            directories.add(item)
            item = os.path.dirname(item)
        for d in sorted(directories):
          if d:
            os.mkdir(os.path.join(outdir, d))

        # Creates the links if necessary.
        for filepath, properties in settings.files.iteritems():
          if 'link' not in properties:
            continue
          outfile = os.path.join(outdir, filepath)
          os.symlink(properties['link'], outfile)
          if 'mode' in properties:
            # It's not set on Windows.
            os.chmod(outfile, properties['mode'])

        # Remaining files to be processed.
        # Note that some files may not have been downloaded yet at this point.
        remaining = dict()
        for filepath, props in settings.files.iteritems():
          if 'sha-1' in props:
            remaining.setdefault(props['sha-1'], []).append((filepath, props))

        # Do bookkeeping while files are being downloaded in the background.
        cwd = os.path.join(outdir, settings.relative_cwd)
        if not os.path.isdir(cwd):
          os.makedirs(cwd)
        cmd = settings.command[:]
        # Ensure paths are correctly separated on Windows.
        cmd[0] = cmd[0].replace('/', os.path.sep)
        cmd = fix_python_path(cmd)

        # Now block on the remaining files to be downloaded and mapped.
        while remaining:
          obj = cache.wait_for(remaining)
          for filepath, properties in remaining.pop(obj):
            outfile = os.path.join(outdir, filepath)
            link_file(outfile, cache.path(obj), HARDLINK)
            if 'mode' in properties:
              # It's not set on Windows.
              os.chmod(outfile, properties['mode'])

      if settings.read_only:
        make_writable(outdir, True)
      logging.info('Running %s, cwd=%s' % (cmd, cwd))
      try:
        with Profiler('RunTest') as _prof:
          return subprocess.call(cmd, cwd=cwd)
      except OSError:
        print >> sys.stderr, 'Failed to run %s; cwd=%s' % (cmd, cwd)
        raise
    finally:
      rmtree(outdir)

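# Programmatic usage sketch (illustrative; the arguments are made up):
#   run_tha_test('<40-char sha-1>', os.path.abspath('cache'),
#                'http://example.com/hashtable', CachePolicies(0, 0, 0))
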

def main():
  parser = optparse.OptionParser(
      usage='%prog <options>', description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-v', '--verbose', action='count', default=0, help='Use multiple times')
  parser.add_option('--no-run', action='store_true', help='Skip the run part')

  group = optparse.OptionGroup(parser, 'Data source')
  group.add_option(
      '-m', '--manifest',
      metavar='FILE',
      help='File/url describing what to map or run')
  group.add_option(
      '-H', '--hash',
      help='Hash of the manifest to grab from the hash table')
  group.add_option(
      '-r', '--remote', metavar='URL', help='Remote where to get the items')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache',
      default='cache',
      metavar='DIR',
      help='Cache directory, default=%default')
  group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=1*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(group)

  options, args = parser.parse_args()
  level = [logging.ERROR, logging.INFO, logging.DEBUG][min(2, options.verbose)]
  logging.basicConfig(
      level=level,
      format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s')

  if bool(options.manifest) == bool(options.hash):
    parser.error('One and only one of --manifest or --hash is required.')
  if not options.remote:
    parser.error('--remote is required.')
  if args:
    parser.error('Unsupported args %s' % ' '.join(args))

  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  try:
    return run_tha_test(
        options.manifest or options.hash,
        os.path.abspath(options.cache),
        options.remote,
        policies)
  except (ConfigError, MappingError), e:
    print >> sys.stderr, str(e)
    return 1


if __name__ == '__main__':
  sys.exit(main())