Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(150)

Side by Side Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2491473002: [Findit] Implementing swarming task error detection (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 from collections import defaultdict
5 import datetime 6 import datetime
6 import logging 7 import logging
7 import time 8 import time
8 9
9 from common.http_client_appengine import HttpClientAppengine as HttpClient 10 from common.http_client_appengine import HttpClientAppengine as HttpClient
10 from common.pipeline_wrapper import BasePipeline 11 from common.pipeline_wrapper import BasePipeline
11 from model import analysis_status 12 from model import analysis_status
12 from waterfall import swarming_util 13 from waterfall import swarming_util
13 from waterfall import waterfall_config 14 from waterfall import waterfall_config
14 15
15 16
16 class ProcessBaseSwarmingTaskResultPipeline(BasePipeline): 17 class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
17 """A pipeline for monitoring swarming task and processing task result. 18 """A pipeline for monitoring swarming task and processing task result.
18 19
19 This pipeline waits for result for a swarming task and processes the result to 20 This pipeline waits for result for a swarming task and processes the result to
20 generate a dict for statuses for each test run. 21 generate a dict for statuses for each test run.
21 """ 22 """
22 23
23 HTTP_CLIENT = HttpClient() 24 HTTP_CLIENT = HttpClient()
24 25
25 def _CheckTestsRunStatuses(self, output_json): 26 def _CheckTestsRunStatuses(self, output_json, *_):
26 # Checks result status for each test run and saves the numbers accordingly. 27 """Checks result status for each test run and saves the numbers accordingly.
27 # Should be overridden by subclass.
28 raise NotImplementedError(
29 '_CheckTestsRunStatuses should be implemented in the child class')
30 28
31 def _ConvertDateTime(self, time_string): 29 Args:
32 """Convert UTC time string to datetime.datetime.""" 30 output_json (dict): A dict of all test results in the swarming task.
33 # Match the time conversion with swarming.py which elides the suffix 31
34 # when microseconds are 0. 32 Returns:
35 if not time_string: 33 tests_statuses (dict): A dict of different statuses for each test.
36 return None 34
37 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): 35 Currently for each test, we are saving number of total runs,
38 try: 36 number of succeeded runs and number of failed runs.
39 return datetime.datetime.strptime(time_string, fmt) 37 """
40 except ValueError: 38 tests_statuses = defaultdict(lambda: defaultdict(int))
41 pass 39 if output_json:
42 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover 40 for iteration in output_json.get('per_iteration_data'):
41 for test_name, tests in iteration.iteritems():
42 tests_statuses[test_name]['total_run'] += len(tests)
43 for test in tests:
44 tests_statuses[test_name][test['status']] += 1
45
46 return tests_statuses
43 47
44 def _GetSwarmingTask(self): 48 def _GetSwarmingTask(self):
45 # Get the appropriate kind of Swarming Task (Wf or Flake). 49 # Get the appropriate kind of Swarming Task (Wf or Flake).
46 # Should be overwritten by subclass. 50 # Should be overwritten by subclass.
47 raise NotImplementedError( 51 raise NotImplementedError(
48 '_GetSwarmingTask should be implemented in the child class') 52 '_GetSwarmingTask should be implemented in the child class')
49 53
50 def _GetArgs(self): 54 def _GetArgs(self):
51 # Return list of arguments to call _CheckTestsRunStatuses with - output_json 55 # Return list of arguments to call _CheckTestsRunStatuses with - output_json
52 # Should be overwritten by subclass. 56 # Should be overwritten by subclass.
53 raise NotImplementedError( 57 raise NotImplementedError(
54 '_GetArgs should be implemented in the child class') 58 '_GetArgs should be implemented in the child class')
55 59
56 # Arguments number differs from overridden method - pylint: disable=W0221 60 def _ConvertDateTime(self, time_string):
57 def run(self, master_name, builder_name, build_number, 61 """Convert UTC time string to datetime.datetime."""
58 step_name, task_id, *args): # pragma: no cover. 62 if not time_string:
59 """ 63 return None
60 Args: 64 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
61 master_name (str): The master name. 65 # When microseconds are 0, the '.123456' suffix is elided.
62 builder_name (str): The builder name. 66 try:
63 build_number (str): The build number. 67 return datetime.datetime.strptime(time_string, fmt)
64 step_name (str): The failed test step name. 68 except ValueError:
65 task_id (str): Id for the swarming task which is triggered by Findit. 69 pass
70 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
66 71
67 Returns: 72 def _MonitorSwarmingTask(self, task_id, *call_args):
68 A dict of lists for reliable/flaky tests. 73 """Monitors the swarming task and waits for it to complete."""
69 """
70 call_args = self._GetArgs(master_name, builder_name, build_number,
71 step_name, *args)
72 assert task_id 74 assert task_id
73 timeout_hours = waterfall_config.GetSwarmingSettings().get( 75 timeout_hours = waterfall_config.GetSwarmingSettings().get(
74 'task_timeout_hours') 76 'task_timeout_hours')
75 deadline = time.time() + timeout_hours * 60 * 60 77 deadline = time.time() + timeout_hours * 60 * 60
76 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get( 78 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
77 'server_query_interval_seconds') 79 'server_query_interval_seconds')
78 task_started = False 80 task_started = False
79 task_completed = False 81 task_completed = False
80 tests_statuses = {} 82 tests_statuses = {}
81 step_name_no_platform = None 83 step_name_no_platform = None
84 task = self._GetSwarmingTask(*call_args)
82 85
83 while not task_completed: 86 while not task_completed:
84 # Keeps monitoring the swarming task, waits for it to complete. 87 data, error = swarming_util.GetSwarmingTaskResultById(
85 data = swarming_util.GetSwarmingTaskResultById(
86 task_id, self.HTTP_CLIENT) 88 task_id, self.HTTP_CLIENT)
89
90 if error:
91 # An error occurred when trying to contact the swarming server.
92 logging.error(error.get('message'))
chanli 2016/11/11 00:05:13 In swarming_util._SendRequestToServer the error me
lijeffrey 2016/11/11 20:55:41 Done.
93 task.status = analysis_status.ERROR
94 task.error = error
95 task.put()
96 break
97
87 task_state = data['state'] 98 task_state = data['state']
88 exit_code = (data['exit_code'] if 99 exit_code = (data.get('exit_code') if
89 task_state == swarming_util.STATE_COMPLETED else None) 100 task_state == swarming_util.STATE_COMPLETED else None)
90 step_name_no_platform = swarming_util.GetTagValue( 101 step_name_no_platform = (
91 data.get('tags', {}), 'ref_name') 102 step_name_no_platform or swarming_util.GetTagValue(
103 data.get('tags', {}), 'ref_name'))
104
92 if task_state not in swarming_util.STATES_RUNNING: 105 if task_state not in swarming_util.STATES_RUNNING:
93 task_completed = True 106 task_completed = True
94 task = self._GetSwarmingTask(*call_args) 107
95 if (task_state == swarming_util.STATE_COMPLETED and 108 if (task_state == swarming_util.STATE_COMPLETED and
96 int(exit_code) != swarming_util.TASK_FAILED): 109 int(exit_code) != swarming_util.TASK_FAILED):
97 outputs_ref = data.get('outputs_ref') 110 outputs_ref = data.get('outputs_ref')
98 output_json = swarming_util.GetSwarmingTaskFailureLog( 111 output_json, error = swarming_util.GetSwarmingTaskFailureLog(
99 outputs_ref, self.HTTP_CLIENT) 112 outputs_ref, self.HTTP_CLIENT)
100 tests_statuses = self._CheckTestsRunStatuses( 113
101 output_json, *call_args) 114 if error:
102 task.status = analysis_status.COMPLETED 115 logging.error(error.get('message'))
chanli 2016/11/11 00:05:13 Same here
lijeffrey 2016/11/11 20:55:41 Done.
116 task.status = analysis_status.ERROR
117 task.error = error
118 else:
119 task.status = analysis_status.COMPLETED
120
121 tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args)
103 task.tests_statuses = tests_statuses 122 task.tests_statuses = tests_statuses
123 task.put()
104 else: 124 else:
125 code = int(exit_code) if exit_code is not None else (
126 swarming_util.UNKNOWN)
chanli 2016/11/11 00:05:13 I think UNKNOWN is misleading here. If task_state
105 task.status = analysis_status.ERROR 127 task.status = analysis_status.ERROR
128 task.error = {
129 'code': code,
130 'message': swarming_util.EXIT_CODE_DESCRIPTIONS[code]
131 }
132 task.put()
133
106 logging_str = 'Swarming task stopped with status: %s' % task_state 134 logging_str = 'Swarming task stopped with status: %s' % task_state
107 if exit_code: 135 if exit_code: # pragma: no cover
108 logging_str += ' and exit_code: %s - %s' % ( 136 logging_str += ' and exit_code: %s - %s' % (
109 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)]) 137 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[code])
110 logging.error(logging_str) 138 logging.error(logging_str)
111 139
112 priority_str = swarming_util.GetTagValue( 140 tags = data.get('tags', {})
113 data.get('tags', {}), 'priority') 141 priority_str = swarming_util.GetTagValue(tags, 'priority')
114 if priority_str: 142 if priority_str:
115 task.parameters['priority'] = int(priority_str) 143 task.parameters['priority'] = int(priority_str)
144
116 task.put() 145 task.put()
117 else: # pragma: no cover 146 else: # pragma: no cover
118 if task_state == 'RUNNING' and not task_started: 147 if task_state == 'RUNNING' and not task_started:
119 # swarming task just starts, update status. 148 # swarming task just starts, update status.
120 task_started = True 149 task_started = True
121 task = self._GetSwarmingTask(*call_args)
122 task.status = analysis_status.RUNNING 150 task.status = analysis_status.RUNNING
123 task.put() 151 task.put()
124 time.sleep(server_query_interval_seconds) 152 time.sleep(server_query_interval_seconds)
153
154 # Timeout.
125 if time.time() > deadline: 155 if time.time() > deadline:
126 # Updates status as ERROR. 156 # Updates status as ERROR.
127 task = self._GetSwarmingTask(*call_args)
128 task.status = analysis_status.ERROR 157 task.status = analysis_status.ERROR
158 task.error = {
159 'code': swarming_util.TIMED_OUT,
160 'message': 'Process swarming task result timed out'
161 }
129 task.put() 162 task.put()
130 logging.error('Swarming task timed out after %d hours.' % timeout_hours) 163 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
131 break # Stops the loop and return. 164 break # Stops the loop and return.
165
132 # Update swarming task metadata. 166 # Update swarming task metadata.
133 task = self._GetSwarmingTask(*call_args) 167 task.created_time = (task.created_time or
134 task.created_time = self._ConvertDateTime(data.get('created_ts')) 168 self._ConvertDateTime(data.get('created_ts')))
135 task.started_time = self._ConvertDateTime(data.get('started_ts')) 169 task.started_time = (task.started_time or
136 task.completed_time = self._ConvertDateTime(data.get('completed_ts')) 170 self._ConvertDateTime(data.get('started_ts')))
171 task.completed_time = (task.completed_time or
172 self._ConvertDateTime(data.get('completed_ts')))
137 task.put() 173 task.put()
138 174
175 return step_name_no_platform
176
177 # Arguments number differs from overridden method - pylint: disable=W0221
178 def run(self, master_name, builder_name, build_number, step_name, task_id,
179 *args):
180 """Monitors a swarming task.
181
182 Args:
183 master_name (str): The master name.
184 builder_name (str): The builder name.
185 build_number (str): The build number.
186 step_name (str): The failed test step name.
187 task_id (str): The task id to query the swarming server on the progresss
188 of a swarming task.
189
190 Returns:
191 A dict of lists for reliable/flaky tests.
192 """
193 call_args = self._GetArgs(master_name, builder_name, build_number,
194 step_name, *args)
195 step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
139 return step_name, step_name_no_platform 196 return step_name, step_name_no_platform
197
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698