Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(483)

Side by Side Diff: isolateserver.py

Issue 24578004: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Archives a set of files to a server.""" 6 """Archives a set of files to a server."""
7 7
8 __version__ = '0.2' 8 __version__ = '0.2'
9 9
10 import binascii 10 import binascii
(...skipping 11 matching lines...) Expand all
22 22
23 from third_party import colorama 23 from third_party import colorama
24 from third_party.depot_tools import fix_encoding 24 from third_party.depot_tools import fix_encoding
25 from third_party.depot_tools import subcommand 25 from third_party.depot_tools import subcommand
26 26
27 from utils import net 27 from utils import net
28 from utils import threading_utils 28 from utils import threading_utils
29 from utils import tools 29 from utils import tools
30 30
31 31
32 # Switch that controls what IsolateServer implementation to use by default.
33 ENABLE_GS_ISOLATE_API = False
Vadim Sh. 2013/09/26 01:14:19 I need your advice on how to deal with conditional
M-A Ruel 2013/09/26 01:21:47 3) Let it bake on the canary, then roll deps and
Vadim Sh. 2013/09/27 20:28:12 Sweet. It simplifies a lot.
34
35 # Version of isolate protocol passed to the server in /handshake request.
36 ISOLATE_PROTOCOL_VERSION = '1.0'
37
38
32 # The minimum size of files to upload directly to the blobstore. 39 # The minimum size of files to upload directly to the blobstore.
33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 40 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
34 41
35 # The number of files to check the isolate server per /contains query. 42 # The number of files to check the isolate server per /contains query.
36 # All files are sorted by likelihood of a change in the file content 43 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the 44 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files 45 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and sent to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], 46 # are taken and sent to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. The numbers here are a trade-off; the more per request, the lower the 47 # and so on. The numbers here are a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, 48 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
(...skipping 13 matching lines...) Expand all
55 62
56 63
57 # The file size to be used when we don't know the correct file size, 64 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files. 65 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None 66 UNKNOWN_FILE_SIZE = None
60 67
61 68
62 # The size of each chunk to read when downloading and unzipping files. 69 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024 70 ZIPPED_FILE_CHUNK = 16 * 1024
64 71
65
66 # Chunk size to use when doing disk I/O. 72 # Chunk size to use when doing disk I/O.
67 DISK_FILE_CHUNK = 1024 * 1024 73 DISK_FILE_CHUNK = 1024 * 1024
68 74
75 # Chunk size to use when reading from network stream.
76 NET_IO_FILE_CHUNK = 16 * 1024
77
69 78
70 # Read timeout in seconds for downloads from isolate storage. If there's no 79 # Read timeout in seconds for downloads from isolate storage. If there's no
71 # response from the server within this timeout, the whole download will be aborted. 80 # response from the server within this timeout, the whole download will be aborted.
72 DOWNLOAD_READ_TIMEOUT = 60 81 DOWNLOAD_READ_TIMEOUT = 60
73 82
74 # Maximum expected delay (in seconds) between successive file fetches 83 # Maximum expected delay (in seconds) between successive file fetches
75 # in run_tha_test. If it takes longer than that, a deadlock might be happening 84 # in run_tha_test. If it takes longer than that, a deadlock might be happening
76 # and all stack frames for all threads are dumped to log. 85 # and all stack frames for all threads are dumped to log.
77 DEADLOCK_TIMEOUT = 5 * 60 86 DEADLOCK_TIMEOUT = 5 * 60
78 87
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
177 digest = algo() 186 digest = algo()
178 with open(filepath, 'rb') as f: 187 with open(filepath, 'rb') as f:
179 while True: 188 while True:
180 chunk = f.read(DISK_FILE_CHUNK) 189 chunk = f.read(DISK_FILE_CHUNK)
181 if not chunk: 190 if not chunk:
182 break 191 break
183 digest.update(chunk) 192 digest.update(chunk)
184 return digest.hexdigest() 193 return digest.hexdigest()
185 194
186 195
196 def stream_read(stream, chunk_size):
197 """Reads chunks from |stream| and yields them."""
198 while True:
199 data = stream.read(chunk_size)
200 if not data:
201 break
202 yield data
203
204
187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): 205 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
188 """Yields file content in chunks of given |chunk_size|.""" 206 """Yields file content in chunks of given |chunk_size|."""
189 with open(filepath, 'rb') as f: 207 with open(filepath, 'rb') as f:
190 while True: 208 while True:
191 data = f.read(chunk_size) 209 data = f.read(chunk_size)
192 if not data: 210 if not data:
193 break 211 break
194 yield data 212 yield data
195 213
196 214
(...skipping 22 matching lines...) Expand all
219 compressor = zlib.compressobj(level) 237 compressor = zlib.compressobj(level)
220 for chunk in content_generator: 238 for chunk in content_generator:
221 compressed = compressor.compress(chunk) 239 compressed = compressor.compress(chunk)
222 if compressed: 240 if compressed:
223 yield compressed 241 yield compressed
224 tail = compressor.flush(zlib.Z_FINISH) 242 tail = compressor.flush(zlib.Z_FINISH)
225 if tail: 243 if tail:
226 yield tail 244 yield tail
227 245
228 246
247 def zip_decompress(content_generator):
248 """Reads zipped data from |content_generator| and yields decompressed data.
249
250 |content_generator| should yield an entire zipped stream; this function will
251 ensure all passed data is decompressed.
252
253 Raises IOError if data is corrupted or incomplete.
254 """
255 decompressor = zlib.decompressobj()
256 compressed_size = 0
257 try:
258 for chunk in content_generator:
259 compressed_size += len(chunk)
260 decompressed = decompressor.decompress(chunk)
M-A Ruel 2013/09/26 01:21:47 You should create an inner loop here, so that if a
Vadim Sh. 2013/09/27 20:28:12 Done.
261 if decompressed:
262 yield decompressed
263 tail = decompressor.flush()
264 if tail:
265 yield tail
266 except zlib.error as e:
267 raise IOError(
268 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
269 # Ensure all data was read and decompressed.
270 if decompressor.unused_data or decompressor.unconsumed_tail:
271 raise IOError('Not all data was decompressed')
272
273
229 def get_zip_compression_level(filename): 274 def get_zip_compression_level(filename):
230 """Given a filename calculates the ideal zip compression level to use.""" 275 """Given a filename calculates the ideal zip compression level to use."""
231 file_ext = os.path.splitext(filename)[1].lower() 276 file_ext = os.path.splitext(filename)[1].lower()
232 # TODO(csharp): Profile to find what compression level works best. 277 # TODO(csharp): Profile to find what compression level works best.
233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 278 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
234 279
235 280
236 def create_directories(base_directory, files): 281 def create_directories(base_directory, files):
237 """Creates the directory structure needed by the given list of files.""" 282 """Creates the directory structure needed by the given list of files."""
238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) 283 logging.debug('create_directories(%s, %d)', base_directory, len(files))
(...skipping 438 matching lines...) Expand 10 before | Expand all | Expand 10 after
677 722
678 # Do not retry this request on HTTP 50x. Regenerate an upload url each 723 # Do not retry this request on HTTP 50x. Regenerate an upload url each
679 # time since uploading "consumes" the upload url. 724 # time since uploading "consumes" the upload url.
680 result = net.url_read( 725 result = net.url_read(
681 upload_url, data=body, content_type=content_type, retry_50x=False) 726 upload_url, data=body, content_type=content_type, retry_50x=False)
682 if result is not None: 727 if result is not None:
683 return result 728 return result
684 raise MappingError('Unable to connect to server %s' % last_url) 729 raise MappingError('Unable to connect to server %s' % last_url)
685 730
686 731
732 class IsolateServerGS(StorageApi):
733 """StorageApi implementation that downloads and uploads to Isolate Server.
734
735 It uploads and downloads directly from Google Storage whenever appropriate.
736 """
737
738 def __init__(self, base_url, namespace):
739 super(IsolateServerGS, self).__init__()
740 assert base_url.startswith('http'), base_url
741 self.base_url = base_url.rstrip('/')
742 self.namespace = namespace
743 self.algo = get_hash_algo(namespace)
744 self._use_zip = is_namespace_with_compression(namespace)
745 self._lock = threading.Lock()
746 self._server_caps = None
747
748 @staticmethod
749 def generate_handshake_request():
750 """Returns a dict to be sent as handshake request body."""
751 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
752 return {
753 'protocol_version': ISOLATE_PROTOCOL_VERSION,
754 'client_app_version': __version__,
755 'fetcher': True,
756 'pusher': True,
757 }
758
759 @staticmethod
760 def validate_handshake_response(caps):
761 """Validates and normalizes handshake response."""
762 logging.info('Protocol version: %s', caps['protocol_version'])
763 logging.info('Server version: %s', caps['server_app_version'])
764 if "error" in caps:
M-A Ruel 2013/09/26 01:21:47 'error'
Vadim Sh. 2013/09/27 20:28:12 Done.
765 raise MappingError(caps['error'])
766 if not caps['access_token']:
767 raise ValueError('access_token is missing')
768 return caps
769
770 @property
771 def server_capabilities(self):
772 """Performs handshake with the server if not yet done.
773
774 Returns:
775 Server capabilities dictionary as returned by /handshake endpoint.
776
777 Raises:
778 MappingError if server rejects the handshake.
779 """
780 # TODO(maruel): Make this request much earlier asynchronously while the
781 # files are being enumerated.
782 with self._lock:
783 if self._server_caps is None:
784 response = net.url_read(
785 url=self.base_url + '/content-gs/handshake',
786 data=json.dumps(self.generate_handshake_request()),
787 content_type='application/json')
788 try:
789 caps = json.loads(response)
790 if not isinstance(caps, dict):
791 raise ValueError('Expecting JSON dict')
792 self._server_caps = self.validate_handshake_response(caps)
793 except (ValueError, KeyError, TypeError) as exc:
794 raise MappingError('Invalid handshake response: %s' % exc)
795 return self._server_caps
796
797 @property
798 def token(self):
799 """Returns access token received through handshake process."""
800 return self.server_capabilities['access_token']
801
802 def fetch(self, item, expected_size):
803 assert isinstance(item, basestring)
804 assert (isinstance(expected_size, (int, long)) or
805 expected_size == UNKNOWN_FILE_SIZE)
806
807 source_url = '%s/content-gs/retrieve/%s/%s' % (
808 self.base_url, self.namespace, item)
809 logging.debug('download_file(%s)', source_url)
810
811 # Because the app engine DB is only eventually consistent, retry 404 errors
812 # because the file might just not be visible yet (even though it has been
813 # uploaded).
814 connection = net.url_open(
815 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
816 if not connection:
817 raise IOError('Unable to open connection to %s' % source_url)
818
819 try:
820 # Prepare reading pipeline.
821 generator = stream_read(connection, NET_IO_FILE_CHUNK)
822 if self._use_zip:
823 generator = zip_decompress(generator)
824
825 # Read and yield data, calculate total length of the decompressed stream.
826 total_size = 0
827 for chunk in generator:
828 total_size += len(chunk)
829 yield chunk
830
831 # Verify data length matches expectation.
832 if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size:
833 raise IOError('Incorrect file size: expected %d, got %d' % (
834 expected_size, total_size))
835
836 except IOError as err:
837 logging.warning('Failed to fetch %s: %s', item, err)
838
839 # Testing seems to show that if a few machines are trying to download
M-A Ruel 2013/09/26 01:21:47 This likely doesn't apply here.
Vadim Sh. 2013/09/27 20:28:12 Done.
840 # the same blob, they can cause each other to fail. So if we hit a zip
841 # error, this is the most likely cause (it only downloads some of the
842 # data). Randomly sleep for between 5 and 25 seconds to try and spread
843 # out the downloads.
844 sleep_duration = (random.random() * 20) + 5
845 time.sleep(sleep_duration)
846 raise
847
848 def push(self, item, expected_size, content_generator, push_urls=None):
849 assert isinstance(item, basestring)
850 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
851 assert push_urls and len(push_urls) == 2
852 upload_url, finalize_url = push_urls
853
854 # TODO(maruel): Support large files. This would require streaming support.
855
856 # TODO(vadimsh): Do not read from |content_generator| when retrying push.
857 # If |content_generator| is indeed a generator, it cannot be rewound
858 # to the beginning of the stream. A retry will find it exhausted. A possible
859 # solution is to wrap content_generator with some sort of caching
860 # restartable generator. It should be done alongside streaming support
861 # implementation.
862
863 # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
864 # upload support is implemented.
865 if isinstance(content_generator, list) and len(content_generator) == 1:
866 content = content_generator[0]
867 else:
868 content = ''.join(content_generator)
869
870 # PUT file to |upload_url|.
871 response = net.url_read(upload_url, data=content,
872 content_type='application/octet-stream', method='PUT')
873 if response is None:
874 raise IOError('Failed to upload a file %s to %s' % (item, upload_url))
875
876 # Optionally notify the server that it's done.
877 if finalize_url:
878 response = net.url_read(finalize_url, data={}, method='POST')
879 if response is None:
880 raise IOError('Failed to finalize an upload of %s' % item)
881
882 def contains(self, files):
883 logging.info('Checking existence of %d files...', len(files))
884
885 # Request body is a json encoded list of dicts.
886 body = [
887 {
888 'h': metadata['h'],
889 's': metadata['s'],
890 'i': 1 if metadata.get('priority') == '0' else 0,
891 } for _, metadata in files
892 ]
893
894 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
895 self.base_url, self.namespace, self.token)
896 response_body = url_read(
897 query_url,
898 data=json.dumps(body, separators=(',', ':')),
899 content_type='application/json')
900
901 # Response body is a list of push_urls (or null if file is already present).
902 try:
903 response = json.loads(response_body)
904 if not isinstance(response, list):
905 raise ValueError('Expecting response with json-encoded list')
906 if len(response) != len(files):
907 raise ValueError(
908 'Incorrect number of items in the list, expected %d, '
909 'but got %d' % (len(files), len(response)))
910 except ValueError as err:
911 raise MappingError(
912 'Invalid response from server: %s, body is %s' % (err, response_body))
913
914 # Convert response into a list of triplets with info about missing files.
915 missing_files = [
916 files[i] + (push_urls,) for i, push_urls in enumerate(response)
917 if push_urls is not None
M-A Ruel 2013/09/26 01:21:47 is "if push_urls" sufficient?
Vadim Sh. 2013/09/27 20:28:12 Done.
918 ]
919 logging.info('Queried %d files, %d cache hit',
920 len(files), len(files) - len(missing_files))
921 return missing_files
922
923
687 class FileSystem(StorageApi): 924 class FileSystem(StorageApi):
688 """StorageApi implementation that fetches data from the file system. 925 """StorageApi implementation that fetches data from the file system.
689 926
690 The common use case is an NFS/CIFS file server, mounted locally, that is 927 The common use case is an NFS/CIFS file server, mounted locally, that is
691 used to fetch the file on a local partition. 928 used to fetch the file on a local partition.
692 """ 929 """
693 def __init__(self, base_path): 930 def __init__(self, base_path):
694 super(FileSystem, self).__init__() 931 super(FileSystem, self).__init__()
695 self.base_path = base_path 932 self.base_path = base_path
696 933
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
728 965
729 966
730 def is_namespace_with_compression(namespace): 967 def is_namespace_with_compression(namespace):
731 """Returns True if given |namespace| stores compressed objects.""" 968 """Returns True if given |namespace| stores compressed objects."""
732 return namespace.endswith(('-gzip', '-deflate')) 969 return namespace.endswith(('-gzip', '-deflate'))
733 970
734 971
735 def get_storage_api(file_or_url, namespace): 972 def get_storage_api(file_or_url, namespace):
736 """Returns an object that implements StorageApi interface.""" 973 """Returns an object that implements StorageApi interface."""
737 if re.match(r'^https?://.+$', file_or_url): 974 if re.match(r'^https?://.+$', file_or_url):
738 return IsolateServer(file_or_url, namespace) 975 if ENABLE_GS_ISOLATE_API:
976 return IsolateServerGS(file_or_url, namespace)
977 else:
978 return IsolateServer(file_or_url, namespace)
739 else: 979 else:
740 return FileSystem(file_or_url) 980 return FileSystem(file_or_url)
741 981
742 982
743 class WorkerPool(threading_utils.AutoRetryThreadPool): 983 class WorkerPool(threading_utils.AutoRetryThreadPool):
744 """Thread pool that automatically retries on IOError and runs a preconfigured 984 """Thread pool that automatically retries on IOError and runs a preconfigured
745 function. 985 function.
746 """ 986 """
747 # Initial and maximum number of worker threads. 987 # Initial and maximum number of worker threads.
748 INITIAL_WORKERS = 2 988 INITIAL_WORKERS = 2
(...skipping 553 matching lines...) Expand 10 before | Expand all | Expand 10 after
1302 sys.stderr.write(str(e)) 1542 sys.stderr.write(str(e))
1303 sys.stderr.write('\n') 1543 sys.stderr.write('\n')
1304 return 1 1544 return 1
1305 1545
1306 1546
1307 if __name__ == '__main__': 1547 if __name__ == '__main__':
1308 fix_encoding.fix_encoding() 1548 fix_encoding.fix_encoding()
1309 tools.disable_buffering() 1549 tools.disable_buffering()
1310 colorama.init() 1550 colorama.init()
1311 sys.exit(main(sys.argv[1:])) 1551 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698