Chromium Code Reviews
Index: appengine/swarming/handlers_endpoints.py
| diff --git a/appengine/swarming/handlers_endpoints.py b/appengine/swarming/handlers_endpoints.py |
| index 46fb36b9018453126180accc5b96263746c0e107..716df83e9b65a6e59f42d7ee722f4429cb1e7673 100644 |
| --- a/appengine/swarming/handlers_endpoints.py |
| +++ b/appengine/swarming/handlers_endpoints.py |
| @@ -50,13 +50,13 @@ protojson.ProtoJson.decode_field = _decode_field |
| def get_request_and_result(task_id): |
| - """Provides the key and TaskRequest corresponding to a task ID. |
| + """Provides the TaskRequest and result corresponding to a task ID. |
| Enforces the ACL for users. Allows bots all access for the moment. |
| Returns: |
| tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
| - TaskResultSummay. |
| + TaskResultSummary. |
| """ |
| try: |
| request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
| @@ -70,6 +70,36 @@ def get_request_and_result(task_id): |
| raise endpoints.BadRequestException('%s is an invalid key.' % task_id) |
| +def get_request_and_result_by_transaction_id(transaction_id): |
| + """Provides the TaskRequest and result corresponding to a transaction ID. |
| + |
| + Enforces the ACL for users. Allows bots all access for the moment. |
| + |
| + May return (None, None) if the transaction, request or result is not found. |
| + |
| + Returns: |
| + tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
| + TaskResultSummary. |
| + """ |
| + task_id = memcache.get(transaction_id_to_memcache_key(transaction_id)) |
|
M-A Ruel
2017/05/03 01:33:20
use a namespace, it's simpler. Then no need for transaction_id_to_memcache_key(). [comment truncated in extraction]
nodir
2017/05/05 18:11:38
Done.
|
| + if not task_id: |
| + return None, None |
|
M-A Ruel
2017/05/03 01:33:20
This makes the contract a bit scary though. :/ I m
nodir
2017/05/05 18:11:38
maybe set timeout to 1min?
|
| + request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
| + request, result = ndb.get_multi((request_key, result_key)) |
| + if not request or not result: |
| + return None, None |
| + if not acl.is_bot() and not request.has_access: |
| + raise endpoints.ForbiddenException( |
| + '%s, created for transaction %s, is not accessible.' % |
| + (task_id, transaction_id)) |
| + return request, result |
| + |
| + |
| +def transaction_id_to_memcache_key(transaction_id): |
| + assert transaction_id |
| + return 'tasks.new/txn/%s' % transaction_id |
| + |
| + |
| def get_or_raise(key): |
| """Returns an entity or raises an endpoints exception if it does not exist.""" |
| result = key.get() |
| @@ -259,6 +289,12 @@ TaskId = endpoints.ResourceContainer( |
| task_id=messages.StringField(1, required=True)) |
| +TaskIdWithTransactionID = endpoints.ResourceContainer( |
| + message_types.VoidMessage, |
| + task_id=messages.StringField(1), |
| + transaction_id=messages.StringField(2)) |
| + |
| + |
| TaskIdWithPerf = endpoints.ResourceContainer( |
| message_types.VoidMessage, |
| task_id=messages.StringField(1, required=True), |
| @@ -304,7 +340,7 @@ class SwarmingTaskService(remote.Service): |
| @gae_ts_mon.instrument_endpoint() |
| @auth.endpoints_method( |
| - TaskId, swarming_rpcs.CancelResponse, |
| + TaskIdWithTransactionID, swarming_rpcs.CancelResponse, |
| name='cancel', |
| path='{task_id}/cancel') |
| @auth.require(acl.is_bot_or_user) |
| @@ -314,7 +350,18 @@ class SwarmingTaskService(remote.Service): |
| If a bot was running the task, the bot will forcibly cancel the task. |
| """ |
| logging.info('%s', request) |
| - request_obj, result = get_request_and_result(request.task_id) |
| + if request.task_id and request.transaction_id: |
| + raise endpoints.BadRequestException( |
| + 'only one of task_id or transaction_id must be specified') |
|
M-A Ruel
2017/05/03 01:33:20
That can't work because path is '{task_id}/cancel'
nodir
2017/05/05 18:11:38
ugh, should I define a new API then? cancel_by_transaction?
M-A Ruel
2017/05/05 18:15:33
Yes I prefer a new API. If we get rid of the feature [comment truncated in extraction]
nodir
2017/05/05 18:41:37
Done.
|
| + if request.task_id: |
| + request_obj, result = get_request_and_result(request.task_id) |
| + else: |
| + request_obj, result = get_request_and_result_by_transaction_id( |
| + request.transaction_id) |
| + if not request_obj or not result: |
| + raise endpoints.NotFoundException( |
| + 'request for transaction %s not found.' % request.transaction_id) |
| + |
| ok, was_running = task_scheduler.cancel_task(request_obj, result.key) |
| return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running) |
| @@ -356,20 +403,49 @@ class SwarmingTasksService(remote.Service): |
| sb = (request.properties.secret_bytes |
| if request.properties is not None else None) |
| if sb is not None: |
| - request.properties.secret_bytes = "HIDDEN" |
| + request.properties.secret_bytes = 'HIDDEN' |
| logging.info('%s', request) |
| if sb is not None: |
| request.properties.secret_bytes = sb |
| try: |
| - request, secret_bytes = message_conversion.new_task_request_from_rpc( |
| + request_obj, secret_bytes = message_conversion.new_task_request_from_rpc( |
| request, utils.utcnow()) |
| - apply_property_defaults(request.properties) |
| + apply_property_defaults(request_obj.properties) |
| task_request.init_new_request( |
| - request, acl.can_schedule_high_priority_tasks(), secret_bytes) |
| + request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes) |
| + except (TypeError, ValueError) as e: |
| + raise endpoints.BadRequestException(e.message) |
| - result_summary = task_scheduler.schedule_request(request, secret_bytes) |
| - except (datastore_errors.BadValueError, TypeError, ValueError) as e: |
| + # Check if a request was already created with the same transaction. |
| + transaction_memcache_key = None |
| + if request.transaction_id: |
| + transaction_memcache_key = transaction_id_to_memcache_key( |
| + request.transaction_id) |
| + txn_request_obj, txn_result_summary = ( |
| + get_request_and_result_by_transaction_id(request.transaction_id)) |
| + if txn_request_obj and txn_result_summary: |
| + todict = lambda r: r.to_dict(['created_ts', 'expiration_ts']) |
| + if todict(txn_request_obj) != todict(request_obj): |
| + raise endpoints.BadRequestException( |
| + 'A request with different parameters was created for ' |
| + 'transaction %s. Send the same tasks.new request ' |
| + 'within the same transaction!' % request.transaction_id) |
| + task_id = task_pack.pack_result_summary_key(txn_result_summary.key) |
| + logging.info( |
| + 'reusing existing %s for transaction %s', |
| + task_id, request.transaction_id) |
| + return swarming_rpcs.TaskRequestMetadata( |
| + request=message_conversion.task_request_to_rpc(txn_request_obj), |
| + task_id=task_id, |
| + task_result=message_conversion.task_result_to_rpc( |
| + txn_result_summary, False)) |
| + |
| + try: |
| + result_summary = task_scheduler.schedule_request( |
| + request_obj, secret_bytes, |
| + memcache_key_for_task_id=transaction_memcache_key) |
| + except datastore_errors.BadValueError as e: |
| raise endpoints.BadRequestException(e.message) |
| previous_result = None |
| @@ -378,7 +454,7 @@ class SwarmingTasksService(remote.Service): |
| result_summary, False) |
| return swarming_rpcs.TaskRequestMetadata( |
| - request=message_conversion.task_request_to_rpc(request), |
| + request=message_conversion.task_request_to_rpc(request_obj), |
| task_id=task_pack.pack_result_summary_key(result_summary.key), |
| task_result=previous_result) |