| OLD | NEW |
| 1 # Copyright 2017 The LUCI Authors. All rights reserved. | 1 # Copyright 2017 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Ambient task queues generated from the actual load. | 5 """Ambient task queues generated from the actual load. |
| 6 | 6 |
| 7 This means that the task queues are deduced by the actual load, they are never | 7 This means that the task queues are deduced by the actual load, they are never |
| 8 explicitly defined. They are eventually deleted by a cron job once no incoming | 8 explicitly defined. They are eventually deleted by a cron job once no incoming |
| 9 task with the exact set of dimensions is triggered anymore. | 9 task with the exact set of dimensions is triggered anymore. |
| 10 | 10 |
| (...skipping 220 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 231 s.valid_until_ts = valid_until_ts | 231 s.valid_until_ts = valid_until_ts |
| 232 self.sets = [s for s in self.sets if s.valid_until_ts >= now] | 232 self.sets = [s for s in self.sets if s.valid_until_ts >= now] |
| 233 return True | 233 return True |
| 234 # It was updated already, skip storing again. | 234 # It was updated already, skip storing again. |
| 235 old = len(self.sets) | 235 old = len(self.sets) |
| 236 self.sets = [s for s in self.sets if s.valid_until_ts >= now] | 236 self.sets = [s for s in self.sets if s.valid_until_ts >= now] |
| 237 return len(self.sets) != old | 237 return len(self.sets) != old |
| 238 | 238 |
| 239 def match_request(self, dimensions): | 239 def match_request(self, dimensions): |
| 240 """Confirms that this instance actually stores this set.""" | 240 """Confirms that this instance actually stores this set.""" |
| 241 return self._match_request_flat( | 241 assert isinstance(dimensions, list), repr(dimensions) |
| 242 u'%s:%s' % (k, v) for k, v in dimensions.iteritems()) | 242 return self._match_request_flat(u'%s:%s' % (k, v) for k, v in dimensions) |
| 243 | 243 |
| 244 def match_bot(self, bot_dimensions): | 244 def match_bot(self, bot_dimensions): |
| 245 """Returns the TaskDimensionsSet that matches this bot_dimensions, if any. | 245 """Returns the TaskDimensionsSet that matches this bot_dimensions, if any. |
| 246 """ | 246 """ |
| 247 for s in self.sets: | 247 for s in self.sets: |
| 248 if s.match_bot(bot_dimensions): | 248 if s.match_bot(bot_dimensions): |
| 249 return s | 249 return s |
| 250 | 250 |
| 251 def _match_request_flat(self, dimensions_flat): | 251 def _match_request_flat(self, dimensions_flat): |
| 252 d = frozenset(dimensions_flat) | 252 d = frozenset(dimensions_flat) |
| (...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 398 id=1, parent=bot_root_key, | 398 id=1, parent=bot_root_key, |
| 399 dimensions_flat=_flatten_bot_dimensions(bot_dimensions)).put() | 399 dimensions_flat=_flatten_bot_dimensions(bot_dimensions)).put() |
| 400 memcache.set(bot_id, sorted(matches), namespace='task_queues') | 400 memcache.set(bot_id, sorted(matches), namespace='task_queues') |
| 401 finally: | 401 finally: |
| 402 logging.debug( | 402 logging.debug( |
| 403 '_rebuild_bot_cache(%s) in %.3fs. Registered for %d queues; cleaned %d', | 403 '_rebuild_bot_cache(%s) in %.3fs. Registered for %d queues; cleaned %d', |
| 404 bot_id, (utils.utcnow()-now).total_seconds(), len(matches), cleaned) | 404 bot_id, (utils.utcnow()-now).total_seconds(), len(matches), cleaned) |
| 405 | 405 |
| 406 | 406 |
| 407 def _get_task_dims_key(dimensions_hash, dimensions): | 407 def _get_task_dims_key(dimensions_hash, dimensions): |
| 408 if u'id' in dimensions: | 408 """Return the ndb.Key() for the request dimensions. |
| 409 return ndb.Key( | 409 |
| 410 TaskDimensionsRoot, u'id:%s' % dimensions['id'], | 410 The dimensions may have at most one 'id' key but may have multiple 'pool'. In |
| 411 TaskDimensions, dimensions_hash) | 411 the case of 'pool', return the first value in sorted order. This means that |
| 412 return ndb.Key( | 412 for a request with both 'pool:bar' and 'pool:foo', the request is rooted at |
| 413 TaskDimensionsRoot, u'pool:%s' % dimensions['pool'], | 413 'pool:bar'. |
| 414 TaskDimensions, dimensions_hash) | 414 |
| 415 Arguments: |
| 416 - dimensions: sorted list(tuple(unicode, unicode)) |
| 417 """ |
| 418 assert isinstance(dimensions, list), repr(dimensions) |
| 419 # Search for ID. |
| 420 for key, value in dimensions: |
| 421 if key == u'id': |
| 422 return ndb.Key( |
| 423 TaskDimensionsRoot, u'id:%s' % value, TaskDimensions, dimensions_hash) |
| 424 # 'pool' comes after 'id'. |
| 425 if key == u'pool': |
| 426 return ndb.Key( |
| 427 TaskDimensionsRoot, u'pool:%s' % value, |
| 428 TaskDimensions, dimensions_hash) |
| 429 raise datastore_errors.BadValueError(u'invalid request') |
| 415 | 430 |
| 416 | 431 |
| 417 def _hash_data(data): | 432 def _hash_data(data): |
| 418 """Returns a 32 bits non-zero hash.""" | 433 """Returns a 32 bits non-zero hash.""" |
| 419 assert isinstance(data, str), repr(data) | 434 assert isinstance(data, str), repr(data) |
| 420 digest = hashlib.md5(data).digest() | 435 digest = hashlib.md5(data).digest() |
| 421 # Note that 'L' means C++ unsigned long which is (usually) 32 bits and | 436 # Note that 'L' means C++ unsigned long which is (usually) 32 bits and |
| 422 # python's int is 64 bits. | 437 # python's int is 64 bits. |
| 423 return int(struct.unpack('<L', digest[:4])[0]) or 1 | 438 return int(struct.unpack('<L', digest[:4])[0]) or 1 |
| 424 | 439 |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 500 raise ndb.Return(need_db_store) | 515 raise ndb.Return(need_db_store) |
| 501 | 516 |
| 502 | 517 |
| 503 ### Public APIs. | 518 ### Public APIs. |
| 504 | 519 |
| 505 | 520 |
| 506 def hash_dimensions(dimensions): | 521 def hash_dimensions(dimensions): |
| 507 """Returns a 32 bits int that is a hash of the request dimensions specified. | 522 """Returns a 32 bits int that is a hash of the request dimensions specified. |
| 508 | 523 |
| 509 Arguments: | 524 Arguments: |
| 510 dimensions: dict(str, str) | 525 dimensions: sorted list(tuple(unicode, unicode)) |
| 511 | 526 |
| 512 The return value is guaranteed to be non-zero so it can be used as a key id in | 527 The return value is guaranteed to be non-zero so it can be used as a key id in |
| 513 a ndb.Key. | 528 a ndb.Key. |
| 514 """ | 529 """ |
| 530 assert isinstance(dimensions, list), repr(dimensions) |
| 515 # This horrible code is the product of micro benchmarks. | 531 # This horrible code is the product of micro benchmarks. |
| 516 data = '' | 532 data = '' |
| 517 for k, v in sorted(dimensions.items()): | 533 for k, v in dimensions: |
| 518 data += k.encode('utf8') | 534 data += k.encode('utf8') |
| 519 data += '\000' | 535 data += '\000' |
| 520 data += v.encode('utf8') | 536 data += v.encode('utf8') |
| 521 data += '\000' | 537 data += '\000' |
| 522 return _hash_data(data) | 538 return _hash_data(data) |
| 523 | 539 |
| 524 | 540 |
| 525 def assert_bot(bot_dimensions): | 541 def assert_bot(bot_dimensions): |
| 526 """Prepares BotTaskDimensions entities as needed. | 542 """Prepares BotTaskDimensions entities as needed. |
| 527 | 543 |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 606 data = { | 622 data = { |
| 607 u'dimensions': request.properties.dimensions, | 623 u'dimensions': request.properties.dimensions, |
| 608 u'dimensions_hash': str(dimensions_hash), | 624 u'dimensions_hash': str(dimensions_hash), |
| 609 u'valid_until_ts': request.expiration_ts + _ADVANCE, | 625 u'valid_until_ts': request.expiration_ts + _ADVANCE, |
| 610 } | 626 } |
| 611 payload = utils.encode_to_json(data) | 627 payload = utils.encode_to_json(data) |
| 612 | 628 |
| 613 # If this task specifies an 'id' value, updates the cache inline since we know | 629 # If this task specifies an 'id' value, updates the cache inline since we know |
| 614 # there's only one bot that can run it, so it won't take long. This permits | 630 # there's only one bot that can run it, so it won't take long. This permits |
| 615 # tasks like 'terminate' tasks to execute faster. | 631 # tasks like 'terminate' tasks to execute faster. |
| 616 if request.properties.dimensions.get(u'id'): | 632 if any(k == u'id' for k, _ in request.properties.dimensions): |
| 617 rebuild_task_cache(payload) | 633 rebuild_task_cache(payload) |
| 618 return | 634 return |
| 619 | 635 |
| 620 # We can't use the request ID since the request was not stored yet, so embed | 636 # We can't use the request ID since the request was not stored yet, so embed |
| 621 # all the necessary information. | 637 # all the necessary information. |
| 622 url = '/internal/taskqueue/rebuild-task-cache' | 638 url = '/internal/taskqueue/rebuild-task-cache' |
| 623 if not utils.enqueue_task( | 639 if not utils.enqueue_task( |
| 624 url, queue_name='rebuild-task-cache', payload=payload): | 640 url, queue_name='rebuild-task-cache', payload=payload): |
| 625 logging.error('Failed to enqueue TaskDimensions update %x', dimensions_hash) | 641 logging.error('Failed to enqueue TaskDimensions update %x', dimensions_hash) |
| 626 # Technically we'd want to raise a endpoints.InternalServerErrorException. | 642 # Technically we'd want to raise a endpoints.InternalServerErrorException. |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 681 - 'dimensions': dict of task dimensions to refresh | 697 - 'dimensions': dict of task dimensions to refresh |
| 682 - 'dimensions_hash': precalculated hash for dimensions | 698 - 'dimensions_hash': precalculated hash for dimensions |
| 683 - 'valid_until_ts': expiration_ts + _ADVANCE for how long this cache is | 699 - 'valid_until_ts': expiration_ts + _ADVANCE for how long this cache is |
| 684 valid | 700 valid |
| 685 """ | 701 """ |
| 686 data = json.loads(payload) | 702 data = json.loads(payload) |
| 687 logging.debug('rebuild_task_cache(%s)', data) | 703 logging.debug('rebuild_task_cache(%s)', data) |
| 688 dimensions = data[u'dimensions'] | 704 dimensions = data[u'dimensions'] |
| 689 dimensions_hash = int(data[u'dimensions_hash']) | 705 dimensions_hash = int(data[u'dimensions_hash']) |
| 690 valid_until_ts = utils.parse_datetime(data[u'valid_until_ts']) | 706 valid_until_ts = utils.parse_datetime(data[u'valid_until_ts']) |
| 691 dimensions_flat = sorted(u'%s:%s' % (k, v) for k, v in dimensions.iteritems()) | 707 dimensions_flat = sorted(u'%s:%s' % (k, v) for k, v in dimensions) |
| 692 | 708 |
| 693 now = utils.utcnow() | 709 now = utils.utcnow() |
| 694 updated = 0 | 710 updated = 0 |
| 695 viable = 0 | 711 viable = 0 |
| 696 try: | 712 try: |
| 697 pending = set() | 713 pending = set() |
| 698 for bot_task_key in _yield_BotTaskDimensions_keys( | 714 for bot_task_key in _yield_BotTaskDimensions_keys( |
| 699 dimensions_hash, dimensions_flat): | 715 dimensions_hash, dimensions_flat): |
| 700 viable += 1 | 716 viable += 1 |
| 701 future = _refresh_BotTaskDimensions( | 717 future = _refresh_BotTaskDimensions( |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 758 btd_deleted += 1 | 774 btd_deleted += 1 |
| 759 bot_id = key.parent().id() | 775 bot_id = key.parent().id() |
| 760 memcache.delete(bot_id, namespace='task_queues') | 776 memcache.delete(bot_id, namespace='task_queues') |
| 761 logging.debug('- %d for bot %s', key.id(), bot_id) | 777 logging.debug('- %d for bot %s', key.id(), bot_id) |
| 762 finally: | 778 finally: |
| 763 logging.info( | 779 logging.info( |
| 764 'tidy_stale() in %.3fs; TaskDimensions: found %d, deleted %d; ' | 780 'tidy_stale() in %.3fs; TaskDimensions: found %d, deleted %d; ' |
| 765 'BotTaskDimensions: found %d, deleted %d', | 781 'BotTaskDimensions: found %d, deleted %d', |
| 766 (utils.utcnow() - now).total_seconds(), | 782 (utils.utcnow() - now).total_seconds(), |
| 767 td_found, td_deleted, btd_found, btd_deleted) | 783 td_found, td_deleted, btd_found, btd_deleted) |
| OLD | NEW |