Index: appengine/swarming/handlers_endpoints.py |
diff --git a/appengine/swarming/handlers_endpoints.py b/appengine/swarming/handlers_endpoints.py |
index 46fb36b9018453126180accc5b96263746c0e107..716df83e9b65a6e59f42d7ee722f4429cb1e7673 100644 |
--- a/appengine/swarming/handlers_endpoints.py |
+++ b/appengine/swarming/handlers_endpoints.py |
@@ -50,13 +50,13 @@ protojson.ProtoJson.decode_field = _decode_field |
def get_request_and_result(task_id): |
- """Provides the key and TaskRequest corresponding to a task ID. |
+ """Provides the TaskRequest and result corresponding to a task ID. |
Enforces the ACL for users. Allows bots all access for the moment. |
Returns: |
tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
- TaskResultSummay. |
+ TaskResultSummary. |
""" |
try: |
request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
@@ -70,6 +70,36 @@ def get_request_and_result(task_id): |
raise endpoints.BadRequestException('%s is an invalid key.' % task_id) |
+def get_request_and_result_by_transaction_id(transaction_id): |
+ """Provides the TaskRequest and result corresponding to a transaction ID. |
+ |
+ Enforces the ACL for users. Allows bots all access for the moment. |
+ |
+ May return (None, None) if the transaction, request or result is not found. |
+ |
+ Returns: |
+ tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
+ TaskResultSummary. |
+ """ |
+ task_id = memcache.get(transaction_id_to_memcache_key(transaction_id)) |
M-A Ruel
2017/05/03 01:33:20
use a namespace, it's simpler. Then no need for tr
nodir
2017/05/05 18:11:38
Done.
|
+ if not task_id: |
+ return None, None |
M-A Ruel
2017/05/03 01:33:20
This makes the contract a bit scary though. :/ I m
nodir
2017/05/05 18:11:38
maybe set timeout to 1min?
|
+ request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
+ request, result = ndb.get_multi((request_key, result_key)) |
+ if not request or not result: |
+ return None, None |
+ if not acl.is_bot() and not request.has_access: |
+ raise endpoints.ForbiddenException( |
+ '%s, created for transaction %s, is not accessible.' % |
+ (task_id, transaction_id)) |
+ return request, result |
+ |
+ |
+def transaction_id_to_memcache_key(transaction_id): |
+ assert transaction_id |
+ return 'tasks.new/txn/%s' % transaction_id |
+ |
+ |
def get_or_raise(key): |
"""Returns an entity or raises an endpoints exception if it does not exist.""" |
result = key.get() |
@@ -259,6 +289,12 @@ TaskId = endpoints.ResourceContainer( |
task_id=messages.StringField(1, required=True)) |
+TaskIdWithTransactionID = endpoints.ResourceContainer( |
+ message_types.VoidMessage, |
+ task_id=messages.StringField(1), |
+ transaction_id=messages.StringField(2)) |
+ |
+ |
TaskIdWithPerf = endpoints.ResourceContainer( |
message_types.VoidMessage, |
task_id=messages.StringField(1, required=True), |
@@ -304,7 +340,7 @@ class SwarmingTaskService(remote.Service): |
@gae_ts_mon.instrument_endpoint() |
@auth.endpoints_method( |
- TaskId, swarming_rpcs.CancelResponse, |
+ TaskIdWithTransactionID, swarming_rpcs.CancelResponse, |
name='cancel', |
path='{task_id}/cancel') |
@auth.require(acl.is_bot_or_user) |
@@ -314,7 +350,18 @@ class SwarmingTaskService(remote.Service): |
If a bot was running the task, the bot will forcibly cancel the task. |
""" |
logging.info('%s', request) |
- request_obj, result = get_request_and_result(request.task_id) |
+ if request.task_id and request.transaction_id: |
+ raise endpoints.BadRequestException( |
+ 'only one of task_id or transaction_id must be specified') |
M-A Ruel
2017/05/03 01:33:20
That can't work because path is '{task_id}/cancel'
nodir
2017/05/05 18:11:38
ugh, should I define a new API then? cancel_by_tra
M-A Ruel
2017/05/05 18:15:33
Yes I prefer a new API. If we get rid of the featu
nodir
2017/05/05 18:41:37
Done.
|
+ if request.task_id: |
+ request_obj, result = get_request_and_result(request.task_id) |
+ else: |
+ request_obj, result = get_request_and_result_by_transaction_id( |
+ request.transaction_id) |
+ if not request_obj or not result: |
+ raise endpoints.NotFoundException( |
+ 'request for transaction %s not found.' % request.transaction_id) |
+ |
ok, was_running = task_scheduler.cancel_task(request_obj, result.key) |
return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running) |
@@ -356,20 +403,49 @@ class SwarmingTasksService(remote.Service): |
sb = (request.properties.secret_bytes |
if request.properties is not None else None) |
if sb is not None: |
- request.properties.secret_bytes = "HIDDEN" |
+ request.properties.secret_bytes = 'HIDDEN' |
logging.info('%s', request) |
if sb is not None: |
request.properties.secret_bytes = sb |
try: |
- request, secret_bytes = message_conversion.new_task_request_from_rpc( |
+ request_obj, secret_bytes = message_conversion.new_task_request_from_rpc( |
request, utils.utcnow()) |
- apply_property_defaults(request.properties) |
+ apply_property_defaults(request_obj.properties) |
task_request.init_new_request( |
- request, acl.can_schedule_high_priority_tasks(), secret_bytes) |
+ request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes) |
+ except (TypeError, ValueError) as e: |
+ raise endpoints.BadRequestException(e.message) |
- result_summary = task_scheduler.schedule_request(request, secret_bytes) |
- except (datastore_errors.BadValueError, TypeError, ValueError) as e: |
+ # Check if a request was already created with the same transaction. |
+ transaction_memcache_key = None |
+ if request.transaction_id: |
+ transaction_memcache_key = transaction_id_to_memcache_key( |
+ request.transaction_id) |
+ txn_request_obj, txn_result_summary = ( |
+ get_request_and_result_by_transaction_id(request.transaction_id)) |
+ if txn_request_obj and txn_result_summary: |
+ todict = lambda r: r.to_dict(['created_ts', 'expiration_ts']) |
+ if todict(txn_request_obj) != todict(request_obj): |
+ raise endpoints.BadRequestException( |
+ 'A request with different parameters was created for ' |
+ 'transaction %s. Send the same tasks.new request ' |
+ 'within the same transaction!' % request.transaction_id) |
+ task_id = task_pack.pack_result_summary_key(txn_result_summary.key) |
+ logging.info( |
+ 'reusing existing %s for transaction %s', |
+ task_id, request.transaction_id) |
+ return swarming_rpcs.TaskRequestMetadata( |
+ request=message_conversion.task_request_to_rpc(txn_request_obj), |
+ task_id=task_id, |
+ task_result=message_conversion.task_result_to_rpc( |
+ txn_result_summary, False)) |
+ |
+ try: |
+ result_summary = task_scheduler.schedule_request( |
+ request_obj, secret_bytes, |
+ memcache_key_for_task_id=transaction_memcache_key) |
+ except datastore_errors.BadValueError as e: |
raise endpoints.BadRequestException(e.message) |
previous_result = None |
@@ -378,7 +454,7 @@ class SwarmingTasksService(remote.Service): |
result_summary, False) |
return swarming_rpcs.TaskRequestMetadata( |
- request=message_conversion.task_request_to_rpc(request), |
+ request=message_conversion.task_request_to_rpc(request_obj), |
task_id=task_pack.pack_result_summary_key(result_summary.key), |
task_result=previous_result) |