Chromium Code Reviews| Index: appengine/findit/waterfall/swarming_util.py |
| diff --git a/appengine/findit/waterfall/swarming_util.py b/appengine/findit/waterfall/swarming_util.py |
| index 403a25a1e08fde887186c18279727e9a4864b39d..0a23a9dd219b914a2829ae8336678f60e9cfe76c 100644 |
| --- a/appengine/findit/waterfall/swarming_util.py |
| +++ b/appengine/findit/waterfall/swarming_util.py |
| @@ -9,6 +9,8 @@ import logging |
| import urllib |
| import zlib |
| +from google.appengine.api.urlfetch_errors import ( |
| + DeadlineExceededError, DownloadError, ConnectionClosedError) |
|
chanli
2016/11/11 00:05:13
Nit: according to https://engdoc.corp.google.com/e
lijeffrey
2016/11/11 20:55:41
Done.
|
| from google.appengine.ext import ndb |
| from common import auth_util |
| @@ -22,34 +24,89 @@ STATE_COMPLETED = 'COMPLETED' |
| STATES_NOT_RUNNING = ( |
| 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED') |
| +# General error codes. |
| +UNKNOWN = 0 |
|
chanli
2016/11/11 00:05:13
Based on process_base_swarming_task_result_pipelin
lijeffrey
2016/11/11 20:55:41
Yes, that probably is confusing, so I made UNKNOWN i
|
| + |
|
chanli
2016/11/11 00:05:13
In my opinion, since they are all error codes so t
lijeffrey
2016/11/11 20:55:41
Done.
|
| + |
| +# Urlfetch error codes. |
| +URLFETCH_DOWNLOAD_ERROR = 100 |
| +URLFETCH_DEADLINE_EXCEEDED_ERROR = 110 |
| +URLFETCH_CONNECTION_CLOSED_ERROR = 120 |
| +EXCEEDED_MAX_RETRIES_ERROR = 210 |
|
chanli
2016/11/11 00:05:13
Maybe we can use enum for error codes?
lijeffrey
2016/11/11 20:55:41
I don't think there's a Python enum?
|
| + |
| + |
| +# Swarming task exit codes. |
|
chanli
2016/11/11 00:05:13
Should we use enum here?
chanli
2016/11/11 00:05:13
I was confused, then I realized you mixed swarming s
lijeffrey
2016/11/11 20:55:41
Moved to a separate dict
|
| ALL_TESTS_PASSED = 0 |
| SOME_TESTS_FAILED = 1 |
| TASK_FAILED = 2 |
| +TIMED_OUT = 3 |
| +EXPIRED = 4 |
| +BOT_DIED = 5 |
| +CANCELED = 6 |
| + |
| +# Swarming task exit code descriptions. |
| EXIT_CODE_DESCRIPTIONS = { |
| - ALL_TESTS_PASSED: 'all tests passed', |
| - SOME_TESTS_FAILED: 'some tests failed', |
| - TASK_FAILED: 'swarming task failed' |
| + UNKNOWN: 'Unknown', |
| + ALL_TESTS_PASSED: 'All tests passed', |
| + SOME_TESTS_FAILED: 'Some tests failed', |
| + TASK_FAILED: 'Swarming task failed', |
| + TIMED_OUT: 'Swarming task timed out', |
| + EXPIRED: 'Swarming task expired', |
| + BOT_DIED: 'Swarming task bot died', |
| + CANCELED: 'Swarming task was canceled', |
| } |
| def _SendRequestToServer(url, http_client, post_data=None): |
| """Sends GET/POST request to arbitrary url and returns response content.""" |
| headers = {'Authorization': 'Bearer ' + auth_util.GetAuthToken()} |
| - if post_data: |
| - post_data = json.dumps(post_data, sort_keys=True, separators=(',', ':')) |
| - headers['Content-Type'] = 'application/json; charset=UTF-8' |
| - headers['Content-Length'] = len(post_data) |
| - status_code, content = http_client.Post(url, post_data, headers=headers) |
| - else: |
| - status_code, content = http_client.Get(url, headers=headers) |
| - |
| - if status_code != 200: |
| + error = None |
| + |
| + try: |
| + if post_data: |
| + post_data = json.dumps(post_data, sort_keys=True, separators=(',', ':')) |
| + headers['Content-Type'] = 'application/json; charset=UTF-8' |
| + headers['Content-Length'] = len(post_data) |
| + status_code, content = http_client.Post(url, post_data, headers=headers) |
| + else: |
| + status_code, content = http_client.Get(url, headers=headers) |
| + except ConnectionClosedError as e: |
| + error = { |
| + 'code': URLFETCH_CONNECTION_CLOSED_ERROR, |
| + 'message': e.message |
| + } |
| + except DeadlineExceededError as e: |
| + error = { |
| + 'code': URLFETCH_DEADLINE_EXCEEDED_ERROR, |
| + 'message': e.message |
| + } |
| + except DownloadError as e: |
| + error = { |
| + 'code': URLFETCH_DOWNLOAD_ERROR, |
| + 'message': e.message |
| + } |
| + except Exception as e: # pragma: no cover |
| + error = { |
| + 'code': UNKNOWN, |
| + 'message': e.message |
| + } |
| + # Still raise an exception here for cases not encountered before in order |
| + # to see what went wrong in the logs. |
| + raise e |
| + |
| + if error or status_code != 200: |
| # The retry upon 50x (501 excluded) is automatically handled in the |
| # underlying http_client. |
| # By default, it retries 5 times with exponential backoff. |
| - return None |
| - return content |
| + error = error or { |
| + 'code': EXCEEDED_MAX_RETRIES_ERROR, |
| + 'message': 'Max retries exceeded trying to reach %s' % url |
| + } |
| + logging.error(error['message']) |
| + return None, error |
| + |
| + return content, None |
| def GetSwarmingTaskRequest(task_id, http_client): |
| @@ -58,8 +115,13 @@ def GetSwarmingTaskRequest(task_id, http_client): |
| 'server_host') |
| url = ('https://%s/_ah/api/swarming/v1/task/%s/request') % ( |
| swarming_server_host, task_id) |
| - json_data = json.loads(_SendRequestToServer(url, http_client)) |
| - return SwarmingTaskRequest.Deserialize(json_data) |
| + content, error = _SendRequestToServer(url, http_client) |
| + |
| + # TODO(lijeffrey): Handle/report error in calling functions. |
| + if not error: |
| + json_data = json.loads(content) |
| + return SwarmingTaskRequest.Deserialize(json_data) |
| + return None |
| def TriggerSwarmingTask(request, http_client): |
| @@ -82,8 +144,14 @@ def TriggerSwarmingTask(request, http_client): |
| url = 'https://%s/_ah/api/swarming/v1/tasks/new' % swarming_settings.get( |
| 'server_host') |
| - response_data = _SendRequestToServer(url, http_client, request.Serialize()) |
| - return json.loads(response_data)['task_id'] |
| + response_data, error = _SendRequestToServer( |
| + url, http_client, request.Serialize()) |
| + |
| + # TODO(lijeffrey): Handle error in calling functions. |
| + if not error: |
| + return json.loads(response_data)['task_id'], None |
| + |
| + return None, error |
| def ListSwarmingTasksDataByTags( |
| @@ -116,7 +184,9 @@ def ListSwarmingTasksDataByTags( |
| url = base_url |
| else: |
| url = base_url + '&cursor=%s' % urllib.quote(cursor) |
| - new_data = _SendRequestToServer(url, http_client) |
| + new_data, _ = _SendRequestToServer(url, http_client) |
| + |
| + # TODO(lijeffrey): handle error in calling functions. |
| if not new_data: |
| break |
| @@ -146,10 +216,14 @@ def GetSwarmingTaskResultById(task_id, http_client): |
| """Gets swarming result, checks state and returns outputs ref if needed.""" |
| base_url = ('https://%s/_ah/api/swarming/v1/task/%s/result') % ( |
| waterfall_config.GetSwarmingSettings().get('server_host'), task_id) |
| - data = _SendRequestToServer(base_url, http_client) |
| - json_data = json.loads(data) |
| + json_data = {} |
| + |
| + data, error = _SendRequestToServer(base_url, http_client) |
| - return json_data |
| + if not error: |
| + json_data = json.loads(data) |
| + |
| + return json_data, error |
| def GetSwarmingTaskFailureLog(outputs_ref, http_client): |
| @@ -244,8 +318,8 @@ def _FetchOutputJsonInfoFromIsolatedServer(isolated_data, http_client): |
| } |
| url = '%s/_ah/api/isolateservice/v1/retrieve' % ( |
| isolated_data['isolatedserver']) |
| - content = _SendRequestToServer(url, http_client, post_data) |
| - return content |
| + |
| + return _SendRequestToServer(url, http_client, post_data) |
| def _GetOutputJsonHash(content): |
| @@ -273,7 +347,8 @@ def _RetrieveOutputJsonFile(output_json_content, http_client): |
| json_content = json.loads(output_json_content) |
| output_json_url = json_content.get('url') |
| if output_json_url: |
| - get_content = _SendRequestToServer(output_json_url, http_client) |
| + get_content, _ = _SendRequestToServer(output_json_url, http_client) |
| + # TODO(lijeffrey): handle error in calling function. |
| elif json_content.get('content'): |
| get_content = base64.b64decode(json_content['content']) |
| else: # pragma: no cover |
| @@ -285,16 +360,18 @@ def _RetrieveOutputJsonFile(output_json_content, http_client): |
| 'swarming result is invalid: %s' % zlib.decompress(get_content)) |
| return None |
| + |
| def _DownloadTestResults(isolated_data, http_client): |
| """Downloads the output.json file and returns the json object.""" |
| # First POST request to get hash for the output.json file. |
| - content = _FetchOutputJsonInfoFromIsolatedServer( |
| + content, error = _FetchOutputJsonInfoFromIsolatedServer( |
| isolated_data, http_client) |
| - if not content: |
| - return None |
| + if error: |
| + return None, error |
| + |
| output_json_hash = _GetOutputJsonHash(content) |
| if not output_json_hash: |
| - return None |
| + return None, None |
| # Second POST request to get the redirect url for the output.json file. |
| data_for_output_json = { |
| @@ -302,14 +379,14 @@ def _DownloadTestResults(isolated_data, http_client): |
| 'namespace': isolated_data['namespace'], |
| 'isolatedserver': isolated_data['isolatedserver'] |
| } |
| - output_json_content = _FetchOutputJsonInfoFromIsolatedServer( |
| + |
| + output_json_content, error = _FetchOutputJsonInfoFromIsolatedServer( |
| data_for_output_json, http_client) |
| - if not output_json_content: |
| - return None |
| + if error: |
| + return None, error |
| # GET Request to get output.json file. |
| - return _RetrieveOutputJsonFile( |
| - output_json_content, http_client) |
| + return _RetrieveOutputJsonFile(output_json_content, http_client), None |
| def _MergeListsOfDicts(merged, shard): |
| @@ -372,8 +449,9 @@ def RetrieveShardedTestResultsFromIsolatedServer( |
| """Gets test results from isolated server and merge the results.""" |
| shard_results = [] |
| for isolated_data in list_isolated_data: |
| - output_json = _DownloadTestResults(isolated_data, http_client) |
| + output_json, _ = _DownloadTestResults(isolated_data, http_client) |
| if not output_json: |
| + # TODO(lijeffrey): Report/handle error returned from _DownloadTestResults. |
| return None |
| shard_results.append(output_json) |