Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(685)

Unified Diff: appengine/findit/crash/crash_pipeline.py

Issue 2414523002: [Findit] Reorganizing findit_for_*.py (Closed)
Patch Set: Fixing call to ScheduleNewAnalysis in handlers/crash/crash_handler.py Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/findit/crash/crash_pipeline.py
diff --git a/appengine/findit/crash/crash_pipeline.py b/appengine/findit/crash/crash_pipeline.py
index 6aa184692e2b6434388bc6de29f846a1b66963d1..c09d84aa98c2185088ee2dc47d977a802a1c7fac 100644
--- a/appengine/findit/crash/crash_pipeline.py
+++ b/appengine/findit/crash/crash_pipeline.py
@@ -6,48 +6,60 @@ import copy
import json
import logging
-from google.appengine.api import app_identity
-from google.appengine.ext import ndb
-
from common import appengine_util
-from common import constants
from common import pubsub_util
from common import time_util
from common.pipeline_wrapper import BasePipeline
from common.pipeline_wrapper import pipeline
-from crash import findit_for_client
from model import analysis_status
from model.crash.crash_config import CrashConfig
class CrashBasePipeline(BasePipeline):
- def __init__(self, crash_identifiers, client_id):
- super(CrashBasePipeline, self).__init__(crash_identifiers, client_id)
- self.crash_identifiers = crash_identifiers
- self.client_id = client_id
-
+ def __init__(self, findit_client, crash_identifiers):
+ super(CrashBasePipeline, self).__init__(
+ crash_identifiers, findit_client.client_id)
+
+ # TODO(wrengr): why do we take crash_identifiers as an argument and
+ # cache them, when they'll be passed in to every call to run?
+ self._crash_identifiers = crash_identifiers
+ self._findit = findit_client
+
+ @property
+ def client_id(self):
+ return self._findit.client_id
+
+ # This function is specified by the AppEngine pipeline API. So we can't
+ # change the name nor restrict the arguments (even though subclasses
+ # will restrict the arguments).
def run(self, *args, **kwargs):
raise NotImplementedError()
class CrashAnalysisPipeline(CrashBasePipeline):
- def _SetErrorIfAborted(self, aborted):
- if not aborted:
- return
-
- logging.error('Aborted analysis for %s', repr(self.crash_identifiers))
- analysis = findit_for_client.GetAnalysisForClient(self.crash_identifiers,
- self.client_id)
+ def finalized(self):
+ if self.was_aborted:
+ self._PutAbortedError()
+
+ # N.B., this method must be factored out for unittest reasons, since
+ # |was_aborted| can't be altered directly.
+ def _PutAbortedError(self):
+ """Update the ndb.Model to indicate that this pipeline was aborted."""
+ logging.error('Aborted analysis for %s', repr(self._crash_identifiers))
+ analysis = self._findit.GetAnalysis(self._crash_identifiers)
analysis.status = analysis_status.ERROR
analysis.put()
- def finalized(self):
- self._SetErrorIfAborted(self.was_aborted)
-
# Arguments number differs from overridden method - pylint: disable=W0221
- def run(self, crash_identifiers, client_id):
- analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
- client_id)
+ def run(self, crash_identifiers):
+ # In practice, the |crash_identifiers| this method gets will be
+ # identical to the ones passed to __init__; and yet they must still be
+ # passed for some reason having to do with AppEngine API. Just to make
+ # certain that our logging is consistent (in the face of mocking etc),
+ # we reset the cached copy anyway.
+ self._crash_identifiers = crash_identifiers
+
+ analysis = self._findit.GetAnalysis(crash_identifiers)
# Update analysis status.
analysis.pipeline_status_path = self.pipeline_status_path()
@@ -57,7 +69,7 @@ class CrashAnalysisPipeline(CrashBasePipeline):
analysis.put()
# Run the analysis.
- result, tags = findit_for_client.FindCulprit(analysis)
+ result, tags = self._findit.FindCulprit(analysis).ToDicts()
# Update analysis status and save the analysis result.
analysis.completed_time = time_util.GetUTCNow()
@@ -74,81 +86,37 @@ class PublishResultPipeline(CrashBasePipeline):
def finalized(self):
if self.was_aborted: # pragma: no cover.
logging.error('Failed to publish %s analysis result for %s',
- repr(self.crash_identifiers), self.client_id)
-
+ repr(self._crash_identifiers), self.client_id)
# Arguments number differs from overridden method - pylint: disable=W0221
- def run(self, crash_identifiers, client_id):
- analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
- client_id)
- result = findit_for_client.GetPublishResultFromAnalysis(analysis,
- crash_identifiers,
- client_id)
+ def run(self, crash_identifiers):
+ # In practice, the |crash_identifiers| this method gets will be
+ # identical to the ones passed to __init__; and yet they must still be
+ # passed for some reason having to do with AppEngine API. Just to make
+ # certain that our logging is consistent (in the face of mocking etc),
+ # we reset the cached copy anyway.
+ self._crash_identifiers = crash_identifiers
+
+ analysis = self._findit.GetAnalysis(crash_identifiers)
+ result = analysis.GetPublishableResult(crash_identifiers)
messages_data = [json.dumps(result, sort_keys=True)]
- client_config = CrashConfig.Get().GetClientConfig(client_id)
+ client_config = self._findit.config
# TODO(katesonia): Clean string uses in config.
topic = client_config['analysis_result_pubsub_topic']
pubsub_util.PublishMessagesToTopic(messages_data, topic)
- logging.info('Published %s analysis result for %s', client_id,
+ logging.info('Published %s analysis result for %s', self.client_id,
repr(crash_identifiers))
+# TODO(wrengr): why isn't this a subclass of CrashBasePipeline? That'd
+# make it more consistent about caching the findit_client in __init__
class CrashWrapperPipeline(BasePipeline):
# Arguments number differs from overridden method - pylint: disable=W0221
- def run(self, crash_identifiers, client_id):
- run_analysis = yield CrashAnalysisPipeline(crash_identifiers, client_id)
+ def run(self, findit_client, crash_identifiers):
+ # Whatever coroutine consumes what this pipeline |yield|s is the
+ # one that double-passes the |crash_identifiers| and will throw an
+ # error if |run| doesn't accept arguments.
+ run_analysis = yield CrashAnalysisPipeline(findit_client, crash_identifiers)
with pipeline.After(run_analysis):
- yield PublishResultPipeline(crash_identifiers, client_id)
-
-
-@ndb.transactional
-def _NeedsNewAnalysis(
- crash_identifiers, chrome_version, signature, client_id,
- platform, stack_trace, customized_data):
- analysis = findit_for_client.GetAnalysisForClient(crash_identifiers,
- client_id)
- regression_range = findit_for_client.GetRegressionRange(client_id,
- customized_data)
- if (analysis and not analysis.failed and
- regression_range == analysis.regression_range):
- logging.info('The analysis of %s has already been done.',
- repr(crash_identifiers))
- return False
-
- # Create analysis for findit to run if this is not a rerun.
- if not analysis:
- analysis = findit_for_client.CreateAnalysisForClient(crash_identifiers,
- client_id)
-
- findit_for_client.ResetAnalysis(analysis, chrome_version, signature,
- client_id, platform, stack_trace,
- customized_data, regression_range)
- return True
-
-
-def ScheduleNewAnalysisForCrash(
- crash_identifiers, chrome_version, signature, client_id,
- platform, stack_trace, customized_data,
- queue_name=constants.DEFAULT_QUEUE):
- """Schedules an analysis."""
- # Check policy and tune arguments if needed.
- pass_policy, updated_analysis_args = findit_for_client.CheckPolicyForClient(
- crash_identifiers, chrome_version, signature,
- client_id, platform, stack_trace,
- customized_data)
-
- if not pass_policy:
- return False
-
- if _NeedsNewAnalysis(*updated_analysis_args):
- analysis_pipeline = CrashWrapperPipeline(crash_identifiers, client_id)
- # Attribute defined outside __init__ - pylint: disable=W0201
- analysis_pipeline.target = appengine_util.GetTargetNameForModule(
- constants.CRASH_BACKEND[client_id])
- analysis_pipeline.start(queue_name=queue_name)
- logging.info('New %s analysis is scheduled for %s', client_id,
- repr(crash_identifiers))
- return True
-
- return False
+ yield PublishResultPipeline(findit_client, crash_identifiers)

Powered by Google App Engine
This is Rietveld 408576698