Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(432)

Unified Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2491473002: [Findit] Implementing swarming task error detection (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
diff --git a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
index e493145670b4608e36b2180bb4624f6f9cf71dc5..938112fc66adc80699093b825de6f2435ef09dd7 100644
--- a/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
+++ b/appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py
@@ -2,6 +2,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+from collections import defaultdict
import datetime
import logging
import time
@@ -22,24 +23,27 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
HTTP_CLIENT = HttpClient()
- def _CheckTestsRunStatuses(self, output_json):
- # Checks result status for each test run and saves the numbers accordingly.
- # Should be overridden by subclass.
- raise NotImplementedError(
- '_CheckTestsRunStatuses should be implemented in the child class')
+ def _CheckTestsRunStatuses(self, output_json, *_):
+ """Checks result status for each test run and saves the numbers accordingly.
- def _ConvertDateTime(self, time_string):
- """Convert UTC time string to datetime.datetime."""
- # Match the time conversion with swarming.py which elides the suffix
- # when microseconds are 0.
- if not time_string:
- return None
- for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
- try:
- return datetime.datetime.strptime(time_string, fmt)
- except ValueError:
- pass
- raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
+ Args:
+ output_json (dict): A dict of all test results in the swarming task.
+
+ Returns:
+ tests_statuses (dict): A dict of different statuses for each test.
+
+ Currently for each test, we are saving number of total runs,
+ number of succeeded runs and number of failed runs.
+ """
+ tests_statuses = defaultdict(lambda: defaultdict(int))
+ if output_json:
+ for iteration in output_json.get('per_iteration_data'):
+ for test_name, tests in iteration.iteritems():
+ tests_statuses[test_name]['total_run'] += len(tests)
+ for test in tests:
+ tests_statuses[test_name][test['status']] += 1
+
+ return tests_statuses
def _GetSwarmingTask(self):
# Get the appropriate kind of Swarming Task (Wf or Flake).
@@ -53,22 +57,20 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
raise NotImplementedError(
'_GetArgs should be implemented in the child class')
- # Arguments number differs from overridden method - pylint: disable=W0221
- def run(self, master_name, builder_name, build_number,
- step_name, task_id, *args): # pragma: no cover.
- """
- Args:
- master_name (str): The master name.
- builder_name (str): The builder name.
- build_number (str): The build number.
- step_name (str): The failed test step name.
- task_id (str): Id for the swarming task which is triggered by Findit.
+ def _ConvertDateTime(self, time_string):
+ """Convert UTC time string to datetime.datetime."""
+ if not time_string:
+ return None
+ for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
+ # When microseconds are 0, the '.123456' suffix is elided.
+ try:
+ return datetime.datetime.strptime(time_string, fmt)
+ except ValueError:
+ pass
+ raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
- Returns:
- A dict of lists for reliable/flaky tests.
- """
- call_args = self._GetArgs(master_name, builder_name, build_number,
- step_name, *args)
+ def _MonitorSwarmingTask(self, task_id, *call_args):
+ """Monitors the swarming task and waits for it to complete."""
assert task_id
timeout_hours = waterfall_config.GetSwarmingSettings().get(
'task_timeout_hours')
@@ -79,61 +81,117 @@ class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
task_completed = False
tests_statuses = {}
step_name_no_platform = None
+ task = self._GetSwarmingTask(*call_args)
while not task_completed:
- # Keeps monitoring the swarming task, waits for it to complete.
- data = swarming_util.GetSwarmingTaskResultById(
+ data, error = swarming_util.GetSwarmingTaskResultById(
task_id, self.HTTP_CLIENT)
+
+ if error:
+ # An error occurred when trying to contact the swarming server.
+ logging.error(error.get('message'))
chanli 2016/11/11 00:05:13 In swarming_util._SendRequestToServer the error me
lijeffrey 2016/11/11 20:55:41 Done.
+ task.status = analysis_status.ERROR
+ task.error = error
+ task.put()
+ break
+
task_state = data['state']
- exit_code = (data['exit_code'] if
+ exit_code = (data.get('exit_code') if
task_state == swarming_util.STATE_COMPLETED else None)
- step_name_no_platform = swarming_util.GetTagValue(
- data.get('tags', {}), 'ref_name')
+ step_name_no_platform = (
+ step_name_no_platform or swarming_util.GetTagValue(
+ data.get('tags', {}), 'ref_name'))
+
if task_state not in swarming_util.STATES_RUNNING:
task_completed = True
- task = self._GetSwarmingTask(*call_args)
+
if (task_state == swarming_util.STATE_COMPLETED and
int(exit_code) != swarming_util.TASK_FAILED):
outputs_ref = data.get('outputs_ref')
- output_json = swarming_util.GetSwarmingTaskFailureLog(
+ output_json, error = swarming_util.GetSwarmingTaskFailureLog(
outputs_ref, self.HTTP_CLIENT)
- tests_statuses = self._CheckTestsRunStatuses(
- output_json, *call_args)
- task.status = analysis_status.COMPLETED
+
+ if error:
+ logging.error(error.get('message'))
chanli 2016/11/11 00:05:13 Same here
lijeffrey 2016/11/11 20:55:41 Done.
+ task.status = analysis_status.ERROR
+ task.error = error
+ else:
+ task.status = analysis_status.COMPLETED
+
+ tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args)
task.tests_statuses = tests_statuses
+ task.put()
else:
+ code = int(exit_code) if exit_code is not None else (
+ swarming_util.UNKNOWN)
chanli 2016/11/11 00:05:13 I think UNKNOWN is misleading here. If task_state
task.status = analysis_status.ERROR
+ task.error = {
+ 'code': code,
+ 'message': swarming_util.EXIT_CODE_DESCRIPTIONS[code]
+ }
+ task.put()
+
logging_str = 'Swarming task stopped with status: %s' % task_state
- if exit_code:
+ if exit_code: # pragma: no cover
logging_str += ' and exit_code: %s - %s' % (
- exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
+ exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[code])
logging.error(logging_str)
- priority_str = swarming_util.GetTagValue(
- data.get('tags', {}), 'priority')
+ tags = data.get('tags', {})
+ priority_str = swarming_util.GetTagValue(tags, 'priority')
if priority_str:
task.parameters['priority'] = int(priority_str)
+
task.put()
else: # pragma: no cover
if task_state == 'RUNNING' and not task_started:
# swarming task just starts, update status.
task_started = True
- task = self._GetSwarmingTask(*call_args)
task.status = analysis_status.RUNNING
task.put()
time.sleep(server_query_interval_seconds)
+
+ # Timeout.
if time.time() > deadline:
# Updates status as ERROR.
- task = self._GetSwarmingTask(*call_args)
task.status = analysis_status.ERROR
+ task.error = {
+ 'code': swarming_util.TIMED_OUT,
+ 'message': 'Process swarming task result timed out'
+ }
task.put()
logging.error('Swarming task timed out after %d hours.' % timeout_hours)
break # Stops the loop and return.
+
# Update swarming task metadata.
- task = self._GetSwarmingTask(*call_args)
- task.created_time = self._ConvertDateTime(data.get('created_ts'))
- task.started_time = self._ConvertDateTime(data.get('started_ts'))
- task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
+ task.created_time = (task.created_time or
+ self._ConvertDateTime(data.get('created_ts')))
+ task.started_time = (task.started_time or
+ self._ConvertDateTime(data.get('started_ts')))
+ task.completed_time = (task.completed_time or
+ self._ConvertDateTime(data.get('completed_ts')))
task.put()
+ return step_name_no_platform
+
+ # Arguments number differs from overridden method - pylint: disable=W0221
+ def run(self, master_name, builder_name, build_number, step_name, task_id,
+ *args):
+ """Monitors a swarming task.
+
+ Args:
+ master_name (str): The master name.
+ builder_name (str): The builder name.
+ build_number (str): The build number.
+ step_name (str): The failed test step name.
+ task_id (str): The task id to query the swarming server on the progresss
+ of a swarming task.
+
+ Returns:
+ A dict of lists for reliable/flaky tests.
+ """
+ call_args = self._GetArgs(master_name, builder_name, build_number,
+ step_name, *args)
+ step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
return step_name, step_name_no_platform
+

Powered by Google App Engine
This is Rietveld 408576698