Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(41)

Side by Side Diff: appengine/swarming/server/task_queues.py

Issue 2926713004: Add support for repeated keys in TaskRequest. (Closed)
Patch Set: rebase Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/message_conversion.py ('k') | appengine/swarming/server/task_queues_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2017 The LUCI Authors. All rights reserved. 1 # Copyright 2017 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Ambient task queues generated from the actual load. 5 """Ambient task queues generated from the actual load.
6 6
7 This means that the task queues are deduced by the actual load, they are never 7 This means that the task queues are deduced by the actual load, they are never
8 explicitly defined. They are eventually deleted by a cron job once no incoming 8 explicitly defined. They are eventually deleted by a cron job once no incoming
9 task with the exact set of dimensions is triggered anymore. 9 task with the exact set of dimensions is triggered anymore.
10 10
(...skipping 220 matching lines...) Expand 10 before | Expand all | Expand 10 after
231 s.valid_until_ts = valid_until_ts 231 s.valid_until_ts = valid_until_ts
232 self.sets = [s for s in self.sets if s.valid_until_ts >= now] 232 self.sets = [s for s in self.sets if s.valid_until_ts >= now]
233 return True 233 return True
234 # It was updated already, skip storing again. 234 # It was updated already, skip storing again.
235 old = len(self.sets) 235 old = len(self.sets)
236 self.sets = [s for s in self.sets if s.valid_until_ts >= now] 236 self.sets = [s for s in self.sets if s.valid_until_ts >= now]
237 return len(self.sets) != old 237 return len(self.sets) != old
238 238
239 def match_request(self, dimensions): 239 def match_request(self, dimensions):
240 """Confirms that this instance actually stores this set.""" 240 """Confirms that this instance actually stores this set."""
241 return self._match_request_flat( 241 assert isinstance(dimensions, list), repr(dimensions)
242 u'%s:%s' % (k, v) for k, v in dimensions.iteritems()) 242 return self._match_request_flat(u'%s:%s' % (k, v) for k, v in dimensions)
243 243
244 def match_bot(self, bot_dimensions): 244 def match_bot(self, bot_dimensions):
245 """Returns the TaskDimensionsSet that matches this bot_dimensions, if any. 245 """Returns the TaskDimensionsSet that matches this bot_dimensions, if any.
246 """ 246 """
247 for s in self.sets: 247 for s in self.sets:
248 if s.match_bot(bot_dimensions): 248 if s.match_bot(bot_dimensions):
249 return s 249 return s
250 250
251 def _match_request_flat(self, dimensions_flat): 251 def _match_request_flat(self, dimensions_flat):
252 d = frozenset(dimensions_flat) 252 d = frozenset(dimensions_flat)
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
398 id=1, parent=bot_root_key, 398 id=1, parent=bot_root_key,
399 dimensions_flat=_flatten_bot_dimensions(bot_dimensions)).put() 399 dimensions_flat=_flatten_bot_dimensions(bot_dimensions)).put()
400 memcache.set(bot_id, sorted(matches), namespace='task_queues') 400 memcache.set(bot_id, sorted(matches), namespace='task_queues')
401 finally: 401 finally:
402 logging.debug( 402 logging.debug(
403 '_rebuild_bot_cache(%s) in %.3fs. Registered for %d queues; cleaned %d', 403 '_rebuild_bot_cache(%s) in %.3fs. Registered for %d queues; cleaned %d',
404 bot_id, (utils.utcnow()-now).total_seconds(), len(matches), cleaned) 404 bot_id, (utils.utcnow()-now).total_seconds(), len(matches), cleaned)
405 405
406 406
def _get_task_dims_key(dimensions_hash, dimensions):
  """Return the ndb.Key() for the request dimensions.

  The dimensions may have at most one 'id' key but may have multiple 'pool'. In
  the case of 'pool', return the first value in sorted order. This means that
  for a request with both 'pool:bar' and 'pool:foo', the request is rooted at
  'pool:bar'.

  Arguments:
  - dimensions_hash: 32 bits int used as the TaskDimensions key id.
  - dimensions: sorted list(tuple(unicode, unicode))

  Raises:
    datastore_errors.BadValueError when neither 'id' nor 'pool' is present.
  """
  assert isinstance(dimensions, list), repr(dimensions)
  # Because the list is sorted, an 'id' pair is seen before any 'pool' pair,
  # so 'id' takes precedence as the entity root when both are present.
  for key, value in dimensions:
    if key == u'id':
      root_id = u'id:%s' % value
      break
    if key == u'pool':
      root_id = u'pool:%s' % value
      break
  else:
    raise datastore_errors.BadValueError(u'invalid request')
  return ndb.Key(TaskDimensionsRoot, root_id, TaskDimensions, dimensions_hash)
415 430
416 431
def _hash_data(data):
  """Returns a 32 bits non-zero hash."""
  assert isinstance(data, str), repr(data)
  digest = hashlib.md5(data).digest()
  # Note that 'L' means C++ unsigned long which is (usually) 32 bits and
  # python's int is 64 bits, so the unpacked value always fits. A zero result
  # is mapped to 1 so the hash can safely be used as a ndb.Key id.
  (value,) = struct.unpack('<L', digest[:4])
  return int(value) or 1
424 439
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
500 raise ndb.Return(need_db_store) 515 raise ndb.Return(need_db_store)
501 516
502 517
503 ### Public APIs. 518 ### Public APIs.
504 519
505 520
def hash_dimensions(dimensions):
  """Returns a 32 bits int that is a hash of the request dimensions specified.

  Arguments:
    dimensions: sorted list(tuple(unicode, unicode))

  The return value is guaranteed to be non-zero so it can be used as a key id in
  a ndb.Key.
  """
  assert isinstance(dimensions, list), repr(dimensions)
  # Serialize as NUL-separated utf-8 key/value pairs; the list is already
  # sorted so the encoding is canonical for a given dimensions set.
  chunks = []
  for key, value in dimensions:
    chunks.append(key.encode('utf8'))
    chunks.append('\000')
    chunks.append(value.encode('utf8'))
    chunks.append('\000')
  return _hash_data(''.join(chunks))
523 539
524 540
525 def assert_bot(bot_dimensions): 541 def assert_bot(bot_dimensions):
526 """Prepares BotTaskDimensions entities as needed. 542 """Prepares BotTaskDimensions entities as needed.
527 543
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
606 data = { 622 data = {
607 u'dimensions': request.properties.dimensions, 623 u'dimensions': request.properties.dimensions,
608 u'dimensions_hash': str(dimensions_hash), 624 u'dimensions_hash': str(dimensions_hash),
609 u'valid_until_ts': request.expiration_ts + _ADVANCE, 625 u'valid_until_ts': request.expiration_ts + _ADVANCE,
610 } 626 }
611 payload = utils.encode_to_json(data) 627 payload = utils.encode_to_json(data)
612 628
613 # If this task specifies an 'id' value, updates the cache inline since we know 629 # If this task specifies an 'id' value, updates the cache inline since we know
614 # there's only one bot that can run it, so it won't take long. This permits 630 # there's only one bot that can run it, so it won't take long. This permits
615 # tasks like 'terminate' tasks to execute faster. 631 # tasks like 'terminate' tasks to execute faster.
616 if request.properties.dimensions.get(u'id'): 632 if any(k == u'id' for k, _ in request.properties.dimensions):
617 rebuild_task_cache(payload) 633 rebuild_task_cache(payload)
618 return 634 return
619 635
620 # We can't use the request ID since the request was not stored yet, so embed 636 # We can't use the request ID since the request was not stored yet, so embed
621 # all the necessary information. 637 # all the necessary information.
622 url = '/internal/taskqueue/rebuild-task-cache' 638 url = '/internal/taskqueue/rebuild-task-cache'
623 if not utils.enqueue_task( 639 if not utils.enqueue_task(
624 url, queue_name='rebuild-task-cache', payload=payload): 640 url, queue_name='rebuild-task-cache', payload=payload):
625 logging.error('Failed to enqueue TaskDimensions update %x', dimensions_hash) 641 logging.error('Failed to enqueue TaskDimensions update %x', dimensions_hash)
626 # Technically we'd want to raise a endpoints.InternalServerErrorException. 642 # Technically we'd want to raise a endpoints.InternalServerErrorException.
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
681 - 'dimensions': dict of task dimensions to refresh 697 - 'dimensions': dict of task dimensions to refresh
682 - 'dimensions_hash': precalculated hash for dimensions 698 - 'dimensions_hash': precalculated hash for dimensions
683 - 'valid_until_ts': expiration_ts + _ADVANCE for how long this cache is 699 - 'valid_until_ts': expiration_ts + _ADVANCE for how long this cache is
684 valid 700 valid
685 """ 701 """
686 data = json.loads(payload) 702 data = json.loads(payload)
687 logging.debug('rebuild_task_cache(%s)', data) 703 logging.debug('rebuild_task_cache(%s)', data)
688 dimensions = data[u'dimensions'] 704 dimensions = data[u'dimensions']
689 dimensions_hash = int(data[u'dimensions_hash']) 705 dimensions_hash = int(data[u'dimensions_hash'])
690 valid_until_ts = utils.parse_datetime(data[u'valid_until_ts']) 706 valid_until_ts = utils.parse_datetime(data[u'valid_until_ts'])
691 dimensions_flat = sorted(u'%s:%s' % (k, v) for k, v in dimensions.iteritems()) 707 dimensions_flat = sorted(u'%s:%s' % (k, v) for k, v in dimensions)
692 708
693 now = utils.utcnow() 709 now = utils.utcnow()
694 updated = 0 710 updated = 0
695 viable = 0 711 viable = 0
696 try: 712 try:
697 pending = set() 713 pending = set()
698 for bot_task_key in _yield_BotTaskDimensions_keys( 714 for bot_task_key in _yield_BotTaskDimensions_keys(
699 dimensions_hash, dimensions_flat): 715 dimensions_hash, dimensions_flat):
700 viable += 1 716 viable += 1
701 future = _refresh_BotTaskDimensions( 717 future = _refresh_BotTaskDimensions(
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 btd_deleted += 1 774 btd_deleted += 1
759 bot_id = key.parent().id() 775 bot_id = key.parent().id()
760 memcache.delete(bot_id, namespace='task_queues') 776 memcache.delete(bot_id, namespace='task_queues')
761 logging.debug('- %d for bot %s', key.id(), bot_id) 777 logging.debug('- %d for bot %s', key.id(), bot_id)
762 finally: 778 finally:
763 logging.info( 779 logging.info(
764 'tidy_stale() in %.3fs; TaskDimensions: found %d, deleted %d; ' 780 'tidy_stale() in %.3fs; TaskDimensions: found %d, deleted %d; '
765 'BotTaskDimensions: found %d, deleted %d', 781 'BotTaskDimensions: found %d, deleted %d',
766 (utils.utcnow() - now).total_seconds(), 782 (utils.utcnow() - now).total_seconds(),
767 td_found, td_deleted, btd_found, btd_deleted) 783 td_found, td_deleted, btd_found, btd_deleted)
OLDNEW
« no previous file with comments | « appengine/swarming/message_conversion.py ('k') | appengine/swarming/server/task_queues_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698