OLD | NEW |
1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """This module defines Swarming Server endpoints handlers.""" | 5 """This module defines Swarming Server endpoints handlers.""" |
6 | 6 |
7 import logging | 7 import logging |
8 import os | 8 import os |
9 | 9 |
10 from google.appengine.api import datastore_errors | 10 from google.appengine.api import datastore_errors |
(...skipping 18 matching lines...) Expand all Loading... |
29 from server import acl | 29 from server import acl |
30 from server import bot_code | 30 from server import bot_code |
31 from server import bot_management | 31 from server import bot_management |
32 from server import config | 32 from server import config |
33 from server import task_pack | 33 from server import task_pack |
34 from server import task_request | 34 from server import task_request |
35 from server import task_result | 35 from server import task_result |
36 from server import task_scheduler | 36 from server import task_scheduler |
37 | 37 |
38 | 38 |
| 39 TRANSACTION_ID_MEMCACHE_NAMESPACE = 'tasks.new/transaction_id' |
| 40 |
| 41 |
39 ### Helper Methods | 42 ### Helper Methods |
40 | 43 |
41 | 44 |
42 # Add support for BooleanField in protorpc in endpoints GET requests. | 45 # Add support for BooleanField in protorpc in endpoints GET requests. |
43 _old_decode_field = protojson.ProtoJson.decode_field | 46 _old_decode_field = protojson.ProtoJson.decode_field |
44 def _decode_field(self, field, value): | 47 def _decode_field(self, field, value): |
45 if (isinstance(field, messages.BooleanField) and | 48 if (isinstance(field, messages.BooleanField) and |
46 isinstance(value, basestring)): | 49 isinstance(value, basestring)): |
47 return value.lower() == 'true' | 50 return value.lower() == 'true' |
48 return _old_decode_field(self, field, value) | 51 return _old_decode_field(self, field, value) |
49 protojson.ProtoJson.decode_field = _decode_field | 52 protojson.ProtoJson.decode_field = _decode_field |
50 | 53 |
51 | 54 |
52 def get_request_and_result(task_id): | 55 def get_request_and_result(task_id): |
53 """Provides the key and TaskRequest corresponding to a task ID. | 56 """Provides the TaskRequest and result corresponding to a task ID. |
54 | 57 |
55 Enforces the ACL for users. Allows bots all access for the moment. | 58 Enforces the ACL for users. Allows bots all access for the moment. |
56 | 59 |
57 Returns: | 60 Returns: |
58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a | 61 tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
59 TaskResultSummay. | 62 TaskResultSummary. |
60 """ | 63 """ |
61 try: | 64 try: |
62 request_key, result_key = task_pack.get_request_and_result_keys(task_id) | 65 request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
63 request, result = ndb.get_multi((request_key, result_key)) | 66 request, result = ndb.get_multi((request_key, result_key)) |
64 if not request or not result: | 67 if not request or not result: |
65 raise endpoints.NotFoundException('%s not found.' % task_id) | 68 raise endpoints.NotFoundException('%s not found.' % task_id) |
66 if not acl.is_bot() and not request.has_access: | 69 if not acl.is_bot() and not request.has_access: |
67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) | 70 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) |
68 return request, result | 71 return request, result |
69 except ValueError: | 72 except ValueError: |
70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) | 73 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) |
71 | 74 |
72 | 75 |
| 76 def get_request_and_result_by_transaction_id(transaction_id): |
| 77 """Provides the TaskRequest and result corresponding to a transaction ID. |
| 78 |
| 79 Enforces the ACL for users. Allows bots all access for the moment. |
| 80 |
| 81 May return (None, None) if the transaction, request or result is not found. |
| 82 |
| 83 Returns: |
| 84 tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
| 85 TaskResultSummary. |
| 86 """ |
| 87 task_id = memcache.get( |
| 88 transaction_id, namespace=TRANSACTION_ID_MEMCACHE_NAMESPACE) |
| 89 if not task_id: |
| 90 return None, None |
| 91 request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
| 92 request, result = ndb.get_multi((request_key, result_key)) |
| 93 if not request or not result: |
| 94 return None, None |
| 95 if not acl.is_bot() and not request.has_access: |
| 96 raise endpoints.ForbiddenException( |
| 97 '%s, created for transaction %s, is not accessible.' % |
| 98 (task_id, transaction_id)) |
| 99 return request, result |
| 100 |
| 101 |
73 def get_or_raise(key): | 102 def get_or_raise(key): |
74 """Returns an entity or raises an endpoints exception if it does not exist.""" | 103 """Returns an entity or raises an endpoints exception if it does not exist.""" |
75 result = key.get() | 104 result = key.get() |
76 if not result: | 105 if not result: |
77 raise endpoints.NotFoundException('%s not found.' % key.id()) | 106 raise endpoints.NotFoundException('%s not found.' % key.id()) |
78 return result | 107 return result |
79 | 108 |
80 | 109 |
81 def apply_property_defaults(properties): | 110 def apply_property_defaults(properties): |
82 """Fills ndb task properties with default values read from server settings.""" | 111 """Fills ndb task properties with default values read from server settings.""" |
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
252 who=obj.who.to_bytes() if obj.who else None, | 281 who=obj.who.to_bytes() if obj.who else None, |
253 when=obj.created_ts, | 282 when=obj.created_ts, |
254 version=str(obj.version)) | 283 version=str(obj.version)) |
255 | 284 |
256 | 285 |
257 TaskId = endpoints.ResourceContainer( | 286 TaskId = endpoints.ResourceContainer( |
258 message_types.VoidMessage, | 287 message_types.VoidMessage, |
259 task_id=messages.StringField(1, required=True)) | 288 task_id=messages.StringField(1, required=True)) |
260 | 289 |
261 | 290 |
| 291 TransactionId = endpoints.ResourceContainer( |
| 292 message_types.VoidMessage, |
| 293 transaction_id=messages.StringField(1)) |
| 294 |
| 295 |
262 TaskIdWithPerf = endpoints.ResourceContainer( | 296 TaskIdWithPerf = endpoints.ResourceContainer( |
263 message_types.VoidMessage, | 297 message_types.VoidMessage, |
264 task_id=messages.StringField(1, required=True), | 298 task_id=messages.StringField(1, required=True), |
265 include_performance_stats=messages.BooleanField(2, default=False)) | 299 include_performance_stats=messages.BooleanField(2, default=False)) |
266 | 300 |
267 | 301 |
268 @swarming_api.api_class(resource_name='task', path='task') | 302 @swarming_api.api_class(resource_name='task', path='task') |
269 class SwarmingTaskService(remote.Service): | 303 class SwarmingTaskService(remote.Service): |
270 """Swarming's task-related API.""" | 304 """Swarming's task-related API.""" |
271 @gae_ts_mon.instrument_endpoint() | 305 @gae_ts_mon.instrument_endpoint() |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
349 def new(self, request): | 383 def new(self, request): |
350 """Creates a new task. | 384 """Creates a new task. |
351 | 385 |
352 The task will be enqueued in the tasks list and will be executed at the | 386 The task will be enqueued in the tasks list and will be executed at the |
353 earliest opportunity by a bot that has at least the dimensions as described | 387 earliest opportunity by a bot that has at least the dimensions as described |
354 in the task request. | 388 in the task request. |
355 """ | 389 """ |
356 sb = (request.properties.secret_bytes | 390 sb = (request.properties.secret_bytes |
357 if request.properties is not None else None) | 391 if request.properties is not None else None) |
358 if sb is not None: | 392 if sb is not None: |
359 request.properties.secret_bytes = "HIDDEN" | 393 request.properties.secret_bytes = 'HIDDEN' |
360 logging.info('%s', request) | 394 logging.info('%s', request) |
361 if sb is not None: | 395 if sb is not None: |
362 request.properties.secret_bytes = sb | 396 request.properties.secret_bytes = sb |
363 | 397 |
364 try: | 398 try: |
365 request, secret_bytes = message_conversion.new_task_request_from_rpc( | 399 request_obj, secret_bytes = message_conversion.new_task_request_from_rpc( |
366 request, utils.utcnow()) | 400 request, utils.utcnow()) |
367 apply_property_defaults(request.properties) | 401 apply_property_defaults(request_obj.properties) |
368 task_request.init_new_request( | 402 task_request.init_new_request( |
369 request, acl.can_schedule_high_priority_tasks(), secret_bytes) | 403 request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes) |
| 404 except (TypeError, ValueError) as e: |
| 405 raise endpoints.BadRequestException(e.message) |
370 | 406 |
371 result_summary = task_scheduler.schedule_request(request, secret_bytes) | 407 # Check if a request was already created with the same transaction. |
372 except (datastore_errors.BadValueError, TypeError, ValueError) as e: | 408 if request.transaction_id: |
| 409 txn_request_obj, txn_result_summary = ( |
| 410 get_request_and_result_by_transaction_id(request.transaction_id)) |
| 411 if txn_request_obj and txn_result_summary: |
| 412 todict = lambda r: r.to_dict(['created_ts', 'expiration_ts']) |
| 413 if todict(txn_request_obj) != todict(request_obj): |
| 414 raise endpoints.BadRequestException( |
| 415 'A request with different parameters was created for ' |
| 416 'transaction %s. Send the same tasks.new request ' |
| 417 'within the same transaction!' % request.transaction_id) |
| 418 task_id = task_pack.pack_result_summary_key(txn_result_summary.key) |
| 419 logging.info( |
| 420 'reusing existing %s for transaction %s', |
| 421 task_id, request.transaction_id) |
| 422 return swarming_rpcs.TaskRequestMetadata( |
| 423 request=message_conversion.task_request_to_rpc(txn_request_obj), |
| 424 task_id=task_id, |
| 425 task_result=message_conversion.task_result_to_rpc( |
| 426 txn_result_summary, False)) |
| 427 |
| 428 try: |
| 429 result_summary = task_scheduler.schedule_request( |
| 430 request_obj, secret_bytes, |
| 431 memcache_key_for_task_id=request.transaction_id, |
| 432 memcache_ns_for_task_id=TRANSACTION_ID_MEMCACHE_NAMESPACE) |
| 433 except datastore_errors.BadValueError as e: |
373 raise endpoints.BadRequestException(e.message) | 434 raise endpoints.BadRequestException(e.message) |
374 | 435 |
375 previous_result = None | 436 previous_result = None |
376 if result_summary.deduped_from: | 437 if result_summary.deduped_from: |
377 previous_result = message_conversion.task_result_to_rpc( | 438 previous_result = message_conversion.task_result_to_rpc( |
378 result_summary, False) | 439 result_summary, False) |
379 | 440 |
380 return swarming_rpcs.TaskRequestMetadata( | 441 return swarming_rpcs.TaskRequestMetadata( |
381 request=message_conversion.task_request_to_rpc(request), | 442 request=message_conversion.task_request_to_rpc(request_obj), |
382 task_id=task_pack.pack_result_summary_key(result_summary.key), | 443 task_id=task_pack.pack_result_summary_key(result_summary.key), |
383 task_result=previous_result) | 444 task_result=previous_result) |
384 | 445 |
385 @gae_ts_mon.instrument_endpoint() | 446 @gae_ts_mon.instrument_endpoint() |
386 @auth.endpoints_method( | 447 @auth.endpoints_method( |
387 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, | 448 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, |
388 http_method='GET') | 449 http_method='GET') |
389 @auth.require(acl.is_privileged_user) | 450 @auth.require(acl.is_privileged_user) |
390 def list(self, request): | 451 def list(self, request): |
391 """Returns tasks results based on the filters. | 452 """Returns tasks results based on the filters. |
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
496 else: | 557 else: |
497 logging.info('No tasks to cancel.') | 558 logging.info('No tasks to cancel.') |
498 | 559 |
499 return swarming_rpcs.TasksCancelResponse( | 560 return swarming_rpcs.TasksCancelResponse( |
500 cursor=cursor, | 561 cursor=cursor, |
501 matched=len(tasks), | 562 matched=len(tasks), |
502 now=now) | 563 now=now) |
503 | 564 |
504 @gae_ts_mon.instrument_endpoint() | 565 @gae_ts_mon.instrument_endpoint() |
505 @auth.endpoints_method( | 566 @auth.endpoints_method( |
| 567 TransactionId, swarming_rpcs.CancelResponse, |
| 568 name='cancel_by_transaction_id', |
| 569 path='cancel_by_transaction_id/{transaction_id}') |
| 570 @auth.require(acl.is_bot_or_user) |
| 571 def cancel_by_transaction_id(self, request): |
| 572 """Cancels a task by a transaction ID. |
| 573 |
| 574 Useful when id of a new task is unknown because the request failed. |
| 575 The transaction ID here is the one supplied by the user when creating the |
| 576 task. |
| 577 |
| 578 If a bot was running the task, the bot will forcibly cancel the task. |
| 579 """ |
| 580 logging.info('%s', request) |
| 581 |
| 582 request_obj, result = get_request_and_result_by_transaction_id( |
| 583 request.transaction_id) |
| 584 if not request_obj or not result: |
| 585 raise endpoints.NotFoundException( |
| 586 'request for transaction %s not found.' % request.transaction_id) |
| 587 |
| 588 ok, was_running = task_scheduler.cancel_task(request_obj, result.key) |
| 589 return swarming_rpcs.CancelResponse( |
| 590 task_id=request_obj.task_id, |
| 591 ok=ok, |
| 592 was_running=was_running) |
| 593 |
| 594 @gae_ts_mon.instrument_endpoint() |
| 595 @auth.endpoints_method( |
506 swarming_rpcs.TasksCountRequest, swarming_rpcs.TasksCount, | 596 swarming_rpcs.TasksCountRequest, swarming_rpcs.TasksCount, |
507 http_method='GET') | 597 http_method='GET') |
508 @auth.require(acl.is_privileged_user) | 598 @auth.require(acl.is_privileged_user) |
509 def count(self, request): | 599 def count(self, request): |
510 """Counts number of tasks in a given state.""" | 600 """Counts number of tasks in a given state.""" |
511 logging.info('%s', request) | 601 logging.info('%s', request) |
512 if not request.start: | 602 if not request.start: |
513 raise endpoints.BadRequestException('start (as epoch) is required') | 603 raise endpoints.BadRequestException('start (as epoch) is required') |
514 now = utils.utcnow() | 604 now = utils.utcnow() |
515 mem_key = self._memcache_key(request, now) | 605 mem_key = self._memcache_key(request, now) |
(...skipping 318 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
834 def get_routes(): | 924 def get_routes(): |
835 return ( | 925 return ( |
836 endpoints_webapp2.api_routes(SwarmingServerService) + | 926 endpoints_webapp2.api_routes(SwarmingServerService) + |
837 endpoints_webapp2.api_routes(SwarmingTaskService) + | 927 endpoints_webapp2.api_routes(SwarmingTaskService) + |
838 endpoints_webapp2.api_routes(SwarmingTasksService) + | 928 endpoints_webapp2.api_routes(SwarmingTasksService) + |
839 endpoints_webapp2.api_routes(SwarmingBotService) + | 929 endpoints_webapp2.api_routes(SwarmingBotService) + |
840 endpoints_webapp2.api_routes(SwarmingBotsService) + | 930 endpoints_webapp2.api_routes(SwarmingBotsService) + |
841 # components.config endpoints for validation and configuring of luci-config | 931 # components.config endpoints for validation and configuring of luci-config |
842 # service URL. | 932 # service URL. |
843 endpoints_webapp2.api_routes(config.ConfigApi)) | 933 endpoints_webapp2.api_routes(config.ConfigApi)) |
OLD | NEW |