Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(106)

Side by Side Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2477343003: [Findit] Refactoring monitor swarming task pipelines (Closed)
Patch Set: Addressing comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import datetime 5 import datetime
6 import logging 6 import logging
7 import time 7 import time
8 8
9 from common.http_client_appengine import HttpClientAppengine as HttpClient 9 from common.http_client_appengine import HttpClientAppengine as HttpClient
10 from common.pipeline_wrapper import BasePipeline 10 from common.pipeline_wrapper import BasePipeline
(...skipping 10 matching lines...) Expand all
21 """ 21 """
22 22
23 HTTP_CLIENT = HttpClient() 23 HTTP_CLIENT = HttpClient()
24 24
25 def _CheckTestsRunStatuses(self, output_json): 25 def _CheckTestsRunStatuses(self, output_json):
26 # Checks result status for each test run and saves the numbers accordingly. 26 # Checks result status for each test run and saves the numbers accordingly.
27 # Should be overridden by subclass. 27 # Should be overridden by subclass.
28 raise NotImplementedError( 28 raise NotImplementedError(
29 '_CheckTestsRunStatuses should be implemented in the child class') 29 '_CheckTestsRunStatuses should be implemented in the child class')
30 30
31 def _ConvertDateTime(self, time_string):
32 """Convert UTC time string to datetime.datetime."""
33 # Match the time conversion with swarming.py which elides the suffix
34 # when microseconds are 0.
35 if not time_string:
36 return None
37 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
38 try:
39 return datetime.datetime.strptime(time_string, fmt)
40 except ValueError:
41 pass
42 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
43
44 def _GetSwarmingTask(self): 31 def _GetSwarmingTask(self):
45 # Get the appropriate kind of Swarming Task (Wf or Flake). 32 # Get the appropriate kind of Swarming Task (Wf or Flake).
46 # Should be overwritten by subclass. 33 # Should be overwritten by subclass.
47 raise NotImplementedError( 34 raise NotImplementedError(
48 '_GetSwarmingTask should be implemented in the child class') 35 '_GetSwarmingTask should be implemented in the child class')
49 36
50 def _GetArgs(self): 37 def _GetArgs(self):
51 # Return list of arguments to call _CheckTestsRunStatuses with - output_json 38 # Return list of arguments to call _CheckTestsRunStatuses with - output_json
52 # Should be overwritten by subclass. 39 # Should be overwritten by subclass.
53 raise NotImplementedError( 40 raise NotImplementedError(
54 '_GetArgs should be implemented in the child class') 41 '_GetArgs should be implemented in the child class')
55 42
43 def _ConvertDateTime(self, time_string):
44 """Convert UTC time string to datetime.datetime."""
45 if not time_string:
46 return None
47 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
48 # When microseconds are 0, the '.123456' suffix is elided.
49 try:
50 return datetime.datetime.strptime(time_string, fmt)
51 except ValueError:
52 pass
53 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
54
55 def _MonitorSwarmingTask(self, task_id, *call_args):
56 """Monitors the swarming task and waits for it to complete."""
57 timeout_hours = waterfall_config.GetSwarmingSettings().get(
58 'task_timeout_hours')
59 deadline = time.time() + timeout_hours * 60 * 60
60 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
61 'server_query_interval_seconds')
62
63 task_started = False
64 task_completed = False
65 tests_statuses = {}
66 step_name_no_platform = None
67 task = self._GetSwarmingTask(*call_args)
68
69 while not task_completed:
70 data = swarming_util.GetSwarmingTaskResultById(task_id, self.HTTP_CLIENT)
71 task_state = data['state']
72 exit_code = (data.get('exit_code') if
73 task_state == swarming_util.STATE_COMPLETED else None)
74 step_name_no_platform = swarming_util.GetTagValue(
chanli 2016/11/09 00:55:24 We can modify this to: step_name_no_platform = ste
lijeffrey 2016/11/09 01:03:21 Done.
75 data.get('tags', {}), 'ref_name')
76
77 if task_state not in swarming_util.STATES_RUNNING:
78 task_completed = True
79
80 if (task_state == swarming_util.STATE_COMPLETED and
81 int(exit_code) != swarming_util.TASK_FAILED):
82 outputs_ref = data.get('outputs_ref')
83 output_json = swarming_util.GetSwarmingTaskFailureLog(
84 outputs_ref, self.HTTP_CLIENT)
85 tests_statuses = self._CheckTestsRunStatuses(output_json, *call_args)
86 task.status = analysis_status.COMPLETED
87 task.tests_statuses = tests_statuses
88 else:
89 task.status = analysis_status.ERROR
90 logging_str = 'Swarming task stopped with status: %s' % task_state
91 if exit_code: # pragma: no cover
92 logging_str += ' and exit_code: %s - %s' % (
93 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
94 logging.error(logging_str)
95
96 tags = data.get('tags', {})
97 priority_str = swarming_util.GetTagValue(tags, 'priority')
98 if priority_str:
99 task.parameters['priority'] = int(priority_str)
100
101 task.put()
102 else: # pragma: no cover
103 if task_state == 'RUNNING' and not task_started:
104 # swarming task just starts, update status.
105 task_started = True
106 task.status = analysis_status.RUNNING
107 task.put()
108 time.sleep(server_query_interval_seconds)
109
110 # Timeout.
111 if time.time() > deadline:
112 # Updates status as ERROR.
113 task.status = analysis_status.ERROR
114 task.put()
115 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
116 break # Stops the loop and return.
117
118 # Update swarming task metadata timestamps.
119 task.created_time = self._ConvertDateTime(data.get('created_ts'))
120 task.started_time = self._ConvertDateTime(data.get('started_ts'))
121 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
122 task.put()
123
124 return step_name_no_platform
125
56 # Arguments number differs from overridden method - pylint: disable=W0221 126 # Arguments number differs from overridden method - pylint: disable=W0221
57 def run(self, master_name, builder_name, build_number, 127 def run(self, master_name, builder_name, build_number, step_name, task_id,
58 step_name, task_id, *args): # pragma: no cover. 128 *args):
59 """ 129 """Monitors a swarming task.
130
60 Args: 131 Args:
61 master_name (str): The master name. 132 master_name (str): The master name.
62 builder_name (str): The builder name. 133 builder_name (str): The builder name.
63 build_number (str): The build number. 134 build_number (str): The build number.
64 step_name (str): The failed test step name. 135 step_name (str): The failed test step name.
65 task_id (str): Id for the swarming task which is triggered by Findit. 136 task_id (str): The task id to query the swarming server on the progresss
137 of a swarming task.
66 138
67 Returns: 139 Returns:
68 A dict of lists for reliable/flaky tests. 140 A dict of lists for reliable/flaky tests.
69 """ 141 """
70 call_args = self._GetArgs(master_name, builder_name, build_number, 142 call_args = self._GetArgs(master_name, builder_name, build_number,
71 step_name, *args) 143 step_name, *args)
72 assert task_id 144 step_name_no_platform = self._MonitorSwarmingTask(task_id, *call_args)
73 timeout_hours = waterfall_config.GetSwarmingSettings().get( 145 return step_name, step_name_no_platform
74 'task_timeout_hours')
75 deadline = time.time() + timeout_hours * 60 * 60
76 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
77 'server_query_interval_seconds')
78 task_started = False
79 task_completed = False
80 tests_statuses = {}
81 step_name_no_platform = None
82 146
83 while not task_completed:
84 # Keeps monitoring the swarming task, waits for it to complete.
85 data = swarming_util.GetSwarmingTaskResultById(
86 task_id, self.HTTP_CLIENT)
87 task_state = data['state']
88 exit_code = (data['exit_code'] if
89 task_state == swarming_util.STATE_COMPLETED else None)
90 step_name_no_platform = swarming_util.GetTagValue(
91 data.get('tags', {}), 'ref_name')
92 if task_state not in swarming_util.STATES_RUNNING:
93 task_completed = True
94 task = self._GetSwarmingTask(*call_args)
95 if (task_state == swarming_util.STATE_COMPLETED and
96 int(exit_code) != swarming_util.TASK_FAILED):
97 outputs_ref = data.get('outputs_ref')
98 output_json = swarming_util.GetSwarmingTaskFailureLog(
99 outputs_ref, self.HTTP_CLIENT)
100 tests_statuses = self._CheckTestsRunStatuses(
101 output_json, *call_args)
102 task.status = analysis_status.COMPLETED
103 task.tests_statuses = tests_statuses
104 else:
105 task.status = analysis_status.ERROR
106 logging_str = 'Swarming task stopped with status: %s' % task_state
107 if exit_code:
108 logging_str += ' and exit_code: %s - %s' % (
109 exit_code, swarming_util.EXIT_CODE_DESCRIPTIONS[int(exit_code)])
110 logging.error(logging_str)
111
112 priority_str = swarming_util.GetTagValue(
113 data.get('tags', {}), 'priority')
114 if priority_str:
115 task.parameters['priority'] = int(priority_str)
116 task.put()
117 else: # pragma: no cover
118 if task_state == 'RUNNING' and not task_started:
119 # swarming task just starts, update status.
120 task_started = True
121 task = self._GetSwarmingTask(*call_args)
122 task.status = analysis_status.RUNNING
123 task.put()
124 time.sleep(server_query_interval_seconds)
125 if time.time() > deadline:
126 # Updates status as ERROR.
127 task = self._GetSwarmingTask(*call_args)
128 task.status = analysis_status.ERROR
129 task.put()
130 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
131 break # Stops the loop and return.
132 # Update swarming task metadata.
133 task = self._GetSwarmingTask(*call_args)
134 task.created_time = self._ConvertDateTime(data.get('created_ts'))
135 task.started_time = self._ConvertDateTime(data.get('started_ts'))
136 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
137 task.put()
138
139 return step_name, step_name_no_platform
OLDNEW
« no previous file with comments | « appengine/findit/model/wf_swarming_task.py ('k') | appengine/findit/waterfall/process_flake_swarming_task_result_pipeline.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698