Index: appengine/monorail/services/spam_svc.py
diff --git a/appengine/monorail/services/spam_svc.py b/appengine/monorail/services/spam_svc.py
new file mode 100644
index 0000000000000000000000000000000000000000..2823f130251ad3c544d9e20426531ec9373b5524
--- /dev/null
+++ b/appengine/monorail/services/spam_svc.py
@@ -0,0 +1,391 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+"""Set of functions for dealing with spam reports."""
+
+import collections
+import logging
+import sys
+
+import httplib2
+
+import settings
+from framework import sql
+from infra_libs import ts_mon
+
+from apiclient.discovery import build
+from apiclient.errors import Error as ApiClientError
+from oauth2client.client import GoogleCredentials
+from oauth2client.client import Error as Oauth2ClientError
+SPAMREPORT_TABLE_NAME = 'SpamReport'
+SPAMVERDICT_TABLE_NAME = 'SpamVerdict'
+ISSUE_TABLE = 'Issue'
+
+REASON_MANUAL = 'manual'
+REASON_THRESHOLD = 'threshold'
+REASON_CLASSIFIER = 'classifier'
+
+SPAMREPORT_COLS = ['issue_id', 'reported_user_id', 'user_id']
+MANUALVERDICT_COLS = ['user_id', 'issue_id', 'is_spam', 'reason', 'project_id']
+THRESHVERDICT_COLS = ['issue_id', 'is_spam', 'reason', 'project_id']
+
+
+class SpamService(object):
+  """The persistence layer for spam reports."""
+  issue_actions = ts_mon.CounterMetric('monorail/spam_svc/issue')
+  comment_actions = ts_mon.CounterMetric('monorail/spam_svc/comment')
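+  # Each counter is incremented with a 'type' field that is one of
+  # 'flag', 'manual', or 'classifier', matching how the verdict was made.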
+
+  def __init__(self):
+    self.report_tbl = sql.SQLTableManager(SPAMREPORT_TABLE_NAME)
+    self.verdict_tbl = sql.SQLTableManager(SPAMVERDICT_TABLE_NAME)
+    self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE)
+
+    self.prediction_service = None
+    try:
+      credentials = GoogleCredentials.get_application_default()
+      self.prediction_service = build('prediction', 'v1.6',
+          http=httplib2.Http(),
+          credentials=credentials)
+    except (Oauth2ClientError, ApiClientError):
+ logging.error("Error getting GoogleCredentials: %s" % sys.exc_info()[0]) |
+ |
+  def LookupFlaggers(self, cnxn, issue_id):
+    """Returns users who've reported the issue or its comments as spam.
+
+    Returns a tuple. First element is a list of users who flagged the issue;
+    second element is a dictionary of comment id to a list of users who flagged
+    that comment.
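+
+    For example, ([111L, 222L], {501L: [333L]}) means users 111 and 222
+    flagged the issue itself, and user 333 flagged comment 501 (the IDs
+    here are illustrative).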
+ """ |
+ rows = self.report_tbl.Select( |
+ cnxn, cols=['user_id', 'comment_id'], |
+ issue_id=issue_id) |
+ |
+ issue_reporters = [] |
+ comment_reporters = collections.defaultdict(list) |
+ for row in rows: |
+ if row[1]: |
+ comment_reporters[row[1]].append(row[0]) |
+ else: |
+ issue_reporters.append(row[0]) |
+ |
+ return issue_reporters, comment_reporters |
+ |
+  def LookUpFlagCounts(self, cnxn, issue_ids):
+    """Returns a map of issue_id to the number of spam reports on it."""
+    rows = self.report_tbl.Select(cnxn, cols=['issue_id', 'COUNT(*)'],
+        issue_id=issue_ids, group_by=['issue_id'])
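+    # Note: rows for comment flags also carry an issue_id, so this count
+    # includes reports on the issue itself and on any of its comments.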
+    counts = {}
+    for row in rows:
+      counts[long(row[0])] = row[1]
+    return counts
+
+  def LookUpIssueVerdicts(self, cnxn, issue_ids):
+    """Returns a map of issue_id to the reason for its latest spam verdict."""
+    rows = self.verdict_tbl.Select(cnxn,
+        cols=['issue_id', 'reason', 'MAX(created)'],
+        issue_id=issue_ids, group_by=['issue_id'])
+    reasons = {}
+    for row in rows:
+      reasons[long(row[0])] = row[1]
+    return reasons
+
+  def LookUpIssueVerdictHistory(self, cnxn, issue_ids):
+    """Returns a list of all spam verdicts, with metadata, for the issues."""
+    rows = self.verdict_tbl.Select(cnxn, cols=[
+        'issue_id', 'reason', 'created', 'is_spam', 'classifier_confidence',
+        'user_id', 'overruled'],
+        issue_id=issue_ids, order_by=[('issue_id', []), ('created', [])])
+
+    # TODO: group by issue_id, make class instead of dict for verdict.
+    verdicts = []
+    for row in rows:
+      verdicts.append({
+          'issue_id': row[0],
+          'reason': row[1],
+          'created': row[2],
+          'is_spam': row[3],
+          'classifier_confidence': row[4],
+          'user_id': row[5],
+          'overruled': row[6],
+      })
+
+    return verdicts
+
+  def FlagIssues(self, cnxn, issue_service, issues, reporting_user_id,
+                 flagged_spam):
+    """Creates or deletes spam reports on a set of issues."""
+    verdict_updates = []
+    if flagged_spam:
+      rows = [(issue.issue_id, issue.reporter_id, reporting_user_id)
+          for issue in issues]
+      self.report_tbl.InsertRows(cnxn, SPAMREPORT_COLS, rows, ignore=True)
+    else:
+      issue_ids = [issue.issue_id for issue in issues]
+      self.report_tbl.Delete(
+          cnxn, issue_id=issue_ids, user_id=reporting_user_id,
+          comment_id=None)
+    project_id = issues[0].project_id
+
+    # Now record new verdicts and update issue.is_spam, if they've changed.
+    ids = [issue.issue_id for issue in issues]
+    counts = self.LookUpFlagCounts(cnxn, ids)
+    previous_verdicts = self.LookUpIssueVerdicts(cnxn, ids)
+
+    for issue_id in counts:
+      # Record a new verdict only if the flag count has crossed the spam
+      # threshold in the relevant direction, and the issue doesn't already
+      # have a manual verdict (manual verdicts are never overridden here).
+      crossed_thresh = (
+          (flagged_spam and counts[issue_id] >= settings.spam_flag_thresh) or
+          (not flagged_spam and counts[issue_id] < settings.spam_flag_thresh))
+      if crossed_thresh and previous_verdicts.get(issue_id) != REASON_MANUAL:
+        verdict_updates.append(issue_id)
+
+    if not verdict_updates:
+      return
+
+    # Some of the issues may have exceeded the flag threshold, so issue
+    # verdicts and mark them as spam in those cases.
+    rows = [(issue_id, flagged_spam, REASON_THRESHOLD, project_id)
+        for issue_id in verdict_updates]
+    self.verdict_tbl.InsertRows(cnxn, THRESHVERDICT_COLS, rows, ignore=True)
+    update_issues = []
+    for issue in issues:
+      if issue.issue_id in verdict_updates:
+        issue.is_spam = flagged_spam
+        update_issues.append(issue)
+
+    if flagged_spam:
+      self.issue_actions.increment_by(len(update_issues), {'type': 'flag'})
+
+    issue_service.UpdateIssues(cnxn, update_issues, update_cols=['is_spam'])
+
+  def FlagComment(self, cnxn, issue_id, comment_id, reported_user_id,
+                  reporting_user_id, flagged_spam):
+    """Creates or deletes a spam report on a comment."""
+    # TODO(seanmccullough): Bulk comment flagging? There's no UI for that.
+    if flagged_spam:
+      self.report_tbl.InsertRow(
+          cnxn, ignore=True, issue_id=issue_id,
+          comment_id=comment_id, reported_user_id=reported_user_id,
+          user_id=reporting_user_id)
+      self.comment_actions.increment({'type': 'flag'})
+    else:
+      self.report_tbl.Delete(
+          cnxn, issue_id=issue_id, comment_id=comment_id,
+          user_id=reporting_user_id)
+
+  def RecordClassifierIssueVerdict(self, cnxn, issue, is_spam, confidence):
+    """Records the classifier's verdict on a newly created issue."""
+    self.verdict_tbl.InsertRow(cnxn, issue_id=issue.issue_id, is_spam=is_spam,
+        reason=REASON_CLASSIFIER, classifier_confidence=confidence)
+    if is_spam:
+      self.issue_actions.increment({'type': 'classifier'})
+    # This is called at issue creation time, so there's nothing else to do.
+
+  def RecordManualIssueVerdicts(self, cnxn, issue_service, issues, user_id,
+                                is_spam):
+    """Bulk-records manual spam verdicts, overruling all earlier verdicts."""
+    rows = [(user_id, issue.issue_id, is_spam, REASON_MANUAL, issue.project_id)
+        for issue in issues]
+    issue_ids = [issue.issue_id for issue in issues]
+
+    # Overrule all previous verdicts.
+    self.verdict_tbl.Update(cnxn, {'overruled': True}, [
+        ('issue_id IN (%s)' % sql.PlaceHolders(issue_ids), issue_ids)
+      ], commit=False)
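+    # For two issues, the statement above is roughly:
+    #   UPDATE SpamVerdict SET overruled=1 WHERE issue_id IN (%s,%s)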
+
+    self.verdict_tbl.InsertRows(cnxn, MANUALVERDICT_COLS, rows, ignore=True)
+
+    for issue in issues:
+      issue.is_spam = is_spam
+
+    if is_spam:
+      self.issue_actions.increment_by(len(issues), {'type': 'manual'})
+    else:
+      issue_service.AllocateNewLocalIDs(cnxn, issues)
+
+    # This will commit the transaction.
+    issue_service.UpdateIssues(cnxn, issues, update_cols=['is_spam'])
+
+  def RecordManualCommentVerdict(self, cnxn, issue_service, user_service,
+                                 comment_id, sequence_num, user_id, is_spam):
+    """Records a manual spam verdict on a single comment."""
+    # TODO(seanmccullough): Bulk comment verdicts? There's no UI for that.
+    self.verdict_tbl.InsertRow(cnxn, ignore=True,
+        user_id=user_id, comment_id=comment_id, is_spam=is_spam,
+        reason=REASON_MANUAL)
+    comment = issue_service.GetComment(cnxn, comment_id)
+    comment.is_spam = is_spam
+    issue = issue_service.GetIssue(cnxn, comment.issue_id)
+    issue_service.SoftDeleteComment(cnxn, comment.project_id, issue.local_id,
+                                    sequence_num, user_id, user_service,
+                                    is_spam, True, is_spam)
+    if is_spam:
+      self.comment_actions.increment({'type': 'manual'})
+
+  def RecordClassifierCommentVerdict(self, cnxn, comment, is_spam, confidence):
+    """Records the classifier's verdict on a comment."""
+    self.verdict_tbl.InsertRow(cnxn, comment_id=comment.id, is_spam=is_spam,
+        reason=REASON_CLASSIFIER, classifier_confidence=confidence,
+        project_id=comment.project_id)
+    if is_spam:
+      self.comment_actions.increment({'type': 'classifier'})
+
+  def ClassifyIssue(self, issue, firstComment):
+    """Classify an issue as either spam or ham.
+
+    Args:
+      issue: the Issue.
+      firstComment: the first Comment on issue.
+
+    Returns a JSON dict of classifier prediction results from
+    the Cloud Prediction API.
+    """
+    # Fail-safe: not spam.
+    result = {'outputLabel': 'ham',
+              'outputMulti': [{'label': 'ham', 'score': '1.0'}]}
+    if not self.prediction_service:
+      logging.error("prediction_service not initialized.")
+      return result
+
+    remaining_retries = 3
+    while remaining_retries > 0:
+      try:
+        result = self.prediction_service.trainedmodels().predict(
+            project=settings.classifier_project_id,
+            id=settings.classifier_model_id,
+            body={'input': {
+                'csvInstance': [issue.summary, firstComment.content]}}
+        ).execute()
+        return result
+      except Exception:
+        remaining_retries -= 1
+        logging.error('Error calling prediction API: %s', sys.exc_info()[0])
+
+    return result
+
+  def ClassifyComment(self, comment_content):
+    """Classify a comment as either spam or ham.
+
+    Args:
+      comment_content: the comment text.
+
+    Returns a JSON dict of classifier prediction results from
+    the Cloud Prediction API.
+    """
+    # Fail-safe: not spam.
+    result = {'outputLabel': 'ham',
+              'outputMulti': [{'label': 'ham', 'score': '1.0'}]}
+    if not self.prediction_service:
+      logging.error("prediction_service not initialized.")
+      return result
+
+    remaining_retries = 3
+    while remaining_retries > 0:
+      try:
+        result = self.prediction_service.trainedmodels().predict(
+            project=settings.classifier_project_id,
+            id=settings.classifier_model_id,
+            # We re-use the issue classifier here, with a blank summary
+            # and the comment content as the body.
+            body={'input': {'csvInstance': ['', comment_content]}}
+        ).execute()
+        return result
+      except Exception:
+        remaining_retries -= 1
+        logging.error('Error calling prediction API: %s', sys.exc_info()[0])
+
+    return result
+
+  def GetModerationQueue(
+      self, cnxn, _issue_service, project_id, offset=0, limit=10):
+    """Returns a list of recent issues with spam verdicts,
+    ranked in ascending order of confidence (so uncertain items are first).
+    """
+    # TODO(seanmccullough): Optimize pagination. This query probably gets
+    # slower as the number of SpamVerdicts grows, regardless of the offset
+    # and limit values used here. Using offset/limit in general may not
+    # be the best way to do this.
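+    # (One alternative would be keyset pagination keyed on
+    # (classifier_confidence, created), which stays fast at any depth;
+    # left as-is for now.)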
+    results = self.verdict_tbl.Select(cnxn,
+        cols=['issue_id', 'is_spam', 'reason', 'classifier_confidence',
+            'created'],
+        where=[
+            ('project_id = %s', [project_id]),
+            ('classifier_confidence <= %s',
+                [settings.classifier_moderation_thresh]),
+            ('overruled = %s', [False]),
+            ('issue_id IS NOT NULL', []),
+        ],
+        order_by=[
+            ('classifier_confidence ASC', []),
+            ('created ASC', []),
+        ],
+        group_by=['issue_id'],
+        offset=offset,
+        limit=limit,
+        )
+
+    ret = []
+    for row in results:
+      ret.append(ModerationItem(
+          issue_id=long(row[0]),
+          is_spam=row[1] == 1,
+          reason=row[2],
+          classifier_confidence=row[3],
+          verdict_time='%s' % row[4],
+      ))
+
+    count = self.verdict_tbl.SelectValue(cnxn,
+        col='COUNT(*)',
+        where=[
+            ('project_id = %s', [project_id]),
+            ('classifier_confidence <= %s',
+                [settings.classifier_moderation_thresh]),
+            ('overruled = %s', [False]),
+            ('issue_id IS NOT NULL', []),
+        ])
+
+    return ret, count
+
+  def GetTrainingIssues(self, cnxn, issue_service, since, offset=0, limit=100):
+    """Returns recent issues that have manual spam verdicts, along with
+    their first comments, for use as classifier training examples.
+    """
+
+    # Get all manual verdicts created since the given timestamp.
+    results = self.verdict_tbl.Select(cnxn,
+        cols=['issue_id'],
+        where=[
+            ('overruled = %s', [False]),
+            ('reason = %s', ['manual']),
+            ('issue_id IS NOT NULL', []),
+            ('created > %s', [since.isoformat()]),
+        ],
+        offset=offset,
+        limit=limit,
+        )
+
+    issue_ids = [long(row[0]) for row in results if row[0]]
+    issues = issue_service.GetIssues(cnxn, issue_ids)
+    comments = issue_service.GetCommentsForIssues(cnxn, issue_ids)
+    first_comments = {}
+    for issue in issues:
+      first_comments[issue.issue_id] = (comments[issue.issue_id][0].content
+          if issue.issue_id in comments else "[Empty]")
+
+    count = self.verdict_tbl.SelectValue(cnxn,
+        col='COUNT(*)',
+        where=[
+            ('overruled = %s', [False]),
+            ('reason = %s', ['manual']),
+            ('issue_id IS NOT NULL', []),
+            ('created > %s', [since.isoformat()]),
+        ])
+
+    return issues, first_comments, count
+
+
+class ModerationItem(object):
+  """Lightweight value object; constructor keyword args become attributes."""
+
+  def __init__(self, **kwargs):
+    self.__dict__ = kwargs