| Index: appengine/findit/crash/crash_pipeline.py
|
| diff --git a/appengine/findit/crash/crash_pipeline.py b/appengine/findit/crash/crash_pipeline.py
|
| index 6aa184692e2b6434388bc6de29f846a1b66963d1..c09d84aa98c2185088ee2dc47d977a802a1c7fac 100644
|
| --- a/appengine/findit/crash/crash_pipeline.py
|
| +++ b/appengine/findit/crash/crash_pipeline.py
|
| @@ -6,48 +6,60 @@ import copy
|
| import json
|
| import logging
|
|
|
| -from google.appengine.api import app_identity
|
| -from google.appengine.ext import ndb
|
| -
|
| from common import appengine_util
|
| -from common import constants
|
| from common import pubsub_util
|
| from common import time_util
|
| from common.pipeline_wrapper import BasePipeline
|
| from common.pipeline_wrapper import pipeline
|
| -from crash import findit_for_client
|
| from model import analysis_status
|
| from model.crash.crash_config import CrashConfig
|
|
|
|
|
| class CrashBasePipeline(BasePipeline):
|
| - def __init__(self, crash_identifiers, client_id):
|
| - super(CrashBasePipeline, self).__init__(crash_identifiers, client_id)
|
| - self.crash_identifiers = crash_identifiers
|
| - self.client_id = client_id
|
| -
|
| + def __init__(self, findit_client, crash_identifiers):
|
| + super(CrashBasePipeline, self).__init__(
|
| + crash_identifiers, findit_client.client_id)
|
| +
|
| + # TODO(wrengr): why do we take crash_identifiers as an argument and
|
| + # cache them, when they'll be passed in to every call to run?
|
| + self._crash_identifiers = crash_identifiers
|
| + self._findit = findit_client
|
| +
|
| + @property
|
| + def client_id(self):
|
| + return self._findit.client_id
|
| +
|
| + # This function is specified by the AppEngine pipeline API. So we can't
|
| + # change the name or restrict the arguments (even though subclasses
|
| + # will restrict the arguments).
|
| def run(self, *args, **kwargs):
|
| raise NotImplementedError()
|
|
|
|
|
| class CrashAnalysisPipeline(CrashBasePipeline):
|
| - def _SetErrorIfAborted(self, aborted):
|
| - if not aborted:
|
| - return
|
| -
|
| - logging.error('Aborted analysis for %s', repr(self.crash_identifiers))
|
| - analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers,
|
| - self.client_id)
|
| + def finalized(self):
|
| + if self.was_aborted:
|
| + self._PutAbortedError()
|
| +
|
| + # N.B., this method must be factored out for unittest reasons; since
|
| + # |was_aborted| can't be altered directly.
|
| + def _PutAbortedError(self):
|
| + """Update the ndb.Model to indicate that this pipeline was aborted."""
|
| + logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
|
| + analysis = self._findit.GetAnalysis(self._crash_identifiers)
|
| analysis.status = analysis_status.ERROR
|
| analysis.put()
|
|
|
| - def finalized(self):
|
| - self._SetErrorIfAborted(self.was_aborted)
|
| -
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| - def run(self, crash_identifiers, client_id):
|
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| + def run(self, crash_identifiers):
|
| + # In practice, the |crash_identifiers| this method gets will be
|
| + # identical to the ones passed to __init__; and yet they must still be
|
| + # passed for some reason having to do with AppEngine API. Just to make
|
| + # certain that our logging is consistent (in the face of mocking etc),
|
| + # we reset the cached copy anyway.
|
| + self._crash_identifiers = crash_identifiers
|
| +
|
| + analysis = self._findit.GetAnalysis(crash_identifiers)
|
|
|
| # Update analysis status.
|
| analysis.pipeline_status_path = self.pipeline_status_path()
|
| @@ -57,7 +69,7 @@ class CrashAnalysisPipeline(CrashBasePipeline):
|
| analysis.put()
|
|
|
| # Run the analysis.
|
| - result, tags = findit_for_client.FindCulprit(analysis)
|
| + result, tags = self._findit.FindCulprit(analysis).ToDicts()
|
|
|
| # Update analysis status and save the analysis result.
|
| analysis.completed_time = time_util.GetUTCNow()
|
| @@ -74,81 +86,37 @@ class PublishResultPipeline(CrashBasePipeline):
|
| def finalized(self):
|
| if self.was_aborted: # pragma: no cover.
|
| logging.error('Failed to publish %s analysis result for %s',
|
| - repr(self.crash_identifiers), self.client_id)
|
| -
|
| + repr(self._crash_identifiers), self.client_id)
|
|
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| - def run(self, crash_identifiers, client_id):
|
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| - result = findit_for_client.GetPublishResultFromAnalysis(analysis,
|
| - crash_identifiers,
|
| - client_id)
|
| + def run(self, crash_identifiers):
|
| + # In practice, the |crash_identifiers| this method gets will be
|
| + # identical to the ones passed to __init__; and yet they must still be
|
| + # passed for some reason having to do with AppEngine API. Just to make
|
| + # certain that our logging is consistent (in the face of mocking etc),
|
| + # we reset the cached copy anyway.
|
| + self._crash_identifiers = crash_identifiers
|
| +
|
| + analysis = self._findit.GetAnalysis(crash_identifiers)
|
| + result = analysis.GetPublishableResult(crash_identifiers)
|
| messages_data = [json.dumps(result, sort_keys=True)]
|
|
|
| - client_config = CrashConfig.Get().GetClientConfig(client_id)
|
| + client_config = self._findit.config
|
| # TODO(katesonia): Clean string uses in config.
|
| topic = client_config['analysis_result_pubsub_topic']
|
| pubsub_util.PublishMessagesToTopic(messages_data, topic)
|
| - logging.info('Published %s analysis result for %s', client_id,
|
| + logging.info('Published %s analysis result for %s', self.client_id,
|
| repr(crash_identifiers))
|
|
|
|
|
| +# TODO(wrengr): why isn't this a subclass of CrashBasePipeline? That'd
|
| +# make it more consistent about caching the findit_client in __init__
|
| class CrashWrapperPipeline(BasePipeline):
|
| # Arguments number differs from overridden method - pylint: disable=W0221
|
| - def run(self, crash_identifiers, client_id):
|
| - run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id)
|
| + def run(self, findit_client, crash_identifiers):
|
| + # Whatever coroutine consumes what this pipeline |yield|s is the
|
| + # one that double-passes the |crash_identifiers| and will throw an
|
| + # error if |run| doesn't accept arguments.
|
| + run_analysis = yield CrashAnalysisPipeline(findit_client, crash_identifiers)
|
| with pipeline.After(run_analysis):
|
| - yield PublishResultPipeline(crash_identifiers, client_id)
|
| -
|
| -
|
| -@ndb.transactional
|
| -def _NeedsNewAnalysis(
|
| - crash_identifiers, chrome_version, signature, client_id,
|
| - platform, stack_trace, customized_data):
|
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| - regression_range = findit_for_client.GetRegressionRange(client_id,
|
| - customized_data)
|
| - if (analysis and not analysis.failed and
|
| - regression_range == analysis.regression_range):
|
| - logging.info('The analysis of %s has already been done.',
|
| - repr(crash_identifiers))
|
| - return False
|
| -
|
| - # Create analysis for findit to run if this is not a rerun.
|
| - if not analysis:
|
| - analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
|
| - client_id)
|
| -
|
| - findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
|
| - client_id, platform, stack_trace,
|
| - customized_data, regression_range)
|
| - return True
|
| -
|
| -
|
| -def ScheduleNewAnalysisForCrash(
|
| - crash_identifiers, chrome_version, signature, client_id,
|
| - platform, stack_trace, customized_data,
|
| - queue_name=constants.DEFAULT_QUEUE):
|
| - """Schedules an analysis."""
|
| - # Check policy and tune arguments if needed.
|
| - pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
|
| - crash_identifiers, chrome_version, signature,
|
| - client_id, platform, stack_trace,
|
| - customized_data)
|
| -
|
| - if not pass_policy:
|
| - return False
|
| -
|
| - if _NeedsNewAnalysis(*updated_analysis_args):
|
| - analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
|
| - # Attribute defined outside __init__ - pylint: disable=W0201
|
| - analysis_pipeline.target = appengine_util.GetTargetNameForModule(
|
| - constants.CRASH_BACKEND[client_id])
|
| - analysis_pipeline.start(queue_name=queue_name)
|
| - logging.info('New %s analysis is scheduled for %s', client_id,
|
| - repr(crash_identifiers))
|
| - return True
|
| -
|
| - return False
|
| + yield PublishResultPipeline(findit_client, crash_identifiers)
|
|
|