| Index: appengine/findit/waterfall/swarming_util.py
|
| diff --git a/appengine/findit/waterfall/swarming_util.py b/appengine/findit/waterfall/swarming_util.py
|
| index 403a25a1e08fde887186c18279727e9a4864b39d..3efa9f96d582e39a7c089b68f7fa6826b72d0982 100644
|
| --- a/appengine/findit/waterfall/swarming_util.py
|
| +++ b/appengine/findit/waterfall/swarming_util.py
|
| @@ -9,6 +9,9 @@ import logging
|
| import urllib
|
| import zlib
|
|
|
| +from google.appengine.api.urlfetch_errors import ConnectionClosedError
|
| +from google.appengine.api.urlfetch_errors import DeadlineExceededError
|
| +from google.appengine.api.urlfetch_errors import DownloadError
|
| from google.appengine.ext import ndb
|
|
|
| from common import auth_util
|
| @@ -17,39 +20,100 @@ from waterfall import waterfall_config
|
| from waterfall.swarming_task_request import SwarmingTaskRequest
|
|
|
|
|
| +# Swarming task states.
|
| STATES_RUNNING = ('RUNNING', 'PENDING')
|
| STATE_COMPLETED = 'COMPLETED'
|
| STATES_NOT_RUNNING = (
|
| - 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED')
|
| + 'BOT_DIED', 'CANCELED', 'COMPLETED', 'EXPIRED', 'TIMED_OUT')
|
|
|
| +
|
| +# Swarming task stopped error codes.
|
| +BOT_DIED = 30
|
| +CANCELED = 40
|
| +EXPIRED = 50
|
| +TIMED_OUT = 60
|
| +
|
| +STATES_NOT_RUNNING_TO_ERROR_CODES = {
|
| + 'BOT_DIED': BOT_DIED,
|
| + 'CANCELED': CANCELED,
|
| + 'EXPIRED': EXPIRED,
|
| + 'TIMED_OUT': TIMED_OUT,
|
| +}
|
| +
|
| +
|
| +# Urlfetch error codes.
|
| +URLFETCH_DOWNLOAD_ERROR = 100
|
| +URLFETCH_DEADLINE_EXCEEDED_ERROR = 110
|
| +URLFETCH_CONNECTION_CLOSED_ERROR = 120
|
| +EXCEEDED_MAX_RETRIES_ERROR = 210
|
| +
|
| +
|
| +# Other/miscellaneous error codes.
|
| +UNKNOWN = 1000
|
| +
|
| +
|
| +# Swarming task exit codes.
|
| ALL_TESTS_PASSED = 0
|
| SOME_TESTS_FAILED = 1
|
| TASK_FAILED = 2
|
|
|
| +# Swarming task exit code descriptions.
|
| EXIT_CODE_DESCRIPTIONS = {
|
| - ALL_TESTS_PASSED: 'all tests passed',
|
| - SOME_TESTS_FAILED: 'some tests failed',
|
| - TASK_FAILED: 'swarming task failed'
|
| + ALL_TESTS_PASSED: 'All tests passed',
|
| + SOME_TESTS_FAILED: 'Some tests failed',
|
| + TASK_FAILED: 'Swarming task failed',
|
| }
|
|
|
|
|
| def _SendRequestToServer(url, http_client, post_data=None):
|
| """Sends GET/POST request to arbitrary url and returns response content."""
|
| headers = {'Authorization': 'Bearer ' + auth_util.GetAuthToken()}
|
| - if post_data:
|
| - post_data = json.dumps(post_data, sort_keys=True, separators=(',', ':'))
|
| - headers['Content-Type'] = 'application/json; charset=UTF-8'
|
| - headers['Content-Length'] = len(post_data)
|
| - status_code, content = http_client.Post(url, post_data, headers=headers)
|
| - else:
|
| - status_code, content = http_client.Get(url, headers=headers)
|
| -
|
| - if status_code != 200:
|
| + error = None
|
| +
|
| + try:
|
| + if post_data:
|
| + post_data = json.dumps(post_data, sort_keys=True, separators=(',', ':'))
|
| + headers['Content-Type'] = 'application/json; charset=UTF-8'
|
| + headers['Content-Length'] = len(post_data)
|
| + status_code, content = http_client.Post(url, post_data, headers=headers)
|
| + else:
|
| + status_code, content = http_client.Get(url, headers=headers)
|
| + except ConnectionClosedError as e:
|
| + error = {
|
| + 'code': URLFETCH_CONNECTION_CLOSED_ERROR,
|
| + 'message': e.message
|
| + }
|
| + except DeadlineExceededError as e:
|
| + error = {
|
| + 'code': URLFETCH_DEADLINE_EXCEEDED_ERROR,
|
| + 'message': e.message
|
| + }
|
| + except DownloadError as e:
|
| + error = {
|
| + 'code': URLFETCH_DOWNLOAD_ERROR,
|
| + 'message': e.message
|
| + }
|
| + except Exception as e: # pragma: no cover
|
| + error = {
|
| + 'code': UNKNOWN,
|
| + 'message': e.message
|
| + }
|
| + # Still raise an exception here for cases not encountered before in order
|
| + # to see what went wrong in the logs.
|
| +      raise
|
| +
|
| + if error or status_code != 200:
|
| # The retry upon 50x (501 excluded) is automatically handled in the
|
| # underlying http_client.
|
| # By default, it retries 5 times with exponential backoff.
|
| - return None
|
| - return content
|
| + error = error or {
|
| + 'code': EXCEEDED_MAX_RETRIES_ERROR,
|
| + 'message': 'Max retries exceeded trying to reach %s' % url
|
| + }
|
| + logging.error(error['message'])
|
| + return None, error
|
| +
|
| + return content, None
|
|
|
|
|
| def GetSwarmingTaskRequest(task_id, http_client):
|
| @@ -58,8 +122,13 @@ def GetSwarmingTaskRequest(task_id, http_client):
|
| 'server_host')
|
| url = ('https://%s/_ah/api/swarming/v1/task/%s/request') % (
|
| swarming_server_host, task_id)
|
| - json_data = json.loads(_SendRequestToServer(url, http_client))
|
| - return SwarmingTaskRequest.Deserialize(json_data)
|
| + content, error = _SendRequestToServer(url, http_client)
|
| +
|
| + # TODO(lijeffrey): Handle/report error in calling functions.
|
| + if not error:
|
| + json_data = json.loads(content)
|
| + return SwarmingTaskRequest.Deserialize(json_data)
|
| + return None
|
|
|
|
|
| def TriggerSwarmingTask(request, http_client):
|
| @@ -82,8 +151,14 @@ def TriggerSwarmingTask(request, http_client):
|
|
|
| url = 'https://%s/_ah/api/swarming/v1/tasks/new' % swarming_settings.get(
|
| 'server_host')
|
| - response_data = _SendRequestToServer(url, http_client, request.Serialize())
|
| - return json.loads(response_data)['task_id']
|
| + response_data, error = _SendRequestToServer(
|
| + url, http_client, request.Serialize())
|
| +
|
| + # TODO(lijeffrey): Handle error in calling functions.
|
| + if not error:
|
| + return json.loads(response_data)['task_id'], None
|
| +
|
| + return None, error
|
|
|
|
|
| def ListSwarmingTasksDataByTags(
|
| @@ -116,7 +191,9 @@ def ListSwarmingTasksDataByTags(
|
| url = base_url
|
| else:
|
| url = base_url + '&cursor=%s' % urllib.quote(cursor)
|
| - new_data = _SendRequestToServer(url, http_client)
|
| + new_data, _ = _SendRequestToServer(url, http_client)
|
| +
|
| + # TODO(lijeffrey): handle error in calling functions.
|
| if not new_data:
|
| break
|
|
|
| @@ -146,10 +223,14 @@ def GetSwarmingTaskResultById(task_id, http_client):
|
| """Gets swarming result, checks state and returns outputs ref if needed."""
|
| base_url = ('https://%s/_ah/api/swarming/v1/task/%s/result') % (
|
| waterfall_config.GetSwarmingSettings().get('server_host'), task_id)
|
| - data = _SendRequestToServer(base_url, http_client)
|
| - json_data = json.loads(data)
|
| + json_data = {}
|
| +
|
| + data, error = _SendRequestToServer(base_url, http_client)
|
| +
|
| + if not error:
|
| + json_data = json.loads(data)
|
|
|
| - return json_data
|
| + return json_data, error
|
|
|
|
|
| def GetSwarmingTaskFailureLog(outputs_ref, http_client):
|
| @@ -244,8 +325,8 @@ def _FetchOutputJsonInfoFromIsolatedServer(isolated_data, http_client):
|
| }
|
| url = '%s/_ah/api/isolateservice/v1/retrieve' % (
|
| isolated_data['isolatedserver'])
|
| - content = _SendRequestToServer(url, http_client, post_data)
|
| - return content
|
| +
|
| + return _SendRequestToServer(url, http_client, post_data)
|
|
|
|
|
| def _GetOutputJsonHash(content):
|
| @@ -273,7 +354,8 @@ def _RetrieveOutputJsonFile(output_json_content, http_client):
|
| json_content = json.loads(output_json_content)
|
| output_json_url = json_content.get('url')
|
| if output_json_url:
|
| - get_content = _SendRequestToServer(output_json_url, http_client)
|
| + get_content, _ = _SendRequestToServer(output_json_url, http_client)
|
| + # TODO(lijeffrey): handle error in calling function.
|
| elif json_content.get('content'):
|
| get_content = base64.b64decode(json_content['content'])
|
| else: # pragma: no cover
|
| @@ -285,16 +367,18 @@ def _RetrieveOutputJsonFile(output_json_content, http_client):
|
| 'swarming result is invalid: %s' % zlib.decompress(get_content))
|
| return None
|
|
|
| +
|
| def _DownloadTestResults(isolated_data, http_client):
|
| """Downloads the output.json file and returns the json object."""
|
| # First POST request to get hash for the output.json file.
|
| - content = _FetchOutputJsonInfoFromIsolatedServer(
|
| + content, error = _FetchOutputJsonInfoFromIsolatedServer(
|
| isolated_data, http_client)
|
| - if not content:
|
| - return None
|
| + if error:
|
| + return None, error
|
| +
|
| output_json_hash = _GetOutputJsonHash(content)
|
| if not output_json_hash:
|
| - return None
|
| + return None, None
|
|
|
| # Second POST request to get the redirect url for the output.json file.
|
| data_for_output_json = {
|
| @@ -302,14 +386,14 @@ def _DownloadTestResults(isolated_data, http_client):
|
| 'namespace': isolated_data['namespace'],
|
| 'isolatedserver': isolated_data['isolatedserver']
|
| }
|
| - output_json_content = _FetchOutputJsonInfoFromIsolatedServer(
|
| +
|
| + output_json_content, error = _FetchOutputJsonInfoFromIsolatedServer(
|
| data_for_output_json, http_client)
|
| - if not output_json_content:
|
| - return None
|
| + if error:
|
| + return None, error
|
|
|
| # GET Request to get output.json file.
|
| - return _RetrieveOutputJsonFile(
|
| - output_json_content, http_client)
|
| + return _RetrieveOutputJsonFile(output_json_content, http_client), None
|
|
|
|
|
| def _MergeListsOfDicts(merged, shard):
|
| @@ -372,8 +456,9 @@ def RetrieveShardedTestResultsFromIsolatedServer(
|
| """Gets test results from isolated server and merge the results."""
|
| shard_results = []
|
| for isolated_data in list_isolated_data:
|
| - output_json = _DownloadTestResults(isolated_data, http_client)
|
| + output_json, _ = _DownloadTestResults(isolated_data, http_client)
|
| if not output_json:
|
| + # TODO(lijeffrey): Report/handle error returned from _DownloadTestResults.
|
| return None
|
| shard_results.append(output_json)
|
|
|
|
|