Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(430)

Side by Side Diff: isolateserver.py

Issue 24578004: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: decompress by chunks, test for zip/unzip Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Archives a set of files to a server.""" 6 """Archives a set of files to a server."""
7 7
8 __version__ = '0.2' 8 __version__ = '0.2'
9 9
10 import binascii
11 import hashlib 10 import hashlib
12 import json 11 import json
13 import logging 12 import logging
14 import os 13 import os
15 import random
16 import re 14 import re
17 import sys 15 import sys
18 import threading 16 import threading
19 import time 17 import time
20 import urllib 18 import urllib
21 import zlib 19 import zlib
22 20
23 from third_party import colorama 21 from third_party import colorama
24 from third_party.depot_tools import fix_encoding 22 from third_party.depot_tools import fix_encoding
25 from third_party.depot_tools import subcommand 23 from third_party.depot_tools import subcommand
26 24
27 from utils import net 25 from utils import net
28 from utils import threading_utils 26 from utils import threading_utils
29 from utils import tools 27 from utils import tools
30 28
31 29
32 # The minimum size of files to upload directly to the blobstore. 30 # Version of isolate protocol passed to the server in /handshake request.
33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 31 ISOLATE_PROTOCOL_VERSION = '1.0'
34 32
35 # The number of files to check the isolate server per /contains query. 33
34 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content 35 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the 36 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files 37 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and sent to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], 38 # are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. The numbers here are a trade-off; the more per request, the lower the 39 # and so on. The numbers here are a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, 40 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start 41 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is 42 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousands files to look up with minimal number of large 43 # optimized for the "few thousands files to look up with minimal number of large
45 # files missing" case. 44 # files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] 45 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
47 46
48 47
49 # A list of already compressed extension types that should not receive any 48 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded. 49 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [ 50 ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', 51 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53 'wav', 'zip' 52 'wav', 'zip'
54 ] 53 ]
55 54
56 55
57 # The file size to be used when we don't know the correct file size, 56 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files. 57 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None 58 UNKNOWN_FILE_SIZE = None
60 59
61 60
62 # The size of each chunk to read when downloading and unzipping files. 61 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024 62 ZIPPED_FILE_CHUNK = 16 * 1024
64 63
65
66 # Chunk size to use when doing disk I/O. 64 # Chunk size to use when doing disk I/O.
67 DISK_FILE_CHUNK = 1024 * 1024 65 DISK_FILE_CHUNK = 1024 * 1024
68 66
67 # Chunk size to use when reading from network stream.
68 NET_IO_FILE_CHUNK = 16 * 1024
69
69 70
70 # Read timeout in seconds for downloads from isolate storage. If there's no 71 # Read timeout in seconds for downloads from isolate storage. If there's no
71 # response from the server within this timeout whole download will be aborted. 72 # response from the server within this timeout whole download will be aborted.
72 DOWNLOAD_READ_TIMEOUT = 60 73 DOWNLOAD_READ_TIMEOUT = 60
73 74
74 # Maximum expected delay (in seconds) between successive file fetches 75 # Maximum expected delay (in seconds) between successive file fetches
75 # in run_tha_test. If it takes longer than that, a deadlock might be happening 76 # in run_tha_test. If it takes longer than that, a deadlock might be happening
76 # and all stack frames for all threads are dumped to log. 77 # and all stack frames for all threads are dumped to log.
77 DEADLOCK_TIMEOUT = 5 * 60 78 DEADLOCK_TIMEOUT = 5 * 60
78 79
(...skipping 20 matching lines...) Expand all
99 class ConfigError(ValueError): 100 class ConfigError(ValueError):
100 """Generic failure to load a .isolated file.""" 101 """Generic failure to load a .isolated file."""
101 pass 102 pass
102 103
103 104
104 class MappingError(OSError): 105 class MappingError(OSError):
105 """Failed to recreate the tree.""" 106 """Failed to recreate the tree."""
106 pass 107 pass
107 108
108 109
109 def randomness():
110 """Generates low-entropy randomness for MIME encoding.
111
112 Exists so it can be mocked out in unit tests.
113 """
114 return str(time.time())
115
116
117 def encode_multipart_formdata(fields, files,
118 mime_mapper=lambda _: 'application/octet-stream'):
119 """Encodes a Multipart form data object.
120
121 Args:
122 fields: a sequence (name, value) elements for
123 regular form fields.
124 files: a sequence of (name, filename, value) elements for data to be
125 uploaded as files.
126 mime_mapper: function to return the mime type from the filename.
127 Returns:
128 content_type: for httplib.HTTP instance
129 body: for httplib.HTTP instance
130 """
131 boundary = hashlib.md5(randomness()).hexdigest()
132 body_list = []
133 for (key, value) in fields:
134 if isinstance(key, unicode):
135 value = key.encode('utf-8')
136 if isinstance(value, unicode):
137 value = value.encode('utf-8')
138 body_list.append('--' + boundary)
139 body_list.append('Content-Disposition: form-data; name="%s"' % key)
140 body_list.append('')
141 body_list.append(value)
142 body_list.append('--' + boundary)
143 body_list.append('')
144 for (key, filename, value) in files:
145 if isinstance(key, unicode):
146 value = key.encode('utf-8')
147 if isinstance(filename, unicode):
148 value = filename.encode('utf-8')
149 if isinstance(value, unicode):
150 value = value.encode('utf-8')
151 body_list.append('--' + boundary)
152 body_list.append('Content-Disposition: form-data; name="%s"; '
153 'filename="%s"' % (key, filename))
154 body_list.append('Content-Type: %s' % mime_mapper(filename))
155 body_list.append('')
156 body_list.append(value)
157 body_list.append('--' + boundary)
158 body_list.append('')
159 if body_list:
160 body_list[-2] += '--'
161 body = '\r\n'.join(body_list)
162 content_type = 'multipart/form-data; boundary=%s' % boundary
163 return content_type, body
164
165
166 def is_valid_hash(value, algo): 110 def is_valid_hash(value, algo):
167 """Returns if the value is a valid hash for the corresponding algorithm.""" 111 """Returns if the value is a valid hash for the corresponding algorithm."""
168 size = 2 * algo().digest_size 112 size = 2 * algo().digest_size
169 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) 113 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
170 114
171 115
172 def hash_file(filepath, algo): 116 def hash_file(filepath, algo):
173 """Calculates the hash of a file without reading it all in memory at once. 117 """Calculates the hash of a file without reading it all in memory at once.
174 118
175 |algo| should be one of the hashlib hashing algorithms. 119 |algo| should be one of the hashlib hashing algorithms.
176 """ 120 """
177 digest = algo() 121 digest = algo()
178 with open(filepath, 'rb') as f: 122 with open(filepath, 'rb') as f:
179 while True: 123 while True:
180 chunk = f.read(DISK_FILE_CHUNK) 124 chunk = f.read(DISK_FILE_CHUNK)
181 if not chunk: 125 if not chunk:
182 break 126 break
183 digest.update(chunk) 127 digest.update(chunk)
184 return digest.hexdigest() 128 return digest.hexdigest()
185 129
186 130
131 def stream_read(stream, chunk_size):
132 """Reads chunks from |stream| and yields them."""
133 while True:
134 data = stream.read(chunk_size)
135 if not data:
136 break
137 yield data
138
139
187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): 140 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
188 """Yields file content in chunks of given |chunk_size|.""" 141 """Yields file content in chunks of given |chunk_size|."""
189 with open(filepath, 'rb') as f: 142 with open(filepath, 'rb') as f:
190 while True: 143 while True:
191 data = f.read(chunk_size) 144 data = f.read(chunk_size)
192 if not data: 145 if not data:
193 break 146 break
194 yield data 147 yield data
195 148
196 149
(...skipping 22 matching lines...) Expand all
219 compressor = zlib.compressobj(level) 172 compressor = zlib.compressobj(level)
220 for chunk in content_generator: 173 for chunk in content_generator:
221 compressed = compressor.compress(chunk) 174 compressed = compressor.compress(chunk)
222 if compressed: 175 if compressed:
223 yield compressed 176 yield compressed
224 tail = compressor.flush(zlib.Z_FINISH) 177 tail = compressor.flush(zlib.Z_FINISH)
225 if tail: 178 if tail:
226 yield tail 179 yield tail
227 180
228 181
182 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
183 """Reads zipped data from |content_generator| and yields decompressed data.
184
185 |content_generator| should yield an entire zipped stream, this function will
Vadim Sh. 2013/09/27 20:28:12 I wrote a test to check that (incomplete stream ->
186 ensure all passed data is decompressed.
187
188 Decompresses data in small chunks (no larger than |chunk_size|) so that
189 a zip bomb file doesn't cause zlib to preallocate a huge amount of memory.
190
191 Raises IOError if data is corrupted or incomplete.
192 """
193 decompressor = zlib.decompressobj()
194 compressed_size = 0
195 try:
196 for chunk in content_generator:
197 compressed_size += len(chunk)
198 data = decompressor.decompress(chunk, chunk_size)
199 if data:
200 yield data
201 while decompressor.unconsumed_tail:
202 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
203 if data:
204 yield data
205 tail = decompressor.flush()
206 if tail:
207 yield tail
208 except zlib.error as e:
209 raise IOError(
210 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
211 # Ensure all data was read and decompressed.
212 if decompressor.unused_data or decompressor.unconsumed_tail:
213 raise IOError('Not all data was decompressed')
214
215
229 def get_zip_compression_level(filename): 216 def get_zip_compression_level(filename):
230 """Given a filename calculates the ideal zip compression level to use.""" 217 """Given a filename calculates the ideal zip compression level to use."""
231 file_ext = os.path.splitext(filename)[1].lower() 218 file_ext = os.path.splitext(filename)[1].lower()
232 # TODO(csharp): Profile to find what compression level works best. 219 # TODO(csharp): Profile to find what compression level works best.
233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 220 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
234 221
235 222
236 def create_directories(base_directory, files): 223 def create_directories(base_directory, files):
237 """Creates the directory structure needed by the given list of files.""" 224 """Creates the directory structure needed by the given list of files."""
238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) 225 logging.debug('create_directories(%s, %d)', base_directory, len(files))
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
294 281
295 282
296 def try_remove(filepath): 283 def try_remove(filepath):
297 """Removes a file without crashing even if it doesn't exist.""" 284 """Removes a file without crashing even if it doesn't exist."""
298 try: 285 try:
299 os.remove(filepath) 286 os.remove(filepath)
300 except OSError: 287 except OSError:
301 pass 288 pass
302 289
303 290
304 def url_read(url, **kwargs):
305 result = net.url_read(url, **kwargs)
306 if result is None:
307 # If we get no response from the server, assume it is down and raise an
308 # exception.
309 raise MappingError('Unable to connect to server %s' % url)
310 return result
311
312
313 class Storage(object): 291 class Storage(object):
314 """Efficiently downloads or uploads large set of files via StorageApi.""" 292 """Efficiently downloads or uploads large set of files via StorageApi."""
315 293
316 def __init__(self, storage_api, use_zip): 294 def __init__(self, storage_api, use_zip):
317 self.use_zip = use_zip 295 self.use_zip = use_zip
318 self._storage_api = storage_api 296 self._storage_api = storage_api
319 self._cpu_thread_pool = None 297 self._cpu_thread_pool = None
320 self._net_thread_pool = None 298 self._net_thread_pool = None
321 299
322 @property 300 @property
(...skipping 209 matching lines...) Expand 10 before | Expand all | Expand 10 after
532 Arguments: 510 Arguments:
533 files: list of pairs (file name, metadata dict). 511 files: list of pairs (file name, metadata dict).
534 512
535 Returns: 513 Returns:
536 A list of files missing on server as a list of triplets 514 A list of files missing on server as a list of triplets
537 (file name, metadata dict, push_urls object to pass to push). 515 (file name, metadata dict, push_urls object to pass to push).
538 """ 516 """
539 raise NotImplementedError() 517 raise NotImplementedError()
540 518
541 519
542 class IsolateServer(StorageApi): 520 class IsolateServerGS(StorageApi):
543 """StorageApi implementation that downloads and uploads to Isolate Server.""" 521 """StorageApi implementation that downloads and uploads to Isolate Server.
522
523 It uploads and downloads directly from Google Storage whenever appropriate.
524 """
525
544 def __init__(self, base_url, namespace): 526 def __init__(self, base_url, namespace):
545 super(IsolateServer, self).__init__() 527 super(IsolateServerGS, self).__init__()
546 assert base_url.startswith('http'), base_url 528 assert base_url.startswith('http'), base_url
547 self.content_url = base_url.rstrip('/') + '/content/' 529 self.base_url = base_url.rstrip('/')
548 self.namespace = namespace 530 self.namespace = namespace
549 self.algo = get_hash_algo(namespace) 531 self.algo = get_hash_algo(namespace)
550 self._token = None 532 self._use_zip = is_namespace_with_compression(namespace)
551 self._lock = threading.Lock() 533 self._lock = threading.Lock()
534 self._server_caps = None
535
536 @staticmethod
537 def generate_handshake_request():
538 """Returns a dict to be sent as handshake request body."""
539 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
540 return {
541 'protocol_version': ISOLATE_PROTOCOL_VERSION,
542 'client_app_version': __version__,
543 'fetcher': True,
544 'pusher': True,
545 }
546
547 @staticmethod
548 def validate_handshake_response(caps):
549 """Validates and normalizes handshake response."""
550 logging.info('Protocol version: %s', caps['protocol_version'])
551 logging.info('Server version: %s', caps['server_app_version'])
552 if 'error' in caps:
553 raise MappingError(caps['error'])
554 if not caps['access_token']:
555 raise ValueError('access_token is missing')
556 return caps
557
558 @property
559 def server_capabilities(self):
560 """Performs handshake with the server if not yet done.
561
562 Returns:
563 Server capabilities dictionary as returned by /handshake endpoint.
564
565 Raises:
566 MappingError if server rejects the handshake.
567 """
568 # TODO(maruel): Make this request much earlier asynchronously while the
569 # files are being enumerated.
570 with self._lock:
571 if self._server_caps is None:
572 response = net.url_read(
573 url=self.base_url + '/content-gs/handshake',
574 data=json.dumps(self.generate_handshake_request()),
575 content_type='application/json')
576 if response is None:
577 raise MappingError('Failed to perform handshake.')
578 try:
579 caps = json.loads(response)
580 if not isinstance(caps, dict):
581 raise ValueError('Expecting JSON dict')
582 self._server_caps = self.validate_handshake_response(caps)
583 except (ValueError, KeyError, TypeError) as exc:
584 raise MappingError('Invalid handshake response: %s' % exc)
585 return self._server_caps
552 586
553 @property 587 @property
554 def token(self): 588 def token(self):
555 # TODO(maruel): Make this request much earlier asynchronously while the 589 """Returns access token received through handshake process."""
556 # files are being enumerated. 590 return self.server_capabilities['access_token']
557 with self._lock:
558 if not self._token:
559 self._token = urllib.quote(url_read(self.content_url + 'get_token'))
560 return self._token
561 591
562 def fetch(self, item, expected_size): 592 def fetch(self, item, expected_size):
563 assert isinstance(item, basestring) 593 assert isinstance(item, basestring)
564 assert ( 594 assert (isinstance(expected_size, (int, long)) or
565 isinstance(expected_size, (int, long)) or
566 expected_size == UNKNOWN_FILE_SIZE) 595 expected_size == UNKNOWN_FILE_SIZE)
567 zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) 596
568 logging.debug('download_file(%s)', zipped_url) 597 source_url = '%s/content-gs/retrieve/%s/%s' % (
598 self.base_url, self.namespace, item)
599 logging.debug('download_file(%s)', source_url)
569 600
570 # Because the app engine DB is only eventually consistent, retry 404 errors 601 # Because the app engine DB is only eventually consistent, retry 404 errors
571 # because the file might just not be visible yet (even though it has been 602 # because the file might just not be visible yet (even though it has been
572 # uploaded). 603 # uploaded).
573 connection = net.url_open( 604 connection = net.url_open(
574 zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) 605 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
575 if not connection: 606 if not connection:
576 raise IOError('Unable to open connection to %s' % zipped_url) 607 raise IOError('Unable to open connection to %s' % source_url)
577 608
578 # TODO(maruel): Must only decompress when needed.
579 decompressor = zlib.decompressobj()
580 try: 609 try:
581 compressed_size = 0 610 # Prepare reading pipeline.
582 decompressed_size = 0 611 generator = stream_read(connection, NET_IO_FILE_CHUNK)
583 while True: 612 if self._use_zip:
584 chunk = connection.read(ZIPPED_FILE_CHUNK) 613 generator = zip_decompress(generator)
585 if not chunk:
586 break
587 compressed_size += len(chunk)
588 decompressed = decompressor.decompress(chunk)
589 decompressed_size += len(decompressed)
590 yield decompressed
591 614
592 # Ensure that all the data was properly decompressed. 615 # Read and yield data, calculate total length of the decompressed stream.
593 uncompressed_data = decompressor.flush() 616 total_size = 0
594 if uncompressed_data: 617 for chunk in generator:
595 raise IOError('Decompression failed') 618 total_size += len(chunk)
596 if (expected_size != UNKNOWN_FILE_SIZE and 619 yield chunk
597 decompressed_size != expected_size):
598 raise IOError('File incorrect size after download of %s. Got %s and '
599 'expected %s' % (item, decompressed_size, expected_size))
600 except zlib.error as e:
601 msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
602 item, compressed_size, connection.content_length, e)
603 logging.warning(msg)
604 620
605 # Testing seems to show that if a few machines are trying to download 621 # Verify data length matches expectation.
606 # the same blob, they can cause each other to fail. So if we hit a zip 622 if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size:
607 # error, this is the most likely cause (it only downloads some of the 623 raise IOError('Incorrect file size: expected %d, got %d' % (
608 # data). Randomly sleep for between 5 and 25 seconds to try and spread 624 expected_size, total_size))
609 # out the downloads. 625
610 sleep_duration = (random.random() * 20) + 5 626 except IOError as err:
611 time.sleep(sleep_duration) 627 logging.warning('Failed to fetch %s: %s', item, err)
612 raise IOError(msg) 628 raise
613 629
614 def push(self, item, expected_size, content_generator, push_urls=None): 630 def push(self, item, expected_size, content_generator, push_urls=None):
615 assert isinstance(item, basestring) 631 assert isinstance(item, basestring)
616 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 632 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
617 item = str(item) 633 assert push_urls and len(push_urls) == 2
634 upload_url, finalize_url = push_urls
618 635
619 # TODO(maruel): Support large files. This would require streaming support. 636 # TODO(maruel): Support large files. This would require streaming support.
620 637
638 # TODO(vadimsh): Do not read from |content_generator| when retrying push.
639 # If |content_generator| is indeed a generator, it cannot be rewound
640 # to the beginning of the stream. A retry will find it exhausted. A possible
641 # solution is to wrap content_generator with some sort of caching
642 # restartable generator. It should be done alongside streaming support
643 # implementation.
644
621 # A cheesy way to avoid memcpy of (possibly huge) file, until streaming 645 # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
622 # upload support is implemented. 646 # upload support is implemented.
623 if isinstance(content_generator, list) and len(content_generator) == 1: 647 if isinstance(content_generator, list) and len(content_generator) == 1:
624 content = content_generator[0] 648 content = content_generator[0]
625 else: 649 else:
626 content = ''.join(content_generator) 650 content = ''.join(content_generator)
627 651
628 if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: 652 # PUT file to |upload_url|.
629 return self._upload_hash_content_to_blobstore(item, content) 653 response = net.url_read(upload_url, data=content,
654 content_type='application/octet-stream', method='PUT')
655 if response is None:
656 raise IOError('Failed to upload a file %s to %s' % (item, upload_url))
630 657
631 url = '%sstore/%s/%s?token=%s' % ( 658 # Optionally notify the server that it's done.
632 self.content_url, self.namespace, item, self.token) 659 if finalize_url:
633 return url_read(url, data=content, content_type='application/octet-stream') 660 response = net.url_read(finalize_url, data={}, method='POST')
661 if response is None:
662 raise IOError('Failed to finalize an upload of %s' % item)
634 663
635 def contains(self, files): 664 def contains(self, files):
636 logging.info('Checking existence of %d files...', len(files)) 665 logging.info('Checking existence of %d files...', len(files))
637 666
638 body = ''.join( 667 # Request body is a json encoded list of dicts.
639 (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) 668 body = [
640 assert (len(body) % self.algo().digest_size) == 0, repr(body) 669 {
670 'h': metadata['h'],
671 's': metadata['s'],
672 'i': 1 if metadata.get('priority') == '0' else 0,
673 } for _, metadata in files
674 ]
641 675
642 query_url = '%scontains/%s?token=%s' % ( 676 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
643 self.content_url, self.namespace, self.token) 677 self.base_url, self.namespace, urllib.quote(self.token))
644 response = url_read( 678 response_body = net.url_read(
645 query_url, data=body, content_type='application/octet-stream') 679 query_url,
646 if len(files) != len(response): 680 data=json.dumps(body, separators=(',', ':')),
681 content_type='application/json')
682 if response_body is None:
683 raise MappingError('Failed to execute /pre-upload query')
684
685 # Response body is a list of push_urls (or null if file is already present).
686 try:
687 response = json.loads(response_body)
688 if not isinstance(response, list):
689 raise ValueError('Expecting response with json-encoded list')
690 if len(response) != len(files):
691 raise ValueError(
692 'Incorrect number of items in the list, expected %d, '
693 'but got %d' % (len(files), len(response)))
694 except ValueError as err:
647 raise MappingError( 695 raise MappingError(
648 'Got an incorrect number of responses from the server. Expected %d, ' 696 'Invalid response from server: %s, body is %s' % (err, response_body))
649 'but got %d' % (len(files), len(response)))
650 697
651 # This implementation of IsolateServer doesn't use push_urls field, 698 # Convert response into a list of triplets with info about missing files.
652 # set it to None.
653 missing_files = [ 699 missing_files = [
654 files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' 700 files[i] + (push_urls,) for i, push_urls in enumerate(response)
701 if push_urls
655 ] 702 ]
656 logging.info('Queried %d files, %d cache hit', 703 logging.info('Queried %d files, %d cache hit',
657 len(files), len(files) - len(missing_files)) 704 len(files), len(files) - len(missing_files))
658 return missing_files 705 return missing_files
659 706
660 def _upload_hash_content_to_blobstore(self, item, content):
661 """Uploads the content directly to the blobstore via a generated url."""
662 # TODO(maruel): Support large files. This would require streaming support.
663 gen_url = '%sgenerate_blobstore_url/%s/%s' % (
664 self.content_url, self.namespace, item)
665 # Token is guaranteed to be already quoted but it is unnecessary here, and
666 # only here.
667 data = [('token', urllib.unquote(self.token))]
668 content_type, body = encode_multipart_formdata(
669 data, [('content', item, content)])
670 last_url = gen_url
671 for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
672 # Retry HTTP 50x here but not 404.
673 upload_url = net.url_read(gen_url, data=data)
674 if not upload_url:
675 raise MappingError('Unable to connect to server %s' % gen_url)
676 last_url = upload_url
677
678 # Do not retry this request on HTTP 50x. Regenerate an upload url each
679 # time since uploading "consumes" the upload url.
680 result = net.url_read(
681 upload_url, data=body, content_type=content_type, retry_50x=False)
682 if result is not None:
683 return result
684 raise MappingError('Unable to connect to server %s' % last_url)
685
686 707
687 class FileSystem(StorageApi): 708 class FileSystem(StorageApi):
688 """StorageApi implementation that fetches data from the file system. 709 """StorageApi implementation that fetches data from the file system.
689 710
690 The common use case is a NFS/CIFS file server that is mounted locally that is 711 The common use case is a NFS/CIFS file server that is mounted locally that is
691 used to fetch the file on a local partition. 712 used to fetch the file on a local partition.
692 """ 713 """
693 def __init__(self, base_path): 714 def __init__(self, base_path):
694 super(FileSystem, self).__init__() 715 super(FileSystem, self).__init__()
695 self.base_path = base_path 716 self.base_path = base_path
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
728 749
729 750
730 def is_namespace_with_compression(namespace): 751 def is_namespace_with_compression(namespace):
731 """Returns True if given |namespace| stores compressed objects.""" 752 """Returns True if given |namespace| stores compressed objects."""
732 return namespace.endswith(('-gzip', '-deflate')) 753 return namespace.endswith(('-gzip', '-deflate'))
733 754
734 755
735 def get_storage_api(file_or_url, namespace): 756 def get_storage_api(file_or_url, namespace):
736 """Returns an object that implements StorageApi interface.""" 757 """Returns an object that implements StorageApi interface."""
737 if re.match(r'^https?://.+$', file_or_url): 758 if re.match(r'^https?://.+$', file_or_url):
738 return IsolateServer(file_or_url, namespace) 759 return IsolateServerGS(file_or_url, namespace)
739 else: 760 else:
740 return FileSystem(file_or_url) 761 return FileSystem(file_or_url)
741 762
742 763
743 class WorkerPool(threading_utils.AutoRetryThreadPool): 764 class WorkerPool(threading_utils.AutoRetryThreadPool):
744 """Thread pool that automatically retries on IOError and runs a preconfigured 765 """Thread pool that automatically retries on IOError and runs a preconfigured
745 function. 766 function.
746 """ 767 """
747 # Initial and maximum number of worker threads. 768 # Initial and maximum number of worker threads.
748 INITIAL_WORKERS = 2 769 INITIAL_WORKERS = 2
(...skipping 553 matching lines...) Expand 10 before | Expand all | Expand 10 after
1302 sys.stderr.write(str(e)) 1323 sys.stderr.write(str(e))
1303 sys.stderr.write('\n') 1324 sys.stderr.write('\n')
1304 return 1 1325 return 1
1305 1326
1306 1327
1307 if __name__ == '__main__': 1328 if __name__ == '__main__':
1308 fix_encoding.fix_encoding() 1329 fix_encoding.fix_encoding()
1309 tools.disable_buffering() 1330 tools.disable_buffering()
1310 colorama.init() 1331 colorama.init()
1311 sys.exit(main(sys.argv[1:])) 1332 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698