| Index: appengine/findit/waterfall/monitor_try_job_pipeline.py
|
| diff --git a/appengine/findit/waterfall/monitor_try_job_pipeline.py b/appengine/findit/waterfall/monitor_try_job_pipeline.py
|
| index 9e27fb8d40550366af1668f38423bbb764bb38d3..419b6615b7b976cea50f8b75a8a18a0a1f4a8a3b 100644
|
| --- a/appengine/findit/waterfall/monitor_try_job_pipeline.py
|
| +++ b/appengine/findit/waterfall/monitor_try_job_pipeline.py
|
| @@ -6,11 +6,13 @@ from datetime import datetime
|
| import time
|
|
|
| from common import buildbucket_client
|
| +from common.buildbucket_client import BuildbucketBuild
|
| from model import wf_analysis_status
|
| from model.wf_try_job import WfTryJob
|
| from model.wf_try_job_data import WfTryJobData
|
| from pipeline_wrapper import BasePipeline
|
| from pipeline_wrapper import pipeline
|
| +from waterfall.try_job_type import TryJobType
|
|
|
|
|
| class MonitorTryJobPipeline(BasePipeline):
|
| @@ -59,12 +61,41 @@ class MonitorTryJobPipeline(BasePipeline):
|
| }
|
| try_job_data.put()
|
|
|
| + def _UpdateTryJobResult(
|
| + self, status, master_name, builder_name, build_number, try_job_type,
|
| + try_job_id, try_job_url, result_content=None):
|
| + """Updates try job result based on responsed try job status and result."""
|
| + result = {
|
| + 'report': result_content,
|
| + 'url': try_job_url,
|
| + 'try_job_id': try_job_id,
|
| + }
|
| +
|
| + try_job_result = WfTryJob.Get(master_name, builder_name, build_number)
|
| + if try_job_type == TryJobType.COMPILE:
|
| + result_to_update = try_job_result.compile_results
|
| + else:
|
| + result_to_update = try_job_result.test_results
|
| + if (result_to_update and
|
| + result_to_update[-1]['try_job_id'] == try_job_id):
|
| + result_to_update[-1].update(result)
|
| + else: # pragma: no cover
|
| + # Normally result for current try job should've been saved in
|
| + # schedule_try_job_pipeline, so this branch shouldn't be reached.
|
| + result_to_update.append(result)
|
| +
|
| + if status == BuildbucketBuild.STARTED: # pragma: no cover
|
| + try_job_result.status = wf_analysis_status.ANALYZING
|
| + try_job_result.put()
|
| + return result_to_update
|
| +
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| # TODO(chanli): Handle try job for test failures later.
|
| - def run(self, master_name, builder_name, build_number, try_job_id):
|
| + def run(
|
| + self, master_name, builder_name, build_number, try_job_type, try_job_id):
|
| assert try_job_id
|
|
|
| - timeout_hours = 5 # Timeout after 5 hours.
|
| + timeout_hours = 5
|
| deadline = time.time() + timeout_hours * 60 * 60
|
| try_job_data = (WfTryJobData.Get(try_job_id) or
|
| WfTryJobData.Create(try_job_id))
|
| @@ -77,54 +108,28 @@ class MonitorTryJobPipeline(BasePipeline):
|
| self._UpdateTryJobMetadataForBuildError(try_job_data, error)
|
| raise pipeline.Retry(
|
| 'Error "%s" occurred. Reason: "%s"' % (error.message, error.reason))
|
| -
|
| - if build.status == 'COMPLETED':
|
| + elif build.status == BuildbucketBuild.COMPLETED:
|
| self._UpdateTryJobMetadataForCompletedBuild(
|
| try_job_data, build, start_time)
|
| -
|
| - result = {
|
| - 'report': build.report,
|
| - 'url': build.url,
|
| - 'try_job_id': try_job_id,
|
| - }
|
| -
|
| - try_job_result = WfTryJob.Get(master_name, builder_name, build_number)
|
| - if (try_job_result.compile_results and
|
| - try_job_result.compile_results[-1]['try_job_id'] == try_job_id):
|
| - try_job_result.compile_results[-1].update(result)
|
| - else: # pragma: no cover
|
| - try_job_result.compile_results.append(result)
|
| - try_job_result.put()
|
| - return try_job_result.compile_results[-1]
|
| + result_to_update = self._UpdateTryJobResult(
|
| + BuildbucketBuild.COMPLETED, master_name, builder_name, build_number,
|
| + try_job_type, try_job_id, build.url, build.report)
|
| + return result_to_update[-1]
|
| else: # pragma: no cover
|
| - if build.status == 'STARTED' and not already_set_started:
|
| + if build.status == BuildbucketBuild.STARTED and not already_set_started:
|
| # It is possible this branch is skipped if a fast build goes from
|
| # 'SCHEDULED' to 'COMPLETED' between queries, so start_time may be
|
| # unavailable.
|
| start_time = self._MicrosecondsToDatetime(build.updated_time)
|
| -
|
| - result = {
|
| - 'report': None,
|
| - 'url': build.url,
|
| - 'try_job_id': try_job_id,
|
| - }
|
| -
|
| - try_job_result = WfTryJob.Get(master_name, builder_name, build_number)
|
| - if (try_job_result.compile_results and
|
| - try_job_result.compile_results[-1]['try_job_id'] == try_job_id):
|
| - try_job_result.compile_results[-1].update(result)
|
| - else: # pragma: no cover
|
| - # Normally result for current try job should've been saved in
|
| - # schedule_try_job_pipeline, so this branch shouldn't be reached.
|
| - try_job_result.compile_results.append(result)
|
| -
|
| - try_job_result.status = wf_analysis_status.ANALYZING
|
| - try_job_result.put()
|
| + self._UpdateTryJobResult(
|
| + BuildbucketBuild.STARTED, master_name, builder_name, build_number,
|
| + try_job_type, try_job_id, build.url)
|
| already_set_started = True
|
|
|
| time.sleep(self.BUILDBUCKET_CLIENT_QUERY_INTERVAL_SECONDS)
|
|
|
| if time.time() > deadline: # pragma: no cover
|
| + try_job_result = WfTryJob.Get(master_name, builder_name, build_number)
|
| try_job_result.status = wf_analysis_status.ERROR
|
| try_job_result.put()
|
| self._UpdateTryJobMetadataForCompletedBuild(
|
|
|