Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(704)

Side by Side Diff: isolateserver.py

Issue 25093003: Client side implementation of new /content-gs isolate protocol. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: simplified a bit Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Archives a set of files to a server.""" 6 """Archives a set of files to a server."""
7 7
8 __version__ = '0.2' 8 __version__ = '0.2'
9 9
10 import binascii
11 import hashlib 10 import hashlib
12 import json 11 import json
13 import logging 12 import logging
14 import os 13 import os
15 import random
16 import re 14 import re
17 import sys 15 import sys
18 import threading 16 import threading
19 import time 17 import time
20 import urllib 18 import urllib
21 import zlib 19 import zlib
22 20
23 from third_party import colorama 21 from third_party import colorama
24 from third_party.depot_tools import fix_encoding 22 from third_party.depot_tools import fix_encoding
25 from third_party.depot_tools import subcommand 23 from third_party.depot_tools import subcommand
26 24
27 from utils import net 25 from utils import net
28 from utils import threading_utils 26 from utils import threading_utils
29 from utils import tools 27 from utils import tools
30 28
31 29
32 # The minimum size of files to upload directly to the blobstore. 30 # Version of isolate protocol passed to the server in /handshake request.
33 MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024 31 ISOLATE_PROTOCOL_VERSION = '1.0'
34 32
35 # The number of files to check the isolate server per /contains query. 33
34 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content 35 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the 36 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files 37 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and send to '/contains', then next ITEMS_PER_CONTAINS_QUERIES[1], 38 # are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. Numbers here is a trade-off; the more per request, the lower the 39 # and so on. Numbers here is a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand, 40 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start 41 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is 42 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousands files to look up with minimal number of large 43 # optimized for the "few thousands files to look up with minimal number of large
45 # files missing" case. 44 # files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100] 45 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
47 46
48 47
49 # A list of already compressed extension types that should not receive any 48 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded. 49 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [ 50 ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png', 51 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53 'wav', 'zip' 52 'wav', 'zip'
54 ] 53 ]
55 54
56 55
57 # The file size to be used when we don't know the correct file size, 56 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files. 57 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None 58 UNKNOWN_FILE_SIZE = None
60 59
61 60
62 # The size of each chunk to read when downloading and unzipping files. 61 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024 62 ZIPPED_FILE_CHUNK = 16 * 1024
64 63
65
66 # Chunk size to use when doing disk I/O. 64 # Chunk size to use when doing disk I/O.
67 DISK_FILE_CHUNK = 1024 * 1024 65 DISK_FILE_CHUNK = 1024 * 1024
68 66
67 # Chunk size to use when reading from network stream.
68 NET_IO_FILE_CHUNK = 16 * 1024
69
69 70
70 # Read timeout in seconds for downloads from isolate storage. If there's no 71 # Read timeout in seconds for downloads from isolate storage. If there's no
71 # response from the server within this timeout whole download will be aborted. 72 # response from the server within this timeout whole download will be aborted.
72 DOWNLOAD_READ_TIMEOUT = 60 73 DOWNLOAD_READ_TIMEOUT = 60
73 74
74 # Maximum expected delay (in seconds) between successive file fetches 75 # Maximum expected delay (in seconds) between successive file fetches
75 # in run_tha_test. If it takes longer than that, a deadlock might be happening 76 # in run_tha_test. If it takes longer than that, a deadlock might be happening
76 # and all stack frames for all threads are dumped to log. 77 # and all stack frames for all threads are dumped to log.
77 DEADLOCK_TIMEOUT = 5 * 60 78 DEADLOCK_TIMEOUT = 5 * 60
78 79
(...skipping 20 matching lines...) Expand all
99 class ConfigError(ValueError): 100 class ConfigError(ValueError):
100 """Generic failure to load a .isolated file.""" 101 """Generic failure to load a .isolated file."""
101 pass 102 pass
102 103
103 104
104 class MappingError(OSError): 105 class MappingError(OSError):
105 """Failed to recreate the tree.""" 106 """Failed to recreate the tree."""
106 pass 107 pass
107 108
108 109
109 def randomness():
110 """Generates low-entropy randomness for MIME encoding.
111
112 Exists so it can be mocked out in unit tests.
113 """
114 return str(time.time())
115
116
117 def encode_multipart_formdata(fields, files,
118 mime_mapper=lambda _: 'application/octet-stream'):
119 """Encodes a Multipart form data object.
120
121 Args:
122 fields: a sequence (name, value) elements for
123 regular form fields.
124 files: a sequence of (name, filename, value) elements for data to be
125 uploaded as files.
126 mime_mapper: function to return the mime type from the filename.
127 Returns:
128 content_type: for httplib.HTTP instance
129 body: for httplib.HTTP instance
130 """
131 boundary = hashlib.md5(randomness()).hexdigest()
132 body_list = []
133 for (key, value) in fields:
134 if isinstance(key, unicode):
135 value = key.encode('utf-8')
136 if isinstance(value, unicode):
137 value = value.encode('utf-8')
138 body_list.append('--' + boundary)
139 body_list.append('Content-Disposition: form-data; name="%s"' % key)
140 body_list.append('')
141 body_list.append(value)
142 body_list.append('--' + boundary)
143 body_list.append('')
144 for (key, filename, value) in files:
145 if isinstance(key, unicode):
146 value = key.encode('utf-8')
147 if isinstance(filename, unicode):
148 value = filename.encode('utf-8')
149 if isinstance(value, unicode):
150 value = value.encode('utf-8')
151 body_list.append('--' + boundary)
152 body_list.append('Content-Disposition: form-data; name="%s"; '
153 'filename="%s"' % (key, filename))
154 body_list.append('Content-Type: %s' % mime_mapper(filename))
155 body_list.append('')
156 body_list.append(value)
157 body_list.append('--' + boundary)
158 body_list.append('')
159 if body_list:
160 body_list[-2] += '--'
161 body = '\r\n'.join(body_list)
162 content_type = 'multipart/form-data; boundary=%s' % boundary
163 return content_type, body
164
165
166 def is_valid_hash(value, algo): 110 def is_valid_hash(value, algo):
167 """Returns if the value is a valid hash for the corresponding algorithm.""" 111 """Returns if the value is a valid hash for the corresponding algorithm."""
168 size = 2 * algo().digest_size 112 size = 2 * algo().digest_size
169 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value)) 113 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
170 114
171 115
172 def hash_file(filepath, algo): 116 def hash_file(filepath, algo):
173 """Calculates the hash of a file without reading it all in memory at once. 117 """Calculates the hash of a file without reading it all in memory at once.
174 118
175 |algo| should be one of hashlib hashing algorithm. 119 |algo| should be one of hashlib hashing algorithm.
176 """ 120 """
177 digest = algo() 121 digest = algo()
178 with open(filepath, 'rb') as f: 122 with open(filepath, 'rb') as f:
179 while True: 123 while True:
180 chunk = f.read(DISK_FILE_CHUNK) 124 chunk = f.read(DISK_FILE_CHUNK)
181 if not chunk: 125 if not chunk:
182 break 126 break
183 digest.update(chunk) 127 digest.update(chunk)
184 return digest.hexdigest() 128 return digest.hexdigest()
185 129
186 130
131 def stream_read(stream, chunk_size):
132 """Reads chunks from |stream| and yields them."""
133 while True:
134 data = stream.read(chunk_size)
135 if not data:
136 break
137 yield data
138
139
187 def file_read(filepath, chunk_size=DISK_FILE_CHUNK): 140 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
188 """Yields file content in chunks of given |chunk_size|.""" 141 """Yields file content in chunks of given |chunk_size|."""
189 with open(filepath, 'rb') as f: 142 with open(filepath, 'rb') as f:
190 while True: 143 while True:
191 data = f.read(chunk_size) 144 data = f.read(chunk_size)
192 if not data: 145 if not data:
193 break 146 break
194 yield data 147 yield data
195 148
196 149
(...skipping 22 matching lines...) Expand all
219 compressor = zlib.compressobj(level) 172 compressor = zlib.compressobj(level)
220 for chunk in content_generator: 173 for chunk in content_generator:
221 compressed = compressor.compress(chunk) 174 compressed = compressor.compress(chunk)
222 if compressed: 175 if compressed:
223 yield compressed 176 yield compressed
224 tail = compressor.flush(zlib.Z_FINISH) 177 tail = compressor.flush(zlib.Z_FINISH)
225 if tail: 178 if tail:
226 yield tail 179 yield tail
227 180
228 181
182 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
183 """Reads zipped data from |content_generator| and yields decompressed data.
184
185 Decompresses data in small chunks (no larger than |chunk_size|) so that
186 zip bomb file doesn't cause zlib to preallocate huge amount of memory.
187
188 Raises IOError if data is corrupted or incomplete.
189 """
190 decompressor = zlib.decompressobj()
191 compressed_size = 0
192 try:
193 for chunk in content_generator:
194 compressed_size += len(chunk)
195 data = decompressor.decompress(chunk, chunk_size)
196 if data:
197 yield data
198 while decompressor.unconsumed_tail:
199 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
200 if data:
201 yield data
202 tail = decompressor.flush()
203 if tail:
204 yield tail
205 except zlib.error as e:
206 raise IOError(
207 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
208 # Ensure all data was read and decompressed.
209 if decompressor.unused_data or decompressor.unconsumed_tail:
210 raise IOError('Not all data was decompressed')
211
212
229 def get_zip_compression_level(filename): 213 def get_zip_compression_level(filename):
230 """Given a filename calculates the ideal zip compression level to use.""" 214 """Given a filename calculates the ideal zip compression level to use."""
231 file_ext = os.path.splitext(filename)[1].lower() 215 file_ext = os.path.splitext(filename)[1].lower()
232 # TODO(csharp): Profile to find what compression level works best. 216 # TODO(csharp): Profile to find what compression level works best.
233 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 217 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
234 218
235 219
236 def create_directories(base_directory, files): 220 def create_directories(base_directory, files):
237 """Creates the directory structure needed by the given list of files.""" 221 """Creates the directory structure needed by the given list of files."""
238 logging.debug('create_directories(%s, %d)', base_directory, len(files)) 222 logging.debug('create_directories(%s, %d)', base_directory, len(files))
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
294 278
295 279
296 def try_remove(filepath): 280 def try_remove(filepath):
297 """Removes a file without crashing even if it doesn't exist.""" 281 """Removes a file without crashing even if it doesn't exist."""
298 try: 282 try:
299 os.remove(filepath) 283 os.remove(filepath)
300 except OSError: 284 except OSError:
301 pass 285 pass
302 286
303 287
304 def url_read(url, **kwargs): 288 class Item(object):
305 result = net.url_read(url, **kwargs) 289 """An item to push to Storage.
306 if result is None: 290
307 # If we get no response from the server, assume it is down and raise an 291 It starts its life in a main thread, travels to 'contains' thread, then to
308 # exception. 292 'push' thread and then finally back to the main thread.
309 raise MappingError('Unable to connect to server %s' % url) 293
310 return result 294 It is never used concurrently from multiple threads.
295 """
296
297 def __init__(self, digest, size, is_isolated):
298 self.digest = digest
299 self.size = size
300 self.is_isolated = is_isolated
301 self.compression_level = 6
302 self.push_state = None
303
304 def content(self, chunk_size):
305 """Generator that produces content of this item in chunks of given size."""
306 raise NotImplementedError()
307
308
309 class FileItem(Item):
310 """A file to push to Storage."""
311
312 def __init__(self, path, digest, size, is_isolated):
313 super(FileItem, self).__init__(digest, size, is_isolated)
314 self.path = path
315 self.compression_level = get_zip_compression_level(path)
316
317 def content(self, chunk_size):
318 return file_read(self.path, chunk_size)
311 319
312 320
313 class Storage(object): 321 class Storage(object):
314 """Efficiently downloads or uploads large set of files via StorageApi.""" 322 """Efficiently downloads or uploads large set of files via StorageApi."""
315 323
316 def __init__(self, storage_api, use_zip): 324 def __init__(self, storage_api, use_zip):
317 self.use_zip = use_zip 325 self.use_zip = use_zip
318 self._storage_api = storage_api 326 self._storage_api = storage_api
319 self._cpu_thread_pool = None 327 self._cpu_thread_pool = None
320 self._net_thread_pool = None 328 self._net_thread_pool = None
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
356 364
357 def upload_tree(self, indir, infiles): 365 def upload_tree(self, indir, infiles):
358 """Uploads the given tree to the isolate server. 366 """Uploads the given tree to the isolate server.
359 367
360 Arguments: 368 Arguments:
361 indir: root directory the infiles are based in. 369 indir: root directory the infiles are based in.
362 infiles: dict of files to upload from |indir|. 370 infiles: dict of files to upload from |indir|.
363 """ 371 """
364 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles)) 372 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
365 373
374 # TODO(vadimsh): Introduce Item as a part of the public interface?
M-A Ruel 2013/09/30 22:36:33 I think it'd be better if the caller created the i
Vadim Sh. 2013/09/30 23:07:25 Yes. I agree. I left it for future CLs.
375
376 # Convert |indir| + |infiles| into a list of FileItem objects.
377 # Filter out symlinks, since they are not represented by items on isolate
378 # server side.
379 items = [
380 FileItem(
381 path=os.path.join(indir, filepath),
382 digest=metadata['h'],
383 size=metadata['s'],
384 is_isolated=metadata.get('priority') == '0')
385 for filepath, metadata in infiles.iteritems()
386 if 'l' not in metadata
387 ]
388
366 # Enqueue all upload tasks. 389 # Enqueue all upload tasks.
390 missing = set()
367 channel = threading_utils.TaskChannel() 391 channel = threading_utils.TaskChannel()
368 missing = [] 392 for missing_item in self.get_missing_items(items):
369 for filename, metadata, push_urls in self.get_missing_files(infiles): 393 missing.add(missing_item)
370 missing.append((filename, metadata)) 394 self.async_push(
371 path = os.path.join(indir, filename) 395 channel,
372 if metadata.get('priority', '1') == '0': 396 WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
373 priority = WorkerPool.HIGH 397 missing_item)
374 else:
375 priority = WorkerPool.MED
376 compression_level = get_zip_compression_level(path)
377 chunk_size = ZIPPED_FILE_CHUNK if self.use_zip else DISK_FILE_CHUNK
378 content = file_read(path, chunk_size)
379 self.async_push(channel, priority, metadata['h'], metadata['s'],
380 content, compression_level, push_urls)
381 398
382 # No need to spawn deadlock detector thread if there's nothing to upload. 399 # No need to spawn deadlock detector thread if there's nothing to upload.
383 if missing: 400 if missing:
384 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: 401 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
385 # Wait for all started uploads to finish. 402 # Wait for all started uploads to finish.
386 uploaded = 0 403 uploaded = 0
387 while uploaded != len(missing): 404 while uploaded != len(missing):
388 detector.ping() 405 detector.ping()
389 item = channel.pull() 406 item = channel.pull()
390 uploaded += 1 407 uploaded += 1
391 logging.debug('Uploaded %d / %d: %s', uploaded, len(missing), item) 408 logging.debug(
409 'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
392 logging.info('All files are uploaded') 410 logging.info('All files are uploaded')
393 411
394 # Print stats. 412 # Print stats.
395 total = len(infiles) 413 total = len(items)
396 total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues()) 414 total_size = sum(f.size for f in items)
397 logging.info( 415 logging.info(
398 'Total: %6d, %9.1fkb', 416 'Total: %6d, %9.1fkb',
399 total, 417 total,
400 sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.) 418 total_size / 1024.)
401 cache_hit = set(infiles.iterkeys()) - set(x[0] for x in missing) 419 cache_hit = set(items) - missing
402 cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit) 420 cache_hit_size = sum(f.size for f in cache_hit)
403 logging.info( 421 logging.info(
404 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', 422 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
405 len(cache_hit), 423 len(cache_hit),
406 cache_hit_size / 1024., 424 cache_hit_size / 1024.,
407 len(cache_hit) * 100. / total, 425 len(cache_hit) * 100. / total,
408 cache_hit_size * 100. / total_size if total_size else 0) 426 cache_hit_size * 100. / total_size if total_size else 0)
409 cache_miss = missing 427 cache_miss = missing
410 cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss) 428 cache_miss_size = sum(f.size for f in cache_miss)
411 logging.info( 429 logging.info(
412 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size', 430 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
413 len(cache_miss), 431 len(cache_miss),
414 cache_miss_size / 1024., 432 cache_miss_size / 1024.,
415 len(cache_miss) * 100. / total, 433 len(cache_miss) * 100. / total,
416 cache_miss_size * 100. / total_size if total_size else 0) 434 cache_miss_size * 100. / total_size if total_size else 0)
417 435
418 def async_push(self, channel, priority, item, expected_size, 436 def async_push(self, channel, priority, item):
419 content_generator, compression_level, push_urls=None): 437 """Starts asynchronous push to the server in a parallel thread.
420 """Starts asynchronous push to the server in a parallel thread.""" 438
439 Arguments:
440 channel: TaskChannel object that receives back |item| when upload ends.
441 priority: thread pool task priority for the push.
442 item: item to upload as instance of Item class.
443 """
421 def push(content, size): 444 def push(content, size):
422 """Pushes an item and returns its id, to pass as a result to |channel|.""" 445 """Pushes an item and returns its id, to pass as a result to |channel|."""
423 self._storage_api.push(item, size, content, push_urls) 446 self._storage_api.push(item, content, size)
424 return item 447 return item
425 448
426 # If zipping is not required, just start a push task. 449 # If zipping is not required, just start a push task.
427 if not self.use_zip: 450 if not self.use_zip:
428 self.net_thread_pool.add_task_with_channel(channel, priority, push, 451 self.net_thread_pool.add_task_with_channel(channel, item.priority, push,
429 content_generator, expected_size) 452 item.content(DISK_FILE_CHUNK), item.size)
430 return 453 return
431 454
432 # If zipping is enabled, zip in a separate thread. 455 # If zipping is enabled, zip in a separate thread.
433 def zip_and_push(): 456 def zip_and_push():
434 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble 457 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
435 # content right here. It will block until all file is zipped. 458 # content right here. It will block until all file is zipped.
436 try: 459 try:
437 stream = zip_compress(content_generator, compression_level) 460 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
461 item.compression_level)
438 data = ''.join(stream) 462 data = ''.join(stream)
439 except Exception as exc: 463 except Exception as exc:
440 logging.error('Failed to zip \'%s\': %s', item, exc) 464 logging.error('Failed to zip \'%s\': %s', item, exc)
441 channel.send_exception(exc) 465 channel.send_exception(exc)
442 return 466 return
443 self.net_thread_pool.add_task_with_channel(channel, priority, push, 467 self.net_thread_pool.add_task_with_channel(channel, priority, push,
444 [data], UNKNOWN_FILE_SIZE) 468 [data], UNKNOWN_FILE_SIZE)
445 self.cpu_thread_pool.add_task(0, zip_and_push) 469 self.cpu_thread_pool.add_task(priority, zip_and_push)
446 470
447 def get_missing_files(self, files): 471 def get_missing_items(self, items):
448 """Yields files that are missing from the server. 472 """Yields items that are missing from the server.
449 473
450 Issues multiple parallel queries via StorageApi's 'contains' method. 474 Issues multiple parallel queries via StorageApi's 'contains' method.
451 475
452 Arguments: 476 Arguments:
453 files: a dictionary file name -> metadata dict. 477 items: a list of Item objects to check.
454 478
455 Yields: 479 Yields:
456 Triplets (file name, metadata dict, push_urls object to pass to push). 480 Item objects that are missing from the server.
457 """ 481 """
458 channel = threading_utils.TaskChannel() 482 channel = threading_utils.TaskChannel()
459 pending = 0 483 pending = 0
460 # Enqueue all requests. 484 # Enqueue all requests.
461 for batch in self.batch_files_for_check(files): 485 for batch in self.batch_items_for_check(items):
462 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH, 486 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
463 self._storage_api.contains, batch) 487 self._storage_api.contains, batch)
464 pending += 1 488 pending += 1
465 # Yield results as they come in. 489 # Yield results as they come in.
466 for _ in xrange(pending): 490 for _ in xrange(pending):
467 for missing in channel.pull(): 491 for missing in channel.pull():
468 yield missing 492 yield missing
469 493
470 @staticmethod 494 @staticmethod
471 def batch_files_for_check(files): 495 def batch_items_for_check(items):
472 """Splits list of files to check for existence on the server into batches. 496 """Splits list of items to check for existence on the server into batches.
473 497
474 Each batch corresponds to a single 'exists?' query to the server via a call 498 Each batch corresponds to a single 'exists?' query to the server via a call
475 to StorageApi's 'contains' method. 499 to StorageApi's 'contains' method.
476 500
477 Arguments: 501 Arguments:
478 files: a dictionary file name -> metadata dict. 502 items: a list of Item objects.
479 503
480 Yields: 504 Yields:
481 Batches of files to query for existence in a single operation, 505 Batches of items to query for existence in a single operation,
482 each batch is a list of pairs: (file name, metadata dict). 506 each batch is a list of Item objects.
483 """ 507 """
484 batch_count = 0 508 batch_count = 0
485 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0] 509 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
486 next_queries = [] 510 next_queries = []
487 items = ((k, v) for k, v in files.iteritems() if 's' in v) 511 for item in sorted(items, key=lambda x: x.size, reverse=True):
488 for filename, metadata in sorted(items, key=lambda x: -x[1]['s']): 512 next_queries.append(item)
489 next_queries.append((filename, metadata))
490 if len(next_queries) == batch_size_limit: 513 if len(next_queries) == batch_size_limit:
491 yield next_queries 514 yield next_queries
492 next_queries = [] 515 next_queries = []
493 batch_count += 1 516 batch_count += 1
494 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[ 517 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
495 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)] 518 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
496 if next_queries: 519 if next_queries:
497 yield next_queries 520 yield next_queries
498 521
499 522
500 class StorageApi(object): 523 class StorageApi(object):
501 """Interface for classes that implement low-level storage operations.""" 524 """Interface for classes that implement low-level storage operations."""
502 525
503 def fetch(self, item, expected_size): 526 # TODO(vadimsh): Make 'fetch' use Item abstraction as well.
M-A Ruel 2013/09/30 22:36:33 I don't think it's necessary. Personally I'd remov
Vadim Sh. 2013/09/30 23:07:25 Done.
527
528 def fetch(self, digest, size):
504 """Fetches an object and yields its content. 529 """Fetches an object and yields its content.
505 530
506 Arguments: 531 Arguments:
507 item: hash digest of item to download. 532 digest: hash digest of item to download.
508 expected_size: expected size of the item, to validate it. 533 size: expected size of the item, to validate it.
509 534
510 Yields: 535 Yields:
511 Chunks of downloaded item (as str objects). 536 Chunks of downloaded item (as str objects).
512 """ 537 """
513 raise NotImplementedError() 538 raise NotImplementedError()
514 539
515 def push(self, item, expected_size, content_generator, push_urls=None): 540 def push(self, item, content, size):
516 """Uploads content generated by |content_generator| as |item|. 541 """Uploads an |item| with content generated by |content| generator.
517 542
518 Arguments: 543 Arguments:
519 item: hash digest of item to upload. 544 item: Item object that holds information about an item being pushed.
520 expected_size: total length of the content yielded by |content_generator|. 545 content: a generator that yields chunks to push.
521 content_generator: generator that produces chunks to push. 546 size: expected size of stream produced by |content|.
522 push_urls: optional URLs returned by 'contains' call for this item.
523 547
524 Returns: 548 Returns:
525 None. 549 None.
526 """ 550 """
527 raise NotImplementedError() 551 raise NotImplementedError()
528 552
529 def contains(self, files): 553 def contains(self, items):
530 """Checks for existence of given |files| on the server. 554 """Checks for existence of given |items| on the server.
555
556 Can also mutated state of the items by assigning an opaque implementation
557 specific object to Item's push_state attribute.
531 558
532 Arguments: 559 Arguments:
533 files: list of pairs (file name, metadata dict). 560 items: list of Item objects.
534 561
535 Returns: 562 Returns:
536 A list of files missing on server as a list of triplets 563 A list of items missing on server as a list of Item objects.
537 (file name, metadata dict, push_urls object to pass to push).
538 """ 564 """
539 raise NotImplementedError() 565 raise NotImplementedError()
540 566
541 567
542 class IsolateServer(StorageApi): 568 class IsolateServer(StorageApi):
543 """StorageApi implementation that downloads and uploads to Isolate Server.""" 569 """StorageApi implementation that downloads and uploads to Isolate Server.
570
571 It uploads and downloads directly from Google Storage whenever appropriate.
572 """
573
574 class PushState(object):
575 """A state of push operation carried along with Item in push_state."""
576 def __init__(self, upload_url, finalize_url):
577 self.upload_url = upload_url
578 self.finalize_url = finalize_url
579 self.uploaded = False
580 self.finalized = False
581
544 def __init__(self, base_url, namespace): 582 def __init__(self, base_url, namespace):
545 super(IsolateServer, self).__init__() 583 super(IsolateServer, self).__init__()
546 assert base_url.startswith('http'), base_url 584 assert base_url.startswith('http'), base_url
547 self.content_url = base_url.rstrip('/') + '/content/' 585 self.base_url = base_url.rstrip('/')
548 self.namespace = namespace 586 self.namespace = namespace
549 self.algo = get_hash_algo(namespace) 587 self.algo = get_hash_algo(namespace)
550 self._token = None 588 self._use_zip = is_namespace_with_compression(namespace)
551 self._lock = threading.Lock() 589 self._lock = threading.Lock()
590 self._server_caps = None
591
592 @staticmethod
593 def generate_handshake_request():
594 """Returns a dict to be sent as handshake request body."""
595 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
596 return {
597 'client_app_version': __version__,
598 'fetcher': True,
599 'protocol_version': ISOLATE_PROTOCOL_VERSION,
600 'pusher': True,
601 }
602
603 @staticmethod
604 def validate_handshake_response(caps):
605 """Validates and normalizes handshake response."""
606 logging.info('Protocol version: %s', caps['protocol_version'])
607 logging.info('Server version: %s', caps['server_app_version'])
608 if caps.get('error'):
609 raise MappingError(caps['error'])
610 if not caps['access_token']:
611 raise ValueError('access_token is missing')
612 return caps
552 613
553 @property 614 @property
554 def token(self): 615 def server_capabilities(self):
616 """Performs handshake with the server if not yet done.
617
618 Returns:
619 Server capabilities dictionary as returned by /handshake endpoint.
620
621 Raises:
622 MappingError if server rejects the handshake.
623 """
555 # TODO(maruel): Make this request much earlier asynchronously while the 624 # TODO(maruel): Make this request much earlier asynchronously while the
556 # files are being enumerated. 625 # files are being enumerated.
557 with self._lock: 626 with self._lock:
558 if not self._token: 627 if self._server_caps is None:
559 self._token = urllib.quote(url_read(self.content_url + 'get_token')) 628 request_body = json.dumps(
560 return self._token 629 self.generate_handshake_request(), separators=(',', ':'))
561 630 response = net.url_read(
562 def fetch(self, item, expected_size): 631 url=self.base_url + '/content-gs/handshake',
563 assert isinstance(item, basestring) 632 data=request_body,
564 assert ( 633 content_type='application/json',
565 isinstance(expected_size, (int, long)) or 634 method='POST')
M-A Ruel 2013/09/30 22:36:33 optional nit: method is not strictly necessary
Vadim Sh. 2013/09/30 23:07:25 I keep it here for uniformity with rest of calls i
566 expected_size == UNKNOWN_FILE_SIZE) 635 if response is None:
567 zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item) 636 raise MappingError('Failed to perform handshake.')
568 logging.debug('download_file(%s)', zipped_url) 637 try:
638 caps = json.loads(response)
639 if not isinstance(caps, dict):
640 raise ValueError('Expecting JSON dict')
641 self._server_caps = self.validate_handshake_response(caps)
642 except (ValueError, KeyError, TypeError) as exc:
643 # KeyError exception has very confusing str conversion: it's just a
644 # missing key value and nothing else. So print exception class name
645 # as well.
646 raise MappingError('Invalid handshake response (%s): %s' % (
647 exc.__class__.__name__, exc))
648 return self._server_caps
649
650 def fetch(self, digest, size):
651 assert isinstance(digest, basestring)
652 assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE)
653
654 source_url = '%s/content-gs/retrieve/%s/%s' % (
655 self.base_url, self.namespace, digest)
656 logging.debug('download_file(%s)', source_url)
569 657
570 # Because the app engine DB is only eventually consistent, retry 404 errors 658 # Because the app engine DB is only eventually consistent, retry 404 errors
571 # because the file might just not be visible yet (even though it has been 659 # because the file might just not be visible yet (even though it has been
572 # uploaded). 660 # uploaded).
573 connection = net.url_open( 661 connection = net.url_open(
574 zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT) 662 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
575 if not connection: 663 if not connection:
576 raise IOError('Unable to open connection to %s' % zipped_url) 664 raise IOError('Unable to open connection to %s' % source_url)
577 665
578 # TODO(maruel): Must only decompress when needed.
579 decompressor = zlib.decompressobj()
580 try: 666 try:
581 compressed_size = 0 667 # Prepare reading pipeline.
582 decompressed_size = 0 668 generator = stream_read(connection, NET_IO_FILE_CHUNK)
583 while True: 669 if self._use_zip:
584 chunk = connection.read(ZIPPED_FILE_CHUNK) 670 generator = zip_decompress(generator, DISK_FILE_CHUNK)
585 if not chunk: 671
586 break 672 # Read and yield data, calculate total length of the decompressed stream.
587 compressed_size += len(chunk) 673 total_size = 0
588 decompressed = decompressor.decompress(chunk) 674 for chunk in generator:
589 decompressed_size += len(decompressed) 675 total_size += len(chunk)
590 yield decompressed 676 yield chunk
591 677
592 # Ensure that all the data was properly decompressed. 678 # Verify data length matches expectation.
593 uncompressed_data = decompressor.flush() 679 if size != UNKNOWN_FILE_SIZE and total_size != size:
594 if uncompressed_data: 680 raise IOError('Incorrect file size: expected %d, got %d' % (
595 raise IOError('Decompression failed') 681 size, total_size))
596 if (expected_size != UNKNOWN_FILE_SIZE and 682
597 decompressed_size != expected_size): 683 except IOError as err:
598 raise IOError('File incorrect size after download of %s. Got %s and ' 684 logging.warning('Failed to fetch %s: %s', digest, err)
599 'expected %s' % (item, decompressed_size, expected_size)) 685 raise
600 except zlib.error as e: 686
601 msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % ( 687 def push(self, item, content, size):
602 item, compressed_size, connection.content_length, e) 688 assert isinstance(item, Item)
603 logging.warning(msg) 689 assert isinstance(item.push_state, IsolateServer.PushState)
604 690 assert not item.push_state.finalized
605 # Testing seems to show that if a few machines are trying to download
606 # the same blob, they can cause each other to fail. So if we hit a zip
607 # error, this is the most likely cause (it only downloads some of the
608 # data). Randomly sleep for between 5 and 25 seconds to try and spread
609 # out the downloads.
610 sleep_duration = (random.random() * 20) + 5
611 time.sleep(sleep_duration)
612 raise IOError(msg)
613
614 def push(self, item, expected_size, content_generator, push_urls=None):
615 assert isinstance(item, basestring)
616 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE
617 item = str(item)
618 691
619 # TODO(maruel): Support large files. This would require streaming support. 692 # TODO(maruel): Support large files. This would require streaming support.
M-A Ruel 2013/09/30 22:36:33 I think you can remove the todo.
Vadim Sh. 2013/09/30 23:07:25 Done.
620 693
621 # A cheese way to avoid memcpy of (possibly huge) file, until streaming 694 # TODO(vadimsh): Do not read from |content_generator| when retrying push.
622 # upload support is implemented. 695 # If |content_generator| is indeed a generator, it can not be re-winded back
623 if isinstance(content_generator, list) and len(content_generator) == 1: 696 # to the beginning of the stream. A retry will find it exhausted. A possible
624 content = content_generator[0] 697 # solution is to wrap content_generator with some sort of caching
698 # restartable generator. It should be done alongside streaming support
699 # implementation.
700
701 # This push operation may be a retry after failed finalization call below,
702 # no need to reupload contents in that case.
703 if not item.push_state.uploaded:
704 # A cheese way to avoid memcpy of (possibly huge) file, until streaming
M-A Ruel 2013/09/30 22:36:33 s/cheese/cheezy/ :)
Vadim Sh. 2013/09/30 23:07:25 Done.
705 # upload support is implemented.
706 if isinstance(content, list) and len(content) == 1:
707 content = content[0]
708 else:
709 content = ''.join(content)
710 # PUT file to |upload_url|.
711 response = net.url_read(
712 url=item.push_state.upload_url,
713 data=content,
714 content_type='application/octet-stream',
715 method='PUT')
716 if response is None:
717 raise IOError('Failed to upload a file %s to %s' % (
718 item.digest, item.push_state.upload_url))
719 item.push_state.uploaded = True
625 else: 720 else:
626 content = ''.join(content_generator) 721 logging.info(
627 722 'A file %s already uploaded, retrying finalization only', item.digest)
628 if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE: 723
629 return self._upload_hash_content_to_blobstore(item, content) 724 # Optionally notify the server that it's done.
630 725 if item.push_state.finalize_url:
631 url = '%sstore/%s/%s?token=%s' % ( 726 # TODO(vadimsh): Calculate MD5 sum while uploading a file and send it to
M-A Ruel 2013/09/30 22:36:33 IIRC, Google Storage only calculates the MD5 for s
Vadim Sh. 2013/09/30 23:07:25 Hmm... Docs say CRC32 is available for all objects
632 self.content_url, self.namespace, item, self.token) 727 # isolated server. That way isolate server can verify that the data safely
633 return url_read(url, data=content, content_type='application/octet-stream') 728 # reached Google Storage (GS provides MD5 of stored files).
634 729 response = net.url_read(
635 def contains(self, files): 730 url=item.push_state.finalize_url,
636 logging.info('Checking existence of %d files...', len(files)) 731 data='',
637 732 content_type='application/json',
638 body = ''.join( 733 method='POST')
639 (binascii.unhexlify(metadata['h']) for (_, metadata) in files)) 734 if response is None:
640 assert (len(body) % self.algo().digest_size) == 0, repr(body) 735 raise IOError('Failed to finalize an upload of %s' % item.digest)
641 736 item.push_state.finalized = True
642 query_url = '%scontains/%s?token=%s' % ( 737
643 self.content_url, self.namespace, self.token) 738 def contains(self, items):
644 response = url_read( 739 logging.info('Checking existence of %d files...', len(items))
645 query_url, data=body, content_type='application/octet-stream') 740
646 if len(files) != len(response): 741 # Request body is a json encoded list of dicts.
742 body = [
743 {
744 'h': item.digest,
745 's': item.size,
746 'i': int(item.is_isolated),
747 } for item in items
748 ]
749
750 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
751 self.base_url,
752 self.namespace,
753 urllib.quote(self.server_capabilities['access_token']))
754 response_body = net.url_read(
755 url=query_url,
756 data=json.dumps(body, separators=(',', ':')),
757 content_type='application/json',
758 method='POST')
759 if response_body is None:
760 raise MappingError('Failed to execute /pre-upload query')
761
762 # Response body is a list of push_urls (or null if file is already present).
763 try:
764 response = json.loads(response_body)
765 if not isinstance(response, list):
766 raise ValueError('Expecting response with json-encoded list')
767 if len(response) != len(items):
768 raise ValueError(
769 'Incorrect number of items in the list, expected %d, '
770 'but got %d' % (len(files), len(response)))
771 except ValueError as err:
647 raise MappingError( 772 raise MappingError(
648 'Got an incorrect number of responses from the server. Expected %d, ' 773 'Invalid response from server: %s, body is %s' % (err, response_body))
649 'but got %d' % (len(files), len(response))) 774
650 775 # Pick Items that are missing, attach PushState to them.
651 # This implementation of IsolateServer doesn't use push_urls field, 776 missing_items = []
652 # set it to None. 777 for i, push_urls in enumerate(response):
653 missing_files = [ 778 if push_urls:
654 files[i] + (None,) for i, flag in enumerate(response) if flag == '\x00' 779 assert len(push_urls) == 2, str(push_urls)
655 ] 780 item = items[i]
781 assert item.push_state is None
782 item.push_state = IsolateServer.PushState(push_urls[0], push_urls[1])
783 missing_items.append(item)
656 logging.info('Queried %d files, %d cache hit', 784 logging.info('Queried %d files, %d cache hit',
657 len(files), len(files) - len(missing_files)) 785 len(items), len(items) - len(missing_items))
658 return missing_files 786 return missing_items
659
660 def _upload_hash_content_to_blobstore(self, item, content):
661 """Uploads the content directly to the blobstore via a generated url."""
662 # TODO(maruel): Support large files. This would require streaming support.
663 gen_url = '%sgenerate_blobstore_url/%s/%s' % (
664 self.content_url, self.namespace, item)
665 # Token is guaranteed to be already quoted but it is unnecessary here, and
666 # only here.
667 data = [('token', urllib.unquote(self.token))]
668 content_type, body = encode_multipart_formdata(
669 data, [('content', item, content)])
670 last_url = gen_url
671 for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
672 # Retry HTTP 50x here but not 404.
673 upload_url = net.url_read(gen_url, data=data)
674 if not upload_url:
675 raise MappingError('Unable to connect to server %s' % gen_url)
676 last_url = upload_url
677
678 # Do not retry this request on HTTP 50x. Regenerate an upload url each
679 # time since uploading "consumes" the upload url.
680 result = net.url_read(
681 upload_url, data=body, content_type=content_type, retry_50x=False)
682 if result is not None:
683 return result
684 raise MappingError('Unable to connect to server %s' % last_url)
685 787
686 788
687 class FileSystem(StorageApi): 789 class FileSystem(StorageApi):
688 """StorageApi implementation that fetches data from the file system. 790 """StorageApi implementation that fetches data from the file system.
689 791
690 The common use case is a NFS/CIFS file server that is mounted locally that is 792 The common use case is a NFS/CIFS file server that is mounted locally that is
691 used to fetch the file on a local partition. 793 used to fetch the file on a local partition.
692 """ 794 """
795
693 def __init__(self, base_path): 796 def __init__(self, base_path):
694 super(FileSystem, self).__init__() 797 super(FileSystem, self).__init__()
695 self.base_path = base_path 798 self.base_path = base_path
696 799
697 def fetch(self, item, expected_size): 800 def fetch(self, digest, size):
698 assert isinstance(item, basestring) 801 assert isinstance(digest, basestring)
699 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 802 assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
700 source = os.path.join(self.base_path, item) 803 source = os.path.join(self.base_path, digest)
701 if (expected_size != UNKNOWN_FILE_SIZE and 804 if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size):
702 not is_valid_file(source, expected_size)): 805 raise IOError('Invalid file %s' % digest)
703 raise IOError('Invalid file %s' % item)
704 return file_read(source) 806 return file_read(source)
705 807
706 def push(self, item, expected_size, content_generator, push_urls=None): 808 def push(self, item, content, size):
707 assert isinstance(item, basestring) 809 assert isinstance(item, Item)
708 assert isinstance(expected_size, int) or expected_size == UNKNOWN_FILE_SIZE 810 assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
709 dest = os.path.join(self.base_path, item) 811 dest = os.path.join(self.base_path, item.digest)
710 total = file_write(dest, content_generator) 812 total = file_write(dest, content)
711 if expected_size != UNKNOWN_FILE_SIZE and total != expected_size: 813 if size != UNKNOWN_FILE_SIZE and total != size:
712 os.remove(dest) 814 os.remove(dest)
713 raise IOError( 815 raise IOError('Invalid file %s, %d != %d' % (item.digest, total, size))
714 'Invalid file %s, %d != %d' % (item, total, expected_size))
715 816
716 def contains(self, files): 817 def contains(self, items):
717 return [ 818 return [
718 (filename, metadata, None) 819 item for item in items
719 for filename, metadata in files 820 if not os.path.exists(os.path.join(self.base_path, item.digest))
720 if not os.path.exists(os.path.join(self.base_path, metadata['h']))
721 ] 821 ]
722 822
723 823
724 def get_hash_algo(_namespace): 824 def get_hash_algo(_namespace):
725 """Return hash algorithm class to use when uploading to given |namespace|.""" 825 """Return hash algorithm class to use when uploading to given |namespace|."""
726 # TODO(vadimsh): Implement this at some point. 826 # TODO(vadimsh): Implement this at some point.
727 return hashlib.sha1 827 return hashlib.sha1
728 828
729 829
730 def is_namespace_with_compression(namespace): 830 def is_namespace_with_compression(namespace):
(...skipping 29 matching lines...) Expand all
760 860
761 861
762 def upload_tree(base_url, indir, infiles, namespace): 862 def upload_tree(base_url, indir, infiles, namespace):
763 """Uploads the given tree to the given url. 863 """Uploads the given tree to the given url.
764 864
765 Arguments: 865 Arguments:
766 base_url: The base url, it is assume that |base_url|/has/ can be used to 866 base_url: The base url, it is assume that |base_url|/has/ can be used to
767 query if an element was already uploaded, and |base_url|/store/ 867 query if an element was already uploaded, and |base_url|/store/
768 can be used to upload a new element. 868 can be used to upload a new element.
769 indir: Root directory the infiles are based in. 869 indir: Root directory the infiles are based in.
770 infiles: dict of files to upload files from |indir| to |base_url|. 870 infiles: dict of files to upload from |indir| to |base_url|.
771 namespace: The namespace to use on the server. 871 namespace: The namespace to use on the server.
772 """ 872 """
773 remote = get_storage_api(base_url, namespace) 873 remote = get_storage_api(base_url, namespace)
774 with Storage(remote, is_namespace_with_compression(namespace)) as storage: 874 with Storage(remote, is_namespace_with_compression(namespace)) as storage:
775 storage.upload_tree(indir, infiles) 875 storage.upload_tree(indir, infiles)
776 return 0 876 return 0
777 877
778 878
779 class MemoryCache(object): 879 class MemoryCache(object):
780 """This class is intended to be usable everywhere the Cache class is. 880 """This class is intended to be usable everywhere the Cache class is.
(...skipping 521 matching lines...) Expand 10 before | Expand all | Expand 10 after
1302 sys.stderr.write(str(e)) 1402 sys.stderr.write(str(e))
1303 sys.stderr.write('\n') 1403 sys.stderr.write('\n')
1304 return 1 1404 return 1
1305 1405
1306 1406
1307 if __name__ == '__main__': 1407 if __name__ == '__main__':
1308 fix_encoding.fix_encoding() 1408 fix_encoding.fix_encoding()
1309 tools.disable_buffering() 1409 tools.disable_buffering()
1310 colorama.init() 1410 colorama.init()
1311 sys.exit(main(sys.argv[1:])) 1411 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « no previous file | tests/isolateserver_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698