Chromium Code Reviews| Index: appengine/findit/waterfall/trigger_base_swarming_task_pipeline.py |
| diff --git a/appengine/findit/waterfall/trigger_base_swarming_task_pipeline.py b/appengine/findit/waterfall/trigger_base_swarming_task_pipeline.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..ea43d38c8483ad88759edf9670a84f2f08b48373 |
| --- /dev/null |
| +++ b/appengine/findit/waterfall/trigger_base_swarming_task_pipeline.py |
| @@ -0,0 +1,181 @@ |
| +# Copyright 2016 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +import copy |
| +from datetime import datetime |
| +import logging |
| +import time |
| + |
| +from google.appengine.ext import ndb |
| + |
| +from common.http_client_appengine import HttpClientAppengine as HttpClient |
| +from common.pipeline_wrapper import BasePipeline |
| +from model import analysis_status |
| +from waterfall import swarming_util |
| +from waterfall import waterfall_config |
| + |
| + |
| +class TriggerBaseSwarmingTaskPipeline(BasePipeline): |
| + """A pipeline to trigger a Swarming task to re-run selected tests of a step. |
| + |
| + This pipeline only supports test steps that run on Swarming and support the |
| + gtest filter. |
| + """ |
| + |
| + def _GetSwarmingTaskName(self, ref_task_id): # pragma: no cover. |
| + return 'findit/deflake/ref_task_id/%s/%s' % ( |
| + ref_task_id, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S %f')) |
| + |
| + def _CreateNewSwarmingTaskRequest(self, ref_task_id, ref_request, master_name, |
| + builder_name, build_number,step_name, |
| + tests, iterations): |
| + """Returns a SwarmingTaskRequest instance to run the given tests only.""" |
| + # Make a copy of the referred request and drop or overwrite some fields. |
| + new_request = copy.deepcopy(ref_request) |
| + new_request.name = self._GetSwarmingTaskName(ref_task_id) |
| + new_request.parent_task_id = '' |
| + new_request.user = '' |
| + |
| + # To force a fresh re-run and ignore cached result of any equivalent run. |
| + new_request.idempotent = False |
| + |
| + # Set the gtest_filter to run the given tests only. |
| + new_request.extra_args.append('--gtest_repeat=%s' % iterations) |
| + new_request.extra_args.append('--test-launcher-retry-limit=0') |
| + new_request.extra_args = [ |
| + a for a in new_request.extra_args if not a.startswith('--gtest_filter') |
| + ] |
| + new_request.extra_args.append('--gtest_filter=%s' % ':'.join(tests)) |
| + |
| + # Remove the env setting for sharding. |
| + sharding_settings = ['GTEST_SHARD_INDEX', 'GTEST_TOTAL_SHARDS'] |
| + new_request.env = [ |
| + e for e in new_request.env if e['key'] not in sharding_settings |
| + ] |
| + |
| + # Reset tags for searching and monitoring. |
| + ref_name = swarming_util.GetTagValue(ref_request.tags, 'name') |
| + new_request.tags = [] |
| + new_request.tags.append('purpose:deflake') |
| + new_request.tags.append('ref_master:%s' % master_name) |
| + new_request.tags.append('ref_buildername:%s' % builder_name) |
| + new_request.tags.append('ref_buildnumber:%s' % build_number) |
| + new_request.tags.append('ref_stepname:%s' % step_name) |
| + new_request.tags.append('ref_task_id:%s' % ref_task_id) |
| + new_request.tags.append('ref_name:%s' % ref_name) |
| + |
| + return new_request |
| + |
| + @ndb.transactional |
| + def _GetSwarmingTask(self, master_name, builder_name, |
| + build_number, step_name): |
| + # Get the appropriate kind of Swarming Task (Wf or Flake) |
| + # Should be overwritten in child method |
| + pass |
|
stgao
2016/07/09 00:04:34
Raise NotImplementedError ?
Same for below.
caiw
2016/07/14 00:59:45
Done.
|
| + |
| + @ndb.transactional |
| + def _CreateSwarmingTask(self, master_name, builder_name, |
| + build_number, step_name): |
| + # Create the appropriate kind of Swarming Task (Wf or Flake) |
| + # Should be overwritten in child method |
| + pass |
| + |
| + # pylint: disable=no-value-for-parameter |
| + @ndb.transactional(xg=True) |
|
stgao
2016/07/09 00:04:34
Why cross-group transaction is needed here?
caiw
2016/07/14 00:59:45
I don't remember, but I got an error requiring me
|
| + def _NeedANewSwarmingTask(self, master_name, builder_name, |
| + build_number, step_name): |
| + swarming_task = self._GetSwarmingTask( |
| + master_name, builder_name, build_number, step_name) |
| + |
| + if not swarming_task: |
| + swarming_task = self._CreateSwarmingTask( |
| + master_name, builder_name, step_name, build_number) |
| + swarming_task.status = analysis_status.PENDING |
| + swarming_task.put() |
| + return True |
| + else: |
| + # TODO(http://crbug.com/585676): Rerun the Swarming task if it runs into |
| + # unexpected infra errors. |
| + return False |
| + |
| + |
| + def _GetSwarmingTaskId(self, master_name, builder_name, |
| + build_number, step_name): |
| + deadline = time.time() + 5 * 60 # Wait for 5 minutes. |
| + while time.time() < deadline: |
| + swarming_task = self._GetSwarmingTask( |
| + master_name, builder_name, step_name, build_number) |
| + |
| + if not swarming_task: # pragma: no cover. Pipeline will retry. |
| + raise Exception('Swarming task was deleted unexpectedly!!!') |
| + |
| + if swarming_task.task_id: |
| + return swarming_task.task_id |
| + |
| + # Wait for the existing pipeline to start the Swarming task. |
| + time.sleep(10) |
| + |
| + raise Exception('Time out!') # pragma: no cover. Pipeline will retry. |
| + |
| + def _GetIterationsToRerun(self): |
| + # How many times we want to run the swarming rerun |
| + # By default, it's what's in wf_config |
| + iterations_to_rerun = waterfall_config.GetSwarmingSettings().get( |
|
stgao
2016/07/09 00:04:34
Should this be implemented in the subclass instead
caiw
2016/07/14 00:59:45
Sure, that would be fine.
|
| + 'iterations_to_rerun') |
| + return iterations_to_rerun |
| + |
| + # Arguments number differs from overridden method - pylint: disable=W0221 |
| + def run(self, master_name, builder_name, build_number, step_name, tests): |
| + """Triggers a new Swarming task to run the given tests. |
| + |
| + Args: |
| + master_name (str): The master name. |
| + builder_name (str): The builder name. |
| + build_number (str): The build number. |
| + step_name (str): The failed test step name. |
| + tests (list): A list of test cases, eg: ['suite1.test1', 'suite2.test2']. |
| + |
| + Returns: |
| + task_id (str): The new Swarming task that re-run the given tests. |
| + """ |
| + # Check if a new Swarming Task is really needed. |
| + if not self._NeedANewSwarmingTask( |
| + master_name, builder_name, build_number, step_name): |
| + return self._GetSwarmingTaskId( |
| + master_name, builder_name, build_number, step_name) |
| + |
| + assert tests |
| + http_client = HttpClient() |
| + |
| + # 0. Retrieve existing Swarming task ids for the given step. |
| + swarming_task_items = swarming_util.ListSwarmingTasksDataByTags( |
| + master_name, builder_name, build_number, http_client, step_name) |
| + assert len(swarming_task_items) > 0, 'No Swarming task was run.' |
| + ref_task_id = swarming_task_items[0]['task_id'] |
| + |
| + # 1. Retrieve Swarming task parameters from a given Swarming task id. |
| + ref_request = swarming_util.GetSwarmingTaskRequest( |
| + ref_task_id, http_client) |
| + |
| + # 2. Update/Overwrite parameters for the re-run. |
| + iterations_to_rerun = self._GetIterationsToRerun() |
| + new_request = self._CreateNewSwarmingTaskRequest( |
| + ref_task_id, ref_request, master_name, builder_name, build_number, |
| + step_name, tests, iterations_to_rerun) |
| + |
| + # 3. Trigger a new Swarming task to re-run the failed tests. |
| + task_id = swarming_util.TriggerSwarmingTask(new_request, http_client) |
| + |
| + # Save the task id. |
| + swarming_task = self._GetSwarmingTask( |
| + master_name, builder_name, step_name, build_number) |
| + swarming_task.task_id = task_id |
| + swarming_task.parameters['tests'] = tests |
| + swarming_task.parameters['iterations_to_rerun'] = iterations_to_rerun |
| + swarming_task.parameters['ref_name'] = swarming_util.GetTagValue( |
| + new_request.tags, 'ref_name') |
| + swarming_task.put() |
| + |
| + logging.info('A Swarming task was triggered:%s', task_id) |
| + return task_id |