Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(425)

Side by Side Diff: isolateserver.py

Issue 25093003: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | tests/isolateserver_smoke_test.py » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Archives a set of files to a server.""" 6 """Archives a set of files to a server."""
7 7
8 __version__ = '0.2' 8 __version__ = '0.2'
9 9
10 import binascii
11 import hashlib 10 import hashlib
12 import json 11 import json
13 import logging 12 import logging
14 import os 13 import os
15 import random
16 import re 14 import re
17 import sys 15 import sys
18 import threading 16 import threading
19 import time 17 import time
20 import urllib 18 import urllib
21 import zlib 19 import zlib
22 20
23 from third_party import colorama 21 from third_party import colorama
24 from third_party.depot_tools import fix_encoding 22 from third_party.depot_tools import fix_encoding
25 from third_party.depot_tools import subcommand 23 from third_party.depot_tools import subcommand
26 24
27 from utils import net 25 from utils import net
28 from utils import threading_utils 26 from utils import threading_utils
29 from utils import tools 27 from utils import tools
30 28
31 29
32 # The minimum size of files to upload directly to the blobstore. 30 # Version of isolate protocol passed to the server in /handshake request.
33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 31 ISOLATE_PROTOCOL_VERSION = '1.0'
34 32
35 # The number of files to check the isolate server per /contains query. 33
34 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content 35 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the 36 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files 37 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], 38 # are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. Numbers here is a trade-off; the more per request, the lower the 39 # and so on. Numbers here is a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, 40 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start 41 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is 42 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousands files to look up with minimal number of large 43 # optimized for the "few thousands files to look up with minimal number of large
45 # files missing" case. 44 # files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] 45 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
47 46
48 47
49 # A list of already compressed extension types that should not receive any 48 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded. 49 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [ 50 ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', 51 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53 'wav', 'zip' 52 'wav', 'zip'
54 ] 53 ]
55 54
56 55
57 # The file size to be used when we don't know the correct file size, 56 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files. 57 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None 58 UNKNOWN_FILE_SIZE = None
60 59
61 60
62 # The size of each chunk to read when downloading and unzipping files. 61 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024 62 ZIPPED_FILE_CHUNK = 16 * 1024
64 63
65
66 # Chunk size to use when doing disk I/O. 64 # Chunk size to use when doing disk I/O.
67 DISK_FILE_CHUNK = 1024 * 1024 65 DISK_FILE_CHUNK = 1024 * 1024
68 66
67 # Chunk size to use when reading from network stream.
68 NET_IO_FILE_CHUNK = 16 * 1024
69
69 70
70 # Read timeout in seconds for downloads from isolate storage. If there's no 71 # Read timeout in seconds for downloads from isolate storage. If there's no
71 # response from the server within this timeout whole download will be aborted. 72 # response from the server within this timeout whole download will be aborted.
72 DOWNLOAD_READ_TIMEOUT = 60 73 DOWNLOAD_READ_TIMEOUT = 60
73 74
74 # Maximum expected delay (in seconds) between successive file fetches 75 # Maximum expected delay (in seconds) between successive file fetches
75 # in run_tha_test. If it takes longer than that, a deadlock might be happening 76 # in run_tha_test. If it takes longer than that, a deadlock might be happening
76 # and all stack frames for all threads are dumped to log. 77 # and all stack frames for all threads are dumped to log.
77 DEADLOCK_TIMEOUT = 5 * 60 78 DEADLOCK_TIMEOUT = 5 * 60
78 79
(...skipping 20 matching lines...) Expand all
99 class ConfigError(ValueError): 100 class ConfigError(ValueError):
100 """Generic failure to load a .isolated file.""" 101 """Generic failure to load a .isolated file."""
101 pass 102 pass
102 103
103 104
104 class MappingError(OSError): 105 class MappingError(OSError):
105 """Failed to recreate the tree.""" 106 """Failed to recreate the tree."""
106 pass 107 pass
107 108
108 109
109 def randomness():
110 """Generates low-entropy randomness for MIME encoding.
111
112 Exists so it can be mocked out in unit tests.
113 """
114 return str(time.time())
115
116
117 def encode_multipart_formdata(fields, files,
118 mime_mapper=lambda _: 'application/octet-stream'):
119 """Encodes a Multipart form data object.
120
121 Args:
122 fields: a sequence (name, value) elements for
123 regular form fields.
124 files: a sequence of (name, filename, value) elements for data to be
125 uploaded as files.
126 mime_mapper: function to return the mime type from the filename.
127 Returns:
128 content_type: for httplib.HTTP instance
129 body: for httplib.HTTP instance
130 """
131 boundary = hashlib.md5(randomness()).hexdigest()
132 body_list = []
133 for (key, value) in fields:
134 if isinstance(key, unicode):
135 value = key.encode('utf-8')
136 if isinstance(value, unicode):
137 value = value.encode('utf-8')
138 body_list.append('--' + boundary)
139 body_list.append('Content-Disposition: form-data; name="%s"' % key)
140 body_list.append('')
141 body_list.append(value)
142 body_list.append('--' + boundary)
143 body_list.append('')
144 for (key, filename, value) in files:
145 if isinstance(key, unicode):
146 value = key.encode('utf-8')
147 if isinstance(filename, unicode):
148 value = filename.encode('utf-8')
149 if isinstance(value, unicode):
150 value = value.encode('utf-8')
151 body_list.append('--' + boundary)
152 body_list.append('Content-Disposition: form-data; name="%s"; '
153 'filename="%s"' % (key, filename))
154 body_list.append('Content-Type: %s' % mime_mapper(filename))
155 body_list.append('')
156 body_list.append(value)
157 body_list.append('--' + boundary)
158 body_list.append('')
159 if body_list:
160 body_list[-2] += '--'
161 body = '\r\n'.join(body_list)
162 content_type = 'multipart/form-data; boundary=%s' % boundary
163 return content_type, body
164
165
166 def is_valid_hash(value, algo): 110 def is_valid_hash(value, algo):
167 """Returns if the value is a valid hash for the corresponding algorithm.""" 111 """Returns if the value is a valid hash for the corresponding algorithm."""
168 size = 2 * algo().digest_size 112 size = 2 * algo().digest_size
169 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) 113 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
170 114
171 115
172 def hash_file(filepath, algo): 116 def hash_file(filepath, algo):
173 """Calculates the hash of a file without reading it all in memory at once. 117 """Calculates the hash of a file without reading it all in memory at once.
174 118
175 |algo| should be one of hashlib hashing algorithm. 119 |algo| should be one of hashlib hashing algorithm.
176 """ 120 """
177 digest = algo() 121 digest = algo()
178 with open(filepath, 'rb') as f: 122 with open(filepath, 'rb') as f:
179 while True: 123 while True:
180 chunk = f.read(DISK_FILE_CHUNK) 124 chunk = f.read(DISK_FILE_CHUNK)
181 if not chunk: 125 if not chunk:
182 break 126 break
183 digest.update(chunk) 127 digest.update(chunk)
184 return digest.hexdigest() 128 return digest.hexdigest()
185 129
186 130
131 def stream_read(stream, chunk_size):
132 """Reads chunks from |stream| and yields them."""
133 while True:
134 data = stream.read(chunk_size)
135 if not data:
136 break
137 yield data
138
139
187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): 140 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
188 """Yields file content in chunks of given |chunk_size|.""" 141 """Yields file content in chunks of given |chunk_size|."""
189 with open(filepath, 'rb') as f: 142 with open(filepath, 'rb') as f:
190 while True: 143 while True:
191 data = f.read(chunk_size) 144 data = f.read(chunk_size)
192 if not data: 145 if not data:
193 break 146 break
194 yield data 147 yield data
195 148
196 149
(...skipping 22 matching lines...) Expand all
219 compressor = zlib.compressobj(level) 172 compressor = zlib.compressobj(level)
220 for chunk in content_generator: 173 for chunk in content_generator:
221 compressed = compressor.compress(chunk) 174 compressed = compressor.compress(chunk)
222 if compressed: 175 if compressed:
223 yield compressed 176 yield compressed
224 tail = compressor.flush(zlib.Z_FINISH) 177 tail = compressor.flush(zlib.Z_FINISH)
225 if tail: 178 if tail:
226 yield tail 179 yield tail
227 180
228 181
182 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
183 """Reads zipped data from |content_generator| and yields decompressed data.
184
185 Decompresses data in small chunks (no larger than |chunk_size|) so that
186 zip bomb file doesn't cause zlib to preallocate huge amount of memory.
187
188 Raises IOError if data is corrupted or incomplete.
189 """
190 decompressor = zlib.decompressobj()
191 compressed_size = 0
192 try:
193 for chunk in content_generator:
194 compressed_size += len(chunk)
195 data = decompressor.decompress(chunk, chunk_size)
196 if data:
197 yield data
198 while decompressor.unconsumed_tail:
199 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
200 if data:
201 yield data
202 tail = decompressor.flush()
203 if tail:
204 yield tail
205 except zlib.error as e:
206 raise IOError(
207 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
208 # Ensure all data was read and decompressed.
209 if decompressor.unused_data or decompressor.unconsumed_tail:
210 raise IOError('Not all data was decompressed')
211
212
229 def get_zip_compression_level(filename): 213 def get_zip_compression_level(filename):
230 """Given a filename calculates the ideal zip compression level to use.""" 214 """Given a filename calculates the ideal zip compression level to use."""
231 file_ext = os.path.splitext(filename)[1].lower() 215 file_ext = os.path.splitext(filename)[1].lower()
232 # TODO(csharp): Profile to find what compression level works best. 216 # TODO(csharp): Profile to find what compression level works best.
233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 217 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
234 218
235 219
236 def create_directories(base_directory, files): 220 def create_directories(base_directory, files):
237 """Creates the directory structure needed by the given list of files.""" 221 """Creates the directory structure needed by the given list of files."""
238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) 222 logging.debug('create_directories(%s, %d)', base_directory, len(files))
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
294 278
295 279
296 def try_remove(filepath): 280 def try_remove(filepath):
297 """Removes a file without crashing even if it doesn't exist.""" 281 """Removes a file without crashing even if it doesn't exist."""
298 try: 282 try:
299 os.remove(filepath) 283 os.remove(filepath)
300 except OSError: 284 except OSError:
301 pass 285 pass
302 286
303 287
304 def url_read(url, **kwargs):
305 result = net.url_read(url, **kwargs)
306 if result is None:
307 # If we get no response from the server, assume it is down and raise an
308 # exception.
309 raise MappingError('Unable to connect to server %s' % url)
310 return result
311
312
313 class Storage(object): 288 class Storage(object):
314 """Efficiently downloads or uploads large set of files via StorageApi.""" 289 """Efficiently downloads or uploads large set of files via StorageApi."""
315 290
316 def __init__(self, storage_api, use_zip): 291 def __init__(self, storage_api, use_zip):
317 self.use_zip = use_zip 292 self.use_zip = use_zip
318 self._storage_api = storage_api 293 self._storage_api = storage_api
319 self._cpu_thread_pool = None 294 self._cpu_thread_pool = None
320 self._net_thread_pool = None 295 self._net_thread_pool = None
321 296
322 @property 297 @property
(...skipping 210 matching lines...) Expand 10 before | Expand all | Expand 10 after
533 files: list of pairs (file name, metadata dict). 508 files: list of pairs (file name, metadata dict).
534 509
535 Returns: 510 Returns:
536 A list of files missing on server as a list of triplets 511 A list of files missing on server as a list of triplets
537 (file name, metadata dict, push_urls object to pass to push). 512 (file name, metadata dict, push_urls object to pass to push).
538 """ 513 """
539 raise NotImplementedError() 514 raise NotImplementedError()
540 515
541 516
542 class IsolateServer(StorageApi): 517 class IsolateServer(StorageApi):
543 """StorageApi implementation that downloads and uploads to Isolate Server.""" 518 """StorageApi implementation that downloads and uploads to Isolate Server.
519
520 It uploads and downloads directly from Google Storage whenever appropriate.
521 """
522
544 def __init__(self, base_url, namespace): 523 def __init__(self, base_url, namespace):
545 super(IsolateServer, self).__init__() 524 super(IsolateServer, self).__init__()
546 assert base_url.startswith('http'), base_url 525 assert base_url.startswith('http'), base_url
547 self.content_url = base_url.rstrip('/') + '/content/' 526 self.base_url = base_url.rstrip('/')
548 self.namespace = namespace 527 self.namespace = namespace
549 self.algo = get_hash_algo(namespace) 528 self.algo = get_hash_algo(namespace)
550 self._token = None 529 self._use_zip = is_namespace_with_compression(namespace)
M-A Ruel 2013/09/28 01:42:58 No need to do this right now; I'd like it to be a
Vadim Sh. 2013/09/30 19:10:51 It's literally 3 lines of code, that are gonna be
551 self._lock = threading.Lock() 530 self._lock = threading.Lock()
531 self._server_caps = None
532
533 @staticmethod
534 def generate_handshake_request():
535 """Returns a dict to be sent as handshake request body."""
536 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
537 return {
538 'protocol_version': ISOLATE_PROTOCOL_VERSION,
M-A Ruel 2013/09/28 01:42:58 I think it's better to sort the keys
Vadim Sh. 2013/09/30 19:10:51 Done.
539 'client_app_version': __version__,
540 'fetcher': True,
541 'pusher': True,
542 }
543
544 @staticmethod
545 def validate_handshake_response(caps):
546 """Validates and normalizes handshake response."""
547 logging.info('Protocol version: %s', caps['protocol_version'])
548 logging.info('Server version: %s', caps['server_app_version'])
549 if caps.get('error'):
550 raise MappingError(caps['error'])
551 if not caps['access_token']:
552 raise ValueError('access_token is missing')
553 return caps
552 554
553 @property 555 @property
554 def token(self): 556 def server_capabilities(self):
557 """Performs handshake with the server if not yet done.
558
559 Returns:
560 Server capabilities dictionary as returned by /handshake endpoint.
561
562 Raises:
563 MappingError if server rejects the handshake.
564 """
555 # TODO(maruel): Make this request much earlier asynchronously while the 565 # TODO(maruel): Make this request much earlier asynchronously while the
556 # files are being enumerated. 566 # files are being enumerated.
557 with self._lock: 567 with self._lock:
558 if not self._token: 568 if self._server_caps is None:
559 self._token = urllib.quote(url_read(self.content_url + 'get_token')) 569 request_body = json.dumps(
560 return self._token 570 self.generate_handshake_request(), separators=(',', ':'))
571 response = net.url_read(
572 url=self.base_url + '/content-gs/handshake',
573 data=request_body,
574 content_type='application/json',
575 method='POST')
576 if response is None:
577 raise MappingError('Failed to perform handshake.')
578 try:
579 caps = json.loads(response)
580 if not isinstance(caps, dict):
581 raise ValueError('Expecting JSON dict')
582 self._server_caps = self.validate_handshake_response(caps)
583 except (ValueError, KeyError, TypeError) as exc:
584 # KeyError exception has very confusing str conversion: it's just a
585 # missing key value and nothing else. So print exception class name
586 # as well.
587 raise MappingError('Invalid handshake response (%s): %s' % (
588 exc.__class__.__name__, exc))
589 return self._server_caps
561 590
562 def fetch(self, item, expected_size): 591 def fetch(self, item, expected_size):
563 assert isinstance(item, basestring) 592 assert isinstance(item, basestring)
564 assert ( 593 assert (isinstance(expected_size, (int, long)) or
565 isinstance(expected_size, (int, long)) or
566 expected_size == UNKNOWN_FILE_SIZE) 594 expected_size == UNKNOWN_FILE_SIZE)
567 zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) 595
568 logging.debug('download_file(%s)', zipped_url) 596 source_url = '%s/content-gs/retrieve/%s/%s' % (
597 self.base_url, self.namespace, item)
598 logging.debug('download_file(%s)', source_url)
569 599
570 # Because the app engine DB is only eventually consistent, retry 404 errors 600 # Because the app engine DB is only eventually consistent, retry 404 errors
571 # because the file might just not be visible yet (even though it has been 601 # because the file might just not be visible yet (even though it has been
572 # uploaded). 602 # uploaded).
573 connection = net.url_open( 603 connection = net.url_open(
574 zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) 604 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
575 if not connection: 605 if not connection:
576 raise IOError('Unable to open connection to %s' % zipped_url) 606 raise IOError('Unable to open connection to %s' % source_url)
577 607
578 # TODO(maruel): Must only decompress when needed.
579 decompressor = zlib.decompressobj()
580 try: 608 try:
581 compressed_size = 0 609 # Prepare reading pipeline.
582 decompressed_size = 0 610 generator = stream_read(connection, NET_IO_FILE_CHUNK)
583 while True: 611 if self._use_zip:
584 chunk = connection.read(ZIPPED_FILE_CHUNK) 612 generator = zip_decompress(generator, DISK_FILE_CHUNK)
585 if not chunk:
586 break
587 compressed_size += len(chunk)
588 decompressed = decompressor.decompress(chunk)
589 decompressed_size += len(decompressed)
590 yield decompressed
591 613
592 # Ensure that all the data was properly decompressed. 614 # Read and yield data, calculate total length of the decompressed stream.
593 uncompressed_data = decompressor.flush() 615 total_size = 0
594 if uncompressed_data: 616 for chunk in generator:
595 raise IOError('Decompression failed') 617 total_size += len(chunk)
596 if (expected_size != UNKNOWN_FILE_SIZE and 618 yield chunk
597 decompressed_size != expected_size):
598 raise IOError('File incorrect size after download of %s. Got %s and '
599 'expected %s' % (item, decompressed_size, expected_size))
600 except zlib.error as e:
601 msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
602 item, compressed_size, connection.content_length, e)
603 logging.warning(msg)
604 619
605 # Testing seems to show that if a few machines are trying to download 620 # Verify data length matches expectation.
606 # the same blob, they can cause each other to fail. So if we hit a zip 621 if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size:
607 # error, this is the most likely cause (it only downloads some of the 622 raise IOError('Incorrect file size: expected %d, got %d' % (
608 # data). Randomly sleep for between 5 and 25 seconds to try and spread 623 expected_size, total_size))
609 # out the downloads. 624
610 sleep_duration = (random.random() * 20) + 5 625 except IOError as err:
611 time.sleep(sleep_duration) 626 logging.warning('Failed to fetch %s: %s', item, err)
612 raise IOError(msg) 627 raise
613 628
614 def push(self, item, expected_size, content_generator, push_urls=None): 629 def push(self, item, expected_size, content_generator, push_urls=None):
615 assert isinstance(item, basestring) 630 assert isinstance(item, basestring)
616 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 631 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
617 item = str(item) 632 assert push_urls and len(push_urls) == 2
M-A Ruel 2013/09/28 01:42:58 I'm confused. push_urls defaults to None. I'd lik
Vadim Sh. 2013/09/30 19:10:51 It's stupid pylinter :) It requires method overrid
633 upload_url, finalize_url = push_urls
618 634
619 # TODO(maruel): Support large files. This would require streaming support. 635 # TODO(maruel): Support large files. This would require streaming support.
620 636
637 # TODO(vadimsh): Do not read from |content_generator| when retrying push.
638 # If |content_generator| is indeed a generator, it can not be re-winded back
639 # to the beginning of the stream. A retry will find it exhausted. A possible
640 # solution is to wrap content_generator with some sort of caching
641 # restartable generator. It should be done alongside streaming support
642 # implementation.
643
621 # A cheese way to avoid memcpy of (possibly huge) file, until streaming 644 # A cheese way to avoid memcpy of (possibly huge) file, until streaming
622 # upload support is implemented. 645 # upload support is implemented.
623 if isinstance(content_generator, list) and len(content_generator) == 1: 646 if isinstance(content_generator, list) and len(content_generator) == 1:
624 content = content_generator[0] 647 content = content_generator[0]
625 else: 648 else:
626 content = ''.join(content_generator) 649 content = ''.join(content_generator)
627 650
628 if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: 651 # PUT file to |upload_url|.
629 return self._upload_hash_content_to_blobstore(item, content) 652 response = net.url_read(
653 url=upload_url,
654 data=content,
655 content_type='application/octet-stream',
656 method='PUT')
657 if response is None:
658 raise IOError('Failed to upload a file %s to %s' % (item, upload_url))
630 659
631 url = '%sstore/%s/%s?token=%s' % ( 660 # Optionally notify the server that it's done.
632 self.content_url, self.namespace, item, self.token) 661 if finalize_url:
633 return url_read(url, data=content, content_type='application/octet-stream') 662 # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to
663 # isolated server. That way isolate server can verify that the data safely
664 # reached Google Storage (GS provides MD5 of stored files).
665 response = net.url_read(
666 url=finalize_url,
667 data='',
668 content_type='application/json',
669 method='POST')
670 # TODO(vadimsh): Do not reupload item again if only finalize_url request
671 # failed.
672 if response is None:
673 raise IOError('Failed to finalize an upload of %s' % item)
634 674
635 def contains(self, files): 675 def contains(self, files):
636 logging.info('Checking existence of %d files...', len(files)) 676 logging.info('Checking existence of %d files...', len(files))
637 677
638 body = ''.join( 678 # Request body is a json encoded list of dicts.
639 (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) 679 body = [
640 assert (len(body) % self.algo().digest_size) == 0, repr(body) 680 {
681 'h': metadata['h'],
682 's': metadata['s'],
683 'i': 1 if metadata.get('priority') == '0' else 0,
684 } for _, metadata in files
685 ]
641 686
642 query_url = '%scontains/%s?token=%s' % ( 687 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
643 self.content_url, self.namespace, self.token) 688 self.base_url,
644 response = url_read( 689 self.namespace,
645 query_url, data=body, content_type='application/octet-stream') 690 urllib.quote(self.server_capabilities['access_token']))
646 if len(files) != len(response): 691 response_body = net.url_read(
692 url=query_url,
693 data=json.dumps(body, separators=(',', ':')),
694 content_type='application/json',
695 method='POST')
696 if response_body is None:
697 raise MappingError('Failed to execute /pre-upload query')
698
699 # Response body is a list of push_urls (or null if file is already present).
700 try:
701 response = json.loads(response_body)
702 if not isinstance(response, list):
703 raise ValueError('Expecting response with json-encoded list')
704 if len(response) != len(files):
705 raise ValueError(
706 'Incorrect number of items in the list, expected %d, '
707 'but got %d' % (len(files), len(response)))
708 except ValueError as err:
647 raise MappingError( 709 raise MappingError(
648 'Got an incorrect number of responses from the server. Expected %d, ' 710 'Invalid response from server: %s, body is %s' % (err, response_body))
649 'but got %d' % (len(files), len(response)))
650 711
651 # This implementation of IsolateServer doesn't use push_urls field, 712 # Convert response into a list of triplets with info about missing files.
652 # set it to None.
653 missing_files = [ 713 missing_files = [
654 files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' 714 files[i] + (push_urls,) for i, push_urls in enumerate(response)
715 if push_urls
655 ] 716 ]
656 logging.info('Queried %d files, %d cache hit', 717 logging.info('Queried %d files, %d cache hit',
657 len(files), len(files) - len(missing_files)) 718 len(files), len(files) - len(missing_files))
658 return missing_files 719 return missing_files
659 720
660 def _upload_hash_content_to_blobstore(self, item, content):
661 """Uploads the content directly to the blobstore via a generated url."""
662 # TODO(maruel): Support large files. This would require streaming support.
663 gen_url = '%sgenerate_blobstore_url/%s/%s' % (
664 self.content_url, self.namespace, item)
665 # Token is guaranteed to be already quoted but it is unnecessary here, and
666 # only here.
667 data = [('token', urllib.unquote(self.token))]
668 content_type, body = encode_multipart_formdata(
669 data, [('content', item, content)])
670 last_url = gen_url
671 for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
672 # Retry HTTP 50x here but not 404.
673 upload_url = net.url_read(gen_url, data=data)
674 if not upload_url:
675 raise MappingError('Unable to connect to server %s' % gen_url)
676 last_url = upload_url
677
678 # Do not retry this request on HTTP 50x. Regenerate an upload url each
679 # time since uploading "consumes" the upload url.
680 result = net.url_read(
681 upload_url, data=body, content_type=content_type, retry_50x=False)
682 if result is not None:
683 return result
684 raise MappingError('Unable to connect to server %s' % last_url)
685
686 721
687 class FileSystem(StorageApi): 722 class FileSystem(StorageApi):
688 """StorageApi implementation that fetches data from the file system. 723 """StorageApi implementation that fetches data from the file system.
689 724
690 The common use case is a NFS/CIFS file server that is mounted locally that is 725 The common use case is a NFS/CIFS file server that is mounted locally that is
691 used to fetch the file on a local partition. 726 used to fetch the file on a local partition.
692 """ 727 """
693 def __init__(self, base_path): 728 def __init__(self, base_path):
694 super(FileSystem, self).__init__() 729 super(FileSystem, self).__init__()
695 self.base_path = base_path 730 self.base_path = base_path
(...skipping 606 matching lines...) Expand 10 before | Expand all | Expand 10 after
1302 sys.stderr.write(str(e)) 1337 sys.stderr.write(str(e))
1303 sys.stderr.write('\n') 1338 sys.stderr.write('\n')
1304 return 1 1339 return 1
1305 1340
1306 1341
1307 if __name__ == '__main__': 1342 if __name__ == '__main__':
1308 fix_encoding.fix_encoding() 1343 fix_encoding.fix_encoding()
1309 tools.disable_buffering() 1344 tools.disable_buffering()
1310 colorama.init() 1345 colorama.init()
1311 sys.exit(main(sys.argv[1:])) 1346 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | tests/isolateserver_smoke_test.py » ('J')

Powered by Google App Engine
This is Rietveld 408576698