Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1960)

Unified Diff: appengine/swarming/handlers_endpoints.py

Issue 2856733002: swarming: add transaction_id to tasks.new request
Patch Set: nits Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/handlers_endpoints.py
diff --git a/appengine/swarming/handlers_endpoints.py b/appengine/swarming/handlers_endpoints.py
index 46fb36b9018453126180accc5b96263746c0e107..4ac25743f35c4e9beaf0ce481e01a3ea27a27ecd 100644
--- a/appengine/swarming/handlers_endpoints.py
+++ b/appengine/swarming/handlers_endpoints.py
@@ -36,6 +36,9 @@ from server import task_result
from server import task_scheduler
+TRANSACTION_ID_MEMCACHE_NAMESPACE = 'tasks.new/transaction_id'
+
+
### Helper Methods
@@ -50,13 +53,13 @@ protojson.ProtoJson.decode_field = _decode_field
def get_request_and_result(task_id):
- """Provides the key and TaskRequest corresponding to a task ID.
+ """Provides the TaskRequest and result corresponding to a task ID.
Enforces the ACL for users. Allows bots all access for the moment.
Returns:
tuple(TaskRequest, result): result can be either for a TaskRunResult or a
- TaskResultSummay.
+ TaskResultSummary.
"""
try:
request_key, result_key = task_pack.get_request_and_result_keys(task_id)
@@ -70,6 +73,32 @@ def get_request_and_result(task_id):
raise endpoints.BadRequestException('%s is an invalid key.' % task_id)
+def get_request_and_result_by_transaction_id(transaction_id):
+ """Provides the TaskRequest and result corresponding to a transaction ID.
+
+ Enforces the ACL for users. Allows bots all access for the moment.
+
+ May return (None, None) if the transaction, request or result is not found.
+
+ Returns:
+ tuple(TaskRequest, result): result can be either for a TaskRunResult or a
+ TaskResultSummary.
+ """
+ task_id = memcache.get(
+ transaction_id, namespace=TRANSACTION_ID_MEMCACHE_NAMESPACE)
+ if not task_id:
+ return None, None
+ request_key, result_key = task_pack.get_request_and_result_keys(task_id)
+ request, result = ndb.get_multi((request_key, result_key))
+ if not request or not result:
+ return None, None
+ if not acl.is_bot() and not request.has_access:
+ raise endpoints.ForbiddenException(
+ '%s, created for transaction %s, is not accessible.' %
+ (task_id, transaction_id))
+ return request, result
+
+
def get_or_raise(key):
"""Returns an entity or raises an endpoints exception if it does not exist."""
result = key.get()
@@ -259,6 +288,11 @@ TaskId = endpoints.ResourceContainer(
task_id=messages.StringField(1, required=True))
+TransactionId = endpoints.ResourceContainer(
+ message_types.VoidMessage,
+ transaction_id=messages.StringField(1))
+
+
TaskIdWithPerf = endpoints.ResourceContainer(
message_types.VoidMessage,
task_id=messages.StringField(1, required=True),
@@ -356,20 +390,47 @@ class SwarmingTasksService(remote.Service):
sb = (request.properties.secret_bytes
if request.properties is not None else None)
if sb is not None:
- request.properties.secret_bytes = "HIDDEN"
+ request.properties.secret_bytes = 'HIDDEN'
logging.info('%s', request)
if sb is not None:
request.properties.secret_bytes = sb
try:
- request, secret_bytes = message_conversion.new_task_request_from_rpc(
+ request_obj, secret_bytes = message_conversion.new_task_request_from_rpc(
request, utils.utcnow())
- apply_property_defaults(request.properties)
+ apply_property_defaults(request_obj.properties)
task_request.init_new_request(
- request, acl.can_schedule_high_priority_tasks(), secret_bytes)
+ request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes)
+ except (TypeError, ValueError) as e:
+ raise endpoints.BadRequestException(e.message)
- result_summary = task_scheduler.schedule_request(request, secret_bytes)
- except (datastore_errors.BadValueError, TypeError, ValueError) as e:
+ # Check if a request was already created with the same transaction.
+ if request.transaction_id:
+ txn_request_obj, txn_result_summary = (
+ get_request_and_result_by_transaction_id(request.transaction_id))
+ if txn_request_obj and txn_result_summary:
+ todict = lambda r: r.to_dict(['created_ts', 'expiration_ts'])
+ if todict(txn_request_obj) != todict(request_obj):
+ raise endpoints.BadRequestException(
+ 'A request with different parameters was created for '
+ 'transaction %s. Send the same tasks.new request '
+ 'within the same transaction!' % request.transaction_id)
+ task_id = task_pack.pack_result_summary_key(txn_result_summary.key)
+ logging.info(
+ 'reusing existing %s for transaction %s',
+ task_id, request.transaction_id)
+ return swarming_rpcs.TaskRequestMetadata(
+ request=message_conversion.task_request_to_rpc(txn_request_obj),
+ task_id=task_id,
+ task_result=message_conversion.task_result_to_rpc(
+ txn_result_summary, False))
+
+ try:
+ result_summary = task_scheduler.schedule_request(
+ request_obj, secret_bytes,
+ memcache_key_for_task_id=request.transaction_id,
+ memcache_ns_for_task_id=TRANSACTION_ID_MEMCACHE_NAMESPACE)
+ except datastore_errors.BadValueError as e:
raise endpoints.BadRequestException(e.message)
previous_result = None
@@ -378,7 +439,7 @@ class SwarmingTasksService(remote.Service):
result_summary, False)
return swarming_rpcs.TaskRequestMetadata(
- request=message_conversion.task_request_to_rpc(request),
+ request=message_conversion.task_request_to_rpc(request_obj),
task_id=task_pack.pack_result_summary_key(result_summary.key),
task_result=previous_result)
@@ -503,6 +564,35 @@ class SwarmingTasksService(remote.Service):
@gae_ts_mon.instrument_endpoint()
@auth.endpoints_method(
+ TransactionId, swarming_rpcs.CancelResponse,
+ name='cancel_by_transaction_id',
+ path='cancel_by_transaction_id/{transaction_id}')
+ @auth.require(acl.is_bot_or_user)
+ def cancel_by_transaction_id(self, request):
+ """Cancels a task by a transaction ID.
+
+ Useful when id of a new task is unknown because the request failed.
+ The transaction ID here is the one supplied by the user when creating the
+ task.
+
+ If a bot was running the task, the bot will forcibly cancel the task.
+ """
+ logging.info('%s', request)
+
+ request_obj, result = get_request_and_result_by_transaction_id(
+ request.transaction_id)
+ if not request_obj or not result:
+ raise endpoints.NotFoundException(
+ 'request for transaction %s not found.' % request.transaction_id)
+
+ ok, was_running = task_scheduler.cancel_task(request_obj, result.key)
+ return swarming_rpcs.CancelResponse(
+ task_id=request_obj.task_id,
+ ok=ok,
+ was_running=was_running)
+
+ @gae_ts_mon.instrument_endpoint()
+ @auth.endpoints_method(
swarming_rpcs.TasksCountRequest, swarming_rpcs.TasksCount,
http_method='GET')
@auth.require(acl.is_privileged_user)

Powered by Google App Engine
This is Rietveld 408576698