Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3291)

Unified Diff: appengine/swarming/server/task_scheduler.py

Issue 2298053002: Add more safety checks. (Closed)
Patch Set: Refactored searching code into function for readability Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/swarming/server/task_result.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/swarming/server/task_scheduler.py
diff --git a/appengine/swarming/server/task_scheduler.py b/appengine/swarming/server/task_scheduler.py
index f0394b50167ee6a96e04e52c16e75a773182d78a..d29dbe8d46630104264c6f845b46d01ad00ef58c 100644
--- a/appengine/swarming/server/task_scheduler.py
+++ b/appengine/swarming/server/task_scheduler.py
@@ -402,6 +402,42 @@ def _can_use_dimension(dim_acls, ident, k, v):
return True
+def _find_dupe_task(now, h):
+ """Finds a previously run task that is also idempotent and completed.
+
+ Fetch items that can be used to dedupe the task. See the comment for
+ request.properties.idempotent for more details.
+
+ Do not use "task_result.TaskResultSummary.created_ts > oldest" here because
+ this would require a composite index. It's unnecessary because TaskRequest.key
+ is equivalent to decreasing TaskRequest.created_ts, ordering by key works as
+ well and doesn't require a composite index.
+ """
+ # TODO(maruel): Make a reverse map on successful task completion so this
+ # becomes a simple ndb.get().
+ cls = task_result.TaskResultSummary
+ q = cls.query(cls.properties_hash==h).order(cls.key)
+ for i, dupe_summary in enumerate(q.iter(batch_size=1)):
M-A Ruel 2016/08/31 20:26:30 Using batch_size=1 since the general case is to on
+ # It is possible for the query to return stale items.
+ if (dupe_summary.state != task_result.State.COMPLETED or
+ dupe_summary.failure):
+ if i == 2:
+ # Indexes are very inconsistent, give up.
+ return None
+ continue
+
+ # Refuse tasks older than X days. This is due to the isolate server
+ # dropping files.
+ # TODO(maruel): The value should be calculated from the isolate server
+ # setting and be unbounded when no isolated input was used.
+ oldest = now - datetime.timedelta(
+ seconds=config.settings().reusable_task_age_secs)
+ if dupe_summary.created_ts <= oldest:
+ return None
+ return dupe_summary
+ return None
+
+
### Public API.
@@ -459,56 +495,34 @@ def schedule_request(request):
return key
deduped = False
- stored = False
if request.properties.idempotent:
- # Find a previously run task that is also idempotent and completed. Start a
- # query to fetch items that can be used to dedupe the task. See the comment
- # for this property for more details.
- #
- # Do not use "cls.created_ts > oldest" here because this would require a
- # composite index. It's unnecessary because TaskRequest.key is equivalent to
- # decreasing TaskRequest.created_ts, ordering by key works as well and
- # doesn't require a composite index.
- cls = task_result.TaskResultSummary
- h = request.properties.properties_hash
-
- # TODO(maruel): Make a reverse map on successful task completion so this
- # becomes a simple ndb.get().
- dupe_summary = cls.query(cls.properties_hash==h).order(cls.key).get()
+ dupe_summary = _find_dupe_task(now, request.properties.properties_hash)
if dupe_summary:
- # Refuse tasks older than X days. This is due to the isolate server
- # dropping files.
- # TODO(maruel): The value should be calculated from the isolate server
- # setting and be unbounded when no isolated input was used.
- oldest = now - datetime.timedelta(
- seconds=config.settings().reusable_task_age_secs)
- if dupe_summary and dupe_summary.created_ts > oldest:
- # Setting task.queue_number to None removes it from the scheduling.
- task.queue_number = None
- _copy_summary(
- dupe_summary, result_summary,
- ('created_ts', 'modified_ts', 'name', 'user', 'tags'))
- # Zap irrelevant properties. PerformanceStats is also not copied over,
- # since it's not relevant.
- result_summary.properties_hash = None
- result_summary.try_number = 0
- result_summary.cost_saved_usd = result_summary.cost_usd
- # Only zap after.
- result_summary.costs_usd = []
- result_summary.deduped_from = task_pack.pack_run_result_key(
- dupe_summary.run_result_key)
- # In this code path, there's not much to do as the task will not be run,
- # previous results are returned. We still need to store all the entities
- # correctly.
- datastore_utils.insert(
- request, get_new_keys, extra=[task, result_summary])
- logging.debug(
- 'New request %s reusing %s', result_summary.task_id,
- dupe_summary.task_id)
- stored = True
- deduped = True
-
- if not stored:
+ # Setting task.queue_number to None removes it from the scheduling.
+ task.queue_number = None
+ _copy_summary(
+ dupe_summary, result_summary,
+ ('created_ts', 'modified_ts', 'name', 'user', 'tags'))
+ # Zap irrelevant properties. PerformanceStats is also not copied over,
+ # since it's not relevant.
+ result_summary.properties_hash = None
+ result_summary.try_number = 0
+ result_summary.cost_saved_usd = result_summary.cost_usd
+ # Only zap after.
+ result_summary.costs_usd = []
+ result_summary.deduped_from = task_pack.pack_run_result_key(
+ dupe_summary.run_result_key)
+ # In this code path, there's not much to do as the task will not be run,
+ # previous results are returned. We still need to store all the entities
+ # correctly.
+ datastore_utils.insert(
+ request, get_new_keys, extra=[task, result_summary])
+ logging.debug(
+ 'New request %s reusing %s', result_summary.task_id,
+ dupe_summary.task_id)
+ deduped = True
+
+ if not deduped:
# Storing these entities makes this task live. It is important at this point
# that the HTTP handler returns as fast as possible, otherwise the task will
# be run but the client will not know about it.
« no previous file with comments | « appengine/swarming/server/task_result.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698