Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(263)

Side by Side Diff: appengine/swarming/server/bot_code.py

Issue 2953253003: Replace custom blob gRPC API with ByteStream (Closed)
Patch Set: Minor cleanups Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Swarming bot code. Includes bootstrap and swarming_bot.zip. 5 """Swarming bot code. Includes bootstrap and swarming_bot.zip.
6 6
7 It includes everything that is AppEngine specific. The non-GAE code is in 7 It includes everything that is AppEngine specific. The non-GAE code is in
8 bot_archive.py. 8 bot_archive.py.
9 """ 9 """
10 10
(...skipping 10 matching lines...) Expand all
21 from components import auth 21 from components import auth
22 from components import config 22 from components import config
23 from components import datastore_utils 23 from components import datastore_utils
24 from components import utils 24 from components import utils
25 from server import bot_archive 25 from server import bot_archive
26 from server import config as local_config 26 from server import config as local_config
27 27
28 28
29 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 29 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
30 30
31 MAX_MEMCACHED_SIZE_BYTES = 1000000
32 BOT_CODE_NS = 'bot_code'
31 33
32 ### Models. 34 ### Models.
33 35
34 36
35 File = collections.namedtuple('File', ('content', 'who', 'when', 'version')) 37 File = collections.namedtuple('File', ('content', 'who', 'when', 'version'))
36 38
37 39
38 class VersionedFile(ndb.Model): 40 class VersionedFile(ndb.Model):
39 """Versionned entity. 41 """Versionned entity.
40 42
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after
215 return version, additionals 217 return version, additionals
216 218
217 219
218 def get_swarming_bot_zip(host): 220 def get_swarming_bot_zip(host):
219 """Returns a zipped file of all the files a bot needs to run. 221 """Returns a zipped file of all the files a bot needs to run.
220 222
221 Returns: 223 Returns:
222 A string representing the zipped file's contents. 224 A string representing the zipped file's contents.
223 """ 225 """
224 version, additionals = get_bot_version(host) 226 version, additionals = get_bot_version(host)
225 content = memcache.get('code-' + version, namespace='bot_code') 227 content = get_cached_swarming_bot_zip(version)
226 if content: 228 if content:
227 logging.debug('memcached bot code %s; %d bytes', version, len(content)) 229 logging.debug('memcached bot code %s; %d bytes', version, len(content))
228 return content 230 return content
229 231
230 # Get the start bot script from the database, if present. Pass an empty 232 # Get the start bot script from the database, if present. Pass an empty
231 # file if the files isn't present. 233 # file if the files isn't present.
232 additionals = additionals or { 234 additionals = additionals or {
233 'config/bot_config.py': get_bot_config().content, 235 'config/bot_config.py': get_bot_config().content,
234 } 236 }
235 bot_dir = os.path.join(ROOT_DIR, 'swarming_bot') 237 bot_dir = os.path.join(ROOT_DIR, 'swarming_bot')
236 content, version = bot_archive.get_swarming_bot_zip( 238 content, version = bot_archive.get_swarming_bot_zip(
237 bot_dir, host, utils.get_app_version(), additionals, 239 bot_dir, host, utils.get_app_version(), additionals,
238 local_config.settings().enable_ts_monitoring) 240 local_config.settings().enable_ts_monitoring)
239 # This is immutable so not no need to set expiration time.
240 memcache.set('code-' + version, content, namespace='bot_code')
241 logging.info('generated bot code %s; %d bytes', version, len(content)) 241 logging.info('generated bot code %s; %d bytes', version, len(content))
242 cache_swarming_bot_zip(version, content)
242 return content 243 return content
243 244
244 245
246 def get_cached_swarming_bot_zip(version):
247 """Returns the bot contents if its been cached, or None if missing."""
248 # see cache_swarming_bot_zip for how the "meta" entry is set
249 meta = bot_memcache_get(version, 'meta')
250 if meta is None:
251 logging.info('memcache did not include metadata for version %s', version)
252 return None
253 num_parts, true_sig = meta.split(':')
254
255 # Get everything asynchronously. If something's missing, the hash will be
256 # wrong so no need to check that we got something from each call.
257 futures = [bot_memcache_async_get(version, 'content', p)
258 for p in range((int)(num_parts))]
Vadim Sh. 2017/06/27 19:12:49 just int(num_parts)
aludwin 2017/06/27 20:10:16 Done.
259 content = ''.join(f.get_result() for f in futures)
Vadim Sh. 2017/06/27 19:12:49 this will raise TypeError if some of memcache entr
aludwin 2017/06/27 19:23:06 Would you recommend that I explicitly wait on the
Vadim Sh. 2017/06/27 19:27:09 Explicitly waiting is better.
aludwin 2017/06/27 20:10:16 Done.
260 h = hashlib.sha256()
261 h.update(content)
262 if h.hexdigest() != true_sig:
263 logging.error('bot %s had signature %s instead of expected %s', version,
264 h.hexdigest(), true_sig)
265 return None
266 return content
267
268
269 def cache_swarming_bot_zip(version, content):
270 """Caches the bot code to memcache."""
271 h = hashlib.sha256()
272 h.update(content)
273 p = 0
274 futures = []
275 while len(content) > 0:
276 chunk_size = min(MAX_MEMCACHED_SIZE_BYTES, len(content))
277 futures.append(bot_memcache_async_set(content[0:chunk_size],
278 version, 'content', p))
279 content=content[chunk_size:]
Vadim Sh. 2017/06/27 19:12:48 nit: spaces around =
aludwin 2017/06/27 20:10:17 Done.
280 p += 1
281 meta = "%s:%s" % (p, h.hexdigest())
282 futures.append(bot_memcache_async_set(meta, version, 'meta'))
Vadim Sh. 2017/06/27 19:12:49 this entry must be set only after all chunks are s
aludwin 2017/06/27 20:10:17 Done.
283 ndb.Future.wait_all(futures)
Vadim Sh. 2017/06/27 19:12:49 wait_all doesn't check for errors. It just waits f
aludwin 2017/06/27 19:23:06 But what's the failure recovery? Or do you just wa
Vadim Sh. 2017/06/27 19:27:09 'check_success' raises exceptions in case of error
aludwin 2017/06/27 20:10:16 Done.
284 logging.info('bot %s with sig %s saved in memcached in %d chunks',
285 version, h.hexdigest(), p)
286
287
288 def bot_memcache_get(version, desc):
Vadim Sh. 2017/06/27 19:12:49 you can get rid of non-async versions and just use
aludwin 2017/06/27 20:10:17 Done.
289 """Mockable sync memcache getter."""
290 return memcache.get(bot_key(version, desc), namespace=BOT_CODE_NS)
291
292
293 def bot_memcache_async_get(version, desc, part):
Vadim Sh. 2017/06/27 19:12:48 nit: _async is usually put in the end (by ndb conv
aludwin 2017/06/27 20:10:17 Since I'm getting rid of the non-async versions, I
294 """Mockable async memcache getter."""
295 return ndb.get_context().memcache_get(bot_key(version, desc, part),
296 namespace=BOT_CODE_NS)
297
298
299 def bot_memcache_async_set(value, version, desc, part=None):
300 """Mockable async memcache setter."""
301 return ndb.get_context().memcache_set(bot_key(version, desc, part),
302 value, namespace=BOT_CODE_NS)
303
304
305 def bot_key(version, desc, part=None):
306 """Returns a memcache key for bot entries."""
307 key = 'code-%s-%s' % (version, desc)
308 if part is not None:
309 key = '%s-%d' % (key, part)
310 return key
311
312
245 ### Bootstrap token. 313 ### Bootstrap token.
246 314
247 315
248 class BootstrapToken(auth.TokenKind): 316 class BootstrapToken(auth.TokenKind):
249 expiration_sec = 3600 317 expiration_sec = 3600
250 secret_key = auth.SecretKey('bot_bootstrap_token') 318 secret_key = auth.SecretKey('bot_bootstrap_token')
251 version = 1 319 version = 1
252 320
253 321
254 def generate_bootstrap_token(): 322 def generate_bootstrap_token():
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
306 374
307 ## Config validators 375 ## Config validators
308 376
309 377
310 @config.validation.self_rule('regex:scripts/.+\\.py') 378 @config.validation.self_rule('regex:scripts/.+\\.py')
311 def _validate_scripts(content, ctx): 379 def _validate_scripts(content, ctx):
312 try: 380 try:
313 ast.parse(content) 381 ast.parse(content)
314 except (SyntaxError, TypeError) as e: 382 except (SyntaxError, TypeError) as e:
315 ctx.error('invalid %s: %s' % (ctx.path, e)) 383 ctx.error('invalid %s: %s' % (ctx.path, e))
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698