Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(369)

Side by Side Diff: appengine/swarming/server/task_scheduler.py

Issue 2298053002: Add more safety checks. (Closed)
Patch Set: Refactored searching code into function for readability Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/server/task_result.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """High level tasks execution scheduling API. 5 """High level tasks execution scheduling API.
6 6
7 This is the interface closest to the HTTP handlers. 7 This is the interface closest to the HTTP handlers.
8 """ 8 """
9 9
10 import datetime 10 import datetime
(...skipping 384 matching lines...) Expand 10 before | Expand all | Expand 10 after
395 k: dimension name. 395 k: dimension name.
396 v: dimension value. 396 v: dimension value.
397 """ 397 """
398 for e in dim_acls.entry: 398 for e in dim_acls.entry:
399 if '%s:%s' % (k, v) in e.dimension or '%s:*' % k in e.dimension: 399 if '%s:%s' % (k, v) in e.dimension or '%s:*' % k in e.dimension:
400 return auth.is_group_member(e.usable_by, ident) 400 return auth.is_group_member(e.usable_by, ident)
401 # A dimension not mentioned in 'dimension_acls' is allowed by default. 401 # A dimension not mentioned in 'dimension_acls' is allowed by default.
402 return True 402 return True
403 403
404 404
405 def _find_dupe_task(now, h):
406 """Finds a previously run task that is also idempotent and completed.
407
408 Fetch items that can be used to dedupe the task. See the comment for this
409 property for more details.
410
411 Do not use "task_result.TaskResultSummary.created_ts > oldest" here because
412 this would require a composite index. It's unnecessary because TaskRequest.key
413 is equivalent to decreasing TaskRequest.created_ts, ordering by key works as
414 well and doesn't require a composite index.
415 """
416 # TODO(maruel): Make a reverse map on successful task completion so this
417 # becomes a simple ndb.get().
418 cls = task_result.TaskResultSummary
419 q = cls.query(cls.properties_hash==h).order(cls.key)
420 for i, dupe_summary in enumerate(q.iter(batch_size=1)):
M-A Ruel 2016/08/31 20:26:30 Using batch_size=1 since the general case is to only need the first result. [comment truncated in page extraction]
421 # It is possible for the query to return stale items.
422 if (dupe_summary.state != task_result.State.COMPLETED or
423 dupe_summary.failure):
424 if i == 2:
425 # Indexes are very inconsistent, give up.
426 return None
427 continue
428
429 # Refuse tasks older than X days. This is due to the isolate server
430 # dropping files.
431 # TODO(maruel): The value should be calculated from the isolate server
432 # setting and be unbounded when no isolated input was used.
433 oldest = now - datetime.timedelta(
434 seconds=config.settings().reusable_task_age_secs)
435 if dupe_summary.created_ts <= oldest:
436 return None
437 return dupe_summary
438 return None
439
440
405 ### Public API. 441 ### Public API.
406 442
407 443
408 def exponential_backoff(attempt_num): 444 def exponential_backoff(attempt_num):
409 """Returns an exponential backoff value in seconds.""" 445 """Returns an exponential backoff value in seconds."""
410 assert attempt_num >= 0 446 assert attempt_num >= 0
411 if random.random() < _PROBABILITY_OF_QUICK_COMEBACK: 447 if random.random() < _PROBABILITY_OF_QUICK_COMEBACK:
412 # Randomly ask the bot to return quickly. 448 # Randomly ask the bot to return quickly.
413 return 1.0 449 return 1.0
414 450
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
452 def get_new_keys(): 488 def get_new_keys():
453 # Warning: this assumes knowledge about the hierarchy of each entity. 489 # Warning: this assumes knowledge about the hierarchy of each entity.
454 key = task_request.new_request_key() 490 key = task_request.new_request_key()
455 task.key.parent = key 491 task.key.parent = key
456 old = result_summary.task_id 492 old = result_summary.task_id
457 result_summary.parent = key 493 result_summary.parent = key
458 logging.info('%s conflicted, using %s', old, result_summary.task_id) 494 logging.info('%s conflicted, using %s', old, result_summary.task_id)
459 return key 495 return key
460 496
461 deduped = False 497 deduped = False
462 stored = False
463 if request.properties.idempotent: 498 if request.properties.idempotent:
464 # Find a previously run task that is also idempotent and completed. Start a 499 dupe_summary = _find_dupe_task(now, request.properties.properties_hash)
465 # query to fetch items that can be used to dedupe the task. See the comment 500 if dupe_summary:
466 # for this property for more details. 501 # Setting task.queue_number to None removes it from the scheduling.
467 # 502 task.queue_number = None
468 # Do not use "cls.created_ts > oldest" here because this would require a 503 _copy_summary(
469 # composite index. It's unnecessary because TaskRequest.key is equivalent to 504 dupe_summary, result_summary,
470 # decreasing TaskRequest.created_ts, ordering by key works as well and 505 ('created_ts', 'modified_ts', 'name', 'user', 'tags'))
471 # doesn't require a composite index. 506 # Zap irrelevant properties. PerformanceStats is also not copied over,
472 cls = task_result.TaskResultSummary 507 # since it's not relevant.
473 h = request.properties.properties_hash 508 result_summary.properties_hash = None
509 result_summary.try_number = 0
510 result_summary.cost_saved_usd = result_summary.cost_usd
511 # Only zap after.
512 result_summary.costs_usd = []
513 result_summary.deduped_from = task_pack.pack_run_result_key(
514 dupe_summary.run_result_key)
515 # In this code path, there's not much to do as the task will not be run,
516 # previous results are returned. We still need to store all the entities
517 # correctly.
518 datastore_utils.insert(
519 request, get_new_keys, extra=[task, result_summary])
520 logging.debug(
521 'New request %s reusing %s', result_summary.task_id,
522 dupe_summary.task_id)
523 deduped = True
474 524
475 # TODO(maruel): Make a reverse map on successful task completion so this 525 if not deduped:
476 # becomes a simple ndb.get().
477 dupe_summary = cls.query(cls.properties_hash==h).order(cls.key).get()
478 if dupe_summary:
479 # Refuse tasks older than X days. This is due to the isolate server
480 # dropping files.
481 # TODO(maruel): The value should be calculated from the isolate server
482 # setting and be unbounded when no isolated input was used.
483 oldest = now - datetime.timedelta(
484 seconds=config.settings().reusable_task_age_secs)
485 if dupe_summary and dupe_summary.created_ts > oldest:
486 # Setting task.queue_number to None removes it from the scheduling.
487 task.queue_number = None
488 _copy_summary(
489 dupe_summary, result_summary,
490 ('created_ts', 'modified_ts', 'name', 'user', 'tags'))
491 # Zap irrelevant properties. PerformanceStats is also not copied over,
492 # since it's not relevant.
493 result_summary.properties_hash = None
494 result_summary.try_number = 0
495 result_summary.cost_saved_usd = result_summary.cost_usd
496 # Only zap after.
497 result_summary.costs_usd = []
498 result_summary.deduped_from = task_pack.pack_run_result_key(
499 dupe_summary.run_result_key)
500 # In this code path, there's not much to do as the task will not be run,
501 # previous results are returned. We still need to store all the entities
502 # correctly.
503 datastore_utils.insert(
504 request, get_new_keys, extra=[task, result_summary])
505 logging.debug(
506 'New request %s reusing %s', result_summary.task_id,
507 dupe_summary.task_id)
508 stored = True
509 deduped = True
510
511 if not stored:
512 # Storing these entities makes this task live. It is important at this point 526 # Storing these entities makes this task live. It is important at this point
513 # that the HTTP handler returns as fast as possible, otherwise the task will 527 # that the HTTP handler returns as fast as possible, otherwise the task will
514 # be run but the client will not know about it. 528 # be run but the client will not know about it.
515 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary]) 529 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary])
516 logging.debug('New request %s', result_summary.task_id) 530 logging.debug('New request %s', result_summary.task_id)
517 531
518 # Get parent task details if applicable. 532 # Get parent task details if applicable.
519 if request.parent_task_id: 533 if request.parent_task_id:
520 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) 534 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id)
521 parent_task_keys = [ 535 parent_task_keys = [
(...skipping 431 matching lines...) Expand 10 before | Expand all | Expand 10 after
953 ## Task queue tasks. 967 ## Task queue tasks.
954 968
955 969
956 def task_handle_pubsub_task(payload): 970 def task_handle_pubsub_task(payload):
957 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" 971 """Handles task enqueued by _maybe_pubsub_notify_via_tq."""
958 # Do not catch errors to trigger task queue task retry. Errors should not 972 # Do not catch errors to trigger task queue task retry. Errors should not
959 # happen in normal case. 973 # happen in normal case.
960 _pubsub_notify( 974 _pubsub_notify(
961 payload['task_id'], payload['topic'], 975 payload['task_id'], payload['topic'],
962 payload['auth_token'], payload['userdata']) 976 payload['auth_token'], payload['userdata'])
OLDNEW
« no previous file with comments | « appengine/swarming/server/task_result.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698