Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(411)

Side by Side Diff: appengine/findit/waterfall/test/trigger_swarming_tasks_pipeline_test.py

Issue 2139093002: [Findit] Trigger swarming tasks after detech_first_faliure_pipeline (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 from common.pipeline_wrapper import BasePipeline
6 from common.pipeline_wrapper import pipeline_handlers
7 from common.waterfall import failure_type
8 from model.wf_analysis import WfAnalysis
9 from waterfall import trigger_swarming_task_pipeline
10 from waterfall import trigger_swarming_tasks_pipeline
11 from waterfall.test import wf_testcase
12 from waterfall.trigger_swarming_tasks_pipeline import (
13 TriggerSwarmingTasksPipeline)
14 from waterfall.trigger_swarming_task_pipeline import TriggerSwarmingTaskPipeline
15
16
17 class TriggerSwarmingTasksPipelineTest(wf_testcase.WaterfallTestCase):
18 app_module = pipeline_handlers._APP
19
20 def test_GetStepsThatNeedToTriggerSwarmingTasksNoAnalysis(self):
21 result = (
22 trigger_swarming_tasks_pipeline._GetStepsThatNeedToTriggerSwarmingTasks(
23 'm', 'b', 1, {}))
24 self.assertEqual(result, {})
25
26 def test_GetStepsThatNeedToTriggerSwarmingTasksNoFailureResultMap(self):
27 master_name = 'm'
28 builder_name = 'b'
29 build_number = 2
30 WfAnalysis.Create(master_name, builder_name, build_number).put()
31
32 failure_info = {
33 'failed': True,
34 'master_name': 'm',
35 'builder_name': 'b',
36 'build_number': 2,
37 'chromium_revision': None,
38 'builds': {
39 2: {
40 'blame_list': [],
41 'chromium_revision': None
42 }
43 },
44 'failed_steps': {
45 'abc_test': {
46 'current_failure': 2,
47 'first_failure': 1,
48 'last_pass': 0,
49 'tests': {
50 'Unittest2.Subtest1': {
51 'current_failure': 2,
52 'first_failure': 2,
53 'last_pass': 1,
54 'base_test_name': 'Unittest2.Subtest1'
55 },
56 'Unittest3.Subtest2': {
57 'current_failure': 2,
58 'first_failure': 1,
59 'last_pass': 0,
60 'base_test_name': 'Unittest3.Subtest2'
61 }
62 }
63 },
64 'a_test': {
65 'current_failure': 2,
66 'first_failure': 1,
67 'last_pass': 0,
68 }
69 },
70 'failure_type': failure_type.TEST
71 }
72
73 expected_result = {
74 'abc_test': ['Unittest2.Subtest1']
75 }
76 expected_failure_result_map = {
77 'abc_test': {
78 'Unittest2.Subtest1': '%s/%s/%d' % (
79 master_name, builder_name, build_number),
80 'Unittest3.Subtest2': '%s/%s/%d' % (
81 master_name, builder_name, 1)
82 }
83 }
84
85 result = (
86 trigger_swarming_tasks_pipeline._GetStepsThatNeedToTriggerSwarmingTasks(
87 master_name, builder_name, build_number, failure_info))
88 analysis = WfAnalysis.Get(master_name, builder_name, build_number)
89
90 self.assertEqual(result, expected_result)
91 self.assertEqual(analysis.failure_result_map, expected_failure_result_map)
92
93 def test_GetStepsThatNeedToTriggerSwarmingTasks(self):
94 master_name = 'm'
95 builder_name = 'b'
96 build_number = 2
97 analysis = WfAnalysis.Create(master_name, builder_name, build_number)
98 analysis.failure_result_map = {
99 'a_tests': {
100 'Unittest1.Subtest1': 'm/b/1'
101 }
102 }
103 analysis.put()
104
105 failure_info = {
106 'failed': True,
107 'master_name': 'm',
108 'builder_name': 'b',
109 'build_number': 2,
110 'chromium_revision': None,
111 'builds': {
112 2: {
113 'blame_list': [],
114 'chromium_revision': None
115 }
116 },
117 'failed_steps': {
118 'abc_test': {
119 'current_failure': 2,
120 'first_failure': 1,
121 'last_pass': 0,
122 'tests': {
123 'Unittest2.Subtest1': {
124 'current_failure': 2,
125 'first_failure': 2,
126 'last_pass': 1,
127 'base_test_name': 'Unittest2.Subtest1'
128 },
129 'Unittest3.Subtest2': {
130 'current_failure': 2,
131 'first_failure': 1,
132 'last_pass': 0,
133 'base_test_name': 'Unittest3.Subtest2'
134 }
135 }
136 },
137 'a_tests': {
138 'current_failure': 2,
139 'first_failure': 1,
140 'last_pass': 0,
141 'tests': {
142 'Unittest1.Subtest1': {
143 'current_failure': 2,
144 'first_failure': 1,
145 'last_pass': 0,
146 'base_test_name': 'Unittest3.Subtest2'
147 }
148 }
149 }
150 },
151 'failure_type': failure_type.TEST
152 }
153
154 expected_result = {
155 'abc_test': ['Unittest2.Subtest1']
156 }
157 result = (
158 trigger_swarming_tasks_pipeline._GetStepsThatNeedToTriggerSwarmingTasks(
159 master_name, builder_name, build_number, failure_info))
160 self.assertEqual(result, expected_result)
161
162 def testTriggerSwarmingTasksPipelineNoFailureInfo(self):
163 class _MockedTriggerSwarmingTaskPipeline(BasePipeline):
164 count = 0
165 def run(self, *_): # pragma: no cover
166 _MockedTriggerSwarmingTaskPipeline.count += 1
167 self.mock(trigger_swarming_task_pipeline, 'TriggerSwarmingTaskPipeline',
168 _MockedTriggerSwarmingTaskPipeline)
169 pipeline = TriggerSwarmingTasksPipeline('m', 'b', 1, {})
170 pipeline.start()
171 self.execute_queued_tasks()
172 self.assertEqual(_MockedTriggerSwarmingTaskPipeline.count, 0)
173
174 def testTriggerSwarmingTasksPipelineCompile(self):
175 class _MockedTriggerSwarmingTaskPipeline(BasePipeline):
176 count = 0
177 def run(self, *_): # pragma: no cover
178 _MockedTriggerSwarmingTaskPipeline.count += 1
179 self.mock(trigger_swarming_task_pipeline, 'TriggerSwarmingTaskPipeline',
180 _MockedTriggerSwarmingTaskPipeline)
181 failure_info = {
182 'failed': True,
183 'master_name': 'm',
184 'builder_name': 'b',
185 'build_number': 2,
186 'chromium_revision': None,
187 'builds': {
188 2: {
189 'blame_list': [],
190 'chromium_revision': None
191 }
192 },
193 'failed_steps': {
194 'compile': {
195 'current_failure': 2,
196 'first_failure': 1,
197 'last_pass': 0,
198 }
199 },
200 'failure_type': failure_type.COMPILE
201 }
202 pipeline = TriggerSwarmingTasksPipeline('m', 'b', 2, failure_info)
203 pipeline.start()
204 self.execute_queued_tasks()
205 self.assertEqual(_MockedTriggerSwarmingTaskPipeline.count, 0)
206
207 def testTriggerSwarmingTasksPipeline(self):
208 master_name = 'm'
209 builder_name = 'b'
210 build_number = 2
211 analysis = WfAnalysis.Create(master_name, builder_name, build_number)
212 analysis.failure_result_map = {
213 'a_tests': {
214 'Unittest1.Subtest1': 'm/b/1'
215 }
216 }
217 analysis.put()
218
219 failure_info = {
220 'failed': True,
221 'master_name': 'm',
222 'builder_name': 'b',
223 'build_number': 2,
224 'chromium_revision': None,
225 'builds': {
226 2: {
227 'blame_list': [],
228 'chromium_revision': None
229 }
230 },
231 'failed_steps': {
232 'abc_test': {
233 'current_failure': 2,
234 'first_failure': 1,
235 'last_pass': 0,
236 'tests': {
237 'Unittest2.Subtest1': {
238 'current_failure': 2,
239 'first_failure': 2,
240 'last_pass': 1,
241 'base_test_name': 'Unittest2.Subtest1'
242 },
243 'Unittest3.Subtest2': {
244 'current_failure': 2,
245 'first_failure': 1,
246 'last_pass': 0,
247 'base_test_name': 'Unittest3.Subtest2'
248 }
249 }
250 },
251 'a_tests': {
252 'current_failure': 2,
253 'first_failure': 1,
254 'last_pass': 0,
255 'tests': {
256 'Unittest1.Subtest1': {
257 'current_failure': 2,
258 'first_failure': 1,
259 'last_pass': 0,
260 'base_test_name': 'Unittest3.Subtest2'
261 }
262 }
263 }
264 },
265 'failure_type': failure_type.TEST
266 }
267
268 class _MockedTriggerSwarmingTaskPipeline(BasePipeline):
269 count = 0
270 def run(self, *_):
271 _MockedTriggerSwarmingTaskPipeline.count += 1
272 self.mock(trigger_swarming_task_pipeline, 'TriggerSwarmingTaskPipeline',
273 _MockedTriggerSwarmingTaskPipeline)
274 pipeline = TriggerSwarmingTasksPipeline(
275 master_name, builder_name, build_number, failure_info)
276 pipeline.start()
277 self.execute_queued_tasks()
278 self.assertEqual(_MockedTriggerSwarmingTaskPipeline.count, 1)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698