Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Side by Side Diff: appengine/swarming/handlers_endpoints.py

Issue 2856733002: swarming: add transaction_id to tasks.new request
Patch Set: nits Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """This module defines Swarming Server endpoints handlers.""" 5 """This module defines Swarming Server endpoints handlers."""
6 6
7 import logging 7 import logging
8 import os 8 import os
9 9
10 from google.appengine.api import datastore_errors 10 from google.appengine.api import datastore_errors
(...skipping 18 matching lines...) Expand all
29 from server import acl 29 from server import acl
30 from server import bot_code 30 from server import bot_code
31 from server import bot_management 31 from server import bot_management
32 from server import config 32 from server import config
33 from server import task_pack 33 from server import task_pack
34 from server import task_request 34 from server import task_request
35 from server import task_result 35 from server import task_result
36 from server import task_scheduler 36 from server import task_scheduler
37 37
38 38
39 TRANSACTION_ID_MEMCACHE_NAMESPACE = 'tasks.new/transaction_id'
40
41
39 ### Helper Methods 42 ### Helper Methods
40 43
41 44
42 # Add support for BooleanField in protorpc in endpoints GET requests. 45 # Add support for BooleanField in protorpc in endpoints GET requests.
43 _old_decode_field = protojson.ProtoJson.decode_field 46 _old_decode_field = protojson.ProtoJson.decode_field
44 def _decode_field(self, field, value): 47 def _decode_field(self, field, value):
45 if (isinstance(field, messages.BooleanField) and 48 if (isinstance(field, messages.BooleanField) and
46 isinstance(value, basestring)): 49 isinstance(value, basestring)):
47 return value.lower() == 'true' 50 return value.lower() == 'true'
48 return _old_decode_field(self, field, value) 51 return _old_decode_field(self, field, value)
49 protojson.ProtoJson.decode_field = _decode_field 52 protojson.ProtoJson.decode_field = _decode_field
50 53
51 54
52 def get_request_and_result(task_id): 55 def get_request_and_result(task_id):
53 """Provides the key and TaskRequest corresponding to a task ID. 56 """Provides the TaskRequest and result corresponding to a task ID.
54 57
55 Enforces the ACL for users. Allows bots all access for the moment. 58 Enforces the ACL for users. Allows bots all access for the moment.
56 59
57 Returns: 60 Returns:
58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a 61 tuple(TaskRequest, result): result can be either for a TaskRunResult or a
59 TaskResultSummay. 62 TaskResultSummary.
60 """ 63 """
61 try: 64 try:
62 request_key, result_key = task_pack.get_request_and_result_keys(task_id) 65 request_key, result_key = task_pack.get_request_and_result_keys(task_id)
63 request, result = ndb.get_multi((request_key, result_key)) 66 request, result = ndb.get_multi((request_key, result_key))
64 if not request or not result: 67 if not request or not result:
65 raise endpoints.NotFoundException('%s not found.' % task_id) 68 raise endpoints.NotFoundException('%s not found.' % task_id)
66 if not acl.is_bot() and not request.has_access: 69 if not acl.is_bot() and not request.has_access:
67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) 70 raise endpoints.ForbiddenException('%s is not accessible.' % task_id)
68 return request, result 71 return request, result
69 except ValueError: 72 except ValueError:
70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) 73 raise endpoints.BadRequestException('%s is an invalid key.' % task_id)
71 74
72 75
76 def get_request_and_result_by_transaction_id(transaction_id):
77 """Provides the TaskRequest and result corresponding to a transaction ID.
78
79 Enforces the ACL for users. Allows bots all access for the moment.
80
81 May return (None, None) if the transaction, request or result is not found.
82
83 Returns:
84 tuple(TaskRequest, result): result can be either for a TaskRunResult or a
85 TaskResultSummary.
86 """
87 task_id = memcache.get(
88 transaction_id, namespace=TRANSACTION_ID_MEMCACHE_NAMESPACE)
89 if not task_id:
90 return None, None
91 request_key, result_key = task_pack.get_request_and_result_keys(task_id)
92 request, result = ndb.get_multi((request_key, result_key))
93 if not request or not result:
94 return None, None
95 if not acl.is_bot() and not request.has_access:
96 raise endpoints.ForbiddenException(
97 '%s, created for transaction %s, is not accessible.' %
98 (task_id, transaction_id))
99 return request, result
100
101
73 def get_or_raise(key): 102 def get_or_raise(key):
74 """Returns an entity or raises an endpoints exception if it does not exist.""" 103 """Returns an entity or raises an endpoints exception if it does not exist."""
75 result = key.get() 104 result = key.get()
76 if not result: 105 if not result:
77 raise endpoints.NotFoundException('%s not found.' % key.id()) 106 raise endpoints.NotFoundException('%s not found.' % key.id())
78 return result 107 return result
79 108
80 109
81 def apply_property_defaults(properties): 110 def apply_property_defaults(properties):
82 """Fills ndb task properties with default values read from server settings.""" 111 """Fills ndb task properties with default values read from server settings."""
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
252 who=obj.who.to_bytes() if obj.who else None, 281 who=obj.who.to_bytes() if obj.who else None,
253 when=obj.created_ts, 282 when=obj.created_ts,
254 version=str(obj.version)) 283 version=str(obj.version))
255 284
256 285
257 TaskId = endpoints.ResourceContainer( 286 TaskId = endpoints.ResourceContainer(
258 message_types.VoidMessage, 287 message_types.VoidMessage,
259 task_id=messages.StringField(1, required=True)) 288 task_id=messages.StringField(1, required=True))
260 289
261 290
291 TransactionId = endpoints.ResourceContainer(
292 message_types.VoidMessage,
293 transaction_id=messages.StringField(1))
294
295
262 TaskIdWithPerf = endpoints.ResourceContainer( 296 TaskIdWithPerf = endpoints.ResourceContainer(
263 message_types.VoidMessage, 297 message_types.VoidMessage,
264 task_id=messages.StringField(1, required=True), 298 task_id=messages.StringField(1, required=True),
265 include_performance_stats=messages.BooleanField(2, default=False)) 299 include_performance_stats=messages.BooleanField(2, default=False))
266 300
267 301
268 @swarming_api.api_class(resource_name='task', path='task') 302 @swarming_api.api_class(resource_name='task', path='task')
269 class SwarmingTaskService(remote.Service): 303 class SwarmingTaskService(remote.Service):
270 """Swarming's task-related API.""" 304 """Swarming's task-related API."""
271 @gae_ts_mon.instrument_endpoint() 305 @gae_ts_mon.instrument_endpoint()
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
349 def new(self, request): 383 def new(self, request):
350 """Creates a new task. 384 """Creates a new task.
351 385
352 The task will be enqueued in the tasks list and will be executed at the 386 The task will be enqueued in the tasks list and will be executed at the
353 earliest opportunity by a bot that has at least the dimensions as described 387 earliest opportunity by a bot that has at least the dimensions as described
354 in the task request. 388 in the task request.
355 """ 389 """
356 sb = (request.properties.secret_bytes 390 sb = (request.properties.secret_bytes
357 if request.properties is not None else None) 391 if request.properties is not None else None)
358 if sb is not None: 392 if sb is not None:
359 request.properties.secret_bytes = "HIDDEN" 393 request.properties.secret_bytes = 'HIDDEN'
360 logging.info('%s', request) 394 logging.info('%s', request)
361 if sb is not None: 395 if sb is not None:
362 request.properties.secret_bytes = sb 396 request.properties.secret_bytes = sb
363 397
364 try: 398 try:
365 request, secret_bytes = message_conversion.new_task_request_from_rpc( 399 request_obj, secret_bytes = message_conversion.new_task_request_from_rpc(
366 request, utils.utcnow()) 400 request, utils.utcnow())
367 apply_property_defaults(request.properties) 401 apply_property_defaults(request_obj.properties)
368 task_request.init_new_request( 402 task_request.init_new_request(
369 request, acl.can_schedule_high_priority_tasks(), secret_bytes) 403 request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes)
404 except (TypeError, ValueError) as e:
405 raise endpoints.BadRequestException(e.message)
370 406
371 result_summary = task_scheduler.schedule_request(request, secret_bytes) 407 # Check if a request was already created with the same transaction.
372 except (datastore_errors.BadValueError, TypeError, ValueError) as e: 408 if request.transaction_id:
409 txn_request_obj, txn_result_summary = (
410 get_request_and_result_by_transaction_id(request.transaction_id))
411 if txn_request_obj and txn_result_summary:
412 todict = lambda r: r.to_dict(['created_ts', 'expiration_ts'])
413 if todict(txn_request_obj) != todict(request_obj):
414 raise endpoints.BadRequestException(
415 'A request with different parameters was created for '
416 'transaction %s. Send the same tasks.new request '
417 'within the same transaction!' % request.transaction_id)
418 task_id = task_pack.pack_result_summary_key(txn_result_summary.key)
419 logging.info(
420 'reusing existing %s for transaction %s',
421 task_id, request.transaction_id)
422 return swarming_rpcs.TaskRequestMetadata(
423 request=message_conversion.task_request_to_rpc(txn_request_obj),
424 task_id=task_id,
425 task_result=message_conversion.task_result_to_rpc(
426 txn_result_summary, False))
427
428 try:
429 result_summary = task_scheduler.schedule_request(
430 request_obj, secret_bytes,
431 memcache_key_for_task_id=request.transaction_id,
432 memcache_ns_for_task_id=TRANSACTION_ID_MEMCACHE_NAMESPACE)
433 except datastore_errors.BadValueError as e:
373 raise endpoints.BadRequestException(e.message) 434 raise endpoints.BadRequestException(e.message)
374 435
375 previous_result = None 436 previous_result = None
376 if result_summary.deduped_from: 437 if result_summary.deduped_from:
377 previous_result = message_conversion.task_result_to_rpc( 438 previous_result = message_conversion.task_result_to_rpc(
378 result_summary, False) 439 result_summary, False)
379 440
380 return swarming_rpcs.TaskRequestMetadata( 441 return swarming_rpcs.TaskRequestMetadata(
381 request=message_conversion.task_request_to_rpc(request), 442 request=message_conversion.task_request_to_rpc(request_obj),
382 task_id=task_pack.pack_result_summary_key(result_summary.key), 443 task_id=task_pack.pack_result_summary_key(result_summary.key),
383 task_result=previous_result) 444 task_result=previous_result)
384 445
385 @gae_ts_mon.instrument_endpoint() 446 @gae_ts_mon.instrument_endpoint()
386 @auth.endpoints_method( 447 @auth.endpoints_method(
387 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, 448 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList,
388 http_method='GET') 449 http_method='GET')
389 @auth.require(acl.is_privileged_user) 450 @auth.require(acl.is_privileged_user)
390 def list(self, request): 451 def list(self, request):
391 """Returns tasks results based on the filters. 452 """Returns tasks results based on the filters.
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
496 else: 557 else:
497 logging.info('No tasks to cancel.') 558 logging.info('No tasks to cancel.')
498 559
499 return swarming_rpcs.TasksCancelResponse( 560 return swarming_rpcs.TasksCancelResponse(
500 cursor=cursor, 561 cursor=cursor,
501 matched=len(tasks), 562 matched=len(tasks),
502 now=now) 563 now=now)
503 564
504 @gae_ts_mon.instrument_endpoint() 565 @gae_ts_mon.instrument_endpoint()
505 @auth.endpoints_method( 566 @auth.endpoints_method(
567 TransactionId, swarming_rpcs.CancelResponse,
568 name='cancel_by_transaction_id',
569 path='cancel_by_transaction_id/{transaction_id}')
570 @auth.require(acl.is_bot_or_user)
571 def cancel_by_transaction_id(self, request):
572 """Cancels a task by a transaction ID.
573
574 Useful when id of a new task is unknown because the request failed.
575 The transaction ID here is the one supplied by the user when creating the
576 task.
577
578 If a bot was running the task, the bot will forcibly cancel the task.
579 """
580 logging.info('%s', request)
581
582 request_obj, result = get_request_and_result_by_transaction_id(
583 request.transaction_id)
584 if not request_obj or not result:
585 raise endpoints.NotFoundException(
586 'request for transaction %s not found.' % request.transaction_id)
587
588 ok, was_running = task_scheduler.cancel_task(request_obj, result.key)
589 return swarming_rpcs.CancelResponse(
590 task_id=request_obj.task_id,
591 ok=ok,
592 was_running=was_running)
593
594 @gae_ts_mon.instrument_endpoint()
595 @auth.endpoints_method(
506 swarming_rpcs.TasksCountRequest, swarming_rpcs.TasksCount, 596 swarming_rpcs.TasksCountRequest, swarming_rpcs.TasksCount,
507 http_method='GET') 597 http_method='GET')
508 @auth.require(acl.is_privileged_user) 598 @auth.require(acl.is_privileged_user)
509 def count(self, request): 599 def count(self, request):
510 """Counts number of tasks in a given state.""" 600 """Counts number of tasks in a given state."""
511 logging.info('%s', request) 601 logging.info('%s', request)
512 if not request.start: 602 if not request.start:
513 raise endpoints.BadRequestException('start (as epoch) is required') 603 raise endpoints.BadRequestException('start (as epoch) is required')
514 now = utils.utcnow() 604 now = utils.utcnow()
515 mem_key = self._memcache_key(request, now) 605 mem_key = self._memcache_key(request, now)
(...skipping 318 matching lines...) Expand 10 before | Expand all | Expand 10 after
834 def get_routes(): 924 def get_routes():
835 return ( 925 return (
836 endpoints_webapp2.api_routes(SwarmingServerService) + 926 endpoints_webapp2.api_routes(SwarmingServerService) +
837 endpoints_webapp2.api_routes(SwarmingTaskService) + 927 endpoints_webapp2.api_routes(SwarmingTaskService) +
838 endpoints_webapp2.api_routes(SwarmingTasksService) + 928 endpoints_webapp2.api_routes(SwarmingTasksService) +
839 endpoints_webapp2.api_routes(SwarmingBotService) + 929 endpoints_webapp2.api_routes(SwarmingBotService) +
840 endpoints_webapp2.api_routes(SwarmingBotsService) + 930 endpoints_webapp2.api_routes(SwarmingBotsService) +
841 # components.config endpoints for validation and configuring of luci-config 931 # components.config endpoints for validation and configuring of luci-config
842 # service URL. 932 # service URL.
843 endpoints_webapp2.api_routes(config.ConfigApi)) 933 endpoints_webapp2.api_routes(config.ConfigApi))
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698