OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style |
| 3 # license that can be found in the LICENSE file or at |
| 4 # https://developers.google.com/open-source/licenses/bsd |
| 5 |
| 6 """ Set of functions for dealing with spam reports. |
| 7 """ |
| 8 |
| 9 import collections |
| 10 import httplib2 |
| 11 import logging |
| 12 import settings |
| 13 import sys |
| 14 import settings |
| 15 |
| 16 from features import filterrules_helpers |
| 17 from framework import sql |
| 18 from infra_libs import ts_mon |
| 19 |
| 20 from apiclient.discovery import build |
| 21 from oauth2client.client import GoogleCredentials |
| 22 from apiclient.errors import Error as ApiClientError |
| 23 from oauth2client.client import Error as Oauth2ClientError |
| 24 |
| 25 SPAMREPORT_TABLE_NAME = 'SpamReport' |
| 26 SPAMVERDICT_TABLE_NAME = 'SpamVerdict' |
| 27 ISSUE_TABLE = 'Issue' |
| 28 |
| 29 REASON_MANUAL = 'manual' |
| 30 REASON_THRESHOLD = 'threshold' |
| 31 REASON_CLASSIFIER = 'classifier' |
| 32 |
| 33 SPAMREPORT_COLS = ['issue_id', 'reported_user_id', 'user_id'] |
| 34 MANUALVERDICT_COLS = ['user_id', 'issue_id', 'is_spam', 'reason', 'project_id'] |
| 35 THRESHVERDICT_COLS = ['issue_id', 'is_spam', 'reason', 'project_id'] |
| 36 |
| 37 |
class SpamService(object):
  """The persistence layer for spam reports and spam verdicts."""

  # Monitoring counters for spam-related actions on issues and comments.
  issue_actions = ts_mon.CounterMetric('monorail/spam_svc/issue')
  comment_actions = ts_mon.CounterMetric('monorail/spam_svc/comment')

  def __init__(self):
    self.report_tbl = sql.SQLTableManager(SPAMREPORT_TABLE_NAME)
    self.verdict_tbl = sql.SQLTableManager(SPAMVERDICT_TABLE_NAME)
    self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE)

    # Stays None if credentials can't be obtained; the Classify* methods
    # then fall back to a fail-safe 'ham' result.
    self.prediction_service = None
    try:
      credentials = GoogleCredentials.get_application_default()
      self.prediction_service = build('prediction', 'v1.6',
          http=httplib2.Http(),
          credentials=credentials)
    except (Oauth2ClientError, ApiClientError):
      logging.error("Error getting GoogleCredentials: %s" % sys.exc_info()[0])

  def LookupFlaggers(self, cnxn, issue_id):
    """Returns users who've reported the issue or its comments as spam.

    Args:
      cnxn: connection to the SQL database.
      issue_id: the issue whose spam reports to look up.

    Returns a tuple. First element is a list of users who flagged the issue;
    second element is a dictionary of comment id to a list of users who flagged
    that comment.
    """
    rows = self.report_tbl.Select(
        cnxn, cols=['user_id', 'comment_id'],
        issue_id=issue_id)

    issue_reporters = []
    comment_reporters = collections.defaultdict(list)
    for user_id, comment_id in rows:
      if comment_id:
        # A non-NULL comment_id means the report targeted that comment.
        comment_reporters[comment_id].append(user_id)
      else:
        # Otherwise the report applies to the issue itself.
        issue_reporters.append(user_id)

    return issue_reporters, comment_reporters

  def LookUpFlagCounts(self, cnxn, issue_ids):
    """Returns a map of issue_id to the number of spam reports on it."""
    rows = self.report_tbl.Select(cnxn, cols=['issue_id', 'COUNT(*)'],
        issue_id=issue_ids, group_by=['issue_id'])
    return {long(issue_id): flag_count for issue_id, flag_count in rows}

  def LookUpIssueVerdicts(self, cnxn, issue_ids):
    """Returns a map of issue_id to the reason of its most recent verdict.

    NOTE(review): 'reason' is selected alongside MAX(created) with only
    GROUP BY issue_id, which relies on permissive MySQL GROUP BY semantics;
    the reason returned may not belong to the newest verdict row — confirm
    against the server's ONLY_FULL_GROUP_BY setting.
    """
    rows = self.verdict_tbl.Select(cnxn,
        cols=['issue_id', 'reason', 'MAX(created)'],
        issue_id=issue_ids, group_by=['issue_id'])
    return {long(issue_id): reason for issue_id, reason, _created in rows}

  def LookUpIssueVerdictHistory(self, cnxn, issue_ids):
    """Returns a list of all spam verdicts for the given issues.

    The list is ordered by issue_id, then by creation time (oldest first)
    within each issue. Each verdict is a dict of column values.
    """
    rows = self.verdict_tbl.Select(cnxn, cols=[
        'issue_id', 'reason', 'created', 'is_spam', 'classifier_confidence',
            'user_id', 'overruled'],
        issue_id=issue_ids, order_by=[('issue_id', []), ('created', [])])

    # TODO: group by issue_id, make class instead of dict for verdict.
    verdicts = []
    for row in rows:
      verdicts.append({
        'issue_id': row[0],
        'reason': row[1],
        'created': row[2],
        'is_spam': row[3],
        'classifier_confidence': row[4],
        'user_id': row[5],
        'overruled': row[6],
      })

    return verdicts

  def FlagIssues(self, cnxn, issue_service, issues, reporting_user_id,
      flagged_spam):
    """Creates or deletes a spam report on each issue.

    If the new flag count crosses the spam threshold (in either direction),
    records a threshold verdict and toggles issue.is_spam, unless the most
    recent verdict was manual.

    Args:
      cnxn: connection to the SQL database.
      issue_service: used to persist is_spam changes.
      issues: the issues to flag or un-flag. All rows are assumed to belong
          to the same project (only issues[0].project_id is recorded).
      reporting_user_id: the user doing the flagging.
      flagged_spam: True to add spam reports, False to retract them.
    """
    if not issues:
      # Nothing to do; also guards the issues[0] access below.
      return

    verdict_updates = []
    if flagged_spam:
      rows = [(issue.issue_id, issue.reporter_id, reporting_user_id)
          for issue in issues]
      self.report_tbl.InsertRows(cnxn, SPAMREPORT_COLS, rows, ignore=True)
    else:
      issue_ids = [issue.issue_id for issue in issues]
      # comment_id=None restricts the delete to issue-level reports,
      # leaving any comment flags by this user intact.
      self.report_tbl.Delete(
          cnxn, issue_id=issue_ids, user_id=reporting_user_id,
          comment_id=None)

    project_id = issues[0].project_id

    # Now record new verdicts and update issue.is_spam, if they've changed.
    ids = [issue.issue_id for issue in issues]
    counts = self.LookUpFlagCounts(cnxn, ids)
    previous_verdicts = self.LookUpIssueVerdicts(cnxn, ids)

    for issue_id, flag_count in counts.items():
      # Record a new verdict only if the flag count crossed the threshold
      # in the direction of this action...
      crossed_thresh = (
          (flagged_spam and flag_count >= settings.spam_flag_thresh) or
          (not flagged_spam and flag_count < settings.spam_flag_thresh))
      # ...and the most recent verdict was not manual (manual verdicts are
      # never overridden by flag counts).
      if crossed_thresh and previous_verdicts.get(issue_id) != REASON_MANUAL:
        verdict_updates.append(issue_id)

    if not verdict_updates:
      return

    # Some of the issues may have exceeded the flag threshold, so issue
    # verdicts and mark as spam in those cases.
    rows = [(issue_id, flagged_spam, REASON_THRESHOLD, project_id)
        for issue_id in verdict_updates]
    self.verdict_tbl.InsertRows(cnxn, THRESHVERDICT_COLS, rows, ignore=True)
    update_issues = []
    for issue in issues:
      if issue.issue_id in verdict_updates:
        issue.is_spam = flagged_spam
        update_issues.append(issue)

    if flagged_spam:
      self.issue_actions.increment_by(len(update_issues), {'type': 'flag'})

    issue_service.UpdateIssues(cnxn, update_issues, update_cols=['is_spam'])

  def FlagComment(self, cnxn, issue_id, comment_id, reported_user_id,
      reporting_user_id, flagged_spam):
    """Creates or deletes a spam report on a comment."""
    # TODO(seanmccullough): Bulk comment flagging? There's no UI for that.
    if flagged_spam:
      self.report_tbl.InsertRow(
          cnxn, ignore=True, issue_id=issue_id,
          comment_id=comment_id, reported_user_id=reported_user_id,
          user_id=reporting_user_id)
      self.comment_actions.increment({'type': 'flag'})
    else:
      self.report_tbl.Delete(
          cnxn, issue_id=issue_id, comment_id=comment_id,
          user_id=reporting_user_id)

  def RecordClassifierIssueVerdict(self, cnxn, issue, is_spam, confidence):
    """Records the classifier's spam verdict for a newly created issue.

    NOTE(review): unlike RecordClassifierCommentVerdict, no project_id is
    stored with this row, yet GetModerationQueue filters on project_id —
    these verdicts may never surface in the moderation queue; confirm.
    """
    self.verdict_tbl.InsertRow(cnxn, issue_id=issue.issue_id, is_spam=is_spam,
        reason=REASON_CLASSIFIER, classifier_confidence=confidence)
    if is_spam:
      self.issue_actions.increment({'type': 'classifier'})
    # This is called at issue creation time, so there's nothing else to do here.

  def RecordManualIssueVerdicts(self, cnxn, issue_service, issues, user_id,
      is_spam):
    """Records manual spam/ham verdicts on issues, overruling prior verdicts.

    Also sets each issue's is_spam bit. When un-marking as spam, new local
    IDs are allocated so the issues rejoin the normal numbering sequence.
    """
    rows = [(user_id, issue.issue_id, is_spam, REASON_MANUAL, issue.project_id)
        for issue in issues]
    issue_ids = [issue.issue_id for issue in issues]

    # Overrule all previous verdicts.
    self.verdict_tbl.Update(cnxn, {'overruled': True}, [
        ('issue_id IN (%s)' % sql.PlaceHolders(issue_ids), issue_ids)
        ], commit=False)

    self.verdict_tbl.InsertRows(cnxn, MANUALVERDICT_COLS, rows, ignore=True)

    for issue in issues:
      issue.is_spam = is_spam

    if is_spam:
      self.issue_actions.increment_by(len(issues), {'type': 'manual'})
    else:
      issue_service.AllocateNewLocalIDs(cnxn, issues)

    # This will commit the transaction.
    issue_service.UpdateIssues(cnxn, issues, update_cols=['is_spam'])

  def RecordManualCommentVerdict(self, cnxn, issue_service, user_service,
      comment_id, sequence_num, user_id, is_spam):
    """Records a manual spam/ham verdict on a comment and soft-deletes it."""
    # TODO(seanmccullough): Bulk comment verdicts? There's no UI for that.
    self.verdict_tbl.InsertRow(cnxn, ignore=True,
        user_id=user_id, comment_id=comment_id, is_spam=is_spam,
        reason=REASON_MANUAL)
    comment = issue_service.GetComment(cnxn, comment_id)
    comment.is_spam = is_spam
    issue = issue_service.GetIssue(cnxn, comment.issue_id)
    # Soft-delete (or undelete) the comment to match the verdict.
    issue_service.SoftDeleteComment(cnxn, comment.project_id, issue.local_id,
                                    sequence_num, user_id, user_service,
                                    is_spam, True, is_spam)
    if is_spam:
      self.comment_actions.increment({'type': 'manual'})

  def RecordClassifierCommentVerdict(self, cnxn, comment, is_spam, confidence):
    """Records the classifier's spam verdict for a comment."""
    self.verdict_tbl.InsertRow(cnxn, comment_id=comment.id, is_spam=is_spam,
        reason=REASON_CLASSIFIER, classifier_confidence=confidence,
        project_id=comment.project_id)
    if is_spam:
      self.comment_actions.increment({'type': 'classifier'})

  def _PredictSpam(self, summary, content):
    """Calls the Cloud Prediction API, with retries and a fail-safe result.

    Args:
      summary: issue summary text ('' when classifying a bare comment).
      content: comment body text.

    Returns a JSON dict of classifier prediction results, or a fail-safe
    'ham' result if the service is unavailable or all retries fail.
    """
    # Fail-safe: not spam.
    result = {'outputLabel': 'ham',
              'outputMulti': [{'label': 'ham', 'score': '1.0'}]}
    if not self.prediction_service:
      logging.error("prediction_service not initialized.")
      return result

    remaining_retries = 3
    while remaining_retries > 0:
      try:
        return self.prediction_service.trainedmodels().predict(
            project=settings.classifier_project_id,
            id=settings.classifier_model_id,
            body={'input': {'csvInstance': [summary, content]}}
        ).execute()
      except Exception:
        remaining_retries = remaining_retries - 1
        logging.error('Error calling prediction API: %s' % sys.exc_info()[0])

    return result

  def ClassifyIssue(self, issue, firstComment):
    """Classify an issue as either spam or ham.

    Args:
      issue: the Issue.
      firstComment: the first Comment on issue.

    Returns a JSON dict of classifier prediction results from
    the Cloud Prediction API.
    """
    return self._PredictSpam(issue.summary, firstComment.content)

  def ClassifyComment(self, comment_content):
    """Classify a comment as either spam or ham.

    Args:
      comment_content: the comment text.

    Returns a JSON dict of classifier prediction results from
    the Cloud Prediction API.
    """
    # We re-use the issue classifier here, with a blank description
    # and the comment content as the body.
    return self._PredictSpam('', comment_content)

  def GetModerationQueue(
      self, cnxn, _issue_service, project_id, offset=0, limit=10):
    """Returns list of recent issues with spam verdicts,
    ranked in ascending order of confidence (so uncertain items are first).
    """
    # TODO(seanmccullough): Optimize pagination. This query probably gets
    # slower as the number of SpamVerdicts grows, regardless of offset
    # and limit values used here. Using offset,limit in general may not
    # be the best way to do this.
    # One shared filter so the page query and the COUNT query cannot drift.
    where = [
        ('project_id = %s', [project_id]),
        ('classifier_confidence <= %s',
            [settings.classifier_moderation_thresh]),
        ('overruled = %s', [False]),
        ('issue_id IS NOT NULL', []),
        ]

    results = self.verdict_tbl.Select(cnxn,
        cols=['issue_id', 'is_spam', 'reason', 'classifier_confidence',
            'created'],
        where=where,
        order_by=[
            ('classifier_confidence ASC', []),
            ('created ASC', []),
            ],
        group_by=['issue_id'],
        offset=offset,
        limit=limit,
        )

    ret = []
    for row in results:
      ret.append(ModerationItem(
          issue_id=long(row[0]),
          is_spam=row[1] == 1,
          reason=row[2],
          classifier_confidence=row[3],
          verdict_time='%s' % row[4],
          ))

    # Total count of matching verdicts, for pagination.
    count = self.verdict_tbl.SelectValue(cnxn, col='COUNT(*)', where=where)

    return ret, count

  def GetTrainingIssues(self, cnxn, issue_service, since, offset=0, limit=100):
    """Returns issues that received manual verdicts after `since`, plus the
    text of each issue's first comment, for classifier training.

    Returns a tuple of (issues, first_comments dict, total match count).
    """
    # One shared filter so the page query and the COUNT query cannot drift.
    where = [
        ('overruled = %s', [False]),
        ('reason = %s', ['manual']),
        ('issue_id IS NOT NULL', []),
        ('created > %s', [since.isoformat()]),
        ]

    # Get all of the manual verdicts in the requested window.
    results = self.verdict_tbl.Select(cnxn,
        cols=['issue_id'],
        where=where,
        offset=offset,
        limit=limit,
        )

    issue_ids = [long(row[0]) for row in results if row[0]]
    issues = issue_service.GetIssues(cnxn, issue_ids)
    comments = issue_service.GetCommentsForIssues(cnxn, issue_ids)
    first_comments = {}
    for issue in issues:
      first_comments[issue.issue_id] = (comments[issue.issue_id][0].content
          if issue.issue_id in comments else "[Empty]")

    count = self.verdict_tbl.SelectValue(cnxn, col='COUNT(*)', where=where)

    return issues, first_comments, count
| 388 |
class ModerationItem:
  """Bag of attributes describing one verdict awaiting human moderation.

  Accepts arbitrary keyword arguments and exposes each as an attribute.
  """

  def __init__(self, **fields):
    for attr_name, attr_value in fields.items():
      setattr(self, attr_name, attr_value)
OLD | NEW |