Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Side by Side Diff: appengine/findit/waterfall/process_base_swarming_task_result_pipeline.py

Issue 2130543004: Waterfall components of regression range finder. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: refactored Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 from collections import defaultdict 5 from collections import defaultdict
6 import datetime 6 import datetime
7 import logging 7 import logging
8 import time 8 import time
9 9
10 from common.http_client_appengine import HttpClientAppengine as HttpClient 10 from common.http_client_appengine import HttpClientAppengine as HttpClient
11 from common.pipeline_wrapper import BasePipeline 11 from common.pipeline_wrapper import BasePipeline
12 from model import analysis_status 12 from model import analysis_status
13 from model.wf_swarming_task import WfSwarmingTask 13 from model.wf_swarming_task import WfSwarmingTask
14 from waterfall import swarming_util 14 from waterfall import swarming_util
15 from waterfall import waterfall_config 15 from waterfall import waterfall_config
16 16
17 17
18 def _CheckTestsRunStatuses(output_json): 18 class ProcessBaseSwarmingTaskResultPipeline(BasePipeline):
19 """Checks result status for each test run and saves the numbers accordingly.
20
21 Args:
22 output_json (dict): A dict of all test results in the swarming task.
23
24 Returns:
25 tests_statuses (dict): A dict of different statuses for each test.
26
27 Currently for each test, we are saving number of total runs,
28 number of succeeded runs and number of failed runs.
29 """
30 tests_statuses = defaultdict(lambda: defaultdict(int))
31 if output_json:
32 for iteration in output_json.get('per_iteration_data'):
33 for test_name, tests in iteration.iteritems():
34 tests_statuses[test_name]['total_run'] += len(tests)
35 for test in tests:
36 tests_statuses[test_name][test['status']] += 1
37
38 return tests_statuses
39
40
41 def _ConvertDateTime(time_string):
42 """Convert UTC time string to datetime.datetime."""
43 # Match the time convertion with swarming.py.
44 # According to swarming.py,
45 # when microseconds are 0, the '.123456' suffix is elided.
46 if not time_string:
47 return None
48 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
49 try:
50 return datetime.datetime.strptime(time_string, fmt)
51 except ValueError:
52 pass
53 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
54
55
56 class ProcessSwarmingTaskResultPipeline(BasePipeline):
57 """A pipeline for monitoring swarming task and processing task result. 19 """A pipeline for monitoring swarming task and processing task result.
58 20
59 This pipeline waits for result for a swarming task and processes the result to 21 This pipeline waits for result for a swarming task and processes the result to
60 generate a dict for statuses for each test run. 22 generate a dict for statuses for each test run.
61 """ 23 """
24 def _CheckTestsRunStatuses(self, output_json):
25 """Checks result status for each test run and saves the numbers accordingly.
26
27 Args:
28 output_json (dict): A dict of all test results in the swarming task.
29
30 Returns:
31 tests_statuses (dict): A dict of different statuses for each test.
32
33 Currently for each test, we are saving number of total runs,
34 number of succeeded runs and number of failed runs.
35 """
36 tests_statuses = defaultdict(lambda: defaultdict(int))
37 if output_json:
38 for iteration in output_json.get('per_iteration_data'):
39 for test_name, tests in iteration.iteritems():
40 tests_statuses[test_name]['total_run'] += len(tests)
41 for test in tests:
42 tests_statuses[test_name][test['status']] += 1
43 return tests_statuses
44
45 def _ConvertDateTime(self, time_string):
46 """Convert UTC time string to datetime.datetime."""
47 # Match the time convertion with swarming.py.
48 # According to swarming.py,
49 # when microseconds are 0, the '.123456' suffix is elided.
50 if not time_string:
51 return None
52 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
53 try:
54 return datetime.datetime.strptime(time_string, fmt)
55 except ValueError:
56 pass
57 raise ValueError('Failed to parse %s' % time_string) # pragma: no cover
58
59 def _GetSwarmingTask(self):
60 # Get the appropriate kind of Swarming Task (Wf or Flake)
61 # Should be overwritten by subclass
62 raise NotImplementedError
63
64 def _GetArgs(self):
65 # Return list of arguments to call _CheckTestsRunStatuses with - output_json
66 # Should be overwritten by subclass.
67 raise NotImplementedError
62 68
63 HTTP_CLIENT = HttpClient() 69 HTTP_CLIENT = HttpClient()
stgao 2016/07/14 18:34:50 style nit: move this up before all class methods.
caiw 2016/07/15 00:25:10 Done.
64 # Arguments number differs from overridden method - pylint: disable=W0221 70 # Arguments number differs from overridden method - pylint: disable=W0221
65 def run(self, master_name, builder_name, build_number, step_name, task_id): 71 def run(self, master_name, builder_name, build_number,
72 step_name, task_id, *args):
66 """ 73 """
67 Args: 74 Args:
68 master_name (str): The master name. 75 master_name (str): The master name.
69 builder_name (str): The builder name. 76 builder_name (str): The builder name.
70 build_number (str): The build number. 77 build_number (str): The build number.
71 step_name (str): The failed test step name. 78 step_name (str): The failed test step name.
72 task_id (str): Id for the swarming task which is triggered by Findit. 79 task_id (str): Id for the swarming task which is triggered by Findit.
73 80
74 Returns: 81 Returns:
75 A dict of lists for reliable/flaky tests. 82 A dict of lists for reliable/flaky tests.
76 """ 83 """
77 84 logging.info("started_running??")
85 call_args = self._GetArgs(master_name, builder_name, build_number,
86 step_name, *args)
87 logging.info(call_args)
78 assert task_id 88 assert task_id
79 timeout_hours = waterfall_config.GetSwarmingSettings().get( 89 timeout_hours = waterfall_config.GetSwarmingSettings().get(
80 'task_timeout_hours') 90 'task_timeout_hours')
81 deadline = time.time() + timeout_hours * 60 * 60 91 deadline = time.time() + timeout_hours * 60 * 60
82 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get( 92 server_query_interval_seconds = waterfall_config.GetSwarmingSettings().get(
83 'server_query_interval_seconds') 93 'server_query_interval_seconds')
84
85 task_started = False 94 task_started = False
86 task_completed = False 95 task_completed = False
87 tests_statuses = {} 96 tests_statuses = {}
88 step_name_no_platform = None 97 step_name_no_platform = None
89 98
90 while not task_completed: 99 while not task_completed:
91 # Keeps monitoring the swarming task, waits for it to complete. 100 # Keeps monitoring the swarming task, waits for it to complete.
92 data = swarming_util.GetSwarmingTaskResultById( 101 data = swarming_util.GetSwarmingTaskResultById(
93 task_id, self.HTTP_CLIENT) 102 task_id, self.HTTP_CLIENT)
94 task_state = data['state'] 103 task_state = data['state']
95 step_name_no_platform = swarming_util.GetTagValue( 104 step_name_no_platform = swarming_util.GetTagValue(
96 data.get('tags', {}), 'ref_name') 105 data.get('tags', {}), 'ref_name')
106 logging.info(task_state)
97 if task_state not in swarming_util.STATES_RUNNING: 107 if task_state not in swarming_util.STATES_RUNNING:
108 logging.info("something is happening!")
98 task_completed = True 109 task_completed = True
99 task = WfSwarmingTask.Get( 110 task = self._GetSwarmingTask(*call_args)
100 master_name, builder_name, build_number, step_name)
101 if task_state == swarming_util.STATE_COMPLETED: 111 if task_state == swarming_util.STATE_COMPLETED:
102 outputs_ref = data.get('outputs_ref') 112 outputs_ref = data.get('outputs_ref')
103 output_json = swarming_util.GetSwarmingTaskFailureLog( 113 output_json = swarming_util.GetSwarmingTaskFailureLog(
104 outputs_ref, self.HTTP_CLIENT) 114 outputs_ref, self.HTTP_CLIENT)
105 tests_statuses = _CheckTestsRunStatuses(output_json) 115 # This following line will not be compatible!
106 116 tests_statuses = self._CheckTestsRunStatuses(
117 output_json, *call_args)
107 task.status = analysis_status.COMPLETED 118 task.status = analysis_status.COMPLETED
108 task.tests_statuses = tests_statuses 119 task.tests_statuses = tests_statuses
109 else: 120 else:
110 task.status = analysis_status.ERROR 121 task.status = analysis_status.ERROR
111 logging.error('Swarming task stopped with status: %s' % ( 122 logging.error('Swarming task stopped with status: %s' % (
112 task_state)) 123 task_state))
113 priority_str = swarming_util.GetTagValue( 124 priority_str = swarming_util.GetTagValue(
114 data.get('tags', {}), 'priority') 125 data.get('tags', {}), 'priority')
115 if priority_str: 126 if priority_str:
116 task.parameters['priority'] = int(priority_str) 127 task.parameters['priority'] = int(priority_str)
117 task.put() 128 task.put()
118 else: # pragma: no cover 129 else: # pragma: no cover
119 if task_state == 'RUNNING' and not task_started: 130 if task_state == 'RUNNING' and not task_started:
120 # swarming task just starts, update status. 131 # swarming task just starts, update status.
121 task_started = True 132 task_started = True
122 task = WfSwarmingTask.Get( 133 logging.info("special wait")
123 master_name, builder_name, build_number, step_name) 134 logging.info(call_args)
stgao 2016/07/14 18:34:50 I assume these loggings are for debugging and test
135 task = self._GetSwarmingTask(*call_args)
124 task.status = analysis_status.RUNNING 136 task.status = analysis_status.RUNNING
137 logging.info(task)
125 task.put() 138 task.put()
126
127 time.sleep(server_query_interval_seconds) 139 time.sleep(server_query_interval_seconds)
128
129 if time.time() > deadline: 140 if time.time() > deadline:
130 # Updates status as ERROR. 141 # Updates status as ERROR.
131 task = WfSwarmingTask.Get( 142 task = self._GetSwarmingTask(*call_args)
132 master_name, builder_name, build_number, step_name)
133 task.status = analysis_status.ERROR 143 task.status = analysis_status.ERROR
134 task.put() 144 task.put()
135 logging.error('Swarming task timed out after %d hours.' % timeout_hours) 145 logging.error('Swarming task timed out after %d hours.' % timeout_hours)
136 break # Stops the loop and return. 146 break # Stops the loop and return.
137
138 # Update swarming task metadate. 147 # Update swarming task metadate.
139 task = WfSwarmingTask.Get( 148 task = self._GetSwarmingTask(*call_args)
140 master_name, builder_name, build_number, step_name) 149 task.created_time = self._ConvertDateTime(data.get('created_ts'))
141 task.created_time = _ConvertDateTime(data.get('created_ts')) 150 task.started_time = self._ConvertDateTime(data.get('started_ts'))
142 task.started_time = _ConvertDateTime(data.get('started_ts')) 151 task.completed_time = self._ConvertDateTime(data.get('completed_ts'))
143 task.completed_time = _ConvertDateTime(data.get('completed_ts'))
144 task.put() 152 task.put()
145 153
146 return step_name, (step_name_no_platform, task.classified_tests) 154 return step_name, (step_name_no_platform, task.classified_tests)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698