Index: appengine/findit/waterfall/build_failure_analysis_pipelines.py
diff --git a/appengine/findit/waterfall/build_failure_analysis_pipelines.py b/appengine/findit/waterfall/build_failure_analysis_pipelines.py
index 91af3608be8b35d6959fd22afd1d4e436faa60f2..2adc4337c3b804ea8fbdd80b7fe3474ee217cd7a 100644
--- a/appengine/findit/waterfall/build_failure_analysis_pipelines.py
+++ b/appengine/findit/waterfall/build_failure_analysis_pipelines.py
@@ -2,14 +2,23 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
+import collections
+from datetime import datetime
 import logging
+import random
+import time
+from google.appengine.api import memcache
 from google.appengine.ext import ndb
 from pipeline_utils import pipelines
+from pipeline_utils.appengine_third_party_pipeline_src_pipeline import pipeline
+from common.http_client_appengine import HttpClientAppengine as HttpClient
 from model.build import Build
+from model.build_analysis import BuildAnalysis
 from model.build_analysis_status import BuildAnalysisStatus
+from waterfall import buildbot
 # TODO(stgao): remove BasePipeline after http://crrev.com/810193002 is landed.
@@ -27,55 +36,289 @@ class BasePipeline(pipelines.AppenginePipeline):  # pragma: no cover
     raise NotImplementedError()
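+# Throttle downloads from a buildbot master: at most one download from the
+# same master every _DOWNLOAD_INTERVAL_SECONDS, coordinated through memcache.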
+_MEMCACHE_MASTER_DOWNLOAD_LOCK = 'master-download-lock-%s'
+_MEMCACHE_MASTER_DOWNLOAD_EXPIRATION_SECONDS = 60 * 60
+_DOWNLOAD_INTERVAL_SECONDS = 5
+_MAX_BUILDS_TO_CHECK_FOR_FIRST_FAILURE = 20
+
+
+def _WaitUntilDownloadAllowed(
+    master_name, timeout_seconds=90):  # pragma: no cover
+  """Waits until the next download from the specified master is allowed.
+
+  Returns:
+    True if download is allowed to proceed.
+    False if download is not allowed until the given timeout occurs.
+  """
+  client = memcache.Client()
+  key = _MEMCACHE_MASTER_DOWNLOAD_LOCK % master_name
+
+  deadline = time.time() + timeout_seconds
+  while True:
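+    # gets() also primes the memcache client for the compare-and-swap (cas)
+    # call below.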
+    info = client.gets(key)
+    if not info or time.time() - info['time'] >= _DOWNLOAD_INTERVAL_SECONDS:
+      new_info = {
+          'time': time.time()
+      }
+      if not info:
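+        # No lock entry yet; add() fails if another client creates one first.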
+        success = client.add(
+            key, new_info, time=_MEMCACHE_MASTER_DOWNLOAD_EXPIRATION_SECONDS)
+      else:
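+        # cas() fails if another client has updated the entry since our gets().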
+        success = client.cas(
+            key, new_info, time=_MEMCACHE_MASTER_DOWNLOAD_EXPIRATION_SECONDS)
+
+      if success:
+        logging.info('Download from %s is allowed. Waited %s seconds.',
+                     master_name, (time.time() + timeout_seconds - deadline))
+        return True
+
+    if time.time() > deadline:
+      logging.info('Download from %s is not allowed. Waited %s seconds.',
+                   master_name, timeout_seconds)
+      return False
+
+    logging.info('Waiting to download from %s', master_name)
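+    # Sleep with jitter so that competing clients do not wake up in lockstep.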
+    time.sleep(_DOWNLOAD_INTERVAL_SECONDS + random.random())
+
+
+class DetectFirstFailurePipeline(BasePipeline):
+  HTTP_CLIENT = HttpClient()
+
+  def _BuildDataNeedUpdating(self, build):
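+    """Returns True if the build data should be downloaded again.
+
+    The data needs updating if it is missing, or if the build had not
+    completed as of the last crawl and that crawl was at least 5 minutes ago.
+    """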
+    return (not build.data or (not build.completed and (
+        datetime.utcnow() - build.last_crawled_time).total_seconds() >= 60 * 5))
+
+  def _DownloadBuildData(self, master_name, builder_name, build_number):
+    """Downloads build data and returns a Build instance."""
+    build = Build.GetBuild(master_name, builder_name, build_number)
+    if not build:  # pragma: no cover
+      build = Build.CreateBuild(master_name, builder_name, build_number)
+
+    # Cache the data to avoid pulling from the master again.
+    if self._BuildDataNeedUpdating(build):  # pragma: no cover
+      if not _WaitUntilDownloadAllowed(master_name):  # pragma: no cover
+        raise pipeline.Retry('Too many downloads from %s' % master_name)
+
+      build.data = buildbot.GetBuildData(
+          build.master_name, build.builder_name, build.build_number,
+          self.HTTP_CLIENT)
+      build.last_crawled_time = datetime.utcnow()
+      build.put()
+
+    return build
+
+  def _ExtractBuildInfo(self, master_name, builder_name, build_number):
+    """Returns a BuildInfo instance for the specified build."""
+    build = self._DownloadBuildData(master_name, builder_name, build_number)
+    if not build.data:  # pragma: no cover
+      return None
+
+    build_info = buildbot.ExtractBuildInfo(
+        master_name, builder_name, build_number, build.data)
+
+    if not build.completed:  # pragma: no cover
+      build.start_time = build_info.build_start_time
+      build.completed = build_info.completed
+      build.result = build_info.result
+      build.put()
+
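+    # Backfill the build start time on the analysis entity if it is missing.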
+    analysis = BuildAnalysis.GetBuildAnalysis(
+        master_name, builder_name, build_number)
+    if analysis and not analysis.build_start_time:
+      analysis.build_start_time = build_info.build_start_time
+      analysis.put()
+
+    return build_info
+
+  def _SaveBlamelistAndChromiumRevisionIntoDict(self, build_info, builds):
+ """ |
+ Args: |
+ build_info (BuildInfo): a BuildInfo instance which contains blame list and |
+ chromium revision. |
+ builds (dict): to which the blame list and chromium revision is saved. It |
+ will be updated and looks like: |
+ { |
+ 555 : { |
+ 'chromium_revision': 'a_git_hash', |
+ 'blame_list': ['git_hash1', 'git_hash2'], |
+ }, |
+ } |
+ """ |
+    builds[build_info.build_number] = {
+        'chromium_revision': build_info.chromium_revision,
+        'blame_list': build_info.blame_list
+    }
+
+  def _CreateADictOfFailedSteps(self, build_info):
+ """ Returns a dict with build number for failed steps. |
+ |
+ Args: |
+ failed_steps (list): a list of failed steps. |
+ |
+ Returns: |
+ A dict like this: |
+ { |
+ 'step_name': { |
+ 'current_failure': 555, |
+ 'first_failure': 553, |
+ }, |
+ } |
+ """ |
+    failed_steps = dict()
+    for step_name in build_info.failed_steps:
+      failed_steps[step_name] = {
+          'current_failure': build_info.build_number,
+          'first_failure': build_info.build_number,
+      }
+
+    return failed_steps
+
+  def _CheckForFirstKnownFailure(self, master_name, builder_name, build_number,
+                                 failed_steps, builds):
+    """Checks for first known failures of the given failed steps.
+
+    Args:
+      master_name (str): master of the failed build.
+      builder_name (str): builder of the failed build.
+      build_number (int): build number of the current failed build.
+      failed_steps (dict): the failed steps of the current failed build. It will
+        be updated with build numbers for 'first_failure' and 'last_pass' of
+        each failed step.
+      builds (dict): a dict to save blame list and chromium revision.
+    """
+    # Look back for first known failures.
+    for i in range(_MAX_BUILDS_TO_CHECK_FOR_FIRST_FAILURE):
+      build_info = self._ExtractBuildInfo(
+          master_name, builder_name, build_number - i - 1)
+
+      if not build_info:  # pragma: no cover
+        # Failed to extract the build information, bail out.
+        return
+
+      self._SaveBlamelistAndChromiumRevisionIntoDict(build_info, builds)
+
+      if build_info.result == buildbot.SUCCESS:
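+        # This earlier build succeeded: record it as the last pass for every
+        # failed step that is still missing one, then stop looking back.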
+        for step_name in failed_steps:
+          if 'last_pass' not in failed_steps[step_name]:
+            failed_steps[step_name]['last_pass'] = build_info.build_number
+
+        # All steps passed, so stop looking back.
+        return
+      else:
+        # If a step is not run due to some bot exception, we are not sure
+        # whether the step could pass or not. So we only check failed/passed
+        # steps here.
+
+        for step_name in build_info.failed_steps:
+          if step_name in failed_steps:
+            failed_steps[step_name]['first_failure'] = build_info.build_number
+
+        for step_name in failed_steps:
+          if step_name in build_info.passed_steps:
+            failed_steps[step_name]['last_pass'] = build_info.build_number
+
+      if all('last_pass' in step_info for step_info in failed_steps.values()):
+        # All failed steps passed in this build cycle.
+        return  # pragma: no cover
+
+  # Arguments number differs from overridden method - pylint: disable=W0221
+  def run(self, master_name, builder_name, build_number):
+    build_info = self._ExtractBuildInfo(master_name, builder_name, build_number)
+
+    if not build_info:  # pragma: no cover
+      raise pipeline.Retry('Failed to extract build info.')
+
+    failure_info = {
+        'failed': True,
+        'master_name': master_name,
+        'builder_name': builder_name,
+        'build_number': build_number
+    }
+
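+    # A successful build or a build without failed steps needs no analysis.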
+    if (build_info.result == buildbot.SUCCESS or
+        not build_info.failed_steps):  # pragma: no cover
+      failure_info['failed'] = False
+      return failure_info
+
+    builds = dict()
+    self._SaveBlamelistAndChromiumRevisionIntoDict(build_info, builds)
+
+    failed_steps = self._CreateADictOfFailedSteps(build_info)
+
+    self._CheckForFirstKnownFailure(
+        master_name, builder_name, build_number, failed_steps, builds)
+
+    failure_info['builds'] = builds
+    failure_info['failed_steps'] = failed_steps
+    return failure_info
+
+
 class BuildFailurePipeline(BasePipeline):
+  def __init__(self, master_name, builder_name, build_number):
+    super(BuildFailurePipeline, self).__init__(
+        master_name, builder_name, build_number)
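+    # Keep the arguments so that finalized() below can look up the analysis.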
+    self.master_name = master_name
+    self.builder_name = builder_name
+    self.build_number = build_number
+
+  def finalized(self):
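+    """Marks the analysis ANALYZED, or ERROR if the pipeline was aborted."""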
+    analysis = BuildAnalysis.GetBuildAnalysis(
+        self.master_name, self.builder_name, self.build_number)
+    if self.was_aborted:  # pragma: no cover
+      analysis.status = BuildAnalysisStatus.ERROR
+    else:
+      analysis.status = BuildAnalysisStatus.ANALYZED
+    analysis.put()
+
   # Arguments number differs from overridden method - pylint: disable=W0221
   def run(self, master_name, builder_name, build_number):
-    build = Build.GetBuild(master_name, builder_name, build_number)
+    analysis = BuildAnalysis.GetBuildAnalysis(
+        master_name, builder_name, build_number)
+    analysis.pipeline_url = self.pipeline_status_url()
+    analysis.status = BuildAnalysisStatus.ANALYZING
+    analysis.start_time = datetime.utcnow()
+    analysis.put()
-    # TODO: implement the logic.
-    build.analysis_status = BuildAnalysisStatus.ANALYZED
-    build.put()
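+
+    # Spawn a sub-pipeline to detect the first failed build for each step.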
+    yield DetectFirstFailurePipeline(master_name, builder_name, build_number)
 @ndb.transactional
 def NeedANewAnalysis(master_name, builder_name, build_number, force):
-  """Check analysis status of a build and decide if a new analysis is needed.
+  """Checks status of analysis for the build and decides if a new one is needed.
+
+  A BuildAnalysis entity for the given build will be created if none exists.
   Returns:
-    (build, need_analysis)
-    build (Build): the build as specified by the input.
-    need_analysis (bool): True if an analysis is needed, otherwise False.
+    True if an analysis is needed, otherwise False.
   """
-  build_key = Build.CreateKey(master_name, builder_name, build_number)
-  build = build_key.get()
-
-  if not build:
-    build = Build.CreateBuild(master_name, builder_name, build_number)
-    build.analysis_status = BuildAnalysisStatus.PENDING
-    build.put()
-    return build, True
+  analysis = BuildAnalysis.GetBuildAnalysis(
+      master_name, builder_name, build_number)
+
+  if not analysis:
+    analysis = BuildAnalysis.CreateBuildAnalysis(
+        master_name, builder_name, build_number)
+    analysis.status = BuildAnalysisStatus.PENDING
+    analysis.put()
+    return True
   elif force:
     # TODO: avoid concurrent analysis.
-    build.Reset()
-    build.put()
-    return build, True
+    analysis.Reset()
+    analysis.put()
+    return True
   else:
     # TODO: support following cases
     # 1. Automatically retry if last analysis failed with errors.
     # 2. Start another analysis if the build cycle wasn't completed in last
     #    analysis request.
     # 3. Analysis is not complete and no update in the last 5 minutes.
-    return build, False
+    return False
 def ScheduleAnalysisIfNeeded(master_name, builder_name, build_number, force,
                              queue_name):
-  """Schedule an analysis if needed and return the build."""
-  build, need_new_analysis = NeedANewAnalysis(
-      master_name, builder_name, build_number, force)
-
-  if need_new_analysis:
+  """Schedules an analysis if needed and returns the build analysis."""
+  if NeedANewAnalysis(master_name, builder_name, build_number, force):
     pipeline_job = BuildFailurePipeline(master_name, builder_name, build_number)
     pipeline_job.start(queue_name=queue_name)
@@ -85,4 +328,4 @@ def ScheduleAnalysisIfNeeded(master_name, builder_name, build_number, force,
   else:  # pragma: no cover
     logging.info('Analysis was already triggered or the result is recent.')
-  return build
+  return BuildAnalysis.GetBuildAnalysis(master_name, builder_name, build_number)