| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
| 6 | 6 |
| 7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import datetime | 10 import datetime |
| 11 import logging | 11 import logging |
| 12 import math | 12 import math |
| 13 import random | 13 import random |
| 14 | 14 |
| 15 from google.appengine.api import memcache |
| 15 from google.appengine.ext import ndb | 16 from google.appengine.ext import ndb |
| 16 | 17 |
| 17 from components import auth | 18 from components import auth |
| 18 from components import datastore_utils | 19 from components import datastore_utils |
| 19 from components import pubsub | 20 from components import pubsub |
| 20 from components import utils | 21 from components import utils |
| 21 | 22 |
| 22 import event_mon_metrics | 23 import event_mon_metrics |
| 23 import ts_mon_metrics | 24 import ts_mon_metrics |
| 24 | 25 |
| (...skipping 442 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 467 | 468 |
| 468 # If the user provided a max then use it, otherwise | 469 # If the user provided a max then use it, otherwise |
| 469 # use default | 470 # use default |
| 470 max_wait = config.settings().max_bot_sleep_time | 471 max_wait = config.settings().max_bot_sleep_time |
| 471 if not max_wait: | 472 if not max_wait: |
| 472 # Enforces more frequent polls on staging. | 473 # Enforces more frequent polls on staging. |
| 473 max_wait = 3. if utils.is_dev() else 60. | 474 max_wait = 3. if utils.is_dev() else 60. |
| 474 return min(max_wait, math.pow(1.5, min(attempt_num, 10) + 1)) | 475 return min(max_wait, math.pow(1.5, min(attempt_num, 10) + 1)) |
| 475 | 476 |
| 476 | 477 |
| 477 def schedule_request(request, secret_bytes, check_acls=True): | 478 def schedule_request( |
| 479 request, secret_bytes, check_acls=True, |
| 480 memcache_key_for_task_id=None, memcache_ns_for_task_id=None): |
| 478 """Creates and stores all the entities to schedule a new task request. | 481 """Creates and stores all the entities to schedule a new task request. |
| 479 | 482 |
| 480 Checks ACLs first. Raises auth.AuthorizationError if caller is not authorized | 483 Checks ACLs first. Raises auth.AuthorizationError if caller is not authorized |
| 481 to post this request. | 484 to post this request. |
| 482 | 485 |
| 483 The number of entities created is 3: TaskRequest, TaskToRun and | 486 The number of entities created is 3: TaskRequest, TaskToRun and |
| 484 TaskResultSummary. | 487 TaskResultSummary. If memcache_key_for_task_id is not None, sets it to the |
| 488 task_id right after creation. |
| 485 | 489 |
| 486 All 4 entities in the same entity group (TaskReqest, TaskToRun, | 490 All 4 entities in the same entity group (TaskReqest, TaskToRun, |
| 487 TaskResultSummary, SecretBytes) are saved as a DB transaction. | 491 TaskResultSummary, SecretBytes) are saved as a DB transaction. |
| 488 | 492 |
| 489 Arguments: | 493 Arguments: |
| 490 - request: TaskRequest entity to be saved in the DB. It's key must not be set | 494 - request: TaskRequest entity to be saved in the DB. It's key must not be set |
| 491 and the entity must not be saved in the DB yet. | 495 and the entity must not be saved in the DB yet. |
| 492 - secret_bytes: SecretBytes entity to be saved in the DB. It's key will be set | 496 - secret_bytes: SecretBytes entity to be saved in the DB. It's key will be set |
| 493 and the entity will be stored by this function. None is allowed if | 497 and the entity will be stored by this function. None is allowed if |
| 494 there are no SecretBytes for this task. | 498 there are no SecretBytes for this task. |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 559 'New request %s reusing %s', result_summary.task_id, | 563 'New request %s reusing %s', result_summary.task_id, |
| 560 dupe_summary.task_id) | 564 dupe_summary.task_id) |
| 561 deduped = True | 565 deduped = True |
| 562 | 566 |
| 563 if not deduped: | 567 if not deduped: |
| 564 # Storing these entities makes this task live. It is important at this point | 568 # Storing these entities makes this task live. It is important at this point |
| 565 # that the HTTP handler returns as fast as possible, otherwise the task will | 569 # that the HTTP handler returns as fast as possible, otherwise the task will |
| 566 # be run but the client will not know about it. | 570 # be run but the client will not know about it. |
| 567 datastore_utils.insert(request, get_new_keys, | 571 datastore_utils.insert(request, get_new_keys, |
| 568 extra=filter(bool, [task, result_summary, secret_bytes])) | 572 extra=filter(bool, [task, result_summary, secret_bytes])) |
| 573 if memcache_key_for_task_id: |
| 574 memcache.set( |
| 575 memcache_key_for_task_id, result_summary.task_id, |
| 576 namespace=memcache_ns_for_task_id) |
| 569 logging.debug('New request %s', result_summary.task_id) | 577 logging.debug('New request %s', result_summary.task_id) |
| 570 | 578 |
| 571 # Get parent task details if applicable. | 579 # Get parent task details if applicable. |
| 572 if request.parent_task_id: | 580 if request.parent_task_id: |
| 573 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) | 581 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) |
| 574 parent_task_keys = [ | 582 parent_task_keys = [ |
| 575 parent_run_key, | 583 parent_run_key, |
| 576 task_pack.run_result_key_to_result_summary_key(parent_run_key), | 584 task_pack.run_result_key_to_result_summary_key(parent_run_key), |
| 577 ] | 585 ] |
| 578 | 586 |
| (...skipping 429 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1008 ## Task queue tasks. | 1016 ## Task queue tasks. |
| 1009 | 1017 |
| 1010 | 1018 |
| 1011 def task_handle_pubsub_task(payload): | 1019 def task_handle_pubsub_task(payload): |
| 1012 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 1020 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
| 1013 # Do not catch errors to trigger task queue task retry. Errors should not | 1021 # Do not catch errors to trigger task queue task retry. Errors should not |
| 1014 # happen in normal case. | 1022 # happen in normal case. |
| 1015 _pubsub_notify( | 1023 _pubsub_notify( |
| 1016 payload['task_id'], payload['topic'], | 1024 payload['task_id'], payload['topic'], |
| 1017 payload['auth_token'], payload['userdata']) | 1025 payload['auth_token'], payload['userdata']) |
| OLD | NEW |