OLD | NEW |
1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
6 | 6 |
7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
8 """ | 8 """ |
9 | 9 |
10 import datetime | 10 import datetime |
11 import logging | 11 import logging |
12 import math | 12 import math |
13 import random | 13 import random |
14 | 14 |
| 15 from google.appengine.api import memcache |
15 from google.appengine.ext import ndb | 16 from google.appengine.ext import ndb |
16 | 17 |
17 from components import auth | 18 from components import auth |
18 from components import datastore_utils | 19 from components import datastore_utils |
19 from components import pubsub | 20 from components import pubsub |
20 from components import utils | 21 from components import utils |
21 | 22 |
22 import event_mon_metrics | 23 import event_mon_metrics |
23 import ts_mon_metrics | 24 import ts_mon_metrics |
24 | 25 |
(...skipping 442 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
467 | 468 |
468 # If the user provided a max then use it, otherwise | 469 # If the user provided a max then use it, otherwise |
469 # use default | 470 # use default |
470 max_wait = config.settings().max_bot_sleep_time | 471 max_wait = config.settings().max_bot_sleep_time |
471 if not max_wait: | 472 if not max_wait: |
472 # Enforces more frequent polls on staging. | 473 # Enforces more frequent polls on staging. |
473 max_wait = 3. if utils.is_dev() else 60. | 474 max_wait = 3. if utils.is_dev() else 60. |
474 return min(max_wait, math.pow(1.5, min(attempt_num, 10) + 1)) | 475 return min(max_wait, math.pow(1.5, min(attempt_num, 10) + 1)) |
475 | 476 |
476 | 477 |
477 def schedule_request(request, secret_bytes, check_acls=True): | 478 def schedule_request( |
| 479 request, secret_bytes, check_acls=True, |
| 480 memcache_key_for_task_id=None, memcache_ns_for_task_id=None): |
478 """Creates and stores all the entities to schedule a new task request. | 481 """Creates and stores all the entities to schedule a new task request. |
479 | 482 |
480 Checks ACLs first. Raises auth.AuthorizationError if caller is not authorized | 483 Checks ACLs first. Raises auth.AuthorizationError if caller is not authorized |
481 to post this request. | 484 to post this request. |
482 | 485 |
483 The number of entities created is 3: TaskRequest, TaskToRun and | 486 The number of entities created is 3: TaskRequest, TaskToRun and |
484 TaskResultSummary. | 487 TaskResultSummary. If memcache_key_for_task_id is not None, sets it to the |
| 488 task_id right after creation. |
485 | 489 |
486 All 4 entities in the same entity group (TaskReqest, TaskToRun, | 490 All 4 entities in the same entity group (TaskReqest, TaskToRun, |
487 TaskResultSummary, SecretBytes) are saved as a DB transaction. | 491 TaskResultSummary, SecretBytes) are saved as a DB transaction. |
488 | 492 |
489 Arguments: | 493 Arguments: |
490 - request: TaskRequest entity to be saved in the DB. It's key must not be set | 494 - request: TaskRequest entity to be saved in the DB. It's key must not be set |
491 and the entity must not be saved in the DB yet. | 495 and the entity must not be saved in the DB yet. |
492 - secret_bytes: SecretBytes entity to be saved in the DB. It's key will be set | 496 - secret_bytes: SecretBytes entity to be saved in the DB. It's key will be set |
493 and the entity will be stored by this function. None is allowed if | 497 and the entity will be stored by this function. None is allowed if |
494 there are no SecretBytes for this task. | 498 there are no SecretBytes for this task. |
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
559 'New request %s reusing %s', result_summary.task_id, | 563 'New request %s reusing %s', result_summary.task_id, |
560 dupe_summary.task_id) | 564 dupe_summary.task_id) |
561 deduped = True | 565 deduped = True |
562 | 566 |
563 if not deduped: | 567 if not deduped: |
564 # Storing these entities makes this task live. It is important at this point | 568 # Storing these entities makes this task live. It is important at this point |
565 # that the HTTP handler returns as fast as possible, otherwise the task will | 569 # that the HTTP handler returns as fast as possible, otherwise the task will |
566 # be run but the client will not know about it. | 570 # be run but the client will not know about it. |
567 datastore_utils.insert(request, get_new_keys, | 571 datastore_utils.insert(request, get_new_keys, |
568 extra=filter(bool, [task, result_summary, secret_bytes])) | 572 extra=filter(bool, [task, result_summary, secret_bytes])) |
| 573 if memcache_key_for_task_id: |
| 574 memcache.set( |
| 575 memcache_key_for_task_id, result_summary.task_id, |
| 576 namespace=memcache_ns_for_task_id) |
569 logging.debug('New request %s', result_summary.task_id) | 577 logging.debug('New request %s', result_summary.task_id) |
570 | 578 |
571 # Get parent task details if applicable. | 579 # Get parent task details if applicable. |
572 if request.parent_task_id: | 580 if request.parent_task_id: |
573 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) | 581 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) |
574 parent_task_keys = [ | 582 parent_task_keys = [ |
575 parent_run_key, | 583 parent_run_key, |
576 task_pack.run_result_key_to_result_summary_key(parent_run_key), | 584 task_pack.run_result_key_to_result_summary_key(parent_run_key), |
577 ] | 585 ] |
578 | 586 |
(...skipping 429 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1008 ## Task queue tasks. | 1016 ## Task queue tasks. |
1009 | 1017 |
1010 | 1018 |
1011 def task_handle_pubsub_task(payload): | 1019 def task_handle_pubsub_task(payload): |
1012 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 1020 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
1013 # Do not catch errors to trigger task queue task retry. Errors should not | 1021 # Do not catch errors to trigger task queue task retry. Errors should not |
1014 # happen in normal case. | 1022 # happen in normal case. |
1015 _pubsub_notify( | 1023 _pubsub_notify( |
1016 payload['task_id'], payload['topic'], | 1024 payload['task_id'], payload['topic'], |
1017 payload['auth_token'], payload['userdata']) | 1025 payload['auth_token'], payload['userdata']) |
OLD | NEW |