Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(235)

Side by Side Diff: appengine/findit/waterfall/swarming_util.py

Issue 2491473002: [Findit] Implementing swarming task error detection (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import base64 5 import base64
6 from collections import defaultdict 6 from collections import defaultdict
7 import json 7 import json
8 import logging 8 import logging
9 import urllib 9 import urllib
10 import zlib 10 import zlib
11 11
12 from google.appengine.api.urlfetch_errors import (
13 DeadlineExceededError, DownloadError, ConnectionClosedError)
chanli 2016/11/11 00:05:13 Nit: according to https://engdoc.corp.google.com/e
lijeffrey 2016/11/11 20:55:41 Done.
12 from google.appengine.ext import ndb 14 from google.appengine.ext import ndb
13 15
14 from common import auth_util 16 from common import auth_util
15 from model.wf_step import WfStep 17 from model.wf_step import WfStep
16 from waterfall import waterfall_config 18 from waterfall import waterfall_config
17 from waterfall.swarming_task_request import SwarmingTaskRequest 19 from waterfall.swarming_task_request import SwarmingTaskRequest
18 20
19 21
STATES_RUNNING = ('RUNNING', 'PENDING')
STATE_COMPLETED = 'COMPLETED'
STATES_NOT_RUNNING = (
    'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED')


# General error codes.
# NOTE: -1 (rather than 0) so UNKNOWN cannot collide with ALL_TESTS_PASSED
# below; with both equal to 0 the two entries would share one key in
# EXIT_CODE_DESCRIPTIONS and looking up UNKNOWN would return
# 'All tests passed'.
UNKNOWN = -1


# Urlfetch error codes.
URLFETCH_DOWNLOAD_ERROR = 100
URLFETCH_DEADLINE_EXCEEDED_ERROR = 110
URLFETCH_CONNECTION_CLOSED_ERROR = 120
EXCEEDED_MAX_RETRIES_ERROR = 210


# Swarming task exit codes.
ALL_TESTS_PASSED = 0
SOME_TESTS_FAILED = 1
TASK_FAILED = 2
TIMED_OUT = 3
EXPIRED = 4
BOT_DIED = 5
CANCELED = 6


# Human-readable descriptions for the exit/error codes above.
EXIT_CODE_DESCRIPTIONS = {
    UNKNOWN: 'Unknown',
    ALL_TESTS_PASSED: 'All tests passed',
    SOME_TESTS_FAILED: 'Some tests failed',
    TASK_FAILED: 'Swarming task failed',
    TIMED_OUT: 'Swarming task timed out',
    EXPIRED: 'Swarming task expired',
    BOT_DIED: 'Swarming task bot died',
    CANCELED: 'Swarming task was canceled',
}
34 59
35 60
def _SendRequestToServer(url, http_client, post_data=None):
  """Sends a GET/POST request to an arbitrary url and returns the content.

  Args:
    url (str): The url to fetch.
    http_client (RetryHttpClient): An http client with automatic retry.
    post_data (dict): If provided, it is json-encoded and sent via POST;
        otherwise a GET request is issued.

  Returns:
    (content, error): content is the response body on success, or None on
    failure; error is None on success, or a dict with 'code' and 'message'
    keys describing the failure.
  """
  headers = {'Authorization': 'Bearer ' + auth_util.GetAuthToken()}
  error = None

  try:
    if post_data:
      post_data = json.dumps(post_data, sort_keys=True, separators=(',', ':'))
      headers['Content-Type'] = 'application/json; charset=UTF-8'
      headers['Content-Length'] = len(post_data)
      status_code, content = http_client.Post(url, post_data, headers=headers)
    else:
      status_code, content = http_client.Get(url, headers=headers)
  except ConnectionClosedError as e:
    error = {
        'code': URLFETCH_CONNECTION_CLOSED_ERROR,
        'message': e.message
    }
  except DeadlineExceededError as e:
    error = {
        'code': URLFETCH_DEADLINE_EXCEEDED_ERROR,
        'message': e.message
    }
  except DownloadError as e:
    error = {
        'code': URLFETCH_DOWNLOAD_ERROR,
        'message': e.message
    }
  except Exception:  # pragma: no cover
    # Still raise for cases not encountered before in order to see what went
    # wrong in the logs. A bare `raise` re-raises the active exception and
    # preserves the original traceback (`raise e` would reset it); building
    # an error dict here would be dead code since it is never returned.
    raise

  if error or status_code != 200:
    # The retry upon 50x (501 excluded) is automatically handled in the
    # underlying http_client.
    # By default, it retries 5 times with exponential backoff.
    error = error or {
        'code': EXCEEDED_MAX_RETRIES_ERROR,
        'message': 'Max retries exceeded trying to reach %s' % url
    }
    logging.error(error['message'])
    return None, error

  return content, None
53 110
54 111
def GetSwarmingTaskRequest(task_id, http_client):
  """Returns an instance of SwarmingTaskRequest representing the given task."""
  server_host = waterfall_config.GetSwarmingSettings().get('server_host')
  url = ('https://%s/_ah/api/swarming/v1/task/%s/request') % (
      server_host, task_id)
  content, error = _SendRequestToServer(url, http_client)

  # TODO(lijeffrey): Handle/report error in calling functions.
  if error:
    return None
  return SwarmingTaskRequest.Deserialize(json.loads(content))
63 125
64 126
def TriggerSwarmingTask(request, http_client):
  """Triggers a new Swarming task for the given request.

  The Swarming task priority will be overwritten, and extra tags might be added.

  Args:
    request (SwarmingTaskRequest): A Swarming task request.
    http_client (RetryHttpClient): An http client with automatic retry.
  """
  settings = waterfall_config.GetSwarmingSettings()

  # Use a priority much lower than CQ for now (CQ's priority is 30).
  # Later we might use a higher priority -- a lower value here.
  # Note: the smaller value, the higher priority.
  request.priority = max(100, settings.get('default_request_priority'))
  request.expiration_secs = settings.get('request_expiration_hours') * 60 * 60

  request.tags.extend(['findit:1', 'project:Chromium', 'purpose:post-commit'])

  url = 'https://%s/_ah/api/swarming/v1/tasks/new' % settings.get('server_host')
  response_data, error = _SendRequestToServer(
      url, http_client, request.Serialize())

  # TODO(lijeffrey): Handle error in calling functions.
  if error:
    return None, error
  return json.loads(response_data)['task_id'], None
87 155
88 156
89 def ListSwarmingTasksDataByTags( 157 def ListSwarmingTasksDataByTags(
90 master_name, builder_name, build_number, http_client, 158 master_name, builder_name, build_number, http_client,
91 additional_tag_filters=None): 159 additional_tag_filters=None):
92 """Downloads tasks data from swarming server. 160 """Downloads tasks data from swarming server.
93 161
94 Args: 162 Args:
95 master_name(str): Value of the master tag. 163 master_name(str): Value of the master tag.
96 builder_name(str): Value of the buildername tag. 164 builder_name(str): Value of the buildername tag.
(...skipping 12 matching lines...) Expand all
109 base_url += '&tags=%s' % urllib.quote('%s:%s' % (tag_name, tag_value)) 177 base_url += '&tags=%s' % urllib.quote('%s:%s' % (tag_name, tag_value))
110 178
111 items = [] 179 items = []
112 cursor = None 180 cursor = None
113 181
114 while True: 182 while True:
115 if not cursor: 183 if not cursor:
116 url = base_url 184 url = base_url
117 else: 185 else:
118 url = base_url + '&cursor=%s' % urllib.quote(cursor) 186 url = base_url + '&cursor=%s' % urllib.quote(cursor)
119 new_data = _SendRequestToServer(url, http_client) 187 new_data, _ = _SendRequestToServer(url, http_client)
188
189 # TODO(lijeffrey): handle error in calling functions.
120 if not new_data: 190 if not new_data:
121 break 191 break
122 192
123 new_data_json = json.loads(new_data) 193 new_data_json = json.loads(new_data)
124 if new_data_json.get('items'): 194 if new_data_json.get('items'):
125 items.extend(new_data_json['items']) 195 items.extend(new_data_json['items'])
126 196
127 if new_data_json.get('cursor'): 197 if new_data_json.get('cursor'):
128 cursor = new_data_json['cursor'] 198 cursor = new_data_json['cursor']
129 else: 199 else:
130 break 200 break
131 201
132 return items 202 return items
133 203
134 204
135 def _GenerateIsolatedData(outputs_ref): 205 def _GenerateIsolatedData(outputs_ref):
136 if not outputs_ref: 206 if not outputs_ref:
137 return {} 207 return {}
138 return { 208 return {
139 'digest': outputs_ref['isolated'], 209 'digest': outputs_ref['isolated'],
140 'namespace': outputs_ref['namespace'], 210 'namespace': outputs_ref['namespace'],
141 'isolatedserver': outputs_ref['isolatedserver'] 211 'isolatedserver': outputs_ref['isolatedserver']
142 } 212 }
143 213
144 214
def GetSwarmingTaskResultById(task_id, http_client):
  """Gets swarming result, checks state and returns outputs ref if needed."""
  base_url = ('https://%s/_ah/api/swarming/v1/task/%s/result') % (
      waterfall_config.GetSwarmingSettings().get('server_host'), task_id)

  data, error = _SendRequestToServer(base_url, http_client)
  # On failure, fall back to an empty dict and surface the error alongside.
  json_data = json.loads(data) if not error else {}
  return json_data, error
153 227
154 228
def GetSwarmingTaskFailureLog(outputs_ref, http_client):
  """Downloads failure log from isolated server."""
  return _DownloadTestResults(
      _GenerateIsolatedData(outputs_ref), http_client)
159 233
160 234
161 def GetTagValue(tags, tag_name): 235 def GetTagValue(tags, tag_name):
162 """Returns the content for a specific tag.""" 236 """Returns the content for a specific tag."""
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
237 return None 311 return None
238 312
239 post_data = { 313 post_data = {
240 'digest': isolated_data['digest'], 314 'digest': isolated_data['digest'],
241 'namespace': { 315 'namespace': {
242 'namespace': isolated_data['namespace'] 316 'namespace': isolated_data['namespace']
243 } 317 }
244 } 318 }
245 url = '%s/_ah/api/isolateservice/v1/retrieve' % ( 319 url = '%s/_ah/api/isolateservice/v1/retrieve' % (
246 isolated_data['isolatedserver']) 320 isolated_data['isolatedserver'])
247 content = _SendRequestToServer(url, http_client, post_data) 321
248 return content 322 return _SendRequestToServer(url, http_client, post_data)
249 323
250 324
251 def _GetOutputJsonHash(content): 325 def _GetOutputJsonHash(content):
252 """Gets hash for output.json. 326 """Gets hash for output.json.
253 327
254 Parses response content of the request using hash for .isolated file and 328 Parses response content of the request using hash for .isolated file and
255 returns the hash for output.json file. 329 returns the hash for output.json file.
256 330
257 Args: 331 Args:
258 content (string): Content returned by the POST request to isolated server 332 content (string): Content returned by the POST request to isolated server
259 for hash to output.json. 333 for hash to output.json.
260 """ 334 """
261 content_json = json.loads(content) 335 content_json = json.loads(content)
262 content_string = zlib.decompress(base64.b64decode(content_json['content'])) 336 content_string = zlib.decompress(base64.b64decode(content_json['content']))
263 json_result = json.loads(content_string) 337 json_result = json.loads(content_string)
264 return json_result.get('files', {}).get('output.json', {}).get('h') 338 return json_result.get('files', {}).get('output.json', {}).get('h')
265 339
266 340
267 def _RetrieveOutputJsonFile(output_json_content, http_client): 341 def _RetrieveOutputJsonFile(output_json_content, http_client):
268 """Downloads output.json file from isolated server or process it directly. 342 """Downloads output.json file from isolated server or process it directly.
269 343
270 If there is a url provided, send get request to that url to download log; 344 If there is a url provided, send get request to that url to download log;
271 else the log would be in content so use it directly. 345 else the log would be in content so use it directly.
272 """ 346 """
273 json_content = json.loads(output_json_content) 347 json_content = json.loads(output_json_content)
274 output_json_url = json_content.get('url') 348 output_json_url = json_content.get('url')
275 if output_json_url: 349 if output_json_url:
276 get_content = _SendRequestToServer(output_json_url, http_client) 350 get_content, _ = _SendRequestToServer(output_json_url, http_client)
351 # TODO(lijeffrey): handle error in calling function.
277 elif json_content.get('content'): 352 elif json_content.get('content'):
278 get_content = base64.b64decode(json_content['content']) 353 get_content = base64.b64decode(json_content['content'])
279 else: # pragma: no cover 354 else: # pragma: no cover
280 get_content = None # Just for precausion. 355 get_content = None # Just for precausion.
281 try: 356 try:
282 return json.loads(zlib.decompress(get_content)) if get_content else None 357 return json.loads(zlib.decompress(get_content)) if get_content else None
283 except ValueError: # pragma: no cover 358 except ValueError: # pragma: no cover
284 logging.info( 359 logging.info(
285 'swarming result is invalid: %s' % zlib.decompress(get_content)) 360 'swarming result is invalid: %s' % zlib.decompress(get_content))
286 return None 361 return None
287 362
363
def _DownloadTestResults(isolated_data, http_client):
  """Downloads the output.json file and returns (json object, error)."""
  # First POST request to get the hash for the output.json file.
  content, error = _FetchOutputJsonInfoFromIsolatedServer(
      isolated_data, http_client)
  if error:
    return None, error

  output_json_hash = _GetOutputJsonHash(content)
  if not output_json_hash:
    return None, None

  # Second POST request to get the redirect url for the output.json file.
  hash_request = {
      'digest': output_json_hash,
      'namespace': isolated_data['namespace'],
      'isolatedserver': isolated_data['isolatedserver']
  }
  output_json_content, error = _FetchOutputJsonInfoFromIsolatedServer(
      hash_request, http_client)
  if error:
    return None, error

  # GET request to fetch the output.json file itself.
  return _RetrieveOutputJsonFile(output_json_content, http_client), None
313 390
314 391
315 def _MergeListsOfDicts(merged, shard): 392 def _MergeListsOfDicts(merged, shard):
316 output = [] 393 output = []
317 for i in xrange(max(len(merged), len(shard))): 394 for i in xrange(max(len(merged), len(shard))):
318 merged_dict = merged[i] if i < len(merged) else {} 395 merged_dict = merged[i] if i < len(merged) else {}
319 shard_dict = shard[i] if i < len(shard) else {} 396 shard_dict = shard[i] if i < len(shard) else {}
320 output_dict = merged_dict.copy() 397 output_dict = merged_dict.copy()
321 output_dict.update(shard_dict) 398 output_dict.update(shard_dict)
322 output.append(output_dict) 399 output.append(output_dict)
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
365 shard_result.get('per_iteration_data', [])) 442 shard_result.get('per_iteration_data', []))
366 merged_results['all_tests'] = sorted(merged_results['all_tests']) 443 merged_results['all_tests'] = sorted(merged_results['all_tests'])
367 return merged_results 444 return merged_results
368 445
369 446
def RetrieveShardedTestResultsFromIsolatedServer(
    list_isolated_data, http_client):
  """Gets test results from the isolated server and merges the shards."""
  shard_results = []
  for isolated_data in list_isolated_data:
    shard_json, _ = _DownloadTestResults(isolated_data, http_client)
    if not shard_json:
      # TODO(lijeffrey): Report/handle error returned from
      # _DownloadTestResults.
      return None
    shard_results.append(shard_json)

  # A single shard needs no merging.
  if len(list_isolated_data) == 1:
    return shard_results[0]
  return _MergeSwarmingTestShards(shard_results)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698