Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(472)

Side by Side Diff: isolateserver.py

Issue 25093003: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: introduce Item class Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Archives a set of files to a server.""" 6 """Archives a set of files to a server."""
7 7
8 __version__ = '0.2' 8 __version__ = '0.2'
9 9
10 import binascii
11 import hashlib 10 import hashlib
12 import json 11 import json
13 import logging 12 import logging
14 import os 13 import os
15 import random
16 import re 14 import re
17 import sys 15 import sys
18 import threading 16 import threading
19 import time 17 import time
20 import urllib 18 import urllib
21 import zlib 19 import zlib
22 20
23 from third_party import colorama 21 from third_party import colorama
24 from third_party.depot_tools import fix_encoding 22 from third_party.depot_tools import fix_encoding
25 from third_party.depot_tools import subcommand 23 from third_party.depot_tools import subcommand
26 24
27 from utils import net 25 from utils import net
28 from utils import threading_utils 26 from utils import threading_utils
29 from utils import tools 27 from utils import tools
30 28
31 29
32 # The minimum size of files to upload directly to the blobstore. 30 # Version of isolate protocol passed to the server in /handshake request.
33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 31 ISOLATE_PROTOCOL_VERSION = '1.0'
34 32
35 # The number of files to check the isolate server per /contains query. 33
34 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content 35 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the 36 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files 37 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and sent to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], 38 # are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. The numbers here are a trade-off; the more per request, the lower the 39 # and so on. The numbers here are a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, 40 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start 41 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is 42 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousands files to look up with minimal number of large 43 # optimized for the "few thousands files to look up with minimal number of large
45 # files missing" case. 44 # files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] 45 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
47 46
48 47
49 # A list of already compressed extension types that should not receive any 48 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded. 49 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [ 50 ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', 51 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53 'wav', 'zip' 52 'wav', 'zip'
54 ] 53 ]
55 54
56 55
57 # The file size to be used when we don't know the correct file size, 56 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files. 57 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None 58 UNKNOWN_FILE_SIZE = None
60 59
61 60
62 # The size of each chunk to read when downloading and unzipping files. 61 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024 62 ZIPPED_FILE_CHUNK = 16 * 1024
64 63
65
66 # Chunk size to use when doing disk I/O. 64 # Chunk size to use when doing disk I/O.
67 DISK_FILE_CHUNK = 1024 * 1024 65 DISK_FILE_CHUNK = 1024 * 1024
68 66
67 # Chunk size to use when reading from network stream.
68 NET_IO_FILE_CHUNK = 16 * 1024
69
69 70
70 # Read timeout in seconds for downloads from isolate storage. If there's no 71 # Read timeout in seconds for downloads from isolate storage. If there's no
71 # response from the server within this timeout whole download will be aborted. 72 # response from the server within this timeout whole download will be aborted.
72 DOWNLOAD_READ_TIMEOUT = 60 73 DOWNLOAD_READ_TIMEOUT = 60
73 74
74 # Maximum expected delay (in seconds) between successive file fetches 75 # Maximum expected delay (in seconds) between successive file fetches
75 # in run_tha_test. If it takes longer than that, a deadlock might be happening 76 # in run_tha_test. If it takes longer than that, a deadlock might be happening
76 # and all stack frames for all threads are dumped to log. 77 # and all stack frames for all threads are dumped to log.
77 DEADLOCK_TIMEOUT = 5 * 60 78 DEADLOCK_TIMEOUT = 5 * 60
78 79
(...skipping 20 matching lines...) Expand all
99 class ConfigError(ValueError): 100 class ConfigError(ValueError):
100 """Generic failure to load a .isolated file.""" 101 """Generic failure to load a .isolated file."""
101 pass 102 pass
102 103
103 104
104 class MappingError(OSError): 105 class MappingError(OSError):
105 """Failed to recreate the tree.""" 106 """Failed to recreate the tree."""
106 pass 107 pass
107 108
108 109
109 def randomness():
110 """Generates low-entropy randomness for MIME encoding.
111
112 Exists so it can be mocked out in unit tests.
113 """
114 return str(time.time())
115
116
117 def encode_multipart_formdata(fields, files,
118 mime_mapper=lambda _: 'application/octet-stream'):
119 """Encodes a Multipart form data object.
120
121 Args:
122 fields: a sequence (name, value) elements for
123 regular form fields.
124 files: a sequence of (name, filename, value) elements for data to be
125 uploaded as files.
126 mime_mapper: function to return the mime type from the filename.
127 Returns:
128 content_type: for httplib.HTTP instance
129 body: for httplib.HTTP instance
130 """
131 boundary = hashlib.md5(randomness()).hexdigest()
132 body_list = []
133 for (key, value) in fields:
134 if isinstance(key, unicode):
135 value = key.encode('utf-8')
136 if isinstance(value, unicode):
137 value = value.encode('utf-8')
138 body_list.append('--' + boundary)
139 body_list.append('Content-Disposition: form-data; name="%s"' % key)
140 body_list.append('')
141 body_list.append(value)
142 body_list.append('--' + boundary)
143 body_list.append('')
144 for (key, filename, value) in files:
145 if isinstance(key, unicode):
146 value = key.encode('utf-8')
147 if isinstance(filename, unicode):
148 value = filename.encode('utf-8')
149 if isinstance(value, unicode):
150 value = value.encode('utf-8')
151 body_list.append('--' + boundary)
152 body_list.append('Content-Disposition: form-data; name="%s"; '
153 'filename="%s"' % (key, filename))
154 body_list.append('Content-Type: %s' % mime_mapper(filename))
155 body_list.append('')
156 body_list.append(value)
157 body_list.append('--' + boundary)
158 body_list.append('')
159 if body_list:
160 body_list[-2] += '--'
161 body = '\r\n'.join(body_list)
162 content_type = 'multipart/form-data; boundary=%s' % boundary
163 return content_type, body
164
165
166 def is_valid_hash(value, algo): 110 def is_valid_hash(value, algo):
167 """Returns if the value is a valid hash for the corresponding algorithm.""" 111 """Returns if the value is a valid hash for the corresponding algorithm."""
168 size = 2 * algo().digest_size 112 size = 2 * algo().digest_size
169 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) 113 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
170 114
171 115
172 def hash_file(filepath, algo): 116 def hash_file(filepath, algo):
173 """Calculates the hash of a file without reading it all in memory at once. 117 """Calculates the hash of a file without reading it all in memory at once.
174 118
175 |algo| should be one of hashlib hashing algorithms. 119 |algo| should be one of hashlib hashing algorithms.
176 """ 120 """
177 digest = algo() 121 digest = algo()
178 with open(filepath, 'rb') as f: 122 with open(filepath, 'rb') as f:
179 while True: 123 while True:
180 chunk = f.read(DISK_FILE_CHUNK) 124 chunk = f.read(DISK_FILE_CHUNK)
181 if not chunk: 125 if not chunk:
182 break 126 break
183 digest.update(chunk) 127 digest.update(chunk)
184 return digest.hexdigest() 128 return digest.hexdigest()
185 129
186 130
131 def stream_read(stream, chunk_size):
132 """Reads chunks from |stream| and yields them."""
133 while True:
134 data = stream.read(chunk_size)
135 if not data:
136 break
137 yield data
138
139
187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): 140 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
188 """Yields file content in chunks of given |chunk_size|.""" 141 """Yields file content in chunks of given |chunk_size|."""
189 with open(filepath, 'rb') as f: 142 with open(filepath, 'rb') as f:
190 while True: 143 while True:
191 data = f.read(chunk_size) 144 data = f.read(chunk_size)
192 if not data: 145 if not data:
193 break 146 break
194 yield data 147 yield data
195 148
196 149
(...skipping 22 matching lines...) Expand all
219 compressor = zlib.compressobj(level) 172 compressor = zlib.compressobj(level)
220 for chunk in content_generator: 173 for chunk in content_generator:
221 compressed = compressor.compress(chunk) 174 compressed = compressor.compress(chunk)
222 if compressed: 175 if compressed:
223 yield compressed 176 yield compressed
224 tail = compressor.flush(zlib.Z_FINISH) 177 tail = compressor.flush(zlib.Z_FINISH)
225 if tail: 178 if tail:
226 yield tail 179 yield tail
227 180
228 181
182 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
183 """Reads zipped data from |content_generator| and yields decompressed data.
184
185 Decompresses data in small chunks (no larger than |chunk_size|) so that
186 a zip bomb file doesn't cause zlib to preallocate a huge amount of memory.
187
188 Raises IOError if data is corrupted or incomplete.
189 """
190 decompressor = zlib.decompressobj()
191 compressed_size = 0
192 try:
193 for chunk in content_generator:
194 compressed_size += len(chunk)
195 data = decompressor.decompress(chunk, chunk_size)
196 if data:
197 yield data
198 while decompressor.unconsumed_tail:
199 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
200 if data:
201 yield data
202 tail = decompressor.flush()
203 if tail:
204 yield tail
205 except zlib.error as e:
206 raise IOError(
207 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
208 # Ensure all data was read and decompressed.
209 if decompressor.unused_data or decompressor.unconsumed_tail:
210 raise IOError('Not all data was decompressed')
211
212
229 def get_zip_compression_level(filename): 213 def get_zip_compression_level(filename):
230 """Given a filename calculates the ideal zip compression level to use.""" 214 """Given a filename calculates the ideal zip compression level to use."""
231 file_ext = os.path.splitext(filename)[1].lower() 215 file_ext = os.path.splitext(filename)[1].lower()
232 # TODO(csharp): Profile to find what compression level works best. 216 # TODO(csharp): Profile to find what compression level works best.
233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 217 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
234 218
235 219
236 def create_directories(base_directory, files): 220 def create_directories(base_directory, files):
237 """Creates the directory structure needed by the given list of files.""" 221 """Creates the directory structure needed by the given list of files."""
238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) 222 logging.debug('create_directories(%s, %d)', base_directory, len(files))
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
294 278
295 279
296 def try_remove(filepath): 280 def try_remove(filepath):
297 """Removes a file without crashing even if it doesn't exist.""" 281 """Removes a file without crashing even if it doesn't exist."""
298 try: 282 try:
299 os.remove(filepath) 283 os.remove(filepath)
300 except OSError: 284 except OSError:
301 pass 285 pass
302 286
303 287
304 def url_read(url, **kwargs): 288 class Item(object):
Vadim Sh. 2013/09/30 21:35:21 I decided to call it Item instead of File for two
M-A Ruel 2013/09/30 22:36:33 I like not using File but Item is a tad generic, b
305 result = net.url_read(url, **kwargs) 289 """An item to push to Storage.
306 if result is None: 290
307 # If we get no response from the server, assume it is down and raise an 291 It starts its life in a main thread, travels to 'contains' thread, then to
308 # exception. 292 'push' thread and then finally back to the main thread.
309 raise MappingError('Unable to connect to server %s' % url) 293
310 return result 294 It is never used concurrently from multiple threads.
295 """
296
297 def __init__(self, digest, size, is_isolated):
298 self.digest = digest
299 self.size = size
300 self.is_isolated = is_isolated
301 self.priority = WorkerPool.HIGH if self.is_isolated else WorkerPool.MED
302 self.compression_level = 6
303 self._push_state = None
304
305 @property
306 def push_state(self):
307 """StorageApi specific push state information."""
308 return self._push_state
309
310 @push_state.setter
311 def push_state(self, push_state):
312 """StorageApi specific push state information."""
313 assert self._push_state is None
314 self._push_state = push_state
315
316 def content(self, chunk_size):
317 """Generator that produces content of this item in chunks of given size."""
318 raise NotImplementedError()
319
320
321 class FileItem(Item):
322 """A file to push to Storage."""
323
324 def __init__(self, path, digest, size, is_isolated):
325 super(FileItem, self).__init__(digest, size, is_isolated)
326 self.path = path
327 self.compression_level = get_zip_compression_level(path)
328
329 def content(self, chunk_size):
330 return file_read(self.path, chunk_size)
311 331
312 332
313 class Storage(object): 333 class Storage(object):
314 """Efficiently downloads or uploads large set of files via StorageApi.""" 334 """Efficiently downloads or uploads large set of files via StorageApi."""
315 335
316 def __init__(self, storage_api, use_zip): 336 def __init__(self, storage_api, use_zip):
317 self.use_zip = use_zip 337 self.use_zip = use_zip
318 self._storage_api = storage_api 338 self._storage_api = storage_api
319 self._cpu_thread_pool = None 339 self._cpu_thread_pool = None
320 self._net_thread_pool = None 340 self._net_thread_pool = None
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
356 376
357 def upload_tree(self, indir, infiles): 377 def upload_tree(self, indir, infiles):
358 """Uploads the given tree to the isolate server. 378 """Uploads the given tree to the isolate server.
359 379
360 Arguments: 380 Arguments:
361 indir: root directory the infiles are based in. 381 indir: root directory the infiles are based in.
362 infiles: dict of files to upload from |indir|. 382 infiles: dict of files to upload from |indir|.
363 """ 383 """
364 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) 384 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
365 385
386 # TODO(vadimsh): Introduce Item as a part of the public interface?
387
388 # Convert |indir| + |infiles| into a list of FileItem objects.
389 # Filter out symlinks, since they are not represented by items on isolate
390 # server side.
391 items = [
392 FileItem(
393 path=os.path.join(indir, filepath),
394 digest=metadata['h'],
395 size=metadata['s'],
396 is_isolated=metadata.get('priority') == '0')
397 for filepath, metadata in infiles.iteritems()
398 if 'l' not in metadata
399 ]
400
366 # Enqueue all upload tasks. 401 # Enqueue all upload tasks.
402 missing = set()
367 channel = threading_utils.TaskChannel() 403 channel = threading_utils.TaskChannel()
368 missing = [] 404 for missing_item in self.get_missing_items(items):
369 for filename, metadata, push_urls in self.get_missing_files(infiles): 405 missing.add(missing_item)
370 missing.append((filename, metadata)) 406 self.async_push(channel, missing_item)
371 path = os.path.join(indir, filename)
372 if metadata.get('priority', '1') == '0':
373 priority = WorkerPool.HIGH
374 else:
375 priority = WorkerPool.MED
376 compression_level = get_zip_compression_level(path)
377 chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK
378 content = file_read(path, chunk_size)
379 self.async_push(channel, priority, metadata['h'], metadata['s'],
380 content, compression_level, push_urls)
381 407
382 # No need to spawn deadlock detector thread if there's nothing to upload. 408 # No need to spawn deadlock detector thread if there's nothing to upload.
383 if missing: 409 if missing:
384 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: 410 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
385 # Wait for all started uploads to finish. 411 # Wait for all started uploads to finish.
386 uploaded = 0 412 uploaded = 0
387 while uploaded != len(missing): 413 while uploaded != len(missing):
388 detector.ping() 414 detector.ping()
389 item = channel.pull() 415 item = channel.pull()
390 uploaded += 1 416 uploaded += 1
391 logging.debug('Uploaded %d / %d: %s', uploaded, len(missing), item) 417 logging.debug(
418 'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
392 logging.info('All files are uploaded') 419 logging.info('All files are uploaded')
393 420
394 # Print stats. 421 # Print stats.
395 total = len(infiles) 422 total = len(items)
396 total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues()) 423 total_size = sum(f.size for f in items)
397 logging.info( 424 logging.info(
398 'Total: %6d, %9.1fkb', 425 'Total: %6d, %9.1fkb',
399 total, 426 total,
400 sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.) 427 total_size / 1024.)
401 cache_hit = set(infiles.iterkeys()) - set(x[0] for x in missing) 428 cache_hit = set(items) - missing
402 cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit) 429 cache_hit_size = sum(f.size for f in cache_hit)
403 logging.info( 430 logging.info(
404 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', 431 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
405 len(cache_hit), 432 len(cache_hit),
406 cache_hit_size / 1024., 433 cache_hit_size / 1024.,
407 len(cache_hit) * 100. / total, 434 len(cache_hit) * 100. / total,
408 cache_hit_size * 100. / total_size if total_size else 0) 435 cache_hit_size * 100. / total_size if total_size else 0)
409 cache_miss = missing 436 cache_miss = missing
410 cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss) 437 cache_miss_size = sum(f.size for f in cache_miss)
411 logging.info( 438 logging.info(
412 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', 439 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
413 len(cache_miss), 440 len(cache_miss),
414 cache_miss_size / 1024., 441 cache_miss_size / 1024.,
415 len(cache_miss) * 100. / total, 442 len(cache_miss) * 100. / total,
416 cache_miss_size * 100. / total_size if total_size else 0) 443 cache_miss_size * 100. / total_size if total_size else 0)
417 444
418 def async_push(self, channel, priority, item, expected_size, 445 def async_push(self, channel, item):
419 content_generator, compression_level, push_urls=None): 446 """Starts asynchronous push to the server in a parallel thread.
420 """Starts asynchronous push to the server in a parallel thread.""" 447
448 Arguments:
449 channel: TaskChannel object that receives back |item| when upload ends.
450 item: item to upload as instance of Item class.
451 """
421 def push(content, size): 452 def push(content, size):
422 """Pushes an item and returns its id, to pass as a result to |channel|.""" 453 """Pushes an item and returns its id, to pass as a result to |channel|."""
423 self._storage_api.push(item, size, content, push_urls) 454 self._storage_api.push(item, content, size)
424 return item 455 return item
425 456
426 # If zipping is not required, just start a push task. 457 # If zipping is not required, just start a push task.
427 if not self.use_zip: 458 if not self.use_zip:
428 self.net_thread_pool.add_task_with_channel(channel, priority, push, 459 self.net_thread_pool.add_task_with_channel(channel, item.priority, push,
429 content_generator, expected_size) 460 item.content(DISK_FILE_CHUNK), item.size)
430 return 461 return
431 462
432 # If zipping is enabled, zip in a separate thread. 463 # If zipping is enabled, zip in a separate thread.
433 def zip_and_push(): 464 def zip_and_push():
434 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble 465 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
435 # content right here. It will block until the whole file is zipped. 466 # content right here. It will block until the whole file is zipped.
436 try: 467 try:
437 stream = zip_compress(content_generator, compression_level) 468 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
469 item.compression_level)
438 data = ''.join(stream) 470 data = ''.join(stream)
439 except Exception as exc: 471 except Exception as exc:
440 logging.error('Failed to zip \'%s\': %s', item, exc) 472 logging.error('Failed to zip \'%s\': %s', item, exc)
441 channel.send_exception(exc) 473 channel.send_exception(exc)
442 return 474 return
443 self.net_thread_pool.add_task_with_channel(channel, priority, push, 475 self.net_thread_pool.add_task_with_channel(channel, item.priority, push,
444 [data], UNKNOWN_FILE_SIZE) 476 [data], UNKNOWN_FILE_SIZE)
445 self.cpu_thread_pool.add_task(0, zip_and_push) 477 self.cpu_thread_pool.add_task(item.priority, zip_and_push)
446 478
447 def get_missing_files(self, files): 479 def get_missing_items(self, items):
448 """Yields files that are missing from the server. 480 """Yields items that are missing from the server.
449 481
450 Issues multiple parallel queries via StorageApi's 'contains' method. 482 Issues multiple parallel queries via StorageApi's 'contains' method.
451 483
452 Arguments: 484 Arguments:
453 files: a dictionary file name -> metadata dict. 485 items: a list of Item objects to check.
454 486
455 Yields: 487 Yields:
456 Triplets (file name, metadata dict, push_urls object to pass to push). 488 Item objects that are missing from the server.
457 """ 489 """
458 channel = threading_utils.TaskChannel() 490 channel = threading_utils.TaskChannel()
459 pending = 0 491 pending = 0
460 # Enqueue all requests. 492 # Enqueue all requests.
461 for batch in self.batch_files_for_check(files): 493 for batch in self.batch_items_for_check(items):
462 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, 494 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
463 self._storage_api.contains, batch) 495 self._storage_api.contains, batch)
464 pending += 1 496 pending += 1
465 # Yield results as they come in. 497 # Yield results as they come in.
466 for _ in xrange(pending): 498 for _ in xrange(pending):
467 for missing in channel.pull(): 499 for missing in channel.pull():
468 yield missing 500 yield missing
469 501
470 @staticmethod 502 @staticmethod
471 def batch_files_for_check(files): 503 def batch_items_for_check(items):
472 """Splits list of files to check for existence on the server into batches. 504 """Splits list of items to check for existence on the server into batches.
473 505
474 Each batch corresponds to a single 'exists?' query to the server via a call 506 Each batch corresponds to a single 'exists?' query to the server via a call
475 to StorageApi's 'contains' method. 507 to StorageApi's 'contains' method.
476 508
477 Arguments: 509 Arguments:
478 files: a dictionary file name -> metadata dict. 510 items: a list of Item objects.
479 511
480 Yields: 512 Yields:
481 Batches of files to query for existence in a single operation, 513 Batches of items to query for existence in a single operation,
482 each batch is a list of pairs: (file name, metadata dict). 514 each batch is a list of Item objects.
483 """ 515 """
484 batch_count = 0 516 batch_count = 0
485 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] 517 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
486 next_queries = [] 518 next_queries = []
487 items = ((k, v) for k, v in files.iteritems() if 's' in v) 519 for item in sorted(items, key=lambda x: x.size, reverse=True):
488 for filename, metadata in sorted(items, key=lambda x: -x[1]['s']): 520 next_queries.append(item)
489 next_queries.append((filename, metadata))
490 if len(next_queries) == batch_size_limit: 521 if len(next_queries) == batch_size_limit:
491 yield next_queries 522 yield next_queries
492 next_queries = [] 523 next_queries = []
493 batch_count += 1 524 batch_count += 1
494 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[ 525 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
495 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)] 526 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
496 if next_queries: 527 if next_queries:
497 yield next_queries 528 yield next_queries
498 529
499 530
500 class StorageApi(object): 531 class StorageApi(object):
501 """Interface for classes that implement low-level storage operations.""" 532 """Interface for classes that implement low-level storage operations."""
502 533
503 def fetch(self, item, expected_size): 534 # TODO(vadimsh): Make 'fetch' use Item abstraction as well.
535
536 def fetch(self, digest, size):
504 """Fetches an object and yields its content. 537 """Fetches an object and yields its content.
505 538
506 Arguments: 539 Arguments:
507 item: hash digest of item to download. 540 digest: hash digest of item to download.
508 expected_size: expected size of the item, to validate it. 541 size: expected size of the item, to validate it.
509 542
510 Yields: 543 Yields:
511 Chunks of downloaded item (as str objects). 544 Chunks of downloaded item (as str objects).
512 """ 545 """
513 raise NotImplementedError() 546 raise NotImplementedError()
514 547
515 def push(self, item, expected_size, content_generator, push_urls=None): 548 def push(self, item, content, size):
516 """Uploads content generated by |content_generator| as |item|. 549 """Uploads an |item| with content generated by |content| generator.
517 550
518 Arguments: 551 Arguments:
519 item: hash digest of item to upload. 552 item: Item object that holds information about an item being pushed.
520 expected_size: total length of the content yielded by |content_generator|. 553 content: a generator that yields chunks to push.
521 content_generator: generator that produces chunks to push. 554 size: expected size of stream produced by |content|.
522 push_urls: optional URLs returned by 'contains' call for this item.
523 555
524 Returns: 556 Returns:
525 None. 557 None.
526 """ 558 """
527 raise NotImplementedError() 559 raise NotImplementedError()
528 560
529 def contains(self, files): 561 def contains(self, items):
530 """Checks for existence of given |files| on the server. 562 """Checks for existence of given |items| on the server.
563
564 Can also mutate the state of the items by assigning an opaque implementation
565 specific object to Item's push_state attribute.
531 566
532 Arguments: 567 Arguments:
533 files: list of pairs (file name, metadata dict). 568 items: list of Item objects.
534 569
535 Returns: 570 Returns:
536 A list of files missing on server as a list of triplets 571 A list of items missing on server as a list of Item objects.
537 (file name, metadata dict, push_urls object to pass to push).
538 """ 572 """
539 raise NotImplementedError() 573 raise NotImplementedError()
540 574
541 575
542 class IsolateServer(StorageApi): 576 class IsolateServer(StorageApi):
543 """StorageApi implementation that downloads and uploads to Isolate Server.""" 577 """StorageApi implementation that downloads and uploads to Isolate Server.
578
579 It uploads and downloads directly from Google Storage whenever appropriate.
580 """
581
582 class PushState(object):
583 """A state of push operation carried along with Item in push_state."""
584 def __init__(self, upload_url, finalize_url):
585 self.upload_url = upload_url
586 self.finalize_url = finalize_url
587 self.uploaded = False
588 self.finalized = False
589
544 def __init__(self, base_url, namespace): 590 def __init__(self, base_url, namespace):
545 super(IsolateServer, self).__init__() 591 super(IsolateServer, self).__init__()
546 assert base_url.startswith('http'), base_url 592 assert base_url.startswith('http'), base_url
547 self.content_url = base_url.rstrip('/') + '/content/' 593 self.base_url = base_url.rstrip('/')
548 self.namespace = namespace 594 self.namespace = namespace
549 self.algo = get_hash_algo(namespace) 595 self.algo = get_hash_algo(namespace)
550 self._token = None 596 self._use_zip = is_namespace_with_compression(namespace)
551 self._lock = threading.Lock() 597 self._lock = threading.Lock()
598 self._server_caps = None
599
600 @staticmethod
601 def generate_handshake_request():
602 """Returns a dict to be sent as handshake request body."""
603 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
604 return {
605 'client_app_version': __version__,
606 'fetcher': True,
607 'protocol_version': ISOLATE_PROTOCOL_VERSION,
608 'pusher': True,
609 }
610
611 @staticmethod
612 def validate_handshake_response(caps):
613 """Validates and normalizes handshake response."""
614 logging.info('Protocol version: %s', caps['protocol_version'])
615 logging.info('Server version: %s', caps['server_app_version'])
616 if caps.get('error'):
617 raise MappingError(caps['error'])
618 if not caps['access_token']:
619 raise ValueError('access_token is missing')
620 return caps
552 621
553 @property 622 @property
554 def token(self): 623 def server_capabilities(self):
624 """Performs handshake with the server if not yet done.
625
626 Returns:
627 Server capabilities dictionary as returned by /handshake endpoint.
628
629 Raises:
630 MappingError if server rejects the handshake.
631 """
555 # TODO(maruel): Make this request much earlier asynchronously while the 632 # TODO(maruel): Make this request much earlier asynchronously while the
556 # files are being enumerated. 633 # files are being enumerated.
557 with self._lock: 634 with self._lock:
558 if not self._token: 635 if self._server_caps is None:
559 self._token = urllib.quote(url_read(self.content_url + 'get_token')) 636 request_body = json.dumps(
560 return self._token 637 self.generate_handshake_request(), separators=(',', ':'))
561 638 response = net.url_read(
562 def fetch(self, item, expected_size): 639 url=self.base_url + '/content-gs/handshake',
563 assert isinstance(item, basestring) 640 data=request_body,
564 assert ( 641 content_type='application/json',
565 isinstance(expected_size, (int, long)) or 642 method='POST')
566 expected_size == UNKNOWN_FILE_SIZE) 643 if response is None:
567 zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) 644 raise MappingError('Failed to perform handshake.')
568 logging.debug('download_file(%s)', zipped_url) 645 try:
646 caps = json.loads(response)
647 if not isinstance(caps, dict):
648 raise ValueError('Expecting JSON dict')
649 self._server_caps = self.validate_handshake_response(caps)
650 except (ValueError, KeyError, TypeError) as exc:
651 # KeyError exception has very confusing str conversion: it's just a
652 # missing key value and nothing else. So print exception class name
653 # as well.
654 raise MappingError('Invalid handshake response (%s): %s' % (
655 exc.__class__.__name__, exc))
656 return self._server_caps
657
658 def fetch(self, digest, size):
659 assert isinstance(digest, basestring)
660 assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE)
661
662 source_url = '%s/content-gs/retrieve/%s/%s' % (
663 self.base_url, self.namespace, digest)
664 logging.debug('download_file(%s)', source_url)
569 665
570 # Because the app engine DB is only eventually consistent, retry 404 errors 666 # Because the app engine DB is only eventually consistent, retry 404 errors
571 # because the file might just not be visible yet (even though it has been 667 # because the file might just not be visible yet (even though it has been
572 # uploaded). 668 # uploaded).
573 connection = net.url_open( 669 connection = net.url_open(
574 zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) 670 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
575 if not connection: 671 if not connection:
576 raise IOError('Unable to open connection to %s' % zipped_url) 672 raise IOError('Unable to open connection to %s' % source_url)
577 673
578 # TODO(maruel): Must only decompress when needed.
579 decompressor = zlib.decompressobj()
580 try: 674 try:
581 compressed_size = 0 675 # Prepare reading pipeline.
582 decompressed_size = 0 676 generator = stream_read(connection, NET_IO_FILE_CHUNK)
583 while True: 677 if self._use_zip:
584 chunk = connection.read(ZIPPED_FILE_CHUNK) 678 generator = zip_decompress(generator, DISK_FILE_CHUNK)
585 if not chunk: 679
586 break 680 # Read and yield data, calculate total length of the decompressed stream.
587 compressed_size += len(chunk) 681 total_size = 0
588 decompressed = decompressor.decompress(chunk) 682 for chunk in generator:
589 decompressed_size += len(decompressed) 683 total_size += len(chunk)
590 yield decompressed 684 yield chunk
591 685
592 # Ensure that all the data was properly decompressed. 686 # Verify data length matches expectation.
593 uncompressed_data = decompressor.flush() 687 if size != UNKNOWN_FILE_SIZE and total_size != size:
594 if uncompressed_data: 688 raise IOError('Incorrect file size: expected %d, got %d' % (
595 raise IOError('Decompression failed') 689 size, total_size))
596 if (expected_size != UNKNOWN_FILE_SIZE and 690
597 decompressed_size != expected_size): 691 except IOError as err:
598 raise IOError('File incorrect size after download of %s. Got %s and ' 692 logging.warning('Failed to fetch %s: %s', digest, err)
599 'expected %s' % (item, decompressed_size, expected_size)) 693 raise
600 except zlib.error as e: 694
601 msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % ( 695 def push(self, item, content, size):
602 item, compressed_size, connection.content_length, e) 696 assert isinstance(item, Item)
603 logging.warning(msg) 697 assert isinstance(item.push_state, IsolateServer.PushState)
604 698 assert not item.push_state.finalized
605 # Testing seems to show that if a few machines are trying to download
606 # the same blob, they can cause each other to fail. So if we hit a zip
607 # error, this is the most likely cause (it only downloads some of the
608 # data). Randomly sleep for between 5 and 25 seconds to try and spread
609 # out the downloads.
610 sleep_duration = (random.random() * 20) + 5
611 time.sleep(sleep_duration)
612 raise IOError(msg)
613
614 def push(self, item, expected_size, content_generator, push_urls=None):
615 assert isinstance(item, basestring)
616 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
617 item = str(item)
618 699
619 # TODO(maruel): Support large files. This would require streaming support. 700 # TODO(maruel): Support large files. This would require streaming support.
620 701
621 # A cheese way to avoid memcpy of (possibly huge) file, until streaming 702 # TODO(vadimsh): Do not read from |content| when retrying push.
622 # upload support is implemented. 703 # If |content| is indeed a generator, it cannot be rewound
623 if isinstance(content_generator, list) and len(content_generator) == 1: 704 # to the beginning of the stream. A retry will find it exhausted. A possible
624 content = content_generator[0] 705 # solution is to wrap |content| with some sort of caching
706 # restartable generator. It should be done alongside streaming support
707 # implementation.
708
709 # This push operation may be a retry after failed finalization call below,
710 # no need to reupload contents in that case.
711 if not item.push_state.uploaded:
712 # A cheese way to avoid memcpy of (possibly huge) file, until streaming
713 # upload support is implemented.
714 if isinstance(content, list) and len(content) == 1:
715 content = content[0]
716 else:
717 content = ''.join(content)
718 # PUT file to |upload_url|.
719 response = net.url_read(
720 url=item.push_state.upload_url,
721 data=content,
722 content_type='application/octet-stream',
723 method='PUT')
724 if response is None:
725 raise IOError('Failed to upload a file %s to %s' % (
726 item.digest, item.push_state.upload_url))
727 item.push_state.uploaded = True
625 else: 728 else:
626 content = ''.join(content_generator) 729 logging.info(
627 730 'A file %s already uploaded, retrying finalization only', item.digest)
628 if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: 731
629 return self._upload_hash_content_to_blobstore(item, content) 732 # Optionally notify the server that it's done.
630 733 if item.push_state.finalize_url:
631 url = '%sstore/%s/%s?token=%s' % ( 734 # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to
632 self.content_url, self.namespace, item, self.token) 735 # isolated server. That way isolate server can verify that the data safely
633 return url_read(url, data=content, content_type='application/octet-stream') 736 # reached Google Storage (GS provides MD5 of stored files).
634 737 response = net.url_read(
635 def contains(self, files): 738 url=item.push_state.finalize_url,
636 logging.info('Checking existence of %d files...', len(files)) 739 data='',
637 740 content_type='application/json',
638 body = ''.join( 741 method='POST')
639 (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) 742 if response is None:
640 assert (len(body) % self.algo().digest_size) == 0, repr(body) 743 raise IOError('Failed to finalize an upload of %s' % item.digest)
641 744 item.push_state.finalized = True
642 query_url = '%scontains/%s?token=%s' % ( 745
643 self.content_url, self.namespace, self.token) 746 def contains(self, items):
644 response = url_read( 747 logging.info('Checking existence of %d files...', len(items))
645 query_url, data=body, content_type='application/octet-stream') 748
646 if len(files) != len(response): 749 # Request body is a json encoded list of dicts.
750 body = [
751 {
752 'h': item.digest,
753 's': item.size,
754 'i': int(item.is_isolated),
755 } for item in items
756 ]
757
758 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
759 self.base_url,
760 self.namespace,
761 urllib.quote(self.server_capabilities['access_token']))
762 response_body = net.url_read(
763 url=query_url,
764 data=json.dumps(body, separators=(',', ':')),
765 content_type='application/json',
766 method='POST')
767 if response_body is None:
768 raise MappingError('Failed to execute /pre-upload query')
769
770 # Response body is a list of push_urls (or null if file is already present).
771 try:
772 response = json.loads(response_body)
773 if not isinstance(response, list):
774 raise ValueError('Expecting response with json-encoded list')
775 if len(response) != len(items):
776 raise ValueError(
777 'Incorrect number of items in the list, expected %d, '
778 'but got %d' % (len(items), len(response)))
779 except ValueError as err:
647 raise MappingError( 780 raise MappingError(
648 'Got an incorrect number of responses from the server. Expected %d, ' 781 'Invalid response from server: %s, body is %s' % (err, response_body))
649 'but got %d' % (len(files), len(response))) 782
650 783 # Pick Items that are missing, attach PushState to them.
651 # This implementation of IsolateServer doesn't use push_urls field, 784 missing_items = []
652 # set it to None. 785 for i, push_urls in enumerate(response):
653 missing_files = [ 786 if push_urls:
654 files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' 787 assert len(push_urls) == 2, str(push_urls)
655 ] 788 item = items[i]
789 item.push_state = IsolateServer.PushState(push_urls[0], push_urls[1])
790 missing_items.append(item)
656 logging.info('Queried %d files, %d cache hit', 791 logging.info('Queried %d files, %d cache hit',
657 len(files), len(files) - len(missing_files)) 792 len(items), len(items) - len(missing_items))
658 return missing_files 793 return missing_items
659
660 def _upload_hash_content_to_blobstore(self, item, content):
661 """Uploads the content directly to the blobstore via a generated url."""
662 # TODO(maruel): Support large files. This would require streaming support.
663 gen_url = '%sgenerate_blobstore_url/%s/%s' % (
664 self.content_url, self.namespace, item)
665 # Token is guaranteed to be already quoted but it is unnecessary here, and
666 # only here.
667 data = [('token', urllib.unquote(self.token))]
668 content_type, body = encode_multipart_formdata(
669 data, [('content', item, content)])
670 last_url = gen_url
671 for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
672 # Retry HTTP 50x here but not 404.
673 upload_url = net.url_read(gen_url, data=data)
674 if not upload_url:
675 raise MappingError('Unable to connect to server %s' % gen_url)
676 last_url = upload_url
677
678 # Do not retry this request on HTTP 50x. Regenerate an upload url each
679 # time since uploading "consumes" the upload url.
680 result = net.url_read(
681 upload_url, data=body, content_type=content_type, retry_50x=False)
682 if result is not None:
683 return result
684 raise MappingError('Unable to connect to server %s' % last_url)
685 794
686 795
687 class FileSystem(StorageApi): 796 class FileSystem(StorageApi):
688 """StorageApi implementation that fetches data from the file system. 797 """StorageApi implementation that fetches data from the file system.
689 798
690 The common use case is a NFS/CIFS file server that is mounted locally that is 799 The common use case is a NFS/CIFS file server that is mounted locally that is
691 used to fetch the file on a local partition. 800 used to fetch the file on a local partition.
692 """ 801 """
802
693 def __init__(self, base_path): 803 def __init__(self, base_path):
694 super(FileSystem, self).__init__() 804 super(FileSystem, self).__init__()
695 self.base_path = base_path 805 self.base_path = base_path
696 806
697 def fetch(self, item, expected_size): 807 def fetch(self, digest, size):
698 assert isinstance(item, basestring) 808 assert isinstance(digest, basestring)
699 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 809 assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
700 source = os.path.join(self.base_path, item) 810 source = os.path.join(self.base_path, digest)
701 if (expected_size != UNKNOWN_FILE_SIZE and 811 if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size):
702 not is_valid_file(source, expected_size)): 812 raise IOError('Invalid file %s' % digest)
703 raise IOError('Invalid file %s' % item)
704 return file_read(source) 813 return file_read(source)
705 814
706 def push(self, item, expected_size, content_generator, push_urls=None): 815 def push(self, item, content, size):
707 assert isinstance(item, basestring) 816 assert isinstance(item, Item)
708 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 817 assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
709 dest = os.path.join(self.base_path, item) 818 dest = os.path.join(self.base_path, item.digest)
710 total = file_write(dest, content_generator) 819 total = file_write(dest, content)
711 if expected_size != UNKNOWN_FILE_SIZE and total != expected_size: 820 if size != UNKNOWN_FILE_SIZE and total != size:
712 os.remove(dest) 821 os.remove(dest)
713 raise IOError( 822 raise IOError('Invalid file %s, %d != %d' % (item.digest, total, size))
714 'Invalid file %s, %d != %d' % (item, total, expected_size))
715 823
716 def contains(self, files): 824 def contains(self, items):
717 return [ 825 return [
718 (filename, metadata, None) 826 item for item in items
719 for filename, metadata in files 827 if not os.path.exists(os.path.join(self.base_path, item.digest))
720 if not os.path.exists(os.path.join(self.base_path, metadata['h']))
721 ] 828 ]
722 829
723 830
724 def get_hash_algo(_namespace): 831 def get_hash_algo(_namespace):
725 """Return hash algorithm class to use when uploading to given |namespace|.""" 832 """Return hash algorithm class to use when uploading to given |namespace|."""
726 # TODO(vadimsh): Implement this at some point. 833 # TODO(vadimsh): Implement this at some point.
727 return hashlib.sha1 834 return hashlib.sha1
728 835
729 836
730 def is_namespace_with_compression(namespace): 837 def is_namespace_with_compression(namespace):
(...skipping 29 matching lines...) Expand all
760 867
761 868
762 def upload_tree(base_url, indir, infiles, namespace): 869 def upload_tree(base_url, indir, infiles, namespace):
763 """Uploads the given tree to the given url. 870 """Uploads the given tree to the given url.
764 871
765 Arguments: 872 Arguments:
767 base_url: The base url, it is assumed that |base_url|/has/ can be used to 873 base_url: The base url, it is assumed that |base_url|/has/ can be used to
767 query if an element was already uploaded, and |base_url|/store/ 874 query if an element was already uploaded, and |base_url|/store/
768 can be used to upload a new element. 875 can be used to upload a new element.
769 indir: Root directory the infiles are based in. 876 indir: Root directory the infiles are based in.
770 infiles: dict of files to upload files from |indir| to |base_url|. 877 infiles: dict of files to upload from |indir| to |base_url|.
771 namespace: The namespace to use on the server. 878 namespace: The namespace to use on the server.
772 """ 879 """
773 remote = get_storage_api(base_url, namespace) 880 remote = get_storage_api(base_url, namespace)
774 with Storage(remote, is_namespace_with_compression(namespace)) as storage: 881 with Storage(remote, is_namespace_with_compression(namespace)) as storage:
775 storage.upload_tree(indir, infiles) 882 storage.upload_tree(indir, infiles)
776 return 0 883 return 0
777 884
778 885
779 class MemoryCache(object): 886 class MemoryCache(object):
780 """This class is intended to be usable everywhere the Cache class is. 887 """This class is intended to be usable everywhere the Cache class is.
(...skipping 521 matching lines...) Expand 10 before | Expand all | Expand 10 after
1302 sys.stderr.write(str(e)) 1409 sys.stderr.write(str(e))
1303 sys.stderr.write('\n') 1410 sys.stderr.write('\n')
1304 return 1 1411 return 1
1305 1412
1306 1413
1307 if __name__ == '__main__': 1414 if __name__ == '__main__':
1308 fix_encoding.fix_encoding() 1415 fix_encoding.fix_encoding()
1309 tools.disable_buffering() 1416 tools.disable_buffering()
1310 colorama.init() 1417 colorama.init()
1311 sys.exit(main(sys.argv[1:])) 1418 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698