Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(411)

Side by Side Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2477343003: [Findit] Refactoring monitor swarming task pipelines (Closed)
Patch Set: Addressing comment Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import datetime 5 import datetime
6 import logging 6 import logging
7 import time 7 import time
8 8
9 from common.http_client_appengine import HttpClientAppengine as HttpClient 9 from common.http_client_appengine import HttpClientAppengine as HttpClient
10 from common.pipeline_wrapper import BasePipeline 10 from common.pipeline_wrapper import BasePipeline
(...skipping 10 matching lines...) Expand all
21 """ 21 """
22 22
23 HTTP_CLIENT = HttpClient() 23 HTTP_CLIENT = HttpClient()
24 24
25 def _CheckTestsRunStatuses(self, output_json): 25 def _CheckTestsRunStatuses(self, output_json):
26 # Checks result status for each test run and saves the numbers accordingly. 26 # Checks result status for each test run and saves the numbers accordingly.
27 # Should be overridden by subclass. 27 # Should be overridden by subclass.
28 raise NotImplementedError( 28 raise NotImplementedError(
29 '_CheckTestsRunStatuses should be implemented in the child class') 29 '_CheckTestsRunStatuses should be implemented in the child class')
30 30
31 def _ConvertDateTime(self, time_string):
32 """Convert UTC time string to datetime.datetime."""
33 # Match the time conversion with swarming.py which elides the suffix
34 # when microseconds are 0.
35 if not time_string:
36 return None
37 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
38 try:
39 return datetime.datetime.strptime(time_string, fmt)
40 except ValueError:
41 pass
42 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
43
44 def _GetSwarmingTask(self): 31 def _GetSwarmingTask(self):
45 # Get the appropriate kind of Swarming Task (Wf or Flake). 32 # Get the appropriate kind of Swarming Task (Wf or Flake).
46 # Should be overridden by subclass. 33 # Should be overridden by subclass.
47 raise NotImplementedError( 34 raise NotImplementedError(
48 '_GetSwarmingTask should be implemented in the child class') 35 '_GetSwarmingTask should be implemented in the child class')
49 36
50 def _GetArgs(self): 37 def _GetArgs(self):
51 # Return list of arguments to call _CheckTestsRunStatuses with - output_json 38 # Return list of arguments to call _CheckTestsRunStatuses with - output_json
52 # Should be overridden by subclass. 39 # Should be overridden by subclass.
53 raise NotImplementedError( 40 raise NotImplementedError(
54 '_GetArgs should be implemented in the child class') 41 '_GetArgs should be implemented in the child class')
55 42
43 def _ConvertDateTime(self, time_string):
44 """Convert UTC time string to datetime.datetime."""
45 if not time_string:
46 return None
47 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
48 # When microseconds are 0, the '.123456' suffix is elided.
49 try:
50 return datetime.datetime.strptime(time_string, fmt)
51 except ValueError:
52 pass
53 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
54
55 def _MonitorSwarmingTask(self, task_id, *call_args):
56 """Monitors the swarming task and waits for it to complete."""
57 timeout_hours = waterfall_config.GetSwarmingSettings().get(
58 'task_timeout_hours')
59 deadline = time.time() + timeout_hours * 60 * 60
60 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
61 'server_query_interval_seconds')
62
63 task_started = False
64 task_completed = False
65 tests_statuses = {}
66 step_name_no_platform = None
67 task = self._GetSwarmingTask(*call_args)
68
69 while not task_completed:
70 data = swarming_util.GetSwarmingTaskResultById(task_id, self.HTTP_CLIENT)
71 task_state = data['state']
72 exit_code = (data.get('exit_code') if
73 task_state == swarming_util.STATE_COMPLETED else None)
74 step_name_no_platform = (
75 step_name_no_platform or swarming_util.GetTagValue(
76 data.get('tags', {}), 'ref_name'))
77
78 if task_state not in swarming_util.STATES_RUNNING:
79 task_completed = True
80
81 if (task_state == swarming_util.STATE_COMPLETED and
82 int(exit_code) != swarming_util.TASK_FAILED):
83 outputs_ref = data.get('outputs_ref')
84 output_json = swarming_util.GetSwarmingTaskFailureLog(
85 outputs_ref, self.HTTP_CLIENT)
86 tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args)
87 task.status = analysis_status.COMPLETED
88 task.tests_statuses = tests_statuses
89 else:
90 task.status = analysis_status.ERROR
91 logging_str = 'Swarming task stopped with status: %s' % task_state
92 if exit_code: # pragma: no cover
93 logging_str += ' and exit_code: %s - %s' % (
94 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
95 logging.error(logging_str)
96
97 tags = data.get('tags', {})
98 priority_str = swarming_util.GetTagValue(tags, 'priority')
99 if priority_str:
100 task.parameters['priority'] = int(priority_str)
101
102 task.put()
103 else: # pragma: no cover
104 if task_state == 'RUNNING' and not task_started:
105 # swarming task just starts, update status.
106 task_started = True
107 task.status = analysis_status.RUNNING
108 task.put()
109 time.sleep(server_query_interval_seconds)
110
111 # Timeout.
112 if time.time() > deadline:
113 # Updates status as ERROR.
114 task.status = analysis_status.ERROR
115 task.put()
116 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
117 break # Stops the loop and return.
118
119 # Update swarming task metadata timestamps.
120 task.created_time = self._ConvertDateTime(data.get('created_ts'))
121 task.started_time = self._ConvertDateTime(data.get('started_ts'))
122 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
123 task.put()
124
125 return step_name_no_platform
126
56 # Arguments number differs from overridden method - pylint: disable=W0221 127 # Arguments number differs from overridden method - pylint: disable=W0221
57 def run(self, master_name, builder_name, build_number, 128 def run(self, master_name, builder_name, build_number, step_name, task_id,
58 step_name, task_id, *args): # pragma: no cover. 129 *args):
59 """ 130 """Monitors a swarming task.
131
60 Args: 132 Args:
61 master_name (str): The master name. 133 master_name (str): The master name.
62 builder_name (str): The builder name. 134 builder_name (str): The builder name.
63 build_number (str): The build number. 135 build_number (str): The build number.
64 step_name (str): The failed test step name. 136 step_name (str): The failed test step name.
65 task_id (str): Id for the swarming task which is triggered by Findit. 137 task_id (str): The task id to query the swarming server on the progress
138 of a swarming task.
66 139
67 Returns: 140 Returns:
68 A dict of lists for reliable/flaky tests. 141 A dict of lists for reliable/flaky tests.
69 """ 142 """
70 call_args = self._GetArgs(master_name, builder_name, build_number, 143 call_args = self._GetArgs(master_name, builder_name, build_number,
71 step_name, *args) 144 step_name, *args)
72 assert task_id 145 step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
73 timeout_hours = waterfall_config.GetSwarmingSettings().get( 146 return step_name, step_name_no_platform
74 'task_timeout_hours')
75 deadline = time.time() + timeout_hours * 60 * 60
76 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
77 'server_query_interval_seconds')
78 task_started = False
79 task_completed = False
80 tests_statuses = {}
81 step_name_no_platform = None
82 147
83 while not task_completed:
84 # Keeps monitoring the swarming task, waits for it to complete.
85 data = swarming_util.GetSwarmingTaskResultById(
86 task_id, self.HTTP_CLIENT)
87 task_state = data['state']
88 exit_code = (data['exit_code'] if
89 task_state == swarming_util.STATE_COMPLETED else None)
90 step_name_no_platform = swarming_util.GetTagValue(
91 data.get('tags', {}), 'ref_name')
92 if task_state not in swarming_util.STATES_RUNNING:
93 task_completed = True
94 task = self._GetSwarmingTask(*call_args)
95 if (task_state == swarming_util.STATE_COMPLETED and
96 int(exit_code) != swarming_util.TASK_FAILED):
97 outputs_ref = data.get('outputs_ref')
98 output_json = swarming_util.GetSwarmingTaskFailureLog(
99 outputs_ref, self.HTTP_CLIENT)
100 tests_statuses = self._CheckTestsRunStatuses(
101 output_json, *call_args)
102 task.status = analysis_status.COMPLETED
103 task.tests_statuses = tests_statuses
104 else:
105 task.status = analysis_status.ERROR
106 logging_str = 'Swarming task stopped with status: %s' % task_state
107 if exit_code:
108 logging_str += ' and exit_code: %s - %s' % (
109 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
110 logging.error(logging_str)
111
112 priority_str = swarming_util.GetTagValue(
113 data.get('tags', {}), 'priority')
114 if priority_str:
115 task.parameters['priority'] = int(priority_str)
116 task.put()
117 else: # pragma: no cover
118 if task_state == 'RUNNING' and not task_started:
119 # swarming task just starts, update status.
120 task_started = True
121 task = self._GetSwarmingTask(*call_args)
122 task.status = analysis_status.RUNNING
123 task.put()
124 time.sleep(server_query_interval_seconds)
125 if time.time() > deadline:
126 # Updates status as ERROR.
127 task = self._GetSwarmingTask(*call_args)
128 task.status = analysis_status.ERROR
129 task.put()
130 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
131 break # Stops the loop and return.
132 # Update swarming task metadata.
133 task = self._GetSwarmingTask(*call_args)
134 task.created_time = self._ConvertDateTime(data.get('created_ts'))
135 task.started_time = self._ConvertDateTime(data.get('started_ts'))
136 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
137 task.put()
138
139 return step_name, step_name_no_platform
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698