Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(232)

Side by Side Diff: appengine/findit/waterfall/swarming_util.py

Issue 2491473002: [Findit] Implementing swarming task error detection (Closed)
Patch Set: Rebase Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import base64 5 import base64
6 from collections import defaultdict 6 from collections import defaultdict
7 import json 7 import json
8 import logging 8 import logging
9 import urllib 9 import urllib
10 import zlib 10 import zlib
11 11
12 from google.appengine.api.urlfetch_errors import DeadlineExceededError
13 from google.appengine.api.urlfetch_errors import DownloadError
14 from google.appengine.api.urlfetch_errors import ConnectionClosedError
12 from google.appengine.ext import ndb 15 from google.appengine.ext import ndb
13 16
14 from common import auth_util 17 from common import auth_util
15 from model.wf_step import WfStep 18 from model.wf_step import WfStep
16 from waterfall import waterfall_config 19 from waterfall import waterfall_config
17 from waterfall.swarming_task_request import SwarmingTaskRequest 20 from waterfall.swarming_task_request import SwarmingTaskRequest
18 21
19 22
# Swarming task states.
STATES_RUNNING = ('RUNNING', 'PENDING')
STATE_COMPLETED = 'COMPLETED'
STATES_NOT_RUNNING = (
    'BOT_DIED', 'CANCELED', 'COMPLETED', 'EXPIRED', 'TIMED_OUT')


# Swarming task stopped error codes.
BOT_DIED = 30
CANCELED = 40
EXPIRED = 50
TIMED_OUT = 60

# Maps each terminal (non-running) swarming state to its Findit error code.
STATES_NOT_RUNNING_TO_ERROR_CODES = {
    'BOT_DIED': BOT_DIED,
    'CANCELED': CANCELED,
    'EXPIRED': EXPIRED,
    'TIMED_OUT': TIMED_OUT,
}


# Urlfetch error codes.
URLFETCH_DOWNLOAD_ERROR = 100
URLFETCH_DEADLINE_EXCEEDED_ERROR = 110
URLFETCH_CONNECTION_CLOSED_ERROR = 120
EXCEEDED_MAX_RETRIES_ERROR = 210


# Other/miscellaneous error codes.
UNKNOWN = 1000


# Swarming task exit codes.
ALL_TESTS_PASSED = 0
SOME_TESTS_FAILED = 1
TASK_FAILED = 2

# Swarming task exit code descriptions.
EXIT_CODE_DESCRIPTIONS = {
    ALL_TESTS_PASSED: 'All tests passed',
    SOME_TESTS_FAILED: 'Some tests failed',
    TASK_FAILED: 'Swarming task failed',
}
34 66
35 67
def _SendRequestToServer(url, http_client, post_data=None):
  """Sends a GET/POST request to an arbitrary url and returns the content.

  Args:
    url (str): The url to fetch.
    http_client (RetryHttpClient): An http client with automatic retry.
    post_data (dict): If provided, serialized to JSON and sent as a POST;
      otherwise a GET request is made.

  Returns:
    (content, error): The raw response content and None on success, or
    None and a dict with 'code' and 'message' keys on failure.
  """
  headers = {'Authorization': 'Bearer ' + auth_util.GetAuthToken()}
  error = None

  try:
    if post_data:
      post_data = json.dumps(post_data, sort_keys=True, separators=(',', ':'))
      headers['Content-Type'] = 'application/json; charset=UTF-8'
      headers['Content-Length'] = len(post_data)
      status_code, content = http_client.Post(url, post_data, headers=headers)
    else:
      status_code, content = http_client.Get(url, headers=headers)
  # ConnectionClosedError and DeadlineExceededError both subclass
  # DownloadError, so the more specific clauses must come first.
  except ConnectionClosedError as e:
    error = {
        'code': URLFETCH_CONNECTION_CLOSED_ERROR,
        'message': e.message
    }
  except DeadlineExceededError as e:
    error = {
        'code': URLFETCH_DEADLINE_EXCEEDED_ERROR,
        'message': e.message
    }
  except DownloadError as e:
    error = {
        'code': URLFETCH_DOWNLOAD_ERROR,
        'message': e.message
    }
  except Exception as e:  # pragma: no cover
    error = {
        'code': UNKNOWN,
        'message': e.message
    }
    # Still raise an exception here for cases not encountered before in order
    # to see what went wrong in the logs. Bare `raise` (instead of `raise e`)
    # preserves the original traceback in Python 2.
    raise

  if error or status_code != 200:
    # The retry upon 50x (501 excluded) is automatically handled in the
    # underlying http_client.
    # By default, it retries 5 times with exponential backoff.
    error = error or {
        'code': EXCEEDED_MAX_RETRIES_ERROR,
        'message': 'Max retries exceeded trying to reach %s' % url
    }
    logging.error(error['message'])
    return None, error

  return content, None
53 117
54 118
def GetSwarmingTaskRequest(task_id, http_client):
  """Returns an instance of SwarmingTaskRequest representing the given task.

  Returns None when the request to the swarming server failed.
  """
  server_host = waterfall_config.GetSwarmingSettings().get('server_host')
  url = ('https://%s/_ah/api/swarming/v1/task/%s/request') % (
      server_host, task_id)
  content, error = _SendRequestToServer(url, http_client)

  # TODO(lijeffrey): Handle/report error in calling functions.
  if error:
    return None
  return SwarmingTaskRequest.Deserialize(json.loads(content))
63 132
64 133
def TriggerSwarmingTask(request, http_client):
  """Triggers a new Swarming task for the given request.

  The Swarming task priority will be overwritten, and extra tags might be added.
  Args:
    request (SwarmingTaskRequest): A Swarming task request.
    http_client (RetryHttpClient): An http client with automatic retry.

  Returns:
    (task_id, error): The id of the new task and None on success, or None
    and the error dict from _SendRequestToServer on failure.
  """
  swarming_settings = waterfall_config.GetSwarmingSettings()

  # Use a priority much lower than CQ for now (CQ's priority is 30).
  # Later we might use a higher priority -- a lower value here.
  # Note: the smaller value, the higher priority.
  request.priority = max(100, swarming_settings.get('default_request_priority'))
  request.expiration_secs = (
      swarming_settings.get('request_expiration_hours') * 60 * 60)

  request.tags.extend(['findit:1', 'project:Chromium', 'purpose:post-commit'])

  url = 'https://%s/_ah/api/swarming/v1/tasks/new' % swarming_settings.get(
      'server_host')
  response_data, error = _SendRequestToServer(
      url, http_client, request.Serialize())

  # TODO(lijeffrey): Handle error in calling functions.
  if error:
    return None, error
  return json.loads(response_data)['task_id'], None
87 162
88 163
89 def ListSwarmingTasksDataByTags( 164 def ListSwarmingTasksDataByTags(
90 master_name, builder_name, build_number, http_client, 165 master_name, builder_name, build_number, http_client,
91 additional_tag_filters=None): 166 additional_tag_filters=None):
92 """Downloads tasks data from swarming server. 167 """Downloads tasks data from swarming server.
93 168
94 Args: 169 Args:
95 master_name(str): Value of the master tag. 170 master_name(str): Value of the master tag.
96 builder_name(str): Value of the buildername tag. 171 builder_name(str): Value of the buildername tag.
(...skipping 12 matching lines...) Expand all
109 base_url += '&tags=%s' % urllib.quote('%s:%s' % (tag_name, tag_value)) 184 base_url += '&tags=%s' % urllib.quote('%s:%s' % (tag_name, tag_value))
110 185
111 items = [] 186 items = []
112 cursor = None 187 cursor = None
113 188
114 while True: 189 while True:
115 if not cursor: 190 if not cursor:
116 url = base_url 191 url = base_url
117 else: 192 else:
118 url = base_url + '&cursor=%s' % urllib.quote(cursor) 193 url = base_url + '&cursor=%s' % urllib.quote(cursor)
119 new_data = _SendRequestToServer(url, http_client) 194 new_data, _ = _SendRequestToServer(url, http_client)
195
196 # TODO(lijeffrey): handle error in calling functions.
120 if not new_data: 197 if not new_data:
121 break 198 break
122 199
123 new_data_json = json.loads(new_data) 200 new_data_json = json.loads(new_data)
124 if new_data_json.get('items'): 201 if new_data_json.get('items'):
125 items.extend(new_data_json['items']) 202 items.extend(new_data_json['items'])
126 203
127 if new_data_json.get('cursor'): 204 if new_data_json.get('cursor'):
128 cursor = new_data_json['cursor'] 205 cursor = new_data_json['cursor']
129 else: 206 else:
130 break 207 break
131 208
132 return items 209 return items
133 210
134 211
135 def _GenerateIsolatedData(outputs_ref): 212 def _GenerateIsolatedData(outputs_ref):
136 if not outputs_ref: 213 if not outputs_ref:
137 return {} 214 return {}
138 return { 215 return {
139 'digest': outputs_ref['isolated'], 216 'digest': outputs_ref['isolated'],
140 'namespace': outputs_ref['namespace'], 217 'namespace': outputs_ref['namespace'],
141 'isolatedserver': outputs_ref['isolatedserver'] 218 'isolatedserver': outputs_ref['isolatedserver']
142 } 219 }
143 220
144 221
def GetSwarmingTaskResultById(task_id, http_client):
  """Gets swarming result, checks state and returns outputs ref if needed.

  Returns:
    (json_data, error): The parsed result dict ({} on failure) and the error
    dict (None on success) from _SendRequestToServer.
  """
  url = ('https://%s/_ah/api/swarming/v1/task/%s/result') % (
      waterfall_config.GetSwarmingSettings().get('server_host'), task_id)

  data, error = _SendRequestToServer(url, http_client)
  json_data = {} if error else json.loads(data)
  return json_data, error
153 234
154 235
def GetSwarmingTaskFailureLog(outputs_ref, http_client):
  """Downloads the failure log for a swarming task from the isolated server.

  Returns:
    (log, error) as produced by _DownloadTestResults.
  """
  return _DownloadTestResults(_GenerateIsolatedData(outputs_ref), http_client)
159 240
160 241
161 def GetTagValue(tags, tag_name): 242 def GetTagValue(tags, tag_name):
162 """Returns the content for a specific tag.""" 243 """Returns the content for a specific tag."""
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
237 return None 318 return None
238 319
239 post_data = { 320 post_data = {
240 'digest': isolated_data['digest'], 321 'digest': isolated_data['digest'],
241 'namespace': { 322 'namespace': {
242 'namespace': isolated_data['namespace'] 323 'namespace': isolated_data['namespace']
243 } 324 }
244 } 325 }
245 url = '%s/_ah/api/isolateservice/v1/retrieve' % ( 326 url = '%s/_ah/api/isolateservice/v1/retrieve' % (
246 isolated_data['isolatedserver']) 327 isolated_data['isolatedserver'])
247 content = _SendRequestToServer(url, http_client, post_data) 328
248 return content 329 return _SendRequestToServer(url, http_client, post_data)
249 330
250 331
251 def _GetOutputJsonHash(content): 332 def _GetOutputJsonHash(content):
252 """Gets hash for output.json. 333 """Gets hash for output.json.
253 334
254 Parses response content of the request using hash for .isolated file and 335 Parses response content of the request using hash for .isolated file and
255 returns the hash for output.json file. 336 returns the hash for output.json file.
256 337
257 Args: 338 Args:
258 content (string): Content returned by the POST request to isolated server 339 content (string): Content returned by the POST request to isolated server
259 for hash to output.json. 340 for hash to output.json.
260 """ 341 """
261 content_json = json.loads(content) 342 content_json = json.loads(content)
262 content_string = zlib.decompress(base64.b64decode(content_json['content'])) 343 content_string = zlib.decompress(base64.b64decode(content_json['content']))
263 json_result = json.loads(content_string) 344 json_result = json.loads(content_string)
264 return json_result.get('files', {}).get('output.json', {}).get('h') 345 return json_result.get('files', {}).get('output.json', {}).get('h')
265 346
266 347
267 def _RetrieveOutputJsonFile(output_json_content, http_client): 348 def _RetrieveOutputJsonFile(output_json_content, http_client):
268 """Downloads output.json file from isolated server or process it directly. 349 """Downloads output.json file from isolated server or process it directly.
269 350
270 If there is a url provided, send get request to that url to download log; 351 If there is a url provided, send get request to that url to download log;
271 else the log would be in content so use it directly. 352 else the log would be in content so use it directly.
272 """ 353 """
273 json_content = json.loads(output_json_content) 354 json_content = json.loads(output_json_content)
274 output_json_url = json_content.get('url') 355 output_json_url = json_content.get('url')
275 if output_json_url: 356 if output_json_url:
276 get_content = _SendRequestToServer(output_json_url, http_client) 357 get_content, _ = _SendRequestToServer(output_json_url, http_client)
358 # TODO(lijeffrey): handle error in calling function.
277 elif json_content.get('content'): 359 elif json_content.get('content'):
278 get_content = base64.b64decode(json_content['content']) 360 get_content = base64.b64decode(json_content['content'])
279 else: # pragma: no cover 361 else: # pragma: no cover
280 get_content = None # Just for precausion. 362 get_content = None # Just for precausion.
281 try: 363 try:
282 return json.loads(zlib.decompress(get_content)) if get_content else None 364 return json.loads(zlib.decompress(get_content)) if get_content else None
283 except ValueError: # pragma: no cover 365 except ValueError: # pragma: no cover
284 logging.info( 366 logging.info(
285 'swarming result is invalid: %s' % zlib.decompress(get_content)) 367 'swarming result is invalid: %s' % zlib.decompress(get_content))
286 return None 368 return None
287 369
370
def _DownloadTestResults(isolated_data, http_client):
  """Downloads the output.json file and returns the json object.

  Args:
    isolated_data (dict): Identifies the file on the isolated server, with
      'digest', 'namespace' and 'isolatedserver' keys.
    http_client (RetryHttpClient): An http client with automatic retry.

  Returns:
    (output_json, error): The parsed output.json contents (or None), and the
    error dict (or None) from the underlying requests.
  """
  # First POST request to get hash for the output.json file.
  content, error = _FetchOutputJsonInfoFromIsolatedServer(
      isolated_data, http_client)
  # Also bail out on a missing response even when no error was reported
  # (e.g. invalid isolated_data); json parsing below would crash on None.
  if error or not content:
    return None, error

  output_json_hash = _GetOutputJsonHash(content)
  if not output_json_hash:
    return None, None

  # Second POST request to get the redirect url for the output.json file.
  data_for_output_json = {
      'digest': output_json_hash,
      'namespace': isolated_data['namespace'],
      'isolatedserver': isolated_data['isolatedserver']
  }

  output_json_content, error = _FetchOutputJsonInfoFromIsolatedServer(
      data_for_output_json, http_client)
  if error or not output_json_content:
    return None, error

  # GET Request to get output.json file.
  return _RetrieveOutputJsonFile(output_json_content, http_client), None
313 397
314 398
315 def _MergeListsOfDicts(merged, shard): 399 def _MergeListsOfDicts(merged, shard):
316 output = [] 400 output = []
317 for i in xrange(max(len(merged), len(shard))): 401 for i in xrange(max(len(merged), len(shard))):
318 merged_dict = merged[i] if i < len(merged) else {} 402 merged_dict = merged[i] if i < len(merged) else {}
319 shard_dict = shard[i] if i < len(shard) else {} 403 shard_dict = shard[i] if i < len(shard) else {}
320 output_dict = merged_dict.copy() 404 output_dict = merged_dict.copy()
321 output_dict.update(shard_dict) 405 output_dict.update(shard_dict)
322 output.append(output_dict) 406 output.append(output_dict)
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
365 shard_result.get('per_iteration_data', [])) 449 shard_result.get('per_iteration_data', []))
366 merged_results['all_tests'] = sorted(merged_results['all_tests']) 450 merged_results['all_tests'] = sorted(merged_results['all_tests'])
367 return merged_results 451 return merged_results
368 452
369 453
def RetrieveShardedTestResultsFromIsolatedServer(
    list_isolated_data, http_client):
  """Gets test results from the isolated server and merges the shard results.

  Returns None if any shard's results could not be downloaded.
  """
  shard_results = []
  for isolated_data in list_isolated_data:
    output_json, _ = _DownloadTestResults(isolated_data, http_client)
    # TODO(lijeffrey): Report/handle error returned from _DownloadTestResults.
    if not output_json:
      return None
    shard_results.append(output_json)

  # A single shard needs no merging.
  if len(shard_results) == 1:
    return shard_results[0]
  return _MergeSwarmingTestShards(shard_results)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698