| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import copy |
| 6 from datetime import datetime |
| 7 import logging |
| 8 import time |
| 9 |
| 10 from google.appengine.ext import ndb |
| 11 |
| 12 from common.http_client_appengine import HttpClientAppengine as HttpClient |
| 13 from common.pipeline_wrapper import BasePipeline |
| 14 from model import analysis_status |
| 15 from waterfall import swarming_util |
| 16 from waterfall import waterfall_config |
| 17 |
| 18 |
| 19 class TriggerBaseSwarmingTaskPipeline(BasePipeline): #pragma: no cover |
| 20 """A pipeline to trigger a Swarming task to re-run selected tests of a step. |
| 21 |
| 22 This pipeline only supports test steps that run on Swarming and support the |
| 23 gtest filter. |
| 24 """ |
| 25 |
| 26 def _GetSwarmingTaskName(self, ref_task_id): # pragma: no cover. |
| 27 return 'findit/deflake/ref_task_id/%s/%s' % ( |
| 28 ref_task_id, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S %f')) |
| 29 |
| 30 def _CreateNewSwarmingTaskRequest(self, ref_task_id, ref_request, master_name, |
| 31 builder_name, build_number,step_name, |
| 32 tests, iterations): |
| 33 """Returns a SwarmingTaskRequest instance to run the given tests only.""" |
| 34 # Make a copy of the referred request and drop or overwrite some fields. |
| 35 new_request = copy.deepcopy(ref_request) |
| 36 new_request.name = self._GetSwarmingTaskName(ref_task_id) |
| 37 new_request.parent_task_id = '' |
| 38 new_request.user = '' |
| 39 |
| 40 # To force a fresh re-run and ignore cached result of any equivalent run. |
| 41 new_request.idempotent = False |
| 42 |
| 43 # Set the gtest_filter to run the given tests only. |
| 44 new_request.extra_args.append('--gtest_repeat=%s' % iterations) |
| 45 new_request.extra_args.append('--test-launcher-retry-limit=0') |
| 46 new_request.extra_args = [ |
| 47 a for a in new_request.extra_args if not a.startswith('--gtest_filter') |
| 48 ] |
| 49 new_request.extra_args.append('--gtest_filter=%s' % ':'.join(tests)) |
| 50 |
| 51 # Remove the env setting for sharding. |
| 52 sharding_settings = ['GTEST_SHARD_INDEX', 'GTEST_TOTAL_SHARDS'] |
| 53 new_request.env = [ |
| 54 e for e in new_request.env if e['key'] not in sharding_settings |
| 55 ] |
| 56 |
| 57 # Reset tags for searching and monitoring. |
| 58 ref_name = swarming_util.GetTagValue(ref_request.tags, 'name') |
| 59 new_request.tags = [] |
| 60 new_request.tags.append('purpose:deflake') |
| 61 new_request.tags.append('ref_master:%s' % master_name) |
| 62 new_request.tags.append('ref_buildername:%s' % builder_name) |
| 63 new_request.tags.append('ref_buildnumber:%s' % build_number) |
| 64 new_request.tags.append('ref_stepname:%s' % step_name) |
| 65 new_request.tags.append('ref_task_id:%s' % ref_task_id) |
| 66 new_request.tags.append('ref_name:%s' % ref_name) |
| 67 |
| 68 return new_request |
| 69 |
| 70 def _GetArgs(self, master_name, builder_name, build_number, step_name, tests): |
| 71 #returns an array you can pass into _GetSwarmingTask, _CreateSwarmingTask, |
| 72 #_NeedANewSwarmingTask as the arguments |
| 73 #Should be overwritten in child method |
| 74 raise NotImplementedError |
| 75 |
| 76 def _GetSwarmingTask(self): |
| 77 # Get the appropriate kind of Swarming Task (Wf or Flake) |
| 78 # Should be overwritten in child method |
| 79 raise NotImplementedError |
| 80 |
| 81 def _CreateSwarmingTask(self): |
| 82 # Create the appropriate kind of Swarming Task (Wf or Flake) |
| 83 # Should be overwritten in child method |
| 84 raise NotImplementedError |
| 85 |
| 86 def _NeedANewSwarmingTask(self, *args): |
| 87 swarming_task = self._GetSwarmingTask(*args) |
| 88 if not swarming_task: |
| 89 swarming_task = self._CreateSwarmingTask(*args) |
| 90 swarming_task.status = analysis_status.PENDING |
| 91 swarming_task.put() |
| 92 return True |
| 93 else: |
| 94 # TODO(http://crbug.com/585676): Rerun the Swarming task if it runs into |
| 95 # unexpected infra errors. |
| 96 return False |
| 97 |
| 98 def _GetSwarmingTaskId(self, *args): |
| 99 deadline = time.time() + 5 * 60 # Wait for 5 minutes. |
| 100 while time.time() < deadline: |
| 101 swarming_task = self._GetSwarmingTask(*args) |
| 102 |
| 103 if not swarming_task: # pragma: no cover. Pipeline will retry. |
| 104 raise Exception('Swarming task was deleted unexpectedly!!!') |
| 105 |
| 106 if swarming_task.task_id: |
| 107 return swarming_task.task_id |
| 108 |
| 109 # Wait for the existing pipeline to start the Swarming task. |
| 110 time.sleep(10) |
| 111 |
| 112 raise Exception('Time out!') # pragma: no cover. Pipeline will retry. |
| 113 |
| 114 def _GetIterationsToRerun(self): |
| 115 # How many times we want to run the swarming rerun |
| 116 # By default, it's what's in wf_config |
| 117 raise NotImplementedError |
| 118 |
| 119 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 120 def run(self, master_name, builder_name, build_number, step_name, tests): |
| 121 """Triggers a new Swarming task to run the given tests. |
| 122 |
| 123 Args: |
| 124 master_name (str): The master name. |
| 125 builder_name (str): The builder name. |
| 126 build_number (str): The build number. |
| 127 step_name (str): The failed test step name. |
| 128 tests (list): A list of test cases, eg: ['suite1.test1', 'suite2.testw2'] |
| 129 |
| 130 Returns: |
| 131 task_id (str): The new Swarming task that re-run the given tests. |
| 132 """ |
| 133 call_args = self._GetArgs(master_name, builder_name, |
| 134 build_number, step_name, tests) |
| 135 # Check if a new Swarming Task is really needed. |
| 136 if not self._NeedANewSwarmingTask(*call_args): |
| 137 return self._GetSwarmingTaskId(*call_args) |
| 138 assert tests |
| 139 http_client = HttpClient() |
| 140 |
| 141 # 0. Retrieve existing Swarming task ids for the given step. |
| 142 swarming_task_items = swarming_util.ListSwarmingTasksDataByTags( |
| 143 master_name, builder_name, build_number, http_client, step_name) |
| 144 assert len(swarming_task_items) > 0, 'No Swarming task was run.' |
| 145 ref_task_id = swarming_task_items[0]['task_id'] |
| 146 |
| 147 # 1. Retrieve Swarming task parameters from a given Swarming task id. |
| 148 ref_request = swarming_util.GetSwarmingTaskRequest( |
| 149 ref_task_id, http_client) |
| 150 |
| 151 # 2. Update/Overwrite parameters for the re-run. |
| 152 iterations_to_rerun = self._GetIterationsToRerun() |
| 153 |
| 154 new_request = self._CreateNewSwarmingTaskRequest( |
| 155 ref_task_id, ref_request, master_name, builder_name, build_number, |
| 156 step_name, tests, iterations_to_rerun) |
| 157 |
| 158 # 3. Trigger a new Swarming task to re-run the failed tests. |
| 159 task_id = swarming_util.TriggerSwarmingTask(new_request, http_client) |
| 160 |
| 161 # Save the task id. |
| 162 swarming_task = self._GetSwarmingTask(*call_args) |
| 163 swarming_task.task_id = task_id |
| 164 swarming_task.parameters['tests'] = tests |
| 165 swarming_task.parameters['iterations_to_rerun'] = iterations_to_rerun |
| 166 swarming_task.parameters['ref_name'] = swarming_util.GetTagValue( |
| 167 new_request.tags, 'ref_name') |
| 168 swarming_task.put() |
| 169 |
| 170 logging.info('A Swarming task was triggered:%s', task_id) |
| 171 return task_id |
| OLD | NEW |