Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3092)

Unified Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: Finally fixed the mock tests! Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/findit/crash/component_classifier.py ('k') | appengine/findit/crash/crash_report.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/findit/crash/crash_pipeline.py
diff --git a/appengine/findit/crash/crash_pipeline.py b/appengine/findit/crash/crash_pipeline.py
index 198f0d21f92f73c9f3c9ec118a2256e5d434f6aa..eb69c72f358b6d88edca2c4b907a03cc5543addb 100644
--- a/appengine/findit/crash/crash_pipeline.py
+++ b/appengine/findit/crash/crash_pipeline.py
@@ -6,9 +6,6 @@ import copy
import json
import logging
-from google.appengine.api import app_identity
-from google.appengine.ext import ndb
-
from common import appengine_util
from common import constants
from common import git_repository
@@ -17,53 +14,116 @@ from common import time_util
from common.http_client_appengine import HttpClientAppengine
from common.pipeline_wrapper import BasePipeline
from common.pipeline_wrapper import pipeline
-from crash import findit_for_client
+from crash import findit_for_chromecrash
+from crash import findit_for_clusterfuzz
+from crash.type_enums import CrashClient
from model import analysis_status
from model.crash.crash_config import CrashConfig
+# TODO(http://crbug.com/659346): write complete coverage tests for this.
+def FinditForClientID(client_id): # pragma: no cover
+ """Construct a Findit object from a client id string specifying the class.
+
+ We cannot pass Findit objects to the various methods in
+ |crash.crash_pipeline|, because they are not JSON serializable. For now,
+ we just serialize Findit objects as their |client_id|, and then use this
+ function to reconstruct them. Alas, this means we will lose various
+ other information stored in the Findit object (i.e., stuff that comes
+ from CrashConfig); which could lead to some hard-to-diagnose coherency
+ bugs, since the new Findit object will be based on the CrashConfig at
+ the time it's constructed, which may be different than the CrashConfig
+ at the time the previous Findit object was constructed. In the future
+ we should fix all this to serialize Findit objects in a more robust way.
+ """
+ assert isinstance(client_id, (str, unicode)), (
+ 'FinditForClientID: expected string or unicode, but got %s'
+ % client_id.__class__.__name__)
+ # TODO(wrengr): it'd be nice to replace this with a single lookup in
+ # a dict; but that's buggy for some unknown reason.
+ if client_id == CrashClient.FRACAS:
+ cls = findit_for_chromecrash.FinditForFracas
+ elif client_id == CrashClient.CRACAS:
+ cls = findit_for_chromecrash.FinditForCracas
+ elif client_id == CrashClient.CLUSTERFUZZ:
+ cls = findit_for_clusterfuzz.FinditForClusterfuzz
+ else:
+ raise ValueError('FinditForClientID: '
+ 'unknown or unsupported client %s' % client_id)
+
+ return cls(
+ git_repository.GitRepository(http_client=HttpClientAppengine()),
+ CrashWrapperPipeline)
+
+
+# Some notes about the classes below, for people who are not
+# familiar with AppEngine. The thing that really kicks everything off
+# is |CrashWrapperPipeline.run|. However, an important thing to bear in
+# mind is that whatever arguments are passed to that method will also
+# be passed to the |run| method on whatever objects it yields. Thus,
+# all the |run| methods across these different classes must have the same
+# type. In practice, we end up passing all the arguments to the
+# constructors, because we need to have the fields around for logging
+# (e.g., in |finalized|); thus, there's nothing that needs to be passed
+# to |run|. Another thing to bear in mind is that whatever objects
+# |CrashWrapperPipeline.run| yields must be JSON-serializable. The base
+# class handles most of that for us, so the force of this constraint is
+# that all the arguments to the constructors for those classes must be
+# JSON-serializable. Thus, we cannot actually pass a Findit object to
+# the constructor, but rather must pass only the |client_id| (or whatever
+# JSON dict) and then reconstruct the Findit object from that. Moreover,
+# the |run| method and the |finalized| method will be run in different
+# processes, so we will actually end up reconstructing the Findit object
+# twice. Thus, we shouldn't store anything in the pipeline objects outside
+# of what their constructors store.
+
class CrashBasePipeline(BasePipeline):
- def __init__(self, crash_identifiers, client_id):
- super(CrashBasePipeline, self).__init__(crash_identifiers, client_id)
- self.crash_identifiers = crash_identifiers
- self.client_id = client_id
+ def __init__(self, client_id, crash_identifiers):
+ super(CrashBasePipeline, self).__init__(client_id, crash_identifiers)
+ self._crash_identifiers = crash_identifiers
+ self._findit = FinditForClientID(client_id)
+
+ @property
+ def client_id(self): # pragma: no cover
+ return self._findit.client_id
def run(self, *args, **kwargs):
raise NotImplementedError()
class CrashAnalysisPipeline(CrashBasePipeline):
- def _SetErrorIfAborted(self, aborted):
- if not aborted:
- return
-
- logging.error('Aborted analysis for %s', repr(self.crash_identifiers))
- analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers,
- self.client_id)
+ def finalized(self): # pragma: no cover
+ if self.was_aborted:
+ self._PutAbortedError()
+
+ # N.B., this method must be factored out for unittest reasons; since
+ # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted|
+ # can't be altered directly.
+ def _PutAbortedError(self):
+ """Update the ndb.Model to indicate that this pipeline was aborted."""
+ logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
+ analysis = self._findit.GetAnalysis(self._crash_identifiers)
analysis.status = analysis_status.ERROR
analysis.put()
- def finalized(self):
- self._SetErrorIfAborted(self.was_aborted)
-
+ # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
# Arguments number differs from overridden method - pylint: disable=W0221
- def run(self, crash_identifiers, client_id):
- analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
- client_id)
+ def run(self):
+ # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis
+ # to guard against race conditions?
+ analysis = self._findit.GetAnalysis(self._crash_identifiers)
- # Update analysis status.
+ # Update the model's status to say we're in the process of doing analysis.
analysis.pipeline_status_path = self.pipeline_status_path()
analysis.status = analysis_status.RUNNING
analysis.started_time = time_util.GetUTCNow()
analysis.findit_version = appengine_util.GetCurrentVersion()
analysis.put()
- # Run the analysis.
- result, tags = findit_for_client.FindCulprit(
- analysis, git_repository.GitRepository(
- http_client=HttpClientAppengine()))
+ # Actually do the analysis.
+ result, tags = self._findit.FindCulprit(analysis).ToDicts()
- # Update analysis status and save the analysis result.
+ # Update model's status to say we're done, and save the results.
analysis.completed_time = time_util.GetUTCNow()
analysis.result = result
for tag_name, tag_value in tags.iteritems():
@@ -75,84 +135,48 @@ class CrashAnalysisPipeline(CrashBasePipeline):
class PublishResultPipeline(CrashBasePipeline):
+ # TODO(http://crbug.com/659346): we misplaced the coverage test; find it!
def finalized(self):
if self.was_aborted: # pragma: no cover.
logging.error('Failed to publish %s analysis result for %s',
- repr(self.crash_identifiers), self.client_id)
-
+ repr(self._crash_identifiers), self.client_id)
# Arguments number differs from overridden method - pylint: disable=W0221
- def run(self, crash_identifiers, client_id):
- analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
- client_id)
- result = findit_for_client.GetPublishResultFromAnalysis(analysis,
- crash_identifiers,
- client_id)
+ def run(self):
+ analysis = self._findit.GetAnalysis(self._crash_identifiers)
+ result = analysis.ToPublishableResult(self._crash_identifiers)
messages_data = [json.dumps(result, sort_keys=True)]
- client_config = CrashConfig.Get().GetClientConfig(client_id)
+ # TODO(http://crbug.com/659354): remove Findit's dependency on CrashConfig.
+ client_config = self._findit.config
# TODO(katesonia): Clean string uses in config.
topic = client_config['analysis_result_pubsub_topic']
pubsub_util.PublishMessagesToTopic(messages_data, topic)
- logging.info('Published %s analysis result for %s', client_id,
- repr(crash_identifiers))
+ logging.info('Published %s analysis result for %s', self.client_id,
+ repr(self._crash_identifiers))
class CrashWrapperPipeline(BasePipeline):
+ """Fire off pipelines to (1) do the analysis and (2) publish results.
+
+ The reason we have analysis and publishing as separate pipelines is
+ because each of them can fail for independent reasons. E.g., if we
+ successfully finish the analysis, but then the publishing fails due to
+ network errors, we don't want to have to redo the analysis in order
+ to redo the publishing. We could try to cache the fact that analysis
+ succeeded in the pipeline object itself, but we'd have to be careful
+ because the |run| and |finalized| methods are executed in different
+ processes.
+ """
+ def __init__(self, client_id, crash_identifiers):
+ super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers)
+ self._crash_identifiers = crash_identifiers
+ self._client_id = client_id
+
+ # TODO(http://crbug.com/659346): write coverage tests.
# Arguments number differs from overridden method - pylint: disable=W0221
- def run(self, crash_identifiers, client_id):
- run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id)
+ def run(self): # pragma: no cover
+ run_analysis = yield CrashAnalysisPipeline(
+ self._client_id, self._crash_identifiers)
with pipeline.After(run_analysis):
- yield PublishResultPipeline(crash_identifiers, client_id)
-
-
-@ndb.transactional
-def _NeedsNewAnalysis(
- crash_identifiers, chrome_version, signature, client_id,
- platform, stack_trace, customized_data):
- analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
- client_id)
- regression_range = findit_for_client.GetRegressionRange(client_id,
- customized_data)
- if (analysis and not analysis.failed and
- regression_range == analysis.regression_range):
- logging.info('The analysis of %s has already been done.',
- repr(crash_identifiers))
- return False
-
- # Create analysis for findit to run if this is not a rerun.
- if not analysis:
- analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
- client_id)
-
- findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
- client_id, platform, stack_trace,
- customized_data, regression_range)
- return True
-
-
-def ScheduleNewAnalysisForCrash(
- crash_identifiers, chrome_version, signature, client_id,
- platform, stack_trace, customized_data,
- queue_name=constants.DEFAULT_QUEUE):
- """Schedules an analysis."""
- # Check policy and tune arguments if needed.
- pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
- crash_identifiers, chrome_version, signature,
- client_id, platform, stack_trace,
- customized_data)
-
- if not pass_policy:
- return False
-
- if _NeedsNewAnalysis(*updated_analysis_args):
- analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
- # Attribute defined outside __init__ - pylint: disable=W0201
- analysis_pipeline.target = appengine_util.GetTargetNameForModule(
- constants.CRASH_BACKEND[client_id])
- analysis_pipeline.start(queue_name=queue_name)
- logging.info('New %s analysis is scheduled for %s', client_id,
- repr(crash_identifiers))
- return True
-
- return False
+ yield PublishResultPipeline(self._client_id, self._crash_identifiers)
« no previous file with comments | « appengine/findit/crash/component_classifier.py ('k') | appengine/findit/crash/crash_report.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698