OLD | NEW |
---|---|
1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """This module defines Swarming Server endpoints handlers.""" | 5 """This module defines Swarming Server endpoints handlers.""" |
6 | 6 |
7 import logging | 7 import logging |
8 import os | 8 import os |
9 | 9 |
10 from google.appengine.api import datastore_errors | 10 from google.appengine.api import datastore_errors |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
43 _old_decode_field = protojson.ProtoJson.decode_field | 43 _old_decode_field = protojson.ProtoJson.decode_field |
44 def _decode_field(self, field, value): | 44 def _decode_field(self, field, value): |
45 if (isinstance(field, messages.BooleanField) and | 45 if (isinstance(field, messages.BooleanField) and |
46 isinstance(value, basestring)): | 46 isinstance(value, basestring)): |
47 return value.lower() == 'true' | 47 return value.lower() == 'true' |
48 return _old_decode_field(self, field, value) | 48 return _old_decode_field(self, field, value) |
49 protojson.ProtoJson.decode_field = _decode_field | 49 protojson.ProtoJson.decode_field = _decode_field |
50 | 50 |
51 | 51 |
52 def get_request_and_result(task_id): | 52 def get_request_and_result(task_id): |
53 """Provides the key and TaskRequest corresponding to a task ID. | 53 """Provides the TaskRequest and result corresponding to a task ID. |
54 | 54 |
55 Enforces the ACL for users. Allows bots all access for the moment. | 55 Enforces the ACL for users. Allows bots all access for the moment. |
56 | 56 |
57 Returns: | 57 Returns: |
58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a | 58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a |
59 TaskResultSummay. | 59 TaskResultSummary. |
60 """ | 60 """ |
61 try: | 61 try: |
62 request_key, result_key = task_pack.get_request_and_result_keys(task_id) | 62 request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
63 request, result = ndb.get_multi((request_key, result_key)) | 63 request, result = ndb.get_multi((request_key, result_key)) |
64 if not request or not result: | 64 if not request or not result: |
65 raise endpoints.NotFoundException('%s not found.' % task_id) | 65 raise endpoints.NotFoundException('%s not found.' % task_id) |
66 if not acl.is_bot() and not request.has_access: | 66 if not acl.is_bot() and not request.has_access: |
67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) | 67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) |
68 return request, result | 68 return request, result |
69 except ValueError: | 69 except ValueError: |
70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) | 70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) |
71 | 71 |
72 | 72 |
73 def get_request_and_result_by_transaction_id(transaction_id): | |
74 """Provides the TaskRequest and result corresponding to a transaction ID. | |
75 | |
76 Enforces the ACL for users. Allows bots all access for the moment. | |
77 | |
78 May return (None, None) if the transaction, request or result is not found. | |
79 | |
80 Returns: | |
81 tuple(TaskRequest, result): result can be either for a TaskRunResult or a | |
82 TaskResultSummary. | |
83 """ | |
84 task_id = memcache.get(transaction_id_to_memcache_key(transaction_id)) | |
M-A Ruel
2017/05/03 01:33:20
use a namespace, it's simpler. Then no need for tr
nodir
2017/05/05 18:11:38
Done.
| |
85 if not task_id: | |
86 return None, None | |
M-A Ruel
2017/05/03 01:33:20
This makes the contract a bit scary though. :/ I m
nodir
2017/05/05 18:11:38
maybe set timeout to 1min?
| |
87 request_key, result_key = task_pack.get_request_and_result_keys(task_id) | |
88 request, result = ndb.get_multi((request_key, result_key)) | |
89 if not request or not result: | |
90 return None, None | |
91 if not acl.is_bot() and not request.has_access: | |
92 raise endpoints.ForbiddenException( | |
93 '%s, created for transaction %s, is not accessible.' % | |
94 (task_id, transaction_id)) | |
95 return request, result | |
96 | |
97 | |
98 def transaction_id_to_memcache_key(transaction_id): | |
99 assert transaction_id | |
100 return 'tasks.new/txn/%s' % transaction_id | |
101 | |
102 | |
73 def get_or_raise(key): | 103 def get_or_raise(key): |
74 """Returns an entity or raises an endpoints exception if it does not exist.""" | 104 """Returns an entity or raises an endpoints exception if it does not exist.""" |
75 result = key.get() | 105 result = key.get() |
76 if not result: | 106 if not result: |
77 raise endpoints.NotFoundException('%s not found.' % key.id()) | 107 raise endpoints.NotFoundException('%s not found.' % key.id()) |
78 return result | 108 return result |
79 | 109 |
80 | 110 |
81 def apply_property_defaults(properties): | 111 def apply_property_defaults(properties): |
82 """Fills ndb task properties with default values read from server settings.""" | 112 """Fills ndb task properties with default values read from server settings.""" |
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
252 who=obj.who.to_bytes() if obj.who else None, | 282 who=obj.who.to_bytes() if obj.who else None, |
253 when=obj.created_ts, | 283 when=obj.created_ts, |
254 version=str(obj.version)) | 284 version=str(obj.version)) |
255 | 285 |
256 | 286 |
257 TaskId = endpoints.ResourceContainer( | 287 TaskId = endpoints.ResourceContainer( |
258 message_types.VoidMessage, | 288 message_types.VoidMessage, |
259 task_id=messages.StringField(1, required=True)) | 289 task_id=messages.StringField(1, required=True)) |
260 | 290 |
261 | 291 |
292 TaskIdWithTransactionID = endpoints.ResourceContainer( | |
293 message_types.VoidMessage, | |
294 task_id=messages.StringField(1), | |
295 transaction_id=messages.StringField(2)) | |
296 | |
297 | |
262 TaskIdWithPerf = endpoints.ResourceContainer( | 298 TaskIdWithPerf = endpoints.ResourceContainer( |
263 message_types.VoidMessage, | 299 message_types.VoidMessage, |
264 task_id=messages.StringField(1, required=True), | 300 task_id=messages.StringField(1, required=True), |
265 include_performance_stats=messages.BooleanField(2, default=False)) | 301 include_performance_stats=messages.BooleanField(2, default=False)) |
266 | 302 |
267 | 303 |
268 @swarming_api.api_class(resource_name='task', path='task') | 304 @swarming_api.api_class(resource_name='task', path='task') |
269 class SwarmingTaskService(remote.Service): | 305 class SwarmingTaskService(remote.Service): |
270 """Swarming's task-related API.""" | 306 """Swarming's task-related API.""" |
271 @gae_ts_mon.instrument_endpoint() | 307 @gae_ts_mon.instrument_endpoint() |
(...skipping 25 matching lines...) Expand all Loading... | |
297 http_method='GET') | 333 http_method='GET') |
298 @auth.require(acl.is_bot_or_user) | 334 @auth.require(acl.is_bot_or_user) |
299 def request(self, request): | 335 def request(self, request): |
300 """Returns the task request corresponding to a task ID.""" | 336 """Returns the task request corresponding to a task ID.""" |
301 logging.info('%s', request) | 337 logging.info('%s', request) |
302 request_obj, _ = get_request_and_result(request.task_id) | 338 request_obj, _ = get_request_and_result(request.task_id) |
303 return message_conversion.task_request_to_rpc(request_obj) | 339 return message_conversion.task_request_to_rpc(request_obj) |
304 | 340 |
305 @gae_ts_mon.instrument_endpoint() | 341 @gae_ts_mon.instrument_endpoint() |
306 @auth.endpoints_method( | 342 @auth.endpoints_method( |
307 TaskId, swarming_rpcs.CancelResponse, | 343 TaskIdWithTransactionID, swarming_rpcs.CancelResponse, |
308 name='cancel', | 344 name='cancel', |
309 path='{task_id}/cancel') | 345 path='{task_id}/cancel') |
310 @auth.require(acl.is_bot_or_user) | 346 @auth.require(acl.is_bot_or_user) |
311 def cancel(self, request): | 347 def cancel(self, request): |
312 """Cancels a task. | 348 """Cancels a task. |
313 | 349 |
314 If a bot was running the task, the bot will forcibly cancel the task. | 350 If a bot was running the task, the bot will forcibly cancel the task. |
315 """ | 351 """ |
316 logging.info('%s', request) | 352 logging.info('%s', request) |
317 request_obj, result = get_request_and_result(request.task_id) | 353 if request.task_id and request.transaction_id: |
354 raise endpoints.BadRequestException( | |
355 'only one of task_id or transaction_id must be specified') | |
M-A Ruel
2017/05/03 01:33:20
That can't work because path is '{task_id}/cancel'
nodir
2017/05/05 18:11:38
ugh, should I define a new API then? cancel_by_tra
M-A Ruel
2017/05/05 18:15:33
Yes I prefer a new API. If we get rid of the featu
nodir
2017/05/05 18:41:37
Done.
| |
356 if request.task_id: | |
357 request_obj, result = get_request_and_result(request.task_id) | |
358 else: | |
359 request_obj, result = get_request_and_result_by_transaction_id( | |
360 request.transaction_id) | |
361 if not request_obj or not result: | |
362 raise endpoints.NotFoundException( | |
363 'request for transaction %s not found.' % request.transaction_id) | |
364 | |
318 ok, was_running = task_scheduler.cancel_task(request_obj, result.key) | 365 ok, was_running = task_scheduler.cancel_task(request_obj, result.key) |
319 return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running) | 366 return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running) |
320 | 367 |
321 @gae_ts_mon.instrument_endpoint() | 368 @gae_ts_mon.instrument_endpoint() |
322 @auth.endpoints_method( | 369 @auth.endpoints_method( |
323 TaskId, swarming_rpcs.TaskOutput, | 370 TaskId, swarming_rpcs.TaskOutput, |
324 name='stdout', | 371 name='stdout', |
325 path='{task_id}/stdout', | 372 path='{task_id}/stdout', |
326 http_method='GET') | 373 http_method='GET') |
327 @auth.require(acl.is_bot_or_user) | 374 @auth.require(acl.is_bot_or_user) |
(...skipping 21 matching lines...) Expand all Loading... | |
349 def new(self, request): | 396 def new(self, request): |
350 """Creates a new task. | 397 """Creates a new task. |
351 | 398 |
352 The task will be enqueued in the tasks list and will be executed at the | 399 The task will be enqueued in the tasks list and will be executed at the |
353 earliest opportunity by a bot that has at least the dimensions as described | 400 earliest opportunity by a bot that has at least the dimensions as described |
354 in the task request. | 401 in the task request. |
355 """ | 402 """ |
356 sb = (request.properties.secret_bytes | 403 sb = (request.properties.secret_bytes |
357 if request.properties is not None else None) | 404 if request.properties is not None else None) |
358 if sb is not None: | 405 if sb is not None: |
359 request.properties.secret_bytes = "HIDDEN" | 406 request.properties.secret_bytes = 'HIDDEN' |
360 logging.info('%s', request) | 407 logging.info('%s', request) |
361 if sb is not None: | 408 if sb is not None: |
362 request.properties.secret_bytes = sb | 409 request.properties.secret_bytes = sb |
363 | 410 |
364 try: | 411 try: |
365 request, secret_bytes = message_conversion.new_task_request_from_rpc( | 412 request_obj, secret_bytes = message_conversion.new_task_request_from_rpc( |
366 request, utils.utcnow()) | 413 request, utils.utcnow()) |
367 apply_property_defaults(request.properties) | 414 apply_property_defaults(request_obj.properties) |
368 task_request.init_new_request( | 415 task_request.init_new_request( |
369 request, acl.can_schedule_high_priority_tasks(), secret_bytes) | 416 request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes) |
417 except (TypeError, ValueError) as e: | |
418 raise endpoints.BadRequestException(e.message) | |
370 | 419 |
371 result_summary = task_scheduler.schedule_request(request, secret_bytes) | 420 # Check if a request was already created with the same transaction. |
372 except (datastore_errors.BadValueError, TypeError, ValueError) as e: | 421 transaction_memcache_key = None |
422 if request.transaction_id: | |
423 transaction_memcache_key = transaction_id_to_memcache_key( | |
424 request.transaction_id) | |
425 txn_request_obj, txn_result_summary = ( | |
426 get_request_and_result_by_transaction_id(request.transaction_id)) | |
427 if txn_request_obj and txn_result_summary: | |
428 todict = lambda r: r.to_dict(['created_ts', 'expiration_ts']) | |
429 if todict(txn_request_obj) != todict(request_obj): | |
430 raise endpoints.BadRequestException( | |
431 'A request with different parameters was created for ' | |
432 'transaction %s. Send the same tasks.new request ' | |
433 'within the same transaction!' % request.transaction_id) | |
434 task_id = task_pack.pack_result_summary_key(txn_result_summary.key) | |
435 logging.info( | |
436 'reusing existing %s for transaction %s', | |
437 task_id, request.transaction_id) | |
438 return swarming_rpcs.TaskRequestMetadata( | |
439 request=message_conversion.task_request_to_rpc(txn_request_obj), | |
440 task_id=task_id, | |
441 task_result=message_conversion.task_result_to_rpc( | |
442 txn_result_summary, False)) | |
443 | |
444 try: | |
445 result_summary = task_scheduler.schedule_request( | |
446 request_obj, secret_bytes, | |
447 memcache_key_for_task_id=transaction_memcache_key) | |
448 except datastore_errors.BadValueError as e: | |
373 raise endpoints.BadRequestException(e.message) | 449 raise endpoints.BadRequestException(e.message) |
374 | 450 |
375 previous_result = None | 451 previous_result = None |
376 if result_summary.deduped_from: | 452 if result_summary.deduped_from: |
377 previous_result = message_conversion.task_result_to_rpc( | 453 previous_result = message_conversion.task_result_to_rpc( |
378 result_summary, False) | 454 result_summary, False) |
379 | 455 |
380 return swarming_rpcs.TaskRequestMetadata( | 456 return swarming_rpcs.TaskRequestMetadata( |
381 request=message_conversion.task_request_to_rpc(request), | 457 request=message_conversion.task_request_to_rpc(request_obj), |
382 task_id=task_pack.pack_result_summary_key(result_summary.key), | 458 task_id=task_pack.pack_result_summary_key(result_summary.key), |
383 task_result=previous_result) | 459 task_result=previous_result) |
384 | 460 |
385 @gae_ts_mon.instrument_endpoint() | 461 @gae_ts_mon.instrument_endpoint() |
386 @auth.endpoints_method( | 462 @auth.endpoints_method( |
387 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, | 463 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, |
388 http_method='GET') | 464 http_method='GET') |
389 @auth.require(acl.is_privileged_user) | 465 @auth.require(acl.is_privileged_user) |
390 def list(self, request): | 466 def list(self, request): |
391 """Returns tasks results based on the filters. | 467 """Returns tasks results based on the filters. |
(...skipping 442 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
834 def get_routes(): | 910 def get_routes(): |
835 return ( | 911 return ( |
836 endpoints_webapp2.api_routes(SwarmingServerService) + | 912 endpoints_webapp2.api_routes(SwarmingServerService) + |
837 endpoints_webapp2.api_routes(SwarmingTaskService) + | 913 endpoints_webapp2.api_routes(SwarmingTaskService) + |
838 endpoints_webapp2.api_routes(SwarmingTasksService) + | 914 endpoints_webapp2.api_routes(SwarmingTasksService) + |
839 endpoints_webapp2.api_routes(SwarmingBotService) + | 915 endpoints_webapp2.api_routes(SwarmingBotService) + |
840 endpoints_webapp2.api_routes(SwarmingBotsService) + | 916 endpoints_webapp2.api_routes(SwarmingBotsService) + |
841 # components.config endpoints for validation and configuring of luci-config | 917 # components.config endpoints for validation and configuring of luci-config |
842 # service URL. | 918 # service URL. |
843 endpoints_webapp2.api_routes(config.ConfigApi)) | 919 endpoints_webapp2.api_routes(config.ConfigApi)) |
OLD | NEW |