Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(487)

Side by Side Diff: appengine/swarming/server/task_scheduler.py

Issue 2856733002: swarming: add transaction_id to tasks.new request
Patch Set: nits Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """High level tasks execution scheduling API. 5 """High level tasks execution scheduling API.
6 6
7 This is the interface closest to the HTTP handlers. 7 This is the interface closest to the HTTP handlers.
8 """ 8 """
9 9
10 import datetime 10 import datetime
11 import logging 11 import logging
12 import math 12 import math
13 import random 13 import random
14 14
15 from google.appengine.api import memcache
15 from google.appengine.ext import ndb 16 from google.appengine.ext import ndb
16 17
17 from components import auth 18 from components import auth
18 from components import datastore_utils 19 from components import datastore_utils
19 from components import pubsub 20 from components import pubsub
20 from components import utils 21 from components import utils
21 22
22 import event_mon_metrics 23 import event_mon_metrics
23 import ts_mon_metrics 24 import ts_mon_metrics
24 25
(...skipping 442 matching lines...) Expand 10 before | Expand all | Expand 10 after
467 468
468 # If the user provided a max then use it, otherwise 469 # If the user provided a max then use it, otherwise
469 # use default 470 # use default
470 max_wait = config.settings().max_bot_sleep_time 471 max_wait = config.settings().max_bot_sleep_time
471 if not max_wait: 472 if not max_wait:
472 # Enforces more frequent polls on staging. 473 # Enforces more frequent polls on staging.
473 max_wait = 3. if utils.is_dev() else 60. 474 max_wait = 3. if utils.is_dev() else 60.
474 return min(max_wait, math.pow(1.5, min(attempt_num, 10) + 1)) 475 return min(max_wait, math.pow(1.5, min(attempt_num, 10) + 1))
475 476
476 477
477 def schedule_request(request, secret_bytes, check_acls=True): 478 def schedule_request(
479 request, secret_bytes, check_acls=True,
480 memcache_key_for_task_id=None, memcache_ns_for_task_id=None):
478 """Creates and stores all the entities to schedule a new task request. 481 """Creates and stores all the entities to schedule a new task request.
479 482
480 Checks ACLs first. Raises auth.AuthorizationError if caller is not authorized 483 Checks ACLs first. Raises auth.AuthorizationError if caller is not authorized
481 to post this request. 484 to post this request.
482 485
483 The number of entities created is 3: TaskRequest, TaskToRun and 486 The number of entities created is 3: TaskRequest, TaskToRun and
484 TaskResultSummary. 487 TaskResultSummary. If memcache_key_for_task_id is not None, sets it to the
488 task_id right after creation.
485 489
486 All 4 entities in the same entity group (TaskReqest, TaskToRun, 490 All 4 entities in the same entity group (TaskReqest, TaskToRun,
487 TaskResultSummary, SecretBytes) are saved as a DB transaction. 491 TaskResultSummary, SecretBytes) are saved as a DB transaction.
488 492
489 Arguments: 493 Arguments:
490 - request: TaskRequest entity to be saved in the DB. It's key must not be set 494 - request: TaskRequest entity to be saved in the DB. It's key must not be set
491 and the entity must not be saved in the DB yet. 495 and the entity must not be saved in the DB yet.
492 - secret_bytes: SecretBytes entity to be saved in the DB. It's key will be set 496 - secret_bytes: SecretBytes entity to be saved in the DB. It's key will be set
493 and the entity will be stored by this function. None is allowed if 497 and the entity will be stored by this function. None is allowed if
494 there are no SecretBytes for this task. 498 there are no SecretBytes for this task.
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
559 'New request %s reusing %s', result_summary.task_id, 563 'New request %s reusing %s', result_summary.task_id,
560 dupe_summary.task_id) 564 dupe_summary.task_id)
561 deduped = True 565 deduped = True
562 566
563 if not deduped: 567 if not deduped:
564 # Storing these entities makes this task live. It is important at this point 568 # Storing these entities makes this task live. It is important at this point
565 # that the HTTP handler returns as fast as possible, otherwise the task will 569 # that the HTTP handler returns as fast as possible, otherwise the task will
566 # be run but the client will not know about it. 570 # be run but the client will not know about it.
567 datastore_utils.insert(request, get_new_keys, 571 datastore_utils.insert(request, get_new_keys,
568 extra=filter(bool, [task, result_summary, secret_bytes])) 572 extra=filter(bool, [task, result_summary, secret_bytes]))
573 if memcache_key_for_task_id:
574 memcache.set(
575 memcache_key_for_task_id, result_summary.task_id,
576 namespace=memcache_ns_for_task_id)
569 logging.debug('New request %s', result_summary.task_id) 577 logging.debug('New request %s', result_summary.task_id)
570 578
571 # Get parent task details if applicable. 579 # Get parent task details if applicable.
572 if request.parent_task_id: 580 if request.parent_task_id:
573 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) 581 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id)
574 parent_task_keys = [ 582 parent_task_keys = [
575 parent_run_key, 583 parent_run_key,
576 task_pack.run_result_key_to_result_summary_key(parent_run_key), 584 task_pack.run_result_key_to_result_summary_key(parent_run_key),
577 ] 585 ]
578 586
(...skipping 429 matching lines...) Expand 10 before | Expand all | Expand 10 after
1008 ## Task queue tasks. 1016 ## Task queue tasks.
1009 1017
1010 1018
1011 def task_handle_pubsub_task(payload): 1019 def task_handle_pubsub_task(payload):
1012 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" 1020 """Handles task enqueued by _maybe_pubsub_notify_via_tq."""
1013 # Do not catch errors to trigger task queue task retry. Errors should not 1021 # Do not catch errors to trigger task queue task retry. Errors should not
1014 # happen in normal case. 1022 # happen in normal case.
1015 _pubsub_notify( 1023 _pubsub_notify(
1016 payload['task_id'], payload['topic'], 1024 payload['task_id'], payload['topic'],
1017 payload['auth_token'], payload['userdata']) 1025 payload['auth_token'], payload['userdata'])
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698