Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """This module defines Swarming Server endpoints handlers.""" | 5 """This module defines Swarming Server endpoints handlers.""" |
| 6 | 6 |
| 7 import logging | 7 import logging |
| 8 import os | 8 import os |
| 9 | 9 |
| 10 from google.appengine.api import datastore_errors | 10 from google.appengine.api import datastore_errors |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 43 _old_decode_field = protojson.ProtoJson.decode_field | 43 _old_decode_field = protojson.ProtoJson.decode_field |
| 44 def _decode_field(self, field, value): | 44 def _decode_field(self, field, value): |
| 45 if (isinstance(field, messages.BooleanField) and | 45 if (isinstance(field, messages.BooleanField) and |
| 46 isinstance(value, basestring)): | 46 isinstance(value, basestring)): |
| 47 return value.lower() == 'true' | 47 return value.lower() == 'true' |
| 48 return _old_decode_field(self, field, value) | 48 return _old_decode_field(self, field, value) |
| 49 protojson.ProtoJson.decode_field = _decode_field | 49 protojson.ProtoJson.decode_field = _decode_field |
| 50 | 50 |
| 51 | 51 |
| 52 def get_request_and_result(task_id): | 52 def get_request_and_result(task_id): |
| 53 """Provides the key and TaskRequest corresponding to a task ID. | 53 """Provides the TaskRequest and result corresponding to a task ID. |
| 54 | 54 |
| 55 Enforces the ACL for users. Allows bots all access for the moment. | 55 Enforces the ACL for users. Allows bots all access for the moment. |
| 56 | 56 |
| 57 Returns: | 57 Returns: |
| 58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a | 58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
| 59 TaskResultSummay. | 59 TaskResultSummary. |
| 60 """ | 60 """ |
| 61 try: | 61 try: |
| 62 request_key, result_key = task_pack.get_request_and_result_keys(task_id) | 62 request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
| 63 request, result = ndb.get_multi((request_key, result_key)) | 63 request, result = ndb.get_multi((request_key, result_key)) |
| 64 if not request or not result: | 64 if not request or not result: |
| 65 raise endpoints.NotFoundException('%s not found.' % task_id) | 65 raise endpoints.NotFoundException('%s not found.' % task_id) |
| 66 if not acl.is_bot() and not request.has_access: | 66 if not acl.is_bot() and not request.has_access: |
| 67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) | 67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) |
| 68 return request, result | 68 return request, result |
| 69 except ValueError: | 69 except ValueError: |
| 70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) | 70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) |
| 71 | 71 |
| 72 | 72 |
| 73 def get_request_and_result_by_transaction_id(transaction_id): | |
| 74 """Provides the TaskRequest and result corresponding to a transaction ID. | |
| 75 | |
| 76 Enforces the ACL for users. Allows bots all access for the moment. | |
| 77 | |
| 78 May return (None, None) if the transaction, request or result is not found. | |
| 79 | |
| 80 Returns: | |
| 81 tuple(TaskRequest, result): result can be either for a TaskRunResult or a | |
| 82 TaskResultSummary. | |
| 83 """ | |
| 84 task_id = memcache.get(transaction_id_to_memcache_key(transaction_id)) | |
|
M-A Ruel
2017/05/03 01:33:20
use a namespace, it's simpler. Then no need for tr
nodir
2017/05/05 18:11:38
Done.
| |
| 85 if not task_id: | |
| 86 return None, None | |
|
M-A Ruel
2017/05/03 01:33:20
This makes the contract a bit scary though. :/ I m
nodir
2017/05/05 18:11:38
maybe set timeout to 1min?
| |
| 87 request_key, result_key = task_pack.get_request_and_result_keys(task_id) | |
| 88 request, result = ndb.get_multi((request_key, result_key)) | |
| 89 if not request or not result: | |
| 90 return None, None | |
| 91 if not acl.is_bot() and not request.has_access: | |
| 92 raise endpoints.ForbiddenException( | |
| 93 '%s, created for transaction %s, is not accessible.' % | |
| 94 (task_id, transaction_id)) | |
| 95 return request, result | |
| 96 | |
| 97 | |
| 98 def transaction_id_to_memcache_key(transaction_id): | |
| 99 assert transaction_id | |
| 100 return 'tasks.new/txn/%s' % transaction_id | |
| 101 | |
| 102 | |
| 73 def get_or_raise(key): | 103 def get_or_raise(key): |
| 74 """Returns an entity or raises an endpoints exception if it does not exist.""" | 104 """Returns an entity or raises an endpoints exception if it does not exist.""" |
| 75 result = key.get() | 105 result = key.get() |
| 76 if not result: | 106 if not result: |
| 77 raise endpoints.NotFoundException('%s not found.' % key.id()) | 107 raise endpoints.NotFoundException('%s not found.' % key.id()) |
| 78 return result | 108 return result |
| 79 | 109 |
| 80 | 110 |
| 81 def apply_property_defaults(properties): | 111 def apply_property_defaults(properties): |
| 82 """Fills ndb task properties with default values read from server settings.""" | 112 """Fills ndb task properties with default values read from server settings.""" |
| (...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 252 who=obj.who.to_bytes() if obj.who else None, | 282 who=obj.who.to_bytes() if obj.who else None, |
| 253 when=obj.created_ts, | 283 when=obj.created_ts, |
| 254 version=str(obj.version)) | 284 version=str(obj.version)) |
| 255 | 285 |
| 256 | 286 |
| 257 TaskId = endpoints.ResourceContainer( | 287 TaskId = endpoints.ResourceContainer( |
| 258 message_types.VoidMessage, | 288 message_types.VoidMessage, |
| 259 task_id=messages.StringField(1, required=True)) | 289 task_id=messages.StringField(1, required=True)) |
| 260 | 290 |
| 261 | 291 |
| 292 TaskIdWithTransactionID = endpoints.ResourceContainer( | |
| 293 message_types.VoidMessage, | |
| 294 task_id=messages.StringField(1), | |
| 295 transaction_id=messages.StringField(2)) | |
| 296 | |
| 297 | |
| 262 TaskIdWithPerf = endpoints.ResourceContainer( | 298 TaskIdWithPerf = endpoints.ResourceContainer( |
| 263 message_types.VoidMessage, | 299 message_types.VoidMessage, |
| 264 task_id=messages.StringField(1, required=True), | 300 task_id=messages.StringField(1, required=True), |
| 265 include_performance_stats=messages.BooleanField(2, default=False)) | 301 include_performance_stats=messages.BooleanField(2, default=False)) |
| 266 | 302 |
| 267 | 303 |
| 268 @swarming_api.api_class(resource_name='task', path='task') | 304 @swarming_api.api_class(resource_name='task', path='task') |
| 269 class SwarmingTaskService(remote.Service): | 305 class SwarmingTaskService(remote.Service): |
| 270 """Swarming's task-related API.""" | 306 """Swarming's task-related API.""" |
| 271 @gae_ts_mon.instrument_endpoint() | 307 @gae_ts_mon.instrument_endpoint() |
| (...skipping 25 matching lines...) Expand all Loading... | |
| 297 http_method='GET') | 333 http_method='GET') |
| 298 @auth.require(acl.is_bot_or_user) | 334 @auth.require(acl.is_bot_or_user) |
| 299 def request(self, request): | 335 def request(self, request): |
| 300 """Returns the task request corresponding to a task ID.""" | 336 """Returns the task request corresponding to a task ID.""" |
| 301 logging.info('%s', request) | 337 logging.info('%s', request) |
| 302 request_obj, _ = get_request_and_result(request.task_id) | 338 request_obj, _ = get_request_and_result(request.task_id) |
| 303 return message_conversion.task_request_to_rpc(request_obj) | 339 return message_conversion.task_request_to_rpc(request_obj) |
| 304 | 340 |
| 305 @gae_ts_mon.instrument_endpoint() | 341 @gae_ts_mon.instrument_endpoint() |
| 306 @auth.endpoints_method( | 342 @auth.endpoints_method( |
| 307 TaskId, swarming_rpcs.CancelResponse, | 343 TaskIdWithTransactionID, swarming_rpcs.CancelResponse, |
| 308 name='cancel', | 344 name='cancel', |
| 309 path='{task_id}/cancel') | 345 path='{task_id}/cancel') |
| 310 @auth.require(acl.is_bot_or_user) | 346 @auth.require(acl.is_bot_or_user) |
| 311 def cancel(self, request): | 347 def cancel(self, request): |
| 312 """Cancels a task. | 348 """Cancels a task. |
| 313 | 349 |
| 314 If a bot was running the task, the bot will forcibly cancel the task. | 350 If a bot was running the task, the bot will forcibly cancel the task. |
| 315 """ | 351 """ |
| 316 logging.info('%s', request) | 352 logging.info('%s', request) |
| 317 request_obj, result = get_request_and_result(request.task_id) | 353 if request.task_id and request.transaction_id: |
| 354 raise endpoints.BadRequestException( | |
| 355 'only one of task_id or transaction_id must be specified') | |
|
M-A Ruel
2017/05/03 01:33:20
That can't work because path is '{task_id}/cancel'
nodir
2017/05/05 18:11:38
ugh, should I define a new API then? cancel_by_tra
M-A Ruel
2017/05/05 18:15:33
Yes I prefer a new API. If we get rid of the featu
nodir
2017/05/05 18:41:37
Done.
| |
| 356 if request.task_id: | |
| 357 request_obj, result = get_request_and_result(request.task_id) | |
| 358 else: | |
| 359 request_obj, result = get_request_and_result_by_transaction_id( | |
| 360 request.transaction_id) | |
| 361 if not request_obj or not result: | |
| 362 raise endpoints.NotFoundException( | |
| 363 'request for transaction %s not found.' % request.transaction_id) | |
| 364 | |
| 318 ok, was_running = task_scheduler.cancel_task(request_obj, result.key) | 365 ok, was_running = task_scheduler.cancel_task(request_obj, result.key) |
| 319 return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running) | 366 return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running) |
| 320 | 367 |
| 321 @gae_ts_mon.instrument_endpoint() | 368 @gae_ts_mon.instrument_endpoint() |
| 322 @auth.endpoints_method( | 369 @auth.endpoints_method( |
| 323 TaskId, swarming_rpcs.TaskOutput, | 370 TaskId, swarming_rpcs.TaskOutput, |
| 324 name='stdout', | 371 name='stdout', |
| 325 path='{task_id}/stdout', | 372 path='{task_id}/stdout', |
| 326 http_method='GET') | 373 http_method='GET') |
| 327 @auth.require(acl.is_bot_or_user) | 374 @auth.require(acl.is_bot_or_user) |
| (...skipping 21 matching lines...) Expand all Loading... | |
| 349 def new(self, request): | 396 def new(self, request): |
| 350 """Creates a new task. | 397 """Creates a new task. |
| 351 | 398 |
| 352 The task will be enqueued in the tasks list and will be executed at the | 399 The task will be enqueued in the tasks list and will be executed at the |
| 353 earliest opportunity by a bot that has at least the dimensions as described | 400 earliest opportunity by a bot that has at least the dimensions as described |
| 354 in the task request. | 401 in the task request. |
| 355 """ | 402 """ |
| 356 sb = (request.properties.secret_bytes | 403 sb = (request.properties.secret_bytes |
| 357 if request.properties is not None else None) | 404 if request.properties is not None else None) |
| 358 if sb is not None: | 405 if sb is not None: |
| 359 request.properties.secret_bytes = "HIDDEN" | 406 request.properties.secret_bytes = 'HIDDEN' |
| 360 logging.info('%s', request) | 407 logging.info('%s', request) |
| 361 if sb is not None: | 408 if sb is not None: |
| 362 request.properties.secret_bytes = sb | 409 request.properties.secret_bytes = sb |
| 363 | 410 |
| 364 try: | 411 try: |
| 365 request, secret_bytes = message_conversion.new_task_request_from_rpc( | 412 request_obj, secret_bytes = message_conversion.new_task_request_from_rpc( |
| 366 request, utils.utcnow()) | 413 request, utils.utcnow()) |
| 367 apply_property_defaults(request.properties) | 414 apply_property_defaults(request_obj.properties) |
| 368 task_request.init_new_request( | 415 task_request.init_new_request( |
| 369 request, acl.can_schedule_high_priority_tasks(), secret_bytes) | 416 request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes) |
| 417 except (TypeError, ValueError) as e: | |
| 418 raise endpoints.BadRequestException(e.message) | |
| 370 | 419 |
| 371 result_summary = task_scheduler.schedule_request(request, secret_bytes) | 420 # Check if a request was already created with the same transaction. |
| 372 except (datastore_errors.BadValueError, TypeError, ValueError) as e: | 421 transaction_memcache_key = None |
| 422 if request.transaction_id: | |
| 423 transaction_memcache_key = transaction_id_to_memcache_key( | |
| 424 request.transaction_id) | |
| 425 txn_request_obj, txn_result_summary = ( | |
| 426 get_request_and_result_by_transaction_id(request.transaction_id)) | |
| 427 if txn_request_obj and txn_result_summary: | |
| 428 todict = lambda r: r.to_dict(['created_ts', 'expiration_ts']) | |
| 429 if todict(txn_request_obj) != todict(request_obj): | |
| 430 raise endpoints.BadRequestException( | |
| 431 'A request with different parameters was created for ' | |
| 432 'transaction %s. Send the same tasks.new request ' | |
| 433 'within the same transaction!' % request.transaction_id) | |
| 434 task_id = task_pack.pack_result_summary_key(txn_result_summary.key) | |
| 435 logging.info( | |
| 436 'reusing existing %s for transaction %s', | |
| 437 task_id, request.transaction_id) | |
| 438 return swarming_rpcs.TaskRequestMetadata( | |
| 439 request=message_conversion.task_request_to_rpc(txn_request_obj), | |
| 440 task_id=task_id, | |
| 441 task_result=message_conversion.task_result_to_rpc( | |
| 442 txn_result_summary, False)) | |
| 443 | |
| 444 try: | |
| 445 result_summary = task_scheduler.schedule_request( | |
| 446 request_obj, secret_bytes, | |
| 447 memcache_key_for_task_id=transaction_memcache_key) | |
| 448 except datastore_errors.BadValueError as e: | |
| 373 raise endpoints.BadRequestException(e.message) | 449 raise endpoints.BadRequestException(e.message) |
| 374 | 450 |
| 375 previous_result = None | 451 previous_result = None |
| 376 if result_summary.deduped_from: | 452 if result_summary.deduped_from: |
| 377 previous_result = message_conversion.task_result_to_rpc( | 453 previous_result = message_conversion.task_result_to_rpc( |
| 378 result_summary, False) | 454 result_summary, False) |
| 379 | 455 |
| 380 return swarming_rpcs.TaskRequestMetadata( | 456 return swarming_rpcs.TaskRequestMetadata( |
| 381 request=message_conversion.task_request_to_rpc(request), | 457 request=message_conversion.task_request_to_rpc(request_obj), |
| 382 task_id=task_pack.pack_result_summary_key(result_summary.key), | 458 task_id=task_pack.pack_result_summary_key(result_summary.key), |
| 383 task_result=previous_result) | 459 task_result=previous_result) |
| 384 | 460 |
| 385 @gae_ts_mon.instrument_endpoint() | 461 @gae_ts_mon.instrument_endpoint() |
| 386 @auth.endpoints_method( | 462 @auth.endpoints_method( |
| 387 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, | 463 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, |
| 388 http_method='GET') | 464 http_method='GET') |
| 389 @auth.require(acl.is_privileged_user) | 465 @auth.require(acl.is_privileged_user) |
| 390 def list(self, request): | 466 def list(self, request): |
| 391 """Returns tasks results based on the filters. | 467 """Returns tasks results based on the filters. |
| (...skipping 442 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 834 def get_routes(): | 910 def get_routes(): |
| 835 return ( | 911 return ( |
| 836 endpoints_webapp2.api_routes(SwarmingServerService) + | 912 endpoints_webapp2.api_routes(SwarmingServerService) + |
| 837 endpoints_webapp2.api_routes(SwarmingTaskService) + | 913 endpoints_webapp2.api_routes(SwarmingTaskService) + |
| 838 endpoints_webapp2.api_routes(SwarmingTasksService) + | 914 endpoints_webapp2.api_routes(SwarmingTasksService) + |
| 839 endpoints_webapp2.api_routes(SwarmingBotService) + | 915 endpoints_webapp2.api_routes(SwarmingBotService) + |
| 840 endpoints_webapp2.api_routes(SwarmingBotsService) + | 916 endpoints_webapp2.api_routes(SwarmingBotsService) + |
| 841 # components.config endpoints for validation and configuring of luci-config | 917 # components.config endpoints for validation and configuring of luci-config |
| 842 # service URL. | 918 # service URL. |
| 843 endpoints_webapp2.api_routes(config.ConfigApi)) | 919 endpoints_webapp2.api_routes(config.ConfigApi)) |
| OLD | NEW |