Chromium Code Reviews| Index: appengine/findit/waterfall/trigger_swarming_tasks_pipeline.py |
| diff --git a/appengine/findit/waterfall/trigger_swarming_tasks_pipeline.py b/appengine/findit/waterfall/trigger_swarming_tasks_pipeline.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..c943a7a2b7d0cd5df7d58705401847bbedb7bbfa |
| --- /dev/null |
| +++ b/appengine/findit/waterfall/trigger_swarming_tasks_pipeline.py |
| @@ -0,0 +1,63 @@ |
| +# Copyright 2016 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +from collections import defaultdict |
| + |
| +from google.appengine.ext import ndb |
| + |
| +from common.pipeline_wrapper import BasePipeline |
| +from common.waterfall import failure_type |
| +from model.wf_analysis import WfAnalysis |
| +from waterfall.trigger_swarming_task_pipeline import TriggerSwarmingTaskPipeline |
| + |
| + |
| +@ndb.transactional |
| +def _GetsStepsNeedToTriggerSwarmingTasks( |
|
lijeffrey
2016/07/12 22:36:28
Nit: how about rename this function to
_GetStepsT
chanli
2016/07/12 22:54:56
Done.
|
| + master_name, builder_name, build_number, failure_info): |
| + """Gets first time failed steps and tests which haven't triggered |
| + swarming tasks. |
| + """ |
| + result_steps = defaultdict(list) |
| + analysis = WfAnalysis.Get(master_name, builder_name, build_number) |
| + |
| + if not analysis: |
| + return result_steps |
| + failure_result_map = analysis.failure_result_map |
| + |
| + # A dict to store all the first time failed steps and/ or tests which |
| + # have not triggered a swarming task yet. |
| + for failed_step, step_failure in failure_info['failed_steps'].iteritems(): |
| + if failure_result_map.get(failed_step): |
| + # The step has been processed. |
| + continue |
| + |
| + if not step_failure.get('tests'): # Not a swarming gtest. |
| + continue |
| + |
| + failure_result_map[failed_step] = {} |
| + for failed_test, test_failure in step_failure['tests'].iteritems(): |
| + task_key = '%s/%s/%s' % ( |
| + master_name, builder_name, test_failure['first_failure']) |
| + failure_result_map[failed_step][failed_test] = task_key |
| + |
| + if test_failure['first_failure'] == test_failure['current_failure']: |
| + # First time failure, add to result_steps. |
| + result_steps[failed_step].append(test_failure['base_test_name']) |
| + analysis.put() |
| + return result_steps |
| + |
|
lijeffrey
2016/07/12 22:36:28
nit: Shouldn't there be 2 empty lines here?
chanli
2016/07/12 22:54:56
Done.
|
| +class TriggerSwarmingTasksPipeline(BasePipeline): |
| + """Root Pipeline to trigger swarming tasks.""" |
| + |
| + # Arguments number differs from overridden method - pylint: disable=W0221 |
| + def run(self, master_name, builder_name, build_number, failure_info): |
| + if (not failure_info or not failure_info['failed_steps'] or |
| + not failure_info['failure_type'] == failure_type.TEST): |
| + return |
| + |
| + steps = _GetsStepsNeedToTriggerSwarmingTasks( |
| + master_name, builder_name, build_number, failure_info) |
| + for step_name, base_tests in steps.iteritems(): |
| + yield TriggerSwarmingTaskPipeline( |
| + master_name, builder_name, build_number, step_name, base_tests) |