Chromium Code Reviews| Index: appengine/swarming/server/task_scheduler.py |
| diff --git a/appengine/swarming/server/task_scheduler.py b/appengine/swarming/server/task_scheduler.py |
| index f0394b50167ee6a96e04e52c16e75a773182d78a..d29dbe8d46630104264c6f845b46d01ad00ef58c 100644 |
| --- a/appengine/swarming/server/task_scheduler.py |
| +++ b/appengine/swarming/server/task_scheduler.py |
| @@ -402,6 +402,42 @@ def _can_use_dimension(dim_acls, ident, k, v): |
| return True |
| +def _find_dupe_task(now, h): |
| + """Finds a previously run task that is also idempotent and completed. |
| + |
| + Fetch items that can be used to dedupe the task. See the comment on the |
| + request properties' `idempotent` property for more details. |
| + |
| + Do not use "task_result.TaskResultSummary.created_ts > oldest" here because |
| + this would require a composite index. It's unnecessary because TaskRequest.key |
| + is equivalent to decreasing TaskRequest.created_ts, so ordering by key works as |
| + well and doesn't require a composite index. |
| + """ |
| + # TODO(maruel): Make a reverse map on successful task completion so this |
| + # becomes a simple ndb.get(). |
| + cls = task_result.TaskResultSummary |
| + q = cls.query(cls.properties_hash==h).order(cls.key) |
| + for i, dupe_summary in enumerate(q.iter(batch_size=1)): |
|
M-A Ruel
2016/08/31 20:26:30
Using batch_size=1 since the general case is to on
|
| + # It is possible for the query to return stale items. |
| + if (dupe_summary.state != task_result.State.COMPLETED or |
| + dupe_summary.failure): |
| + if i == 2: |
| + # Indexes are very inconsistent, give up. |
| + return None |
| + continue |
| + |
| + # Refuse tasks older than X days. This is due to the isolate server |
| + # dropping files. |
| + # TODO(maruel): The value should be calculated from the isolate server |
| + # setting and be unbounded when no isolated input was used. |
| + oldest = now - datetime.timedelta( |
| + seconds=config.settings().reusable_task_age_secs) |
| + if dupe_summary.created_ts <= oldest: |
| + return None |
| + return dupe_summary |
| + return None |
| + |
| + |
| ### Public API. |
| @@ -459,56 +495,34 @@ def schedule_request(request): |
| return key |
| deduped = False |
| - stored = False |
| if request.properties.idempotent: |
| - # Find a previously run task that is also idempotent and completed. Start a |
| - # query to fetch items that can be used to dedupe the task. See the comment |
| - # for this property for more details. |
| - # |
| - # Do not use "cls.created_ts > oldest" here because this would require a |
| - # composite index. It's unnecessary because TaskRequest.key is equivalent to |
| - # decreasing TaskRequest.created_ts, ordering by key works as well and |
| - # doesn't require a composite index. |
| - cls = task_result.TaskResultSummary |
| - h = request.properties.properties_hash |
| - |
| - # TODO(maruel): Make a reverse map on successful task completion so this |
| - # becomes a simple ndb.get(). |
| - dupe_summary = cls.query(cls.properties_hash==h).order(cls.key).get() |
| + dupe_summary = _find_dupe_task(now, request.properties.properties_hash) |
| if dupe_summary: |
| - # Refuse tasks older than X days. This is due to the isolate server |
| - # dropping files. |
| - # TODO(maruel): The value should be calculated from the isolate server |
| - # setting and be unbounded when no isolated input was used. |
| - oldest = now - datetime.timedelta( |
| - seconds=config.settings().reusable_task_age_secs) |
| - if dupe_summary and dupe_summary.created_ts > oldest: |
| - # Setting task.queue_number to None removes it from the scheduling. |
| - task.queue_number = None |
| - _copy_summary( |
| - dupe_summary, result_summary, |
| - ('created_ts', 'modified_ts', 'name', 'user', 'tags')) |
| - # Zap irrelevant properties. PerformanceStats is also not copied over, |
| - # since it's not relevant. |
| - result_summary.properties_hash = None |
| - result_summary.try_number = 0 |
| - result_summary.cost_saved_usd = result_summary.cost_usd |
| - # Only zap after. |
| - result_summary.costs_usd = [] |
| - result_summary.deduped_from = task_pack.pack_run_result_key( |
| - dupe_summary.run_result_key) |
| - # In this code path, there's not much to do as the task will not be run, |
| - # previous results are returned. We still need to store all the entities |
| - # correctly. |
| - datastore_utils.insert( |
| - request, get_new_keys, extra=[task, result_summary]) |
| - logging.debug( |
| - 'New request %s reusing %s', result_summary.task_id, |
| - dupe_summary.task_id) |
| - stored = True |
| - deduped = True |
| - |
| - if not stored: |
| + # Setting task.queue_number to None removes it from the scheduling. |
| + task.queue_number = None |
| + _copy_summary( |
| + dupe_summary, result_summary, |
| + ('created_ts', 'modified_ts', 'name', 'user', 'tags')) |
| + # Zap irrelevant properties. PerformanceStats is also not copied over, |
| + # since it's not relevant. |
| + result_summary.properties_hash = None |
| + result_summary.try_number = 0 |
| + result_summary.cost_saved_usd = result_summary.cost_usd |
| + # Only zap after. |
| + result_summary.costs_usd = [] |
| + result_summary.deduped_from = task_pack.pack_run_result_key( |
| + dupe_summary.run_result_key) |
| + # In this code path, there's not much to do as the task will not be run, |
| + # previous results are returned. We still need to store all the entities |
| + # correctly. |
| + datastore_utils.insert( |
| + request, get_new_keys, extra=[task, result_summary]) |
| + logging.debug( |
| + 'New request %s reusing %s', result_summary.task_id, |
| + dupe_summary.task_id) |
| + deduped = True |
| + |
| + if not deduped: |
| # Storing these entities makes this task live. It is important at this point |
| # that the HTTP handler returns as fast as possible, otherwise the task will |
| # be run but the client will not know about it. |