| Index: appengine/findit/crash/crash_pipeline.py
|
| diff --git a/appengine/findit/crash/crash_pipeline.py b/appengine/findit/crash/crash_pipeline.py
|
| index 198f0d21f92f73c9f3c9ec118a2256e5d434f6aa..eb69c72f358b6d88edca2c4b907a03cc5543addb 100644
|
| --- a/appengine/findit/crash/crash_pipeline.py
|
| +++ b/appengine/findit/crash/crash_pipeline.py
|
| @@ -6,9 +6,6 @@ import copy
|
| import json
|
| import logging
|
|
|
| -from google.appengine.api import app_identity
|
| -from google.appengine.ext import ndb
|
| -
|
| from common import appengine_util
|
| from common import constants
|
| from common import git_repository
|
| @@ -17,53 +14,116 @@ from common import time_util
|
| from common.http_client_appengine import HttpClientAppengine
|
| from common.pipeline_wrapper import BasePipeline
|
| from common.pipeline_wrapper import pipeline
|
| -from crash import findit_for_client
|
| +from crash import findit_for_chromecrash
|
| +from crash import findit_for_clusterfuzz
|
| +from crash.type_enums import CrashClient
|
| from model import analysis_status
|
| from model.crash.crash_config import CrashConfig
|
|
|
|
|
| +# TODO(http://crbug.com/659346): write complete coverage tests for this.
|
| +def FinditForClientID(client_id): # pragma: no cover
|
| + """Construct a Findit object from a client id string specifying the class.
|
| +
|
| + We cannot pass Findit objects to the various methods in
|
| + |crash.crash_pipeline|, because they are not JSON serializable. For now,
|
| + we just serialize Findit objects as their |client_id|, and then use this
|
| + function to reconstruct them. Alas, this means we will lose various
|
| + other information stored in the Findit object (i.e., stuff that comes
|
| + from CrashConfig), which could lead to some hard-to-diagnose coherency
|
| + bugs, since the new Findit object will be based on the CrashConfig at
|
| + the time it's constructed, which may be different from the CrashConfig
|
| + at the time the previous Findit object was constructed. In the future
|
| + we should fix all this to serialize Findit objects in a more robust way.
|
| + """
|
| + assert isinstance(client_id, (str, unicode)), (
|
| + 'FinditForClientID: expected string or unicode, but got %s'
|
| + % client_id.__class__.__name__)
|
| + # TODO(wrengr): it'd be nice to replace this with a single lookup in
|
| + # a dict, but that's buggy for some unknown reason.
|
| + if client_id == CrashClient.FRACAS:
|
| + cls = findit_for_chromecrash.FinditForFracas
|
| + elif client_id == CrashClient.CRACAS:
|
| + cls = findit_for_chromecrash.FinditForCracas
|
| + elif client_id == CrashClient.CLUSTERFUZZ:
|
| + cls = findit_for_clusterfuzz.FinditForClusterfuzz
|
| + else:
|
| + raise ValueError('FinditForClientID: '
|
| + 'unknown or unsupported client %s' % client_id)
|
| +
|
| + return cls(
|
| + git_repository.GitRepository(http_client=HttpClientAppengine()),
|
| + CrashWrapperPipeline)
|
| +
|
| +
|
| +# Some notes about the classes below, for people who are not
|
| +# familiar with AppEngine. The thing that really kicks everything off
|
| +# is |CrashWrapperPipeline.run|. However, an important thing to bear in
|
| +# mind is that whatever arguments are passed to that method will also
|
| +# be passed to the |run| method on whatever objects it yields. Thus,
|
| +# all the |run| methods across these different classes must have the same
|
| +# signature. In practice, we end up passing all the arguments to the
|
| +# constructors, because we need to have the fields around for logging
|
| +# (e.g., in |finalized|); thus, there's nothing that needs to be passed
|
| +# to |run|. Another thing to bear in mind is that whatever objects
|
| +# |CrashWrapperPipeline.run| yields must be JSON-serializable. The base
|
| +# class handles most of that for us, so the force of this constraint is
|
| +# that all the arguments to the constructors for those classes must be
|
| +# JSON-serializable. Thus, we cannot actually pass a Findit object to
|
| +# the constructor, but rather must pass only the |client_id| (or whatever
|
| +# JSON dict) and then reconstruct the Findit object from that. Moreover,
|
| +# the |run| method and the |finalized| method will be run in different
|
| +# processes, so we will actually end up reconstructing the Findit object
|
| +# twice. Thus, we shouldn't store anything in the pipeline objects outside
|
| +# of what their constructors store.
|
| +
|
| class CrashBasePipeline(BasePipeline):
|
| - def __init__(self, crash_identifiers, client_id):
|
| - super(CrashBasePipeline, self).__init__(crash_identifiers, client_id)
|
| - self.crash_identifiers = crash_identifiers
|
| - self.client_id = client_id
|
| + def __init__(self, client_id, crash_identifiers):
|
| + super(CrashBasePipeline, self).__init__(client_id, crash_identifiers)
|
| + self._crash_identifiers = crash_identifiers
|
| + self._findit = FinditForClientID(client_id)
|
| +
|
| + @property
|
| + def client_id(self): # pragma: no cover
|
| + return self._findit.client_id
|
|
|
| def run(self, *args, **kwargs):
|
| raise NotImplementedError()
|
|
|
|
|
| class CrashAnalysisPipeline(CrashBasePipeline):
|
| - def _SetErrorIfAborted(self, aborted):
|
| - if not aborted:
|
| - return
|
| -
|
| - logging.error('Aborted analysis for %s', repr(self.crash_identifiers))
|
| - analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers,
|
| - self.client_id)
|
| + def finalized(self): # pragma: no cover
|
| + if self.was_aborted:
|
| + self._PutAbortedError()
|
| +
|
| + # N.B., this method must be factored out for unittest reasons, since
|
| + # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted|
|
| + # can't be altered directly.
|
| + def _PutAbortedError(self):
|
| + """Update the ndb.Model to indicate that this pipeline was aborted."""
|
| + logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
|
| + analysis = self._findit.GetAnalysis(self._crash_identifiers)
|
| analysis.status = analysis_status.ERROR
|
| analysis.put()
|
|
|
| - def finalized(self):
|
| - self._SetErrorIfAborted(self.was_aborted)
|
| -
|
| + # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| - def run(self, crash_identifiers, client_id):
|
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| + def run(self):
|
| + # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis
|
| + # to guard against race conditions?
|
| + analysis = self._findit.GetAnalysis(self._crash_identifiers)
|
|
|
| - # Update analysis status.
|
| + # Update the model's status to say we're in the process of doing analysis.
|
| analysis.pipeline_status_path = self.pipeline_status_path()
|
| analysis.status = analysis_status.RUNNING
|
| analysis.started_time = time_util.GetUTCNow()
|
| analysis.findit_version = appengine_util.GetCurrentVersion()
|
| analysis.put()
|
|
|
| - # Run the analysis.
|
| - result, tags = findit_for_client.FindCulprit(
|
| - analysis, git_repository.GitRepository(
|
| - http_client=HttpClientAppengine()))
|
| + # Actually do the analysis.
|
| + result, tags = self._findit.FindCulprit(analysis).ToDicts()
|
|
|
| - # Update analysis status and save the analysis result.
|
| + # Update model's status to say we're done, and save the results.
|
| analysis.completed_time = time_util.GetUTCNow()
|
| analysis.result = result
|
| for tag_name, tag_value in tags.iteritems():
|
| @@ -75,84 +135,48 @@ class CrashAnalysisPipeline(CrashBasePipeline):
|
|
|
|
|
| class PublishResultPipeline(CrashBasePipeline):
|
| + # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
|
| def finalized(self):
|
| if self.was_aborted: # pragma: no cover.
|
| logging.error('Failed to publish %s analysis result for %s',
|
| - repr(self.crash_identifiers), self.client_id)
|
| -
|
| + repr(self._crash_identifiers), self.client_id)
|
|
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| - def run(self, crash_identifiers, client_id):
|
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| - result = findit_for_client.GetPublishResultFromAnalysis(analysis,
|
| - crash_identifiers,
|
| - client_id)
|
| + def run(self):
|
| + analysis = self._findit.GetAnalysis(self._crash_identifiers)
|
| + result = analysis.ToPublishableResult(self._crash_identifiers)
|
| messages_data = [json.dumps(result, sort_keys=True)]
|
|
|
| - client_config = CrashConfig.Get().GetClientConfig(client_id)
|
| + # TODO(http://crbug.com/659354): remove Findit's dependency on CrashConfig.
|
| + client_config = self._findit.config
|
| # TODO(katesonia): Clean string uses in config.
|
| topic = client_config['analysis_result_pubsub_topic']
|
| pubsub_util.PublishMessagesToTopic(messages_data, topic)
|
| - logging.info('Published %s analysis result for %s', client_id,
|
| - repr(crash_identifiers))
|
| + logging.info('Published %s analysis result for %s', self.client_id,
|
| + repr(self._crash_identifiers))
|
|
|
|
|
| class CrashWrapperPipeline(BasePipeline):
|
| + """Fire off pipelines to (1) do the analysis and (2) publish results.
|
| +
|
| + The reason we have analysis and publishing as separate pipelines is
|
| + that each of them can fail for independent reasons. E.g., if we
|
| + successfully finish the analysis, but then the publishing fails due to
|
| + network errors, we don't want to have to redo the analysis in order
|
| + to redo the publishing. We could try to cache the fact that analysis
|
| + succeeded in the pipeline object itself, but we'd have to be careful
|
| + because the |run| and |finalized| methods are executed in different
|
| + processes.
|
| + """
|
| + def __init__(self, client_id, crash_identifiers):
|
| + super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers)
|
| + self._crash_identifiers = crash_identifiers
|
| + self._client_id = client_id
|
| +
|
| + # TODO(http://crbug.com/659346): write coverage tests.
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| - def run(self, crash_identifiers, client_id):
|
| - run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id)
|
| + def run(self): # pragma: no cover
|
| + run_analysis = yield CrashAnalysisPipeline(
|
| + self._client_id, self._crash_identifiers)
|
| with pipeline.After(run_analysis):
|
| - yield PublishResultPipeline(crash_identifiers, client_id)
|
| -
|
| -
|
| -@ndb.transactional
|
| -def _NeedsNewAnalysis(
|
| - crash_identifiers, chrome_version, signature, client_id,
|
| - platform, stack_trace, customized_data):
|
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| - regression_range = findit_for_client.GetRegressionRange(client_id,
|
| - customized_data)
|
| - if (analysis and not analysis.failed and
|
| - regression_range == analysis.regression_range):
|
| - logging.info('The analysis of %s has already been done.',
|
| - repr(crash_identifiers))
|
| - return False
|
| -
|
| - # Create analysis for findit to run if this is not a rerun.
|
| - if not analysis:
|
| - analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| -
|
| - findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
|
| - client_id, platform, stack_trace,
|
| - customized_data, regression_range)
|
| - return True
|
| -
|
| -
|
| -def ScheduleNewAnalysisForCrash(
|
| - crash_identifiers, chrome_version, signature, client_id,
|
| - platform, stack_trace, customized_data,
|
| - queue_name=constants.DEFAULT_QUEUE):
|
| - """Schedules an analysis."""
|
| - # Check policy and tune arguments if needed.
|
| - pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
|
| - crash_identifiers, chrome_version, signature,
|
| - client_id, platform, stack_trace,
|
| - customized_data)
|
| -
|
| - if not pass_policy:
|
| - return False
|
| -
|
| - if _NeedsNewAnalysis(*updated_analysis_args):
|
| - analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
|
| - # Attribute defined outside __init__ - pylint: disable=W0201
|
| - analysis_pipeline.target = appengine_util.GetTargetNameForModule(
|
| - constants.CRASH_BACKEND[client_id])
|
| - analysis_pipeline.start(queue_name=queue_name)
|
| - logging.info('New %s analysis is scheduled for %s', client_id,
|
| - repr(crash_identifiers))
|
| - return True
|
| -
|
| - return False
|
| + yield PublishResultPipeline(self._client_id, self._crash_identifiers)
|
|
|