Chromium Code Reviews| Index: appengine/swarming/server/bot_code.py |
| diff --git a/appengine/swarming/server/bot_code.py b/appengine/swarming/server/bot_code.py |
| index 81a56e0d7ad39fbc51469174e872f37d4705ad7a..eb1c7ddb256c6b3f1d68086528da76ba64c811de 100644 |
| --- a/appengine/swarming/server/bot_code.py |
| +++ b/appengine/swarming/server/bot_code.py |
| @@ -28,6 +28,8 @@ from server import config as local_config |
| ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| +MAX_MEMCACHED_SIZE_BYTES = 1000000 |
| +BOT_CODE_NS = 'bot_code' |
| ### Models. |
| @@ -222,7 +224,7 @@ def get_swarming_bot_zip(host): |
| A string representing the zipped file's contents. |
| """ |
| version, additionals = get_bot_version(host) |
| - content = memcache.get('code-' + version, namespace='bot_code') |
| + content = get_cached_swarming_bot_zip(version) |
| if content: |
| logging.debug('memcached bot code %s; %d bytes', version, len(content)) |
| return content |
| @@ -236,12 +238,78 @@ def get_swarming_bot_zip(host): |
| content, version = bot_archive.get_swarming_bot_zip( |
| bot_dir, host, utils.get_app_version(), additionals, |
| local_config.settings().enable_ts_monitoring) |
| - # This is immutable so not no need to set expiration time. |
| - memcache.set('code-' + version, content, namespace='bot_code') |
| logging.info('generated bot code %s; %d bytes', version, len(content)) |
| + cache_swarming_bot_zip(version, content) |
| return content |
| +def get_cached_swarming_bot_zip(version): |
| + """Returns the bot contents if its been cached, or None if missing.""" |
| + # see cache_swarming_bot_zip for how the "meta" entry is set |
| + meta = bot_memcache_get(version, 'meta') |
| + if meta is None: |
| + logging.info('memcache did not include metadata for version %s', version) |
| + return None |
| + num_parts, true_sig = meta.split(':') |
| + |
| + # Get everything asynchronously. If something's missing, the hash will be |
| + # wrong so no need to check that we got something from each call. |
| + futures = [bot_memcache_async_get(version, 'content', p) |
| + for p in range((int)(num_parts))] |
|
Vadim Sh.
2017/06/27 19:12:49
just int(num_parts)
aludwin
2017/06/27 20:10:16
Done.
|
| + content = ''.join(f.get_result() for f in futures) |
|
Vadim Sh.
2017/06/27 19:12:49
this will raise TypeError if some of the memcache entries are missing […]
aludwin
2017/06/27 19:23:06
Would you recommend that I explicitly wait on the
Vadim Sh.
2017/06/27 19:27:09
Explicitly waiting is better.
aludwin
2017/06/27 20:10:16
Done.
|
| + h = hashlib.sha256() |
| + h.update(content) |
| + if h.hexdigest() != true_sig: |
| + logging.error('bot %s had signature %s instead of expected %s', version, |
| + h.hexdigest(), true_sig) |
| + return None |
| + return content |
| + |
| + |
| +def cache_swarming_bot_zip(version, content): |
| + """Caches the bot code to memcache.""" |
| + h = hashlib.sha256() |
| + h.update(content) |
| + p = 0 |
| + futures = [] |
| + while len(content) > 0: |
| + chunk_size = min(MAX_MEMCACHED_SIZE_BYTES, len(content)) |
| + futures.append(bot_memcache_async_set(content[0:chunk_size], |
| + version, 'content', p)) |
| + content=content[chunk_size:] |
|
Vadim Sh.
2017/06/27 19:12:48
nit: spaces around =
aludwin
2017/06/27 20:10:17
Done.
|
| + p += 1 |
| + meta = "%s:%s" % (p, h.hexdigest()) |
| + futures.append(bot_memcache_async_set(meta, version, 'meta')) |
|
Vadim Sh.
2017/06/27 19:12:49
this entry must be set only after all chunks are set […]
aludwin
2017/06/27 20:10:17
Done.
|
| + ndb.Future.wait_all(futures) |
|
Vadim Sh.
2017/06/27 19:12:49
wait_all doesn't check for errors. It just waits for the futures to complete […]
aludwin
2017/06/27 19:23:06
But what's the failure recovery? Or do you just wa
Vadim Sh.
2017/06/27 19:27:09
'check_success' raises exceptions in case of error
aludwin
2017/06/27 20:10:16
Done.
|
| + logging.info('bot %s with sig %s saved in memcached in %d chunks', |
| + version, h.hexdigest(), p) |
| + |
| + |
| +def bot_memcache_get(version, desc): |
|
Vadim Sh.
2017/06/27 19:12:49
you can get rid of the non-async versions and just use the async ones […]
aludwin
2017/06/27 20:10:17
Done.
|
| + """Mockable sync memcache getter.""" |
| + return memcache.get(bot_key(version, desc), namespace=BOT_CODE_NS) |
| + |
| + |
| +def bot_memcache_async_get(version, desc, part): |
|
Vadim Sh.
2017/06/27 19:12:48
nit: _async is usually put at the end (by ndb convention) […]
aludwin
2017/06/27 20:10:17
Since I'm getting rid of the non-async versions, I
|
| + """Mockable async memcache getter.""" |
| + return ndb.get_context().memcache_get(bot_key(version, desc, part), |
| + namespace=BOT_CODE_NS) |
| + |
| + |
| +def bot_memcache_async_set(value, version, desc, part=None): |
| + """Mockable async memcache setter.""" |
| + return ndb.get_context().memcache_set(bot_key(version, desc, part), |
| + value, namespace=BOT_CODE_NS) |
| + |
| + |
| +def bot_key(version, desc, part=None): |
| + """Returns a memcache key for bot entries.""" |
| + key = 'code-%s-%s' % (version, desc) |
| + if part is not None: |
| + key = '%s-%d' % (key, part) |
| + return key |
| + |
| + |
| ### Bootstrap token. |