Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(469)

Side by Side Diff: appengine/swarming/handlers_endpoints.py

Issue 2856733002: swarming: add transaction_id to tasks.new request
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """This module defines Swarming Server endpoints handlers.""" 5 """This module defines Swarming Server endpoints handlers."""
6 6
7 import logging 7 import logging
8 import os 8 import os
9 9
10 from google.appengine.api import datastore_errors 10 from google.appengine.api import datastore_errors
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
43 _old_decode_field = protojson.ProtoJson.decode_field 43 _old_decode_field = protojson.ProtoJson.decode_field
44 def _decode_field(self, field, value): 44 def _decode_field(self, field, value):
45 if (isinstance(field, messages.BooleanField) and 45 if (isinstance(field, messages.BooleanField) and
46 isinstance(value, basestring)): 46 isinstance(value, basestring)):
47 return value.lower() == 'true' 47 return value.lower() == 'true'
48 return _old_decode_field(self, field, value) 48 return _old_decode_field(self, field, value)
49 protojson.ProtoJson.decode_field = _decode_field 49 protojson.ProtoJson.decode_field = _decode_field
50 50
51 51
52 def get_request_and_result(task_id): 52 def get_request_and_result(task_id):
53 """Provides the key and TaskRequest corresponding to a task ID. 53 """Provides the TaskRequest and result corresponding to a task ID.
54 54
55 Enforces the ACL for users. Allows bots all access for the moment. 55 Enforces the ACL for users. Allows bots all access for the moment.
56 56
57 Returns: 57 Returns:
58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a 58 tuple(TaskRequest, result): result can be either for a TaskRunResult or a
59 TaskResultSummay. 59 TaskResultSummary.
60 """ 60 """
61 try: 61 try:
62 request_key, result_key = task_pack.get_request_and_result_keys(task_id) 62 request_key, result_key = task_pack.get_request_and_result_keys(task_id)
63 request, result = ndb.get_multi((request_key, result_key)) 63 request, result = ndb.get_multi((request_key, result_key))
64 if not request or not result: 64 if not request or not result:
65 raise endpoints.NotFoundException('%s not found.' % task_id) 65 raise endpoints.NotFoundException('%s not found.' % task_id)
66 if not acl.is_bot() and not request.has_access: 66 if not acl.is_bot() and not request.has_access:
67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id) 67 raise endpoints.ForbiddenException('%s is not accessible.' % task_id)
68 return request, result 68 return request, result
69 except ValueError: 69 except ValueError:
70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id) 70 raise endpoints.BadRequestException('%s is an invalid key.' % task_id)
71 71
72 72
73 def get_request_and_result_by_transaction_id(transaction_id):
74 """Provides the TaskRequest and result corresponding to a transaction ID.
75
76 Enforces the ACL for users. Allows bots all access for the moment.
77
78 May return (None, None) if the transaction, request or result is not found.
79
80 Returns:
81 tuple(TaskRequest, result): result can be either for a TaskRunResult or a
82 TaskResultSummary.
83 """
84 task_id = memcache.get(transaction_id_to_memcache_key(transaction_id))
M-A Ruel 2017/05/03 01:33:20 use a namespace, it's simpler. Then no need for tr
nodir 2017/05/05 18:11:38 Done.
85 if not task_id:
86 return None, None
M-A Ruel 2017/05/03 01:33:20 This makes the contract a bit scary though. :/ I m
nodir 2017/05/05 18:11:38 maybe set timeout to 1min?
87 request_key, result_key = task_pack.get_request_and_result_keys(task_id)
88 request, result = ndb.get_multi((request_key, result_key))
89 if not request or not result:
90 return None, None
91 if not acl.is_bot() and not request.has_access:
92 raise endpoints.ForbiddenException(
93 '%s, created for transaction %s, is not accessible.' %
94 (task_id, transaction_id))
95 return request, result
96
97
98 def transaction_id_to_memcache_key(transaction_id):
99 assert transaction_id
100 return 'tasks.new/txn/%s' % transaction_id
101
102
73 def get_or_raise(key): 103 def get_or_raise(key):
74 """Returns an entity or raises an endpoints exception if it does not exist.""" 104 """Returns an entity or raises an endpoints exception if it does not exist."""
75 result = key.get() 105 result = key.get()
76 if not result: 106 if not result:
77 raise endpoints.NotFoundException('%s not found.' % key.id()) 107 raise endpoints.NotFoundException('%s not found.' % key.id())
78 return result 108 return result
79 109
80 110
81 def apply_property_defaults(properties): 111 def apply_property_defaults(properties):
82 """Fills ndb task properties with default values read from server settings.""" 112 """Fills ndb task properties with default values read from server settings."""
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
252 who=obj.who.to_bytes() if obj.who else None, 282 who=obj.who.to_bytes() if obj.who else None,
253 when=obj.created_ts, 283 when=obj.created_ts,
254 version=str(obj.version)) 284 version=str(obj.version))
255 285
256 286
257 TaskId = endpoints.ResourceContainer( 287 TaskId = endpoints.ResourceContainer(
258 message_types.VoidMessage, 288 message_types.VoidMessage,
259 task_id=messages.StringField(1, required=True)) 289 task_id=messages.StringField(1, required=True))
260 290
261 291
292 TaskIdWithTransactionID = endpoints.ResourceContainer(
293 message_types.VoidMessage,
294 task_id=messages.StringField(1),
295 transaction_id=messages.StringField(2))
296
297
262 TaskIdWithPerf = endpoints.ResourceContainer( 298 TaskIdWithPerf = endpoints.ResourceContainer(
263 message_types.VoidMessage, 299 message_types.VoidMessage,
264 task_id=messages.StringField(1, required=True), 300 task_id=messages.StringField(1, required=True),
265 include_performance_stats=messages.BooleanField(2, default=False)) 301 include_performance_stats=messages.BooleanField(2, default=False))
266 302
267 303
268 @swarming_api.api_class(resource_name='task', path='task') 304 @swarming_api.api_class(resource_name='task', path='task')
269 class SwarmingTaskService(remote.Service): 305 class SwarmingTaskService(remote.Service):
270 """Swarming's task-related API.""" 306 """Swarming's task-related API."""
271 @gae_ts_mon.instrument_endpoint() 307 @gae_ts_mon.instrument_endpoint()
(...skipping 25 matching lines...) Expand all
297 http_method='GET') 333 http_method='GET')
298 @auth.require(acl.is_bot_or_user) 334 @auth.require(acl.is_bot_or_user)
299 def request(self, request): 335 def request(self, request):
300 """Returns the task request corresponding to a task ID.""" 336 """Returns the task request corresponding to a task ID."""
301 logging.info('%s', request) 337 logging.info('%s', request)
302 request_obj, _ = get_request_and_result(request.task_id) 338 request_obj, _ = get_request_and_result(request.task_id)
303 return message_conversion.task_request_to_rpc(request_obj) 339 return message_conversion.task_request_to_rpc(request_obj)
304 340
305 @gae_ts_mon.instrument_endpoint() 341 @gae_ts_mon.instrument_endpoint()
306 @auth.endpoints_method( 342 @auth.endpoints_method(
307 TaskId, swarming_rpcs.CancelResponse, 343 TaskIdWithTransactionID, swarming_rpcs.CancelResponse,
308 name='cancel', 344 name='cancel',
309 path='{task_id}/cancel') 345 path='{task_id}/cancel')
310 @auth.require(acl.is_bot_or_user) 346 @auth.require(acl.is_bot_or_user)
311 def cancel(self, request): 347 def cancel(self, request):
312 """Cancels a task. 348 """Cancels a task.
313 349
314 If a bot was running the task, the bot will forcibly cancel the task. 350 If a bot was running the task, the bot will forcibly cancel the task.
315 """ 351 """
316 logging.info('%s', request) 352 logging.info('%s', request)
317 request_obj, result = get_request_and_result(request.task_id) 353 if request.task_id and request.transaction_id:
354 raise endpoints.BadRequestException(
355 'only one of task_id or transaction_id must be specified')
M-A Ruel 2017/05/03 01:33:20 That can't work because path is '{task_id}/cancel'
nodir 2017/05/05 18:11:38 ugh, should I define a new API then? cancel_by_tra
M-A Ruel 2017/05/05 18:15:33 Yes I prefer a new API. If we get rid of the featu
nodir 2017/05/05 18:41:37 Done.
356 if request.task_id:
357 request_obj, result = get_request_and_result(request.task_id)
358 else:
359 request_obj, result = get_request_and_result_by_transaction_id(
360 request.transaction_id)
361 if not request_obj or not result:
362 raise endpoints.NotFoundException(
363 'request for transaction %s not found.' % request.transaction_id)
364
318 ok, was_running = task_scheduler.cancel_task(request_obj, result.key) 365 ok, was_running = task_scheduler.cancel_task(request_obj, result.key)
319 return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running) 366 return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running)
320 367
321 @gae_ts_mon.instrument_endpoint() 368 @gae_ts_mon.instrument_endpoint()
322 @auth.endpoints_method( 369 @auth.endpoints_method(
323 TaskId, swarming_rpcs.TaskOutput, 370 TaskId, swarming_rpcs.TaskOutput,
324 name='stdout', 371 name='stdout',
325 path='{task_id}/stdout', 372 path='{task_id}/stdout',
326 http_method='GET') 373 http_method='GET')
327 @auth.require(acl.is_bot_or_user) 374 @auth.require(acl.is_bot_or_user)
(...skipping 21 matching lines...) Expand all
349 def new(self, request): 396 def new(self, request):
350 """Creates a new task. 397 """Creates a new task.
351 398
352 The task will be enqueued in the tasks list and will be executed at the 399 The task will be enqueued in the tasks list and will be executed at the
353 earliest opportunity by a bot that has at least the dimensions as described 400 earliest opportunity by a bot that has at least the dimensions as described
354 in the task request. 401 in the task request.
355 """ 402 """
356 sb = (request.properties.secret_bytes 403 sb = (request.properties.secret_bytes
357 if request.properties is not None else None) 404 if request.properties is not None else None)
358 if sb is not None: 405 if sb is not None:
359 request.properties.secret_bytes = "HIDDEN" 406 request.properties.secret_bytes = 'HIDDEN'
360 logging.info('%s', request) 407 logging.info('%s', request)
361 if sb is not None: 408 if sb is not None:
362 request.properties.secret_bytes = sb 409 request.properties.secret_bytes = sb
363 410
364 try: 411 try:
365 request, secret_bytes = message_conversion.new_task_request_from_rpc( 412 request_obj, secret_bytes = message_conversion.new_task_request_from_rpc(
366 request, utils.utcnow()) 413 request, utils.utcnow())
367 apply_property_defaults(request.properties) 414 apply_property_defaults(request_obj.properties)
368 task_request.init_new_request( 415 task_request.init_new_request(
369 request, acl.can_schedule_high_priority_tasks(), secret_bytes) 416 request_obj, acl.can_schedule_high_priority_tasks(), secret_bytes)
417 except (TypeError, ValueError) as e:
418 raise endpoints.BadRequestException(e.message)
370 419
371 result_summary = task_scheduler.schedule_request(request, secret_bytes) 420 # Check if a request was already created with the same transaction.
372 except (datastore_errors.BadValueError, TypeError, ValueError) as e: 421 transaction_memcache_key = None
422 if request.transaction_id:
423 transaction_memcache_key = transaction_id_to_memcache_key(
424 request.transaction_id)
425 txn_request_obj, txn_result_summary = (
426 get_request_and_result_by_transaction_id(request.transaction_id))
427 if txn_request_obj and txn_result_summary:
428 todict = lambda r: r.to_dict(['created_ts', 'expiration_ts'])
429 if todict(txn_request_obj) != todict(request_obj):
430 raise endpoints.BadRequestException(
431 'A request with different parameters was created for '
432 'transaction %s. Send the same tasks.new request '
433 'within the same transaction!' % request.transaction_id)
434 task_id = task_pack.pack_result_summary_key(txn_result_summary.key)
435 logging.info(
436 'reusing existing %s for transaction %s',
437 task_id, request.transaction_id)
438 return swarming_rpcs.TaskRequestMetadata(
439 request=message_conversion.task_request_to_rpc(txn_request_obj),
440 task_id=task_id,
441 task_result=message_conversion.task_result_to_rpc(
442 txn_result_summary, False))
443
444 try:
445 result_summary = task_scheduler.schedule_request(
446 request_obj, secret_bytes,
447 memcache_key_for_task_id=transaction_memcache_key)
448 except datastore_errors.BadValueError as e:
373 raise endpoints.BadRequestException(e.message) 449 raise endpoints.BadRequestException(e.message)
374 450
375 previous_result = None 451 previous_result = None
376 if result_summary.deduped_from: 452 if result_summary.deduped_from:
377 previous_result = message_conversion.task_result_to_rpc( 453 previous_result = message_conversion.task_result_to_rpc(
378 result_summary, False) 454 result_summary, False)
379 455
380 return swarming_rpcs.TaskRequestMetadata( 456 return swarming_rpcs.TaskRequestMetadata(
381 request=message_conversion.task_request_to_rpc(request), 457 request=message_conversion.task_request_to_rpc(request_obj),
382 task_id=task_pack.pack_result_summary_key(result_summary.key), 458 task_id=task_pack.pack_result_summary_key(result_summary.key),
383 task_result=previous_result) 459 task_result=previous_result)
384 460
385 @gae_ts_mon.instrument_endpoint() 461 @gae_ts_mon.instrument_endpoint()
386 @auth.endpoints_method( 462 @auth.endpoints_method(
387 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList, 463 swarming_rpcs.TasksRequest, swarming_rpcs.TaskList,
388 http_method='GET') 464 http_method='GET')
389 @auth.require(acl.is_privileged_user) 465 @auth.require(acl.is_privileged_user)
390 def list(self, request): 466 def list(self, request):
391 """Returns tasks results based on the filters. 467 """Returns tasks results based on the filters.
(...skipping 442 matching lines...) Expand 10 before | Expand all | Expand 10 after
834 def get_routes(): 910 def get_routes():
835 return ( 911 return (
836 endpoints_webapp2.api_routes(SwarmingServerService) + 912 endpoints_webapp2.api_routes(SwarmingServerService) +
837 endpoints_webapp2.api_routes(SwarmingTaskService) + 913 endpoints_webapp2.api_routes(SwarmingTaskService) +
838 endpoints_webapp2.api_routes(SwarmingTasksService) + 914 endpoints_webapp2.api_routes(SwarmingTasksService) +
839 endpoints_webapp2.api_routes(SwarmingBotService) + 915 endpoints_webapp2.api_routes(SwarmingBotService) +
840 endpoints_webapp2.api_routes(SwarmingBotsService) + 916 endpoints_webapp2.api_routes(SwarmingBotsService) +
841 # components.config endpoints for validation and configuring of luci-config 917 # components.config endpoints for validation and configuring of luci-config
842 # service URL. 918 # service URL.
843 endpoints_webapp2.api_routes(config.ConfigApi)) 919 endpoints_webapp2.api_routes(config.ConfigApi))
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698