Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Side by Side Diff: appengine/swarming/server/task_scheduler.py

Issue 2298053002: Add more safety checks. (Closed)
Patch Set: More check Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/server/task_result.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """High level tasks execution scheduling API. 5 """High level tasks execution scheduling API.
6 6
7 This is the interface closest to the HTTP handlers. 7 This is the interface closest to the HTTP handlers.
8 """ 8 """
9 9
10 import datetime 10 import datetime
(...skipping 456 matching lines...) Expand 10 before | Expand all | Expand 10 after
467 # 467 #
468 # Do not use "cls.created_ts > oldest" here because this would require a 468 # Do not use "cls.created_ts > oldest" here because this would require a
469 # composite index. It's unnecessary because TaskRequest.key is equivalent to 469 # composite index. It's unnecessary because TaskRequest.key is equivalent to
470 # decreasing TaskRequest.created_ts, ordering by key works as well and 470 # decreasing TaskRequest.created_ts, ordering by key works as well and
471 # doesn't require a composite index. 471 # doesn't require a composite index.
472 cls = task_result.TaskResultSummary 472 cls = task_result.TaskResultSummary
473 h = request.properties.properties_hash 473 h = request.properties.properties_hash
474 474
475 # TODO(maruel): Make a reverse map on successful task completion so this 475 # TODO(maruel): Make a reverse map on successful task completion so this
476 # becomes a simple ndb.get(). 476 # becomes a simple ndb.get().
477 dupe_summary = cls.query(cls.properties_hash==h).order(cls.key).get() 477 count = 0
478 if dupe_summary: 478 for dupe_summary in cls.query(cls.properties_hash==h):
Vadim Sh. 2016/08/31 19:46:06 no order by key anymore?
Vadim Sh. 2016/08/31 19:46:06 nit: for i, dupe_summary in iterate(....): ...
M-A Ruel 2016/08/31 20:26:30 Done.
M-A Ruel 2016/08/31 20:26:30 Humm not sure why I had removed it, added back.
479 count += 1
480 # It is possible for the query to return stale items.
481 if (dupe_summary.state != task_result.State.COMPLETED or
482 dupe_summary.failure):
483 if count == 3:
Vadim Sh. 2016/08/31 19:46:06 Then also use smaller batch_size (it is 20 by default)
M-A Ruel 2016/08/31 20:26:30 Done.
484 # Indexes are very inconsistent, give up.
485 break
486 continue
479 # Refuse tasks older than X days. This is due to the isolate server 487 # Refuse tasks older than X days. This is due to the isolate server
480 # dropping files. 488 # dropping files.
481 # TODO(maruel): The value should be calculated from the isolate server 489 # TODO(maruel): The value should be calculated from the isolate server
482 # setting and be unbounded when no isolated input was used. 490 # setting and be unbounded when no isolated input was used.
483 oldest = now - datetime.timedelta( 491 oldest = now - datetime.timedelta(
484 seconds=config.settings().reusable_task_age_secs) 492 seconds=config.settings().reusable_task_age_secs)
485 if dupe_summary and dupe_summary.created_ts > oldest: 493 if dupe_summary.created_ts <= oldest:
486 # Setting task.queue_number to None removes it from the scheduling. 494 # Task is too old.
487 task.queue_number = None 495 break
488 _copy_summary( 496 # Setting task.queue_number to None removes it from the scheduling.
Vadim Sh. 2016/08/31 19:46:06 nit: break out of the loop here and do the rest of the processing outside the loop
M-A Ruel 2016/08/31 20:26:30 Refactored searching code into a function.
489 dupe_summary, result_summary, 497 task.queue_number = None
490 ('created_ts', 'modified_ts', 'name', 'user', 'tags')) 498 _copy_summary(
491 # Zap irrelevant properties. PerformanceStats is also not copied over, 499 dupe_summary, result_summary,
492 # since it's not relevant. 500 ('created_ts', 'modified_ts', 'name', 'user', 'tags'))
493 result_summary.properties_hash = None 501 # Zap irrelevant properties. PerformanceStats is also not copied over,
494 result_summary.try_number = 0 502 # since it's not relevant.
495 result_summary.cost_saved_usd = result_summary.cost_usd 503 result_summary.properties_hash = None
496 # Only zap after. 504 result_summary.try_number = 0
497 result_summary.costs_usd = [] 505 result_summary.cost_saved_usd = result_summary.cost_usd
498 result_summary.deduped_from = task_pack.pack_run_result_key( 506 # Only zap after.
499 dupe_summary.run_result_key) 507 result_summary.costs_usd = []
500 # In this code path, there's not much to do as the task will not be run, 508 result_summary.deduped_from = task_pack.pack_run_result_key(
501 # previous results are returned. We still need to store all the entities 509 dupe_summary.run_result_key)
502 # correctly. 510 # In this code path, there's not much to do as the task will not be run,
503 datastore_utils.insert( 511 # previous results are returned. We still need to store all the entities
504 request, get_new_keys, extra=[task, result_summary]) 512 # correctly.
505 logging.debug( 513 datastore_utils.insert(
506 'New request %s reusing %s', result_summary.task_id, 514 request, get_new_keys, extra=[task, result_summary])
507 dupe_summary.task_id) 515 logging.debug(
508 stored = True 516 'New request %s reusing %s', result_summary.task_id,
509 deduped = True 517 dupe_summary.task_id)
518 stored = True
519 deduped = True
520 break
510 521
511 if not stored: 522 if not stored:
512 # Storing these entities makes this task live. It is important at this point 523 # Storing these entities makes this task live. It is important at this point
513 # that the HTTP handler returns as fast as possible, otherwise the task will 524 # that the HTTP handler returns as fast as possible, otherwise the task will
514 # be run but the client will not know about it. 525 # be run but the client will not know about it.
515 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary]) 526 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary])
516 logging.debug('New request %s', result_summary.task_id) 527 logging.debug('New request %s', result_summary.task_id)
517 528
518 # Get parent task details if applicable. 529 # Get parent task details if applicable.
519 if request.parent_task_id: 530 if request.parent_task_id:
(...skipping 433 matching lines...) Expand 10 before | Expand all | Expand 10 after
953 ## Task queue tasks. 964 ## Task queue tasks.
954 965
955 966
956 def task_handle_pubsub_task(payload): 967 def task_handle_pubsub_task(payload):
957 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" 968 """Handles task enqueued by _maybe_pubsub_notify_via_tq."""
958 # Do not catch errors to trigger task queue task retry. Errors should not 969 # Do not catch errors to trigger task queue task retry. Errors should not
959 # happen in normal case. 970 # happen in normal case.
960 _pubsub_notify( 971 _pubsub_notify(
961 payload['task_id'], payload['topic'], 972 payload['task_id'], payload['topic'],
962 payload['auth_token'], payload['userdata']) 973 payload['auth_token'], payload['userdata'])
OLDNEW
« no previous file with comments | « appengine/swarming/server/task_result.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698