| Index: appengine/swarming/handlers_endpoints.py
|
| diff --git a/appengine/swarming/handlers_endpoints.py b/appengine/swarming/handlers_endpoints.py
|
| index 46fb36b9018453126180accc5b96263746c0e107..4ac25743f35c4e9beaf0ce481e01a3ea27a27ecd 100644
|
| --- a/appengine/swarming/handlers_endpoints.py
|
| +++ b/appengine/swarming/handlers_endpoints.py
|
| @@ -36,6 +36,9 @@ from server import task_result
|
| from server import task_scheduler
|
|
|
|
|
| +TRANSACTION_ID_MEMCACHE_NAMESPACE = 'tasks.new/transaction_id'
|
| +
|
| +
|
| ### Helper Methods
|
|
|
|
|
| @@ -50,13 +53,13 @@ protojson.ProtoJson.decode_field = _decode_field
|
|
|
|
|
| def get_request_and_result(task_id):
|
| - """Provides the key and TaskRequest corresponding to a task ID.
|
| + """Provides the TaskRequest and result corresponding to a task ID.
|
|
|
| Enforces the ACL for users. Allows bots all access for the moment.
|
|
|
| Returns:
|
| tuple(TaskRequest, result): result can be either for a TaskRunResult or a
|
| - TaskResultSummay.
|
| + TaskResultSummary.
|
| """
|
| try:
|
| request_key, result_key = task_pack.get_request_and_result_keys(task_id)
|
| @@ -70,6 +73,32 @@ def get_request_and_result(task_id):
|
| raise endpoints.BadRequestException('%s is an invalid key.' % task_id)
|
|
|
|
|
| +def get_request_and_result_by_transaction_id(transaction_id):
|
| + """Provides the TaskRequest and result corresponding to a transaction ID.
|
| +
|
| + Enforces the ACL for users. Allows bots all access for the moment.
|
| +
|
| + May return (None, None) if the transaction, request or result is not found.
|
| +
|
| + Returns:
|
| + tuple(TaskRequest, result): result can be either for a TaskRunResult or a
|
| + TaskResultSummary.
|
| + """
|
| + task_id = memcache.get(
|
| + transaction_id, namespace=TRANSACTION_ID_MEMCACHE_NAMESPACE)
|
| + if not task_id:
|
| + return None, None
|
| + request_key, result_key = task_pack.get_request_and_result_keys(task_id)
|
| + request, result = ndb.get_multi((request_key, result_key))
|
| + if not request or not result:
|
| + return None, None
|
| + if not acl.is_bot() and not request.has_access:
|
| + raise endpoints.ForbiddenException(
|
| + '%s, created for transaction %s, is not accessible.' %
|
| + (task_id, transaction_id))
|
| + return request, result
|
| +
|
| +
|
| def get_or_raise(key):
|
| """Returns an entity or raises an endpoints exception if it does not exist."""
|
| result = key.get()
|
| @@ -259,6 +288,11 @@ TaskId = endpoints.ResourceContainer(
|
| task_id=messages.StringField(1, required=True))
|
|
|
|
|
| +TransactionId = endpoints.ResourceContainer(
|
| + message_types.VoidMessage,
|
| + transaction_id=messages.StringField(1))
|
| +
|
| +
|
| TaskIdWithPerf = endpoints.ResourceContainer(
|
| message_types.VoidMessage,
|
| task_id=messages.StringField(1, required=True),
|
| @@ -356,20 +390,47 @@ class SwarmingTasksService(remote.Service):
|
| sb = (request.properties.secret_bytes
|
| if request.properties is not None else None)
|
| if sb is not None:
|
| - request.properties.secret_bytes = "HIDDEN"
|
| + request.properties.secret_bytes = 'HIDDEN'
|
| logging.info('%s', request)
|
| if sb is not None:
|
| request.properties.secret_bytes = sb
|
|
|
| try:
|
| - request, secret_bytes = message_conversion.new_task_request_from_rpc(
|
| + request_obj, secret_bytes = message_conversion.new_task_request_from_rpc(
|
| request, utils.utcnow())
|
| - apply_property_defaults(request.properties)
|
| + apply_property_defaults(request_obj.properties)
|
| task_request.init_new_request(
|
| - request, acl.can_schedule_high_priority_tasks(), secret_bytes)
|
| + request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes)
|
| + except (TypeError, ValueError) as e:
|
| + raise endpoints.BadRequestException(e.message)
|
|
|
| - result_summary = task_scheduler.schedule_request(request, secret_bytes)
|
| - except (datastore_errors.BadValueError, TypeError, ValueError) as e:
|
| + # Check if a request was already created with the same transaction.
|
| + if request.transaction_id:
|
| + txn_request_obj, txn_result_summary = (
|
| + get_request_and_result_by_transaction_id(request.transaction_id))
|
| + if txn_request_obj and txn_result_summary:
|
| + todict = lambda r: r.to_dict(['created_ts', 'expiration_ts'])
|
| + if todict(txn_request_obj) != todict(request_obj):
|
| + raise endpoints.BadRequestException(
|
| + 'A request with different parameters was created for '
|
| + 'transaction %s. Send the same tasks.new request '
|
| + 'within the same transaction!' % request.transaction_id)
|
| + task_id = task_pack.pack_result_summary_key(txn_result_summary.key)
|
| + logging.info(
|
| + 'reusing existing %s for transaction %s',
|
| + task_id, request.transaction_id)
|
| + return swarming_rpcs.TaskRequestMetadata(
|
| + request=message_conversion.task_request_to_rpc(txn_request_obj),
|
| + task_id=task_id,
|
| + task_result=message_conversion.task_result_to_rpc(
|
| + txn_result_summary, False))
|
| +
|
| + try:
|
| + result_summary = task_scheduler.schedule_request(
|
| + request_obj, secret_bytes,
|
| + memcache_key_for_task_id=request.transaction_id,
|
| + memcache_ns_for_task_id=TRANSACTION_ID_MEMCACHE_NAMESPACE)
|
| + except datastore_errors.BadValueError as e:
|
| raise endpoints.BadRequestException(e.message)
|
|
|
| previous_result = None
|
| @@ -378,7 +439,7 @@ class SwarmingTasksService(remote.Service):
|
| result_summary, False)
|
|
|
| return swarming_rpcs.TaskRequestMetadata(
|
| - request=message_conversion.task_request_to_rpc(request),
|
| + request=message_conversion.task_request_to_rpc(request_obj),
|
| task_id=task_pack.pack_result_summary_key(result_summary.key),
|
| task_result=previous_result)
|
|
|
| @@ -503,6 +564,35 @@ class SwarmingTasksService(remote.Service):
|
|
|
| @gae_ts_mon.instrument_endpoint()
|
| @auth.endpoints_method(
|
| + TransactionId, swarming_rpcs.CancelResponse,
|
| + name='cancel_by_transaction_id',
|
| + path='cancel_by_transaction_id/{transaction_id}')
|
| + @auth.require(acl.is_bot_or_user)
|
| + def cancel_by_transaction_id(self, request):
|
| + """Cancels a task by a transaction ID.
|
| +
|
| + Useful when id of a new task is unknown because the request failed.
|
| + The transaction ID here is the one supplied by the user when creating the
|
| + task.
|
| +
|
| + If a bot was running the task, the bot will forcibly cancel the task.
|
| + """
|
| + logging.info('%s', request)
|
| +
|
| + request_obj, result = get_request_and_result_by_transaction_id(
|
| + request.transaction_id)
|
| + if not request_obj or not result:
|
| + raise endpoints.NotFoundException(
|
| + 'request for transaction %s not found.' % request.transaction_id)
|
| +
|
| + ok, was_running = task_scheduler.cancel_task(request_obj, result.key)
|
| + return swarming_rpcs.CancelResponse(
|
| + task_id=request_obj.task_id,
|
| + ok=ok,
|
| + was_running=was_running)
|
| +
|
| + @gae_ts_mon.instrument_endpoint()
|
| + @auth.endpoints_method(
|
| swarming_rpcs.TasksCountRequest, swarming_rpcs.TasksCount,
|
| http_method='GET')
|
| @auth.require(acl.is_privileged_user)
|
|
|