Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2013 The Chromium Authors. All rights reserved. | 2 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 """Archives a set of files to a server.""" | 6 """Archives a set of files to a server.""" |
| 7 | 7 |
| 8 __version__ = '0.2' | 8 __version__ = '0.2' |
| 9 | 9 |
| 10 import binascii | 10 import binascii |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 22 | 22 |
| 23 from third_party import colorama | 23 from third_party import colorama |
| 24 from third_party.depot_tools import fix_encoding | 24 from third_party.depot_tools import fix_encoding |
| 25 from third_party.depot_tools import subcommand | 25 from third_party.depot_tools import subcommand |
| 26 | 26 |
| 27 from utils import net | 27 from utils import net |
| 28 from utils import threading_utils | 28 from utils import threading_utils |
| 29 from utils import tools | 29 from utils import tools |
| 30 | 30 |
| 31 | 31 |
| 32 # Switch that controls what IsolateServer implementation to use by default. | |
| 33 ENABLE_GS_ISOLATE_API = False | |
|
> **Vadim Sh.** (2013/09/26 01:14:19): I need your advice on how to deal with conditional […]
>
> **M-A Ruel** (2013/09/26 01:21:47): 3) Let it bake on the canary, then roll deps and […]
>
> **Vadim Sh.** (2013/09/27 20:28:12): Sweet. It simplifies a lot.
| |
| 34 | |
| 35 # Version of isolate protocol passed to the server in /handshake request. | |
| 36 ISOLATE_PROTOCOL_VERSION = '1.0' | |
| 37 | |
| 38 | |
| 32 # The minimum size of files to upload directly to the blobstore. | 39 # The minimum size of files to upload directly to the blobstore. |
| 33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 | 40 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 |
| 34 | 41 |
| 35 # The number of files to check the isolate server per /contains query. | 42 # The number of files to check the isolate server per /contains query. |
| 36 # All files are sorted by likelihood of a change in the file content | 43 # All files are sorted by likelihood of a change in the file content |
| 37 # (currently file size is used to estimate this: larger the file -> larger the | 44 # (currently file size is used to estimate this: larger the file -> larger the |
| 38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files | 45 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files |
| 39 # are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], | 46 # are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], |
| 40 # and so on. Numbers here is a trade-off; the more per request, the lower the | 47 # and so on. Numbers here is a trade-off; the more per request, the lower the |
| 41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, | 48 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 55 | 62 |
| 56 | 63 |
| 57 # The file size to be used when we don't know the correct file size, | 64 # The file size to be used when we don't know the correct file size, |
| 58 # generally used for .isolated files. | 65 # generally used for .isolated files. |
| 59 UNKNOWN_FILE_SIZE = None | 66 UNKNOWN_FILE_SIZE = None |
| 60 | 67 |
| 61 | 68 |
| 62 # The size of each chunk to read when downloading and unzipping files. | 69 # The size of each chunk to read when downloading and unzipping files. |
| 63 ZIPPED_FILE_CHUNK = 16 * 1024 | 70 ZIPPED_FILE_CHUNK = 16 * 1024 |
| 64 | 71 |
| 65 | |
| 66 # Chunk size to use when doing disk I/O. | 72 # Chunk size to use when doing disk I/O. |
| 67 DISK_FILE_CHUNK = 1024 * 1024 | 73 DISK_FILE_CHUNK = 1024 * 1024 |
| 68 | 74 |
| 75 # Chunk size to use when reading from network stream. | |
| 76 NET_IO_FILE_CHUNK = 16 * 1024 | |
| 77 | |
| 69 | 78 |
| 70 # Read timeout in seconds for downloads from isolate storage. If there's no | 79 # Read timeout in seconds for downloads from isolate storage. If there's no |
| 71 # response from the server within this timeout whole download will be aborted. | 80 # response from the server within this timeout whole download will be aborted. |
| 72 DOWNLOAD_READ_TIMEOUT = 60 | 81 DOWNLOAD_READ_TIMEOUT = 60 |
| 73 | 82 |
| 74 # Maximum expected delay (in seconds) between successive file fetches | 83 # Maximum expected delay (in seconds) between successive file fetches |
| 75 # in run_tha_test. If it takes longer than that, a deadlock might be happening | 84 # in run_tha_test. If it takes longer than that, a deadlock might be happening |
| 76 # and all stack frames for all threads are dumped to log. | 85 # and all stack frames for all threads are dumped to log. |
| 77 DEADLOCK_TIMEOUT = 5 * 60 | 86 DEADLOCK_TIMEOUT = 5 * 60 |
| 78 | 87 |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 177 digest = algo() | 186 digest = algo() |
| 178 with open(filepath, 'rb') as f: | 187 with open(filepath, 'rb') as f: |
| 179 while True: | 188 while True: |
| 180 chunk = f.read(DISK_FILE_CHUNK) | 189 chunk = f.read(DISK_FILE_CHUNK) |
| 181 if not chunk: | 190 if not chunk: |
| 182 break | 191 break |
| 183 digest.update(chunk) | 192 digest.update(chunk) |
| 184 return digest.hexdigest() | 193 return digest.hexdigest() |
| 185 | 194 |
| 186 | 195 |
| 196 def stream_read(stream, chunk_size): | |
| 197 """Reads chunks from |stream| and yields them.""" | |
| 198 while True: | |
| 199 data = stream.read(chunk_size) | |
| 200 if not data: | |
| 201 break | |
| 202 yield data | |
| 203 | |
| 204 | |
| 187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): | 205 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): |
| 188 """Yields file content in chunks of given |chunk_size|.""" | 206 """Yields file content in chunks of given |chunk_size|.""" |
| 189 with open(filepath, 'rb') as f: | 207 with open(filepath, 'rb') as f: |
| 190 while True: | 208 while True: |
| 191 data = f.read(chunk_size) | 209 data = f.read(chunk_size) |
| 192 if not data: | 210 if not data: |
| 193 break | 211 break |
| 194 yield data | 212 yield data |
| 195 | 213 |
| 196 | 214 |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 219 compressor = zlib.compressobj(level) | 237 compressor = zlib.compressobj(level) |
| 220 for chunk in content_generator: | 238 for chunk in content_generator: |
| 221 compressed = compressor.compress(chunk) | 239 compressed = compressor.compress(chunk) |
| 222 if compressed: | 240 if compressed: |
| 223 yield compressed | 241 yield compressed |
| 224 tail = compressor.flush(zlib.Z_FINISH) | 242 tail = compressor.flush(zlib.Z_FINISH) |
| 225 if tail: | 243 if tail: |
| 226 yield tail | 244 yield tail |
| 227 | 245 |
| 228 | 246 |
| 247 def zip_decompress(content_generator): | |
| 248 """Reads zipped data from |content_generator| and yields decompressed data. | |
| 249 | |
| 250 |content_generator| should yield an entire zipped stream, this function will | |
| 251 ensure all passed data is decompressed. | |
| 252 | |
| 253 Raises IOError if data is corrupted or incomplete. | |
| 254 """ | |
| 255 decompressor = zlib.decompressobj() | |
| 256 compressed_size = 0 | |
| 257 try: | |
| 258 for chunk in content_generator: | |
| 259 compressed_size += len(chunk) | |
| 260 decompressed = decompressor.decompress(chunk) | |
|
> **M-A Ruel** (2013/09/26 01:21:47): You should create an inner loop here, so that if a […]
>
> **Vadim Sh.** (2013/09/27 20:28:12): Done.
| |
| 261 if decompressed: | |
| 262 yield decompressed | |
| 263 tail = decompressor.flush() | |
| 264 if tail: | |
| 265 yield tail | |
| 266 except zlib.error as e: | |
| 267 raise IOError( | |
| 268 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) | |
| 269 # Ensure all data was read and decompressed. | |
| 270 if decompressor.unused_data or decompressor.unconsumed_tail: | |
| 271 raise IOError('Not all data was decompressed') | |
| 272 | |
| 273 | |
| 229 def get_zip_compression_level(filename): | 274 def get_zip_compression_level(filename): |
| 230 """Given a filename calculates the ideal zip compression level to use.""" | 275 """Given a filename calculates the ideal zip compression level to use.""" |
| 231 file_ext = os.path.splitext(filename)[1].lower() | 276 file_ext = os.path.splitext(filename)[1].lower() |
| 232 # TODO(csharp): Profile to find what compression level works best. | 277 # TODO(csharp): Profile to find what compression level works best. |
| 233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 | 278 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 |
| 234 | 279 |
| 235 | 280 |
| 236 def create_directories(base_directory, files): | 281 def create_directories(base_directory, files): |
| 237 """Creates the directory structure needed by the given list of files.""" | 282 """Creates the directory structure needed by the given list of files.""" |
| 238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) | 283 logging.debug('create_directories(%s, %d)', base_directory, len(files)) |
| (...skipping 438 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 677 | 722 |
| 678 # Do not retry this request on HTTP 50x. Regenerate an upload url each | 723 # Do not retry this request on HTTP 50x. Regenerate an upload url each |
| 679 # time since uploading "consumes" the upload url. | 724 # time since uploading "consumes" the upload url. |
| 680 result = net.url_read( | 725 result = net.url_read( |
| 681 upload_url, data=body, content_type=content_type, retry_50x=False) | 726 upload_url, data=body, content_type=content_type, retry_50x=False) |
| 682 if result is not None: | 727 if result is not None: |
| 683 return result | 728 return result |
| 684 raise MappingError('Unable to connect to server %s' % last_url) | 729 raise MappingError('Unable to connect to server %s' % last_url) |
| 685 | 730 |
| 686 | 731 |
| 732 class IsolateServerGS(StorageApi): | |
| 733 """StorageApi implementation that downloads and uploads to Isolate Server. | |
| 734 | |
| 735 It uploads and downloads directly from Google Storage whenever appropriate. | |
| 736 """ | |
| 737 | |
| 738 def __init__(self, base_url, namespace): | |
| 739 super(IsolateServerGS, self).__init__() | |
| 740 assert base_url.startswith('http'), base_url | |
| 741 self.base_url = base_url.rstrip('/') | |
| 742 self.namespace = namespace | |
| 743 self.algo = get_hash_algo(namespace) | |
| 744 self._use_zip = is_namespace_with_compression(namespace) | |
| 745 self._lock = threading.Lock() | |
| 746 self._server_caps = None | |
| 747 | |
| 748 @staticmethod | |
| 749 def generate_handshake_request(): | |
| 750 """Returns a dict to be sent as handshake request body.""" | |
| 751 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage. | |
| 752 return { | |
| 753 'protocol_version': ISOLATE_PROTOCOL_VERSION, | |
| 754 'client_app_version': __version__, | |
| 755 'fetcher': True, | |
| 756 'pusher': True, | |
| 757 } | |
| 758 | |
| 759 @staticmethod | |
| 760 def validate_handshake_response(caps): | |
| 761 """Validates and normalizes handshake response.""" | |
| 762 logging.info('Protocol version: %s', caps['protocol_version']) | |
| 763 logging.info('Server version: %s', caps['server_app_version']) | |
| 764 if "error" in caps: | |
|
> **M-A Ruel** (2013/09/26 01:21:47): 'error'
>
> **Vadim Sh.** (2013/09/27 20:28:12): Done.
| |
| 765 raise MappingError(caps['error']) | |
| 766 if not caps['access_token']: | |
| 767 raise ValueError('access_token is missing') | |
| 768 return caps | |
| 769 | |
| 770 @property | |
| 771 def server_capabilities(self): | |
| 772 """Performs handshake with the server if not yet done. | |
| 773 | |
| 774 Returns: | |
| 775 Server capabilities dictionary as returned by /handshake endpoint. | |
| 776 | |
| 777 Raises: | |
| 778 MappingError if server rejects the handshake. | |
| 779 """ | |
| 780 # TODO(maruel): Make this request much earlier asynchronously while the | |
| 781 # files are being enumerated. | |
| 782 with self._lock: | |
| 783 if self._server_caps is None: | |
| 784 response = net.url_read( | |
| 785 url=self.base_url + '/content-gs/handshake', | |
| 786 data=json.dumps(self.generate_handshake_request()), | |
| 787 content_type='application/json') | |
| 788 try: | |
| 789 caps = json.loads(response) | |
| 790 if not isinstance(caps, dict): | |
| 791 raise ValueError('Expecting JSON dict') | |
| 792 self._server_caps = self.validate_handshake_response(caps) | |
| 793 except (ValueError, KeyError, TypeError) as exc: | |
| 794 raise MappingError('Invalid handshake response: %s' % exc) | |
| 795 return self._server_caps | |
| 796 | |
| 797 @property | |
| 798 def token(self): | |
| 799 """Returns access token received through handshake process.""" | |
| 800 return self.server_capabilities['access_token'] | |
| 801 | |
| 802 def fetch(self, item, expected_size): | |
| 803 assert isinstance(item, basestring) | |
| 804 assert (isinstance(expected_size, (int, long)) or | |
| 805 expected_size == UNKNOWN_FILE_SIZE) | |
| 806 | |
| 807 source_url = '%s/content-gs/retrieve/%s/%s' % ( | |
| 808 self.base_url, self.namespace, item) | |
| 809 logging.debug('download_file(%s)', source_url) | |
| 810 | |
| 811 # Because the app engine DB is only eventually consistent, retry 404 errors | |
| 812 # because the file might just not be visible yet (even though it has been | |
| 813 # uploaded). | |
| 814 connection = net.url_open( | |
| 815 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) | |
| 816 if not connection: | |
| 817 raise IOError('Unable to open connection to %s' % source_url) | |
| 818 | |
| 819 try: | |
| 820 # Prepare reading pipeline. | |
| 821 generator = stream_read(connection, NET_IO_FILE_CHUNK) | |
| 822 if self._use_zip: | |
| 823 generator = zip_decompress(generator) | |
| 824 | |
| 825 # Read and yield data, calculate total length of the decompressed stream. | |
| 826 total_size = 0 | |
| 827 for chunk in generator: | |
| 828 total_size += len(chunk) | |
| 829 yield chunk | |
| 830 | |
| 831 # Verify data length matches expectation. | |
| 832 if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size: | |
| 833 raise IOError('Incorrect file size: expected %d, got %d' % ( | |
| 834 expected_size, total_size)) | |
| 835 | |
| 836 except IOError as err: | |
| 837 logging.warning('Failed to fetch %s: %s', item, err) | |
| 838 | |
| 839 # Testing seems to show that if a few machines are trying to download | |
|
> **M-A Ruel** (2013/09/26 01:21:47): This likely doesn't apply here.
>
> **Vadim Sh.** (2013/09/27 20:28:12): Done.
| |
| 840 # the same blob, they can cause each other to fail. So if we hit a zip | |
| 841 # error, this is the most likely cause (it only downloads some of the | |
| 842 # data). Randomly sleep for between 5 and 25 seconds to try and spread | |
| 843 # out the downloads. | |
| 844 sleep_duration = (random.random() * 20) + 5 | |
| 845 time.sleep(sleep_duration) | |
| 846 raise | |
| 847 | |
| 848 def push(self, item, expected_size, content_generator, push_urls=None): | |
| 849 assert isinstance(item, basestring) | |
| 850 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE | |
| 851 assert push_urls and len(push_urls) == 2 | |
| 852 upload_url, finalize_url = push_urls | |
| 853 | |
| 854 # TODO(maruel): Support large files. This would require streaming support. | |
| 855 | |
| 856 # TODO(vadimsh): Do not read from |content_generator| when retrying push. | |
| 857 # If |content_generator| is indeed a generator, it can not be re-winded back | |
| 858 # to the beginning of the stream. A retry will find it exhausted. A possible | |
| 859 # solution is to wrap content_generator with some sort of caching | |
| 860 # restartable generator. It should be done alongside streaming support | |
| 861 # implementation. | |
| 862 | |
| 863 # A cheese way to avoid memcpy of (possibly huge) file, until streaming | |
| 864 # upload support is implemented. | |
| 865 if isinstance(content_generator, list) and len(content_generator) == 1: | |
| 866 content = content_generator[0] | |
| 867 else: | |
| 868 content = ''.join(content_generator) | |
| 869 | |
| 870 # PUT file to |upload_url|. | |
| 871 response = net.url_read(upload_url, data=content, | |
| 872 content_type='application/octet-stream', method='PUT') | |
| 873 if response is None: | |
| 874 raise IOError('Failed to upload a file %s to %s' % (item, upload_url)) | |
| 875 | |
| 876 # Optionally notify the server that it's done. | |
| 877 if finalize_url: | |
| 878 response = net.url_read(finalize_url, data={}, method='POST') | |
| 879 if response is None: | |
| 880 raise IOError('Failed to finalize an upload of %s' % item) | |
| 881 | |
| 882 def contains(self, files): | |
| 883 logging.info('Checking existence of %d files...', len(files)) | |
| 884 | |
| 885 # Request body is a json encoded list of dicts. | |
| 886 body = [ | |
| 887 { | |
| 888 'h': metadata['h'], | |
| 889 's': metadata['s'], | |
| 890 'i': 1 if metadata.get('priority') == '0' else 0, | |
| 891 } for _, metadata in files | |
| 892 ] | |
| 893 | |
| 894 query_url = '%s/content-gs/pre-upload/%s?token=%s' % ( | |
| 895 self.base_url, self.namespace, self.token) | |
| 896 response_body = url_read( | |
| 897 query_url, | |
| 898 data=json.dumps(body, separators=(',', ':')), | |
| 899 content_type='application/json') | |
| 900 | |
| 901 # Response body is a list of push_urls (or null if file is already present). | |
| 902 try: | |
| 903 response = json.loads(response_body) | |
| 904 if not isinstance(response, list): | |
| 905 raise ValueError('Expecting response with json-encoded list') | |
| 906 if len(response) != len(files): | |
| 907 raise ValueError( | |
| 908 'Incorrect number of items in the list, expected %d, ' | |
| 909 'but got %d' % (len(files), len(response))) | |
| 910 except ValueError as err: | |
| 911 raise MappingError( | |
| 912 'Invalid response from server: %s, body is %s' % (err, response_body)) | |
| 913 | |
| 914 # Convert response into a list of triplets with info about missing files. | |
| 915 missing_files = [ | |
| 916 files[i] + (push_urls,) for i, push_urls in enumerate(response) | |
| 917 if push_urls is not None | |
|
> **M-A Ruel** (2013/09/26 01:21:47): Is "if push_urls" sufficient?
>
> **Vadim Sh.** (2013/09/27 20:28:12): Done.
| |
| 918 ] | |
| 919 logging.info('Queried %d files, %d cache hit', | |
| 920 len(files), len(files) - len(missing_files)) | |
| 921 return missing_files | |
| 922 | |
| 923 | |
| 687 class FileSystem(StorageApi): | 924 class FileSystem(StorageApi): |
| 688 """StorageApi implementation that fetches data from the file system. | 925 """StorageApi implementation that fetches data from the file system. |
| 689 | 926 |
| 690 The common use case is a NFS/CIFS file server that is mounted locally that is | 927 The common use case is a NFS/CIFS file server that is mounted locally that is |
| 691 used to fetch the file on a local partition. | 928 used to fetch the file on a local partition. |
| 692 """ | 929 """ |
| 693 def __init__(self, base_path): | 930 def __init__(self, base_path): |
| 694 super(FileSystem, self).__init__() | 931 super(FileSystem, self).__init__() |
| 695 self.base_path = base_path | 932 self.base_path = base_path |
| 696 | 933 |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 728 | 965 |
| 729 | 966 |
| 730 def is_namespace_with_compression(namespace): | 967 def is_namespace_with_compression(namespace): |
| 731 """Returns True if given |namespace| stores compressed objects.""" | 968 """Returns True if given |namespace| stores compressed objects.""" |
| 732 return namespace.endswith(('-gzip', '-deflate')) | 969 return namespace.endswith(('-gzip', '-deflate')) |
| 733 | 970 |
| 734 | 971 |
| 735 def get_storage_api(file_or_url, namespace): | 972 def get_storage_api(file_or_url, namespace): |
| 736 """Returns an object that implements StorageApi interface.""" | 973 """Returns an object that implements StorageApi interface.""" |
| 737 if re.match(r'^https?://.+$', file_or_url): | 974 if re.match(r'^https?://.+$', file_or_url): |
| 738 return IsolateServer(file_or_url, namespace) | 975 if ENABLE_GS_ISOLATE_API: |
| 976 return IsolateServerGS(file_or_url, namespace) | |
| 977 else: | |
| 978 return IsolateServer(file_or_url, namespace) | |
| 739 else: | 979 else: |
| 740 return FileSystem(file_or_url) | 980 return FileSystem(file_or_url) |
| 741 | 981 |
| 742 | 982 |
| 743 class WorkerPool(threading_utils.AutoRetryThreadPool): | 983 class WorkerPool(threading_utils.AutoRetryThreadPool): |
| 744 """Thread pool that automatically retries on IOError and runs a preconfigured | 984 """Thread pool that automatically retries on IOError and runs a preconfigured |
| 745 function. | 985 function. |
| 746 """ | 986 """ |
| 747 # Initial and maximum number of worker threads. | 987 # Initial and maximum number of worker threads. |
| 748 INITIAL_WORKERS = 2 | 988 INITIAL_WORKERS = 2 |
| (...skipping 553 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1302 sys.stderr.write(str(e)) | 1542 sys.stderr.write(str(e)) |
| 1303 sys.stderr.write('\n') | 1543 sys.stderr.write('\n') |
| 1304 return 1 | 1544 return 1 |
| 1305 | 1545 |
| 1306 | 1546 |
| 1307 if __name__ == '__main__': | 1547 if __name__ == '__main__': |
| 1308 fix_encoding.fix_encoding() | 1548 fix_encoding.fix_encoding() |
| 1309 tools.disable_buffering() | 1549 tools.disable_buffering() |
| 1310 colorama.init() | 1550 colorama.init() |
| 1311 sys.exit(main(sys.argv[1:])) | 1551 sys.exit(main(sys.argv[1:])) |
| OLD | NEW |