Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(131)

Side by Side Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2477343003: [Findit] Refactoring monitor swarming task pipelines (Closed)
Patch Set: Addressing nit Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 from collections import defaultdict
5 import datetime 6 import datetime
6 import logging 7 import logging
7 import time 8 import time
8 9
9 from common.http_client_appengine import HttpClientAppengine as HttpClient 10 from common.http_client_appengine import HttpClientAppengine as HttpClient
10 from common.pipeline_wrapper import BasePipeline 11 from common.pipeline_wrapper import BasePipeline
11 from model import analysis_status 12 from model import analysis_status
12 from waterfall import swarming_util 13 from waterfall import swarming_util
13 from waterfall import waterfall_config 14 from waterfall import waterfall_config
14 15
15 16
16 class ProcessBaseSwarmingTaskResultPipeline(BasePipeline): 17 class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
17 """A pipeline for monitoring swarming task and processing task result. 18 """A pipeline for monitoring swarming task and processing task result.
18 19
19 This pipeline waits for result for a swarming task and processes the result to 20 This pipeline waits for result for a swarming task and processes the result to
20 generate a dict for statuses for each test run. 21 generate a dict for statuses for each test run.
21 """ 22 """
22 23
23 HTTP_CLIENT = HttpClient() 24 HTTP_CLIENT = HttpClient()
24 25
25 def _CheckTestsRunStatuses(self, output_json): 26 def _CheckTestsRunStatuses(self, output_json, *_):
26 # Checks result status for each test run and saves the numbers accordingly. 27 """Checks result status for each test run and saves the numbers accordingly.
27 # Should be overridden by subclass.
28 raise NotImplementedError(
29 '_CheckTestsRunStatuses should be implemented in the child class')
30 28
31 def _ConvertDateTime(self, time_string): 29 Args:
32 """Convert UTC time string to datetime.datetime.""" 30 output_json (dict): A dict of all test results in the swarming task.
33 # Match the time conversion with swarming.py which elides the suffix 31
34 # when microseconds are 0. 32 Returns:
35 if not time_string: 33 tests_statuses (dict): A dict of different statuses for each test.
36 return None 34
37 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): 35 Currently for each test, we are saving number of total runs,
38 try: 36 number of succeeded runs and number of failed runs.
39 return datetime.datetime.strptime(time_string, fmt) 37 """
40 except ValueError: 38 tests_statuses = defaultdict(lambda: defaultdict(int))
41 pass 39 if output_json:
42 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover 40 for iteration in output_json.get('per_iteration_data'):
41 for test_name, tests in iteration.iteritems():
42 tests_statuses[test_name]['total_run'] += len(tests)
43 for test in tests:
44 tests_statuses[test_name][test['status']] += 1
45
46 return tests_statuses
43 47
44 def _GetSwarmingTask(self): 48 def _GetSwarmingTask(self):
45 # Get the appropriate kind of Swarming Task (Wf or Flake). 49 # Get the appropriate kind of Swarming Task (Wf or Flake).
46 # Should be overwritten by subclass. 50 # Should be overwritten by subclass.
47 raise NotImplementedError( 51 raise NotImplementedError(
48 '_GetSwarmingTask should be implemented in the child class') 52 '_GetSwarmingTask should be implemented in the child class')
49 53
50 def _GetArgs(self): 54 def _GetArgs(self):
51 # Return list of arguments to call _CheckTestsRunStatuses with - output_json 55 # Return list of arguments to call _CheckTestsRunStatuses with - output_json
52 # Should be overwritten by subclass. 56 # Should be overwritten by subclass.
53 raise NotImplementedError( 57 raise NotImplementedError(
54 '_GetArgs should be implemented in the child class') 58 '_GetArgs should be implemented in the child class')
55 59
60 def _ConvertDateTime(self, time_string):
61 """Convert UTC time string to datetime.datetime."""
62 if not time_string:
63 return None
64 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
65 # When microseconds are 0, the '.123456' suffix is elided.
66 try:
67 return datetime.datetime.strptime(time_string, fmt)
68 except ValueError:
69 pass
70 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
71
72 def _MonitorSwarmingTask(self, task_id, *call_args):
73 """Monitors the swarming task and waits for it to complete."""
74 timeout_hours = waterfall_config.GetSwarmingSettings().get(
75 'task_timeout_hours')
76 deadline = time.time() + timeout_hours * 60 * 60
77 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
78 'server_query_interval_seconds')
79
80 task_started = False
81 task_completed = False
82 tests_statuses = {}
83 step_name_no_platform = None
84 task = self._GetSwarmingTask(*call_args)
85
86 while not task_completed:
87 data = swarming_util.GetSwarmingTaskResultById(task_id, self.HTTP_CLIENT)
88 task_state = data['state']
89 exit_code = (data.get('exit_code') if
90 task_state == swarming_util.STATE_COMPLETED else None)
91 step_name_no_platform = (
92 step_name_no_platform or swarming_util.GetTagValue(
93 data.get('tags', {}), 'ref_name'))
94
95 if task_state not in swarming_util.STATES_RUNNING:
96 task_completed = True
97
98 if (task_state == swarming_util.STATE_COMPLETED and
99 int(exit_code) != swarming_util.TASK_FAILED):
100 outputs_ref = data.get('outputs_ref')
101 output_json = swarming_util.GetSwarmingTaskFailureLog(
102 outputs_ref, self.HTTP_CLIENT)
103 tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args)
104 task.status = analysis_status.COMPLETED
105 task.tests_statuses = tests_statuses
106 else:
107 task.status = analysis_status.ERROR
108 logging_str = 'Swarming task stopped with status: %s' % task_state
109 if exit_code: # pragma: no cover
110 logging_str += ' and exit_code: %s - %s' % (
111 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
112 logging.error(logging_str)
113
114 tags = data.get('tags', {})
115 priority_str = swarming_util.GetTagValue(tags, 'priority')
116 if priority_str:
117 task.parameters['priority'] = int(priority_str)
118
119 task.put()
120 else: # pragma: no cover
121 if task_state == 'RUNNING' and not task_started:
122 # swarming task just starts, update status.
123 task_started = True
124 task.status = analysis_status.RUNNING
125 task.put()
126 time.sleep(server_query_interval_seconds)
127
128 # Timeout.
129 if time.time() > deadline:
130 # Updates status as ERROR.
131 task.status = analysis_status.ERROR
132 task.put()
133 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
134 break # Stops the loop and return.
135
136 # Update swarming task metadata timestamps.
137 task.created_time = self._ConvertDateTime(data.get('created_ts'))
138 task.started_time = self._ConvertDateTime(data.get('started_ts'))
139 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
140 task.put()
141
142 return step_name_no_platform
143
56 # Arguments number differs from overridden method - pylint: disable=W0221 144 # Arguments number differs from overridden method - pylint: disable=W0221
57 def run(self, master_name, builder_name, build_number, 145 def run(self, master_name, builder_name, build_number, step_name, task_id,
58 step_name, task_id, *args): # pragma: no cover. 146 *args):
59 """ 147 """Monitors a swarming task.
148
60 Args: 149 Args:
61 master_name (str): The master name. 150 master_name (str): The master name.
62 builder_name (str): The builder name. 151 builder_name (str): The builder name.
63 build_number (str): The build number. 152 build_number (str): The build number.
64 step_name (str): The failed test step name. 153 step_name (str): The failed test step name.
65 task_id (str): Id for the swarming task which is triggered by Findit. 154 task_id (str): The task id to query the swarming server on the progress
155 of a swarming task.
66 156
67 Returns: 157 Returns:
68 A dict of lists for reliable/flaky tests. 158 A dict of lists for reliable/flaky tests.
69 """ 159 """
70 call_args = self._GetArgs(master_name, builder_name, build_number, 160 call_args = self._GetArgs(master_name, builder_name, build_number,
71 step_name, *args) 161 step_name, *args)
72 assert task_id 162 step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
73 timeout_hours = waterfall_config.GetSwarmingSettings().get( 163 return step_name, step_name_no_platform
74 'task_timeout_hours')
75 deadline = time.time() + timeout_hours * 60 * 60
76 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
77 'server_query_interval_seconds')
78 task_started = False
79 task_completed = False
80 tests_statuses = {}
81 step_name_no_platform = None
82 164
83 while not task_completed:
84 # Keeps monitoring the swarming task, waits for it to complete.
85 data = swarming_util.GetSwarmingTaskResultById(
86 task_id, self.HTTP_CLIENT)
87 task_state = data['state']
88 exit_code = (data['exit_code'] if
89 task_state == swarming_util.STATE_COMPLETED else None)
90 step_name_no_platform = swarming_util.GetTagValue(
91 data.get('tags', {}), 'ref_name')
92 if task_state not in swarming_util.STATES_RUNNING:
93 task_completed = True
94 task = self._GetSwarmingTask(*call_args)
95 if (task_state == swarming_util.STATE_COMPLETED and
96 int(exit_code) != swarming_util.TASK_FAILED):
97 outputs_ref = data.get('outputs_ref')
98 output_json = swarming_util.GetSwarmingTaskFailureLog(
99 outputs_ref, self.HTTP_CLIENT)
100 tests_statuses = self._CheckTestsRunStatuses(
101 output_json, *call_args)
102 task.status = analysis_status.COMPLETED
103 task.tests_statuses = tests_statuses
104 else:
105 task.status = analysis_status.ERROR
106 logging_str = 'Swarming task stopped with status: %s' % task_state
107 if exit_code:
108 logging_str += ' and exit_code: %s - %s' % (
109 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
110 logging.error(logging_str)
111
112 priority_str = swarming_util.GetTagValue(
113 data.get('tags', {}), 'priority')
114 if priority_str:
115 task.parameters['priority'] = int(priority_str)
116 task.put()
117 else: # pragma: no cover
118 if task_state == 'RUNNING' and not task_started:
119 # swarming task just starts, update status.
120 task_started = True
121 task = self._GetSwarmingTask(*call_args)
122 task.status = analysis_status.RUNNING
123 task.put()
124 time.sleep(server_query_interval_seconds)
125 if time.time() > deadline:
126 # Updates status as ERROR.
127 task = self._GetSwarmingTask(*call_args)
128 task.status = analysis_status.ERROR
129 task.put()
130 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
131 break # Stops the loop and return.
132 # Update swarming task metadata.
133 task = self._GetSwarmingTask(*call_args)
134 task.created_time = self._ConvertDateTime(data.get('created_ts'))
135 task.started_time = self._ConvertDateTime(data.get('started_ts'))
136 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
137 task.put()
138
139 return step_name, step_name_no_platform
OLDNEW
« no previous file with comments | « appengine/findit/model/wf_swarming_task.py ('k') | appengine/findit/waterfall/process_flake_swarming_task_result_pipeline.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698