Chromium Code Reviews| Index: appengine/findit/crash/crash_pipeline.py |
| diff --git a/appengine/findit/crash/crash_pipeline.py b/appengine/findit/crash/crash_pipeline.py |
| index 6aa184692e2b6434388bc6de29f846a1b66963d1..c09d84aa98c2185088ee2dc47d977a802a1c7fac 100644 |
| --- a/appengine/findit/crash/crash_pipeline.py |
| +++ b/appengine/findit/crash/crash_pipeline.py |
| @@ -6,48 +6,60 @@ import copy |
| import json |
| import logging |
| -from google.appengine.api import app_identity |
| -from google.appengine.ext import ndb |
| - |
| from common import appengine_util |
| -from common import constants |
| from common import pubsub_util |
| from common import time_util |
| from common.pipeline_wrapper import BasePipeline |
| from common.pipeline_wrapper import pipeline |
| -from crash import findit_for_client |
| from model import analysis_status |
| from model.crash.crash_config import CrashConfig |
| class CrashBasePipeline(BasePipeline): |
| - def __init__(self, crash_identifiers, client_id): |
| - super(CrashBasePipeline, self).__init__(crash_identifiers, client_id) |
| - self.crash_identifiers = crash_identifiers |
| - self.client_id = client_id |
| - |
| + def __init__(self, findit_client, crash_identifiers): |
| + super(CrashBasePipeline, self).__init__( |
| + crash_identifiers, findit_client.client_id) |
| + |
| + # TODO(wrengr): why do we take crash_identifiers as an argument and |
| + # cache them, when they'll be passed in to every call to run? |
| + self._crash_identifiers = crash_identifiers |
| + self._findit = findit_client |
| + |
| + @property |
| + def client_id(self): |
| + return self._findit.client_id |
| + |
| + # This function is specified by the AppEngine pipeline API. So we can't |
| + # change the name nor restrict the arguments (even though subclasses |
| + # will restrict the arguments). |
| def run(self, *args, **kwargs): |
| raise NotImplementedError() |
| class CrashAnalysisPipeline(CrashBasePipeline): |
| - def _SetErrorIfAborted(self, aborted): |
| - if not aborted: |
| - return |
| - |
| - logging.error('Aborted analysis for %s', repr(self.crash_identifiers)) |
| - analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers, |
| - self.client_id) |
| + def finalized(self): |
| + if self.was_aborted: |
| + self._PutAbortedError() |
| + |
| + # N.B., this method must be factored out for unittest reasons; since |
| + # |was_aborted| can't be altered directly. |
| + def _PutAbortedError(self): |
| + """Update the ndb.Model to indicate that this pipeline was aborted.""" |
| + logging.error('Aborted analysis for %s', repr(self._crash_identifiers)) |
| + analysis = self._findit.GetAnalysis(self._crash_identifiers) |
| analysis.status = analysis_status.ERROR |
| analysis.put() |
| - def finalized(self): |
| - self._SetErrorIfAborted(self.was_aborted) |
| - |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| - def run(self, crash_identifiers, client_id): |
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, |
| - client_id) |
| + def run(self, crash_identifiers): |
| + # In practice, the |crash_identifiers| this method gets will be |
| + # identical to the ones passed to __init__; and yet they must still be |
| + # passed for some reason having to do with AppEngine API. Just to make |
| + # certain that our logging is consistent (in the face of mocking etc), |
| + # we reset the cached copy anyways. |
| + self._crash_identifiers = crash_identifiers |
| + |
| + analysis = self._findit.GetAnalysis(crash_identifiers) |
| # Update analysis status. |
| analysis.pipeline_status_path = self.pipeline_status_path() |
| @@ -57,7 +69,7 @@ class CrashAnalysisPipeline(CrashBasePipeline): |
| analysis.put() |
| # Run the analysis. |
| - result, tags = findit_for_client.FindCulprit(analysis) |
| + result, tags = self._findit.FindCulprit(analysis).ToDicts() |
| # Update analysis status and save the analysis result. |
| analysis.completed_time = time_util.GetUTCNow() |
| @@ -74,81 +86,37 @@ class PublishResultPipeline(CrashBasePipeline): |
| def finalized(self): |
| if self.was_aborted: # pragma: no cover. |
| logging.error('Failed to publish %s analysis result for %s', |
| - repr(self.crash_identifiers), self.client_id) |
| - |
| + repr(self._crash_identifiers), self.client_id) |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| - def run(self, crash_identifiers, client_id): |
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, |
| - client_id) |
| - result = findit_for_client.GetPublishResultFromAnalysis(analysis, |
| - crash_identifiers, |
| - client_id) |
| + def run(self, crash_identifiers): |
| + # In practice, the |crash_identifiers| this method gets will be |
| + # identical to the ones passed to __init__; and yet they must still be |
| + # passed for some reason having to do with AppEngine API. Just to make |
| + # certain that our logging is consistent (in the face of mocking etc), |
| + # we reset the cached copy anyways. |
| + self._crash_identifiers = crash_identifiers |
| + |
| + analysis = self._findit.GetAnalysis(crash_identifiers) |
| + result = analysis.GetPublishableResult(crash_identifiers) |
| messages_data = [json.dumps(result, sort_keys=True)] |
| - client_config = CrashConfig.Get().GetClientConfig(client_id) |
| + client_config = self._findit.config |
| # TODO(katesonia): Clean string uses in config. |
| topic = client_config['analysis_result_pubsub_topic'] |
| pubsub_util.PublishMessagesToTopic(messages_data, topic) |
| - logging.info('Published %s analysis result for %s', client_id, |
| + logging.info('Published %s analysis result for %s', self.client_id, |
| repr(crash_identifiers)) |
| +# TODO(wrengr): why isn't this a subclass of CrashBasePipeline? That'd |
| +# make it more consistent about caching the findit_client in __init__ |
| class CrashWrapperPipeline(BasePipeline): |
| # Arguments number differs from overridden method - pylint: disable=W0221 |
| - def run(self, crash_identifiers, client_id): |
| - run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id) |
| + def run(self, findit_client, crash_identifiers): |
| + # Whatever coroutine consumes what this pipleline |yield|s is the |
| + # one that double-passes the |crash_identifiers| and will throw an |
| + # error if |run| doesn't accept arguments. |
| + run_analysis = yield CrashAnalysisPipeline(findit_client, crash_identifiers) |
|
stgao
2016/10/19 18:20:53
And same here for the sub-pipelines, `findit_clien
wrengr
2016/10/24 18:36:14
Done.
|
| with pipeline.After(run_analysis): |
| - yield PublishResultPipeline(crash_identifiers, client_id) |
| - |
| - |
| -@ndb.transactional |
| -def _NeedsNewAnalysis( |
| - crash_identifiers, chrome_version, signature, client_id, |
| - platform, stack_trace, customized_data): |
| - analysis = findit_for_client.GetAnalysisForClient(crash_identifiers, |
| - client_id) |
| - regression_range = findit_for_client.GetRegressionRange(client_id, |
| - customized_data) |
| - if (analysis and not analysis.failed and |
| - regression_range == analysis.regression_range): |
| - logging.info('The analysis of %s has already been done.', |
| - repr(crash_identifiers)) |
| - return False |
| - |
| - # Create analysis for findit to run if this is not a rerun. |
| - if not analysis: |
| - analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers, |
| - client_id) |
| - |
| - findit_for_client.ResetAnalysis(analysis, chrome_version, signature, |
| - client_id, platform, stack_trace, |
| - customized_data, regression_range) |
| - return True |
| - |
| - |
| -def ScheduleNewAnalysisForCrash( |
| - crash_identifiers, chrome_version, signature, client_id, |
| - platform, stack_trace, customized_data, |
| - queue_name=constants.DEFAULT_QUEUE): |
| - """Schedules an analysis.""" |
| - # Check policy and tune arguments if needed. |
| - pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient( |
| - crash_identifiers, chrome_version, signature, |
| - client_id, platform, stack_trace, |
| - customized_data) |
| - |
| - if not pass_policy: |
| - return False |
| - |
| - if _NeedsNewAnalysis(*updated_analysis_args): |
| - analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id) |
| - # Attribute defined outside __init__ - pylint: disable=W0201 |
| - analysis_pipeline.target = appengine_util.GetTargetNameForModule( |
| - constants.CRASH_BACKEND[client_id]) |
| - analysis_pipeline.start(queue_name=queue_name) |
| - logging.info('New %s analysis is scheduled for %s', client_id, |
| - repr(crash_identifiers)) |
| - return True |
| - |
| - return False |
| + yield PublishResultPipeline(findit_client, crash_identifiers) |