Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(322)

Side by Side Diff: isolateserver.py

Issue 25093003: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: PendingPush stuff Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Archives a set of files to a server.""" 6 """Archives a set of files to a server."""
7 7
8 __version__ = '0.2' 8 __version__ = '0.2'
9 9
10 import binascii
11 import hashlib 10 import hashlib
12 import json 11 import json
13 import logging 12 import logging
14 import os 13 import os
15 import random
16 import re 14 import re
17 import sys 15 import sys
18 import threading 16 import threading
19 import time 17 import time
20 import urllib 18 import urllib
21 import zlib 19 import zlib
22 20
23 from third_party import colorama 21 from third_party import colorama
24 from third_party.depot_tools import fix_encoding 22 from third_party.depot_tools import fix_encoding
25 from third_party.depot_tools import subcommand 23 from third_party.depot_tools import subcommand
26 24
27 from utils import net 25 from utils import net
28 from utils import threading_utils 26 from utils import threading_utils
29 from utils import tools 27 from utils import tools
30 28
31 29
32 # The minimum size of files to upload directly to the blobstore. 30 # Version of isolate protocol passed to the server in /handshake request.
33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 31 ISOLATE_PROTOCOL_VERSION = '1.0'
34 32
35 # The number of files to check the isolate server per /contains query. 33
34 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content 35 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the 36 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files 37 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], 38 # are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. Numbers here is a trade-off; the more per request, the lower the 39 # and so on. Numbers here is a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, 40 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start 41 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is 42 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousands files to look up with minimal number of large 43 # optimized for the "few thousands files to look up with minimal number of large
45 # files missing" case. 44 # files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] 45 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
47 46
48 47
49 # A list of already compressed extension types that should not receive any 48 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded. 49 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [ 50 ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', 51 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53 'wav', 'zip' 52 'wav', 'zip'
54 ] 53 ]
55 54
56 55
57 # The file size to be used when we don't know the correct file size, 56 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files. 57 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None 58 UNKNOWN_FILE_SIZE = None
60 59
61 60
62 # The size of each chunk to read when downloading and unzipping files. 61 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024 62 ZIPPED_FILE_CHUNK = 16 * 1024
64 63
65
66 # Chunk size to use when doing disk I/O. 64 # Chunk size to use when doing disk I/O.
67 DISK_FILE_CHUNK = 1024 * 1024 65 DISK_FILE_CHUNK = 1024 * 1024
68 66
67 # Chunk size to use when reading from network stream.
68 NET_IO_FILE_CHUNK = 16 * 1024
69
69 70
70 # Read timeout in seconds for downloads from isolate storage. If there's no 71 # Read timeout in seconds for downloads from isolate storage. If there's no
71 # response from the server within this timeout whole download will be aborted. 72 # response from the server within this timeout whole download will be aborted.
72 DOWNLOAD_READ_TIMEOUT = 60 73 DOWNLOAD_READ_TIMEOUT = 60
73 74
74 # Maximum expected delay (in seconds) between successive file fetches 75 # Maximum expected delay (in seconds) between successive file fetches
75 # in run_tha_test. If it takes longer than that, a deadlock might be happening 76 # in run_tha_test. If it takes longer than that, a deadlock might be happening
76 # and all stack frames for all threads are dumped to log. 77 # and all stack frames for all threads are dumped to log.
77 DEADLOCK_TIMEOUT = 5 * 60 78 DEADLOCK_TIMEOUT = 5 * 60
78 79
(...skipping 20 matching lines...) Expand all
99 class ConfigError(ValueError): 100 class ConfigError(ValueError):
100 """Generic failure to load a .isolated file.""" 101 """Generic failure to load a .isolated file."""
101 pass 102 pass
102 103
103 104
104 class MappingError(OSError): 105 class MappingError(OSError):
105 """Failed to recreate the tree.""" 106 """Failed to recreate the tree."""
106 pass 107 pass
107 108
108 109
109 def randomness():
110 """Generates low-entropy randomness for MIME encoding.
111
112 Exists so it can be mocked out in unit tests.
113 """
114 return str(time.time())
115
116
117 def encode_multipart_formdata(fields, files,
118 mime_mapper=lambda _: 'application/octet-stream'):
119 """Encodes a Multipart form data object.
120
121 Args:
122 fields: a sequence (name, value) elements for
123 regular form fields.
124 files: a sequence of (name, filename, value) elements for data to be
125 uploaded as files.
126 mime_mapper: function to return the mime type from the filename.
127 Returns:
128 content_type: for httplib.HTTP instance
129 body: for httplib.HTTP instance
130 """
131 boundary = hashlib.md5(randomness()).hexdigest()
132 body_list = []
133 for (key, value) in fields:
134 if isinstance(key, unicode):
135 value = key.encode('utf-8')
136 if isinstance(value, unicode):
137 value = value.encode('utf-8')
138 body_list.append('--' + boundary)
139 body_list.append('Content-Disposition: form-data; name="%s"' % key)
140 body_list.append('')
141 body_list.append(value)
142 body_list.append('--' + boundary)
143 body_list.append('')
144 for (key, filename, value) in files:
145 if isinstance(key, unicode):
146 value = key.encode('utf-8')
147 if isinstance(filename, unicode):
148 value = filename.encode('utf-8')
149 if isinstance(value, unicode):
150 value = value.encode('utf-8')
151 body_list.append('--' + boundary)
152 body_list.append('Content-Disposition: form-data; name="%s"; '
153 'filename="%s"' % (key, filename))
154 body_list.append('Content-Type: %s' % mime_mapper(filename))
155 body_list.append('')
156 body_list.append(value)
157 body_list.append('--' + boundary)
158 body_list.append('')
159 if body_list:
160 body_list[-2] += '--'
161 body = '\r\n'.join(body_list)
162 content_type = 'multipart/form-data; boundary=%s' % boundary
163 return content_type, body
164
165
166 def is_valid_hash(value, algo): 110 def is_valid_hash(value, algo):
167 """Returns if the value is a valid hash for the corresponding algorithm.""" 111 """Returns if the value is a valid hash for the corresponding algorithm."""
168 size = 2 * algo().digest_size 112 size = 2 * algo().digest_size
169 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) 113 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
170 114
171 115
172 def hash_file(filepath, algo): 116 def hash_file(filepath, algo):
173 """Calculates the hash of a file without reading it all in memory at once. 117 """Calculates the hash of a file without reading it all in memory at once.
174 118
175 |algo| should be one of hashlib hashing algorithm. 119 |algo| should be one of hashlib hashing algorithm.
176 """ 120 """
177 digest = algo() 121 digest = algo()
178 with open(filepath, 'rb') as f: 122 with open(filepath, 'rb') as f:
179 while True: 123 while True:
180 chunk = f.read(DISK_FILE_CHUNK) 124 chunk = f.read(DISK_FILE_CHUNK)
181 if not chunk: 125 if not chunk:
182 break 126 break
183 digest.update(chunk) 127 digest.update(chunk)
184 return digest.hexdigest() 128 return digest.hexdigest()
185 129
186 130
131 def stream_read(stream, chunk_size):
132 """Reads chunks from |stream| and yields them."""
133 while True:
134 data = stream.read(chunk_size)
135 if not data:
136 break
137 yield data
138
139
187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): 140 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
188 """Yields file content in chunks of given |chunk_size|.""" 141 """Yields file content in chunks of given |chunk_size|."""
189 with open(filepath, 'rb') as f: 142 with open(filepath, 'rb') as f:
190 while True: 143 while True:
191 data = f.read(chunk_size) 144 data = f.read(chunk_size)
192 if not data: 145 if not data:
193 break 146 break
194 yield data 147 yield data
195 148
196 149
(...skipping 22 matching lines...) Expand all
219 compressor = zlib.compressobj(level) 172 compressor = zlib.compressobj(level)
220 for chunk in content_generator: 173 for chunk in content_generator:
221 compressed = compressor.compress(chunk) 174 compressed = compressor.compress(chunk)
222 if compressed: 175 if compressed:
223 yield compressed 176 yield compressed
224 tail = compressor.flush(zlib.Z_FINISH) 177 tail = compressor.flush(zlib.Z_FINISH)
225 if tail: 178 if tail:
226 yield tail 179 yield tail
227 180
228 181
182 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
183 """Reads zipped data from |content_generator| and yields decompressed data.
184
185 Decompresses data in small chunks (no larger than |chunk_size|) so that
186 zip bomb file doesn't cause zlib to preallocate huge amount of memory.
187
188 Raises IOError if data is corrupted or incomplete.
189 """
190 decompressor = zlib.decompressobj()
191 compressed_size = 0
192 try:
193 for chunk in content_generator:
194 compressed_size += len(chunk)
195 data = decompressor.decompress(chunk, chunk_size)
196 if data:
197 yield data
198 while decompressor.unconsumed_tail:
199 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
200 if data:
201 yield data
202 tail = decompressor.flush()
203 if tail:
204 yield tail
205 except zlib.error as e:
206 raise IOError(
207 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
208 # Ensure all data was read and decompressed.
209 if decompressor.unused_data or decompressor.unconsumed_tail:
210 raise IOError('Not all data was decompressed')
211
212
229 def get_zip_compression_level(filename): 213 def get_zip_compression_level(filename):
230 """Given a filename calculates the ideal zip compression level to use.""" 214 """Given a filename calculates the ideal zip compression level to use."""
231 file_ext = os.path.splitext(filename)[1].lower() 215 file_ext = os.path.splitext(filename)[1].lower()
232 # TODO(csharp): Profile to find what compression level works best. 216 # TODO(csharp): Profile to find what compression level works best.
233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 217 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
234 218
235 219
236 def create_directories(base_directory, files): 220 def create_directories(base_directory, files):
237 """Creates the directory structure needed by the given list of files.""" 221 """Creates the directory structure needed by the given list of files."""
238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) 222 logging.debug('create_directories(%s, %d)', base_directory, len(files))
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
294 278
295 279
296 def try_remove(filepath): 280 def try_remove(filepath):
297 """Removes a file without crashing even if it doesn't exist.""" 281 """Removes a file without crashing even if it doesn't exist."""
298 try: 282 try:
299 os.remove(filepath) 283 os.remove(filepath)
300 except OSError: 284 except OSError:
301 pass 285 pass
302 286
303 287
304 def url_read(url, **kwargs):
305 result = net.url_read(url, **kwargs)
306 if result is None:
307 # If we get no response from the server, assume it is down and raise an
308 # exception.
309 raise MappingError('Unable to connect to server %s' % url)
310 return result
311
312
313 class Storage(object): 288 class Storage(object):
314 """Efficiently downloads or uploads large set of files via StorageApi.""" 289 """Efficiently downloads or uploads large set of files via StorageApi."""
315 290
316 def __init__(self, storage_api, use_zip): 291 def __init__(self, storage_api, use_zip):
317 self.use_zip = use_zip 292 self.use_zip = use_zip
318 self._storage_api = storage_api 293 self._storage_api = storage_api
319 self._cpu_thread_pool = None 294 self._cpu_thread_pool = None
320 self._net_thread_pool = None 295 self._net_thread_pool = None
321 296
322 @property 297 @property
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
359 334
360 Arguments: 335 Arguments:
361 indir: root directory the infiles are based in. 336 indir: root directory the infiles are based in.
362 infiles: dict of files to upload from |indir|. 337 infiles: dict of files to upload from |indir|.
363 """ 338 """
364 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) 339 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
365 340
366 # Enqueue all upload tasks. 341 # Enqueue all upload tasks.
367 channel = threading_utils.TaskChannel() 342 channel = threading_utils.TaskChannel()
368 missing = [] 343 missing = []
369 for filename, metadata, push_urls in self.get_missing_files(infiles): 344 for pending_push in self.get_missing_files(infiles):
370 missing.append((filename, metadata)) 345 missing.append(pending_push)
371 path = os.path.join(indir, filename) 346 path = os.path.join(indir, pending_push.filename)
372 if metadata.get('priority', '1') == '0': 347 if metadata.get('priority', '1') == '0':
373 priority = WorkerPool.HIGH 348 priority = WorkerPool.HIGH
374 else: 349 else:
375 priority = WorkerPool.MED 350 priority = WorkerPool.MED
376 compression_level = get_zip_compression_level(path) 351 compression_level = get_zip_compression_level(path)
377 chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK 352 chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK
378 content = file_read(path, chunk_size) 353 content = file_read(path, chunk_size)
379 self.async_push(channel, priority, metadata['h'], metadata['s'], 354 self.async_push(channel, priority, pending_push,
380 content, compression_level, push_urls) 355 content, compression_level)
381 356
382 # No need to spawn deadlock detector thread if there's nothing to upload. 357 # No need to spawn deadlock detector thread if there's nothing to upload.
383 if missing: 358 if missing:
384 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: 359 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
385 # Wait for all started uploads to finish. 360 # Wait for all started uploads to finish.
386 uploaded = 0 361 uploaded = 0
387 while uploaded != len(missing): 362 while uploaded != len(missing):
388 detector.ping() 363 detector.ping()
389 item = channel.pull() 364 item = channel.pull()
390 uploaded += 1 365 uploaded += 1
(...skipping 17 matching lines...) Expand all
408 cache_hit_size * 100. / total_size if total_size else 0) 383 cache_hit_size * 100. / total_size if total_size else 0)
409 cache_miss = missing 384 cache_miss = missing
410 cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss) 385 cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
411 logging.info( 386 logging.info(
412 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', 387 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
413 len(cache_miss), 388 len(cache_miss),
414 cache_miss_size / 1024., 389 cache_miss_size / 1024.,
415 len(cache_miss) * 100. / total, 390 len(cache_miss) * 100. / total,
416 cache_miss_size * 100. / total_size if total_size else 0) 391 cache_miss_size * 100. / total_size if total_size else 0)
417 392
418 def async_push(self, channel, priority, item, expected_size, 393 def async_push(self, channel, priority, pending_push,
419 content_generator, compression_level, push_urls=None): 394 content_generator, compression_level):
420 """Starts asynchronous push to the server in a parallel thread.""" 395 """Starts asynchronous push to the server in a parallel thread."""
421 def push(content, size): 396 def push(content, size):
422 """Pushes an item and returns its id, to pass as a result to |channel|.""" 397 """Pushes an item and returns its id, to pass as a result to |channel|."""
423 self._storage_api.push(item, size, content, push_urls) 398 self._storage_api.push(item, size, content, push_urls)
424 return item 399 return item
425 400
426 # If zipping is not required, just start a push task. 401 # If zipping is not required, just start a push task.
427 if not self.use_zip: 402 if not self.use_zip:
428 self.net_thread_pool.add_task_with_channel(channel, priority, push, 403 self.net_thread_pool.add_task_with_channel(channel, priority, push,
429 content_generator, expected_size) 404 content_generator, expected_size)
(...skipping 16 matching lines...) Expand all
446 421
447 def get_missing_files(self, files): 422 def get_missing_files(self, files):
448 """Yields files that are missing from the server. 423 """Yields files that are missing from the server.
449 424
450 Issues multiple parallel queries via StorageApi's 'contains' method. 425 Issues multiple parallel queries via StorageApi's 'contains' method.
451 426
452 Arguments: 427 Arguments:
453 files: a dictionary file name -> metadata dict. 428 files: a dictionary file name -> metadata dict.
454 429
455 Yields: 430 Yields:
456 Triplets (file name, metadata dict, push_urls object to pass to push). 431 PendingPush objects that represent missing files.
457 """ 432 """
458 channel = threading_utils.TaskChannel() 433 channel = threading_utils.TaskChannel()
459 pending = 0 434 pending = 0
460 # Enqueue all requests. 435 # Enqueue all requests.
461 for batch in self.batch_files_for_check(files): 436 for batch in self.batch_files_for_check(files):
462 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, 437 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
463 self._storage_api.contains, batch) 438 self._storage_api.contains, batch)
464 pending += 1 439 pending += 1
465 # Yield results as they come in. 440 # Yield results as they come in.
466 for _ in xrange(pending): 441 for _ in xrange(pending):
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
505 480
506 Arguments: 481 Arguments:
507 item: hash digest of item to download. 482 item: hash digest of item to download.
508 expected_size: expected size of the item, to validate it. 483 expected_size: expected size of the item, to validate it.
509 484
510 Yields: 485 Yields:
511 Chunks of downloaded item (as str objects). 486 Chunks of downloaded item (as str objects).
512 """ 487 """
513 raise NotImplementedError() 488 raise NotImplementedError()
514 489
515 def push(self, item, expected_size, content_generator, push_urls=None): 490 def push(self, pending_push, content_generator):
516 """Uploads content generated by |content_generator| as |item|. 491 """Uploads content generated by |content_generator|.
492
493 Actual item being pushed is identified by |pending_push| object.
517 494
518 Arguments: 495 Arguments:
519 item: hash digest of item to upload. 496 pending_push: PendingPush object returned by 'contains' call.
520 expected_size: total length of the content yielded by |content_generator|.
521 content_generator: generator that produces chunks to push. 497 content_generator: generator that produces chunks to push.
522 push_urls: optional URLs returned by 'contains' call for this item.
523 498
524 Returns: 499 Returns:
525 None. 500 None.
526 """ 501 """
527 raise NotImplementedError() 502 raise NotImplementedError()
528 503
529 def contains(self, files): 504 def contains(self, files):
530 """Checks for existence of given |files| on the server. 505 """Checks for existence of given |files| on the server.
531 506
532 Arguments: 507 Arguments:
533 files: list of pairs (file name, metadata dict). 508 files: list of pairs (file name, metadata dict).
534 509
535 Returns: 510 Returns:
536 A list of files missing on server as a list of triplets 511 A list of files missing on server as a list of PendingPush objects.
537 (file name, metadata dict, push_urls object to pass to push).
538 """ 512 """
539 raise NotImplementedError() 513 raise NotImplementedError()
540 514
541 515
516 class PendingPush(object):
M-A Ruel 2013/09/30 19:38:27 I prefer the class to be before Storage.
517 """A file that were not found on the server and that should be uploaded.
M-A Ruel 2013/09/30 19:38:27 """A file that was not ...
518
519 Returned by StorageApi's 'contains' call, passed to StorageApi's 'push'.
M-A Ruel 2013/09/30 19:38:27 Returned by StorageApi.contains(), to be passed to
520 """
521
522 def __init__(self, filename, metadata):
523 self.filename = filename
524 self.metadata = metadata
525
526
542 class IsolateServer(StorageApi): 527 class IsolateServer(StorageApi):
543 """StorageApi implementation that downloads and uploads to Isolate Server.""" 528 """StorageApi implementation that downloads and uploads to Isolate Server.
529
530 It uploads and downloads directly from Google Storage whenever appropriate.
531 """
532
533 class PendingIsolatePush(PendingPush):
534 def __init__(self, filename, metadata, push_urls):
535 super(IsolateServer.PendingIsolatePush, self).__init__(filename, metadata)
536 self.push_urls = push_urls
537 self.uploaded = False
538
544 def __init__(self, base_url, namespace): 539 def __init__(self, base_url, namespace):
545 super(IsolateServer, self).__init__() 540 super(IsolateServer, self).__init__()
546 assert base_url.startswith('http'), base_url 541 assert base_url.startswith('http'), base_url
547 self.content_url = base_url.rstrip('/') + '/content/' 542 self.base_url = base_url.rstrip('/')
548 self.namespace = namespace 543 self.namespace = namespace
549 self.algo = get_hash_algo(namespace) 544 self.algo = get_hash_algo(namespace)
550 self._token = None 545 self._use_zip = is_namespace_with_compression(namespace)
551 self._lock = threading.Lock() 546 self._lock = threading.Lock()
547 self._server_caps = None
548
549 @staticmethod
550 def generate_handshake_request():
551 """Returns a dict to be sent as handshake request body."""
552 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
553 return {
554 'protocol_version': ISOLATE_PROTOCOL_VERSION,
M-A Ruel 2013/09/30 19:38:27 Sort here too
555 'client_app_version': __version__,
556 'fetcher': True,
557 'pusher': True,
558 }
559
560 @staticmethod
561 def validate_handshake_response(caps):
562 """Validates and normalizes handshake response."""
563 logging.info('Protocol version: %s', caps['protocol_version'])
564 logging.info('Server version: %s', caps['server_app_version'])
565 if caps.get('error'):
566 raise MappingError(caps['error'])
567 if not caps['access_token']:
568 raise ValueError('access_token is missing')
569 return caps
552 570
553 @property 571 @property
554 def token(self): 572 def server_capabilities(self):
573 """Performs handshake with the server if not yet done.
574
575 Returns:
576 Server capabilities dictionary as returned by /handshake endpoint.
577
578 Raises:
579 MappingError if server rejects the handshake.
580 """
555 # TODO(maruel): Make this request much earlier asynchronously while the 581 # TODO(maruel): Make this request much earlier asynchronously while the
556 # files are being enumerated. 582 # files are being enumerated.
557 with self._lock: 583 with self._lock:
558 if not self._token: 584 if self._server_caps is None:
559 self._token = urllib.quote(url_read(self.content_url + 'get_token')) 585 request_body = json.dumps(
560 return self._token 586 self.generate_handshake_request(), separators=(',', ':'))
587 response = net.url_read(
588 url=self.base_url + '/content-gs/handshake',
589 data=request_body,
590 content_type='application/json',
591 method='POST')
592 if response is None:
593 raise MappingError('Failed to perform handshake.')
594 try:
595 caps = json.loads(response)
596 if not isinstance(caps, dict):
597 raise ValueError('Expecting JSON dict')
598 self._server_caps = self.validate_handshake_response(caps)
599 except (ValueError, KeyError, TypeError) as exc:
600 # KeyError exception has very confusing str conversion: it's just a
601 # missing key value and nothing else. So print exception class name
602 # as well.
603 raise MappingError('Invalid handshake response (%s): %s' % (
604 exc.__class__.__name__, exc))
605 return self._server_caps
561 606
562 def fetch(self, item, expected_size): 607 def fetch(self, item, expected_size):
563 assert isinstance(item, basestring) 608 assert isinstance(item, basestring)
564 assert ( 609 assert (isinstance(expected_size, (int, long)) or
565 isinstance(expected_size, (int, long)) or
566 expected_size == UNKNOWN_FILE_SIZE) 610 expected_size == UNKNOWN_FILE_SIZE)
567 zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) 611
568 logging.debug('download_file(%s)', zipped_url) 612 source_url = '%s/content-gs/retrieve/%s/%s' % (
613 self.base_url, self.namespace, item)
614 logging.debug('download_file(%s)', source_url)
569 615
570 # Because the app engine DB is only eventually consistent, retry 404 errors 616 # Because the app engine DB is only eventually consistent, retry 404 errors
571 # because the file might just not be visible yet (even though it has been 617 # because the file might just not be visible yet (even though it has been
572 # uploaded). 618 # uploaded).
573 connection = net.url_open( 619 connection = net.url_open(
574 zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) 620 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
575 if not connection: 621 if not connection:
576 raise IOError('Unable to open connection to %s' % zipped_url) 622 raise IOError('Unable to open connection to %s' % source_url)
577 623
578 # TODO(maruel): Must only decompress when needed.
579 decompressor = zlib.decompressobj()
580 try: 624 try:
581 compressed_size = 0 625 # Prepare reading pipeline.
582 decompressed_size = 0 626 generator = stream_read(connection, NET_IO_FILE_CHUNK)
583 while True: 627 if self._use_zip:
584 chunk = connection.read(ZIPPED_FILE_CHUNK) 628 generator = zip_decompress(generator, DISK_FILE_CHUNK)
585 if not chunk:
586 break
587 compressed_size += len(chunk)
588 decompressed = decompressor.decompress(chunk)
589 decompressed_size += len(decompressed)
590 yield decompressed
591 629
592 # Ensure that all the data was properly decompressed. 630 # Read and yield data, calculate total length of the decompressed stream.
593 uncompressed_data = decompressor.flush() 631 total_size = 0
594 if uncompressed_data: 632 for chunk in generator:
595 raise IOError('Decompression failed') 633 total_size += len(chunk)
596 if (expected_size != UNKNOWN_FILE_SIZE and 634 yield chunk
597 decompressed_size != expected_size):
598 raise IOError('File incorrect size after download of %s. Got %s and '
599 'expected %s' % (item, decompressed_size, expected_size))
600 except zlib.error as e:
601 msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
602 item, compressed_size, connection.content_length, e)
603 logging.warning(msg)
604 635
605 # Testing seems to show that if a few machines are trying to download 636 # Verify data length matches expectation.
606 # the same blob, they can cause each other to fail. So if we hit a zip 637 if expected_size != UNKNOWN_FILE_SIZE and total_size != expected_size:
607 # error, this is the most likely cause (it only downloads some of the 638 raise IOError('Incorrect file size: expected %d, got %d' % (
608 # data). Randomly sleep for between 5 and 25 seconds to try and spread 639 expected_size, total_size))
609 # out the downloads.
610 sleep_duration = (random.random() * 20) + 5
611 time.sleep(sleep_duration)
612 raise IOError(msg)
613 640
614 def push(self, item, expected_size, content_generator, push_urls=None): 641 except IOError as err:
642 logging.warning('Failed to fetch %s: %s', item, err)
643 raise
644
645 def push(self, pending_push, content_generator):
646 assert isinstance(pending_push, IsolateServer.PendingIsolatePush)
647 # TODO: use |pending_push| below.
615 assert isinstance(item, basestring) 648 assert isinstance(item, basestring)
616 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 649 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
617 item = str(item) 650 assert push_urls and len(push_urls) == 2
651 upload_url, finalize_url = push_urls
618 652
619 # TODO(maruel): Support large files. This would require streaming support. 653 # TODO(maruel): Support large files. This would require streaming support.
620 654
655 # TODO(vadimsh): Do not read from |content_generator| when retrying push.
656 # If |content_generator| is indeed a generator, it can not be re-winded back
657 # to the beginning of the stream. A retry will find it exhausted. A possible
658 # solution is to wrap content_generator with some sort of caching
659 # restartable generator. It should be done alongside streaming support
660 # implementation.
661
621 # A cheese way to avoid memcpy of (possibly huge) file, until streaming 662 # A cheese way to avoid memcpy of (possibly huge) file, until streaming
622 # upload support is implemented. 663 # upload support is implemented.
623 if isinstance(content_generator, list) and len(content_generator) == 1: 664 if isinstance(content_generator, list) and len(content_generator) == 1:
624 content = content_generator[0] 665 content = content_generator[0]
625 else: 666 else:
626 content = ''.join(content_generator) 667 content = ''.join(content_generator)
627 668
628 if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: 669 # PUT file to |upload_url|.
629 return self._upload_hash_content_to_blobstore(item, content) 670 response = net.url_read(
671 url=upload_url,
672 data=content,
673 content_type='application/octet-stream',
674 method='PUT')
675 if response is None:
676 raise IOError('Failed to upload a file %s to %s' % (item, upload_url))
630 677
631 url = '%sstore/%s/%s?token=%s' % ( 678 # Optionally notify the server that it's done.
632 self.content_url, self.namespace, item, self.token) 679 if finalize_url:
633 return url_read(url, data=content, content_type='application/octet-stream') 680 # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to
681 # isolated server. That way isolate server can verify that the data safely
682 # reached Google Storage (GS provides MD5 of stored files).
683 response = net.url_read(
684 url=finalize_url,
685 data='',
686 content_type='application/json',
687 method='POST')
688 # TODO(vadimsh): Do not reupload item again if only finalize_url request
689 # failed.
690 if response is None:
691 raise IOError('Failed to finalize an upload of %s' % item)
634 692
635 def contains(self, files): 693 def contains(self, files):
636 logging.info('Checking existence of %d files...', len(files)) 694 logging.info('Checking existence of %d files...', len(files))
637 695
638 body = ''.join( 696 # Request body is a json encoded list of dicts.
639 (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) 697 body = [
640 assert (len(body) % self.algo().digest_size) == 0, repr(body) 698 {
699 'h': metadata['h'],
700 's': metadata['s'],
701 'i': 1 if metadata.get('priority') == '0' else 0,
702 } for _, metadata in files
703 ]
641 704
642 query_url = '%scontains/%s?token=%s' % ( 705 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
643 self.content_url, self.namespace, self.token) 706 self.base_url,
644 response = url_read( 707 self.namespace,
645 query_url, data=body, content_type='application/octet-stream') 708 urllib.quote(self.server_capabilities['access_token']))
646 if len(files) != len(response): 709 response_body = net.url_read(
710 url=query_url,
711 data=json.dumps(body, separators=(',', ':')),
712 content_type='application/json',
713 method='POST')
714 if response_body is None:
715 raise MappingError('Failed to execute /pre-upload query')
716
717 # Response body is a list of push_urls (or null if file is already present).
718 try:
719 response = json.loads(response_body)
720 if not isinstance(response, list):
721 raise ValueError('Expecting response with json-encoded list')
722 if len(response) != len(files):
723 raise ValueError(
724 'Incorrect number of items in the list, expected %d, '
725 'but got %d' % (len(files), len(response)))
726 except ValueError as err:
647 raise MappingError( 727 raise MappingError(
648 'Got an incorrect number of responses from the server. Expected %d, ' 728 'Invalid response from server: %s, body is %s' % (err, response_body))
649 'but got %d' % (len(files), len(response)))
650 729
651 # This implementation of IsolateServer doesn't use push_urls field, 730 # Convert response into a list of triplets with info about missing files.
652 # set it to None.
653 missing_files = [ 731 missing_files = [
654 files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' 732 IsolateServer.PendingIsolatePush(files[0], files[1], push_urls)
M-A Ruel 2013/09/30 19:38:27 You probably meant files[i][0], files[i][1] :)
733 for i, push_urls in enumerate(response)
734 if push_urls
655 ] 735 ]
656 logging.info('Queried %d files, %d cache hit', 736 logging.info('Queried %d files, %d cache hit',
657 len(files), len(files) - len(missing_files)) 737 len(files), len(files) - len(missing_files))
658 return missing_files 738 return missing_files
659 739
660 def _upload_hash_content_to_blobstore(self, item, content):
661 """Uploads the content directly to the blobstore via a generated url."""
662 # TODO(maruel): Support large files. This would require streaming support.
663 gen_url = '%sgenerate_blobstore_url/%s/%s' % (
664 self.content_url, self.namespace, item)
665 # Token is guaranteed to be already quoted but it is unnecessary here, and
666 # only here.
667 data = [('token', urllib.unquote(self.token))]
668 content_type, body = encode_multipart_formdata(
669 data, [('content', item, content)])
670 last_url = gen_url
671 for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
672 # Retry HTTP 50x here but not 404.
673 upload_url = net.url_read(gen_url, data=data)
674 if not upload_url:
675 raise MappingError('Unable to connect to server %s' % gen_url)
676 last_url = upload_url
677
678 # Do not retry this request on HTTP 50x. Regenerate an upload url each
679 # time since uploading "consumes" the upload url.
680 result = net.url_read(
681 upload_url, data=body, content_type=content_type, retry_50x=False)
682 if result is not None:
683 return result
684 raise MappingError('Unable to connect to server %s' % last_url)
685
686 740
687 class FileSystem(StorageApi): 741 class FileSystem(StorageApi):
688 """StorageApi implementation that fetches data from the file system. 742 """StorageApi implementation that fetches data from the file system.
689 743
690 The common use case is a NFS/CIFS file server that is mounted locally that is 744 The common use case is a NFS/CIFS file server that is mounted locally that is
691 used to fetch the file on a local partition. 745 used to fetch the file on a local partition.
692 """ 746 """
747
693 def __init__(self, base_path): 748 def __init__(self, base_path):
694 super(FileSystem, self).__init__() 749 super(FileSystem, self).__init__()
695 self.base_path = base_path 750 self.base_path = base_path
696 751
697 def fetch(self, item, expected_size): 752 def fetch(self, item, expected_size):
698 assert isinstance(item, basestring) 753 assert isinstance(item, basestring)
699 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 754 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
700 source = os.path.join(self.base_path, item) 755 source = os.path.join(self.base_path, item)
701 if (expected_size != UNKNOWN_FILE_SIZE and 756 if (expected_size != UNKNOWN_FILE_SIZE and
702 not is_valid_file(source, expected_size)): 757 not is_valid_file(source, expected_size)):
703 raise IOError('Invalid file %s' % item) 758 raise IOError('Invalid file %s' % item)
704 return file_read(source) 759 return file_read(source)
705 760
706 def push(self, item, expected_size, content_generator, push_urls=None): 761 def push(self, item, expected_size, content_generator, push_urls=None):
M-A Ruel 2013/09/30 19:38:27 You forgot to update it
707 assert isinstance(item, basestring) 762 assert isinstance(item, basestring)
708 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 763 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
709 dest = os.path.join(self.base_path, item) 764 dest = os.path.join(self.base_path, item)
710 total = file_write(dest, content_generator) 765 total = file_write(dest, content_generator)
711 if expected_size != UNKNOWN_FILE_SIZE and total != expected_size: 766 if expected_size != UNKNOWN_FILE_SIZE and total != expected_size:
712 os.remove(dest) 767 os.remove(dest)
713 raise IOError( 768 raise IOError(
714 'Invalid file %s, %d != %d' % (item, total, expected_size)) 769 'Invalid file %s, %d != %d' % (item, total, expected_size))
715 770
716 def contains(self, files): 771 def contains(self, files):
717 return [ 772 return [
718 (filename, metadata, None) 773 PendingPush(filename, metadata)
719 for filename, metadata in files 774 for filename, metadata in files
720 if not os.path.exists(os.path.join(self.base_path, metadata['h'])) 775 if not os.path.exists(os.path.join(self.base_path, metadata['h']))
721 ] 776 ]
722 777
723 778
724 def get_hash_algo(_namespace): 779 def get_hash_algo(_namespace):
725 """Return hash algorithm class to use when uploading to given |namespace|.""" 780 """Return hash algorithm class to use when uploading to given |namespace|."""
726 # TODO(vadimsh): Implement this at some point. 781 # TODO(vadimsh): Implement this at some point.
727 return hashlib.sha1 782 return hashlib.sha1
728 783
(...skipping 573 matching lines...) Expand 10 before | Expand all | Expand 10 after
1302 sys.stderr.write(str(e)) 1357 sys.stderr.write(str(e))
1303 sys.stderr.write('\n') 1358 sys.stderr.write('\n')
1304 return 1 1359 return 1
1305 1360
1306 1361
1307 if __name__ == '__main__': 1362 if __name__ == '__main__':
1308 fix_encoding.fix_encoding() 1363 fix_encoding.fix_encoding()
1309 tools.disable_buffering() 1364 tools.disable_buffering()
1310 colorama.init() 1365 colorama.init()
1311 sys.exit(main(sys.argv[1:])) 1366 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698