| Index: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| diff --git a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| index ed7bf862345cf9a8f350dfac8009dff83ef335a7..bae4096c6d5d0edd8e7bb787c44c99a1db310b93 100644
|
| --- a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| +++ b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
|
| @@ -71,12 +71,12 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
|
|
|
| def _MonitorSwarmingTask(self, task_id, *call_args):
|
| """Monitors the swarming task and waits for it to complete."""
|
| + assert task_id
|
| timeout_hours = waterfall_config.GetSwarmingSettings().get(
|
| 'task_timeout_hours')
|
| deadline = time.time() + timeout_hours * 60 * 60
|
| server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
|
| 'server_query_interval_seconds')
|
| -
|
| task_started = False
|
| task_completed = False
|
| tests_statuses = {}
|
| @@ -84,7 +84,16 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
|
| task = self._GetSwarmingTask(*call_args)
|
|
|
| while not task_completed:
|
| - data = swarming_util.GetSwarmingTaskResultById(task_id, self.HTTP_CLIENT)
|
| + data, error = swarming_util.GetSwarmingTaskResultById(
|
| + task_id, self.HTTP_CLIENT)
|
| +
|
| + if error:
|
| + # An error occurred when trying to contact the swarming server.
|
| + task.status = analysis_status.ERROR
|
| + task.error = error
|
| + task.put()
|
| + break
|
| +
|
| task_state = data['state']
|
| exit_code = (data.get('exit_code') if
|
| task_state == swarming_util.STATE_COMPLETED else None)
|
| @@ -98,17 +107,39 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
|
| if (task_state == swarming_util.STATE_COMPLETED and
|
| int(exit_code) != swarming_util.TASK_FAILED):
|
| outputs_ref = data.get('outputs_ref')
|
| - output_json = swarming_util.GetSwarmingTaskFailureLog(
|
| + output_json, error = swarming_util.GetSwarmingTaskFailureLog(
|
| outputs_ref, self.HTTP_CLIENT)
|
| +
|
| + if error:
|
| + task.status = analysis_status.ERROR
|
| + task.error = error
|
| + else:
|
| + task.status = analysis_status.COMPLETED
|
| +
|
| tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args)
|
| - task.status = analysis_status.COMPLETED
|
| task.tests_statuses = tests_statuses
|
| + task.put()
|
| else:
|
| + if exit_code is not None:
|
| + # Swarming task completed, but the task failed.
|
| + code = int(exit_code)
|
| + message = swarming_util.EXIT_CODE_DESCRIPTIONS[code]
|
| + else:
|
| + # The swarming task did not complete.
|
| + code = swarming_util.STATES_NOT_RUNNING_TO_ERROR_CODES[task_state]
|
| + message = task_state
|
| +
|
| task.status = analysis_status.ERROR
|
| + task.error = {
|
| + 'code': code,
|
| + 'message': message
|
| + }
|
| + task.put()
|
| +
|
| logging_str = 'Swarming task stopped with status: %s' % task_state
|
| if exit_code: # pragma: no cover
|
| logging_str += ' and exit_code: %s - %s' % (
|
| - exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
|
| + exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[code])
|
| logging.error(logging_str)
|
|
|
| tags = data.get('tags', {})
|
| @@ -129,14 +160,21 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
|
| if time.time() > deadline:
|
| # Updates status as ERROR.
|
| task.status = analysis_status.ERROR
|
| + task.error = {
|
| + 'code': swarming_util.TIMED_OUT,
|
| + 'message': 'Process swarming task result timed out'
|
| + }
|
| task.put()
|
| logging.error('Swarming task timed out after %d hours.' % timeout_hours)
|
| break # Stops the loop and return.
|
|
|
| - # Update swarming task metadata timestamps.
|
| - task.created_time = self._ConvertDateTime(data.get('created_ts'))
|
| - task.started_time = self._ConvertDateTime(data.get('started_ts'))
|
| - task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
|
| + # Update swarming task metadata.
|
| + task.created_time = (task.created_time or
|
| + self._ConvertDateTime(data.get('created_ts')))
|
| + task.started_time = (task.started_time or
|
| + self._ConvertDateTime(data.get('started_ts')))
|
| + task.completed_time = (task.completed_time or
|
| + self._ConvertDateTime(data.get('completed_ts')))
|
| task.put()
|
|
|
| return step_name_no_platform
|
| @@ -161,4 +199,3 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
|
| step_name, *args)
|
| step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
|
| return step_name, step_name_no_platform
|
| -
|
|
|