| OLD | NEW |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 from datetime import datetime | 5 from datetime import datetime |
| 6 import time | 6 import time |
| 7 | 7 |
| 8 from common import buildbucket_client | 8 from common import buildbucket_client |
| 9 from common.buildbucket_client import BuildbucketBuild |
| 9 from model import wf_analysis_status | 10 from model import wf_analysis_status |
| 10 from model.wf_try_job import WfTryJob | 11 from model.wf_try_job import WfTryJob |
| 11 from model.wf_try_job_data import WfTryJobData | 12 from model.wf_try_job_data import WfTryJobData |
| 12 from pipeline_wrapper import BasePipeline | 13 from pipeline_wrapper import BasePipeline |
| 13 from pipeline_wrapper import pipeline | 14 from pipeline_wrapper import pipeline |
| 15 from waterfall.try_job_type import TryJobType |
| 14 | 16 |
| 15 | 17 |
| 16 class MonitorTryJobPipeline(BasePipeline): | 18 class MonitorTryJobPipeline(BasePipeline): |
| 17 """A pipeline for monitoring a try job and recording results when it's done. | 19 """A pipeline for monitoring a try job and recording results when it's done. |
| 18 | 20 |
| 19 The result will be stored to compile_results or test_results according to | 21 The result will be stored to compile_results or test_results according to |
| 20 which type of build failure we are running try job for. | 22 which type of build failure we are running try job for. |
| 21 """ | 23 """ |
| 22 | 24 |
| 23 BUILDBUCKET_CLIENT_QUERY_INTERVAL_SECONDS = 60 | 25 BUILDBUCKET_CLIENT_QUERY_INTERVAL_SECONDS = 60 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 52 try_job_data.try_job_url = build.url # pragma: no cover | 54 try_job_data.try_job_url = build.url # pragma: no cover |
| 53 try_job_data.regression_range_size = build.report.get( | 55 try_job_data.regression_range_size = build.report.get( |
| 54 'metadata', {}).get('regression_range_size') | 56 'metadata', {}).get('regression_range_size') |
| 55 if timed_out: | 57 if timed_out: |
| 56 try_job_data.error = { | 58 try_job_data.error = { |
| 57 'message': MonitorTryJobPipeline.TIMEOUT, | 59 'message': MonitorTryJobPipeline.TIMEOUT, |
| 58 'reason': MonitorTryJobPipeline.TIMEOUT | 60 'reason': MonitorTryJobPipeline.TIMEOUT |
| 59 } | 61 } |
| 60 try_job_data.put() | 62 try_job_data.put() |
| 61 | 63 |
| 64 def _UpdateTryJobResult( |
| 65 self, status, master_name, builder_name, build_number, try_job_type, |
| 66 try_job_id, try_job_url, result_content=None): |
| 67 """Updates try job result based on responsed try job status and result.""" |
| 68 result = { |
| 69 'report': result_content, |
| 70 'url': try_job_url, |
| 71 'try_job_id': try_job_id, |
| 72 } |
| 73 |
| 74 try_job_result = WfTryJob.Get(master_name, builder_name, build_number) |
| 75 if try_job_type == TryJobType.COMPILE: |
| 76 result_to_update = try_job_result.compile_results |
| 77 else: |
| 78 result_to_update = try_job_result.test_results |
| 79 if (result_to_update and |
| 80 result_to_update[-1]['try_job_id'] == try_job_id): |
| 81 result_to_update[-1].update(result) |
| 82 else: # pragma: no cover |
| 83 # Normally result for current try job should've been saved in |
| 84 # schedule_try_job_pipeline, so this branch shouldn't be reached. |
| 85 result_to_update.append(result) |
| 86 |
| 87 if status == BuildbucketBuild.STARTED: # pragma: no cover |
| 88 try_job_result.status = wf_analysis_status.ANALYZING |
| 89 try_job_result.put() |
| 90 return result_to_update |
| 91 |
| 62 # Arguments number differs from overridden method - pylint: disable=W0221 | 92 # Arguments number differs from overridden method - pylint: disable=W0221 |
| 63 # TODO(chanli): Handle try job for test failures later. | 93 # TODO(chanli): Handle try job for test failures later. |
| 64 def run(self, master_name, builder_name, build_number, try_job_id): | 94 def run( |
| 95 self, master_name, builder_name, build_number, try_job_type, try_job_id): |
| 65 assert try_job_id | 96 assert try_job_id |
| 66 | 97 |
| 67 timeout_hours = 5 # Timeout after 5 hours. | 98 timeout_hours = 5 |
| 68 deadline = time.time() + timeout_hours * 60 * 60 | 99 deadline = time.time() + timeout_hours * 60 * 60 |
| 69 try_job_data = (WfTryJobData.Get(try_job_id) or | 100 try_job_data = (WfTryJobData.Get(try_job_id) or |
| 70 WfTryJobData.Create(try_job_id)) | 101 WfTryJobData.Create(try_job_id)) |
| 71 | 102 |
| 72 already_set_started = False | 103 already_set_started = False |
| 73 start_time = None | 104 start_time = None |
| 74 while True: | 105 while True: |
| 75 error, build = buildbucket_client.GetTryJobs([try_job_id])[0] | 106 error, build = buildbucket_client.GetTryJobs([try_job_id])[0] |
| 76 if error: # pragma: no cover | 107 if error: # pragma: no cover |
| 77 self._UpdateTryJobMetadataForBuildError(try_job_data, error) | 108 self._UpdateTryJobMetadataForBuildError(try_job_data, error) |
| 78 raise pipeline.Retry( | 109 raise pipeline.Retry( |
| 79 'Error "%s" occurred. Reason: "%s"' % (error.message, error.reason)) | 110 'Error "%s" occurred. Reason: "%s"' % (error.message, error.reason)) |
| 80 | 111 elif build.status == BuildbucketBuild.COMPLETED: |
| 81 if build.status == 'COMPLETED': | |
| 82 self._UpdateTryJobMetadataForCompletedBuild( | 112 self._UpdateTryJobMetadataForCompletedBuild( |
| 83 try_job_data, build, start_time) | 113 try_job_data, build, start_time) |
| 84 | 114 result_to_update = self._UpdateTryJobResult( |
| 85 result = { | 115 BuildbucketBuild.COMPLETED, master_name, builder_name, build_number, |
| 86 'report': build.report, | 116 try_job_type, try_job_id, build.url, build.report) |
| 87 'url': build.url, | 117 return result_to_update[-1] |
| 88 'try_job_id': try_job_id, | |
| 89 } | |
| 90 | |
| 91 try_job_result = WfTryJob.Get(master_name, builder_name, build_number) | |
| 92 if (try_job_result.compile_results and | |
| 93 try_job_result.compile_results[-1]['try_job_id'] == try_job_id): | |
| 94 try_job_result.compile_results[-1].update(result) | |
| 95 else: # pragma: no cover | |
| 96 try_job_result.compile_results.append(result) | |
| 97 try_job_result.put() | |
| 98 return try_job_result.compile_results[-1] | |
| 99 else: # pragma: no cover | 118 else: # pragma: no cover |
| 100 if build.status == 'STARTED' and not already_set_started: | 119 if build.status == BuildbucketBuild.STARTED and not already_set_started: |
| 101 # It is possible this branch is skipped if a fast build goes from | 120 # It is possible this branch is skipped if a fast build goes from |
| 102 # 'SCHEDULED' to 'COMPLETED' between queries, so start_time may be | 121 # 'SCHEDULED' to 'COMPLETED' between queries, so start_time may be |
| 103 # unavailable. | 122 # unavailable. |
| 104 start_time = self._MicrosecondsToDatetime(build.updated_time) | 123 start_time = self._MicrosecondsToDatetime(build.updated_time) |
| 105 | 124 self._UpdateTryJobResult( |
| 106 result = { | 125 BuildbucketBuild.STARTED, master_name, builder_name, build_number, |
| 107 'report': None, | 126 try_job_type, try_job_id, build.url) |
| 108 'url': build.url, | |
| 109 'try_job_id': try_job_id, | |
| 110 } | |
| 111 | |
| 112 try_job_result = WfTryJob.Get(master_name, builder_name, build_number) | |
| 113 if (try_job_result.compile_results and | |
| 114 try_job_result.compile_results[-1]['try_job_id'] == try_job_id): | |
| 115 try_job_result.compile_results[-1].update(result) | |
| 116 else: # pragma: no cover | |
| 117 # Normally result for current try job should've been saved in | |
| 118 # schedule_try_job_pipeline, so this branch shouldn't be reached. | |
| 119 try_job_result.compile_results.append(result) | |
| 120 | |
| 121 try_job_result.status = wf_analysis_status.ANALYZING | |
| 122 try_job_result.put() | |
| 123 already_set_started = True | 127 already_set_started = True |
| 124 | 128 |
| 125 time.sleep(self.BUILDBUCKET_CLIENT_QUERY_INTERVAL_SECONDS) | 129 time.sleep(self.BUILDBUCKET_CLIENT_QUERY_INTERVAL_SECONDS) |
| 126 | 130 |
| 127 if time.time() > deadline: # pragma: no cover | 131 if time.time() > deadline: # pragma: no cover |
| 132 try_job_result = WfTryJob.Get(master_name, builder_name, build_number) |
| 128 try_job_result.status = wf_analysis_status.ERROR | 133 try_job_result.status = wf_analysis_status.ERROR |
| 129 try_job_result.put() | 134 try_job_result.put() |
| 130 self._UpdateTryJobMetadataForCompletedBuild( | 135 self._UpdateTryJobMetadataForCompletedBuild( |
| 131 try_job_data, build, start_time, timed_out=True) | 136 try_job_data, build, start_time, timed_out=True) |
| 132 # Explicitly abort the whole pipeline. | 137 # Explicitly abort the whole pipeline. |
| 133 raise pipeline.Abort( | 138 raise pipeline.Abort( |
| 134 'Try job %s timed out after %d hours.' % ( | 139 'Try job %s timed out after %d hours.' % ( |
| 135 try_job_id, timeout_hours)) | 140 try_job_id, timeout_hours)) |
| OLD | NEW |