Chromium Code Reviews| Index: appengine/findit/crash/crash_pipeline.py |
| diff --git a/appengine/findit/crash/crash_pipeline.py b/appengine/findit/crash/crash_pipeline.py |
| index 6aa184692e2b6434388bc6de29f846a1b66963d1..0026cbd353738cb50e2e0e82f1f5a698142c4595 100644 |
| --- a/appengine/findit/crash/crash_pipeline.py |
| +++ b/appengine/findit/crash/crash_pipeline.py |
| @@ -6,60 +6,119 @@ import copy |
| import json |
| import logging |
| -from google.appengine.api import app_identity |
| -from google.appengine.ext import ndb |
| - |
| from common import appengine_util |
| -from common import constants |
| from common import pubsub_util |
| from common import time_util |
| from common.pipeline_wrapper import BasePipeline |
| from common.pipeline_wrapper import pipeline |
| -from crash import findit_for_client |
| +from crash import findit_for_chromecrash |
| +from crash import findit_for_clusterfuzz |
| +from crash.type_enums import CrashClient |
| from model import analysis_status |
| from model.crash.crash_config import CrashConfig |
| +# TODO(wrengr): write complete coverage tests for this. |
| +def FinditForClientID(client_id): # pragma: no cover |
| + """Construct a Findit object from a client id string specifying the class. |
| + |
| + We cannot pass Findit objects to the various methods in |
| + |crash.crash_pipeline|, because they are not JSON serializable. For now, |
| + we just serialize Findit objects as their |client_id|, and then use this |
| + function to reconstruct them. Alas, this means we will lose various |
| + other information stored in the Findit object (i.e., stuff that comes |
| + from CrashConfig); which could lead to some hard-to-diagnose coherency |
| + bugs, since the new Findit object will be based on the CrashConfig at |
| + the time it's constructed, which may be different than the CrashConfig |
| + at the time the previous Findit object was constructed. In the future |
| + we should fix all this to serialize Findit objects in a more robust way. |
| + """ |
| + assert isinstance(client_id, (str, unicode)), ( |
| + 'FinditForClientID: expected string or unicode, but got %s' |
| + % client_id.__class__.__name__) |
| + # TODO(wrengr): it'd be nice to replace this with a single lookup in |
| + # a dict; but that's buggy for some unknown reason. |
| + if client_id == CrashClient.FRACAS: |
| + cls = findit_for_chromecrash.FinditForFracas |
| + elif client_id == CrashClient.CRACAS: |
| + cls = findit_for_chromecrash.FinditForCracas |
| + elif client_id == CrashClient.CLUSTERFUZZ: |
| + cls = findit_for_clusterfuzz.FinditForClusterfuzz |
| + else: |
| + raise ValueError('FinditForClientID: ' |
| + 'unknown or unsupported client %s' % client_id) |
| + |
| + return cls(CrashWrapperPipeline) |
| + |
| + |
| +# Some notes about the classes below, for people who are not |
| +# familiar with AppEngine. The thing that really kicks everything off |
| +# is |CrashWrapperPipeline.run|. However, an important thing to bear in |
| +# mind is that whatever arguments are passed to that method will also |
| +# be passed to the |run| method on whatever objects it yields. Thus, |
| +# all the |run| methods across these different classes must have the same |
| +# type. In practice, we end up passing all the arguments to the |
| +# constructors, because we need to have the fields around for logging |
| +# (e.g., in |finalized|); thus, there's nothing that needs to be passed |
| +# to |run|. Another thing to bear in mind is that whatever objects |
| +# |CrashWrapperPipeline.run| yields must be JSON-serializable. The base |
| +# class handles most of that for us, so the force of this constraint is |
| +# that all the arguments to the constructors for those classes must be |
| +# JSON-serializable. Thus, we cannot actually pass a Findit object to |
| +# the constructor, but rather must pass only the |client_id| (or whatever |
| +# JSON dict) and then reconstruct the Findit object from that. Moreover, |
| +# the |run| method and the |finalized| method will be run in different |
| +# processes, so we will actually end up reconstructing the Findit object |
| +# twice. Thus, we shouldn't store anything in the pipeline objects outside |
| +# of what their constructors store. |
| + |
| class CrashBasePipeline(BasePipeline): |
| - def __init__(self, crash_identifiers, client_id): |
| - super(CrashBasePipeline, self).__init__(crash_identifiers, client_id) |
| - self.crash_identifiers = crash_identifiers |
| - self.client_id = client_id |
| + def __init__(self, client_id, crash_identifiers): |
| + super(CrashBasePipeline, self).__init__(client_id, crash_identifiers) |
| + self._crash_identifiers = crash_identifiers |
| + self._findit = FinditForClientID(client_id) |
| + |
| + @property |
| + def client_id(self): # pragma: no cover |
| + return self._findit.client_id |
| def run(self, *args, **kwargs): |
| raise NotImplementedError() |
| class CrashAnalysisPipeline(CrashBasePipeline): |
| - def _SetErrorIfAborted(self, aborted): |
| - if not aborted: |
| - return |
| - |
| - logging.error('Aborted analysis for %s', repr(self.crash_identifiers)) |
| - analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers, |
| - self.client_id) |
| + def finalized(self): # pragma: no cover |
| + if self.was_aborted: |
| + self._PutAbortedError() |
| + |
| + # N.B., this method must be factored out for unittest reasons; since |
| + # |finalized| takes no arguments (by AppEngine's spec) and |was_aborted| |
| + # can't be altered directly. |
| + def _PutAbortedError(self): |
| + """Update the ndb.Model to indicate that this pipeline was aborted.""" |
| + logging.error('Aborted analysis for %s', repr(self._crash_identifiers)) |
| + analysis = self._findit.GetAnalysis(self._crash_identifiers) |
| analysis.status = analysis_status.ERROR |
| analysis.put() |
| - def finalized(self): |
| - self._SetErrorIfAborted(self.was_aborted) |
| - |
| + # TODO(wrengr): we misplaced the coverage test; find it! |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| - def run(self, crash_identifiers, client_id): |
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, |
| - client_id) |
| + def run(self): |
| + # TODO(wrengr): shouldn't this method somehow call _NeedsNewAnalysis |
| + # to guard against race conditions? |
| + analysis = self._findit.GetAnalysis(self._crash_identifiers) |
| - # Update analysis status. |
| + # Update the model's status to say we're in the process of doing analysis. |
| analysis.pipeline_status_path = self.pipeline_status_path() |
| analysis.status = analysis_status.RUNNING |
| analysis.started_time = time_util.GetUTCNow() |
| analysis.findit_version = appengine_util.GetCurrentVersion() |
| analysis.put() |
| - # Run the analysis. |
| - result, tags = findit_for_client.FindCulprit(analysis) |
| + # Actually do the analysis. |
| + result, tags = self._findit.FindCulprit(analysis).ToDicts() |
| - # Update analysis status and save the analysis result. |
| + # Update model's status to say we're done, and save the results. |
| analysis.completed_time = time_util.GetUTCNow() |
| analysis.result = result |
| for tag_name, tag_value in tags.iteritems(): |
| @@ -74,81 +133,45 @@ class PublishResultPipeline(CrashBasePipeline): |
| def finalized(self): |
| if self.was_aborted: # pragma: no cover. |
| logging.error('Failed to publish %s analysis result for %s', |
| - repr(self.crash_identifiers), self.client_id) |
| - |
| + repr(self._crash_identifiers), self.client_id) |
| + # TODO(wrengr): we misplaced the coverage test; find it! |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| - def run(self, crash_identifiers, client_id): |
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, |
| - client_id) |
| - result = findit_for_client.GetPublishResultFromAnalysis(analysis, |
| - crash_identifiers, |
| - client_id) |
| + def run(self): |
| + analysis = self._findit.GetAnalysis(self._crash_identifiers) |
| + result = analysis.ToPublishableResult(self._crash_identifiers) |
| messages_data = [json.dumps(result, sort_keys=True)] |
| - client_config = CrashConfig.Get().GetClientConfig(client_id) |
| + # TODO(wrengr): remove Findit's dependency on CrashConfig. |
| + client_config = self._findit.config |
|
stgao
2016/10/25 18:03:41
IMO, it's OK for the pipeline to depends on the co
wrengr
2016/10/25 19:49:54
My concern is that the config is volatile. Since w
|
| # TODO(katesonia): Clean string uses in config. |
| topic = client_config['analysis_result_pubsub_topic'] |
| pubsub_util.PublishMessagesToTopic(messages_data, topic) |
| - logging.info('Published %s analysis result for %s', client_id, |
| - repr(crash_identifiers)) |
| + logging.info('Published %s analysis result for %s', self.client_id, |
| + repr(self._crash_identifiers)) |
| class CrashWrapperPipeline(BasePipeline): |
| + """Fire off pipelines to (1) do the analysis and (2) publish results. |
| + |
| + The reason we have analysis and publishing as separate pipelines is |
| + because each of them can fail for independent reasons. E.g., if we |
| + successfully finish the analysis, but then the publishing fails due to |
| + network errors, we don't want to have to redo the analysis in order |
| + to redo the publishing. We could try to cache the fact that analysis |
| + succeeded in the pipeline object itself, but we'd have to be careful |
| + because the |run| and |finalized| methods are executed in different |
| + processes. |
| + """ |
| + def __init__(self, client_id, crash_identifiers): |
| + super(CrashWrapperPipeline, self).__init__(client_id, crash_identifiers) |
| + self._crash_identifiers = crash_identifiers |
| + self._client_id = client_id |
| + |
| + # TODO(wrengr): we misplaced the coverage test; find it! |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| - def run(self, crash_identifiers, client_id): |
| - run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id) |
| + def run(self): |
| + run_analysis = yield CrashAnalysisPipeline( |
| + self._client_id, self._crash_identifiers) |
| with pipeline.After(run_analysis): |
| - yield PublishResultPipeline(crash_identifiers, client_id) |
| - |
| - |
| -@ndb.transactional |
| -def _NeedsNewAnalysis( |
| - crash_identifiers, chrome_version, signature, client_id, |
| - platform, stack_trace, customized_data): |
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, |
| - client_id) |
| - regression_range = findit_for_client.GetRegressionRange(client_id, |
| - customized_data) |
| - if (analysis and not analysis.failed and |
| - regression_range == analysis.regression_range): |
| - logging.info('The analysis of %s has already been done.', |
| - repr(crash_identifiers)) |
| - return False |
| - |
| - # Create analysis for findit to run if this is not a rerun. |
| - if not analysis: |
| - analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers, |
| - client_id) |
| - |
| - findit_for_client.ResetAnalysis(analysis, chrome_version, signature, |
| - client_id, platform, stack_trace, |
| - customized_data, regression_range) |
| - return True |
| - |
| - |
| -def ScheduleNewAnalysisForCrash( |
| - crash_identifiers, chrome_version, signature, client_id, |
| - platform, stack_trace, customized_data, |
| - queue_name=constants.DEFAULT_QUEUE): |
| - """Schedules an analysis.""" |
| - # Check policy and tune arguments if needed. |
| - pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient( |
| - crash_identifiers, chrome_version, signature, |
| - client_id, platform, stack_trace, |
| - customized_data) |
| - |
| - if not pass_policy: |
| - return False |
| - |
| - if _NeedsNewAnalysis(*updated_analysis_args): |
| - analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id) |
| - # Attribute defined outside __init__ - pylint: disable=W0201 |
| - analysis_pipeline.target = appengine_util.GetTargetNameForModule( |
| - constants.CRASH_BACKEND[client_id]) |
| - analysis_pipeline.start(queue_name=queue_name) |
| - logging.info('New %s analysis is scheduled for %s', client_id, |
| - repr(crash_identifiers)) |
| - return True |
| - |
| - return False |
| + yield PublishResultPipeline(self._client_id, self._crash_identifiers) |