Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1639)

Unified Diff: appengine/swarming/handlers_endpoints.py

Issue 2856733002: swarming: add transaction_id to tasks.new request
Patch Set: Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/handlers_endpoints.py
diff --git a/appengine/swarming/handlers_endpoints.py b/appengine/swarming/handlers_endpoints.py
index 46fb36b9018453126180accc5b96263746c0e107..716df83e9b65a6e59f42d7ee722f4429cb1e7673 100644
--- a/appengine/swarming/handlers_endpoints.py
+++ b/appengine/swarming/handlers_endpoints.py
@@ -50,13 +50,13 @@ protojson.ProtoJson.decode_field = _decode_field
def get_request_and_result(task_id):
- """Provides the key and TaskRequest corresponding to a task ID.
+ """Provides the TaskRequest and result corresponding to a task ID.
Enforces the ACL for users. Allows bots all access for the moment.
Returns:
tuple(TaskRequest, result): result can be either for a TaskRunResult or a
- TaskResultSummay.
+ TaskResultSummary.
"""
try:
request_key, result_key = task_pack.get_request_and_result_keys(task_id)
@@ -70,6 +70,36 @@ def get_request_and_result(task_id):
raise endpoints.BadRequestException('%s is an invalid key.' % task_id)
+def get_request_and_result_by_transaction_id(transaction_id):
+ """Provides the TaskRequest and result corresponding to a transaction ID.
+
+ Enforces the ACL for users. Allows bots all access for the moment.
+
+ May return (None, None) if the transaction, request or result is not found.
+
+ Returns:
+ tuple(TaskRequest, result): result can be either for a TaskRunResult or a
+ TaskResultSummary.
+ """
+ task_id = memcache.get(transaction_id_to_memcache_key(transaction_id))
M-A Ruel 2017/05/03 01:33:20 use a namespace, it's simpler. Then no need for tr
nodir 2017/05/05 18:11:38 Done.
+ if not task_id:
+ return None, None
M-A Ruel 2017/05/03 01:33:20 This makes the contract a bit scary though. :/ I m
nodir 2017/05/05 18:11:38 maybe set timeout to 1min?
+ request_key, result_key = task_pack.get_request_and_result_keys(task_id)
+ request, result = ndb.get_multi((request_key, result_key))
+ if not request or not result:
+ return None, None
+ if not acl.is_bot() and not request.has_access:
+ raise endpoints.ForbiddenException(
+ '%s, created for transaction %s, is not accessible.' %
+ (task_id, transaction_id))
+ return request, result
+
+
+def transaction_id_to_memcache_key(transaction_id):
+ assert transaction_id
+ return 'tasks.new/txn/%s' % transaction_id
+
+
def get_or_raise(key):
"""Returns an entity or raises an endpoints exception if it does not exist."""
result = key.get()
@@ -259,6 +289,12 @@ TaskId = endpoints.ResourceContainer(
task_id=messages.StringField(1, required=True))
+TaskIdWithTransactionID = endpoints.ResourceContainer(
+ message_types.VoidMessage,
+ task_id=messages.StringField(1),
+ transaction_id=messages.StringField(2))
+
+
TaskIdWithPerf = endpoints.ResourceContainer(
message_types.VoidMessage,
task_id=messages.StringField(1, required=True),
@@ -304,7 +340,7 @@ class SwarmingTaskService(remote.Service):
@gae_ts_mon.instrument_endpoint()
@auth.endpoints_method(
- TaskId, swarming_rpcs.CancelResponse,
+ TaskIdWithTransactionID, swarming_rpcs.CancelResponse,
name='cancel',
path='{task_id}/cancel')
@auth.require(acl.is_bot_or_user)
@@ -314,7 +350,18 @@ class SwarmingTaskService(remote.Service):
If a bot was running the task, the bot will forcibly cancel the task.
"""
logging.info('%s', request)
- request_obj, result = get_request_and_result(request.task_id)
+ if request.task_id and request.transaction_id:
+ raise endpoints.BadRequestException(
+ 'only one of task_id or transaction_id must be specified')
M-A Ruel 2017/05/03 01:33:20 That can't work because path is '{task_id}/cancel'
nodir 2017/05/05 18:11:38 ugh, should I define a new API then? cancel_by_tra
M-A Ruel 2017/05/05 18:15:33 Yes I prefer a new API. If we get rid of the featu
nodir 2017/05/05 18:41:37 Done.
+ if request.task_id:
+ request_obj, result = get_request_and_result(request.task_id)
+ else:
+ request_obj, result = get_request_and_result_by_transaction_id(
+ request.transaction_id)
+ if not request_obj or not result:
+ raise endpoints.NotFoundException(
+ 'request for transaction %s not found.' % request.transaction_id)
+
ok, was_running = task_scheduler.cancel_task(request_obj, result.key)
return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running)
@@ -356,20 +403,49 @@ class SwarmingTasksService(remote.Service):
sb = (request.properties.secret_bytes
if request.properties is not None else None)
if sb is not None:
- request.properties.secret_bytes = "HIDDEN"
+ request.properties.secret_bytes = 'HIDDEN'
logging.info('%s', request)
if sb is not None:
request.properties.secret_bytes = sb
try:
- request, secret_bytes = message_conversion.new_task_request_from_rpc(
+ request_obj, secret_bytes = message_conversion.new_task_request_from_rpc(
request, utils.utcnow())
- apply_property_defaults(request.properties)
+ apply_property_defaults(request_obj.properties)
task_request.init_new_request(
- request, acl.can_schedule_high_priority_tasks(), secret_bytes)
+ request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes)
+ except (TypeError, ValueError) as e:
+ raise endpoints.BadRequestException(e.message)
- result_summary = task_scheduler.schedule_request(request, secret_bytes)
- except (datastore_errors.BadValueError, TypeError, ValueError) as e:
+ # Check if a request was already created with the same transaction.
+ transaction_memcache_key = None
+ if request.transaction_id:
+ transaction_memcache_key = transaction_id_to_memcache_key(
+ request.transaction_id)
+ txn_request_obj, txn_result_summary = (
+ get_request_and_result_by_transaction_id(request.transaction_id))
+ if txn_request_obj and txn_result_summary:
+ todict = lambda r: r.to_dict(['created_ts', 'expiration_ts'])
+ if todict(txn_request_obj) != todict(request_obj):
+ raise endpoints.BadRequestException(
+ 'A request with different parameters was created for '
+ 'transaction %s. Send the same tasks.new request '
+ 'within the same transaction!' % request.transaction_id)
+ task_id = task_pack.pack_result_summary_key(txn_result_summary.key)
+ logging.info(
+ 'reusing existing %s for transaction %s',
+ task_id, request.transaction_id)
+ return swarming_rpcs.TaskRequestMetadata(
+ request=message_conversion.task_request_to_rpc(txn_request_obj),
+ task_id=task_id,
+ task_result=message_conversion.task_result_to_rpc(
+ txn_result_summary, False))
+
+ try:
+ result_summary = task_scheduler.schedule_request(
+ request_obj, secret_bytes,
+ memcache_key_for_task_id=transaction_memcache_key)
+ except datastore_errors.BadValueError as e:
raise endpoints.BadRequestException(e.message)
previous_result = None
@@ -378,7 +454,7 @@ class SwarmingTasksService(remote.Service):
result_summary, False)
return swarming_rpcs.TaskRequestMetadata(
- request=message_conversion.task_request_to_rpc(request),
+ request=message_conversion.task_request_to_rpc(request_obj),
task_id=task_pack.pack_result_summary_key(result_summary.key),
task_result=previous_result)

Powered by Google App Engine
This is Rietveld 408576698