Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(310)

Side by Side Diff: appengine/findit/waterfall/trigger_base_swarming_task_pipeline.py

Issue 2130543004: Waterfall components of regression range finder. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import copy
6 from datetime import datetime
7 import logging
8 import time
9
10 from google.appengine.ext import ndb
11
12 from common.http_client_appengine import HttpClientAppengine as HttpClient
13 from common.pipeline_wrapper import BasePipeline
14 from model import analysis_status
15 from waterfall import swarming_util
16 from waterfall import waterfall_config
17
18
19 class TriggerBaseSwarmingTaskPipeline(BasePipeline):
20 """A pipeline to trigger a Swarming task to re-run selected tests of a step.
21
22 This pipeline only supports test steps that run on Swarming and support the
23 gtest filter.
24 """
25
26 def _GetSwarmingTaskName(self, ref_task_id): # pragma: no cover.
27 return 'findit/deflake/ref_task_id/%s/%s' % (
28 ref_task_id, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S %f'))
29
def _CreateNewSwarmingTaskRequest(self, ref_task_id, ref_request, master_name,
                                  builder_name, build_number, step_name,
                                  tests, iterations):
  """Builds the request for a re-run from a copy of the referred request.

  Args:
    ref_task_id: Id of the referred Swarming task; passed to
        _GetSwarmingTaskName and recorded in the 'ref_task_id' tag.
    ref_request: Request of the referred task. It is deep-copied, so the
        object passed in is left untouched.
    master_name: Recorded in the 'ref_master' tag.
    builder_name: Recorded in the 'ref_buildername' tag.
    build_number: Recorded in the 'ref_buildnumber' tag.
    step_name: Recorded in the 'ref_stepname' tag.
    tests: Test names; joined with ':' to form the --gtest_filter argument.
    iterations: Value of the --gtest_repeat argument.

  Returns:
    The modified copy of ref_request, set up to run the given tests only.
  """
  # Start from a copy of the referred request, then drop or overwrite fields.
  request = copy.deepcopy(ref_request)
  request.name = self._GetSwarmingTaskName(ref_task_id)
  request.parent_task_id = ''
  request.user = ''

  # Force a fresh re-run; ignore the cached result of any equivalent run.
  request.idempotent = False

  # Drop any existing --gtest_filter argument, then add the repeat count, the
  # retry limit and a filter that selects only the given tests.
  kept_args = [
      arg for arg in request.extra_args
      if not arg.startswith('--gtest_filter')
  ]
  request.extra_args = kept_args + [
      '--gtest_repeat=%s' % iterations,
      '--test-launcher-retry-limit=0',
      '--gtest_filter=%s' % ':'.join(tests),
  ]

  # Remove the env settings used for sharding.
  sharding_keys = ('GTEST_SHARD_INDEX', 'GTEST_TOTAL_SHARDS')
  request.env = [
      entry for entry in request.env if entry['key'] not in sharding_keys
  ]

  # Replace the tags with ones for searching and monitoring. The 'name' tag
  # is read from the referred request, not from the copy.
  ref_name = swarming_util.GetTagValue(ref_request.tags, 'name')
  request.tags = [
      'purpose:deflake',
      'ref_master:%s' % master_name,
      'ref_buildername:%s' % builder_name,
      'ref_buildnumber:%s' % build_number,
      'ref_stepname:%s' % step_name,
      'ref_task_id:%s' % ref_task_id,
      'ref_name:%s' % ref_name,
  ]

  return request
69
70 @ndb.transactional
71 def _GetSwarmingTask(self, master_name, builder_name,
72 build_number, step_name):
73 # Get the appropriate kind of Swarming Task (Wf or Flake)
74 # Should be overwritten in child method
75 pass
stgao 2016/07/09 00:04:34 Raise NotImplementedError ? Same for below.
caiw 2016/07/14 00:59:45 Done.
76
77 @ndb.transactional
78 def _CreateSwarmingTask(self, master_name, builder_name,
79 build_number, step_name):
80 # Create the appropriate kind of Swarming Task (Wf or Flake)
81 # Should be overwritten in child method
82 pass
83
84 # pylint: disable=no-value-for-parameter
85 @ndb.transactional(xg=True)
stgao 2016/07/09 00:04:34 Why cross-group transaction is needed here?
caiw 2016/07/14 00:59:45 I don't remember, but I got an error requiring me
86 def _NeedANewSwarmingTask(self, master_name, builder_name,
87 build_number, step_name):
88 swarming_task = self._GetSwarmingTask(
89 master_name, builder_name, build_number, step_name)
90
91 if not swarming_task:
92 swarming_task = self._CreateSwarmingTask(
93 master_name, builder_name, step_name, build_number)
94 swarming_task.status = analysis_status.PENDING
95 swarming_task.put()
96 return True
97 else:
98 # TODO(http://crbug.com/585676): Rerun the Swarming task if it runs into
99 # unexpected infra errors.
100 return False
101
102
103 def _GetSwarmingTaskId(self, master_name, builder_name,
104 build_number, step_name):
105 deadline = time.time() + 5 * 60 # Wait for 5 minutes.
106 while time.time() < deadline:
107 swarming_task = self._GetSwarmingTask(
108 master_name, builder_name, step_name, build_number)
109
110 if not swarming_task: # pragma: no cover. Pipeline will retry.
111 raise Exception('Swarming task was deleted unexpectedly!!!')
112
113 if swarming_task.task_id:
114 return swarming_task.task_id
115
116 # Wait for the existing pipeline to start the Swarming task.
117 time.sleep(10)
118
119 raise Exception('Time out!') # pragma: no cover. Pipeline will retry.
120
def _GetIterationsToRerun(self):
  """Returns how many times the Swarming rerun should run the tests.

  By default this is the 'iterations_to_rerun' entry of the Swarming settings
  from waterfall_config.
  """
  # NOTE(review): the code review (stgao / caiw, 2016/07) suggested this be
  # implemented in the subclass instead -- confirm whether this default
  # should stay in the base class.
  swarming_settings = waterfall_config.GetSwarmingSettings()
  return swarming_settings.get('iterations_to_rerun')
127
128 # Arguments number differs from overridden method - pylint: disable=W0221
def run(self, master_name, builder_name, build_number, step_name, tests):
  """Triggers a new Swarming task to run the given tests.

  If a Swarming task entity already exists for the build and step, no new
  task is triggered; the id of the existing one is waited for and returned.

  Args:
    master_name (str): The master name.
    builder_name (str): The builder name.
    build_number (str): The build number.
    step_name (str): The failed test step name.
    tests (list): A list of test cases, eg: ['suite1.test1', 'suite2.test2'].

  Returns:
    task_id (str): The id of the Swarming task that re-runs the given tests.
  """
  # Check if a new Swarming Task is really needed.
  if not self._NeedANewSwarmingTask(
      master_name, builder_name, build_number, step_name):
    return self._GetSwarmingTaskId(
        master_name, builder_name, build_number, step_name)

  assert tests
  http_client = HttpClient()

  # 0. Retrieve existing Swarming task ids for the given step.
  swarming_task_items = swarming_util.ListSwarmingTasksDataByTags(
      master_name, builder_name, build_number, http_client, step_name)
  assert len(swarming_task_items) > 0, 'No Swarming task was run.'
  ref_task_id = swarming_task_items[0]['task_id']

  # 1. Retrieve Swarming task parameters from a given Swarming task id.
  ref_request = swarming_util.GetSwarmingTaskRequest(
      ref_task_id, http_client)

  # 2. Update/Overwrite parameters for the re-run.
  iterations_to_rerun = self._GetIterationsToRerun()
  new_request = self._CreateNewSwarmingTaskRequest(
      ref_task_id, ref_request, master_name, builder_name, build_number,
      step_name, tests, iterations_to_rerun)

  # 3. Trigger a new Swarming task to re-run the failed tests.
  task_id = swarming_util.TriggerSwarmingTask(new_request, http_client)

  # Save the task id. The lookup passes build_number before step_name, as
  # _GetSwarmingTask declares them, so it asks for the same build and step
  # that _NeedANewSwarmingTask was called with above.
  swarming_task = self._GetSwarmingTask(
      master_name, builder_name, build_number, step_name)
  swarming_task.task_id = task_id
  swarming_task.parameters['tests'] = tests
  swarming_task.parameters['iterations_to_rerun'] = iterations_to_rerun
  swarming_task.parameters['ref_name'] = swarming_util.GetTagValue(
      new_request.tags, 'ref_name')
  swarming_task.put()

  logging.info('A Swarming task was triggered:%s', task_id)
  return task_id
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698