Index: appengine/swarming/handlers_endpoints.py |
diff --git a/appengine/swarming/handlers_endpoints.py b/appengine/swarming/handlers_endpoints.py |
index 46fb36b9018453126180accc5b96263746c0e107..4ac25743f35c4e9beaf0ce481e01a3ea27a27ecd 100644 |
--- a/appengine/swarming/handlers_endpoints.py |
+++ b/appengine/swarming/handlers_endpoints.py |
@@ -36,6 +36,9 @@ from server import task_result |
from server import task_scheduler |
+TRANSACTION_ID_MEMCACHE_NAMESPACE = 'tasks.new/transaction_id' |
+ |
+ |
### Helper Methods |
@@ -50,13 +53,13 @@ protojson.ProtoJson.decode_field = _decode_field |
def get_request_and_result(task_id): |
- """Provides the key and TaskRequest corresponding to a task ID. |
+ """Provides the TaskRequest and result corresponding to a task ID. |
Enforces the ACL for users. Allows bots all access for the moment. |
Returns: |
tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
- TaskResultSummay. |
+ TaskResultSummary. |
""" |
try: |
request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
@@ -70,6 +73,32 @@ def get_request_and_result(task_id): |
raise endpoints.BadRequestException('%s is an invalid key.' % task_id) |
+def get_request_and_result_by_transaction_id(transaction_id): |
+ """Provides the TaskRequest and result corresponding to a transaction ID. |
+ |
+ Enforces the ACL for users. Allows bots all access for the moment. |
+ |
+ May return (None, None) if the transaction, request or result is not found. |
+ |
+ Returns: |
+ tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
+ TaskResultSummary. |
+ """ |
+ task_id = memcache.get( |
+ transaction_id, namespace=TRANSACTION_ID_MEMCACHE_NAMESPACE) |
+ if not task_id: |
+ return None, None |
+ request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
+ request, result = ndb.get_multi((request_key, result_key)) |
+ if not request or not result: |
+ return None, None |
+ if not acl.is_bot() and not request.has_access: |
+ raise endpoints.ForbiddenException( |
+ '%s, created for transaction %s, is not accessible.' % |
+ (task_id, transaction_id)) |
+ return request, result |
+ |
+ |
def get_or_raise(key): |
"""Returns an entity or raises an endpoints exception if it does not exist.""" |
result = key.get() |
@@ -259,6 +288,11 @@ TaskId = endpoints.ResourceContainer( |
task_id=messages.StringField(1, required=True)) |
+TransactionId = endpoints.ResourceContainer( |
+ message_types.VoidMessage, |
+ transaction_id=messages.StringField(1)) |
+ |
+ |
TaskIdWithPerf = endpoints.ResourceContainer( |
message_types.VoidMessage, |
task_id=messages.StringField(1, required=True), |
@@ -356,20 +390,47 @@ class SwarmingTasksService(remote.Service): |
sb = (request.properties.secret_bytes |
if request.properties is not None else None) |
if sb is not None: |
- request.properties.secret_bytes = "HIDDEN" |
+ request.properties.secret_bytes = 'HIDDEN' |
logging.info('%s', request) |
if sb is not None: |
request.properties.secret_bytes = sb |
try: |
- request, secret_bytes = message_conversion.new_task_request_from_rpc( |
+ request_obj, secret_bytes = message_conversion.new_task_request_from_rpc( |
request, utils.utcnow()) |
- apply_property_defaults(request.properties) |
+ apply_property_defaults(request_obj.properties) |
task_request.init_new_request( |
- request, acl.can_schedule_high_priority_tasks(), secret_bytes) |
+ request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes) |
+ except (TypeError, ValueError) as e: |
+ raise endpoints.BadRequestException(e.message) |
- result_summary = task_scheduler.schedule_request(request, secret_bytes) |
- except (datastore_errors.BadValueError, TypeError, ValueError) as e: |
+ # Check if a request was already created with the same transaction. |
+ if request.transaction_id: |
+ txn_request_obj, txn_result_summary = ( |
+ get_request_and_result_by_transaction_id(request.transaction_id)) |
+ if txn_request_obj and txn_result_summary: |
+ todict = lambda r: r.to_dict(['created_ts', 'expiration_ts']) |
+ if todict(txn_request_obj) != todict(request_obj): |
+ raise endpoints.BadRequestException( |
+ 'A request with different parameters was created for ' |
+ 'transaction %s. Send the same tasks.new request ' |
+ 'within the same transaction!' % request.transaction_id) |
+ task_id = task_pack.pack_result_summary_key(txn_result_summary.key) |
+ logging.info( |
+ 'reusing existing %s for transaction %s', |
+ task_id, request.transaction_id) |
+ return swarming_rpcs.TaskRequestMetadata( |
+ request=message_conversion.task_request_to_rpc(txn_request_obj), |
+ task_id=task_id, |
+ task_result=message_conversion.task_result_to_rpc( |
+ txn_result_summary, False)) |
+ |
+ try: |
+ result_summary = task_scheduler.schedule_request( |
+ request_obj, secret_bytes, |
+ memcache_key_for_task_id=request.transaction_id, |
+ memcache_ns_for_task_id=TRANSACTION_ID_MEMCACHE_NAMESPACE) |
+ except datastore_errors.BadValueError as e: |
raise endpoints.BadRequestException(e.message) |
previous_result = None |
@@ -378,7 +439,7 @@ class SwarmingTasksService(remote.Service): |
result_summary, False) |
return swarming_rpcs.TaskRequestMetadata( |
- request=message_conversion.task_request_to_rpc(request), |
+ request=message_conversion.task_request_to_rpc(request_obj), |
task_id=task_pack.pack_result_summary_key(result_summary.key), |
task_result=previous_result) |
@@ -503,6 +564,35 @@ class SwarmingTasksService(remote.Service): |
@gae_ts_mon.instrument_endpoint() |
@auth.endpoints_method( |
+ TransactionId, swarming_rpcs.CancelResponse, |
+ name='cancel_by_transaction_id', |
+ path='cancel_by_transaction_id/{transaction_id}') |
+ @auth.require(acl.is_bot_or_user) |
+ def cancel_by_transaction_id(self, request): |
+ """Cancels a task by a transaction ID. |
+ |
+ Useful when id of a new task is unknown because the request failed. |
+ The transaction ID here is the one supplied by the user when creating the |
+ task. |
+ |
+ If a bot was running the task, the bot will forcibly cancel the task. |
+ """ |
+ logging.info('%s', request) |
+ |
+ request_obj, result = get_request_and_result_by_transaction_id( |
+ request.transaction_id) |
+ if not request_obj or not result: |
+ raise endpoints.NotFoundException( |
+ 'request for transaction %s not found.' % request.transaction_id) |
+ |
+ ok, was_running = task_scheduler.cancel_task(request_obj, result.key) |
+ return swarming_rpcs.CancelResponse( |
+ task_id=request_obj.task_id, |
+ ok=ok, |
+ was_running=was_running) |
+ |
+ @gae_ts_mon.instrument_endpoint() |
+ @auth.endpoints_method( |
swarming_rpcs.TasksCountRequest, swarming_rpcs.TasksCount, |
http_method='GET') |
@auth.require(acl.is_privileged_user) |